From 2019474768767614782128fb8a5d3144aa424af5 Mon Sep 17 00:00:00 2001 From: Justin Guerra Date: Thu, 2 Mar 2023 09:43:06 -0700 Subject: [PATCH] Replace PromiseCombiner usage on ClientConnectionsShutdown (#1487) * Remove PromiseCombiner usage on ClientConnectionsShutdown and add tests * Make ClientConnectionsShutdown non-blocking. Move blocking operation onto jvm shutdown hook thread. Make force close timeout configurable * Delete log statement --- .../server/ClientConnectionsShutdown.java | 122 ++++------- .../com/netflix/zuul/netty/server/Server.java | 2 +- .../server/ClientConnectionsShutdownTest.java | 194 ++++++++++++++++++ 3 files changed, 238 insertions(+), 80 deletions(-) create mode 100644 zuul-core/src/test/java/com/netflix/zuul/netty/server/ClientConnectionsShutdownTest.java diff --git a/zuul-core/src/main/java/com/netflix/zuul/netty/server/ClientConnectionsShutdown.java b/zuul-core/src/main/java/com/netflix/zuul/netty/server/ClientConnectionsShutdown.java index 7447aee1d0..647f5be438 100644 --- a/zuul-core/src/main/java/com/netflix/zuul/netty/server/ClientConnectionsShutdown.java +++ b/zuul-core/src/main/java/com/netflix/zuul/netty/server/ClientConnectionsShutdown.java @@ -23,44 +23,39 @@ import com.netflix.discovery.StatusChangeEvent; import com.netflix.netty.common.ConnectionCloseChannelAttributes; import com.netflix.netty.common.ConnectionCloseType; -import com.netflix.netty.common.HttpChannelFlags; import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelPromise; import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.ChannelGroupFuture; import io.netty.util.concurrent.EventExecutor; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.Promise; -import io.netty.util.concurrent.PromiseCombiner; +import io.netty.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; - /** * TODO: Change this class to be an instance per-port. * So that then the configuration can be different per-port, which is need for the combined FTL/Cloud clusters. - * + *

* User: michaels@netflix.com * Date: 3/6/17 * Time: 12:36 PM */ -public class ClientConnectionsShutdown -{ +public class ClientConnectionsShutdown { + private static final Logger LOG = LoggerFactory.getLogger(ClientConnectionsShutdown.class); - private static final DynamicBooleanProperty ENABLED = new DynamicBooleanProperty("server.outofservice.connections.shutdown", false); + private static final DynamicBooleanProperty ENABLED = new DynamicBooleanProperty( + "server.outofservice.connections.shutdown", false); private static final DynamicIntProperty DELAY_AFTER_OUT_OF_SERVICE_MS = new DynamicIntProperty("server.outofservice.connections.delay", 2000); + private static final DynamicIntProperty GRACEFUL_CLOSE_TIMEOUT = new DynamicIntProperty("server.outofservice.close.timeout", 30); private final ChannelGroup channels; private final EventExecutor executor; private final EurekaClient discoveryClient; - public ClientConnectionsShutdown(ChannelGroup channels, EventExecutor executor, EurekaClient discoveryClient) - { + public ClientConnectionsShutdown(ChannelGroup channels, EventExecutor executor, EurekaClient discoveryClient) { this.channels = channels; this.executor = executor; this.discoveryClient = discoveryClient; @@ -68,89 +63,58 @@ public ClientConnectionsShutdown(ChannelGroup channels, EventExecutor executor, if (discoveryClient != null) { initDiscoveryListener(); } - - // Only uncomment this for local testing! - // Allow a fast property to invoke connection shutdown for testing purposes. -// DynamicBooleanProperty DEBUG_SHUTDOWN = new DynamicBooleanProperty("server.outofservice.connections.shutdown.debug", false); -// DEBUG_SHUTDOWN.addCallback(() -> { -// if (DEBUG_SHUTDOWN.get()) { -// gracefullyShutdownClientChannels(); -// } -// }); } - private void initDiscoveryListener() - { + private void initDiscoveryListener() { this.discoveryClient.registerEventListener(event -> { if (event instanceof StatusChangeEvent) { StatusChangeEvent sce = (StatusChangeEvent) event; - LOG.info("Received {}", sce.toString()); + LOG.info("Received {}", sce); if (sce.getPreviousStatus() == InstanceInfo.InstanceStatus.UP - && (sce.getStatus() == InstanceInfo.InstanceStatus.OUT_OF_SERVICE || sce.getStatus() == InstanceInfo.InstanceStatus.DOWN)) - { + && (sce.getStatus() == InstanceInfo.InstanceStatus.OUT_OF_SERVICE + || sce.getStatus() == InstanceInfo.InstanceStatus.DOWN)) { // TODO - Also should stop accepting any new client connections now too? // Schedule to gracefully close all the client connections. if (ENABLED.get()) { - executor.schedule(() -> { - gracefullyShutdownClientChannels(); - }, DELAY_AFTER_OUT_OF_SERVICE_MS.get(), TimeUnit.MILLISECONDS); + executor.schedule(this::gracefullyShutdownClientChannels, DELAY_AFTER_OUT_OF_SERVICE_MS.get(), + TimeUnit.MILLISECONDS); } } } }); } - /** - * Note this blocks until all the channels have finished closing. - */ - public void gracefullyShutdownClientChannels() - { - LOG.warn("Gracefully shutting down all client channels"); - try { - - - // Mark all active connections to be closed after next response sent. - LOG.warn("Flagging CLOSE_AFTER_RESPONSE on {} client channels.", channels.size()); - // Pick some arbitrary executor. - PromiseCombiner closeAfterPromises = new PromiseCombiner(ImmediateEventExecutor.INSTANCE); - for (Channel channel : channels) - { - ConnectionCloseType.setForChannel(channel, ConnectionCloseType.DELAYED_GRACEFUL); - - ChannelPromise closePromise = channel.pipeline().newPromise(); - channel.attr(ConnectionCloseChannelAttributes.CLOSE_AFTER_RESPONSE).set(closePromise); - // TODO(carl-mastrangelo): remove closePromise, since I don't think it's needed. Need to verify. - closeAfterPromises.add(channel.closeFuture()); + public Promise gracefullyShutdownClientChannels() { + // Mark all active connections to be closed after next response sent. + LOG.warn("Flagging CLOSE_AFTER_RESPONSE on {} client channels.", channels.size()); + + //racy situation if new connections are still coming in, but any channels created after newCloseFuture will + //be closed during the force close stage + ChannelGroupFuture closeFuture = channels.newCloseFuture(); + for (Channel channel : channels) { + ConnectionCloseType.setForChannel(channel, ConnectionCloseType.DELAYED_GRACEFUL); + ChannelPromise closePromise = channel.pipeline().newPromise(); + channel.attr(ConnectionCloseChannelAttributes.CLOSE_AFTER_RESPONSE).set(closePromise); + } + + Promise promise = executor.newPromise(); + + ScheduledFuture timeoutTask = executor.schedule(() -> { + LOG.warn("Force closing remaining {} active client channels.", channels.size()); + channels.close(); + }, GRACEFUL_CLOSE_TIMEOUT.get(), TimeUnit.SECONDS); + + closeFuture.addListener(future -> { + if (!timeoutTask.isDone()) { + //close happened before the timeout + timeoutTask.cancel(false); } + promise.setSuccess(null); + }); - // Wait for all of the attempts to close connections gracefully, or max of 30 secs each. - Promise combinedCloseAfterPromise = executor.newPromise(); - closeAfterPromises.finish(combinedCloseAfterPromise); - combinedCloseAfterPromise.await(30, TimeUnit.SECONDS); - - // Close all of the remaining active connections. - LOG.warn("Closing remaining active client channels."); - List forceCloseFutures = new ArrayList<>(); - channels.forEach(channel -> { - if (channel.isActive()) { - ChannelFuture f = channel.pipeline().close(); - forceCloseFutures.add(f); - } - }); - - LOG.warn("Waiting for {} client channels to be closed.", forceCloseFutures.size()); - PromiseCombiner closePromisesCombiner = new PromiseCombiner(ImmediateEventExecutor.INSTANCE); - closePromisesCombiner.addAll(forceCloseFutures.toArray(new ChannelFuture[0])); - Promise combinedClosePromise = executor.newPromise(); - closePromisesCombiner.finish(combinedClosePromise); - combinedClosePromise.await(5, TimeUnit.SECONDS); - LOG.warn("{} client channels closed.", forceCloseFutures.size()); - } - catch (InterruptedException ie) { - LOG.warn("Interrupted while shutting down client channels"); - } + return promise; } } diff --git a/zuul-core/src/main/java/com/netflix/zuul/netty/server/Server.java b/zuul-core/src/main/java/com/netflix/zuul/netty/server/Server.java index 23c7189aa3..c36b6663a0 100644 --- a/zuul-core/src/main/java/com/netflix/zuul/netty/server/Server.java +++ b/zuul-core/src/main/java/com/netflix/zuul/netty/server/Server.java @@ -435,7 +435,7 @@ synchronized private void stop() // NOTE: ClientConnectionsShutdown can also be configured to gracefully close connections when the // discovery status changes to DOWN. So if it has been configured that way, then this will be an additional // call to gracefullyShutdownClientChannels(), which will be a noop. - clientConnectionsShutdown.gracefullyShutdownClientChannels(); + clientConnectionsShutdown.gracefullyShutdownClientChannels().syncUninterruptibly(); LOG.info("Shutting down event loops"); List allEventLoopGroups = new ArrayList<>(); diff --git a/zuul-core/src/test/java/com/netflix/zuul/netty/server/ClientConnectionsShutdownTest.java b/zuul-core/src/test/java/com/netflix/zuul/netty/server/ClientConnectionsShutdownTest.java new file mode 100644 index 0000000000..94a04bdd51 --- /dev/null +++ b/zuul-core/src/test/java/com/netflix/zuul/netty/server/ClientConnectionsShutdownTest.java @@ -0,0 +1,194 @@ +/* + * Copyright 2023 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netflix.zuul.netty.server; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import com.netflix.appinfo.InstanceInfo.InstanceStatus; +import com.netflix.config.ConfigurationManager; +import com.netflix.discovery.EurekaClient; +import com.netflix.discovery.EurekaEventListener; +import com.netflix.discovery.StatusChangeEvent; +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.DefaultEventLoop; +import io.netty.channel.DefaultEventLoopGroup; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.channel.local.LocalAddress; +import io.netty.channel.local.LocalChannel; +import io.netty.channel.local.LocalServerChannel; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.Promise; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.apache.commons.configuration.AbstractConfiguration; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; + +/** + * @author Justin Guerra + * @since 2/28/23 + */ +class ClientConnectionsShutdownTest { + + //using LocalChannels instead of EmbeddedChannels to re-create threading behavior in an actual deployment + private static LocalAddress LOCAL_ADDRESS; + private static DefaultEventLoopGroup SERVER_EVENT_LOOP; + private static DefaultEventLoopGroup CLIENT_EVENT_LOOP; + private static DefaultEventLoop EVENT_LOOP; + + @BeforeAll + static void staticSetup() throws InterruptedException { + LOCAL_ADDRESS = new LocalAddress(UUID.randomUUID().toString()); + + CLIENT_EVENT_LOOP = new DefaultEventLoopGroup(4); + SERVER_EVENT_LOOP = new DefaultEventLoopGroup(4); + ServerBootstrap serverBootstrap = new ServerBootstrap().group(SERVER_EVENT_LOOP) + .localAddress(LOCAL_ADDRESS) + .channel(LocalServerChannel.class) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(LocalChannel ch) { + + } + }); + + serverBootstrap.bind().sync(); + EVENT_LOOP = new DefaultEventLoop(Executors.newSingleThreadExecutor()); + } + + @AfterAll + static void staticCleanup() { + CLIENT_EVENT_LOOP.shutdownGracefully(); + SERVER_EVENT_LOOP.shutdownGracefully(); + EVENT_LOOP.shutdownGracefully(); + } + + private ChannelGroup channels; + private ClientConnectionsShutdown shutdown; + + @BeforeEach + void setup() { + channels = new DefaultChannelGroup(EVENT_LOOP); + shutdown = new ClientConnectionsShutdown(channels, EVENT_LOOP, null); + } + + @Test + @SuppressWarnings("unchecked") + void discoveryShutdown() { + String configName = "server.outofservice.connections.shutdown"; + AbstractConfiguration configuration = ConfigurationManager.getConfigInstance(); + + try { + configuration.setProperty(configName, "true"); + EurekaClient eureka = Mockito.mock(EurekaClient.class); + EventExecutor executor = Mockito.mock(EventExecutor.class); + + ArgumentCaptor captor = ArgumentCaptor.forClass( + EurekaEventListener.class); + shutdown = spy(new ClientConnectionsShutdown(channels, executor, eureka)); + verify(eureka).registerEventListener(captor.capture()); + doReturn(executor.newPromise()).when(shutdown).gracefullyShutdownClientChannels(); + + EurekaEventListener listener = captor.getValue(); + + listener.onEvent(new StatusChangeEvent(InstanceStatus.UP, InstanceStatus.DOWN)); + verify(executor).schedule(ArgumentMatchers.isA(Callable.class), anyLong(), eq(TimeUnit.MILLISECONDS)); + + Mockito.reset(executor); + listener.onEvent(new StatusChangeEvent(InstanceStatus.UP, InstanceStatus.OUT_OF_SERVICE)); + verify(executor).schedule(ArgumentMatchers.isA(Callable.class), anyLong(), eq(TimeUnit.MILLISECONDS)); + + Mockito.reset(executor); + listener.onEvent(new StatusChangeEvent(InstanceStatus.STARTING, InstanceStatus.OUT_OF_SERVICE)); + verify(executor, never()).schedule(ArgumentMatchers.isA(Callable.class), anyLong(), eq(TimeUnit.MILLISECONDS)); + } finally { + configuration.setProperty(configName, "false"); + } + } + + @Test + void allConnectionsGracefullyClosed() throws Exception { + createChannels(100); + Promise promise = shutdown.gracefullyShutdownClientChannels(); + Promise testPromise = EVENT_LOOP.newPromise(); + + promise.addListener(future -> { + if(future.isSuccess()) { + testPromise.setSuccess(null); + } else { + testPromise.setFailure(future.cause()); + } + }); + + channels.forEach(Channel::close); + testPromise.await(10, TimeUnit.SECONDS); + assertTrue(channels.isEmpty()); + } + + @Test + void connectionNeedsToBeForceClosed() throws Exception { + String configName = "server.outofservice.close.timeout"; + AbstractConfiguration configuration = ConfigurationManager.getConfigInstance(); + + try { + configuration.setProperty(configName, "0"); + createChannels(10); + shutdown.gracefullyShutdownClientChannels().await(10, TimeUnit.SECONDS); + + assertTrue(channels.isEmpty(), "All channels in group should have been force closed after the timeout was triggered"); + } finally { + configuration.setProperty(configName, "30"); + } + } + + private void createChannels(int numChannels) throws InterruptedException { + ChannelInitializer initializer = new ChannelInitializer() { + @Override + protected void initChannel(LocalChannel ch) {} + }; + + for (int i = 0; i < numChannels; ++i) { + ChannelFuture connect = new Bootstrap() + .group(CLIENT_EVENT_LOOP) + .channel(LocalChannel.class) + .handler(initializer) + .remoteAddress(LOCAL_ADDRESS) + .connect() + .sync(); + + channels.add(connect.channel()); + } + } + +} \ No newline at end of file