Skip to content

Commit

Permalink
Merge pull request #6635 from eclipse/jetty-10.0.x-6566-WebSocketMess…
Browse files Browse the repository at this point in the history
…ageSinks

Issue #6566 - use the demand API for the websocket MessageSinks
  • Loading branch information
lachlan-roberts authored Aug 31, 2021
2 parents 4be1e63 + 763820e commit 453bcbb
Show file tree
Hide file tree
Showing 23 changed files with 356 additions and 146 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//

package org.eclipse.jetty.io;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;

/**
* This class can be used to accumulate pairs of {@link ByteBuffer} and {@link Callback}, and eventually copy
* these into a single {@link ByteBuffer} or byte array and succeed the callbacks.
*/
public class ByteBufferCallbackAccumulator
{
private final List<Entry> _entries = new ArrayList<>();
private int _length;

private static class Entry
{
private final ByteBuffer buffer;
private final Callback callback;

Entry(ByteBuffer buffer, Callback callback)
{
this.buffer = buffer;
this.callback = callback;
}
}

public void addEntry(ByteBuffer buffer, Callback callback)
{
_entries.add(new Entry(buffer, callback));
_length = Math.addExact(_length, buffer.remaining());
}

/**
* @return the total length of the content in the accumulator.
*/
public int getLength()
{
return _length;
}

/**
* @return a newly allocated byte array containing all content written into the accumulator.
*/
public byte[] takeByteArray()
{
int length = getLength();
if (length == 0)
return new byte[0];

byte[] bytes = new byte[length];
ByteBuffer buffer = BufferUtil.toBuffer(bytes);
BufferUtil.clear(buffer);
writeTo(buffer);
return bytes;
}

public void writeTo(ByteBuffer buffer)
{
if (BufferUtil.space(buffer) < _length)
throw new IllegalArgumentException("not enough buffer space remaining");

int pos = BufferUtil.flipToFill(buffer);
for (Entry entry : _entries)
{
buffer.put(entry.buffer);
entry.callback.succeeded();
}
BufferUtil.flipToFlush(buffer, pos);
_entries.clear();
_length = 0;
}

public void fail(Throwable t)
{
for (Entry entry : _entries)
{
entry.callback.failed(t);
}
_entries.clear();
_length = 0;
}
}
36 changes: 31 additions & 5 deletions jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public InvocationType getInvocationType()
}

/**
* Creaste a callback that runs completed when it succeeds or fails
* Creates a callback that runs completed when it succeeds or fails
*
* @param completed The completion to run on success or failure
* @return a new callback
Expand All @@ -169,7 +169,7 @@ public void completed()
}

/**
* Create a nested callback that runs completed after
* Creates a nested callback that runs completed after
* completing the nested callback.
*
* @param callback The nested callback
Expand All @@ -188,7 +188,7 @@ public void completed()
}

/**
* Create a nested callback that runs completed before
* Creates a nested callback that runs completed before
* completing the nested callback.
*
* @param callback The nested callback
Expand Down Expand Up @@ -231,7 +231,7 @@ public void failed(Throwable x)
}

/**
* Create a nested callback which always fails the nested callback on completion.
* Creates a nested callback which always fails the nested callback on completion.
*
* @param callback The nested callback
* @param cause The cause to fail the nested callback, if the new callback is failed the reason
Expand All @@ -258,7 +258,7 @@ public void failed(Throwable x)
}

/**
* Create a callback which combines two other callbacks and will succeed or fail them both.
* Creates a callback which combines two other callbacks and will succeed or fail them both.
* @param callback1 The first callback
* @param callback2 The second callback
* @return a new callback.
Expand Down Expand Up @@ -411,6 +411,32 @@ public InvocationType getInvocationType()
*/
class Completable extends CompletableFuture<Void> implements Callback
{
/**
* Creates a completable future given a callback.
*
* @param callback The nested callback.
* @return a new Completable which will succeed this callback when completed.
*/
public static Completable from(Callback callback)
{
return new Completable(callback.getInvocationType())
{
@Override
public void succeeded()
{
callback.succeeded();
super.succeeded();
}

@Override
public void failed(Throwable x)
{
callback.failed(x);
super.failed(x);
}
};
}

private final InvocationType invocation;

public Completable()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ public interface CoreSession extends OutgoingFrames, Configuration
*/
SocketAddress getRemoteAddress();

/**
* @return True if the websocket is open inbound
*/
boolean isInputOpen();

/**
* @return True if the websocket is open outbound
*/
Expand Down Expand Up @@ -253,6 +258,12 @@ public SocketAddress getRemoteAddress()
return null;
}

