Skip to content

Commit

Permalink
TimeoutHttpRequesterFilter as a connection filter causes `ClosedCha…
Browse files Browse the repository at this point in the history
…nnelException` (#2263)

Motivation:

If applied as a connection-level filter, it skips the logic defined in
`LoadBalancedStreamingHttpClient` for the cancellation. Cancel will be
scheduled on the event-loop, then `onError` is propagated.
`LoadBalancedStreamingHttpClient` will mark the request as finished, it
won’t see cancel. As the result, the same connection can be selected for
another request, and users see `ClosedChannelException`.

Modifications:

- For `StreamingHttpConnectionFilterFactory` variant used for HTTP/1.X
connections, apply `BeforeFinallyHttpOperator` with `cancel` callback
similar to `LoadBalancedStreamingHttpClient`;
- Enhance mocks to account for new behavior;
- Move the common logic of extracting a fallback executor as a static
method in `AbstractTimeoutHttpFilter`;

Result:

HTTP/1.X requests don't fail with `ClosedChannelException` if a previous
request on the same connection times out.
  • Loading branch information
idelpivnitskiy authored Jul 13, 2022
1 parent 7071632 commit d3d4667
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 30 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2021 Apple Inc. and the ServiceTalk project authors
* Copyright © 2021-2022 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,6 +24,7 @@
import io.servicetalk.http.api.HttpRequestMetaData;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.transport.api.ExecutionContext;

import java.time.Duration;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -98,17 +99,16 @@ public final HttpExecutionStrategy requiredOffloads() {
*
* @param request The request requiring a response.
* @param responseFunction Function which generates the response.
* @param contextExecutor Executor from the request/connection context to be used for the timeout terminal signal if
* no specific timeout executor is defined for filter.
* @param context {@link ExecutionContext} to extract fallback {@link Executor} to be used for the timeout terminal
* signal if no specific timeout executor is defined for the filter.
* @return response single
*/
final Single<StreamingHttpResponse> withTimeout(final StreamingHttpRequest request,
final Function<StreamingHttpRequest, Single<StreamingHttpResponse>> responseFunction,
final Executor contextExecutor) {

final Executor useForTimeout = null != this.timeoutExecutor ? this.timeoutExecutor : contextExecutor;

final ExecutionContext<HttpExecutionStrategy> context) {
return Single.defer(() -> {
final Executor useForTimeout = null != this.timeoutExecutor ?
this.timeoutExecutor : contextExecutor(context);
final Duration timeout = timeoutForRequest.apply(request, useForTimeout);
Single<StreamingHttpResponse> response = responseFunction.apply(request);
if (null != timeout) {
Expand All @@ -118,7 +118,7 @@ final Single<StreamingHttpResponse> withTimeout(final StreamingHttpRequest reque
final long deadline = useForTimeout.currentTime(NANOSECONDS) + timeout.toNanos();
response = timeoutResponse.map(resp -> resp.transformMessageBody(body -> defer(() -> {
final Duration remaining = ofNanos(deadline - useForTimeout.currentTime(NANOSECONDS));
return (body.timeoutTerminal(remaining, useForTimeout))
return body.timeoutTerminal(remaining, useForTimeout)
.onErrorMap(TimeoutException.class, t ->
new MappedTimeoutException("message body timeout after " + timeout.toMillis() +
"ms", t))
Expand All @@ -133,6 +133,10 @@ final Single<StreamingHttpResponse> withTimeout(final StreamingHttpRequest reque
});
}

private static Executor contextExecutor(ExecutionContext<HttpExecutionStrategy> context) {
return context.executionStrategy().hasOffloads() ? context.executor() : context.ioExecutor();
}

private static final class MappedTimeoutException extends TimeoutException {
private static final long serialVersionUID = -8230476062001221272L;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2019, 2021 Apple Inc. and the ServiceTalk project authors
* Copyright © 2019, 2021-2022 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@
import io.servicetalk.concurrent.TimeSource;
import io.servicetalk.concurrent.api.Executor;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.TerminalSignalConsumer;
import io.servicetalk.http.api.FilterableStreamingHttpClient;
import io.servicetalk.http.api.FilterableStreamingHttpConnection;
import io.servicetalk.http.api.HttpExecutionContext;
Expand All @@ -31,6 +32,9 @@
import io.servicetalk.http.api.StreamingHttpRequester;
import io.servicetalk.http.api.StreamingHttpResponse;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.function.BiFunction;

Expand All @@ -52,6 +56,8 @@
public final class TimeoutHttpRequesterFilter extends AbstractTimeoutHttpFilter
implements StreamingHttpClientFilterFactory, StreamingHttpConnectionFilterFactory {

private static final Logger LOGGER = LoggerFactory.getLogger(TimeoutHttpRequesterFilter.class);

/**
* Creates a new instance which requires only that the response metadata be received before the timeout.
*
Expand Down Expand Up @@ -161,10 +167,7 @@ public StreamingHttpClientFilter create(final FilterableStreamingHttpClient clie
@Override
protected Single<StreamingHttpResponse> request(final StreamingHttpRequester delegate,
final StreamingHttpRequest request) {
HttpExecutionContext executionContext = client.executionContext();
return TimeoutHttpRequesterFilter.this.withTimeout(request, delegate::request,
executionContext.executionStrategy().hasOffloads() ?
executionContext.executor() : executionContext.ioExecutor());
return TimeoutHttpRequesterFilter.this.withTimeout(request, delegate::request, executionContext());
}
};
}
Expand All @@ -174,10 +177,39 @@ public StreamingHttpConnectionFilter create(final FilterableStreamingHttpConnect
return new StreamingHttpConnectionFilter(connection) {
@Override
public Single<StreamingHttpResponse> request(final StreamingHttpRequest request) {
HttpExecutionContext executionContext = connection.executionContext();
return TimeoutHttpRequesterFilter.this.withTimeout(request, r -> delegate().request(r),
executionContext.executionStrategy().hasOffloads() ?
executionContext.executor() : executionContext.ioExecutor());
final FilterableStreamingHttpConnection delegate = delegate();
return TimeoutHttpRequesterFilter.this.withTimeout(request,
delegate.connectionContext().protocol().major() >= 2 ? delegate::request :
r -> delegate.request(r).liftSync(new BeforeFinallyHttpOperator(
new TerminalSignalConsumer() {
@Override
public void onComplete() {
// noop
}

@Override
public void onError(final Throwable throwable) {
// noop
}

@Override
public void cancel() {
// In alignment with cancellation processing in
// LoadBalancedStreamingHttpClient, we need to close the HTTP/1.x
// connection when this filter is applied at connection level.
// Otherwise, a connection will be marked as "free" and can be selected
// for another request, racing with closure.
LOGGER.debug("{} closing this {} connection due to cancellation",
delegate, delegate.connectionContext().protocol());
delegate.closeAsync().subscribe();
// Not necessary to do anything for HTTP/2 at this level because
// NettyChannelPublisher#cancel0 will be scheduled on the EventLoop
// prior marking the request as finished. Therefore, any new attempt to
// open a stream on the same h2-connection will see the current stream
// as already cancelled and won't result in "max-concurrent-streams"
// error.
}
})), executionContext());
}
};
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2021 Apple Inc. and the ServiceTalk project authors
* Copyright © 2021-2022 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -158,11 +158,8 @@ public StreamingHttpServiceFilter create(final StreamingHttpService service) {
public Single<StreamingHttpResponse> handle(final HttpServiceContext ctx,
final StreamingHttpRequest request,
final StreamingHttpResponseFactory responseFactory) {
HttpExecutionContext executionContext = ctx.executionContext();
return TimeoutHttpServiceFilter.this.withTimeout(request,
r -> delegate().handle(ctx, r, responseFactory),
executionContext.executionStrategy().hasOffloads() ?
executionContext.executor() : executionContext.ioExecutor());
r -> delegate().handle(ctx, r, responseFactory), ctx.executionContext());
}
};
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2021 Apple Inc. and the ServiceTalk project authors
* Copyright © 2021-2022 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -66,7 +66,9 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

abstract class AbstractTimeoutHttpFilterTest {

Expand Down Expand Up @@ -95,7 +97,7 @@ void constructorValidatesDuration() {
assertThrows(IllegalArgumentException.class, () -> newFilter(ofNanos(1L).negated()));
}

@ParameterizedTest
@ParameterizedTest(name = "{index}: fullRequestResponse={0}")
@ValueSource(booleans = {false, true})
void responseTimeout(boolean fullRequestResponse) {
TestSingle<StreamingHttpResponse> responseSingle = new TestSingle<>();
Expand All @@ -105,13 +107,13 @@ void responseTimeout(boolean fullRequestResponse) {
assertThat("No subscribe for response single", responseSingle.isSubscribed(), is(true));
}

@ParameterizedTest
@ParameterizedTest(name = "{index}: fullRequestResponse={0}")
@ValueSource(booleans = {false, true})
void responseWithZeroTimeout(boolean fullRequestResponse) {
responseWithNonPositiveTimeout(ZERO, fullRequestResponse);
}

@ParameterizedTest
@ParameterizedTest(name = "{index}: fullRequestResponse={0}")
@ValueSource(booleans = {false, true})
void responseWithNegativeTimeout(boolean fullRequestResponse) {
responseWithNonPositiveTimeout(ofNanos(1L).negated(), fullRequestResponse);
Expand All @@ -125,13 +127,17 @@ private void responseWithNonPositiveTimeout(Duration timeout, boolean fullReques
assertThat("No subscribe for payload body", responseSingle.isSubscribed(), is(true));
}

@ParameterizedTest
@ParameterizedTest(name = "{index}: fullRequestResponse={0}")
@ValueSource(booleans = {false, true})
void responseCompletesBeforeTimeout(boolean fullRequestResponse) {
TestSingle<StreamingHttpResponse> responseSingle = new TestSingle<>();
StepVerifiers.create(applyFilter(ofSeconds(DEFAULT_TIMEOUT_SECONDS / 2),
fullRequestResponse, defaultStrategy(), responseSingle))
.then(() -> immediate().schedule(() -> responseSingle.onSuccess(mock(StreamingHttpResponse.class)),
.then(() -> immediate().schedule(() -> {
StreamingHttpResponse response = mock(StreamingHttpResponse.class);
when(response.transformMessageBody(any())).thenReturn(response);
responseSingle.onSuccess(response);
},
ofMillis(50L)))
.expectSuccess()
.verify();
Expand All @@ -144,7 +150,7 @@ static Iterable<HttpExecutionStrategy> executionStrategies() {
defaultStrategy(), offloadAll());
}

@ParameterizedTest
@ParameterizedTest(name = "{index}: strategy={0}")
@MethodSource("executionStrategies")
void payloadBodyTimeout(HttpExecutionStrategy strategy) {
TestPublisher<Buffer> payloadBody = new TestPublisher<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2021 Apple Inc. and the ServiceTalk project authors
* Copyright © 2021-2022 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -31,6 +31,8 @@
import java.util.function.BiFunction;

import static io.servicetalk.buffer.netty.BufferAllocators.DEFAULT_ALLOCATOR;
import static io.servicetalk.concurrent.api.Completable.completed;
import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_1_1;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -67,9 +69,12 @@ private static Single<StreamingHttpResponse> applyFilter(TimeoutHttpRequesterFil

HttpConnectionContext connectionContext = mock(HttpConnectionContext.class);
when(connectionContext.executionContext()).thenReturn(executionContext);
when(connectionContext.protocol()).thenReturn(HTTP_1_1);
FilterableStreamingHttpConnection connection = mock(FilterableStreamingHttpConnection.class);
when(connection.connectionContext()).thenReturn(connectionContext);
when(connection.executionContext()).thenReturn(executionContext);
when(connection.request(any())).thenReturn(responseSingle);
when(connection.closeAsync()).thenReturn(completed());

StreamingHttpRequester requester = filterFactory.create(connection);
return requester.request(mock(StreamingHttpRequest.class));
Expand Down

0 comments on commit d3d4667

Please sign in to comment.