Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix reporting metrics not supported warning for BigQueryIO Direct read when throttled #31096

Merged
merged 1 commit into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,14 @@ interface StorageClient extends AutoCloseable {
/* This method variant collects request count metric, using the fullTableID metadata. */
SplitReadStreamResponse splitReadStream(SplitReadStreamRequest request, String fullTableId);

/**
* Call this method on Work Item thread to report outstanding metrics.
*
* <p>Because incrementing metrics is only supported on the execution thread, callback thread
* that has pending metrics cannot report it directly.
*/
default void reportPendingMetrics() {}

/**
* Close the client object.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1653,13 +1653,30 @@ public void cancel() {

static class StorageClientImpl implements StorageClient {

public final Counter throttlingMsecs =
Metrics.counter(StorageClientImpl.class, "throttling-msecs");

private transient long unreportedDelay = 0L;

private void addToPendingMetrics(long delay) {
unreportedDelay += delay;
}

@Override
public void reportPendingMetrics() {
long delay = unreportedDelay;
unreportedDelay = 0L;

if (delay > 0) {
throttlingMsecs.inc(delay);
}
}

// If client retries ReadRows requests due to RESOURCE_EXHAUSTED error, bump
// throttlingMsecs according to delay. Runtime can use this information for
// autoscaling decisions.
@VisibleForTesting
public static class RetryAttemptCounter implements BigQueryReadSettings.RetryAttemptListener {
public final Counter throttlingMsecs =
Metrics.counter(StorageClientImpl.class, "throttling-msecs");
class RetryAttemptCounter implements BigQueryReadSettings.RetryAttemptListener {

@SuppressWarnings("ProtoDurationGetSecondsGetNano")
@Override
Expand All @@ -1673,7 +1690,7 @@ public void onRetryAttempt(Status status, Metadata metadata) {
long delay =
retryInfo.getRetryDelay().getSeconds() * 1000
+ retryInfo.getRetryDelay().getNanos() / 1000000;
throttlingMsecs.inc(delay);
addToPendingMetrics(delay);
}
}
}
Expand All @@ -1685,15 +1702,19 @@ public void onRetryAttempt(Status status, Metadata metadata) {

private final BigQueryReadClient client;

private StorageClientImpl(BigQueryOptions options) throws IOException {
private final RetryAttemptCounter listener;

@VisibleForTesting
StorageClientImpl(BigQueryOptions options) throws IOException {
listener = new RetryAttemptCounter();
BigQueryReadSettings.Builder settingsBuilder =
BigQueryReadSettings.newBuilder()
.setCredentialsProvider(FixedCredentialsProvider.create(options.getGcpCredential()))
.setTransportChannelProvider(
BigQueryReadSettings.defaultGrpcTransportProviderBuilder()
.setHeaderProvider(USER_AGENT_HEADER_PROVIDER)
.build())
.setReadRowsRetryAttemptListener(new RetryAttemptCounter());
.setReadRowsRetryAttemptListener(listener);

UnaryCallSettings.Builder<CreateReadSessionRequest, ReadSession> createReadSessionSettings =
settingsBuilder.getStubSettingsBuilder().createReadSessionSettings();
Expand Down Expand Up @@ -1723,6 +1744,11 @@ private StorageClientImpl(BigQueryOptions options) throws IOException {
this.client = BigQueryReadClient.create(settingsBuilder.build());
}

@VisibleForTesting
RetryAttemptCounter getListener() {
return listener;
}

// Since BigQueryReadClient client's methods are final they cannot be mocked with Mockito for
// testing
// So this wrapper method can be mocked in tests, instead.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,11 @@ public synchronized boolean advance() throws IOException {
private synchronized boolean readNextRecord() throws IOException {
Iterator<ReadRowsResponse> responseIterator = this.responseIterator;
while (reader.readyForNextReadResponse()) {
if (!responseIterator.hasNext()) {
// hasNext call has internal retry. Record throttling metrics after called
boolean hasNext = responseIterator.hasNext();
storageClient.reportPendingMetrics();

if (!hasNext) {
fractionConsumed = 1d;
return false;
}
Expand Down Expand Up @@ -385,6 +389,7 @@ public synchronized BigQueryStorageStreamSource<T> getCurrentSource() {
// the SplitReadStream validation logic depends. Removing it will cause incorrect
// split operations to succeed.
newResponseIterator.hasNext();
storageClient.reportPendingMetrics();
} catch (FailedPreconditionException e) {
// The current source has already moved past the split point, so this split attempt
// is unsuccessful.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2027,9 +2027,11 @@ public Object getTransportCode() {
}

@Test
public void testRetryAttemptCounter() {
BigQueryServicesImpl.StorageClientImpl.RetryAttemptCounter counter =
new BigQueryServicesImpl.StorageClientImpl.RetryAttemptCounter();
public void testRetryAttemptCounter() throws IOException {
BigQueryServicesImpl.StorageClientImpl impl =
new BigQueryServicesImpl.StorageClientImpl(
PipelineOptionsFactory.create().as(BigQueryOptions.class));
BigQueryServicesImpl.StorageClientImpl.RetryAttemptCounter counter = impl.getListener();

RetryInfo retryInfo =
RetryInfo.newBuilder()
Expand Down Expand Up @@ -2071,19 +2073,23 @@ public RetryInfo parseBytes(byte[] serialized) {

// Nulls don't bump the counter.
counter.onRetryAttempt(null, null);
impl.reportPendingMetrics();
assertEquals(0, (long) container.getCounter(metricName).getCumulative());

// Resource exhausted with empty metadata doesn't bump the counter.
counter.onRetryAttempt(
Status.RESOURCE_EXHAUSTED.withDescription("You have consumed some quota"), new Metadata());
impl.reportPendingMetrics();
assertEquals(0, (long) container.getCounter(metricName).getCumulative());

// Resource exhausted with retry info bumps the counter.
counter.onRetryAttempt(Status.RESOURCE_EXHAUSTED.withDescription("Stop for a while"), metadata);
impl.reportPendingMetrics();
assertEquals(123456, (long) container.getCounter(metricName).getCumulative());

// Other errors with retry info doesn't bump the counter.
counter.onRetryAttempt(Status.UNAVAILABLE.withDescription("Server is gone"), metadata);
impl.reportPendingMetrics();
assertEquals(123456, (long) container.getCounter(metricName).getCumulative());
}
}
Loading