From 62cec7314d6123637d09f3dd2d7a62c8781285a9 Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Tue, 4 Oct 2022 17:26:58 -0700 Subject: [PATCH 01/10] `WriteStreamSubscriber`: respect termination of the publisher Motivation: 1. Protocol can signal when the effective write of the message is complete and no more items are expected. In this case, `WriteStreamSubscriber` will do `request(MAX_VALUE)` to drain the remaining (likely empty) items from the publisher and terminate subscriber. However, premature termination of the completable subscriber breaks Reactive Streams control flow. Users of `connection.write(publisher)` can compose more operators on the returned `Completable` and they expect that the `Completable` terminates only after the passed `publisher` terminates too. The `publisher` can have more business logic after it emits all items. For example, service can read request payload body after it returns the full response. 2. If it waits for continuation but protocol signals it's not interested in more data, publisher won't know about it. Modifications: - On `channelOutboundClosed()`, to do attempt to terminate subscriber. Instead, mark the state as `SOURCE_OUTBOUND_CLOSED` and wait for termination of the publisher; - If `channelClosed(...)` was invoked after `channelOutboundClosed()`, do not fail the subscriber. Instead, wait for termination of the publisher; - When `terminateSource()` is invoked, cancel the current subscription to let the publisher know we are not interested in more items. For HTTP client it won't have any actual effect for request payload body because we use `Single.concat(Publisher)` with `deferSubscribe` option, but it's necessary to propagate signal for other use-cases; - Adjust tests for new behavior; - Increase test coverage for `WriteStreamSubscriber`; Result: Terminal signals correctly propagated through RS chain when users use `connection.write(publisher)`. --- .../netty/internal/WriteStreamSubscriber.java | 46 +++-- .../netty/internal/AbstractWriteTest.java | 30 +-- ...SslCloseNotifyAlertClientHandlingTest.java | 1 + .../internal/WriteStreamSubscriberTest.java | 186 ++++++++++++++---- 4 files changed, 203 insertions(+), 60 deletions(-) diff --git a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java index 2aaed934e1..a14df7f524 100644 --- a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java +++ b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java @@ -85,6 +85,7 @@ final class WriteStreamSubscriber implements PublisherSource.Subscriber, private static final byte CHANNEL_CLOSED = 1 << 1; private static final byte CLOSE_OUTBOUND_ON_SUBSCRIBER_TERMINATION = 1 << 2; private static final byte SUBSCRIBER_TERMINATED = 1 << 3; + private static final byte SOURCE_OUTBOUND_CLOSED = 1 << 4; private static final byte SUBSCRIBER_OR_SOURCE_TERMINATED = SOURCE_TERMINATED | SUBSCRIBER_TERMINATED; private static final Subscription CANCELLED = newEmptySubscription(); private static final AtomicReferenceFieldUpdater subscriptionUpdater = @@ -177,8 +178,8 @@ void doWrite(Object msg) { demandEstimator.onItemWrite(msg, capacityBefore, capacityAfter); // Client-side always starts a request with request(1) to probe a Channel with meta-data before continuing // to write the payload body, see https://github.com/apple/servicetalk/pull/1644. - // Requests that await feedback from the remote peer should not request more until they receive - // continueWriting() signal. + // Requests that await feedback from the remote peer should not request more data from the publisher until + // they receive continueWriting() signal. if (!isClient || !(shouldWaitFlag = shouldWait.test(msg))) { requestMoreIfRequired(subscription, capacityAfter); } @@ -246,7 +247,7 @@ public void channelOutboundClosed() { // we may deadlock if we don't request enough onNext signals to see the terminal signal. sub.request(Long.MAX_VALUE); } - promise.sourceTerminated(null, true); + promise.outboundClosed(); } @Override @@ -254,7 +255,12 @@ public void terminateSource() { assert eventLoop.inEventLoop(); // Terminate the source only if it awaits continuation. if (shouldWaitFlag) { - assert promise.activeWrites == 0; // We never start sending payload body until we receive 100 (Continue) + assert promise.activeWrites == 0 : + channel + " Unexpected activeWrites=" + promise.activeWrites + " while waiting for continuation"; + // Cancel the passed write Publisher to signal transport is not interested in more data. + final Subscription sub = this.subscription; + assert sub != null : channel + " Unexpected subscription=null while waiting for continuation"; + sub.cancel(); promise.sourceTerminated(null, true); } } @@ -408,6 +414,8 @@ boolean isWritable() { void writeNext(Object msg) { assert eventLoop.inEventLoop(); + assert isWritable() : channel + " Unexpected writeNext: " + msg + " during non-writable state=" + + Integer.toString(state, 2); activeWrites++; listenersOnWriteBoundaries.addLast(WRITE_BOUNDARY); channel.write(msg, this); @@ -416,6 +424,16 @@ void writeNext(Object msg) { } } + void outboundClosed() { + assert eventLoop.inEventLoop(); + if (isAnySet(state, SUBSCRIBER_OR_SOURCE_TERMINATED)) { + // We have terminated prematurely perhaps due to write failure. + return; + } + state = set(state, SOURCE_OUTBOUND_CLOSED); // Assign a state to mark the promise as not writable. + markCancelled(); + } + void sourceTerminated(@Nullable Throwable cause, boolean markCancelled) { assert eventLoop.inEventLoop(); if (isAnySet(state, SUBSCRIBER_OR_SOURCE_TERMINATED)) { @@ -425,15 +443,10 @@ void sourceTerminated(@Nullable Throwable cause, boolean markCancelled) { this.failureCause = cause; state = set(state, SOURCE_TERMINATED); if (markCancelled) { - // When we know that the source is effectively terminated and won't emit any new items, mark the - // subscription as CANCELLED to prevent any further interactions with it, like propagating `cancel` from - // `channelClosed(Throwable)` or `request(MAX_VALUE)` from `channelOutboundClosed()`. At this point we - // always have a non-null subscription because this is reachable only if publisher emitted some signals. - WriteStreamSubscriber.this.subscription = CANCELLED; + markCancelled(); } if (activeWrites == 0) { try { - state = set(state, SUBSCRIBER_TERMINATED); terminateSubscriber(cause); } catch (Throwable t) { tryFailureOrLog(t); @@ -449,6 +462,14 @@ void sourceTerminated(@Nullable Throwable cause, boolean markCancelled) { } } + void markCancelled() { + // When we know that the source is effectively terminated and won't emit any new items, mark the + // subscription as CANCELLED to prevent any further interactions with it, like propagating `cancel` from + // `channelClosed(Throwable)` or `request(MAX_VALUE)` from `channelOutboundClosed()`. At this point we + // always have a non-null subscription because this is reachable only if publisher emitted some signals. + WriteStreamSubscriber.this.subscription = CANCELLED; + } + void close(Throwable cause, boolean closeOutboundIfIdle) { assert eventLoop.inEventLoop(); if (isAllSet(state, CHANNEL_CLOSED)) { @@ -461,7 +482,7 @@ void close(Throwable cause, boolean closeOutboundIfIdle) { // just close the channel now. closeHandler.closeChannelOutbound(channel); } - } else if (activeWrites > 0) { + } else if (activeWrites > 0 || isAllSet(state, SOURCE_OUTBOUND_CLOSED)) { // Writes are pending, we will close the channel once writes are done. state = set(state, CLOSE_OUTBOUND_ON_SUBSCRIBER_TERMINATION); } else { @@ -507,7 +528,6 @@ private boolean setSuccess0() { } observer.itemFlushed(); if (--activeWrites == 0 && isAllSet(state, SOURCE_TERMINATED)) { - state = set(state, SUBSCRIBER_TERMINATED); try { terminateSubscriber(failureCause); } catch (Throwable t) { @@ -531,7 +551,6 @@ private boolean setFailure0(Throwable cause) { if (isAllSet(state, SUBSCRIBER_TERMINATED)) { return nettySharedPromiseTryStatus(); } - state = set(state, SUBSCRIBER_TERMINATED); Subscription oldVal = subscriptionUpdater.getAndSet(WriteStreamSubscriber.this, CANCELLED); if (oldVal != null && !isAllSet(state, SOURCE_TERMINATED)) { oldVal.cancel(); @@ -554,6 +573,7 @@ private boolean nettySharedPromiseTryStatus() { } private void terminateSubscriber(@Nullable Throwable cause) { + state = set(state, SUBSCRIBER_TERMINATED); if (cause == null) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("{} Terminate subscriber, state: {}", channel, Integer.toString(state, 2)); diff --git a/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/AbstractWriteTest.java b/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/AbstractWriteTest.java index b0331c523b..77b43afaf7 100644 --- a/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/AbstractWriteTest.java +++ b/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/AbstractWriteTest.java @@ -22,7 +22,8 @@ import io.netty.channel.ChannelPromise; import io.netty.channel.embedded.EmbeddedChannel; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; + +import javax.annotation.Nullable; import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION; import static org.hamcrest.MatcherAssert.assertThat; @@ -36,18 +37,10 @@ public abstract class AbstractWriteTest { - protected EmbeddedChannel channel; - protected WriteDemandEstimator demandEstimator; - protected CompletableSource.Subscriber completableSubscriber; - protected FailingWriteHandler failingWriteHandler; - - @BeforeEach - public void setUp() throws Exception { - completableSubscriber = mock(CompletableSource.Subscriber.class); - failingWriteHandler = new FailingWriteHandler(); - channel = new EmbeddedChannel(failingWriteHandler); - demandEstimator = mock(WriteDemandEstimator.class); - } + protected final WriteDemandEstimator demandEstimator = mock(WriteDemandEstimator.class); + protected final CompletableSource.Subscriber completableSubscriber = mock(CompletableSource.Subscriber.class); + protected final FailingWriteHandler failingWriteHandler = new FailingWriteHandler(); + protected final EmbeddedChannel channel = new EmbeddedChannel(failingWriteHandler); @AfterEach public void tearDown() throws Exception { @@ -59,6 +52,9 @@ protected void verifyWriteSuccessful(String... items) { channel.flushOutbound(); if (items.length > 0) { assertThat("Message not written.", channel.outboundMessages(), contains((String[]) items)); + for (int i = 0; i < items.length; ++i) { + channel.readOutbound(); // discard written items + } } else { assertThat("Unexpected message(s) written.", channel.outboundMessages(), is(empty())); } @@ -71,6 +67,14 @@ protected void verifyListenerSuccessful() { verifyNoMoreInteractions(completableSubscriber); } + protected void verifyListenerFailed(@Nullable Throwable t) { + channel.flushOutbound(); + assertThat("Unexpected message(s) written.", channel.outboundMessages(), is(empty())); + verify(completableSubscriber).onSubscribe(any()); + verify(completableSubscriber).onError(t != null ? t : any()); + verifyNoMoreInteractions(completableSubscriber); + } + static final class FailingWriteHandler extends ChannelDuplexHandler { private volatile boolean failNextWritePromise; private volatile boolean throwFromNextWrite; diff --git a/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/SslCloseNotifyAlertClientHandlingTest.java b/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/SslCloseNotifyAlertClientHandlingTest.java index d509cfab51..fac3296f66 100644 --- a/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/SslCloseNotifyAlertClientHandlingTest.java +++ b/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/SslCloseNotifyAlertClientHandlingTest.java @@ -116,6 +116,7 @@ private void sendRequest() { .then(() -> { writeMsg(writeSource, BEGIN); writeMsg(writeSource, END); + writeSource.onComplete(); }) .expectComplete() .verify(); diff --git a/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriberTest.java b/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriberTest.java index 461f6547a8..b7ebad3244 100644 --- a/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriberTest.java +++ b/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriberTest.java @@ -20,12 +20,15 @@ import io.servicetalk.transport.netty.internal.WriteStreamSubscriber.AbortedFirstWriteException; import io.netty.channel.Channel; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentCaptor; +import java.nio.channels.ClosedChannelException; + import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION; -import static io.servicetalk.transport.netty.internal.CloseHandler.UNSUPPORTED_PROTOCOL_CLOSE_HANDLER; +import static java.lang.Long.MAX_VALUE; import static java.util.function.UnaryOperator.identity; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; @@ -35,9 +38,9 @@ import static org.mockito.ArgumentCaptor.forClass; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; -import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; @@ -46,24 +49,20 @@ class WriteStreamSubscriberTest extends AbstractWriteTest { + private final Subscription subscription = mock(Subscription.class); + private final CloseHandler closeHandler = mock(CloseHandler.class); private WriteStreamSubscriber subscriber; - private Subscription subscription; - private CloseHandler closeHandler; - - @BeforeEach - @Override - public void setUp() throws Exception { - super.setUp(); - closeHandler = mock(CloseHandler.class); + + void setUp(boolean isClient, boolean shouldWait) { subscriber = new WriteStreamSubscriber(channel, demandEstimator, completableSubscriber, closeHandler, - NoopWriteObserver.INSTANCE, identity(), false, __ -> false); - subscription = mock(Subscription.class); + NoopWriteObserver.INSTANCE, identity(), isClient, __ -> shouldWait); when(demandEstimator.estimateRequestN(anyLong())).thenReturn(1L); subscriber.onSubscribe(subscription); } @Test void testSingleItem() { + setUp(false, false); WriteInfo info = writeAndFlush("Hello"); subscriber.onComplete(); verifyListenerSuccessful(); @@ -74,6 +73,7 @@ void testSingleItem() { @Test void testMultipleItem() { + setUp(false, false); WriteInfo info1 = writeAndFlush("Hello1"); WriteInfo info2 = writeAndFlush("Hello2"); WriteInfo info3 = writeAndFlush("Hello3"); @@ -84,8 +84,41 @@ void testMultipleItem() { verifyNoInteractions(closeHandler); } + @ParameterizedTest(name = "{displayName} [{index}] error={0}") + @ValueSource(booleans = {false, true}) + void testMultipleItemThenOutboundClosed(boolean error) { + setUp(false, false); + verify(subscription).request(anyLong()); + WriteInfo info1 = write("Hello1"); + WriteInfo info2 = write("Hello2"); + WriteInfo info3 = write("Hello3"); + + subscriber.channelOutboundClosed(); + verify(subscription).request(eq(MAX_VALUE)); + verify(completableSubscriber).onSubscribe(any()); + verifyNoMoreInteractions(completableSubscriber); + + verifyWriteSuccessful("Hello1", "Hello2", "Hello3"); + verifyWrite(info1, info2, info3); + + // Write after channelOutboundClosed should be discarded + writeAndFlush("Hello4"); + assertThat("Unexpected message(s) written.", channel.outboundMessages(), is(empty())); + + verifyNoMoreInteractions(completableSubscriber); + if (error) { + subscriber.onError(DELIBERATE_EXCEPTION); + verify(completableSubscriber).onError(DELIBERATE_EXCEPTION); + } else { + subscriber.onComplete(); + verify(completableSubscriber).onComplete(); + } + verifyNoMoreInteractions(completableSubscriber, subscription, closeHandler); + } + @Test void testOnErrorNoWrite() throws InterruptedException { + setUp(false, false); subscriber.onError(DELIBERATE_EXCEPTION); ArgumentCaptor exceptionCaptor = forClass(Throwable.class); verify(this.completableSubscriber).onError(exceptionCaptor.capture()); @@ -96,6 +129,7 @@ void testOnErrorNoWrite() throws InterruptedException { @Test void testOnCompleteNoWrite() { + setUp(false, false); subscriber.onComplete(); verify(this.completableSubscriber).onComplete(); verifyNoInteractions(closeHandler); @@ -103,6 +137,7 @@ void testOnCompleteNoWrite() { @Test void testOnErrorPostWrite() throws InterruptedException { + setUp(false, false); writeAndFlush("Hello"); channel.flushOutbound(); subscriber.onError(DELIBERATE_EXCEPTION); @@ -113,9 +148,9 @@ void testOnErrorPostWrite() throws InterruptedException { @Test void testCancelBeforeOnSubscribe() { - subscriber = new WriteStreamSubscriber(channel, demandEstimator, completableSubscriber, - UNSUPPORTED_PROTOCOL_CLOSE_HANDLER, NoopWriteObserver.INSTANCE, identity(), false, __ -> false); - subscription = mock(Subscription.class); + subscriber = new WriteStreamSubscriber(channel, demandEstimator, completableSubscriber, closeHandler, + NoopWriteObserver.INSTANCE, identity(), false, __ -> false); + when(demandEstimator.estimateRequestN(anyLong())).thenReturn(1L); subscriber.cancel(); subscriber.onSubscribe(subscription); verify(subscription).cancel(); @@ -124,6 +159,7 @@ void testCancelBeforeOnSubscribe() { @Test void testCancelAfterOnSubscribe() { + setUp(false, false); subscriber.cancel(); verify(subscription).cancel(); verifyNoInteractions(closeHandler); @@ -131,11 +167,10 @@ void testCancelAfterOnSubscribe() { @Test void testRequestMoreBeforeOnSubscribe() { - reset(completableSubscriber); - subscriber = new WriteStreamSubscriber(channel, demandEstimator, completableSubscriber, - UNSUPPORTED_PROTOCOL_CLOSE_HANDLER, NoopWriteObserver.INSTANCE, identity(), false, __ -> false); + subscriber = new WriteStreamSubscriber(channel, demandEstimator, completableSubscriber, closeHandler, + NoopWriteObserver.INSTANCE, identity(), false, __ -> false); + when(demandEstimator.estimateRequestN(anyLong())).thenReturn(1L); subscriber.channelWritable(); - subscription = mock(Subscription.class); subscriber.onSubscribe(subscription); WriteInfo info = writeAndFlush("Hello"); subscriber.onComplete(); @@ -147,38 +182,61 @@ void testRequestMoreBeforeOnSubscribe() { @Test void writeFailureClosesChannel() throws Exception { - failingWriteClosesChannel(() -> failingWriteHandler.failNextWritePromise()); + setUp(false, false); + failingWriteClosesChannel(failingWriteHandler::failNextWritePromise); } @Test void uncaughtWriteExceptionClosesChannel() throws Exception { - failingWriteClosesChannel(() -> failingWriteHandler.throwFromNextWrite()); + setUp(false, false); + failingWriteClosesChannel(failingWriteHandler::throwFromNextWrite); } @Test void onNextAfterChannelClose() { + setUp(false, false); subscriber.channelClosed(DELIBERATE_EXCEPTION); + verify(subscription).cancel(); subscriber.onNext("Hello"); - channel.flushOutbound(); + verifyListenerFailed(null); assertThat("Unexpected message(s) written.", channel.outboundMessages(), is(empty())); } @Test - void onNextAfterChannelCloseWhileActiveWritesPending() { + void secondOnNextAfterChannelClose() { + setUp(false, false); subscriber.onNext("Hello"); + verifyWriteSuccessful("Hello"); subscriber.channelClosed(DELIBERATE_EXCEPTION); + verify(subscription).cancel(); + subscriber.onNext("Hello2"); + verifyListenerFailed(DELIBERATE_EXCEPTION); + } + + @Test + void closeChannelDuringFirstWrite() { + setUp(false, false); + subscriber.onNext("Hello"); + channel.close(); // fails the first write with StacklessClosedChannelException + subscriber.channelClosed(// simulate channelInactive event + StacklessClosedChannelException.newInstance(WriteStreamSubscriberTest.class, "channel.close()")); + verify(subscription).cancel(); + subscriber.onNext("Hello2"); + channel.runPendingTasks(); + channel.flush(); assertThat("Unexpected message(s) written.", channel.outboundMessages(), is(empty())); - verifyWriteSuccessful("Hello"); + + verify(completableSubscriber).onSubscribe(any()); + verify(completableSubscriber).onError(any(ClosedChannelException.class)); + verifyNoMoreInteractions(completableSubscriber); } @Test void clientRequestsOne() { - reset(completableSubscriber, demandEstimator); when(demandEstimator.estimateRequestN(anyLong())).thenReturn(10L); - subscriber = new WriteStreamSubscriber(channel, demandEstimator, completableSubscriber, - UNSUPPORTED_PROTOCOL_CLOSE_HANDLER, NoopWriteObserver.INSTANCE, identity(), true, __ -> false); - subscription = mock(Subscription.class); + subscriber = new WriteStreamSubscriber(channel, demandEstimator, completableSubscriber, closeHandler, + NoopWriteObserver.INSTANCE, identity(), true, __ -> false); subscriber.onSubscribe(subscription); verify(subscription).request(1L); verify(demandEstimator, never()).estimateRequestN(anyLong()); @@ -194,23 +252,73 @@ void clientRequestsOne() { @Test void channelClosedDoesNotCancelAfterOutboundEnd() throws Exception { + setUp(false, false); + verify(completableSubscriber).onSubscribe(any()); WriteInfo info = writeAndFlush("Hello"); + verify(subscription, times(2)).request(anyLong()); + verifyNoMoreInteractions(completableSubscriber, subscription); + subscriber.channelOutboundClosed(); + verify(subscription).request(MAX_VALUE); - verifyListenerSuccessful(); verifyWriteSuccessful("Hello"); verifyWrite(info); + verifyNoMoreInteractions(completableSubscriber, subscription); channel.finishAndReleaseAll(); subscriber.channelClosed(DELIBERATE_EXCEPTION); // simulate channelInactive event from DefaultNettyConnection - verify(closeHandler).closeChannelOutbound(any(Channel.class)); + // channelClosed after channelOutboundClosed should not fail subscriber + verifyNoMoreInteractions(completableSubscriber, closeHandler); subscriber.onComplete(); // signal completion after "channelInactive" - verify(subscription, never()).cancel(); - verifyNoMoreInteractions(closeHandler); + verify(completableSubscriber).onComplete(); + verify(closeHandler).closeChannelOutbound(any(Channel.class)); + verifyNoMoreInteractions(completableSubscriber, subscription, closeHandler); assertChannelClose(); } + @Test + void clientContinueWriting() { + when(demandEstimator.estimateRequestN(anyLong())).thenReturn(10L); + subscriber = new WriteStreamSubscriber(channel, demandEstimator, completableSubscriber, closeHandler, + NoopWriteObserver.INSTANCE, identity(), true, __ -> true); + subscriber.onSubscribe(subscription); + + verify(subscription).request(1L); + writeAndFlush("Hello1"); + verifyNoMoreInteractions(subscription); + + subscriber.continueWriting(); + verify(subscription).request(10L); + writeAndFlush("Hello2"); + + verifyWriteSuccessful("Hello1", "Hello2"); + subscriber.onComplete(); + verifyListenerSuccessful(); + verifyNoMoreInteractions(subscription, closeHandler); + } + + @Test + void clientTerminateSource() { + when(demandEstimator.estimateRequestN(anyLong())).thenReturn(10L); + subscriber = new WriteStreamSubscriber(channel, demandEstimator, completableSubscriber, closeHandler, + NoopWriteObserver.INSTANCE, identity(), true, __ -> true); + subscriber.onSubscribe(subscription); + + verify(subscription).request(1L); + writeAndFlush("Hello"); + verifyNoMoreInteractions(subscription); + + subscriber.terminateSource(); + verify(subscription).cancel(); + + verifyListenerSuccessful(); + verifyWriteSuccessful("Hello"); + + subscriber.onComplete(); + verifyNoMoreInteractions(subscription, closeHandler, completableSubscriber); + } + private void failingWriteClosesChannel(Runnable enableWriteFailure) throws InterruptedException { WriteInfo info1 = writeAndFlush("Hello1"); verify(completableSubscriber).onSubscribe(any()); @@ -231,11 +339,21 @@ private void verifyWrite(WriteInfo... infos) { verify(subscription, times(infos.length + 1)).request(1); } + private WriteInfo write(String msg) { + return write(msg, false); + } + private WriteInfo writeAndFlush(String msg) { + return write(msg, true); + } + + private WriteInfo write(String msg, boolean shouldFlush) { long pre = channel.bytesBeforeUnwritable(); subscriber.onNext(msg); long post = channel.bytesBeforeUnwritable(); - channel.flushOutbound(); + if (shouldFlush) { + channel.flushOutbound(); + } return new WriteInfo(pre, post, msg); } From 88ad0dc77daf1816ae40195bbaffe231b3dce73a Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Wed, 5 Oct 2022 09:56:31 -0700 Subject: [PATCH 02/10] trigger CI 1 --- .../transport/netty/internal/WriteStreamSubscriber.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java index a14df7f524..aa2701fd6d 100644 --- a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java +++ b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java @@ -414,7 +414,7 @@ boolean isWritable() { void writeNext(Object msg) { assert eventLoop.inEventLoop(); - assert isWritable() : channel + " Unexpected writeNext: " + msg + " during non-writable state=" + + assert isWritable() : channel + " Unexpected -writeNext: " + msg + " during non-writable state=" + Integer.toString(state, 2); activeWrites++; listenersOnWriteBoundaries.addLast(WRITE_BOUNDARY); From b95a99394e8ce6089723d42a1784ba57d070c27d Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Wed, 5 Oct 2022 09:56:39 -0700 Subject: [PATCH 03/10] trigger CI 2 --- .../transport/netty/internal/WriteStreamSubscriber.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java index aa2701fd6d..6336c30556 100644 --- a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java +++ b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java @@ -414,7 +414,7 @@ boolean isWritable() { void writeNext(Object msg) { assert eventLoop.inEventLoop(); - assert isWritable() : channel + " Unexpected -writeNext: " + msg + " during non-writable state=" + + assert isWritable() : channel + " Unexpected --writeNext: " + msg + " during non-writable state=" + Integer.toString(state, 2); activeWrites++; listenersOnWriteBoundaries.addLast(WRITE_BOUNDARY); From dcfa13f911569cc6adc1e5d59e4f535f1ab1777e Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Wed, 5 Oct 2022 09:56:46 -0700 Subject: [PATCH 04/10] trigger CI 3 --- .../transport/netty/internal/WriteStreamSubscriber.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java index 6336c30556..aa2701fd6d 100644 --- a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java +++ b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java @@ -414,7 +414,7 @@ boolean isWritable() { void writeNext(Object msg) { assert eventLoop.inEventLoop(); - assert isWritable() : channel + " Unexpected --writeNext: " + msg + " during non-writable state=" + + assert isWritable() : channel + " Unexpected -writeNext: " + msg + " during non-writable state=" + Integer.toString(state, 2); activeWrites++; listenersOnWriteBoundaries.addLast(WRITE_BOUNDARY); From 303ae71c6d3a1118761f6514a9a0b0a025a2c1ef Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Wed, 5 Oct 2022 09:56:57 -0700 Subject: [PATCH 05/10] trigger CI 4 --- .../transport/netty/internal/WriteStreamSubscriber.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java index aa2701fd6d..a14df7f524 100644 --- a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java +++ b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java @@ -414,7 +414,7 @@ boolean isWritable() { void writeNext(Object msg) { assert eventLoop.inEventLoop(); - assert isWritable() : channel + " Unexpected -writeNext: " + msg + " during non-writable state=" + + assert isWritable() : channel + " Unexpected writeNext: " + msg + " during non-writable state=" + Integer.toString(state, 2); activeWrites++; listenersOnWriteBoundaries.addLast(WRITE_BOUNDARY); From 54add10f56c1e6c380f846fc7d1dcebc6216fb26 Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Wed, 5 Oct 2022 10:57:34 -0700 Subject: [PATCH 06/10] Verify behavior for `writeSource.onComplete()` in SslCloseNotifyAlertServerHandlingTest --- ...SslCloseNotifyAlertServerHandlingTest.java | 47 +++++++++++++++++-- 1 file changed, 42 insertions(+), 5 deletions(-) diff --git a/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/SslCloseNotifyAlertServerHandlingTest.java b/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/SslCloseNotifyAlertServerHandlingTest.java index f9e013394e..2a41ad671c 100644 --- a/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/SslCloseNotifyAlertServerHandlingTest.java +++ b/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/SslCloseNotifyAlertServerHandlingTest.java @@ -44,8 +44,24 @@ void afterExchangeIdleConnection() { .then(() -> { writeMsg(writeSource, BEGIN); writeMsg(writeSource, END); + writeSource.onComplete(); closeNotifyAndVerifyClosing(); }) + .expectComplete() + .verify(); + } + + @Test + void afterRequestAndWritingResponseButBeforeCompletingWrite() { + receiveRequest(); + PublisherSource.Processor writeSource = newPublisherProcessor(); + StepVerifiers.create(conn.write(fromSource(writeSource))) + .then(() -> { + writeMsg(writeSource, BEGIN); + writeMsg(writeSource, END); + closeNotifyAndVerifyClosing(); + writeSource.onComplete(); + }) .expectErrorConsumed(cause -> { assertThat("Unexpected write failure cause", cause, instanceOf(CloseEventObservedException.class)); CloseEventObservedException ceoe = (CloseEventObservedException) cause; @@ -55,7 +71,7 @@ void afterExchangeIdleConnection() { } @Test - void afterRequestBeforeSendingResponse() { + void afterRequestBeforeWritingResponse() { receiveRequest(); PublisherSource.Processor writeSource = newPublisherProcessor(); @@ -66,7 +82,7 @@ void afterRequestBeforeSendingResponse() { } @Test - void afterRequestWhileSendingResponse() { + void afterRequestWhileWritingResponse() { receiveRequest(); PublisherSource.Processor writeSource = newPublisherProcessor(); @@ -80,7 +96,7 @@ void afterRequestWhileSendingResponse() { } @Test - void whileReadingRequestBeforeSendingResponse() { + void whileReadingRequestBeforeWritingResponse() { StepVerifiers.create(conn.write(fromSource(newPublisherProcessor())).merge(conn.read())) .then(() -> { // Start reading request @@ -93,7 +109,7 @@ void whileReadingRequestBeforeSendingResponse() { } @Test - void whileReadingRequestAndSendingResponse() { + void whileReadingRequestAndWritingResponse() { PublisherSource.Processor writeSource = newPublisherProcessor(); StepVerifiers.create(conn.write(fromSource(writeSource)).merge(conn.read())) .then(() -> { @@ -109,7 +125,7 @@ void whileReadingRequestAndSendingResponse() { } @Test - void whileReadingRequestAfterSendingResponse() { + void whileReadingRequestAfterWritingResponse() { PublisherSource.Processor writeSource = newPublisherProcessor(); StepVerifiers.create(conn.write(fromSource(writeSource)).merge(conn.read())) .then(() -> { @@ -118,6 +134,7 @@ void whileReadingRequestAfterSendingResponse() { // Send response writeMsg(writeSource, BEGIN); writeMsg(writeSource, END); + writeSource.onComplete(); }) .expectNext(BEGIN) .then(this::closeNotifyAndVerifyClosing) @@ -125,6 +142,26 @@ void whileReadingRequestAfterSendingResponse() { .verify(); } + @Test + void whileReadingRequestAfterWritingResponseButBeforeCompletingWrite() { + PublisherSource.Processor writeSource = newPublisherProcessor(); + StepVerifiers.create(conn.write(fromSource(writeSource)).merge(conn.read())) + .then(() -> { + // Start reading request + channel.writeInbound(BEGIN); + // Send response + writeMsg(writeSource, BEGIN); + writeMsg(writeSource, END); + }) + .expectNext(BEGIN) + .then(() -> { + closeNotifyAndVerifyClosing(); + writeSource.onComplete(); + }) + .expectError(ClosedChannelException.class) + .verify(); + } + private void receiveRequest() { StepVerifiers.create(conn.read()) .then(() -> channel.writeInbound(BEGIN)) From 98997718f63176e31767153447e97fef86e2c802 Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Wed, 5 Oct 2022 10:58:58 -0700 Subject: [PATCH 07/10] trigger CI 1 --- .../transport/netty/internal/WriteStreamSubscriber.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java index a14df7f524..aa2701fd6d 100644 --- a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java +++ b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java @@ -414,7 +414,7 @@ boolean isWritable() { void writeNext(Object msg) { assert eventLoop.inEventLoop(); - assert isWritable() : channel + " Unexpected writeNext: " + msg + " during non-writable state=" + + assert isWritable() : channel + " Unexpected -writeNext: " + msg + " during non-writable state=" + Integer.toString(state, 2); activeWrites++; listenersOnWriteBoundaries.addLast(WRITE_BOUNDARY); From 6d929231358703e20b6dbc50924b1e720b882115 Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Wed, 5 Oct 2022 10:59:16 -0700 Subject: [PATCH 08/10] trigger CI 2 --- .../transport/netty/internal/WriteStreamSubscriber.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java index aa2701fd6d..6336c30556 100644 --- a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java +++ b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java @@ -414,7 +414,7 @@ boolean isWritable() { void writeNext(Object msg) { assert eventLoop.inEventLoop(); - assert isWritable() : channel + " Unexpected -writeNext: " + msg + " during non-writable state=" + + assert isWritable() : channel + " Unexpected --writeNext: " + msg + " during non-writable state=" + Integer.toString(state, 2); activeWrites++; listenersOnWriteBoundaries.addLast(WRITE_BOUNDARY); From b27a07a004ab0df829026737fdbdb9ff642c8a43 Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Wed, 5 Oct 2022 10:59:33 -0700 Subject: [PATCH 09/10] trigger CI 3 --- .../transport/netty/internal/WriteStreamSubscriber.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java index 6336c30556..aa2701fd6d 100644 --- a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java +++ b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java @@ -414,7 +414,7 @@ boolean isWritable() { void writeNext(Object msg) { assert eventLoop.inEventLoop(); - assert isWritable() : channel + " Unexpected --writeNext: " + msg + " during non-writable state=" + + assert isWritable() : channel + " Unexpected -writeNext: " + msg + " during non-writable state=" + Integer.toString(state, 2); activeWrites++; listenersOnWriteBoundaries.addLast(WRITE_BOUNDARY); From 5269b846971c1e22bf126527e971d424b42ace6c Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Wed, 5 Oct 2022 10:59:50 -0700 Subject: [PATCH 10/10] trigger CI 4 --- .../transport/netty/internal/WriteStreamSubscriber.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java index aa2701fd6d..a14df7f524 100644 --- a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java +++ b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java @@ -414,7 +414,7 @@ boolean isWritable() { void writeNext(Object msg) { assert eventLoop.inEventLoop(); - assert isWritable() : channel + " Unexpected -writeNext: " + msg + " during non-writable state=" + + assert isWritable() : channel + " Unexpected writeNext: " + msg + " during non-writable state=" + Integer.toString(state, 2); activeWrites++; listenersOnWriteBoundaries.addLast(WRITE_BOUNDARY);