-
Notifications
You must be signed in to change notification settings - Fork 183
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WriteStreamSubscriber
: respect termination of the publisher
#2387
Conversation
Motivation: 1. Protocol can signal when the effective write of the message is complete and no more items are expected. In this case, `WriteStreamSubscriber` will do `request(MAX_VALUE)` to drain the remaining (likely empty) items from the publisher and terminate subscriber. However, premature termination of the completable subscriber breaks Reactive Streams control flow. Users of `connection.write(publisher)` can compose more operators on the returned `Completable` and they expect that the `Completable` terminates only after the passed `publisher` terminates too. The `publisher` can have more business logic after it emits all items. For example, service can read request payload body after it returns the full response. 2. If it waits for continuation but protocol signals it's not interested in more data, publisher won't know about it. Modifications: - On `channelOutboundClosed()`, to do attempt to terminate subscriber. Instead, mark the state as `SOURCE_OUTBOUND_CLOSED` and wait for termination of the publisher; - If `channelClosed(...)` was invoked after `channelOutboundClosed()`, do not fail the subscriber. Instead, wait for termination of the publisher; - When `terminateSource()` is invoked, cancel the current subscription to let the publisher know we are not interested in more items. For HTTP client it won't have any actual effect for request payload body because we use `Single.concat(Publisher)` with `deferSubscribe` option, but it's necessary to propagate signal for other use-cases; - Adjust tests for new behavior; - Increase test coverage for `WriteStreamSubscriber`; Result: Terminal signals correctly propagated through RS chain when users use `connection.write(publisher)`.
Required for #2367. |
@@ -408,6 +414,8 @@ boolean isWritable() { | |||
|
|||
void writeNext(Object msg) { | |||
assert eventLoop.inEventLoop(); | |||
assert isWritable() : channel + " Unexpected writeNext: " + msg + " during non-writable state=" + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this assertion necessary? wondering if we need to prevent any write-after-close/terminated scenarios as we may want to pass on the object anyways (reference counted, ordered promos completion, ..)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not really, invocation of this method is currently guarded by isWritable()
check:
void doWrite(Object msg) {
// Ignore onNext if the channel is already closed.
if (promise.isWritable()) {
long capacityBefore = channel.bytesBeforeUnwritable();
promise.writeNext(msg);
Added this assection just in case the contract changes, to make sure we handle writes correctly as it will likely will also required changes in AllWritesPromise
.
Motivation:
WriteStreamSubscriber
will dorequest(MAX_VALUE)
to drain the remaining (likely empty) items from the publisher and terminate subscriber. However, premature termination of the completable subscriber breaks Reactive Streams control flow. Users ofconnection.write(publisher)
can compose more operators on the returnedCompletable
and they expect that theCompletable
terminates only after the passedpublisher
terminates too. Thepublisher
can have more business logic after it emits all items. For example, service can read request payload body after it returns the full response.Modifications:
channelOutboundClosed()
, do not attempt to terminate subscriber. Instead, mark the state asSOURCE_OUTBOUND_CLOSED
and wait for termination of the publisher;channelClosed(...)
was invoked afterchannelOutboundClosed()
, do not fail the subscriber. Instead, wait for termination of the publisher;terminateSource()
is invoked, cancel the current subscription to let the publisher know we are not interested in more items. For HTTP client it won't have any actual effect for request payload body because we useSingle.concat(Publisher)
withdeferSubscribe
option, but it's necessary to propagate signal for other use-cases;WriteStreamSubscriber
;Result:
Terminal signals are correctly propagated through the RS chain when users use
connection.write(publisher)
.