Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source support primary keys #2488

Merged
merged 5 commits into from
Mar 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "435bb9a5-7887-4809-aa58-28c27df0d7ad",
"name": "MySQL",
"dockerRepository": "airbyte/source-mysql",
"dockerImageTag": "0.2.0",
"dockerImageTag": "0.2.1",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/mysql"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "b5ea17b1-f170-46dc-bc31-cc744ca984c1",
"name": "Microsoft SQL Server (MSSQL)",
"dockerRepository": "airbyte/source-mssql",
"dockerImageTag": "0.2.0",
"dockerImageTag": "0.2.1",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-mssql"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "decd338e-5647-4c0b-adf4-da0e75f5a750",
"name": "Postgres",
"dockerRepository": "airbyte/source-postgres",
"dockerImageTag": "0.2.0",
"dockerImageTag": "0.2.1",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-postgres"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "e87ffa8e-a3b5-f69c-9076-6011339de1f6",
"name": "Redshift",
"dockerRepository": "airbyte/source-redshift",
"dockerImageTag": "0.2.0",
"dockerImageTag": "0.2.1",
"documentationUrl": "https://hub.docker.com/repository/docker/airbyte/source-redshift"
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
- sourceDefinitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
name: Microsoft SQL Server (MSSQL)
dockerRepository: airbyte/source-mssql
dockerImageTag: 0.2.0
dockerImageTag: 0.2.1
documentationUrl: https://hub.docker.com/r/airbyte/source-mssql
- sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
name: Postgres
dockerRepository: airbyte/source-postgres
dockerImageTag: 0.2.0
dockerImageTag: 0.2.1
documentationUrl: https://hub.docker.com/r/airbyte/source-postgres
- sourceDefinitionId: cd42861b-01fc-4658-a8ab-5d11d0510f01
name: Recurly
Expand All @@ -51,7 +51,7 @@
- sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
name: MySQL
dockerRepository: airbyte/source-mysql
dockerImageTag: 0.2.0
dockerImageTag: 0.2.1
documentationUrl: https://docs.airbyte.io/integrations/sources/mysql
- sourceDefinitionId: 2470e835-feaf-4db6-96f3-70fd645acc77
name: Salesforce
Expand Down Expand Up @@ -96,7 +96,7 @@
- sourceDefinitionId: e87ffa8e-a3b5-f69c-9076-6011339de1f6
name: Redshift
dockerRepository: airbyte/source-redshift
dockerImageTag: 0.2.0
dockerImageTag: 0.2.1
documentationUrl: https://hub.docker.com/repository/docker/airbyte/source-redshift
- sourceDefinitionId: 932e6363-d006-4464-a9f5-102b82e07c06
name: Twilio
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.sql.SQLException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -135,7 +136,9 @@ public AirbyteCatalog discover(JsonNode config) throws Exception {
Optional.ofNullable(config.get("schema")).map(JsonNode::asText))
.stream()
.map(t -> CatalogHelpers.createAirbyteStream(t.getName(), t.getFields())
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(
t.getPrimaryKeys().stream().filter(Objects::nonNull).map(Collections::singletonList).collect(Collectors.toList())))
Comment on lines +140 to +141
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

styling nit.

Suggested change
.withSourceDefinedPrimaryKey(
t.getPrimaryKeys().stream().filter(Objects::nonNull).map(Collections::singletonList).collect(Collectors.toList())))
.withSourceDefinedPrimaryKey(t.getPrimaryKeys()
.stream()
.filter(Objects::nonNull)
.map(Collections::singletonList)
.collect(Collectors.toList())))

