🎉 New BigQuery destination with Structured/Repeated Records #4176

Merged
merged 14 commits into from
Jun 23, 2021

Conversation

ChristopheDuong
Contributor

@ChristopheDuong ChristopheDuong commented Jun 17, 2021

What

Closes #1927

How

A new destination that does not rely on base-normalization. Instead, it implements its own native normalization for BigQuery by converting the JSON Schema into a Google Cloud (BigQuery) schema, which lets it handle structured and repeated records (nested objects and arrays).

Implementation notes

This could not be done from the base-normalization Python/dbt codebase: it seemed difficult (perhaps impossible) to write BigQuery Standard SQL that parses a JSON string column into separate columns while building or casting to a STRUCT field. Nesting makes the logic even harder to reason about, and the result would certainly not be very efficient.

According to the Google docs, the more standard approach is to provide the JSON Schema to BigQuery at load time, when the tables are created. The implementation therefore lives in the Java codebase.
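To illustrate the idea of deriving a table schema from a JSON Schema, here is a minimal sketch. It is not the connector's code: `SchemaSketch`, `Column`, `toColumn`, and `primaryType` are hypothetical names, and types and modes are plain strings (`STRUCT`, `REPEATED`, ...) standing in for the BigQuery client's own classes. Arrays of arrays and other edge cases are deliberately not handled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class SchemaSketch {

  /** Hypothetical stand-in for a BigQuery field: name, type, mode and nested sub-fields. */
  static final class Column {
    final String name;
    final String type;
    final String mode;
    final List<Column> subFields;

    Column(String name, String type, String mode, List<Column> subFields) {
      this.name = name;
      this.type = type;
      this.mode = mode;
      this.subFields = subFields;
    }
  }

  /** Converts the "properties" map of a JSON Schema object into a list of columns. */
  @SuppressWarnings("unchecked")
  static List<Column> toColumns(Map<String, Object> properties) {
    final List<Column> columns = new ArrayList<>();
    for (final Map.Entry<String, Object> entry : properties.entrySet()) {
      columns.add(toColumn(entry.getKey(), (Map<String, Object>) entry.getValue()));
    }
    return columns;
  }

  /** Converts a single JSON Schema property definition into a column. */
  @SuppressWarnings("unchecked")
  static Column toColumn(String name, Map<String, Object> definition) {
    switch (primaryType(definition.get("type"))) {
      case "object": {
        // Nested objects become STRUCT (RECORD) columns with their own sub-fields.
        final Object properties = definition.getOrDefault("properties", Map.of());
        return new Column(name, "STRUCT", "NULLABLE", toColumns((Map<String, Object>) properties));
      }
      case "array": {
        // Arrays become REPEATED columns of the item type (assumed string if "items" is missing).
        final Object items = definition.getOrDefault("items", Map.of("type", "string"));
        final Column item = toColumn(name, (Map<String, Object>) items);
        return new Column(name, item.type, "REPEATED", item.subFields);
      }
      case "integer":
        return new Column(name, "INT64", "NULLABLE", List.of());
      case "number":
        return new Column(name, "FLOAT64", "NULLABLE", List.of());
      case "boolean":
        return new Column(name, "BOOL", "NULLABLE", List.of());
      default:
        return new Column(name, "STRING", "NULLABLE", List.of());
    }
  }

  /** Picks the first non-"null" entry of a union type such as ["null", "string"]. */
  static String primaryType(Object type) {
    if (type instanceof List) {
      for (final Object candidate : (List<?>) type) {
        if (!"null".equals(candidate)) {
          return String.valueOf(candidate);
        }
      }
    }
    return type instanceof String ? (String) type : "string";
  }
}
```

With this sketch, a property declared as `{"type": "array", "items": {"type": "string"}}` maps to a `REPEATED` `STRING` column, and a nested object maps to a `STRUCT` column whose sub-fields are converted recursively.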

I started implementing this following the same pattern as CopyDestination, which either uploads to cloud storage first and then loads into the warehouse, or writes directly to the warehouse, depending on some config values.

However, that approach runs into conflicts in the following sequence:

  1. Run a BigQuery destination with struct/repeated fields.
  2. The destination additionally persists into _airbyte_raw tables with JSON blobs.
  3. Normalization then runs and overrides what step 1 produced with what step 2 produced, which is counterproductive.

The normalization code should probably be tweaked to be more compatible with the tables produced by this new denormalized destination instead, but that would add more development work to the scope of this issue.

As a result, it is more straightforward for now to split them into two distinct connectors, with the "de-normalized" destination supporting neither normalization nor append-dedup. In those cases, users can fall back to the standard destination-bigquery or implement their own custom post-sync transformations.

When/if normalization is refactored to be compatible with de-normalized tables, we could merge the two BigQuery destinations back into a single connector.

Recommended reading order

  1. airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestination.java
  2. airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedRecordConsumer.java
  3. airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/JsonSchemaType.java
  4. airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java
  5. the rest

Pre-merge Checklist

Expand the checklist which is relevant for this PR.

Connector checklist

  • Issue acceptance criteria met
  • PR name follows PR naming conventions
  • Secrets are annotated with airbyte_secret in output spec
  • Unit & integration tests added as appropriate (and are passing)
    • Community members: please provide proof of this succeeding locally e.g: screenshot or copy-paste acceptance test output. To run acceptance tests for a Python connector, follow instructions in the README. For java connectors run ./gradlew :airbyte-integrations:connectors:<name>:integrationTest.
  • /test connector=connectors/<name> command as documented here is passing.
    • Community members can skip this, Airbyters will run this for you.
  • Code reviews completed
  • Credentials added to Github CI if needed and not already present. instructions for injecting secrets into CI.
  • Documentation updated
    • README
    • CHANGELOG.md
    • Reference docs in the docs/integrations/ directory.
    • Build status added to build page
  • Build is successful
  • Connector version bumped like described here
  • New Connector version released on Dockerhub by running the /publish command described here
  • No major blockers
  • PR merged into master branch
  • Follow up tickets have been created
  • Associated tickets have been closed & stakeholders notified

@github-actions github-actions bot added the area/connectors Connector related issues label Jun 17, 2021
@ChristopheDuong ChristopheDuong marked this pull request as draft June 17, 2021 12:57
@ChristopheDuong
Contributor Author

ChristopheDuong commented Jun 17, 2021

/test connector=connectors/destination-bigquery

🕑 connectors/destination-bigquery https://github.com/airbytehq/airbyte/actions/runs/946571871
✅ connectors/destination-bigquery https://github.com/airbytehq/airbyte/actions/runs/946571871

@ChristopheDuong
Contributor Author

ChristopheDuong commented Jun 17, 2021

/test connector=connectors/destination-bigquery-denormalized

🕑 connectors/destination-bigquery-denormalized https://github.com/airbytehq/airbyte/actions/runs/946679518
✅ connectors/destination-bigquery-denormalized https://github.com/airbytehq/airbyte/actions/runs/946679518

@ChristopheDuong ChristopheDuong marked this pull request as ready for review June 17, 2021 15:48
Contributor

@tuliren tuliren left a comment

Looks good! Left a few minor comments.

Also this destination should have a doc.

# Changelog

## 0.1.0
Implementation of a destination for BigQuery with RECORD/REPEATED columns instead of raw JSON blobs.
Contributor

According to the latest guide, we should track the changelog in the public documentation of the connector:
https://docs.airbyte.io/contributing-to-airbyte/updating-documentation#changelogs

.filter(key -> {
  final boolean validKey = fieldNames.contains(namingResolver.getIdentifier(key));
  if (!validKey) {
    LOGGER.warn("Ignoring field {} as it is not defined in catalog", key);
Contributor

Should this be a debug-level message? Otherwise it can be quite noisy, since it can be emitted for every record.

Contributor Author

good catch, thanks
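
For reference, the filtering discussed in this thread could look roughly like the sketch below once the message is lowered to debug level. This is only an illustration under assumptions: `FieldFilterSketch` and `keepCatalogFields` are hypothetical names, `java.util.logging` at `FINE` stands in for an SLF4J debug call, and the `namingResolver.getIdentifier` step from the snippet above is omitted.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.logging.Logger;

final class FieldFilterSketch {

  private static final Logger LOGGER = Logger.getLogger(FieldFilterSketch.class.getName());

  /** Keeps only the record fields whose names are defined in the catalog. */
  static Map<String, Object> keepCatalogFields(Map<String, Object> record, Set<String> fieldNames) {
    final Map<String, Object> kept = new LinkedHashMap<>();
    record.forEach((key, value) -> {
      if (fieldNames.contains(key)) {
        kept.put(key, value);
      } else {
        // FINE is roughly a debug-level message, so it stays quiet by default
        // even if it would be emitted for every record.
        LOGGER.fine(() -> "Ignoring field " + key + " as it is not defined in catalog");
      }
    });
    return kept;
  }
}
```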

}
if (fieldList.stream().noneMatch(f -> f.getName().equals(JavaBaseConstants.COLUMN_NAME_EMITTED_AT))) {
  fieldList.add(Field.of(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, StandardSQLTypeName.TIMESTAMP));
}
Contributor

@tuliren tuliren Jun 18, 2021

Are the above two if checks always true? It seems that the original JSON Schema will never have the two Airbyte columns.

Contributor Author

The original JSON Schema can have the two Airbyte columns if the streams were produced by Airbyte and re-used as source streams.

Contributor

Oh, I see. Good to know this.
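
The "add only if absent" behavior discussed above can be sketched as follows. This is a simplified illustration, not the connector's code: it works on plain field names rather than BigQuery `Field` objects, `AirbyteColumnsSketch` and `withAirbyteColumns` are hypothetical names, and the two constant values are assumptions about what `JavaBaseConstants` contains.

```java
import java.util.ArrayList;
import java.util.List;

final class AirbyteColumnsSketch {

  // Assumed values of the JavaBaseConstants column names; adjust if they differ.
  static final String COLUMN_NAME_AB_ID = "_airbyte_ab_id";
  static final String COLUMN_NAME_EMITTED_AT = "_airbyte_emitted_at";

  /** Returns a copy of the field names with the Airbyte columns appended if missing. */
  static List<String> withAirbyteColumns(List<String> fieldNames) {
    final List<String> result = new ArrayList<>(fieldNames);
    // A stream that was itself produced by Airbyte may already carry these
    // columns, so each one is added only when it is not present yet.
    if (result.stream().noneMatch(COLUMN_NAME_AB_ID::equals)) {
      result.add(COLUMN_NAME_AB_ID);
    }
    if (result.stream().noneMatch(COLUMN_NAME_EMITTED_AT::equals)) {
      result.add(COLUMN_NAME_EMITTED_AT);
    }
    return result;
  }
}
```

Without the checks, a schema that already contains one of these columns would end up with a duplicate field name.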

Contributor

@cgardens cgardens left a comment

Looks good. I feel like this should support copy destination functionality too. If that's too hard to do now then let's at least create an issue to do it.

@@ -0,0 +1,7 @@
{
"destinationDefinitionId": "079d5540-f236-4294-ba7c-ade8fd918496",
"name": "BigQuery de-normalized",
Contributor

Suggested change
- "name": "BigQuery de-normalized",
+ "name": "BigQuery (Typed Struct)",

I think this display name is a little bit clearer? Fine with me if you want to stick with denormalized but if you do it should be one word and I'd suggest putting it in parens. so BigQuery (Denormalized).

@@ -41,7 +41,7 @@ public String getRawTableName(String streamName) {

@Override
public String getTmpTableName(String streamName) {
- return convertStreamName("_airbyte_" + Instant.now().toEpochMilli() + "_" + getRawTableName(streamName));
+ return convertStreamName(Strings.addRandomSuffix("_airbyte_tmp", "_", 3) + "_" + streamName);
Contributor

why is this needed?

Contributor Author

It was originally needed when the same destination produced both a raw table (JSON blob) and a struct-typed table, so I needed two different "tmp" names. But then I moved away from that approach...

Should I revert to the old naming with timestamp?
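
To make the two naming schemes in this diff concrete, here is a small sketch. The `addRandomSuffix` helper below is a hypothetical re-implementation written for illustration (a separator followed by random lowercase letters); the real `Strings.addRandomSuffix` may behave differently, and the `convertStreamName` step is left out.

```java
import java.time.Instant;
import java.util.Random;

final class TmpTableNameSketch {

  private static final String ALPHABET = "abcdefghijklmnopqrstuvwxyz";
  private static final Random RANDOM = new Random();

  /** Hypothetical helper: appends a separator and `length` random lowercase letters. */
  static String addRandomSuffix(String base, String separator, int length) {
    final StringBuilder suffix = new StringBuilder();
    for (int i = 0; i < length; i++) {
      suffix.append(ALPHABET.charAt(RANDOM.nextInt(ALPHABET.length())));
    }
    return base + separator + suffix;
  }

  /** Old scheme: the name embeds the current epoch milliseconds and the raw table name. */
  static String timestampName(String rawTableName) {
    return "_airbyte_" + Instant.now().toEpochMilli() + "_" + rawTableName;
  }

  /** New scheme: a fixed prefix, a short random suffix, then the stream name. */
  static String randomSuffixName(String streamName) {
    return addRandomSuffix("_airbyte_tmp", "_", 3) + "_" + streamName;
  }
}
```

Under these assumptions, `randomSuffixName("users")` yields something of the form `_airbyte_tmp_abc_users`, while `timestampName` yields `_airbyte_` followed by the epoch milliseconds, an underscore, and the raw table name.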

@@ -389,9 +389,9 @@ def cast_property_type(self, property_name: str, column_name: str, jinja_column:
        print(f"WARN: Unknown type for column {property_name} at {self.current_json_path()}")
        return column_name
    elif is_array(definition["type"]):
        return self.cast_property_type_as_array(property_name, column_name)
Contributor

why is this changing? isn't this affecting the original BQ destination?

Contributor Author

This is doing the exact same thing as before (minus the extra function call), so it's not changing anything.

The extra function call was a placeholder for implementing the struct/repeated "casting" there, but that is actually not doable, so it is no longer useful.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BigQueryDenormalizedDestination extends BigQueryDestination {
Contributor

Could you explain some more why we wouldn't want to make this a CopyDestination? I understand why this is split from BigQueryDestination--that totally makes sense to me. But it seems like this should be able to support CopyDestination and normal insert?

Contributor Author

@ChristopheDuong ChristopheDuong Jun 21, 2021

I am not saying it shouldn't be a CopyDestination. I tried an approach similar to CopyDestination and ran into issues that made it hard to pursue for this use case, where a single destination has two modes of writing. So I chose to make two destinations.

BigQueryDestination isn't currently implemented as a CopyDestination, and that was not the goal of this PR either. But I guess we could indeed make both destination-bigquery and destination-bigquery-denormalized adopt a CopyDestination strategy too.

@marcosmarxm
Member

@ChristopheDuong I bumped the BigQuery destination to a new version last week, so please merge master into your branch.

@github-actions github-actions bot added the area/documentation Improvements or additions to documentation label Jun 21, 2021
@ChristopheDuong
Contributor Author

ChristopheDuong commented Jun 23, 2021

/publish connector=connectors/destination-bigquery

🕑 connectors/destination-bigquery https://github.com/airbytehq/airbyte/actions/runs/964401886
✅ connectors/destination-bigquery https://github.com/airbytehq/airbyte/actions/runs/964401886

@ChristopheDuong
Contributor Author

ChristopheDuong commented Jun 23, 2021

/publish connector=connectors/destination-bigquery-denormalized

🕑 connectors/destination-bigquery-denormalized https://github.com/airbytehq/airbyte/actions/runs/964402184
✅ connectors/destination-bigquery-denormalized https://github.com/airbytehq/airbyte/actions/runs/964402184

Successfully merging this pull request may close these issues.

Load JSON data into BigQuery as structured data (records with repeated and nested fields, using STRUCT types)
6 participants