diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/AbstractLifecycleObserverHttpFilter.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/AbstractLifecycleObserverHttpFilter.java index 96e4bddbf4..697785e935 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/AbstractLifecycleObserverHttpFilter.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/AbstractLifecycleObserverHttpFilter.java @@ -17,7 +17,6 @@ import io.servicetalk.buffer.api.Buffer; import io.servicetalk.concurrent.PublisherSource; -import io.servicetalk.concurrent.api.AsyncContext; import io.servicetalk.concurrent.api.Single; import io.servicetalk.concurrent.api.TerminalSignalConsumer; import io.servicetalk.context.api.ContextMap; @@ -82,18 +81,17 @@ final Single trackLifecycle(@Nullable final ConnectionInf return defer(() -> { final HttpExchangeObserver onExchange = safeReport(observer::onNewExchange, observer, "onNewExchange", NoopHttpExchangeObserver.INSTANCE); - final boolean clearAsyncContext; + final Runnable clearContext; if (connInfo != null) { safeReport(onExchange::onConnectionSelected, connInfo, onExchange, "onConnectionSelected"); - clearAsyncContext = false; + clearContext = () -> { /* noop */ }; } else { - // Pass it down to LoadBalancedStreamingHttpClient - // FIXME: switch to RequestContext when it's available - AsyncContext.put(ON_CONNECTION_SELECTED_CONSUMER, selectedConnection -> safeReport( + // Pass it down to LoadBalancedStreamingHttpClient and clear inside `onExchangeFinally`: + request.context().put(ON_CONNECTION_SELECTED_CONSUMER, selectedConnection -> safeReport( onExchange::onConnectionSelected, selectedConnection, onExchange, "onConnectionSelected")); - clearAsyncContext = true; + clearContext = () -> request.context().remove(ON_CONNECTION_SELECTED_CONSUMER); } - final ExchangeContext exchangeContext = new ExchangeContext(onExchange, client, clearAsyncContext); + final ExchangeContext exchangeContext = new ExchangeContext(onExchange, client, clearContext); final HttpRequestObserver onRequest = safeReport(onExchange::onRequest, request, onExchange, "onRequest", NoopHttpRequestObserver.INSTANCE); final StreamingHttpRequest transformed = request @@ -178,17 +176,17 @@ private static final class ExchangeContext implements TerminalSignalConsumer { AtomicIntegerFieldUpdater.newUpdater(ExchangeContext.class, "remaining"); private final HttpExchangeObserver onExchange; - private final boolean clearAsyncContext; + private final Runnable clearContext; @Nullable private HttpResponseObserver onResponse; private volatile int remaining; private ExchangeContext(final HttpExchangeObserver onExchange, final boolean client, - final boolean clearAsyncContext) { + final Runnable clearContext) { this.onExchange = onExchange; // server has to always drain request, but client may fail before request message body starts: remaining = client ? 1 : 2; - this.clearAsyncContext = clearAsyncContext; + this.clearContext = clearContext; } void onResponse(HttpResponseMetaData responseMetaData) { @@ -249,9 +247,7 @@ void decrementRemaining() { if (remainingUpdater.decrementAndGet(this) == 0) { // Exchange completes only if both request and response terminate safeReport(onExchange::onExchangeFinally, onExchange, "onExchangeFinally"); - if (clearAsyncContext) { - AsyncContext.remove(ON_CONNECTION_SELECTED_CONSUMER); - } + clearContext.run(); } } } diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/LoadBalancedStreamingHttpClient.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/LoadBalancedStreamingHttpClient.java index cd5091e996..3625b7e1b5 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/LoadBalancedStreamingHttpClient.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/LoadBalancedStreamingHttpClient.java @@ -16,7 +16,6 @@ package io.servicetalk.http.netty; import io.servicetalk.client.api.LoadBalancer; -import io.servicetalk.concurrent.api.AsyncContext; import io.servicetalk.concurrent.api.Completable; import io.servicetalk.concurrent.api.Single; import io.servicetalk.concurrent.api.TerminalSignalConsumer; @@ -73,7 +72,10 @@ public Single request(final StreamingHttpRequest request) // following the LoadBalancer API which this Client depends upon to ensure the concurrent request count state is // correct. return loadBalancer.selectConnection(SELECTOR_FOR_REQUEST, request.context()).flatMap(c -> { - final Consumer onConnectionSelected = AsyncContext.get(ON_CONNECTION_SELECTED_CONSUMER); + // Do not remove ON_CONNECTION_SELECTED_CONSUMER from the context to track new connection selections for + // retries and redirects. + final Consumer onConnectionSelected = request.context() + .get(ON_CONNECTION_SELECTED_CONSUMER); if (onConnectionSelected != null) { onConnectionSelected.accept(c.connectionContext()); }