From d3d4667c28dea4a096a964b715365967682226ed Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Wed, 13 Jul 2022 12:09:19 -0500 Subject: [PATCH] `TimeoutHttpRequesterFilter` as a connection filter causes `ClosedChannelException` (#2263) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Motivation: If applied as a connection-level filter, it skips the logic defined in `LoadBalancedStreamingHttpClient` for the cancellation. Cancel will be scheduled on the event-loop, then `onError` is propagated. `LoadBalancedStreamingHttpClient` will mark the request as finished, it won’t see cancel. As the result, the same connection can be selected for another request, and users see `ClosedChannelException`. Modifications: - For `StreamingHttpConnectionFilterFactory` variant used for HTTP/1.X connections, apply `BeforeFinallyHttpOperator` with `cancel` callback similar to `LoadBalancedStreamingHttpClient`; - Enhance mocks to account for new behavior; - Move the common logic of extracting a fallback executor as a static method in `AbstractTimeoutHttpFilter`; Result: HTTP/1.X requests don't fail with `ClosedChannelException` if a previous request on the same connection times out. --- .../http/utils/AbstractTimeoutHttpFilter.java | 20 +++++--- .../utils/TimeoutHttpRequesterFilter.java | 50 +++++++++++++++---- .../http/utils/TimeoutHttpServiceFilter.java | 7 +-- .../utils/AbstractTimeoutHttpFilterTest.java | 20 +++++--- .../utils/TimeoutHttpRequesterFilterTest.java | 7 ++- 5 files changed, 74 insertions(+), 30 deletions(-) diff --git a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/AbstractTimeoutHttpFilter.java b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/AbstractTimeoutHttpFilter.java index e61e4c9919..0d689cb2b7 100644 --- a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/AbstractTimeoutHttpFilter.java +++ b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/AbstractTimeoutHttpFilter.java @@ -1,5 +1,5 @@ /* - * Copyright © 2021 Apple Inc. and the ServiceTalk project authors + * Copyright © 2021-2022 Apple Inc. and the ServiceTalk project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,6 +24,7 @@ import io.servicetalk.http.api.HttpRequestMetaData; import io.servicetalk.http.api.StreamingHttpRequest; import io.servicetalk.http.api.StreamingHttpResponse; +import io.servicetalk.transport.api.ExecutionContext; import java.time.Duration; import java.util.concurrent.TimeoutException; @@ -98,17 +99,16 @@ public final HttpExecutionStrategy requiredOffloads() { * * @param request The request requiring a response. * @param responseFunction Function which generates the response. - * @param contextExecutor Executor from the request/connection context to be used for the timeout terminal signal if - * no specific timeout executor is defined for filter. + * @param context {@link ExecutionContext} to extract fallback {@link Executor} to be used for the timeout terminal + * signal if no specific timeout executor is defined for the filter. * @return response single */ final Single withTimeout(final StreamingHttpRequest request, final Function> responseFunction, - final Executor contextExecutor) { - - final Executor useForTimeout = null != this.timeoutExecutor ? this.timeoutExecutor : contextExecutor; - + final ExecutionContext context) { return Single.defer(() -> { + final Executor useForTimeout = null != this.timeoutExecutor ? + this.timeoutExecutor : contextExecutor(context); final Duration timeout = timeoutForRequest.apply(request, useForTimeout); Single response = responseFunction.apply(request); if (null != timeout) { @@ -118,7 +118,7 @@ final Single withTimeout(final StreamingHttpRequest reque final long deadline = useForTimeout.currentTime(NANOSECONDS) + timeout.toNanos(); response = timeoutResponse.map(resp -> resp.transformMessageBody(body -> defer(() -> { final Duration remaining = ofNanos(deadline - useForTimeout.currentTime(NANOSECONDS)); - return (body.timeoutTerminal(remaining, useForTimeout)) + return body.timeoutTerminal(remaining, useForTimeout) .onErrorMap(TimeoutException.class, t -> new MappedTimeoutException("message body timeout after " + timeout.toMillis() + "ms", t)) @@ -133,6 +133,10 @@ final Single withTimeout(final StreamingHttpRequest reque }); } + private static Executor contextExecutor(ExecutionContext context) { + return context.executionStrategy().hasOffloads() ? context.executor() : context.ioExecutor(); + } + private static final class MappedTimeoutException extends TimeoutException { private static final long serialVersionUID = -8230476062001221272L; diff --git a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/TimeoutHttpRequesterFilter.java b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/TimeoutHttpRequesterFilter.java index 15ef4ebec3..f482311565 100644 --- a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/TimeoutHttpRequesterFilter.java +++ b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/TimeoutHttpRequesterFilter.java @@ -1,5 +1,5 @@ /* - * Copyright © 2019, 2021 Apple Inc. and the ServiceTalk project authors + * Copyright © 2019, 2021-2022 Apple Inc. and the ServiceTalk project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ import io.servicetalk.concurrent.TimeSource; import io.servicetalk.concurrent.api.Executor; import io.servicetalk.concurrent.api.Single; +import io.servicetalk.concurrent.api.TerminalSignalConsumer; import io.servicetalk.http.api.FilterableStreamingHttpClient; import io.servicetalk.http.api.FilterableStreamingHttpConnection; import io.servicetalk.http.api.HttpExecutionContext; @@ -31,6 +32,9 @@ import io.servicetalk.http.api.StreamingHttpRequester; import io.servicetalk.http.api.StreamingHttpResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.time.Duration; import java.util.function.BiFunction; @@ -52,6 +56,8 @@ public final class TimeoutHttpRequesterFilter extends AbstractTimeoutHttpFilter implements StreamingHttpClientFilterFactory, StreamingHttpConnectionFilterFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(TimeoutHttpRequesterFilter.class); + /** * Creates a new instance which requires only that the response metadata be received before the timeout. * @@ -161,10 +167,7 @@ public StreamingHttpClientFilter create(final FilterableStreamingHttpClient clie @Override protected Single request(final StreamingHttpRequester delegate, final StreamingHttpRequest request) { - HttpExecutionContext executionContext = client.executionContext(); - return TimeoutHttpRequesterFilter.this.withTimeout(request, delegate::request, - executionContext.executionStrategy().hasOffloads() ? - executionContext.executor() : executionContext.ioExecutor()); + return TimeoutHttpRequesterFilter.this.withTimeout(request, delegate::request, executionContext()); } }; } @@ -174,10 +177,39 @@ public StreamingHttpConnectionFilter create(final FilterableStreamingHttpConnect return new StreamingHttpConnectionFilter(connection) { @Override public Single request(final StreamingHttpRequest request) { - HttpExecutionContext executionContext = connection.executionContext(); - return TimeoutHttpRequesterFilter.this.withTimeout(request, r -> delegate().request(r), - executionContext.executionStrategy().hasOffloads() ? - executionContext.executor() : executionContext.ioExecutor()); + final FilterableStreamingHttpConnection delegate = delegate(); + return TimeoutHttpRequesterFilter.this.withTimeout(request, + delegate.connectionContext().protocol().major() >= 2 ? delegate::request : + r -> delegate.request(r).liftSync(new BeforeFinallyHttpOperator( + new TerminalSignalConsumer() { + @Override + public void onComplete() { + // noop + } + + @Override + public void onError(final Throwable throwable) { + // noop + } + + @Override + public void cancel() { + // In alignment with cancellation processing in + // LoadBalancedStreamingHttpClient, we need to close the HTTP/1.x + // connection when this filter is applied at connection level. + // Otherwise, a connection will be marked as "free" and can be selected + // for another request, racing with closure. + LOGGER.debug("{} closing this {} connection due to cancellation", + delegate, delegate.connectionContext().protocol()); + delegate.closeAsync().subscribe(); + // Not necessary to do anything for HTTP/2 at this level because + // NettyChannelPublisher#cancel0 will be scheduled on the EventLoop + // prior marking the request as finished. Therefore, any new attempt to + // open a stream on the same h2-connection will see the current stream + // as already cancelled and won't result in "max-concurrent-streams" + // error. + } + })), executionContext()); } }; } diff --git a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/TimeoutHttpServiceFilter.java b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/TimeoutHttpServiceFilter.java index e889ce78c9..19131a5b78 100644 --- a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/TimeoutHttpServiceFilter.java +++ b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/TimeoutHttpServiceFilter.java @@ -1,5 +1,5 @@ /* - * Copyright © 2021 Apple Inc. and the ServiceTalk project authors + * Copyright © 2021-2022 Apple Inc. and the ServiceTalk project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -158,11 +158,8 @@ public StreamingHttpServiceFilter create(final StreamingHttpService service) { public Single handle(final HttpServiceContext ctx, final StreamingHttpRequest request, final StreamingHttpResponseFactory responseFactory) { - HttpExecutionContext executionContext = ctx.executionContext(); return TimeoutHttpServiceFilter.this.withTimeout(request, - r -> delegate().handle(ctx, r, responseFactory), - executionContext.executionStrategy().hasOffloads() ? - executionContext.executor() : executionContext.ioExecutor()); + r -> delegate().handle(ctx, r, responseFactory), ctx.executionContext()); } }; } diff --git a/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/AbstractTimeoutHttpFilterTest.java b/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/AbstractTimeoutHttpFilterTest.java index e36cf0eb37..9fdb896081 100644 --- a/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/AbstractTimeoutHttpFilterTest.java +++ b/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/AbstractTimeoutHttpFilterTest.java @@ -1,5 +1,5 @@ /* - * Copyright © 2021 Apple Inc. and the ServiceTalk project authors + * Copyright © 2021-2022 Apple Inc. and the ServiceTalk project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -66,7 +66,9 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; abstract class AbstractTimeoutHttpFilterTest { @@ -95,7 +97,7 @@ void constructorValidatesDuration() { assertThrows(IllegalArgumentException.class, () -> newFilter(ofNanos(1L).negated())); } - @ParameterizedTest + @ParameterizedTest(name = "{index}: fullRequestResponse={0}") @ValueSource(booleans = {false, true}) void responseTimeout(boolean fullRequestResponse) { TestSingle responseSingle = new TestSingle<>(); @@ -105,13 +107,13 @@ void responseTimeout(boolean fullRequestResponse) { assertThat("No subscribe for response single", responseSingle.isSubscribed(), is(true)); } - @ParameterizedTest + @ParameterizedTest(name = "{index}: fullRequestResponse={0}") @ValueSource(booleans = {false, true}) void responseWithZeroTimeout(boolean fullRequestResponse) { responseWithNonPositiveTimeout(ZERO, fullRequestResponse); } - @ParameterizedTest + @ParameterizedTest(name = "{index}: fullRequestResponse={0}") @ValueSource(booleans = {false, true}) void responseWithNegativeTimeout(boolean fullRequestResponse) { responseWithNonPositiveTimeout(ofNanos(1L).negated(), fullRequestResponse); @@ -125,13 +127,17 @@ private void responseWithNonPositiveTimeout(Duration timeout, boolean fullReques assertThat("No subscribe for payload body", responseSingle.isSubscribed(), is(true)); } - @ParameterizedTest + @ParameterizedTest(name = "{index}: fullRequestResponse={0}") @ValueSource(booleans = {false, true}) void responseCompletesBeforeTimeout(boolean fullRequestResponse) { TestSingle responseSingle = new TestSingle<>(); StepVerifiers.create(applyFilter(ofSeconds(DEFAULT_TIMEOUT_SECONDS / 2), fullRequestResponse, defaultStrategy(), responseSingle)) - .then(() -> immediate().schedule(() -> responseSingle.onSuccess(mock(StreamingHttpResponse.class)), + .then(() -> immediate().schedule(() -> { + StreamingHttpResponse response = mock(StreamingHttpResponse.class); + when(response.transformMessageBody(any())).thenReturn(response); + responseSingle.onSuccess(response); + }, ofMillis(50L))) .expectSuccess() .verify(); @@ -144,7 +150,7 @@ static Iterable executionStrategies() { defaultStrategy(), offloadAll()); } - @ParameterizedTest + @ParameterizedTest(name = "{index}: strategy={0}") @MethodSource("executionStrategies") void payloadBodyTimeout(HttpExecutionStrategy strategy) { TestPublisher payloadBody = new TestPublisher<>(); diff --git a/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/TimeoutHttpRequesterFilterTest.java b/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/TimeoutHttpRequesterFilterTest.java index 8ce5f6955e..bc7d31a54c 100644 --- a/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/TimeoutHttpRequesterFilterTest.java +++ b/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/TimeoutHttpRequesterFilterTest.java @@ -1,5 +1,5 @@ /* - * Copyright © 2021 Apple Inc. and the ServiceTalk project authors + * Copyright © 2021-2022 Apple Inc. and the ServiceTalk project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,6 +31,8 @@ import java.util.function.BiFunction; import static io.servicetalk.buffer.netty.BufferAllocators.DEFAULT_ALLOCATOR; +import static io.servicetalk.concurrent.api.Completable.completed; +import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_1_1; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -67,9 +69,12 @@ private static Single applyFilter(TimeoutHttpRequesterFil HttpConnectionContext connectionContext = mock(HttpConnectionContext.class); when(connectionContext.executionContext()).thenReturn(executionContext); + when(connectionContext.protocol()).thenReturn(HTTP_1_1); FilterableStreamingHttpConnection connection = mock(FilterableStreamingHttpConnection.class); + when(connection.connectionContext()).thenReturn(connectionContext); when(connection.executionContext()).thenReturn(executionContext); when(connection.request(any())).thenReturn(responseSingle); + when(connection.closeAsync()).thenReturn(completed()); StreamingHttpRequester requester = filterFactory.create(connection); return requester.request(mock(StreamingHttpRequest.class));