Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Racy cancel from transport does not always cancel the message body #2369

Merged
merged 11 commits into from
Oct 4, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import static io.servicetalk.http.netty.HeaderUtils.canAddRequestContentLength;
import static io.servicetalk.http.netty.HeaderUtils.emptyMessageBody;
import static io.servicetalk.http.netty.HeaderUtils.flatEmptyMessage;
import static io.servicetalk.http.netty.HeaderUtils.flatMessage;
import static io.servicetalk.http.netty.HeaderUtils.setRequestContentLength;
import static io.servicetalk.http.netty.HeaderUtils.shouldAppendTrailers;
import static io.servicetalk.transport.netty.internal.FlushStrategies.flushOnEnd;
Expand Down Expand Up @@ -166,12 +167,13 @@ public Single<StreamingHttpResponse> request(final StreamingHttpRequest request)
if (canAddRequestContentLength(request)) {
flatRequest = setRequestContentLength(connectionContext().protocol(), request);
} else {
if (emptyMessageBody(request, request.messageBody())) {
flatRequest = flatEmptyMessage(connectionContext().protocol(), request, request.messageBody());
final Publisher<Object> messageBody = request.messageBody();
if (emptyMessageBody(request, messageBody)) {
flatRequest = flatEmptyMessage(connectionContext().protocol(), request, messageBody);
} else {
// Defer subscribe to the messageBody until transport requests it to allow clients retry failed
// requests with non-replayable messageBody
flatRequest = Single.<Object>succeeded(request).concat(request.messageBody(), true);
flatRequest = flatMessage(request, messageBody, true);
if (shouldAppendTrailers(connectionContext().protocol(), request)) {
flatRequest = flatRequest.scanWith(HeaderUtils::appendTrailersMapper);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
import io.servicetalk.buffer.api.CharSequences;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.ScanWithMapper;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.SingleTerminalSignalConsumer;
import io.servicetalk.concurrent.api.TerminalSignalConsumer;
import io.servicetalk.concurrent.internal.CancelImmediatelySubscriber;
import io.servicetalk.http.api.EmptyHttpHeaders;
import io.servicetalk.http.api.HttpHeaderNames;
import io.servicetalk.http.api.HttpHeaders;
Expand All @@ -40,6 +44,7 @@
import static io.servicetalk.concurrent.api.Publisher.empty;
import static io.servicetalk.concurrent.api.Publisher.from;
import static io.servicetalk.concurrent.api.Publisher.fromIterable;
import static io.servicetalk.concurrent.api.SourceAdapters.toSource;
import static io.servicetalk.http.api.HeaderUtils.isTransferEncodingChunked;
import static io.servicetalk.http.api.HttpApiConversions.isPayloadEmpty;
import static io.servicetalk.http.api.HttpApiConversions.isSafeToAggregate;
Expand Down Expand Up @@ -219,8 +224,55 @@ static Publisher<Object> flatEmptyMessage(final HttpProtocolVersion protocolVers
from(metadata, EmptyHttpHeaders.INSTANCE);
return messageBody == empty() ? flatMessage :
// Subscribe to the messageBody publisher to trigger any applied transformations, but ignore its
// content because the PayloadInfo indicated it's effectively empty and does not contain trailers
flatMessage.concat(messageBody.ignoreElements());
// content because the PayloadInfo indicated it's effectively empty and does not contain trailers.
// Because `concat` won't subscribe to the messageBody in case of cancellation or an error, we use
// afterFinally to guarantee messageBody sees cancel too. Otherwise, observers won't complete exchange.
flatMessage.afterFinally(new TerminalSignalConsumer() {
idelpivnitskiy marked this conversation as resolved.
Show resolved Hide resolved
@Override
public void onComplete() {
// noop, rely on `concat`
}

@Override
public void onError(final Throwable throwable) {
cancelMessageBody();
}

@Override
public void cancel() {
cancelMessageBody();
}

private void cancelMessageBody() {
toSource(messageBody).subscribe(CancelImmediatelySubscriber.INSTANCE);
}
}).concat(messageBody.ignoreElements());
}

static Publisher<Object> flatMessage(final HttpMetaData metadata, final Publisher<Object> messageBody,
final boolean deferSubscribe) {
// Because `concat` won't subscribe to the messageBody in case of cancellation or an error, we use
// afterFinally to guarantee messageBody sees cancel too. Otherwise, observers won't complete exchange.
return Single.<Object>succeeded(metadata).afterFinally(new SingleTerminalSignalConsumer<Object>() {
idelpivnitskiy marked this conversation as resolved.
Show resolved Hide resolved
@Override
public void onSuccess(@Nullable final Object result) {
// noop, rely on `concat`
}

@Override
public void onError(final Throwable throwable) {
cancelMessageBody();
}

@Override
public void cancel() {
cancelMessageBody();
}

private void cancelMessageBody() {
toSource(messageBody).subscribe(CancelImmediatelySubscriber.INSTANCE);
}
}).concat(messageBody, deferSubscribe);
}

private static final class ContentLengthList<T> extends ArrayList<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
import static io.servicetalk.http.netty.HeaderUtils.canAddResponseContentLength;
import static io.servicetalk.http.netty.HeaderUtils.emptyMessageBody;
import static io.servicetalk.http.netty.HeaderUtils.flatEmptyMessage;
import static io.servicetalk.http.netty.HeaderUtils.flatMessage;
import static io.servicetalk.http.netty.HeaderUtils.setResponseContentLength;
import static io.servicetalk.http.netty.HeaderUtils.shouldAppendTrailers;
import static io.servicetalk.http.netty.HttpDebugUtils.showPipeline;
Expand Down Expand Up @@ -453,11 +454,13 @@ private static Publisher<Object> handleResponse(final HttpProtocolVersion protoc
return setResponseContentLength(protocolVersion, response);
} else {
Publisher<Object> flatResponse;
if (emptyMessageBody(response, response.messageBody())) {
flatResponse = flatEmptyMessage(protocolVersion, response, response.messageBody());
final Publisher<Object> messageBody = response.messageBody();
if (emptyMessageBody(response, messageBody)) {
flatResponse = flatEmptyMessage(protocolVersion, response, messageBody);
} else {
// Not necessary to defer subscribe to the messageBody because server does not retry responses
flatResponse = Single.<Object>succeeded(response).concat(response.messageBody());
// Not necessary to defer subscribe to the messageBody because server does not retry responses.
// Threfore, we don't need to replay messageBody.
flatResponse = flatMessage(response, messageBody, false);
if (shouldAppendTrailers(protocolVersion, response)) {
flatResponse = flatResponse.scanWith(HeaderUtils::appendTrailersMapper);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ public Single<StreamingHttpResponse> handle(HttpServiceContext ctx,
verify(serverExchangeObserver, atMostOnce()).onResponse(any(StreamingHttpResponse.class));
verify(serverResponseObserver, atMostOnce()).onResponseDataRequested(anyLong());
verify(serverResponseObserver, atMostOnce()).onResponseComplete();
verify(serverResponseObserver, atMostOnce()).onResponseCancel();
serverInOrder.verify(serverExchangeObserver).onExchangeFinally();
verifyNoMoreInteractions(serverLifecycleObserver, serverExchangeObserver,
serverRequestObserver, serverResponseObserver);
Expand Down