Simplify and speed up CDC delete support [DestinationsV2] #28029

Merged: 12 commits, Jul 11, 2023
2 changes: 1 addition & 1 deletion airbyte-integrations/bases/base-normalization/Dockerfile
@@ -1,4 +1,4 @@
FROM fishtownanalytics/dbt:1.0.0
FROM fishtownanalytics/dbt:1.0.0-dev
COPY --from=airbyte/base-airbyte-protocol-python:0.1.1 /airbyte /airbyte

# Install SSH Tunneling dependencies
22 changes: 19 additions & 3 deletions airbyte-integrations/bases/base-normalization/build.gradle
@@ -58,14 +58,30 @@ def buildAirbyteDocker(String customConnector) {
arch = 'linux/arm64'
}

def baseCommand = ['docker', 'buildx', 'build', '--load', '--platform', arch, '-f', getDockerfile(customConnector), '-t', getImageNameWithTag(customConnector), '.']
// println("Building normalization container: " + baseCommand.join(" "))
def cmdArray = ['docker', 'buildx', 'build', '--load', '--platform', arch, '-f', getDockerfile(customConnector), '-t', getImageNameWithTag(customConnector), '.']
// println("Building normalization container: " + cmdArray.join(" "))

return {
commandLine baseCommand
commandLine cmdArray
}
}

task rebuildDbtForLocalArch(type: Exec) {
// we need to rebuild the dbt base image for this system's architecture

def arch = 'linux/amd64'
if (Os.isArch("aarch_64") || Os.isArch("aarch64")) {
arch = 'linux/arm64'
}

def cmdArray = ['docker', 'buildx', 'build', '--load', '--platform', arch, '-f', 'dbt.Dockerfile', '-t', 'fishtownanalytics/dbt:1.0.0-dev', '.']
println("Rebuilding base normalization container: " + cmdArray.join(" "))

commandLine cmdArray
}

airbyteDocker.dependsOn rebuildDbtForLocalArch

task airbyteDockerMSSql(type: Exec, dependsOn: checkSshScriptCopy) {
configure buildAirbyteDocker('mssql')
dependsOn assemble
3 changes: 3 additions & 0 deletions airbyte-integrations/bases/base-normalization/dbt.Dockerfile
@@ -0,0 +1,3 @@
# This dockerfile only exists to pull and re-export this image converted to the local arch of this machine
# It is then consumed by the Dockerfile in this directory as "fishtownanalytics/dbt:1.0.0-dev"
FROM fishtownanalytics/dbt:1.0.0
@@ -223,31 +223,37 @@ public String updateTable(final String finalSuffix, final StreamConfig stream) {
}
final String insertNewRecords = insertNewRecords(stream.id(), finalSuffix, stream.columns());
String dedupFinalTable = "";
String cdcDeletes = "";
String dedupRawTable = "";
if (stream.destinationSyncMode() == DestinationSyncMode.APPEND_DEDUP) {
dedupRawTable = dedupRawTable(stream.id(), finalSuffix, stream.columns());
dedupRawTable = dedupRawTable(stream.id(), finalSuffix);
// If we're in dedup mode, then we must have a cursor
dedupFinalTable = dedupFinalTable(stream.id(), finalSuffix, stream.primaryKey(), stream.cursor().get(), stream.columns());
dedupFinalTable = dedupFinalTable(stream.id(), finalSuffix, stream.primaryKey(), stream.cursor().get());
cdcDeletes = cdcDeletes(stream, finalSuffix, stream.columns());
}
final String commitRawTable = commitRawTable(stream.id());

return new StringSubstitutor(Map.of(
"validate_primary_keys", validatePrimaryKeys,
"insert_new_records", insertNewRecords,
"dedup_final_table", dedupFinalTable,
"cdc_deletes", cdcDeletes,
"dedupe_raw_table", dedupRawTable,
"commit_raw_table", commitRawTable)).replace(
"""
DECLARE missing_pk_count INT64;

BEGIN TRANSACTION;

${validate_primary_keys}

${insert_new_records}

${dedup_final_table}

${dedupe_raw_table}

${cdc_deletes}

${commit_raw_table}

@@ -279,7 +285,8 @@ SELECT COUNT(1)

IF missing_pk_count > 0 THEN
RAISE USING message = FORMAT("Raw table has %s rows missing a primary key", CAST(missing_pk_count AS STRING));
END IF;""");
END IF
;""");
}

@VisibleForTesting
@@ -303,68 +310,56 @@ String insertNewRecords(final StreamId id, final String finalSuffix, final Linke
.collect(joining(",\n"));
final String columnList = streamColumns.keySet().stream().map(quotedColumnId -> quotedColumnId.name(QUOTE) + ",").collect(joining("\n"));

// Note that we intentionally excluded deleted records from this insert. See dedupRawRecords for an
// explanation of how CDC deletes work.
return new StringSubstitutor(Map.of(
"raw_table_id", id.rawTableId(QUOTE),
"final_table_id", id.finalTableId(finalSuffix, QUOTE),
"column_casts", columnCasts,
"column_errors", columnErrors,
"column_list", columnList)).replace(
"""
INSERT INTO ${final_table_id}
(
${column_list}
_airbyte_meta,
_airbyte_raw_id,
_airbyte_extracted_at
)
WITH intermediate_data AS (
SELECT
${column_casts}
array_concat(
${column_errors}
) as _airbyte_cast_errors,
_airbyte_raw_id,
_airbyte_extracted_at
FROM ${raw_table_id}
WHERE
_airbyte_loaded_at IS NULL
AND (
JSON_QUERY(`_airbyte_data`, '$._ab_cdc_deleted_at') IS NULL
OR JSON_TYPE(JSON_QUERY(`_airbyte_data`, '$._ab_cdc_deleted_at')) = 'null'
INSERT INTO ${final_table_id}
(
${column_list}
_airbyte_meta,
_airbyte_raw_id,
_airbyte_extracted_at
)
)
SELECT
${column_list}
to_json(struct(_airbyte_cast_errors AS errors)) AS _airbyte_meta,
_airbyte_raw_id,
_airbyte_extracted_at
FROM intermediate_data;""");
WITH intermediate_data AS (
SELECT
${column_casts}
array_concat(
${column_errors}
) as _airbyte_cast_errors,
_airbyte_raw_id,
_airbyte_extracted_at
FROM ${raw_table_id}
WHERE
_airbyte_loaded_at IS NULL
OR (
_airbyte_loaded_at IS NOT NULL
AND JSON_VALUE(`_airbyte_data`, '$._ab_cdc_deleted_at') IS NOT NULL
)
Review comment (Contributor Author): Here's the main logical change (from airbytehq/typing-and-deduping-sql#21) - we are inserting both the new records and previously CDC-deleted records to the raw table for cursor comparison. (A sketch follows at the end of this method.)

)
SELECT
${column_list}
to_json(struct(_airbyte_cast_errors AS errors)) AS _airbyte_meta,
_airbyte_raw_id,
_airbyte_extracted_at
FROM intermediate_data;""");
}
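To make the review comment above concrete, here is a minimal standalone sketch of the new selection condition. It assumes the dataset.users_raw fixture table used in the tests below; the real statement wraps this condition inside the INSERT ... WITH intermediate_data template shown above.

-- Sketch only, assuming the dataset.users_raw fixture table from the tests.
-- Select rows that either have not been typed yet, or were already loaded but
-- carry a CDC deletion marker, so a late-arriving update racing a delete can
-- still be resolved by the cursor comparison in the dedup step.
SELECT `_airbyte_raw_id`, `_airbyte_extracted_at`
FROM dataset.users_raw
WHERE
  `_airbyte_loaded_at` IS NULL
  OR (
    `_airbyte_loaded_at` IS NOT NULL
    AND JSON_VALUE(`_airbyte_data`, '$._ab_cdc_deleted_at') IS NOT NULL
  );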

@VisibleForTesting
String dedupFinalTable(final StreamId id,
final String finalSuffix,
final List<ColumnId> primaryKey,
final ColumnId cursor,
final LinkedHashMap<ColumnId, AirbyteType> streamColumns) {
final ColumnId cursor) {
final String pkList = primaryKey.stream().map(columnId -> columnId.name(QUOTE)).collect(joining(","));
final String pkCastList = streamColumns.entrySet().stream()
.filter(e -> primaryKey.contains(e.getKey()))
.map(e -> extractAndCast(e.getKey(), e.getValue()))
.collect(joining(",\n "));
final String cursorCast = extractAndCast(cursor, streamColumns.get(cursor));

// See dedupRawTable for an explanation of why we delete records using the raw data rather than the
// final table's _ab_cdc_deleted_at column.

return new StringSubstitutor(Map.of(
"raw_table_id", id.rawTableId(QUOTE),
"final_table_id", id.finalTableId(finalSuffix, QUOTE),
"pk_list", pkList,
"pk_cast_list", pkCastList,
"cursor_name", cursor.name(QUOTE),
"cursor_cast", cursorCast)).replace(
"cursor_name", cursor.name(QUOTE))
).replace(
"""
DELETE FROM ${final_table_id}
WHERE
Expand All @@ -375,52 +370,50 @@ String dedupFinalTable(final StreamId id,
) as row_number FROM ${final_table_id}
)
WHERE row_number != 1
);

DELETE FROM ${final_table_id}
WHERE
${pk_list} IN (
SELECT (
${pk_cast_list}
)
FROM ${raw_table_id}
WHERE
JSON_QUERY(`_airbyte_data`, '$._ab_cdc_deleted_at') IS NOT NULL
AND JSON_TYPE(JSON_QUERY(`_airbyte_data`, '$._ab_cdc_deleted_at')) != 'null'
AND ${cursor_name} < ${cursor_cast}
);""");
)
;""");
}

@VisibleForTesting
String dedupRawTable(final StreamId id, final String finalSuffix, LinkedHashMap<ColumnId, AirbyteType> streamColumns) {
/*
* Note that we need to keep the deletion raw records because of how async syncs work. Consider this
* sequence of source events: 1. Insert record id=1 2. Update record id=1 3. Delete record id=1
*
* It's possible for the destination to receive them out of order, e.g.: 1. Insert 2. Delete 3.
* Update
*
* We can generally resolve this using the cursor column (e.g. multiple updates in the wrong order).
* However, deletions are special because we propagate them as hard deletes to the final table. As a
* result, we need to keep the deletion in the raw table, so that a late-arriving update doesn't
* incorrectly reinsert the final record.
*/
String cdcDeletedAtClause;
if (streamColumns.containsKey(CDC_DELETED_AT_COLUMN)) {
cdcDeletedAtClause = """
AND (
JSON_QUERY(`_airbyte_data`, '$._ab_cdc_deleted_at') IS NULL
OR JSON_TYPE(JSON_QUERY(`_airbyte_data`, '$._ab_cdc_deleted_at')) = 'null'
)
""";
} else {
cdcDeletedAtClause = "";
String cdcDeletes(final StreamConfig stream,
final String finalSuffix,
final LinkedHashMap<ColumnId, AirbyteType> streamColumns) {

if (stream.destinationSyncMode() != DestinationSyncMode.APPEND_DEDUP){
return "";
}

if (!streamColumns.containsKey(CDC_DELETED_AT_COLUMN)){
return "";
}

// we want to grab IDs for deletion from the raw table (not the final table itself) to handle out-of-order record insertions after the delete has been registered
return new StringSubstitutor(Map.of(
"final_table_id", stream.id().finalTableId(finalSuffix, QUOTE),
"raw_table_id", stream.id().rawTableId(QUOTE),
"quoted_cdc_delete_column", QUOTE + "_ab_cdc_deleted_at" + QUOTE)
).replace(
// TODO replace `id`, `$.id` with PK
// TODO replace `INT64` with PK's type
"""
DELETE FROM ${final_table_id}
WHERE
`id` IN (
SELECT
SAFE_CAST(JSON_VALUE(`_airbyte_data`, '$.id') as INT64) as id
FROM ${raw_table_id}
WHERE
JSON_VALUE(`_airbyte_data`, '$._ab_cdc_deleted_at') IS NOT NULL
)
;"""
);
}
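For illustration, here is roughly what this template renders to for the users stream exercised in the tests below. The dataset and raw-table names are borrowed from the test fixtures, the final-table name is an assumption, and the primary key is still the hardcoded `id` / INT64 called out in the TODOs above.

-- Sketch only: a rendered cdcDeletes statement, assuming fixture tables
-- dataset.users_raw and dataset.users_final (the final-table name is a guess).
-- Any primary key whose surviving raw record carries a CDC deletion marker is
-- hard-deleted from the final table. In the updateTable template this runs
-- after both dedup steps, so only the newest raw record per PK can match.
DELETE FROM dataset.users_final
WHERE
  `id` IN (
    SELECT
      SAFE_CAST(JSON_VALUE(`_airbyte_data`, '$.id') as INT64) as id
    FROM dataset.users_raw
    WHERE
      JSON_VALUE(`_airbyte_data`, '$._ab_cdc_deleted_at') IS NOT NULL
  );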

@VisibleForTesting
String dedupRawTable(final StreamId id, final String finalSuffix) {
return new StringSubstitutor(Map.of(
"raw_table_id", id.rawTableId(QUOTE),
"final_table_id", id.finalTableId(finalSuffix, QUOTE),
"cdc_deleted_at_clause", cdcDeletedAtClause)).replace(
"final_table_id", id.finalTableId(finalSuffix, QUOTE))).replace(
// Note that this leaves _all_ deletion records in the raw table. We _could_ clear them out, but it
// would be painful,
// and it only matters in a few edge cases.
@@ -431,7 +424,6 @@ OR JSON_TYPE(JSON_QUERY(`_airbyte_data`, '$._ab_cdc_deleted_at')) = 'null'
`_airbyte_raw_id` NOT IN (
SELECT `_airbyte_raw_id` FROM ${final_table_id}
)
${cdc_deleted_at_clause}
;""");
}

@@ -291,7 +291,7 @@ public void testDedupFinalTable() throws InterruptedException {
"""))
.build());

final String sql = GENERATOR.dedupFinalTable(streamId, "", PRIMARY_KEY, CURSOR, COLUMNS);
final String sql = GENERATOR.dedupFinalTable(streamId, "", PRIMARY_KEY, CURSOR);
logAndExecute(sql);

final TableResult result = bq.query(QueryJobConfiguration.newBuilder("SELECT * FROM " + streamId.finalTableId(QUOTE)).build());
@@ -343,7 +343,7 @@ public void testDedupRawTable() throws InterruptedException {
"""))
.build());

final String sql = GENERATOR.dedupRawTable(streamId, "", CDC_COLUMNS);
final String sql = GENERATOR.dedupRawTable(streamId, "");
logAndExecute(sql);

final TableResult result = bq.query(QueryJobConfiguration.newBuilder("SELECT * FROM " + streamId.rawTableId(QUOTE)).build());
@@ -618,6 +618,32 @@ public void testRenameFinalTable() throws InterruptedException {
assertNotNull(table);
}

@Test
public void testCdcBasics() throws InterruptedException {
createRawTable();
createFinalTableCdc();
bq.query(QueryJobConfiguration.newBuilder(
new StringSubstitutor(Map.of(
"dataset", testDataset)).replace(
"""
INSERT INTO ${dataset}.users_raw (`_airbyte_data`, `_airbyte_raw_id`, `_airbyte_extracted_at`, `_airbyte_loaded_at`) VALUES
(JSON'{"id": 1, "_ab_cdc_lsn": 10001, "_ab_cdc_deleted_at": "2023-01-01T00:01:00Z"}', generate_uuid(), '2023-01-01T00:00:00Z', NULL);
"""))
.build());

final String sql = GENERATOR.updateTable("", cdcStreamConfig());
logAndExecute(sql);

// TODO better asserts
final long finalRows = bq.query(QueryJobConfiguration.newBuilder("SELECT * FROM " + streamId.finalTableId("", QUOTE)).build()).getTotalRows();
assertEquals(0, finalRows);
final long rawRows = bq.query(QueryJobConfiguration.newBuilder("SELECT * FROM " + streamId.rawTableId(QUOTE)).build()).getTotalRows();
assertEquals(1, rawRows);
final long rawUntypedRows = bq.query(QueryJobConfiguration.newBuilder(
"SELECT * FROM " + streamId.rawTableId(QUOTE) + " WHERE _airbyte_loaded_at IS NULL").build()).getTotalRows();
assertEquals(0, rawUntypedRows);
}

@Test
public void testCdcUpdate() throws InterruptedException {
createRawTable();
Expand Down Expand Up @@ -652,18 +678,10 @@ public void testCdcUpdate() throws InterruptedException {
final String sql = GENERATOR.updateTable("", cdcStreamConfig());
logAndExecute(sql);

// TODO
final long finalRows = bq.query(QueryJobConfiguration.newBuilder("SELECT * FROM " + streamId.finalTableId("", QUOTE)).build()).getTotalRows();
assertEquals(4, finalRows);
assertEquals(5, finalRows);
final long rawRows = bq.query(QueryJobConfiguration.newBuilder("SELECT * FROM " + streamId.rawTableId(QUOTE)).build()).getTotalRows();
// Explanation:
// id=0 has two raw records (the old deletion record + zombie_returned)
// id=1 has one raw record (the new deletion record; the old raw record was deleted)
// id=2 has one raw record (the newer alice2 record)
// id=3 has one raw record
// id=4 has one raw record
// id=5 has one raw deletion record
assertEquals(7, rawRows);
assertEquals(6, rawRows); // we only keep the newest raw record for each PK
final long rawUntypedRows = bq.query(QueryJobConfiguration.newBuilder(
"SELECT * FROM " + streamId.rawTableId(QUOTE) + " WHERE _airbyte_loaded_at IS NULL").build()).getTotalRows();
assertEquals(0, rawUntypedRows);
@@ -704,7 +722,6 @@ -- insert raw record from the second record batch - this is an outdated record t
final String sql = GENERATOR.updateTable("", cdcStreamConfig());
logAndExecute(sql);

// TODO better asserts
final long finalRows = bq.query(QueryJobConfiguration.newBuilder("SELECT * FROM " + streamId.finalTableId("", QUOTE)).build()).getTotalRows();
assertEquals(0, finalRows);
final long rawRows = bq.query(QueryJobConfiguration.newBuilder("SELECT * FROM " + streamId.rawTableId(QUOTE)).build()).getTotalRows();
@@ -755,7 +772,7 @@ -- second record batch
final long finalRows = bq.query(QueryJobConfiguration.newBuilder("SELECT * FROM " + streamId.finalTableId("", QUOTE)).build()).getTotalRows();
assertEquals(1, finalRows);
final long rawRows = bq.query(QueryJobConfiguration.newBuilder("SELECT * FROM " + streamId.rawTableId(QUOTE)).build()).getTotalRows();
assertEquals(2, rawRows);
assertEquals(1, rawRows);
final long rawUntypedRows = bq.query(QueryJobConfiguration.newBuilder(
"SELECT * FROM " + streamId.rawTableId(QUOTE) + " WHERE _airbyte_loaded_at IS NULL").build()).getTotalRows();
assertEquals(0, rawUntypedRows);