From a9d3ced41942b0e1ae9213191d87fcbc7b4f9d7d Mon Sep 17 00:00:00 2001 From: subodh Date: Wed, 21 Sep 2022 16:29:05 +0530 Subject: [PATCH 1/9] mysql source : implement support for snapshot of new tables in cdc mode --- .../debezium/AirbyteDebeziumHandler.java | 39 +++-- .../integrations/debezium/CdcSourceTest.java | 141 ++++++++++++++++-- .../source/jdbc/AbstractJdbcSource.java | 24 +++ .../source/mysql/MySqlCdcProperties.java | 59 +++++--- .../source/mysql/MySqlCdcStateHandler.java | 8 +- .../source/mysql/MySqlSource.java | 47 ++++-- .../source/mysql/CdcMysqlSourceTest.java | 52 ++----- .../postgres/PostgresCdcProperties.java | 31 ++-- .../source/postgres/PostgresSource.java | 25 +--- .../postgres/CdcPostgresSourceTest.java | 5 + 10 files changed, 292 insertions(+), 139 deletions(-) diff --git a/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/AirbyteDebeziumHandler.java b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/AirbyteDebeziumHandler.java index 3fae01472f93c..36400e876c166 100644 --- a/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/AirbyteDebeziumHandler.java +++ b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/AirbyteDebeziumHandler.java @@ -58,32 +58,29 @@ public AirbyteDebeziumHandler(final JsonNode config, this.firstRecordWaitTime = firstRecordWaitTime; } - public AutoCloseableIterator getSnapshotIterators(final ConfiguredAirbyteCatalog catalog, + public AutoCloseableIterator getSnapshotIterators( + final ConfiguredAirbyteCatalog catalogContainingStreamsToSnapshot, final CdcMetadataInjector cdcMetadataInjector, - final Properties connectorProperties, + final Properties snapshotProperties, final CdcStateHandler cdcStateHandler, final Instant emittedAt) { - LOGGER.info("Running snapshot for " + catalog.getStreams().size() + " new tables"); + + LOGGER.info("Running snapshot for " + catalogContainingStreamsToSnapshot.getStreams().size() + " new tables"); final LinkedBlockingQueue> queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY); final AirbyteFileOffsetBackingStore offsetManager = AirbyteFileOffsetBackingStore.initializeDummyStateForSnapshotPurpose(); - /* - * TODO(Subodh) : Since Postgres doesn't require schema history this is fine but we need to fix this - * for MySQL and MSSQL - */ - final Optional schemaHistoryManager = Optional.empty(); - final DebeziumRecordPublisher publisher = new DebeziumRecordPublisher(connectorProperties, + final DebeziumRecordPublisher tableSnapshotPublisher = new DebeziumRecordPublisher(snapshotProperties, config, - catalog, + catalogContainingStreamsToSnapshot, offsetManager, - schemaHistoryManager); - publisher.start(queue); + schemaHistoryManager(new EmptySavedInfo())); + tableSnapshotPublisher.start(queue); final AutoCloseableIterator> eventIterator = new DebeziumRecordIterator( queue, targetPosition, - publisher::hasClosed, - publisher::close, + tableSnapshotPublisher::hasClosed, + tableSnapshotPublisher::close, firstRecordWaitTime); return AutoCloseableIterators.concatWithEagerClose(AutoCloseableIterators @@ -155,4 +152,18 @@ public static boolean shouldUseCDC(final ConfiguredAirbyteCatalog catalog) { .anyMatch(syncMode -> syncMode == SyncMode.INCREMENTAL); } + private static class EmptySavedInfo implements CdcSavedInfoFetcher { + + @Override + public JsonNode getSavedOffset() { + return null; + } + + @Override + public Optional getSavedSchemaHistory() { + return Optional.empty(); + } + + } + } diff --git a/airbyte-integrations/bases/debezium-v1-9-2/src/testFixtures/java/io/airbyte/integrations/debezium/CdcSourceTest.java b/airbyte-integrations/bases/debezium-v1-9-2/src/testFixtures/java/io/airbyte/integrations/debezium/CdcSourceTest.java index f6890d72bfc88..688a117f95886 100644 --- a/airbyte-integrations/bases/debezium-v1-9-2/src/testFixtures/java/io/airbyte/integrations/debezium/CdcSourceTest.java +++ b/airbyte-integrations/bases/debezium-v1-9-2/src/testFixtures/java/io/airbyte/integrations/debezium/CdcSourceTest.java @@ -5,6 +5,7 @@ package io.airbyte.integrations.debezium; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -27,11 +28,13 @@ import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.AirbyteStateMessage; import io.airbyte.protocol.models.AirbyteStream; +import io.airbyte.protocol.models.AirbyteStreamState; import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.Field; import io.airbyte.protocol.models.JsonSchemaType; +import io.airbyte.protocol.models.StreamDescriptor; import io.airbyte.protocol.models.SyncMode; import java.sql.SQLException; import java.util.ArrayList; @@ -177,12 +180,14 @@ private void createAndPopulateActualTable() { * databases not being synced by Airbyte are not causing issues with our debezium logic */ private void createAndPopulateRandomTable() { - createSchema(MODELS_SCHEMA + "_random"); - createTable(MODELS_SCHEMA + "_random", MODELS_STREAM_NAME + "_random", + if (!randomTableSchema().equals(MODELS_SCHEMA)) { + createSchema(randomTableSchema()); + } + createTable(randomTableSchema(), MODELS_STREAM_NAME + "_random", columnClause(ImmutableMap.of(COL_ID + "_random", "INTEGER", COL_MAKE_ID + "_random", "INTEGER", COL_MODEL + "_random", "VARCHAR(200)"), Optional.of(COL_ID + "_random"))); for (final JsonNode recordJson : MODEL_RECORDS_RANDOM) { - writeRecords(recordJson, MODELS_SCHEMA + "_random", MODELS_STREAM_NAME + "_random", + writeRecords(recordJson, randomTableSchema(), MODELS_STREAM_NAME + "_random", COL_ID + "_random", COL_MAKE_ID + "_random", COL_MODEL + "_random"); } } @@ -585,6 +590,120 @@ void testDiscover() throws Exception { .collect(Collectors.toList())); } + @Test + public void newTableSnapshotTest() throws Exception { + final AutoCloseableIterator firstBatchIterator = getSource() + .read(getConfig(), CONFIGURED_CATALOG, null); + final List dataFromFirstBatch = AutoCloseableIterators + .toListAndClose(firstBatchIterator); + final Set recordsFromFirstBatch = extractRecordMessages( + dataFromFirstBatch); + final List stateAfterFirstBatch = extractStateMessages(dataFromFirstBatch); + assertEquals(1, stateAfterFirstBatch.size()); + + final AirbyteStateMessage stateMessageEmittedAfterFirstSyncCompletion = stateAfterFirstBatch.get(0); + assertEquals(AirbyteStateMessage.AirbyteStateType.GLOBAL, stateMessageEmittedAfterFirstSyncCompletion.getType()); + assertNotNull(stateMessageEmittedAfterFirstSyncCompletion.getGlobal().getSharedState()); + final Set streamsInStateAfterFirstSyncCompletion = stateMessageEmittedAfterFirstSyncCompletion.getGlobal().getStreamStates() + .stream() + .map(AirbyteStreamState::getStreamDescriptor) + .collect(Collectors.toSet()); + assertEquals(1, streamsInStateAfterFirstSyncCompletion.size()); + assertTrue(streamsInStateAfterFirstSyncCompletion.contains(new StreamDescriptor().withName(MODELS_STREAM_NAME).withNamespace(MODELS_SCHEMA))); + assertNotNull(stateMessageEmittedAfterFirstSyncCompletion.getData()); + + assertEquals((MODEL_RECORDS.size()), recordsFromFirstBatch.size()); + assertExpectedRecords(new HashSet<>(MODEL_RECORDS), recordsFromFirstBatch); + + final JsonNode state = stateAfterFirstBatch.get(0).getData(); + + final ConfiguredAirbyteCatalog newTables = CatalogHelpers + .toDefaultConfiguredCatalog(new AirbyteCatalog().withStreams(List.of( + CatalogHelpers.createAirbyteStream( + MODELS_STREAM_NAME + "_random", + randomTableSchema(), + Field.of(COL_ID + "_random", JsonSchemaType.NUMBER), + Field.of(COL_MAKE_ID + "_random", JsonSchemaType.NUMBER), + Field.of(COL_MODEL + "_random", JsonSchemaType.STRING)) + .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) + .withSourceDefinedPrimaryKey(List.of(List.of(COL_ID + "_random")))))); + + newTables.getStreams().forEach(s -> s.setSyncMode(SyncMode.INCREMENTAL)); + final List combinedStreams = new ArrayList<>(); + combinedStreams.addAll(CONFIGURED_CATALOG.getStreams()); + combinedStreams.addAll(newTables.getStreams()); + + final ConfiguredAirbyteCatalog updatedCatalog = new ConfiguredAirbyteCatalog().withStreams(combinedStreams); + + /* + * Write 20 records to the existing table + */ + final Set recordsWritten = new HashSet<>(); + for (int recordsCreated = 0; recordsCreated < 20; recordsCreated++) { + final JsonNode record = + Jsons.jsonNode(ImmutableMap + .of(COL_ID, 100 + recordsCreated, COL_MAKE_ID, 1, COL_MODEL, + "F-" + recordsCreated)); + recordsWritten.add(record); + writeModelRecord(record); + } + + final AutoCloseableIterator secondBatchIterator = getSource() + .read(getConfig(), updatedCatalog, state); + final List dataFromSecondBatch = AutoCloseableIterators + .toListAndClose(secondBatchIterator); + + final List stateAfterSecondBatch = extractStateMessages(dataFromSecondBatch); + assertEquals(2, stateAfterSecondBatch.size()); + + final AirbyteStateMessage stateMessageEmittedAfterSnapshotCompletionInSecondSync = stateAfterSecondBatch.get(0); + assertEquals(AirbyteStateMessage.AirbyteStateType.GLOBAL, stateMessageEmittedAfterSnapshotCompletionInSecondSync.getType()); + assertEquals(stateMessageEmittedAfterFirstSyncCompletion.getGlobal().getSharedState(), + stateMessageEmittedAfterSnapshotCompletionInSecondSync.getGlobal().getSharedState()); + final Set streamsInSnapshotState = stateMessageEmittedAfterSnapshotCompletionInSecondSync.getGlobal().getStreamStates() + .stream() + .map(AirbyteStreamState::getStreamDescriptor) + .collect(Collectors.toSet()); + assertEquals(2, streamsInSnapshotState.size()); + assertTrue( + streamsInSnapshotState.contains(new StreamDescriptor().withName(MODELS_STREAM_NAME + "_random").withNamespace(randomTableSchema()))); + assertTrue(streamsInSnapshotState.contains(new StreamDescriptor().withName(MODELS_STREAM_NAME).withNamespace(MODELS_SCHEMA))); + assertNotNull(stateMessageEmittedAfterSnapshotCompletionInSecondSync.getData()); + + final AirbyteStateMessage stateMessageEmittedAfterSecondSyncCompletion = stateAfterSecondBatch.get(1); + assertEquals(AirbyteStateMessage.AirbyteStateType.GLOBAL, stateMessageEmittedAfterSecondSyncCompletion.getType()); + assertNotEquals(stateMessageEmittedAfterFirstSyncCompletion.getGlobal().getSharedState(), + stateMessageEmittedAfterSecondSyncCompletion.getGlobal().getSharedState()); + final Set streamsInSyncCompletionState = stateMessageEmittedAfterSecondSyncCompletion.getGlobal().getStreamStates() + .stream() + .map(AirbyteStreamState::getStreamDescriptor) + .collect(Collectors.toSet()); + assertEquals(2, streamsInSnapshotState.size()); + assertTrue( + streamsInSyncCompletionState.contains( + new StreamDescriptor().withName(MODELS_STREAM_NAME + "_random").withNamespace(randomTableSchema()))); + assertTrue(streamsInSyncCompletionState.contains(new StreamDescriptor().withName(MODELS_STREAM_NAME).withNamespace(MODELS_SCHEMA))); + assertNotNull(stateMessageEmittedAfterSecondSyncCompletion.getData()); + + final Map> recordsStreamWise = extractRecordMessagesStreamWise(dataFromSecondBatch); + assertTrue(recordsStreamWise.containsKey(MODELS_STREAM_NAME)); + assertTrue(recordsStreamWise.containsKey(MODELS_STREAM_NAME + "_random")); + + final Set recordsForModelsStreamFromSecondBatch = recordsStreamWise.get(MODELS_STREAM_NAME); + final Set recordsForModelsRandomStreamFromSecondBatch = recordsStreamWise.get(MODELS_STREAM_NAME + "_random"); + + assertEquals((MODEL_RECORDS_RANDOM.size()), recordsForModelsRandomStreamFromSecondBatch.size()); + assertEquals(20, recordsForModelsStreamFromSecondBatch.size()); + assertExpectedRecords(new HashSet<>(MODEL_RECORDS_RANDOM), recordsForModelsRandomStreamFromSecondBatch, + recordsForModelsRandomStreamFromSecondBatch.stream().map(AirbyteRecordMessage::getStream).collect( + Collectors.toSet()), + Sets + .newHashSet(MODELS_STREAM_NAME + "_random"), + randomTableSchema()); + assertExpectedRecords(recordsWritten, recordsForModelsStreamFromSecondBatch); + + } + protected AirbyteCatalog expectedCatalogForDiscover() { final AirbyteCatalog expectedCatalog = Jsons.clone(CATALOG); @@ -608,7 +727,7 @@ protected AirbyteCatalog expectedCatalogForDiscover() { final AirbyteStream randomStream = CatalogHelpers.createAirbyteStream( MODELS_STREAM_NAME + "_random", - MODELS_SCHEMA + "_random", + randomTableSchema(), Field.of(COL_ID + "_random", JsonSchemaType.INTEGER), Field.of(COL_MAKE_ID + "_random", JsonSchemaType.INTEGER), Field.of(COL_MODEL + "_random", JsonSchemaType.STRING)) @@ -623,17 +742,19 @@ protected AirbyteCatalog expectedCatalogForDiscover() { return expectedCatalog; } + protected abstract String randomTableSchema(); + protected abstract CdcTargetPosition cdcLatestTargetPosition(); - protected abstract CdcTargetPosition extractPosition(JsonNode record); + protected abstract CdcTargetPosition extractPosition(final JsonNode record); - protected abstract void assertNullCdcMetaData(JsonNode data); + protected abstract void assertNullCdcMetaData(final JsonNode data); - protected abstract void assertCdcMetaData(JsonNode data, boolean deletedAtNull); + protected abstract void assertCdcMetaData(final JsonNode data, final boolean deletedAtNull); - protected abstract void removeCDCColumns(ObjectNode data); + protected abstract void removeCDCColumns(final ObjectNode data); - protected abstract void addCdcMetadataColumns(AirbyteStream stream); + protected abstract void addCdcMetadataColumns(final AirbyteStream stream); protected abstract Source getSource(); @@ -641,6 +762,6 @@ protected AirbyteCatalog expectedCatalogForDiscover() { protected abstract Database getDatabase(); - protected abstract void assertExpectedStateMessages(List stateMessages); + protected abstract void assertExpectedStateMessages(final List stateMessages); } diff --git a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java index 007abf97f9200..bd172b3f464a2 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java +++ b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; import io.airbyte.commons.functional.CheckedConsumer; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.map.MoreMaps; @@ -37,11 +38,15 @@ import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.db.jdbc.StreamingJdbcDatabase; import io.airbyte.db.jdbc.streaming.JdbcStreamingQueryConfig; +import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.base.Source; import io.airbyte.integrations.source.jdbc.dto.JdbcPrivilegeDto; import io.airbyte.integrations.source.relationaldb.AbstractRelationalDbSource; import io.airbyte.integrations.source.relationaldb.TableInfo; +import io.airbyte.integrations.source.relationaldb.state.StateManager; import io.airbyte.protocol.models.CommonField; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.JsonSchemaType; import java.net.MalformedURLException; import java.net.URI; @@ -522,4 +527,23 @@ protected String toSslJdbcParam(final SslMode sslMode) { // Default implementation return sslMode.name(); } + + + + protected List identifyStreamsToSnapshot(final ConfiguredAirbyteCatalog catalog, final StateManager stateManager) { + final Set alreadySyncedStreams = stateManager.getCdcStateManager().getInitialStreamsSynced(); + if (alreadySyncedStreams.isEmpty() && (stateManager.getCdcStateManager().getCdcState() == null + || stateManager.getCdcStateManager().getCdcState().getState() == null)) { + return Collections.emptyList(); + } + + final Set allStreams = AirbyteStreamNameNamespacePair.fromConfiguredCatalog(catalog); + + final Set newlyAddedStreams = new HashSet<>(Sets.difference(allStreams, alreadySyncedStreams)); + + return catalog.getStreams().stream() + .filter(stream -> newlyAddedStreams.contains(AirbyteStreamNameNamespacePair.fromAirbyteSteam(stream.getStream()))) + .map(Jsons::clone) + .collect(Collectors.toList()); + } } diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcProperties.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcProperties.java index 7b7cc60a59de4..edb7b839ccbf4 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcProperties.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcProperties.java @@ -17,14 +17,25 @@ import java.net.URI; import java.nio.file.Path; import java.util.Properties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class MySqlCdcProperties { - final static private Logger LOGGER = LoggerFactory.getLogger(MySqlCdcProperties.class); - static Properties getDebeziumProperties(final JdbcDatabase database) { + final JsonNode sourceConfig = database.getSourceConfig(); + final Properties props = commonProperties(database); + // snapshot config + if (sourceConfig.has("snapshot_mode")) { + // The parameter `snapshot_mode` is passed in test to simulate reading the binlog directly and skip + // initial snapshot + props.setProperty("snapshot.mode", sourceConfig.get("snapshot_mode").asText()); + } else { + // https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-snapshot-mode + props.setProperty("snapshot.mode", "when_needed"); + } + return props; + } + + private static Properties commonProperties(final JdbcDatabase database) { final Properties props = new Properties(); final JsonNode sourceConfig = database.getSourceConfig(); final JsonNode dbConfig = database.getDatabaseConfig(); @@ -41,26 +52,6 @@ static Properties getDebeziumProperties(final JdbcDatabase database) { props.setProperty("boolean.type", "io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter"); props.setProperty("datetime.type", "io.airbyte.integrations.debezium.internals.MySQLDateTimeConverter"); - // snapshot config - if (sourceConfig.has("snapshot_mode")) { - // The parameter `snapshot_mode` is passed in test to simulate reading the binlog directly and skip - // initial snapshot - props.setProperty("snapshot.mode", sourceConfig.get("snapshot_mode").asText()); - } else { - // https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-snapshot-mode - props.setProperty("snapshot.mode", "when_needed"); - } - // https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-snapshot-locking-mode - // This is to make sure other database clients are allowed to write to a table while Airbyte is - // taking a snapshot. There is a risk involved that - // if any database client makes a schema change then the sync might break - props.setProperty("snapshot.locking.mode", "none"); - // https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-include-schema-changes - props.setProperty("include.schema.changes", "false"); - // This to make sure that binary data represented as a base64-encoded String. - // https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-binary-handling-mode - props.setProperty("binary.handling.mode", "base64"); - props.setProperty("database.include.list", sourceConfig.get("database").asText()); // Check params for SSL connection in config and add properties for CDC SSL connection // https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-database-ssl-mode if (!sourceConfig.has(JdbcUtils.SSL_KEY) || sourceConfig.get(JdbcUtils.SSL_KEY).asBoolean()) { @@ -106,6 +97,26 @@ static Properties getDebeziumProperties(final JdbcDatabase database) { props.setProperty("database.ssl.mode", "required"); } } + + // https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-snapshot-locking-mode + // This is to make sure other database clients are allowed to write to a table while Airbyte is + // taking a snapshot. There is a risk involved that + // if any database client makes a schema change then the sync might break + props.setProperty("snapshot.locking.mode", "none"); + // https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-include-schema-changes + props.setProperty("include.schema.changes", "false"); + // This to make sure that binary data represented as a base64-encoded String. + // https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-binary-handling-mode + props.setProperty("binary.handling.mode", "base64"); + props.setProperty("database.include.list", sourceConfig.get("database").asText()); + + return props; + + } + + static Properties getSnapshotProperties(final JdbcDatabase database) { + final Properties props = commonProperties(database); + props.setProperty("snapshot.mode", "initial_only"); return props; } diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcStateHandler.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcStateHandler.java index 2688ac7fc7dc3..befefde6b7e4e 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcStateHandler.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcStateHandler.java @@ -53,7 +53,13 @@ public AirbyteMessage saveState(final Map offset, final String d @Override public AirbyteMessage saveStateAfterCompletionOfSnapshotOfNewStreams() { - throw new RuntimeException("Snapshot of individual tables currently not supported in MySQL"); + LOGGER.info("Snapshot of new tables is complete, saving state"); + /* + * Namespace pair is ignored by global state manager, but is needed for satisfy the API contract. + * Therefore, provide an empty optional. + */ + final AirbyteStateMessage stateMessage = stateManager.emit(Optional.empty()); + return new AirbyteMessage().withType(Type.STATE).withState(stateMessage); } } diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java index 34679cb8bc892..b84186a26478d 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java @@ -19,6 +19,7 @@ import io.airbyte.commons.functional.CheckedConsumer; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.util.AutoCloseableIterator; +import io.airbyte.commons.util.AutoCloseableIterators; import io.airbyte.db.factory.DatabaseDriver; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.db.jdbc.JdbcUtils; @@ -31,7 +32,6 @@ import io.airbyte.integrations.source.mysql.helpers.CdcConfigurationHelper; import io.airbyte.integrations.source.relationaldb.TableInfo; import io.airbyte.integrations.source.relationaldb.models.CdcState; -import io.airbyte.integrations.source.relationaldb.models.DbState; import io.airbyte.integrations.source.relationaldb.state.StateManager; import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteGlobalState; @@ -42,15 +42,18 @@ import io.airbyte.protocol.models.AirbyteStreamState; import io.airbyte.protocol.models.CommonField; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.SyncMode; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.function.Supplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -195,20 +198,20 @@ private static boolean isCdc(final JsonNode config) { @Override protected AirbyteStateType getSupportedStateType(final JsonNode config) { - if (!featureFlags.useStreamCapableState()) { - return AirbyteStateType.LEGACY; - } +// if (!featureFlags.useStreamCapableState()) { +// return AirbyteStateType.LEGACY; +// } return isCdc(config) ? AirbyteStateType.GLOBAL : AirbyteStateType.STREAM; } @Override protected List generateEmptyInitialState(final JsonNode config) { - if (!featureFlags.useStreamCapableState()) { - return List.of(new AirbyteStateMessage() - .withType(AirbyteStateType.LEGACY) - .withData(Jsons.jsonNode(new DbState()))); - } +// if (!featureFlags.useStreamCapableState()) { +// return List.of(new AirbyteStateMessage() +// .withType(AirbyteStateType.LEGACY) +// .withData(Jsons.jsonNode(new DbState()))); +// } if (getSupportedStateType(config) == AirbyteStateType.GLOBAL) { final AirbyteGlobalState globalState = new AirbyteGlobalState() @@ -233,14 +236,32 @@ public List> getIncrementalIterators(final final AirbyteDebeziumHandler handler = new AirbyteDebeziumHandler(sourceConfig, MySqlCdcTargetPosition.targetPosition(database), true, Duration.ofMinutes(5)); + final MySqlCdcStateHandler mySqlCdcStateHandler = new MySqlCdcStateHandler(stateManager); + final MySqlCdcConnectorMetadataInjector mySqlCdcConnectorMetadataInjector = new MySqlCdcConnectorMetadataInjector(); + + final List streamsToSnapshot = identifyStreamsToSnapshot(catalog, stateManager); final Optional cdcState = Optional.ofNullable(stateManager.getCdcStateManager().getCdcState()); - final MySqlCdcSavedInfoFetcher fetcher = new MySqlCdcSavedInfoFetcher(cdcState.orElse(null)); - return Collections.singletonList(handler.getIncrementalIterators(catalog, - fetcher, + + final Supplier> incrementalIteratorSupplier = () -> handler.getIncrementalIterators(catalog, + new MySqlCdcSavedInfoFetcher(cdcState.orElse(null)), new MySqlCdcStateHandler(stateManager), new MySqlCdcConnectorMetadataInjector(), MySqlCdcProperties.getDebeziumProperties(database), - emittedAt)); + emittedAt); + + if (streamsToSnapshot.isEmpty()) { + return Collections.singletonList(incrementalIteratorSupplier.get()); + } + + final AutoCloseableIterator snapshotIterator = handler.getSnapshotIterators( + new ConfiguredAirbyteCatalog().withStreams(streamsToSnapshot), + mySqlCdcConnectorMetadataInjector, + MySqlCdcProperties.getSnapshotProperties(database), + mySqlCdcStateHandler, + emittedAt); + + return Collections.singletonList( + AutoCloseableIterators.concatWithEagerClose(snapshotIterator, AutoCloseableIterators.lazyIterator(incrementalIteratorSupplier))); } else { LOGGER.info("using CDC: {}", false); return super.getIncrementalIterators(database, catalog, tableNameToTable, stateManager, diff --git a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java index b4a9513bbd2b8..15e44b1f78206 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java @@ -33,19 +33,13 @@ import io.airbyte.integrations.base.Source; import io.airbyte.integrations.debezium.CdcSourceTest; import io.airbyte.integrations.debezium.CdcTargetPosition; -import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.AirbyteStateMessage; import io.airbyte.protocol.models.AirbyteStream; -import io.airbyte.protocol.models.CatalogHelpers; -import io.airbyte.protocol.models.Field; -import io.airbyte.protocol.models.JsonSchemaType; -import io.airbyte.protocol.models.SyncMode; import java.sql.SQLException; import java.util.Collections; import java.util.List; -import java.util.Optional; import java.util.Set; import javax.sql.DataSource; import org.jooq.SQLDialect; @@ -64,7 +58,7 @@ public class CdcMysqlSourceTest extends CdcSourceTest { @BeforeEach public void setup() throws SQLException { - setEnv(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true"); +// setEnv(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true"); init(); revokeAllPermissions(); grantCorrectPermissions(); @@ -131,12 +125,12 @@ protected CdcTargetPosition cdcLatestTargetPosition() { } @Override - protected CdcTargetPosition extractPosition(JsonNode record) { + protected CdcTargetPosition extractPosition(final JsonNode record) { return new MySqlCdcTargetPosition(record.get(CDC_LOG_FILE).asText(), record.get(CDC_LOG_POS).asInt()); } @Override - protected void assertNullCdcMetaData(JsonNode data) { + protected void assertNullCdcMetaData(final JsonNode data) { assertNull(data.get(CDC_LOG_FILE)); assertNull(data.get(CDC_LOG_POS)); assertNull(data.get(CDC_UPDATED_AT)); @@ -144,7 +138,7 @@ protected void assertNullCdcMetaData(JsonNode data) { } @Override - protected void assertCdcMetaData(JsonNode data, boolean deletedAtNull) { + protected void assertCdcMetaData(final JsonNode data, final boolean deletedAtNull) { assertNotNull(data.get(CDC_LOG_FILE)); assertNotNull(data.get(CDC_LOG_POS)); assertNotNull(data.get(CDC_UPDATED_AT)); @@ -156,7 +150,7 @@ protected void assertCdcMetaData(JsonNode data, boolean deletedAtNull) { } @Override - protected void removeCDCColumns(ObjectNode data) { + protected void removeCDCColumns(final ObjectNode data) { data.remove(CDC_LOG_FILE); data.remove(CDC_LOG_POS); data.remove(CDC_UPDATED_AT); @@ -164,9 +158,9 @@ protected void removeCDCColumns(ObjectNode data) { } @Override - protected void addCdcMetadataColumns(AirbyteStream stream) { - ObjectNode jsonSchema = (ObjectNode) stream.getJsonSchema(); - ObjectNode properties = (ObjectNode) jsonSchema.get("properties"); + protected void addCdcMetadataColumns(final AirbyteStream stream) { + final ObjectNode jsonSchema = (ObjectNode) stream.getJsonSchema(); + final ObjectNode properties = (ObjectNode) jsonSchema.get("properties"); final JsonNode numberType = Jsons.jsonNode(ImmutableMap.of("type", "number")); @@ -193,38 +187,16 @@ protected Database getDatabase() { } @Override - public void assertExpectedStateMessages(List stateMessages) { - for (AirbyteStateMessage stateMessage : stateMessages) { + public void assertExpectedStateMessages(final List stateMessages) { + for (final AirbyteStateMessage stateMessage : stateMessages) { assertNotNull(stateMessage.getData().get("cdc_state").get("state").get(MYSQL_CDC_OFFSET)); assertNotNull(stateMessage.getData().get("cdc_state").get("state").get(MYSQL_DB_HISTORY)); } } @Override - protected AirbyteCatalog expectedCatalogForDiscover() { - final AirbyteCatalog expectedCatalog = Jsons.clone(CATALOG); - - createTable(MODELS_SCHEMA, MODELS_STREAM_NAME + "_2", - columnClause(ImmutableMap.of(COL_ID, "INTEGER", COL_MAKE_ID, "INTEGER", COL_MODEL, "VARCHAR(200)"), Optional.empty())); - - List streams = expectedCatalog.getStreams(); - // stream with PK - streams.get(0).setSourceDefinedCursor(true); - addCdcMetadataColumns(streams.get(0)); - - AirbyteStream streamWithoutPK = CatalogHelpers.createAirbyteStream( - MODELS_STREAM_NAME + "_2", - MODELS_SCHEMA, - Field.of(COL_ID, JsonSchemaType.INTEGER), - Field.of(COL_MAKE_ID, JsonSchemaType.INTEGER), - Field.of(COL_MODEL, JsonSchemaType.STRING)); - streamWithoutPK.setSourceDefinedPrimaryKey(Collections.emptyList()); - streamWithoutPK.setSupportedSyncModes(List.of(SyncMode.FULL_REFRESH)); - addCdcMetadataColumns(streamWithoutPK); - - streams.add(streamWithoutPK); - expectedCatalog.withStreams(streams); - return expectedCatalog; + protected String randomTableSchema() { + return MODELS_SCHEMA; } @Test diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcProperties.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcProperties.java index 7008f5ee24ff9..19cf76edb417a 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcProperties.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcProperties.java @@ -26,8 +26,7 @@ public class PostgresCdcProperties { private static final Logger LOGGER = LoggerFactory.getLogger(PostgresCdcProperties.class); static Properties getDebeziumDefaultProperties(final JdbcDatabase database) { final JsonNode sourceConfig = database.getSourceConfig(); - final JsonNode dbConfig = database.getDatabaseConfig(); - final Properties props = commonProperties(); + final Properties props = commonProperties(database); props.setProperty("plugin.name", PostgresUtils.getPluginValue(sourceConfig.get("replication_method"))); if (sourceConfig.has("snapshot_mode")) { // The parameter `snapshot_mode` is passed in test to simulate reading the WAL Logs directly and @@ -42,6 +41,20 @@ static Properties getDebeziumDefaultProperties(final JdbcDatabase database) { props.setProperty("publication.autocreate.mode", "disabled"); + return props; + } + + private static Properties commonProperties(final JdbcDatabase database) { + final JsonNode dbConfig = database.getDatabaseConfig(); + final JsonNode sourceConfig = database.getSourceConfig(); + + final Properties props = new Properties(); + props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector"); + + props.setProperty("converters", "datetime"); + props.setProperty("datetime.type", PostgresConverter.class.getName()); + props.setProperty("include.unknown.datatypes", "true"); + // Check params for SSL connection in config and add properties for CDC SSL connection // https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-database-sslmode if (!sourceConfig.has(JdbcUtils.SSL_KEY) || sourceConfig.get(JdbcUtils.SSL_KEY).asBoolean()) { @@ -93,18 +106,8 @@ static Properties getDebeziumDefaultProperties(final JdbcDatabase database) { return props; } - private static Properties commonProperties() { - final Properties props = new Properties(); - props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector"); - - props.setProperty("converters", "datetime"); - props.setProperty("datetime.type", PostgresConverter.class.getName()); - props.setProperty("include.unknown.datatypes", "true"); - return props; - } - - static Properties getSnapshotProperties() { - final Properties props = commonProperties(); + static Properties getSnapshotProperties(final JdbcDatabase database) { + final Properties props = commonProperties(database); props.setProperty("snapshot.mode", "initial_only"); return props; } diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java index 0bb435afa0274..5ff208e90c963 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java @@ -16,8 +16,6 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableMap.Builder; -import com.google.common.collect.Sets; import io.airbyte.commons.features.EnvVariableFeatureFlags; import io.airbyte.commons.features.FeatureFlags; import io.airbyte.commons.functional.CheckedConsumer; @@ -56,7 +54,6 @@ import io.airbyte.protocol.models.ConfiguredAirbyteStream; import java.net.URI; import java.net.URISyntaxException; -import java.net.URL; import java.nio.file.Path; import java.sql.Connection; import java.sql.JDBCType; @@ -66,14 +63,13 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.function.Supplier; import java.util.stream.Collectors; -import org.postgresql.jdbc.SslMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -365,7 +361,7 @@ public List> getIncrementalIterators(final final AutoCloseableIterator snapshotIterator = handler.getSnapshotIterators( new ConfiguredAirbyteCatalog().withStreams(streamsToSnapshot), new PostgresCdcConnectorMetadataInjector(), - PostgresCdcProperties.getSnapshotProperties(), postgresCdcStateHandler, emittedAt); + PostgresCdcProperties.getSnapshotProperties(database), postgresCdcStateHandler, emittedAt); return Collections.singletonList( AutoCloseableIterators.concatWithEagerClose(snapshotIterator, AutoCloseableIterators.lazyIterator(incrementalIteratorSupplier))); @@ -374,23 +370,6 @@ public List> getIncrementalIterators(final } } - protected List identifyStreamsToSnapshot(final ConfiguredAirbyteCatalog catalog, final StateManager stateManager) { - final Set alreadySyncedStreams = stateManager.getCdcStateManager().getInitialStreamsSynced(); - if (alreadySyncedStreams.isEmpty() && (stateManager.getCdcStateManager().getCdcState() == null - || stateManager.getCdcStateManager().getCdcState().getState() == null)) { - return Collections.emptyList(); - } - - final Set allStreams = AirbyteStreamNameNamespacePair.fromConfiguredCatalog(catalog); - - final Set newlyAddedStreams = new HashSet<>(Sets.difference(allStreams, alreadySyncedStreams)); - - return catalog.getStreams().stream() - .filter(stream -> newlyAddedStreams.contains(AirbyteStreamNameNamespacePair.fromAirbyteSteam(stream.getStream()))) - .map(Jsons::clone) - .collect(Collectors.toList()); - } - @Override public Set getPrivilegesTableForCurrentUser(final JdbcDatabase database, final String schema) diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java index f463f29d7c507..768c30355638a 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java @@ -344,6 +344,11 @@ public void testRecordsProducedDuringAndAfterSync() throws Exception { .size()); } + @Override + protected String randomTableSchema() { + return MODELS_SCHEMA + "_random"; + } + @Test public void testTableWithTimestampColDefault() throws Exception { createAndPopulateTimestampTable(); From b1c324a026f91a9a427e667ea206c56bb508b9df Mon Sep 17 00:00:00 2001 From: subodh Date: Thu, 22 Sep 2022 17:59:24 +0530 Subject: [PATCH 2/9] undo unwanted changes --- .../integrations/source/mysql/MySqlSource.java | 17 +++++++++-------- .../source/mysql/CdcMysqlSourceTest.java | 2 +- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java index b84186a26478d..15ca65d9188bb 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java @@ -32,6 +32,7 @@ import io.airbyte.integrations.source.mysql.helpers.CdcConfigurationHelper; import io.airbyte.integrations.source.relationaldb.TableInfo; import io.airbyte.integrations.source.relationaldb.models.CdcState; +import io.airbyte.integrations.source.relationaldb.models.DbState; import io.airbyte.integrations.source.relationaldb.state.StateManager; import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteGlobalState; @@ -198,20 +199,20 @@ private static boolean isCdc(final JsonNode config) { @Override protected AirbyteStateType getSupportedStateType(final JsonNode config) { -// if (!featureFlags.useStreamCapableState()) { -// return AirbyteStateType.LEGACY; -// } + if (!featureFlags.useStreamCapableState()) { + return AirbyteStateType.LEGACY; + } return isCdc(config) ? AirbyteStateType.GLOBAL : AirbyteStateType.STREAM; } @Override protected List generateEmptyInitialState(final JsonNode config) { -// if (!featureFlags.useStreamCapableState()) { -// return List.of(new AirbyteStateMessage() -// .withType(AirbyteStateType.LEGACY) -// .withData(Jsons.jsonNode(new DbState()))); -// } + if (!featureFlags.useStreamCapableState()) { + return List.of(new AirbyteStateMessage() + .withType(AirbyteStateType.LEGACY) + .withData(Jsons.jsonNode(new DbState()))); + } if (getSupportedStateType(config) == AirbyteStateType.GLOBAL) { final AirbyteGlobalState globalState = new AirbyteGlobalState() diff --git a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java index 15e44b1f78206..ebf25dc63304d 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java @@ -58,7 +58,7 @@ public class CdcMysqlSourceTest extends CdcSourceTest { @BeforeEach public void setup() throws SQLException { -// setEnv(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true"); + setEnv(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true"); init(); revokeAllPermissions(); grantCorrectPermissions(); From e0b14c419835e53550982d30c1fd647e48b1504c Mon Sep 17 00:00:00 2001 From: subodh Date: Thu, 22 Sep 2022 19:00:48 +0530 Subject: [PATCH 3/9] add more assertions --- .../integrations/debezium/CdcSourceTest.java | 65 +++++++++- .../postgres/CdcPostgresSourceTest.java | 115 ------------------ 2 files changed, 64 insertions(+), 116 deletions(-) diff --git a/airbyte-integrations/bases/debezium-v1-9-2/src/testFixtures/java/io/airbyte/integrations/debezium/CdcSourceTest.java b/airbyte-integrations/bases/debezium-v1-9-2/src/testFixtures/java/io/airbyte/integrations/debezium/CdcSourceTest.java index 688a117f95886..fc55657ad6bcb 100644 --- a/airbyte-integrations/bases/debezium-v1-9-2/src/testFixtures/java/io/airbyte/integrations/debezium/CdcSourceTest.java +++ b/airbyte-integrations/bases/debezium-v1-9-2/src/testFixtures/java/io/airbyte/integrations/debezium/CdcSourceTest.java @@ -702,6 +702,67 @@ public void newTableSnapshotTest() throws Exception { randomTableSchema()); assertExpectedRecords(recordsWritten, recordsForModelsStreamFromSecondBatch); + /* + * Write 20 records to both the tables + */ + final Set recordsWrittenInRandomTable = new HashSet<>(); + recordsWritten.clear(); + for (int recordsCreated = 30; recordsCreated < 50; recordsCreated++) { + final JsonNode record = + Jsons.jsonNode(ImmutableMap + .of(COL_ID, 100 + recordsCreated, COL_MAKE_ID, 1, COL_MODEL, + "F-" + recordsCreated)); + writeModelRecord(record); + recordsWritten.add(record); + + final JsonNode record2 = Jsons + .jsonNode(ImmutableMap + .of(COL_ID + "_random", 11000 + recordsCreated, COL_MAKE_ID + "_random", 1 + recordsCreated, COL_MODEL + "_random", + "Fiesta-random" + recordsCreated)); + writeRecords(record2, randomTableSchema(), MODELS_STREAM_NAME + "_random", + COL_ID + "_random", COL_MAKE_ID + "_random", COL_MODEL + "_random"); + recordsWrittenInRandomTable.add(record2); + } + + final JsonNode state2 = stateAfterSecondBatch.get(1).getData(); + final AutoCloseableIterator thirdBatchIterator = getSource() + .read(getConfig(), updatedCatalog, state2); + final List dataFromThirdBatch = AutoCloseableIterators + .toListAndClose(thirdBatchIterator); + + final List stateAfterThirdBatch = extractStateMessages(dataFromThirdBatch); + assertEquals(1, stateAfterThirdBatch.size()); + + final AirbyteStateMessage stateMessageEmittedAfterThirdSyncCompletion = stateAfterThirdBatch.get(0); + assertEquals(AirbyteStateMessage.AirbyteStateType.GLOBAL, stateMessageEmittedAfterThirdSyncCompletion.getType()); + assertNotEquals(stateMessageEmittedAfterThirdSyncCompletion.getGlobal().getSharedState(), + stateMessageEmittedAfterSecondSyncCompletion.getGlobal().getSharedState()); + final Set streamsInSyncCompletionStateAfterThirdSync = stateMessageEmittedAfterThirdSyncCompletion.getGlobal().getStreamStates() + .stream() + .map(AirbyteStreamState::getStreamDescriptor) + .collect(Collectors.toSet()); + assertTrue( + streamsInSyncCompletionStateAfterThirdSync.contains( + new StreamDescriptor().withName(MODELS_STREAM_NAME + "_random").withNamespace(randomTableSchema()))); + assertTrue(streamsInSyncCompletionStateAfterThirdSync.contains(new StreamDescriptor().withName(MODELS_STREAM_NAME).withNamespace(MODELS_SCHEMA))); + assertNotNull(stateMessageEmittedAfterThirdSyncCompletion.getData()); + + final Map> recordsStreamWiseFromThirdBatch = extractRecordMessagesStreamWise(dataFromThirdBatch); + assertTrue(recordsStreamWiseFromThirdBatch.containsKey(MODELS_STREAM_NAME)); + assertTrue(recordsStreamWiseFromThirdBatch.containsKey(MODELS_STREAM_NAME + "_random")); + + final Set recordsForModelsStreamFromThirdBatch = recordsStreamWiseFromThirdBatch.get(MODELS_STREAM_NAME); + final Set recordsForModelsRandomStreamFromThirdBatch = recordsStreamWiseFromThirdBatch.get(MODELS_STREAM_NAME + "_random"); + + assertEquals(20, recordsForModelsStreamFromThirdBatch.size()); + assertEquals(20, recordsForModelsRandomStreamFromThirdBatch.size()); + assertExpectedRecords(recordsWritten, recordsForModelsStreamFromThirdBatch); + assertExpectedRecords(recordsWrittenInRandomTable, recordsForModelsRandomStreamFromThirdBatch, + recordsForModelsRandomStreamFromThirdBatch.stream().map(AirbyteRecordMessage::getStream).collect( + Collectors.toSet()), + Sets + .newHashSet(MODELS_STREAM_NAME + "_random"), + randomTableSchema()); } protected AirbyteCatalog expectedCatalogForDiscover() { @@ -741,7 +802,9 @@ protected AirbyteCatalog expectedCatalogForDiscover() { expectedCatalog.withStreams(streams); return expectedCatalog; } - + /** + * The schema of a random table which is used as a new table in snapshot test + */ protected abstract String randomTableSchema(); protected abstract CdcTargetPosition cdcLatestTargetPosition(); diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java index 768c30355638a..564f443ba4b69 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java @@ -406,121 +406,6 @@ private void createAndPopulateTimestampTable() { } } - // TODO (Subodh): This should be a generic test - @Test - public void newTableSnapshotTest() throws Exception { - final AutoCloseableIterator firstBatchIterator = getSource() - .read(getConfig(), CONFIGURED_CATALOG, null); - final List dataFromFirstBatch = AutoCloseableIterators - .toListAndClose(firstBatchIterator); - final Set recordsFromFirstBatch = extractRecordMessages( - dataFromFirstBatch); - final List stateAfterFirstBatch = extractStateMessages(dataFromFirstBatch); - assertEquals(1, stateAfterFirstBatch.size()); - - final AirbyteStateMessage stateMessageEmittedAfterFirstSyncCompletion = stateAfterFirstBatch.get(0); - assertEquals(AirbyteStateMessage.AirbyteStateType.GLOBAL, stateMessageEmittedAfterFirstSyncCompletion.getType()); - assertNotNull(stateMessageEmittedAfterFirstSyncCompletion.getGlobal().getSharedState()); - final Set streamsInStateAfterFirstSyncCompletion = stateMessageEmittedAfterFirstSyncCompletion.getGlobal().getStreamStates() - .stream() - .map(AirbyteStreamState::getStreamDescriptor) - .collect(Collectors.toSet()); - assertEquals(1, streamsInStateAfterFirstSyncCompletion.size()); - assertTrue(streamsInStateAfterFirstSyncCompletion.contains(new StreamDescriptor().withName(MODELS_STREAM_NAME).withNamespace(MODELS_SCHEMA))); - assertNotNull(stateMessageEmittedAfterFirstSyncCompletion.getData()); - - assertEquals((MODEL_RECORDS.size()), recordsFromFirstBatch.size()); - assertExpectedRecords(new HashSet<>(MODEL_RECORDS), recordsFromFirstBatch); - - final JsonNode state = stateAfterFirstBatch.get(0).getData(); - - final ConfiguredAirbyteCatalog newTables = CatalogHelpers - .toDefaultConfiguredCatalog(new AirbyteCatalog().withStreams(List.of( - CatalogHelpers.createAirbyteStream( - MODELS_STREAM_NAME + "_random", - MODELS_SCHEMA + "_random", - Field.of(COL_ID + "_random", JsonSchemaType.NUMBER), - Field.of(COL_MAKE_ID + "_random", JsonSchemaType.NUMBER), - Field.of(COL_MODEL + "_random", JsonSchemaType.STRING)) - .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) - .withSourceDefinedPrimaryKey(List.of(List.of(COL_ID + "_random")))))); - - newTables.getStreams().forEach(s -> s.setSyncMode(SyncMode.INCREMENTAL)); - final List combinedStreams = new ArrayList<>(); - combinedStreams.addAll(CONFIGURED_CATALOG.getStreams()); - combinedStreams.addAll(newTables.getStreams()); - - final ConfiguredAirbyteCatalog updatedCatalog = new ConfiguredAirbyteCatalog().withStreams(combinedStreams); - - /* - * Write 20 records to the existing table - */ - final Set recordsWritten = new HashSet<>(); - for (int recordsCreated = 0; recordsCreated < 20; recordsCreated++) { - final JsonNode record = - Jsons.jsonNode(ImmutableMap - .of(COL_ID, 100 + recordsCreated, COL_MAKE_ID, 1, COL_MODEL, - "F-" + recordsCreated)); - recordsWritten.add(record); - writeModelRecord(record); - } - - final AutoCloseableIterator secondBatchIterator = getSource() - .read(getConfig(), updatedCatalog, state); - final List dataFromSecondBatch = AutoCloseableIterators - .toListAndClose(secondBatchIterator); - - final List stateAfterSecondBatch = extractStateMessages(dataFromSecondBatch); - assertEquals(2, stateAfterSecondBatch.size()); - - final AirbyteStateMessage stateMessageEmittedAfterSnapshotCompletionInSecondSync = stateAfterSecondBatch.get(0); - assertEquals(AirbyteStateMessage.AirbyteStateType.GLOBAL, stateMessageEmittedAfterSnapshotCompletionInSecondSync.getType()); - assertEquals(stateMessageEmittedAfterFirstSyncCompletion.getGlobal().getSharedState(), - stateMessageEmittedAfterSnapshotCompletionInSecondSync.getGlobal().getSharedState()); - final Set streamsInSnapshotState = stateMessageEmittedAfterSnapshotCompletionInSecondSync.getGlobal().getStreamStates() - .stream() - .map(AirbyteStreamState::getStreamDescriptor) - .collect(Collectors.toSet()); - assertEquals(2, streamsInSnapshotState.size()); - assertTrue( - streamsInSnapshotState.contains(new StreamDescriptor().withName(MODELS_STREAM_NAME + "_random").withNamespace(MODELS_SCHEMA + "_random"))); - assertTrue(streamsInSnapshotState.contains(new StreamDescriptor().withName(MODELS_STREAM_NAME).withNamespace(MODELS_SCHEMA))); - assertNotNull(stateMessageEmittedAfterSnapshotCompletionInSecondSync.getData()); - - final AirbyteStateMessage stateMessageEmittedAfterSecondSyncCompletion = stateAfterSecondBatch.get(1); - assertEquals(AirbyteStateMessage.AirbyteStateType.GLOBAL, stateMessageEmittedAfterSecondSyncCompletion.getType()); - assertNotEquals(stateMessageEmittedAfterFirstSyncCompletion.getGlobal().getSharedState(), - stateMessageEmittedAfterSecondSyncCompletion.getGlobal().getSharedState()); - final Set streamsInSyncCompletionState = stateMessageEmittedAfterSecondSyncCompletion.getGlobal().getStreamStates() - .stream() - .map(AirbyteStreamState::getStreamDescriptor) - .collect(Collectors.toSet()); - assertEquals(2, streamsInSnapshotState.size()); - assertTrue( - streamsInSyncCompletionState.contains( - new StreamDescriptor().withName(MODELS_STREAM_NAME + "_random").withNamespace(MODELS_SCHEMA + "_random"))); - assertTrue(streamsInSyncCompletionState.contains(new StreamDescriptor().withName(MODELS_STREAM_NAME).withNamespace(MODELS_SCHEMA))); - assertNotNull(stateMessageEmittedAfterSecondSyncCompletion.getData()); - - final Map> recordsStreamWise = extractRecordMessagesStreamWise(dataFromSecondBatch); - assertTrue(recordsStreamWise.containsKey(MODELS_STREAM_NAME)); - assertTrue(recordsStreamWise.containsKey(MODELS_STREAM_NAME + "_random")); - - final Set recordsForModelsStreamFromSecondBatch = recordsStreamWise.get(MODELS_STREAM_NAME); - final Set recordsForModelsRandomStreamFromSecondBatch = recordsStreamWise.get(MODELS_STREAM_NAME + "_random"); - - assertEquals((MODEL_RECORDS_RANDOM.size()), recordsForModelsRandomStreamFromSecondBatch.size()); - assertEquals(20, recordsForModelsStreamFromSecondBatch.size()); - assertExpectedRecords(new HashSet<>(MODEL_RECORDS_RANDOM), recordsForModelsRandomStreamFromSecondBatch, - recordsForModelsRandomStreamFromSecondBatch.stream().map(AirbyteRecordMessage::getStream).collect( - Collectors.toSet()), - Sets - .newHashSet(MODELS_STREAM_NAME + "_random"), - MODELS_SCHEMA + "_random"); - assertExpectedRecords(recordsWritten, recordsForModelsStreamFromSecondBatch); - - } - @Test protected void syncShouldHandlePurgedLogsGracefully() throws Exception { From 2a773145526b302c5a997bf7f550809058cb1c2c Mon Sep 17 00:00:00 2001 From: subodh Date: Thu, 22 Sep 2022 19:02:55 +0530 Subject: [PATCH 4/9] format --- .../java/io/airbyte/integrations/debezium/CdcSourceTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/airbyte-integrations/bases/debezium-v1-9-2/src/testFixtures/java/io/airbyte/integrations/debezium/CdcSourceTest.java b/airbyte-integrations/bases/debezium-v1-9-2/src/testFixtures/java/io/airbyte/integrations/debezium/CdcSourceTest.java index fc55657ad6bcb..ded6b0239afb2 100644 --- a/airbyte-integrations/bases/debezium-v1-9-2/src/testFixtures/java/io/airbyte/integrations/debezium/CdcSourceTest.java +++ b/airbyte-integrations/bases/debezium-v1-9-2/src/testFixtures/java/io/airbyte/integrations/debezium/CdcSourceTest.java @@ -802,6 +802,7 @@ protected AirbyteCatalog expectedCatalogForDiscover() { expectedCatalog.withStreams(streams); return expectedCatalog; } + /** * The schema of a random table which is used as a new table in snapshot test */ From 79b06fe0c2c3f38c5c38579e96091692a1a48dca Mon Sep 17 00:00:00 2001 From: subodh Date: Thu, 22 Sep 2022 20:26:09 +0530 Subject: [PATCH 5/9] fix build --- .../integrations/source/mysql/MySqlSourceOperationsTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlSourceOperationsTest.java b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlSourceOperationsTest.java index 5500f45295d54..48f065ddbdae4 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlSourceOperationsTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlSourceOperationsTest.java @@ -41,7 +41,7 @@ public class MySqlSourceOperationsTest { private Database database; @BeforeEach - private void init() { + public void init() { container = new MySQLContainer<>("mysql:8.0"); container.start(); database = new Database(DSLContextFactory.create( From bbf1d43327e11aa9cee16f3bb788efcac73527e4 Mon Sep 17 00:00:00 2001 From: subodh Date: Fri, 23 Sep 2022 13:51:45 +0530 Subject: [PATCH 6/9] fix build --- .../sources}/AbstractMySqlSourceDatatypeTest.java | 2 +- .../AbstractMySqlSslCertificateSourceAcceptanceTest.java | 3 +-- .../sources}/AbstractSshMySqlSourceAcceptanceTest.java | 2 +- .../sources}/CdcBinlogsMySqlSourceDatatypeTest.java | 2 +- .../sources}/CdcInitialSnapshotMySqlSourceDatatypeTest.java | 2 +- .../sources}/CdcMySqlSourceAcceptanceTest.java | 2 +- .../CdcMySqlSslCaCertificateSourceAcceptanceTest.java | 2 +- .../integration_tests/sources}/MySqlSourceAcceptanceTest.java | 2 +- .../integration_tests/sources}/MySqlSourceDatatypeTest.java | 2 +- .../sources}/MySqlSslCaCertificateSourceAcceptanceTest.java | 2 +- .../sources}/MySqlSslFullCertificateSourceAcceptanceTest.java | 2 +- .../sources}/MySqlSslSourceAcceptanceTest.java | 4 +--- .../sources}/SshKeyMySqlSourceAcceptanceTest.java | 2 +- .../sources}/SshPasswordMySqlSourceAcceptanceTest.java | 2 +- 14 files changed, 14 insertions(+), 17 deletions(-) rename airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/{source/mysql => io/airbyte/integration_tests/sources}/AbstractMySqlSourceDatatypeTest.java (99%) rename airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/{source/mysql => io/airbyte/integration_tests/sources}/AbstractMySqlSslCertificateSourceAcceptanceTest.java (95%) rename airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/{source/mysql => io/airbyte/integration_tests/sources}/AbstractSshMySqlSourceAcceptanceTest.java (97%) rename airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/{source/mysql => io/airbyte/integration_tests/sources}/CdcBinlogsMySqlSourceDatatypeTest.java (98%) rename airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/{source/mysql => io/airbyte/integration_tests/sources}/CdcInitialSnapshotMySqlSourceDatatypeTest.java (97%) rename airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/{source/mysql => io/airbyte/integration_tests/sources}/CdcMySqlSourceAcceptanceTest.java (99%) rename airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/{source/mysql => io/airbyte/integration_tests/sources}/CdcMySqlSslCaCertificateSourceAcceptanceTest.java (99%) rename airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/{source/mysql => io/airbyte/integration_tests/sources}/MySqlSourceAcceptanceTest.java (98%) rename airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/{source/mysql => io/airbyte/integration_tests/sources}/MySqlSourceDatatypeTest.java (97%) rename airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/{source/mysql => io/airbyte/integration_tests/sources}/MySqlSslCaCertificateSourceAcceptanceTest.java (91%) rename airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/{source/mysql => io/airbyte/integration_tests/sources}/MySqlSslFullCertificateSourceAcceptanceTest.java (92%) rename airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/{source/mysql => io/airbyte/integration_tests/sources}/MySqlSslSourceAcceptanceTest.java (95%) rename airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/{source/mysql => io/airbyte/integration_tests/sources}/SshKeyMySqlSourceAcceptanceTest.java (81%) rename airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/{source/mysql => io/airbyte/integration_tests/sources}/SshPasswordMySqlSourceAcceptanceTest.java (81%) diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/AbstractMySqlSourceDatatypeTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractMySqlSourceDatatypeTest.java similarity index 99% rename from airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/AbstractMySqlSourceDatatypeTest.java rename to airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractMySqlSourceDatatypeTest.java index b93a74e1d2b02..55ed3a3654d44 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/AbstractMySqlSourceDatatypeTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractMySqlSourceDatatypeTest.java @@ -2,7 +2,7 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.integrations.source.mysql; +package io.airbyte.integrations.io.airbyte.integration_tests.sources; import com.fasterxml.jackson.databind.JsonNode; import com.mysql.cj.MysqlType; diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/AbstractMySqlSslCertificateSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractMySqlSslCertificateSourceAcceptanceTest.java similarity index 95% rename from airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/AbstractMySqlSslCertificateSourceAcceptanceTest.java rename to airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractMySqlSslCertificateSourceAcceptanceTest.java index 68efdaa1be3ac..0086d0045d048 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/AbstractMySqlSslCertificateSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractMySqlSslCertificateSourceAcceptanceTest.java @@ -2,7 +2,7 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.integrations.source.mysql; +package io.airbyte.integrations.io.airbyte.integration_tests.sources; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; @@ -12,7 +12,6 @@ import io.airbyte.db.factory.DSLContextFactory; import io.airbyte.db.factory.DatabaseDriver; import io.airbyte.db.jdbc.JdbcUtils; -import io.airbyte.integrations.source.mysql.MySqlSource.ReplicationMethod; import io.airbyte.integrations.standardtest.source.TestDestinationEnv; import java.io.IOException; import org.jooq.DSLContext; diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/AbstractSshMySqlSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractSshMySqlSourceAcceptanceTest.java similarity index 97% rename from airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/AbstractSshMySqlSourceAcceptanceTest.java rename to airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractSshMySqlSourceAcceptanceTest.java index 2cc6986db5f71..14c45a6bbbf2c 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/AbstractSshMySqlSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractSshMySqlSourceAcceptanceTest.java @@ -2,7 +2,7 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.integrations.source.mysql; +package io.airbyte.integrations.io.airbyte.integration_tests.sources; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.Lists; diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcBinlogsMySqlSourceDatatypeTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcBinlogsMySqlSourceDatatypeTest.java similarity index 98% rename from airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcBinlogsMySqlSourceDatatypeTest.java rename to airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcBinlogsMySqlSourceDatatypeTest.java index 045b78b828b94..64187512eaa70 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcBinlogsMySqlSourceDatatypeTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcBinlogsMySqlSourceDatatypeTest.java @@ -2,7 +2,7 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.integrations.source.mysql; +package io.airbyte.integrations.io.airbyte.integration_tests.sources; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcInitialSnapshotMySqlSourceDatatypeTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcInitialSnapshotMySqlSourceDatatypeTest.java similarity index 97% rename from airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcInitialSnapshotMySqlSourceDatatypeTest.java rename to airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcInitialSnapshotMySqlSourceDatatypeTest.java index d52defbdaa99a..caa6795a11a37 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcInitialSnapshotMySqlSourceDatatypeTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcInitialSnapshotMySqlSourceDatatypeTest.java @@ -2,7 +2,7 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.integrations.source.mysql; +package io.airbyte.integrations.io.airbyte.integration_tests.sources; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcMySqlSourceAcceptanceTest.java similarity index 99% rename from airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceAcceptanceTest.java rename to airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcMySqlSourceAcceptanceTest.java index c06efcef7493c..c3621ff424176 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcMySqlSourceAcceptanceTest.java @@ -2,7 +2,7 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.integrations.source.mysql; +package io.airbyte.integrations.io.airbyte.integration_tests.sources; import static io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest.setEnv; import static io.airbyte.protocol.models.SyncMode.INCREMENTAL; diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcMySqlSslCaCertificateSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcMySqlSslCaCertificateSourceAcceptanceTest.java similarity index 99% rename from airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcMySqlSslCaCertificateSourceAcceptanceTest.java rename to airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcMySqlSslCaCertificateSourceAcceptanceTest.java index 3141173e8b86a..fa568f32d71c1 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcMySqlSslCaCertificateSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcMySqlSslCaCertificateSourceAcceptanceTest.java @@ -2,7 +2,7 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.integrations.source.mysql; +package io.airbyte.integrations.io.airbyte.integration_tests.sources; import static io.airbyte.protocol.models.SyncMode.INCREMENTAL; import static org.junit.jupiter.api.Assertions.assertEquals; diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MySqlSourceAcceptanceTest.java similarity index 98% rename from airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlSourceAcceptanceTest.java rename to airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MySqlSourceAcceptanceTest.java index 1bb3257ea5cfb..250832e97ae11 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MySqlSourceAcceptanceTest.java @@ -2,7 +2,7 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.integrations.source.mysql; +package io.airbyte.integrations.io.airbyte.integration_tests.sources; import static io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest.setEnv; diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlSourceDatatypeTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MySqlSourceDatatypeTest.java similarity index 97% rename from airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlSourceDatatypeTest.java rename to airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MySqlSourceDatatypeTest.java index a77f81dbeb63b..e32c509780af1 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlSourceDatatypeTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MySqlSourceDatatypeTest.java @@ -2,7 +2,7 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.integrations.source.mysql; +package io.airbyte.integrations.io.airbyte.integration_tests.sources; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlSslCaCertificateSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MySqlSslCaCertificateSourceAcceptanceTest.java similarity index 91% rename from airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlSslCaCertificateSourceAcceptanceTest.java rename to airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MySqlSslCaCertificateSourceAcceptanceTest.java index 1e18005796812..49490d5f2ba15 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlSslCaCertificateSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MySqlSslCaCertificateSourceAcceptanceTest.java @@ -2,7 +2,7 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.integrations.source.mysql; +package io.airbyte.integrations.io.airbyte.integration_tests.sources; import com.google.common.collect.ImmutableMap; import io.airbyte.db.MySqlUtils; diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlSslFullCertificateSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MySqlSslFullCertificateSourceAcceptanceTest.java similarity index 92% rename from airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlSslFullCertificateSourceAcceptanceTest.java rename to airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MySqlSslFullCertificateSourceAcceptanceTest.java index abb5d3d9f3487..f63e8b5b81fcc 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlSslFullCertificateSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MySqlSslFullCertificateSourceAcceptanceTest.java @@ -2,7 +2,7 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.integrations.source.mysql; +package io.airbyte.integrations.io.airbyte.integration_tests.sources; import com.google.common.collect.ImmutableMap; import io.airbyte.db.MySqlUtils; diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlSslSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MySqlSslSourceAcceptanceTest.java similarity index 95% rename from airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlSslSourceAcceptanceTest.java rename to airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MySqlSslSourceAcceptanceTest.java index 0c7d79c48d41c..9fadbb08b494d 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlSslSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MySqlSslSourceAcceptanceTest.java @@ -2,9 +2,7 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.integrations.source.mysql; - -import static io.airbyte.integrations.source.mysql.MySqlSource.SSL_PARAMETERS; +package io.airbyte.integrations.io.airbyte.integration_tests.sources; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/SshKeyMySqlSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SshKeyMySqlSourceAcceptanceTest.java similarity index 81% rename from airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/SshKeyMySqlSourceAcceptanceTest.java rename to airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SshKeyMySqlSourceAcceptanceTest.java index 9c975b65ffcb5..ab028c24a72f3 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/SshKeyMySqlSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SshKeyMySqlSourceAcceptanceTest.java @@ -2,7 +2,7 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.integrations.source.mysql; +package io.airbyte.integrations.io.airbyte.integration_tests.sources; import java.nio.file.Path; diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/SshPasswordMySqlSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SshPasswordMySqlSourceAcceptanceTest.java similarity index 81% rename from airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/SshPasswordMySqlSourceAcceptanceTest.java rename to airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SshPasswordMySqlSourceAcceptanceTest.java index fb32a73a8bc32..647f40dc90a7a 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/SshPasswordMySqlSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SshPasswordMySqlSourceAcceptanceTest.java @@ -2,7 +2,7 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.integrations.source.mysql; +package io.airbyte.integrations.io.airbyte.integration_tests.sources; import java.nio.file.Path; From c0527731d3215e3236867175e1e2850aabc9c433 Mon Sep 17 00:00:00 2001 From: subodh Date: Mon, 26 Sep 2022 16:52:14 +0530 Subject: [PATCH 7/9] revert acceptance test changes --- .../source-mysql/acceptance-test-docker.sh | 15 --------------- .../connectors/source-mysql/build.gradle | 1 - 2 files changed, 16 deletions(-) delete mode 100755 airbyte-integrations/connectors/source-mysql/acceptance-test-docker.sh diff --git a/airbyte-integrations/connectors/source-mysql/acceptance-test-docker.sh b/airbyte-integrations/connectors/source-mysql/acceptance-test-docker.sh deleted file mode 100755 index 4ceedd9e7ba03..0000000000000 --- a/airbyte-integrations/connectors/source-mysql/acceptance-test-docker.sh +++ /dev/null @@ -1,15 +0,0 @@ -#!/usr/bin/env sh - -# Build latest connector image -docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2):dev - -# Pull latest acctest image -docker pull airbyte/source-acceptance-test:latest - -# Run -docker run --rm -it \ - -v /var/run/docker.sock:/var/run/docker.sock \ - -v /tmp:/tmp \ - -v $(pwd):/test_input \ - airbyte/source-acceptance-test \ - --acceptance-test-config /test_input diff --git a/airbyte-integrations/connectors/source-mysql/build.gradle b/airbyte-integrations/connectors/source-mysql/build.gradle index 295a1afb9b081..08797b63673fe 100644 --- a/airbyte-integrations/connectors/source-mysql/build.gradle +++ b/airbyte-integrations/connectors/source-mysql/build.gradle @@ -3,7 +3,6 @@ plugins { id 'airbyte-docker' id 'airbyte-integration-test-java' id 'airbyte-performance-test-java' - id 'airbyte-source-acceptance-test' } application { From 65658ca9b383099aceac2819f2bf808b245b976f Mon Sep 17 00:00:00 2001 From: subodh Date: Mon, 26 Sep 2022 20:05:27 +0530 Subject: [PATCH 8/9] bump version --- .../connectors/source-mysql-strict-encrypt/Dockerfile | 2 +- airbyte-integrations/connectors/source-mysql/Dockerfile | 2 +- docs/integrations/sources/mysql.md | 1 + 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile index 82ef0224d505d..fc2801800ee2c 100644 --- a/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile +++ b/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-mysql-strict-encrypt COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.6.13 +LABEL io.airbyte.version=0.6.14 LABEL io.airbyte.name=airbyte/source-mysql-strict-encrypt diff --git a/airbyte-integrations/connectors/source-mysql/Dockerfile b/airbyte-integrations/connectors/source-mysql/Dockerfile index cdb98f8c20a88..d1c21df03ab90 100644 --- a/airbyte-integrations/connectors/source-mysql/Dockerfile +++ b/airbyte-integrations/connectors/source-mysql/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-mysql COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.6.13 +LABEL io.airbyte.version=0.6.14 LABEL io.airbyte.name=airbyte/source-mysql diff --git a/docs/integrations/sources/mysql.md b/docs/integrations/sources/mysql.md index dd766ea102f82..24ee39d445c62 100644 --- a/docs/integrations/sources/mysql.md +++ b/docs/integrations/sources/mysql.md @@ -226,6 +226,7 @@ WHERE actor_definition_id ='435bb9a5-7887-4809-aa58-28c27df0d7ad' AND (configura | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------| +| 0.6.14 | 2022-09-26 | [16954](https://github.com/airbytehq/airbyte/pull/16954) | Implement support for snapshot of new tables in CDC mode | | 0.6.13 | 2022-09-14 | [15668](https://github.com/airbytehq/airbyte/pull/15668) | Wrap logs in AirbyteLogMessage | | 0.6.12 | 2022-09-13 | [16657](https://github.com/airbytehq/airbyte/pull/16657) | Improve CDC record queueing performance | | 0.6.11 | 2022-09-08 | [16202](https://github.com/airbytehq/airbyte/pull/16202) | Adds error messaging factory to UI | From ab9365ea12153b8a3b044b7d97469df9a59877a0 Mon Sep 17 00:00:00 2001 From: Octavia Squidington III Date: Mon, 26 Sep 2022 18:20:40 +0000 Subject: [PATCH 9/9] auto-bump connector version [ci skip] --- .../init/src/main/resources/seed/source_definitions.yaml | 2 +- airbyte-config/init/src/main/resources/seed/source_specs.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 5ee37a036b514..a8e19d1a354a8 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -655,7 +655,7 @@ - name: MySQL sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad dockerRepository: airbyte/source-mysql - dockerImageTag: 0.6.13 + dockerImageTag: 0.6.14 documentationUrl: https://docs.airbyte.io/integrations/sources/mysql icon: mysql.svg sourceType: database diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 32e30acb6fa9b..cf16b1627718c 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -6628,7 +6628,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-mysql:0.6.13" +- dockerImage: "airbyte/source-mysql:0.6.14" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/mysql" connectionSpecification: