Skip to content

Commit

Permalink
BeforeFinallyHttpOperator dereference subscriber after termination (#…
Browse files Browse the repository at this point in the history
…2327)

Motivation:
After terminating the Single subscriber we need to dereference the Single subscriber as otherwise
we may retain a reference to a downstream Subscriber and violate RS 3.13.
https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.4/README.md#3.13
  • Loading branch information
Scottmitch authored Aug 18, 2022
1 parent d557575 commit 797b5e5
Showing 1 changed file with 26 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,22 @@ private static final class ResponseCompletionSubscriber implements SingleSource.
private static final int TERMINATED = 4;
private static final AtomicIntegerFieldUpdater<ResponseCompletionSubscriber> stateUpdater =
newUpdater(ResponseCompletionSubscriber.class, "state");
private static final SingleSource.Subscriber<StreamingHttpResponse> NOOP_SUBSCRIBER =
new SingleSource.Subscriber<StreamingHttpResponse>() {
@Override
public void onSubscribe(final Cancellable cancellable) {
}

@Override
public void onSuccess(@Nullable final StreamingHttpResponse result) {
}

private final SingleSource.Subscriber<? super StreamingHttpResponse> subscriber;
@Override
public void onError(final Throwable t) {
}
};

private SingleSource.Subscriber<? super StreamingHttpResponse> subscriber;
private final TerminalSignalConsumer beforeFinally;
private final boolean discardEventsAfterCancel;
private volatile int state;
Expand Down Expand Up @@ -388,6 +402,7 @@ private void cancel0(final boolean propagateCancel) {
return Publisher.failed(new CancellationException("Received response post cancel."));
}));
}
dereferenceSubscriber();
}

@Override
Expand All @@ -402,6 +417,7 @@ public void onError(final Throwable t) {
addSuppressed(t, cause);
}
subscriber.onError(t);
dereferenceSubscriber();
}

private void sendNullResponse() {
Expand All @@ -414,9 +430,18 @@ private void sendNullResponse() {
}
} catch (Throwable cause) {
subscriber.onError(cause);
dereferenceSubscriber();
return;
}
subscriber.onSuccess(null);
dereferenceSubscriber();
}

private void dereferenceSubscriber() {
// After terminating the Single subscriber we need to dereference the Single subscriber as otherwise
// we may retain a reference to a downstream Subscriber and violate RS 3.13.
// https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.4/README.md#3.13
subscriber = NOOP_SUBSCRIBER;
}
}
}

0 comments on commit 797b5e5

Please sign in to comment.