Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cancellation can leave an HTTP/1.x connection in half-closed state #2264

Merged
merged 3 commits into from
Jul 18, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2020-2021 Apple Inc. and the ServiceTalk project authors
* Copyright © 2020-2022 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,27 +15,38 @@
*/
package io.servicetalk.http.netty;

import io.servicetalk.buffer.api.CompositeBuffer;
import io.servicetalk.client.api.ConnectionFactory;
import io.servicetalk.client.api.ConnectionFactoryFilter;
import io.servicetalk.client.api.DelegatingConnectionFactory;
import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.SingleSource.Processor;
import io.servicetalk.concurrent.SingleSource.Subscriber;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.context.api.ContextMap;
import io.servicetalk.http.api.FilterableStreamingHttpConnection;
import io.servicetalk.http.api.HttpClient;
import io.servicetalk.http.api.HttpConnection;
import io.servicetalk.http.api.HttpExecutionStrategies;
import io.servicetalk.http.api.HttpResponse;
import io.servicetalk.http.api.HttpRequester;
import io.servicetalk.http.api.StreamingHttpConnectionFilter;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.transport.api.ConnectionContext;
import io.servicetalk.transport.api.DelegatingConnectionAcceptor;
import io.servicetalk.transport.api.ServerContext;
import io.servicetalk.transport.api.TransportObserver;
import io.servicetalk.transport.netty.internal.ExecutionContextExtension;

import org.hamcrest.Matcher;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
Expand All @@ -46,37 +57,64 @@
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.api.AsyncCloseables.newCompositeCloseable;
import static io.servicetalk.concurrent.api.Completable.completed;
import static io.servicetalk.concurrent.api.Processors.newSingleProcessor;
import static io.servicetalk.concurrent.api.Single.defer;
import static io.servicetalk.concurrent.api.SourceAdapters.fromSource;
import static io.servicetalk.http.netty.HttpClients.forSingleAddress;
import static io.servicetalk.http.netty.HttpServers.forAddress;
import static io.servicetalk.logging.api.LogLevel.TRACE;
import static io.servicetalk.transport.netty.internal.AddressUtils.localAddress;
import static io.servicetalk.transport.netty.internal.AddressUtils.serverHostAndPort;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;

