diff --git a/docs/en/connector-v2/source/MySQL-CDC.md b/docs/en/connector-v2/source/MySQL-CDC.md
index 42803e7e365..bc562213a2d 100644
--- a/docs/en/connector-v2/source/MySQL-CDC.md
+++ b/docs/en/connector-v2/source/MySQL-CDC.md
@@ -153,6 +153,7 @@ When an initial consistent snapshot is made for large databases, your establishe
| password | String | Yes | - | Password to use when connecting to the database server. |
| database-names | List | No | - | Database name of the database to monitor. |
| table-names | List | Yes | - | Table name of the database to monitor. The table name needs to include the database name, for example: `database_name.table_name` |
+| table-names-config | List | No | - | Table config list. for example: [{"table": "db1.schema1.table1","primaryKeys":["key1"]}] |
| startup.mode | Enum | No | INITIAL | Optional startup mode for MySQL CDC consumer, valid enumerations are `initial`, `earliest`, `latest` and `specific`.
`initial`: Synchronize historical data at startup, and then synchronize incremental data.
`earliest`: Startup from the earliest offset possible.
`latest`: Startup from the latest offset.
`specific`: Startup from user-supplied specific offsets. |
| startup.specific-offset.file | String | No | - | Start from the specified binlog file name. **Note, This option is required when the `startup.mode` option used `specific`.** |
| startup.specific-offset.pos | Long | No | - | Start from the specified binlog file position. **Note, This option is required when the `startup.mode` option used `specific`.** |
@@ -190,9 +191,6 @@ env {
source {
MySQL-CDC {
- catalog = {
- factory = MySQL
- }
base-url = "jdbc:mysql://localhost:3306/testdb"
username = "root"
password = "root@123"
@@ -212,6 +210,37 @@ sink {
> Must be used with kafka connector sink, see [compatible debezium format](../formats/cdc-compatible-debezium-json.md) for details
+### Support custom primary key for table
+
+```
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 10000
+}
+
+source {
+ MySQL-CDC {
+ base-url = "jdbc:mysql://localhost:3306/testdb"
+ username = "root"
+ password = "root@123"
+
+ table-names = ["testdb.table1", "testdb.table2"]
+ table-names-config = [
+ {
+ table = "testdb.table2"
+ primaryKeys = ["id"]
+ }
+ ]
+ }
+}
+
+sink {
+ Console {
+ }
+}
+```
+
## Changelog
- Add MySQL CDC Source Connector
diff --git a/docs/en/connector-v2/source/SqlServer-CDC.md b/docs/en/connector-v2/source/SqlServer-CDC.md
index 1b32c824a47..62b788ac155 100644
--- a/docs/en/connector-v2/source/SqlServer-CDC.md
+++ b/docs/en/connector-v2/source/SqlServer-CDC.md
@@ -60,6 +60,7 @@ Please download and put SqlServer driver in `${SEATUNNEL_HOME}/lib/` dir. For ex
| password | String | Yes | - | Password to use when connecting to the database server. |
| database-names | List | Yes | - | Database name of the database to monitor. |
| table-names | List | Yes | - | Table name is a combination of schema name and table name (databaseName.schemaName.tableName). |
+| table-names-config | List | No | - | Table config list. for example: [{"table": "db1.schema1.table1","primaryKeys":["key1"]}] |
| base-url | String | Yes | - | URL has to be with database, like "jdbc:sqlserver://localhost:1433;databaseName=test". |
| startup.mode | Enum | No | INITIAL | Optional startup mode for SqlServer CDC consumer, valid enumerations are "initial", "earliest", "latest" and "specific". |
| startup.timestamp | Long | No | - | Start from the specified epoch timestamp (in milliseconds).
**Note, This option is required when** the **"startup.mode" option used `'timestamp'`.** |
@@ -186,3 +187,34 @@ sink {
}
```
+### Support custom primary key for table
+
+```
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+}
+
+source {
+ SqlServer-CDC {
+ base-url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
+ username = "sa"
+ password = "Y.sa123456"
+ database-names = ["column_type_test"]
+
+ table-names = ["column_type_test.dbo.simple_types", "column_type_test.dbo.full_types"]
+ table-names-config = [
+ {
+ table = "column_type_test.dbo.full_types"
+ primaryKeys = ["id"]
+ }
+ ]
+ }
+}
+
+sink {
+ console {
+ }
+```
+
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceTableConfig.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceTableConfig.java
new file mode 100644
index 00000000000..5cafa363e8d
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceTableConfig.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.cdc.base.config;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Data
+public class JdbcSourceTableConfig implements Serializable {
+ private String table;
+ private List primaryKeys;
+}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java
index 9542a8e9701..6cd7ba0631e 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java
@@ -19,6 +19,7 @@
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceTableConfig;
import org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource;
import java.time.ZoneId;
@@ -141,4 +142,17 @@ public class JdbcSourceOptions extends SourceOptions {
+ "The value represents the denominator of the sampling rate fraction. "
+ "For example, a value of 1000 means a sampling rate of 1/1000. "
+ "This parameter is used when the sample sharding strategy is triggered.");
+
+ public static final Option> TABLE_NAMES_CONFIG =
+ Options.key("table-names-config")
+ .listType(JdbcSourceTableConfig.class)
+ .noDefaultValue()
+ .withDescription(
+ "Config table configs. Example: "
+ + "["
+ + " {"
+ + " \"table\": \"db1.schema1.table1\","
+ + " \"primaryKeys\": [\"key1\",\"key2\"]"
+ + " }"
+ + "]");
}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/CatalogTableUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/CatalogTableUtils.java
new file mode 100644
index 00000000000..78f01370f2c
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/CatalogTableUtils.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.cdc.base.utils;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceTableConfig;
+
+import io.debezium.relational.Table;
+import io.debezium.relational.TableId;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class CatalogTableUtils {
+
+ public static List mergeCatalogTableConfig(
+ List tables,
+ List tableConfigs,
+ Function parser) {
+ Map catalogTableMap =
+ tables.stream()
+ .collect(Collectors.toMap(t -> t.getTableId().toTablePath(), t -> t));
+ for (JdbcSourceTableConfig catalogTableConfig : tableConfigs) {
+ TablePath tablePath = parser.apply(catalogTableConfig.getTable());
+ CatalogTable catalogTable = catalogTableMap.get(tablePath);
+ if (catalogTable != null) {
+ catalogTable = mergeCatalogTableConfig(catalogTable, catalogTableConfig);
+ catalogTableMap.put(tablePath, catalogTable);
+ log.info(
+ "Override primary key({}) for catalog table {}",
+ catalogTableConfig.getPrimaryKeys(),
+ catalogTableConfig.getTable());
+ } else {
+ log.warn(
+ "Table {} is not found in catalog tables, skip to merge config",
+ catalogTableConfig.getTable());
+ }
+ }
+ return new ArrayList<>(catalogTableMap.values());
+ }
+
+ public static CatalogTable mergeCatalogTableConfig(
+ final CatalogTable table, JdbcSourceTableConfig config) {
+ List columnNames =
+ table.getTableSchema().getColumns().stream()
+ .map(c -> c.getName())
+ .collect(Collectors.toList());
+ for (String pk : config.getPrimaryKeys()) {
+ if (!columnNames.contains(pk)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Primary key(%s) is not in table(%s) columns(%s)",
+ pk, table.getTablePath(), columnNames));
+ }
+ }
+ PrimaryKey primaryKeys =
+ PrimaryKey.of(
+ "pk" + (config.getPrimaryKeys().hashCode() & Integer.MAX_VALUE),
+ config.getPrimaryKeys());
+ List columns =
+ table.getTableSchema().getColumns().stream()
+ .map(
+ column -> {
+ if (config.getPrimaryKeys().contains(column.getName())
+ && column.isNullable()) {
+ log.warn(
+ "Primary key({}) is nullable for catalog table {}",
+ column.getName(),
+ table.getTablePath());
+ return PhysicalColumn.of(
+ column.getName(),
+ column.getDataType(),
+ column.getColumnLength(),
+ false,
+ column.getDefaultValue(),
+ column.getComment());
+ }
+ return column;
+ })
+ .collect(Collectors.toList());
+
+ return CatalogTable.of(
+ table.getTableId(),
+ TableSchema.builder()
+ .primaryKey(primaryKeys)
+ .columns(columns)
+ .constraintKey(table.getTableSchema().getConstraintKeys())
+ .build(),
+ table.getOptions(),
+ table.getPartitionKeys(),
+ table.getComment());
+ }
+
+ public static Table mergeCatalogTableConfig(Table debeziumTable, CatalogTable catalogTable) {
+ PrimaryKey pk = catalogTable.getTableSchema().getPrimaryKey();
+ if (pk != null) {
+ debeziumTable = debeziumTable.edit().setPrimaryKeyNames(pk.getColumnNames()).create();
+ log.info(
+ "Override primary key({}) for catalog table {}",
+ pk.getColumnNames(),
+ debeziumTable.id());
+ }
+ return debeziumTable;
+ }
+
+ public static Map convertTables(List catalogTables) {
+ Map tableMap =
+ catalogTables.stream()
+ .collect(
+ Collectors.toMap(
+ e ->
+ new TableId(
+ e.getTableId().getDatabaseName(),
+ e.getTableId().getSchemaName(),
+ e.getTableId().getTableName()),
+ e -> e));
+ return Collections.unmodifiableMap(tableMap);
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java
index 30e7ddf0f33..c43b819f066 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java
@@ -17,6 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
@@ -24,6 +27,7 @@
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
+import org.apache.seatunnel.connectors.cdc.base.utils.CatalogTableUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfigFactory;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.eumerator.MySqlChunkSplitter;
@@ -40,6 +44,8 @@
import java.sql.SQLException;
import java.util.List;
+import java.util.Map;
+import java.util.Optional;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.isTableIdCaseSensitive;
@@ -49,9 +55,11 @@ public class MySqlDialect implements JdbcDataSourceDialect {
private static final long serialVersionUID = 1L;
private final MySqlSourceConfig sourceConfig;
private transient MySqlSchema mySqlSchema;
+ private final Map tableMap;
- public MySqlDialect(MySqlSourceConfigFactory configFactory) {
+ public MySqlDialect(MySqlSourceConfigFactory configFactory, List catalogTables) {
this.sourceConfig = configFactory.create(0);
+ this.tableMap = CatalogTableUtils.convertTables(catalogTables);
}
@Override
@@ -93,7 +101,8 @@ public List discoverDataCollections(JdbcSourceConfig sourceConfig) {
public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId) {
if (mySqlSchema == null) {
mySqlSchema =
- new MySqlSchema(sourceConfig, isDataCollectionIdCaseSensitive(sourceConfig));
+ new MySqlSchema(
+ sourceConfig, isDataCollectionIdCaseSensitive(sourceConfig), tableMap);
}
return mySqlSchema.getTableSchema(jdbc, tableId);
}
@@ -112,4 +121,14 @@ public FetchTask createFetchTask(SourceSplitBase sourceSplitBas
return new MySqlBinlogFetchTask(sourceSplitBase.asIncrementalSplit());
}
}
+
+ @Override
+ public Optional getPrimaryKey(JdbcConnection jdbcConnection, TableId tableId) {
+ return Optional.ofNullable(tableMap.get(tableId).getTableSchema().getPrimaryKey());
+ }
+
+ @Override
+ public List getConstraintKeys(JdbcConnection jdbcConnection, TableId tableId) {
+ return tableMap.get(tableId).getTableSchema().getConstraintKeys();
+ }
}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
index f221256e1d2..67ff9ff6079 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
@@ -114,7 +114,7 @@ public DebeziumDeserializationSchema createDebeziumDeserializationSchema(
@Override
public DataSourceDialect createDataSourceDialect(ReadonlyConfig config) {
- return new MySqlDialect((MySqlSourceConfigFactory) configFactory);
+ return new MySqlDialect((MySqlSourceConfigFactory) configFactory, catalogTables);
}
@Override
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
index 1ec94c3cfc2..defe0a6ab98 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
@@ -23,22 +23,26 @@
import org.apache.seatunnel.api.table.catalog.CatalogOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceTableConfig;
import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
+import org.apache.seatunnel.connectors.cdc.base.utils.CatalogTableUtils;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
import com.google.auto.service.AutoService;
import java.io.Serializable;
import java.util.List;
+import java.util.Optional;
@AutoService(Factory.class)
public class MySqlIncrementalSourceFactory implements TableSourceFactory {
@@ -65,7 +69,8 @@ public OptionRule optionRule() {
JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND,
JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND,
JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD,
- JdbcSourceOptions.INVERSE_SAMPLING_RATE)
+ JdbcSourceOptions.INVERSE_SAMPLING_RATE,
+ JdbcSourceOptions.TABLE_NAMES_CONFIG)
.optional(MySqlSourceOptions.STARTUP_MODE, MySqlSourceOptions.STOP_MODE)
.conditional(
MySqlSourceOptions.STARTUP_MODE,
@@ -96,6 +101,15 @@ TableSource createSource(TableSourceFactoryContext context) {
List catalogTables =
CatalogTableUtil.getCatalogTables(
context.getOptions(), context.getClassLoader());
+ Optional> tableConfigs =
+ context.getOptions().getOptional(JdbcSourceOptions.TABLE_NAMES_CONFIG);
+ if (tableConfigs.isPresent()) {
+ catalogTables =
+ CatalogTableUtils.mergeCatalogTableConfig(
+ catalogTables,
+ tableConfigs.get(),
+ text -> TablePath.of(text, false));
+ }
SeaTunnelDataType dataType =
CatalogTableUtil.convertToMultipleRowType(catalogTables);
return (SeaTunnelSource)
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlSchema.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlSchema.java
index e9213429988..324f91fc6ea 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlSchema.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlSchema.java
@@ -17,13 +17,17 @@
package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.connectors.cdc.base.utils.CatalogTableUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfig;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlDatabaseSchema;
import io.debezium.connector.mysql.MySqlOffsetContext;
import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.Table;
import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
import io.debezium.relational.history.TableChanges.TableChange;
import io.debezium.schema.SchemaChangeEvent;
@@ -41,13 +45,18 @@ public class MySqlSchema {
private final MySqlConnectorConfig connectorConfig;
private final MySqlDatabaseSchema databaseSchema;
private final Map schemasByTableId;
+ private final Map tableMap;
- public MySqlSchema(MySqlSourceConfig sourceConfig, boolean isTableIdCaseSensitive) {
+ public MySqlSchema(
+ MySqlSourceConfig sourceConfig,
+ boolean isTableIdCaseSensitive,
+ Map tableMap) {
this.connectorConfig = sourceConfig.getDbzConnectorConfig();
this.databaseSchema =
MySqlConnectionUtils.createMySqlDatabaseSchema(
connectorConfig, isTableIdCaseSensitive);
this.schemasByTableId = new HashMap<>();
+ this.tableMap = tableMap;
}
/**
@@ -81,7 +90,13 @@ private TableChange readTableSchema(JdbcConnection jdbc, TableId tableId) {
for (SchemaChangeEvent schemaChangeEvent : schemaChangeEvents) {
for (TableChange tableChange :
schemaChangeEvent.getTableChanges()) {
- tableChangeMap.put(tableId, tableChange);
+ Table table =
+ CatalogTableUtils.mergeCatalogTableConfig(
+ tableChange.getTable(), tableMap.get(tableId));
+ TableChange newTableChange =
+ new TableChange(
+ TableChanges.TableChangeType.CREATE, table);
+ tableChangeMap.put(tableId, newTableChange);
}
}
}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java
index c337ddc2a0b..e667412378c 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java
@@ -17,6 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
@@ -24,6 +27,7 @@
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
+import org.apache.seatunnel.connectors.cdc.base.utils.CatalogTableUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.config.SqlServerSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.config.SqlServerSourceConfigFactory;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.eumerator.SqlServerChunkSplitter;
@@ -40,6 +44,8 @@
import java.sql.SQLException;
import java.util.List;
+import java.util.Map;
+import java.util.Optional;
import static org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerConnectionUtils.createSqlServerConnection;
@@ -50,9 +56,12 @@ public class SqlServerDialect implements JdbcDataSourceDialect {
private final SqlServerSourceConfig sourceConfig;
private transient SqlServerSchema sqlServerSchema;
+ private final Map tableMap;
- public SqlServerDialect(SqlServerSourceConfigFactory configFactory) {
+ public SqlServerDialect(
+ SqlServerSourceConfigFactory configFactory, List catalogTables) {
this.sourceConfig = configFactory.create(0);
+ this.tableMap = CatalogTableUtils.convertTables(catalogTables);
}
@Override
@@ -95,7 +104,7 @@ public List discoverDataCollections(JdbcSourceConfig sourceConfig) {
@Override
public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId) {
if (sqlServerSchema == null) {
- sqlServerSchema = new SqlServerSchema(sourceConfig.getDbzConnectorConfig());
+ sqlServerSchema = new SqlServerSchema(sourceConfig.getDbzConnectorConfig(), tableMap);
}
return sqlServerSchema.getTableSchema(jdbc, tableId);
}
@@ -115,4 +124,14 @@ public FetchTask createFetchTask(SourceSplitBase sourceSplitBas
return new SqlServerTransactionLogFetchTask(sourceSplitBase.asIncrementalSplit());
}
}
+
+ @Override
+ public Optional getPrimaryKey(JdbcConnection jdbcConnection, TableId tableId) {
+ return Optional.ofNullable(tableMap.get(tableId).getTableSchema().getPrimaryKey());
+ }
+
+ @Override
+ public List getConstraintKeys(JdbcConnection jdbcConnection, TableId tableId) {
+ return tableMap.get(tableId).getTableSchema().getConstraintKeys();
+ }
}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java
index 5a3a3cc9eb9..4ab64ff692f 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java
@@ -114,7 +114,7 @@ public DebeziumDeserializationSchema createDebeziumDeserializationSchema(
@Override
public DataSourceDialect createDataSourceDialect(ReadonlyConfig config) {
- return new SqlServerDialect((SqlServerSourceConfigFactory) configFactory);
+ return new SqlServerDialect((SqlServerSourceConfigFactory) configFactory, catalogTables);
}
@Override
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
index 6338d85aa2f..95031e9b9ff 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
@@ -23,22 +23,26 @@
import org.apache.seatunnel.api.table.catalog.CatalogOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceTableConfig;
import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
+import org.apache.seatunnel.connectors.cdc.base.utils.CatalogTableUtils;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
import com.google.auto.service.AutoService;
import java.io.Serializable;
import java.util.List;
+import java.util.Optional;
@AutoService(Factory.class)
public class SqlServerIncrementalSourceFactory implements TableSourceFactory {
@@ -64,7 +68,8 @@ public OptionRule optionRule() {
JdbcSourceOptions.CONNECTION_POOL_SIZE,
JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND,
JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND,
- JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD)
+ JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD,
+ JdbcSourceOptions.TABLE_NAMES_CONFIG)
.optional(SqlServerSourceOptions.STARTUP_MODE, SqlServerSourceOptions.STOP_MODE)
.conditional(
SqlServerSourceOptions.STARTUP_MODE,
@@ -101,6 +106,15 @@ TableSource createSource(TableSourceFactoryContext context) {
List catalogTables =
CatalogTableUtil.getCatalogTables(
context.getOptions(), context.getClassLoader());
+ Optional> tableConfigs =
+ context.getOptions().getOptional(JdbcSourceOptions.TABLE_NAMES_CONFIG);
+ if (tableConfigs.isPresent()) {
+ catalogTables =
+ CatalogTableUtils.mergeCatalogTableConfig(
+ catalogTables,
+ tableConfigs.get(),
+ text -> TablePath.of(text, true));
+ }
SeaTunnelDataType dataType =
CatalogTableUtil.convertToMultipleRowType(catalogTables);
return new SqlServerIncrementalSource(context.getOptions(), dataType, catalogTables);
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerSchema.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerSchema.java
index 83d51ae31ba..79f58e3e2df 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerSchema.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerSchema.java
@@ -17,7 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.common.utils.SeaTunnelException;
+import org.apache.seatunnel.connectors.cdc.base.utils.CatalogTableUtils;
import io.debezium.connector.sqlserver.SqlServerConnection;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
@@ -38,10 +40,13 @@ public class SqlServerSchema {
private final SqlServerConnectorConfig connectorConfig;
private final Map schemasByTableId;
+ private final Map tableMap;
- public SqlServerSchema(SqlServerConnectorConfig connectorConfig) {
+ public SqlServerSchema(
+ SqlServerConnectorConfig connectorConfig, Map tableMap) {
this.schemasByTableId = new ConcurrentHashMap<>();
this.connectorConfig = connectorConfig;
+ this.tableMap = tableMap;
}
public TableChange getTableSchema(JdbcConnection jdbc, TableId tableId) {
@@ -67,7 +72,9 @@ private TableChange readTableSchema(JdbcConnection jdbc, TableId tableId) {
connectorConfig.getTableFilters().dataCollectionFilter(),
null,
false);
- Table table = tables.forTable(tableId);
+ Table table =
+ CatalogTableUtils.mergeCatalogTableConfig(
+ tables.forTable(tableId), tableMap.get(tableId));
TableChange tableChange = new TableChange(TableChanges.TableChangeType.CREATE, table);
tableChangeMap.put(tableId, tableChange);
} catch (SQLException e) {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
index 366b3146b17..f2d5669c37f 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
@@ -96,6 +96,11 @@ public class MysqlCDCIT extends TestSuiteBase implements TestResource {
private static final String SOURCE_TABLE_2 = "mysql_cdc_e2e_source_table2";
private static final String SOURCE_TABLE_NO_PRIMARY_KEY =
"mysql_cdc_e2e_source_table_no_primary_key";
+
+ private static final String SOURCE_TABLE_1_CUSTOM_PRIMARY_KEY =
+ "mysql_cdc_e2e_source_table_1_custom_primary_key";
+ private static final String SOURCE_TABLE_2_CUSTOM_PRIMARY_KEY =
+ "mysql_cdc_e2e_source_table_2_custom_primary_key";
private static final String SINK_TABLE = "mysql_cdc_e2e_sink_table";
private static MySqlContainer createMySqlContainer(MySqlVersion version) {
@@ -414,6 +419,58 @@ public void testMultiTableWithRestore(TestContainer container)
log.info("****************** container logs end ******************");
}
+ @TestTemplate
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason = "Currently SPARK and FLINK do not support multi table")
+ public void testMysqlCdcMultiTableWithCustomPrimaryKey(TestContainer container) {
+ // Clear related content to ensure that multiple operations are not affected
+ clearTable(MYSQL_DATABASE2, SOURCE_TABLE_1_CUSTOM_PRIMARY_KEY);
+ clearTable(MYSQL_DATABASE2, SOURCE_TABLE_2_CUSTOM_PRIMARY_KEY);
+
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ container.executeJob("/mysqlcdc_to_mysql_with_custom_primary_key.conf");
+ } catch (Exception e) {
+ log.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ return null;
+ });
+
+ // insert update delete
+ upsertDeleteSourceTable(MYSQL_DATABASE, SOURCE_TABLE_1_CUSTOM_PRIMARY_KEY);
+ upsertDeleteSourceTable(MYSQL_DATABASE, SOURCE_TABLE_2_CUSTOM_PRIMARY_KEY);
+
+ // stream stage
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertAll(
+ () ->
+ Assertions.assertIterableEquals(
+ query(
+ getSourceQuerySQL(
+ MYSQL_DATABASE,
+ SOURCE_TABLE_1_CUSTOM_PRIMARY_KEY)),
+ query(
+ getSourceQuerySQL(
+ MYSQL_DATABASE2,
+ SOURCE_TABLE_1_CUSTOM_PRIMARY_KEY))),
+ () ->
+ Assertions.assertIterableEquals(
+ query(
+ getSourceQuerySQL(
+ MYSQL_DATABASE,
+ SOURCE_TABLE_2_CUSTOM_PRIMARY_KEY)),
+ query(
+ getSourceQuerySQL(
+ MYSQL_DATABASE2,
+ SOURCE_TABLE_2_CUSTOM_PRIMARY_KEY)))));
+ }
+
private Connection getJdbcConnection() throws SQLException {
return DriverManager.getConnection(
MYSQL_CONTAINER.getJdbcUrl(),
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/mysql_cdc.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/mysql_cdc.sql
index 91ae73bd27f..b909f9aacd3 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/mysql_cdc.sql
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/mysql_cdc.sql
@@ -170,6 +170,102 @@ CREATE TABLE mysql_cdc_e2e_source_table_no_primary_key
DEFAULT CHARSET = utf8mb4
COLLATE = utf8mb4_0900_ai_ci;
+CREATE TABLE mysql_cdc_e2e_source_table_1_custom_primary_key
+(
+ `id` int NOT NULL,
+ `f_binary` binary(64) DEFAULT NULL,
+ `f_blob` blob,
+ `f_long_varbinary` mediumblob,
+ `f_longblob` longblob,
+ `f_tinyblob` tinyblob,
+ `f_varbinary` varbinary(100) DEFAULT NULL,
+ `f_smallint` smallint DEFAULT NULL,
+ `f_smallint_unsigned` smallint unsigned DEFAULT NULL,
+ `f_mediumint` mediumint DEFAULT NULL,
+ `f_mediumint_unsigned` mediumint unsigned DEFAULT NULL,
+ `f_int` int DEFAULT NULL,
+ `f_int_unsigned` int unsigned DEFAULT NULL,
+ `f_integer` int DEFAULT NULL,
+ `f_integer_unsigned` int unsigned DEFAULT NULL,
+ `f_bigint` bigint DEFAULT NULL,
+ `f_bigint_unsigned` bigint unsigned DEFAULT NULL,
+ `f_numeric` decimal(10, 0) DEFAULT NULL,
+ `f_decimal` decimal(10, 0) DEFAULT NULL,
+ `f_float` float DEFAULT NULL,
+ `f_double` double DEFAULT NULL,
+ `f_double_precision` double DEFAULT NULL,
+ `f_longtext` longtext,
+ `f_mediumtext` mediumtext,
+ `f_text` text,
+ `f_tinytext` tinytext,
+ `f_varchar` varchar(100) DEFAULT NULL,
+ `f_date` date DEFAULT NULL,
+ `f_datetime` datetime DEFAULT NULL,
+ `f_timestamp` timestamp NULL DEFAULT NULL,
+ `f_bit1` bit(1) DEFAULT NULL,
+ `f_bit64` bit(64) DEFAULT NULL,
+ `f_char` char(1) DEFAULT NULL,
+ `f_enum` enum ('enum1','enum2','enum3') DEFAULT NULL,
+ `f_mediumblob` mediumblob,
+ `f_long_varchar` mediumtext,
+ `f_real` double DEFAULT NULL,
+ `f_time` time DEFAULT NULL,
+ `f_tinyint` tinyint DEFAULT NULL,
+ `f_tinyint_unsigned` tinyint unsigned DEFAULT NULL,
+ `f_json` json DEFAULT NULL,
+ `f_year` year DEFAULT NULL
+) ENGINE = InnoDB
+ DEFAULT CHARSET = utf8mb4
+ COLLATE = utf8mb4_0900_ai_ci;
+
+CREATE TABLE mysql_cdc_e2e_source_table_2_custom_primary_key
+(
+ `id` int NOT NULL,
+ `f_binary` binary(64) DEFAULT NULL,
+ `f_blob` blob,
+ `f_long_varbinary` mediumblob,
+ `f_longblob` longblob,
+ `f_tinyblob` tinyblob,
+ `f_varbinary` varbinary(100) DEFAULT NULL,
+ `f_smallint` smallint DEFAULT NULL,
+ `f_smallint_unsigned` smallint unsigned DEFAULT NULL,
+ `f_mediumint` mediumint DEFAULT NULL,
+ `f_mediumint_unsigned` mediumint unsigned DEFAULT NULL,
+ `f_int` int DEFAULT NULL,
+ `f_int_unsigned` int unsigned DEFAULT NULL,
+ `f_integer` int DEFAULT NULL,
+ `f_integer_unsigned` int unsigned DEFAULT NULL,
+ `f_bigint` bigint DEFAULT NULL,
+ `f_bigint_unsigned` bigint unsigned DEFAULT NULL,
+ `f_numeric` decimal(10, 0) DEFAULT NULL,
+ `f_decimal` decimal(10, 0) DEFAULT NULL,
+ `f_float` float DEFAULT NULL,
+ `f_double` double DEFAULT NULL,
+ `f_double_precision` double DEFAULT NULL,
+ `f_longtext` longtext,
+ `f_mediumtext` mediumtext,
+ `f_text` text,
+ `f_tinytext` tinytext,
+ `f_varchar` varchar(100) DEFAULT NULL,
+ `f_date` date DEFAULT NULL,
+ `f_datetime` datetime DEFAULT NULL,
+ `f_timestamp` timestamp NULL DEFAULT NULL,
+ `f_bit1` bit(1) DEFAULT NULL,
+ `f_bit64` bit(64) DEFAULT NULL,
+ `f_char` char(1) DEFAULT NULL,
+ `f_enum` enum ('enum1','enum2','enum3') DEFAULT NULL,
+ `f_mediumblob` mediumblob,
+ `f_long_varchar` mediumtext,
+ `f_real` double DEFAULT NULL,
+ `f_time` time DEFAULT NULL,
+ `f_tinyint` tinyint DEFAULT NULL,
+ `f_tinyint_unsigned` tinyint unsigned DEFAULT NULL,
+ `f_json` json DEFAULT NULL,
+ `f_year` year DEFAULT NULL
+) ENGINE = InnoDB
+ DEFAULT CHARSET = utf8mb4
+ COLLATE = utf8mb4_0900_ai_ci;
+
CREATE TABLE mysql_cdc_e2e_sink_table
(
`id` int NOT NULL AUTO_INCREMENT,
@@ -223,6 +319,8 @@ CREATE TABLE mysql_cdc_e2e_sink_table
truncate table mysql_cdc_e2e_source_table;
truncate table mysql_cdc_e2e_source_table2;
truncate table mysql_cdc_e2e_source_table_no_primary_key;
+truncate table mysql_cdc_e2e_source_table_1_custom_primary_key;
+truncate table mysql_cdc_e2e_source_table_2_custom_primary_key;
truncate table mysql_cdc_e2e_sink_table;
INSERT INTO mysql_cdc_e2e_source_table ( id, f_binary, f_blob, f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint,
@@ -318,6 +416,68 @@ VALUES ( 1, 0x616263740000000000000000000000000000000000000000000000000000000000
0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 'This is a long varchar field', 112.345,
'14:30:00', -128, 22, '{ "key": "value" }', 2021 );
+INSERT INTO mysql_cdc_e2e_source_table_1_custom_primary_key ( id, f_binary, f_blob, f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint,
+ f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer,
+ f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double,
+ f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar, f_date, f_datetime,
+ f_timestamp, f_bit1, f_bit64, f_char, f_enum, f_mediumblob, f_long_varchar, f_real, f_time,
+ f_tinyint, f_tinyint_unsigned, f_json, f_year )
+VALUES ( 1, 0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,
+ 0x68656C6C6F, 0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL,
+ 0x74696E79626C6F62, 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 1234567, 7654321,
+ 123456789, 987654321, 123, 789, 12.34, 56.78, 90.12, 'This is a long text field', 'This is a medium text field',
+ 'This is a text field', 'This is a tiny text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00',
+ '2023-04-27 11:08:40', 1, b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 'enum2',
+ 0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 'This is a long varchar field',
+ 12.345, '14:30:00', -128, 255, '{ "key": "value" }', 2022 ),
+ ( 2, 0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,
+ 0x68656C6C6F, 0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL, 0x74696E79626C6F62,
+ 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 1234567, 7654321, 123456789, 987654321,
+ 123, 789, 12.34, 56.78, 90.12, 'This is a long text field', 'This is a medium text field', 'This is a text field',
+ 'This is a tiny text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00', '2023-04-27 11:08:40',
+ 1, b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 'enum2',
+ 0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 'This is a long varchar field',
+ 112.345, '14:30:00', -128, 22, '{ "key": "value" }', 2013 ),
+ ( 3, 0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,
+ 0x68656C6C6F, 0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL, 0x74696E79626C6F62,
+ 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 1234567, 7654321, 123456789, 987654321, 123,
+ 789, 12.34, 56.78, 90.12, 'This is a long text field', 'This is a medium text field', 'This is a text field',
+ 'This is a tiny text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00', '2023-04-27 11:08:40',
+ 1, b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 'enum2',
+ 0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 'This is a long varchar field', 112.345,
+ '14:30:00', -128, 22, '{ "key": "value" }', 2021 );
+
+INSERT INTO mysql_cdc_e2e_source_table_2_custom_primary_key ( id, f_binary, f_blob, f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint,
+ f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer,
+ f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double,
+ f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar, f_date, f_datetime,
+ f_timestamp, f_bit1, f_bit64, f_char, f_enum, f_mediumblob, f_long_varchar, f_real, f_time,
+ f_tinyint, f_tinyint_unsigned, f_json, f_year )
+VALUES ( 1, 0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,
+ 0x68656C6C6F, 0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL,
+ 0x74696E79626C6F62, 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 1234567, 7654321,
+ 123456789, 987654321, 123, 789, 12.34, 56.78, 90.12, 'This is a long text field', 'This is a medium text field',
+ 'This is a text field', 'This is a tiny text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00',
+ '2023-04-27 11:08:40', 1, b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 'enum2',
+ 0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 'This is a long varchar field',
+ 12.345, '14:30:00', -128, 255, '{ "key": "value" }', 2022 ),
+ ( 2, 0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,
+ 0x68656C6C6F, 0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL, 0x74696E79626C6F62,
+ 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 1234567, 7654321, 123456789, 987654321,
+ 123, 789, 12.34, 56.78, 90.12, 'This is a long text field', 'This is a medium text field', 'This is a text field',
+ 'This is a tiny text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00', '2023-04-27 11:08:40',
+ 1, b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 'enum2',
+ 0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 'This is a long varchar field',
+ 112.345, '14:30:00', -128, 22, '{ "key": "value" }', 2013 ),
+ ( 3, 0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,
+ 0x68656C6C6F, 0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL, 0x74696E79626C6F62,
+ 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 1234567, 7654321, 123456789, 987654321, 123,
+ 789, 12.34, 56.78, 90.12, 'This is a long text field', 'This is a medium text field', 'This is a text field',
+ 'This is a tiny text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00', '2023-04-27 11:08:40',
+ 1, b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 'enum2',
+ 0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 'This is a long varchar field', 112.345,
+ '14:30:00', -128, 22, '{ "key": "value" }', 2021 );
+
CREATE DATABASE IF NOT EXISTS `mysql_cdc2`;
use mysql_cdc2;
@@ -421,3 +581,101 @@ CREATE TABLE mysql_cdc_e2e_source_table2
AUTO_INCREMENT = 2
DEFAULT CHARSET = utf8mb4
COLLATE = utf8mb4_0900_ai_ci;
+
+CREATE TABLE mysql_cdc_e2e_source_table_1_custom_primary_key
+(
+ `id` int NOT NULL,
+ `f_binary` binary(64) DEFAULT NULL,
+ `f_blob` blob,
+ `f_long_varbinary` mediumblob,
+ `f_longblob` longblob,
+ `f_tinyblob` tinyblob,
+ `f_varbinary` varbinary(100) DEFAULT NULL,
+ `f_smallint` smallint DEFAULT NULL,
+ `f_smallint_unsigned` smallint unsigned DEFAULT NULL,
+ `f_mediumint` mediumint DEFAULT NULL,
+ `f_mediumint_unsigned` mediumint unsigned DEFAULT NULL,
+ `f_int` int DEFAULT NULL,
+ `f_int_unsigned` int unsigned DEFAULT NULL,
+ `f_integer` int DEFAULT NULL,
+ `f_integer_unsigned` int unsigned DEFAULT NULL,
+ `f_bigint` bigint DEFAULT NULL,
+ `f_bigint_unsigned` bigint unsigned DEFAULT NULL,
+ `f_numeric` decimal(10, 0) DEFAULT NULL,
+ `f_decimal` decimal(10, 0) DEFAULT NULL,
+ `f_float` float DEFAULT NULL,
+ `f_double` double DEFAULT NULL,
+ `f_double_precision` double DEFAULT NULL,
+ `f_longtext` longtext,
+ `f_mediumtext` mediumtext,
+ `f_text` text,
+ `f_tinytext` tinytext,
+ `f_varchar` varchar(100) DEFAULT NULL,
+ `f_date` date DEFAULT NULL,
+ `f_datetime` datetime DEFAULT NULL,
+ `f_timestamp` timestamp NULL DEFAULT NULL,
+ `f_bit1` bit(1) DEFAULT NULL,
+ `f_bit64` bit(64) DEFAULT NULL,
+ `f_char` char(1) DEFAULT NULL,
+ `f_enum` enum ('enum1','enum2','enum3') DEFAULT NULL,
+ `f_mediumblob` mediumblob,
+ `f_long_varchar` mediumtext,
+ `f_real` double DEFAULT NULL,
+ `f_time` time DEFAULT NULL,
+ `f_tinyint` tinyint DEFAULT NULL,
+ `f_tinyint_unsigned` tinyint unsigned DEFAULT NULL,
+ `f_json` json DEFAULT NULL,
+ `f_year` year DEFAULT NULL,
+ PRIMARY KEY (`id`)
+) ENGINE = InnoDB
+ DEFAULT CHARSET = utf8mb4
+ COLLATE = utf8mb4_0900_ai_ci;
+
+CREATE TABLE mysql_cdc_e2e_source_table_2_custom_primary_key
+(
+ `id` int NOT NULL,
+ `f_binary` binary(64) DEFAULT NULL,
+ `f_blob` blob,
+ `f_long_varbinary` mediumblob,
+ `f_longblob` longblob,
+ `f_tinyblob` tinyblob,
+ `f_varbinary` varbinary(100) DEFAULT NULL,
+ `f_smallint` smallint DEFAULT NULL,
+ `f_smallint_unsigned` smallint unsigned DEFAULT NULL,
+ `f_mediumint` mediumint DEFAULT NULL,
+ `f_mediumint_unsigned` mediumint unsigned DEFAULT NULL,
+ `f_int` int DEFAULT NULL,
+ `f_int_unsigned` int unsigned DEFAULT NULL,
+ `f_integer` int DEFAULT NULL,
+ `f_integer_unsigned` int unsigned DEFAULT NULL,
+ `f_bigint` bigint DEFAULT NULL,
+ `f_bigint_unsigned` bigint unsigned DEFAULT NULL,
+ `f_numeric` decimal(10, 0) DEFAULT NULL,
+ `f_decimal` decimal(10, 0) DEFAULT NULL,
+ `f_float` float DEFAULT NULL,
+ `f_double` double DEFAULT NULL,
+ `f_double_precision` double DEFAULT NULL,
+ `f_longtext` longtext,
+ `f_mediumtext` mediumtext,
+ `f_text` text,
+ `f_tinytext` tinytext,
+ `f_varchar` varchar(100) DEFAULT NULL,
+ `f_date` date DEFAULT NULL,
+ `f_datetime` datetime DEFAULT NULL,
+ `f_timestamp` timestamp NULL DEFAULT NULL,
+ `f_bit1` bit(1) DEFAULT NULL,
+ `f_bit64` bit(64) DEFAULT NULL,
+ `f_char` char(1) DEFAULT NULL,
+ `f_enum` enum ('enum1','enum2','enum3') DEFAULT NULL,
+ `f_mediumblob` mediumblob,
+ `f_long_varchar` mediumtext,
+ `f_real` double DEFAULT NULL,
+ `f_time` time DEFAULT NULL,
+ `f_tinyint` tinyint DEFAULT NULL,
+ `f_tinyint_unsigned` tinyint unsigned DEFAULT NULL,
+ `f_json` json DEFAULT NULL,
+ `f_year` year DEFAULT NULL,
+ PRIMARY KEY (`id`)
+) ENGINE = InnoDB
+ DEFAULT CHARSET = utf8mb4
+ COLLATE = utf8mb4_0900_ai_ci;
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_custom_primary_key.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_custom_primary_key.conf
new file mode 100644
index 00000000000..ba3e94855fd
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_custom_primary_key.conf
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ # You can set engine configuration here
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+}
+
+source {
+ MySQL-CDC {
+ result_table_name = "customers_mysql_cdc"
+ server-id = 5652
+ base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
+ username = "mysqluser"
+ password = "mysqlpw"
+ exactly_once = true
+ table-names = ["mysql_cdc.mysql_cdc_e2e_source_table_1_custom_primary_key", "mysql_cdc.mysql_cdc_e2e_source_table_2_custom_primary_key"]
+ table-names-config = [
+ {
+ table = "mysql_cdc.mysql_cdc_e2e_source_table_1_custom_primary_key"
+ primaryKeys = ["id"]
+ },
+ {
+ table = "mysql_cdc.mysql_cdc_e2e_source_table_2_custom_primary_key"
+ primaryKeys = ["id"]
+ }
+ ]
+ }
+}
+
+sink {
+ jdbc {
+ source_table_name = "customers_mysql_cdc"
+ url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc2"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "mysqluser"
+ password = "mysqlpw"
+ database = "mysql_cdc2"
+ generate_sink_sql = true
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
index 597838096bc..1216c696454 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
@@ -83,6 +83,8 @@ public class SqlServerCDCIT extends TestSuiteBase implements TestResource {
private static final String SOURCE_TABLE = "column_type_test.dbo.full_types";
private static final String SOURCE_TABLE_NO_PRIMARY_KEY =
"column_type_test.dbo.full_types_no_primary_key";
+ private static final String SOURCE_TABLE_CUSTOM_PRIMARY_KEY =
+ "column_type_test.dbo.full_types_custom_primary_key";
private static final String SINK_TABLE = "column_type_test.dbo.full_types_sink";
private static final String SELECT_SOURCE_SQL =
"select\n"
@@ -265,6 +267,44 @@ public void testCDCWithNoPrimaryKey(TestContainer container) {
});
}
+ @TestTemplate
+ public void testCDCWithCustomPrimaryKey(TestContainer container) {
+ initializeSqlServerTable("column_type_test");
+
+ CompletableFuture executeJobFuture =
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ container.executeJob(
+ "/sqlservercdc_to_sqlserver_with_custom_primary_key.conf");
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return null;
+ });
+
+ // snapshot stage
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Assertions.assertIterableEquals(
+ querySql(SELECT_SOURCE_SQL, SOURCE_TABLE_CUSTOM_PRIMARY_KEY),
+ querySql(SELECT_SINK_SQL, SINK_TABLE));
+ });
+
+ // insert update delete
+ updateSourceTable(SOURCE_TABLE_CUSTOM_PRIMARY_KEY);
+
+ // stream stage
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Assertions.assertIterableEquals(
+ querySql(SELECT_SOURCE_SQL, SOURCE_TABLE_CUSTOM_PRIMARY_KEY),
+ querySql(SELECT_SINK_SQL, SINK_TABLE));
+ });
+ }
+
/**
* Executes a JDBC statement using the default jdbc config without autocommitting the
* connection.
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/ddl/column_type_test.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/ddl/column_type_test.sql
index d227c346152..0c6aebe4fd8 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/ddl/column_type_test.sql
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/ddl/column_type_test.sql
@@ -120,6 +120,54 @@ INSERT INTO full_types_no_primary_key VALUES (2,
'b',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS varbinary(100)));
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'full_types_no_primary_key', @role_name = NULL, @supports_net_changes = 0;
+CREATE TABLE full_types_custom_primary_key (
+ id int NOT NULL,
+ val_char char(3),
+ val_varchar varchar(1000),
+ val_text text,
+ val_nchar nchar(3),
+ val_nvarchar nvarchar(1000),
+ val_ntext ntext,
+ val_decimal decimal(6,3),
+ val_numeric numeric,
+ val_float float,
+ val_real real,
+ val_smallmoney smallmoney,
+ val_money money,
+ val_bit bit,
+ val_tinyint tinyint,
+ val_smallint smallint,
+ val_int int,
+ val_bigint bigint,
+ val_date date,
+ val_time time,
+ val_datetime2 datetime2,
+ val_datetime datetime,
+ val_smalldatetime smalldatetime,
+ val_xml xml,
+ val_datetimeoffset DATETIMEOFFSET(4),
+ val_varbinary varbinary(100)
+);
+INSERT INTO full_types_custom_primary_key VALUES (0,
+ 'cč0', 'vcč', 'tč', N'cč', N'vcč', N'tč',
+ 1.123, 2, 3.323, 4.323, 5.323, 6.323,
+ 1, 22, 333, 4444, 55555,
+ '2018-07-13', '10:23:45', '2018-07-13 11:23:45.34', '2018-07-13 13:23:45.78', '2018-07-13 14:23:45',
+ 'b',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS varbinary(100)));
+INSERT INTO full_types_custom_primary_key VALUES (1,
+ 'cč1', 'vcč', 'tč', N'cč', N'vcč', N'tč',
+ 1.123, 2, 3.323, 4.323, 5.323, 6.323,
+ 1, 22, 333, 4444, 55555,
+ '2018-07-13', '10:23:45', '2018-07-13 11:23:45.34', '2018-07-13 13:23:45.78', '2018-07-13 14:23:45',
+ 'b',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS varbinary(100)));
+INSERT INTO full_types_custom_primary_key VALUES (2,
+ 'cč2', 'vcč', 'tč', N'cč', N'vcč', N'tč',
+ 1.123, 2, 3.323, 4.323, 5.323, 6.323,
+ 1, 22, 333, 4444, 55555,
+ '2018-07-13', '10:23:45', '2018-07-13 11:23:45.34', '2018-07-13 13:23:45.78', '2018-07-13 14:23:45',
+ 'b',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS varbinary(100)));
+EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'full_types_custom_primary_key', @role_name = NULL, @supports_net_changes = 0;
+
CREATE TABLE full_types_sink (
id int NOT NULL,
val_char char(3),
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/sqlservercdc_to_sqlserver_with_custom_primary_key.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/sqlservercdc_to_sqlserver_with_custom_primary_key.conf
new file mode 100644
index 00000000000..4e9f4e5e734
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/sqlservercdc_to_sqlserver_with_custom_primary_key.conf
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ # You can set engine configuration here
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+}
+
+source {
+ # This is a example source plugin **only for test and demonstrate the feature source plugin**
+ SqlServer-CDC {
+ result_table_name = "customers"
+ username = "sa"
+ password = "Password!"
+ database-names = ["column_type_test"]
+ table-names = ["column_type_test.dbo.full_types_custom_primary_key"]
+ base-url = "jdbc:sqlserver://sqlserver-host:1433;databaseName=column_type_test"
+
+ exactly_once = false
+ }
+}
+
+transform {
+}
+
+sink {
+ Jdbc {
+ source_table_name = "customers"
+ driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
+ url = "jdbc:sqlserver://sqlserver-host:1433;encrypt=false"
+ user = "sa"
+ password = "Password!"
+ generate_sink_sql = true
+ database = "column_type_test"
+ table = "dbo.full_types_sink"
+ batch_size = 1
+ primary_keys = ["id"]
+ }
+}
\ No newline at end of file