Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #6328 - avoid binding WebSocket MethodHandles #12181

Merged
merged 5 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@

package org.eclipse.jetty.websocket.core.messages;

import java.lang.invoke.MethodHandle;
import java.util.Objects;

import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.util.MethodHolder;

/**
* <p>Abstract implementation of {@link MessageSink}.</p>
Expand All @@ -42,21 +42,21 @@
public abstract class AbstractMessageSink implements MessageSink
{
private final CoreSession session;
private final MethodHandle methodHandle;
private final MethodHolder methodHandle;
private final boolean autoDemand;

/**
* Creates a new {@link MessageSink}.
*
* @param session the WebSocket session
* @param methodHandle the application function to invoke
* @param methodHolder the application function to invoke
* @param autoDemand whether this {@link MessageSink} manages demand automatically
* as explained in {@link AbstractMessageSink}
*/
public AbstractMessageSink(CoreSession session, MethodHandle methodHandle, boolean autoDemand)
public AbstractMessageSink(CoreSession session, MethodHolder methodHolder, boolean autoDemand)
{
this.session = Objects.requireNonNull(session, "CoreSession");
this.methodHandle = Objects.requireNonNull(methodHandle, "MethodHandle");
this.methodHandle = Objects.requireNonNull(methodHolder, "MethodHolder");
this.autoDemand = autoDemand;
}

Expand All @@ -73,7 +73,7 @@ public CoreSession getCoreSession()
* Get the application function.
* @return the application function
*/
public MethodHandle getMethodHandle()
public MethodHolder getMethodHolder()
{
return methodHandle;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,15 @@

package org.eclipse.jetty.websocket.core.messages;

import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodType;
import java.nio.ByteBuffer;

import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.exception.InvalidSignatureException;
import org.eclipse.jetty.websocket.core.exception.MessageTooLargeException;
import org.eclipse.jetty.websocket.core.util.MethodHolder;

/**
* <p>A {@link MessageSink} implementation that accumulates BINARY frames
Expand All @@ -38,17 +36,12 @@ public class ByteArrayMessageSink extends AbstractMessageSink
* Creates a new {@link ByteArrayMessageSink}.
*
* @param session the WebSocket session
* @param methodHandle the application function to invoke when a new message has been assembled
* @param methodHolder the application function to invoke when a new message has been assembled
* @param autoDemand whether this {@link MessageSink} manages demand automatically
*/
public ByteArrayMessageSink(CoreSession session, MethodHandle methodHandle, boolean autoDemand)
public ByteArrayMessageSink(CoreSession session, MethodHolder methodHolder, boolean autoDemand)
{
super(session, methodHandle, autoDemand);
// This uses the offset length byte array signature not supported by jakarta websocket.
// The jakarta layer instead uses decoders for whole byte array messages instead of this message sink.
MethodType onMessageType = MethodType.methodType(Void.TYPE, byte[].class, int.class, int.class);
if (methodHandle.type().changeReturnType(void.class) != onMessageType.changeReturnType(void.class))
throw InvalidSignatureException.build(onMessageType, methodHandle.type());
super(session, methodHolder, autoDemand);
}

@Override
Expand All @@ -70,7 +63,7 @@ public void accept(Frame frame, Callback callback)
if (frame.isFin() && (accumulator == null || accumulator.isEmpty()))
{
byte[] buf = BufferUtil.toArray(payload);
getMethodHandle().invoke(buf, 0, buf.length);
getMethodHolder().invoke(buf, 0, buf.length);
callback.succeeded();
autoDemand();
return;
Expand All @@ -93,7 +86,7 @@ public void accept(Frame frame, Callback callback)
callback = Callback.NOOP;
int length = accumulator.remaining();
byte[] buf = accumulator.takeByteArray();
getMethodHandle().invoke(buf, 0, length);
getMethodHolder().invoke(buf, 0, length);
autoDemand();
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@

package org.eclipse.jetty.websocket.core.messages;

import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodType;
import java.nio.ByteBuffer;

import org.eclipse.jetty.io.ByteBufferCallbackAccumulator;
Expand All @@ -23,8 +21,8 @@
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.exception.InvalidSignatureException;
import org.eclipse.jetty.websocket.core.exception.MessageTooLargeException;
import org.eclipse.jetty.websocket.core.util.MethodHolder;

/**
* <p>A {@link MessageSink} implementation that accumulates BINARY frames
Expand All @@ -39,24 +37,12 @@ public class ByteBufferMessageSink extends AbstractMessageSink
* Creates a new {@link ByteBufferMessageSink}.
*
* @param session the WebSocket session
* @param methodHandle the application function to invoke when a new message has been assembled
* @param methodHolder the application function to invoke when a new message has been assembled
* @param autoDemand whether this {@link MessageSink} manages demand automatically
*/
public ByteBufferMessageSink(CoreSession session, MethodHandle methodHandle, boolean autoDemand)
public ByteBufferMessageSink(CoreSession session, MethodHolder methodHolder, boolean autoDemand)
{
this(session, methodHandle, autoDemand, true);
}

protected ByteBufferMessageSink(CoreSession session, MethodHandle methodHandle, boolean autoDemand, boolean validateSignature)
{
super(session, methodHandle, autoDemand);

if (validateSignature)
{
MethodType onMessageType = MethodType.methodType(Void.TYPE, ByteBuffer.class);
if (methodHandle.type() != onMessageType)
throw InvalidSignatureException.build(onMessageType, methodHandle.type());
}
super(session, methodHolder, autoDemand);
}

@Override
Expand All @@ -76,7 +62,7 @@ public void accept(Frame frame, Callback callback)
// created or used, then we don't need to aggregate.
if (frame.isFin() && (accumulator == null || accumulator.getLength() == 0))
{
invoke(getMethodHandle(), frame.getPayload(), callback);
invoke(getMethodHolder(), frame.getPayload(), callback);
autoDemand();
return;
}
Expand All @@ -99,7 +85,7 @@ public void accept(Frame frame, Callback callback)
ByteBuffer byteBuffer = buffer.getByteBuffer();
accumulator.writeTo(byteBuffer);
callback = Callback.from(buffer::release);
invoke(getMethodHandle(), byteBuffer, callback);
invoke(getMethodHolder(), byteBuffer, callback);
autoDemand();
}
else
Expand All @@ -122,9 +108,9 @@ public void fail(Throwable failure)
accumulator.fail(failure);
}

