-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Source support primary keys #2488
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -56,6 +56,7 @@ | |
import java.sql.SQLException; | ||
import java.time.Instant; | ||
import java.util.ArrayList; | ||
import java.util.Collections; | ||
import java.util.HashSet; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
@@ -135,7 +136,9 @@ public AirbyteCatalog discover(JsonNode config) throws Exception { | |
Optional.ofNullable(config.get("schema")).map(JsonNode::asText)) | ||
.stream() | ||
.map(t -> CatalogHelpers.createAirbyteStream(t.getName(), t.getFields()) | ||
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))) | ||
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) | ||
.withSourceDefinedPrimaryKey( | ||
t.getPrimaryKeys().stream().filter(Objects::nonNull).map(Collections::singletonList).collect(Collectors.toList()))) | ||
.collect(Collectors.toList())); | ||
} | ||
} | ||
|
@@ -290,7 +293,7 @@ private List<TableInfo> getTables(final JdbcDatabase database, | |
.distinct() | ||
.collect(Collectors.toList()); | ||
|
||
return new TableInfo(JdbcUtils.getFullyQualifiedTableName(t.getSchemaName(), t.getName()), fields); | ||
return new TableInfo(JdbcUtils.getFullyQualifiedTableName(t.getSchemaName(), t.getName()), fields, t.getPrimaryKeys()); | ||
}) | ||
.collect(Collectors.toList()); | ||
} | ||
|
@@ -317,7 +320,7 @@ private List<TableInfoInternal> discoverInternal(final JdbcDatabase database, | |
final Optional<String> schemaOptional) | ||
throws Exception { | ||
final Set<String> internalSchemas = new HashSet<>(getExcludedInternalSchemas()); | ||
return database.bufferedResultSetQuery( | ||
final List<TableInfoInternal> result = database.bufferedResultSetQuery( | ||
conn -> conn.getMetaData().getColumns(databaseOptional.orElse(null), schemaOptional.orElse(null), null, null), | ||
resultSet -> Jsons.jsonNode(ImmutableMap.<String, Object>builder() | ||
// we always want a namespace, if we cannot get a schema, use db name. | ||
|
@@ -355,6 +358,17 @@ private List<TableInfoInternal> discoverInternal(final JdbcDatabase database, | |
}) | ||
.collect(Collectors.toList()))) | ||
.collect(Collectors.toList()); | ||
result.forEach(t -> { | ||
try { | ||
final List<String> primaryKeys = database.bufferedResultSetQuery( | ||
conn -> conn.getMetaData().getPrimaryKeys(databaseOptional.orElse(null), t.getSchemaName(), t.getName()), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we need to do this query once per table? or if we leave table null, can we just run this query once for the whole database? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've tried to run the function without table names. It seems like for MySQL this is not allowed and is throwing an exception about not having a table name but it does work for some other Databases... I made a new PR for this See also:
|
||
resultSet -> resultSet.getString(JDBC_COLUMN_COLUMN_NAME)); | ||
t.addPrimaryKeys(primaryKeys); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One convention that we try to follow in our java code is to keep objects immutable wherever possible. This pattern of setting fields after initialization should only be used if absolutely necessary (e.g. it represents different stages of the object's lifecycle). You'll notice that in almost all of our java classes all of the fields are final. Here it is not necessary to set the field late. The integrity of this object is compromised by because we happen to use two separate queries to construct it. I would suggest the following you do both queries 1. get all columns. 2 get all primary keys. and then construct the table info one time (no need for addPrimaryKeys). LMK if any of this is unclear or if the motivation doesn't make sense. |
||
} catch (SQLException e) { | ||
LOGGER.warn(String.format("Could not find primary keys for %s.%s: %s", t.getSchemaName(), t.getName(), e)); | ||
} | ||
}); | ||
return result; | ||
} | ||
|
||
private static AutoCloseableIterator<AirbyteMessage> getMessageIterator(AutoCloseableIterator<JsonNode> recordIterator, | ||
|
@@ -440,10 +454,12 @@ protected static class TableInfo { | |
|
||
private final String name; | ||
private final List<Field> fields; | ||
private final List<String> primaryKeys; | ||
|
||
public TableInfo(String name, List<Field> fields) { | ||
public TableInfo(String name, List<Field> fields, List<String> primaryKeys) { | ||
this.name = name; | ||
this.fields = fields; | ||
this.primaryKeys = primaryKeys; | ||
} | ||
|
||
public String getName() { | ||
|
@@ -454,18 +470,24 @@ public List<Field> getFields() { | |
return fields; | ||
} | ||
|
||
public List<String> getPrimaryKeys() { | ||
return primaryKeys; | ||
} | ||
|
||
} | ||
|
||
protected static class TableInfoInternal { | ||
|
||
private final String schemaName; | ||
private final String name; | ||
private final List<ColumnInfo> fields; | ||
private final List<String> primaryKeys; | ||
|
||
public TableInfoInternal(String schemaName, String tableName, List<ColumnInfo> fields) { | ||
this.schemaName = schemaName; | ||
this.name = tableName; | ||
this.fields = fields; | ||
this.primaryKeys = new ArrayList<>(); | ||
} | ||
|
||
public String getSchemaName() { | ||
|
@@ -480,6 +502,14 @@ public List<ColumnInfo> getFields() { | |
return fields; | ||
} | ||
|
||
public void addPrimaryKeys(List<String> primaryKeys) { | ||
this.primaryKeys.addAll(primaryKeys); | ||
} | ||
|
||
public List<String> getPrimaryKeys() { | ||
return primaryKeys; | ||
} | ||
|
||
} | ||
|
||
protected static class ColumnInfo { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -62,6 +62,7 @@ | |
import io.airbyte.protocol.models.SyncMode; | ||
import java.sql.SQLException; | ||
import java.util.ArrayList; | ||
import java.util.Collections; | ||
import java.util.Comparator; | ||
import java.util.List; | ||
import java.util.Optional; | ||
|
@@ -87,6 +88,8 @@ public abstract class JdbcSourceStandardTest { | |
private static final Set<String> TEST_SCHEMAS = ImmutableSet.of(SCHEMA_NAME, SCHEMA_NAME2); | ||
|
||
private static final String TABLE_NAME = "id_and_name"; | ||
private static final String TABLE_NAME_WITHOUT_PK = "id_and_name_without_pk"; | ||
private static final String TABLE_NAME_FULL_NAMES = "full_names"; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we call this multi column pk or something? i had trouble parsing this name. wasn't until i read the create table query that i understood it. |
||
|
||
private JsonNode config; | ||
private JdbcDatabase database; | ||
|
@@ -142,13 +145,30 @@ public void setup() throws Exception { | |
createSchemas(); | ||
} | ||
database.execute(connection -> { | ||
|
||
connection.createStatement() | ||
.execute(String.format("CREATE TABLE %s(id INTEGER, name VARCHAR(200), updated_at DATE);", getFullyQualifiedTableName(TABLE_NAME))); | ||
.execute(String.format("CREATE TABLE %s(id INTEGER, name VARCHAR(200), updated_at DATE, PRIMARY KEY (id));", | ||
getFullyQualifiedTableName(TABLE_NAME))); | ||
connection.createStatement().execute( | ||
String.format( | ||
"INSERT INTO %s(id, name, updated_at) VALUES (1,'picard', '2004-10-19'), (2, 'crusher', '2005-10-19'), (3, 'vash', '2006-10-19');", | ||
getFullyQualifiedTableName(TABLE_NAME))); | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good call adding these test cases here. glad that this could be added without having to rewrite all of the tests! |
||
connection.createStatement() | ||
.execute(String.format("CREATE TABLE %s(id INTEGER, name VARCHAR(200), updated_at DATE);", | ||
getFullyQualifiedTableName(TABLE_NAME_WITHOUT_PK))); | ||
connection.createStatement().execute( | ||
String.format( | ||
"INSERT INTO %s(id, name, updated_at) VALUES (1,'picard', '2004-10-19'), (2, 'crusher', '2005-10-19'), (3, 'vash', '2006-10-19');", | ||
getFullyQualifiedTableName(TABLE_NAME_WITHOUT_PK))); | ||
|
||
connection.createStatement() | ||
.execute( | ||
String.format("CREATE TABLE %s(first_name VARCHAR(200), last_name VARCHAR(200), updated_at DATE, PRIMARY KEY (first_name, last_name));", | ||
getFullyQualifiedTableName(TABLE_NAME_FULL_NAMES))); | ||
connection.createStatement().execute( | ||
String.format( | ||
"INSERT INTO %s(first_name, last_name, updated_at) VALUES ('first' ,'picard', '2004-10-19'), ('second', 'crusher', '2005-10-19'), ('third', 'vash', '2006-10-19');", | ||
getFullyQualifiedTableName(TABLE_NAME_FULL_NAMES))); | ||
}); | ||
} | ||
|
||
|
@@ -183,8 +203,14 @@ void testCheckFailure() throws Exception { | |
|
||
@Test | ||
void testDiscover() throws Exception { | ||
final AirbyteCatalog actual = source.discover(config); | ||
assertEquals(getCatalog(), filterOutOtherSchemas(actual)); | ||
final AirbyteCatalog actual = filterOutOtherSchemas(source.discover(config)); | ||
assertEquals(getCatalog(getDefaultNamespace()).getStreams().size(), actual.getStreams().size()); | ||
actual.getStreams().forEach(actualStream -> { | ||
final Optional<AirbyteStream> expectedStream = | ||
getCatalog(getDefaultNamespace()).getStreams().stream().filter(stream -> stream.getName().equals(actualStream.getName())).findAny(); | ||
assertTrue(expectedStream.isPresent(), String.format("Unexpected stream %s", actualStream.getName())); | ||
assertEquals(expectedStream.get(), actualStream); | ||
}); | ||
} | ||
|
||
private AirbyteCatalog filterOutOtherSchemas(AirbyteCatalog catalog) { | ||
|
@@ -219,7 +245,7 @@ void testDiscoverWithMultipleSchemas() throws Exception { | |
|
||
final AirbyteCatalog actual = source.discover(config); | ||
|
||
final AirbyteCatalog expected = getCatalog(); | ||
final AirbyteCatalog expected = getCatalog(getDefaultNamespace()); | ||
expected.getStreams().add(CatalogHelpers.createAirbyteStream(JdbcUtils.getFullyQualifiedTableName(SCHEMA_NAME2, TABLE_NAME), | ||
Field.of("id", JsonSchemaPrimitive.STRING), | ||
Field.of("name", JsonSchemaPrimitive.STRING)) | ||
|
@@ -232,7 +258,8 @@ void testDiscoverWithMultipleSchemas() throws Exception { | |
|
||
@Test | ||
void testReadSuccess() throws Exception { | ||
final List<AirbyteMessage> actualMessages = MoreIterators.toList(source.read(config, getConfiguredCatalog(), null)); | ||
final List<AirbyteMessage> actualMessages = | ||
MoreIterators.toList(source.read(config, getConfiguredCatalogWithOneStream(getDefaultNamespace()), null)); | ||
|
||
setEmittedAtToNull(actualMessages); | ||
|
||
|
@@ -259,8 +286,7 @@ void testReadOneColumn() throws Exception { | |
|
||
@Test | ||
void testReadMultipleTables() throws Exception { | ||
final ConfiguredAirbyteCatalog catalog = | ||
new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(getConfiguredCatalog().getStreams().get(0))); | ||
final ConfiguredAirbyteCatalog catalog = getConfiguredCatalogWithOneStream(getDefaultNamespace()); | ||
final List<AirbyteMessage> expectedMessages = new ArrayList<>(getTestMessages()); | ||
|
||
for (int i = 2; i < 10; i++) { | ||
|
@@ -300,7 +326,7 @@ void testTablesWithQuoting() throws Exception { | |
final ConfiguredAirbyteStream streamForTableWithSpaces = createTableWithSpaces(); | ||
|
||
final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList( | ||
getConfiguredCatalog().getStreams().get(0), | ||
getConfiguredCatalogWithOneStream(getDefaultNamespace()).getStreams().get(0), | ||
streamForTableWithSpaces)); | ||
final List<AirbyteMessage> actualMessages = MoreIterators.toList(source.read(config, catalog, null)); | ||
|
||
|
@@ -324,7 +350,7 @@ void testTablesWithQuoting() throws Exception { | |
@SuppressWarnings("ResultOfMethodCallIgnored") | ||
@Test | ||
void testReadFailure() { | ||
final ConfiguredAirbyteStream spiedAbStream = spy(getConfiguredCatalog().getStreams().get(0)); | ||
final ConfiguredAirbyteStream spiedAbStream = spy(getConfiguredCatalogWithOneStream(getDefaultNamespace()).getStreams().get(0)); | ||
final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(spiedAbStream)); | ||
doCallRealMethod().doThrow(new RuntimeException()).when(spiedAbStream).getStream(); | ||
|
||
|
@@ -407,7 +433,7 @@ void testIncrementalCursorChanges() throws Exception { | |
|
||
@Test | ||
void testReadOneTableIncrementallyTwice() throws Exception { | ||
final ConfiguredAirbyteCatalog configuredCatalog = getConfiguredCatalog(); | ||
final ConfiguredAirbyteCatalog configuredCatalog = getConfiguredCatalogWithOneStream(getDefaultNamespace()); | ||
configuredCatalog.getStreams().forEach(airbyteStream -> { | ||
airbyteStream.setSyncMode(SyncMode.INCREMENTAL); | ||
airbyteStream.setCursorField(Lists.newArrayList("id")); | ||
|
@@ -458,7 +484,7 @@ void testReadMultipleTablesIncrementally() throws Exception { | |
String.format("INSERT INTO %s(id, name) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash');", getFullyQualifiedTableName(tableName2))); | ||
}); | ||
|
||
final ConfiguredAirbyteCatalog configuredCatalog = getConfiguredCatalog(); | ||
final ConfiguredAirbyteCatalog configuredCatalog = getConfiguredCatalogWithOneStream(getDefaultNamespace()); | ||
configuredCatalog.getStreams().add(CatalogHelpers.createConfiguredAirbyteStream( | ||
streamName2, | ||
Field.of("id", JsonSchemaPrimitive.NUMBER), | ||
|
@@ -537,7 +563,7 @@ private void incrementalCursorCheck( | |
List<AirbyteMessage> expectedRecordMessages) | ||
throws Exception { | ||
incrementalCursorCheck(initialCursorField, cursorField, initialCursorValue, endCursorValue, expectedRecordMessages, | ||
getConfiguredCatalog().getStreams().get(0)); | ||
getConfiguredCatalogWithOneStream(getDefaultNamespace()).getStreams().get(0)); | ||
} | ||
|
||
private void incrementalCursorCheck( | ||
|
@@ -577,17 +603,36 @@ private void incrementalCursorCheck( | |
} | ||
|
||
// get catalog and perform a defensive copy. | ||
private static ConfiguredAirbyteCatalog getConfiguredCatalog() { | ||
return CatalogHelpers.toDefaultConfiguredCatalog(getCatalog()); | ||
} | ||
|
||
private static AirbyteCatalog getCatalog() { | ||
return new AirbyteCatalog().withStreams(Lists.newArrayList(CatalogHelpers.createAirbyteStream( | ||
streamName, | ||
Field.of("id", JsonSchemaPrimitive.NUMBER), | ||
Field.of("name", JsonSchemaPrimitive.STRING), | ||
Field.of("updated_at", JsonSchemaPrimitive.STRING)) | ||
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))); | ||
private static ConfiguredAirbyteCatalog getConfiguredCatalogWithOneStream(final String defaultNamespace) { | ||
final ConfiguredAirbyteCatalog catalog = CatalogHelpers.toDefaultConfiguredCatalog(getCatalog(defaultNamespace)); | ||
// Filter to only keep the main stream name as configured stream | ||
catalog.withStreams(catalog.getStreams().stream().filter(s -> s.getStream().getName().equals(streamName)).collect(Collectors.toList())); | ||
return catalog; | ||
} | ||
|
||
private static AirbyteCatalog getCatalog(final String defaultNamespace) { | ||
return new AirbyteCatalog().withStreams(Lists.newArrayList( | ||
CatalogHelpers.createAirbyteStream( | ||
defaultNamespace + "." + TABLE_NAME, | ||
Field.of("id", JsonSchemaPrimitive.NUMBER), | ||
Field.of("name", JsonSchemaPrimitive.STRING), | ||
Field.of("updated_at", JsonSchemaPrimitive.STRING)) | ||
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) | ||
.withSourceDefinedPrimaryKey(List.of(List.of("id"))), | ||
CatalogHelpers.createAirbyteStream( | ||
defaultNamespace + "." + TABLE_NAME_WITHOUT_PK, | ||
Field.of("id", JsonSchemaPrimitive.NUMBER), | ||
Field.of("name", JsonSchemaPrimitive.STRING), | ||
Field.of("updated_at", JsonSchemaPrimitive.STRING)) | ||
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) | ||
.withSourceDefinedPrimaryKey(Collections.emptyList()), | ||
CatalogHelpers.createAirbyteStream( | ||
defaultNamespace + "." + TABLE_NAME_FULL_NAMES, | ||
Field.of("first_name", JsonSchemaPrimitive.STRING), | ||
Field.of("last_name", JsonSchemaPrimitive.STRING), | ||
Field.of("updated_at", JsonSchemaPrimitive.STRING)) | ||
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) | ||
.withSourceDefinedPrimaryKey(List.of(List.of("first_name"), List.of("last_name"))))); | ||
} | ||
|
||
private static List<AirbyteMessage> getTestMessages() { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
styling nit.