@Timeout(3)
idelpivnitskiy marked this conversation as resolved.
Show resolved Hide resolved
class ResponseCancelTest {

private final BlockingQueue<Processor<HttpResponse, HttpResponse>> serverResponses;
@RegisterExtension
static final ExecutionContextExtension SERVER_CTX =
ExecutionContextExtension.cached("server-io", "server-executor")
.setClassLevel(true);
@RegisterExtension
static final ExecutionContextExtension CLIENT_CTX =
ExecutionContextExtension.cached("client-io", "client-executor")
.setClassLevel(true);

private final BlockingQueue<Processor<StreamingHttpResponse, StreamingHttpResponse>> serverResponses;
private final BlockingQueue<Cancellable> delayedClientCancels;
private final BlockingQueue<ClientTerminationSignal> delayedClientTermination;
private final ServerContext ctx;
private final HttpClient client;
private final AtomicInteger connectionCount = new AtomicInteger();
private final CountDownLatch clientConnectionClosed = new CountDownLatch(1);
private final CountDownLatch serverConnectionClosed = new CountDownLatch(1);

ResponseCancelTest() throws Exception {
serverResponses = new LinkedBlockingQueue<>();
delayedClientCancels = new LinkedBlockingQueue<>();
delayedClientTermination = new LinkedBlockingQueue<>();
ctx = forAddress(localAddress(0))
.listenAndAwait((__, ___, factory) -> {
Processor<HttpResponse, HttpResponse> resp = newSingleProcessor();
.ioExecutor(SERVER_CTX.ioExecutor())
.executor(SERVER_CTX.executor())
.enableWireLogging("servicetalk-tests-wire-logger", TRACE, () -> true)
.appendConnectionAcceptorFilter(original -> new DelegatingConnectionAcceptor(original) {
@Override
public Completable accept(final ConnectionContext context) {
context.onClose().whenFinally(serverConnectionClosed::countDown).subscribe();
idelpivnitskiy marked this conversation as resolved.
Show resolved Hide resolved
return completed();
}
})
.listenStreamingAndAwait((__, ___, factory) -> {
Processor<StreamingHttpResponse, StreamingHttpResponse> resp = newSingleProcessor();
serverResponses.add(resp);
return fromSource(resp);
});
client = forSingleAddress(serverHostAndPort(ctx))
.ioExecutor(CLIENT_CTX.ioExecutor())
.executor(CLIENT_CTX.executor())
.enableWireLogging("servicetalk-tests-wire-logger", TRACE, () -> true)
.appendConnectionFilter(connection -> new StreamingHttpConnectionFilter(connection) {
@Override
public Single<StreamingHttpResponse> request(final StreamingHttpRequest request) {
Expand All @@ -100,7 +138,7 @@ public void onError(final Throwable t) {
}
})
.appendConnectionFactoryFilter(ConnectionFactoryFilter.withStrategy(
original -> new CountingConnectionFactory(original, connectionCount),
original -> new CountingConnectionFactory(original, connectionCount, clientConnectionClosed),
HttpExecutionStrategies.offloadNone()))
.build();
}
Expand All @@ -112,9 +150,9 @@ void tearDown() throws Exception {
}

@Test
void cancel() throws Throwable {
void clientCancel() throws Throwable {
CountDownLatch latch1 = new CountDownLatch(1);
Cancellable cancellable = sendRequest(latch1);
Cancellable cancellable = sendRequest(client, latch1);
// wait for server to receive request.
serverResponses.take();
assertThat("Unexpected connections count.", connectionCount.get(), is(1));
Expand All @@ -126,54 +164,108 @@ void cancel() throws Throwable {
// and hence fail the response.
ClientTerminationSignal.resumeExpectFailure(delayedClientTermination, latch1,
instanceOf(ClosedChannelException.class));
clientConnectionClosed.await();
// serverConnectionClosed.await(); server doesn't close the connection bcz it hasn't finished the response yet
idelpivnitskiy marked this conversation as resolved.
Show resolved Hide resolved

CountDownLatch latch2 = new CountDownLatch(1);
sendRequest(latch2);
serverResponses.take().onSuccess(client.httpResponseFactory().ok());
ClientTerminationSignal.resume(delayedClientTermination, latch2);
assertThat("Unexpected connections count.", connectionCount.get(), is(2));
sendSecondRequest();
}

@Test
void cancelAfterSuccessOnTransport() throws Throwable {
void clientCancelAfterSuccessOnTransport() throws Throwable {
CountDownLatch latch1 = new CountDownLatch(1);
Cancellable cancellable = sendRequest(latch1);
Cancellable cancellable = sendRequest(client, latch1);
// wait for server to receive request.
Processor<HttpResponse, HttpResponse> serverResp = serverResponses.take();
Processor<StreamingHttpResponse, StreamingHttpResponse> serverResp = serverResponses.take();
assertThat("Unexpected connections count.", connectionCount.get(), is(1));

serverResp.onSuccess(client.httpResponseFactory().ok());
serverResp.onSuccess(client.asStreamingClient().httpResponseFactory().ok());
cancellable.cancel();
// wait for cancel to be observed but don't send cancel to the transport so that transport does not close the
// connection which will then be ambiguous.
delayedClientCancels.take();
// As there is a race between completion and cancellation, we may get a success or failure, so just wait for
// any termination.
delayedClientTermination.take().resume();
latch1.await();
clientConnectionClosed.await();
serverConnectionClosed.await();

sendSecondRequest();
}

@Test
void connectionCancel() throws Throwable {
HttpConnection connection = client.reserveConnection(client.get("/")).toFuture().get();
Cancellable cancellable = sendRequest(connection, null);
// wait for server to receive request.
serverResponses.take();
assertThat("Unexpected connections count.", connectionCount.get(), is(1));
cancellable.cancel();
// wait for cancel to be observed and propagate it to the transport to initiate connection closure.
delayedClientCancels.take().cancel();
// Transport should close the connection, the response terminal signal is not guaranteed after cancellation.
clientConnectionClosed.await();
// serverConnectionClosed.await(); server doesn't close the connection bcz it hasn't finished the response yet

sendSecondRequest();
}

@ParameterizedTest
@ValueSource(booleans = {false, true})
void connectionCancelWaitingForPayloadBody(boolean finishRequest) throws Throwable {
HttpConnection connection = client.reserveConnection(client.get("/")).toFuture().get();
Cancellable cancellable = finishRequest ? sendRequest(connection, null) :
connection.asStreamingConnection().request(connection.asStreamingConnection().post("/")
.payloadBody(Publisher.never())).flatMapPublisher(StreamingHttpResponse::payloadBody)
.collect(() -> connection.executionContext().bufferAllocator().newCompositeBuffer(),
CompositeBuffer::addBuffer)
.subscribe(__ -> { });
// wait for server to receive request.
Processor<StreamingHttpResponse, StreamingHttpResponse> serverResp = serverResponses.take();
assertThat("Unexpected connections count.", connectionCount.get(), is(1));

serverResp.onSuccess(connection.asStreamingConnection().httpResponseFactory().ok()
.payloadBody(Publisher.never()));
// wait for response meta-data to be received.
delayedClientTermination.take().resume();
// cancel payload body.
cancellable.cancel();
// Transport should close the connection, the response terminal signal is not guaranteed after cancellation.
clientConnectionClosed.await();
// serverConnectionClosed.await(); server doesn't close the connection bcz it hasn't finished the response yet

sendSecondRequest();
}

private void sendSecondRequest() throws Throwable {
// Validate client can still communicate with a server using a new connection.
CountDownLatch latch2 = new CountDownLatch(1);
sendRequest(latch2);
serverResponses.take().onSuccess(client.httpResponseFactory().ok());
sendRequest(client, latch2);
serverResponses.take().onSuccess(client.asStreamingClient().httpResponseFactory().ok());
ClientTerminationSignal.resume(delayedClientTermination, latch2);
assertThat("Unexpected connections count.", connectionCount.get(), is(2));
}

private Cancellable sendRequest(final CountDownLatch latch) {
return client.request(client.get("/"))
.afterOnSuccess(__ -> latch.countDown())
.afterOnError(__ -> latch.countDown())
.subscribe(__ -> { });
private static Cancellable sendRequest(final HttpRequester requester, @Nullable final CountDownLatch latch) {
return (latch == null ? requester.request(requester.get("/")) :
requester.request(requester.get("/"))
.afterOnSuccess(__ -> latch.countDown())
.afterOnError(__ -> latch.countDown())
).subscribe(__ -> { });
}

private static class CountingConnectionFactory
extends DelegatingConnectionFactory<InetSocketAddress, FilterableStreamingHttpConnection> {
private final AtomicInteger connectionCount;
private final CountDownLatch clientConnectionClosed;

CountingConnectionFactory(
final ConnectionFactory<InetSocketAddress, FilterableStreamingHttpConnection> delegate,
final AtomicInteger connectionCount) {
final AtomicInteger connectionCount,
final CountDownLatch clientConnectionClosed) {
super(delegate);
this.connectionCount = connectionCount;
this.clientConnectionClosed = clientConnectionClosed;
}

@Override
Expand All @@ -182,7 +274,9 @@ public Single<FilterableStreamingHttpConnection> newConnection(final InetSocketA
@Nullable final TransportObserver observer) {
return defer(() -> {
connectionCount.incrementAndGet();
return delegate().newConnection(inetSocketAddress, context, observer);
return delegate().newConnection(inetSocketAddress, context, observer).whenOnSuccess(c -> {
c.onClose().whenFinally(clientConnectionClosed::countDown).subscribe();
});
});
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2018, 2020 Apple Inc. and the ServiceTalk project authors
* Copyright © 2018, 2020-2022 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -260,9 +260,12 @@ private void emitCatchError(@Nullable SubscriptionImpl target, Throwable cause,
if (target != null) {
emitError(target, cause);
} else {
LOGGER.debug("caught unexpected exception, closing channel {}", channel, cause);
// This branch executes only when an error is originated by the current Subscriber: either an unexpected
// exception is thrown from Subscriber.onComplete() or cancellation.
// If an incomplete subscriber is cancelled then close channel. A subscriber can cancel after getting
// complete, which should not close the channel.
// complete, which should not close the channel (won't reach this point, returns earlier).
// Use outbound/inbound closure instead of channel.close() to register CHANNEL_CLOSED_OUTBOUND event.
closeChannelOutbound();
closeChannelInbound();
}
}
Expand All @@ -282,6 +285,8 @@ private void emitComplete(SubscriptionImpl target) {
try {
target.associatedSub.onComplete();
} catch (Throwable cause) {
LOGGER.debug("Caught unexpected exception from Subscriber {}, closing channel {}",
target.associatedSub, channel, cause);
emitCatchError(null, cause, false);
}
}
Expand All @@ -308,13 +313,20 @@ private void cancel0(SubscriptionImpl forSubscription) {

// If a cancel occurs with a valid subscription we need to clear any pending data and set a fatalError so that
// any future Subscribers don't get partial data delivered from the queue.
// We don't need to terminate the subscriber because cancellation is originated by the subscriber, pass null.
emitCatchError(null, StacklessClosedChannelException.newInstance(NettyChannelPublisher.class, "cancel"), true);
}

// For cases when an error occurred in netty pipeline
private void closeChannelInbound() {
closeHandler.closeChannelInbound(channel);
}

// For cases with an error occurred in subscriber or a result of cancellation
private void closeChannelOutbound() {
closeHandler.closeChannelOutbound(channel);
}

private void resetSubscription() {
subscription = null;
requestCount = 0;
Expand Down