From b518d6851429db640e80ab78b3f827bb334f5409 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Mon, 1 May 2023 17:26:41 -0400 Subject: [PATCH] feat: add APIs to enable batch write flow control (#1730) * feat: add APIs to enable batch write flow control * address comments --- .../data/v2/BigtableDataSettings.java | 32 +++++++++++++++++++ .../v2/stub/BigtableBatchingCallSettings.java | 27 ++++++++++++++++ .../data/v2/stub/EnhancedBigtableStub.java | 8 +++-- .../data/v2/BigtableDataSettingsTest.java | 1 + .../BigtableBatchingCallSettingsTest.java | 12 ++++++- 5 files changed, 77 insertions(+), 3 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java index 8579c0e4cd..a887097485 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java @@ -16,6 +16,7 @@ package com.google.cloud.bigtable.data.v2; import com.google.api.core.BetaApi; +import com.google.api.core.InternalApi; import com.google.api.gax.batching.Batcher; import com.google.api.gax.batching.FlowController; import com.google.api.gax.core.CredentialsProvider; @@ -278,6 +279,15 @@ public Long getBatchMutationsTargetRpcLatencyMs() { return stubSettings.bulkMutateRowsSettings().getTargetRpcLatencyMs(); } + /** + * Gets if flow control is enabled for {@link BigtableDataClient#newBulkMutationBatcher(String)} + * based on the load of the Bigtable server. + */ + @InternalApi("Intended for use by the Bigtable dataflow connectors only") + public boolean isBulkMutationFlowControlEnabled() { + return stubSettings.bulkMutateRowsSettings().isServerInitiatedFlowControlEnabled(); + } + /** Returns the underlying RPC settings. */ public EnhancedBigtableStubSettings getStubSettings() { return stubSettings; @@ -505,6 +515,28 @@ public Long getTargetRpcLatencyMsForBatchMutation() { return stubSettings.bulkMutateRowsSettings().getTargetRpcLatencyMs(); } + /** + * Configure flow control for {@link BigtableDataClient#newBulkMutationBatcher(String)} based on + * the current load on the Bigtable cluster. + * + *

This is different from the {@link FlowController} that's always enabled on batch reads and + * batch writes, which limits the number of outstanding requests to the Bigtable server. + */ + @InternalApi("Intended for use by the Bigtable dataflow connectors only") + public Builder setBulkMutationFlowControl(boolean isEnableFlowControl) { + stubSettings.bulkMutateRowsSettings().setServerInitiatedFlowControl(isEnableFlowControl); + return this; + } + + /** + * Gets if flow control is enabled for {@link BigtableDataClient#newBulkMutationBatcher(String)} + * based on the load of the Bigtable server. + */ + @InternalApi("Intended for use by the Bigtable dataflow connectors only") + public boolean isBulkMutationFlowControlEnabled() { + return stubSettings.bulkMutateRowsSettings().isServerInitiatedFlowControlEnabled(); + } + /** * Returns the underlying settings for making RPC calls. The settings should be changed with * care. diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableBatchingCallSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableBatchingCallSettings.java index 09e657ac07..21f837f87f 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableBatchingCallSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableBatchingCallSettings.java @@ -16,6 +16,7 @@ package com.google.cloud.bigtable.data.v2.stub; import com.google.api.core.BetaApi; +import com.google.api.core.InternalApi; import com.google.api.gax.batching.BatchingCallSettings; import com.google.api.gax.batching.BatchingDescriptor; import com.google.api.gax.batching.BatchingSettings; @@ -66,6 +67,8 @@ public final class BigtableBatchingCallSettings extends UnaryCallSettings batchingDescriptor) { return new Builder(batchingDescriptor); @@ -130,6 +140,7 @@ public String toString() { .add("isLatencyBasedThrottlingEnabled", isLatencyBasedThrottlingEnabled) .add("targetRpcLatency", targetRpcLatencyMs) .add("dynamicFlowControlSettings", dynamicFlowControlSettings) + .add("isServerInitiatedFlowControlEnabled", isServerInitiatedFlowControlEnabled) .toString(); } @@ -145,6 +156,8 @@ public static class Builder extends UnaryCallSettings.Builder batchingDescriptor) { @@ -159,6 +172,7 @@ private Builder(@Nonnull BigtableBatchingCallSettings settings) { this.isLatencyBasedThrottlingEnabled = settings.isLatencyBasedThrottlingEnabled(); this.targetRpcLatencyMs = settings.getTargetRpcLatencyMs(); this.dynamicFlowControlSettings = settings.getDynamicFlowControlSettings(); + this.isServerInitiatedFlowControlEnabled = settings.isServerInitiatedFlowControlEnabled(); } /** Sets the batching settings with various thresholds. */ @@ -263,6 +277,19 @@ DynamicFlowControlSettings getDynamicFlowControlSettings() { return this.dynamicFlowControlSettings; } + /** Configure flow control based on the current load of the Bigtable server. */ + @InternalApi("Intended for use by the Bigtable dataflow connectors only") + public Builder setServerInitiatedFlowControl(boolean isEnable) { + this.isServerInitiatedFlowControlEnabled = isEnable; + return this; + } + + /** Gets if flow control is enabled based on the load of the Bigtable server. */ + @InternalApi("Intended for use by the Bigtable dataflow connectors only") + public boolean isServerInitiatedFlowControlEnabled() { + return this.isServerInitiatedFlowControlEnabled; + } + /** Builds the {@link BigtableBatchingCallSettings} object with provided configuration. */ @Override public BigtableBatchingCallSettings build() { diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 820dc7c652..c46539cddf 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -733,15 +733,19 @@ public Map extract(MutateRowsRequest mutateRowsRequest) { .build(), settings.bulkMutateRowsSettings().getRetryableCodes()); - ServerStreamingCallable withStatsHeaders = + ServerStreamingCallable callable = new StatsHeadersServerStreamingCallable<>(base); + if (settings.bulkMutateRowsSettings().isServerInitiatedFlowControlEnabled()) { + callable = new RateLimitingServerStreamingCallable(callable); + } + // Sometimes MutateRows connections are disconnected via an RST frame. This error is transient // and // should be treated similar to UNAVAILABLE. However, this exception has an INTERNAL error code // which by default is not retryable. Convert the exception so it can be retried in the client. ServerStreamingCallable convertException = - new ConvertExceptionCallable<>(withStatsHeaders); + new ConvertExceptionCallable<>(callable); RetryAlgorithm retryAlgorithm = new RetryAlgorithm<>( diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataSettingsTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataSettingsTest.java index de4fbe92fc..6b8d3f9c51 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataSettingsTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataSettingsTest.java @@ -36,6 +36,7 @@ public void testToString() { // disable channel priming so we won't need authentication // for sending the prime request since we're only testing the settings. .setRefreshingChannel(false) + .setBulkMutationFlowControl(true) .build(); EnhancedBigtableStubSettings stubSettings = settings.getStubSettings(); assertThat(settings.toString()) diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableBatchingCallSettingsTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableBatchingCallSettingsTest.java index 488805f60c..3337e12b6d 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableBatchingCallSettingsTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableBatchingCallSettingsTest.java @@ -96,6 +96,14 @@ public void testBuilder() { assertThat(settings.getDynamicFlowControlSettings()).isNotNull(); verifyFlowControlSettingWhenLatencyBasedThrottlingDisabled( settings.getDynamicFlowControlSettings()); + + builder.setServerInitiatedFlowControl(true); + settings = builder.build(); + assertThat(settings.isServerInitiatedFlowControlEnabled()).isTrue(); + + builder.setServerInitiatedFlowControl(false); + settings = builder.build(); + assertThat(settings.isServerInitiatedFlowControlEnabled()).isFalse(); } @Test @@ -108,7 +116,8 @@ public void testBuilderFromSettings() { .setBatchingSettings(BATCHING_SETTINGS) .setRetryableCodes(StatusCode.Code.UNAVAILABLE, StatusCode.Code.UNAUTHENTICATED) .setRetrySettings(retrySettings) - .enableLatencyBasedThrottling(10L); + .enableLatencyBasedThrottling(10L) + .setServerInitiatedFlowControl(true); BigtableBatchingCallSettings settings = builder.build(); BigtableBatchingCallSettings.Builder newBuilder = settings.toBuilder(); @@ -122,6 +131,7 @@ public void testBuilderFromSettings() { assertThat(newBuilder.getDynamicFlowControlSettings()).isNotNull(); verifyFlowControlSettingWhenLatencyBasedThrottlingEnabled( newBuilder.getDynamicFlowControlSettings()); + assertThat(newBuilder.isServerInitiatedFlowControlEnabled()).isTrue(); } @Test