-
Notifications
You must be signed in to change notification settings - Fork 183
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
http-utils: fix leak in AbstractTimeoutHttpFilter #3038
Changes from 4 commits
8abe3f0
c545560
b2ab87e
62041a0
63e8285
5736f0b
25cf35f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,9 +15,12 @@ | |
*/ | ||
package io.servicetalk.http.utils; | ||
|
||
import io.servicetalk.concurrent.Cancellable; | ||
import io.servicetalk.concurrent.Executor; | ||
import io.servicetalk.concurrent.SingleSource; | ||
import io.servicetalk.concurrent.TimeSource; | ||
import io.servicetalk.concurrent.api.Single; | ||
import io.servicetalk.concurrent.internal.CancelImmediatelySubscriber; | ||
import io.servicetalk.http.api.HttpExecutionStrategies; | ||
import io.servicetalk.http.api.HttpExecutionStrategy; | ||
import io.servicetalk.http.api.HttpExecutionStrategyInfluencer; | ||
|
@@ -28,11 +31,13 @@ | |
|
||
import java.time.Duration; | ||
import java.util.concurrent.TimeoutException; | ||
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; | ||
import java.util.function.BiFunction; | ||
import java.util.function.Function; | ||
import javax.annotation.Nullable; | ||
|
||
import static io.servicetalk.concurrent.api.Publisher.defer; | ||
import static io.servicetalk.concurrent.api.SourceAdapters.toSource; | ||
import static io.servicetalk.http.api.HttpContextKeys.HTTP_EXECUTION_STRATEGY_KEY; | ||
import static io.servicetalk.utils.internal.DurationUtils.ensurePositive; | ||
import static java.time.Duration.ofNanos; | ||
|
@@ -117,7 +122,9 @@ final Single<StreamingHttpResponse> withTimeout(final StreamingHttpRequest reque | |
final Duration timeout = timeoutForRequest.apply(request, useForTimeout); | ||
Single<StreamingHttpResponse> response = responseFunction.apply(request); | ||
if (null != timeout) { | ||
final Single<StreamingHttpResponse> timeoutResponse = response.timeout(timeout, useForTimeout); | ||
final Single<StreamingHttpResponse> timeoutResponse = response | ||
.<StreamingHttpResponse>liftSync(CleanupSubscriber::new) | ||
.timeout(timeout, useForTimeout); | ||
|
||
if (fullRequestResponse) { | ||
final long deadline = useForTimeout.currentTime(NANOSECONDS) + timeout.toNanos(); | ||
|
@@ -151,6 +158,62 @@ private Executor contextExecutor(final HttpRequestMetaData requestMetaData, | |
context.executor() : context.ioExecutor(); | ||
} | ||
|
||
// This is responsible for ensuring that we don't abandon the response resources due to the use of Single.timeout. | ||
// It does so by retaining a reference to the StreamingHttpResponse in case we receive a cancellation. It will | ||
// then attempt to drain the resource. We're protected from multiple subscribes because the TimeoutSingle will | ||
// short circuit also on upstream cancellation, although that is not obviously correct behavior. | ||
private static final class CleanupSubscriber implements SingleSource.Subscriber<StreamingHttpResponse> { | ||
idelpivnitskiy marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
private static final AtomicReferenceFieldUpdater<CleanupSubscriber, Object> stateUpdater = | ||
AtomicReferenceFieldUpdater.newUpdater(CleanupSubscriber.class, Object.class, "state"); | ||
|
||
private static final String COMPLETE = "complete"; | ||
|
||
private final SingleSource.Subscriber<? super StreamingHttpResponse> delegate; | ||
@Nullable | ||
private volatile Object state; | ||
|
||
CleanupSubscriber(final SingleSource.Subscriber<? super StreamingHttpResponse> delegate) { | ||
this.delegate = delegate; | ||
} | ||
|
||
@Override | ||
public void onSubscribe(Cancellable cancellable) { | ||
delegate.onSubscribe(() -> { | ||
try { | ||
Object current = stateUpdater.getAndSet(this, COMPLETE); | ||
if (current instanceof StreamingHttpResponse) { | ||
clean((StreamingHttpResponse) current); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if we've propagated the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The TL;DR is that if we get a cancel it passed through the timeout operator and 'won', therefore the message will never be propagated. I added clarifying comments and pointed to the test that ensures the behavior. |
||
} | ||
} finally { | ||
cancellable.cancel(); | ||
} | ||
}); | ||
} | ||
|
||
@Override | ||
public void onSuccess(@Nullable StreamingHttpResponse result) { | ||
assert result != null; | ||
if (stateUpdater.compareAndSet(this, null, result)) { | ||
// We win. | ||
delegate.onSuccess(result); | ||
} else { | ||
// we lost to cancellation. No need to send it forward. | ||
clean(result); | ||
} | ||
} | ||
|
||
@Override | ||
public void onError(Throwable t) { | ||
assert !(state instanceof StreamingHttpResponse); | ||
delegate.onError(t); | ||
} | ||
|
||
private void clean(StreamingHttpResponse httpResponse) { | ||
toSource(httpResponse.messageBody()).subscribe(CancelImmediatelySubscriber.INSTANCE); | ||
} | ||
} | ||
|
||
private static final class MappedTimeoutException extends TimeoutException { | ||
private static final long serialVersionUID = -8230476062001221272L; | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feels like we need something similar inside
JavaNetSoTimeoutHttpConnectionFilter
as well. In that case, iftimeout
deliversonError
, amb operator will do the cancel for the response and then we are in the same situation of response racing with cancel.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about in a followup?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Works for me