Skip to content

Commit

Permalink
Deprecate syncUninterruptibly() and `awaitRequestNUninterruptibly()…
Browse files Browse the repository at this point in the history
…` in tests (#1756)

Motivation:
Currently netty's syncUninterruptibly() and
awaitRequestNUninterruptibly() do not
propagate `InterruptedException`, meaning that jUnit5 is not
able to stop timeouted tests.

Modifications:
- change the uses of `syncUninterruptibly()` to `sync()`
- on `safeSync()` use `Callable` instead of `Runnable`, because
`Runnable` is unable to throw exception;
- Change all usages of `awaitRequestNUninterruptibly()`
to `awaitRequestN()`;
- add `@Deprecated` to `awaitRequestNUninterruptibly()`;

Result:
`InterruptedException` is no longer silently discarded,
meaning jUnit5 will be able to abort the timeouted test.
  • Loading branch information
Krupskis authored Aug 30, 2021
1 parent 9c01cce commit 8e35022
Show file tree
Hide file tree
Showing 16 changed files with 73 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,11 @@ public void awaitRequestN(long amount) throws InterruptedException {
/**
* Wait until the {@link Subscription#request(long)} amount exceeds {@code amount} without being interrupted. This
* method catches an {@link InterruptedException} and discards it silently.
* @deprecated use {@link #awaitRequestN(long)} instead.
*
* @param amount the amount to wait for.
*/
@Deprecated
public void awaitRequestNUninterruptibly(long amount) {
boolean interrupted = false;
synchronized (waitingLock) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ private EmbeddedChannel newEmbeddedChannel(final ChannelHandler handler, final B

private static void preparePendingData(final EmbeddedChannel channel) {
try {
channel.close().syncUninterruptibly().get();
channel.close().sync().get();
channel.checkException();
} catch (InterruptedException | ExecutionException ex) {
throwException(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ private static EmbeddedChannel newEmbeddedChannel(final ChannelHandler handler,

static void preparePendingData(final EmbeddedChannel channel) {
try {
channel.close().syncUninterruptibly().get();
channel.close().sync().get();
channel.checkException();
} catch (InterruptedException | ExecutionException ex) {
throwException(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class H2ConcurrencyControllerTest {
private final CountDownLatch[] latches = new CountDownLatch[N_ITERATIONS];

@BeforeEach
void setUp() {
void setUp() throws Exception {
serverEventLoopGroup = createIoExecutor(1, "server-io").eventLoopGroup();
for (int i = 0; i < N_ITERATIONS; i++) {
latches[i] = new CountDownLatch(1);
Expand Down Expand Up @@ -149,9 +149,9 @@ public void onError(final Throwable t) {
}

@AfterEach
void tearDown() {
safeSync(() -> serverAcceptorChannel.close().syncUninterruptibly());
safeSync(() -> serverEventLoopGroup.shutdownGracefully(0, 0, MILLISECONDS).syncUninterruptibly());
void tearDown() throws Exception {
safeSync(() -> serverAcceptorChannel.close().sync());
safeSync(() -> serverEventLoopGroup.shutdownGracefully(0, 0, MILLISECONDS).sync());
safeClose(client);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,12 +193,12 @@ private static Stream<Arguments> clientExecutors() {
@AfterEach
void teardown() throws Exception {
if (serverAcceptorChannel != null) {
serverAcceptorChannel.close().syncUninterruptibly();
serverAcceptorChannel.close().sync();
}
if (h1ServerContext != null) {
h1ServerContext.close();
}
serverEventLoopGroup.shutdownGracefully(0, 0, MILLISECONDS).syncUninterruptibly();
serverEventLoopGroup.shutdownGracefully(0, 0, MILLISECONDS).sync();
Executor executor = clientExecutionStrategy.executor();
if (executor != null) {
executor.closeAsync().toFuture().get();
Expand Down Expand Up @@ -1322,7 +1322,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
serverChannelLatch.await();
Channel serverParentChannel = serverParentChannelRef.get();
serverParentChannel.writeAndFlush(new DefaultHttp2SettingsFrame(
new Http2Settings().maxConcurrentStreams(expectedMaxConcurrent))).syncUninterruptibly();
new Http2Settings().maxConcurrentStreams(expectedMaxConcurrent))).sync();

Iterator<? extends ConsumableEvent<Integer>> maxItr = maxConcurrentPubQueue.take().toIterable().iterator();
// Verify that the initial maxConcurrency value is the default number
Expand Down Expand Up @@ -1488,7 +1488,7 @@ private static Processor<Buffer, Buffer> newProcessor() {

static Channel bindH2Server(EventLoopGroup serverEventLoopGroup, ChannelHandler childChannelHandler,
Consumer<ChannelPipeline> parentChannelInitializer,
UnaryOperator<Http2FrameCodecBuilder> configureH2Codec) {
UnaryOperator<Http2FrameCodecBuilder> configureH2Codec) throws Exception {
ServerBootstrap sb = new ServerBootstrap();
sb.group(serverEventLoopGroup);
sb.channel(serverChannel(serverEventLoopGroup, InetSocketAddress.class));
Expand All @@ -1500,7 +1500,7 @@ protected void initChannel(final Channel ch) {
parentChannelInitializer.accept(ch.pipeline());
}
});
return sb.bind(localAddress(0)).syncUninterruptibly().channel();
return sb.bind(localAddress(0)).sync().channel();
}

private InetSocketAddress bindHttpEchoServer() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ void afterResponse() throws Exception {
assertThrows(DecoderException.class, () -> connection.request(connection.get("/")));
connectionClosedLatch.await();
} finally {
server.close().syncUninterruptibly();
server.close().sync();
}
}

Expand Down Expand Up @@ -126,7 +126,7 @@ void afterRequest() throws Exception {
}
}

private static ServerSocketChannel nettyServer(String response) {
private static ServerSocketChannel nettyServer(String response) throws Exception {
EventLoopGroup eventLoopGroup = toEventLoopAwareNettyIoExecutor(SERVER_CTX.ioExecutor()).eventLoopGroup();
ServerBootstrap bs = new ServerBootstrap();
bs.group(eventLoopGroup);
Expand All @@ -147,7 +147,7 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
});
}
});
return (ServerSocketChannel) bs.bind(localAddress(0)).syncUninterruptibly().channel();
return (ServerSocketChannel) bs.bind(localAddress(0)).sync().channel();
}

private static ServerContext stServer() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ void updateFlushStrategy(HttpExecutionStrategy serverExecutionStrategy,
String payloadBodyString = "foo";
TestSubscription testSubscription1 = new TestSubscription();
responsePublisher.onSubscribe(testSubscription1);
testSubscription1.awaitRequestNUninterruptibly(1);
testSubscription1.awaitRequestN(1);
responsePublisher.onNext(DEFAULT_ALLOCATOR.fromAscii(payloadBodyString));
responsePublisher.onComplete();
customFlushSender.flush();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
bs.childOption(ALLOW_HALF_CLOSURE, true);
bs.childOption(AUTO_CLOSE, false);
server = (ServerSocketChannel) bs.bind(localAddress(0))
.syncUninterruptibly().channel();
.sync().channel();

client = HttpClients.forSingleAddress(HostAndPort.of(server.localAddress()))
.enableWireLogging("servicetalk-tests-wire-logger", TRACE, () -> true)
Expand All @@ -125,7 +125,7 @@ void turnDown() throws Exception {
try {
client.closeGracefully();
} finally {
server.close().syncUninterruptibly();
server.close().sync();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ void tearDown() throws Exception {
serverConnection.closeAsyncGracefully().toFuture().get();
} finally {
channel.finishAndReleaseAll();
channel.close().syncUninterruptibly();
channel.close().sync();
}
}

Expand Down Expand Up @@ -244,9 +244,9 @@ private void verifyResponse(String requestPath) {
assertThat("Unexpected response trailers object", trailers.readableBytes(), is(0));
}

private void respondWithFIN() {
private void respondWithFIN() throws Exception {
assertThat("Server did not shutdown output", channel.isOutputShutdown(), is(true));
channel.shutdownInput().syncUninterruptibly(); // simulate FIN from the client
channel.shutdownInput().sync(); // simulate FIN from the client
}

private void assertServerConnectionClosed() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ protected void initChannel(final Channel ch) {
p.addLast(EchoServerHandler.INSTANCE);
}
});
serverAcceptorChannel = sb.bind(localAddress(0)).syncUninterruptibly().channel();
serverAcceptorChannel = sb.bind(localAddress(0)).sync().channel();

try (BlockingHttpClient client = HttpClients.forSingleAddress(
HostAndPort.of((InetSocketAddress) serverAcceptorChannel.localAddress()))
Expand All @@ -106,9 +106,9 @@ protected void initChannel(final Channel ch) {
}
} finally {
if (serverAcceptorChannel != null) {
serverAcceptorChannel.close().syncUninterruptibly();
serverAcceptorChannel.close().sync();
}
serverEventLoopGroup.shutdownGracefully(0, 0, MILLISECONDS).syncUninterruptibly();
serverEventLoopGroup.shutdownGracefully(0, 0, MILLISECONDS).sync();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class ServiceTalkToNettyContentCodingCompatibilityTest extends ServiceTalkConten
private BlockingHttpClient client;

@Override
void start() {
void start() throws Exception {
serverEventLoopGroup = createIoExecutor(2, "server-io").eventLoopGroup();
serverAcceptorChannel = newNettyServer();
InetSocketAddress serverAddress = (InetSocketAddress) serverAcceptorChannel.localAddress();
Expand All @@ -70,17 +70,17 @@ void start() {
@AfterEach
void finish() throws Exception {
if (serverAcceptorChannel != null) {
serverAcceptorChannel.close().syncUninterruptibly();
serverAcceptorChannel.close().sync();
}
if (serverEventLoopGroup != null) {
serverEventLoopGroup.shutdownGracefully(0, 0, MILLISECONDS).syncUninterruptibly();
serverEventLoopGroup.shutdownGracefully(0, 0, MILLISECONDS).sync();
}
if (client != null) {
client.close();
}
}

private Channel newNettyServer() {
private Channel newNettyServer() throws Exception {
ServerBootstrap sb = new ServerBootstrap();
sb.group(serverEventLoopGroup);
sb.channel(serverChannel(serverEventLoopGroup, InetSocketAddress.class));
Expand All @@ -97,7 +97,7 @@ protected void initChannel(final Channel ch) {
p.addLast(EchoServerHandler.INSTANCE);
}
});
return sb.bind(localAddress(0)).syncUninterruptibly().channel();
return sb.bind(localAddress(0)).sync().channel();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@

import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

Expand Down Expand Up @@ -83,7 +84,7 @@ class StreamObserverTest {
private final HttpClient client;
private final CountDownLatch requestReceived = new CountDownLatch(1);

StreamObserverTest() {
StreamObserverTest() throws Exception {
clientTransportObserver = mock(TransportObserver.class, "clientTransportObserver");
clientConnectionObserver = mock(ConnectionObserver.class, "clientConnectionObserver");
clientMultiplexedObserver = mock(MultiplexedObserver.class, "clientMultiplexedObserver");
Expand Down Expand Up @@ -122,15 +123,18 @@ protected void channelRead0(final ChannelHandlerContext ctx, final Object msg) {
}

@AfterEach
void teardown() {
safeSync(() -> serverAcceptorChannel.close().syncUninterruptibly());
safeSync(() -> serverEventLoopGroup.shutdownGracefully(0, 0, MILLISECONDS).syncUninterruptibly());
void teardown() throws Exception {
safeSync(() -> serverAcceptorChannel.close().sync());
safeSync(() -> serverEventLoopGroup.shutdownGracefully(0, 0, MILLISECONDS).sync());
safeClose(client);
}

static void safeSync(Runnable runnable) {
static void safeSync(Callable<Object> callable) throws Exception {
try {
runnable.run();
callable.call();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw e;
} catch (Exception e) {
e.printStackTrace();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public Single<NettyConnection<Buffer, Buffer>> connect(ExecutionContext executio
*/
public NettyConnection<Buffer, Buffer> connectWithFdBlocking(ExecutionContext executionContext,
SocketAddress address)
throws ExecutionException, InterruptedException {
throws Exception {
assumeTrue(executionContext.ioExecutor().isFileDescriptorSocketAddressSupported());
assumeTrue(Epoll.isAvailable() || KQueue.isAvailable());

Expand All @@ -137,10 +137,10 @@ public void channelRead0(ChannelHandlerContext ctx, Object msg) {
}
})
.connect(address)
.syncUninterruptibly().channel();
.sync().channel();

// Unregister it from the netty EventLoop as we want to to handle it via ST.
channel.deregister().syncUninterruptibly();
channel.deregister().sync();
FileDescriptorSocketAddress fd = new FileDescriptorSocketAddress(channel.fd().intValue());
NettyConnection<Buffer, Buffer> connection = connectBlocking(executionContext, fd);
assertThat("Data read on the FileDescriptor from netty pipeline.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public void tearDown() throws Exception {
conn.closeAsync().toFuture().get();
} finally {
channel.finishAndReleaseAll();
channel.close().syncUninterruptibly();
channel.close().sync();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,8 @@ void testTerminalPredicateThrowTerminatesReadPublisher() throws Exception {
}

@Test
void testNoErrorEnrichmentWithoutCloseHandlerOnError() {
channel.close().syncUninterruptibly();
void testNoErrorEnrichmentWithoutCloseHandlerOnError() throws Exception {
channel.close().sync();
toSource(conn.write(publisher)).subscribe(writeListener);

Throwable error = writeListener.awaitOnError();
Expand Down
Loading

0 comments on commit 8e35022

Please sign in to comment.