Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RetryingHttpRequesterFilter: add on request retry callback #2916

Merged
merged 3 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2021 Apple Inc. and the ServiceTalk project authors
* Copyright © 2021-2024 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -36,6 +36,7 @@
import io.servicetalk.http.api.HttpRequestMetaData;
import io.servicetalk.http.api.HttpResponseMetaData;
import io.servicetalk.http.api.HttpResponseStatus;
import io.servicetalk.http.api.StreamingHttpClient;
import io.servicetalk.http.api.StreamingHttpClientFilter;
import io.servicetalk.http.api.StreamingHttpClientFilterFactory;
import io.servicetalk.http.api.StreamingHttpRequest;
Expand Down Expand Up @@ -90,10 +91,10 @@ public final class RetryingHttpRequesterFilter
static final int DEFAULT_MAX_TOTAL_RETRIES = 4;
private static final RetryingHttpRequesterFilter DISABLE_AUTO_RETRIES =
new RetryingHttpRequesterFilter(true, false, false, 1, null,
(__, ___) -> NO_RETRIES);
(__, ___) -> NO_RETRIES, null);
private static final RetryingHttpRequesterFilter DISABLE_ALL_RETRIES =
new RetryingHttpRequesterFilter(false, true, false, 0, null,
(__, ___) -> NO_RETRIES);
(__, ___) -> NO_RETRIES, null);

private final boolean waitForLb;
private final boolean ignoreSdErrors;
Expand All @@ -102,18 +103,22 @@ public final class RetryingHttpRequesterFilter
@Nullable
private final Function<HttpResponseMetaData, HttpResponseException> responseMapper;
private final BiFunction<HttpRequestMetaData, Throwable, BackOffPolicy> retryFor;
@Nullable
private final RetryCallbacks onRequestRetry;

RetryingHttpRequesterFilter(
final boolean waitForLb, final boolean ignoreSdErrors, final boolean mayReplayRequestPayload,
final int maxTotalRetries,
@Nullable final Function<HttpResponseMetaData, HttpResponseException> responseMapper,
final BiFunction<HttpRequestMetaData, Throwable, BackOffPolicy> retryFor) {
final BiFunction<HttpRequestMetaData, Throwable, BackOffPolicy> retryFor,
@Nullable final RetryCallbacks onRequestRetry) {
this.waitForLb = waitForLb;
this.ignoreSdErrors = ignoreSdErrors;
this.mayReplayRequestPayload = mayReplayRequestPayload;
this.maxTotalRetries = maxTotalRetries;
this.responseMapper = responseMapper;
this.retryFor = retryFor;
this.onRequestRetry = onRequestRetry;
}

@Override
Expand Down Expand Up @@ -151,15 +156,20 @@ void inject(@Nullable final Publisher<Object> lbEventStream,
private final class OuterRetryStrategy implements BiIntFunction<Throwable, Completable> {
private final Executor executor;
private final HttpRequestMetaData requestMetaData;
@Nullable
private final RetryCallbacks retryCallbacks;
/**
* The outer retry strategy handles both "load balancer not ready" and "request failed" cases. This count
* discounts the former so the ladder strategies only count actual request attempts.
*/
private int lbNotReadyCount;

private OuterRetryStrategy(final Executor executor, final HttpRequestMetaData requestMetaData) {
private OuterRetryStrategy(final Executor executor,
final HttpRequestMetaData requestMetaData,
@Nullable final RetryCallbacks retryCallbacks) {
this.executor = executor;
this.requestMetaData = requestMetaData;
this.retryCallbacks = retryCallbacks;
}

@Override
Expand All @@ -179,40 +189,48 @@ public Completable apply(final int count, final Throwable t) {
!(lbEvent instanceof LoadBalancerReadyEvent &&
((LoadBalancerReadyEvent) lbEvent).isReady()))
.ignoreElements();
return sdStatus == null ? onHostsAvailable : onHostsAvailable.ambWith(sdStatus);
return applyRetryCallbacks(
sdStatus == null ? onHostsAvailable : onHostsAvailable.ambWith(sdStatus), count, t);
}

final BackOffPolicy backOffPolicy = retryFor.apply(requestMetaData, t);
if (backOffPolicy != NO_RETRIES) {
final int offsetCount = count - lbNotReadyCount;
Completable retryWhen = backOffPolicy.newStrategy(executor).apply(offsetCount, t);
if (t instanceof DelayedRetry) {
final Duration constant = ((DelayedRetry) t).delay();
return backOffPolicy.newStrategy(executor).apply(offsetCount, t)
.concat(executor.timer(constant));
retryWhen = retryWhen.concat(executor.timer(constant));
}

return backOffPolicy.newStrategy(executor).apply(offsetCount, t);
return applyRetryCallbacks(retryWhen, count, t);
}

return failed(t);
}

Completable applyRetryCallbacks(final Completable completable, final int retryCount, final Throwable t) {
return retryCallbacks == null ? completable :
completable.beforeOnComplete(() -> retryCallbacks.beforeRetry(retryCount, requestMetaData, t));
}
}

