diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferCallbackAccumulator.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferCallbackAccumulator.java new file mode 100644 index 000000000000..72a1ed7569e4 --- /dev/null +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferCallbackAccumulator.java @@ -0,0 +1,99 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.io; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.Callback; + +/** + * This class can be used to accumulate pairs of {@link ByteBuffer} and {@link Callback}, and eventually copy + * these into a single {@link ByteBuffer} or byte array and succeed the callbacks. + */ +public class ByteBufferCallbackAccumulator +{ + private final List _entries = new ArrayList<>(); + private int _length; + + private static class Entry + { + private final ByteBuffer buffer; + private final Callback callback; + + Entry(ByteBuffer buffer, Callback callback) + { + this.buffer = buffer; + this.callback = callback; + } + } + + public void addEntry(ByteBuffer buffer, Callback callback) + { + _entries.add(new Entry(buffer, callback)); + _length = Math.addExact(_length, buffer.remaining()); + } + + /** + * @return the total length of the content in the accumulator. + */ + public int getLength() + { + return _length; + } + + /** + * @return a newly allocated byte array containing all content written into the accumulator. + */ + public byte[] takeByteArray() + { + int length = getLength(); + if (length == 0) + return new byte[0]; + + byte[] bytes = new byte[length]; + ByteBuffer buffer = BufferUtil.toBuffer(bytes); + BufferUtil.clear(buffer); + writeTo(buffer); + return bytes; + } + + public void writeTo(ByteBuffer buffer) + { + if (BufferUtil.space(buffer) < _length) + throw new IllegalArgumentException("not enough buffer space remaining"); + + int pos = BufferUtil.flipToFill(buffer); + for (Entry entry : _entries) + { + buffer.put(entry.buffer); + entry.callback.succeeded(); + } + BufferUtil.flipToFlush(buffer, pos); + _entries.clear(); + _length = 0; + } + + public void fail(Throwable t) + { + for (Entry entry : _entries) + { + entry.callback.failed(t); + } + _entries.clear(); + _length = 0; + } +} diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java index d5c2fd60e3a8..e10f63e3a1db 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java @@ -152,7 +152,7 @@ public InvocationType getInvocationType() } /** - * Creaste a callback that runs completed when it succeeds or fails + * Creates a callback that runs completed when it succeeds or fails * * @param completed The completion to run on success or failure * @return a new callback @@ -169,7 +169,7 @@ public void completed() } /** - * Create a nested callback that runs completed after + * Creates a nested callback that runs completed after * completing the nested callback. * * @param callback The nested callback @@ -188,7 +188,7 @@ public void completed() } /** - * Create a nested callback that runs completed before + * Creates a nested callback that runs completed before * completing the nested callback. * * @param callback The nested callback @@ -231,7 +231,7 @@ public void failed(Throwable x) } /** - * Create a nested callback which always fails the nested callback on completion. + * Creates a nested callback which always fails the nested callback on completion. * * @param callback The nested callback * @param cause The cause to fail the nested callback, if the new callback is failed the reason @@ -258,7 +258,7 @@ public void failed(Throwable x) } /** - * Create a callback which combines two other callbacks and will succeed or fail them both. + * Creates a callback which combines two other callbacks and will succeed or fail them both. * @param callback1 The first callback * @param callback2 The second callback * @return a new callback. @@ -411,6 +411,32 @@ public InvocationType getInvocationType() */ class Completable extends CompletableFuture implements Callback { + /** + * Creates a completable future given a callback. + * + * @param callback The nested callback. + * @return a new Completable which will succeed this callback when completed. + */ + public static Completable from(Callback callback) + { + return new Completable(callback.getInvocationType()) + { + @Override + public void succeeded() + { + callback.succeeded(); + super.succeeded(); + } + + @Override + public void failed(Throwable x) + { + callback.failed(x); + super.failed(x); + } + }; + } + private final InvocationType invocation; public Completable() diff --git a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/CoreSession.java b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/CoreSession.java index bbaf05c4e5a7..303db8dd408d 100644 --- a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/CoreSession.java +++ b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/CoreSession.java @@ -110,6 +110,11 @@ public interface CoreSession extends OutgoingFrames, Configuration */ SocketAddress getRemoteAddress(); + /** + * @return True if the websocket is open inbound + */ + boolean isInputOpen(); + /** * @return True if the websocket is open outbound */ @@ -253,6 +258,12 @@ public SocketAddress getRemoteAddress() return null; } + @Override + public boolean isInputOpen() + { + return true; + } + @Override public boolean isOutputOpen() { diff --git a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketCoreSession.java b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketCoreSession.java index d76bdee679da..8b7c4c8813de 100644 --- a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketCoreSession.java +++ b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketCoreSession.java @@ -183,6 +183,12 @@ public SocketAddress getRemoteAddress() return getConnection().getEndPoint().getRemoteSocketAddress(); } + @Override + public boolean isInputOpen() + { + return sessionState.isInputOpen(); + } + @Override public boolean isOutputOpen() { @@ -416,8 +422,10 @@ public void demand(long n) { if (!demanding) throw new IllegalStateException("FrameHandler is not demanding: " + this); + if (!sessionState.isInputOpen()) throw new IllegalStateException("FrameHandler input not open: " + this); + connection.demand(n); } diff --git a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/ByteArrayMessageSink.java b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/ByteArrayMessageSink.java index 1eeda1230dc3..3b220c5330b9 100644 --- a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/ByteArrayMessageSink.java +++ b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/ByteArrayMessageSink.java @@ -13,12 +13,11 @@ package org.eclipse.jetty.websocket.core.internal.messages; -import java.io.ByteArrayOutputStream; -import java.io.IOException; import java.lang.invoke.MethodHandle; import java.lang.invoke.MethodType; import java.nio.ByteBuffer; +import org.eclipse.jetty.io.ByteBufferCallbackAccumulator; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.websocket.core.CoreSession; @@ -29,9 +28,7 @@ public class ByteArrayMessageSink extends AbstractMessageSink { private static final byte[] EMPTY_BUFFER = new byte[0]; - private static final int BUFFER_SIZE = 65535; - private ByteArrayOutputStream out; - private int size; + private ByteBufferCallbackAccumulator out; public ByteArrayMessageSink(CoreSession session, MethodHandle methodHandle) { @@ -51,12 +48,12 @@ public void accept(Frame frame, Callback callback) { try { - size += frame.getPayloadLength(); + long size = (out == null ? 0 : out.getLength()) + frame.getPayloadLength(); long maxBinaryMessageSize = session.getMaxBinaryMessageSize(); if (maxBinaryMessageSize > 0 && size > maxBinaryMessageSize) { - throw new MessageTooLargeException(String.format("Binary message too large: (actual) %,d > (configured max binary message size) %,d", - size, maxBinaryMessageSize)); + throw new MessageTooLargeException( + String.format("Binary message too large: (actual) %,d > (configured max binary message size) %,d", size, maxBinaryMessageSize)); } // If we are fin and no OutputStream has been created we don't need to aggregate. @@ -71,19 +68,33 @@ public void accept(Frame frame, Callback callback) methodHandle.invoke(EMPTY_BUFFER, 0, 0); callback.succeeded(); + session.demand(1); return; } - aggregatePayload(frame); + // Aggregate the frame payload. + if (frame.hasPayload()) + { + ByteBuffer payload = frame.getPayload(); + if (out == null) + out = new ByteBufferCallbackAccumulator(); + out.addEntry(payload, callback); + } + + // If the methodHandle throws we don't want to fail callback twice. + callback = Callback.NOOP; if (frame.isFin()) { - byte[] buf = out.toByteArray(); + byte[] buf = out.takeByteArray(); methodHandle.invoke(buf, 0, buf.length); } - callback.succeeded(); + + session.demand(1); } catch (Throwable t) { + if (out != null) + out.fail(t); callback.failed(t); } finally @@ -92,19 +103,7 @@ public void accept(Frame frame, Callback callback) { // reset out = null; - size = 0; } } } - - private void aggregatePayload(Frame frame) throws IOException - { - if (frame.hasPayload()) - { - ByteBuffer payload = frame.getPayload(); - if (out == null) - out = new ByteArrayOutputStream(BUFFER_SIZE); - BufferUtil.writeTo(payload, out); - } - } } diff --git a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/ByteBufferMessageSink.java b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/ByteBufferMessageSink.java index 677406d36b75..5c58e90b0f07 100644 --- a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/ByteBufferMessageSink.java +++ b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/ByteBufferMessageSink.java @@ -13,13 +13,13 @@ package org.eclipse.jetty.websocket.core.internal.messages; -import java.io.ByteArrayOutputStream; -import java.io.IOException; import java.lang.invoke.MethodHandle; import java.lang.invoke.MethodType; import java.nio.ByteBuffer; import java.util.Objects; +import org.eclipse.jetty.io.ByteBufferCallbackAccumulator; +import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.websocket.core.CoreSession; @@ -29,9 +29,7 @@ public class ByteBufferMessageSink extends AbstractMessageSink { - private static final int BUFFER_SIZE = 65535; - private ByteArrayOutputStream out; - private int size; + private ByteBufferCallbackAccumulator out; public ByteBufferMessageSink(CoreSession session, MethodHandle methodHandle) { @@ -51,7 +49,7 @@ public void accept(Frame frame, Callback callback) { try { - size += frame.getPayloadLength(); + long size = (out == null ? 0 : out.getLength()) + frame.getPayloadLength(); long maxBinaryMessageSize = session.getMaxBinaryMessageSize(); if (maxBinaryMessageSize > 0 && size > maxBinaryMessageSize) { @@ -68,41 +66,51 @@ public void accept(Frame frame, Callback callback) methodHandle.invoke(BufferUtil.EMPTY_BUFFER); callback.succeeded(); + session.demand(1); return; } - aggregatePayload(frame); + // Aggregate the frame payload. + if (frame.hasPayload()) + { + ByteBuffer payload = frame.getPayload(); + if (out == null) + out = new ByteBufferCallbackAccumulator(); + out.addEntry(payload, callback); + } + + // If the methodHandle throws we don't want to fail callback twice. + callback = Callback.NOOP; if (frame.isFin()) - methodHandle.invoke(ByteBuffer.wrap(out.toByteArray())); + { + ByteBufferPool bufferPool = session.getByteBufferPool(); + ByteBuffer buffer = bufferPool.acquire(out.getLength(), false); + out.writeTo(buffer); - callback.succeeded(); + try + { + methodHandle.invoke(buffer); + } + finally + { + bufferPool.release(buffer); + } + } + + session.demand(1); } catch (Throwable t) { + if (out != null) + out.fail(t); callback.failed(t); } finally { if (frame.isFin()) { - // reset out = null; - size = 0; } } } - - private void aggregatePayload(Frame frame) throws IOException - { - if (frame.hasPayload()) - { - ByteBuffer payload = frame.getPayload(); - - if (out == null) - out = new ByteArrayOutputStream(BUFFER_SIZE); - - BufferUtil.writeTo(payload, out); - payload.position(payload.limit()); // consume buffer - } - } } diff --git a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/DispatchedMessageSink.java b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/DispatchedMessageSink.java index 57d31e0d3f67..049d937a8079 100644 --- a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/DispatchedMessageSink.java +++ b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/DispatchedMessageSink.java @@ -135,22 +135,32 @@ public void accept(Frame frame, final Callback callback) }); } - Callback frameCallback = callback; + Callback frameCallback; if (frame.isFin()) { // This is the final frame we should wait for the frame callback and the dispatched thread. - Callback.Completable completableCallback = new Callback.Completable(); - frameCallback = completableCallback; - CompletableFuture.allOf(dispatchComplete, completableCallback).whenComplete((aVoid, throwable) -> + Callback.Completable finComplete = Callback.Completable.from(callback); + frameCallback = finComplete; + CompletableFuture.allOf(dispatchComplete, finComplete).whenComplete((aVoid, throwable) -> { typeSink = null; dispatchComplete = null; - if (throwable != null) - callback.failed(throwable); - else - callback.succeeded(); + if (throwable == null) + session.demand(1); }); } + else + { + frameCallback = new Callback.Nested(callback) + { + @Override + public void succeeded() + { + super.succeeded(); + session.demand(1); + } + }; + } typeSink.accept(frame, frameCallback); } diff --git a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/PartialByteArrayMessageSink.java b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/PartialByteArrayMessageSink.java index 59a39984030b..076ef5f5c372 100644 --- a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/PartialByteArrayMessageSink.java +++ b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/PartialByteArrayMessageSink.java @@ -41,6 +41,7 @@ public void accept(Frame frame, Callback callback) } callback.succeeded(); + session.demand(1); } catch (Throwable t) { diff --git a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/PartialByteBufferMessageSink.java b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/PartialByteBufferMessageSink.java index f463645e7946..0331c477c7cf 100644 --- a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/PartialByteBufferMessageSink.java +++ b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/PartialByteBufferMessageSink.java @@ -35,6 +35,7 @@ public void accept(Frame frame, Callback callback) methodHandle.invoke(frame.getPayload(), frame.isFin()); callback.succeeded(); + session.demand(1); } catch (Throwable t) { diff --git a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/PartialStringMessageSink.java b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/PartialStringMessageSink.java index 6e01aa3327fd..9061884c7fd6 100644 --- a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/PartialStringMessageSink.java +++ b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/PartialStringMessageSink.java @@ -51,6 +51,7 @@ public void accept(Frame frame, Callback callback) } callback.succeeded(); + session.demand(1); } catch (Throwable t) { diff --git a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/StringMessageSink.java b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/StringMessageSink.java index 6cad40d22794..810e4330a116 100644 --- a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/StringMessageSink.java +++ b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/StringMessageSink.java @@ -53,6 +53,7 @@ public void accept(Frame frame, Callback callback) methodHandle.invoke(out.toString()); callback.succeeded(); + session.demand(1); } catch (Throwable t) { diff --git a/jetty-websocket/websocket-javax-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler.java b/jetty-websocket/websocket-javax-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler.java index 030312178243..24fb6d451674 100644 --- a/jetty-websocket/websocket-javax-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler.java +++ b/jetty-websocket/websocket-javax-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler.java @@ -178,6 +178,7 @@ public void onOpen(CoreSession coreSession, Callback callback) container.notifySessionListeners((listener) -> listener.onJavaxWebSocketSessionOpened(session)); callback.succeeded(); + coreSession.demand(1); } catch (Throwable cause) { @@ -321,6 +322,12 @@ public void onError(Throwable cause, Callback callback) } } + @Override + public boolean isDemanding() + { + return true; + } + public Set getMessageHandlers() { return messageHandlerMap.values().stream() @@ -591,6 +598,7 @@ public void onPing(Frame frame, Callback callback) ByteBuffer payload = BufferUtil.copy(frame.getPayload()); coreSession.sendFrame(new Frame(OpCode.PONG).setPayload(payload), Callback.NOOP, false); callback.succeeded(); + coreSession.demand(1); } public void onPong(Frame frame, Callback callback) @@ -613,6 +621,7 @@ public void onPong(Frame frame, Callback callback) } } callback.succeeded(); + coreSession.demand(1); } public void onText(Frame frame, Callback callback) diff --git a/jetty-websocket/websocket-javax-common/src/test/java/org/eclipse/jetty/websocket/javax/common/AbstractSessionTest.java b/jetty-websocket/websocket-javax-common/src/test/java/org/eclipse/jetty/websocket/javax/common/AbstractSessionTest.java index 30fa05edad77..9fac9ab9bc18 100644 --- a/jetty-websocket/websocket-javax-common/src/test/java/org/eclipse/jetty/websocket/javax/common/AbstractSessionTest.java +++ b/jetty-websocket/websocket-javax-common/src/test/java/org/eclipse/jetty/websocket/javax/common/AbstractSessionTest.java @@ -13,39 +13,35 @@ package org.eclipse.jetty.websocket.javax.common; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import javax.websocket.Endpoint; import javax.websocket.EndpointConfig; import javax.websocket.Session; +import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.websocket.core.CoreSession; import org.eclipse.jetty.websocket.core.WebSocketComponents; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; +import static org.junit.jupiter.api.Assertions.assertTrue; + public abstract class AbstractSessionTest { protected static JavaxWebSocketSession session; - protected static JavaxWebSocketContainer container; - protected static WebSocketComponents components; + protected static JavaxWebSocketContainer container = new DummyContainer(); + protected static WebSocketComponents components = new WebSocketComponents(); + protected static TestCoreSession coreSession = new TestCoreSession(); @BeforeAll public static void initSession() throws Exception { - container = new DummyContainer(); container.start(); - components = new WebSocketComponents(); components.start(); Object websocketPojo = new DummyEndpoint(); UpgradeRequest upgradeRequest = new UpgradeRequestAdapter(); JavaxWebSocketFrameHandler frameHandler = container.newFrameHandler(websocketPojo, upgradeRequest); - CoreSession coreSession = new CoreSession.Empty() - { - @Override - public WebSocketComponents getWebSocketComponents() - { - return components; - } - }; session = new JavaxWebSocketSession(container, coreSession, frameHandler, container.getFrameHandlerFactory() .newDefaultEndpointConfig(websocketPojo.getClass())); } @@ -57,6 +53,34 @@ public static void stopContainer() throws Exception container.stop(); } + public static class TestCoreSession extends CoreSession.Empty + { + private final Semaphore demand = new Semaphore(0); + + @Override + public WebSocketComponents getWebSocketComponents() + { + return components; + } + + @Override + public ByteBufferPool getByteBufferPool() + { + return components.getBufferPool(); + } + + public void waitForDemand(long timeout, TimeUnit timeUnit) throws InterruptedException + { + assertTrue(demand.tryAcquire(timeout, timeUnit)); + } + + @Override + public void demand(long n) + { + demand.release(); + } + } + public static class DummyEndpoint extends Endpoint { @Override diff --git a/jetty-websocket/websocket-javax-common/src/test/java/org/eclipse/jetty/websocket/javax/common/messages/AbstractMessageSinkTest.java b/jetty-websocket/websocket-javax-common/src/test/java/org/eclipse/jetty/websocket/javax/common/messages/AbstractMessageSinkTest.java index 83682c9a413d..4c6281c74b56 100644 --- a/jetty-websocket/websocket-javax-common/src/test/java/org/eclipse/jetty/websocket/javax/common/messages/AbstractMessageSinkTest.java +++ b/jetty-websocket/websocket-javax-common/src/test/java/org/eclipse/jetty/websocket/javax/common/messages/AbstractMessageSinkTest.java @@ -20,15 +20,12 @@ import javax.websocket.ClientEndpointConfig; import javax.websocket.Decoder; -import org.eclipse.jetty.websocket.core.WebSocketComponents; import org.eclipse.jetty.websocket.javax.common.AbstractSessionTest; import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketFrameHandlerFactory; import org.eclipse.jetty.websocket.javax.common.decoders.RegisteredDecoder; public abstract class AbstractMessageSinkTest extends AbstractSessionTest { - private final WebSocketComponents _components = new WebSocketComponents(); - public List toRegisteredDecoderList(Class clazz, Class objectType) { Class interfaceType; @@ -43,7 +40,7 @@ else if (Decoder.BinaryStream.class.isAssignableFrom(clazz)) else throw new IllegalStateException(); - return List.of(new RegisteredDecoder(clazz, interfaceType, objectType, ClientEndpointConfig.Builder.create().build(), _components)); + return List.of(new RegisteredDecoder(clazz, interfaceType, objectType, ClientEndpointConfig.Builder.create().build(), components)); } public MethodHandle getAcceptHandle(Consumer copy, Class type) diff --git a/jetty-websocket/websocket-javax-common/src/test/java/org/eclipse/jetty/websocket/javax/common/messages/DecodedBinaryMessageSinkTest.java b/jetty-websocket/websocket-javax-common/src/test/java/org/eclipse/jetty/websocket/javax/common/messages/DecodedBinaryMessageSinkTest.java index 96d8652aaed2..d57e2b0220c7 100644 --- a/jetty-websocket/websocket-javax-common/src/test/java/org/eclipse/jetty/websocket/javax/common/messages/DecodedBinaryMessageSinkTest.java +++ b/jetty-websocket/websocket-javax-common/src/test/java/org/eclipse/jetty/websocket/javax/common/messages/DecodedBinaryMessageSinkTest.java @@ -56,11 +56,11 @@ public void testCalendar1Frame() throws Exception data.put((byte)31); data.flip(); sink.accept(new Frame(OpCode.BINARY).setPayload(data).setFin(true), finCallback); + coreSession.waitForDemand(1, TimeUnit.SECONDS); - finCallback.get(1, TimeUnit.SECONDS); // wait for callback Calendar decoded = copyFuture.get(1, TimeUnit.SECONDS); - assertThat("FinCallback.done", finCallback.isDone(), is(true)); assertThat("Decoded.contents", format(decoded, "MM-dd-yyyy"), is("12-31-1999")); + assertThat("FinCallback.done", finCallback.isDone(), is(true)); } @Test @@ -89,16 +89,18 @@ public void testCalendar3Frames() throws Exception data3.flip(); sink.accept(new Frame(OpCode.BINARY).setPayload(data1).setFin(false), callback1); + coreSession.waitForDemand(1, TimeUnit.SECONDS); sink.accept(new Frame(OpCode.CONTINUATION).setPayload(data2).setFin(false), callback2); + coreSession.waitForDemand(1, TimeUnit.SECONDS); sink.accept(new Frame(OpCode.CONTINUATION).setPayload(data3).setFin(true), finCallback); + coreSession.waitForDemand(1, TimeUnit.SECONDS); finCallback.get(1, TimeUnit.SECONDS); // wait for callback Calendar decoded = copyFuture.get(1, TimeUnit.SECONDS); + assertThat("Decoded.contents", format(decoded, "MM-dd-yyyy"), is("01-01-2000")); assertThat("Callback1.done", callback1.isDone(), is(true)); assertThat("Callback2.done", callback2.isDone(), is(true)); assertThat("finCallback.done", finCallback.isDone(), is(true)); - - assertThat("Decoded.contents", format(decoded, "MM-dd-yyyy"), is("01-01-2000")); } private String format(Calendar cal, String formatPattern) diff --git a/jetty-websocket/websocket-javax-common/src/test/java/org/eclipse/jetty/websocket/javax/common/messages/DecodedBinaryStreamMessageSinkTest.java b/jetty-websocket/websocket-javax-common/src/test/java/org/eclipse/jetty/websocket/javax/common/messages/DecodedBinaryStreamMessageSinkTest.java index 9ae5519f8623..d0f2528bd1c3 100644 --- a/jetty-websocket/websocket-javax-common/src/test/java/org/eclipse/jetty/websocket/javax/common/messages/DecodedBinaryStreamMessageSinkTest.java +++ b/jetty-websocket/websocket-javax-common/src/test/java/org/eclipse/jetty/websocket/javax/common/messages/DecodedBinaryStreamMessageSinkTest.java @@ -58,11 +58,11 @@ public void testCalendar1Frame() throws Exception data.put((byte)31); data.flip(); sink.accept(new Frame(OpCode.BINARY).setPayload(data).setFin(true), finCallback); + coreSession.waitForDemand(1, TimeUnit.SECONDS); - finCallback.get(1, TimeUnit.SECONDS); // wait for callback Calendar decoded = copyFuture.get(1, TimeUnit.SECONDS); - assertThat("FinCallback.done", finCallback.isDone(), is(true)); assertThat("Decoded.contents", format(decoded, "MM-dd-yyyy"), is("12-31-1999")); + assertThat("FinCallback.done", finCallback.isDone(), is(true)); } @Test @@ -91,16 +91,17 @@ public void testCalendar3Frames() throws Exception data3.flip(); sink.accept(new Frame(OpCode.BINARY).setPayload(data1).setFin(false), callback1); + coreSession.waitForDemand(1, TimeUnit.SECONDS); sink.accept(new Frame(OpCode.CONTINUATION).setPayload(data2).setFin(false), callback2); + coreSession.waitForDemand(1, TimeUnit.SECONDS); sink.accept(new Frame(OpCode.CONTINUATION).setPayload(data3).setFin(true), finCallback); + coreSession.waitForDemand(1, TimeUnit.SECONDS); - finCallback.get(1, TimeUnit.SECONDS); // wait for callback Calendar decoded = copyFuture.get(1, TimeUnit.SECONDS); + assertThat("Decoded.contents", format(decoded, "MM-dd-yyyy"), is("01-01-2000")); assertThat("Callback1.done", callback1.isDone(), is(true)); assertThat("Callback2.done", callback2.isDone(), is(true)); assertThat("finCallback.done", finCallback.isDone(), is(true)); - - assertThat("Decoded.contents", format(decoded, "MM-dd-yyyy"), is("01-01-2000")); } private String format(Calendar cal, String formatPattern) diff --git a/jetty-websocket/websocket-javax-common/src/test/java/org/eclipse/jetty/websocket/javax/common/messages/DecodedTextMessageSinkTest.java b/jetty-websocket/websocket-javax-common/src/test/java/org/eclipse/jetty/websocket/javax/common/messages/DecodedTextMessageSinkTest.java index 9559659f605e..56fb9ad295ce 100644 --- a/jetty-websocket/websocket-javax-common/src/test/java/org/eclipse/jetty/websocket/javax/common/messages/DecodedTextMessageSinkTest.java +++ b/jetty-websocket/websocket-javax-common/src/test/java/org/eclipse/jetty/websocket/javax/common/messages/DecodedTextMessageSinkTest.java @@ -51,11 +51,11 @@ public void testDate1Frame() throws Exception FutureCallback finCallback = new FutureCallback(); sink.accept(new Frame(OpCode.TEXT).setPayload("2018.02.13").setFin(true), finCallback); + coreSession.waitForDemand(1, TimeUnit.SECONDS); - finCallback.get(1, TimeUnit.SECONDS); // wait for callback Date decoded = copyFuture.get(1, TimeUnit.SECONDS); - assertThat("FinCallback.done", finCallback.isDone(), is(true)); assertThat("Decoded.contents", format(decoded, "MM-dd-yyyy"), is("02-13-2018")); + assertThat("FinCallback.done", finCallback.isDone(), is(true)); } @Test @@ -72,16 +72,17 @@ public void testDate3Frames() throws Exception FutureCallback finCallback = new FutureCallback(); sink.accept(new Frame(OpCode.TEXT).setPayload("2023").setFin(false), callback1); + coreSession.waitForDemand(1, TimeUnit.SECONDS); sink.accept(new Frame(OpCode.CONTINUATION).setPayload(".08").setFin(false), callback2); + coreSession.waitForDemand(1, TimeUnit.SECONDS); sink.accept(new Frame(OpCode.CONTINUATION).setPayload(".22").setFin(true), finCallback); + coreSession.waitForDemand(1, TimeUnit.SECONDS); - finCallback.get(1, TimeUnit.SECONDS); // wait for callback Date decoded = copyFuture.get(1, TimeUnit.SECONDS); + assertThat("Decoded.contents", format(decoded, "MM-dd-yyyy"), is("08-22-2023")); assertThat("Callback1.done", callback1.isDone(), is(true)); assertThat("Callback2.done", callback2.isDone(), is(true)); assertThat("finCallback.done", finCallback.isDone(), is(true)); - - assertThat("Decoded.contents", format(decoded, "MM-dd-yyyy"), is("08-22-2023")); } private String format(Date date, String formatPattern) diff --git a/jetty-websocket/websocket-javax-common/src/test/java/org/eclipse/jetty/websocket/javax/common/messages/DecodedTextStreamMessageSinkTest.java b/jetty-websocket/websocket-javax-common/src/test/java/org/eclipse/jetty/websocket/javax/common/messages/DecodedTextStreamMessageSinkTest.java index 92523bb18c6c..e28ba72ca838 100644 --- a/jetty-websocket/websocket-javax-common/src/test/java/org/eclipse/jetty/websocket/javax/common/messages/DecodedTextStreamMessageSinkTest.java +++ b/jetty-websocket/websocket-javax-common/src/test/java/org/eclipse/jetty/websocket/javax/common/messages/DecodedTextStreamMessageSinkTest.java @@ -54,11 +54,11 @@ public void testDate1Frame() throws Exception FutureCallback finCallback = new FutureCallback(); sink.accept(new Frame(OpCode.TEXT).setPayload("2018.02.13").setFin(true), finCallback); + coreSession.waitForDemand(1, TimeUnit.SECONDS); - finCallback.get(1, TimeUnit.SECONDS); // wait for callback Date decoded = copyFuture.get(1, TimeUnit.SECONDS); - assertThat("FinCallback.done", finCallback.isDone(), is(true)); assertThat("Decoded.contents", format(decoded, "MM-dd-yyyy"), is("02-13-2018")); + assertThat("FinCallback.done", finCallback.isDone(), is(true)); } @Test @@ -75,16 +75,17 @@ public void testDate3Frames() throws Exception FutureCallback finCallback = new FutureCallback(); sink.accept(new Frame(OpCode.TEXT).setPayload("2023").setFin(false), callback1); + coreSession.waitForDemand(1, TimeUnit.SECONDS); sink.accept(new Frame(OpCode.CONTINUATION).setPayload(".08").setFin(false), callback2); + coreSession.waitForDemand(1, TimeUnit.SECONDS); sink.accept(new Frame(OpCode.CONTINUATION).setPayload(".22").setFin(true), finCallback); + coreSession.waitForDemand(1, TimeUnit.SECONDS); - finCallback.get(1, TimeUnit.SECONDS); // wait for callback Date decoded = copyFuture.get(1, TimeUnit.SECONDS); + assertThat("Decoded.contents", format(decoded, "MM-dd-yyyy"), is("08-22-2023")); assertThat("Callback1.done", callback1.isDone(), is(true)); assertThat("Callback2.done", callback2.isDone(), is(true)); assertThat("finCallback.done", finCallback.isDone(), is(true)); - - assertThat("Decoded.contents", format(decoded, "MM-dd-yyyy"), is("08-22-2023")); } private String format(Date date, String formatPattern) diff --git a/jetty-websocket/websocket-javax-common/src/test/java/org/eclipse/jetty/websocket/javax/common/messages/InputStreamMessageSinkTest.java b/jetty-websocket/websocket-javax-common/src/test/java/org/eclipse/jetty/websocket/javax/common/messages/InputStreamMessageSinkTest.java index 7e5d26b3180a..6a92b007d58f 100644 --- a/jetty-websocket/websocket-javax-common/src/test/java/org/eclipse/jetty/websocket/javax/common/messages/InputStreamMessageSinkTest.java +++ b/jetty-websocket/websocket-javax-common/src/test/java/org/eclipse/jetty/websocket/javax/common/messages/InputStreamMessageSinkTest.java @@ -18,6 +18,7 @@ import java.io.InputStream; import java.lang.invoke.MethodHandle; import java.nio.ByteBuffer; +import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -51,10 +52,11 @@ public void testInputStream1Message1Frame() throws InterruptedException, Executi ByteBuffer data = BufferUtil.toBuffer("Hello World", UTF_8); sink.accept(new Frame(OpCode.BINARY).setPayload(data), finCallback); - finCallback.get(1, TimeUnit.SECONDS); // wait for callback + coreSession.waitForDemand(1, TimeUnit.SECONDS); + finCallback.get(1, TimeUnit.SECONDS); ByteArrayOutputStream byteStream = copy.poll(1, TimeUnit.SECONDS); + assertThat("Writer.contents", byteStream.toString(UTF_8), is("Hello World")); assertThat("FinCallback.done", finCallback.isDone(), is(true)); - assertThat("Writer.contents", new String(byteStream.toByteArray(), UTF_8), is("Hello World")); } @Test @@ -68,19 +70,22 @@ public void testInputStream2Messages2Frames() throws InterruptedException, Execu ByteBuffer data1 = BufferUtil.toBuffer("Hello World", UTF_8); sink.accept(new Frame(OpCode.BINARY).setPayload(data1).setFin(true), fin1Callback); - fin1Callback.get(1, TimeUnit.SECONDS); // wait for callback (can't sent next message until this callback finishes) + // wait for demand (can't sent next message until a new frame is demanded) + coreSession.waitForDemand(1, TimeUnit.SECONDS); + fin1Callback.get(1, TimeUnit.SECONDS); ByteArrayOutputStream byteStream = copy.poll(1, TimeUnit.SECONDS); + assertThat("Writer.contents", byteStream.toString(UTF_8), is("Hello World")); assertThat("FinCallback.done", fin1Callback.isDone(), is(true)); - assertThat("Writer.contents", new String(byteStream.toByteArray(), UTF_8), is("Hello World")); FutureCallback fin2Callback = new FutureCallback(); ByteBuffer data2 = BufferUtil.toBuffer("Greetings Earthling", UTF_8); sink.accept(new Frame(OpCode.BINARY).setPayload(data2).setFin(true), fin2Callback); - fin2Callback.get(1, TimeUnit.SECONDS); // wait for callback + coreSession.waitForDemand(1, TimeUnit.SECONDS); + fin2Callback.get(1, TimeUnit.SECONDS); byteStream = copy.poll(1, TimeUnit.SECONDS); + assertThat("Writer.contents", byteStream.toString(UTF_8), is("Greetings Earthling")); assertThat("FinCallback.done", fin2Callback.isDone(), is(true)); - assertThat("Writer.contents", new String(byteStream.toByteArray(), UTF_8), is("Greetings Earthling")); } @Test @@ -95,16 +100,17 @@ public void testInputStream1Message3Frames() throws InterruptedException, Execut FutureCallback finCallback = new FutureCallback(); sink.accept(new Frame(OpCode.BINARY).setPayload("Hello").setFin(false), callback1); + coreSession.waitForDemand(1, TimeUnit.SECONDS); sink.accept(new Frame(OpCode.CONTINUATION).setPayload(", ").setFin(false), callback2); + coreSession.waitForDemand(1, TimeUnit.SECONDS); sink.accept(new Frame(OpCode.CONTINUATION).setPayload("World").setFin(true), finCallback); + coreSession.waitForDemand(1, TimeUnit.SECONDS); - finCallback.get(1, TimeUnit.SECONDS); // wait for callback ByteArrayOutputStream byteStream = copy.poll(1, TimeUnit.SECONDS); - assertThat("Callback1.done", callback1.isDone(), is(true)); - assertThat("Callback2.done", callback2.isDone(), is(true)); + assertThat("Writer.contents", byteStream.toString(UTF_8), is("Hello, World")); + assertThat("callback1.done", callback1.isDone(), is(true)); + assertThat("callback2.done", callback2.isDone(), is(true)); assertThat("finCallback.done", finCallback.isDone(), is(true)); - - assertThat("Writer.contents", new String(byteStream.toByteArray(), UTF_8), is("Hello, World")); } @Test @@ -120,18 +126,20 @@ public void testInputStream1Message4FramesEmptyFin() throws InterruptedException FutureCallback finCallback = new FutureCallback(); sink.accept(new Frame(OpCode.BINARY).setPayload("Greetings").setFin(false), callback1); + coreSession.waitForDemand(1, TimeUnit.SECONDS); sink.accept(new Frame(OpCode.CONTINUATION).setPayload(", ").setFin(false), callback2); + coreSession.waitForDemand(1, TimeUnit.SECONDS); sink.accept(new Frame(OpCode.CONTINUATION).setPayload("Earthling").setFin(false), callback3); + coreSession.waitForDemand(1, TimeUnit.SECONDS); sink.accept(new Frame(OpCode.CONTINUATION).setPayload(new byte[0]).setFin(true), finCallback); + coreSession.waitForDemand(1, TimeUnit.SECONDS); - finCallback.get(5, TimeUnit.SECONDS); // wait for callback ByteArrayOutputStream byteStream = copy.poll(1, TimeUnit.SECONDS); + assertThat("Writer.contents", byteStream.toString(UTF_8), is("Greetings, Earthling")); assertThat("Callback1.done", callback1.isDone(), is(true)); assertThat("Callback2.done", callback2.isDone(), is(true)); assertThat("Callback3.done", callback3.isDone(), is(true)); assertThat("finCallback.done", finCallback.isDone(), is(true)); - - assertThat("Writer.contents", new String(byteStream.toByteArray(), UTF_8), is("Greetings, Earthling")); } public static class InputStreamCopy implements Consumer @@ -156,9 +164,9 @@ public void accept(InputStream in) } } - public ByteArrayOutputStream poll(long time, TimeUnit unit) throws InterruptedException, ExecutionException + public ByteArrayOutputStream poll(long time, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - return streams.poll(time, unit).get(); + return Objects.requireNonNull(streams.poll(time, unit)).get(time, unit); } } } diff --git a/jetty-websocket/websocket-javax-common/src/test/java/org/eclipse/jetty/websocket/javax/common/messages/ReaderMessageSinkTest.java b/jetty-websocket/websocket-javax-common/src/test/java/org/eclipse/jetty/websocket/javax/common/messages/ReaderMessageSinkTest.java index 08243d3993e5..7fb403de1b28 100644 --- a/jetty-websocket/websocket-javax-common/src/test/java/org/eclipse/jetty/websocket/javax/common/messages/ReaderMessageSinkTest.java +++ b/jetty-websocket/websocket-javax-common/src/test/java/org/eclipse/jetty/websocket/javax/common/messages/ReaderMessageSinkTest.java @@ -45,11 +45,11 @@ public void testReader1Frame() throws InterruptedException, ExecutionException, FutureCallback finCallback = new FutureCallback(); sink.accept(new Frame(OpCode.TEXT).setPayload("Hello World"), finCallback); + coreSession.waitForDemand(1, TimeUnit.SECONDS); - finCallback.get(1, TimeUnit.SECONDS); // wait for callback StringWriter writer = copyFuture.get(1, TimeUnit.SECONDS); - assertThat("FinCallback.done", finCallback.isDone(), is(true)); assertThat("Writer.contents", writer.getBuffer().toString(), is("Hello World")); + assertThat("FinCallback.done", finCallback.isDone(), is(true)); } @Test @@ -65,15 +65,17 @@ public void testReader3Frames() throws InterruptedException, ExecutionException, FutureCallback finCallback = new FutureCallback(); sink.accept(new Frame(OpCode.TEXT).setPayload("Hello").setFin(false), callback1); + coreSession.waitForDemand(1, TimeUnit.SECONDS); sink.accept(new Frame(OpCode.CONTINUATION).setPayload(", ").setFin(false), callback2); + coreSession.waitForDemand(1, TimeUnit.SECONDS); sink.accept(new Frame(OpCode.CONTINUATION).setPayload("World").setFin(true), finCallback); + coreSession.waitForDemand(1, TimeUnit.SECONDS); - finCallback.get(1, TimeUnit.SECONDS); // wait for fin callback StringWriter writer = copyFuture.get(1, TimeUnit.SECONDS); + assertThat("Writer contents", writer.getBuffer().toString(), is("Hello, World")); assertThat("Callback1.done", callback1.isDone(), is(true)); assertThat("Callback2.done", callback2.isDone(), is(true)); assertThat("finCallback.done", finCallback.isDone(), is(true)); - assertThat("Writer contents", writer.getBuffer().toString(), is("Hello, World")); } public static class ReaderCopy implements Consumer diff --git a/jetty-websocket/websocket-jetty-common/pom.xml b/jetty-websocket/websocket-jetty-common/pom.xml index 320df7dc0dff..d5db982aed53 100644 --- a/jetty-websocket/websocket-jetty-common/pom.xml +++ b/jetty-websocket/websocket-jetty-common/pom.xml @@ -45,5 +45,10 @@ websocket-core-common ${project.version} + + org.eclipse.jetty + jetty-slf4j-impl + test + diff --git a/jetty-websocket/websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java b/jetty-websocket/websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java index 992c9042dd2a..36a64385249c 100644 --- a/jetty-websocket/websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java +++ b/jetty-websocket/websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java @@ -82,6 +82,7 @@ private enum SuspendState private WebSocketSession session; private SuspendState state = SuspendState.DEMANDING; private Runnable delayedOnFrame; + private CoreSession coreSession; public JettyWebSocketFrameHandler(WebSocketContainer container, Object endpointInstance, @@ -150,6 +151,7 @@ public void onOpen(CoreSession coreSession, Callback callback) try { customizer.customize(coreSession); + this.coreSession = coreSession; session = new WebSocketSession(container, coreSession, this); if (!session.isOpen()) throw new IllegalStateException("Session is not open"); @@ -223,43 +225,25 @@ public void onFrame(Frame frame, Callback callback) } } - // Demand after succeeding any received frame - Callback demandingCallback = Callback.from(() -> - { - try - { - demand(); - } - catch (Throwable t) - { - callback.failed(t); - return; - } - - callback.succeeded(); - }, - callback::failed - ); - switch (frame.getOpCode()) { case OpCode.CLOSE: onCloseFrame(frame, callback); break; case OpCode.PING: - onPingFrame(frame, demandingCallback); + onPingFrame(frame, callback); break; case OpCode.PONG: - onPongFrame(frame, demandingCallback); + onPongFrame(frame, callback); break; case OpCode.TEXT: - onTextFrame(frame, demandingCallback); + onTextFrame(frame, callback); break; case OpCode.BINARY: - onBinaryFrame(frame, demandingCallback); + onBinaryFrame(frame, callback); break; case OpCode.CONTINUATION: - onContinuationFrame(frame, demandingCallback); + onContinuationFrame(frame, callback); break; default: callback.failed(new IllegalStateException()); @@ -342,6 +326,7 @@ private void acceptMessage(Frame frame, Callback callback) if (activeMessageSink == null) { callback.succeeded(); + demand(); return; } @@ -387,7 +372,9 @@ private void onPingFrame(Frame frame, Callback callback) ByteBuffer payload = BufferUtil.copy(frame.getPayload()); getSession().getRemote().sendPong(payload, WriteCallback.NOOP); } + callback.succeeded(); + demand(); } private void onPongFrame(Frame frame, Callback callback) @@ -407,7 +394,9 @@ private void onPongFrame(Frame frame, Callback callback) throw new WebSocketException(endpointInstance.getClass().getSimpleName() + " PONG method error: " + cause.getMessage(), cause); } } + callback.succeeded(); + demand(); } private void onTextFrame(Frame frame, Callback callback) diff --git a/jetty-websocket/websocket-jetty-common/src/test/java/org/eclipse/jetty/websocket/common/OutgoingMessageCapture.java b/jetty-websocket/websocket-jetty-common/src/test/java/org/eclipse/jetty/websocket/common/OutgoingMessageCapture.java index b7494b5da0bf..5ef8f6eeafde 100644 --- a/jetty-websocket/websocket-jetty-common/src/test/java/org/eclipse/jetty/websocket/common/OutgoingMessageCapture.java +++ b/jetty-websocket/websocket-jetty-common/src/test/java/org/eclipse/jetty/websocket/common/OutgoingMessageCapture.java @@ -20,6 +20,8 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; +import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.io.NullByteBufferPool; import org.eclipse.jetty.toolchain.test.Hex; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.websocket.core.CloseStatus; @@ -40,6 +42,7 @@ public class OutgoingMessageCapture extends CoreSession.Empty implements CoreSes public BlockingQueue binaryMessages = new LinkedBlockingDeque<>(); public BlockingQueue events = new LinkedBlockingDeque<>(); + private final ByteBufferPool bufferPool = new NullByteBufferPool(); private final MethodHandle wholeTextHandle; private final MethodHandle wholeBinaryHandle; private MessageSink messageSink; @@ -116,16 +119,19 @@ public void sendFrame(Frame frame, Callback callback, boolean batch) if (OpCode.isDataFrame(frame.getOpCode())) { - messageSink.accept(Frame.copy(frame), callback); + Frame copy = Frame.copy(frame); + messageSink.accept(copy, Callback.from(() -> {}, Throwable::printStackTrace)); if (frame.isFin()) - { messageSink = null; - } - } - else - { - callback.succeeded(); } + + callback.succeeded(); + } + + @Override + public ByteBufferPool getByteBufferPool() + { + return bufferPool; } @SuppressWarnings("unused")