@Override
public boolean isInputOpen()
{
return true;
}

@Override
public boolean isOutputOpen()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,12 @@ public SocketAddress getRemoteAddress()
return getConnection().getEndPoint().getRemoteSocketAddress();
}

@Override
public boolean isInputOpen()
{
return sessionState.isInputOpen();
}

@Override
public boolean isOutputOpen()
{
Expand Down Expand Up @@ -416,8 +422,10 @@ public void demand(long n)
{
if (!demanding)
throw new IllegalStateException("FrameHandler is not demanding: " + this);

if (!sessionState.isInputOpen())
throw new IllegalStateException("FrameHandler input not open: " + this);

connection.demand(n);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,11 @@

package org.eclipse.jetty.websocket.core.internal.messages;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodType;
import java.nio.ByteBuffer;

import org.eclipse.jetty.io.ByteBufferCallbackAccumulator;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.CoreSession;
Expand All @@ -29,9 +28,7 @@
public class ByteArrayMessageSink extends AbstractMessageSink
{
private static final byte[] EMPTY_BUFFER = new byte[0];
private static final int BUFFER_SIZE = 65535;
private ByteArrayOutputStream out;
private int size;
private ByteBufferCallbackAccumulator out;

public ByteArrayMessageSink(CoreSession session, MethodHandle methodHandle)
{
Expand All @@ -51,12 +48,12 @@ public void accept(Frame frame, Callback callback)
{
try
{
size += frame.getPayloadLength();
long size = (out == null ? 0 : out.getLength()) + frame.getPayloadLength();
long maxBinaryMessageSize = session.getMaxBinaryMessageSize();
if (maxBinaryMessageSize > 0 && size > maxBinaryMessageSize)
{
throw new MessageTooLargeException(String.format("Binary message too large: (actual) %,d > (configured max binary message size) %,d",
size, maxBinaryMessageSize));
throw new MessageTooLargeException(
String.format("Binary message too large: (actual) %,d > (configured max binary message size) %,d", size, maxBinaryMessageSize));
}

// If we are fin and no OutputStream has been created we don't need to aggregate.
Expand All @@ -71,19 +68,33 @@ public void accept(Frame frame, Callback callback)
methodHandle.invoke(EMPTY_BUFFER, 0, 0);

callback.succeeded();
session.demand(1);
return;
}

aggregatePayload(frame);
// Aggregate the frame payload.
if (frame.hasPayload())
{
ByteBuffer payload = frame.getPayload();
if (out == null)
out = new ByteBufferCallbackAccumulator();
out.addEntry(payload, callback);
}

// If the methodHandle throws we don't want to fail callback twice.
callback = Callback.NOOP;
if (frame.isFin())
{
byte[] buf = out.toByteArray();
byte[] buf = out.takeByteArray();
methodHandle.invoke(buf, 0, buf.length);
}
callback.succeeded();

session.demand(1);
}
catch (Throwable t)
{
if (out != null)
out.fail(t);
callback.failed(t);
}
finally
Expand All @@ -92,19 +103,7 @@ public void accept(Frame frame, Callback callback)
{
// reset
out = null;
size = 0;
}
}
}

private void aggregatePayload(Frame frame) throws IOException
{
if (frame.hasPayload())
{
ByteBuffer payload = frame.getPayload();
if (out == null)
out = new ByteArrayOutputStream(BUFFER_SIZE);
BufferUtil.writeTo(payload, out);
}
}
}
Loading

0 comments on commit 453bcbb

Please sign in to comment.