diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java
index 9a11ab6a24a..2d1e3e21476 100644
--- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java
+++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java
@@ -110,6 +110,8 @@ public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
             for (DataChangeEvent event : batch) {
                 if (shouldEmit(event.getRecord())) {
                     sourceRecords.add(event.getRecord());
+                } else {
+                    LOG.debug("{} data change event should not emit", event);
                 }
             }
         }
diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java
index 99fb648c33f..6dda90c4d7d 100644
--- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java
+++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java
@@ -157,7 +157,7 @@ public CommonConnectorConfig getDbzConnectorConfig() {
     }
 
     public SchemaNameAdjuster getSchemaNameAdjuster() {
-        return null;
+        return schemaNameAdjuster;
     }
 
     public abstract RelationalDatabaseSchema getDatabaseSchema();
diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/OracleSource.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/OracleSource.java
index 08d4aa71e46..183a5c8e96b 100644
--- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/OracleSource.java
+++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/OracleSource.java
@@ -19,7 +19,6 @@
 import com.ververica.cdc.connectors.base.options.StartupOptions;
 import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
 import com.ververica.cdc.debezium.DebeziumSourceFunction;
-import com.ververica.cdc.debezium.internal.DebeziumOffset;
 import io.debezium.connector.oracle.OracleConnector;
 
 import javax.annotation.Nullable;
@@ -167,7 +166,6 @@ public DebeziumSourceFunction<T> build() {
                 props.setProperty("table.include.list", String.join(",", tableList));
             }
 
-            DebeziumOffset specificOffset = null;
             switch (startupOptions.startupMode) {
                 case INITIAL:
                     props.setProperty("snapshot.mode", "initial");
@@ -193,7 +191,7 @@ public DebeziumSourceFunction<T> build() {
             }
 
             return new DebeziumSourceFunction<>(
-                    deserializer, props, specificOffset, new OracleValidator(props));
+                    deserializer, props, null, new OracleValidator(props));
         }
     }
 }
diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java
index 01698ff26e1..4071a555d0b 100644
--- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java
+++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java
@@ -74,7 +74,8 @@ public Collection<SnapshotSplit> generateSplits(TableId tableId) {
         long start = System.currentTimeMillis();
 
         Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
-        Column splitColumn = getSplitColumn(table, sourceConfig);
+        Column splitColumn =
+                ChunkUtils.getChunkKeyColumn(table, sourceConfig.getChunkKeyColumn());
         final List<ChunkRange> chunks;
         try {
             chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
@@ -377,8 +378,4 @@ private static void maySleep(int count, TableId tableId) {
         LOG.info("JdbcSourceChunkSplitter has split {} chunks for table {}", count, tableId);
         }
     }
-
-    public static Column getSplitColumn(Table table, JdbcSourceConfig sourceConfig) {
-        return ChunkUtils.getChunkKeyColumn(table, sourceConfig.getChunkKeyColumn());
-    }
 }
diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java
index 0051f3c2121..6cc097922ce 100644
--- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java
+++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java
@@ -25,9 +25,11 @@
 import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
 import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
 import com.ververica.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext;
+import com.ververica.cdc.connectors.base.utils.SourceRecordUtils;
 import com.ververica.cdc.connectors.oracle.source.config.OracleSourceConfig;
 import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffset;
 import com.ververica.cdc.connectors.oracle.source.utils.OracleUtils;
+import com.ververica.cdc.connectors.oracle.util.ChunkUtils;
 import io.debezium.connector.base.ChangeEventQueue;
 import io.debezium.connector.oracle.OracleChangeEventSourceMetricsFactory;
 import io.debezium.connector.oracle.OracleConnection;
@@ -52,14 +54,19 @@
 import io.debezium.schema.DataCollectionId;
 import io.debezium.schema.TopicSelector;
 import io.debezium.util.Collect;
+import oracle.sql.ROWID;
 import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.header.ConnectHeaders;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.sql.SQLException;
 import java.time.Instant;
 import java.util.Map;
 
