Skip to content

Commit

Permalink
transports can send a msg larger than their flow control window (#10842)
Browse files Browse the repository at this point in the history
  • Loading branch information
jdcormie authored Jan 22, 2024
1 parent c9db8fa commit 09acf2f
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1354,8 +1354,8 @@ public void flowControlPushBack() throws Exception {
serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;

ClientStream clientStream = client.newStream(
methodDescriptor, new Metadata(), callOptions, tracers);
ClientStream clientStream =
client.newStream(methodDescriptor, new Metadata(), callOptions, tracers);
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
clientStream.start(clientStreamListener);
StreamCreation serverStreamCreation =
Expand All @@ -1366,15 +1366,7 @@ public void flowControlPushBack() throws Exception {

serverStream.writeHeaders(new Metadata(), true);

String largeMessage;
{
int size = 1 * 1024;
StringBuilder sb = new StringBuilder(size);
for (int i = 0; i < size; i++) {
sb.append('a');
}
largeMessage = sb.toString();
}
String largeMessage = newString(1024);

serverStream.request(1);
assertTrue(clientStreamListener.awaitOnReadyAndDrain(TIMEOUT_MS, TimeUnit.MILLISECONDS));
Expand Down Expand Up @@ -1494,6 +1486,64 @@ public void flowControlPushBack() throws Exception {
assertEquals(status.getDescription(), clientStreamStatus.getDescription());
}

@Test
public void flowControlDoesNotDeadlockLargeMessage() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
startTransport(client, mockClientTransportListener);
MockServerTransportListener serverTransportListener =
serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;

ClientStream clientStream =
client.newStream(methodDescriptor, new Metadata(), callOptions, tracers);
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
clientStream.start(clientStreamListener);
StreamCreation serverStreamCreation =
serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
assertEquals(methodDescriptor.getFullMethodName(), serverStreamCreation.method);
ServerStream serverStream = serverStreamCreation.stream;
ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener;

serverStream.writeHeaders(new Metadata(), true);

String largeMessage = newString(TEST_FLOW_CONTROL_WINDOW + 1);

serverStream.request(1);
assertTrue(clientStreamListener.awaitOnReadyAndDrain(TIMEOUT_MS, TimeUnit.MILLISECONDS));
assertTrue(clientStream.isReady());
clientStream.writeMessage(methodDescriptor.streamRequest(largeMessage));
clientStream.flush();
doPingPong(serverListener);

verifyMessageCountAndClose(serverStreamListener.messageQueue, 1);

clientStream.request(1);
assertTrue(serverStreamListener.awaitOnReadyAndDrain(TIMEOUT_MS, TimeUnit.MILLISECONDS));
assertTrue(serverStream.isReady());
serverStream.writeMessage(methodDescriptor.streamResponse(largeMessage));
serverStream.flush();
doPingPong(serverListener);

verifyMessageCountAndClose(clientStreamListener.messageQueue, 1);

// And now check that the streams can still complete normally.
clientStream.halfClose();
doPingPong(serverListener);
serverStream.request(1);
assertTrue(serverStreamListener.awaitHalfClosed(TIMEOUT_MS, TimeUnit.MILLISECONDS));

Status status = Status.OK.withDescription("... quite a lengthy discussion");
serverStream.close(status, new Metadata());
doPingPong(serverListener);
clientStream.request(1);
assertCodeEquals(Status.OK, serverStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
Status clientStreamStatus = clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
assertEquals(status.getCode(), clientStreamStatus.getCode());
assertEquals(status.getDescription(), clientStreamStatus.getDescription());
}

private int verifyMessageCountAndClose(BlockingQueue<InputStream> messageQueue, int count)
throws Exception {
InputStream message;
Expand Down Expand Up @@ -2399,4 +2449,12 @@ private static TransportStats getTransportStats(InternalInstrumented<SocketStats
throws ExecutionException, InterruptedException {
return socket.getStats().get().data;
}

private static String newString(int size) {
StringBuilder sb = new StringBuilder(size);
for (int i = 0; i < size; i++) {
sb.append('a');
}
return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,12 @@ public void serverNotListening() {
public void flowControlPushBack() {
}

@Override
@Ignore("Servlet flow control not implemented yet")
@Test
public void flowControlDoesNotDeadlockLargeMessage() {
}

// FIXME
@Override
@Ignore("Jetty is broken on client RST_STREAM")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,13 @@ public void shutdownNowKillsClientStream() {}
@Test
public void flowControlPushBack() {}

// FIXME
@Override
@Ignore("Servlet flow control not implemented yet")
@Test
public void flowControlDoesNotDeadlockLargeMessage() {
}

@Override
@Ignore("Server side sockets are managed by the servlet container")
@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,13 @@ public void clientCancelFromWithinMessageRead() {}
@Test
public void flowControlPushBack() {}

// FIXME
@Override
@Ignore("Servlet flow control not implemented yet")
@Test
public void flowControlDoesNotDeadlockLargeMessage() {
}

@Override
@Ignore("Server side sockets are managed by the servlet container")
@Test
Expand Down

0 comments on commit 09acf2f

Please sign in to comment.