Increase retry backoff for Storage API batch to survive AppendRows quota refill #31837

Merged · 7 commits · Jul 15, 2024
Changes from 3 commits
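
For context, the change makes quota-related retries back off more aggressively than ordinary retries: the new quotaBackoff starts at 5x the caller's initial backoff and caps at 3x the caller's max backoff, so that a stream hit by AppendRows throughput throttling gets time to cool off before the next attempt. Below is a minimal sketch of the two schedules, not code from this PR; the 1 s / 60 s durations, the retry count of 5, and the QuotaBackoffSketch class name are all hypothetical, since the real values come from RetryManager's caller.

import java.io.IOException;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.FluentBackoff;
import org.joda.time.Duration;

public class QuotaBackoffSketch {
  public static void main(String[] args) throws IOException {
    // Hypothetical inputs; RetryManager receives these from its caller.
    Duration initialBackoff = Duration.standardSeconds(1);
    Duration maxBackoff = Duration.standardSeconds(60);

    // Regular schedule (unchanged by this PR).
    BackOff backoff =
        FluentBackoff.DEFAULT
            .withInitialBackoff(initialBackoff)
            .withMaxBackoff(maxBackoff)
            .withMaxRetries(5)
            .backoff();

    // Quota schedule added by this PR: 5x initial, 3x max, so retries after a
    // quota error leave time for AppendRows throughput to refill.
    BackOff quotaBackoff =
        FluentBackoff.DEFAULT
            .withInitialBackoff(initialBackoff.multipliedBy(5)) // first wait ~5 s instead of ~1 s
            .withMaxBackoff(maxBackoff.multipliedBy(3))         // waits cap at ~180 s instead of ~60 s
            .withMaxRetries(5)
            .backoff();

    System.out.println("regular first wait ~" + backoff.nextBackOffMillis() + " ms");
    System.out.println("quota first wait   ~" + quotaBackoff.nextBackOffMillis() + " ms");
  }
}

Note that the retry count is left untouched: both schedules in the diff below pass the same maxRetries, so only the sleep durations change for quota errors.
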
@@ -17,11 +17,10 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import java.io.IOException;
import java.time.Instant;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
@@ -52,6 +51,9 @@
class RetryManager<ResultT, ContextT extends Context<ResultT>> {
private Queue<Operation<ResultT, ContextT>> operations;
private final BackOff backoff;

// Longer backoff for quota errors because AppendRows throughput takes a long time to cool off
private final BackOff quotaBackoff;
private static final ExecutorService executor =
Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat("BeamBQRetryManager-%d").build());
@@ -61,7 +63,9 @@ enum RetryType {
// The in-flight operations will not be retried.
DONT_RETRY,
// All operations will be retried.
RETRY_ALL_OPERATIONS
RETRY_ALL_OPERATIONS,
// Retry operations due to a quota error. Tells RetryManager to wait longer between retries
RETRY_QUOTA
};

static class WrappedFailure extends Throwable {
@@ -85,6 +89,13 @@ Object getResult() {
.withMaxBackoff(maxBackoff)
.withMaxRetries(maxRetries)
.backoff();

quotaBackoff =
FluentBackoff.DEFAULT
.withInitialBackoff(initialBackoff.multipliedBy(5))
.withMaxBackoff(maxBackoff.multipliedBy(3))
.withMaxRetries(maxRetries)
.backoff();
}

RetryManager(
@@ -97,6 +108,14 @@ Object getResult() {
.withMaxRetries(maxRetries)
.withThrottledTimeCounter(throttledTimeCounter)
.backoff();

quotaBackoff =
FluentBackoff.DEFAULT
.withInitialBackoff(initialBackoff.multipliedBy(5))
.withMaxBackoff(maxBackoff.multipliedBy(3))
.withMaxRetries(maxRetries)
.withThrottledTimeCounter(throttledTimeCounter)
.backoff();
}

static class Operation<ResultT, ContextT extends Context<ResultT>> {
@@ -313,10 +332,9 @@ void await() throws Exception {
if (retryType == RetryType.DONT_RETRY) {
operations.clear();
} else {
checkState(RetryType.RETRY_ALL_OPERATIONS == retryType);
if (!BackOffUtils.next(Sleeper.DEFAULT, backoff)) {
throw new RuntimeException(failure);
}
sleepOrFail(
retryType == RetryType.RETRY_ALL_OPERATIONS ? backoff : quotaBackoff, failure);

for (Operation<ResultT, ?> awaitOperation : operations) {
awaitOperation.await();
}
@@ -330,4 +348,11 @@ void await() throws Exception {
}
}
}

private void sleepOrFail(BackOff backoff, @Nullable Throwable failure)
throws IOException, InterruptedException {
if (!BackOffUtils.next(Sleeper.DEFAULT, backoff)) {
throw new RuntimeException(failure);
}
}
}
(second file in the diff)
@@ -811,6 +811,9 @@ long flush(
.inc(numRowsRetried);

appendFailures.inc();
if (quotaError) {
return RetryType.RETRY_QUOTA;
}
return RetryType.RETRY_ALL_OPERATIONS;
},
c -> {
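
The hunk above branches on a quotaError flag that is computed earlier in flush() and is not part of this diff. As a rough illustration only, and not the exact Beam classification logic, a quota failure from the Storage Write API typically surfaces as a gRPC RESOURCE_EXHAUSTED status, which could be detected with a helper like the hypothetical isQuotaError below:

import io.grpc.Status;

final class QuotaErrorCheck {
  // Hypothetical helper: treat RESOURCE_EXHAUSTED as a quota error so the
  // RetryManager error callback can return RETRY_QUOTA instead of RETRY_ALL_OPERATIONS.
  static boolean isQuotaError(Throwable error) {
    return Status.fromThrowable(error).getCode() == Status.Code.RESOURCE_EXHAUSTED;
  }

  private QuotaErrorCheck() {}
}

With such a check, the callback in the hunk above returns RetryType.RETRY_QUOTA for quota failures, so RetryManager sleeps on the longer quotaBackoff schedule, and falls back to RETRY_ALL_OPERATIONS for every other retriable error.
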