+import static com.ververica.cdc.connectors.oracle.util.ChunkUtils.getChunkKeyColumn;
+
 /** The context for fetch task that fetching data of snapshot split from Oracle data source. */
 public class OracleSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
@@ -186,7 +193,43 @@ public OracleDatabaseSchema getDatabaseSchema() {
 
     @Override
     public RowType getSplitType(Table table) {
-        return OracleUtils.getSplitType(table);
+        OracleSourceConfig oracleSourceConfig = getSourceConfig();
+        return ChunkUtils.getSplitType(
+                getChunkKeyColumn(table, oracleSourceConfig.getChunkKeyColumn()));
+    }
+
+    @Override
+    public boolean isDataChangeRecord(SourceRecord record) {
+        return SourceRecordUtils.isDataChangeRecord(record);
+    }
+
+    @Override
+    public boolean isRecordBetween(SourceRecord record, Object[] splitStart, Object[] splitEnd) {
+        RowType splitKeyType =
+                getSplitType(getDatabaseSchema().tableFor(SourceRecordUtils.getTableId(record)));
+
+        // RowId is chunk key column by default, compare RowId
+        if (splitKeyType.getFieldNames().contains(ROWID.class.getSimpleName())) {
+            ConnectHeaders headers = (ConnectHeaders) record.headers();
+            ROWID rowId = null;
+            try {
+                rowId = new ROWID(headers.iterator().next().value().toString());
+            } catch (SQLException e) {
+                LOG.error("{} can not convert to RowId", record);
+            }
+            Object[] rowIds = new ROWID[] {rowId};
+            return SourceRecordUtils.splitKeyRangeContains(rowIds, splitStart, splitEnd);
+        } else {
+            // config chunk key column compare
+            Object[] key =
+                    SourceRecordUtils.getSplitKey(splitKeyType, record, getSchemaNameAdjuster());
+            return SourceRecordUtils.splitKeyRangeContains(key, splitStart, splitEnd);
+        }
+    }
+
+    @Override
+    public TableId getTableId(SourceRecord record) {
+        return SourceRecordUtils.getTableId(record);
+    }
 
     @Override
diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/utils/OracleUtils.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/utils/OracleUtils.java
index 58fee6746cc..56c1cc5c987 100644
--- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/utils/OracleUtils.java
+++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/utils/OracleUtils.java
@@ -16,7 +16,6 @@
 
 package com.ververica.cdc.connectors.oracle.source.utils;
 
-import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.types.logical.RowType;
 
 import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffset;
@@ -28,8 +27,6 @@
 import io.debezium.connector.oracle.OracleValueConverters;
 import io.debezium.connector.oracle.StreamingAdapter;
 import io.debezium.jdbc.JdbcConnection;
-import io.debezium.relational.Column;
-import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
 import io.debezium.schema.TopicSelector;
 import io.debezium.util.SchemaNameAdjuster;
@@ -40,14 +37,11 @@
 import java.sql.SQLException;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
 import static com.ververica.cdc.connectors.base.utils.SourceRecordUtils.rowToArray;
-import static org.apache.flink.table.api.DataTypes.FIELD;
-import static org.apache.flink.table.api.DataTypes.ROW;
 
 /** Utils to prepare Oracle SQL statement. */
 public class OracleUtils {
@@ -245,20 +239,6 @@ public static PreparedStatement readTableSplitDataStatement(
         }
     }
 
-    public static RowType getSplitType(Table table) {
-        List<Column> primaryKeys = table.primaryKeyColumns();
-        if (primaryKeys.isEmpty()) {
-            throw new ValidationException(
-                    String.format(
-                            "Incremental snapshot for tables requires primary key,"
-                                    + " but table %s doesn't have primary key.",
-                            table.id()));
-        }
-
-        // use first field in primary key as the split key
-        return getSplitType(primaryKeys.get(0));
-    }
-
     /** Creates a new {@link OracleDatabaseSchema} to monitor the latest oracle database schemas. */
     public static OracleDatabaseSchema createOracleDatabaseSchema(
             OracleConnectorConfig dbzOracleConfig) {
@@ -313,26 +293,6 @@ public static RedoLogOffset getRedoLogPosition(Map offset) {
         return new RedoLogOffset(offsetStrMap);
     }
 
-    public static RowType getSplitType(Column splitColumn) {
-        return (RowType)
-                ROW(FIELD(splitColumn.name(), OracleTypeUtils.fromDbzColumn(splitColumn)))
-                        .getLogicalType();
-    }
-
-    public static Column getSplitColumn(Table table) {
-        List<Column> primaryKeys = table.primaryKeyColumns();
-        if (primaryKeys.isEmpty()) {
-            throw new ValidationException(
-                    String.format(
-                            "Incremental snapshot for tables requires primary key,"
-                                    + " but table %s doesn't have primary key.",
-                            table.id()));
-        }
-
-        // use first field in primary key as the split key
-        return primaryKeys.get(0);
-    }
-
     public static String quote(String dbOrTableName) {
         return "\"" + dbOrTableName + "\"";
     }
diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/util/ChunkUtils.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/util/ChunkUtils.java
index 7487b284135..d21d21dbfc0 100644
--- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/util/ChunkUtils.java
+++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/util/ChunkUtils.java
@@ -17,7 +17,9 @@
 package com.ververica.cdc.connectors.oracle.util;
 
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.RowType;
 
+import com.ververica.cdc.connectors.oracle.source.utils.OracleTypeUtils;
 import io.debezium.relational.Column;
 import io.debezium.relational.Table;
 import oracle.sql.ROWID;
@@ -29,11 +31,20 @@
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.ROW;
+
 /** Utilities to split chunks of table. */
 public class ChunkUtils {
 
     private ChunkUtils() {}
 
+    public static RowType getSplitType(Column splitColumn) {
+        return (RowType)
+                ROW(FIELD(splitColumn.name(), OracleTypeUtils.fromDbzColumn(splitColumn)))
+                        .getLogicalType();
+    }
+
     public static Column getChunkKeyColumn(Table table, @Nullable String chunkKeyColumn) {
         List<Column> primaryKeys = table.primaryKeyColumns();
diff --git a/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerChangeRecordEmitter.java b/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerChangeRecordEmitter.java
new file mode 100644
index 00000000000..4165083ecf0
--- /dev/null
+++ b/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerChangeRecordEmitter.java
@@ -0,0 +1,268 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+
+package io.debezium.connector.oracle.logminer;
+
+import io.debezium.DebeziumException;
+import io.debezium.connector.oracle.BaseChangeRecordEmitter;
+import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntry;
+import io.debezium.data.Envelope.Operation;
+import io.debezium.pipeline.spi.OffsetContext;
+import io.debezium.relational.Table;
+import io.debezium.relational.TableSchema;
+import io.debezium.schema.DataCollectionSchema;
+import io.debezium.util.Clock;
+import oracle.sql.ROWID;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.header.ConnectHeaders;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+
+/**
+ * Copied from Debezium 1.6.4.Final.
+ *
+ * <p>Emits change record based on a single {@link LogMinerDmlEntry} event.
+ *
+ * <p>This class overrides the emit methods to put the ROWID in the header.
+ *
+ * <p>Line 59 ~ 257: add ROWID and emit methods.
+ */
+public class LogMinerChangeRecordEmitter extends BaseChangeRecordEmitter {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(LogMinerChangeRecordEmitter.class);
+
+    private final int operation;
+    private final Object[] oldValues;
+    private final Object[] newValues;
+    private final String rowId;
+
+    public LogMinerChangeRecordEmitter(
+            OffsetContext offset,
+            int operation,
+            Object[] oldValues,
+            Object[] newValues,
+            Table table,
+            Clock clock) {
+        super(offset, table, clock);
+        this.operation = operation;
+        this.oldValues = oldValues;
+        this.newValues = newValues;
+        this.rowId = null;
+    }
+
+    public LogMinerChangeRecordEmitter(
+            OffsetContext offset,
+            int operation,
+            Object[] oldValues,
+            Object[] newValues,
+            Table table,
+            Clock clock,
+            String rowId) {
+        super(offset, table, clock);
+        this.operation = operation;
+        this.oldValues = oldValues;
+        this.newValues = newValues;
+        this.rowId = rowId;
+    }
+
+    @Override
+    protected Operation getOperation() {
+        switch (operation) {
+            case RowMapper.INSERT:
+                return Operation.CREATE;
+            case RowMapper.UPDATE:
+            case RowMapper.SELECT_LOB_LOCATOR:
+                return Operation.UPDATE;
+            case RowMapper.DELETE:
+                return Operation.DELETE;
+            default:
+                throw new DebeziumException("Unsupported operation type: " + operation);
+        }
+    }
+
+    @Override
+    public void emitChangeRecords(DataCollectionSchema schema, Receiver receiver)
+            throws InterruptedException {
+        TableSchema tableSchema = (TableSchema) schema;
+        Operation operation = getOperation();
+
+        switch (operation) {
+            case CREATE:
+                emitCreateRecord(receiver, tableSchema);
+                break;
+            case READ:
+                emitReadRecord(receiver, tableSchema);
+                break;
+            case UPDATE:
+                emitUpdateRecord(receiver, tableSchema);
+                break;
+            case DELETE:
+                emitDeleteRecord(receiver, tableSchema);
+                break;
+            case TRUNCATE:
+                emitTruncateRecord(receiver, tableSchema);
+                break;
+            default:
+                throw new IllegalArgumentException("Unsupported operation: " + operation);
+        }
+    }
+
+    @Override
+    protected void emitCreateRecord(Receiver receiver, TableSchema tableSchema)
+            throws InterruptedException {
+        Object[] newColumnValues = getNewColumnValues();
+        Struct newKey = tableSchema.keyFromColumnData(newColumnValues);
+        Struct newValue = tableSchema.valueFromColumnData(newColumnValues);
+        Struct envelope =
+                tableSchema
+                        .getEnvelopeSchema()
+                        .create(
+                                newValue,
+                                getOffset().getSourceInfo(),
+                                getClock().currentTimeAsInstant());
+        ConnectHeaders headers = new ConnectHeaders();
+        headers.add(ROWID.class.getSimpleName(), new SchemaAndValue(null, rowId));
+
+        if (skipEmptyMessages() && (newColumnValues == null || newColumnValues.length == 0)) {
+            // This case can be hit on UPDATE / DELETE when there's no primary key defined while
+            // using certain decoders
+            LOGGER.warn(
+                    "no new values found for table '{}' from create message at '{}'; skipping record",
+                    tableSchema,
+                    getOffset().getSourceInfo());
+            return;
+        }
+        receiver.changeRecord(
+                tableSchema, Operation.CREATE, newKey, envelope, getOffset(), headers);
+    }
+
+    @Override
+    protected void emitReadRecord(Receiver receiver, TableSchema tableSchema)
+            throws InterruptedException {
+        Object[] newColumnValues = getNewColumnValues();
+        Struct newKey = tableSchema.keyFromColumnData(newColumnValues);
+        Struct newValue = tableSchema.valueFromColumnData(newColumnValues);
+        Struct envelope =
+                tableSchema
+                        .getEnvelopeSchema()
+                        .read(
+                                newValue,
+                                getOffset().getSourceInfo(),
+                                getClock().currentTimeAsInstant());
+        ConnectHeaders headers = new ConnectHeaders();
+        headers.add(ROWID.class.getSimpleName(), new SchemaAndValue(null, rowId));
+
+        receiver.changeRecord(tableSchema, Operation.READ, newKey, envelope, getOffset(), headers);
+    }
+
+    @Override
+    protected void emitUpdateRecord(Receiver receiver, TableSchema tableSchema)
+            throws InterruptedException {
+        Object[] oldColumnValues = getOldColumnValues();
+        Object[] newColumnValues = getNewColumnValues();
+
+        Struct oldKey = tableSchema.keyFromColumnData(oldColumnValues);
+        Struct newKey = tableSchema.keyFromColumnData(newColumnValues);
+
+        Struct newValue = tableSchema.valueFromColumnData(newColumnValues);
+        Struct oldValue = tableSchema.valueFromColumnData(oldColumnValues);
+
+        if (skipEmptyMessages() && (newColumnValues == null || newColumnValues.length == 0)) {
+            LOGGER.warn(
+                    "no new values found for table '{}' from update message at '{}'; skipping record",
+                    tableSchema,
+                    getOffset().getSourceInfo());
+            return;
+        }
+        // some configurations does not provide old values in case of updates
+        // in this case we handle all updates as regular ones
+        if (oldKey == null || Objects.equals(oldKey, newKey)) {
+            Struct envelope =
+                    tableSchema
+                            .getEnvelopeSchema()
+                            .update(
+                                    oldValue,
+                                    newValue,
+                                    getOffset().getSourceInfo(),
+                                    getClock().currentTimeAsInstant());
+            ConnectHeaders headers = new ConnectHeaders();
+            headers.add(ROWID.class.getSimpleName(), new SchemaAndValue(null, rowId));
+            receiver.changeRecord(
+                    tableSchema, Operation.UPDATE, newKey, envelope, getOffset(), headers);
+        }
+        // PK update -> emit as delete and re-insert with new key
+        else {
+            ConnectHeaders headers = new ConnectHeaders();
+            headers.add(PK_UPDATE_NEWKEY_FIELD, newKey, tableSchema.keySchema());
+
+            Struct envelope =
+                    tableSchema
+                            .getEnvelopeSchema()
+                            .delete(
+                                    oldValue,
+                                    getOffset().getSourceInfo(),
+                                    getClock().currentTimeAsInstant());
+            receiver.changeRecord(
+                    tableSchema, Operation.DELETE, oldKey, envelope, getOffset(), headers);
+
+            headers = new ConnectHeaders();
+            headers.add(PK_UPDATE_OLDKEY_FIELD, oldKey, tableSchema.keySchema());
+
+            headers.add(ROWID.class.getSimpleName(), new SchemaAndValue(null, rowId));
+
+            envelope =
+                    tableSchema
+                            .getEnvelopeSchema()
+                            .create(
+                                    newValue,
+                                    getOffset().getSourceInfo(),
+                                    getClock().currentTimeAsInstant());
+            receiver.changeRecord(
+                    tableSchema, Operation.CREATE, newKey, envelope, getOffset(), headers);
+        }
+    }
+
+    @Override
+    protected void emitDeleteRecord(Receiver receiver, TableSchema tableSchema)
+            throws InterruptedException {
+        Object[] oldColumnValues = getOldColumnValues();
+        Struct oldKey = tableSchema.keyFromColumnData(oldColumnValues);
+        Struct oldValue = tableSchema.valueFromColumnData(oldColumnValues);
+        ConnectHeaders headers = new ConnectHeaders();
+        headers.add(ROWID.class.getSimpleName(), new SchemaAndValue(null, rowId));
+
+        if (skipEmptyMessages() && (oldColumnValues == null || oldColumnValues.length == 0)) {
+            LOGGER.warn(
+                    "no old values found for table '{}' from delete message at '{}'; skipping record",
+                    tableSchema,
+                    getOffset().getSourceInfo());
+            return;
+        }
+
+        Struct envelope =
+                tableSchema
+                        .getEnvelopeSchema()
+                        .delete(
+                                oldValue,
+                                getOffset().getSourceInfo(),
+                                getClock().currentTimeAsInstant());
+        receiver.changeRecord(
+                tableSchema, Operation.DELETE, oldKey, envelope, getOffset(), headers);
+    }
+
+    @Override
+    protected Object[] getOldColumnValues() {
+        return oldValues;
+    }
+
+    @Override
+    protected Object[] getNewColumnValues() {
+        return newValues;
+    }
+}
diff --git a/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBuffer.java b/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBuffer.java
index 996b03a490f..101f941c933 100644
--- a/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBuffer.java
+++ b/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBuffer.java
@@ -410,7 +410,8 @@ boolean commit(
                                         event.getEntry().getOldValues(),
                                         event.getEntry().getNewValues(),
                                         schema.tableFor(event.getTableId()),
-                                        clock));
+                                        clock,
+                                        event.rowId));
                             }
 
                             lastCommittedScn = Scn.valueOf(scn.longValue());
