Skip to content

Commit

Permalink
WriteStreamSubscriber: don't interact with Subscription after termi…
Browse files Browse the repository at this point in the history
…nal (#2276)

Motivation:

#1838 marks `WriteStreamSubscriber.this.subscription` as `CANCELLED`
when source terminates. It covered one use-case (cancel after terminal),
but does not cover another use-case (request after terminal) introduced
later in #1850 due to race between a thread that emits a terminal signal
and the event-loop.

Modifications:

- Mark `WriteStreamSubscriber.this.subscription` as `CANCELLED` before
`promise.sourceTerminated` is scheduled on the event-loop;

Result:

No more interactions with the subscription after terminal signal is
received. Technically, race is still possible if a terminal signal is
delayed, but the time window is lower now.

It also resolves a flaky test
`HttpLifecycleObserverTest.testCompleteWithPayloadBodyAndTrailers` that
may observe `onRequestDataRequested` after `onRequestComplete`. Due to
offloading, `onRequestDataRequested` may be observed significantly later
than response completes and verification starts.
  • Loading branch information
idelpivnitskiy authored Jul 20, 2022
1 parent 06281ef commit 68b8609
Showing 1 changed file with 24 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -189,26 +189,35 @@ void doWrite(Object msg) {
public void onError(Throwable cause) {
requireNonNull(cause);
if (enqueueWrites || !eventLoop.inEventLoop()) {
eventLoop.execute(() -> promise.sourceTerminated(cause));
scheduleSourceTerminated(cause);
} else {
promise.sourceTerminated(cause);
promise.sourceTerminated(cause, true);
}
}

@Override
public void onComplete() {
if (enqueueWrites || !eventLoop.inEventLoop()) {
eventLoop.execute(() -> promise.sourceTerminated(null));
scheduleSourceTerminated(null);
} else {
promise.sourceTerminated(null);
promise.sourceTerminated(null, true);
}
}

private void scheduleSourceTerminated(@Nullable Throwable cause) {
// To mitigate a race between the caller and EventLoop threads, mark current subscription as `CANCELLED` to
// prevent any further interactions with it, like propagating `cancel` from `channelClosed(Throwable)` or
// `request(MAX_VALUE)` from `channelOutboundClosed()`.
// See https://github.com/reactive-streams/reactive-streams-jvm#2.4
this.subscription = CANCELLED;
eventLoop.execute(() -> promise.sourceTerminated(cause, false));
}

@Override
public void channelWritable() {
assert eventLoop.inEventLoop();
final Subscription subscription = this.subscription;
if (isClient && subscription != null && !promise.written) {
if (isClient && subscription != null && subscription != CANCELLED && !promise.written) {
// If nothing was written, make initial requestN
initialRequestN(subscription);
} else {
Expand Down Expand Up @@ -237,7 +246,7 @@ public void channelOutboundClosed() {
// we may deadlock if we don't request enough onNext signals to see the terminal signal.
sub.request(Long.MAX_VALUE);
}
promise.sourceTerminated(null);
promise.sourceTerminated(null, true);
}

@Override
Expand All @@ -246,7 +255,7 @@ public void terminateSource() {
// Terminate the source only if it awaits continuation.
if (shouldWaitFlag) {
assert promise.activeWrites == 0; // We never start sending payload body until we receive 100 (Continue)
promise.sourceTerminated(null);
promise.sourceTerminated(null, true);
}
}

Expand Down Expand Up @@ -408,17 +417,21 @@ void writeNext(Object msg) {
}
}

void sourceTerminated(@Nullable Throwable cause) {
void sourceTerminated(@Nullable Throwable cause, boolean markCancelled) {
assert eventLoop.inEventLoop();
if (isAnySet(state, SUBSCRIBER_OR_SOURCE_TERMINATED)) {
// We have terminated prematurely perhaps due to write failure.
return;
}
this.failureCause = cause;
state = set(state, SOURCE_TERMINATED);
// Mark the subscription as CANCELLED to prevent propagating cancel from channelClosed. At this point we
// always have a non-null subscription because this is reachable only if publisher emitted some signals.
WriteStreamSubscriber.this.subscription = CANCELLED;
if (markCancelled) {
// When we know that the source is effectively terminated and won't emit any new items, mark the
// subscription as CANCELLED to prevent any further interactions with it, like propagating `cancel` from
// `channelClosed(Throwable)` or `request(MAX_VALUE)` from `channelOutboundClosed()`. At this point we
// always have a non-null subscription because this is reachable only if publisher emitted some signals.
WriteStreamSubscriber.this.subscription = CANCELLED;
}
if (activeWrites == 0) {
try {
state = set(state, SUBSCRIBER_TERMINATED);
Expand Down

0 comments on commit 68b8609

Please sign in to comment.