From 6bbbabf6abc0b4c33c8cbb8c0f1632f6d83c86bb Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 26 Nov 2024 17:23:15 -0500 Subject: [PATCH 1/6] add dynamic dest test --- .../StorageApiSinkSchemaUpdateIT.java | 182 ++++++++++++++++-- 1 file changed, 167 insertions(+), 15 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT.java index ae9e9cefb150..7fd5fe82d87e 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT.java @@ -19,6 +19,8 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.firstNonNull; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assume.assumeTrue; @@ -33,7 +35,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.Set; import java.util.function.Function; @@ -60,6 +64,7 @@ import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.joda.time.Duration; @@ -124,6 +129,8 @@ public static Iterable data() { private static final int TOTAL_N = 70; // Number of rows with the original schema private static final int ORIGINAL_N = 60; + // for dynamic destination test + private static final int NUM_DESTINATIONS = 3; private final Random randomGenerator = new Random(); @@ -145,6 +152,11 @@ public static void cleanUp() { } private String createTable(TableSchema tableSchema) throws IOException, InterruptedException { + return createTable(tableSchema, ""); + } + + private String createTable(TableSchema tableSchema, String suffix) + throws IOException, InterruptedException { String tableId = Iterables.get(Splitter.on('[').split(testName.getMethodName()), 0); if (useInputSchema) { tableId += "WithInputSchema"; @@ -152,6 +164,8 @@ private String createTable(TableSchema tableSchema) throws IOException, Interrup if (changeTableSchema) { tableId += "OnSchemaChange"; } + tableId += suffix; + BQ_CLIENT.deleteTable(PROJECT, BIG_QUERY_DATASET_ID, tableId); BQ_CLIENT.createNewTable( PROJECT, @@ -170,9 +184,8 @@ static class UpdateSchemaDoFn extends DoFn, TableRow> { private final String projectId; private final String datasetId; - private final String tableId; // represent as String because TableSchema is not serializable - private final String newSchema; + private final Map newSchemas; private transient BigqueryClient bqClient; @@ -183,11 +196,14 @@ static class UpdateSchemaDoFn extends DoFn, TableRow> { private final StateSpec> counter; public UpdateSchemaDoFn( - String projectId, String datasetId, String tableId, TableSchema 
newSchema) { + String projectId, String datasetId, Map newSchemas) { this.projectId = projectId; this.datasetId = datasetId; - this.tableId = tableId; - this.newSchema = BigQueryHelpers.toJsonString(newSchema); + Map serializableSchemas = new HashMap<>(); + for (Map.Entry entry : newSchemas.entrySet()) { + serializableSchemas.put(entry.getKey(), BigQueryHelpers.toJsonString(entry.getValue())); + } + this.newSchemas = serializableSchemas; this.bqClient = null; this.counter = StateSpecs.value(); } @@ -204,11 +220,13 @@ public void processElement(ProcessContext c, @StateId(ROW_COUNTER) ValueState entry : newSchemas.entrySet()) { + bqClient.updateTableSchema( + projectId, + datasetId, + entry.getKey(), + BigQueryHelpers.fromJsonString(entry.getValue(), TableSchema.class)); + } } counter.write(++current); @@ -394,7 +412,8 @@ private void runStreamingPipelineWithSchemaChange( .apply( "Update Schema", ParDo.of( - new UpdateSchemaDoFn(PROJECT, BIG_QUERY_DATASET_ID, tableId, updatedSchema))); + new UpdateSchemaDoFn( + PROJECT, BIG_QUERY_DATASET_ID, ImmutableMap.of(tableId, updatedSchema)))); } WriteResult result = rows.apply("Stream to BigQuery", write); if (useIgnoreUnknownValues) { @@ -494,13 +513,13 @@ public void checkRowsWithUpdatedSchema( if (Integer.parseInt((String) row.get("id")) < ORIGINAL_N || !useAutoSchemaUpdate || !changeTableSchema) { - assertTrue( + assertNull( String.format("Expected row to NOT have field %s:\n%s", extraField, row), - row.get(extraField) == null); + row.get(extraField)); } else { - assertTrue( + assertNotNull( String.format("Expected row to have field %s:\n%s", extraField, row), - row.get(extraField) != null); + row.get(extraField)); } } } @@ -539,4 +558,137 @@ public void testAtLeastOnceWithIgnoreUnknownValues() throws Exception { public void testAtLeastOnceWithAutoSchemaUpdate() throws Exception { runStreamingPipelineWithSchemaChange(Write.Method.STORAGE_API_AT_LEAST_ONCE, true, true); } + + @Test + public void testExactlyOnceDynamicDestinationsWithAutoSchemaUpdate() throws Exception { + Pipeline p = Pipeline.create(TestPipeline.testingPipelineOptions()); + // Set threshold bytes to 0 so that the stream attempts to fetch an updated schema after each + // append + p.getOptions().as(BigQueryOptions.class).setStorageApiAppendThresholdBytes(0); + // Limit parallelism so that all streams recognize the new schema in an expected short amount + // of time (before we start writing rows with updated schema) + p.getOptions().as(BigQueryOptions.class).setNumStorageWriteApiStreams(3); + // Need to manually enable streaming engine for legacy dataflow runner + ExperimentalOptions.addExperiment( + p.getOptions().as(ExperimentalOptions.class), GcpOptions.STREAMING_ENGINE_EXPERIMENT); + // Only run the most relevant test case on Dataflow + if (p.getOptions().getRunner().getName().contains("DataflowRunner")) { + assumeTrue( + "Skipping in favor of more relevant test case", changeTableSchema && useInputSchema); + } + + List fieldNamesOrigin = new ArrayList(Arrays.asList(FIELDS)); + + // Shuffle the fields in the write schema to do fuzz testing on field order + List fieldNamesShuffled = new ArrayList(fieldNamesOrigin); + Collections.shuffle(fieldNamesShuffled, randomGenerator); + TableSchema bqTableSchema = makeTableSchemaFromTypes(fieldNamesOrigin, null); + TableSchema inputSchema = makeTableSchemaFromTypes(fieldNamesShuffled, null); + + Map destinations = new HashMap<>(NUM_DESTINATIONS); + Map updatedSchemas = new HashMap<>(NUM_DESTINATIONS); + Map extraFields = new 
HashMap<>(NUM_DESTINATIONS); + Map rowFuncs = new HashMap<>(NUM_DESTINATIONS); + for (int i = 0; i < NUM_DESTINATIONS; i++) { + // The updated schema includes all fields in the original schema plus a random new field + List fieldNamesWithExtra = new ArrayList(fieldNamesOrigin); + String extraField = + fieldNamesOrigin.get(randomGenerator.nextInt(fieldNamesOrigin.size())) + "_EXTRA"; + fieldNamesWithExtra.add(extraField); + TableSchema updatedSchema = + makeTableSchemaFromTypes(fieldNamesWithExtra, ImmutableSet.of(extraField)); + GenerateRowFunc generateRowFunc = new GenerateRowFunc(fieldNamesOrigin, fieldNamesWithExtra); + + String tableId = createTable(bqTableSchema, "_dynamic_" + i); + String tableSpec = PROJECT + ":" + BIG_QUERY_DATASET_ID + "." + tableId; + + rowFuncs.put((long) i, generateRowFunc); + destinations.put((long) i, tableSpec); + updatedSchemas.put(tableId, updatedSchema); + extraFields.put(tableSpec, extraField); + } + + // build write transform + Write write = + BigQueryIO.writeTableRows() + .to( + row -> { + long l = (int) row.getValue().get("id") % NUM_DESTINATIONS; + String destination = destinations.get(l); + return new TableDestination(destination, null); + }) + .withAutoSchemaUpdate(true) + .ignoreUnknownValues() + .withMethod(Write.Method.STORAGE_WRITE_API) + .withTriggeringFrequency(Duration.standardSeconds(1)) + .withCreateDisposition(CreateDisposition.CREATE_NEVER) + .withWriteDisposition(WriteDisposition.WRITE_APPEND); + if (useInputSchema) { + write = write.withSchema(inputSchema); + } + + int numRows = TOTAL_N * NUM_DESTINATIONS; + // set up and build pipeline + Instant start = new Instant(0); + // We give a healthy waiting period between each element to give Storage API streams a chance to + // recognize the new schema. Apply on relevant tests. + Duration interval = changeTableSchema ? Duration.standardSeconds(1) : Duration.millis(1); + Duration stop = + changeTableSchema ? Duration.standardSeconds(numRows - 1) : Duration.millis(numRows - 1); + Function getIdFromInstant = + changeTableSchema + ? 
(Function & Serializable) + (Instant instant) -> instant.getMillis() / 1000 + : (Function & Serializable) Instant::getMillis; + + // Generates rows with original schema up for row IDs under ORIGINAL_N + // Then generates rows with updated schema for the rest + // Rows with updated schema should only reach the table if ignoreUnknownValues is set, + // and the extra field should be present only when autoSchemaUpdate is set + PCollection instants = + p.apply( + "Generate Instants", + PeriodicImpulse.create() + .startAt(start) + .stopAt(start.plus(stop)) + .withInterval(interval) + .catchUpToNow(false)); + PCollection rows = + instants.apply( + "Create TableRows", + MapElements.into(TypeDescriptor.of(TableRow.class)) + .via( + instant -> { + long rowId = getIdFromInstant.apply(instant); + long dest = rowId % NUM_DESTINATIONS; + return rowFuncs.get(dest).apply(rowId); + })); + if (changeTableSchema) { + rows = + rows + // UpdateSchemaDoFn uses state, so need to have a KV input + .apply("Add a dummy key", WithKeys.of(1)) + .apply( + "Update Schema", + ParDo.of(new UpdateSchemaDoFn(PROJECT, BIG_QUERY_DATASET_ID, updatedSchemas))); + } + + WriteResult result = rows.apply("Stream to BigQuery", write); + // We ignore the extra fields, so no rows should have been sent to DLQ + PAssert.that("Check DLQ is empty", result.getFailedStorageApiInserts()).empty(); + p.run().waitUntilFinish(); + + Map expectedCounts = new HashMap<>(NUM_DESTINATIONS); + for (int i = 0; i < numRows; i++) { + long mod = i % NUM_DESTINATIONS; + String destination = destinations.get(mod); + expectedCounts.merge(destination, 1, Integer::sum); + } + + for (Map.Entry expectedCount : expectedCounts.entrySet()) { + String dest = expectedCount.getKey(); + checkRowCompleteness(dest, expectedCount.getValue(), true); + checkRowsWithUpdatedSchema(dest, extraFields.get(dest), true); + } + } } From ad879e6bc394e1cc092bbd916e35beb04f1c3775 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 17 Dec 2024 18:59:48 -0500 Subject: [PATCH 2/6] fix and add some tests --- .../io/gcp/bigquery/BigQueryServicesImpl.java | 8 +++- .../StorageApiWriteUnshardedRecords.java | 14 +++--- .../StorageApiWritesShardedRecords.java | 25 +++++++++++ .../StorageApiSinkSchemaUpdateIT.java | 44 +++++++++++++------ 4 files changed, 71 insertions(+), 20 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index 432f31e81c90..22c9088753fe 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -76,6 +76,7 @@ import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse; import com.google.cloud.bigquery.storage.v1.FlushRowsRequest; import com.google.cloud.bigquery.storage.v1.FlushRowsResponse; +import com.google.cloud.bigquery.storage.v1.GetWriteStreamRequest; import com.google.cloud.bigquery.storage.v1.ProtoRows; import com.google.cloud.bigquery.storage.v1.ProtoSchema; import com.google.cloud.bigquery.storage.v1.ReadRowsRequest; @@ -86,6 +87,7 @@ import com.google.cloud.bigquery.storage.v1.StreamWriter; import com.google.cloud.bigquery.storage.v1.TableSchema; import com.google.cloud.bigquery.storage.v1.WriteStream; +import com.google.cloud.bigquery.storage.v1.WriteStreamView; 
import com.google.cloud.hadoop.util.ApiErrorExtractor; import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer; import com.google.protobuf.DescriptorProtos; @@ -1419,7 +1421,11 @@ public WriteStream createWriteStream(String tableUrn, WriteStream.Type type) @Override public @Nullable WriteStream getWriteStream(String writeStream) { - return newWriteClient.getWriteStream(writeStream); + return newWriteClient.getWriteStream( + GetWriteStreamRequest.newBuilder() + .setView(WriteStreamView.FULL) + .setName(writeStream) + .build()); } @Override diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index c4ba8a8600d9..bfa75444aaa6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -475,11 +475,15 @@ SchemaAndDescriptor getCurrentTableSchema(String stream, @Nullable TableSchema u Preconditions.checkStateNotNull(maybeWriteStreamService) .getWriteStream(streamName); if (writeStream != null && writeStream.hasTableSchema()) { - TableSchema updatedFromStream = writeStream.getTableSchema(); - currentSchema.set(updatedFromStream); - updated.set(true); - LOG.debug( - "Fetched updated schema for table {}:\n\t{}", tableUrn, updatedFromStream); + Optional newSchema = + TableSchemaUpdateUtils.getUpdatedSchema( + initialTableSchema, writeStream.getTableSchema()); + if (newSchema.isPresent()) { + currentSchema.set(newSchema.get()); + updated.set(true); + LOG.debug( + "Fetched updated schema for table {}:\n\t{}", tableUrn, newSchema.get()); + } } } return null; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java index e2674fe34f2e..56c50ed73c01 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java @@ -28,6 +28,7 @@ import com.google.cloud.bigquery.storage.v1.Exceptions.StreamFinalizedException; import com.google.cloud.bigquery.storage.v1.ProtoRows; import com.google.cloud.bigquery.storage.v1.TableSchema; +import com.google.cloud.bigquery.storage.v1.WriteStream; import com.google.cloud.bigquery.storage.v1.WriteStream.Type; import com.google.protobuf.ByteString; import com.google.protobuf.DescriptorProtos; @@ -527,6 +528,30 @@ public void process( element.getKey().getKey(), dynamicDestinations, datasetService); tableSchema = converter.getTableSchema(); descriptor = converter.getDescriptor(false); + + if (autoUpdateSchema) { + // A StreamWriter ignores table schema updates that happen prior to its creation. + // So before creating a StreamWriter below, we fetch the table schema to check if we + // missed an update. + // If so, use the new schema instead of the base schema + @Nullable + WriteStream writeStream = + writeStreamService.getWriteStream(getOrCreateStream.get()); + TableSchema streamSchema = + writeStream == null + ? 
TableSchema.getDefaultInstance() + : writeStream.getTableSchema(); + Optional newSchema = + TableSchemaUpdateUtils.getUpdatedSchema(tableSchema, streamSchema); + + if (newSchema.isPresent()) { + tableSchema = newSchema.get(); + descriptor = + TableRowToStorageApiProto.descriptorSchemaFromTableSchema( + tableSchema, true, false); + updatedSchema.write(tableSchema); + } + } } AppendClientInfo info = AppendClientInfo.of( diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT.java index 7fd5fe82d87e..3118e97b2b93 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT.java @@ -131,6 +131,7 @@ public static Iterable data() { private static final int ORIGINAL_N = 60; // for dynamic destination test private static final int NUM_DESTINATIONS = 3; + private static final int TOTAL_NUM_STREAMS = 9; private final Random randomGenerator = new Random(); @@ -217,9 +218,10 @@ public void setup() { public void processElement(ProcessContext c, @StateId(ROW_COUNTER) ValueState counter) throws Exception { int current = firstNonNull(counter.read(), 0); - // We update schema early on to leave a healthy amount of time for - // StreamWriter to recognize it. - if (current == 10) { + // We update schema early on to leave a healthy amount of time for StreamWriter to recognize + // it. + // We also update halfway through so that some writers are created *after* the schema update + if (current == TOTAL_NUM_STREAMS / 2) { for (Map.Entry entry : newSchemas.entrySet()) { bqClient.updateTableSchema( projectId, @@ -322,7 +324,7 @@ private void runStreamingPipelineWithSchemaChange( p.getOptions().as(BigQueryOptions.class).setStorageApiAppendThresholdBytes(0); // Limit parallelism so that all streams recognize the new schema in an expected short amount // of time (before we start writing rows with updated schema) - p.getOptions().as(BigQueryOptions.class).setNumStorageWriteApiStreams(3); + p.getOptions().as(BigQueryOptions.class).setNumStorageWriteApiStreams(TOTAL_NUM_STREAMS); // Need to manually enable streaming engine for legacy dataflow runner ExperimentalOptions.addExperiment( p.getOptions().as(ExperimentalOptions.class), GcpOptions.STREAMING_ENGINE_EXPERIMENT); @@ -559,15 +561,14 @@ public void testAtLeastOnceWithAutoSchemaUpdate() throws Exception { runStreamingPipelineWithSchemaChange(Write.Method.STORAGE_API_AT_LEAST_ONCE, true, true); } - @Test - public void testExactlyOnceDynamicDestinationsWithAutoSchemaUpdate() throws Exception { + public void runDynamicDestinationsWithAutoSchemaUpdate(boolean useAtLeastOnce) throws Exception { Pipeline p = Pipeline.create(TestPipeline.testingPipelineOptions()); - // Set threshold bytes to 0 so that the stream attempts to fetch an updated schema after each - // append + // 0 threshold so that the stream tries fetching an updated schema after each append p.getOptions().as(BigQueryOptions.class).setStorageApiAppendThresholdBytes(0); - // Limit parallelism so that all streams recognize the new schema in an expected short amount - // of time (before we start writing rows with updated schema) - p.getOptions().as(BigQueryOptions.class).setNumStorageWriteApiStreams(3); + // Total streams per destination 
+ p.getOptions() + .as(BigQueryOptions.class) + .setNumStorageWriteApiStreams(TOTAL_NUM_STREAMS / NUM_DESTINATIONS); // Need to manually enable streaming engine for legacy dataflow runner ExperimentalOptions.addExperiment( p.getOptions().as(ExperimentalOptions.class), GcpOptions.STREAMING_ENGINE_EXPERIMENT); @@ -619,15 +620,20 @@ public void testExactlyOnceDynamicDestinationsWithAutoSchemaUpdate() throws Exce }) .withAutoSchemaUpdate(true) .ignoreUnknownValues() - .withMethod(Write.Method.STORAGE_WRITE_API) - .withTriggeringFrequency(Duration.standardSeconds(1)) + .withMethod(Write.Method.STORAGE_API_AT_LEAST_ONCE) .withCreateDisposition(CreateDisposition.CREATE_NEVER) .withWriteDisposition(WriteDisposition.WRITE_APPEND); if (useInputSchema) { write = write.withSchema(inputSchema); } + if (!useAtLeastOnce) { + write = + write + .withMethod(Write.Method.STORAGE_WRITE_API) + .withTriggeringFrequency(Duration.standardSeconds(1)); + } - int numRows = TOTAL_N * NUM_DESTINATIONS; + int numRows = TOTAL_N; // set up and build pipeline Instant start = new Instant(0); // We give a healthy waiting period between each element to give Storage API streams a chance to @@ -691,4 +697,14 @@ public void testExactlyOnceDynamicDestinationsWithAutoSchemaUpdate() throws Exce checkRowsWithUpdatedSchema(dest, extraFields.get(dest), true); } } + + @Test + public void testExactlyOnceDynamicDestinationsWithAutoSchemaUpdate() throws Exception { + runDynamicDestinationsWithAutoSchemaUpdate(false); + } + + @Test + public void testAtLeastOnceDynamicDestinationsWithAutoSchemaUpdate() throws Exception { + runDynamicDestinationsWithAutoSchemaUpdate(true); + } } From 7d455f9a1168f7720ce790c3c8c3e1bb98106c62 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 17 Dec 2024 19:16:34 -0500 Subject: [PATCH 3/6] add to changes.md --- CHANGES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.md b/CHANGES.md index 7707e252961b..0e07e72d5a8d 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -90,6 +90,7 @@ * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). * Fixed EventTimeTimer ordering in Prism. ([#32222](https://github.com/apache/beam/issues/32222)). +* [BigQueryIO] Fixed an issue where Storage Write API sometimes doesn't pick up auto-schema updates ([#33231](https://github.com/apache/beam/pull/33231)) ## Security Fixes * Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)). From 5159a611999f36655abc9a643687711dcfe7684e Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 18 Dec 2024 12:05:32 -0500 Subject: [PATCH 4/6] fix whitespace --- CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 0e07e72d5a8d..b7a8d4d6588d 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -90,7 +90,7 @@ * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). * Fixed EventTimeTimer ordering in Prism. ([#32222](https://github.com/apache/beam/issues/32222)). -* [BigQueryIO] Fixed an issue where Storage Write API sometimes doesn't pick up auto-schema updates ([#33231](https://github.com/apache/beam/pull/33231)) +* [BigQueryIO] Fixed an issue where Storage Write API sometimes doesn't pick up auto-schema updates ([#33231](https://github.com/apache/beam/pull/33231)) ## Security Fixes * Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)). 
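
Note on the tests above: the dynamic-destinations cases added in patches 1 and 2 boil down to a single BigQueryIO write configuration, per-row table routing combined with withAutoSchemaUpdate(true). The following is an illustrative, self-contained sketch of that configuration, not code from the patch series; the class name, project/dataset arguments, and the "dynamic_table_" naming scheme are placeholders.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.joda.time.Duration;

public class DynamicDestinationsSketch {

  // Routes each TableRow to one of numDestinations tables based on its "id" field, and lets the
  // sink pick up column additions made to those tables while the pipeline is running.
  static Write<TableRow> buildWrite(String project, String dataset, int numDestinations) {
    return BigQueryIO.writeTableRows()
        .to(
            row -> {
              long shard =
                  Long.parseLong(String.valueOf(row.getValue().get("id"))) % numDestinations;
              return new TableDestination(
                  project + ":" + dataset + ".dynamic_table_" + shard, null);
            })
        .withAutoSchemaUpdate(true) // fetch destination schema updates while writing
        .ignoreUnknownValues() // tolerate fields the sink's current schema doesn't know yet
        .withMethod(Write.Method.STORAGE_WRITE_API)
        .withTriggeringFrequency(Duration.standardSeconds(1))
        .withCreateDisposition(Write.CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND);
  }
}

The exactly-once test builds essentially this transform; the at-least-once variant swaps the method for STORAGE_API_AT_LEAST_ONCE and drops the triggering frequency.
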
From 177726983551d2cd115b913503729bfa57336fb7 Mon Sep 17 00:00:00 2001
From: Ahmed Abualsaud
Date: Mon, 30 Dec 2024 12:33:41 -0500
Subject: [PATCH 5/6] trigger postcommits

---
 .github/trigger_files/beam_PostCommit_Java_DataflowV2.json | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json b/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json
index 3f63c0c9975f..bbdc3a3910ef 100644
--- a/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json
+++ b/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "modification": 2
+  "modification": 3
 }

From f7779b436608ede657f8aa3791459d8d1702249d Mon Sep 17 00:00:00 2001
From: Ahmed Abualsaud
Date: Tue, 7 Jan 2025 10:37:27 -0500
Subject: [PATCH 6/6] address comments

---
 CHANGES.md                                         |  1 -
 .../sdk/io/gcp/bigquery/BigQueryServices.java      |  2 +-
 .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java  | 15 +++++++++------
 .../bigquery/StorageApiWriteUnshardedRecords.java  | 10 ++++------
 .../bigquery/StorageApiWritesShardedRecords.java   |  9 +++------
 .../sdk/io/gcp/testing/FakeDatasetService.java     |  4 ++--
 6 files changed, 19 insertions(+), 22 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 1c4b9cdc0db3..05902bcbf8b6 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -79,7 +79,6 @@
 ## Bugfixes
 * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
-* Fixed EventTimeTimer ordering in Prism. ([#32222](https://github.com/apache/beam/issues/32222)).
 * [BigQueryIO] Fixed an issue where Storage Write API sometimes doesn't pick up auto-schema updates ([#33231](https://github.com/apache/beam/pull/33231))
 ## Security Fixes
 * Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index 374288de6562..8dbc47359b70 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -213,7 +213,7 @@ WriteStream createWriteStream(String tableUrn, WriteStream.Type type)
         throws IOException, InterruptedException;

     @Nullable
-    WriteStream getWriteStream(String writeStream);
+    TableSchema getWriteStreamSchema(String writeStream);

     /**
      * Create an append client for a given Storage API write stream. The stream must be created
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 22c9088753fe..0688694fb9be 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -1420,12 +1420,15 @@ public WriteStream createWriteStream(String tableUrn, WriteStream.Type type)
     }

     @Override
-    public @Nullable WriteStream getWriteStream(String writeStream) {
-      return newWriteClient.getWriteStream(
-          GetWriteStreamRequest.newBuilder()
-              .setView(WriteStreamView.FULL)
-              .setName(writeStream)
-              .build());
+    public @Nullable TableSchema getWriteStreamSchema(String writeStream) {
+      @Nullable
+      WriteStream stream =
+          newWriteClient.getWriteStream(
+              GetWriteStreamRequest.newBuilder()
+                  .setView(WriteStreamView.FULL)
+                  .setName(writeStream)
+                  .build());
+      return (stream != null && stream.hasTableSchema()) ? stream.getTableSchema() : null;
     }

     @Override
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index fbf437794cf1..1b5f32f7e431 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -29,7 +29,6 @@ import com.google.cloud.bigquery.storage.v1.Exceptions;
 import com.google.cloud.bigquery.storage.v1.ProtoRows;
 import com.google.cloud.bigquery.storage.v1.TableSchema;
-import com.google.cloud.bigquery.storage.v1.WriteStream;
 import com.google.cloud.bigquery.storage.v1.WriteStream.Type;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.DescriptorProtos;
@@ -475,13 +474,12 @@ SchemaAndDescriptor getCurrentTableSchema(String stream, @Nullable TableSchema u
           () -> {
             if (autoUpdateSchema) {
               @Nullable
-              WriteStream writeStream =
+              TableSchema streamSchema =
                   Preconditions.checkStateNotNull(maybeWriteStreamService)
-                      .getWriteStream(streamName);
-              if (writeStream != null && writeStream.hasTableSchema()) {
+                      .getWriteStreamSchema(streamName);
+              if (streamSchema != null) {
                 Optional<TableSchema> newSchema =
-                    TableSchemaUpdateUtils.getUpdatedSchema(
-                        initialTableSchema, writeStream.getTableSchema());
+                    TableSchemaUpdateUtils.getUpdatedSchema(initialTableSchema, streamSchema);
                 if (newSchema.isPresent()) {
                   currentSchema.set(newSchema.get());
                   updated.set(true);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index 8728e71e3d31..d905c4bf93ca 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -28,7 +28,6 @@ import com.google.cloud.bigquery.storage.v1.Exceptions.StreamFinalizedException;
 import com.google.cloud.bigquery.storage.v1.ProtoRows;
 import com.google.cloud.bigquery.storage.v1.TableSchema;
-import com.google.cloud.bigquery.storage.v1.WriteStream;
 import com.google.cloud.bigquery.storage.v1.WriteStream.Type;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.DescriptorProtos;
@@ -539,12 +538,10 @@ public void process(
               // missed an update.
               // If so, use the new schema instead of the base schema
               @Nullable
-              WriteStream writeStream =
-                  writeStreamService.getWriteStream(getOrCreateStream.get());
               TableSchema streamSchema =
-                  writeStream == null
-                      ? TableSchema.getDefaultInstance()
-                      : writeStream.getTableSchema();
+                  MoreObjects.firstNonNull(
+                      writeStreamService.getWriteStreamSchema(getOrCreateStream.get()),
+                      TableSchema.getDefaultInstance());
               Optional<TableSchema> newSchema =
                   TableSchemaUpdateUtils.getUpdatedSchema(tableSchema, streamSchema);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
index a99e4bea37a7..24229643eca3 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
@@ -590,11 +590,11 @@ public WriteStream createWriteStream(String tableUrn, Type type) throws Interrup
   @Override
   @Nullable
-  public WriteStream getWriteStream(String streamName) {
+  public com.google.cloud.bigquery.storage.v1.TableSchema getWriteStreamSchema(String streamName) {
     synchronized (FakeDatasetService.class) {
       @Nullable Stream stream = writeStreams.get(streamName);
       if (stream != null) {
-        return stream.toWriteStream();
+        return stream.toWriteStream().getTableSchema();
       }
     }
     // TODO(relax): Return the exact error that BigQuery returns.
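
Taken together, the fix is narrow: request the write stream in WriteStreamView.FULL so the response carries the destination table's current schema, then compare that schema against the one the sink started with. Below is a condensed sketch of the lookup, assuming an already-constructed BigQueryWriteClient; the class and method names are placeholders, not Beam code.

import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.GetWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.cloud.bigquery.storage.v1.WriteStreamView;

public class WriteStreamSchemaLookup {

  // Returns the destination table's schema as attached to the write stream, or null if the
  // service did not include one. The FULL view is what makes the response carry the schema.
  static TableSchema fetchStreamSchema(BigQueryWriteClient client, String streamName) {
    WriteStream stream =
        client.getWriteStream(
            GetWriteStreamRequest.newBuilder()
                .setName(streamName)
                .setView(WriteStreamView.FULL)
                .build());
    return (stream != null && stream.hasTableSchema()) ? stream.getTableSchema() : null;
  }
}

In the sink itself, the schema returned by getWriteStreamSchema is passed to TableSchemaUpdateUtils.getUpdatedSchema(base, fetched), and only a non-empty result triggers a new descriptor, as shown in the StorageApiWriteUnshardedRecords and StorageApiWritesShardedRecords hunks above.
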