diff --git a/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/OracleChangeEventSourceExampleTest.java b/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/OracleChangeEventSourceExampleTest.java
index 7be17326bda..8a06adfbab9 100644
--- a/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/OracleChangeEventSourceExampleTest.java
+++ b/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/OracleChangeEventSourceExampleTest.java
@@ -116,7 +116,6 @@ public void testConsumingAllEvents() throws Exception {
                 .setParallelism(DEFAULT_PARALLELISM)
                 .print()
                 .setParallelism(1);
-
        env.execute("Print Oracle Snapshot + RedoLog");
     }
 }
diff --git a/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/table/OracleConnectorITCase.java b/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/table/OracleConnectorITCase.java
index 96ae27d6f71..cef8a413342 100644
--- a/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/table/OracleConnectorITCase.java
+++ b/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/table/OracleConnectorITCase.java
@@ -26,7 +26,6 @@
 import com.ververica.cdc.connectors.oracle.utils.OracleTestUtils;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -103,7 +102,6 @@ public void teardown() {
     }
 
     @Test
-    @Ignore("It can be open until issue 1875 fix")
     public void testConsumingAllEvents()
             throws SQLException, ExecutionException, InterruptedException {
         String sourceDDL =
@@ -152,11 +150,11 @@ public void testConsumingAllEvents()
         tEnv.executeSql(
                 "INSERT INTO sink SELECT NAME, SUM(WEIGHT) FROM debezium_source GROUP BY NAME");
 
-        waitForSnapshotStarted("sink");
+        // There are 9 records in the table, wait until the snapshot phase finished
+        waitForSinkSize("sink", 9);
 
         try (Connection connection = getJdbcConnection();
                 Statement statement = connection.createStatement()) {
-
             statement.execute(
                     "UPDATE debezium.products SET DESCRIPTION='18oz carpenter hammer' WHERE ID=106");
             statement.execute("UPDATE debezium.products SET WEIGHT=5.1 WHERE ID=107");
@@ -214,6 +212,9 @@ public void testConsumingAllEvents()
     @Test
     public void testConsumingAllEventsByChunkKeyColumn()
             throws SQLException, ExecutionException, InterruptedException {
+        if (!parallelismSnapshot) {
+            return;
+        }
         String sourceDDL =
                 String.format(
                         "CREATE TABLE debezium_source ("
@@ -261,7 +262,7 @@ public void testConsumingAllEventsByChunkKeyColumn()
         tEnv.executeSql(
                 "INSERT INTO sink SELECT NAME, SUM(WEIGHT) FROM debezium_source GROUP BY NAME");
 
-        waitForSnapshotStarted("sink");
+        waitForSinkSize("sink", 9);
 
         try (Connection connection = getJdbcConnection();
                 Statement statement = connection.createStatement()) {
@@ -400,7 +401,6 @@ public void testMetadataColumns() throws Throwable {
 
     @Test
     public void testStartupFromLatestOffset() throws Exception {
-        // database.createAndInitialize();
         String sourceDDL =
                 String.format(
                         "CREATE TABLE debezium_source ("
@@ -441,7 +441,7 @@ public void testStartupFromLatestOffset() throws Exception {
         // async submit job
         TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");
         // wait for the source startup, we don't have a better way to wait it, use sleep for now
-        Thread.sleep(5000L);
+        Thread.sleep(10000L);
 
        try (Connection connection = getJdbcConnection();
                 Statement statement = connection.createStatement()) {