
MySQL source: implement support for snapshot of new tables in CDC mode #16954

Merged
merged 14 commits on Sep 26, 2022
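At a high level, this change lets a CDC sync take a one-off snapshot of tables that were added to the connection after the initial sync, while streams that already have saved state continue with incremental (binlog) replication. As a minimal sketch of the selection step, the helper below filters the configured incremental streams down to those with no entry in the saved per-stream state; the class name and the way `alreadySyncedStreams` would be derived from the saved global state are illustrative assumptions, not the connector's actual API. The diff of the shared Debezium handler and its CDC test follows.

```java
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.StreamDescriptor;
import io.airbyte.protocol.models.SyncMode;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public final class NewStreamSelector {

  /**
   * Sketch only: returns a catalog containing the configured INCREMENTAL streams that have no
   * entry in the saved per-stream state, i.e. the tables added since the last sync that still
   * need a snapshot before joining the regular binlog read. How the already-synced stream
   * descriptors are extracted from the saved state is connector-specific and not shown here.
   */
  public static ConfiguredAirbyteCatalog catalogOfStreamsToSnapshot(
      final ConfiguredAirbyteCatalog configuredCatalog,
      final Set<StreamDescriptor> alreadySyncedStreams) {
    final List<ConfiguredAirbyteStream> newlyAddedStreams = configuredCatalog.getStreams().stream()
        .filter(stream -> stream.getSyncMode() == SyncMode.INCREMENTAL)
        .filter(stream -> !alreadySyncedStreams.contains(
            new StreamDescriptor()
                .withName(stream.getStream().getName())
                .withNamespace(stream.getStream().getNamespace())))
        .collect(Collectors.toList());
    return new ConfiguredAirbyteCatalog().withStreams(newlyAddedStreams);
  }

}
```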
@@ -655,7 +655,7 @@
- name: MySQL
sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerRepository: airbyte/source-mysql
dockerImageTag: 0.6.13
dockerImageTag: 0.6.14
documentationUrl: https://docs.airbyte.io/integrations/sources/mysql
icon: mysql.svg
sourceType: database
@@ -6628,7 +6628,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-mysql:0.6.13"
- dockerImage: "airbyte/source-mysql:0.6.14"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/mysql"
connectionSpecification:
@@ -58,32 +58,29 @@ public AirbyteDebeziumHandler(final JsonNode config,
this.firstRecordWaitTime = firstRecordWaitTime;
}

public AutoCloseableIterator<AirbyteMessage> getSnapshotIterators(final ConfiguredAirbyteCatalog catalog,
public AutoCloseableIterator<AirbyteMessage> getSnapshotIterators(
final ConfiguredAirbyteCatalog catalogContainingStreamsToSnapshot,
final CdcMetadataInjector cdcMetadataInjector,
final Properties connectorProperties,
final Properties snapshotProperties,
final CdcStateHandler cdcStateHandler,
final Instant emittedAt) {
LOGGER.info("Running snapshot for " + catalog.getStreams().size() + " new tables");

LOGGER.info("Running snapshot for " + catalogContainingStreamsToSnapshot.getStreams().size() + " new tables");
final LinkedBlockingQueue<ChangeEvent<String, String>> queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);

final AirbyteFileOffsetBackingStore offsetManager = AirbyteFileOffsetBackingStore.initializeDummyStateForSnapshotPurpose();
/*
* TODO(Subodh) : Since Postgres doesn't require schema history this is fine but we need to fix this
* for MySQL and MSSQL
*/
final Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager = Optional.empty();
final DebeziumRecordPublisher publisher = new DebeziumRecordPublisher(connectorProperties,
final DebeziumRecordPublisher tableSnapshotPublisher = new DebeziumRecordPublisher(snapshotProperties,
config,
catalog,
catalogContainingStreamsToSnapshot,
offsetManager,
schemaHistoryManager);
publisher.start(queue);
schemaHistoryManager(new EmptySavedInfo()));
tableSnapshotPublisher.start(queue);

final AutoCloseableIterator<ChangeEvent<String, String>> eventIterator = new DebeziumRecordIterator(
queue,
targetPosition,
publisher::hasClosed,
publisher::close,
tableSnapshotPublisher::hasClosed,
tableSnapshotPublisher::close,
firstRecordWaitTime);

return AutoCloseableIterators.concatWithEagerClose(AutoCloseableIterators
@@ -155,4 +152,18 @@ public static boolean shouldUseCDC(final ConfiguredAirbyteCatalog catalog) {
.anyMatch(syncMode -> syncMode == SyncMode.INCREMENTAL);
}

private static class EmptySavedInfo implements CdcSavedInfoFetcher {

@Override
public JsonNode getSavedOffset() {
return null;
}

@Override
public Optional<JsonNode> getSavedSchemaHistory() {
return Optional.empty();
}

}

}
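For context on how this snapshot path is meant to be called: the caller passes a catalog containing only the streams that still need a snapshot, together with Debezium properties configured for a snapshot-only run; the run is seeded with a dummy offset store and, via the new `EmptySavedInfo` fetcher, an empty saved schema history. The sketch below is illustrative only: the wrapper class, its assumed package, and the `snapshot.mode=initial_only` property are assumptions, and the real snapshot properties come from the individual connector rather than from this excerpt.

```java
// Sketch only; assumed to live in the same package as AirbyteDebeziumHandler so that
// CdcMetadataInjector and CdcStateHandler resolve without extra imports.
package io.airbyte.integrations.debezium;

import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.time.Instant;
import java.util.Properties;

final class SnapshotInvocationSketch {

  static AutoCloseableIterator<AirbyteMessage> snapshotNewTables(final AirbyteDebeziumHandler handler,
                                                                 final ConfiguredAirbyteCatalog catalogOfNewStreams,
                                                                 final CdcMetadataInjector metadataInjector,
                                                                 final CdcStateHandler cdcStateHandler) {
    // Example Debezium setting for a snapshot-then-stop run; real connectors supply their own
    // snapshot properties (table include lists, server ids, etc.).
    final Properties snapshotProperties = new Properties();
    snapshotProperties.setProperty("snapshot.mode", "initial_only");

    return handler.getSnapshotIterators(
        catalogOfNewStreams,   // only the streams that still need a snapshot
        metadataInjector,      // connector-specific CDC metadata injector
        snapshotProperties,
        cdcStateHandler,
        Instant.now());
  }

}
```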
@@ -5,6 +5,7 @@
package io.airbyte.integrations.debezium;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

@@ -27,11 +28,13 @@
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.AirbyteStreamState;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.StreamDescriptor;
import io.airbyte.protocol.models.SyncMode;
import java.sql.SQLException;
import java.util.ArrayList;
@@ -177,12 +180,14 @@ private void createAndPopulateActualTable() {
* databases not being synced by Airbyte are not causing issues with our debezium logic
*/
private void createAndPopulateRandomTable() {
createSchema(MODELS_SCHEMA + "_random");
createTable(MODELS_SCHEMA + "_random", MODELS_STREAM_NAME + "_random",
if (!randomTableSchema().equals(MODELS_SCHEMA)) {
createSchema(randomTableSchema());
}
createTable(randomTableSchema(), MODELS_STREAM_NAME + "_random",
columnClause(ImmutableMap.of(COL_ID + "_random", "INTEGER", COL_MAKE_ID + "_random", "INTEGER", COL_MODEL + "_random", "VARCHAR(200)"),
Optional.of(COL_ID + "_random")));
for (final JsonNode recordJson : MODEL_RECORDS_RANDOM) {
writeRecords(recordJson, MODELS_SCHEMA + "_random", MODELS_STREAM_NAME + "_random",
writeRecords(recordJson, randomTableSchema(), MODELS_STREAM_NAME + "_random",
COL_ID + "_random", COL_MAKE_ID + "_random", COL_MODEL + "_random");
}
}
@@ -585,6 +590,181 @@ void testDiscover() throws Exception {
.collect(Collectors.toList()));
}

@Test
public void newTableSnapshotTest() throws Exception {
final AutoCloseableIterator<AirbyteMessage> firstBatchIterator = getSource()
.read(getConfig(), CONFIGURED_CATALOG, null);
final List<AirbyteMessage> dataFromFirstBatch = AutoCloseableIterators
.toListAndClose(firstBatchIterator);
final Set<AirbyteRecordMessage> recordsFromFirstBatch = extractRecordMessages(
dataFromFirstBatch);
final List<AirbyteStateMessage> stateAfterFirstBatch = extractStateMessages(dataFromFirstBatch);
assertEquals(1, stateAfterFirstBatch.size());

final AirbyteStateMessage stateMessageEmittedAfterFirstSyncCompletion = stateAfterFirstBatch.get(0);
assertEquals(AirbyteStateMessage.AirbyteStateType.GLOBAL, stateMessageEmittedAfterFirstSyncCompletion.getType());
assertNotNull(stateMessageEmittedAfterFirstSyncCompletion.getGlobal().getSharedState());
final Set<StreamDescriptor> streamsInStateAfterFirstSyncCompletion = stateMessageEmittedAfterFirstSyncCompletion.getGlobal().getStreamStates()
.stream()
.map(AirbyteStreamState::getStreamDescriptor)
.collect(Collectors.toSet());
assertEquals(1, streamsInStateAfterFirstSyncCompletion.size());
assertTrue(streamsInStateAfterFirstSyncCompletion.contains(new StreamDescriptor().withName(MODELS_STREAM_NAME).withNamespace(MODELS_SCHEMA)));
assertNotNull(stateMessageEmittedAfterFirstSyncCompletion.getData());

assertEquals((MODEL_RECORDS.size()), recordsFromFirstBatch.size());
assertExpectedRecords(new HashSet<>(MODEL_RECORDS), recordsFromFirstBatch);

final JsonNode state = stateAfterFirstBatch.get(0).getData();

final ConfiguredAirbyteCatalog newTables = CatalogHelpers
.toDefaultConfiguredCatalog(new AirbyteCatalog().withStreams(List.of(
CatalogHelpers.createAirbyteStream(
MODELS_STREAM_NAME + "_random",
randomTableSchema(),
Field.of(COL_ID + "_random", JsonSchemaType.NUMBER),
Field.of(COL_MAKE_ID + "_random", JsonSchemaType.NUMBER),
Field.of(COL_MODEL + "_random", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of(COL_ID + "_random"))))));

newTables.getStreams().forEach(s -> s.setSyncMode(SyncMode.INCREMENTAL));
final List<ConfiguredAirbyteStream> combinedStreams = new ArrayList<>();
combinedStreams.addAll(CONFIGURED_CATALOG.getStreams());
combinedStreams.addAll(newTables.getStreams());

final ConfiguredAirbyteCatalog updatedCatalog = new ConfiguredAirbyteCatalog().withStreams(combinedStreams);

/*
* Write 20 records to the existing table
*/
final Set<JsonNode> recordsWritten = new HashSet<>();
for (int recordsCreated = 0; recordsCreated < 20; recordsCreated++) {
final JsonNode record =
Jsons.jsonNode(ImmutableMap
.of(COL_ID, 100 + recordsCreated, COL_MAKE_ID, 1, COL_MODEL,
"F-" + recordsCreated));
recordsWritten.add(record);
writeModelRecord(record);
}

final AutoCloseableIterator<AirbyteMessage> secondBatchIterator = getSource()
.read(getConfig(), updatedCatalog, state);
final List<AirbyteMessage> dataFromSecondBatch = AutoCloseableIterators
.toListAndClose(secondBatchIterator);

final List<AirbyteStateMessage> stateAfterSecondBatch = extractStateMessages(dataFromSecondBatch);
assertEquals(2, stateAfterSecondBatch.size());

final AirbyteStateMessage stateMessageEmittedAfterSnapshotCompletionInSecondSync = stateAfterSecondBatch.get(0);
assertEquals(AirbyteStateMessage.AirbyteStateType.GLOBAL, stateMessageEmittedAfterSnapshotCompletionInSecondSync.getType());
assertEquals(stateMessageEmittedAfterFirstSyncCompletion.getGlobal().getSharedState(),
stateMessageEmittedAfterSnapshotCompletionInSecondSync.getGlobal().getSharedState());
final Set<StreamDescriptor> streamsInSnapshotState = stateMessageEmittedAfterSnapshotCompletionInSecondSync.getGlobal().getStreamStates()
.stream()
.map(AirbyteStreamState::getStreamDescriptor)
.collect(Collectors.toSet());
assertEquals(2, streamsInSnapshotState.size());
assertTrue(
streamsInSnapshotState.contains(new StreamDescriptor().withName(MODELS_STREAM_NAME + "_random").withNamespace(randomTableSchema())));
assertTrue(streamsInSnapshotState.contains(new StreamDescriptor().withName(MODELS_STREAM_NAME).withNamespace(MODELS_SCHEMA)));
assertNotNull(stateMessageEmittedAfterSnapshotCompletionInSecondSync.getData());

final AirbyteStateMessage stateMessageEmittedAfterSecondSyncCompletion = stateAfterSecondBatch.get(1);
assertEquals(AirbyteStateMessage.AirbyteStateType.GLOBAL, stateMessageEmittedAfterSecondSyncCompletion.getType());
assertNotEquals(stateMessageEmittedAfterFirstSyncCompletion.getGlobal().getSharedState(),
stateMessageEmittedAfterSecondSyncCompletion.getGlobal().getSharedState());
final Set<StreamDescriptor> streamsInSyncCompletionState = stateMessageEmittedAfterSecondSyncCompletion.getGlobal().getStreamStates()
.stream()
.map(AirbyteStreamState::getStreamDescriptor)
.collect(Collectors.toSet());
assertEquals(2, streamsInSyncCompletionState.size());
assertTrue(
streamsInSyncCompletionState.contains(
new StreamDescriptor().withName(MODELS_STREAM_NAME + "_random").withNamespace(randomTableSchema())));
assertTrue(streamsInSyncCompletionState.contains(new StreamDescriptor().withName(MODELS_STREAM_NAME).withNamespace(MODELS_SCHEMA)));
assertNotNull(stateMessageEmittedAfterSecondSyncCompletion.getData());

final Map<String, Set<AirbyteRecordMessage>> recordsStreamWise = extractRecordMessagesStreamWise(dataFromSecondBatch);
assertTrue(recordsStreamWise.containsKey(MODELS_STREAM_NAME));
assertTrue(recordsStreamWise.containsKey(MODELS_STREAM_NAME + "_random"));

final Set<AirbyteRecordMessage> recordsForModelsStreamFromSecondBatch = recordsStreamWise.get(MODELS_STREAM_NAME);
final Set<AirbyteRecordMessage> recordsForModelsRandomStreamFromSecondBatch = recordsStreamWise.get(MODELS_STREAM_NAME + "_random");

assertEquals((MODEL_RECORDS_RANDOM.size()), recordsForModelsRandomStreamFromSecondBatch.size());
assertEquals(20, recordsForModelsStreamFromSecondBatch.size());
assertExpectedRecords(new HashSet<>(MODEL_RECORDS_RANDOM), recordsForModelsRandomStreamFromSecondBatch,
recordsForModelsRandomStreamFromSecondBatch.stream().map(AirbyteRecordMessage::getStream).collect(
Collectors.toSet()),
Sets
.newHashSet(MODELS_STREAM_NAME + "_random"),
randomTableSchema());
assertExpectedRecords(recordsWritten, recordsForModelsStreamFromSecondBatch);

/*
* Write 20 records to both the tables
*/
final Set<JsonNode> recordsWrittenInRandomTable = new HashSet<>();
recordsWritten.clear();
for (int recordsCreated = 30; recordsCreated < 50; recordsCreated++) {
final JsonNode record =
Jsons.jsonNode(ImmutableMap
.of(COL_ID, 100 + recordsCreated, COL_MAKE_ID, 1, COL_MODEL,
"F-" + recordsCreated));
writeModelRecord(record);
recordsWritten.add(record);

final JsonNode record2 = Jsons
.jsonNode(ImmutableMap
.of(COL_ID + "_random", 11000 + recordsCreated, COL_MAKE_ID + "_random", 1 + recordsCreated, COL_MODEL + "_random",
"Fiesta-random" + recordsCreated));
writeRecords(record2, randomTableSchema(), MODELS_STREAM_NAME + "_random",
COL_ID + "_random", COL_MAKE_ID + "_random", COL_MODEL + "_random");
recordsWrittenInRandomTable.add(record2);
}

final JsonNode state2 = stateAfterSecondBatch.get(1).getData();
final AutoCloseableIterator<AirbyteMessage> thirdBatchIterator = getSource()
.read(getConfig(), updatedCatalog, state2);
final List<AirbyteMessage> dataFromThirdBatch = AutoCloseableIterators
.toListAndClose(thirdBatchIterator);

final List<AirbyteStateMessage> stateAfterThirdBatch = extractStateMessages(dataFromThirdBatch);
assertEquals(1, stateAfterThirdBatch.size());

final AirbyteStateMessage stateMessageEmittedAfterThirdSyncCompletion = stateAfterThirdBatch.get(0);
assertEquals(AirbyteStateMessage.AirbyteStateType.GLOBAL, stateMessageEmittedAfterThirdSyncCompletion.getType());
assertNotEquals(stateMessageEmittedAfterThirdSyncCompletion.getGlobal().getSharedState(),
stateMessageEmittedAfterSecondSyncCompletion.getGlobal().getSharedState());
final Set<StreamDescriptor> streamsInSyncCompletionStateAfterThirdSync = stateMessageEmittedAfterThirdSyncCompletion.getGlobal().getStreamStates()
.stream()
.map(AirbyteStreamState::getStreamDescriptor)
.collect(Collectors.toSet());
assertTrue(
streamsInSyncCompletionStateAfterThirdSync.contains(
new StreamDescriptor().withName(MODELS_STREAM_NAME + "_random").withNamespace(randomTableSchema())));
assertTrue(streamsInSyncCompletionStateAfterThirdSync.contains(new StreamDescriptor().withName(MODELS_STREAM_NAME).withNamespace(MODELS_SCHEMA)));
assertNotNull(stateMessageEmittedAfterThirdSyncCompletion.getData());

final Map<String, Set<AirbyteRecordMessage>> recordsStreamWiseFromThirdBatch = extractRecordMessagesStreamWise(dataFromThirdBatch);
assertTrue(recordsStreamWiseFromThirdBatch.containsKey(MODELS_STREAM_NAME));
assertTrue(recordsStreamWiseFromThirdBatch.containsKey(MODELS_STREAM_NAME + "_random"));

final Set<AirbyteRecordMessage> recordsForModelsStreamFromThirdBatch = recordsStreamWiseFromThirdBatch.get(MODELS_STREAM_NAME);
final Set<AirbyteRecordMessage> recordsForModelsRandomStreamFromThirdBatch = recordsStreamWiseFromThirdBatch.get(MODELS_STREAM_NAME + "_random");

assertEquals(20, recordsForModelsStreamFromThirdBatch.size());
assertEquals(20, recordsForModelsRandomStreamFromThirdBatch.size());
assertExpectedRecords(recordsWritten, recordsForModelsStreamFromThirdBatch);
assertExpectedRecords(recordsWrittenInRandomTable, recordsForModelsRandomStreamFromThirdBatch,
recordsForModelsRandomStreamFromThirdBatch.stream().map(AirbyteRecordMessage::getStream).collect(
Collectors.toSet()),
Sets
.newHashSet(MODELS_STREAM_NAME + "_random"),
randomTableSchema());
}

protected AirbyteCatalog expectedCatalogForDiscover() {
final AirbyteCatalog expectedCatalog = Jsons.clone(CATALOG);

@@ -608,7 +788,7 @@ protected AirbyteCatalog expectedCatalogForDiscover() {

final AirbyteStream randomStream = CatalogHelpers.createAirbyteStream(
MODELS_STREAM_NAME + "_random",
MODELS_SCHEMA + "_random",
randomTableSchema(),
Field.of(COL_ID + "_random", JsonSchemaType.INTEGER),
Field.of(COL_MAKE_ID + "_random", JsonSchemaType.INTEGER),
Field.of(COL_MODEL + "_random", JsonSchemaType.STRING))
@@ -623,24 +803,29 @@
return expectedCatalog;
}

/**
* The schema of the random table that is used as the new table in the snapshot test
*/
protected abstract String randomTableSchema();

protected abstract CdcTargetPosition cdcLatestTargetPosition();

protected abstract CdcTargetPosition extractPosition(JsonNode record);
protected abstract CdcTargetPosition extractPosition(final JsonNode record);

protected abstract void assertNullCdcMetaData(JsonNode data);
protected abstract void assertNullCdcMetaData(final JsonNode data);

protected abstract void assertCdcMetaData(JsonNode data, boolean deletedAtNull);
protected abstract void assertCdcMetaData(final JsonNode data, final boolean deletedAtNull);

protected abstract void removeCDCColumns(ObjectNode data);
protected abstract void removeCDCColumns(final ObjectNode data);

protected abstract void addCdcMetadataColumns(AirbyteStream stream);
protected abstract void addCdcMetadataColumns(final AirbyteStream stream);

protected abstract Source getSource();

protected abstract JsonNode getConfig();

protected abstract Database getDatabase();

protected abstract void assertExpectedStateMessages(List<AirbyteStateMessage> stateMessages);
protected abstract void assertExpectedStateMessages(final List<AirbyteStateMessage> stateMessages);

}
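Concrete CDC test classes supply the schema in which the `_random` table lives. The overrides below are illustrative sketches, not the PR's actual subclasses: a MySQL implementation would plausibly reuse the models database (MySQL treats schemas and databases as one and the same), which is exactly the case `createAndPopulateRandomTable` guards against before calling `createSchema`, while a database with real schema support could isolate the table in a dedicated `_random` schema.

```java
// Illustrative overrides for subclasses of this test; the PR's real implementations may differ.

// MySQL-style: schemas are databases, so the "random" table can share the models database.
@Override
protected String randomTableSchema() {
  return MODELS_SCHEMA;
}

// Postgres-style: a separate schema keeps the extra table isolated.
// @Override
// protected String randomTableSchema() {
//   return MODELS_SCHEMA + "_random";
// }
```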