Skip to content

Commit

Permalink
Revert "Revert "Server control flow should close connection on `Decod…
Browse files Browse the repository at this point in the history
…erException` (apple#2419)""

This reverts commit de8721e.
  • Loading branch information
idelpivnitskiy committed Dec 17, 2022
1 parent 009a874 commit 6d65dd6
Show file tree
Hide file tree
Showing 2 changed files with 200 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import io.servicetalk.transport.api.ConnectionObserver;
import io.servicetalk.transport.api.ServerContext;
import io.servicetalk.transport.api.SslConfig;
import io.servicetalk.transport.netty.internal.ChannelCloseUtils;
import io.servicetalk.transport.netty.internal.ChannelInitializer;
import io.servicetalk.transport.netty.internal.CloseHandler;
import io.servicetalk.transport.netty.internal.CloseHandler.CloseEventObservedException;
Expand Down Expand Up @@ -288,7 +289,7 @@ void process(final boolean handleMultipleRequests) {
meta.headers(), executionContext().bufferAllocator(), payload,
requireTrailerHeader, headersFactory)));
toSource(handleRequestAndWriteResponse(requestSingle, handleMultipleRequests))
.subscribe(new ErrorLoggingHttpSubscriber(connection));
.subscribe(new ErrorLoggingHttpSubscriber(this));
}

@Override
Expand Down Expand Up @@ -587,9 +588,9 @@ private static final class ErrorLoggingHttpSubscriber implements CompletableSour

private static final Logger LOGGER = LoggerFactory.getLogger(ErrorLoggingHttpSubscriber.class);

private final NettyConnection<Object, Object> connection;
private final NettyHttpServerConnection connection;

