Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement failure circuit breaker #18359

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/main/java/com/google/devtools/build/lib/remote/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package(
filegroup(
name = "srcs",
srcs = glob(["*"]) + [
"//src/main/java/com/google/devtools/build/lib/remote/circuitbreaker:srcs",
"//src/main/java/com/google/devtools/build/lib/remote/common:srcs",
"//src/main/java/com/google/devtools/build/lib/remote/disk:srcs",
"//src/main/java/com/google/devtools/build/lib/remote/downloader:srcs",
Expand Down Expand Up @@ -90,6 +91,7 @@ java_library(
"//src/main/java/com/google/devtools/build/lib/remote/common",
"//src/main/java/com/google/devtools/build/lib/remote/common:bulk_transfer_exception",
"//src/main/java/com/google/devtools/build/lib/remote/common:cache_not_found_exception",
"//src/main/java/com/google/devtools/build/lib/remote/circuitbreaker",
"//src/main/java/com/google/devtools/build/lib/remote/disk",
"//src/main/java/com/google/devtools/build/lib/remote/downloader",
"//src/main/java/com/google/devtools/build/lib/remote/grpc",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -500,4 +500,8 @@ ListenableFuture<Void> uploadChunker(
MoreExecutors.directExecutor());
return f;
}

/** Exposes the {@link Retrier} stored in this instance's {@code retrier} field. */
Retrier getRetrier() {
  return retrier;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -252,4 +252,8 @@ public void close() {
}
channel.release();
}

/**
 * Returns the {@link RemoteRetrier} stored in this instance's {@code retrier} field.
 *
 * <p>NOTE(review): per the review thread captured in this block, the accessor is reportedly only
 * called from tests in RemoteModuleTest; confirm before widening its visibility.
 */
RemoteRetrier getRetrier() {
return this.retrier;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this needed at all? I don't see this gets used anywhere or in the initerface

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is used test methods inside RemoteModuleTest class.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import com.google.devtools.build.lib.exec.ModuleActionContextRegistry;
import com.google.devtools.build.lib.exec.SpawnStrategyRegistry;
import com.google.devtools.build.lib.remote.RemoteServerCapabilities.ServerCapabilitiesRequirement;
import com.google.devtools.build.lib.remote.circuitbreaker.CircuitBreakerFactory;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
import com.google.devtools.build.lib.remote.common.RemoteExecutionClient;
import com.google.devtools.build.lib.remote.downloader.GrpcRemoteDownloader;
Expand Down Expand Up @@ -511,12 +512,13 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
GoogleAuthUtils.newCallCredentialsProvider(credentials);
CallCredentials callCredentials = callCredentialsProvider.getCallCredentials();

Retrier.CircuitBreaker circuitBreaker = CircuitBreakerFactory.createCircuitBreaker(remoteOptions);
RemoteRetrier retrier =
new RemoteRetrier(
remoteOptions,
RemoteRetrier.RETRIABLE_GRPC_ERRORS,
retryScheduler,
Retrier.ALLOW_ALL_CALLS);
circuitBreaker);

// We only check required capabilities for a given endpoint.
//
Expand Down Expand Up @@ -637,7 +639,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
remoteOptions,
RemoteRetrier.RETRIABLE_GRPC_ERRORS, // Handle NOT_FOUND internally
retryScheduler,
Retrier.ALLOW_ALL_CALLS);
circuitBreaker);
remoteExecutor =
new ExperimentalGrpcRemoteExecutor(
executionCapabilities,
Expand All @@ -651,7 +653,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
remoteOptions,
RemoteRetrier.RETRIABLE_GRPC_EXEC_ERRORS,
retryScheduler,
Retrier.ALLOW_ALL_CALLS);
circuitBreaker);
remoteExecutor =
new GrpcRemoteExecutor(
executionCapabilities, execChannel.retain(), callCredentialsProvider, execRetrier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import com.google.devtools.build.lib.profiler.SilentCloseable;
import com.google.devtools.build.lib.remote.RemoteExecutionService.RemoteActionResult;
import com.google.devtools.build.lib.remote.RemoteExecutionService.ServerLogs;
import com.google.devtools.build.lib.remote.circuitbreaker.CircuitBreakerFactory;
import com.google.devtools.build.lib.remote.common.BulkTransferException;
import com.google.devtools.build.lib.remote.common.OperationObserver;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
Expand Down Expand Up @@ -655,6 +656,6 @@ private void report(Event evt) {
private static RemoteRetrier createExecuteRetrier(
RemoteOptions options, ListeningScheduledExecutorService retryService) {
return new ExecuteRetrier(
options.remoteMaxRetryAttempts, retryService, Retrier.ALLOW_ALL_CALLS);
options.remoteMaxRetryAttempts, retryService, CircuitBreakerFactory.createCircuitBreaker(options));
}
}
33 changes: 26 additions & 7 deletions src/main/java/com/google/devtools/build/lib/remote/Retrier.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ enum State {
State state();

/** Called after an execution failed. */
void recordFailure();
void recordFailure(Exception e);

/** Called after an execution succeeded. */
void recordSuccess();
Expand Down Expand Up @@ -130,7 +130,7 @@ public State state() {
}

@Override
public void recordFailure() {}
public void recordFailure(Exception e) {}

@Override
public void recordSuccess() {}
Expand Down Expand Up @@ -245,7 +245,7 @@ public <T> T execute(Callable<T> call, Backoff backoff) throws Exception {
circuitBreaker.recordSuccess();
return r;
} catch (Exception e) {
circuitBreaker.recordFailure();
circuitBreaker.recordFailure(e);
Throwables.throwIfInstanceOf(e, InterruptedException.class);
if (State.TRIAL_CALL.equals(circuitState)) {
throw e;
Expand All @@ -272,19 +272,34 @@ public <T> ListenableFuture<T> executeAsync(AsyncCallable<T> call) {
* backoff.
*/
public <T> ListenableFuture<T> executeAsync(AsyncCallable<T> call, Backoff backoff) {
final State circuitState = circuitBreaker.state();
if (State.REJECT_CALLS.equals(circuitState)) {
return Futures.immediateFailedFuture(new CircuitBreakerException());
}
try {
return Futures.catchingAsync(
ListenableFuture<T> future = Futures.transformAsync(
call.call(),
(f) -> {
circuitBreaker.recordSuccess();
return Futures.immediateFuture(f);
},
MoreExecutors.directExecutor());
return Futures.catchingAsync(
future,
Exception.class,
t -> onExecuteAsyncFailure(t, call, backoff),
t -> onExecuteAsyncFailure(t, call, backoff, circuitState),
MoreExecutors.directExecutor());
} catch (Exception e) {
return onExecuteAsyncFailure(e, call, backoff);
return onExecuteAsyncFailure(e, call, backoff, circuitState);
}
}

private <T> ListenableFuture<T> onExecuteAsyncFailure(
Exception t, AsyncCallable<T> call, Backoff backoff) {
Exception t, AsyncCallable<T> call, Backoff backoff, State circuitState) {
circuitBreaker.recordFailure(t);
if (circuitState.equals(State.TRIAL_CALL)) {
return Futures.immediateFailedFuture(t);
}
if (isRetriable(t)) {
long waitMillis = backoff.nextDelayMillis(t);
if (waitMillis >= 0) {
Expand All @@ -310,4 +325,8 @@ public Backoff newBackoff() {
/**
 * Reports whether a call that failed with {@code e} may be attempted again.
 *
 * @param e the exception the call failed with
 * @return the verdict of the {@code shouldRetry} predicate for {@code e}
 */
public boolean isRetriable(Exception e) {
  final boolean retriable = shouldRetry.test(e);
  return retriable;
}

/** Exposes the {@link CircuitBreaker} stored in this retrier's {@code circuitBreaker} field. */
CircuitBreaker getCircuitBreaker() {
  return circuitBreaker;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
load("@rules_java//java:defs.bzl", "java_library")

package(
    default_applicable_licenses = ["//:license"],
    default_visibility = ["//src:__subpackages__"],
)

# Every file in this package, for aggregation by the parent package's "srcs" filegroup.
filegroup(
    name = "srcs",
    srcs = glob(["*"]),
    visibility = ["//src:__subpackages__"],
)

# Circuit breaker implementations plugged into the remote Retrier.
java_library(
    name = "circuitbreaker",
    srcs = glob(["*.java"]),
    deps = [
        "//src/main/java/com/google/devtools/build/lib/remote:Retrier",
        "//src/main/java/com/google/devtools/build/lib/remote/common:cache_not_found_exception",
        "//src/main/java/com/google/devtools/build/lib/remote/options",
        "//third_party:guava",
    ],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.google.devtools.build.lib.remote.circuitbreaker;

import com.google.devtools.build.lib.remote.Retrier;
import com.google.common.collect.ImmutableSet;
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
import com.google.devtools.build.lib.remote.options.RemoteOptions;

/** Factory for {@link Retrier.CircuitBreaker} instances configured from {@link RemoteOptions}. */
public class CircuitBreakerFactory {

  /** Exception types that circuit breakers created here do not count as failures. */
  public static final ImmutableSet<Class<? extends Exception>> DEFAULT_IGNORED_ERRORS =
      ImmutableSet.of(CacheNotFoundException.class);

  private CircuitBreakerFactory() {}

  /**
   * Creates the {@link Retrier.CircuitBreaker} selected by the strategy in {@link RemoteOptions}.
   *
   * <p>Falls back to {@link Retrier#ALLOW_ALL_CALLS} when no strategy is configured, and also when
   * the failure threshold is zero or negative: the threshold option documents those values as "no
   * limitation", whereas a {@link FailureCircuitBreaker} built with them would start rejecting
   * calls after the very first failure.
   *
   * @param remoteOptions the configuration for the circuit breaker implementation
   * @return the circuit breaker to hand to a {@link Retrier}
   */
  public static Retrier.CircuitBreaker createCircuitBreaker(final RemoteOptions remoteOptions) {
    if (remoteOptions.circuitBreakerStrategy != RemoteOptions.CircuitBreakerStrategy.FAILURE) {
      return Retrier.ALLOW_ALL_CALLS;
    }
    if (remoteOptions.remoteFailureThreshold <= 0) {
      return Retrier.ALLOW_ALL_CALLS;
    }
    return new FailureCircuitBreaker(
        remoteOptions.remoteFailureThreshold,
        saturatedToInt(remoteOptions.remoteFailureWindowInterval.toMillis()));
  }

  /**
   * Narrows {@code value} to an {@code int}, clamping instead of wrapping around, so that a very
   * long window does not turn into a negative or arbitrarily small one.
   */
  private static int saturatedToInt(long value) {
    return (int) Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, value));
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package com.google.devtools.build.lib.remote.circuitbreaker;

import com.google.common.collect.ImmutableSet;
import com.google.devtools.build.lib.remote.Retrier;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
* The {@link FailureCircuitBreaker} implementation of the {@link Retrier.CircuitBreaker} prevents further calls to a remote cache
* once the number of failures within a given window exceeds a specified threshold for a build.
* In the context of Bazel, a new instance of {@link Retrier.CircuitBreaker} is created for each build. Therefore, if the circuit
* breaker trips during a build, the remote cache will be disabled for that build. However, it will be enabled again
* for the next build as a new instance of {@link Retrier.CircuitBreaker} will be created.
*/
public class FailureCircuitBreaker implements Retrier.CircuitBreaker {

// Current breaker state. Declared volatile because state() and recordFailure() are invoked by
// the Retrier from whichever threads start and complete remote calls, and a plain field gives a
// reader no guarantee of ever observing the switch to REJECT_CALLS.
private volatile State state;
// Number of failures currently counted (within the sliding window, when one is configured).
private final AtomicInteger failures;
// The breaker trips once the counted failures exceed this value.
private final int failureThreshold;
// Sliding window length in milliseconds; values <= 0 disable the window.
private final int slidingWindowSize;
// Expires counted failures after the window has elapsed; null when the window is disabled.
private final ScheduledExecutorService scheduledExecutor;
// Exception classes that are never counted as failures.
private final ImmutableSet<Class<? extends Exception>> ignoredErrors;

/**
 * Creates a {@link FailureCircuitBreaker} that starts out accepting calls.
 *
 * <p>When a sliding window is configured, a single-threaded scheduler is created to expire
 * counted failures. Nothing in this class shuts that scheduler down, so its worker is created as
 * a named daemon thread: an idle breaker must never be the reason the JVM cannot exit.
 *
 * @param failureThreshold the number of failures within the window that must be exceeded to
 *     trip the circuit breaker.
 * @param slidingWindowSize the size of the sliding window in milliseconds used to count failures;
 *     zero or negative disables the window, so failures are counted for the breaker's lifetime.
 */
public FailureCircuitBreaker(int failureThreshold, int slidingWindowSize) {
  this.failureThreshold = failureThreshold;
  this.failures = new AtomicInteger(0);
  this.slidingWindowSize = slidingWindowSize;
  this.state = State.ACCEPT_CALLS;
  this.ignoredErrors = CircuitBreakerFactory.DEFAULT_IGNORED_ERRORS;
  if (slidingWindowSize > 0) {
    this.scheduledExecutor =
        Executors.newSingleThreadScheduledExecutor(
            runnable -> {
              Thread worker = new Thread(runnable, "failure-circuit-breaker");
              worker.setDaemon(true);
              return worker;
            });
  } else {
    this.scheduledExecutor = null;
  }
}

/** Returns the state most recently stored by this breaker. */
@Override
public State state() {
  return state;
}

/**
 * Counts a failed call and switches the breaker to {@code REJECT_CALLS} once the number of
 * counted failures exceeds {@code failureThreshold}.
 *
 * @param e the exception the call failed with; it is skipped when its exact class is listed in
 *     {@code ignoredErrors}.
 */
@Override
public void recordFailure(Exception e) {
// The lookup uses e.getClass(), so only an exact class match is ignored.
if (!ignoredErrors.contains(e.getClass())) {
int failureCount = failures.incrementAndGet();
if (slidingWindowSize > 0) {
// Sliding window: every counted failure schedules its own decrement slidingWindowSize
// milliseconds from now, so `failures` only holds failures recorded within the last window.
scheduledExecutor.schedule(failures::decrementAndGet, slidingWindowSize, TimeUnit.MILLISECONDS);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a comment maybe? WhyincrementAndGet then decrement after a few milliseconds

}
// The only state change made here is to REJECT_CALLS (circuit open) and this class never moves
// it back, so concurrent callers can only race to store the same value. The comparison is
// strict: the breaker trips on the failure that pushes the count above the threshold.
if (failureCount > this.failureThreshold) {
this.state = State.REJECT_CALLS;
}
}
}

/** Called after a successful call; this count-based breaker does not track successes. */
@Override
public void recordSuccess() {
// Intentionally a no-op. Implement this if the threshold ever needs to be a failure rate
// instead of a failure count.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,41 @@ public RemoteOutputsStrategyConverter() {
+ "cache misses and retries.")
public boolean remoteDiscardMerkleTrees;

// Selects the circuit breaker strategy; the default value "null" leaves the field unset.
@Option(
name = "experimental_circuit_breaker_strategy",
documentationCategory = OptionDocumentationCategory.REMOTE,
defaultValue = "null",
effectTags = {OptionEffectTag.EXECUTION},
converter = CircuitBreakerStrategy.Converter.class,
help =
"Specifies the strategy for the circuit breaker to use. Available strategies are \"failure\". "
+ "On invalid value for the option the behavior same as the option is not set.")
public CircuitBreakerStrategy circuitBreakerStrategy;

// Failure count that must be exceeded before remote calls are rejected; defaults to 100.
// NOTE(review): FailureCircuitBreaker trips when failures > threshold, so a threshold <= 0 only
// means "no limitation" if the code constructing the breaker special-cases it. Verify.
@Option(
name = "experimental_remote_failure_threshold",
defaultValue = "100",
documentationCategory = OptionDocumentationCategory.REMOTE,
effectTags = {OptionEffectTag.EXECUTION},
help =
"Sets the allowed number of failures in a specific time window after which it stops calling to the "
+ "remote cache/executor. By default the value is 100. Setting this to 0 or negative means "
+ "no limitation.")
public int remoteFailureThreshold;

// Length of the window over which remote failures are counted; defaults to 60 seconds.
// The help text is assembled from adjacent literals, so each fragment must end with a space.
@Option(
    name = "experimental_remote_failure_window_interval",
    defaultValue = "60s",
    documentationCategory = OptionDocumentationCategory.REMOTE,
    effectTags = {OptionEffectTag.EXECUTION},
    converter = RemoteDurationConverter.class,
    help =
        "The interval in which the failure count of the remote requests are computed. On zero or negative "
            + "value the failure count is computed over the whole duration of the execution. "
            + "Following units can be used: Days (d), hours (h), minutes (m), seconds (s), and milliseconds (ms). "
            + "If the unit is omitted, the value is interpreted as seconds.")
public Duration remoteFailureWindowInterval;

// The below options are not configurable by users, only tests.
// This is part of the effort to reduce the overall number of flags.

Expand Down Expand Up @@ -749,4 +784,16 @@ public boolean shouldPrintMessages(boolean success) {
|| this == ExecutionMessagePrintMode.ALL);
}
}

/** The circuit breaker strategies that can be selected through the remote options. */
public enum CircuitBreakerStrategy {
// Currently the only strategy.
FAILURE;

/** Options converter for {@link CircuitBreakerStrategy}, built on {@link EnumConverter}. */
public static class Converter extends EnumConverter<CircuitBreakerStrategy> {
public Converter() {
super(CircuitBreakerStrategy.class, "CircuitBreaker strategy");
}
}
}
}
2 changes: 2 additions & 0 deletions src/test/java/com/google/devtools/build/lib/remote/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ filegroup(
name = "srcs",
testonly = 0,
srcs = glob(["**"]) + [
"//src/test/java/com/google/devtools/build/lib/remote/circuitbreaker:srcs",
"//src/test/java/com/google/devtools/build/lib/remote/downloader:srcs",
"//src/test/java/com/google/devtools/build/lib/remote/grpc:srcs",
"//src/test/java/com/google/devtools/build/lib/remote/http:srcs",
Expand Down Expand Up @@ -74,6 +75,7 @@ java_test(
"//src/main/java/com/google/devtools/build/lib/remote",
"//src/main/java/com/google/devtools/build/lib/remote:abstract_action_input_prefetcher",
"//src/main/java/com/google/devtools/build/lib/remote:remote_output_checker",
"//src/main/java/com/google/devtools/build/lib/remote/circuitbreaker",
"//src/main/java/com/google/devtools/build/lib/remote/common",
"//src/main/java/com/google/devtools/build/lib/remote/common:bulk_transfer_exception",
"//src/main/java/com/google/devtools/build/lib/remote/common:cache_not_found_exception",
Expand Down
Loading