Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Try to make one query to DB to retrieve primary keys #2514

Merged
merged 2 commits into from
Mar 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,10 @@
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.time.Instant;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -137,8 +139,11 @@ public AirbyteCatalog discover(JsonNode config) throws Exception {
.stream()
.map(t -> CatalogHelpers.createAirbyteStream(t.getName(), t.getFields())
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(
t.getPrimaryKeys().stream().filter(Objects::nonNull).map(Collections::singletonList).collect(Collectors.toList())))
.withSourceDefinedPrimaryKey(t.getPrimaryKeys()
.stream()
.filter(Objects::nonNull)
.map(Collections::singletonList)
.collect(Collectors.toList())))
.collect(Collectors.toList()));
}
}
Expand Down Expand Up @@ -280,8 +285,9 @@ private List<TableInfo> getTables(final JdbcDatabase database,
final Optional<String> databaseOptional,
final Optional<String> schemaOptional)
throws Exception {

return discoverInternal(database, databaseOptional, schemaOptional).stream()
final List<TableInfoInternal> tableInfos = discoverInternal(database, databaseOptional, schemaOptional);
final Map<String, List<String>> tablePrimaryKeys = discoverPrimaryKeys(database, databaseOptional, schemaOptional, tableInfos);
return tableInfos.stream()
.map(t -> {
// some databases return multiple copies of the same record for a column (e.g. redshift) because
// they have at least once delivery guarantees. we want to dedupe these, but first we check that the
Expand All @@ -292,12 +298,76 @@ private List<TableInfo> getTables(final JdbcDatabase database,
.map(f -> Field.of(f.getColumnName(), JdbcUtils.getType(f.getColumnType())))
.distinct()
.collect(Collectors.toList());

return new TableInfo(JdbcUtils.getFullyQualifiedTableName(t.getSchemaName(), t.getName()), fields, t.getPrimaryKeys());
final String streamName = JdbcUtils.getFullyQualifiedTableName(t.getSchemaName(), t.getName());
final List<String> primaryKeys = tablePrimaryKeys.getOrDefault(streamName, Collections.emptyList());
return new TableInfo(streamName, fields, primaryKeys);
})
.collect(Collectors.toList());
}

/**
* Discovers the primary keys of each table and returns a map from the fully qualified
* (schema.table) name to the list of its primary key columns.
*
* Invoking conn.getMetaData().getPrimaryKeys() without a table name fails on some databases
* (for example MySQL) but works on others (for instance Postgres). To avoid issuing one query
* per table, we first try to fetch all primary keys without specifying a table; if that query
* fails or returns nothing, we retry one table at a time.
*/
private Map<String, List<String>> discoverPrimaryKeys(JdbcDatabase database,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it. i'd suggest 1 of 3 options here:

  1. have this return an optional if the query doesn't work
  2. have this method always succeed by hiding this logic internally. So it always takes the table names. If it doesn't need them it doesn't use them, but if it does it has them.
    e.g. discoverPrimaryKeys(JdbcDatabase database, Optional<String> databaseOptional, Optional<String> schemaOptional, String tableName) => discoverPrimaryKeys(JdbcDatabase database, Optional<String> databaseOptional, Optional<String> schemaOptional, List<String> tableNames)
  3. just always query one table at a time.

I think 2 and 3 are equally good options, and both are clearer than 1. Given the quirk you discovered (null cannot always be passed as the table name to getPrimaryKeys), it's not the end of the world if we just do one query per table.

Optional<String> databaseOptional,
Optional<String> schemaOptional,
List<TableInfoInternal> tableInfos) {
try {
// Get all primary keys without specifying a table name
final Map<String, List<String>> tablePrimaryKeys = aggregatePrimateKeys(database.bufferedResultSetQuery(
conn -> conn.getMetaData().getPrimaryKeys(databaseOptional.orElse(null), schemaOptional.orElse(null), null),
r -> {
final String schemaName =
r.getObject(JDBC_COLUMN_SCHEMA_NAME) != null ? r.getString(JDBC_COLUMN_SCHEMA_NAME) : r.getString(JDBC_COLUMN_DATABASE_NAME);
final String streamName = JdbcUtils.getFullyQualifiedTableName(schemaName, r.getString(JDBC_COLUMN_TABLE_NAME));
final String primaryKey = r.getString(JDBC_COLUMN_COLUMN_NAME);
return new SimpleImmutableEntry<>(streamName, primaryKey);
}));
if (!tablePrimaryKeys.isEmpty()) {
return tablePrimaryKeys;
}
} catch (SQLException e) {
LOGGER.debug(String.format("Could not retrieve primary keys without a table name (%s), retrying", e));
}
// Get primary keys one table at a time
return tableInfos.stream()
.collect(Collectors.toMap(
tableInfo -> JdbcUtils.getFullyQualifiedTableName(tableInfo.getSchemaName(), tableInfo.getName()),
tableInfo -> {
final String streamName = JdbcUtils.getFullyQualifiedTableName(tableInfo.getSchemaName(), tableInfo.getName());
try {
final Map<String, List<String>> primaryKeys = aggregatePrimateKeys(database.bufferedResultSetQuery(
conn -> conn.getMetaData().getPrimaryKeys(databaseOptional.orElse(null), tableInfo.getSchemaName(), tableInfo.getName()),
r -> new SimpleImmutableEntry<>(streamName, r.getString(JDBC_COLUMN_COLUMN_NAME))));
return primaryKeys.getOrDefault(streamName, Collections.emptyList());
} catch (SQLException e) {
LOGGER.error(String.format("Could not retrieve primary keys for %s: %s", streamName, e));
return Collections.emptyList();
}
}));
}

/**
 * Groups (stream name, primary key column) pairs by stream name.
 *
 * @param entries pairs whose key is a stream name and whose value is one primary key column of
 *        that stream
 * @return a new mutable map from each stream name to the list of primary key columns seen for it,
 *         in the order they appear in {@code entries}; empty when {@code entries} is empty
 */
private static Map<String, List<String>> aggregatePrimateKeys(List<SimpleImmutableEntry<String, String>> entries) {
  final Map<String, List<String>> result = new HashMap<>();
  for (final SimpleImmutableEntry<String, String> entry : entries) {
    // computeIfAbsent creates the list the first time a stream is seen and hands it back in a
    // single lookup, replacing the containsKey/put/get sequence.
    result.computeIfAbsent(entry.getKey(), streamName -> new ArrayList<>()).add(entry.getValue());
  }
  return result;
}

private static void assertColumnsWithSameNameAreSame(String schemaName, String tableName, List<ColumnInfo> columns) {
columns.stream()
.collect(Collectors.groupingBy(ColumnInfo::getColumnName))
Expand All @@ -320,7 +390,7 @@ private List<TableInfoInternal> discoverInternal(final JdbcDatabase database,
final Optional<String> schemaOptional)
throws Exception {
final Set<String> internalSchemas = new HashSet<>(getExcludedInternalSchemas());
final List<TableInfoInternal> result = database.bufferedResultSetQuery(
return database.bufferedResultSetQuery(
conn -> conn.getMetaData().getColumns(databaseOptional.orElse(null), schemaOptional.orElse(null), null, null),
resultSet -> Jsons.jsonNode(ImmutableMap.<String, Object>builder()
// we always want a namespace, if we cannot get a schema, use db name.
Expand Down Expand Up @@ -358,17 +428,6 @@ private List<TableInfoInternal> discoverInternal(final JdbcDatabase database,
})
.collect(Collectors.toList())))
.collect(Collectors.toList());
result.forEach(t -> {
try {
final List<String> primaryKeys = database.bufferedResultSetQuery(
conn -> conn.getMetaData().getPrimaryKeys(databaseOptional.orElse(null), t.getSchemaName(), t.getName()),
resultSet -> resultSet.getString(JDBC_COLUMN_COLUMN_NAME));
t.addPrimaryKeys(primaryKeys);
} catch (SQLException e) {
LOGGER.warn(String.format("Could not find primary keys for %s.%s: %s", t.getSchemaName(), t.getName(), e));
}
});
return result;
}

private static AutoCloseableIterator<AirbyteMessage> getMessageIterator(AutoCloseableIterator<JsonNode> recordIterator,
Expand Down Expand Up @@ -481,13 +540,11 @@ protected static class TableInfoInternal {
private final String schemaName;
private final String name;
private final List<ColumnInfo> fields;
private final List<String> primaryKeys;

public TableInfoInternal(String schemaName, String tableName, List<ColumnInfo> fields) {
this.schemaName = schemaName;
this.name = tableName;
this.fields = fields;
this.primaryKeys = new ArrayList<>();
}

public String getSchemaName() {
Expand All @@ -502,14 +559,6 @@ public List<ColumnInfo> getFields() {
return fields;
}

public void addPrimaryKeys(List<String> primaryKeys) {
this.primaryKeys.addAll(primaryKeys);
}

public List<String> getPrimaryKeys() {
return primaryKeys;
}

}

protected static class ColumnInfo {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public abstract class JdbcSourceStandardTest {

private static final String TABLE_NAME = "id_and_name";
private static final String TABLE_NAME_WITHOUT_PK = "id_and_name_without_pk";
private static final String TABLE_NAME_FULL_NAMES = "full_names";
private static final String TABLE_NAME_COMPOSITE_PK = "full_name_composite_pk";

private JsonNode config;
private JdbcDatabase database;
Expand Down Expand Up @@ -164,11 +164,11 @@ public void setup() throws Exception {
connection.createStatement()
.execute(
String.format("CREATE TABLE %s(first_name VARCHAR(200), last_name VARCHAR(200), updated_at DATE, PRIMARY KEY (first_name, last_name));",
getFullyQualifiedTableName(TABLE_NAME_FULL_NAMES)));
getFullyQualifiedTableName(TABLE_NAME_COMPOSITE_PK)));
connection.createStatement().execute(
String.format(
"INSERT INTO %s(first_name, last_name, updated_at) VALUES ('first' ,'picard', '2004-10-19'), ('second', 'crusher', '2005-10-19'), ('third', 'vash', '2006-10-19');",
getFullyQualifiedTableName(TABLE_NAME_FULL_NAMES)));
getFullyQualifiedTableName(TABLE_NAME_COMPOSITE_PK)));
});
}

Expand Down Expand Up @@ -627,7 +627,7 @@ private static AirbyteCatalog getCatalog(final String defaultNamespace) {
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(Collections.emptyList()),
CatalogHelpers.createAirbyteStream(
defaultNamespace + "." + TABLE_NAME_FULL_NAMES,
defaultNamespace + "." + TABLE_NAME_COMPOSITE_PK,
Field.of("first_name", JsonSchemaPrimitive.STRING),
Field.of("last_name", JsonSchemaPrimitive.STRING),
Field.of("updated_at", JsonSchemaPrimitive.STRING))
Expand Down