From 1f91ae7fa2100a05f969a7429cb619a2b8b42dee Mon Sep 17 00:00:00 2001 From: Phong Chuong <147636638+PhongChuong@users.noreply.github.com> Date: Wed, 24 Jul 2024 16:02:46 -0400 Subject: [PATCH] feat: Add ability to specify RetryOptions and BigQueryRetryConfig when create job and waitFor (#3398) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: initial implementation of customizable BigQueryRetryConfig * Add unit and integration tests * Fix lint issues * Revert unintentional changes to testQueryStatistics in ITBigQueryTest * Revert unintentional changes to testQueryStatistics in ITBigQueryTest * Revert unintentional changes to testQueryStatistics in ITBigQueryTest * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Add additional comments to Job.waitFor() for BigQueryRetryConfig --------- Co-authored-by: Owl Bot --- .../com/google/cloud/bigquery/BigQuery.java | 12 +- .../google/cloud/bigquery/BigQueryImpl.java | 19 +- .../java/com/google/cloud/bigquery/Job.java | 51 +++++- .../cloud/bigquery/spi/v2/BigQueryRpc.java | 4 +- .../cloud/bigquery/BigQueryImplTest.java | 114 ++++++++++++ .../com/google/cloud/bigquery/JobTest.java | 170 ++++++++++++++++-- .../cloud/bigquery/it/ITBigQueryTest.java | 21 +++ 7 files changed, 370 insertions(+), 21 deletions(-) diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQuery.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQuery.java index 80fd6618d..e391c054d 100644 --- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQuery.java +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQuery.java @@ -561,7 +561,7 @@ public static JobListOption fields(JobField... fields) { /** Class for specifying table get and create options. */ class JobOption extends Option { - private static final long serialVersionUID = -3111736712316353665L; + private static final long serialVersionUID = -3111736712316353664L; private JobOption(BigQueryRpc.Option option, Object value) { super(option, value); @@ -578,6 +578,16 @@ public static JobOption fields(JobField... fields) { return new JobOption( BigQueryRpc.Option.FIELDS, Helper.selector(JobField.REQUIRED_FIELDS, fields)); } + + /** Returns an option to specify the job's BigQuery retry configuration. */ + public static JobOption bigQueryRetryConfig(BigQueryRetryConfig bigQueryRetryConfig) { + return new JobOption(BigQueryRpc.Option.BIGQUERY_RETRY_CONFIG, bigQueryRetryConfig); + } + + /** Returns an option to specify the job's retry options. */ + public static JobOption retryOptions(RetryOption... options) { + return new JobOption(BigQueryRpc.Option.RETRY_OPTIONS, options); + } } /** Class for specifying query results options. */ diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java index acfa1b7f1..576083215 100644 --- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java @@ -39,6 +39,7 @@ import com.google.cloud.Policy; import com.google.cloud.RetryHelper; import com.google.cloud.RetryHelper.RetryHelperException; +import com.google.cloud.RetryOption; import com.google.cloud.Tuple; import com.google.cloud.bigquery.InsertAllRequest.RowToInsert; import com.google.cloud.bigquery.QueryJobConfiguration.JobCreationMode; @@ -415,10 +416,15 @@ public com.google.api.services.bigquery.model.Job call() { } } }, - getOptions().getRetrySettings(), + getRetryOptions(optionsMap) != null + ? RetryOption.mergeToSettings( + getOptions().getRetrySettings(), getRetryOptions(optionsMap)) + : getOptions().getRetrySettings(), BigQueryBaseService.BIGQUERY_EXCEPTION_HANDLER, getOptions().getClock(), - DEFAULT_RETRY_CONFIG)); + getBigQueryRetryConfig(optionsMap) != null + ? getBigQueryRetryConfig(optionsMap) + : DEFAULT_RETRY_CONFIG)); } catch (BigQueryRetryHelper.BigQueryRetryHelperException e) { throw BigQueryException.translateAndThrow(e); } @@ -1628,4 +1634,13 @@ public com.google.api.services.bigquery.model.TestIamPermissionsResponse call() } return optionMap; } + + static BigQueryRetryConfig getBigQueryRetryConfig(Map options) { + return (BigQueryRetryConfig) + options.getOrDefault(BigQueryRpc.Option.BIGQUERY_RETRY_CONFIG, null); + } + + static RetryOption[] getRetryOptions(Map options) { + return (RetryOption[]) options.getOrDefault(BigQueryRpc.Option.RETRY_OPTIONS, null); + } } diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Job.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Job.java index d23e4ea52..793b25687 100644 --- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Job.java +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Job.java @@ -196,12 +196,21 @@ public boolean isDone() { Job job = bigquery.getJob(getJobId(), JobOption.fields(BigQuery.JobField.STATUS)); return job == null || JobStatus.State.DONE.equals(job.getStatus().getState()); } + + /** See {@link #waitFor(BigQueryRetryConfig, RetryOption...)} */ + public Job waitFor(RetryOption... waitOptions) throws InterruptedException { + return waitForInternal(DEFAULT_RETRY_CONFIG, waitOptions); + } + /** * Blocks until this job completes its execution, either failing or succeeding. This method * returns current job's latest information. If the job no longer exists, this method returns * {@code null}. By default, the job status is checked using jittered exponential backoff with 1 * second as an initial delay, 2.0 as a backoff factor, 1 minute as maximum delay between polls, - * 12 hours as a total timeout and unlimited number of attempts. + * 12 hours as a total timeout and unlimited number of attempts. For query jobs, the job status + * check can be configured to retry on specific BigQuery error messages using {@link + * BigQueryRetryConfig}. This {@link BigQueryRetryConfig} configuration is not available for + * non-query jobs. * *

Example usage of {@code waitFor()}. * @@ -232,18 +241,46 @@ public boolean isDone() { * } * } * + *

