Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BigQueryIO] fetch updated schema for newly created Storage API stream writers #33231

Merged
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_Java_DataflowV2.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 2
"modification": 3
}
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@
## Bugfixes

* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Fixed EventTimeTimer ordering in Prism. ([#32222](https://github.com/apache/beam/issues/32222)).
ahmedabu98 marked this conversation as resolved.
Show resolved Hide resolved
* [BigQueryIO] Fixed an issue where Storage Write API sometimes doesn't pick up auto-schema updates ([#33231](https://github.com/apache/beam/pull/33231))

## Security Fixes
* Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse;
import com.google.cloud.bigquery.storage.v1.FlushRowsRequest;
import com.google.cloud.bigquery.storage.v1.FlushRowsResponse;
import com.google.cloud.bigquery.storage.v1.GetWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.ProtoSchema;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
Expand All @@ -86,6 +87,7 @@
import com.google.cloud.bigquery.storage.v1.StreamWriter;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.cloud.bigquery.storage.v1.WriteStreamView;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
import com.google.protobuf.DescriptorProtos;
Expand Down Expand Up @@ -1419,7 +1421,11 @@ public WriteStream createWriteStream(String tableUrn, WriteStream.Type type)

@Override
public @Nullable WriteStream getWriteStream(String writeStream) {
return newWriteClient.getWriteStream(writeStream);
return newWriteClient.getWriteStream(
ahmedabu98 marked this conversation as resolved.
Show resolved Hide resolved
GetWriteStreamRequest.newBuilder()
.setView(WriteStreamView.FULL)
.setName(writeStream)
.build());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,11 +479,15 @@ SchemaAndDescriptor getCurrentTableSchema(String stream, @Nullable TableSchema u
Preconditions.checkStateNotNull(maybeWriteStreamService)
.getWriteStream(streamName);
if (writeStream != null && writeStream.hasTableSchema()) {
TableSchema updatedFromStream = writeStream.getTableSchema();
currentSchema.set(updatedFromStream);
updated.set(true);
LOG.debug(
"Fetched updated schema for table {}:\n\t{}", tableUrn, updatedFromStream);
Optional<TableSchema> newSchema =
TableSchemaUpdateUtils.getUpdatedSchema(
initialTableSchema, writeStream.getTableSchema());
if (newSchema.isPresent()) {
currentSchema.set(newSchema.get());
updated.set(true);
LOG.debug(
"Fetched updated schema for table {}:\n\t{}", tableUrn, newSchema.get());
}
}
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.cloud.bigquery.storage.v1.Exceptions.StreamFinalizedException;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.cloud.bigquery.storage.v1.WriteStream.Type;
import com.google.protobuf.ByteString;
import com.google.protobuf.DescriptorProtos;
Expand Down Expand Up @@ -531,6 +532,30 @@ public void process(
element.getKey().getKey(), dynamicDestinations, datasetService);
tableSchema = converter.getTableSchema();
descriptor = converter.getDescriptor(false);

if (autoUpdateSchema) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this is the ideal place to put this. getAppendClientInfo is called whenever the static cache is populated, meaning that on any worker restart, range move, etc. we'll be forced to call this API again. However we have persistent state in this DoFn, so we know if it's a "new" key or not. Can we use that to gate calling this method instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should always perform this check before creating a new StreamWriter , regardless of the reason for its creation. The only exception is if we already have an updated schema stored in state (see first if block above). If I'm following correctly, this method (getAppendClientInfo) will always create a new stream writer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also note that the updated schema is ignored when the StreamWriter object's creation time is later than the updated schema's.
i.e. it doesn't matter when the WriteStream itself was created

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so this doesn't quite solve the following race condition:

  • schema S is updated to S' before creating stream writer, we detect it in this codepath and store it in state.
  • StreamWriter is destroyed (either because it is idle and evicted from the cache, or because the worker crashes).
  • We later recreate the StreamWriter. Because we have an updated schema in cache, we don't execute this codepath.

In this case, I think we'll completely miss the new schema. IIUC the best way to address this race would be for BigQuery to provide us with a version of StreamWriter that returns schemas with new fields - i.e. not basing it off of creation time.

Copy link
Contributor Author

@ahmedabu98 ahmedabu98 Jan 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, this if block would only be executed if we have not detected S' yet.

But if S' is detected and stored in state, we would execute the first if block (line 520 here) and use that stored value. We still end up using the updated S', no?

Copy link
Contributor Author

@ahmedabu98 ahmedabu98 Jan 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless:

  1. S is updated to S'
  2. current writer recognizes it and stores S' in state
  3. writer crashes for whatever reason
  4. S' is updated to S'' (before new writer is created)
  5. new writer is created, using the schema stored in state: S'

This new writer would have no chance to recognize S'' because it was created after that update.

Copy link
Contributor Author

@ahmedabu98 ahmedabu98 Jan 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a much narrower edge-case though. It's a small subset of the bigger problem we currently have, which involves essentially any new StreamWriter.

Are we okay with merging this solution and continue to pursue BigQuery to improve things on their side?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that´s the race condition I had in mind. Is BigQuery planning on improving things on their side_

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will follow up with them

// A StreamWriter ignores table schema updates that happen prior to its creation.
// So before creating a StreamWriter below, we fetch the table schema to check if we
// missed an update.
// If so, use the new schema instead of the base schema
@Nullable
WriteStream writeStream =
writeStreamService.getWriteStream(getOrCreateStream.get());
TableSchema streamSchema =
writeStream == null
? TableSchema.getDefaultInstance()
: writeStream.getTableSchema();
Optional<TableSchema> newSchema =
TableSchemaUpdateUtils.getUpdatedSchema(tableSchema, streamSchema);

if (newSchema.isPresent()) {
tableSchema = newSchema.get();
descriptor =
TableRowToStorageApiProto.descriptorSchemaFromTableSchema(
tableSchema, true, false);
updatedSchema.write(tableSchema);
}
}
}
AppendClientInfo info =
AppendClientInfo.of(
Expand Down
Loading
Loading