protected void invoke(MethodHandle methodHandle, ByteBuffer byteBuffer, Callback callback) throws Throwable
protected void invoke(MethodHolder methodHolder, ByteBuffer byteBuffer, Callback callback) throws Throwable
{
methodHandle.invoke(byteBuffer);
methodHolder.invoke(byteBuffer);
callback.succeeded();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import java.io.Closeable;
import java.io.InputStream;
import java.io.Reader;
import java.lang.invoke.MethodHandle;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
Expand All @@ -26,6 +25,7 @@
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.util.MethodHolder;

/**
* <p>A partial implementation of {@link MessageSink} for methods that consume WebSocket
Expand All @@ -51,9 +51,9 @@ public abstract class DispatchedMessageSink extends AbstractMessageSink
private volatile CompletableFuture<Void> dispatchComplete;
private MessageSink typeSink;

public DispatchedMessageSink(CoreSession session, MethodHandle methodHandle, boolean autoDemand)
public DispatchedMessageSink(CoreSession session, MethodHolder methodHolder, boolean autoDemand)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to provide all the old APIs deprecated?

Suggested change
public DispatchedMessageSink(CoreSession session, MethodHolder methodHolder, boolean autoDemand)
@Deprecated
public DispatchedMessageSink(CoreSession session, MethodHandle methodHandle, boolean autoDemand)
{
this(session, MethodHolder.from(methodHandle), autoDemand);
}
public DispatchedMessageSink(CoreSession session, MethodHolder methodHolder, boolean autoDemand)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so, this is websocket-core, and this is really not supposed to be used by the application more of an implementation class.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah but it is public and not internal.... @sbordet thoughts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this probably should be internal.

Its to reduce having code duplication from having this in our jakarta and jetty websocket implementations. But making it internal makes things difficult with JPMS.

But there is no API to actually use a MessageSink directly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, leave out and we can add back deprecated if anybody complains.... although it is against our new deprecated policy :)

{
super(session, methodHandle, autoDemand);
super(session, methodHolder, autoDemand);
if (!autoDemand)
throw new IllegalArgumentException("%s must be auto-demanding".formatted(getClass().getSimpleName()));
executor = session.getWebSocketComponents().getExecutor();
Expand All @@ -74,7 +74,7 @@ public void accept(Frame frame, final Callback callback)
{
try
{
getMethodHandle().invoke(typeSink);
getMethodHolder().invoke(typeSink);
if (typeSink instanceof Closeable closeable)
IO.close(closeable);
dispatchComplete.complete(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,14 @@

package org.eclipse.jetty.websocket.core.messages;

import java.lang.invoke.MethodHandle;

import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.util.MethodHolder;

public class InputStreamMessageSink extends DispatchedMessageSink
{
public InputStreamMessageSink(CoreSession session, MethodHandle methodHandle, boolean autoDemand)
public InputStreamMessageSink(CoreSession session, MethodHolder methodHolder, boolean autoDemand)
{
super(session, methodHandle, autoDemand);
super(session, methodHolder, autoDemand);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,11 @@

package org.eclipse.jetty.websocket.core.messages;

import java.lang.invoke.MethodHandle;

import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.util.MethodHolder;

/**
* <p>A {@link MessageSink} implementation that delivers BINARY frames
Expand All @@ -31,12 +30,12 @@ public class PartialByteArrayMessageSink extends AbstractMessageSink
* Creates a new {@link PartialByteArrayMessageSink}.
*
* @param session the WebSocket session
* @param methodHandle the application function to invoke when a new frame has arrived
* @param methodHolder the application function to invoke when a new frame has arrived
* @param autoDemand whether this {@link MessageSink} manages demand automatically
*/
public PartialByteArrayMessageSink(CoreSession session, MethodHandle methodHandle, boolean autoDemand)
public PartialByteArrayMessageSink(CoreSession session, MethodHolder methodHolder, boolean autoDemand)
{
super(session, methodHandle, autoDemand);
super(session, methodHolder, autoDemand);
}

@Override
Expand All @@ -47,7 +46,7 @@ public void accept(Frame frame, Callback callback)
if (frame.hasPayload() || frame.isFin())
{
byte[] buffer = BufferUtil.toArray(frame.getPayload());
getMethodHandle().invoke(buffer, frame.isFin());
getMethodHolder().invoke(buffer, frame.isFin());
callback.succeeded();
autoDemand();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@

package org.eclipse.jetty.websocket.core.messages;

import java.lang.invoke.MethodHandle;
import java.nio.ByteBuffer;

import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.util.MethodHolder;

/**
* <p>A {@link MessageSink} implementation that delivers BINARY frames
Expand All @@ -31,12 +31,12 @@ public class PartialByteBufferMessageSink extends AbstractMessageSink
* Creates a new {@link PartialByteBufferMessageSink}.
*
* @param session the WebSocket session
* @param methodHandle the application function to invoke when a new frame has arrived
* @param methodHolder the application function to invoke when a new frame has arrived
* @param autoDemand whether this {@link MessageSink} manages demand automatically
*/
public PartialByteBufferMessageSink(CoreSession session, MethodHandle methodHandle, boolean autoDemand)
public PartialByteBufferMessageSink(CoreSession session, MethodHolder methodHolder, boolean autoDemand)
{
super(session, methodHandle, autoDemand);
super(session, methodHolder, autoDemand);
}

@Override
Expand All @@ -46,7 +46,7 @@ public void accept(Frame frame, Callback callback)
{
if (frame.hasPayload() || frame.isFin())
{
invoke(getMethodHandle(), frame.getPayload(), frame.isFin(), callback);
invoke(getMethodHolder(), frame.getPayload(), frame.isFin(), callback);
autoDemand();
}
else
Expand All @@ -61,9 +61,9 @@ public void accept(Frame frame, Callback callback)
}
}

protected void invoke(MethodHandle methodHandle, ByteBuffer byteBuffer, boolean fin, Callback callback) throws Throwable
protected void invoke(MethodHolder methodHolder, ByteBuffer byteBuffer, boolean fin, Callback callback) throws Throwable
{
methodHandle.invoke(byteBuffer, fin);
methodHolder.invoke(byteBuffer, fin);
callback.succeeded();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,12 @@

package org.eclipse.jetty.websocket.core.messages;

import java.lang.invoke.MethodHandle;

import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.exception.BadPayloadException;
import org.eclipse.jetty.websocket.core.util.MethodHolder;

/**
* <p>A {@link MessageSink} implementation that delivers TEXT frames
Expand All @@ -34,12 +33,12 @@ public class PartialStringMessageSink extends AbstractMessageSink
* Creates a new {@link PartialStringMessageSink}.
*
* @param session the WebSocket session
* @param methodHandle the application function to invoke when a new frame has arrived
* @param methodHolder the application function to invoke when a new frame has arrived
* @param autoDemand whether this {@link MessageSink} manages demand automatically
*/
public PartialStringMessageSink(CoreSession session, MethodHandle methodHandle, boolean autoDemand)
public PartialStringMessageSink(CoreSession session, MethodHolder methodHolder, boolean autoDemand)
{
super(session, methodHandle, autoDemand);
super(session, methodHolder, autoDemand);
}

@Override
Expand All @@ -55,12 +54,12 @@ public void accept(Frame frame, Callback callback)
if (frame.isFin())
{
String complete = accumulator.takeCompleteString(BadPayloadException.InvalidUtf8::new);
getMethodHandle().invoke(complete, true);
getMethodHolder().invoke(complete, true);
}
else
{
String partial = accumulator.takePartialString(BadPayloadException.InvalidUtf8::new);
getMethodHandle().invoke(partial, false);
getMethodHolder().invoke(partial, false);
}

callback.succeeded();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,14 @@

package org.eclipse.jetty.websocket.core.messages;

import java.lang.invoke.MethodHandle;

import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.util.MethodHolder;

public class ReaderMessageSink extends DispatchedMessageSink
{
public ReaderMessageSink(CoreSession session, MethodHandle methodHandle, boolean autoDemand)
public ReaderMessageSink(CoreSession session, MethodHolder methodHolder, boolean autoDemand)
{
super(session, methodHandle, autoDemand);
super(session, methodHolder, autoDemand);
}

@Override
Expand Down
Loading
Loading