-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Source support primary keys #2488
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we add more test cases to cover this feature more thoroughly? Also one style comment
@@ -354,6 +357,16 @@ private static void assertColumnsWithSameNameAreSame(String schemaName, String t | |||
return new ColumnInfo(f.get(INTERNAL_COLUMN_NAME).asText(), jdbcType); | |||
}) | |||
.collect(Collectors.toList()))) | |||
.peek(t -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this would be better done in a for loop after the sequence of stream operations for readability
...ource-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java
Show resolved
Hide resolved
/publish connector=connectors/source-postgres
|
/publish connector=connectors/source-mysql
|
/publish connector=connectors/source-mssql
|
/publish connector=connectors/source-redshift
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am going to sleep so well tonight with all these amazing test cases!!! Looks great.
.withSourceDefinedPrimaryKey( | ||
t.getPrimaryKeys().stream().filter(Objects::nonNull).map(Collections::singletonList).collect(Collectors.toList()))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
styling nit.
.withSourceDefinedPrimaryKey( | |
t.getPrimaryKeys().stream().filter(Objects::nonNull).map(Collections::singletonList).collect(Collectors.toList()))) | |
.withSourceDefinedPrimaryKey(t.getPrimaryKeys() | |
.stream() | |
.filter(Objects::nonNull) | |
.map(Collections::singletonList) | |
.collect(Collectors.toList()))) |
result.forEach(t -> { | ||
try { | ||
final List<String> primaryKeys = database.bufferedResultSetQuery( | ||
conn -> conn.getMetaData().getPrimaryKeys(databaseOptional.orElse(null), t.getSchemaName(), t.getName()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need to do this query once per table? or if we leave table null, can we just run this query once for the whole database?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've tried to run the function without table names.
It seems like for MySQL this is not allowed and is throwing an exception about not having a table name but it does work for some other Databases... I made a new PR for this
See also:
The method getTables takes a like-pattern for the tableNamePattern parameter, so "%" matches all table names.
The method getPrimaryKeys and getExportedKeys do not take a pattern, so you will need to loop over the result of
getTables and execute those methods for each row of the getTables result set.
final List<String> primaryKeys = database.bufferedResultSetQuery( | ||
conn -> conn.getMetaData().getPrimaryKeys(databaseOptional.orElse(null), t.getSchemaName(), t.getName()), | ||
resultSet -> resultSet.getString(JDBC_COLUMN_COLUMN_NAME)); | ||
t.addPrimaryKeys(primaryKeys); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One convention that we try to follow in our java code is to keep objects immutable wherever possible. This pattern of setting fields after initialization should only be used if absolutely necessary (e.g. it represents different stages of the object's lifecycle). You'll notice that in almost all of our java classes all of the fields are final. Here it is not necessary to set the field late. The integrity of this object is compromised by because we happen to use two separate queries to construct it.
I would suggest the following you do both queries 1. get all columns. 2 get all primary keys. and then construct the table info one time (no need for addPrimaryKeys).
LMK if any of this is unclear or if the motivation doesn't make sense.
connection.createStatement().execute( | ||
String.format( | ||
"INSERT INTO %s(id, name, updated_at) VALUES (1,'picard', '2004-10-19'), (2, 'crusher', '2005-10-19'), (3, 'vash', '2006-10-19');", | ||
getFullyQualifiedTableName(TABLE_NAME))); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good call adding these test cases here. glad that this could be added without having to rewrite all of the tests!
@@ -87,6 +88,8 @@ | |||
private static final Set<String> TEST_SCHEMAS = ImmutableSet.of(SCHEMA_NAME, SCHEMA_NAME2); | |||
|
|||
private static final String TABLE_NAME = "id_and_name"; | |||
private static final String TABLE_NAME_WITHOUT_PK = "id_and_name_without_pk"; | |||
private static final String TABLE_NAME_FULL_NAMES = "full_names"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we call this multi column pk or something? i had trouble parsing this name. wasn't until i read the create table query that i understood it.
What
Closes #2461
How
JDBC Abstract Sources are filling up the
sourceDefinedPrimaryKey
fieldPre-merge Checklist
Recommended reading order
airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java