Replace PromiseCombiner usage on ClientConnectionsShutdown (#1487)
* Remove PromiseCombiner usage on ClientConnectionsShutdown and add tests

* Make ClientConnectionsShutdown non-blocking: move the blocking wait onto the JVM shutdown hook thread, and make the force-close timeout configurable (see the pattern sketch below)

* Delete log statement
jguerra authored Mar 2, 2023
1 parent 6a4fdf9 commit 2019474
Showing 3 changed files with 238 additions and 80 deletions.
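
The second bullet above is the heart of the change: instead of combining per-channel close futures with a PromiseCombiner and blocking the caller, the new code takes a single ChannelGroup.newCloseFuture(), schedules a force-close as a fallback, and returns a Promise that only the JVM shutdown hook blocks on. Below is a minimal sketch of that pattern, with illustrative class and method names rather than the actual Zuul code:

```java
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.ChannelGroupFuture;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Illustrative sketch of the pattern used by this commit, not the actual Zuul class.
final class GracefulShutdownSketch {

    private final ChannelGroup channels;
    private final EventExecutor executor;

    GracefulShutdownSketch(ChannelGroup channels, EventExecutor executor) {
        this.channels = channels;
        this.executor = executor;
    }

    /** Returns immediately; the promise completes once every channel in the group has closed. */
    Promise<Void> shutdownGracefully(long forceCloseTimeoutSeconds) {
        // Completes when all channels currently in the group have closed.
        ChannelGroupFuture closeFuture = channels.newCloseFuture();

        Promise<Void> promise = executor.newPromise();

        // Fallback: force-close whatever is still open once the timeout expires.
        ScheduledFuture<?> timeoutTask = executor.schedule(() -> {
            channels.close();
        }, forceCloseTimeoutSeconds, TimeUnit.SECONDS);

        closeFuture.addListener(future -> {
            // Graceful close finished first; the force-close task is no longer needed.
            timeoutTask.cancel(false);
            promise.setSuccess(null);
        });
        return promise;
    }
}
```

Only the shutdown hook path then needs to block, for example with shutdownGracefully(30).syncUninterruptibly(); any other caller can simply attach a listener to the returned promise.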
ClientConnectionsShutdown.java
@@ -23,134 +23,98 @@
import com.netflix.discovery.StatusChangeEvent;
import com.netflix.netty.common.ConnectionCloseChannelAttributes;
import com.netflix.netty.common.ConnectionCloseType;
import com.netflix.netty.common.HttpChannelFlags;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPromise;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.ChannelGroupFuture;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.PromiseCombiner;
import io.netty.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
* TODO: Change this class to be an instance per-port.
* So that the configuration can be different per-port, which is needed for the combined FTL/Cloud clusters.
*
* <p>
* User: [email protected]
* Date: 3/6/17
* Time: 12:36 PM
*/
public class ClientConnectionsShutdown
{
public class ClientConnectionsShutdown {

private static final Logger LOG = LoggerFactory.getLogger(ClientConnectionsShutdown.class);
private static final DynamicBooleanProperty ENABLED = new DynamicBooleanProperty("server.outofservice.connections.shutdown", false);
private static final DynamicBooleanProperty ENABLED = new DynamicBooleanProperty(
"server.outofservice.connections.shutdown", false);
private static final DynamicIntProperty DELAY_AFTER_OUT_OF_SERVICE_MS =
new DynamicIntProperty("server.outofservice.connections.delay", 2000);
private static final DynamicIntProperty GRACEFUL_CLOSE_TIMEOUT = new DynamicIntProperty("server.outofservice.close.timeout", 30);

private final ChannelGroup channels;
private final EventExecutor executor;
private final EurekaClient discoveryClient;

public ClientConnectionsShutdown(ChannelGroup channels, EventExecutor executor, EurekaClient discoveryClient)
{
public ClientConnectionsShutdown(ChannelGroup channels, EventExecutor executor, EurekaClient discoveryClient) {
this.channels = channels;
this.executor = executor;
this.discoveryClient = discoveryClient;

if (discoveryClient != null) {
initDiscoveryListener();
}

// Only uncomment this for local testing!
// Allow a fast property to invoke connection shutdown for testing purposes.
// DynamicBooleanProperty DEBUG_SHUTDOWN = new DynamicBooleanProperty("server.outofservice.connections.shutdown.debug", false);
// DEBUG_SHUTDOWN.addCallback(() -> {
// if (DEBUG_SHUTDOWN.get()) {
// gracefullyShutdownClientChannels();
// }
// });
}

private void initDiscoveryListener()
{
private void initDiscoveryListener() {
this.discoveryClient.registerEventListener(event -> {
if (event instanceof StatusChangeEvent) {
StatusChangeEvent sce = (StatusChangeEvent) event;

LOG.info("Received {}", sce.toString());
LOG.info("Received {}", sce);

if (sce.getPreviousStatus() == InstanceInfo.InstanceStatus.UP
&& (sce.getStatus() == InstanceInfo.InstanceStatus.OUT_OF_SERVICE || sce.getStatus() == InstanceInfo.InstanceStatus.DOWN))
{
&& (sce.getStatus() == InstanceInfo.InstanceStatus.OUT_OF_SERVICE
|| sce.getStatus() == InstanceInfo.InstanceStatus.DOWN)) {
// TODO - Also should stop accepting any new client connections now too?

// Schedule to gracefully close all the client connections.
if (ENABLED.get()) {
executor.schedule(() -> {
gracefullyShutdownClientChannels();
}, DELAY_AFTER_OUT_OF_SERVICE_MS.get(), TimeUnit.MILLISECONDS);
executor.schedule(this::gracefullyShutdownClientChannels, DELAY_AFTER_OUT_OF_SERVICE_MS.get(),
TimeUnit.MILLISECONDS);
}
}
}
});
}

/**
* Note this blocks until all the channels have finished closing.
*/
public void gracefullyShutdownClientChannels()
{
LOG.warn("Gracefully shutting down all client channels");
try {


// Mark all active connections to be closed after next response sent.
LOG.warn("Flagging CLOSE_AFTER_RESPONSE on {} client channels.", channels.size());
// Pick some arbitrary executor.
PromiseCombiner closeAfterPromises = new PromiseCombiner(ImmediateEventExecutor.INSTANCE);
for (Channel channel : channels)
{
ConnectionCloseType.setForChannel(channel, ConnectionCloseType.DELAYED_GRACEFUL);

ChannelPromise closePromise = channel.pipeline().newPromise();
channel.attr(ConnectionCloseChannelAttributes.CLOSE_AFTER_RESPONSE).set(closePromise);
// TODO(carl-mastrangelo): remove closePromise, since I don't think it's needed. Need to verify.
closeAfterPromises.add(channel.closeFuture());
public Promise<Void> gracefullyShutdownClientChannels() {
// Mark all active connections to be closed after next response sent.
LOG.warn("Flagging CLOSE_AFTER_RESPONSE on {} client channels.", channels.size());

// Racy situation if new connections are still coming in, but any channels created after newCloseFuture will
// be closed during the force close stage.
ChannelGroupFuture closeFuture = channels.newCloseFuture();
for (Channel channel : channels) {
ConnectionCloseType.setForChannel(channel, ConnectionCloseType.DELAYED_GRACEFUL);
ChannelPromise closePromise = channel.pipeline().newPromise();
channel.attr(ConnectionCloseChannelAttributes.CLOSE_AFTER_RESPONSE).set(closePromise);
}

Promise<Void> promise = executor.newPromise();

ScheduledFuture<?> timeoutTask = executor.schedule(() -> {
LOG.warn("Force closing remaining {} active client channels.", channels.size());
channels.close();
}, GRACEFUL_CLOSE_TIMEOUT.get(), TimeUnit.SECONDS);

closeFuture.addListener(future -> {
if (!timeoutTask.isDone()) {
// Close happened before the timeout.
timeoutTask.cancel(false);
}
promise.setSuccess(null);
});

// Wait for all of the attempts to close connections gracefully, or max of 30 secs each.
Promise<Void> combinedCloseAfterPromise = executor.newPromise();
closeAfterPromises.finish(combinedCloseAfterPromise);
combinedCloseAfterPromise.await(30, TimeUnit.SECONDS);

// Close all of the remaining active connections.
LOG.warn("Closing remaining active client channels.");
List<ChannelFuture> forceCloseFutures = new ArrayList<>();
channels.forEach(channel -> {
if (channel.isActive()) {
ChannelFuture f = channel.pipeline().close();
forceCloseFutures.add(f);
}
});

LOG.warn("Waiting for {} client channels to be closed.", forceCloseFutures.size());
PromiseCombiner closePromisesCombiner = new PromiseCombiner(ImmediateEventExecutor.INSTANCE);
closePromisesCombiner.addAll(forceCloseFutures.toArray(new ChannelFuture[0]));
Promise<Void> combinedClosePromise = executor.newPromise();
closePromisesCombiner.finish(combinedClosePromise);
combinedClosePromise.await(5, TimeUnit.SECONDS);
LOG.warn("{} client channels closed.", forceCloseFutures.size());
}
catch (InterruptedException ie) {
LOG.warn("Interrupted while shutting down client channels");
}
return promise;
}
}
@@ -435,7 +435,7 @@ synchronized private void stop()
// NOTE: ClientConnectionsShutdown can also be configured to gracefully close connections when the
// discovery status changes to DOWN. So if it has been configured that way, then this will be an additional
// call to gracefullyShutdownClientChannels(), which will be a noop.
clientConnectionsShutdown.gracefullyShutdownClientChannels();
clientConnectionsShutdown.gracefullyShutdownClientChannels().syncUninterruptibly();

LOG.info("Shutting down event loops");
List<EventLoopGroup> allEventLoopGroups = new ArrayList<>();
ClientConnectionsShutdownTest.java
@@ -0,0 +1,194 @@
/*
* Copyright 2023 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.zuul.netty.server;

import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import com.netflix.appinfo.InstanceInfo.InstanceStatus;
import com.netflix.config.ConfigurationManager;
import com.netflix.discovery.EurekaClient;
import com.netflix.discovery.EurekaEventListener;
import com.netflix.discovery.StatusChangeEvent;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.DefaultEventLoop;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalServerChannel;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Promise;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.commons.configuration.AbstractConfiguration;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/**
* @author Justin Guerra
* @since 2/28/23
*/
class ClientConnectionsShutdownTest {

//using LocalChannels instead of EmbeddedChannels to re-create threading behavior in an actual deployment
private static LocalAddress LOCAL_ADDRESS;
private static DefaultEventLoopGroup SERVER_EVENT_LOOP;
private static DefaultEventLoopGroup CLIENT_EVENT_LOOP;
private static DefaultEventLoop EVENT_LOOP;

@BeforeAll
static void staticSetup() throws InterruptedException {
LOCAL_ADDRESS = new LocalAddress(UUID.randomUUID().toString());

CLIENT_EVENT_LOOP = new DefaultEventLoopGroup(4);
SERVER_EVENT_LOOP = new DefaultEventLoopGroup(4);
ServerBootstrap serverBootstrap = new ServerBootstrap().group(SERVER_EVENT_LOOP)
.localAddress(LOCAL_ADDRESS)
.channel(LocalServerChannel.class)
.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
protected void initChannel(LocalChannel ch) {

}
});

serverBootstrap.bind().sync();
EVENT_LOOP = new DefaultEventLoop(Executors.newSingleThreadExecutor());
}

@AfterAll
static void staticCleanup() {
CLIENT_EVENT_LOOP.shutdownGracefully();
SERVER_EVENT_LOOP.shutdownGracefully();
EVENT_LOOP.shutdownGracefully();
}

private ChannelGroup channels;
private ClientConnectionsShutdown shutdown;

@BeforeEach
void setup() {
channels = new DefaultChannelGroup(EVENT_LOOP);
shutdown = new ClientConnectionsShutdown(channels, EVENT_LOOP, null);
}

@Test
@SuppressWarnings("unchecked")
void discoveryShutdown() {
String configName = "server.outofservice.connections.shutdown";
AbstractConfiguration configuration = ConfigurationManager.getConfigInstance();

try {
configuration.setProperty(configName, "true");
EurekaClient eureka = Mockito.mock(EurekaClient.class);
EventExecutor executor = Mockito.mock(EventExecutor.class);

ArgumentCaptor<EurekaEventListener> captor = ArgumentCaptor.forClass(
EurekaEventListener.class);
shutdown = spy(new ClientConnectionsShutdown(channels, executor, eureka));
verify(eureka).registerEventListener(captor.capture());
doReturn(executor.newPromise()).when(shutdown).gracefullyShutdownClientChannels();

EurekaEventListener listener = captor.getValue();

listener.onEvent(new StatusChangeEvent(InstanceStatus.UP, InstanceStatus.DOWN));
verify(executor).schedule(ArgumentMatchers.isA(Callable.class), anyLong(), eq(TimeUnit.MILLISECONDS));

Mockito.reset(executor);
listener.onEvent(new StatusChangeEvent(InstanceStatus.UP, InstanceStatus.OUT_OF_SERVICE));
verify(executor).schedule(ArgumentMatchers.isA(Callable.class), anyLong(), eq(TimeUnit.MILLISECONDS));

Mockito.reset(executor);
listener.onEvent(new StatusChangeEvent(InstanceStatus.STARTING, InstanceStatus.OUT_OF_SERVICE));
verify(executor, never()).schedule(ArgumentMatchers.isA(Callable.class), anyLong(), eq(TimeUnit.MILLISECONDS));
} finally {
configuration.setProperty(configName, "false");
}
}

@Test
void allConnectionsGracefullyClosed() throws Exception {
createChannels(100);
Promise<Void> promise = shutdown.gracefullyShutdownClientChannels();
Promise<Object> testPromise = EVENT_LOOP.newPromise();

promise.addListener(future -> {
if(future.isSuccess()) {
testPromise.setSuccess(null);
} else {
testPromise.setFailure(future.cause());
}
});

channels.forEach(Channel::close);
testPromise.await(10, TimeUnit.SECONDS);
assertTrue(channels.isEmpty());
}

@Test
void connectionNeedsToBeForceClosed() throws Exception {
String configName = "server.outofservice.close.timeout";
AbstractConfiguration configuration = ConfigurationManager.getConfigInstance();

try {
configuration.setProperty(configName, "0");
createChannels(10);
shutdown.gracefullyShutdownClientChannels().await(10, TimeUnit.SECONDS);

assertTrue(channels.isEmpty(), "All channels in group should have been force closed after the timeout was triggered");
} finally {
configuration.setProperty(configName, "30");
}
}

private void createChannels(int numChannels) throws InterruptedException {
ChannelInitializer<LocalChannel> initializer = new ChannelInitializer<LocalChannel>() {
@Override
protected void initChannel(LocalChannel ch) {}
};

for (int i = 0; i < numChannels; ++i) {
ChannelFuture connect = new Bootstrap()
.group(CLIENT_EVENT_LOOP)
.channel(LocalChannel.class)
.handler(initializer)
.remoteAddress(LOCAL_ADDRESS)
.connect()
.sync();

channels.add(connect.channel());
}
}

}
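
The force-close timeout exercised by the last test is read from the server.outofservice.close.timeout fast property (in seconds, defaulting to 30). A minimal sketch of overriding it the same way the tests do, via Archaius' ConfigurationManager; the property names and defaults come from the diff, while the surrounding main method is purely illustrative:

```java
import com.netflix.config.ConfigurationManager;
import org.apache.commons.configuration.AbstractConfiguration;

public class OutOfServiceShutdownConfigSketch {
    public static void main(String[] args) {
        AbstractConfiguration config = ConfigurationManager.getConfigInstance();

        // Enable graceful connection shutdown when the instance goes OUT_OF_SERVICE/DOWN.
        config.setProperty("server.outofservice.connections.shutdown", "true");
        // Delay (ms) after the status change before channels are flagged for closing.
        config.setProperty("server.outofservice.connections.delay", "2000");
        // Force-close timeout (seconds) made configurable by this commit.
        config.setProperty("server.outofservice.close.timeout", "10");
    }
}
```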
