Skip to content

Commit

Permalink
Fix increased latency for gRPC after adding `ListenableAsyncCloseable…
Browse files Browse the repository at this point in the history
….onClosing()` (#2473)

Motivation:

#2430 adds `ListenableAsyncCloseable.onClosing()` method. Its implementations
require allocation of a `CompletableProcessor`.
Because we wrap channel's `EventLoop` with `IoExecutor` on every new connection
and every new HTTP/2 stream, it caused significant increase of allocations and
pressure for GC. As the result, latency increased for all HTTP/2 and gRPC
use-cases.

Modifications:

- Add `ExecutionContextUtils` that adapt `EventLoop` to `IoExecutor` API for
`ExecutionContext` and cache its value in `FastThreadLocal` to reduce
allocations;
- Make sure that HTTP/1.x and parent HTTP/2 connections use
`ExecutionContextUtils`;
- For each new HTTP/2 stream (child channel) take execution context from the
parent channel because child channel uses the same `EventLoop` as its parent
channel;
- Add a new `DefaultNettyConnection.initChannel` overload that takes already
constructed `ExecutionContext`;
- Adjust existing tests for new API;

Result:

Significantly reduced allocation rate and less GC pauses for HTTP/2 and gRPC.
  • Loading branch information
idelpivnitskiy authored Jan 5, 2023
1 parent f9afee4 commit 5e3e4ce
Show file tree
Hide file tree
Showing 11 changed files with 178 additions and 48 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright © 2022 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.http.netty;

import io.servicetalk.concurrent.api.ListenableAsyncCloseable;
import io.servicetalk.http.api.DelegatingHttpExecutionContext;
import io.servicetalk.http.api.HttpExecutionContext;
import io.servicetalk.transport.api.IoExecutor;

import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.util.concurrent.FastThreadLocal;

import static io.servicetalk.transport.netty.internal.NettyIoExecutors.fromNettyEventLoop;

final class ExecutionContextUtils {

private static final FastThreadLocal<IoExecutor> CHANNEL_IO_EXECUTOR = new FastThreadLocal<>();

private ExecutionContextUtils() {
// No instances
}

/**
* Utility that maps {@link Channel#eventLoop()} into {@link IoExecutor} and caches the result for future mappings
* to reduce allocations. Because {@link IoExecutor} implements {@link ListenableAsyncCloseable} interface, its
* allocation cost is relatively high.
*
* @param channel {@link Channel} registered for a single {@link EventLoop} thread
* @param builderExecutionContext {@link HttpExecutionContext} pre-computed by the builder for new connections
* @return {@link HttpExecutionContext} which has {@link IoExecutor} backed by a single {@link EventLoop} thread
* associated with the passed {@link Channel}.
*/
static HttpExecutionContext channelExecutionContext(final Channel channel,
final HttpExecutionContext builderExecutionContext) {
final IoExecutor channelIoExecutor = fromChannel(channel,
builderExecutionContext.ioExecutor().isIoThreadSupported());
return new DelegatingHttpExecutionContext(builderExecutionContext) {
@Override
public IoExecutor ioExecutor() {
return channelIoExecutor;
}
};
}

private static IoExecutor fromChannel(final Channel channel, boolean isIoThreadSupported) {
assert channel.eventLoop().inEventLoop();
IoExecutor ioExecutor = CHANNEL_IO_EXECUTOR.getIfExists();
if (ioExecutor != null) {
return ioExecutor;
}
ioExecutor = fromNettyEventLoop(channel.eventLoop(), isIoThreadSupported);
CHANNEL_IO_EXECUTOR.set(ioExecutor);
return ioExecutor;
}

// for tests only
static void clearThreadLocal() {
CHANNEL_IO_EXECUTOR.remove();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import io.servicetalk.concurrent.SingleSource;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.internal.DelayedCancellable;
import io.servicetalk.http.api.DefaultHttpExecutionContext;
import io.servicetalk.http.api.HttpConnectionContext;
import io.servicetalk.http.api.HttpExecutionContext;
import io.servicetalk.http.api.HttpProtocolVersion;
Expand Down Expand Up @@ -54,9 +53,9 @@
import static io.servicetalk.concurrent.api.Processors.newSingleProcessor;
import static io.servicetalk.concurrent.api.SourceAdapters.fromSource;
import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_2_0;
import static io.servicetalk.http.netty.ExecutionContextUtils.channelExecutionContext;
import static io.servicetalk.http.netty.NettyHttp2ExceptionUtils.wrapIfNecessary;
import static io.servicetalk.transport.netty.internal.ChannelCloseUtils.assignConnectionError;
import static io.servicetalk.transport.netty.internal.NettyIoExecutors.fromNettyEventLoop;
import static io.servicetalk.transport.netty.internal.NettyPipelineSslUtils.extractSslSessionAndReport;
import static io.servicetalk.transport.netty.internal.SocketOptionUtils.getOption;

Expand All @@ -77,9 +76,7 @@ class H2ParentConnectionContext extends NettyChannelListenableAsyncCloseable imp
@Nullable final SslConfig sslConfig,
final KeepAliveManager keepAliveManager) {
super(channel, executionContext.executor());
this.executionContext = new DefaultHttpExecutionContext(executionContext.bufferAllocator(),
fromNettyEventLoop(channel.eventLoop(), executionContext.ioExecutor().isIoThreadSupported()),
executionContext.executor(), executionContext.executionStrategy());
this.executionContext = channelExecutionContext(channel, executionContext);
this.flushStrategyHolder = new FlushStrategyHolder(flushStrategy);
this.sslConfig = sslConfig;
this.idleTimeoutMs = idleTimeoutMs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_2_0;
import static io.servicetalk.http.api.StreamingHttpRequests.newTransportRequest;
import static io.servicetalk.http.netty.AbstractStreamingHttpConnection.isSafeToAggregateOrEmpty;
import static io.servicetalk.http.netty.ExecutionContextUtils.channelExecutionContext;
import static io.servicetalk.http.netty.HeaderUtils.REQ_EXPECT_CONTINUE;
import static io.servicetalk.http.netty.HeaderUtils.addResponseTransferEncodingIfNecessary;
import static io.servicetalk.http.netty.HeaderUtils.canAddResponseContentLength;
Expand Down Expand Up @@ -141,18 +142,18 @@ private static Throwable newH1ConfigException() {
}

static Single<NettyHttpServerConnection> initChannel(final Channel channel,
final HttpExecutionContext httpExecutionContext,
final HttpExecutionContext builderExecutionContext,
final ReadOnlyHttpServerConfig config,
final ChannelInitializer initializer,
final StreamingHttpService service,
final boolean drainRequestPayloadBody,
final ConnectionObserver observer) {
return initChannel(channel, httpExecutionContext, config, initializer, service, drainRequestPayloadBody,
return initChannel(channel, builderExecutionContext, config, initializer, service, drainRequestPayloadBody,
observer, forPipelinedRequestResponse(false, channel.config()));
}

private static Single<NettyHttpServerConnection> initChannel(final Channel channel,
final HttpExecutionContext httpExecutionContext,
final HttpExecutionContext builderExecutionContext,
final ReadOnlyHttpServerConfig config,
final ChannelInitializer initializer,
final StreamingHttpService service,
Expand All @@ -165,15 +166,15 @@ private static Single<NettyHttpServerConnection> initChannel(final Channel chann
}
final ReadOnlyTcpServerConfig tcpConfig = config.tcpConfig();
return showPipeline(DefaultNettyConnection.initChannel(channel,
httpExecutionContext.bufferAllocator(), httpExecutionContext.executor(),
httpExecutionContext.ioExecutor(), closeHandler, tcpConfig.flushStrategy(), tcpConfig.idleTimeoutMs(),
tcpConfig.sslConfig(),
initializer.andThen(getChannelInitializer(getByteBufAllocator(httpExecutionContext.bufferAllocator()),
h1Config, closeHandler)), httpExecutionContext.executionStrategy(), HTTP_1_1, observer, false,
__ -> false)
channelExecutionContext(channel, builderExecutionContext),
closeHandler, tcpConfig.flushStrategy(), tcpConfig.idleTimeoutMs(), tcpConfig.sslConfig(),
initializer.andThen(getChannelInitializer(
getByteBufAllocator(builderExecutionContext.bufferAllocator()), h1Config, closeHandler)),
HTTP_1_1, observer, false, __ -> false)
.map(conn -> new NettyHttpServerConnection(conn, service,
HTTP_1_1, h1Config.headersFactory(), drainRequestPayloadBody,
config.allowDropTrailersReadFromTransport())), HTTP_1_1, channel);
config.allowDropTrailersReadFromTransport())),
HTTP_1_1, channel);
}