// Visible for testing
BiIntFunction<Throwable, Completable> retryStrategy(final HttpRequestMetaData requestMetaData,
final ExecutionContext<HttpExecutionStrategy> context) {
final ExecutionContext<HttpExecutionStrategy> context,
final boolean forRequest) {
final HttpExecutionStrategy strategy = requestMetaData.context()
.getOrDefault(HTTP_EXECUTION_STRATEGY_KEY, context.executionStrategy());
assert strategy != null;
return new OuterRetryStrategy(strategy.isRequestResponseOffloaded() ?
context.executor() : context.ioExecutor(), requestMetaData);
context.executor() : context.ioExecutor(), requestMetaData,
forRequest ? onRequestRetry : null);
}

@Override
public Single<? extends FilterableReservedStreamingHttpConnection> reserveConnection(
final HttpRequestMetaData metaData) {
return delegate().reserveConnection(metaData)
.retryWhen(retryStrategy(metaData, executionContext()));
.retryWhen(retryStrategy(metaData, executionContext(), false));
}

@Override
Expand Down Expand Up @@ -251,7 +269,7 @@ protected Single<StreamingHttpResponse> request(final StreamingHttpRequester del
// 1. Metadata is shared across retries
// 2. Publisher state is restored to original state for each retry
// duplicatedRequest isn't used below because retryWhen must be applied outside the defer operator for (2).
return single.retryWhen(retryStrategy(request, executionContext()));
return single.retryWhen(retryStrategy(request, executionContext(), true));
}
}

Expand Down Expand Up @@ -402,6 +420,18 @@ public static final class BackOffPolicy {
this.maxRetries = ensureNonNegative(maxRetries, "maxRetries");
}

@Override
public String toString() {
return getClass().getSimpleName() +
"{maxRetries=" + maxRetries +
", initialDelay=" + initialDelay +
", jitter=" + jitter +
", maxDelay=" + maxDelay +
", exponential=" + exponential +
", timerExecutor=" + timerExecutor +
'}';
}

/**
* Creates a new {@link BackOffPolicy} that retries failures instantly up-to 3 max retries.
*
Expand Down Expand Up @@ -642,6 +672,22 @@ public interface DelayedRetry {
Duration delay();
}

/**
* Callbacks invoked on a retry attempt.
*/
@FunctionalInterface
public interface RetryCallbacks {

/**
* Called after a retry decision has been made, but before the retry is performed.
*
* @param retryCount a current retry counter value for this attempt
* @param requestMetaData {@link HttpRequestMetaData} that is being retried
* @param cause {@link Throwable} cause for the retry
*/
void beforeRetry(int retryCount, HttpRequestMetaData requestMetaData, Throwable cause);
}

/**
* A builder for {@link RetryingHttpRequesterFilter}, which puts an upper bound on retry attempts.
* To configure the maximum number of retry attempts see {@link #maxTotalRetries(int)}.
Expand All @@ -665,20 +711,19 @@ public static final class Builder {
private Function<HttpResponseMetaData, HttpResponseException> responseMapper;

@Nullable
private BiFunction<HttpRequestMetaData, IOException, BackOffPolicy>
retryIdempotentRequests;
private BiFunction<HttpRequestMetaData, IOException, BackOffPolicy> retryIdempotentRequests;

@Nullable
private BiFunction<HttpRequestMetaData, DelayedRetry, BackOffPolicy> retryDelayedRetries;

@Nullable
private BiFunction<HttpRequestMetaData, DelayedRetry, BackOffPolicy>
retryDelayedRetries;
private BiFunction<HttpRequestMetaData, HttpResponseException, BackOffPolicy> retryResponses;

@Nullable
private BiFunction<HttpRequestMetaData, HttpResponseException, BackOffPolicy>
retryResponses;
private BiFunction<HttpRequestMetaData, Throwable, BackOffPolicy> retryOther;

@Nullable
private BiFunction<HttpRequestMetaData, Throwable, BackOffPolicy>
retryOther;
private RetryCallbacks onRequestRetry;

/**
* By default, automatic retries wait for the associated {@link LoadBalancer} to be
Expand Down Expand Up @@ -842,6 +887,22 @@ public Builder retryOther(
return this;
}

/**
* Callback invoked on every {@link StreamingHttpClient#request(StreamingHttpRequest) request} retry attempt.
* <p>
* This can be used to track when {@link BackOffPolicy} actually decides to retry a request, to update
* {@link HttpRequestMetaData request meta-data} before a retry, or implement logging/metrics. However, it
* can not be used to influence the retry decision, use other "retry*" functions for that purpose.
*
* @param onRequestRetry {@link RetryCallbacks} to get notified on every
* {@link StreamingHttpClient#request(StreamingHttpRequest) request} retry attempt
* @return {@code this}
*/
public Builder onRequestRetry(final RetryCallbacks onRequestRetry) {
this.onRequestRetry = requireNonNull(onRequestRetry);
return this;
}

