
Simplify and speed up CDC delete support [DestinationsV2] #28029

Merged · 12 commits · Jul 11, 2023
2 changes: 1 addition & 1 deletion airbyte-integrations/bases/base-normalization/Dockerfile
@@ -1,4 +1,4 @@
-FROM fishtownanalytics/dbt:1.0.0
+FROM fishtownanalytics/dbt:1.0.0-dev
COPY --from=airbyte/base-airbyte-protocol-python:0.1.1 /airbyte /airbyte

# Install SSH Tunneling dependencies
22 changes: 19 additions & 3 deletions airbyte-integrations/bases/base-normalization/build.gradle
@@ -58,14 +58,30 @@ def buildAirbyteDocker(String customConnector) {
arch = 'linux/arm64'
}

-def baseCommand = ['docker', 'buildx', 'build', '--load', '--platform', arch, '-f', getDockerfile(customConnector), '-t', getImageNameWithTag(customConnector), '.']
-// println("Building normalization container: " + baseCommand.join(" "))
+def cmdArray = ['docker', 'buildx', 'build', '--load', '--platform', arch, '-f', getDockerfile(customConnector), '-t', getImageNameWithTag(customConnector), '.']
+// println("Building normalization container: " + cmdArray.join(" "))

return {
-commandLine baseCommand
+commandLine cmdArray
}
}

+task rebuildDbtForLocalArch(type: Exec) {
+    // we need to rebuild the dbt base image for this system's architecture
+
+    def arch = 'linux/amd64'
+    if (Os.isArch("aarch_64") || Os.isArch("aarch64")) {
+        arch = 'linux/arm64'
+    }
+
+    def cmdArray = ['docker', 'buildx', 'build', '--load', '--platform', arch, '-f', 'dbt.Dockerfile', '-t', 'fishtownanalytics/dbt:1.0.0-dev', '.']
+    println("Rebuilding base normalization container: " + cmdArray.join(" "))
+
+    commandLine cmdArray
+}
+
+airbyteDocker.dependsOn rebuildDbtForLocalArch

task airbyteDockerMSSql(type: Exec, dependsOn: checkSshScriptCopy) {
configure buildAirbyteDocker('mssql')
dependsOn assemble
3 changes: 3 additions & 0 deletions airbyte-integrations/bases/base-normalization/dbt.Dockerfile
@@ -0,0 +1,3 @@
+# This dockerfile only exists to pull and re-export this image converted to the local arch of this machine
+# It is then consumed by the Dockerfile in this directory as "fishtownanalytics/dbt:1.0.0-dev"
+FROM fishtownanalytics/dbt:1.0.0
@@ -224,31 +224,37 @@ public String updateTable(final String finalSuffix, final StreamConfig stream) {
}
final String insertNewRecords = insertNewRecords(stream.id(), finalSuffix, stream.columns());
String dedupFinalTable = "";
+String cdcDeletes = "";
String dedupRawTable = "";
if (stream.destinationSyncMode() == DestinationSyncMode.APPEND_DEDUP) {
-dedupRawTable = dedupRawTable(stream.id(), finalSuffix, stream.columns());
+dedupRawTable = dedupRawTable(stream.id(), finalSuffix);
// If we're in dedup mode, then we must have a cursor
-dedupFinalTable = dedupFinalTable(stream.id(), finalSuffix, stream.primaryKey(), stream.cursor().get(), stream.columns());
+dedupFinalTable = dedupFinalTable(stream.id(), finalSuffix, stream.primaryKey(), stream.cursor().get());
+cdcDeletes = cdcDeletes(stream, finalSuffix, stream.columns());
}
final String commitRawTable = commitRawTable(stream.id());

return new StringSubstitutor(Map.of(
"validate_primary_keys", validatePrimaryKeys,
"insert_new_records", insertNewRecords,
"dedup_final_table", dedupFinalTable,
"cdc_deletes", cdcDeletes,
"dedupe_raw_table", dedupRawTable,
"commit_raw_table", commitRawTable)).replace(
"""
DECLARE missing_pk_count INT64;

BEGIN TRANSACTION;

${validate_primary_keys}

${insert_new_records}

${dedup_final_table}

${dedupe_raw_table}

+${cdc_deletes}

${commit_raw_table}

@@ -280,7 +286,8 @@ SELECT COUNT(1)

IF missing_pk_count > 0 THEN
RAISE USING message = FORMAT("Raw table has %s rows missing a primary key", CAST(missing_pk_count AS STRING));
-END IF;""");
+END IF
+;""");
}
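For orientation, the script this template renders now runs its steps in the following order. This is a hand-written sketch rather than captured connector output; `my_dataset`, `users_raw`, and `users_tmp` are hypothetical names, and each step is elided to a comment:

-- Sketch of the rendered update script for a CDC stream in APPEND_DEDUP mode.
DECLARE missing_pk_count INT64;

BEGIN TRANSACTION;

-- ${validate_primary_keys}: count raw rows in `my_dataset`.`users_raw` with a missing
--   primary key and RAISE if any exist.
-- ${insert_new_records}: typed insert into `my_dataset`.`users_tmp`, now also re-selecting
--   already-loaded rows that carry _ab_cdc_deleted_at.
-- ${dedup_final_table}: keep one winning row per primary key in the final table.
-- ${dedupe_raw_table}: delete raw rows whose _airbyte_raw_id no longer appears in the final
--   table. Deletion records are still in the final table at this point, so they survive.
-- ${cdc_deletes}: hard-delete final-table rows whose raw record is a CDC deletion.
-- ${commit_raw_table}: mark the processed raw rows as loaded.

COMMIT TRANSACTION;  -- assumed: the closing statement is not part of the excerpt above

The ordering is what makes the simplification safe: because ${dedupe_raw_table} runs before ${cdc_deletes}, the raw deletion records are never garbage-collected and remain available to suppress late-arriving updates.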

@VisibleForTesting
@@ -304,68 +311,64 @@ String insertNewRecords(final StreamId id, final String finalSuffix, final Linke
.collect(joining(",\n"));
final String columnList = streamColumns.keySet().stream().map(quotedColumnId -> quotedColumnId.name(QUOTE) + ",").collect(joining("\n"));

-// Note that we intentionally excluded deleted records from this insert. See dedupRawRecords for an
-// explanation of how CDC deletes work.
+String cdcConditionalOrIncludeStatement = "";
+if (streamColumns.containsKey(CDC_DELETED_AT_COLUMN)){
+  cdcConditionalOrIncludeStatement = """
+    OR (
+      _airbyte_loaded_at IS NOT NULL
+      AND JSON_VALUE(`_airbyte_data`, '$._ab_cdc_deleted_at') IS NOT NULL
+    )
+    """;
+}

return new StringSubstitutor(Map.of(
"raw_table_id", id.rawTableId(QUOTE),
"final_table_id", id.finalTableId(finalSuffix, QUOTE),
"column_casts", columnCasts,
"column_errors", columnErrors,
"cdcConditionalOrIncludeStatement", cdcConditionalOrIncludeStatement,
"column_list", columnList)).replace(
"""
-INSERT INTO ${final_table_id}
-(
-  ${column_list}
-  _airbyte_meta,
-  _airbyte_raw_id,
-  _airbyte_extracted_at
-)
-WITH intermediate_data AS (
-  SELECT
-    ${column_casts}
-    array_concat(
-      ${column_errors}
-    ) as _airbyte_cast_errors,
-    _airbyte_raw_id,
-    _airbyte_extracted_at
-  FROM ${raw_table_id}
-  WHERE
-    _airbyte_loaded_at IS NULL
-    AND (
-      JSON_QUERY(`_airbyte_data`, '$._ab_cdc_deleted_at') IS NULL
-      OR JSON_TYPE(JSON_QUERY(`_airbyte_data`, '$._ab_cdc_deleted_at')) = 'null'
-    )
-)
-SELECT
-  ${column_list}
-  to_json(struct(_airbyte_cast_errors AS errors)) AS _airbyte_meta,
-  _airbyte_raw_id,
-  _airbyte_extracted_at
-FROM intermediate_data;""");
+INSERT INTO ${final_table_id}
+(
+  ${column_list}
+  _airbyte_meta,
+  _airbyte_raw_id,
+  _airbyte_extracted_at
+)
+WITH intermediate_data AS (
+  SELECT
+    ${column_casts}
+    array_concat(
+      ${column_errors}
+    ) as _airbyte_cast_errors,
+    _airbyte_raw_id,
+    _airbyte_extracted_at
+  FROM ${raw_table_id}
+  WHERE
+    _airbyte_loaded_at IS NULL
+    ${cdcConditionalOrIncludeStatement}
+)
+SELECT
+  ${column_list}
+  to_json(struct(_airbyte_cast_errors AS errors)) AS _airbyte_meta,
+  _airbyte_raw_id,
+  _airbyte_extracted_at
+FROM intermediate_data;""");
}
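To make the new conditional concrete: for a CDC stream, ${cdcConditionalOrIncludeStatement} expands the WHERE clause so that already-loaded deletion records are re-selected instead of filtered out, roughly as below. A minimal sketch with hypothetical names; the exact expression emitted by extractAndCast is assumed, and the ${column_errors} plumbing is elided:

-- Rendered intermediate_data selection for a hypothetical CDC `users` stream.
WITH intermediate_data AS (
  SELECT
    SAFE_CAST(JSON_VALUE(`_airbyte_data`, '$.id') AS INT64) AS `id`,  -- assumed ${column_casts} shape
    _airbyte_raw_id,
    _airbyte_extracted_at
  FROM `my_dataset`.`users_raw`
  WHERE
    _airbyte_loaded_at IS NULL
    OR (
      _airbyte_loaded_at IS NOT NULL
      AND JSON_VALUE(`_airbyte_data`, '$._ab_cdc_deleted_at') IS NOT NULL
    )
)
SELECT * FROM intermediate_data;

The old template excluded those rows here, which forced the delete logic in dedupFinalTable to re-derive primary keys from the raw JSON.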

@VisibleForTesting
String dedupFinalTable(final StreamId id,
final String finalSuffix,
final List<ColumnId> primaryKey,
-final ColumnId cursor,
-final LinkedHashMap<ColumnId, AirbyteType> streamColumns) {
+final ColumnId cursor) {
final String pkList = primaryKey.stream().map(columnId -> columnId.name(QUOTE)).collect(joining(","));
-final String pkCastList = streamColumns.entrySet().stream()
-    .filter(e -> primaryKey.contains(e.getKey()))
-    .map(e -> extractAndCast(e.getKey(), e.getValue()))
-    .collect(joining(",\n  "));
-final String cursorCast = extractAndCast(cursor, streamColumns.get(cursor));

-// See dedupRawTable for an explanation of why we delete records using the raw data rather than the
-// final table's _ab_cdc_deleted_at column.

return new StringSubstitutor(Map.of(
"raw_table_id", id.rawTableId(QUOTE),
"final_table_id", id.finalTableId(finalSuffix, QUOTE),
"pk_list", pkList,
"pk_cast_list", pkCastList,
"cursor_name", cursor.name(QUOTE),
"cursor_cast", cursorCast)).replace(
"cursor_name", cursor.name(QUOTE))
).replace(
"""
DELETE FROM ${final_table_id}
WHERE
@@ -376,52 +379,54 @@ String dedupFinalTable(final StreamId id,
) as row_number FROM ${final_table_id}
)
WHERE row_number != 1
-);
-
-DELETE FROM ${final_table_id}
-WHERE
-  ${pk_list} IN (
-    SELECT (
-      ${pk_cast_list}
-    )
-    FROM ${raw_table_id}
-    WHERE
-      JSON_QUERY(`_airbyte_data`, '$._ab_cdc_deleted_at') IS NOT NULL
-      AND JSON_TYPE(JSON_QUERY(`_airbyte_data`, '$._ab_cdc_deleted_at')) != 'null'
-      AND ${cursor_name} < ${cursor_cast}
-  );""");
+)
+;""");
}
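Rendered for a hypothetical `users` stream with primary key `id` and cursor `updated_at`, what remains of this method is a single window-function delete. The PARTITION BY/ORDER BY shape sits in a collapsed part of the diff, so the ordering below is an assumption:

-- Keep only the winning row per primary key in the final table (names hypothetical).
DELETE FROM `my_dataset`.`users_tmp`
WHERE
  `_airbyte_raw_id` IN (
    SELECT `_airbyte_raw_id` FROM (
      SELECT `_airbyte_raw_id`, row_number() OVER (
        PARTITION BY `id` ORDER BY `updated_at` DESC  -- assumed cursor ordering
      ) as row_number FROM `my_dataset`.`users_tmp`
    )
    WHERE row_number != 1
  )
;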

@VisibleForTesting
-String dedupRawTable(final StreamId id, final String finalSuffix, LinkedHashMap<ColumnId, AirbyteType> streamColumns) {
-  /*
-   * Note that we need to keep the deletion raw records because of how async syncs work. Consider this
-   * sequence of source events: 1. Insert record id=1 2. Update record id=1 3. Delete record id=1
-   *
-   * It's possible for the destination to receive them out of order, e.g.: 1. Insert 2. Delete 3. Update
-   *
-   * We can generally resolve this using the cursor column (e.g. multiple updates in the wrong order).
-   * However, deletions are special because we propagate them as hard deletes to the final table. As a
-   * result, we need to keep the deletion in the raw table, so that a late-arriving update doesn't
-   * incorrectly reinsert the final record.
-   */
-  String cdcDeletedAtClause;
-  if (streamColumns.containsKey(CDC_DELETED_AT_COLUMN)) {
-    cdcDeletedAtClause = """
-        AND (
-          JSON_QUERY(`_airbyte_data`, '$._ab_cdc_deleted_at') IS NULL
-          OR JSON_TYPE(JSON_QUERY(`_airbyte_data`, '$._ab_cdc_deleted_at')) = 'null'
-        )
-        """;
-  } else {
-    cdcDeletedAtClause = "";
-  }
+String cdcDeletes(final StreamConfig stream,
+                  final String finalSuffix,
+                  final LinkedHashMap<ColumnId, AirbyteType> streamColumns) {
+
+  if (stream.destinationSyncMode() != DestinationSyncMode.APPEND_DEDUP){
+    return "";
+  }
+
+  if (!streamColumns.containsKey(CDC_DELETED_AT_COLUMN)){
+    return "";
+  }
+
+  final String pkList = stream.primaryKey().stream().map(columnId -> columnId.name(QUOTE)).collect(joining(","));
+  String pkCasts = stream.primaryKey().stream().map(pk -> extractAndCast(pk, streamColumns.get(pk))).collect(joining(",\n"));
+
+  // we want to grab IDs for deletion from the raw table (not the final table itself) to handle
+  // out-of-order record insertions after the delete has been registered
+  return new StringSubstitutor(Map.of(
+      "final_table_id", stream.id().finalTableId(finalSuffix, QUOTE),
+      "raw_table_id", stream.id().rawTableId(QUOTE),
+      "pk_list", pkList,
+      "pk_extracts", pkCasts,
+      "quoted_cdc_delete_column", QUOTE + "_ab_cdc_deleted_at" + QUOTE)
+  ).replace(
+      """
+      DELETE FROM ${final_table_id}
+      WHERE
+        (${pk_list}) IN (
+          SELECT (
+            ${pk_extracts}
+          )
+          FROM ${raw_table_id}
+          WHERE
+            JSON_VALUE(`_airbyte_data`, '$._ab_cdc_deleted_at') IS NOT NULL
+        )
+      ;"""
+  );
+}
+
+@VisibleForTesting
+String dedupRawTable(final StreamId id, final String finalSuffix) {
  return new StringSubstitutor(Map.of(
      "raw_table_id", id.rawTableId(QUOTE),
-     "final_table_id", id.finalTableId(finalSuffix, QUOTE),
-     "cdc_deleted_at_clause", cdcDeletedAtClause)).replace(
+     "final_table_id", id.finalTableId(finalSuffix, QUOTE))).replace(
     // Note that this leaves _all_ deletion records in the raw table. We _could_ clear them out,
     // but it would be painful, and it only matters in a few edge cases.
@@ -432,7 +437,6 @@ OR JSON_TYPE(JSON_QUERY(`_airbyte_data`, '$._ab_cdc_deleted_at')) = 'null'
`_airbyte_raw_id` NOT IN (
SELECT `_airbyte_raw_id` FROM ${final_table_id}
)
-${cdc_deleted_at_clause}
;""");
}
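Rendered for the same hypothetical stream, the two new statements work as a pair: first the CDC delete keyed off the raw table, then the raw-table cleanup.

-- cdcDeletes, rendered: primary keys for deletion come from the raw table, not the final
-- table, so a record that arrives out of order after the delete is still removed.
DELETE FROM `my_dataset`.`users_tmp`
WHERE
  (`id`) IN (
    SELECT (
      SAFE_CAST(JSON_VALUE(`_airbyte_data`, '$.id') AS INT64)  -- assumed ${pk_extracts} shape
    )
    FROM `my_dataset`.`users_raw`
    WHERE
      JSON_VALUE(`_airbyte_data`, '$._ab_cdc_deleted_at') IS NOT NULL
  )
;

-- dedupRawTable, rendered from the visible tail of the template; any conditions in the
-- collapsed part of the diff are omitted from this sketch.
DELETE FROM `my_dataset`.`users_raw`
WHERE
  `_airbyte_raw_id` NOT IN (
    SELECT `_airbyte_raw_id` FROM `my_dataset`.`users_tmp`
  )
;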