private static ChannelInitializer getChannelInitializer(final ByteBufAllocator alloc, final H1ProtocolConfig config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import static io.servicetalk.buffer.netty.BufferUtils.getByteBufAllocator;
import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_1_1;
import static io.servicetalk.http.netty.ExecutionContextUtils.channelExecutionContext;
import static io.servicetalk.http.netty.HeaderUtils.OBJ_EXPECT_CONTINUE;
import static io.servicetalk.http.netty.HttpDebugUtils.showPipeline;
import static io.servicetalk.transport.netty.internal.CloseHandler.forPipelinedRequestResponse;
Expand All @@ -60,17 +61,17 @@ static <ResolvedAddress> Single<? extends NettyConnection<Object, Object>> build
observer);
}

static Single<? extends DefaultNettyConnection<Object, Object>> createConnection(final Channel channel,
final HttpExecutionContext executionContext, final H1ProtocolConfig h1Config,
static Single<? extends NettyConnection<Object, Object>> createConnection(final Channel channel,
final HttpExecutionContext builderExecutionContext, final H1ProtocolConfig h1Config,
final ReadOnlyTcpClientConfig tcpConfig, final ChannelInitializer initializer,
final ConnectionObserver connectionObserver) {
final CloseHandler closeHandler = forPipelinedRequestResponse(true, channel.config());
return showPipeline(DefaultNettyConnection.initChannel(channel, executionContext.bufferAllocator(),
executionContext.executor(), executionContext.ioExecutor(), closeHandler,
return showPipeline(DefaultNettyConnection.initChannel(channel,
channelExecutionContext(channel, builderExecutionContext), closeHandler,
tcpConfig.flushStrategy(), tcpConfig.idleTimeoutMs(), tcpConfig.sslConfig(),
initializer.andThen(new HttpClientChannelInitializer(
getByteBufAllocator(executionContext.bufferAllocator()), h1Config, closeHandler)),
executionContext.executionStrategy(), HTTP_1_1, connectionObserver, true, OBJ_EXPECT_CONTINUE),
getByteBufAllocator(builderExecutionContext.bufferAllocator()), h1Config, closeHandler)),
HTTP_1_1, connectionObserver, true, OBJ_EXPECT_CONTINUE),
HTTP_1_1, channel);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import io.servicetalk.concurrent.api.TestSubscription;
import io.servicetalk.concurrent.test.internal.TestPublisherSubscriber;
import io.servicetalk.transport.api.ConnectionInfo.Protocol;
import io.servicetalk.transport.api.DefaultExecutionContext;
import io.servicetalk.transport.api.ExecutionContext;
import io.servicetalk.transport.api.RetryableException;
import io.servicetalk.transport.netty.internal.CloseHandler;
import io.servicetalk.transport.netty.internal.DefaultNettyConnection;
Expand Down Expand Up @@ -64,6 +66,7 @@
import static io.servicetalk.http.api.HttpExecutionStrategies.defaultStrategy;
import static io.servicetalk.transport.netty.internal.CloseHandler.UNSUPPORTED_PROTOCOL_CLOSE_HANDLER;
import static io.servicetalk.transport.netty.internal.FlushStrategies.defaultFlushStrategy;
import static io.servicetalk.transport.netty.internal.NettyIoExecutors.fromNettyEventLoop;
import static java.lang.Integer.MAX_VALUE;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.hamcrest.MatcherAssert.assertThat;
Expand Down Expand Up @@ -93,23 +96,26 @@ class NettyPipelinedConnectionTest {

@BeforeEach
void setUp() throws Exception {
ExecutionContextUtils.clearThreadLocal();
channel = new EmbeddedDuplexChannel(false);
WriteDemandEstimator demandEstimator = mock(WriteDemandEstimator.class);
writePublisher1 = new TestPublisher<>();
writePublisher2 = new TestPublisher<>();
when(demandEstimator.estimateRequestN(anyLong())).then(invocation1 -> MAX_VALUE);
CloseHandler closeHandler = UNSUPPORTED_PROTOCOL_CLOSE_HANDLER;
ExecutionContext<?> executionContext = new DefaultExecutionContext<>(DEFAULT_ALLOCATOR,
fromNettyEventLoop(channel.eventLoop(), false), immediate(), defaultStrategy());
final DefaultNettyConnection<Integer, Integer> connection =
DefaultNettyConnection.<Integer, Integer>initChannel(channel, DEFAULT_ALLOCATOR,
immediate(), null, closeHandler, defaultFlushStrategy(), 0L, null, channel2 -> {
DefaultNettyConnection.<Integer, Integer>initChannel(channel, executionContext,
closeHandler, defaultFlushStrategy(), 0L, null, channel2 -> {
channel2.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.fireChannelRead(msg);
closeHandler.protocolPayloadEndInbound(ctx);
}
});
}, defaultStrategy(), mock(Protocol.class), NoopConnectionObserver.INSTANCE, true, __ -> false)
}, mock(Protocol.class), NoopConnectionObserver.INSTANCE, true, __ -> false)
.toFuture().get();
requester = new NettyPipelinedConnection<>(connection, 8);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class ServerRespondsOnClosingTest {
private final Queue<Exchange> requests = new ArrayDeque<>();

ServerRespondsOnClosingTest() throws Exception {
ExecutionContextUtils.clearThreadLocal();
channel = new EmbeddedDuplexChannel(false);
DefaultHttpExecutionContext httpExecutionContext = new DefaultHttpExecutionContext(DEFAULT_ALLOCATOR,
fromNettyEventLoop(channel.eventLoop()), immediate(), offloadNone());
Expand Down
Loading

0 comments on commit 5e3e4ce

Please sign in to comment.