From 8e35022f777c052eb685bf878cea8f49e332f48c Mon Sep 17 00:00:00 2001 From: Krupskis <44723913+Krupskis@users.noreply.github.com> Date: Mon, 30 Aug 2021 05:50:53 +0300 Subject: [PATCH] Deprecate `syncUninterruptibly()` and `awaitRequestNUninterruptibly()` in tests (#1756) Motivation: Currently netty's syncUninterruptibly() and awaitRequestNUninterruptibly() do not propagate `InterruptedException`, meaning that jUnit5 is not able to stop timeouted tests. Modifications: - change the uses of `syncUninterruptibly()` to `sync()` - on `safeSync()` use `Callable` instead of `Runnable`, because `Runnable` is unable to throw exception; - Change all usages of `awaitRequestNUninterruptibly()` to `awaitRequestN()`; - add `@Deprecated` to `awaitRequestNUninterruptibly()`; Result: `InterruptedException` is no longer silently discarded, meaning jUnit5 will be able to abort the timeouted test. --- .../concurrent/api/TestSubscription.java | 2 + .../netty/NettyChannelContentCodec.java | 2 +- .../netty/NettyCompressionSerializer.java | 2 +- .../netty/H2ConcurrencyControllerTest.java | 8 +-- .../H2PriorKnowledgeFeatureParityTest.java | 10 ++-- .../MalformedDataAfterHttpMessageTest.java | 6 +-- .../netty/NettyHttpServerConnectionTest.java | 2 +- ...eClosureBeforeResponsePayloadBodyTest.java | 4 +- .../netty/ServerRespondsOnClosingTest.java | 6 +-- ...eTalkContentEncodingCompatibilityTest.java | 6 +-- ...ToNettyContentCodingCompatibilityTest.java | 10 ++-- .../http/netty/StreamObserverTest.java | 16 +++--- .../tcp/netty/internal/TcpClient.java | 6 +-- ...stractSslCloseNotifyAlertHandlingTest.java | 2 +- .../internal/DefaultNettyConnectionTest.java | 4 +- .../RequestResponseCloseHandlerTest.java | 54 +++++++++---------- 16 files changed, 73 insertions(+), 67 deletions(-) diff --git a/servicetalk-concurrent-api/src/testFixtures/java/io/servicetalk/concurrent/api/TestSubscription.java b/servicetalk-concurrent-api/src/testFixtures/java/io/servicetalk/concurrent/api/TestSubscription.java index a664b7bf4b..42c7ea0c58 100644 --- a/servicetalk-concurrent-api/src/testFixtures/java/io/servicetalk/concurrent/api/TestSubscription.java +++ b/servicetalk-concurrent-api/src/testFixtures/java/io/servicetalk/concurrent/api/TestSubscription.java @@ -103,9 +103,11 @@ public void awaitRequestN(long amount) throws InterruptedException { /** * Wait until the {@link Subscription#request(long)} amount exceeds {@code amount} without being interrupted. This * method catches an {@link InterruptedException} and discards it silently. + * @deprecated use {@link #awaitRequestN(long)} instead. * * @param amount the amount to wait for. */ + @Deprecated public void awaitRequestNUninterruptibly(long amount) { boolean interrupted = false; synchronized (waitingLock) { diff --git a/servicetalk-encoding-netty/src/main/java/io/servicetalk/encoding/netty/NettyChannelContentCodec.java b/servicetalk-encoding-netty/src/main/java/io/servicetalk/encoding/netty/NettyChannelContentCodec.java index 19bbad6c4b..74ed79c3c3 100644 --- a/servicetalk-encoding-netty/src/main/java/io/servicetalk/encoding/netty/NettyChannelContentCodec.java +++ b/servicetalk-encoding-netty/src/main/java/io/servicetalk/encoding/netty/NettyChannelContentCodec.java @@ -280,7 +280,7 @@ private EmbeddedChannel newEmbeddedChannel(final ChannelHandler handler, final B private static void preparePendingData(final EmbeddedChannel channel) { try { - channel.close().syncUninterruptibly().get(); + channel.close().sync().get(); channel.checkException(); } catch (InterruptedException | ExecutionException ex) { throwException(ex); diff --git a/servicetalk-encoding-netty/src/main/java/io/servicetalk/encoding/netty/NettyCompressionSerializer.java b/servicetalk-encoding-netty/src/main/java/io/servicetalk/encoding/netty/NettyCompressionSerializer.java index 1666ee96e6..48fb009f52 100644 --- a/servicetalk-encoding-netty/src/main/java/io/servicetalk/encoding/netty/NettyCompressionSerializer.java +++ b/servicetalk-encoding-netty/src/main/java/io/servicetalk/encoding/netty/NettyCompressionSerializer.java @@ -117,7 +117,7 @@ private static EmbeddedChannel newEmbeddedChannel(final ChannelHandler handler, static void preparePendingData(final EmbeddedChannel channel) { try { - channel.close().syncUninterruptibly().get(); + channel.close().sync().get(); channel.checkException(); } catch (InterruptedException | ExecutionException ex) { throwException(ex); diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/H2ConcurrencyControllerTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/H2ConcurrencyControllerTest.java index d54a8c5481..9c29f518a8 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/H2ConcurrencyControllerTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/H2ConcurrencyControllerTest.java @@ -83,7 +83,7 @@ class H2ConcurrencyControllerTest { private final CountDownLatch[] latches = new CountDownLatch[N_ITERATIONS]; @BeforeEach - void setUp() { + void setUp() throws Exception { serverEventLoopGroup = createIoExecutor(1, "server-io").eventLoopGroup(); for (int i = 0; i < N_ITERATIONS; i++) { latches[i] = new CountDownLatch(1); @@ -149,9 +149,9 @@ public void onError(final Throwable t) { } @AfterEach - void tearDown() { - safeSync(() -> serverAcceptorChannel.close().syncUninterruptibly()); - safeSync(() -> serverEventLoopGroup.shutdownGracefully(0, 0, MILLISECONDS).syncUninterruptibly()); + void tearDown() throws Exception { + safeSync(() -> serverAcceptorChannel.close().sync()); + safeSync(() -> serverEventLoopGroup.shutdownGracefully(0, 0, MILLISECONDS).sync()); safeClose(client); } diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/H2PriorKnowledgeFeatureParityTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/H2PriorKnowledgeFeatureParityTest.java index efcb1e36d9..8616dbc08b 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/H2PriorKnowledgeFeatureParityTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/H2PriorKnowledgeFeatureParityTest.java @@ -193,12 +193,12 @@ private static Stream clientExecutors() { @AfterEach void teardown() throws Exception { if (serverAcceptorChannel != null) { - serverAcceptorChannel.close().syncUninterruptibly(); + serverAcceptorChannel.close().sync(); } if (h1ServerContext != null) { h1ServerContext.close(); } - serverEventLoopGroup.shutdownGracefully(0, 0, MILLISECONDS).syncUninterruptibly(); + serverEventLoopGroup.shutdownGracefully(0, 0, MILLISECONDS).sync(); Executor executor = clientExecutionStrategy.executor(); if (executor != null) { executor.closeAsync().toFuture().get(); @@ -1322,7 +1322,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception serverChannelLatch.await(); Channel serverParentChannel = serverParentChannelRef.get(); serverParentChannel.writeAndFlush(new DefaultHttp2SettingsFrame( - new Http2Settings().maxConcurrentStreams(expectedMaxConcurrent))).syncUninterruptibly(); + new Http2Settings().maxConcurrentStreams(expectedMaxConcurrent))).sync(); Iterator> maxItr = maxConcurrentPubQueue.take().toIterable().iterator(); // Verify that the initial maxConcurrency value is the default number @@ -1488,7 +1488,7 @@ private static Processor newProcessor() { static Channel bindH2Server(EventLoopGroup serverEventLoopGroup, ChannelHandler childChannelHandler, Consumer parentChannelInitializer, - UnaryOperator configureH2Codec) { + UnaryOperator configureH2Codec) throws Exception { ServerBootstrap sb = new ServerBootstrap(); sb.group(serverEventLoopGroup); sb.channel(serverChannel(serverEventLoopGroup, InetSocketAddress.class)); @@ -1500,7 +1500,7 @@ protected void initChannel(final Channel ch) { parentChannelInitializer.accept(ch.pipeline()); } }); - return sb.bind(localAddress(0)).syncUninterruptibly().channel(); + return sb.bind(localAddress(0)).sync().channel(); } private InetSocketAddress bindHttpEchoServer() throws Exception { diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/MalformedDataAfterHttpMessageTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/MalformedDataAfterHttpMessageTest.java index 8bfc0f73ee..e643a828a3 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/MalformedDataAfterHttpMessageTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/MalformedDataAfterHttpMessageTest.java @@ -98,7 +98,7 @@ void afterResponse() throws Exception { assertThrows(DecoderException.class, () -> connection.request(connection.get("/"))); connectionClosedLatch.await(); } finally { - server.close().syncUninterruptibly(); + server.close().sync(); } } @@ -126,7 +126,7 @@ void afterRequest() throws Exception { } } - private static ServerSocketChannel nettyServer(String response) { + private static ServerSocketChannel nettyServer(String response) throws Exception { EventLoopGroup eventLoopGroup = toEventLoopAwareNettyIoExecutor(SERVER_CTX.ioExecutor()).eventLoopGroup(); ServerBootstrap bs = new ServerBootstrap(); bs.group(eventLoopGroup); @@ -147,7 +147,7 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) { }); } }); - return (ServerSocketChannel) bs.bind(localAddress(0)).syncUninterruptibly().channel(); + return (ServerSocketChannel) bs.bind(localAddress(0)).sync().channel(); } private static ServerContext stServer() throws Exception { diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/NettyHttpServerConnectionTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/NettyHttpServerConnectionTest.java index 9211a15300..6fd7681f12 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/NettyHttpServerConnectionTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/NettyHttpServerConnectionTest.java @@ -119,7 +119,7 @@ void updateFlushStrategy(HttpExecutionStrategy serverExecutionStrategy, String payloadBodyString = "foo"; TestSubscription testSubscription1 = new TestSubscription(); responsePublisher.onSubscribe(testSubscription1); - testSubscription1.awaitRequestNUninterruptibly(1); + testSubscription1.awaitRequestN(1); responsePublisher.onNext(DEFAULT_ALLOCATOR.fromAscii(payloadBodyString)); responsePublisher.onComplete(); customFlushSender.flush(); diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/PrematureClosureBeforeResponsePayloadBodyTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/PrematureClosureBeforeResponsePayloadBodyTest.java index 63db9abb8f..91844f5764 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/PrematureClosureBeforeResponsePayloadBodyTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/PrematureClosureBeforeResponsePayloadBodyTest.java @@ -108,7 +108,7 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) { bs.childOption(ALLOW_HALF_CLOSURE, true); bs.childOption(AUTO_CLOSE, false); server = (ServerSocketChannel) bs.bind(localAddress(0)) - .syncUninterruptibly().channel(); + .sync().channel(); client = HttpClients.forSingleAddress(HostAndPort.of(server.localAddress())) .enableWireLogging("servicetalk-tests-wire-logger", TRACE, () -> true) @@ -125,7 +125,7 @@ void turnDown() throws Exception { try { client.closeGracefully(); } finally { - server.close().syncUninterruptibly(); + server.close().sync(); } } diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ServerRespondsOnClosingTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ServerRespondsOnClosingTest.java index 1e12ed81f1..37e7d4cfaf 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ServerRespondsOnClosingTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ServerRespondsOnClosingTest.java @@ -94,7 +94,7 @@ void tearDown() throws Exception { serverConnection.closeAsyncGracefully().toFuture().get(); } finally { channel.finishAndReleaseAll(); - channel.close().syncUninterruptibly(); + channel.close().sync(); } } @@ -244,9 +244,9 @@ private void verifyResponse(String requestPath) { assertThat("Unexpected response trailers object", trailers.readableBytes(), is(0)); } - private void respondWithFIN() { + private void respondWithFIN() throws Exception { assertThat("Server did not shutdown output", channel.isOutputShutdown(), is(true)); - channel.shutdownInput().syncUninterruptibly(); // simulate FIN from the client + channel.shutdownInput().sync(); // simulate FIN from the client } private void assertServerConnectionClosed() throws Exception { diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ServiceTalkContentEncodingCompatibilityTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ServiceTalkContentEncodingCompatibilityTest.java index b3350affcc..0419997493 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ServiceTalkContentEncodingCompatibilityTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ServiceTalkContentEncodingCompatibilityTest.java @@ -87,7 +87,7 @@ protected void initChannel(final Channel ch) { p.addLast(EchoServerHandler.INSTANCE); } }); - serverAcceptorChannel = sb.bind(localAddress(0)).syncUninterruptibly().channel(); + serverAcceptorChannel = sb.bind(localAddress(0)).sync().channel(); try (BlockingHttpClient client = HttpClients.forSingleAddress( HostAndPort.of((InetSocketAddress) serverAcceptorChannel.localAddress())) @@ -106,9 +106,9 @@ protected void initChannel(final Channel ch) { } } finally { if (serverAcceptorChannel != null) { - serverAcceptorChannel.close().syncUninterruptibly(); + serverAcceptorChannel.close().sync(); } - serverEventLoopGroup.shutdownGracefully(0, 0, MILLISECONDS).syncUninterruptibly(); + serverEventLoopGroup.shutdownGracefully(0, 0, MILLISECONDS).sync(); } } diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ServiceTalkToNettyContentCodingCompatibilityTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ServiceTalkToNettyContentCodingCompatibilityTest.java index 0f387ea50e..13ebffafc7 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ServiceTalkToNettyContentCodingCompatibilityTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ServiceTalkToNettyContentCodingCompatibilityTest.java @@ -59,7 +59,7 @@ class ServiceTalkToNettyContentCodingCompatibilityTest extends ServiceTalkConten private BlockingHttpClient client; @Override - void start() { + void start() throws Exception { serverEventLoopGroup = createIoExecutor(2, "server-io").eventLoopGroup(); serverAcceptorChannel = newNettyServer(); InetSocketAddress serverAddress = (InetSocketAddress) serverAcceptorChannel.localAddress(); @@ -70,17 +70,17 @@ void start() { @AfterEach void finish() throws Exception { if (serverAcceptorChannel != null) { - serverAcceptorChannel.close().syncUninterruptibly(); + serverAcceptorChannel.close().sync(); } if (serverEventLoopGroup != null) { - serverEventLoopGroup.shutdownGracefully(0, 0, MILLISECONDS).syncUninterruptibly(); + serverEventLoopGroup.shutdownGracefully(0, 0, MILLISECONDS).sync(); } if (client != null) { client.close(); } } - private Channel newNettyServer() { + private Channel newNettyServer() throws Exception { ServerBootstrap sb = new ServerBootstrap(); sb.group(serverEventLoopGroup); sb.channel(serverChannel(serverEventLoopGroup, InetSocketAddress.class)); @@ -97,7 +97,7 @@ protected void initChannel(final Channel ch) { p.addLast(EchoServerHandler.INSTANCE); } }); - return sb.bind(localAddress(0)).syncUninterruptibly().channel(); + return sb.bind(localAddress(0)).sync().channel(); } @Override diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/StreamObserverTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/StreamObserverTest.java index 063e771b7d..ce4a8f44d4 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/StreamObserverTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/StreamObserverTest.java @@ -47,6 +47,7 @@ import java.net.InetSocketAddress; import java.nio.channels.ClosedChannelException; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -83,7 +84,7 @@ class StreamObserverTest { private final HttpClient client; private final CountDownLatch requestReceived = new CountDownLatch(1); - StreamObserverTest() { + StreamObserverTest() throws Exception { clientTransportObserver = mock(TransportObserver.class, "clientTransportObserver"); clientConnectionObserver = mock(ConnectionObserver.class, "clientConnectionObserver"); clientMultiplexedObserver = mock(MultiplexedObserver.class, "clientMultiplexedObserver"); @@ -122,15 +123,18 @@ protected void channelRead0(final ChannelHandlerContext ctx, final Object msg) { } @AfterEach - void teardown() { - safeSync(() -> serverAcceptorChannel.close().syncUninterruptibly()); - safeSync(() -> serverEventLoopGroup.shutdownGracefully(0, 0, MILLISECONDS).syncUninterruptibly()); + void teardown() throws Exception { + safeSync(() -> serverAcceptorChannel.close().sync()); + safeSync(() -> serverEventLoopGroup.shutdownGracefully(0, 0, MILLISECONDS).sync()); safeClose(client); } - static void safeSync(Runnable runnable) { + static void safeSync(Callable callable) throws Exception { try { - runnable.run(); + callable.call(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw e; } catch (Exception e) { e.printStackTrace(); } diff --git a/servicetalk-tcp-netty-internal/src/testFixtures/java/io/servicetalk/tcp/netty/internal/TcpClient.java b/servicetalk-tcp-netty-internal/src/testFixtures/java/io/servicetalk/tcp/netty/internal/TcpClient.java index f6e8fa58b1..889a09ce98 100644 --- a/servicetalk-tcp-netty-internal/src/testFixtures/java/io/servicetalk/tcp/netty/internal/TcpClient.java +++ b/servicetalk-tcp-netty-internal/src/testFixtures/java/io/servicetalk/tcp/netty/internal/TcpClient.java @@ -112,7 +112,7 @@ public Single> connect(ExecutionContext executio */ public NettyConnection connectWithFdBlocking(ExecutionContext executionContext, SocketAddress address) - throws ExecutionException, InterruptedException { + throws Exception { assumeTrue(executionContext.ioExecutor().isFileDescriptorSocketAddressSupported()); assumeTrue(Epoll.isAvailable() || KQueue.isAvailable()); @@ -137,10 +137,10 @@ public void channelRead0(ChannelHandlerContext ctx, Object msg) { } }) .connect(address) - .syncUninterruptibly().channel(); + .sync().channel(); // Unregister it from the netty EventLoop as we want to to handle it via ST. - channel.deregister().syncUninterruptibly(); + channel.deregister().sync(); FileDescriptorSocketAddress fd = new FileDescriptorSocketAddress(channel.fd().intValue()); NettyConnection connection = connectBlocking(executionContext, fd); assertThat("Data read on the FileDescriptor from netty pipeline.", diff --git a/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/AbstractSslCloseNotifyAlertHandlingTest.java b/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/AbstractSslCloseNotifyAlertHandlingTest.java index fe2f456813..d4ade3ce6c 100644 --- a/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/AbstractSslCloseNotifyAlertHandlingTest.java +++ b/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/AbstractSslCloseNotifyAlertHandlingTest.java @@ -95,7 +95,7 @@ public void tearDown() throws Exception { conn.closeAsync().toFuture().get(); } finally { channel.finishAndReleaseAll(); - channel.close().syncUninterruptibly(); + channel.close().sync(); } } } diff --git a/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/DefaultNettyConnectionTest.java b/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/DefaultNettyConnectionTest.java index 37e05589e8..b6dcc28f15 100644 --- a/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/DefaultNettyConnectionTest.java +++ b/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/DefaultNettyConnectionTest.java @@ -328,8 +328,8 @@ void testTerminalPredicateThrowTerminatesReadPublisher() throws Exception { } @Test - void testNoErrorEnrichmentWithoutCloseHandlerOnError() { - channel.close().syncUninterruptibly(); + void testNoErrorEnrichmentWithoutCloseHandlerOnError() throws Exception { + channel.close().sync(); toSource(conn.write(publisher)).subscribe(writeListener); Throwable error = writeListener.awaitOnError(); diff --git a/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/RequestResponseCloseHandlerTest.java b/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/RequestResponseCloseHandlerTest.java index 2669f5e420..4fe364e8b0 100644 --- a/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/RequestResponseCloseHandlerTest.java +++ b/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/RequestResponseCloseHandlerTest.java @@ -518,7 +518,7 @@ void simulate(final ScenarioMode mode, final List events, final class RequestResponseUserEventTest { @Test - void clientOutboundDataEndEventEmitsUserEventAlways() { + void clientOutboundDataEndEventEmitsUserEventAlways() throws Exception { AtomicBoolean ab = new AtomicBoolean(false); final EmbeddedChannel channel = new EmbeddedChannel(new ChannelInboundHandlerAdapter() { @Override @@ -532,12 +532,12 @@ public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt final RequestResponseCloseHandler ch = new RequestResponseCloseHandler(true); channel.eventLoop().execute(() -> ch.protocolPayloadEndOutbound(channel.pipeline().firstContext(), channel.newPromise())); - channel.close().syncUninterruptibly(); + channel.close().sync(); assertThat("OutboundDataEndEvent not fired", ab.get(), is(true)); } @Test - void serverOutboundDataEndEventDoesntEmitUntilClosing() { + void serverOutboundDataEndEventDoesntEmitUntilClosing() throws Exception { AtomicBoolean ab = new AtomicBoolean(false); final EmbeddedChannel channel = new EmbeddedChannel(new ChannelInboundHandlerAdapter() { @Override @@ -551,7 +551,7 @@ public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt final RequestResponseCloseHandler ch = new RequestResponseCloseHandler(false); channel.eventLoop().execute(() -> ch.protocolPayloadEndOutbound(channel.pipeline().firstContext(), channel.newPromise())); - channel.close().syncUninterruptibly(); + channel.close().sync(); assertThat("OutboundDataEndEvent should not fire", ab.get(), is(false)); } @@ -584,12 +584,12 @@ public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt // Response #2 channel.eventLoop().execute(() -> ch.protocolPayloadBeginOutbound(ctx)); channel.eventLoop().execute(() -> ch.protocolPayloadEndOutbound(ctx, ctx.newPromise())); - channel.close().syncUninterruptibly(); + channel.close().sync(); assertThat("OutboundDataEndEvent not fired", ab.get(), is(true)); } @Test - void serverOutboundDataEndEventEmitsUserEventWhenClosing() { + void serverOutboundDataEndEventEmitsUserEventWhenClosing() throws Exception { AtomicBoolean ab = new AtomicBoolean(false); final EmbeddedChannel channel = new EmbeddedChannel(new ChannelInboundHandlerAdapter() { @Override @@ -604,7 +604,7 @@ public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt channel.eventLoop().execute(() -> ch.gracefulUserClosing(channel)); channel.eventLoop().execute(() -> ch.protocolPayloadEndOutbound(channel.pipeline().firstContext(), channel.newPromise())); - channel.close().syncUninterruptibly(); + channel.close().sync(); assertThat("OutboundDataEndEvent not fired", ab.get(), is(true)); } } @@ -634,21 +634,21 @@ class ChannelBehavior { @BeforeEach @SuppressWarnings("unchecked") - public void setup() throws InterruptedException { + public void setup() throws Exception { ssChannel = startServer(); cChannel = connectClient(ssChannel.localAddress()); connectedLatch.await(); } @AfterEach - public void dispose() { - cChannel.close().syncUninterruptibly(); - sChannel.close().syncUninterruptibly(); - ssChannel.close().syncUninterruptibly(); + public void dispose() throws Exception { + cChannel.close().sync(); + sChannel.close().sync(); + ssChannel.close().sync(); } // Based on TcpServerInitializer - private ServerSocketChannel startServer() { + private ServerSocketChannel startServer() throws Exception { EventLoopAwareNettyIoExecutor eventLoopAwareNettyIoExecutor = toEventLoopAwareNettyIoExecutor(serverCtx.ioExecutor()); EventLoop loop = eventLoopAwareNettyIoExecutor.eventLoopGroup().next(); @@ -683,11 +683,11 @@ public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt bs.childOption(AUTO_CLOSE, false); return (ServerSocketChannel) bs.bind(localAddress(0)) - .syncUninterruptibly().channel(); + .sync().channel(); } // Based on TcpConnector - private SocketChannel connectClient(InetSocketAddress address) { + private SocketChannel connectClient(InetSocketAddress address) throws Exception { EventLoopAwareNettyIoExecutor eventLoopAwareNettyIoExecutor = toEventLoopAwareNettyIoExecutor(clientCtx.ioExecutor()); EventLoop loop = eventLoopAwareNettyIoExecutor.eventLoopGroup().next(); @@ -719,12 +719,12 @@ public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt bs.option(ALLOW_HALF_CLOSURE, true); bs.option(AUTO_CLOSE, false); - return (SocketChannel) bs.connect(address).syncUninterruptibly().channel(); + return (SocketChannel) bs.connect(address).sync().channel(); } @Test - void clientCloseEmitsNoShutdownEventsOnClient() { - cChannel.close().syncUninterruptibly(); + void clientCloseEmitsNoShutdownEventsOnClient() throws Exception { + cChannel.close().sync(); assertThat(clientOutputShutdownLatch.getCount(), equalTo(1L)); assertThat(clientInputShutdownLatch.getCount(), equalTo(1L)); assertThat(clientInputShutdownReadCompleteLatch.getCount(), equalTo(1L)); @@ -735,7 +735,7 @@ void clientCloseEmitsNoShutdownEventsOnClient() { @Test void clientCloseEmitsServerInputShutdownImmediatelyAndOutputAfterWriting() throws Exception { - cChannel.close().syncUninterruptibly(); + cChannel.close().sync(); serverInputShutdownReadCompleteLatch.await(); serverInputShutdownLatch.await(); assertThat(sChannel.isInputShutdown(), is(true)); @@ -749,7 +749,7 @@ void clientCloseEmitsServerInputShutdownImmediatelyAndOutputAfterWriting() throw @Test void clientShutdownOutputEmitsClientOutputShutdownAndServerInputShutdown() throws Exception { - cChannel.shutdownOutput().syncUninterruptibly(); + cChannel.shutdownOutput().sync(); clientOutputShutdownLatch.await(); serverInputShutdownReadCompleteLatch.await(); serverInputShutdownLatch.await(); @@ -764,7 +764,7 @@ void clientShutdownOutputEmitsClientOutputShutdownAndServerInputShutdown() throw @Test void serverShutdownInputEmitsServerInputShutdownReadCompleteOnly() throws Exception { assumeFalse(sChannel instanceof NioSocketChannel, "Windows doesn't emit ChannelInputShutdownReadComplete. Investigation Required."); - sChannel.shutdownInput().syncUninterruptibly(); + sChannel.shutdownInput().sync(); serverInputShutdownReadCompleteLatch.await(); assertThat(serverInputShutdownLatch.getCount(), is(1L)); assertThat(sChannel.isInputShutdown(), is(true)); @@ -776,8 +776,8 @@ void serverShutdownInputEmitsServerInputShutdownReadCompleteOnly() throws Except } @Test - void serverCloseEmitsNoShutdownEventsOnServer() { - sChannel.close().syncUninterruptibly(); + void serverCloseEmitsNoShutdownEventsOnServer() throws Exception { + sChannel.close().sync(); assertThat(serverOutputShutdownLatch.getCount(), equalTo(1L)); assertThat(serverInputShutdownLatch.getCount(), equalTo(1L)); assertThat(serverInputShutdownReadCompleteLatch.getCount(), equalTo(1L)); @@ -788,7 +788,7 @@ void serverCloseEmitsNoShutdownEventsOnServer() { @Test void serverCloseEmitsClientInputShutdownImmediatelyAndOutputAfterWriting() throws Exception { - sChannel.close().syncUninterruptibly(); + sChannel.close().sync(); clientInputShutdownReadCompleteLatch.await(); clientInputShutdownLatch.await(); assertThat(cChannel.isInputShutdown(), is(true)); @@ -800,12 +800,12 @@ void serverCloseEmitsClientInputShutdownImmediatelyAndOutputAfterWriting() throw assertThat(cChannel.isOpen(), is(true)); } - private void writeUntilFailure(Channel channel) throws InterruptedException { - channel.writeAndFlush(channel.alloc().buffer(1).writeZero(1)).syncUninterruptibly(); // triggers RST + private void writeUntilFailure(Channel channel) throws Exception { + channel.writeAndFlush(channel.alloc().buffer(1).writeZero(1)).sync(); // triggers RST for (;;) { try { // observes error - channel.writeAndFlush(channel.alloc().buffer(1).writeZero(1)).syncUninterruptibly(); + channel.writeAndFlush(channel.alloc().buffer(1).writeZero(1)).sync(); } catch (Exception ignored) { break; }