Example usage of {@code waitFor()} with BigQuery retry configuration to retry on rate limit + * exceeded error messages for query jobs. + * + *

{@code
+   * Job completedJob =
+   *     job.waitFor(
+   *             BigQueryRetryConfig.newBuilder()
+   *                 .retryOnMessage(BigQueryErrorMessages.RATE_LIMIT_EXCEEDED_MSG)
+   *                 .retryOnMessage(BigQueryErrorMessages.JOB_RATE_LIMIT_EXCEEDED_MSG)
+   *                 .retryOnRegEx(BigQueryErrorMessages.RetryRegExPatterns.RATE_LIMIT_EXCEEDED_REGEX)
+   *                 .build());
+   * if (completedJob == null) {
+   *   // job no longer exists
+   * } else if (completedJob.getStatus().getError() != null) {
+   *   // job failed, handle error
+   * } else {
+   *   // job completed successfully
+   * }
+   * }
+ * + * @param bigQueryRetryConfig configures retries for query jobs for BigQuery failures * @param waitOptions options to configure checking period and timeout * @throws BigQueryException upon failure, check {@link BigQueryException#getCause()} for details * @throws InterruptedException if the current thread gets interrupted while waiting for the job * to complete */ - public Job waitFor(RetryOption... waitOptions) throws InterruptedException { + public Job waitFor(BigQueryRetryConfig bigQueryRetryConfig, RetryOption... waitOptions) + throws InterruptedException { + return waitForInternal(bigQueryRetryConfig, waitOptions); + } + + private Job waitForInternal(BigQueryRetryConfig bigQueryRetryConfig, RetryOption... waitOptions) + throws InterruptedException { checkNotDryRun("waitFor"); Object completedJobResponse; if (getConfiguration().getType() == Type.QUERY) { completedJobResponse = waitForQueryResults( RetryOption.mergeToSettings(DEFAULT_JOB_WAIT_SETTINGS, waitOptions), + bigQueryRetryConfig, DEFAULT_QUERY_WAIT_OPTIONS); } else { completedJobResponse = @@ -294,7 +331,9 @@ public TableResult getQueryResults(QueryResultsOption... options) QueryResponse response = waitForQueryResults( - DEFAULT_JOB_WAIT_SETTINGS, waitOptions.toArray(new QueryResultsOption[0])); + DEFAULT_JOB_WAIT_SETTINGS, + DEFAULT_RETRY_CONFIG, + waitOptions.toArray(new QueryResultsOption[0])); // Get the job resource to determine if it has errored. Job job = this; @@ -334,7 +373,9 @@ public TableResult getQueryResults(QueryResultsOption... options) } private QueryResponse waitForQueryResults( - RetrySettings retrySettings, final QueryResultsOption... resultsOptions) + RetrySettings retrySettings, + BigQueryRetryConfig bigQueryRetryConfig, + final QueryResultsOption... resultsOptions) throws InterruptedException { if (getConfiguration().getType() != Type.QUERY) { throw new UnsupportedOperationException( @@ -360,7 +401,7 @@ public boolean shouldRetry( } }, options.getClock(), - DEFAULT_RETRY_CONFIG); + bigQueryRetryConfig); } catch (BigQueryRetryHelper.BigQueryRetryHelperException e) { throw BigQueryException.translateAndThrow(e); } diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/spi/v2/BigQueryRpc.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/spi/v2/BigQueryRpc.java index 341640919..57f1a05c0 100644 --- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/spi/v2/BigQueryRpc.java +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/spi/v2/BigQueryRpc.java @@ -57,7 +57,9 @@ enum Option { STATE_FILTER("stateFilter"), TIMEOUT("timeoutMs"), REQUESTED_POLICY_VERSION("requestedPolicyVersion"), - TABLE_METADATA_VIEW("view"); + TABLE_METADATA_VIEW("view"), + RETRY_OPTIONS("retryOptions"), + BIGQUERY_RETRY_CONFIG("bigQueryRetryConfig"); private final String value; diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java index d3d374006..88b8f6dbf 100644 --- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java +++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java @@ -27,6 +27,7 @@ import com.google.api.services.bigquery.model.*; import com.google.api.services.bigquery.model.JobStatistics; import com.google.cloud.Policy; +import com.google.cloud.RetryOption; import com.google.cloud.ServiceOptions; import com.google.cloud.Tuple; import com.google.cloud.bigquery.BigQuery.JobOption; @@ -1594,6 +1595,119 @@ public void testCreateJobFailureShouldRetry() { verify(bigqueryRpcMock, times(6)).create(jobCapture.capture(), eq(EMPTY_RPC_OPTIONS)); } + @Test + public void testCreateJobWithBigQueryRetryConfigFailureShouldRetry() { + // Validate create job with BigQueryRetryConfig that retries on rate limit error message. + JobOption bigQueryRetryConfigOption = + JobOption.bigQueryRetryConfig( + BigQueryRetryConfig.newBuilder() + .retryOnMessage(BigQueryErrorMessages.RATE_LIMIT_EXCEEDED_MSG) + .retryOnMessage(BigQueryErrorMessages.JOB_RATE_LIMIT_EXCEEDED_MSG) + .retryOnRegEx(BigQueryErrorMessages.RetryRegExPatterns.RATE_LIMIT_EXCEEDED_REGEX) + .build()); + + Map bigQueryRpcOptions = optionMap(bigQueryRetryConfigOption); + when(bigqueryRpcMock.create(jobCapture.capture(), eq(bigQueryRpcOptions))) + .thenThrow( + new BigQueryException( + 400, RATE_LIMIT_ERROR_MSG)) // retrial on based on RATE_LIMIT_EXCEEDED_MSG + .thenThrow(new BigQueryException(200, RATE_LIMIT_ERROR_MSG)) + .thenReturn(newJobPb()); + + bigquery = options.getService(); + bigquery = + options + .toBuilder() + .setRetrySettings(ServiceOptions.getDefaultRetrySettings()) + .build() + .getService(); + + ((BigQueryImpl) bigquery) + .create(JobInfo.of(QUERY_JOB_CONFIGURATION_FOR_DMLQUERY), bigQueryRetryConfigOption); + verify(bigqueryRpcMock, times(3)).create(jobCapture.capture(), eq(bigQueryRpcOptions)); + } + + @Test + public void testCreateJobWithBigQueryRetryConfigFailureShouldNotRetry() { + // Validate create job with BigQueryRetryConfig that does not retry on rate limit error message. + JobOption bigQueryRetryConfigOption = + JobOption.bigQueryRetryConfig(BigQueryRetryConfig.newBuilder().build()); + + Map bigQueryRpcOptions = optionMap(bigQueryRetryConfigOption); + when(bigqueryRpcMock.create(jobCapture.capture(), eq(bigQueryRpcOptions))) + .thenThrow(new BigQueryException(400, RATE_LIMIT_ERROR_MSG)); + + bigquery = options.getService(); + bigquery = + options + .toBuilder() + .setRetrySettings(ServiceOptions.getDefaultRetrySettings()) + .build() + .getService(); + + try { + ((BigQueryImpl) bigquery) + .create(JobInfo.of(QUERY_JOB_CONFIGURATION_FOR_DMLQUERY), bigQueryRetryConfigOption); + fail("JobException expected"); + } catch (BigQueryException e) { + assertNotNull(e.getMessage()); + } + // Verify that getQueryResults is attempted only once and not retried since the error message + // does not match. + verify(bigqueryRpcMock, times(1)).create(jobCapture.capture(), eq(bigQueryRpcOptions)); + } + + @Test + public void testCreateJobWithRetryOptionsFailureShouldRetry() { + // Validate create job with RetryOptions. + JobOption retryOptions = JobOption.retryOptions(RetryOption.maxAttempts(4)); + Map bigQueryRpcOptions = optionMap(retryOptions); + when(bigqueryRpcMock.create(jobCapture.capture(), eq(bigQueryRpcOptions))) + .thenThrow(new BigQueryException(500, "InternalError")) + .thenThrow(new BigQueryException(502, "Bad Gateway")) + .thenThrow(new BigQueryException(503, "Service Unavailable")) + .thenReturn(newJobPb()); + + bigquery = options.getService(); + bigquery = + options + .toBuilder() + .setRetrySettings(ServiceOptions.getDefaultRetrySettings()) + .build() + .getService(); + + ((BigQueryImpl) bigquery) + .create(JobInfo.of(QUERY_JOB_CONFIGURATION_FOR_DMLQUERY), retryOptions); + verify(bigqueryRpcMock, times(4)).create(jobCapture.capture(), eq(bigQueryRpcOptions)); + } + + @Test + public void testCreateJobWithRetryOptionsFailureShouldNotRetry() { + // Validate create job with RetryOptions that only attempts once (no retry). + JobOption retryOptions = JobOption.retryOptions(RetryOption.maxAttempts(1)); + Map bigQueryRpcOptions = optionMap(retryOptions); + when(bigqueryRpcMock.create(jobCapture.capture(), eq(bigQueryRpcOptions))) + .thenThrow(new BigQueryException(500, "InternalError")) + .thenReturn(newJobPb()); + + bigquery = options.getService(); + bigquery = + options + .toBuilder() + .setRetrySettings(ServiceOptions.getDefaultRetrySettings()) + .build() + .getService(); + + try { + ((BigQueryImpl) bigquery) + .create(JobInfo.of(QUERY_JOB_CONFIGURATION_FOR_DMLQUERY), retryOptions); + fail("JobException expected"); + } catch (BigQueryException e) { + assertNotNull(e.getMessage()); + } + verify(bigqueryRpcMock, times(1)).create(jobCapture.capture(), eq(bigQueryRpcOptions)); + } + @Test public void testCreateJobWithSelectedFields() { when(bigqueryRpcMock.create( diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/JobTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/JobTest.java index d10203444..396bb754a 100644 --- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/JobTest.java +++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/JobTest.java @@ -65,6 +65,10 @@ public class JobTest { CopyStatistics.newBuilder().setCreationTimestamp(1L).setEndTime(3L).setStartTime(2L).build(); private static final CopyJobConfiguration COPY_CONFIGURATION = CopyJobConfiguration.of(TABLE_ID1, TABLE_ID2); + private static final QueryJobConfiguration DDL_QUERY_CONFIGURATION = + QueryJobConfiguration.newBuilder("CREATE VIEW").setDestinationTable(TABLE_ID1).build(); + private static final QueryJobConfiguration DRL_QUERY_CONFIGURATION = + QueryJobConfiguration.newBuilder("SELECT 1").setDestinationTable(TABLE_ID1).build(); private static final JobInfo JOB_INFO = JobInfo.newBuilder(COPY_CONFIGURATION) .setJobId(JOB_ID) @@ -85,6 +89,11 @@ public class JobTest { RetryOption.retryDelayMultiplier(1.0) }; + private static final BigQueryRetryConfig TEST_BIGQUERY_RETRY_CONFIG = + BigQueryRetryConfig.newBuilder() + .retryOnMessage(BigQueryErrorMessages.RATE_LIMIT_EXCEEDED_MSG) + .build(); + @Rule public MockitoRule rule; private BigQuery bigquery; @@ -191,8 +200,6 @@ public void testWaitFor() throws InterruptedException { @Test public void testWaitForAndGetQueryResultsEmpty() throws InterruptedException { - QueryJobConfiguration jobConfig = - QueryJobConfiguration.newBuilder("CREATE VIEW").setDestinationTable(TABLE_ID1).build(); QueryStatistics jobStatistics = QueryStatistics.newBuilder() .setCreationTimestamp(1L) @@ -200,7 +207,7 @@ public void testWaitForAndGetQueryResultsEmpty() throws InterruptedException { .setStartTime(2L) .build(); JobInfo jobInfo = - JobInfo.newBuilder(jobConfig) + JobInfo.newBuilder(DDL_QUERY_CONFIGURATION) .setJobId(JOB_ID) .setStatistics(jobStatistics) .setJobId(JOB_ID) @@ -228,7 +235,7 @@ public void testWaitForAndGetQueryResultsEmpty() throws InterruptedException { when(bigquery.getQueryResults(jobInfo.getJobId(), Job.DEFAULT_QUERY_WAIT_OPTIONS)) .thenReturn(completedQuery); when(bigquery.getJob(JOB_INFO.getJobId())).thenReturn(completedJob); - job = this.job.toBuilder().setConfiguration(jobConfig).build(); + job = this.job.toBuilder().setConfiguration(DDL_QUERY_CONFIGURATION).build(); assertThat(job.waitFor(TEST_RETRY_OPTIONS)).isSameInstanceAs(completedJob); assertThat(job.getQueryResults().iterateAll()).isEmpty(); verify(bigquery, times(2)).getQueryResults(jobInfo.getJobId(), Job.DEFAULT_QUERY_WAIT_OPTIONS); @@ -237,8 +244,6 @@ public void testWaitForAndGetQueryResultsEmpty() throws InterruptedException { @Test public void testWaitForAndGetQueryResultsEmptyWithSchema() throws InterruptedException { - QueryJobConfiguration jobConfig = - QueryJobConfiguration.newBuilder("CREATE VIEW").setDestinationTable(TABLE_ID1).build(); QueryStatistics jobStatistics = QueryStatistics.newBuilder() .setCreationTimestamp(1L) @@ -246,7 +251,7 @@ public void testWaitForAndGetQueryResultsEmptyWithSchema() throws InterruptedExc .setStartTime(2L) .build(); JobInfo jobInfo = - JobInfo.newBuilder(jobConfig) + JobInfo.newBuilder(DDL_QUERY_CONFIGURATION) .setJobId(JOB_ID) .setStatistics(jobStatistics) .setJobId(JOB_ID) @@ -274,7 +279,7 @@ public void testWaitForAndGetQueryResultsEmptyWithSchema() throws InterruptedExc when(bigquery.getJob(JOB_INFO.getJobId())).thenReturn(completedJob); when(bigquery.getQueryResults(jobInfo.getJobId(), Job.DEFAULT_QUERY_WAIT_OPTIONS)) .thenReturn(completedQuery); - job = this.job.toBuilder().setConfiguration(jobConfig).build(); + job = this.job.toBuilder().setConfiguration(DDL_QUERY_CONFIGURATION).build(); assertThat(job.waitFor(TEST_RETRY_OPTIONS)).isSameInstanceAs(completedJob); assertThat(job.getQueryResults().getSchema()) .isEqualTo(Schema.of(Field.of("field1", LegacySQLTypeName.BOOLEAN))); @@ -284,8 +289,6 @@ public void testWaitForAndGetQueryResultsEmptyWithSchema() throws InterruptedExc @Test public void testWaitForAndGetQueryResults() throws InterruptedException { - QueryJobConfiguration jobConfig = - QueryJobConfiguration.newBuilder("SELECT 1").setDestinationTable(TABLE_ID1).build(); QueryStatistics jobStatistics = QueryStatistics.newBuilder() .setCreationTimestamp(1L) @@ -293,7 +296,7 @@ public void testWaitForAndGetQueryResults() throws InterruptedException { .setStartTime(2L) .build(); JobInfo jobInfo = - JobInfo.newBuilder(jobConfig) + JobInfo.newBuilder(DRL_QUERY_CONFIGURATION) .setJobId(JOB_ID) .setStatistics(jobStatistics) .setJobId(JOB_ID) @@ -329,7 +332,7 @@ public void testWaitForAndGetQueryResults() throws InterruptedException { when(bigquery.getQueryResults(jobInfo.getJobId(), Job.DEFAULT_QUERY_WAIT_OPTIONS)) .thenReturn(completedQuery); when(bigquery.listTableData(eq(TABLE_ID1), any(Schema.class))).thenReturn(result); - job = this.job.toBuilder().setConfiguration(jobConfig).build(); + job = this.job.toBuilder().setConfiguration(DRL_QUERY_CONFIGURATION).build(); assertThat(job.waitFor(TEST_RETRY_OPTIONS)).isSameInstanceAs(completedJob); assertThat(job.getQueryResults().iterateAll()).hasSize(0); verify(bigquery, times(2)).getQueryResults(jobInfo.getJobId(), Job.DEFAULT_QUERY_WAIT_OPTIONS); @@ -406,6 +409,149 @@ public void testWaitForWithTimeout() throws InterruptedException { } } + @Test + public void testWaitForWithBigQueryRetryConfig() throws InterruptedException { + QueryStatistics jobStatistics = + QueryStatistics.newBuilder() + .setCreationTimestamp(1L) + .setEndTime(3L) + .setStartTime(2L) + .build(); + JobInfo jobInfo = + JobInfo.newBuilder(DRL_QUERY_CONFIGURATION) + .setJobId(JOB_ID) + .setStatistics(jobStatistics) + .setJobId(JOB_ID) + .setEtag(ETAG) + .setGeneratedId(GENERATED_ID) + .setSelfLink(SELF_LINK) + .setUserEmail(EMAIL) + .setStatus(JOB_STATUS) + .build(); + + when(bigquery.getOptions()).thenReturn(mockOptions); + when(mockOptions.getClock()).thenReturn(CurrentMillisClock.getDefaultClock()); + Job completedJob = + expectedJob.toBuilder().setStatus(new JobStatus(JobStatus.State.RUNNING)).build(); + QueryResponse completedQuery = + QueryResponse.newBuilder() + .setCompleted(true) + .setTotalRows(1) // Lies to force call of listTableData(). + .setSchema(Schema.of(Field.of("_f0", LegacySQLTypeName.INTEGER))) + .setErrors(ImmutableList.of()) + .build(); + + when(bigquery.getJob(JOB_INFO.getJobId())).thenReturn(completedJob); + when(bigquery.getQueryResults(jobInfo.getJobId(), Job.DEFAULT_QUERY_WAIT_OPTIONS)) + .thenReturn(completedQuery); + job = this.job.toBuilder().setConfiguration(DRL_QUERY_CONFIGURATION).build(); + assertThat(job.waitFor(TEST_BIGQUERY_RETRY_CONFIG, TEST_RETRY_OPTIONS)) + .isSameInstanceAs(completedJob); + verify(bigquery, times(1)).getQueryResults(jobInfo.getJobId(), Job.DEFAULT_QUERY_WAIT_OPTIONS); + verify(bigquery).getJob(JOB_INFO.getJobId()); + } + + @Test + public void testWaitForWithBigQueryRetryConfigShouldRetry() throws InterruptedException { + QueryStatistics jobStatistics = + QueryStatistics.newBuilder() + .setCreationTimestamp(1L) + .setEndTime(3L) + .setStartTime(2L) + .build(); + JobInfo jobInfo = + JobInfo.newBuilder(DRL_QUERY_CONFIGURATION) + .setJobId(JOB_ID) + .setStatistics(jobStatistics) + .setJobId(JOB_ID) + .setEtag(ETAG) + .setGeneratedId(GENERATED_ID) + .setSelfLink(SELF_LINK) + .setUserEmail(EMAIL) + .setStatus(JOB_STATUS) + .build(); + + when(bigquery.getOptions()).thenReturn(mockOptions); + when(mockOptions.getClock()).thenReturn(CurrentMillisClock.getDefaultClock()); + Job completedJob = + expectedJob.toBuilder().setStatus(new JobStatus(JobStatus.State.RUNNING)).build(); + QueryResponse completedQuery = + QueryResponse.newBuilder() + .setCompleted(true) + .setTotalRows(1) // Lies to force call of listTableData(). + .setSchema(Schema.of(Field.of("_f0", LegacySQLTypeName.INTEGER))) + .setErrors(ImmutableList.of()) + .build(); + + when(bigquery.getJob(JOB_INFO.getJobId())).thenReturn(completedJob); + BigQueryError bigQueryError = + new BigQueryError( + "testReasonRateLimitExceeded", "US", "testMessage: Exceeded rate limits:"); + + ImmutableList bigQueryErrorList = ImmutableList.of(bigQueryError); + BigQueryException bigQueryException = new BigQueryException(bigQueryErrorList); + when(bigquery.getQueryResults(jobInfo.getJobId(), Job.DEFAULT_QUERY_WAIT_OPTIONS)) + .thenThrow(bigQueryException) + .thenReturn(completedQuery); + job = this.job.toBuilder().setConfiguration(DRL_QUERY_CONFIGURATION).build(); + assertThat(job.waitFor(TEST_BIGQUERY_RETRY_CONFIG, TEST_RETRY_OPTIONS)) + .isSameInstanceAs(completedJob); + // Verify that getQueryResults is attempted twice. First during bigQueryException with "Exceeded + // rate limits" error message and the second successful attempt. + verify(bigquery, times(2)).getQueryResults(jobInfo.getJobId(), Job.DEFAULT_QUERY_WAIT_OPTIONS); + verify(bigquery).getJob(JOB_INFO.getJobId()); + } + + @Test + public void testWaitForWithBigQueryRetryConfigErrorShouldNotRetry() throws InterruptedException { + QueryStatistics jobStatistics = + QueryStatistics.newBuilder() + .setCreationTimestamp(1L) + .setEndTime(3L) + .setStartTime(2L) + .build(); + JobInfo jobInfo = + JobInfo.newBuilder(DRL_QUERY_CONFIGURATION) + .setJobId(JOB_ID) + .setStatistics(jobStatistics) + .setJobId(JOB_ID) + .setEtag(ETAG) + .setGeneratedId(GENERATED_ID) + .setSelfLink(SELF_LINK) + .setUserEmail(EMAIL) + .setStatus(JOB_STATUS) + .build(); + + when(bigquery.getOptions()).thenReturn(mockOptions); + when(mockOptions.getClock()).thenReturn(CurrentMillisClock.getDefaultClock()); + QueryResponse completedQuery = + QueryResponse.newBuilder() + .setCompleted(true) + .setTotalRows(1) // Lies to force call of listTableData(). + .setSchema(Schema.of(Field.of("_f0", LegacySQLTypeName.INTEGER))) + .setErrors(ImmutableList.of()) + .build(); + + BigQueryError bigQueryError = + new BigQueryError("testReasonRateLimitExceeded", "US", "testMessage: do not retry error"); + + ImmutableList bigQueryErrorList = ImmutableList.of(bigQueryError); + BigQueryException bigQueryException = new BigQueryException(bigQueryErrorList); + when(bigquery.getQueryResults(jobInfo.getJobId(), Job.DEFAULT_QUERY_WAIT_OPTIONS)) + .thenThrow(bigQueryException) + .thenReturn(completedQuery); + job = this.job.toBuilder().setConfiguration(DRL_QUERY_CONFIGURATION).build(); + try { + job.waitFor(TEST_BIGQUERY_RETRY_CONFIG, TEST_RETRY_OPTIONS); + fail("JobException expected"); + } catch (BigQueryException e) { + assertNotNull(e.getErrors()); + } + // Verify that getQueryResults is attempted only once and not retried since the error message + // does not match. + verify(bigquery, times(1)).getQueryResults(jobInfo.getJobId(), Job.DEFAULT_QUERY_WAIT_OPTIONS); + } + @Test public void testReload() { JobInfo updatedInfo = JOB_INFO.toBuilder().setEtag("etag").build(); diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java index b1352f512..63265d58a 100644 --- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java +++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java @@ -57,6 +57,7 @@ import com.google.cloud.bigquery.BigQueryException; import com.google.cloud.bigquery.BigQueryOptions; import com.google.cloud.bigquery.BigQueryResult; +import com.google.cloud.bigquery.BigQueryRetryConfig; import com.google.cloud.bigquery.BigQuerySQLException; import com.google.cloud.bigquery.CloneDefinition; import com.google.cloud.bigquery.Clustering; @@ -5256,6 +5257,26 @@ public void testCreateAndGetJob() throws InterruptedException, TimeoutException assertTrue(bigquery.delete(destinationTable)); } + @Test + public void testCreateJobAndWaitForWithRetryOptions() + throws InterruptedException, TimeoutException { + // Note: This only tests the non failure/retry case. For retry cases, see unit tests with mocked + // RPC calls. + QueryJobConfiguration config = + QueryJobConfiguration.newBuilder("SELECT CURRENT_TIMESTAMP() as ts") + .setDefaultDataset(DATASET) + .setUseLegacySql(false) + .build(); + + BigQueryRetryConfig bigQueryRetryConfig = BigQueryRetryConfig.newBuilder().build(); + JobOption bigQueryRetryConfigOption = JobOption.bigQueryRetryConfig(bigQueryRetryConfig); + JobOption retryOptions = JobOption.retryOptions(RetryOption.maxAttempts(1)); + + Job job = bigquery.create(JobInfo.of(config), bigQueryRetryConfigOption, retryOptions); + job = job.waitFor(bigQueryRetryConfig); + assertEquals(DONE, job.getStatus().getState()); + } + @Test public void testCreateAndGetJobWithSelectedFields() throws InterruptedException, TimeoutException {