Skip to content

Commit

Permalink
This is perhaps more robust.
Browse files Browse the repository at this point in the history
  • Loading branch information
bryce-anderson committed Aug 14, 2024
1 parent 4569c6d commit 8ccfdff
Showing 1 changed file with 32 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
import io.servicetalk.concurrent.CompletableSource;
import io.servicetalk.concurrent.SingleSource.Subscriber;
import io.servicetalk.concurrent.TimeSource;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.Executor;
import io.servicetalk.concurrent.api.Processors;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.SourceAdapters;
import io.servicetalk.concurrent.internal.CancelImmediatelySubscriber;
import io.servicetalk.concurrent.internal.DelayedCancellable;
import io.servicetalk.concurrent.internal.ThrowableUtils;
import io.servicetalk.http.api.FilterableStreamingHttpConnection;
import io.servicetalk.http.api.HttpContextKeys;
Expand All @@ -45,7 +47,6 @@
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiFunction;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -171,16 +172,12 @@ public Single<StreamingHttpResponse> request(final StreamingHttpRequest request)
}
return body;
}))
.<StreamingHttpResponse>liftSync(ResponseTrap::new)
// Defer timeout counter until after the request payload body is complete
.ambWith(SourceAdapters.fromSource(requestProcessor)
.<StreamingHttpResponse>liftSync(subscriber ->
new CustomAmb(subscriber, SourceAdapters.fromSource(requestProcessor)
// Start timeout counter after requestProcessor completes
.concat(Single.<StreamingHttpResponse>never().timeout(timeout, timeoutExecutor)
.onErrorMap(TimeoutException.class, t -> newStacklessSocketTimeoutException(
"Read timed out after " + timeout.toMillis() +
"ms waiting for response meta-data")
.initCause(t))))
.<StreamingHttpResponse>liftSync(CancelTrap::new)
.concat(Completable.<StreamingHttpResponse>never().timeout(timeout, timeoutExecutor))))
// TODO: we can flatten the `map` call into our CustomAmb.
.map(response -> response.transformMessageBody(p -> p.timeout(timeout, timeoutExecutor)
.onErrorMap(TimeoutException.class, t -> newStacklessSocketTimeoutException(
"Read timed out after " + timeout.toMillis() +
Expand All @@ -192,100 +189,58 @@ public Single<StreamingHttpResponse> request(final StreamingHttpRequest request)
};
}

// The purpose of the `ResponseTrap` and `CancelTrap` is to prevent resource leaks and duplicate message body
// subscribes.
// The `CancelTrap` applies post-amb operator and acts to create a choke point for cancellation and
// responses: if cancellation happens first then that will be forwarded and any responses will be cleaned/disposed
// of. This lets the `ResponseTrap` make the assumption that if it gets a `.cancel()` call, it's safe to dispose
// of the response without worrying about races upstream. This is because the cancel originated upstream and the
// door is now latched, or it came from the other branch of the `ambWith` because that branch 'won'.
private static final class ResponseTrap implements Subscriber<StreamingHttpResponse> {
private static final class CustomAmb implements Subscriber<StreamingHttpResponse> {

// The terminal state.
private static final String CANCELLED = "cancelled";

private static final AtomicReferenceFieldUpdater<ResponseTrap, Object> responseUpdater =
AtomicReferenceFieldUpdater.newUpdater(ResponseTrap.class, Object.class, "response");
private final AtomicBoolean once = new AtomicBoolean(); // TODO: flatten.
private final DelayedCancellable requestCancellable = new DelayedCancellable();
private final DelayedCancellable timeoutCancellable = new DelayedCancellable();
private final Subscriber<? super StreamingHttpResponse> delegate;
private final Completable timeout;


@Nullable
private volatile Object response;

public ResponseTrap(final Subscriber<? super StreamingHttpResponse> delegate) {
public CustomAmb(Subscriber<? super StreamingHttpResponse> delegate, Completable timeout) {
this.delegate = delegate;
this.timeout = timeout;
}

@Override
public void onSubscribe(final Cancellable cancellable) {
public void onSubscribe(Cancellable cancellable) {
delegate.onSubscribe(() -> {
try {
Object result = responseUpdater.getAndSet(this, CANCELLED);
if (result instanceof StreamingHttpResponse) {
clean((StreamingHttpResponse) result);
}
} finally {
cancellable.cancel();
if (once()) {
timeoutCancellable.cancel();
requestCancellable.cancel();
}
});
timeoutCancellable.delayedCancellable(timeout.beforeOnError(this::handleTimeout).subscribe());
requestCancellable.delayedCancellable(cancellable);
}

@Override
public void onSuccess(@Nullable StreamingHttpResponse result) {
try {
if (!responseUpdater.compareAndSet(this, null, result == null ? CANCELLED : result)) {
assert response == CANCELLED;
if (result != null) {
clean(result);
}
}
} finally {
delegate.onSuccess(result);
private void handleTimeout(Throwable t) {
// TODO: we can lift the onErrorMap here if we want.
if (once()) {
requestCancellable.cancel();
delegate.onError(t);
}
}

@Override
public void onError(Throwable t) {
delegate.onError(t);
}
}

private static final class CancelTrap implements Subscriber<StreamingHttpResponse> {

private final AtomicBoolean once = new AtomicBoolean();
private final Subscriber<? super StreamingHttpResponse> delegate;

CancelTrap(final Subscriber<? super StreamingHttpResponse> delegate) {
this.delegate = delegate;
}

@Override
public void onSubscribe(final Cancellable cancellable) {
assert !once.get();
delegate.onSubscribe(() -> {
try {
once();
} finally {
cancellable.cancel();
}
});
}

@Override
public void onSuccess(@Nullable StreamingHttpResponse result) {
if (once()) {
// cancel timeout timer
timeoutCancellable.cancel();
delegate.onSuccess(result);
} else if (result != null) {
clean(result);
} else {
if (result != null) {
toSource(result.messageBody()).subscribe(CancelImmediatelySubscriber.INSTANCE);
}
}
}

@Override
public void onError(Throwable t) {
if (once()) {
// cancel timeout timer.
timeoutCancellable.cancel();
delegate.onError(t);
} else {
// anything to do here?
}
}

Expand Down

0 comments on commit 8ccfdff

Please sign in to comment.