From 7fb6875bafe6437afb268b7449630f9609ac37a9 Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Thu, 13 Oct 2022 16:47:29 -0700 Subject: [PATCH] Revert "Revert "`WriteStreamSubscriber`: respect termination of the publisher (#2387)"" This reverts commit 1a33f6cfe8f3f884f78420abb80333d2ce771f95. --- .../netty/internal/WriteStreamSubscriber.java | 46 +++-- .../netty/internal/AbstractWriteTest.java | 30 +-- ...SslCloseNotifyAlertClientHandlingTest.java | 1 + ...SslCloseNotifyAlertServerHandlingTest.java | 47 ++++- .../internal/WriteStreamSubscriberTest.java | 186 ++++++++++++++---- 5 files changed, 245 insertions(+), 65 deletions(-) diff --git a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java index 2aaed934e1..a14df7f524 100644 --- a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java +++ b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java @@ -85,6 +85,7 @@ final class WriteStreamSubscriber implements PublisherSource.Subscriber, private static final byte CHANNEL_CLOSED = 1 << 1; private static final byte CLOSE_OUTBOUND_ON_SUBSCRIBER_TERMINATION = 1 << 2; private static final byte SUBSCRIBER_TERMINATED = 1 << 3; + private static final byte SOURCE_OUTBOUND_CLOSED = 1 << 4; private static final byte SUBSCRIBER_OR_SOURCE_TERMINATED = SOURCE_TERMINATED | SUBSCRIBER_TERMINATED; private static final Subscription CANCELLED = newEmptySubscription(); private static final AtomicReferenceFieldUpdater subscriptionUpdater = @@ -177,8 +178,8 @@ void doWrite(Object msg) { demandEstimator.onItemWrite(msg, capacityBefore, capacityAfter); // Client-side always starts a request with request(1) to probe a Channel with meta-data before continuing // to write the payload body, see https://github.com/apple/servicetalk/pull/1644. - // Requests that await feedback from the remote peer should not request more until they receive - // continueWriting() signal. + // Requests that await feedback from the remote peer should not request more data from the publisher until + // they receive continueWriting() signal. if (!isClient || !(shouldWaitFlag = shouldWait.test(msg))) { requestMoreIfRequired(subscription, capacityAfter); } @@ -246,7 +247,7 @@ public void channelOutboundClosed() { // we may deadlock if we don't request enough onNext signals to see the terminal signal. sub.request(Long.MAX_VALUE); } - promise.sourceTerminated(null, true); + promise.outboundClosed(); } @Override @@ -254,7 +255,12 @@ public void terminateSource() { assert eventLoop.inEventLoop(); // Terminate the source only if it awaits continuation. if (shouldWaitFlag) { - assert promise.activeWrites == 0; // We never start sending payload body until we receive 100 (Continue) + assert promise.activeWrites == 0 : + channel + " Unexpected activeWrites=" + promise.activeWrites + " while waiting for continuation"; + // Cancel the passed write Publisher to signal transport is not interested in more data. + final Subscription sub = this.subscription; + assert sub != null : channel + " Unexpected subscription=null while waiting for continuation"; + sub.cancel(); promise.sourceTerminated(null, true); } } @@ -408,6 +414,8 @@ boolean isWritable() { void writeNext(Object msg) { assert eventLoop.inEventLoop(); + assert isWritable() : channel + " Unexpected writeNext: " + msg + " during non-writable state=" + + Integer.toString(state, 2); activeWrites++; listenersOnWriteBoundaries.addLast(WRITE_BOUNDARY); channel.write(msg, this); @@ -416,6 +424,16 @@ void writeNext(Object msg) { } } + void outboundClosed() { + assert eventLoop.inEventLoop(); + if (isAnySet(state, SUBSCRIBER_OR_SOURCE_TERMINATED)) { + // We have terminated prematurely perhaps due to write failure. + return; + } + state = set(state, SOURCE_OUTBOUND_CLOSED); // Assign a state to mark the promise as not writable. + markCancelled(); + } + void sourceTerminated(@Nullable Throwable cause, boolean markCancelled) { assert eventLoop.inEventLoop(); if (isAnySet(state, SUBSCRIBER_OR_SOURCE_TERMINATED)) { @@ -425,15 +443,10 @@ void sourceTerminated(@Nullable Throwable cause, boolean markCancelled) { this.failureCause = cause; state = set(state, SOURCE_TERMINATED); if (markCancelled) { - // When we know that the source is effectively terminated and won't emit any new items, mark the - // subscription as CANCELLED to prevent any further interactions with it, like propagating `cancel` from - // `channelClosed(Throwable)` or `request(MAX_VALUE)` from `channelOutboundClosed()`. At this point we - // always have a non-null subscription because this is reachable only if publisher emitted some signals. - WriteStreamSubscriber.this.subscription = CANCELLED; + markCancelled(); } if (activeWrites == 0) { try { - state = set(state, SUBSCRIBER_TERMINATED); terminateSubscriber(cause); } catch (Throwable t) { tryFailureOrLog(t); @@ -449,6 +462,14 @@ void sourceTerminated(@Nullable Throwable cause, boolean markCancelled) { } } + void markCancelled() { + // When we know that the source is effectively terminated and won't emit any new items, mark the + // subscription as CANCELLED to prevent any further interactions with it, like propagating `cancel` from + // `channelClosed(Throwable)` or `request(MAX_VALUE)` from `channelOutboundClosed()`. At this point we + // always have a non-null subscription because this is reachable only if publisher emitted some signals. + WriteStreamSubscriber.this.subscription = CANCELLED; + } + void close(Throwable cause, boolean closeOutboundIfIdle) { assert eventLoop.inEventLoop(); if (isAllSet(state, CHANNEL_CLOSED)) { @@ -461,7 +482,7 @@ void close(Throwable cause, boolean closeOutboundIfIdle) { // just close the channel now. closeHandler.closeChannelOutbound(channel); } - } else if (activeWrites > 0) { + } else if (activeWrites > 0 || isAllSet(state, SOURCE_OUTBOUND_CLOSED)) { // Writes are pending, we will close the channel once writes are done. state = set(state, CLOSE_OUTBOUND_ON_SUBSCRIBER_TERMINATION); } else { @@ -507,7 +528,6 @@ private boolean setSuccess0() { } observer.itemFlushed(); if (--activeWrites == 0 && isAllSet(state, SOURCE_TERMINATED)) { - state = set(state, SUBSCRIBER_TERMINATED); try { terminateSubscriber(failureCause); } catch (Throwable t) { @@ -531,7 +551,6 @@ private boolean setFailure0(Throwable cause) { if (isAllSet(state, SUBSCRIBER_TERMINATED)) { return nettySharedPromiseTryStatus(); } - state = set(state, SUBSCRIBER_TERMINATED); Subscription oldVal = subscriptionUpdater.getAndSet(WriteStreamSubscriber.this, CANCELLED); if (oldVal != null && !isAllSet(state, SOURCE_TERMINATED)) { oldVal.cancel(); @@ -554,6 +573,7 @@ private boolean nettySharedPromiseTryStatus() { } private void terminateSubscriber(@Nullable Throwable cause) { + state = set(state, SUBSCRIBER_TERMINATED); if (cause == null) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("{} Terminate subscriber, state: {}", channel, Integer.toString(state, 2)); diff --git a/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/AbstractWriteTest.java b/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/AbstractWriteTest.java index b0331c523b..77b43afaf7 100644 --- a/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/AbstractWriteTest.java +++ b/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/AbstractWriteTest.java @@ -22,7 +22,8 @@ import io.netty.channel.ChannelPromise; import io.netty.channel.embedded.EmbeddedChannel; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; + +import javax.annotation.Nullable; import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION; import static org.hamcrest.MatcherAssert.assertThat; @@ -36,18 +37,10 @@ public abstract class AbstractWriteTest { - protected EmbeddedChannel channel; - protected WriteDemandEstimator demandEstimator; - protected CompletableSource.Subscriber completableSubscriber; - protected FailingWriteHandler failingWriteHandler; - - @BeforeEach - public void setUp() throws Exception { - completableSubscriber = mock(CompletableSource.Subscriber.class); - failingWriteHandler = new FailingWriteHandler(); - channel = new EmbeddedChannel(failingWriteHandler); - demandEstimator = mock(WriteDemandEstimator.class); - } + protected final WriteDemandEstimator demandEstimator = mock(WriteDemandEstimator.class); + protected final CompletableSource.Subscriber completableSubscriber = mock(CompletableSource.Subscriber.class); + protected final FailingWriteHandler failingWriteHandler = new FailingWriteHandler(); + protected final EmbeddedChannel channel = new EmbeddedChannel(failingWriteHandler); @AfterEach public void tearDown() throws Exception { @@ -59,6 +52,9 @@ protected void verifyWriteSuccessful(String... items) { channel.flushOutbound(); if (items.length > 0) { assertThat("Message not written.", channel.outboundMessages(), contains((String[]) items)); + for (int i = 0; i < items.length; ++i) { + channel.readOutbound(); // discard written items + } } else { assertThat("Unexpected message(s) written.", channel.outboundMessages(), is(empty())); } @@ -71,6 +67,14 @@ protected void verifyListenerSuccessful() { verifyNoMoreInteractions(completableSubscriber); } + protected void verifyListenerFailed(@Nullable Throwable t) { + channel.flushOutbound(); + assertThat("Unexpected message(s) written.", channel.outboundMessages(), is(empty())); + verify(completableSubscriber).onSubscribe(any()); + verify(completableSubscriber).onError(t != null ? t : any()); + verifyNoMoreInteractions(completableSubscriber); + } + static final class FailingWriteHandler extends ChannelDuplexHandler { private volatile boolean failNextWritePromise; private volatile boolean throwFromNextWrite; diff --git a/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/SslCloseNotifyAlertClientHandlingTest.java b/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/SslCloseNotifyAlertClientHandlingTest.java index d509cfab51..fac3296f66 100644 --- a/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/SslCloseNotifyAlertClientHandlingTest.java +++ b/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/SslCloseNotifyAlertClientHandlingTest.java @@ -116,6 +116,7 @@ private void sendRequest() { .then(() -> { writeMsg(writeSource, BEGIN); writeMsg(writeSource, END); + writeSource.onComplete(); }) .expectComplete() .verify(); diff --git a/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/SslCloseNotifyAlertServerHandlingTest.java b/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/SslCloseNotifyAlertServerHandlingTest.java index f9e013394e..2a41ad671c 100644 --- a/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/SslCloseNotifyAlertServerHandlingTest.java +++ b/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/SslCloseNotifyAlertServerHandlingTest.java @@ -44,8 +44,24 @@ void afterExchangeIdleConnection() { .then(() -> { writeMsg(writeSource, BEGIN); writeMsg(writeSource, END); + writeSource.onComplete(); closeNotifyAndVerifyClosing(); }) + .expectComplete() + .verify(); + } + + @Test + void afterRequestAndWritingResponseButBeforeCompletingWrite() { + receiveRequest(); + PublisherSource.Processor writeSource = newPublisherProcessor(); + StepVerifiers.create(conn.write(fromSource(writeSource))) + .then(() -> { + writeMsg(writeSource, BEGIN); + writeMsg(writeSource, END); + closeNotifyAndVerifyClosing(); + writeSource.onComplete(); + }) .expectErrorConsumed(cause -> { assertThat("Unexpected write failure cause", cause, instanceOf(CloseEventObservedException.class)); CloseEventObservedException ceoe = (CloseEventObservedException) cause; @@ -55,7 +71,7 @@ void afterExchangeIdleConnection() { } @Test - void afterRequestBeforeSendingResponse() { + void afterRequestBeforeWritingResponse() { receiveRequest(); PublisherSource.Processor writeSource = newPublisherProcessor(); @@ -66,7 +82,7 @@ void afterRequestBeforeSendingResponse() { } @Test - void afterRequestWhileSendingResponse() { + void afterRequestWhileWritingResponse() { receiveRequest(); PublisherSource.Processor writeSource = newPublisherProcessor(); @@ -80,7 +96,7 @@ void afterRequestWhileSendingResponse() { } @Test - void whileReadingRequestBeforeSendingResponse() { + void whileReadingRequestBeforeWritingResponse() { StepVerifiers.create(conn.write(fromSource(newPublisherProcessor())).merge(conn.read())) .then(() -> { // Start reading request @@ -93,7 +109,7 @@ void whileReadingRequestBeforeSendingResponse() { } @Test - void whileReadingRequestAndSendingResponse() { + void whileReadingRequestAndWritingResponse() { PublisherSource.Processor writeSource = newPublisherProcessor(); StepVerifiers.create(conn.write(fromSource(writeSource)).merge(conn.read())) .then(() -> { @@ -109,7 +125,7 @@ void whileReadingRequestAndSendingResponse() { } @Test - void whileReadingRequestAfterSendingResponse() { + void whileReadingRequestAfterWritingResponse() { PublisherSource.Processor writeSource = newPublisherProcessor(); StepVerifiers.create(conn.write(fromSource(writeSource)).merge(conn.read())) .then(() -> { @@ -118,6 +134,7 @@ void whileReadingRequestAfterSendingResponse() { // Send response writeMsg(writeSource, BEGIN); writeMsg(writeSource, END); + writeSource.onComplete(); }) .expectNext(BEGIN) .then(this::closeNotifyAndVerifyClosing) @@ -125,6 +142,26 @@ void whileReadingRequestAfterSendingResponse() { .verify(); } + @Test + void whileReadingRequestAfterWritingResponseButBeforeCompletingWrite() { + PublisherSource.Processor writeSource = newPublisherProcessor(); + StepVerifiers.create(conn.write(fromSource(writeSource)).merge(conn.read())) + .then(() -> { + // Start reading request + channel.writeInbound(BEGIN); + // Send response + writeMsg(writeSource, BEGIN); + writeMsg(writeSource, END); + }) + .expectNext(BEGIN) + .then(() -> { + closeNotifyAndVerifyClosing(); + writeSource.onComplete(); + }) + .expectError(ClosedChannelException.class) + .verify(); + } + private void receiveRequest() { StepVerifiers.create(conn.read()) .then(() -> channel.writeInbound(BEGIN)) diff --git a/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriberTest.java b/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriberTest.java index 461f6547a8..b7ebad3244 100644 --- a/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriberTest.java +++ b/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriberTest.java @@ -20,12 +20,15 @@ import io.servicetalk.transport.netty.internal.WriteStreamSubscriber.AbortedFirstWriteException; import io.netty.channel.Channel; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentCaptor; +import java.nio.channels.ClosedChannelException; + import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION; -import static io.servicetalk.transport.netty.internal.CloseHandler.UNSUPPORTED_PROTOCOL_CLOSE_HANDLER; +import static java.lang.Long.MAX_VALUE; import static java.util.function.UnaryOperator.identity; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; @@ -35,9 +38,9 @@ import static org.mockito.ArgumentCaptor.forClass; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; -import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; @@ -46,24 +49,20 @@ class WriteStreamSubscriberTest extends AbstractWriteTest { + private final Subscription subscription = mock(Subscription.class); + private final CloseHandler closeHandler = mock(CloseHandler.class); private WriteStreamSubscriber subscriber; - private Subscription subscription; - private CloseHandler closeHandler; - - @BeforeEach - @Override - public void setUp() throws Exception { - super.setUp(); - closeHandler = mock(CloseHandler.class); + + void setUp(boolean isClient, boolean shouldWait) { subscriber = new WriteStreamSubscriber(channel, demandEstimator, completableSubscriber, closeHandler, - NoopWriteObserver.INSTANCE, identity(), false, __ -> false); - subscription = mock(Subscription.class); + NoopWriteObserver.INSTANCE, identity(), isClient, __ -> shouldWait); when(demandEstimator.estimateRequestN(anyLong())).thenReturn(1L); subscriber.onSubscribe(subscription); } @Test void testSingleItem() { + setUp(false, false); WriteInfo info = writeAndFlush("Hello"); subscriber.onComplete(); verifyListenerSuccessful(); @@ -74,6 +73,7 @@ void testSingleItem() { @Test void testMultipleItem() { + setUp(false, false); WriteInfo info1 = writeAndFlush("Hello1"); WriteInfo info2 = writeAndFlush("Hello2"); WriteInfo info3 = writeAndFlush("Hello3"); @@ -84,8 +84,41 @@ void testMultipleItem() { verifyNoInteractions(closeHandler); } + @ParameterizedTest(name = "{displayName} [{index}] error={0}") + @ValueSource(booleans = {false, true}) + void testMultipleItemThenOutboundClosed(boolean error) { + setUp(false, false); + verify(subscription).request(anyLong()); + WriteInfo info1 = write("Hello1"); + WriteInfo info2 = write("Hello2"); + WriteInfo info3 = write("Hello3"); + + subscriber.channelOutboundClosed(); + verify(subscription).request(eq(MAX_VALUE)); + verify(completableSubscriber).onSubscribe(any()); + verifyNoMoreInteractions(completableSubscriber); + + verifyWriteSuccessful("Hello1", "Hello2", "Hello3"); + verifyWrite(info1, info2, info3); + + // Write after channelOutboundClosed should be discarded + writeAndFlush("Hello4"); + assertThat("Unexpected message(s) written.", channel.outboundMessages(), is(empty())); + + verifyNoMoreInteractions(completableSubscriber); + if (error) { + subscriber.onError(DELIBERATE_EXCEPTION); + verify(completableSubscriber).onError(DELIBERATE_EXCEPTION); + } else { + subscriber.onComplete(); + verify(completableSubscriber).onComplete(); + } + verifyNoMoreInteractions(completableSubscriber, subscription, closeHandler); + } + @Test void testOnErrorNoWrite() throws InterruptedException { + setUp(false, false); subscriber.onError(DELIBERATE_EXCEPTION); ArgumentCaptor exceptionCaptor = forClass(Throwable.class); verify(this.completableSubscriber).onError(exceptionCaptor.capture()); @@ -96,6 +129,7 @@ void testOnErrorNoWrite() throws InterruptedException { @Test void testOnCompleteNoWrite() { + setUp(false, false); subscriber.onComplete(); verify(this.completableSubscriber).onComplete(); verifyNoInteractions(closeHandler); @@ -103,6 +137,7 @@ void testOnCompleteNoWrite() { @Test void testOnErrorPostWrite() throws InterruptedException { + setUp(false, false); writeAndFlush("Hello"); channel.flushOutbound(); subscriber.onError(DELIBERATE_EXCEPTION); @@ -113,9 +148,9 @@ void testOnErrorPostWrite() throws InterruptedException { @Test void testCancelBeforeOnSubscribe() { - subscriber = new WriteStreamSubscriber(channel, demandEstimator, completableSubscriber, - UNSUPPORTED_PROTOCOL_CLOSE_HANDLER, NoopWriteObserver.INSTANCE, identity(), false, __ -> false); - subscription = mock(Subscription.class); + subscriber = new WriteStreamSubscriber(channel, demandEstimator, completableSubscriber, closeHandler, + NoopWriteObserver.INSTANCE, identity(), false, __ -> false); + when(demandEstimator.estimateRequestN(anyLong())).thenReturn(1L); subscriber.cancel(); subscriber.onSubscribe(subscription); verify(subscription).cancel(); @@ -124,6 +159,7 @@ void testCancelBeforeOnSubscribe() { @Test void testCancelAfterOnSubscribe() { + setUp(false, false); subscriber.cancel(); verify(subscription).cancel(); verifyNoInteractions(closeHandler); @@ -131,11 +167,10 @@ void testCancelAfterOnSubscribe() { @Test void testRequestMoreBeforeOnSubscribe() { - reset(completableSubscriber); - subscriber = new WriteStreamSubscriber(channel, demandEstimator, completableSubscriber, - UNSUPPORTED_PROTOCOL_CLOSE_HANDLER, NoopWriteObserver.INSTANCE, identity(), false, __ -> false); + subscriber = new WriteStreamSubscriber(channel, demandEstimator, completableSubscriber, closeHandler, + NoopWriteObserver.INSTANCE, identity(), false, __ -> false); + when(demandEstimator.estimateRequestN(anyLong())).thenReturn(1L); subscriber.channelWritable(); - subscription = mock(Subscription.class); subscriber.onSubscribe(subscription); WriteInfo info = writeAndFlush("Hello"); subscriber.onComplete(); @@ -147,38 +182,61 @@ void testRequestMoreBeforeOnSubscribe() { @Test void writeFailureClosesChannel() throws Exception { - failingWriteClosesChannel(() -> failingWriteHandler.failNextWritePromise()); + setUp(false, false); + failingWriteClosesChannel(failingWriteHandler::failNextWritePromise); } @Test void uncaughtWriteExceptionClosesChannel() throws Exception { - failingWriteClosesChannel(() -> failingWriteHandler.throwFromNextWrite()); + setUp(false, false); + failingWriteClosesChannel(failingWriteHandler::throwFromNextWrite); } @Test void onNextAfterChannelClose() { + setUp(false, false); subscriber.channelClosed(DELIBERATE_EXCEPTION); + verify(subscription).cancel(); subscriber.onNext("Hello"); - channel.flushOutbound(); + verifyListenerFailed(null); assertThat("Unexpected message(s) written.", channel.outboundMessages(), is(empty())); } @Test - void onNextAfterChannelCloseWhileActiveWritesPending() { + void secondOnNextAfterChannelClose() { + setUp(false, false); subscriber.onNext("Hello"); + verifyWriteSuccessful("Hello"); subscriber.channelClosed(DELIBERATE_EXCEPTION); + verify(subscription).cancel(); + subscriber.onNext("Hello2"); + verifyListenerFailed(DELIBERATE_EXCEPTION); + } + + @Test + void closeChannelDuringFirstWrite() { + setUp(false, false); + subscriber.onNext("Hello"); + channel.close(); // fails the first write with StacklessClosedChannelException + subscriber.channelClosed(// simulate channelInactive event + StacklessClosedChannelException.newInstance(WriteStreamSubscriberTest.class, "channel.close()")); + verify(subscription).cancel(); + subscriber.onNext("Hello2"); + channel.runPendingTasks(); + channel.flush(); assertThat("Unexpected message(s) written.", channel.outboundMessages(), is(empty())); - verifyWriteSuccessful("Hello"); + + verify(completableSubscriber).onSubscribe(any()); + verify(completableSubscriber).onError(any(ClosedChannelException.class)); + verifyNoMoreInteractions(completableSubscriber); } @Test void clientRequestsOne() { - reset(completableSubscriber, demandEstimator); when(demandEstimator.estimateRequestN(anyLong())).thenReturn(10L); - subscriber = new WriteStreamSubscriber(channel, demandEstimator, completableSubscriber, - UNSUPPORTED_PROTOCOL_CLOSE_HANDLER, NoopWriteObserver.INSTANCE, identity(), true, __ -> false); - subscription = mock(Subscription.class); + subscriber = new WriteStreamSubscriber(channel, demandEstimator, completableSubscriber, closeHandler, + NoopWriteObserver.INSTANCE, identity(), true, __ -> false); subscriber.onSubscribe(subscription); verify(subscription).request(1L); verify(demandEstimator, never()).estimateRequestN(anyLong()); @@ -194,23 +252,73 @@ void clientRequestsOne() { @Test void channelClosedDoesNotCancelAfterOutboundEnd() throws Exception { + setUp(false, false); + verify(completableSubscriber).onSubscribe(any()); WriteInfo info = writeAndFlush("Hello"); + verify(subscription, times(2)).request(anyLong()); + verifyNoMoreInteractions(completableSubscriber, subscription); + subscriber.channelOutboundClosed(); + verify(subscription).request(MAX_VALUE); - verifyListenerSuccessful(); verifyWriteSuccessful("Hello"); verifyWrite(info); + verifyNoMoreInteractions(completableSubscriber, subscription); channel.finishAndReleaseAll(); subscriber.channelClosed(DELIBERATE_EXCEPTION); // simulate channelInactive event from DefaultNettyConnection - verify(closeHandler).closeChannelOutbound(any(Channel.class)); + // channelClosed after channelOutboundClosed should not fail subscriber + verifyNoMoreInteractions(completableSubscriber, closeHandler); subscriber.onComplete(); // signal completion after "channelInactive" - verify(subscription, never()).cancel(); - verifyNoMoreInteractions(closeHandler); + verify(completableSubscriber).onComplete(); + verify(closeHandler).closeChannelOutbound(any(Channel.class)); + verifyNoMoreInteractions(completableSubscriber, subscription, closeHandler); assertChannelClose(); } + @Test + void clientContinueWriting() { + when(demandEstimator.estimateRequestN(anyLong())).thenReturn(10L); + subscriber = new WriteStreamSubscriber(channel, demandEstimator, completableSubscriber, closeHandler, + NoopWriteObserver.INSTANCE, identity(), true, __ -> true); + subscriber.onSubscribe(subscription); + + verify(subscription).request(1L); + writeAndFlush("Hello1"); + verifyNoMoreInteractions(subscription); + + subscriber.continueWriting(); + verify(subscription).request(10L); + writeAndFlush("Hello2"); + + verifyWriteSuccessful("Hello1", "Hello2"); + subscriber.onComplete(); + verifyListenerSuccessful(); + verifyNoMoreInteractions(subscription, closeHandler); + } + + @Test + void clientTerminateSource() { + when(demandEstimator.estimateRequestN(anyLong())).thenReturn(10L); + subscriber = new WriteStreamSubscriber(channel, demandEstimator, completableSubscriber, closeHandler, + NoopWriteObserver.INSTANCE, identity(), true, __ -> true); + subscriber.onSubscribe(subscription); + + verify(subscription).request(1L); + writeAndFlush("Hello"); + verifyNoMoreInteractions(subscription); + + subscriber.terminateSource(); + verify(subscription).cancel(); + + verifyListenerSuccessful(); + verifyWriteSuccessful("Hello"); + + subscriber.onComplete(); + verifyNoMoreInteractions(subscription, closeHandler, completableSubscriber); + } + private void failingWriteClosesChannel(Runnable enableWriteFailure) throws InterruptedException { WriteInfo info1 = writeAndFlush("Hello1"); verify(completableSubscriber).onSubscribe(any()); @@ -231,11 +339,21 @@ private void verifyWrite(WriteInfo... infos) { verify(subscription, times(infos.length + 1)).request(1); } + private WriteInfo write(String msg) { + return write(msg, false); + } + private WriteInfo writeAndFlush(String msg) { + return write(msg, true); + } + + private WriteInfo write(String msg, boolean shouldFlush) { long pre = channel.bytesBeforeUnwritable(); subscriber.onNext(msg); long post = channel.bytesBeforeUnwritable(); - channel.flushOutbound(); + if (shouldFlush) { + channel.flushOutbound(); + } return new WriteInfo(pre, post, msg); }