diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/079d5540-f236-4294-ba7c-ade8fd918496.json b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/079d5540-f236-4294-ba7c-ade8fd918496.json index ea1fd8ef6ef9..2a948beb7b30 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/079d5540-f236-4294-ba7c-ade8fd918496.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/079d5540-f236-4294-ba7c-ade8fd918496.json @@ -2,6 +2,6 @@ "destinationDefinitionId": "079d5540-f236-4294-ba7c-ade8fd918496", "name": "BigQuery (denormalized typed struct)", "dockerRepository": "airbyte/destination-bigquery-denormalized", - "dockerImageTag": "0.1.6", + "dockerImageTag": "0.1.8", "documentationUrl": "https://docs.airbyte.io/integrations/destinations/bigquery" } diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 8f37a86f8f06..ff89e7a3e60d 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -11,7 +11,7 @@ - name: BigQuery (denormalized typed struct) destinationDefinitionId: 079d5540-f236-4294-ba7c-ade8fd918496 dockerRepository: airbyte/destination-bigquery-denormalized - dockerImageTag: 0.1.7 + dockerImageTag: 0.1.8 documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery - name: Chargify (Keen) destinationDefinitionId: 81740ce8-d764-4ea7-94df-16bb41de36ae diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/BOOTSTRAP.md b/airbyte-integrations/connectors/destination-bigquery-denormalized/BOOTSTRAP.md new file mode 100644 index 000000000000..edb26b327d2a --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/BOOTSTRAP.md @@ -0,0 +1,5 @@ +# BigQuery 
Denormalized Destination Connector Bootstrap + +Instead of splitting the final data into multiple tables, this destination leverages BigQuery capabilities with [Structured and Repeated fields](https://cloud.google.com/bigquery/docs/nested-repeated) to produce a single "big" table per stream. This does not write the `_airbyte_raw_*` tables in the destination and normalization from this connector is not supported at this time. + +See [this](https://docs.airbyte.io/integrations/destinations/bigquery) link for the nuances about the connector. \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile b/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile index 2ad0b213627c..ec6426734c09 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.7 +LABEL io.airbyte.version=0.1.8 LABEL io.airbyte.name=airbyte/destination-bigquery-denormalized diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedRecordConsumer.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedRecordConsumer.java index d52da1ffe77b..9048dab2a3b6 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedRecordConsumer.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedRecordConsumer.java @@ -40,10 +40,10 @@ public class 
BigQueryDenormalizedRecordConsumer extends BigQueryRecordConsumer { private final Set invalidKeys; public BigQueryDenormalizedRecordConsumer(final BigQuery bigquery, - final Map writeConfigs, - final ConfiguredAirbyteCatalog catalog, - final Consumer outputRecordCollector, - final StandardNameTransformer namingResolver) { + final Map writeConfigs, + final ConfiguredAirbyteCatalog catalog, + final Consumer outputRecordCollector, + final StandardNameTransformer namingResolver) { super(bigquery, writeConfigs, catalog, outputRecordCollector, false, false); this.namingResolver = namingResolver; invalidKeys = new HashSet<>(); @@ -59,6 +59,7 @@ protected JsonNode formatRecord(final Schema schema, final AirbyteRecordMessage final ObjectNode data = (ObjectNode) formatData(schema.getFields(), recordMessage.getData()); data.put(JavaBaseConstants.COLUMN_NAME_AB_ID, UUID.randomUUID().toString()); data.put(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, formattedEmittedAt); + return data; } @@ -67,6 +68,10 @@ protected JsonNode formatData(final FieldList fields, final JsonNode root) { if (fields == null) { return root; } + List dateTimeFields = BigQueryUtils.getDateTimeFieldsFromSchema(fields); + if (!dateTimeFields.isEmpty()) { + BigQueryUtils.transformJsonDateTimeToBigDataFormat(dateTimeFields, (ObjectNode) root); + } if (root.isObject()) { final List fieldNames = fields.stream().map(Field::getName).collect(Collectors.toList()); return Jsons.jsonNode(Jsons.keys(root).stream() diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java index aa9026098504..aa13e9fb02c4 100644 --- 
a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java @@ -4,6 +4,7 @@ package io.airbyte.integrations.destination.bigquery; +import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.*; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.params.provider.Arguments.arguments; @@ -42,6 +43,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; +import org.joda.time.DateTime; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -73,6 +75,10 @@ class BigQueryDenormalizedDestinationTest { .withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME) .withData(getDataWithFormats()) .withEmittedAt(NOW.toEpochMilli())); + private static final AirbyteMessage MESSAGE_USERS4 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) + .withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME) + .withData(getDataWithJSONDateTimeFormats()) + .withEmittedAt(NOW.toEpochMilli())); private JsonNode config; @@ -109,6 +115,7 @@ void setup(final TestInfo info) throws IOException { MESSAGE_USERS1.getRecord().setNamespace(datasetId); MESSAGE_USERS2.getRecord().setNamespace(datasetId); MESSAGE_USERS3.getRecord().setNamespace(datasetId); + MESSAGE_USERS4.getRecord().setNamespace(datasetId); final DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId).setLocation(datasetLocation).build(); dataset = bigquery.create(datasetInfo); @@ -199,7 +206,7 @@ void testWriteWithFormat() throws Exception { // Bigquery's datetime type accepts multiple input format but always 
outputs the same, so we can't // expect to receive the value we sent. - assertEquals(extractJsonValues(resultJson, "updated_at"), Set.of("2018-08-19T12:11:35.220")); + assertEquals(extractJsonValues(resultJson, "updated_at"), Set.of("2021-10-11T06:36:53")); final Schema expectedSchema = Schema.of( Field.of("name", StandardSQLTypeName.STRING), @@ -211,6 +218,29 @@ void testWriteWithFormat() throws Exception { assertEquals(BigQueryUtils.getTableDefinition(bigquery, dataset.getDatasetId().getDataset(), USERS_STREAM_NAME).getSchema(), expectedSchema); } + @Test + void testIfJSONDateTimeWasConvertedToBigQueryFormat() throws Exception { + catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(new ConfiguredAirbyteStream() + .withStream(new AirbyteStream().withName(USERS_STREAM_NAME).withNamespace(datasetId).withJsonSchema(getSchemaWithDateTime())) + .withSyncMode(SyncMode.FULL_REFRESH).withDestinationSyncMode(DestinationSyncMode.OVERWRITE))); + + final BigQueryDestination destination = new BigQueryDenormalizedDestination(); + final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector); + + consumer.accept(MESSAGE_USERS4); + consumer.close(); + + final List usersActual = retrieveRecordsAsJson(USERS_STREAM_NAME); + assertEquals(usersActual.size(), 1); + final JsonNode resultJson = usersActual.get(0); + + // BigQuery Accepts "YYYY-MM-DD HH:MM:SS[.SSSSSS]" format + // returns "yyyy-MM-dd'T'HH:mm:ss" format + assertEquals(Set.of(new DateTime("2021-10-11T06:36:53+00:00").toString("yyyy-MM-dd'T'HH:mm:ss")), extractJsonValues(resultJson, "updated_at")); + //check nested datetime + assertEquals(Set.of(new DateTime("2021-11-11T06:36:53+00:00").toString("yyyy-MM-dd'T'HH:mm:ss")), extractJsonValues(resultJson.get("items"), "nested_datetime")); + } + private Set extractJsonValues(final JsonNode node, final String attributeName) { final List valuesNode = node.findValues(attributeName); final Set resultSet 
= new HashSet<>(); @@ -233,7 +263,6 @@ private List retrieveRecordsAsJson(final String tableName) throws Exce .newBuilder( String.format("select TO_JSON_STRING(t) as jsonValue from %s.%s t;", dataset.getDatasetId().getDataset(), tableName.toLowerCase())) .setUseLegacySql(false).build(); - BigQueryUtils.executeQuery(bigquery, queryConfig); return StreamSupport @@ -249,171 +278,4 @@ private static Stream schemaAndDataProvider() { arguments(getSchemaWithInvalidArrayType(), MESSAGE_USERS1), arguments(getSchema(), MESSAGE_USERS2)); } - - private static JsonNode getSchema() { - return Jsons.deserialize( - "{\n" - + " \"type\": [\n" - + " \"object\"\n" - + " ],\n" - + " \"properties\": {\n" - + " \"name\": {\n" - + " \"type\": [\n" - + " \"string\"\n" - + " ]\n" - + " },\n" - + " \"permissions\": {\n" - + " \"type\": [\n" - + " \"array\"\n" - + " ],\n" - + " \"items\": {\n" - + " \"type\": [\n" - + " \"object\"\n" - + " ],\n" - + " \"properties\": {\n" - + " \"domain\": {\n" - + " \"type\": [\n" - + " \"string\"\n" - + " ]\n" - + " },\n" - + " \"grants\": {\n" - + " \"type\": [\n" - + " \"array\"\n" - + " ],\n" - + " \"items\": {\n" - + " \"type\": [\n" - + " \"string\"\n" - + " ]\n" - + " }\n" - + " }\n" - + " }\n" - + " }\n" - + " }\n" - + " }\n" - + "}"); - - } - - private static JsonNode getSchemaWithFormats() { - return Jsons.deserialize( - "{\n" - + " \"type\": [\n" - + " \"object\"\n" - + " ],\n" - + " \"properties\": {\n" - + " \"name\": {\n" - + " \"type\": [\n" - + " \"string\"\n" - + " ]\n" - + " },\n" - + " \"date_of_birth\": {\n" - + " \"type\": [\n" - + " \"string\"\n" - + " ],\n" - + " \"format\": \"date\"\n" - + " },\n" - + " \"updated_at\": {\n" - + " \"type\": [\n" - + " \"string\"\n" - + " ],\n" - + " \"format\": \"date-time\"\n" - + " }\n" - + " }\n" - + "}"); - } - - private static JsonNode getSchemaWithInvalidArrayType() { - return Jsons.deserialize( - "{\n" - + " \"type\": [\n" - + " \"object\"\n" - + " ],\n" - + " \"properties\": {\n" - + " 
\"name\": {\n" - + " \"type\": [\n" - + " \"string\"\n" - + " ]\n" - + " },\n" - + " \"permissions\": {\n" - + " \"type\": [\n" - + " \"array\"\n" - + " ],\n" - + " \"items\": {\n" - + " \"type\": [\n" - + " \"object\"\n" - + " ],\n" - + " \"properties\": {\n" - + " \"domain\": {\n" - + " \"type\": [\n" - + " \"string\"\n" - + " ]\n" - + " },\n" - + " \"grants\": {\n" - + " \"type\": [\n" - + " \"array\"\n" // missed "items" element - + " ]\n" - + " }\n" - + " }\n" - + " }\n" - + " }\n" - + " }\n" - + "}"); - - } - - private static JsonNode getData() { - return Jsons.deserialize( - "{\n" - + " \"name\": \"Andrii\",\n" - + " \"permissions\": [\n" - + " {\n" - + " \"domain\": \"abs\",\n" - + " \"grants\": [\n" - + " \"admin\"\n" - + " ]\n" - + " },\n" - + " {\n" - + " \"domain\": \"tools\",\n" - + " \"grants\": [\n" - + " \"read\", \"write\"\n" - + " ]\n" - + " }\n" - + " ]\n" - + "}"); - } - - private static JsonNode getDataWithFormats() { - return Jsons.deserialize( - "{\n" - + " \"name\": \"Andrii\",\n" - + " \"date_of_birth\": \"1996-01-25\",\n" - + " \"updated_at\": \"2018-08-19 12:11:35.22\"\n" - + "}"); - } - - private static JsonNode getDataWithEmptyObjectAndArray() { - return Jsons.deserialize( - "{\n" - + " \"name\": \"Andrii\",\n" - + " \"permissions\": [\n" - + " {\n" - + " \"domain\": \"abs\",\n" - + " \"items\": {},\n" // empty object - + " \"grants\": [\n" - + " \"admin\"\n" - + " ]\n" - + " },\n" - + " {\n" - + " \"domain\": \"tools\",\n" - + " \"grants\": [],\n" // empty array - + " \"items\": {\n" // object with empty array and object - + " \"object\": {},\n" - + " \"array\": []\n" - + " }\n" - + " }\n" - + " ]\n" - + "}"); - - } - } diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestDataUtils.java 
b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestDataUtils.java new file mode 100644 index 000000000000..c2fa24cdec10 --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestDataUtils.java @@ -0,0 +1,224 @@ +package io.airbyte.integrations.destination.bigquery.util; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.json.Jsons; + +public class BigQueryDenormalizedTestDataUtils { + + public static JsonNode getSchema() { + return Jsons.deserialize( + "{\n" + + " \"type\": [\n" + + " \"object\"\n" + + " ],\n" + + " \"properties\": {\n" + + " \"accepts_marketing_updated_at\": {\n" + + " \"type\": [\n" + + " \"null\",\n" + + " \"string\"\n" + + " ],\n" + + " \"format\": \"date-time\"\n" + + " },\n" + + " \"name\": {\n" + + " \"type\": [\n" + + " \"string\"\n" + + " ]\n" + + " },\n" + + " \"permissions\": {\n" + + " \"type\": [\n" + + " \"array\"\n" + + " ],\n" + + " \"items\": {\n" + + " \"type\": [\n" + + " \"object\"\n" + + " ],\n" + + " \"properties\": {\n" + + " \"domain\": {\n" + + " \"type\": [\n" + + " \"string\"\n" + + " ]\n" + + " },\n" + + " \"grants\": {\n" + + " \"type\": [\n" + + " \"array\"\n" + + " ],\n" + + " \"items\": {\n" + + " \"type\": [\n" + + " \"string\"\n" + + " ]\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + "}"); + + } + + public static JsonNode getSchemaWithFormats() { + return Jsons.deserialize( + "{\n" + + " \"type\": [\n" + + " \"object\"\n" + + " ],\n" + + " \"properties\": {\n" + + " \"name\": {\n" + + " \"type\": [\n" + + " \"string\"\n" + + " ]\n" + + " },\n" + + " \"date_of_birth\": {\n" + + " \"type\": [\n" + + " \"string\"\n" + + " ],\n" + + " \"format\": \"date\"\n" + + " },\n" + + " \"updated_at\": {\n" + + " \"type\": [\n" + + " 
\"string\"\n" + + " ],\n" + + " \"format\": \"date-time\"\n" + + " }\n" + + " }\n" + + "}"); + } + + public static JsonNode getSchemaWithDateTime() { + return Jsons.deserialize( + "{\n" + + " \"type\": [\n" + + " \"object\"\n" + + " ],\n" + + " \"properties\": {\n" + + " " + + + "\"updated_at\": {\n" + + " \"type\": [\n" + + " \"string\"\n" + + " ],\n" + + " \"format\": \"date-time\"\n" + + " },\n" + + " \"items\": {\n" + + " \"type\": [\n" + + " \"object\"\n" + + " ],\n" + + " \"properties\": {\n" + + " \"nested_datetime\": {\n" + + " \"type\": [\n" + + " \"string\"\n" + + " ],\n" + + " \"format\": \"date-time\"\n" + + " }\n" + + + " " + + "}\n" + + " }\n" + + " }\n" + + "}"); + } + + public static JsonNode getSchemaWithInvalidArrayType() { + return Jsons.deserialize( + "{\n" + + " \"type\": [\n" + + " \"object\"\n" + + " ],\n" + + " \"properties\": {\n" + + " \"name\": {\n" + + " \"type\": [\n" + + " \"string\"\n" + + " ]\n" + + " },\n" + + " \"permissions\": {\n" + + " \"type\": [\n" + + " \"array\"\n" + + " ],\n" + + " \"items\": {\n" + + " \"type\": [\n" + + " \"object\"\n" + + " ],\n" + + " \"properties\": {\n" + + " \"domain\": {\n" + + " \"type\": [\n" + + " \"string\"\n" + + " ]\n" + + " },\n" + + " \"grants\": {\n" + + " \"type\": [\n" + + " \"array\"\n" // missed "items" element + + " ]\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + "}"); + + } + + public static JsonNode getData() { + return Jsons.deserialize( + "{\n" + + " \"name\": \"Andrii\",\n" + + " \"accepts_marketing_updated_at\": \"2021-10-11T06:36:53-07:00\",\n" + + " \"permissions\": [\n" + + " {\n" + + " \"domain\": \"abs\",\n" + + " \"grants\": [\n" + + " \"admin\"\n" + + " ]\n" + + " },\n" + + " {\n" + + " \"domain\": \"tools\",\n" + + " \"grants\": [\n" + + " \"read\", \"write\"\n" + + " ]\n" + + " }\n" + + " ]\n" + + "}"); + } + + public static JsonNode getDataWithFormats() { + return Jsons.deserialize( + "{\n" + + " \"name\": \"Andrii\",\n" + + " \"date_of_birth\": 
\"1996-01-25\",\n" + + " \"updated_at\": \"2021-10-11T06:36:53\"\n" + + "}"); + } + + public static JsonNode getDataWithJSONDateTimeFormats() { + return Jsons.deserialize( + "{\n" + + " \"updated_at\": \"2021-10-11T06:36:53+00:00\",\n" + + " \"items\": {\n" + + " \"nested_datetime\": \"2021-11-11T06:36:53+00:00\"\n" + + " }\n" + + "}"); + } + + public static JsonNode getDataWithEmptyObjectAndArray() { + return Jsons.deserialize( + "{\n" + + " \"name\": \"Andrii\",\n" + + " \"permissions\": [\n" + + " {\n" + + " \"domain\": \"abs\",\n" + + " \"items\": {},\n" // empty object + + " \"grants\": [\n" + + " \"admin\"\n" + + " ]\n" + + " },\n" + + " {\n" + + " \"domain\": \"tools\",\n" + + " \"grants\": [],\n" // empty array + + " \"items\": {\n" // object with empty array and object + + " \"object\": {},\n" + + " \"array\": []\n" + + " }\n" + + " }\n" + + " ]\n" + + "}"); + } +} diff --git a/airbyte-integrations/connectors/destination-bigquery/BOOTSTRAP.md b/airbyte-integrations/connectors/destination-bigquery/BOOTSTRAP.md new file mode 100644 index 000000000000..9a5d31b12234 --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery/BOOTSTRAP.md @@ -0,0 +1,8 @@ +# BigQuery Destination Connector Bootstrap + +BigQuery is a serverless, highly scalable, and cost-effective data warehouse +offered by Google Cloud Provider. + +BigQuery connector is producing the standard Airbyte outputs using a `_airbyte_raw_*` tables storing the JSON blob data first. Afterward, these are transformed and normalized into separate tables, potentially "exploding" nested streams into their own tables if [basic normalization](https://docs.airbyte.io/understanding-airbyte/basic-normalization) is configured. + +See [this](https://docs.airbyte.io/integrations/destinations/bigquery) link for more information about the connector. 
diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java index d5fc8a397cb0..613ec652407f 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java @@ -5,16 +5,21 @@ package io.airbyte.integrations.destination.bigquery; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryException; import com.google.cloud.bigquery.Clustering; import com.google.cloud.bigquery.Dataset; import com.google.cloud.bigquery.DatasetInfo; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.FieldList; import com.google.cloud.bigquery.Job; import com.google.cloud.bigquery.JobId; import com.google.cloud.bigquery.JobInfo; import com.google.cloud.bigquery.QueryJobConfiguration; +import com.google.cloud.bigquery.QueryParameterValue; import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardSQLTypeName; import com.google.cloud.bigquery.StandardTableDefinition; import com.google.cloud.bigquery.TableDefinition; import com.google.cloud.bigquery.TableId; @@ -24,15 +29,20 @@ import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.JavaBaseConstants; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import java.util.ArrayList; +import java.util.List; import java.util.Set; import java.util.UUID; import org.apache.commons.lang3.tuple.ImmutablePair; +import org.joda.time.DateTime; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; public class BigQueryUtils { private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryUtils.class); + private static final String BIG_QUERY_DATETIME_FORMAT = "yyyy-MM-dd HH:mm:ss.SSSSSS"; static ImmutablePair executeQuery(final BigQuery bigquery, final QueryJobConfiguration queryConfig) { final JobId jobId = JobId.of(UUID.randomUUID().toString()); @@ -143,4 +153,40 @@ static TableDefinition getTableDefinition(final BigQuery bigquery, final String return bigquery.getTable(tableId).getDefinition(); } + /** + * Collects the names of the fields of {@code fieldList} whose standard SQL type is DATETIME. + * @param fieldList - the schema fields to inspect; sub-fields are not visited + * @return names of the DATETIME fields, empty when there are none + */ + public static List<String> getDateTimeFieldsFromSchema(FieldList fieldList) { + List<String> dateTimeFields = new ArrayList<>(); + for (Field field : fieldList) { + if (field.getType().getStandardType().equals(StandardSQLTypeName.DATETIME)) { + dateTimeFields.add(field.getName()); + } + } + return dateTimeFields; + } + + /** + * Rewrites the DATETIME values of {@code data} in place into the format BigQuery accepts for DATETIME columns. + * Only direct children of {@code data} are read and replaced; absent and JSON-null fields are left untouched. + * + * @param dateTimeFields - names of the fields whose BigQuery type is DATETIME 
+ * @param data - JSON record that will be sent to BigQuery; modified in place + * @see <a href="https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#datetime_type">BigQuery DATETIME type</a> + */ + public static void transformJsonDateTimeToBigDataFormat(List<String> dateTimeFields, ObjectNode data) { + dateTimeFields.forEach(e -> { + // Read the direct child only: findValue() would also match same-named fields of nested objects. + final JsonNode value = data.get(e); + if (value != null && !value.isNull()) { + String googleBigQueryDateFormat = QueryParameterValue + .dateTime(new DateTime(value.asText()) + .toString(BIG_QUERY_DATETIME_FORMAT)) + .getValue(); + data.put(e, googleBigQueryDateFormat); + } + }); + } } diff --git a/docs/integrations/destinations/bigquery.md b/docs/integrations/destinations/bigquery.md index 3b691446003a..ede3b62ab233 100644 --- a/docs/integrations/destinations/bigquery.md +++ b/docs/integrations/destinations/bigquery.md @@ -169,6 +169,7 @@ Therefore, Airbyte BigQuery destination will convert any invalid characters into | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.1.8 | 2021-10-27 | [\#7413](https://github.com/airbytehq/airbyte/issues/7413) | Fixed DATETIME conversion for BigQuery | | 0.1.7 | 2021-10-26 | [\#7240](https://github.com/airbytehq/airbyte/issues/7240) | Output partitioned/clustered tables | | 0.1.6 | 2021-09-16 | [\#6145](https://github.com/airbytehq/airbyte/pull/6145) | BigQuery Denormalized support for date, datetime & timestamp types through the json "format" key | | 0.1.5 | 2021-09-07 | [\#5881](https://github.com/airbytehq/airbyte/pull/5881) | BigQuery Denormalized NPE fix |