-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
🎉 New BigQuery destination with Structured/Repeated Records #4176
Conversation
/test connector=connectors/destination-bigquery
|
/test connector=connectors/destination-bigquery-denormalized
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good! Left a few minor comments.
Also this destination should have a doc.
# Changelog | ||
|
||
## 0.1.0 | ||
Implementation of a destination for BigQuery with RECORD/REPEATED columns instead of raw JSON blobs. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
According to the latest guide, we should track the changelog in the public documentation of the connector:
https://docs.airbyte.io/contributing-to-airbyte/updating-documentation#changelogs
...ion/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java
Show resolved
Hide resolved
airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/resources/spec.json
Outdated
Show resolved
Hide resolved
.filter(key -> { | ||
final boolean validKey = fieldNames.contains(namingResolver.getIdentifier(key)); | ||
if (!validKey) { | ||
LOGGER.warn("Ignoring field {} as it is not defined in catalog", key); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be a debug
level message? Otherwise, it can be quite noisy, since it can emitted for every record.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch, thanks
...airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationAcceptanceTest.java
Show resolved
Hide resolved
} | ||
if (fieldList.stream().noneMatch(f -> f.getName().equals(JavaBaseConstants.COLUMN_NAME_EMITTED_AT))) { | ||
fieldList.add(Field.of(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, StandardSQLTypeName.TIMESTAMP)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are the above two if
checks always true
? It seems that the original Json schema will never have the two Airbyte columns.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the original JSON Schema can have the two airbyte columns if the streams were produced by airbyte and re-used as source streams
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I see. Good to know this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good. I feel like this should support copy destination functionality too. If that's too hard to do now then let's at least create an issue to do it.
@@ -0,0 +1,7 @@ | |||
{ | |||
"destinationDefinitionId": "079d5540-f236-4294-ba7c-ade8fd918496", | |||
"name": "BigQuery de-normalized", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"name": "BigQuery de-normalized", | |
"name": "BigQuery (Typed Struct)", |
I think this display name is a little bit clearer? Fine with me if you want to stick with denormalized but if you do it should be one word and I'd suggest putting it in parens. so BigQuery (Denormalized)
.
@@ -41,7 +41,7 @@ public String getRawTableName(String streamName) { | |||
|
|||
@Override | |||
public String getTmpTableName(String streamName) { | |||
return convertStreamName("_airbyte_" + Instant.now().toEpochMilli() + "_" + getRawTableName(streamName)); | |||
return convertStreamName(Strings.addRandomSuffix("_airbyte_tmp", "_", 3) + "_" + streamName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is this needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it was originally needed when I was producing both a raw table (json blob) and struct typed table by the same destination, so I needed two different "tmp" names. But then I moved away from that approach...
Should I revert to the old naming with timestamp?
@@ -389,9 +389,9 @@ def cast_property_type(self, property_name: str, column_name: str, jinja_column: | |||
print(f"WARN: Unknown type for column {property_name} at {self.current_json_path()}") | |||
return column_name | |||
elif is_array(definition["type"]): | |||
return self.cast_property_type_as_array(property_name, column_name) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is this changing? isn't this affecting the original BQ destination?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is doing the exact same thing as before (minus the extra function call) so it's not changing anything.
The extra function call was a placeholder to implement the struct/repeated "casting" there but it's actually not doable so not useful anymore
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
public class BigQueryDenormalizedDestination extends BigQueryDestination { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you explain some more why we wouldn't want to make this a CopyDestination
? I understand why this is split from BigQueryDestination
--that totally makes sense to me. But it seems like this should be able to support CopyDestination
and normal insert?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not saying it shouldn't be a CopyDestination
, I just tried a similar approach to CopyDestination
, and I faced issues that did not make it easy to pursue for this use case with two modes of writing for a destination. Thus, chose to make two destinations.
BigQueryDestination
isn't currently implemented for CopyDestination
, and it was not the goal of this PR either. But I guess we could indeed make both destination-bigquery
and destination-bigquery-denormalized
adopt a CopyDestination
strategy too
@ChristopheDuong i'd bumped a new version of the BigQuery destination last week, please merge master into your branch. |
/publish connector=connectors/destination-bigquery
|
/publish connector=connectors/destination-bigquery-denormalized
|
What
Closes #1927
How
A new destination that does not rely on base-normalization but implements its own native normalization with BigQuery by converting JSON Schema into google cloud schema, and thus, handle structured//repeated records/arrays.
Implementation notes
This issue was not doable from base-normalization python/dbt codebase as it seemed difficult (not possible?) to implement in BQ Standard SQL logic that parses a JSON column string into separate columns while building or casting to a STRUCT field... (the logic with nested is making this harder to think about too, and certainly not very efficient!)
It seemed more standard (from google docs) to provide the JSON Schema to BigQuery at loading time instead when creating the tables. Therefore, do the implementation in the java codebase.
I started implementing this following the same pattern of CopyDestination that is first uploading to a cloud storage and then load the warehouse or directly to the warehouse depending on some configs values.
However, we then run into challenges to solve where conflicts can arise:
Normalization code should probably be tweaked to be more easily compatible with the tables produced by this new denormalized destination instead. But that would mean more development work into the scope of this issue.
As a result, it is more straightforward to separate them into two distinct connectors for the moment with the "de-normalized" destination not being able to support normalization (and not supporting append-dedup either). In those cases, users can fallback to standard
destination-bigquery
or implement their own custom transformations post-sync instead for the moment.When/if normalization is refactored to be compatible with de-normalized tables, then we could merge the two bigquery destinations back together into a single connector.
Recommended reading order
airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestination.java
airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedRecordConsumer.java
airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/JsonSchemaType.java
airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java
Pre-merge Checklist
Expand the checklist which is relevant for this PR.
Connector checklist
airbyte_secret
in output spec./gradlew :airbyte-integrations:connectors:<name>:integrationTest
./test connector=connectors/<name>
command as documented here is passing.docs/integrations/
directory./publish
command described here