Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race condition in SpliceFlatStreamToMetaSingle #1845

Merged
merged 1 commit into from
Sep 29, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.servicetalk.concurrent.api.PublisherToSingleOperator;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.internal.SubscribablePublisher;
import io.servicetalk.concurrent.internal.DelayedSubscription;
import io.servicetalk.concurrent.internal.DuplicateSubscribeException;
import io.servicetalk.http.api.HttpResponseMetaData;
import io.servicetalk.http.api.StreamingHttpResponse;
Expand Down Expand Up @@ -74,7 +75,7 @@ public PublisherSource.Subscriber<Object> apply(Subscriber<? super Data> subscri

/* Visible for testing */
static final class SplicingSubscriber<Data, MetaData, Payload> implements PublisherSource.Subscriber<Object> {

@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<SplicingSubscriber, Object>
maybePayloadSubUpdater = AtomicReferenceFieldUpdater.newUpdater(SplicingSubscriber.class,
Object.class, "maybePayloadSub");
Expand Down Expand Up @@ -225,17 +226,21 @@ private Publisher<Payload> newPayloadPublisher() {
return new SubscribablePublisher<Payload>() {
@Override
protected void handleSubscribe(PublisherSource.Subscriber<? super Payload> newSubscriber) {
final DelayedSubscription delayedSubscription = new DelayedSubscription();
// newSubscriber.onSubscribe MUST be called before making newSubscriber visible below with the CAS
// on maybePayloadSubUpdater. Otherwise there is a potential for concurrent invocation on the
// Subscriber which is not allowed by the Reactive Streams specification.
newSubscriber.onSubscribe(delayedSubscription);
if (maybePayloadSubUpdater.compareAndSet(SplicingSubscriber.this, PENDING, newSubscriber)) {
// TODO risk of a race here with terminal events, will be addressed in follow-up PR
assert rawSubscription != null;
newSubscriber.onSubscribe(rawSubscription);
delayedSubscription.delayedSubscription(rawSubscription);
} else {
// Entering this branch means either a duplicate subscriber or a stream that completed or failed
// without a subscriber present. The consequence is that unless we've seen payload data we may
// not send onComplete() or onError() to the original subscriber, but that is OK as long as one
// subscriber of them gets the correct signal and all others get a duplicate subscriber error.
final Object maybeSubscriber = SplicingSubscriber.this.maybePayloadSub;
newSubscriber.onSubscribe(EMPTY_SUBSCRIPTION);
delayedSubscription.delayedSubscription(EMPTY_SUBSCRIPTION);
if (maybeSubscriber == EMPTY_COMPLETED && maybePayloadSubUpdater
.compareAndSet(SplicingSubscriber.this, EMPTY_COMPLETED, EMPTY_COMPLETED_DELIVERED)) {
// Prematurely completed (header + empty payload)
Expand Down