Skip to content

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanhang1993 committed Jun 15, 2023
1 parent 8512e0e commit 655b44c
Show file tree
Hide file tree
Showing 8 changed files with 27 additions and 201 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
if (shouldEmit(event.getRecord())) {
sourceRecords.add(event.getRecord());
} else {
LOG.info("{} data change event should not emit", event);
LOG.debug("{} data change event should not emit", event);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ public Collection<SnapshotSplit> generateSplits(TableId tableId) {
long start = System.currentTimeMillis();

Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
Column splitColumn = getSplitColumn(table, sourceConfig);
Column splitColumn =
ChunkUtils.getChunkKeyColumn(table, sourceConfig.getChunkKeyColumn());
final List<ChunkRange> chunks;
try {
chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
Expand Down Expand Up @@ -377,8 +378,4 @@ private static void maySleep(int count, TableId tableId) {
LOG.info("JdbcSourceChunkSplitter has split {} chunks for table {}", count, tableId);
}
}

public static Column getSplitColumn(Table table, JdbcSourceConfig sourceConfig) {
return ChunkUtils.getChunkKeyColumn(table, sourceConfig.getChunkKeyColumn());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.ververica.cdc.connectors.oracle.source.config.OracleSourceConfig;
import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffset;
import com.ververica.cdc.connectors.oracle.source.utils.OracleUtils;
import com.ververica.cdc.connectors.oracle.util.ChunkUtils;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.oracle.OracleChangeEventSourceMetricsFactory;
import io.debezium.connector.oracle.OracleConnection;
Expand All @@ -47,7 +48,6 @@
import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
Expand All @@ -62,10 +62,10 @@
import org.slf4j.LoggerFactory;

import java.sql.SQLException;
import java.sql.Types;
import java.time.Instant;
import java.util.Map;
import java.util.Objects;

import static com.ververica.cdc.connectors.oracle.util.ChunkUtils.getChunkKeyColumn;

/** The context for fetch task that fetching data of snapshot split from Oracle data source. */
public class OracleSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
Expand Down Expand Up @@ -194,14 +194,8 @@ public OracleDatabaseSchema getDatabaseSchema() {
@Override
public RowType getSplitType(Table table) {
OracleSourceConfig oracleSourceConfig = getSourceConfig();
// config chunk key column then return type of this column
if (Objects.nonNull(oracleSourceConfig.getChunkKeyColumn())) {
return OracleUtils.getSplitType(table);
}

// RowId is chunk key column by default
return OracleUtils.getSplitType(
Column.editor().jdbcType(Types.VARCHAR).name(ROWID.class.getSimpleName()).create());
return ChunkUtils.getSplitType(
getChunkKeyColumn(table, oracleSourceConfig.getChunkKeyColumn()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.ververica.cdc.connectors.oracle.source.utils;

import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.logical.RowType;

import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffset;
Expand All @@ -28,8 +27,6 @@
import io.debezium.connector.oracle.OracleValueConverters;
import io.debezium.connector.oracle.StreamingAdapter;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.SchemaNameAdjuster;
Expand All @@ -40,14 +37,11 @@
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import static com.ververica.cdc.connectors.base.utils.SourceRecordUtils.rowToArray;
import static org.apache.flink.table.api.DataTypes.FIELD;
import static org.apache.flink.table.api.DataTypes.ROW;

/** Utils to prepare Oracle SQL statement. */
public class OracleUtils {
Expand Down Expand Up @@ -245,20 +239,6 @@ public static PreparedStatement readTableSplitDataStatement(
}
}

public static RowType getSplitType(Table table) {
List<Column> primaryKeys = table.primaryKeyColumns();
if (primaryKeys.isEmpty()) {
throw new ValidationException(
String.format(
"Incremental snapshot for tables requires primary key,"
+ " but table %s doesn't have primary key.",
table.id()));
}

// use first field in primary key as the split key
return getSplitType(primaryKeys.get(0));
}

/** Creates a new {@link OracleDatabaseSchema} to monitor the latest oracle database schemas. */
public static OracleDatabaseSchema createOracleDatabaseSchema(
OracleConnectorConfig dbzOracleConfig) {
Expand Down Expand Up @@ -313,26 +293,6 @@ public static RedoLogOffset getRedoLogPosition(Map<String, ?> offset) {
return new RedoLogOffset(offsetStrMap);
}

public static RowType getSplitType(Column splitColumn) {
return (RowType)
ROW(FIELD(splitColumn.name(), OracleTypeUtils.fromDbzColumn(splitColumn)))
.getLogicalType();
}

public static Column getSplitColumn(Table table) {
List<Column> primaryKeys = table.primaryKeyColumns();
if (primaryKeys.isEmpty()) {
throw new ValidationException(
String.format(
"Incremental snapshot for tables requires primary key,"
+ " but table %s doesn't have primary key.",
table.id()));
}

// use first field in primary key as the split key
return primaryKeys.get(0);
}

public static String quote(String dbOrTableName) {
return "\"" + dbOrTableName + "\"";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
package com.ververica.cdc.connectors.oracle.util;

import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.logical.RowType;

import com.ververica.cdc.connectors.oracle.source.utils.OracleTypeUtils;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import oracle.sql.ROWID;
Expand All @@ -29,11 +31,20 @@
import java.util.Optional;
import java.util.stream.Collectors;

import static org.apache.flink.table.api.DataTypes.FIELD;
import static org.apache.flink.table.api.DataTypes.ROW;

/** Utilities to split chunks of table. */
public class ChunkUtils {

private ChunkUtils() {}

public static RowType getSplitType(Column splitColumn) {
return (RowType)
ROW(FIELD(splitColumn.name(), OracleTypeUtils.fromDbzColumn(splitColumn)))
.getLogicalType();
}

public static Column getChunkKeyColumn(Table table, @Nullable String chunkKeyColumn) {
List<Column> primaryKeys = table.primaryKeyColumns();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/

package io.debezium.connector.oracle.logminer;

import io.debezium.DebeziumException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,7 @@
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Stream;

/** Example Tests for {@link JdbcIncrementalSource}. */
Expand Down Expand Up @@ -122,137 +116,6 @@ public void testConsumingAllEvents() throws Exception {
.setParallelism(DEFAULT_PARALLELISM)
.print()
.setParallelism(1);

ExecutorService executorService = Executors.newCachedThreadPool();
executorService.submit(
() -> {
try (Connection connection = getJdbcConnection();
Statement statement = connection.createStatement()) {

Thread.sleep(1000);
statement.execute(
"UPDATE debezium.products SET DESCRIPTION='18oz carpenter hammer' WHERE ID=106");
statement.execute("UPDATE debezium.products SET WEIGHT=5.1 WHERE ID=107");
Thread.sleep(1000);
statement.execute(
"INSERT INTO debezium.products VALUES (111,'jacket','water resistent white wind breaker',0.2)"); // 110
Thread.sleep(1000);
statement.execute(
"INSERT INTO debezium.products VALUES (112,'scooter','Big 2-wheel scooter ',5.18)");
Thread.sleep(1000);
statement.execute(
"UPDATE debezium.products SET DESCRIPTION='new water resistent white wind breaker', WEIGHT=0.5 WHERE ID=111");
Thread.sleep(1000);
statement.execute("UPDATE debezium.products SET WEIGHT=5.17 WHERE ID=112");
Thread.sleep(1000);
statement.execute("DELETE FROM debezium.products WHERE ID=112");

// for (int i = 100000; i < 100010; i++) {
// Thread.sleep(1000);
// int id = i;
// String name = "jacket"+i;
// String sql = "INSERT INTO debezium.products VALUES
// (id, 'jacket1' , 'water resistent white wind
// breaker',0.2)".replaceAll("id", id+"");
// statement.execute(sql);
// String deleteSql = "DELETE FROM debezium.products
// WHERE ID=id".replaceAll("id", id+"");
// statement.execute(deleteSql);
// }
} catch (Exception e) {
LOG.error("", e);
}
});

env.execute("Print Oracle Snapshot + RedoLog");
}

@Test
@Ignore("Test ignored because it won't stop and is used for manual test")
public void testConsumingAllEventsWithChunkKey() throws Exception {
LOG.info(
"getOraclePort:{},getUsername:{},getPassword:{}",
oracleContainer.getOraclePort(),
oracleContainer.getUsername(),
oracleContainer.getPassword());

Properties debeziumProperties = new Properties();
debeziumProperties.setProperty("log.mining.strategy", "online_catalog");
debeziumProperties.setProperty("log.mining.continuous.mine", "true");

JdbcIncrementalSource<String> oracleChangeEventSource =
new OracleSourceBuilder()
.hostname(oracleContainer.getHost())
.port(oracleContainer.getOraclePort())
.databaseList("XE")
.schemaList("DEBEZIUM")
.tableList("DEBEZIUM.PRODUCTS")
.username(oracleContainer.getUsername())
.password(oracleContainer.getPassword())
.deserializer(new JsonDebeziumDeserializationSchema())
.includeSchemaChanges(true) // output the schema changes as well
.startupOptions(StartupOptions.initial())
.debeziumProperties(debeziumProperties)
.chunkKeyColumn("ID")
.splitSize(2)
.build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// enable checkpoint
env.enableCheckpointing(DEFAULT_CHECKPOINT_INTERVAL);
// set the source parallelism to 4
env.fromSource(
oracleChangeEventSource,
WatermarkStrategy.noWatermarks(),
"OracleParallelSource")
.setParallelism(DEFAULT_PARALLELISM)
.print()
.setParallelism(1);

ExecutorService executorService = Executors.newCachedThreadPool();
executorService.submit(
() -> {
try (Connection connection = getJdbcConnection();
Statement statement = connection.createStatement()) {
Thread.sleep(1000);
statement.execute(
"UPDATE debezium.products SET DESCRIPTION='18oz carpenter hammer' WHERE ID=106");
statement.execute("UPDATE debezium.products SET WEIGHT=5.1 WHERE ID=107");
Thread.sleep(1000);
statement.execute(
"INSERT INTO debezium.products VALUES (111,'jacket','water resistent white wind breaker',0.2)"); // 110
Thread.sleep(1000);
statement.execute(
"INSERT INTO debezium.products VALUES (112,'scooter','Big 2-wheel scooter ',5.18)");
Thread.sleep(1000);
statement.execute(
"UPDATE debezium.products SET DESCRIPTION='new water resistent white wind breaker', WEIGHT=0.5 WHERE ID=111");
Thread.sleep(1000);
statement.execute("UPDATE debezium.products SET WEIGHT=5.17 WHERE ID=112");
Thread.sleep(1000);
statement.execute("DELETE FROM debezium.products WHERE ID=112");

for (int i = 100000; i < 100010; i++) {
Thread.sleep(1000);
int id = i;
String name = "jacket" + i;
String sql =
"INSERT INTO debezium.products VALUES (id, 'jacket1' , 'water resistent white wind breaker',0.2)"
.replaceAll("id", id + "");
statement.execute(sql);
}
} catch (Exception e) {
LOG.error("", e);
}
});

env.execute("Print Oracle Snapshot + RedoLog");
}

public Connection getJdbcConnection() throws SQLException {
return DriverManager.getConnection(
oracleContainer.getJdbcUrl(),
oracleContainer.getUsername(),
oracleContainer.getPassword());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void before() throws Exception {

if (parallelismSnapshot) {
env.setParallelism(4);
env.enableCheckpointing(2000);
env.enableCheckpointing(200);
} else {
env.setParallelism(1);
}
Expand Down Expand Up @@ -150,11 +150,11 @@ public void testConsumingAllEvents()
tEnv.executeSql(
"INSERT INTO sink SELECT NAME, SUM(WEIGHT) FROM debezium_source GROUP BY NAME");

waitForSnapshotStarted("sink");
// There
waitForSinkSize("sink", 9);

try (Connection connection = getJdbcConnection();
Statement statement = connection.createStatement()) {

statement.execute(
"UPDATE debezium.products SET DESCRIPTION='18oz carpenter hammer' WHERE ID=106");
statement.execute("UPDATE debezium.products SET WEIGHT=5.1 WHERE ID=107");
Expand Down Expand Up @@ -781,10 +781,10 @@ private static void waitForSinkSize(String sinkName, int expectedSize)
private static int sinkSize(String sinkName) {
synchronized (TestValuesTableFactory.class) {
try {
LOG.info(
"sink result size: {}, sink result: {}",
TestValuesTableFactory.getRawResults(sinkName).size(),
TestValuesTableFactory.getResults("sink"));
// LOG.info(
// "sink result size: {}, sink result: {}",
// TestValuesTableFactory.getRawResults(sinkName).size(),
// TestValuesTableFactory.getResults("sink"));
return TestValuesTableFactory.getRawResults(sinkName).size();
} catch (IllegalArgumentException e) {
// job is not started yet
Expand Down

0 comments on commit 655b44c

Please sign in to comment.