Skip to content

Commit

Permalink
Make exception mapper filters public (#2050)
Browse files Browse the repository at this point in the history
Motivation:

Some users want to map exceptions earlier in the request processing
pipeline in order to observe a real HTTP status code in other filters.

Modifications:

- Move `DefaultHttpServerBuilder.ExceptionMapperServiceFilter` ->
`io.servicetalk.http.api.HttpExceptionMapperServiceFilter`;
- Move `io.servicetalk.grpc.netty.CatchAllHttpServiceFilter` ->
`io.servicetalk.grpc.api.GrpcExceptionMapperServiceFilter`;
- Rename `CatchAllHttpServiceFilterTest` ->
`GrpcExceptionMapperServiceFilterTest`;
- Remove `io.servicetalk.grpc.netty.GrpcUtils` duplicate;
- Add logging in `GrpcExceptionMapperServiceFilter`;

Result:

Users have access to our exception mapper implementations and can move
them around other filters in the processing pipeline.
  • Loading branch information
idelpivnitskiy committed Jan 13, 2022
1 parent 06a3969 commit 6df6952
Show file tree
Hide file tree
Showing 6 changed files with 206 additions and 130 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright © 2021-2022 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.grpc.api;

import io.servicetalk.concurrent.api.Single;
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.HttpExecutionStrategyInfluencer;
import io.servicetalk.http.api.HttpServiceContext;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.api.StreamingHttpResponseFactory;
import io.servicetalk.http.api.StreamingHttpService;
import io.servicetalk.http.api.StreamingHttpServiceFilter;
import io.servicetalk.http.api.StreamingHttpServiceFilterFactory;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static io.servicetalk.concurrent.api.Single.succeeded;
import static io.servicetalk.grpc.api.GrpcStatusCode.fromCodeValue;
import static io.servicetalk.grpc.api.GrpcUtils.GRPC_STATUS_CODE_TRAILER;
import static io.servicetalk.grpc.api.GrpcUtils.newErrorResponse;

/**
* Filter that maps known {@link Throwable} subtypes into an gRPC response with an appropriate {@link GrpcStatusCode}.
* <p>
* This filter is recommended to be placed as early as possible to make sure it captures all exceptions that may be
* generated by other filters.
*/
public final class GrpcExceptionMapperServiceFilter implements StreamingHttpServiceFilterFactory,
HttpExecutionStrategyInfluencer {

private static final Logger LOGGER = LoggerFactory.getLogger(GrpcExceptionMapperServiceFilter.class);

/**
* Instance of {@link GrpcExceptionMapperServiceFilter}.
*/
public static final StreamingHttpServiceFilterFactory INSTANCE = new GrpcExceptionMapperServiceFilter();

private GrpcExceptionMapperServiceFilter() {
// Singleton
}

@Override
public StreamingHttpServiceFilter create(final StreamingHttpService service) {
return new StreamingHttpServiceFilter(service) {

@Override
public Single<StreamingHttpResponse> handle(final HttpServiceContext ctx,
final StreamingHttpRequest request,
final StreamingHttpResponseFactory responseFactory) {
final Single<StreamingHttpResponse> handle;
try {
handle = delegate().handle(ctx, request, responseFactory);
} catch (Throwable cause) {
return succeeded(convertToGrpcErrorResponse("handle(...)", ctx, request, responseFactory,
cause));
}
return handle.onErrorReturn(cause ->
convertToGrpcErrorResponse("response", ctx, request, responseFactory, cause));
}
};
}

private static StreamingHttpResponse convertToGrpcErrorResponse(final String what,
final HttpServiceContext ctx,
final StreamingHttpRequest request,
final StreamingHttpResponseFactory responseFactory,
final Throwable cause) {
final StreamingHttpResponse response = newErrorResponse(responseFactory, null, null, cause,
ctx.executionContext().bufferAllocator());
final CharSequence codeValue = response.headers().get(GRPC_STATUS_CODE_TRAILER);
assert codeValue != null;
LOGGER.error("Unexpected exception during a {} processing for connection={}, request='{} {} {}' was mapped " +
"to grpc-status: {} ({})", what, ctx, request.method(), request.requestTarget(),
request.version(), codeValue, fromCodeValue(codeValue), cause);
return response;
}

@Override
public HttpExecutionStrategy influenceStrategy(final HttpExecutionStrategy strategy) {
// No influence since we do not block
return strategy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,11 @@
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.grpc.internal.DeadlineUtils;
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.HttpExecutionStrategyInfluencer;
import io.servicetalk.http.api.HttpLifecycleObserver;
import io.servicetalk.http.api.HttpProtocolConfig;
import io.servicetalk.http.api.HttpRequest;
import io.servicetalk.http.api.HttpServerBuilder;
import io.servicetalk.http.api.HttpServiceContext;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.api.StreamingHttpResponseFactory;
import io.servicetalk.http.api.StreamingHttpService;
import io.servicetalk.http.api.StreamingHttpServiceFilter;
import io.servicetalk.http.api.StreamingHttpServiceFilterFactory;
import io.servicetalk.logging.api.LogLevel;
import io.servicetalk.transport.api.ConnectionAcceptor;
Expand All @@ -50,9 +44,7 @@
import java.util.function.BooleanSupplier;
import java.util.function.Predicate;

import static io.servicetalk.concurrent.api.Single.succeeded;
import static io.servicetalk.concurrent.internal.FutureUtils.awaitResult;
import static io.servicetalk.grpc.api.GrpcUtils.newErrorResponse;

/**
* A builder for building a <a href="https://www.grpc.io">gRPC</a> server.
Expand Down Expand Up @@ -600,46 +592,6 @@ protected static void appendCatchAllFilter(HttpServerBuilder httpServerBuilder)
// TODO(dj): Move to DefaultGrpcServerBuilder
// This code depends on GrpcUtils which is inaccessible from the servicetalk-grpc-netty module.
// When this class is converted to an interface we can also refactor that part.
httpServerBuilder.appendServiceFilter(CatchAllHttpServiceFilter.INSTANCE);
}

static final class CatchAllHttpServiceFilter implements StreamingHttpServiceFilterFactory,
HttpExecutionStrategyInfluencer {

static final StreamingHttpServiceFilterFactory INSTANCE = new CatchAllHttpServiceFilter();

private CatchAllHttpServiceFilter() {
// Singleton
}

@Override
public StreamingHttpServiceFilter create(final StreamingHttpService service) {
return new StreamingHttpServiceFilter(service) {

@Override
public Single<StreamingHttpResponse> handle(final HttpServiceContext ctx,
final StreamingHttpRequest request,
final StreamingHttpResponseFactory responseFactory) {
final Single<StreamingHttpResponse> handle;
try {
handle = delegate().handle(ctx, request, responseFactory);
} catch (Throwable cause) {
return succeeded(convertToGrpcErrorResponse(ctx, responseFactory, cause));
}
return handle.onErrorReturn(cause -> convertToGrpcErrorResponse(ctx, responseFactory, cause));
}
};
}

private static StreamingHttpResponse convertToGrpcErrorResponse(
final HttpServiceContext ctx, final StreamingHttpResponseFactory responseFactory,
final Throwable cause) {
return newErrorResponse(responseFactory, null, null, cause, ctx.executionContext().bufferAllocator());
}

@Override
public HttpExecutionStrategy influenceStrategy(final HttpExecutionStrategy strategy) {
return strategy; // no influence since we do not block
}
httpServerBuilder.appendServiceFilter(GrpcExceptionMapperServiceFilter.INSTANCE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
final class GrpcUtils {
// TODO could/should add "+proto"
private static final CharSequence GRPC_CONTENT_TYPE = newAsciiString("application/grpc");
private static final CharSequence GRPC_STATUS_CODE_TRAILER = newAsciiString("grpc-status");
static final CharSequence GRPC_STATUS_CODE_TRAILER = newAsciiString("grpc-status");
private static final CharSequence GRPC_STATUS_DETAILS_TRAILER = newAsciiString("grpc-status-details-bin");
private static final CharSequence GRPC_STATUS_MESSAGE_TRAILER = newAsciiString("grpc-message");
// TODO (nkant): add project version
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.grpc.api;
package io.servicetalk.grpc.netty;

import io.servicetalk.grpc.api.GrpcServerBuilder.CatchAllHttpServiceFilter;
import io.servicetalk.grpc.api.GrpcExceptionMapperServiceFilter;

import org.junit.jupiter.api.Test;

import static io.servicetalk.http.netty.AsyncContextHttpFilterVerifier.verifyServerFilterAsyncContextVisibility;

class CatchAllHttpServiceFilterTest {
class GrpcExceptionMapperServiceFilterTest {

@Test
void verifyAsyncContext() throws Exception {
verifyServerFilterAsyncContextVisibility(CatchAllHttpServiceFilter.INSTANCE);
verifyServerFilterAsyncContextVisibility(GrpcExceptionMapperServiceFilter.INSTANCE);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright © 2022 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.http.api;

import io.servicetalk.concurrent.api.Single;
import io.servicetalk.serialization.api.SerializationException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.RejectedExecutionException;

import static io.servicetalk.concurrent.api.Single.failed;
import static io.servicetalk.http.api.HttpHeaderNames.CONTENT_LENGTH;
import static io.servicetalk.http.api.HttpHeaderValues.ZERO;
import static io.servicetalk.http.api.HttpResponseStatus.INTERNAL_SERVER_ERROR;
import static io.servicetalk.http.api.HttpResponseStatus.SERVICE_UNAVAILABLE;
import static io.servicetalk.http.api.HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE;

/**
* Filter that maps known {@link Exception} subtypes into an HTTP response with an appropriate
* {@link HttpResponseStatus}.
* <p>
* This filter is recommended to be placed as early as possible to make sure it captures all exceptions that may be
* generated by other filters.
*/
public final class HttpExceptionMapperServiceFilter implements StreamingHttpServiceFilterFactory,
HttpExecutionStrategyInfluencer {

/**
* Instance of {@link HttpExceptionMapperServiceFilter}.
*/
public static final StreamingHttpServiceFilterFactory INSTANCE = new HttpExceptionMapperServiceFilter();

private static final Logger LOGGER = LoggerFactory.getLogger(HttpExceptionMapperServiceFilter.class);

private HttpExceptionMapperServiceFilter() {
// Singleton
}

@Override
public StreamingHttpServiceFilter create(final StreamingHttpService service) {
return new StreamingHttpServiceFilter(service) {
@Override
public Single<StreamingHttpResponse> handle(final HttpServiceContext ctx,
final StreamingHttpRequest request,
final StreamingHttpResponseFactory responseFactory) {
Single<StreamingHttpResponse> respSingle;
try {
respSingle = delegate().handle(ctx, request, responseFactory);
} catch (Throwable cause) {
respSingle = failed(cause);
}
return respSingle.onErrorReturn(cause -> newErrorResponse(cause, ctx, request, responseFactory));
}
};
}

private static StreamingHttpResponse newErrorResponse(final Throwable cause,
final HttpServiceContext ctx,
final StreamingHttpRequest request,
final StreamingHttpResponseFactory responseFactory) {
final HttpResponseStatus status;
if (cause instanceof RejectedExecutionException) {
status = SERVICE_UNAVAILABLE;
LOGGER.error("Task rejected by service processing for connection={}, request='{} {} {}'. Returning: {}",
ctx, request.method(), request.requestTarget(), request.version(), status, cause);
} else if (cause instanceof SerializationException) {
// It is assumed that a failure occurred when attempting to deserialize the request.
status = UNSUPPORTED_MEDIA_TYPE;
LOGGER.error("Failed to deserialize or serialize for connection={}, request='{} {} {}'. Returning: {}",
ctx, request.method(), request.requestTarget(), request.version(), status, cause);
} else {
status = INTERNAL_SERVER_ERROR;
LOGGER.error("Unexpected exception during service processing for connection={}, request='{} {} {}'. " +
"Trying to return: {}", ctx, request.method(), request.requestTarget(), request.version(),
status, cause);
}
return responseFactory.newResponse(status).setHeader(CONTENT_LENGTH, ZERO);
}

@Override
public HttpExecutionStrategy influenceStrategy(final HttpExecutionStrategy strategy) {
return strategy; // no influence since we do not block
}
}
Loading

0 comments on commit 6df6952

Please sign in to comment.