Skip to content

Commit

Permalink
Update ignored_error logic for circuit_breaker
Browse files Browse the repository at this point in the history
When a digest's size exceeds the maximum digest size configured by the remote cache, an "out_of_range" error is returned. These errors should not be counted as API failures by the circuit breaker logic, because they do not indicate any issue with the remote-cache service.
Similarly, other non-retriable errors, such as ALREADY_EXISTS, should not be treated as server failures.

This change treats non-retriable errors as user/client errors and records them as successes, while retriable errors such as `DEADLINE_EXCEEDED` and `UNKNOWN` are recorded as failures.

Related PRs
#18359
#18539

Closes #18613.

PiperOrigin-RevId: 539948823
Change-Id: I5b51f6a3aecab7c17d73f78b8234d9a6da49fe6c
  • Loading branch information
amishra-u authored and copybara-github committed Jun 13, 2023
1 parent d9b94cb commit c05ba76
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 67 deletions.
25 changes: 16 additions & 9 deletions src/main/java/com/google/devtools/build/lib/remote/Retrier.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.remote.Retrier.CircuitBreaker.State;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -100,7 +101,7 @@ enum State {
State state();

/** Called after an execution failed. */
void recordFailure(Exception e);
void recordFailure();

/** Called after an execution succeeded. */
void recordSuccess();
Expand Down Expand Up @@ -130,7 +131,7 @@ public State state() {
}

@Override
public void recordFailure(Exception e) {}
public void recordFailure() {}

@Override
public void recordSuccess() {}
Expand Down Expand Up @@ -245,12 +246,14 @@ public <T> T execute(Callable<T> call, Backoff backoff) throws Exception {
circuitBreaker.recordSuccess();
return r;
} catch (Exception e) {
circuitBreaker.recordFailure(e);
Throwables.throwIfInstanceOf(e, InterruptedException.class);
if (State.TRIAL_CALL.equals(circuitState)) {
if (!shouldRetry.test(e)) {
// A non-retriable error doesn't represent server failure.
circuitBreaker.recordSuccess();
throw e;
}
if (!shouldRetry.test(e)) {
circuitBreaker.recordFailure();
if (Objects.equals(circuitState, State.TRIAL_CALL)) {
throw e;
}
final long delayMillis = backoff.nextDelayMillis(e);
Expand Down Expand Up @@ -297,11 +300,11 @@ public <T> ListenableFuture<T> executeAsync(AsyncCallable<T> call, Backoff backo

private <T> ListenableFuture<T> onExecuteAsyncFailure(
Exception t, AsyncCallable<T> call, Backoff backoff, State circuitState) {
circuitBreaker.recordFailure(t);
if (circuitState.equals(State.TRIAL_CALL)) {
return Futures.immediateFailedFuture(t);
}
if (isRetriable(t)) {
circuitBreaker.recordFailure();
if (circuitState.equals(State.TRIAL_CALL)) {
return Futures.immediateFailedFuture(t);
}
long waitMillis = backoff.nextDelayMillis(t);
if (waitMillis >= 0) {
try {
Expand All @@ -315,6 +318,10 @@ private <T> ListenableFuture<T> onExecuteAsyncFailure(
return Futures.immediateFailedFuture(t);
}
} else {
// gRPC errors such as NOT_FOUND, OUT_OF_RANGE and ALREADY_EXISTS are non-retriable and do not
// represent an issue in the server, so these errors are treated as successful API calls.
circuitBreaker.recordSuccess();
return Futures.immediateFailedFuture(t);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,11 @@
// limitations under the License.
package com.google.devtools.build.lib.remote.circuitbreaker;

import com.google.common.collect.ImmutableSet;
import com.google.devtools.build.lib.remote.Retrier;
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
import com.google.devtools.build.lib.remote.options.RemoteOptions;

/** Factory for {@link Retrier.CircuitBreaker} */
public class CircuitBreakerFactory {

public static final ImmutableSet<Class<? extends Exception>> DEFAULT_IGNORED_ERRORS =
ImmutableSet.of(CacheNotFoundException.class);
public static final int DEFAULT_MIN_CALL_COUNT_TO_COMPUTE_FAILURE_RATE = 100;

private CircuitBreakerFactory() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.
package com.google.devtools.build.lib.remote.circuitbreaker;

import com.google.common.collect.ImmutableSet;
import com.google.devtools.build.lib.remote.Retrier;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -33,12 +32,10 @@ public class FailureCircuitBreaker implements Retrier.CircuitBreaker {
private State state;
private final AtomicInteger successes;
private final AtomicInteger failures;
private final AtomicInteger ignoredFailures;
private final int failureRateThreshold;
private final int slidingWindowSize;
private final int minCallCountToComputeFailureRate;
private final ScheduledExecutorService scheduledExecutor;
private final ImmutableSet<Class<? extends Exception>> ignoredErrors;

/**
* Creates a {@link FailureCircuitBreaker}.
Expand All @@ -51,15 +48,13 @@ public class FailureCircuitBreaker implements Retrier.CircuitBreaker {
public FailureCircuitBreaker(int failureRateThreshold, int slidingWindowSize) {
this.failures = new AtomicInteger(0);
this.successes = new AtomicInteger(0);
this.ignoredFailures = new AtomicInteger(0);
this.failureRateThreshold = failureRateThreshold;
this.slidingWindowSize = slidingWindowSize;
this.minCallCountToComputeFailureRate =
CircuitBreakerFactory.DEFAULT_MIN_CALL_COUNT_TO_COMPUTE_FAILURE_RATE;
this.state = State.ACCEPT_CALLS;
this.scheduledExecutor =
slidingWindowSize > 0 ? Executors.newSingleThreadScheduledExecutor() : null;
this.ignoredErrors = CircuitBreakerFactory.DEFAULT_IGNORED_ERRORS;
}

@Override
Expand All @@ -68,33 +63,24 @@ public State state() {
}

@Override
public void recordFailure(Exception e) {
if (!ignoredErrors.contains(e.getClass())) {
int failureCount = failures.incrementAndGet();
int totalCallCount = successes.get() + failureCount + ignoredFailures.get();
if (slidingWindowSize > 0) {
var unused =
scheduledExecutor.schedule(
failures::decrementAndGet, slidingWindowSize, TimeUnit.MILLISECONDS);
}
public void recordFailure() {
int failureCount = failures.incrementAndGet();
int totalCallCount = successes.get() + failureCount;
if (slidingWindowSize > 0) {
var unused =
scheduledExecutor.schedule(
failures::decrementAndGet, slidingWindowSize, TimeUnit.MILLISECONDS);
}

if (totalCallCount < minCallCountToComputeFailureRate) {
// The remote call count is below the threshold required to calculate the failure rate.
return;
}
double failureRate = (failureCount * 100.0) / totalCallCount;
if (totalCallCount < minCallCountToComputeFailureRate) {
// The remote call count is below the threshold required to calculate the failure rate.
return;
}
double failureRate = (failureCount * 100.0) / totalCallCount;

// Since the state can only be changed to the open state, synchronization is not required.
if (failureRate > this.failureRateThreshold) {
this.state = State.REJECT_CALLS;
}
} else {
ignoredFailures.incrementAndGet();
if (slidingWindowSize > 0) {
var unused =
scheduledExecutor.schedule(
ignoredFailures::decrementAndGet, slidingWindowSize, TimeUnit.MILLISECONDS);
}
// Since the state can only be changed to the open state, synchronization is not required.
if (failureRate > this.failureRateThreshold) {
this.state = State.REJECT_CALLS;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand All @@ -31,6 +30,10 @@
import com.google.devtools.build.lib.remote.Retrier.CircuitBreakerException;
import com.google.devtools.build.lib.remote.Retrier.ZeroBackoff;
import com.google.devtools.build.lib.testutil.TestUtils;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -94,7 +97,7 @@ public void retryShouldWork_failure() throws Exception {
assertThat(e).hasMessageThat().isEqualTo("call failed");

assertThat(numCalls.get()).isEqualTo(3);
verify(alwaysOpen, times(3)).recordFailure(any(Exception.class));
verify(alwaysOpen, times(3)).recordFailure();
verify(alwaysOpen, never()).recordSuccess();
}

Expand All @@ -118,8 +121,8 @@ public void retryShouldWorkNoRetries_failure() throws Exception {
assertThat(e).hasMessageThat().isEqualTo("call failed");

assertThat(numCalls.get()).isEqualTo(1);
verify(alwaysOpen, times(1)).recordFailure(e);
verify(alwaysOpen, never()).recordSuccess();
verify(alwaysOpen, never()).recordFailure();
verify(alwaysOpen, times(1)).recordSuccess();
}

@Test
Expand All @@ -139,7 +142,7 @@ public void retryShouldWork_success() throws Exception {
});
assertThat(val).isEqualTo(1);

verify(alwaysOpen, times(2)).recordFailure(any(Exception.class));
verify(alwaysOpen, times(2)).recordFailure();
verify(alwaysOpen, times(1)).recordSuccess();
}

Expand Down Expand Up @@ -332,6 +335,46 @@ public void asyncRetryEmptyError() throws Exception {
assertThat(e).hasCauseThat().hasMessageThat().isEqualTo("");
}

@Test
public void testCircuitBreakerFailureAndSuccessCallOnDifferentGrpcError() {
  // Every call is attempted once plus `maxRetries` retries when its error is retriable.
  final int maxRetries = 2;
  final int attemptsPerCall = maxRetries + 1;

  List<Status> retriableStatuses =
      Arrays.asList(Status.ABORTED, Status.UNKNOWN, Status.DEADLINE_EXCEEDED);
  List<Status> nonRetriableStatuses =
      Arrays.asList(Status.NOT_FOUND, Status.OUT_OF_RANGE, Status.ALREADY_EXISTS);

  // The breaker is sized to trip exactly when all retriable calls have exhausted their attempts.
  Supplier<Backoff> backoffSupplier = () -> new ZeroBackoff(maxRetries);
  TripAfterNCircuitBreaker breaker =
      new TripAfterNCircuitBreaker(retriableStatuses.size() * attemptsPerCall);
  Retrier retrier =
      new Retrier(backoffSupplier, RemoteRetrier.RETRIABLE_GRPC_ERRORS, retryService, breaker);

  // Phase 1: each retriable status is expected to add one failure per attempt.
  int failuresSoFar = 0;
  for (Status retriable : retriableStatuses) {
    ListenableFuture<Void> outcome =
        retrier.executeAsync(
            () -> {
              throw new StatusRuntimeException(retriable);
            });
    failuresSoFar += attemptsPerCall;
    assertThrows(ExecutionException.class, outcome::get);
    assertThat(breaker.consecutiveFailures).isEqualTo(failuresSoFar);
  }

  // All retriable failures have been recorded, so the breaker must now reject calls.
  assertThat(breaker.state).isEqualTo(State.REJECT_CALLS);
  breaker.trialCall();

  // Phase 2: non-retriable statuses still fail the returned future, but the test expects the
  // consecutive-failure counter to read zero and the breaker to accept calls afterwards.
  for (Status nonRetriable : nonRetriableStatuses) {
    ListenableFuture<Void> outcome =
        retrier.executeAsync(
            () -> {
              throw new StatusRuntimeException(nonRetriable);
            });
    assertThat(breaker.consecutiveFailures).isEqualTo(0);
    assertThrows(ExecutionException.class, outcome::get);
  }
  assertThat(breaker.state).isEqualTo(State.ACCEPT_CALLS);
}

/** Simple circuit breaker that trips after N consecutive failures. */
@ThreadSafe
private static class TripAfterNCircuitBreaker implements CircuitBreaker {
Expand All @@ -351,7 +394,7 @@ public synchronized State state() {
}

@Override
public synchronized void recordFailure(Exception e) {
public synchronized void recordFailure() {
consecutiveFailures++;
if (consecutiveFailures >= maxConsecutiveFailures) {
state = State.REJECT_CALLS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@

import static com.google.common.truth.Truth.assertThat;

import build.bazel.remote.execution.v2.Digest;
import com.google.devtools.build.lib.remote.Retrier.CircuitBreaker.State;
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand All @@ -30,40 +28,37 @@
public class FailureCircuitBreakerTest {

@Test
public void testRecordFailure_withIgnoredErrors() throws InterruptedException {
public void testRecordFailure_circuitTrips() throws InterruptedException {
final int failureRateThreshold = 10;
final int windowInterval = 100;
FailureCircuitBreaker failureCircuitBreaker =
new FailureCircuitBreaker(failureRateThreshold, windowInterval);

List<Exception> listOfExceptionThrownOnFailure = new ArrayList<>();
List<Runnable> listOfSuccessAndFailureCalls = new ArrayList<>();
for (int index = 0; index < failureRateThreshold; index++) {
listOfExceptionThrownOnFailure.add(new Exception());
listOfSuccessAndFailureCalls.add(failureCircuitBreaker::recordFailure);
}

for (int index = 0; index < failureRateThreshold * 9; index++) {
listOfExceptionThrownOnFailure.add(new CacheNotFoundException(Digest.newBuilder().build()));
listOfSuccessAndFailureCalls.add(failureCircuitBreaker::recordSuccess);
}

Collections.shuffle(listOfExceptionThrownOnFailure);
Collections.shuffle(listOfSuccessAndFailureCalls);

// make calls equals to threshold number of not ignored failure calls in parallel.
listOfExceptionThrownOnFailure.stream()
.parallel()
.forEach(failureCircuitBreaker::recordFailure);
listOfSuccessAndFailureCalls.stream().parallel().forEach(Runnable::run);
assertThat(failureCircuitBreaker.state()).isEqualTo(State.ACCEPT_CALLS);

// Sleep for windowInterval + 1ms.
Thread.sleep(windowInterval + 1 /*to compensate any delay*/);

// make calls equals to threshold number of not ignored failure calls in parallel.
listOfExceptionThrownOnFailure.stream()
.parallel()
.forEach(failureCircuitBreaker::recordFailure);
listOfSuccessAndFailureCalls.stream().parallel().forEach(Runnable::run);
assertThat(failureCircuitBreaker.state()).isEqualTo(State.ACCEPT_CALLS);

// Sleep for less than windowInterval.
Thread.sleep(windowInterval - 5);
failureCircuitBreaker.recordFailure(new Exception());
failureCircuitBreaker.recordFailure();
assertThat(failureCircuitBreaker.state()).isEqualTo(State.REJECT_CALLS);
}

Expand All @@ -80,15 +75,15 @@ public void testRecordFailure_minCallCriteriaNotMet() throws InterruptedExceptio
// minCallToComputeFailure.
IntStream.range(0, minCallToComputeFailure >> 1)
.parallel()
.forEach(i -> failureCircuitBreaker.recordFailure(new Exception()));
.forEach(i -> failureCircuitBreaker.recordFailure());
IntStream.range(0, minCallToComputeFailure >> 1)
.parallel()
.forEach(i -> failureCircuitBreaker.recordSuccess());
assertThat(failureCircuitBreaker.state()).isEqualTo(State.ACCEPT_CALLS);

// Sleep for less than windowInterval.
Thread.sleep(windowInterval - 20);
failureCircuitBreaker.recordFailure(new Exception());
Thread.sleep(windowInterval - 50);
failureCircuitBreaker.recordFailure();
assertThat(failureCircuitBreaker.state()).isEqualTo(State.REJECT_CALLS);
}
}

0 comments on commit c05ba76

Please sign in to comment.