Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Racy cancel from transport does not always cancel the message body #2369

Merged
merged 11 commits into from
Oct 4, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,18 @@ public Single<StreamingHttpResponse> request(final StreamingHttpRequest request)
if (canAddRequestContentLength(request)) {
flatRequest = setRequestContentLength(connectionContext().protocol(), request);
} else {
if (emptyMessageBody(request, request.messageBody())) {
flatRequest = flatEmptyMessage(connectionContext().protocol(), request, request.messageBody());
final Publisher<Object> messageBody = request.messageBody();
// Do not propagate cancel to the messageBody if cancel arrives before meta-data completes. Client-side
// state machine does not depend on termination of the messageBody until after transport subscribes to
// it. It's preferable to avoid subscribe to the messageBody in case of cancellation to allow requests
// with non-replayable messageBody to retry.
if (emptyMessageBody(request, messageBody)) {
flatRequest = flatEmptyMessage(connectionContext().protocol(), request, messageBody,
/* propagateCancel */ false);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: /* propagateCancel */ -> do we need this? I think it's safe to assume folks will be able to follow the code in IDE or preferred method to understand parameter meanings (e.g. expectation for java code). The more interesting part is explaining why (e.g. client doesn't need it, but server does).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't hurt to have it either. I frequently read code on GitHub without IDE and share a direct link to interesting places.

} else {
// Defer subscribe to the messageBody until transport requests it to allow clients retry failed
// requests with non-replayable messageBody
flatRequest = Single.<Object>succeeded(request).concat(request.messageBody(), true);
flatRequest = Single.<Object>succeeded(request).concat(messageBody, /* deferSubscribe */ true);
if (shouldAppendTrailers(connectionContext().protocol(), request)) {
flatRequest = flatRequest.scanWith(HeaderUtils::appendTrailersMapper);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,13 @@ static Publisher<Object> setRequestContentLength(final HttpProtocolVersion proto
final StreamingHttpRequest request) {
return setContentLength(request, request.messageBody(),
shouldAddZeroContentLength(request.method()) ? HeaderUtils::updateContentLength :
HeaderUtils::updateRequestContentLengthNonZero, protocolVersion);
HeaderUtils::updateRequestContentLengthNonZero, protocolVersion, /* propagateCancel */ false);
}

static Publisher<Object> setResponseContentLength(final HttpProtocolVersion protocolVersion,
final StreamingHttpResponse response) {
return setContentLength(response, response.messageBody(), HeaderUtils::updateContentLength, protocolVersion);
return setContentLength(response, response.messageBody(), HeaderUtils::updateContentLength, protocolVersion,
/* propagateCancel */ true);
}

private static void updateRequestContentLengthNonZero(final int contentLength, final HttpHeaders headers) {
Expand Down Expand Up @@ -210,16 +211,22 @@ static boolean emptyMessageBody(final HttpMetaData metadata) {
}

static Publisher<Object> flatEmptyMessage(final HttpProtocolVersion protocolVersion,
final HttpMetaData metadata, final Publisher<Object> messageBody) {
final HttpMetaData metadata,
final Publisher<Object> messageBody,
final boolean propagateCancel) {
assert emptyMessageBody(metadata, messageBody);
// HTTP/2 and above can write meta-data as a single frame with endStream=true flag. To check the version, use
// HttpProtocolVersion from ConnectionInfo because HttpMetaData may have different version.
final Publisher<Object> flatMessage =
protocolVersion.major() > 1 || !shouldAppendTrailers(protocolVersion, metadata) ? from(metadata) :
from(metadata, EmptyHttpHeaders.INSTANCE);
return messageBody == empty() ? flatMessage :
// Subscribe to the messageBody publisher to trigger any applied transformations, but ignore its
// content because the PayloadInfo indicated it's effectively empty and does not contain trailers
if (messageBody == empty()) {
return flatMessage;
}
// Subscribe to the messageBody publisher to trigger any applied transformations, but ignore its content because
// the PayloadInfo indicated it's effectively empty and does not contain trailers.
return propagateCancel ?
flatMessage.concatPropagateCancel(messageBody.ignoreElements()) :
flatMessage.concat(messageBody.ignoreElements());
}

Expand All @@ -246,10 +253,11 @@ public boolean equals(Object o) {
private static Publisher<Object> setContentLength(final HttpMetaData metadata,
final Publisher<Object> messageBody,
final BiIntConsumer<HttpHeaders> contentLengthUpdater,
final HttpProtocolVersion protocolVersion) {
final HttpProtocolVersion protocolVersion,
final boolean propagateCancel) {
if (emptyMessageBody(metadata, messageBody)) {
contentLengthUpdater.apply(0, metadata.headers());
return flatEmptyMessage(protocolVersion, metadata, messageBody);
return flatEmptyMessage(protocolVersion, metadata, messageBody, propagateCancel);
}
return messageBody.collect(() -> null, (reduction, item) -> {
if (reduction == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,11 +453,14 @@ private static Publisher<Object> handleResponse(final HttpProtocolVersion protoc
return setResponseContentLength(protocolVersion, response);
} else {
Publisher<Object> flatResponse;
if (emptyMessageBody(response, response.messageBody())) {
flatResponse = flatEmptyMessage(protocolVersion, response, response.messageBody());
final Publisher<Object> messageBody = response.messageBody();
// Ensure cancel is propagated through the messageBody. Otherwise, if cancel from transport races with
// execution of this method and wins, BeforeFinallyHttpOperator won't trigger and observers won't
// complete the exchange.
if (emptyMessageBody(response, messageBody)) {
flatResponse = flatEmptyMessage(protocolVersion, response, messageBody, /* propagateCancel */ true);
} else {
// Not necessary to defer subscribe to the messageBody because server does not retry responses
flatResponse = Single.<Object>succeeded(response).concat(response.messageBody());
flatResponse = Single.<Object>succeeded(response).concatPropagateCancel(messageBody);
if (shouldAppendTrailers(protocolVersion, response)) {
flatResponse = flatResponse.scanWith(HeaderUtils::appendTrailersMapper);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ public Single<StreamingHttpResponse> handle(HttpServiceContext ctx,
verify(serverExchangeObserver, atMostOnce()).onResponse(any(StreamingHttpResponse.class));
verify(serverResponseObserver, atMostOnce()).onResponseDataRequested(anyLong());
verify(serverResponseObserver, atMostOnce()).onResponseComplete();
verify(serverResponseObserver, atMostOnce()).onResponseCancel();
serverInOrder.verify(serverExchangeObserver).onExchangeFinally();
verifyNoMoreInteractions(serverLifecycleObserver, serverExchangeObserver,
serverRequestObserver, serverResponseObserver);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public Single<StreamingHttpResponse> request(StreamingHttpRequest request) {
try {
assertThat("Unexpected exception type", t,
instanceOf(RetryableException.class));
assertThat("Unexpected exception type",
assertThat("Unexpected exception cause type",
t.getCause(), instanceOf(DeliberateException.class));
assertThat("Unexpected subscribe to payload body",
payloadBody.isSubscribed(), is(false));
Expand Down