Skip to content

Commit

Permalink
Add graceful shutdown method to ClientChannelManager (#1483)
Browse files Browse the repository at this point in the history
* Add graceful shutdown methods to ClientChannelManager and IConnectionPool

* Move explicit close during release to connection pool

* Add header

* Switch to for-each

* Handle no connections for event loop
  • Loading branch information
jguerra authored Feb 28, 2023
1 parent 8972924 commit a4539e1
Show file tree
Hide file tree
Showing 7 changed files with 440 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ public interface ClientChannelManager

void shutdown();

default void gracefulShutdown() {
shutdown();
}

boolean release(PooledConnection conn);

Promise<PooledConnection> acquire(EventLoop eventLoop);
Expand All @@ -60,4 +64,5 @@ Promise<PooledConnection> acquire(
int getConnsInUse();

ConnectionPoolConfig getConfig();

}
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,19 @@ public void shutdown() {
}
}

/**
* Gracefully shuts down a DefaultClientChannelManager by allowing in-flight requests to finish before closing the connections.
* Idle connections in the connection pools are closed, and any connections associated with an in-flight request
* will be closed upon trying to return the connection to the pool
*/
@Override
public void gracefulShutdown() {
LOG.info("Starting a graceful shutdown of {}", clientConfig.getClientName());
shuttingDown = true;
dynamicServerResolver.shutdown();
perServerPools.values().forEach(IConnectionPool::drain);
}

@Override
public boolean release(final PooledConnection conn) {

Expand All @@ -224,10 +237,6 @@ public boolean release(final PooledConnection conn) {
discoveryResult.decrementActiveRequestsCount();
discoveryResult.incrementNumRequests();

if (shuttingDown) {
return false;
}

boolean released = false;

// if the connection has been around too long (i.e. too many requests), then close it
Expand Down Expand Up @@ -268,14 +277,14 @@ else if (!conn.isActive()) {
released = pool.release(conn);
}
else {
// The pool for this server no longer exists (maybe due to it failling out of
// The pool for this server no longer exists (maybe due to it falling out of
// discovery).
conn.setInPool(false);
released = false;
conn.close();
}

if (LOG.isDebugEnabled()) LOG.debug("PooledConnection released: {}", conn.toString());
LOG.debug("PooledConnection released: {}", conn);
}

return released;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,13 @@ Promise<PooledConnection> acquire(
boolean release(PooledConnection conn);
boolean remove(PooledConnection conn);
void shutdown();

default void drain() {
shutdown();
}
boolean isAvailable();
int getConnsInUse();
int getConnsInPool();
ConnectionPoolConfig getConfig();

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
Expand Down Expand Up @@ -77,6 +79,8 @@ public class PerServerConnectionPool implements IConnectionPool
*/
private final AtomicInteger connCreationsInProgress;

private volatile boolean draining;

public PerServerConnectionPool(
DiscoveryResult server,
SocketAddress serverAddr,
Expand All @@ -87,7 +91,8 @@ public PerServerConnectionPool(
Counter createNewConnCounter,
Counter createConnSucceededCounter,
Counter createConnFailedCounter,
Counter requestConnCounter, Counter reuseConnCounter,
Counter requestConnCounter,
Counter reuseConnCounter,
Counter connTakenFromPoolIsNotOpen,
Counter closeAboveHighWaterMarkCounter,
Counter maxConnsPerHostExceededCounter,
Expand Down Expand Up @@ -130,7 +135,7 @@ public IClientConfig getNiwsClientConfig()
@Override
public boolean isAvailable()
{
return true;
return !draining;
}

/** function to run when a connection is acquired before returning it to caller. */
Expand All @@ -140,7 +145,7 @@ protected void onAcquire(final PooledConnection conn, CurrentPassport passport)
removeIdleStateHandler(conn);

conn.setInUse();
if (LOG.isDebugEnabled()) LOG.debug("PooledConnection acquired: {}", conn.toString());
LOG.debug("PooledConnection acquired: {}", conn);
}

protected void removeIdleStateHandler(PooledConnection conn) {
Expand All @@ -150,6 +155,11 @@ protected void removeIdleStateHandler(PooledConnection conn) {
@Override
public Promise<PooledConnection> acquire(
EventLoop eventLoop, CurrentPassport passport, AtomicReference<? super InetAddress> selectedHostAddr) {

if(draining) {
throw new IllegalStateException("Attempt to acquire connection while draining");
}

requestConnCounter.increment();
server.incrementActiveRequestsCount();

Expand Down Expand Up @@ -314,6 +324,12 @@ public boolean release(PooledConnection conn)
return false;
}

if(draining) {
LOG.debug("[{}] closing released connection during drain", conn.getChannel().id());
conn.getChannel().close();
return false;
}

// Get the eventloop for this channel.
EventLoop eventLoop = conn.getChannel().eventLoop();
Deque<PooledConnection> connections = getPoolForEventLoop(eventLoop);
Expand Down Expand Up @@ -356,7 +372,7 @@ public boolean remove(PooledConnection conn)
// Get the eventloop for this channel.
EventLoop eventLoop = conn.getChannel().eventLoop();

// Attempt to return connection to the pool.
// Attempt to remove connection from the pool.
Deque<PooledConnection> connections = getPoolForEventLoop(eventLoop);
if (connections.remove(conn)) {
conn.setInPool(false);
Expand All @@ -378,6 +394,16 @@ public void shutdown()
}
}

@Override
public void drain() {
if(draining) {
throw new IllegalStateException("Already draining");
}

draining = true;
connectionsPerEventLoop.forEach((eventLoop,v) -> drainIdleConnectionsOnEventLoop(eventLoop));
}

@Override
public int getConnsInPool() {
return connsInPool.get();
Expand All @@ -398,4 +424,28 @@ private static InetAddress getSelectedHostString(SocketAddress addr) {
}
}

/**
* Closes idle connections in the connection pool for a given EventLoop. The closing is performed on the EventLoop
* thread since the connection pool is not thread safe.
*
* @param eventLoop - the event loop to drain
*/
void drainIdleConnectionsOnEventLoop(EventLoop eventLoop) {
eventLoop.execute(() -> {
Deque<PooledConnection> connections = connectionsPerEventLoop.get(eventLoop);
if(connections == null) {
return;
}

for(PooledConnection connection : connections) {
//any connections in the Deque are idle since they are removed in tryGettingFromConnectionPool()
connection.setInPool(false);
LOG.debug("Closing connection {}", connection);
connection.close();
connsInPool.decrementAndGet();
}
});
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,13 @@ public void startReadTimeoutHandler(Duration readTimeout)
new ReadTimeoutHandler(readTimeout.toMillis(), TimeUnit.MILLISECONDS));
}

ConnectionState getConnectionState() {
return connectionState;
}

boolean isReleased() {
return released;
}

@Override
public String toString()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

package com.netflix.zuul.netty.connectionpool;

import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand All @@ -26,6 +28,7 @@
import com.netflix.appinfo.InstanceInfo.Builder;
import com.netflix.client.config.DefaultClientConfigImpl;
import com.netflix.spectator.api.DefaultRegistry;
import com.netflix.spectator.api.NoopRegistry;
import com.netflix.spectator.api.Registry;
import com.netflix.zuul.discovery.DiscoveryResult;
import com.netflix.zuul.discovery.DynamicServerResolver;
Expand All @@ -35,6 +38,7 @@
import com.netflix.zuul.passport.CurrentPassport;
import io.netty.channel.DefaultEventLoop;
import io.netty.channel.EventLoop;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.Promise;
Expand Down
Loading

0 comments on commit a4539e1

Please sign in to comment.