Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WriteStreamSubscriber: respect termination of the publisher #2387

Merged
merged 10 commits into from
Oct 12, 2022

Conversation

idelpivnitskiy
Copy link
Member

@idelpivnitskiy idelpivnitskiy commented Oct 5, 2022

Motivation:

  1. Protocol can signal when the effective write of the message is complete and no more items are expected. In this case, WriteStreamSubscriber will do request(MAX_VALUE) to drain the remaining (likely empty) items from the publisher and terminate subscriber. However, premature termination of the completable subscriber breaks Reactive Streams control flow. Users of
    connection.write(publisher) can compose more operators on the returned Completable and they expect that the Completable terminates only after the passed publisher terminates too. The publisher can have more business logic after it emits all items. For example, service can read request payload body after it returns the full response.
  2. If it waits for continuation but protocol signals it's not interested in more data, publisher won't know about it.

Modifications:

  • On channelOutboundClosed(), do not attempt to terminate subscriber. Instead, mark the state as SOURCE_OUTBOUND_CLOSED and wait for termination of the publisher;
  • If channelClosed(...) was invoked after channelOutboundClosed(), do not fail the subscriber. Instead, wait for termination of the publisher;
  • When terminateSource() is invoked, cancel the current subscription to let the publisher know we are not interested in more items. For HTTP client it won't have any actual effect for request payload body because we use Single.concat(Publisher) with deferSubscribe option, but it's necessary to propagate signal for other use-cases;
  • Adjust tests for new behavior;
  • Increase test coverage for WriteStreamSubscriber;

Result:

Terminal signals are correctly propagated through the RS chain when users use connection.write(publisher).

Motivation:

1. Protocol can signal when the effective write of the message is
complete and no more items are expected. In this case,
`WriteStreamSubscriber` will do `request(MAX_VALUE)` to drain the
remaining (likely empty) items from the publisher and terminate
subscriber. However, premature termination of the completable subscriber
breaks Reactive Streams control flow. Users of
`connection.write(publisher)` can compose more operators on the returned
`Completable` and they expect that the `Completable` terminates only
after the passed `publisher` terminates too. The `publisher` can have
more business logic after it emits all items. For example, service can
read request payload body after it returns the full response.
2. If it waits for continuation but protocol signals it's not interested
in more data, publisher won't know about it.

Modifications:

- On `channelOutboundClosed()`, to do attempt to terminate subscriber.
Instead, mark the state as `SOURCE_OUTBOUND_CLOSED` and wait for
termination of the publisher;
- If `channelClosed(...)` was invoked after `channelOutboundClosed()`,
do not fail the subscriber. Instead, wait for termination of the
publisher;
- When `terminateSource()` is invoked, cancel the current subscription
to let the publisher know we are not interested in more items. For HTTP
client it won't have any actual effect for request payload body because
we use `Single.concat(Publisher)` with `deferSubscribe` option, but it's
necessary to propagate signal for other use-cases;
- Adjust tests for new behavior;
- Increase test coverage for `WriteStreamSubscriber`;

Result:

Terminal signals correctly propagated through RS chain when users use
`connection.write(publisher)`.
@idelpivnitskiy idelpivnitskiy self-assigned this Oct 5, 2022
@idelpivnitskiy idelpivnitskiy marked this pull request as ready for review October 5, 2022 00:27
@idelpivnitskiy
Copy link
Member Author

Required for #2367.

@@ -408,6 +414,8 @@ boolean isWritable() {

void writeNext(Object msg) {
assert eventLoop.inEventLoop();
assert isWritable() : channel + " Unexpected writeNext: " + msg + " during non-writable state=" +
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this assertion necessary? wondering if we need to prevent any write-after-close/terminated scenarios as we may want to pass on the object anyways (reference counted, ordered promos completion, ..)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really, invocation of this method is currently guarded by isWritable() check:

    void doWrite(Object msg) {
        // Ignore onNext if the channel is already closed.
        if (promise.isWritable()) {
            long capacityBefore = channel.bytesBeforeUnwritable();
            promise.writeNext(msg);

Added this assection just in case the contract changes, to make sure we handle writes correctly as it will likely will also required changes in AllWritesPromise.

@idelpivnitskiy idelpivnitskiy merged commit 14b612f into apple:main Oct 12, 2022
@idelpivnitskiy idelpivnitskiy deleted the wss branch October 12, 2022 23:14
idelpivnitskiy added a commit that referenced this pull request Oct 13, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants