Skip to content

Commit

Permalink
gRPC Trailers-Only response doesn't mark a streaming request as finis…
Browse files Browse the repository at this point in the history
…hed (#2313)

Motivation:

If a server returns a `Trailers-Only` response for a streaming request, we
never drain the response message-body, because `concat` never subscribes
to the next source if the first source fails. As the result, concurrency
controller won't mark the request as finished. If the request publisher
never completes we will start leaking connections.

Modifications:

- Enhance `TrailersOnlyErrorTest` to cover blocking client and make sure
the HTTP response terminates;
- In case of an error in `Trailers-Only` response, subscribe and cancel
the response message body to abort the request;
- Always check that `Trailers-Only` response doesn't have any further
frames on the wire;

Result:

gRPC response correctly terminates even if client receives a
`Trailers-Only` response.
  • Loading branch information
idelpivnitskiy authored Aug 10, 2022
1 parent a99f319 commit 986a80c
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public final <E extends Throwable> Completable onErrorComplete(Class<E> type) {
* }
* }
* }</pre>
* @param predicate returns {@code true} if the {@link Throwable} should be transformed to and
* @param predicate returns {@code true} if the {@link Throwable} should be transformed to an
* {@link Subscriber#onComplete()} signal. Returns {@code false} to propagate the error.
* @return A {@link Completable} which transform errors emitted on this {@link Completable} which match
* {@code predicate} into a {@link Subscriber#onComplete()} signal (e.g. swallows the error).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public <Req, Resp> StreamingClientCall<Req, Resp> newStreamingCall(
.flatMapPublisher(response -> validateResponseAndGetPayload(response, responseContentType,
streamingHttpClient.executionContext().bufferAllocator(), readGrpcMessageEncodingRaw(
response.headers(), deserializerIdentity, deserializers,
GrpcStreamingDeserializer::messageEncoding)))
GrpcStreamingDeserializer::messageEncoding), httpRequest.requestTarget()))
.onErrorMap(GrpcUtils::toGrpcException);
};
}
Expand Down Expand Up @@ -288,7 +288,7 @@ public <Req, Resp> BlockingStreamingClientCall<Req, Resp> newBlockingStreamingCa
return validateResponseAndGetPayload(response.toStreamingResponse(), responseContentType,
client.executionContext().bufferAllocator(), readGrpcMessageEncodingRaw(
response.headers(), deserializerIdentity, deserializers,
GrpcStreamingDeserializer::messageEncoding)).toIterable();
GrpcStreamingDeserializer::messageEncoding), httpRequest.requestTarget()).toIterable();
} catch (Throwable cause) {
throw toGrpcException(cause);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.servicetalk.buffer.api.Buffer;
import io.servicetalk.buffer.api.BufferAllocator;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.encoding.api.BufferDecoder;
import io.servicetalk.encoding.api.BufferDecoderGroup;
Expand Down Expand Up @@ -79,6 +80,7 @@
import static io.servicetalk.grpc.api.GrpcStatusCode.CANCELLED;
import static io.servicetalk.grpc.api.GrpcStatusCode.DEADLINE_EXCEEDED;
import static io.servicetalk.grpc.api.GrpcStatusCode.FAILED_PRECONDITION;
import static io.servicetalk.grpc.api.GrpcStatusCode.INTERNAL;
import static io.servicetalk.grpc.api.GrpcStatusCode.INVALID_ARGUMENT;
import static io.servicetalk.grpc.api.GrpcStatusCode.PERMISSION_DENIED;
import static io.servicetalk.grpc.api.GrpcStatusCode.UNAUTHENTICATED;
Expand Down Expand Up @@ -112,6 +114,8 @@
import static java.util.Objects.requireNonNull;

final class GrpcUtils {

private static final Logger LOGGER = LoggerFactory.getLogger(GrpcUtils.class);
private static final GrpcStatus STATUS_OK = GrpcStatus.fromCodeValue(GrpcStatusCode.OK.value());
private static final BufferDecoderGroup EMPTY_BUFFER_DECODER_GROUP = new BufferDecoderGroupBuilder().build();

Expand Down Expand Up @@ -302,7 +306,8 @@ private static void validateStatusCode(HttpResponseStatus status) {
static <Resp> Publisher<Resp> validateResponseAndGetPayload(final StreamingHttpResponse response,
final CharSequence expectedContentType,
final BufferAllocator allocator,
final GrpcStreamingDeserializer<Resp> deserializer) {
final GrpcStreamingDeserializer<Resp> deserializer,
final String httpPath) {
validateStatusCode(response.status()); // gRPC protocol requires 200, don't look further if this check fails.
// In case of an empty response, gRPC-server may return only one HEADER frame with endStream=true. Our
// HTTP1-based implementation translates them into response headers so we need to look for a grpc-status in both
Expand All @@ -312,13 +317,36 @@ static <Resp> Publisher<Resp> validateResponseAndGetPayload(final StreamingHttpR
validateContentType(headers, expectedContentType);
final GrpcStatusCode grpcStatusCode = extractGrpcStatusCodeFromHeaders(headers);
if (grpcStatusCode != null) {
// Drain the response messageBody to make sure concurrency controller marks the request as finished.
// In case the grpc-status is received in headers, we expect an empty messageBody, draining should not see
// any other frames. However, the messageBody won't complete until after the request stream completes too.
final Completable drainResponse = response.messageBody().beforeOnNext(frame -> {
throw new GrpcStatus(INTERNAL, null, "Violation of the protocol: received unexpected " +
(frame instanceof HttpHeaders ? "Trailers" : "Data") +
"frame after Trailers-Only response is received with grpc-status: " +
grpcStatusCode.value() + '(' + grpcStatusCode + ')').asException();
}).ignoreElements();
final GrpcStatusException grpcStatusException = convertToGrpcStatusException(grpcStatusCode, headers);
if (grpcStatusException != null) {
// Give priority to the error if it happens, to allow delayed requests or streams to terminate.
return Publisher.<Resp>failed(grpcStatusException)
.concat(response.messageBody().ignoreElements());
// In case of an error, we cannot concat GrpcStatusException after drainResponse because users may never
// see the exception if the request publisher never terminates, or they may see a TimeoutException that
// will hide the original exception. Therefore, we have to return an error asap and then immediately
// subscribe & cancel the drainResponse. Cancellation is necessary to prevent sending large request
// payloads over network when server returns an error.
return Publisher.<Resp>failed(grpcStatusException).afterOnError(__ -> {
// Because we subscribe asynchronously, users won't receive any further errors from drainResponse.
// Instead, we log those errors for visibility. Use onErrorComplete instead of whenOnError to avoid
// logging the same exception twice inside SimpleCompletableSubscriber.
drainResponse.onErrorComplete(t -> {
LOGGER.error("Unexpected error while asynchronously draining a Trailers-Only response for {}",
httpPath, t);
return true;
}).subscribe().cancel();
});
} else {
return response.messageBody().ignoreElements().toPublisher();
// In case of OK, return drainResponse to make sure the full request is transmitted to the server before
// we terminate the response publisher.
return drainResponse.toPublisher();
}
}

Expand Down
Loading

0 comments on commit 986a80c

Please sign in to comment.