Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deprecate syncUninterruptibly() and awaitRequestNUninterruptibly() in tests #1756

Merged
merged 3 commits into from
Aug 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,11 @@ public void awaitRequestN(long amount) throws InterruptedException {
/**
* Wait until the {@link Subscription#request(long)} amount exceeds {@code amount} without being interrupted. This
* method catches an {@link InterruptedException} and discards it silently.
* @deprecated use {@link #awaitRequestN(long)} instead.
*
* @param amount the amount to wait for.
*/
@Deprecated
public void awaitRequestNUninterruptibly(long amount) {
boolean interrupted = false;
synchronized (waitingLock) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ private EmbeddedChannel newEmbeddedChannel(final ChannelHandler handler, final B

private static void preparePendingData(final EmbeddedChannel channel) {
try {
channel.close().syncUninterruptibly().get();
channel.close().sync().get();
channel.checkException();
} catch (InterruptedException | ExecutionException ex) {
throwException(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ private static EmbeddedChannel newEmbeddedChannel(final ChannelHandler handler,

static void preparePendingData(final EmbeddedChannel channel) {
try {
channel.close().syncUninterruptibly().get();
channel.close().sync().get();
channel.checkException();
} catch (InterruptedException | ExecutionException ex) {
throwException(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class H2ConcurrencyControllerTest {
private final CountDownLatch[] latches = new CountDownLatch[N_ITERATIONS];

@BeforeEach
void setUp() {
void setUp() throws Exception {
serverEventLoopGroup = createIoExecutor(1, "server-io").eventLoopGroup();
for (int i = 0; i < N_ITERATIONS; i++) {
latches[i] = new CountDownLatch(1);
Expand Down Expand Up @@ -149,9 +149,9 @@ public void onError(final Throwable t) {
}

@AfterEach
void tearDown() {
safeSync(() -> serverAcceptorChannel.close().syncUninterruptibly());
safeSync(() -> serverEventLoopGroup.shutdownGracefully(0, 0, MILLISECONDS).syncUninterruptibly());
void tearDown() throws Exception {
safeSync(() -> serverAcceptorChannel.close().sync());
safeSync(() -> serverEventLoopGroup.shutdownGracefully(0, 0, MILLISECONDS).sync());
safeClose(client);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,12 +193,12 @@ private static Stream<Arguments> clientExecutors() {
@AfterEach
void teardown() throws Exception {
if (serverAcceptorChannel != null) {
serverAcceptorChannel.close().syncUninterruptibly();
serverAcceptorChannel.close().sync();
}
if (h1ServerContext != null) {
h1ServerContext.close();
}
serverEventLoopGroup.shutdownGracefully(0, 0, MILLISECONDS).syncUninterruptibly();
serverEventLoopGroup.shutdownGracefully(0, 0, MILLISECONDS).sync();
Executor executor = clientExecutionStrategy.executor();
if (executor != null) {
executor.closeAsync().toFuture().get();
Expand Down Expand Up @@ -1322,7 +1322,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
serverChannelLatch.await();
Channel serverParentChannel = serverParentChannelRef.get();
serverParentChannel.writeAndFlush(new DefaultHttp2SettingsFrame(
new Http2Settings().maxConcurrentStreams(expectedMaxConcurrent))).syncUninterruptibly();
new Http2Settings().maxConcurrentStreams(expectedMaxConcurrent))).sync();

Iterator<? extends ConsumableEvent<Integer>> maxItr = maxConcurrentPubQueue.take().toIterable().iterator();
// Verify that the initial maxConcurrency value is the default number
Expand Down Expand Up @@ -1488,7 +1488,7 @@ private static Processor<Buffer, Buffer> newProcessor() {

static Channel bindH2Server(EventLoopGroup serverEventLoopGroup, ChannelHandler childChannelHandler,
Consumer<ChannelPipeline> parentChannelInitializer,
UnaryOperator<Http2FrameCodecBuilder> configureH2Codec) {
UnaryOperator<Http2FrameCodecBuilder> configureH2Codec) throws Exception {
ServerBootstrap sb = new ServerBootstrap();
sb.group(serverEventLoopGroup);
sb.channel(serverChannel(serverEventLoopGroup, InetSocketAddress.class));
Expand All @@ -1500,7 +1500,7 @@ protected void initChannel(final Channel ch) {
parentChannelInitializer.accept(ch.pipeline());
}
});
return sb.bind(localAddress(0)).syncUninterruptibly().channel();
return sb.bind(localAddress(0)).sync().channel();
}

private InetSocketAddress bindHttpEchoServer() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ void afterResponse() throws Exception {
assertThrows(DecoderException.class, () -> connection.request(connection.get("/")));
connectionClosedLatch.await();
} finally {
server.close().syncUninterruptibly();
server.close().sync();
}
}

Expand Down Expand Up @@ -126,7 +126,7 @@ void afterRequest() throws Exception {
}
}

private static ServerSocketChannel nettyServer(String response) {
private static ServerSocketChannel nettyServer(String response) throws Exception {
EventLoopGroup eventLoopGroup = toEventLoopAwareNettyIoExecutor(SERVER_CTX.ioExecutor()).eventLoopGroup();
ServerBootstrap bs = new ServerBootstrap();
bs.group(eventLoopGroup);
Expand All @@ -147,7 +147,7 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
});
}
});
return (ServerSocketChannel) bs.bind(localAddress(0)).syncUninterruptibly().channel();
return (ServerSocketChannel) bs.bind(localAddress(0)).sync().channel();
}

private static ServerContext stServer() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ void updateFlushStrategy(HttpExecutionStrategy serverExecutionStrategy,
String payloadBodyString = "foo";
TestSubscription testSubscription1 = new TestSubscription();
responsePublisher.onSubscribe(testSubscription1);
testSubscription1.awaitRequestNUninterruptibly(1);
testSubscription1.awaitRequestN(1);
responsePublisher.onNext(DEFAULT_ALLOCATOR.fromAscii(payloadBodyString));
responsePublisher.onComplete();
customFlushSender.flush();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
bs.childOption(ALLOW_HALF_CLOSURE, true);
bs.childOption(AUTO_CLOSE, false);
server = (ServerSocketChannel) bs.bind(localAddress(0))
.syncUninterruptibly().channel();
.sync().channel();

client = HttpClients.forSingleAddress(HostAndPort.of(server.localAddress()))
.enableWireLogging("servicetalk-tests-wire-logger", TRACE, () -> true)
Expand All @@ -125,7 +125,7 @@ void turnDown() throws Exception {
try {
client.closeGracefully();
} finally {
server.close().syncUninterruptibly();
server.close().sync();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ void tearDown() throws Exception {
serverConnection.closeAsyncGracefully().toFuture().get();
} finally {
channel.finishAndReleaseAll();
channel.close().syncUninterruptibly();
channel.close().sync();
}
}

Expand Down Expand Up @@ -244,9 +244,9 @@ private void verifyResponse(String requestPath) {
assertThat("Unexpected response trailers object", trailers.readableBytes(), is(0));
}

private void respondWithFIN() {
private void respondWithFIN() throws Exception {
assertThat("Server did not shutdown output", channel.isOutputShutdown(), is(true));
channel.shutdownInput().syncUninterruptibly(); // simulate FIN from the client
channel.shutdownInput().sync(); // simulate FIN from the client
}

private void assertServerConnectionClosed() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ protected void initChannel(final Channel ch) {
p.addLast(EchoServerHandler.INSTANCE);
}
});
serverAcceptorChannel = sb.bind(localAddress(0)).syncUninterruptibly().channel();
serverAcceptorChannel = sb.bind(localAddress(0)).sync().channel();

try (BlockingHttpClient client = HttpClients.forSingleAddress(
HostAndPort.of((InetSocketAddress) serverAcceptorChannel.localAddress()))
Expand All @@ -106,9 +106,9 @@ protected void initChannel(final Channel ch) {
}
} finally {
if (serverAcceptorChannel != null) {
serverAcceptorChannel.close().syncUninterruptibly();
serverAcceptorChannel.close().sync();
}
serverEventLoopGroup.shutdownGracefully(0, 0, MILLISECONDS).syncUninterruptibly();
serverEventLoopGroup.shutdownGracefully(0, 0, MILLISECONDS).sync();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class ServiceTalkToNettyContentCodingCompatibilityTest extends ServiceTalkConten
private BlockingHttpClient client;

@Override
void start() {
void start() throws Exception {
serverEventLoopGroup = createIoExecutor(2, "server-io").eventLoopGroup();
serverAcceptorChannel = newNettyServer();
InetSocketAddress serverAddress = (InetSocketAddress) serverAcceptorChannel.localAddress();
Expand All @@ -70,17 +70,17 @@ void start() {
@AfterEach
void finish() throws Exception {
if (serverAcceptorChannel != null) {
serverAcceptorChannel.close().syncUninterruptibly();
serverAcceptorChannel.close().sync();
}
if (serverEventLoopGroup != null) {
serverEventLoopGroup.shutdownGracefully(0, 0, MILLISECONDS).syncUninterruptibly();
serverEventLoopGroup.shutdownGracefully(0, 0, MILLISECONDS).sync();
}
if (client != null) {
client.close();
}
}

private Channel newNettyServer() {
private Channel newNettyServer() throws Exception {
ServerBootstrap sb = new ServerBootstrap();
sb.group(serverEventLoopGroup);
sb.channel(serverChannel(serverEventLoopGroup, InetSocketAddress.class));
Expand All @@ -97,7 +97,7 @@ protected void initChannel(final Channel ch) {
p.addLast(EchoServerHandler.INSTANCE);
}
});
return sb.bind(localAddress(0)).syncUninterruptibly().channel();
return sb.bind(localAddress(0)).sync().channel();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@

import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

Expand Down Expand Up @@ -83,7 +84,7 @@ class StreamObserverTest {
private final HttpClient client;
private final CountDownLatch requestReceived = new CountDownLatch(1);

StreamObserverTest() {
StreamObserverTest() throws Exception {
clientTransportObserver = mock(TransportObserver.class, "clientTransportObserver");
clientConnectionObserver = mock(ConnectionObserver.class, "clientConnectionObserver");
clientMultiplexedObserver = mock(MultiplexedObserver.class, "clientMultiplexedObserver");
Expand Down Expand Up @@ -122,15 +123,18 @@ protected void channelRead0(final ChannelHandlerContext ctx, final Object msg) {
}

@AfterEach
void teardown() {
safeSync(() -> serverAcceptorChannel.close().syncUninterruptibly());
safeSync(() -> serverEventLoopGroup.shutdownGracefully(0, 0, MILLISECONDS).syncUninterruptibly());
void teardown() throws Exception {
safeSync(() -> serverAcceptorChannel.close().sync());
safeSync(() -> serverEventLoopGroup.shutdownGracefully(0, 0, MILLISECONDS).sync());
safeClose(client);
}

static void safeSync(Runnable runnable) {
static void safeSync(Callable<Object> callable) throws Exception {
try {
runnable.run();
callable.call();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw e;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Doesn’t safeClose() in HttpsProxyTest also not Propagate the InterruptedException, which might result in timeouts as well?

Good catch! You are right. Also, before re-throwing the InterruptedException, we should reset interrupted flag:

Thread.currentThread().interrupt();
throw e;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, forgot that it resets the value :)

} catch (Exception e) {
e.printStackTrace();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public Single<NettyConnection<Buffer, Buffer>> connect(ExecutionContext executio
*/
public NettyConnection<Buffer, Buffer> connectWithFdBlocking(ExecutionContext executionContext,
SocketAddress address)
throws ExecutionException, InterruptedException {
throws Exception {
assumeTrue(executionContext.ioExecutor().isFileDescriptorSocketAddressSupported());
assumeTrue(Epoll.isAvailable() || KQueue.isAvailable());

Expand All @@ -137,10 +137,10 @@ public void channelRead0(ChannelHandlerContext ctx, Object msg) {
}
})
.connect(address)
.syncUninterruptibly().channel();
.sync().channel();

// Unregister it from the netty EventLoop as we want to to handle it via ST.
channel.deregister().syncUninterruptibly();
channel.deregister().sync();
FileDescriptorSocketAddress fd = new FileDescriptorSocketAddress(channel.fd().intValue());
NettyConnection<Buffer, Buffer> connection = connectBlocking(executionContext, fd);
assertThat("Data read on the FileDescriptor from netty pipeline.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public void tearDown() throws Exception {
conn.closeAsync().toFuture().get();
} finally {
channel.finishAndReleaseAll();
channel.close().syncUninterruptibly();
channel.close().sync();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,8 @@ void testTerminalPredicateThrowTerminatesReadPublisher() throws Exception {
}

@Test
void testNoErrorEnrichmentWithoutCloseHandlerOnError() {
channel.close().syncUninterruptibly();
void testNoErrorEnrichmentWithoutCloseHandlerOnError() throws Exception {
channel.close().sync();
toSource(conn.write(publisher)).subscribe(writeListener);

Throwable error = writeListener.awaitOnError();
Expand Down
Loading