diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingle.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingle.java index 0bddc645a0..0aba50e78a 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingle.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingle.java @@ -24,6 +24,7 @@ import io.servicetalk.concurrent.api.PublisherToSingleOperator; import io.servicetalk.concurrent.api.Single; import io.servicetalk.concurrent.api.internal.SubscribablePublisher; +import io.servicetalk.concurrent.internal.DelayedSubscription; import io.servicetalk.concurrent.internal.DuplicateSubscribeException; import io.servicetalk.http.api.HttpResponseMetaData; import io.servicetalk.http.api.StreamingHttpResponse; @@ -74,7 +75,7 @@ public PublisherSource.Subscriber apply(Subscriber subscri /* Visible for testing */ static final class SplicingSubscriber implements PublisherSource.Subscriber { - + @SuppressWarnings("rawtypes") private static final AtomicReferenceFieldUpdater maybePayloadSubUpdater = AtomicReferenceFieldUpdater.newUpdater(SplicingSubscriber.class, Object.class, "maybePayloadSub"); @@ -225,17 +226,21 @@ private Publisher newPayloadPublisher() { return new SubscribablePublisher() { @Override protected void handleSubscribe(PublisherSource.Subscriber newSubscriber) { + final DelayedSubscription delayedSubscription = new DelayedSubscription(); + // newSubscriber.onSubscribe MUST be called before making newSubscriber visible below with the CAS + // on maybePayloadSubUpdater. Otherwise there is a potential for concurrent invocation on the + // Subscriber which is not allowed by the Reactive Streams specification. + newSubscriber.onSubscribe(delayedSubscription); if (maybePayloadSubUpdater.compareAndSet(SplicingSubscriber.this, PENDING, newSubscriber)) { - // TODO risk of a race here with terminal events, will be addressed in follow-up PR assert rawSubscription != null; - newSubscriber.onSubscribe(rawSubscription); + delayedSubscription.delayedSubscription(rawSubscription); } else { // Entering this branch means either a duplicate subscriber or a stream that completed or failed // without a subscriber present. The consequence is that unless we've seen payload data we may // not send onComplete() or onError() to the original subscriber, but that is OK as long as one // subscriber of them gets the correct signal and all others get a duplicate subscriber error. final Object maybeSubscriber = SplicingSubscriber.this.maybePayloadSub; - newSubscriber.onSubscribe(EMPTY_SUBSCRIPTION); + delayedSubscription.delayedSubscription(EMPTY_SUBSCRIPTION); if (maybeSubscriber == EMPTY_COMPLETED && maybePayloadSubUpdater .compareAndSet(SplicingSubscriber.this, EMPTY_COMPLETED, EMPTY_COMPLETED_DELIVERED)) { // Prematurely completed (header + empty payload)