.collect(Collectors.toList()));
}
}
Expand Down Expand Up @@ -290,7 +293,7 @@ private List<TableInfo> getTables(final JdbcDatabase database,
.distinct()
.collect(Collectors.toList());

return new TableInfo(JdbcUtils.getFullyQualifiedTableName(t.getSchemaName(), t.getName()), fields);
return new TableInfo(JdbcUtils.getFullyQualifiedTableName(t.getSchemaName(), t.getName()), fields, t.getPrimaryKeys());
})
.collect(Collectors.toList());
}
Expand All @@ -317,7 +320,7 @@ private List<TableInfoInternal> discoverInternal(final JdbcDatabase database,
final Optional<String> schemaOptional)
throws Exception {
final Set<String> internalSchemas = new HashSet<>(getExcludedInternalSchemas());
return database.bufferedResultSetQuery(
final List<TableInfoInternal> result = database.bufferedResultSetQuery(
conn -> conn.getMetaData().getColumns(databaseOptional.orElse(null), schemaOptional.orElse(null), null, null),
resultSet -> Jsons.jsonNode(ImmutableMap.<String, Object>builder()
// we always want a namespace, if we cannot get a schema, use db name.
Expand Down Expand Up @@ -355,6 +358,17 @@ private List<TableInfoInternal> discoverInternal(final JdbcDatabase database,
})
.collect(Collectors.toList())))
.collect(Collectors.toList());
result.forEach(t -> {
try {
final List<String> primaryKeys = database.bufferedResultSetQuery(
conn -> conn.getMetaData().getPrimaryKeys(databaseOptional.orElse(null), t.getSchemaName(), t.getName()),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to do this query once per table? or if we leave table null, can we just run this query once for the whole database?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've tried to run the function without table names.

It seems like for MySQL this is not allowed and is throwing an exception about not having a table name but it does work for some other Databases... I made a new PR for this

See also:

The method getTables takes a like-pattern for the tableNamePattern parameter, so "%" matches all table names.
The method getPrimaryKeys and getExportedKeys do not take a pattern, so you will need to loop over the result of
getTables and execute those methods for each row of the getTables result set.

https://stackoverflow.com/a/40841486

resultSet -> resultSet.getString(JDBC_COLUMN_COLUMN_NAME));
t.addPrimaryKeys(primaryKeys);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One convention that we try to follow in our java code is to keep objects immutable wherever possible. This pattern of setting fields after initialization should only be used if absolutely necessary (e.g. it represents different stages of the object's lifecycle). You'll notice that in almost all of our java classes all of the fields are final. Here it is not necessary to set the field late. The integrity of this object is compromised by because we happen to use two separate queries to construct it.

I would suggest the following you do both queries 1. get all columns. 2 get all primary keys. and then construct the table info one time (no need for addPrimaryKeys).

LMK if any of this is unclear or if the motivation doesn't make sense.

} catch (SQLException e) {
LOGGER.warn(String.format("Could not find primary keys for %s.%s: %s", t.getSchemaName(), t.getName(), e));
}
});
return result;
}

private static AutoCloseableIterator<AirbyteMessage> getMessageIterator(AutoCloseableIterator<JsonNode> recordIterator,
Expand Down Expand Up @@ -440,10 +454,12 @@ protected static class TableInfo {

private final String name;
private final List<Field> fields;
private final List<String> primaryKeys;

public TableInfo(String name, List<Field> fields) {
public TableInfo(String name, List<Field> fields, List<String> primaryKeys) {
this.name = name;
this.fields = fields;
this.primaryKeys = primaryKeys;
}

public String getName() {
Expand All @@ -454,18 +470,24 @@ public List<Field> getFields() {
return fields;
}

public List<String> getPrimaryKeys() {
return primaryKeys;
}

}

protected static class TableInfoInternal {

private final String schemaName;
private final String name;
private final List<ColumnInfo> fields;
private final List<String> primaryKeys;

public TableInfoInternal(String schemaName, String tableName, List<ColumnInfo> fields) {
this.schemaName = schemaName;
this.name = tableName;
this.fields = fields;
this.primaryKeys = new ArrayList<>();
}

public String getSchemaName() {
Expand All @@ -480,6 +502,14 @@ public List<ColumnInfo> getFields() {
return fields;
}

public void addPrimaryKeys(List<String> primaryKeys) {
this.primaryKeys.addAll(primaryKeys);
}

public List<String> getPrimaryKeys() {
return primaryKeys;
}

}

protected static class ColumnInfo {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import io.airbyte.protocol.models.SyncMode;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
Expand All @@ -87,6 +88,8 @@ public abstract class JdbcSourceStandardTest {
private static final Set<String> TEST_SCHEMAS = ImmutableSet.of(SCHEMA_NAME, SCHEMA_NAME2);

private static final String TABLE_NAME = "id_and_name";
private static final String TABLE_NAME_WITHOUT_PK = "id_and_name_without_pk";
private static final String TABLE_NAME_FULL_NAMES = "full_names";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we call this multi column pk or something? i had trouble parsing this name. wasn't until i read the create table query that i understood it.


private JsonNode config;
private JdbcDatabase database;
Expand Down Expand Up @@ -142,13 +145,30 @@ public void setup() throws Exception {
createSchemas();
}
database.execute(connection -> {

connection.createStatement()
.execute(String.format("CREATE TABLE %s(id INTEGER, name VARCHAR(200), updated_at DATE);", getFullyQualifiedTableName(TABLE_NAME)));
.execute(String.format("CREATE TABLE %s(id INTEGER, name VARCHAR(200), updated_at DATE, PRIMARY KEY (id));",
getFullyQualifiedTableName(TABLE_NAME)));
connection.createStatement().execute(
String.format(
"INSERT INTO %s(id, name, updated_at) VALUES (1,'picard', '2004-10-19'), (2, 'crusher', '2005-10-19'), (3, 'vash', '2006-10-19');",
getFullyQualifiedTableName(TABLE_NAME)));

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good call adding these test cases here. glad that this could be added without having to rewrite all of the tests!

connection.createStatement()
.execute(String.format("CREATE TABLE %s(id INTEGER, name VARCHAR(200), updated_at DATE);",
getFullyQualifiedTableName(TABLE_NAME_WITHOUT_PK)));
connection.createStatement().execute(
String.format(
"INSERT INTO %s(id, name, updated_at) VALUES (1,'picard', '2004-10-19'), (2, 'crusher', '2005-10-19'), (3, 'vash', '2006-10-19');",
getFullyQualifiedTableName(TABLE_NAME_WITHOUT_PK)));

connection.createStatement()
.execute(
String.format("CREATE TABLE %s(first_name VARCHAR(200), last_name VARCHAR(200), updated_at DATE, PRIMARY KEY (first_name, last_name));",
getFullyQualifiedTableName(TABLE_NAME_FULL_NAMES)));
connection.createStatement().execute(
String.format(
"INSERT INTO %s(first_name, last_name, updated_at) VALUES ('first' ,'picard', '2004-10-19'), ('second', 'crusher', '2005-10-19'), ('third', 'vash', '2006-10-19');",
getFullyQualifiedTableName(TABLE_NAME_FULL_NAMES)));
});
}

Expand Down Expand Up @@ -183,8 +203,14 @@ void testCheckFailure() throws Exception {

@Test
void testDiscover() throws Exception {
final AirbyteCatalog actual = source.discover(config);
assertEquals(getCatalog(), filterOutOtherSchemas(actual));
final AirbyteCatalog actual = filterOutOtherSchemas(source.discover(config));
assertEquals(getCatalog(getDefaultNamespace()).getStreams().size(), actual.getStreams().size());
actual.getStreams().forEach(actualStream -> {
final Optional<AirbyteStream> expectedStream =
getCatalog(getDefaultNamespace()).getStreams().stream().filter(stream -> stream.getName().equals(actualStream.getName())).findAny();
assertTrue(expectedStream.isPresent(), String.format("Unexpected stream %s", actualStream.getName()));
assertEquals(expectedStream.get(), actualStream);
});
}

private AirbyteCatalog filterOutOtherSchemas(AirbyteCatalog catalog) {
Expand Down Expand Up @@ -219,7 +245,7 @@ void testDiscoverWithMultipleSchemas() throws Exception {

final AirbyteCatalog actual = source.discover(config);

final AirbyteCatalog expected = getCatalog();
final AirbyteCatalog expected = getCatalog(getDefaultNamespace());
expected.getStreams().add(CatalogHelpers.createAirbyteStream(JdbcUtils.getFullyQualifiedTableName(SCHEMA_NAME2, TABLE_NAME),
Field.of("id", JsonSchemaPrimitive.STRING),
Field.of("name", JsonSchemaPrimitive.STRING))
Expand All @@ -232,7 +258,8 @@ void testDiscoverWithMultipleSchemas() throws Exception {

@Test
void testReadSuccess() throws Exception {
final List<AirbyteMessage> actualMessages = MoreIterators.toList(source.read(config, getConfiguredCatalog(), null));
final List<AirbyteMessage> actualMessages =
MoreIterators.toList(source.read(config, getConfiguredCatalogWithOneStream(getDefaultNamespace()), null));

setEmittedAtToNull(actualMessages);

Expand All @@ -259,8 +286,7 @@ void testReadOneColumn() throws Exception {

@Test
void testReadMultipleTables() throws Exception {
final ConfiguredAirbyteCatalog catalog =
new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(getConfiguredCatalog().getStreams().get(0)));
final ConfiguredAirbyteCatalog catalog = getConfiguredCatalogWithOneStream(getDefaultNamespace());
final List<AirbyteMessage> expectedMessages = new ArrayList<>(getTestMessages());

for (int i = 2; i < 10; i++) {
Expand Down Expand Up @@ -300,7 +326,7 @@ void testTablesWithQuoting() throws Exception {
final ConfiguredAirbyteStream streamForTableWithSpaces = createTableWithSpaces();

final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(
getConfiguredCatalog().getStreams().get(0),
getConfiguredCatalogWithOneStream(getDefaultNamespace()).getStreams().get(0),
streamForTableWithSpaces));
final List<AirbyteMessage> actualMessages = MoreIterators.toList(source.read(config, catalog, null));

Expand All @@ -324,7 +350,7 @@ void testTablesWithQuoting() throws Exception {
@SuppressWarnings("ResultOfMethodCallIgnored")
@Test
void testReadFailure() {
final ConfiguredAirbyteStream spiedAbStream = spy(getConfiguredCatalog().getStreams().get(0));
final ConfiguredAirbyteStream spiedAbStream = spy(getConfiguredCatalogWithOneStream(getDefaultNamespace()).getStreams().get(0));
final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(spiedAbStream));
doCallRealMethod().doThrow(new RuntimeException()).when(spiedAbStream).getStream();

Expand Down Expand Up @@ -407,7 +433,7 @@ void testIncrementalCursorChanges() throws Exception {

@Test
void testReadOneTableIncrementallyTwice() throws Exception {
final ConfiguredAirbyteCatalog configuredCatalog = getConfiguredCatalog();
final ConfiguredAirbyteCatalog configuredCatalog = getConfiguredCatalogWithOneStream(getDefaultNamespace());
configuredCatalog.getStreams().forEach(airbyteStream -> {
airbyteStream.setSyncMode(SyncMode.INCREMENTAL);
airbyteStream.setCursorField(Lists.newArrayList("id"));
Expand Down Expand Up @@ -458,7 +484,7 @@ void testReadMultipleTablesIncrementally() throws Exception {
String.format("INSERT INTO %s(id, name) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash');", getFullyQualifiedTableName(tableName2)));
});

final ConfiguredAirbyteCatalog configuredCatalog = getConfiguredCatalog();
final ConfiguredAirbyteCatalog configuredCatalog = getConfiguredCatalogWithOneStream(getDefaultNamespace());
configuredCatalog.getStreams().add(CatalogHelpers.createConfiguredAirbyteStream(
streamName2,
Field.of("id", JsonSchemaPrimitive.NUMBER),
Expand Down Expand Up @@ -537,7 +563,7 @@ private void incrementalCursorCheck(
List<AirbyteMessage> expectedRecordMessages)
throws Exception {
incrementalCursorCheck(initialCursorField, cursorField, initialCursorValue, endCursorValue, expectedRecordMessages,
getConfiguredCatalog().getStreams().get(0));
getConfiguredCatalogWithOneStream(getDefaultNamespace()).getStreams().get(0));
}

private void incrementalCursorCheck(
Expand Down Expand Up @@ -577,17 +603,36 @@ private void incrementalCursorCheck(
}

// get catalog and perform a defensive copy.
private static ConfiguredAirbyteCatalog getConfiguredCatalog() {
return CatalogHelpers.toDefaultConfiguredCatalog(getCatalog());
}

private static AirbyteCatalog getCatalog() {
return new AirbyteCatalog().withStreams(Lists.newArrayList(CatalogHelpers.createAirbyteStream(
streamName,
Field.of("id", JsonSchemaPrimitive.NUMBER),
Field.of("name", JsonSchemaPrimitive.STRING),
Field.of("updated_at", JsonSchemaPrimitive.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))));
private static ConfiguredAirbyteCatalog getConfiguredCatalogWithOneStream(final String defaultNamespace) {
final ConfiguredAirbyteCatalog catalog = CatalogHelpers.toDefaultConfiguredCatalog(getCatalog(defaultNamespace));
// Filter to only keep the main stream name as configured stream
catalog.withStreams(catalog.getStreams().stream().filter(s -> s.getStream().getName().equals(streamName)).collect(Collectors.toList()));
return catalog;
}

private static AirbyteCatalog getCatalog(final String defaultNamespace) {
return new AirbyteCatalog().withStreams(Lists.newArrayList(
CatalogHelpers.createAirbyteStream(
defaultNamespace + "." + TABLE_NAME,
Field.of("id", JsonSchemaPrimitive.NUMBER),
Field.of("name", JsonSchemaPrimitive.STRING),
Field.of("updated_at", JsonSchemaPrimitive.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id"))),
CatalogHelpers.createAirbyteStream(
defaultNamespace + "." + TABLE_NAME_WITHOUT_PK,
Field.of("id", JsonSchemaPrimitive.NUMBER),
Field.of("name", JsonSchemaPrimitive.STRING),
Field.of("updated_at", JsonSchemaPrimitive.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(Collections.emptyList()),
CatalogHelpers.createAirbyteStream(
defaultNamespace + "." + TABLE_NAME_FULL_NAMES,
Field.of("first_name", JsonSchemaPrimitive.STRING),
Field.of("last_name", JsonSchemaPrimitive.STRING),
Field.of("updated_at", JsonSchemaPrimitive.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("first_name"), List.of("last_name")))));
}

private static List<AirbyteMessage> getTestMessages() {
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.version=0.2.1
LABEL io.airbyte.name=airbyte/source-mssql
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import io.airbyte.protocol.models.Field.JsonSchemaPrimitive;
import io.airbyte.protocol.models.SyncMode;
import java.sql.SQLException;
import java.util.List;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
Expand All @@ -54,7 +55,8 @@ class MssqlSourceTest {
Field.of("id", JsonSchemaPrimitive.NUMBER),
Field.of("name", JsonSchemaPrimitive.STRING),
Field.of("born", JsonSchemaPrimitive.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))));
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id")))));

private JsonNode configWithoutDbName;
private JsonNode config;
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.version=0.2.1
LABEL io.airbyte.name=airbyte/source-mysql
Loading