Skip to content

Commit

Permalink
Flatten some operations and cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
bryce-anderson committed Aug 14, 2024
1 parent c7c0d01 commit 38adcda
Showing 1 changed file with 42 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import java.net.SocketTimeoutException;
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.BiFunction;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -174,59 +174,79 @@ public Single<StreamingHttpResponse> request(final StreamingHttpRequest request)
}))
// Defer timeout counter until after the request payload body is complete
.<StreamingHttpResponse>liftSync(subscriber ->
new CustomAmb(subscriber, SourceAdapters.fromSource(requestProcessor)
// Start timeout counter after requestProcessor completes
.concat(Completable.<StreamingHttpResponse>never().timeout(timeout, timeoutExecutor))))
// TODO: we can flatten the `map` call into our CustomAmb.
.map(response -> response.transformMessageBody(p -> p.timeout(timeout, timeoutExecutor)
.onErrorMap(TimeoutException.class, t -> newStacklessSocketTimeoutException(
"Read timed out after " + timeout.toMillis() +
"ms waiting for the next response payload body chunk")
.initCause(t))))
new RequestTimeoutSubscriber(subscriber, SourceAdapters.fromSource(requestProcessor),
timeout, timeoutExecutor))
.shareContextOnSubscribe();
});
}
};
}

private static final class CustomAmb implements Subscriber<StreamingHttpResponse> {
private static final class RequestTimeoutSubscriber implements Subscriber<StreamingHttpResponse> {

private final AtomicBoolean once = new AtomicBoolean(); // TODO: flatten.
private static final AtomicIntegerFieldUpdater<RequestTimeoutSubscriber> onceUpdater =
AtomicIntegerFieldUpdater.newUpdater(RequestTimeoutSubscriber.class, "once");
private final DelayedCancellable requestCancellable = new DelayedCancellable();
private final DelayedCancellable timeoutCancellable = new DelayedCancellable();
private final Subscriber<? super StreamingHttpResponse> delegate;
private final Completable timeout;

public CustomAmb(Subscriber<? super StreamingHttpResponse> delegate, Completable timeout) {
private final Completable requestComplete;
private final Duration timeout;
private final Executor timeoutExecutor;

private volatile int once;

public RequestTimeoutSubscriber(Subscriber<? super StreamingHttpResponse> delegate, Completable requestComplete,
Duration timeout, Executor timeoutExecutor) {
this.delegate = delegate;
this.requestComplete = requestComplete;
this.timeout = timeout;
this.timeoutExecutor = timeoutExecutor;

}

@Override
public void onSubscribe(Cancellable cancellable) {
delegate.onSubscribe(() -> {
// TODO: we could not condition the cancel calls on the `once()` if we always want to propagate
// cancellation even after we've emitted a result.
if (once()) {
timeoutCancellable.cancel();
requestCancellable.cancel();
}
});
timeoutCancellable.delayedCancellable(timeout.beforeOnError(this::handleTimeout).subscribe());

requestCancellable.delayedCancellable(cancellable);
timeoutCancellable.delayedCancellable(requestComplete.concat(Completable.never()
.timeout(timeout, timeoutExecutor)).beforeOnError(this::handleInterruptions).subscribe());
}

private void handleTimeout(Throwable t) {
// TODO: we can lift the onErrorMap here if we want.
private void handleInterruptions(Throwable t) {
if (once()) {
requestCancellable.cancel();
delegate.onError(t);
Throwable result = t;
// We can get a SocketTimeoutException waiting for a 100 Continue response.
if (t instanceof TimeoutException) {
result = newStacklessSocketTimeoutException(
"Read timed out after " + timeout.toMillis() +
"ms waiting for response meta-data")
.initCause(t);
}
delegate.onError(result);
}
}

@Override
public void onSuccess(@Nullable StreamingHttpResponse result) {
if (once()) {
// cancel timeout timer
timeoutCancellable.cancel();
if (result != null) {
result = result.transformMessageBody(p -> p.timeout(timeout, timeoutExecutor)
.onErrorMap(TimeoutException.class, t -> newStacklessSocketTimeoutException(
"Read timed out after " + timeout.toMillis() +
"ms waiting for the next response payload body chunk")
.initCause(t)));
}
delegate.onSuccess(result);
} else {
if (result != null) {
Expand All @@ -238,14 +258,13 @@ public void onSuccess(@Nullable StreamingHttpResponse result) {
@Override
public void onError(Throwable t) {
if (once()) {
// cancel timeout timer.
timeoutCancellable.cancel();
delegate.onError(t);
}
}

private boolean once() {
return !once.getAndSet(true);
return 0 == onceUpdater.getAndSet(this, 1);
}
}

Expand All @@ -267,8 +286,8 @@ public HttpExecutionStrategy requiredOffloads() {
return HttpExecutionStrategies.offloadNone();
}

private StacklessSocketTimeoutException newStacklessSocketTimeoutException(final String message) {
return StacklessSocketTimeoutException.newInstance(message, this.getClass(), "request");
private static StacklessSocketTimeoutException newStacklessSocketTimeoutException(final String message) {
return StacklessSocketTimeoutException.newInstance(message, JavaNetSoTimeoutHttpConnectionFilter.class, "request");
}

private static final class StacklessSocketTimeoutException extends SocketTimeoutException {
Expand All @@ -289,8 +308,4 @@ static StacklessSocketTimeoutException newInstance(final String message, final C
return ThrowableUtils.unknownStackTrace(new StacklessSocketTimeoutException(message), clazz, method);
}
}

private static void clean(StreamingHttpResponse response) {
toSource(response.messageBody()).subscribe(CancelImmediatelySubscriber.INSTANCE);
}
}

0 comments on commit 38adcda

Please sign in to comment.