Skip to content

Commit

Permalink
core: Free unused MessageProducer in RetriableStream
Browse files Browse the repository at this point in the history
This prevents leaking message buffers.

Fixes grpc#9563
  • Loading branch information
ejona86 committed Jan 18, 2023
1 parent 0f4b767 commit 39da7c3
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 0 deletions.
1 change: 1 addition & 0 deletions core/src/main/java/io/grpc/internal/RetriableStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -1099,6 +1099,7 @@ public void messagesAvailable(final MessageProducer producer) {
checkState(
savedState.winningSubstream != null, "Headers should be received prior to messages.");
if (savedState.winningSubstream != substream) {
GrpcUtil.closeQuietly(producer);
return;
}
listenerSerializeExecutor.execute(
Expand Down
41 changes: 41 additions & 0 deletions core/src/test/java/io/grpc/internal/RetriableStreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@
import io.grpc.internal.StreamListener.MessageProducer;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -998,6 +1000,27 @@ public void messageAvailable() {
verify(masterListener).messagesAvailable(messageProducer);
}

@Test
public void inboundMessagesClosedOnCancel() throws Exception {
ClientStream mockStream1 = mock(ClientStream.class);
doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);

retriableStream.start(masterListener);
retriableStream.request(1);
retriableStream.cancel(Status.CANCELLED.withDescription("on purpose"));

ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class);
verify(mockStream1).start(sublistenerCaptor1.capture());

ClientStreamListener listener = sublistenerCaptor1.getValue();
listener.headersRead(new Metadata());
InputStream is = mock(InputStream.class);
listener.messagesAvailable(new FakeMessageProducer(is));
verify(masterListener, never()).messagesAvailable(any(MessageProducer.class));
verify(is).close();
}

@Test
public void notAdd0PrevRetryAttemptsToRespHeaders() {
ClientStream mockStream1 = mock(ClientStream.class);
Expand Down Expand Up @@ -2786,4 +2809,22 @@ private interface RetriableStreamRecorder {

Status prestart();
}

private static final class FakeMessageProducer implements MessageProducer {
private final Iterator<InputStream> iterator;

public FakeMessageProducer(InputStream... iss) {
this.iterator = Arrays.asList(iss).iterator();
}

@Override
@Nullable
public InputStream next() {
if (iterator.hasNext()) {
return iterator.next();
} else {
return null;
}
}
}
}

0 comments on commit 39da7c3

Please sign in to comment.