Skip to content

Commit

Permalink
core: Free unused MessageProducer in RetriableStream
Browse files Browse the repository at this point in the history
This prevents leaking message buffers.

Fixes grpc#9563
  • Loading branch information
ejona86 committed Jan 20, 2023
1 parent bb55f58 commit eed23fe
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 0 deletions.
1 change: 1 addition & 0 deletions core/src/main/java/io/grpc/internal/RetriableStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -1060,6 +1060,7 @@ public void messagesAvailable(final MessageProducer producer) {
checkState(
savedState.winningSubstream != null, "Headers should be received prior to messages.");
if (savedState.winningSubstream != substream) {
GrpcUtil.closeQuietly(producer);
return;
}
listenerSerializeExecutor.execute(
Expand Down
41 changes: 41 additions & 0 deletions core/src/test/java/io/grpc/internal/RetriableStreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@
import io.grpc.internal.StreamListener.MessageProducer;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -991,6 +993,27 @@ public void messageAvailable() {
verify(masterListener).messagesAvailable(messageProducer);
}

@Test
public void inboundMessagesClosedOnCancel() throws Exception {
ClientStream mockStream1 = mock(ClientStream.class);
doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);

retriableStream.start(masterListener);
retriableStream.request(1);
retriableStream.cancel(Status.CANCELLED.withDescription("on purpose"));

ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class);
verify(mockStream1).start(sublistenerCaptor1.capture());

ClientStreamListener listener = sublistenerCaptor1.getValue();
listener.headersRead(new Metadata());
InputStream is = mock(InputStream.class);
listener.messagesAvailable(new FakeMessageProducer(is));
verify(masterListener, never()).messagesAvailable(any(MessageProducer.class));
verify(is).close();
}

@Test
public void closedWhileDraining() {
ClientStream mockStream1 = mock(ClientStream.class);
Expand Down Expand Up @@ -2718,4 +2741,22 @@ private interface RetriableStreamRecorder {

Status prestart();
}

private static final class FakeMessageProducer implements MessageProducer {
private final Iterator<InputStream> iterator;

public FakeMessageProducer(InputStream... iss) {
this.iterator = Arrays.asList(iss).iterator();
}

@Override
@Nullable
public InputStream next() {
if (iterator.hasNext()) {
return iterator.next();
} else {
return null;
}
}
}
}

0 comments on commit eed23fe

Please sign in to comment.