Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix client thread leak after origin reload. #421

Merged
merged 7 commits into from
Jun 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (C) 2013-2018 Expedia Inc.
Copyright (C) 2013-2019 Expedia Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -23,6 +23,7 @@

import java.io.Closeable;
import java.util.EventListener;
import java.util.concurrent.CompletableFuture;

/**
* A connection to an origin.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (C) 2013-2018 Expedia Inc.
Copyright (C) 2013-2019 Expedia Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -57,15 +57,6 @@ private StyxHttpClient(NettyConnectionFactory connectionFactory, Builder paramet
this.connectionFactory = connectionFactory;
}

/**
* Shuts the styx HTTP client thread pool.
*
* @return A {@link CompletableFuture} that completes when the thread pool is terminated
*/
public CompletableFuture<Void> shutdown() {
return connectionFactory.close();
}

/**
* Indicates that a request should be sent using secure {@code https} protocol.
*
Expand Down Expand Up @@ -183,7 +174,6 @@ private static Origin originFromRequest(LiveHttpRequest request, Boolean isHttps
* Builder for {@link StyxHttpClient}.
*/
public static class Builder {
private String threadName = "simple-netty-http-client";
private int connectTimeoutMillis = 1000;
private int maxResponseSize = 1024 * 100;
private int responseTimeout = 60000;
Expand All @@ -196,7 +186,6 @@ public Builder() {
}

Builder(Builder another) {
this.threadName = another.threadName;
this.connectTimeoutMillis = another.connectTimeoutMillis;
this.maxResponseSize = another.maxResponseSize;
this.responseTimeout = another.responseTimeout;
Expand All @@ -206,17 +195,6 @@ public Builder() {
this.userAgent = another.userAgent;
}

/**
* Sets a thread name used for the thread pool.
*
* @param threadName thread name
* @return this {@link Builder}
*/
public Builder threadName(String threadName) {
this.threadName = threadName;
return this;
}

/**
* Sets the TCP connection timeout.
*
Expand Down Expand Up @@ -339,7 +317,6 @@ Builder copy() {
*/
public StyxHttpClient build() {
NettyConnectionFactory connectionFactory = new NettyConnectionFactory.Builder()
.name(threadName)
.httpConfig(newHttpConfigBuilder().setMaxHeadersSize(maxHeaderSize).build())
.tlsSettings(tlsSettings)
.httpRequestOperationFactory(request -> new HttpRequestOperation(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (C) 2013-2018 Expedia Inc.
Copyright (C) 2013-2019 Expedia Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import com.hotels.styx.client.HttpRequestOperationFactory;
import com.hotels.styx.client.netty.eventloop.PlatformAwareClientEventLoopGroupFactory;
import com.hotels.styx.client.ssl.SslContextFactory;
import com.hotels.styx.common.CompletableFutures;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
Expand All @@ -36,7 +35,6 @@
import reactor.core.publisher.Mono;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import static com.hotels.styx.client.HttpConfig.defaultHttpConfig;
import static com.hotels.styx.client.HttpRequestOperationFactory.Builder.httpRequestOperationFactoryBuilder;
Expand All @@ -50,6 +48,12 @@
* A connection factory that creates connections using netty.
*/
public class NettyConnectionFactory implements Connection.Factory {
private static final PlatformAwareClientEventLoopGroupFactory FACTORY = new PlatformAwareClientEventLoopGroupFactory(
"Styx-Client-Global",
0);
private static final Class<? extends Channel> GLOBAL_CLIENT_EVENT_LOOP_CLASS = FACTORY.clientSocketChannelClass();
private static final EventLoopGroup GLOBAL_CLIENT_EVENT_LOOP = FACTORY.newClientWorkerEventLoopGroup();

private final HttpConfig httpConfig;
private final SslContext sslContext;
private final boolean sendSni;
Expand All @@ -60,14 +64,10 @@ public class NettyConnectionFactory implements Connection.Factory {
private Class<? extends Channel> clientSocketChannelClass;

private NettyConnectionFactory(Builder builder) {
PlatformAwareClientEventLoopGroupFactory eventLoopGroupFactory = new PlatformAwareClientEventLoopGroupFactory(
builder.name,
builder.clientWorkerThreadsCount
);
this.eventLoopGroup = eventLoopGroupFactory.newClientWorkerEventLoopGroup();
this.clientSocketChannelClass = requireNonNull(builder.channelClass);
this.eventLoopGroup = requireNonNull(builder.eventLoopGroup);
this.httpConfig = requireNonNull(builder.httpConfig);
this.sslContext = builder.tlsSettings == null ? null : SslContextFactory.get(builder.tlsSettings);
this.clientSocketChannelClass = eventLoopGroupFactory.clientSocketChannelClass();
this.httpRequestOperationFactory = requireNonNull(builder.httpRequestOperationFactory);
this.sendSni = builder.tlsSettings != null && builder.tlsSettings.sendSni();
this.sniHost = builder.tlsSettings != null ? builder.tlsSettings.sniHost() : Optional.empty();
Expand Down Expand Up @@ -114,10 +114,6 @@ private synchronized void bootstrap(ConnectionSettings connectionSettings) {
}
}

public CompletableFuture<Void> close() {
return CompletableFutures.fromNettyFuture(eventLoopGroup.shutdownGracefully());
}

private class Initializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) {
Expand All @@ -129,30 +125,14 @@ protected void initChannel(Channel ch) {
*/
public static final class Builder {
private HttpRequestOperationFactory httpRequestOperationFactory = httpRequestOperationFactoryBuilder().build();
private String name = "Styx-Client";
private int clientWorkerThreadsCount = 1;
private HttpConfig httpConfig = defaultHttpConfig();
private TlsSettings tlsSettings;
private EventLoopGroup eventLoopGroup = GLOBAL_CLIENT_EVENT_LOOP;
private Class<? extends Channel> channelClass = GLOBAL_CLIENT_EVENT_LOOP_CLASS;

/**
* Sets the name.
*
* @param name name
* @return this builder
*/
public Builder name(String name) {
this.name = requireNonNull(name);
return this;
}

/**
* Sets number of client worker threads.
*
* @param clientWorkerThreadsCount number of client worker threads
* @return this builder
*/
public Builder clientWorkerThreadsCount(int clientWorkerThreadsCount) {
this.clientWorkerThreadsCount = clientWorkerThreadsCount;
public Builder nettyEventLoop(EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) {
this.eventLoopGroup = requireNonNull(eventLoopGroup);
this.channelClass = requireNonNull(channelClass);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (C) 2013-2018 Expedia Inc.
Copyright (C) 2013-2019 Expedia Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -45,7 +45,6 @@
import static com.hotels.styx.common.StyxFutures.await;
import static com.hotels.styx.support.server.UrlMatchingStrategies.urlStartingWith;
import static java.lang.String.format;
import static java.lang.Thread.getAllStackTraces;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -80,28 +79,6 @@ public void tearDown() {
server.stop();
}

@Test
public void closesThreadPoolAfterUse() throws InterruptedException, ExecutionException {
StyxHttpClient client = new StyxHttpClient.Builder()
.threadName("test-client")
.build();

// Ensures that a thread is created before the assertions
client.send(httpRequest).get();

assertThat(threadExists("test-client"), is(true));

client.shutdown().get();

assertThat(threadExists("test-client"), is(false));
}

private static Boolean threadExists(String threadName) {
return getAllStackTraces().keySet().stream()
.anyMatch(thread ->
thread.getName().startsWith(threadName));
}

/*
* StyxHttpClient.Builder
* - Cannot retrospectively modify user agent string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import com.hotels.styx.routing.handlers.HttpInterceptorPipeline;
import com.hotels.styx.startup.PipelineFactory;
import com.hotels.styx.startup.StyxServerComponents;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;

import java.util.List;
import java.util.Map;
Expand All @@ -51,18 +53,25 @@ public final class StyxPipelineFactory implements PipelineFactory {
private final Environment environment;
private final Map<String, StyxService> services;
private final List<NamedPlugin> plugins;
private final EventLoopGroup eventLoopGroup;
private final Class<? extends SocketChannel> nettySocketChannelClass;


public StyxPipelineFactory(
StyxObjectStore<RoutingObjectRecord> routeDb,
RoutingObjectFactory routingObjectFactory,
Environment environment,
Map<String, StyxService> services,
List<NamedPlugin> plugins) {
List<NamedPlugin> plugins,
EventLoopGroup eventLoopGroup,
Class<? extends SocketChannel> nettySocketChannelClass) {
this.routeDb = requireNonNull(routeDb);
this.routingObjectFactory = requireNonNull(routingObjectFactory);
this.environment = requireNonNull(environment);
this.services = requireNonNull(services);
this.plugins = requireNonNull(plugins);
this.eventLoopGroup = requireNonNull(eventLoopGroup);
this.nettySocketChannelClass = requireNonNull(nettySocketChannelClass);
}

@Override
Expand All @@ -87,7 +96,7 @@ private RoutingObject configuredPipeline(RoutingObjectFactory routingObjectFacto
})
.orElseGet(() -> {
Registry<BackendService> backendServicesRegistry = (Registry<BackendService>) services.get("backendServiceRegistry");
return new StaticPipelineFactory(environment, backendServicesRegistry, plugins, requestTracking);
return new StaticPipelineFactory(environment, backendServicesRegistry, plugins, eventLoopGroup, nettySocketChannelClass, requestTracking);
});

return pipelineBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,9 @@ public StyxServer(StyxServerComponents components, Stopwatch stopwatch) {
components.routingObjectFactory(),
components.environment(),
components.services(),
components.plugins()));
components.plugins(),
components.eventLoopGroup(),
components.nettySocketChannelClass()));

this.proxyServer = proxyServerSetUp.createProxyServer(components);
this.adminServer = createAdminServer(components);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.hotels.styx.api.extension.service.BackendService;
import com.hotels.styx.api.extension.service.ConnectionPoolSettings;
import com.hotels.styx.api.extension.service.HealthCheckConfig;
import com.hotels.styx.api.extension.service.TlsSettings;
import com.hotels.styx.api.extension.service.spi.Registry;
import com.hotels.styx.client.BackendServiceClient;
import com.hotels.styx.client.Connection;
Expand All @@ -44,6 +45,8 @@
import com.hotels.styx.client.healthcheck.UrlRequestHealthCheck;
import com.hotels.styx.client.netty.connectionpool.NettyConnectionFactory;
import com.hotels.styx.server.HttpRouter;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import org.slf4j.Logger;

import java.util.Map;
Expand All @@ -67,16 +70,22 @@ public class BackendServicesRouter implements HttpRouter, Registry.ChangeListene

private final BackendServiceClientFactory clientFactory;
private final Environment environment;
private final EventLoopGroup nettyEventLoopGroup;
private final Class<? extends SocketChannel> socketChannelClass;
private final ConcurrentMap<String, ProxyToClientPipeline> routes;
private final int clientWorkerThreadsCount;

public BackendServicesRouter(BackendServiceClientFactory clientFactory, Environment environment) {
public BackendServicesRouter(BackendServiceClientFactory clientFactory,
Environment environment,
EventLoopGroup nettyEventLoopGroup,
Class<? extends SocketChannel> socketChannelClass) {
this.clientFactory = requireNonNull(clientFactory);
this.environment = environment;
this.environment = requireNonNull(environment);
this.nettyEventLoopGroup = requireNonNull(nettyEventLoopGroup);
this.socketChannelClass = requireNonNull(socketChannelClass);

this.routes = new ConcurrentSkipListMap<>(
comparingInt(String::length).reversed()
.thenComparing(naturalOrder()));
this.clientWorkerThreadsCount = environment.styxConfig().proxyServerConfig().clientWorkerThreadsCount();
}

ConcurrentMap<String, ProxyToClientPipeline> routes() {
Expand Down Expand Up @@ -115,7 +124,10 @@ public void onChange(Registry.Changes<BackendService> changes) {
ConnectionPoolSettings poolSettings = backendService.connectionPoolConfig();

Connection.Factory connectionFactory = connectionFactory(
backendService,
nettyEventLoopGroup,
socketChannelClass,
backendService.responseTimeoutMillis(),
backendService.tlsSettings().orElse(null),
requestLoggingEnabled,
longFormat,
originStatsFactory,
Expand Down Expand Up @@ -148,7 +160,6 @@ public void onChange(Registry.Changes<BackendService> changes) {
pipeline = new ProxyToClientPipeline(newClientHandler(backendService, inventory, originStatsFactory), () -> {
inventory.close();
healthStatusMonitor.stop();
healthCheckClient.shutdown();
});

routes.put(backendService.path(), pipeline);
Expand All @@ -170,7 +181,6 @@ private OriginHealthStatusMonitor healthStatusMonitor(BackendService backendServ
private StyxHttpClient healthCheckClient(BackendService backendService) {
StyxHttpClient.Builder builder = new StyxHttpClient.Builder()
.connectTimeout(backendService.connectionPoolConfig().connectTimeoutMillis(), MILLISECONDS)
.threadName("Health-Check-Monitor-" + backendService.id())
.userAgent("Styx/" + environment.buildInfo().releaseVersion());

backendService.tlsSettings().ifPresent(builder::tlsSettings);
Expand All @@ -179,25 +189,27 @@ private StyxHttpClient healthCheckClient(BackendService backendService) {
}

private Connection.Factory connectionFactory(
BackendService backendService,
EventLoopGroup nettyEventLoopGroup,
Class<? extends SocketChannel> socketChannelClass,
int responseTimeoutMillis,
TlsSettings tlsSettings,
boolean requestLoggingEnabled,
boolean longFormat,
OriginStatsFactory originStatsFactory,
long connectionExpiration) {

Connection.Factory factory = new NettyConnectionFactory.Builder()
.name("Styx")
.nettyEventLoop(nettyEventLoopGroup, socketChannelClass)
.httpRequestOperationFactory(
httpRequestOperationFactoryBuilder()
.flowControlEnabled(true)
.originStatsFactory(originStatsFactory)
.responseTimeoutMillis(backendService.responseTimeoutMillis())
.responseTimeoutMillis(responseTimeoutMillis)
.requestLoggingEnabled(requestLoggingEnabled)
.longFormat(longFormat)
.build()
)
.clientWorkerThreadsCount(clientWorkerThreadsCount)
.tlsSettings(backendService.tlsSettings().orElse(null))
.tlsSettings(tlsSettings)
.build();

if (connectionExpiration > 0) {
Expand Down
Loading