Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

source-bigquery: escape column names and update google cloud bigquery artifact #11484

Merged
merged 13 commits into from
Apr 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
- name: BigQuery
sourceDefinitionId: bfd1ddf8-ae8a-4620-b1d7-55597d2ba08c
dockerRepository: airbyte/source-bigquery
dockerImageTag: 0.1.6
dockerImageTag: 0.1.7
documentationUrl: https://docs.airbyte.io/integrations/sources/bigquery
icon: bigquery.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-bigquery:0.1.6"
- dockerImage: "airbyte/source-bigquery:0.1.7"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/bigquery"
connectionSpecification:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-bigquery/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ ENV APPLICATION source-bigquery
COPY --from=build /airbyte /airbyte

# Airbyte's build system uses these labels to know what to name and tag the docker images produced by this Dockerfile.
LABEL io.airbyte.version=0.1.6
LABEL io.airbyte.version=0.1.7
LABEL io.airbyte.name=airbyte/source-bigquery
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ application {
}

dependencies {
implementation 'com.google.cloud:google-cloud-bigquery:1.122.2'
implementation 'com.google.cloud:google-cloud-bigquery:2.10.1'
implementation 'org.apache.commons:commons-lang3:3.11'
implementation project(':airbyte-db:lib')
implementation project(':airbyte-integrations:bases:base-java')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@
public class BigQuerySource extends AbstractRelationalDbSource<StandardSQLTypeName, BigQueryDatabase> implements Source {

private static final Logger LOGGER = LoggerFactory.getLogger(BigQuerySource.class);
private static final String QUOTE = "`";

public static final String CONFIG_DATASET_ID = "dataset_id";
public static final String CONFIG_PROJECT_ID = "project_id";
public static final String CONFIG_CREDS = "credentials_json";

private final String quote = "";
private JsonNode dbConfig;
private final BigQuerySourceOperations sourceOperations = new BigQuerySourceOperations();

Expand Down Expand Up @@ -129,7 +129,7 @@ protected Map<String, List<String>> discoverPrimaryKeys(final BigQueryDatabase d

@Override
protected String getQuoteString() {
return quote;
return QUOTE;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,39 +7,29 @@
import static io.airbyte.integrations.source.bigquery.BigQuerySource.CONFIG_CREDS;
import static io.airbyte.integrations.source.bigquery.BigQuerySource.CONFIG_DATASET_ID;
import static io.airbyte.integrations.source.bigquery.BigQuerySource.CONFIG_PROJECT_ID;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.db.bigquery.BigQueryDatabase;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.List;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class BigQuerySourceTest {
abstract class AbstractBigQuerySourceTest {

private static final Path CREDENTIALS_PATH = Path.of("secrets/credentials.json");
private static final String STREAM_NAME = "id_and_name";

private BigQueryDatabase database;
private Dataset dataset;
private JsonNode config;
protected BigQueryDatabase database;
protected Dataset dataset;
protected JsonNode config;

@BeforeEach
void setUp() throws IOException, SQLException {
Expand Down Expand Up @@ -69,37 +59,16 @@ void setUp() throws IOException, SQLException {
DatasetInfo.newBuilder(config.get(CONFIG_DATASET_ID).asText()).setLocation(datasetLocation).build();
dataset = database.getBigQuery().create(datasetInfo);

database.execute(
"CREATE TABLE " + datasetId
+ ".id_and_name(id INT64, array_val ARRAY<STRUCT<key string, value STRUCT<string_val string>>>, object_val STRUCT<val_array ARRAY<STRUCT<value_str1 string>>, value_str2 string>);");
database.execute(
"INSERT INTO " + datasetId
+ ".id_and_name (id, array_val, object_val) VALUES "
+ "(1, [STRUCT('test1_1', STRUCT('struct1_1')), STRUCT('test1_2', STRUCT('struct1_2'))], STRUCT([STRUCT('value1_1'), STRUCT('value1_2')], 'test1_1')), "
+ "(2, [STRUCT('test2_1', STRUCT('struct2_1')), STRUCT('test2_2', STRUCT('struct2_2'))], STRUCT([STRUCT('value2_1'), STRUCT('value2_2')], 'test2_1')), "
+ "(3, [STRUCT('test3_1', STRUCT('struct3_1')), STRUCT('test3_2', STRUCT('struct3_2'))], STRUCT([STRUCT('value3_1'), STRUCT('value3_2')], 'test3_1'));");
createTable(datasetId);
}

@AfterEach
void tearDown() {
database.cleanDataSet(dataset.getDatasetId().getDataset());
}

@Test
public void testReadSuccess() throws Exception {
final List<AirbyteMessage> actualMessages = MoreIterators.toList(new BigQuerySource().read(config, getConfiguredCatalog(), null));
protected abstract void createTable(String datasetId) throws SQLException;

assertNotNull(actualMessages);
assertEquals(3, actualMessages.size());
}

private ConfiguredAirbyteCatalog getConfiguredCatalog() {
return CatalogHelpers.createConfiguredAirbyteCatalog(
STREAM_NAME,
config.get(CONFIG_DATASET_ID).asText(),
Field.of("id", JsonSchemaType.NUMBER),
Field.of("array_val", JsonSchemaType.ARRAY),
Field.of("object_val", JsonSchemaType.OBJECT));
}
protected abstract ConfiguredAirbyteCatalog getConfiguredCatalog();

}
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,15 @@ protected void initTests() {
.addInsertValues("[STRUCT('qqq' as fff, [STRUCT('fff' as ooo, 1 as kkk), STRUCT('hhh' as ooo, 2 as kkk)] as ggg)]")
.addExpectedValues("[{\"fff\":\"qqq\",\"ggg\":[{\"ooo\":\"fff\",\"kkk\":1},{\"ooo\":\"hhh\",\"kkk\":2}]}]")
.build());

addDataTypeTestData(
TestDataHolder.builder()
.sourceType("interval")
.airbyteType(JsonSchemaType.STRING)
.createTablePatternSql(CREATE_SQL_PATTERN)
.addInsertValues("MAKE_INTERVAL(2021, 10, 10, 10, 10, 10)", "null")
.addExpectedValues("2021-10 10 10:10:10", null)
.build());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.bigquery;

import static io.airbyte.integrations.source.bigquery.BigQuerySource.CONFIG_DATASET_ID;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

import io.airbyte.commons.util.MoreIterators;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import java.sql.SQLException;
import java.util.List;
import org.junit.jupiter.api.Test;

/**
 * Integration test checking that {@link BigQuerySource} can read a table containing a column
 * whose name must be quoted with backticks in SQL (here: {@code interval}).
 *
 * <p>
 * The dataset lifecycle (creation before and cleanup after each test) is handled by
 * {@link AbstractBigQuerySourceTest}; this class only supplies the table and the catalog.
 */
public class BigQuerySourceEscapeColumnNameTest extends AbstractBigQuerySourceTest {

  /**
   * Creates the {@code id_and_interval} table in the given dataset and inserts a single row.
   *
   * @param datasetId id of the dataset in which the table is created
   * @throws SQLException if one of the statements fails
   */
  @Override
  public void createTable(String datasetId) throws SQLException {
    // The column is deliberately named `interval`, a name that has to be escaped with backticks.
    database.execute("CREATE TABLE " + datasetId + ".id_and_interval(id INT64, `interval` STRING);");
    database.execute("INSERT INTO " + datasetId + ".id_and_interval (id, `interval`) VALUES (1,'picard');");
  }

  /**
   * Reads the configured stream and verifies that exactly one message is returned and that its
   * {@code interval} field carries the inserted value.
   */
  @Test
  public void testReadSuccess() throws Exception {
    final List<AirbyteMessage> actualMessages = MoreIterators.toList(new BigQuerySource().read(config, getConfiguredCatalog(), null));
    assertNotNull(actualMessages);
    assertEquals(1, actualMessages.size());

    assertNotNull(actualMessages.get(0).getRecord().getData().get("interval"));
    assertEquals("picard", actualMessages.get(0).getRecord().getData().get("interval").asText());
  }

  /**
   * Builds the catalog for the {@code id_and_interval} stream, using the configured dataset id as
   * namespace.
   *
   * @return configured catalog with the {@code id} and {@code interval} fields
   */
  @Override
  protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
    return CatalogHelpers.createConfiguredAirbyteCatalog(
        "id_and_interval",
        config.get(CONFIG_DATASET_ID).asText(),
        Field.of("id", JsonSchemaType.NUMBER),
        Field.of("interval", JsonSchemaType.STRING));
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.bigquery;

import static io.airbyte.integrations.source.bigquery.BigQuerySource.CONFIG_DATASET_ID;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

import io.airbyte.commons.util.MoreIterators;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import java.sql.SQLException;
import java.util.List;
import org.junit.jupiter.api.Test;

/**
 * Integration test checking that {@link BigQuerySource} can read a table with nested
 * {@code ARRAY} and {@code STRUCT} columns.
 *
 * <p>
 * The dataset lifecycle (creation before and cleanup after each test) is handled by
 * {@link AbstractBigQuerySourceTest}; this class only supplies the table and the catalog.
 */
public class BigQueryStructureSourceTest extends AbstractBigQuerySourceTest {

  /**
   * Creates the {@code id_and_name} table in the given dataset and inserts three rows.
   *
   * @param datasetId id of the dataset in which the table is created
   * @throws SQLException if one of the statements fails
   */
  @Override
  protected void createTable(String datasetId) throws SQLException {
    // array_val is an array of structs that themselves contain a struct;
    // object_val is a struct that contains an array of structs plus a string.
    database.execute(
        "CREATE TABLE " + datasetId
            + ".id_and_name(id INT64, array_val ARRAY<STRUCT<key string, value STRUCT<string_val string>>>, object_val STRUCT<val_array ARRAY<STRUCT<value_str1 string>>, value_str2 string>);");
    database.execute(
        "INSERT INTO " + datasetId
            + ".id_and_name (id, array_val, object_val) VALUES "
            + "(1, [STRUCT('test1_1', STRUCT('struct1_1')), STRUCT('test1_2', STRUCT('struct1_2'))], STRUCT([STRUCT('value1_1'), STRUCT('value1_2')], 'test1_1')), "
            + "(2, [STRUCT('test2_1', STRUCT('struct2_1')), STRUCT('test2_2', STRUCT('struct2_2'))], STRUCT([STRUCT('value2_1'), STRUCT('value2_2')], 'test2_1')), "
            + "(3, [STRUCT('test3_1', STRUCT('struct3_1')), STRUCT('test3_2', STRUCT('struct3_2'))], STRUCT([STRUCT('value3_1'), STRUCT('value3_2')], 'test3_1'));");
  }

  /**
   * Builds the catalog for the {@code id_and_name} stream, using the configured dataset id as
   * namespace.
   *
   * @return configured catalog with the {@code id}, {@code array_val} and {@code object_val} fields
   */
  @Override
  protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
    return CatalogHelpers.createConfiguredAirbyteCatalog(
        "id_and_name",
        config.get(CONFIG_DATASET_ID).asText(),
        Field.of("id", JsonSchemaType.NUMBER),
        Field.of("array_val", JsonSchemaType.ARRAY),
        Field.of("object_val", JsonSchemaType.OBJECT));
  }

  /**
   * Reads the configured stream and verifies that one message per inserted row is returned.
   */
  @Test
  public void testReadSuccess() throws Exception {
    final List<AirbyteMessage> actualMessages = MoreIterators.toList(new BigQuerySource().read(config, getConfiguredCatalog(), null));

    assertNotNull(actualMessages);
    assertEquals(3, actualMessages.size());
  }

}
1 change: 1 addition & 0 deletions docs/integrations/sources/bigquery.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ Once you've configured BigQuery as a source, delete the Service Account Key from

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.7 | 2022-04-11 | [11484](https://github.com/airbytehq/airbyte/pull/11484) | Escape column names in the BigQuery connector |
| 0.1.6 | 2022-02-14 | [10256](https://github.com/airbytehq/airbyte/pull/10256) | Add `-XX:+ExitOnOutOfMemoryError` JVM option |
| 0.1.5 | 2021-12-23 | [8434](https://github.com/airbytehq/airbyte/pull/8434) | Update fields in source-connectors specifications |
| 0.1.4 | 2021-09-30 | [\#6524](https://github.com/airbytehq/airbyte/pull/6524) | Allow `dataset_id` null in spec |
Expand Down