Skip to content

Commit

Permalink
Handle HTTP/1.1 response cancelation same way at all levels (#2266)
Browse files Browse the repository at this point in the history
Motivation:

When response cancel is generated at the client level,
`LoadBalancedStreamingHttpClient` has a logic in place to close the
connection to mitigate a race between connection availability and
closure. However, this logic is skipped if cancellation is originated from
a connection-level filter.

Modifications:

- Move cancellation handling from `LoadBalancedStreamingHttpClient` to
`AbstractStreamingHttpConnection`;
- Propagate `OnStreamClosedRunnable` using `request.context()`;
- Remove `StreamingHttpRequestWithContext`;
- Remove "cancel handling" logic from `TimeoutHttpRequesterFilter`
introduced in #2263;
- Add more comments and javadoc;
- Adjust tests and comments;

Result:

Request to cancel is intercepted after all user-defined filters to make sure
we catch all sources of cancellation.
  • Loading branch information
idelpivnitskiy authored Jul 26, 2022
1 parent da06991 commit 4bae4c1
Show file tree
Hide file tree
Showing 11 changed files with 180 additions and 645 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.TerminalSignalConsumer;
import io.servicetalk.http.api.FilterableStreamingHttpConnection;
import io.servicetalk.http.api.HttpConnectionContext;
import io.servicetalk.http.api.HttpEventKey;
Expand All @@ -34,10 +35,14 @@
import io.servicetalk.http.api.StreamingHttpRequestResponseFactory;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.api.StreamingHttpResponseFactory;
import io.servicetalk.http.netty.LoadBalancedStreamingHttpClient.OnStreamClosedRunnable;
import io.servicetalk.transport.api.IoThreadFactory;
import io.servicetalk.transport.netty.internal.FlushStrategy;
import io.servicetalk.transport.netty.internal.NettyConnectionContext;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import static io.servicetalk.concurrent.api.Executors.immediate;
Expand All @@ -61,6 +66,8 @@

abstract class AbstractStreamingHttpConnection<CC extends NettyConnectionContext>
implements FilterableStreamingHttpConnection {

private static final Logger LOGGER = LoggerFactory.getLogger(AbstractStreamingHttpConnection.class);
private static final IgnoreConsumedEvent<Integer> ZERO_MAX_CONCURRENCY_EVENT = new IgnoreConsumedEvent<>(0);

final CC connection;
Expand Down Expand Up @@ -100,10 +107,67 @@ public final <T> Publisher<? extends T> transportEventStream(final HttpEventKey<
failed(new IllegalArgumentException("Unknown key: " + eventKey));
}

private Single<StreamingHttpResponse> makeRequest(final Publisher<Object> flattenedRequest,
private Single<StreamingHttpResponse> makeRequest(final HttpRequestMetaData requestMetaData,
final Publisher<Object> flattenedRequest,
@Nullable final FlushStrategy flushStrategy) {
return writeAndRead(flattenedRequest, flushStrategy).liftSyncToSingle(new SpliceFlatStreamToMetaSingle<>(
this::newSplicedResponse));
return writeAndRead(flattenedRequest, flushStrategy)
// Handle cancellation for LoadBalancedStreamingHttpClient. We do it here for several reasons:
// 1. Intercepting cancel next to the transport layer (after all user-defined filters and internal HTTP
// logic) helps to capture all possible sources of cancellation.
// 2. Intercepting cancel on the caller thread before jumping to the event-loop thread helps to notify
// concurrency controller that the channel is going to close before potentially delivering a
// terminal event back to the response Subscriber (e.g. TimeoutHttpRequesterFilter emits "onError"
// right after propagating "cancel").
// 3. Doing it before SpliceFlatStreamToMetaSingle helps to avoid the need for
// BeforeFinallyHttpOperator.
// 4. Doing it before offloading of terminal signals helps to reduce the risk of closing a connection
// after response terminates.
// We use beforeFinally instead of beforeCancel to avoid closing connection after response terminates.
.beforeFinally(new TerminalSignalConsumer() {
@Override
public void onComplete() {
// noop
}

@Override
public void onError(final Throwable throwable) {
// noop
}

@Override
public void cancel() {
// If the request gets cancelled before termination, we pessimistically assume that the
// transport will close the connection since the Subscriber did not read the entire response and
// cancelled. This reduces the time window during which a connection is eligible for selection
// by the load balancer post cancel and the connection being closed by the transport.
// Transport MAY not close the connection if cancel raced with completion and completion was
// seen by the transport before cancel. We have no way of knowing at this layer if this indeed
// happen.
//
// For H2 and above, connection are multiplexed and use virtual streams for each
// request-response exchange. Because we don't have access to the stream at this level
// we cannot close it. Instead, we use a OnStreamClosedRunnable which will be registered for the
// stream and executed when it closes. However, cancellation can happen before transport
// creates a stream. We check the ownership of the OnStreamClosedRunnable and if it was not
// owned by the transport, we mark request as finished immediately bcz
// H2ClientParentConnectionContext won't even try to open a new stream without owning the
// OnStreamClosedRunnable.
@Nullable
final OnStreamClosedRunnable onStreamClosed =
requestMetaData.context().get(OnStreamClosedRunnable.KEY);
if (onStreamClosed == null) {
LOGGER.debug("{} {} request was cancelled before receiving the full response, " +
"closing this {} connection to stop receiving more data",
connectionContext, requestMetaData, connectionContext.protocol());
closeAsync().subscribe();
} else if (onStreamClosed.own()) {
LOGGER.debug("{} request was cancelled before a stream is created, stream won't be open",
connectionContext);
onStreamClosed.run();
}
}
})
.liftSyncToSingle(new SpliceFlatStreamToMetaSingle<>(this::newSplicedResponse));
}

@Override
Expand Down Expand Up @@ -133,7 +197,9 @@ public Single<StreamingHttpResponse> request(final StreamingHttpRequest request)
flatRequest = flatRequest.subscribeOn(connectionContext.executionContext().executor(),
IoThreadFactory.IoThread::currentThreadIsIoThread);
}
Single<StreamingHttpResponse> resp = makeRequest(flatRequest, determineFlushStrategyForApi(request));
Single<StreamingHttpResponse> resp = makeRequest(request, flatRequest,
determineFlushStrategyForApi(request));

if (strategy.isMetadataReceiveOffloaded()) {
resp = resp.publishOn(
connectionContext.executionContext().executor(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import io.servicetalk.http.api.StreamingHttpRequestResponseFactory;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.api.StreamingHttpResponseFactory;
import io.servicetalk.http.netty.LoadBalancedStreamingHttpClient.OwnedRunnable;
import io.servicetalk.http.netty.LoadBalancedStreamingHttpClient.OnStreamClosedRunnable;
import io.servicetalk.transport.api.ConnectionObserver;
import io.servicetalk.transport.api.ConnectionObserver.MultiplexedObserver;
import io.servicetalk.transport.api.ConnectionObserver.StreamObserver;
Expand Down Expand Up @@ -245,10 +245,9 @@ public Single<StreamingHttpResponse> request(final StreamingHttpRequest request)
@Override
protected void handleSubscribe(final Subscriber<? super StreamingHttpResponse> subscriber) {
final StreamObserver observer = multiplexedObserver.onNewStream();
final StreamingHttpRequest originalReq;
final Promise<Http2StreamChannel> promise;
final SequentialCancellable sequentialCancellable;
Runnable ownedRunnable = null;
OnStreamClosedRunnable ownedRunnable = null;
try {
final EventExecutor e = parentContext.nettyChannel().eventLoop();
promise = e.newPromise();
Expand All @@ -257,12 +256,9 @@ protected void handleSubscribe(final Subscriber<? super StreamingHttpResponse> s
// LoadBalancedStreamingHttpClient may prematurely mark the request as finished before netty
// marks the stream as inactive. This code is responsible for running this Runnable in case of
// any errors or stream closure.
if (StreamingHttpRequestWithContext.class.equals(request.getClass())) {
final StreamingHttpRequestWithContext wrappedRequest =
(StreamingHttpRequestWithContext) request;
// Unwrap the original request to let the following transformations access the PayloadInfo
originalReq = wrappedRequest.unwrap();
OwnedRunnable runnable = wrappedRequest.runnable();
// See LoadBalancedStreamingHttpClient and AbstractStreamingHttpConnection.
final OnStreamClosedRunnable runnable = request.context().get(OnStreamClosedRunnable.KEY);
if (runnable != null) {
if (runnable.own()) {
ownedRunnable = runnable;
} else {
Expand All @@ -274,11 +270,10 @@ protected void handleSubscribe(final Subscriber<? super StreamingHttpResponse> s
deliverErrorFromSource(subscriber, cause);
return;
}
} else {
// User wrapped the original request object at the connection level
// (after LoadBalancedStreamingHttpClient) => Runnable will always be owned by originator
originalReq = request;
}
// If OwnedRunnable is null, it means a user wiped/modified the request context after
// LoadBalancedStreamingHttpClient => as a fallback, OwnedRunnable will always be owned by
// originator.
bs.open(promise);
sequentialCancellable = new SequentialCancellable(() -> promise.cancel(true));
} catch (Throwable cause) {
Expand All @@ -297,11 +292,11 @@ protected void handleSubscribe(final Subscriber<? super StreamingHttpResponse> s

final Runnable onCloseRunnable = ownedRunnable;
if (promise.isDone()) {
childChannelActive(promise, subscriber, sequentialCancellable, originalReq, observer,
childChannelActive(promise, subscriber, sequentialCancellable, request, observer,
allowDropTrailersReadFromTransport, onCloseRunnable);
} else {
promise.addListener((FutureListener<Http2StreamChannel>) future -> childChannelActive(
future, subscriber, sequentialCancellable, originalReq, observer,
future, subscriber, sequentialCancellable, request, observer,
allowDropTrailersReadFromTransport, onCloseRunnable));
}
}
Expand Down
Loading

0 comments on commit 4bae4c1

Please sign in to comment.