/**
* Builds a retrying {@link RetryingHttpRequesterFilter} with this' builders configuration.
*
Expand Down Expand Up @@ -921,7 +982,7 @@ public RetryingHttpRequesterFilter build() {
return NO_RETRIES;
};
return new RetryingHttpRequesterFilter(waitForLb, ignoreSdErrors, mayReplayRequestPayload,
maxTotalRetries, responseMapper, allPredicate);
maxTotalRetries, responseMapper, allPredicate, onRequestRetry);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2021 Apple Inc. and the ServiceTalk project authors
* Copyright © 2019-2024 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -38,6 +38,7 @@
import org.mockito.stubbing.Answer;

import java.net.UnknownHostException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;

import static io.servicetalk.buffer.netty.BufferAllocators.DEFAULT_ALLOCATOR;
Expand Down Expand Up @@ -101,13 +102,16 @@ void disableWaitForLb(boolean offloading) {
@ParameterizedTest
@ValueSource(booleans = {false, true})
void disableRetryAllRetryableExWithRetryable(boolean offloading) {
final ContextAwareRetryingHttpClientFilter filter =
newFilter(new RetryingHttpRequesterFilter.Builder()
.retryRetryableExceptions((__, ___) -> ofNoRetries()), offloading);
AtomicInteger onRequestRetryCounter = new AtomicInteger();
final ContextAwareRetryingHttpClientFilter filter = newFilter(new RetryingHttpRequesterFilter.Builder()
.retryRetryableExceptions((__, ___) -> ofNoRetries())
.onRequestRetry((count, req, t) -> assertThat(onRequestRetryCounter.incrementAndGet(), is(count))),
offloading);

Completable retry = applyRetry(filter, 1, RETRYABLE_EXCEPTION);
toSource(retry).subscribe(retrySubscriber);
verifyRetryResultError(RETRYABLE_EXCEPTION);
assertThat("Unexpected calls to onRequestRetry.", onRequestRetryCounter.get(), is(0));
}

@ParameterizedTest
Expand Down Expand Up @@ -170,13 +174,16 @@ void defaultForRetryableEx(boolean offloading) {
@ParameterizedTest
@ValueSource(booleans = {false, true})
void defaultForNoAvailableHost(boolean offloading) {
final ContextAwareRetryingHttpClientFilter filter =
newFilter(new RetryingHttpRequesterFilter.Builder(), offloading);
AtomicInteger onRequestRetryCounter = new AtomicInteger();
final ContextAwareRetryingHttpClientFilter filter = newFilter(new RetryingHttpRequesterFilter.Builder()
.onRequestRetry((count, req, t) -> assertThat(onRequestRetryCounter.incrementAndGet(), is(count))),
offloading);
Completable retry = applyRetry(filter, 1, NO_AVAILABLE_HOST);
toSource(retry).subscribe(retrySubscriber);
assertThat(retrySubscriber.pollTerminal(10, MILLISECONDS), is(nullValue()));
lbEvents.onNext(LOAD_BALANCER_READY_EVENT);
verifyRetryResultCompleted();
assertThat("Unexpected calls to onRequestRetry.", onRequestRetryCounter.get(), is(1));
}

@ParameterizedTest
Expand Down Expand Up @@ -255,7 +262,7 @@ void noActiveHostException(boolean offloading) {
.maxTotalRetries(Integer.MAX_VALUE), offloading);
lbEvents.onNext(LOAD_BALANCER_READY_EVENT); // LB is ready before subscribing to the response
BiIntFunction<Throwable, Completable> retryStrategy =
filter.retryStrategy(REQUEST_META_DATA, filter.executionContext());
filter.retryStrategy(REQUEST_META_DATA, filter.executionContext(), true);
for (int i = 1; i <= DEFAULT_MAX_TOTAL_RETRIES * 2; i++) {
Completable retry = retryStrategy.apply(i, NO_ACTIVE_HOST);
TestCompletableSubscriber subscriber = new TestCompletableSubscriber();
Expand Down Expand Up @@ -316,6 +323,6 @@ private ContextAwareRetryingHttpClientFilter newFilter(final RetryingHttpRequest
@Nonnull
private Completable applyRetry(final ContextAwareRetryingHttpClientFilter filter,
final int count, final Throwable t) {
return filter.retryStrategy(REQUEST_META_DATA, filter.executionContext()).apply(count, t);
return filter.retryStrategy(REQUEST_META_DATA, filter.executionContext(), true).apply(count, t);
}
}
Loading
Loading