[BigQueryIO] fetch updated schema for newly created Storage API stream writers #33231

Merged
2 changes: 2 additions & 0 deletions CHANGES.md
@@ -79,6 +79,8 @@
## Bugfixes

* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Fixed EventTimeTimer ordering in Prism. ([#32222](https://github.com/apache/beam/issues/32222)).
* [BigQueryIO] Fixed an issue where Storage Write API sometimes doesn't pick up auto-schema updates ([#33231](https://github.com/apache/beam/pull/33231))

## Security Fixes
* Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
@@ -76,6 +76,7 @@
import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse;
import com.google.cloud.bigquery.storage.v1.FlushRowsRequest;
import com.google.cloud.bigquery.storage.v1.FlushRowsResponse;
import com.google.cloud.bigquery.storage.v1.GetWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.ProtoSchema;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
@@ -86,6 +87,7 @@
import com.google.cloud.bigquery.storage.v1.StreamWriter;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.cloud.bigquery.storage.v1.WriteStreamView;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
import com.google.protobuf.DescriptorProtos;
@@ -1419,7 +1421,11 @@ public WriteStream createWriteStream(String tableUrn, WriteStream.Type type)

@Override
public @Nullable WriteStream getWriteStream(String writeStream) {
return newWriteClient.getWriteStream(writeStream);
return newWriteClient.getWriteStream(
GetWriteStreamRequest.newBuilder()
.setView(WriteStreamView.FULL)
.setName(writeStream)
.build());
}

@Override
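For context on the getWriteStream change above: a minimal, standalone sketch (not part of this PR) of the same lookup using the google-cloud-bigquerystorage v1 Java client, with placeholder project/dataset/table names. The reason for requesting WriteStreamView.FULL is that the default (basic) view does not include the stream's table schema, so getTableSchema() would come back empty.

import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.GetWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.cloud.bigquery.storage.v1.WriteStreamView;

public class GetWriteStreamWithSchema {
  public static void main(String[] args) throws Exception {
    // Placeholder stream name; "_default" is the table's default write stream.
    String streamName =
        "projects/my-project/datasets/my_dataset/tables/my_table/streams/_default";
    try (BigQueryWriteClient client = BigQueryWriteClient.create()) {
      WriteStream stream =
          client.getWriteStream(
              GetWriteStreamRequest.newBuilder()
                  .setName(streamName)
                  // FULL asks the service to also return the stream's current table schema;
                  // with the default (basic) view, the schema field is left unset.
                  .setView(WriteStreamView.FULL)
                  .build());
      System.out.println("Table schema seen by the stream:\n" + stream.getTableSchema());
    }
  }
}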
@@ -479,11 +479,15 @@ SchemaAndDescriptor getCurrentTableSchema(String stream, @Nullable TableSchema u
Preconditions.checkStateNotNull(maybeWriteStreamService)
.getWriteStream(streamName);
if (writeStream != null && writeStream.hasTableSchema()) {
TableSchema updatedFromStream = writeStream.getTableSchema();
currentSchema.set(updatedFromStream);
updated.set(true);
LOG.debug(
"Fetched updated schema for table {}:\n\t{}", tableUrn, updatedFromStream);
Optional<TableSchema> newSchema =
TableSchemaUpdateUtils.getUpdatedSchema(
initialTableSchema, writeStream.getTableSchema());
if (newSchema.isPresent()) {
currentSchema.set(newSchema.get());
updated.set(true);
LOG.debug(
"Fetched updated schema for table {}:\n\t{}", tableUrn, newSchema.get());
}
}
}
return null;
@@ -28,6 +28,7 @@
import com.google.cloud.bigquery.storage.v1.Exceptions.StreamFinalizedException;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.cloud.bigquery.storage.v1.WriteStream.Type;
import com.google.protobuf.ByteString;
import com.google.protobuf.DescriptorProtos;
@@ -531,6 +532,30 @@ public void process(
element.getKey().getKey(), dynamicDestinations, datasetService);
tableSchema = converter.getTableSchema();
descriptor = converter.getDescriptor(false);

if (autoUpdateSchema) {
Contributor:
I'm not sure this is the ideal place to put this. getAppendClientInfo is called whenever the static cache is populated, meaning that on any worker restart, range move, etc., we'll be forced to call this API again. However, we have persistent state in this DoFn, so we know if it's a "new" key or not. Can we use that to gate calling this method instead?

Contributor Author:
We should always perform this check before creating a new StreamWriter, regardless of the reason for its creation. The only exception is if we already have an updated schema stored in state (see the first if block above). If I'm following correctly, this method (getAppendClientInfo) will always create a new stream writer.

Contributor Author:
Also note that the updated schema is ignored when the StreamWriter object's creation time is later than the updated schema's; i.e., it doesn't matter when the WriteStream itself was created.

Contributor:
So this doesn't quite solve the following race condition:

  • schema S is updated to S' before creating stream writer, we detect it in this codepath and store it in state.
  • StreamWriter is destroyed (either because it is idle and evicted from the cache, or because the worker crashes).
  • We later recreate the StreamWriter. Because we have an updated schema in cache, we don't execute this codepath.

In this case, I think we'll completely miss the new schema. IIUC the best way to address this race would be for BigQuery to provide us with a version of StreamWriter that returns schemas with new fields - i.e. not basing it off of creation time.

Contributor Author (@ahmedabu98, Jan 3, 2025):
Correct, this if block would only be executed if we have not detected S' yet.

But if S' is detected and stored in state, we would execute the first if block (line 520 here) and use that stored value. We still end up using the updated S', no?
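To make the two codepaths in this exchange concrete, here is an illustrative-only condensation; the state field, stream lookup, and schema-diff helper below are stand-ins for the Beam internals being discussed (the updatedSchema state, the write-stream service, and TableSchemaUpdateUtils.getUpdatedSchema), not the actual implementation.

import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Supplier;

import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.cloud.bigquery.storage.v1.WriteStream;

/** Illustrative sketch of the schema gating discussed above; not the actual Beam code. */
class SchemaGatingSketch {
  // Stand-in for the per-key ValueState<TableSchema> ("updatedSchema" in the DoFn).
  private TableSchema storedUpdatedSchema; // null until an update (S') has been detected

  TableSchema schemaForNewStreamWriter(
      TableSchema baseSchema,
      boolean autoUpdateSchema,
      Supplier<WriteStream> fetchWriteStreamFullView, // e.g. GetWriteStream with the FULL view
      BiFunction<TableSchema, TableSchema, Optional<TableSchema>> updatedSchemaOf) {
    // "First if block": an update already persisted in state wins, and the lookup is skipped.
    if (storedUpdatedSchema != null) {
      return storedUpdatedSchema;
    }
    if (autoUpdateSchema) {
      // New codepath in this PR: before creating a StreamWriter, check whether the table schema
      // already moved past the base schema (an update the new writer would otherwise never see).
      WriteStream ws = fetchWriteStreamFullView.get();
      if (ws != null && ws.hasTableSchema()) {
        Optional<TableSchema> newer = updatedSchemaOf.apply(baseSchema, ws.getTableSchema());
        if (newer.isPresent()) {
          storedUpdatedSchema = newer.get(); // persist S' so later writers for this key reuse it
          return storedUpdatedSchema;
        }
      }
    }
    return baseSchema;
  }
}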

Contributor Author (@ahmedabu98, Jan 3, 2025):
Unless:

  1. S is updated to S'
  2. current writer recognizes it and stores S' in state
  3. writer crashes for whatever reason
  4. S' is updated to S'' (before new writer is created)
  5. new writer is created, using the schema stored in state: S'

This new writer would have no chance to recognize S'' because it was created after that update.

Contributor Author (@ahmedabu98, Jan 3, 2025):
This is a much narrower edge-case though. It's a small subset of the bigger problem we currently have, which involves essentially any new StreamWriter.

Are we okay with merging this solution and continuing to push BigQuery to improve things on their side?

Contributor:
Yes, that's the race condition I had in mind. Is BigQuery planning on improving things on their side?

Contributor Author:
Will follow up with them.

// A StreamWriter ignores table schema updates that happen prior to its creation.
// So before creating a StreamWriter below, we fetch the table schema to check if we
// missed an update.
// If so, use the new schema instead of the base schema
@Nullable
WriteStream writeStream =
writeStreamService.getWriteStream(getOrCreateStream.get());
TableSchema streamSchema =
writeStream == null
? TableSchema.getDefaultInstance()
: writeStream.getTableSchema();
Optional<TableSchema> newSchema =
TableSchemaUpdateUtils.getUpdatedSchema(tableSchema, streamSchema);

if (newSchema.isPresent()) {
tableSchema = newSchema.get();
descriptor =
TableRowToStorageApiProto.descriptorSchemaFromTableSchema(
tableSchema, true, false);
updatedSchema.write(tableSchema);
}
}
}
AppendClientInfo info =
AppendClientInfo.of(
@@ -19,6 +19,8 @@

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.firstNonNull;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;

@@ -33,7 +35,9 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.function.Function;
@@ -60,6 +64,7 @@
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.joda.time.Duration;
@@ -124,6 +129,9 @@ public static Iterable<Object[]> data() {
private static final int TOTAL_N = 70;
// Number of rows with the original schema
private static final int ORIGINAL_N = 60;
// for dynamic destination test
private static final int NUM_DESTINATIONS = 3;
private static final int TOTAL_NUM_STREAMS = 9;

private final Random randomGenerator = new Random();

@@ -145,13 +153,20 @@ public static void cleanUp() {
}

private String createTable(TableSchema tableSchema) throws IOException, InterruptedException {
return createTable(tableSchema, "");
}

private String createTable(TableSchema tableSchema, String suffix)
throws IOException, InterruptedException {
String tableId = Iterables.get(Splitter.on('[').split(testName.getMethodName()), 0);
if (useInputSchema) {
tableId += "WithInputSchema";
}
if (changeTableSchema) {
tableId += "OnSchemaChange";
}
tableId += suffix;

BQ_CLIENT.deleteTable(PROJECT, BIG_QUERY_DATASET_ID, tableId);
BQ_CLIENT.createNewTable(
PROJECT,
@@ -170,9 +185,8 @@ static class UpdateSchemaDoFn extends DoFn<KV<Integer, TableRow>, TableRow> {

private final String projectId;
private final String datasetId;
private final String tableId;
// represent as String because TableSchema is not serializable
private final String newSchema;
private final Map<String, String> newSchemas;

private transient BigqueryClient bqClient;

@@ -183,11 +197,14 @@ static class UpdateSchemaDoFn extends DoFn<KV<Integer, TableRow>, TableRow> {
private final StateSpec<ValueState<Integer>> counter;

public UpdateSchemaDoFn(
String projectId, String datasetId, String tableId, TableSchema newSchema) {
String projectId, String datasetId, Map<String, TableSchema> newSchemas) {
this.projectId = projectId;
this.datasetId = datasetId;
this.tableId = tableId;
this.newSchema = BigQueryHelpers.toJsonString(newSchema);
Map<String, String> serializableSchemas = new HashMap<>();
for (Map.Entry<String, TableSchema> entry : newSchemas.entrySet()) {
serializableSchemas.put(entry.getKey(), BigQueryHelpers.toJsonString(entry.getValue()));
}
this.newSchemas = serializableSchemas;
this.bqClient = null;
this.counter = StateSpecs.value();
}
@@ -201,14 +218,17 @@ public void setup() {
public void processElement(ProcessContext c, @StateId(ROW_COUNTER) ValueState<Integer> counter)
throws Exception {
int current = firstNonNull(counter.read(), 0);
// We update schema early on to leave a healthy amount of time for
// StreamWriter to recognize it.
if (current == 10) {
bqClient.updateTableSchema(
projectId,
datasetId,
tableId,
BigQueryHelpers.fromJsonString(newSchema, TableSchema.class));
// We update schema early on to leave a healthy amount of time for StreamWriter to recognize
// it.
// We also update halfway through so that some writers are created *after* the schema update
if (current == TOTAL_NUM_STREAMS / 2) {
for (Map.Entry<String, String> entry : newSchemas.entrySet()) {
bqClient.updateTableSchema(
projectId,
datasetId,
entry.getKey(),
BigQueryHelpers.fromJsonString(entry.getValue(), TableSchema.class));
}
}

counter.write(++current);
@@ -304,7 +324,7 @@ private void runStreamingPipelineWithSchemaChange(
p.getOptions().as(BigQueryOptions.class).setStorageApiAppendThresholdBytes(0);
// Limit parallelism so that all streams recognize the new schema in an expected short amount
// of time (before we start writing rows with updated schema)
p.getOptions().as(BigQueryOptions.class).setNumStorageWriteApiStreams(3);
p.getOptions().as(BigQueryOptions.class).setNumStorageWriteApiStreams(TOTAL_NUM_STREAMS);
// Need to manually enable streaming engine for legacy dataflow runner
ExperimentalOptions.addExperiment(
p.getOptions().as(ExperimentalOptions.class), GcpOptions.STREAMING_ENGINE_EXPERIMENT);
@@ -394,7 +414,8 @@ private void runStreamingPipelineWithSchemaChange(
.apply(
"Update Schema",
ParDo.of(
new UpdateSchemaDoFn(PROJECT, BIG_QUERY_DATASET_ID, tableId, updatedSchema)));
new UpdateSchemaDoFn(
PROJECT, BIG_QUERY_DATASET_ID, ImmutableMap.of(tableId, updatedSchema))));
}
WriteResult result = rows.apply("Stream to BigQuery", write);
if (useIgnoreUnknownValues) {
@@ -494,13 +515,13 @@ public void checkRowsWithUpdatedSchema(
if (Integer.parseInt((String) row.get("id")) < ORIGINAL_N
|| !useAutoSchemaUpdate
|| !changeTableSchema) {
assertTrue(
assertNull(
String.format("Expected row to NOT have field %s:\n%s", extraField, row),
row.get(extraField) == null);
row.get(extraField));
} else {
assertTrue(
assertNotNull(
String.format("Expected row to have field %s:\n%s", extraField, row),
row.get(extraField) != null);
row.get(extraField));
}
}
}
@@ -539,4 +560,151 @@ public void testAtLeastOnceWithIgnoreUnknownValues() throws Exception {
public void testAtLeastOnceWithAutoSchemaUpdate() throws Exception {
runStreamingPipelineWithSchemaChange(Write.Method.STORAGE_API_AT_LEAST_ONCE, true, true);
}

public void runDynamicDestinationsWithAutoSchemaUpdate(boolean useAtLeastOnce) throws Exception {
Pipeline p = Pipeline.create(TestPipeline.testingPipelineOptions());
// 0 threshold so that the stream tries fetching an updated schema after each append
p.getOptions().as(BigQueryOptions.class).setStorageApiAppendThresholdBytes(0);
// Total streams per destination
p.getOptions()
.as(BigQueryOptions.class)
.setNumStorageWriteApiStreams(TOTAL_NUM_STREAMS / NUM_DESTINATIONS);
// Need to manually enable streaming engine for legacy dataflow runner
ExperimentalOptions.addExperiment(
p.getOptions().as(ExperimentalOptions.class), GcpOptions.STREAMING_ENGINE_EXPERIMENT);
// Only run the most relevant test case on Dataflow
if (p.getOptions().getRunner().getName().contains("DataflowRunner")) {
assumeTrue(
"Skipping in favor of more relevant test case", changeTableSchema && useInputSchema);
}

List<String> fieldNamesOrigin = new ArrayList<String>(Arrays.asList(FIELDS));

// Shuffle the fields in the write schema to do fuzz testing on field order
List<String> fieldNamesShuffled = new ArrayList<String>(fieldNamesOrigin);
Collections.shuffle(fieldNamesShuffled, randomGenerator);
TableSchema bqTableSchema = makeTableSchemaFromTypes(fieldNamesOrigin, null);
TableSchema inputSchema = makeTableSchemaFromTypes(fieldNamesShuffled, null);

Map<Long, String> destinations = new HashMap<>(NUM_DESTINATIONS);
Map<String, TableSchema> updatedSchemas = new HashMap<>(NUM_DESTINATIONS);
Map<String, String> extraFields = new HashMap<>(NUM_DESTINATIONS);
Map<Long, GenerateRowFunc> rowFuncs = new HashMap<>(NUM_DESTINATIONS);
for (int i = 0; i < NUM_DESTINATIONS; i++) {
// The updated schema includes all fields in the original schema plus a random new field
List<String> fieldNamesWithExtra = new ArrayList<String>(fieldNamesOrigin);
String extraField =
fieldNamesOrigin.get(randomGenerator.nextInt(fieldNamesOrigin.size())) + "_EXTRA";
fieldNamesWithExtra.add(extraField);
TableSchema updatedSchema =
makeTableSchemaFromTypes(fieldNamesWithExtra, ImmutableSet.of(extraField));
GenerateRowFunc generateRowFunc = new GenerateRowFunc(fieldNamesOrigin, fieldNamesWithExtra);

String tableId = createTable(bqTableSchema, "_dynamic_" + i);
String tableSpec = PROJECT + ":" + BIG_QUERY_DATASET_ID + "." + tableId;

rowFuncs.put((long) i, generateRowFunc);
destinations.put((long) i, tableSpec);
updatedSchemas.put(tableId, updatedSchema);
extraFields.put(tableSpec, extraField);
}

// build write transform
Write<TableRow> write =
BigQueryIO.writeTableRows()
.to(
row -> {
long l = (int) row.getValue().get("id") % NUM_DESTINATIONS;
String destination = destinations.get(l);
return new TableDestination(destination, null);
})
.withAutoSchemaUpdate(true)
.ignoreUnknownValues()
.withMethod(Write.Method.STORAGE_API_AT_LEAST_ONCE)
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(WriteDisposition.WRITE_APPEND);
if (useInputSchema) {
write = write.withSchema(inputSchema);
}
if (!useAtLeastOnce) {
write =
write
.withMethod(Write.Method.STORAGE_WRITE_API)
.withTriggeringFrequency(Duration.standardSeconds(1));
}

int numRows = TOTAL_N;
// set up and build pipeline
Instant start = new Instant(0);
// We give a healthy waiting period between each element to give Storage API streams a chance to
// recognize the new schema. Apply on relevant tests.
Duration interval = changeTableSchema ? Duration.standardSeconds(1) : Duration.millis(1);
Duration stop =
changeTableSchema ? Duration.standardSeconds(numRows - 1) : Duration.millis(numRows - 1);
Function<Instant, Long> getIdFromInstant =
changeTableSchema
? (Function<Instant, Long> & Serializable)
(Instant instant) -> instant.getMillis() / 1000
: (Function<Instant, Long> & Serializable) Instant::getMillis;

// Generates rows with original schema up for row IDs under ORIGINAL_N
// Then generates rows with updated schema for the rest
// Rows with updated schema should only reach the table if ignoreUnknownValues is set,
// and the extra field should be present only when autoSchemaUpdate is set
PCollection<Instant> instants =
p.apply(
"Generate Instants",
PeriodicImpulse.create()
.startAt(start)
.stopAt(start.plus(stop))
.withInterval(interval)
.catchUpToNow(false));
PCollection<TableRow> rows =
instants.apply(
"Create TableRows",
MapElements.into(TypeDescriptor.of(TableRow.class))
.via(
instant -> {
long rowId = getIdFromInstant.apply(instant);
long dest = rowId % NUM_DESTINATIONS;
return rowFuncs.get(dest).apply(rowId);
}));
if (changeTableSchema) {
rows =
rows
// UpdateSchemaDoFn uses state, so need to have a KV input
.apply("Add a dummy key", WithKeys.of(1))
.apply(
"Update Schema",
ParDo.of(new UpdateSchemaDoFn(PROJECT, BIG_QUERY_DATASET_ID, updatedSchemas)));
}

WriteResult result = rows.apply("Stream to BigQuery", write);
// We ignore the extra fields, so no rows should have been sent to DLQ
PAssert.that("Check DLQ is empty", result.getFailedStorageApiInserts()).empty();
p.run().waitUntilFinish();

Map<String, Integer> expectedCounts = new HashMap<>(NUM_DESTINATIONS);
for (int i = 0; i < numRows; i++) {
long mod = i % NUM_DESTINATIONS;
String destination = destinations.get(mod);
expectedCounts.merge(destination, 1, Integer::sum);
}

for (Map.Entry<String, Integer> expectedCount : expectedCounts.entrySet()) {
String dest = expectedCount.getKey();
checkRowCompleteness(dest, expectedCount.getValue(), true);
checkRowsWithUpdatedSchema(dest, extraFields.get(dest), true);
}
}

@Test
public void testExactlyOnceDynamicDestinationsWithAutoSchemaUpdate() throws Exception {
runDynamicDestinationsWithAutoSchemaUpdate(false);
}

@Test
public void testAtLeastOnceDynamicDestinationsWithAutoSchemaUpdate() throws Exception {
runDynamicDestinationsWithAutoSchemaUpdate(true);
}
}