Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #6566 - use the demand API for the websocket MessageSinks #6635

Merged
merged 13 commits into from
Aug 31, 2021
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//

package org.eclipse.jetty.io;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;

/**
* This class can be used to accumulate pairs of {@link ByteBuffer} and {@link Callback}, and eventually copy
* these into a single {@link ByteBuffer} or byte array and succeed the callbacks.
*/
public class ByteBufferCallbackAccumulator
{
private final List<Entry> _entries = new ArrayList<>();
private int _length;

private static class Entry
{
private final ByteBuffer buffer;
private final Callback callback;

Entry(ByteBuffer buffer, Callback callback)
{
this.buffer = buffer;
this.callback = callback;
}
}

public void addEntry(ByteBuffer buffer, Callback callback)
{
_entries.add(new Entry(buffer, callback));
_length = Math.addExact(_length, buffer.remaining());
}

/**
* @return the total length of the content in the accumulator.
*/
public int getLength()
{
return _length;
}

/**
* @return a newly allocated byte array containing all content written into the accumulator.
*/
public byte[] takeByteArray()
{
int length = getLength();
if (length == 0)
return new byte[0];

byte[] bytes = new byte[length];
ByteBuffer buffer = BufferUtil.toBuffer(bytes);
BufferUtil.clearToFill(buffer);
writeTo(buffer);
return bytes;
}

public void writeTo(ByteBuffer buffer)
{
if (buffer.remaining() < _length)
throw new IllegalArgumentException("not enough buffer space remaining");

for (Entry entry : _entries)
{
buffer.put(entry.buffer);
entry.callback.succeeded();
}
_entries.clear();
_length = 0;
}

public void fail(Throwable t)
{
for (Entry entry : _entries)
{
entry.callback.failed(t);
}
_entries.clear();
_length = 0;
}
}
36 changes: 31 additions & 5 deletions jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public InvocationType getInvocationType()
}

/**
* Creaste a callback that runs completed when it succeeds or fails
* Creates a callback that runs completed when it succeeds or fails
*
* @param completed The completion to run on success or failure
* @return a new callback
Expand All @@ -169,7 +169,7 @@ public void completed()
}

/**
* Create a nested callback that runs completed after
* Creates a nested callback that runs completed after
* completing the nested callback.
*
* @param callback The nested callback
Expand All @@ -188,7 +188,7 @@ public void completed()
}

/**
* Create a nested callback that runs completed before
* Creates a nested callback that runs completed before
* completing the nested callback.
*
* @param callback The nested callback
Expand Down Expand Up @@ -231,7 +231,7 @@ public void failed(Throwable x)
}

/**
* Create a nested callback which always fails the nested callback on completion.
* Creates a nested callback which always fails the nested callback on completion.
*
* @param callback The nested callback
* @param cause The cause to fail the nested callback, if the new callback is failed the reason
Expand All @@ -258,7 +258,7 @@ public void failed(Throwable x)
}

/**
* Create a callback which combines two other callbacks and will succeed or fail them both.
* Creates a callback which combines two other callbacks and will succeed or fail them both.
* @param callback1 The first callback
* @param callback2 The second callback
* @return a new callback.
Expand Down Expand Up @@ -411,6 +411,32 @@ public InvocationType getInvocationType()
*/
class Completable extends CompletableFuture<Void> implements Callback
{
/**
* Creates a completable future given a callback.
*
* @param callback The nested callback.
* @return a new Completable which will succeed this callback when completed.
*/
public static Completable from(Callback callback)
{
return new Completable(callback.getInvocationType())
{
@Override
public void succeeded()
{
callback.succeeded();
super.succeeded();
}

@Override
public void failed(Throwable x)
{
callback.failed(x);
super.failed(x);
}
};
}

private final InvocationType invocation;

public Completable()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ public interface CoreSession extends OutgoingFrames, Configuration
*/
SocketAddress getRemoteAddress();

/**
* @return True if the websocket is open inbound
*/
boolean isInputOpen();

/**
* @return True if the websocket is open outbound
*/
Expand Down Expand Up @@ -253,6 +258,12 @@ public SocketAddress getRemoteAddress()
return null;
}

@Override
public boolean isInputOpen()
{
return true;
}

@Override
public boolean isOutputOpen()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,12 @@ public SocketAddress getRemoteAddress()
return getConnection().getEndPoint().getRemoteSocketAddress();
}

@Override
public boolean isInputOpen()
{
return sessionState.isInputOpen();
}

@Override
public boolean isOutputOpen()
{
Expand Down Expand Up @@ -416,8 +422,10 @@ public void demand(long n)
{
if (!demanding)
throw new IllegalStateException("FrameHandler is not demanding: " + this);

if (!sessionState.isInputOpen())
throw new IllegalStateException("FrameHandler input not open: " + this);

connection.demand(n);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,11 @@

package org.eclipse.jetty.websocket.core.internal.messages;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodType;
import java.nio.ByteBuffer;

import org.eclipse.jetty.io.ByteBufferCallbackAccumulator;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.CoreSession;
Expand All @@ -29,9 +28,7 @@
public class ByteArrayMessageSink extends AbstractMessageSink
{
private static final byte[] EMPTY_BUFFER = new byte[0];
private static final int BUFFER_SIZE = 65535;
private ByteArrayOutputStream out;
private int size;
private ByteBufferCallbackAccumulator out;

public ByteArrayMessageSink(CoreSession session, MethodHandle methodHandle)
{
Expand All @@ -51,12 +48,12 @@ public void accept(Frame frame, Callback callback)
{
try
{
size += frame.getPayloadLength();
long size = (out == null ? 0 : out.getLength()) + frame.getPayloadLength();
long maxBinaryMessageSize = session.getMaxBinaryMessageSize();
if (maxBinaryMessageSize > 0 && size > maxBinaryMessageSize)
{
throw new MessageTooLargeException(String.format("Binary message too large: (actual) %,d > (configured max binary message size) %,d",
size, maxBinaryMessageSize));
throw new MessageTooLargeException(
String.format("Binary message too large: (actual) %,d > (configured max binary message size) %,d", size, maxBinaryMessageSize));
}

// If we are fin and no OutputStream has been created we don't need to aggregate.
Expand All @@ -71,19 +68,33 @@ public void accept(Frame frame, Callback callback)
methodHandle.invoke(EMPTY_BUFFER, 0, 0);

callback.succeeded();
session.demand(1);
return;
}

aggregatePayload(frame);
// Aggregate the frame payload.
if (frame.hasPayload())
{
ByteBuffer payload = frame.getPayload();
if (out == null)
out = new ByteBufferCallbackAccumulator();
out.addEntry(payload, callback);
}

// If the methodHandle throws we don't want to fail callback twice.
callback = Callback.NOOP;
if (frame.isFin())
{
byte[] buf = out.toByteArray();
byte[] buf = out.takeByteArray();
methodHandle.invoke(buf, 0, buf.length);
}
callback.succeeded();

session.demand(1);
}
catch (Throwable t)
{
if (out != null)
out.fail(t);
callback.failed(t);
}
finally
Expand All @@ -92,19 +103,7 @@ public void accept(Frame frame, Callback callback)
{
// reset
out = null;
size = 0;
}
}
}

private void aggregatePayload(Frame frame) throws IOException
{
if (frame.hasPayload())
{
ByteBuffer payload = frame.getPayload();
if (out == null)
out = new ByteArrayOutputStream(BUFFER_SIZE);
BufferUtil.writeTo(payload, out);
}
}
}
Loading