ErrorLoggingHttpSubscriber(final NettyConnection<Object, Object> connection) {
ErrorLoggingHttpSubscriber(final NettyHttpServerConnection connection) {
this.connection = connection;
}

Expand Down Expand Up @@ -624,14 +625,24 @@ public void onError(final Throwable t) {
}

private static void logDecoderException(final DecoderException e,
final NettyConnection<Object, Object> connection) {
LOGGER.warn("{} Can not decode a message, no more requests will be received on this {} {}.", connection,
connection.protocol(), HTTP_2_0.equals(connection.protocol()) ? "stream" : "connection", e);
final NettyHttpServerConnection connection) {
final String whatClosing = HTTP_2_0.compareTo(connection.protocol()) <= 0 ? "stream" : "connection";
final boolean isOpen = connection.nettyChannel().isOpen();
final String closeStatement = isOpen ? ", closing it" : "";
LOGGER.warn("{} Can not decode a message, no more requests will be received on this {} {}{} due to:",
connection, connection.protocol(), whatClosing, closeStatement, e);
if (isOpen) {
ChannelCloseUtils.close(connection.nettyChannel(), e);
}
}

private static void logUnexpectedException(final Throwable t, NettyConnection<Object, Object> connection) {
LOGGER.debug("{} Unexpected error received, closing {} {} due to:", connection, connection.protocol(),
HTTP_2_0.equals(connection.protocol()) ? "stream" : "connection", t);
private static void logUnexpectedException(final Throwable t, NettyHttpServerConnection connection) {
final String whatClosing = HTTP_2_0.compareTo(connection.protocol()) <= 0 ? "stream" : "connection";
LOGGER.debug("{} Unexpected error received, closing {} {} due to:",
connection, connection.protocol(), whatClosing, t);
if (connection.nettyChannel().isOpen()) {
ChannelCloseUtils.close(connection.nettyChannel(), t);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,38 @@
import io.servicetalk.concurrent.PublisherSource.Subscriber;
import io.servicetalk.concurrent.PublisherSource.Subscription;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.http.api.BlockingHttpClient;
import io.servicetalk.http.api.Http2Exception;
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.HttpPayloadWriter;
import io.servicetalk.http.api.HttpServerBuilder;
import io.servicetalk.http.api.HttpServerContext;
import io.servicetalk.http.api.HttpServiceContext;
import io.servicetalk.http.api.StreamingHttpClient;
import io.servicetalk.http.api.StreamingHttpConnection;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.api.StreamingHttpResponseFactory;
import io.servicetalk.http.api.StreamingHttpService;
import io.servicetalk.http.api.StreamingHttpServiceFilter;
import io.servicetalk.http.api.StreamingHttpServiceFilterFactory;
import io.servicetalk.http.netty.RetryingHttpRequesterFilter.HttpResponseException;
import io.servicetalk.transport.netty.internal.CloseHandler.CloseEventObservedException;
import io.servicetalk.transport.netty.internal.ExecutionContextExtension;

import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.MethodSource;

import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -46,23 +65,34 @@
import static io.servicetalk.concurrent.api.Single.succeeded;
import static io.servicetalk.concurrent.api.SourceAdapters.fromSource;
import static io.servicetalk.concurrent.api.SourceAdapters.toSource;
import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION;
import static io.servicetalk.concurrent.internal.TestTimeoutConstants.DEFAULT_TIMEOUT_SECONDS;
import static io.servicetalk.http.api.Http2ErrorCode.CANCEL;
import static io.servicetalk.http.api.HttpExecutionStrategies.defaultStrategy;
import static io.servicetalk.http.api.HttpExecutionStrategies.offloadNone;
import static io.servicetalk.http.api.HttpHeaderNames.CONTENT_LENGTH;
import static io.servicetalk.http.api.HttpHeaderValues.ZERO;
import static io.servicetalk.http.api.HttpResponseStatus.INTERNAL_SERVER_ERROR;
import static io.servicetalk.http.api.HttpResponseStatus.NO_CONTENT;
import static io.servicetalk.http.api.HttpResponseStatus.OK;
import static io.servicetalk.http.api.HttpSerializers.appSerializerAsciiFixLen;
import static io.servicetalk.http.netty.BuilderUtils.newClientBuilder;
import static io.servicetalk.http.netty.BuilderUtils.newClientBuilderWithConfigs;
import static io.servicetalk.http.netty.BuilderUtils.newServerBuilderWithConfigs;
import static io.servicetalk.http.netty.BuilderUtils.newServerBuilder;
import static io.servicetalk.http.netty.GracefulConnectionClosureHandlingTest.RAW_STRING_SERIALIZER;
import static io.servicetalk.test.resources.TestUtils.assertNoAsyncErrors;
import static io.servicetalk.transport.netty.internal.CloseHandler.CloseEvent.CHANNEL_CLOSED_INBOUND;
import static java.lang.Long.MAX_VALUE;
import static java.nio.charset.StandardCharsets.US_ASCII;
import static java.time.Duration.ofMillis;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assumptions.assumeTrue;

/**
* Test the following scenario:
Expand Down Expand Up @@ -208,7 +238,7 @@ public void onComplete() {

private void test(HttpServerFactory serverFactory, boolean serverHasOffloading, boolean drainRequestPayloadBody,
boolean responseHasPayload) throws Exception {
try (HttpServerContext serverContext = serverFactory.create(newServerBuilderWithConfigs(SERVER_CTX)
try (HttpServerContext serverContext = serverFactory.create(newServerBuilder(SERVER_CTX)
.executionStrategy(serverHasOffloading ? defaultStrategy() : offloadNone())
.drainRequestPayloadBody(drainRequestPayloadBody));
StreamingHttpClient client = newClientBuilderWithConfigs(serverContext, CLIENT_CTX,
Expand All @@ -232,6 +262,121 @@ private void test(HttpServerFactory serverFactory, boolean serverHasOffloading,
assertNoAsyncErrors(asyncErrors);
}

@Timeout(2)
@ParameterizedTest(name = "{displayName} [{index}] protocol={0} serviceApi={1} serverHasOffloading={2}")
@MethodSource("data")
void testMetaDataError(HttpProtocol protocol, ServiceApi serviceApi,
boolean serverHasOffloading) throws Exception {
try (HttpServerContext serverContext = serviceApi.create(newServerBuilder(SERVER_CTX, protocol)
.executionStrategy(serverHasOffloading ? defaultStrategy() : offloadNone()));
BlockingHttpClient client = newClientBuilder(serverContext, CLIENT_CTX, protocol).buildBlocking()) {
IOException e = assertThrows(IOException.class, () -> {
switch (protocol) {
case HTTP_1:
// \r\n is illegal inside header values
client.request(client.get("/").setHeader("some-header", "invalid\r\nvalue"));
break;
case HTTP_2:
// TRACE methods can not have content-length header
client.request(client.trace("/").setHeader(CONTENT_LENGTH, ZERO));
break;
default:
throw new AssertionError("Unexpected protocol: " + protocol);
}
});
switch (protocol) {
case HTTP_1:
if (e instanceof CloseEventObservedException) {
assertThat(((CloseEventObservedException) e).event(), is(CHANNEL_CLOSED_INBOUND));
} else {
assertThat(e, instanceOf(IOException.class));
}
break;
case HTTP_2:
assertThat(e, instanceOf(Http2Exception.class));
assertThat(((Http2Exception) e).errorCode(), is(CANCEL));
break;
default:
throw new AssertionError("Unexpected protocol: " + protocol);
}
}
}

@ParameterizedTest(name = "{displayName} [{index}] protocol={0} serviceApi={1} serverHasOffloading={2}")
@MethodSource("data")
void testPayloadBodyError(HttpProtocol protocol, ServiceApi serviceApi,
boolean serverHasOffloading) throws Exception {
assumeTrue(serviceApi != ServiceApi.BLOCKING_STREAMING || serverHasOffloading,
"BLOCKING_STREAMING service can deadlock without offloading");
try (HttpServerContext serverContext = serviceApi.create(newServerBuilder(SERVER_CTX, protocol)
.executionStrategy(serverHasOffloading ? defaultStrategy() : offloadNone())
.appendServiceFilter(new StreamingHttpServiceFilterFactory() {
@Override
public StreamingHttpServiceFilter create(StreamingHttpService service) {
return new StreamingHttpServiceFilter(service) {
@Override
public Single<StreamingHttpResponse> handle(HttpServiceContext ctx,
StreamingHttpRequest request,
StreamingHttpResponseFactory responseFactory) {
return delegate().handle(ctx, request.transformMessageBody(publisher -> publisher
.map(item -> {
throw DELIBERATE_EXCEPTION;
})), responseFactory);
}
};
}

@Override
public HttpExecutionStrategy requiredOffloads() {
return offloadNone();
}
}));
StreamingHttpClient client = newClientBuilder(serverContext, CLIENT_CTX, protocol).buildStreaming()) {
Throwable e = assertThrows(Throwable.class, () -> {
StreamingHttpResponse response = client.request(client.post("/")
.payloadBody(from("content"), appSerializerAsciiFixLen()))
.toFuture().get();
response.payloadBody().toFuture().get();
// Aggregated API can return 500
throw new ExecutionException(new HttpResponseException("Response complete", response));
});
assertThat(e, instanceOf(ExecutionException.class));
e = e.getCause();
switch (protocol) {
case HTTP_1:
assertThat(e, anyOf(instanceOf(CloseEventObservedException.class), instanceOf(IOException.class),
instanceOf(HttpResponseException.class)));
if (e instanceof CloseEventObservedException) {
assertThat(((CloseEventObservedException) e).event(), is(CHANNEL_CLOSED_INBOUND));
}
break;
case HTTP_2:
assertThat(e, anyOf(instanceOf(ClosedChannelException.class), instanceOf(Http2Exception.class),
instanceOf(HttpResponseException.class)));
if (e instanceof Http2Exception) {
assertThat(((Http2Exception) e).errorCode(), is(CANCEL));
}
break;
default:
throw new AssertionError("Unexpected protocol: " + protocol);
}
if (e instanceof HttpResponseException) {
assertThat(((HttpResponseException) e).metaData().status(), is(INTERNAL_SERVER_ERROR));
}
}
}

private static List<Arguments> data() {
List<Arguments> data = new ArrayList<>();
for (HttpProtocol protocol : HttpProtocol.values()) {
for (ServiceApi api : ServiceApi.values()) {
data.add(Arguments.of(protocol, api, true));
data.add(Arguments.of(protocol, api, false));
}
}
return data;
}

private static Future<StreamingHttpResponse> requestFuture(StreamingHttpConnection connection, String name) {
return connection.request(connection.post('/' + name)
.payloadBody(connection.executionContext().executor().timer(ofMillis(50))
Expand Down Expand Up @@ -264,11 +409,39 @@ private interface HttpServerFactory {
HttpServerContext create(HttpServerBuilder builder) throws Exception;
}

private static final class StacklessException extends Exception {
private static final long serialVersionUID = 6439192160547836620L;

StacklessException(String msg) {
super(msg, null, false, false);
private enum ServiceApi implements HttpServerFactory {
ASYNC_AGGREGATED {
@Override
public HttpServerContext create(HttpServerBuilder builder) throws Exception {
return builder.listenAndAwait((ctx, request, responseFactory) ->
succeeded(responseFactory.ok().payloadBody(request.payloadBody())));
}
},
ASYNC_STREAMING {
@Override
public HttpServerContext create(HttpServerBuilder builder) throws Exception {
return builder.listenStreamingAndAwait((ctx, request, responseFactory) ->
succeeded(responseFactory.ok().payloadBody(request.payloadBody())));
}
},
BLOCKING_AGGREGATED {
@Override
public HttpServerContext create(HttpServerBuilder builder) throws Exception {
return builder.listenBlockingAndAwait((ctx, request, responseFactory) ->
responseFactory.ok().payloadBody(request.payloadBody()));
}
},
BLOCKING_STREAMING {
@Override
public HttpServerContext create(HttpServerBuilder builder) throws Exception {
return builder.listenBlockingStreamingAndAwait((ctx, request, response) -> {
try (HttpPayloadWriter<Buffer> writer = response.sendMetaData()) {
for (Buffer chunk : request.payloadBody()) {
writer.write(chunk);
}
}
});
}
}
}
}

0 comments on commit 6d65dd6

Please sign in to comment.