diff --git a/docs/en/connector-v2/source/MySQL-CDC.md b/docs/en/connector-v2/source/MySQL-CDC.md index 42803e7e365..bc562213a2d 100644 --- a/docs/en/connector-v2/source/MySQL-CDC.md +++ b/docs/en/connector-v2/source/MySQL-CDC.md @@ -153,6 +153,7 @@ When an initial consistent snapshot is made for large databases, your establishe | password | String | Yes | - | Password to use when connecting to the database server. | | database-names | List | No | - | Database name of the database to monitor. | | table-names | List | Yes | - | Table name of the database to monitor. The table name needs to include the database name, for example: `database_name.table_name` | +| table-names-config | List | No | - | Table config list, used to set a custom primary key for a table. For example: [{"table": "db1.table1","primaryKeys":["key1"]}] | | startup.mode | Enum | No | INITIAL | Optional startup mode for MySQL CDC consumer, valid enumerations are `initial`, `earliest`, `latest` and `specific`.
`initial`: Synchronize historical data at startup, and then synchronize incremental data.
`earliest`: Startup from the earliest offset possible.
`latest`: Startup from the latest offset.
`specific`: Startup from user-supplied specific offsets. | | startup.specific-offset.file | String | No | - | Start from the specified binlog file name. **Note, This option is required when the `startup.mode` option used `specific`.** | | startup.specific-offset.pos | Long | No | - | Start from the specified binlog file position. **Note, This option is required when the `startup.mode` option used `specific`.** | @@ -190,9 +191,6 @@ env { source { MySQL-CDC { - catalog = { - factory = MySQL - } base-url = "jdbc:mysql://localhost:3306/testdb" username = "root" password = "root@123" @@ -212,6 +210,37 @@ sink { > Must be used with kafka connector sink, see [compatible debezium format](../formats/cdc-compatible-debezium-json.md) for details +### Support custom primary key for table + +``` +env { + parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 10000 +} + +source { + MySQL-CDC { + base-url = "jdbc:mysql://localhost:3306/testdb" + username = "root" + password = "root@123" + + table-names = ["testdb.table1", "testdb.table2"] + table-names-config = [ + { + table = "testdb.table2" + primaryKeys = ["id"] + } + ] + } +} + +sink { + Console { + } +} +``` + ## Changelog - Add MySQL CDC Source Connector diff --git a/docs/en/connector-v2/source/SqlServer-CDC.md b/docs/en/connector-v2/source/SqlServer-CDC.md index 1b32c824a47..62b788ac155 100644 --- a/docs/en/connector-v2/source/SqlServer-CDC.md +++ b/docs/en/connector-v2/source/SqlServer-CDC.md @@ -60,6 +60,7 @@ Please download and put SqlServer driver in `${SEATUNNEL_HOME}/lib/` dir. For ex | password | String | Yes | - | Password to use when connecting to the database server. | | database-names | List | Yes | - | Database name of the database to monitor. | | table-names | List | Yes | - | Table name is a combination of schema name and table name (databaseName.schemaName.tableName). | +| table-names-config | List | No | - | Table config list. 
for example: [{"table": "db1.schema1.table1","primaryKeys":["key1"]}] | | base-url | String | Yes | - | URL has to be with database, like "jdbc:sqlserver://localhost:1433;databaseName=test". | | startup.mode | Enum | No | INITIAL | Optional startup mode for SqlServer CDC consumer, valid enumerations are "initial", "earliest", "latest" and "specific". | | startup.timestamp | Long | No | - | Start from the specified epoch timestamp (in milliseconds).
**Note, This option is required when** the **"startup.mode" option used `'timestamp'`.** | @@ -186,3 +187,35 @@ sink { } ``` +### Support custom primary key for table + +``` +env { + parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 5000 +} + +source { + SqlServer-CDC { + base-url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test" + username = "sa" + password = "Y.sa123456" + database-names = ["column_type_test"] + + table-names = ["column_type_test.dbo.simple_types", "column_type_test.dbo.full_types"] + table-names-config = [ + { + table = "column_type_test.dbo.full_types" + primaryKeys = ["id"] + } + ] + } +} + +sink { + console { + } +} +``` + diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceTableConfig.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceTableConfig.java new file mode 100644 index 00000000000..5cafa363e8d --- /dev/null +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceTableConfig.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.cdc.base.config; + +import lombok.Data; + +import java.io.Serializable; +import java.util.List; + +/** Per-table entry of the {@code table-names-config} source option. */ +@Data +public class JdbcSourceTableConfig implements Serializable { + /** Name of the table this entry applies to, e.g. {@code db1.schema1.table1}. */ + private String table; + + /** Names of the columns to use as the table's primary key. */ + private List<String> primaryKeys; +} diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java index 9542a8e9701..6cd7ba0631e 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceTableConfig; import org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource; import java.time.ZoneId; @@ -141,4 +142,17 @@ public class JdbcSourceOptions extends SourceOptions { + "The value represents the denominator of the sampling rate fraction. " + "For example, a value of 1000 means a sampling rate of 1/1000. " + "This parameter is used when the sample sharding strategy is triggered."); + + public static final Option<List<JdbcSourceTableConfig>> TABLE_NAMES_CONFIG = + Options.key("table-names-config") + .listType(JdbcSourceTableConfig.class) + .noDefaultValue() + .withDescription( + "Config table configs. 
Example: " + + "[" + + " {" + + " \"table\": \"db1.schema1.table1\"," + + " \"primaryKeys\": [\"key1\",\"key2\"]" + + " }" + + "]"); } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/CatalogTableUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/CatalogTableUtils.java new file mode 100644 index 00000000000..78f01370f2c --- /dev/null +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/CatalogTableUtils.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.cdc.base.utils; + +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.catalog.PrimaryKey; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceTableConfig; + +import io.debezium.relational.Table; +import io.debezium.relational.TableId; +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** Helpers for applying per-table source config (custom primary keys) to table schemas. */ +@Slf4j +public class CatalogTableUtils { + + /** + * Applies every entry of {@code tableConfigs} to the catalog table it names. + * + * <p>Entries naming a table that is not in {@code tables} are logged and skipped. The + * returned list keeps the tables in the order of {@code tables}. + * + * @param tables catalog tables to update + * @param tableConfigs per-table config entries + * @param parser turns a configured table name into the {@link TablePath} used for lookup + * @return a new list holding the updated tables + * @throws IllegalArgumentException if an entry's primary key list is null or names a column + * its table does not have + */ + public static List<CatalogTable> mergeCatalogTableConfig( + List<CatalogTable> tables, + List<JdbcSourceTableConfig> tableConfigs, + Function<String, TablePath> parser) { + Map<TablePath, CatalogTable> catalogTableMap = + tables.stream() + .collect(Collectors.toMap(t -> t.getTableId().toTablePath(), t -> t)); + for (JdbcSourceTableConfig catalogTableConfig : tableConfigs) { + TablePath tablePath = parser.apply(catalogTableConfig.getTable()); + CatalogTable catalogTable = catalogTableMap.get(tablePath); + if (catalogTable != null) { + catalogTable = mergeCatalogTableConfig(catalogTable, catalogTableConfig); + catalogTableMap.put(tablePath, catalogTable); + log.info( + "Override primary key({}) for catalog table {}", + catalogTableConfig.getPrimaryKeys(), + catalogTableConfig.getTable()); + } else { + log.warn( + "Table {} is not found in catalog tables, skip to merge config", + catalogTableConfig.getTable()); + } + } + // The lookup map has no defined iteration order, so rebuild the result by walking the + // input list; this keeps the tables in the order the caller supplied them. + return tables.stream() + .map(t -> catalogTableMap.get(t.getTableId().toTablePath())) + .collect(Collectors.toCollection(ArrayList::new)); + } + + /** + * Returns a copy of {@code table} whose primary key is the column list from {@code config}; + * key columns that were nullable are rebuilt as non-nullable. + * + * @param table catalog table to copy + * @param config config entry carrying the primary key column names + * @return the rebuilt catalog table + * @throws IllegalArgumentException if the primary key list is null or names a column that + * {@code table} does not have + */ + public static CatalogTable mergeCatalogTableConfig( + final CatalogTable table, JdbcSourceTableConfig config) { + List<String> primaryKeyNames = config.getPrimaryKeys(); + if (primaryKeyNames == null) { + throw new IllegalArgumentException( + String.format( + "Primary keys are not configured for table(%s)", + table.getTablePath())); + } + List<String> columnNames = + table.getTableSchema().getColumns().stream() + .map(c -> c.getName()) + .collect(Collectors.toList()); + for (String pk : primaryKeyNames) { + if (!columnNames.contains(pk)) { + throw new IllegalArgumentException( + String.format( + "Primary key(%s) is not in table(%s) columns(%s)", + pk, table.getTablePath(), columnNames)); + } + } + PrimaryKey primaryKeys = + PrimaryKey.of( + "pk" + (primaryKeyNames.hashCode() & Integer.MAX_VALUE), + primaryKeyNames); + List<Column> columns = + table.getTableSchema().getColumns().stream() + .map( + column -> { + if (primaryKeyNames.contains(column.getName()) + && column.isNullable()) { + log.warn( + "Primary key({}) is nullable for catalog table {}", + column.getName(), + table.getTablePath()); + return PhysicalColumn.of( + column.getName(), + column.getDataType(), + column.getColumnLength(), + false, + column.getDefaultValue(), + column.getComment()); + } + return column; + }) + .collect(Collectors.toList()); + + return CatalogTable.of( + table.getTableId(), + TableSchema.builder() + .primaryKey(primaryKeys) + .columns(columns) + .constraintKey(table.getTableSchema().getConstraintKeys()) + .build(), + table.getOptions(), + table.getPartitionKeys(), + table.getComment()); + } + + /** + * Copies the primary key of {@code catalogTable}, when it has one, onto {@code debeziumTable}. + * + * @param debeziumTable Debezium table to edit + * @param catalogTable catalog table supplying the primary key; may be {@code null} + * @return the edited table, or {@code debeziumTable} unchanged when there is no key to apply + */ + public static Table mergeCatalogTableConfig(Table debeziumTable, CatalogTable catalogTable) { + if (catalogTable == null) { + // No catalog entry was found for this table, so keep the Debezium schema untouched. + return debeziumTable; + } + PrimaryKey pk = catalogTable.getTableSchema().getPrimaryKey(); + if (pk != null) { + debeziumTable = debeziumTable.edit().setPrimaryKeyNames(pk.getColumnNames()).create(); + log.info( + "Override primary key({}) for catalog table {}", + pk.getColumnNames(), + debeziumTable.id()); + } + return debeziumTable; + } + + /** + * Indexes catalog tables by a Debezium {@link TableId} built from each table's database, + * schema and table names. + * + * @param catalogTables catalog tables to index + * @return an unmodifiable map from table id to catalog table + */ + public static Map<TableId, CatalogTable> convertTables(List<CatalogTable> catalogTables) { + Map<TableId, CatalogTable> tableMap = + catalogTables.stream() + .collect( + Collectors.toMap( + e -> + new TableId( + e.getTableId().getDatabaseName(), + e.getTableId().getSchemaName(), + e.getTableId().getTableName()), + e -> e)); + return Collections.unmodifiableMap(tableMap); + } +} diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java index 30e7ddf0f33..c43b819f066 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java @@ -17,6 +17,9 @@ package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.ConstraintKey; +import org.apache.seatunnel.api.table.catalog.PrimaryKey; import org.apache.seatunnel.common.utils.SeaTunnelException; import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig; import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect; @@ -24,6 +27,7 @@ import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter; import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask; import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase; +import org.apache.seatunnel.connectors.cdc.base.utils.CatalogTableUtils; import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfig; import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfigFactory; import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.eumerator.MySqlChunkSplitter; @@ -40,6 +44,8 @@ import java.sql.SQLException; import java.util.List; +import java.util.Map; +import java.util.Optional; import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.isTableIdCaseSensitive; @@ -49,9 
+55,11 @@ public class MySqlDialect implements JdbcDataSourceDialect { private static final long serialVersionUID = 1L; private final MySqlSourceConfig sourceConfig; private transient MySqlSchema mySqlSchema; + private final Map tableMap; - public MySqlDialect(MySqlSourceConfigFactory configFactory) { + public MySqlDialect(MySqlSourceConfigFactory configFactory, List catalogTables) { this.sourceConfig = configFactory.create(0); + this.tableMap = CatalogTableUtils.convertTables(catalogTables); } @Override @@ -93,7 +101,8 @@ public List discoverDataCollections(JdbcSourceConfig sourceConfig) { public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId) { if (mySqlSchema == null) { mySqlSchema = - new MySqlSchema(sourceConfig, isDataCollectionIdCaseSensitive(sourceConfig)); + new MySqlSchema( + sourceConfig, isDataCollectionIdCaseSensitive(sourceConfig), tableMap); } return mySqlSchema.getTableSchema(jdbc, tableId); } @@ -112,4 +121,14 @@ public FetchTask createFetchTask(SourceSplitBase sourceSplitBas return new MySqlBinlogFetchTask(sourceSplitBase.asIncrementalSplit()); } } + + @Override + public Optional getPrimaryKey(JdbcConnection jdbcConnection, TableId tableId) { + return Optional.ofNullable(tableMap.get(tableId).getTableSchema().getPrimaryKey()); + } + + @Override + public List getConstraintKeys(JdbcConnection jdbcConnection, TableId tableId) { + return tableMap.get(tableId).getTableSchema().getConstraintKeys(); + } } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java index f221256e1d2..67ff9ff6079 100644 --- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java @@ -114,7 +114,7 @@ public DebeziumDeserializationSchema createDebeziumDeserializationSchema( @Override public DataSourceDialect createDataSourceDialect(ReadonlyConfig config) { - return new MySqlDialect((MySqlSourceConfigFactory) configFactory); + return new MySqlDialect((MySqlSourceConfigFactory) configFactory, catalogTables); } @Override diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java index 1ec94c3cfc2..defe0a6ab98 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java @@ -23,22 +23,26 @@ import org.apache.seatunnel.api.table.catalog.CatalogOptions; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.connector.TableSource; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSourceFactory; import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import 
org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceTableConfig; import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions; import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions; import org.apache.seatunnel.connectors.cdc.base.option.StartupMode; import org.apache.seatunnel.connectors.cdc.base.option.StopMode; +import org.apache.seatunnel.connectors.cdc.base.utils.CatalogTableUtils; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions; import com.google.auto.service.AutoService; import java.io.Serializable; import java.util.List; +import java.util.Optional; @AutoService(Factory.class) public class MySqlIncrementalSourceFactory implements TableSourceFactory { @@ -65,7 +69,8 @@ public OptionRule optionRule() { JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND, JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND, JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD, - JdbcSourceOptions.INVERSE_SAMPLING_RATE) + JdbcSourceOptions.INVERSE_SAMPLING_RATE, + JdbcSourceOptions.TABLE_NAMES_CONFIG) .optional(MySqlSourceOptions.STARTUP_MODE, MySqlSourceOptions.STOP_MODE) .conditional( MySqlSourceOptions.STARTUP_MODE, @@ -96,6 +101,15 @@ TableSource createSource(TableSourceFactoryContext context) { List catalogTables = CatalogTableUtil.getCatalogTables( context.getOptions(), context.getClassLoader()); + Optional> tableConfigs = + context.getOptions().getOptional(JdbcSourceOptions.TABLE_NAMES_CONFIG); + if (tableConfigs.isPresent()) { + catalogTables = + CatalogTableUtils.mergeCatalogTableConfig( + catalogTables, + tableConfigs.get(), + text -> TablePath.of(text, false)); + } SeaTunnelDataType dataType = CatalogTableUtil.convertToMultipleRowType(catalogTables); return (SeaTunnelSource) diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlSchema.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlSchema.java index e9213429988..324f91fc6ea 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlSchema.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlSchema.java @@ -17,13 +17,17 @@ package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.connectors.cdc.base.utils.CatalogTableUtils; import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfig; import io.debezium.connector.mysql.MySqlConnectorConfig; import io.debezium.connector.mysql.MySqlDatabaseSchema; import io.debezium.connector.mysql.MySqlOffsetContext; import io.debezium.jdbc.JdbcConnection; +import io.debezium.relational.Table; import io.debezium.relational.TableId; +import io.debezium.relational.history.TableChanges; import io.debezium.relational.history.TableChanges.TableChange; import io.debezium.schema.SchemaChangeEvent; @@ -41,13 +45,18 @@ public class MySqlSchema { private final MySqlConnectorConfig connectorConfig; private final MySqlDatabaseSchema databaseSchema; private final Map schemasByTableId; + private final Map tableMap; - public MySqlSchema(MySqlSourceConfig sourceConfig, boolean isTableIdCaseSensitive) { + public MySqlSchema( + MySqlSourceConfig sourceConfig, + boolean isTableIdCaseSensitive, + Map tableMap) { this.connectorConfig = sourceConfig.getDbzConnectorConfig(); this.databaseSchema = MySqlConnectionUtils.createMySqlDatabaseSchema( connectorConfig, isTableIdCaseSensitive); 
this.schemasByTableId = new HashMap<>(); + this.tableMap = tableMap; } /** @@ -81,7 +90,13 @@ private TableChange readTableSchema(JdbcConnection jdbc, TableId tableId) { for (SchemaChangeEvent schemaChangeEvent : schemaChangeEvents) { for (TableChange tableChange : schemaChangeEvent.getTableChanges()) { - tableChangeMap.put(tableId, tableChange); + Table table = + CatalogTableUtils.mergeCatalogTableConfig( + tableChange.getTable(), tableMap.get(tableId)); + TableChange newTableChange = + new TableChange( + TableChanges.TableChangeType.CREATE, table); + tableChangeMap.put(tableId, newTableChange); } } } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java index c337ddc2a0b..e667412378c 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java @@ -17,6 +17,9 @@ package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.ConstraintKey; +import org.apache.seatunnel.api.table.catalog.PrimaryKey; import org.apache.seatunnel.common.utils.SeaTunnelException; import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig; import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect; @@ -24,6 +27,7 @@ import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter; import 
org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask; import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase; +import org.apache.seatunnel.connectors.cdc.base.utils.CatalogTableUtils; import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.config.SqlServerSourceConfig; import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.config.SqlServerSourceConfigFactory; import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.eumerator.SqlServerChunkSplitter; @@ -40,6 +44,8 @@ import java.sql.SQLException; import java.util.List; +import java.util.Map; +import java.util.Optional; import static org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerConnectionUtils.createSqlServerConnection; @@ -50,9 +56,12 @@ public class SqlServerDialect implements JdbcDataSourceDialect { private final SqlServerSourceConfig sourceConfig; private transient SqlServerSchema sqlServerSchema; + private final Map tableMap; - public SqlServerDialect(SqlServerSourceConfigFactory configFactory) { + public SqlServerDialect( + SqlServerSourceConfigFactory configFactory, List catalogTables) { this.sourceConfig = configFactory.create(0); + this.tableMap = CatalogTableUtils.convertTables(catalogTables); } @Override @@ -95,7 +104,7 @@ public List discoverDataCollections(JdbcSourceConfig sourceConfig) { @Override public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId) { if (sqlServerSchema == null) { - sqlServerSchema = new SqlServerSchema(sourceConfig.getDbzConnectorConfig()); + sqlServerSchema = new SqlServerSchema(sourceConfig.getDbzConnectorConfig(), tableMap); } return sqlServerSchema.getTableSchema(jdbc, tableId); } @@ -115,4 +124,14 @@ public FetchTask createFetchTask(SourceSplitBase sourceSplitBas return new SqlServerTransactionLogFetchTask(sourceSplitBase.asIncrementalSplit()); } } + + @Override + public Optional getPrimaryKey(JdbcConnection jdbcConnection, 
TableId tableId) { + return Optional.ofNullable(tableMap.get(tableId).getTableSchema().getPrimaryKey()); + } + + @Override + public List getConstraintKeys(JdbcConnection jdbcConnection, TableId tableId) { + return tableMap.get(tableId).getTableSchema().getConstraintKeys(); + } } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java index 5a3a3cc9eb9..4ab64ff692f 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java @@ -114,7 +114,7 @@ public DebeziumDeserializationSchema createDebeziumDeserializationSchema( @Override public DataSourceDialect createDataSourceDialect(ReadonlyConfig config) { - return new SqlServerDialect((SqlServerSourceConfigFactory) configFactory); + return new SqlServerDialect((SqlServerSourceConfigFactory) configFactory, catalogTables); } @Override diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java index 6338d85aa2f..95031e9b9ff 100644 --- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java @@ -23,22 +23,26 @@ import org.apache.seatunnel.api.table.catalog.CatalogOptions; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.connector.TableSource; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSourceFactory; import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceTableConfig; import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions; import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions; import org.apache.seatunnel.connectors.cdc.base.option.StartupMode; import org.apache.seatunnel.connectors.cdc.base.option.StopMode; +import org.apache.seatunnel.connectors.cdc.base.utils.CatalogTableUtils; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions; import com.google.auto.service.AutoService; import java.io.Serializable; import java.util.List; +import java.util.Optional; @AutoService(Factory.class) public class SqlServerIncrementalSourceFactory implements TableSourceFactory { @@ -64,7 +68,8 @@ public OptionRule optionRule() { JdbcSourceOptions.CONNECTION_POOL_SIZE, JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND, JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND, - 
JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD) + JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD, + JdbcSourceOptions.TABLE_NAMES_CONFIG) .optional(SqlServerSourceOptions.STARTUP_MODE, SqlServerSourceOptions.STOP_MODE) .conditional( SqlServerSourceOptions.STARTUP_MODE, @@ -101,6 +106,15 @@ TableSource createSource(TableSourceFactoryContext context) { List catalogTables = CatalogTableUtil.getCatalogTables( context.getOptions(), context.getClassLoader()); + Optional> tableConfigs = + context.getOptions().getOptional(JdbcSourceOptions.TABLE_NAMES_CONFIG); + if (tableConfigs.isPresent()) { + catalogTables = + CatalogTableUtils.mergeCatalogTableConfig( + catalogTables, + tableConfigs.get(), + text -> TablePath.of(text, true)); + } SeaTunnelDataType dataType = CatalogTableUtil.convertToMultipleRowType(catalogTables); return new SqlServerIncrementalSource(context.getOptions(), dataType, catalogTables); diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerSchema.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerSchema.java index 83d51ae31ba..79f58e3e2df 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerSchema.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerSchema.java @@ -17,7 +17,9 @@ package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils; +import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.common.utils.SeaTunnelException; +import org.apache.seatunnel.connectors.cdc.base.utils.CatalogTableUtils; import io.debezium.connector.sqlserver.SqlServerConnection; import 
io.debezium.connector.sqlserver.SqlServerConnectorConfig; @@ -38,10 +40,13 @@ public class SqlServerSchema { private final SqlServerConnectorConfig connectorConfig; private final Map<TableId, TableChange> schemasByTableId; + private final Map<TableId, CatalogTable> tableMap; - public SqlServerSchema(SqlServerConnectorConfig connectorConfig) { + public SqlServerSchema( + SqlServerConnectorConfig connectorConfig, Map<TableId, CatalogTable> tableMap) { this.schemasByTableId = new ConcurrentHashMap<>(); this.connectorConfig = connectorConfig; + this.tableMap = tableMap; } public TableChange getTableSchema(JdbcConnection jdbc, TableId tableId) { @@ -67,7 +72,9 @@ private TableChange readTableSchema(JdbcConnection jdbc, TableId tableId) { connectorConfig.getTableFilters().dataCollectionFilter(), null, false); - Table table = tables.forTable(tableId); + Table table = + CatalogTableUtils.mergeCatalogTableConfig( + tables.forTable(tableId), tableMap.get(tableId)); TableChange tableChange = new TableChange(TableChanges.TableChangeType.CREATE, table); tableChangeMap.put(tableId, tableChange); } catch (SQLException e) { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java index 366b3146b17..f2d5669c37f 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java @@ -96,6 +96,11 @@ public class MysqlCDCIT extends TestSuiteBase implements TestResource { private static final String SOURCE_TABLE_2 = "mysql_cdc_e2e_source_table2"; private static final String SOURCE_TABLE_NO_PRIMARY_KEY = "mysql_cdc_e2e_source_table_no_primary_key"; + + private static 
final String SOURCE_TABLE_1_CUSTOM_PRIMARY_KEY = + "mysql_cdc_e2e_source_table_1_custom_primary_key"; + private static final String SOURCE_TABLE_2_CUSTOM_PRIMARY_KEY = + "mysql_cdc_e2e_source_table_2_custom_primary_key"; private static final String SINK_TABLE = "mysql_cdc_e2e_sink_table"; private static MySqlContainer createMySqlContainer(MySqlVersion version) { @@ -414,6 +419,58 @@ public void testMultiTableWithRestore(TestContainer container) log.info("****************** container logs end ******************"); } + @TestTemplate + @DisabledOnContainer( + value = {}, + type = {EngineType.SPARK, EngineType.FLINK}, + disabledReason = "Currently SPARK and FLINK do not support multi table") + public void testMysqlCdcMultiTableWithCustomPrimaryKey(TestContainer container) { + // Clear related content to ensure that multiple operations are not affected + clearTable(MYSQL_DATABASE2, SOURCE_TABLE_1_CUSTOM_PRIMARY_KEY); + clearTable(MYSQL_DATABASE2, SOURCE_TABLE_2_CUSTOM_PRIMARY_KEY); + + CompletableFuture.supplyAsync( + () -> { + try { + container.executeJob("/mysqlcdc_to_mysql_with_custom_primary_key.conf"); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(e); + } + return null; + }); + + // insert update delete + upsertDeleteSourceTable(MYSQL_DATABASE, SOURCE_TABLE_1_CUSTOM_PRIMARY_KEY); + upsertDeleteSourceTable(MYSQL_DATABASE, SOURCE_TABLE_2_CUSTOM_PRIMARY_KEY); + + // stream stage + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertAll( + () -> + Assertions.assertIterableEquals( + query( + getSourceQuerySQL( + MYSQL_DATABASE, + SOURCE_TABLE_1_CUSTOM_PRIMARY_KEY)), + query( + getSourceQuerySQL( + MYSQL_DATABASE2, + SOURCE_TABLE_1_CUSTOM_PRIMARY_KEY))), + () -> + Assertions.assertIterableEquals( + query( + getSourceQuerySQL( + MYSQL_DATABASE, + SOURCE_TABLE_2_CUSTOM_PRIMARY_KEY)), + query( + getSourceQuerySQL( + MYSQL_DATABASE2, + 
SOURCE_TABLE_2_CUSTOM_PRIMARY_KEY))))); + } + private Connection getJdbcConnection() throws SQLException { return DriverManager.getConnection( MYSQL_CONTAINER.getJdbcUrl(), diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/mysql_cdc.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/mysql_cdc.sql index 91ae73bd27f..b909f9aacd3 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/mysql_cdc.sql +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/mysql_cdc.sql @@ -170,6 +170,102 @@ CREATE TABLE mysql_cdc_e2e_source_table_no_primary_key DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci; +CREATE TABLE mysql_cdc_e2e_source_table_1_custom_primary_key +( + `id` int NOT NULL, + `f_binary` binary(64) DEFAULT NULL, + `f_blob` blob, + `f_long_varbinary` mediumblob, + `f_longblob` longblob, + `f_tinyblob` tinyblob, + `f_varbinary` varbinary(100) DEFAULT NULL, + `f_smallint` smallint DEFAULT NULL, + `f_smallint_unsigned` smallint unsigned DEFAULT NULL, + `f_mediumint` mediumint DEFAULT NULL, + `f_mediumint_unsigned` mediumint unsigned DEFAULT NULL, + `f_int` int DEFAULT NULL, + `f_int_unsigned` int unsigned DEFAULT NULL, + `f_integer` int DEFAULT NULL, + `f_integer_unsigned` int unsigned DEFAULT NULL, + `f_bigint` bigint DEFAULT NULL, + `f_bigint_unsigned` bigint unsigned DEFAULT NULL, + `f_numeric` decimal(10, 0) DEFAULT NULL, + `f_decimal` decimal(10, 0) DEFAULT NULL, + `f_float` float DEFAULT NULL, + `f_double` double DEFAULT NULL, + `f_double_precision` double DEFAULT NULL, + `f_longtext` longtext, + `f_mediumtext` mediumtext, + `f_text` text, + `f_tinytext` tinytext, + `f_varchar` varchar(100) DEFAULT NULL, + `f_date` date DEFAULT NULL, + `f_datetime` datetime DEFAULT NULL, + `f_timestamp` timestamp NULL DEFAULT NULL, + `f_bit1` bit(1) DEFAULT NULL, + `f_bit64` bit(64) DEFAULT NULL, + 
`f_char` char(1) DEFAULT NULL, + `f_enum` enum ('enum1','enum2','enum3') DEFAULT NULL, + `f_mediumblob` mediumblob, + `f_long_varchar` mediumtext, + `f_real` double DEFAULT NULL, + `f_time` time DEFAULT NULL, + `f_tinyint` tinyint DEFAULT NULL, + `f_tinyint_unsigned` tinyint unsigned DEFAULT NULL, + `f_json` json DEFAULT NULL, + `f_year` year DEFAULT NULL +) ENGINE = InnoDB + DEFAULT CHARSET = utf8mb4 + COLLATE = utf8mb4_0900_ai_ci; + +CREATE TABLE mysql_cdc_e2e_source_table_2_custom_primary_key +( + `id` int NOT NULL, + `f_binary` binary(64) DEFAULT NULL, + `f_blob` blob, + `f_long_varbinary` mediumblob, + `f_longblob` longblob, + `f_tinyblob` tinyblob, + `f_varbinary` varbinary(100) DEFAULT NULL, + `f_smallint` smallint DEFAULT NULL, + `f_smallint_unsigned` smallint unsigned DEFAULT NULL, + `f_mediumint` mediumint DEFAULT NULL, + `f_mediumint_unsigned` mediumint unsigned DEFAULT NULL, + `f_int` int DEFAULT NULL, + `f_int_unsigned` int unsigned DEFAULT NULL, + `f_integer` int DEFAULT NULL, + `f_integer_unsigned` int unsigned DEFAULT NULL, + `f_bigint` bigint DEFAULT NULL, + `f_bigint_unsigned` bigint unsigned DEFAULT NULL, + `f_numeric` decimal(10, 0) DEFAULT NULL, + `f_decimal` decimal(10, 0) DEFAULT NULL, + `f_float` float DEFAULT NULL, + `f_double` double DEFAULT NULL, + `f_double_precision` double DEFAULT NULL, + `f_longtext` longtext, + `f_mediumtext` mediumtext, + `f_text` text, + `f_tinytext` tinytext, + `f_varchar` varchar(100) DEFAULT NULL, + `f_date` date DEFAULT NULL, + `f_datetime` datetime DEFAULT NULL, + `f_timestamp` timestamp NULL DEFAULT NULL, + `f_bit1` bit(1) DEFAULT NULL, + `f_bit64` bit(64) DEFAULT NULL, + `f_char` char(1) DEFAULT NULL, + `f_enum` enum ('enum1','enum2','enum3') DEFAULT NULL, + `f_mediumblob` mediumblob, + `f_long_varchar` mediumtext, + `f_real` double DEFAULT NULL, + `f_time` time DEFAULT NULL, + `f_tinyint` tinyint DEFAULT NULL, + `f_tinyint_unsigned` tinyint unsigned DEFAULT NULL, + `f_json` json DEFAULT NULL, + `f_year` 
year DEFAULT NULL +) ENGINE = InnoDB + DEFAULT CHARSET = utf8mb4 + COLLATE = utf8mb4_0900_ai_ci; + CREATE TABLE mysql_cdc_e2e_sink_table ( `id` int NOT NULL AUTO_INCREMENT, @@ -223,6 +319,8 @@ CREATE TABLE mysql_cdc_e2e_sink_table truncate table mysql_cdc_e2e_source_table; truncate table mysql_cdc_e2e_source_table2; truncate table mysql_cdc_e2e_source_table_no_primary_key; +truncate table mysql_cdc_e2e_source_table_1_custom_primary_key; +truncate table mysql_cdc_e2e_source_table_2_custom_primary_key; truncate table mysql_cdc_e2e_sink_table; INSERT INTO mysql_cdc_e2e_source_table ( id, f_binary, f_blob, f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint, @@ -318,6 +416,68 @@ VALUES ( 1, 0x616263740000000000000000000000000000000000000000000000000000000000 0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 'This is a long varchar field', 112.345, '14:30:00', -128, 22, '{ "key": "value" }', 2021 ); +INSERT INTO mysql_cdc_e2e_source_table_1_custom_primary_key ( id, f_binary, f_blob, f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint, + f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer, + f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double, + f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar, f_date, f_datetime, + f_timestamp, f_bit1, f_bit64, f_char, f_enum, f_mediumblob, f_long_varchar, f_real, f_time, + f_tinyint, f_tinyint_unsigned, f_json, f_year ) +VALUES ( 1, 0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000, + 0x68656C6C6F, 0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL, + 0x74696E79626C6F62, 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 1234567, 7654321, + 123456789, 987654321, 123, 789, 12.34, 56.78, 90.12, 'This is a long text field', 'This is a medium text field', + 
'This is a text field', 'This is a tiny text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00', + '2023-04-27 11:08:40', 1, b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 'enum2', + 0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 'This is a long varchar field', + 12.345, '14:30:00', -128, 255, '{ "key": "value" }', 2022 ), + ( 2, 0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000, + 0x68656C6C6F, 0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL, 0x74696E79626C6F62, + 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 1234567, 7654321, 123456789, 987654321, + 123, 789, 12.34, 56.78, 90.12, 'This is a long text field', 'This is a medium text field', 'This is a text field', + 'This is a tiny text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00', '2023-04-27 11:08:40', + 1, b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 'enum2', + 0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 'This is a long varchar field', + 112.345, '14:30:00', -128, 22, '{ "key": "value" }', 2013 ), + ( 3, 0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000, + 0x68656C6C6F, 0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL, 0x74696E79626C6F62, + 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 1234567, 7654321, 123456789, 987654321, 123, + 789, 12.34, 56.78, 90.12, 'This is a long text field', 'This is a medium text field', 'This is a text field', + 'This is a tiny text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00', '2023-04-27 11:08:40', + 1, b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 'enum2', + 
0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 'This is a long varchar field', 112.345, + '14:30:00', -128, 22, '{ "key": "value" }', 2021 ); + +INSERT INTO mysql_cdc_e2e_source_table_2_custom_primary_key ( id, f_binary, f_blob, f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint, + f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer, + f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double, + f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar, f_date, f_datetime, + f_timestamp, f_bit1, f_bit64, f_char, f_enum, f_mediumblob, f_long_varchar, f_real, f_time, + f_tinyint, f_tinyint_unsigned, f_json, f_year ) +VALUES ( 1, 0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000, + 0x68656C6C6F, 0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL, + 0x74696E79626C6F62, 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 1234567, 7654321, + 123456789, 987654321, 123, 789, 12.34, 56.78, 90.12, 'This is a long text field', 'This is a medium text field', + 'This is a text field', 'This is a tiny text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00', + '2023-04-27 11:08:40', 1, b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 'enum2', + 0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 'This is a long varchar field', + 12.345, '14:30:00', -128, 255, '{ "key": "value" }', 2022 ), + ( 2, 0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000, + 0x68656C6C6F, 0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL, 0x74696E79626C6F62, + 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 1234567, 7654321, 123456789, 987654321, + 
123, 789, 12.34, 56.78, 90.12, 'This is a long text field', 'This is a medium text field', 'This is a text field', + 'This is a tiny text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00', '2023-04-27 11:08:40', + 1, b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 'enum2', + 0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 'This is a long varchar field', + 112.345, '14:30:00', -128, 22, '{ "key": "value" }', 2013 ), + ( 3, 0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000, + 0x68656C6C6F, 0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL, 0x74696E79626C6F62, + 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 1234567, 7654321, 123456789, 987654321, 123, + 789, 12.34, 56.78, 90.12, 'This is a long text field', 'This is a medium text field', 'This is a text field', + 'This is a tiny text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00', '2023-04-27 11:08:40', + 1, b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 'enum2', + 0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 'This is a long varchar field', 112.345, + '14:30:00', -128, 22, '{ "key": "value" }', 2021 ); + CREATE DATABASE IF NOT EXISTS `mysql_cdc2`; use mysql_cdc2; @@ -421,3 +581,101 @@ CREATE TABLE mysql_cdc_e2e_source_table2 AUTO_INCREMENT = 2 DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci; + +CREATE TABLE mysql_cdc_e2e_source_table_1_custom_primary_key +( + `id` int NOT NULL, + `f_binary` binary(64) DEFAULT NULL, + `f_blob` blob, + `f_long_varbinary` mediumblob, + `f_longblob` longblob, + `f_tinyblob` tinyblob, + `f_varbinary` varbinary(100) DEFAULT NULL, + `f_smallint` smallint DEFAULT NULL, + `f_smallint_unsigned` smallint unsigned DEFAULT NULL, + `f_mediumint` mediumint DEFAULT NULL, + `f_mediumint_unsigned` 
mediumint unsigned DEFAULT NULL, + `f_int` int DEFAULT NULL, + `f_int_unsigned` int unsigned DEFAULT NULL, + `f_integer` int DEFAULT NULL, + `f_integer_unsigned` int unsigned DEFAULT NULL, + `f_bigint` bigint DEFAULT NULL, + `f_bigint_unsigned` bigint unsigned DEFAULT NULL, + `f_numeric` decimal(10, 0) DEFAULT NULL, + `f_decimal` decimal(10, 0) DEFAULT NULL, + `f_float` float DEFAULT NULL, + `f_double` double DEFAULT NULL, + `f_double_precision` double DEFAULT NULL, + `f_longtext` longtext, + `f_mediumtext` mediumtext, + `f_text` text, + `f_tinytext` tinytext, + `f_varchar` varchar(100) DEFAULT NULL, + `f_date` date DEFAULT NULL, + `f_datetime` datetime DEFAULT NULL, + `f_timestamp` timestamp NULL DEFAULT NULL, + `f_bit1` bit(1) DEFAULT NULL, + `f_bit64` bit(64) DEFAULT NULL, + `f_char` char(1) DEFAULT NULL, + `f_enum` enum ('enum1','enum2','enum3') DEFAULT NULL, + `f_mediumblob` mediumblob, + `f_long_varchar` mediumtext, + `f_real` double DEFAULT NULL, + `f_time` time DEFAULT NULL, + `f_tinyint` tinyint DEFAULT NULL, + `f_tinyint_unsigned` tinyint unsigned DEFAULT NULL, + `f_json` json DEFAULT NULL, + `f_year` year DEFAULT NULL, + PRIMARY KEY (`id`) +) ENGINE = InnoDB + DEFAULT CHARSET = utf8mb4 + COLLATE = utf8mb4_0900_ai_ci; + +CREATE TABLE mysql_cdc_e2e_source_table_2_custom_primary_key +( + `id` int NOT NULL, + `f_binary` binary(64) DEFAULT NULL, + `f_blob` blob, + `f_long_varbinary` mediumblob, + `f_longblob` longblob, + `f_tinyblob` tinyblob, + `f_varbinary` varbinary(100) DEFAULT NULL, + `f_smallint` smallint DEFAULT NULL, + `f_smallint_unsigned` smallint unsigned DEFAULT NULL, + `f_mediumint` mediumint DEFAULT NULL, + `f_mediumint_unsigned` mediumint unsigned DEFAULT NULL, + `f_int` int DEFAULT NULL, + `f_int_unsigned` int unsigned DEFAULT NULL, + `f_integer` int DEFAULT NULL, + `f_integer_unsigned` int unsigned DEFAULT NULL, + `f_bigint` bigint DEFAULT NULL, + `f_bigint_unsigned` bigint unsigned DEFAULT NULL, + `f_numeric` decimal(10, 0) DEFAULT NULL, + 
`f_decimal` decimal(10, 0) DEFAULT NULL, + `f_float` float DEFAULT NULL, + `f_double` double DEFAULT NULL, + `f_double_precision` double DEFAULT NULL, + `f_longtext` longtext, + `f_mediumtext` mediumtext, + `f_text` text, + `f_tinytext` tinytext, + `f_varchar` varchar(100) DEFAULT NULL, + `f_date` date DEFAULT NULL, + `f_datetime` datetime DEFAULT NULL, + `f_timestamp` timestamp NULL DEFAULT NULL, + `f_bit1` bit(1) DEFAULT NULL, + `f_bit64` bit(64) DEFAULT NULL, + `f_char` char(1) DEFAULT NULL, + `f_enum` enum ('enum1','enum2','enum3') DEFAULT NULL, + `f_mediumblob` mediumblob, + `f_long_varchar` mediumtext, + `f_real` double DEFAULT NULL, + `f_time` time DEFAULT NULL, + `f_tinyint` tinyint DEFAULT NULL, + `f_tinyint_unsigned` tinyint unsigned DEFAULT NULL, + `f_json` json DEFAULT NULL, + `f_year` year DEFAULT NULL, + PRIMARY KEY (`id`) +) ENGINE = InnoDB + DEFAULT CHARSET = utf8mb4 + COLLATE = utf8mb4_0900_ai_ci; diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_custom_primary_key.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_custom_primary_key.conf new file mode 100644 index 00000000000..ba3e94855fd --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_custom_primary_key.conf @@ -0,0 +1,60 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set engine configuration here + parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 5000 +} + +source { + MySQL-CDC { + result_table_name = "customers_mysql_cdc" + server-id = 5652 + base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc" + username = "mysqluser" + password = "mysqlpw" + exactly_once = true + table-names = ["mysql_cdc.mysql_cdc_e2e_source_table_1_custom_primary_key", "mysql_cdc.mysql_cdc_e2e_source_table_2_custom_primary_key"] + table-names-config = [ + { + table = "mysql_cdc.mysql_cdc_e2e_source_table_1_custom_primary_key" + primaryKeys = ["id"] + }, + { + table = "mysql_cdc.mysql_cdc_e2e_source_table_2_custom_primary_key" + primaryKeys = ["id"] + } + ] + } +} + +sink { + jdbc { + source_table_name = "customers_mysql_cdc" + url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc2" + driver = "com.mysql.cj.jdbc.Driver" + user = "mysqluser" + password = "mysqlpw" + database = "mysql_cdc2" + generate_sink_sql = true + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java index 597838096bc..1216c696454 100644 --- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java @@ -83,6 +83,8 @@ public class SqlServerCDCIT extends TestSuiteBase implements TestResource { private static final String SOURCE_TABLE = "column_type_test.dbo.full_types"; private static final String SOURCE_TABLE_NO_PRIMARY_KEY = "column_type_test.dbo.full_types_no_primary_key"; + private static final String SOURCE_TABLE_CUSTOM_PRIMARY_KEY = + "column_type_test.dbo.full_types_custom_primary_key"; private static final String SINK_TABLE = "column_type_test.dbo.full_types_sink"; private static final String SELECT_SOURCE_SQL = "select\n" @@ -265,6 +267,44 @@ public void testCDCWithNoPrimaryKey(TestContainer container) { }); } + @TestTemplate + public void testCDCWithCustomPrimaryKey(TestContainer container) { + initializeSqlServerTable("column_type_test"); + + CompletableFuture executeJobFuture = + CompletableFuture.supplyAsync( + () -> { + try { + container.executeJob( + "/sqlservercdc_to_sqlserver_with_custom_primary_key.conf"); + } catch (Exception e) { + throw new RuntimeException(e); + } + return null; + }); + + // snapshot stage + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Assertions.assertIterableEquals( + querySql(SELECT_SOURCE_SQL, SOURCE_TABLE_CUSTOM_PRIMARY_KEY), + querySql(SELECT_SINK_SQL, SINK_TABLE)); + }); + + // insert update delete + updateSourceTable(SOURCE_TABLE_CUSTOM_PRIMARY_KEY); + + // stream stage + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Assertions.assertIterableEquals( + querySql(SELECT_SOURCE_SQL, SOURCE_TABLE_CUSTOM_PRIMARY_KEY), + querySql(SELECT_SINK_SQL, SINK_TABLE)); + }); + } + /** * Executes a JDBC statement using the default jdbc config without autocommitting the 
* connection. diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/ddl/column_type_test.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/ddl/column_type_test.sql index d227c346152..0c6aebe4fd8 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/ddl/column_type_test.sql +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/ddl/column_type_test.sql @@ -120,6 +120,54 @@ INSERT INTO full_types_no_primary_key VALUES (2, 'b',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS varbinary(100))); EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'full_types_no_primary_key', @role_name = NULL, @supports_net_changes = 0; +CREATE TABLE full_types_custom_primary_key ( + id int NOT NULL, + val_char char(3), + val_varchar varchar(1000), + val_text text, + val_nchar nchar(3), + val_nvarchar nvarchar(1000), + val_ntext ntext, + val_decimal decimal(6,3), + val_numeric numeric, + val_float float, + val_real real, + val_smallmoney smallmoney, + val_money money, + val_bit bit, + val_tinyint tinyint, + val_smallint smallint, + val_int int, + val_bigint bigint, + val_date date, + val_time time, + val_datetime2 datetime2, + val_datetime datetime, + val_smalldatetime smalldatetime, + val_xml xml, + val_datetimeoffset DATETIMEOFFSET(4), + val_varbinary varbinary(100) +); +INSERT INTO full_types_custom_primary_key VALUES (0, + 'cč0', 'vcč', 'tč', N'cč', N'vcč', N'tč', + 1.123, 2, 3.323, 4.323, 5.323, 6.323, + 1, 22, 333, 4444, 55555, + '2018-07-13', '10:23:45', '2018-07-13 11:23:45.34', '2018-07-13 13:23:45.78', '2018-07-13 14:23:45', + 'b',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS varbinary(100))); +INSERT INTO full_types_custom_primary_key VALUES (1, + 'cč1', 'vcč', 'tč', N'cč', N'vcč', N'tč', + 1.123, 2, 3.323, 4.323, 5.323, 6.323, + 1, 22, 333, 4444, 55555, + '2018-07-13', '10:23:45', 
'2018-07-13 11:23:45.34', '2018-07-13 13:23:45.78', '2018-07-13 14:23:45', + 'b',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS varbinary(100))); +INSERT INTO full_types_custom_primary_key VALUES (2, + 'cč2', 'vcč', 'tč', N'cč', N'vcč', N'tč', + 1.123, 2, 3.323, 4.323, 5.323, 6.323, + 1, 22, 333, 4444, 55555, + '2018-07-13', '10:23:45', '2018-07-13 11:23:45.34', '2018-07-13 13:23:45.78', '2018-07-13 14:23:45', + 'b',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS varbinary(100))); +EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'full_types_custom_primary_key', @role_name = NULL, @supports_net_changes = 0; + CREATE TABLE full_types_sink ( id int NOT NULL, val_char char(3), diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/sqlservercdc_to_sqlserver_with_custom_primary_key.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/sqlservercdc_to_sqlserver_with_custom_primary_key.conf new file mode 100644 index 00000000000..4e9f4e5e734 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/sqlservercdc_to_sqlserver_with_custom_primary_key.conf @@ -0,0 +1,58 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set engine configuration here + parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 5000 +} + +source { + # This is an example source plugin **only for testing and demonstrating the source plugin feature** + SqlServer-CDC { + result_table_name = "customers" + username = "sa" + password = "Password!" + database-names = ["column_type_test"] + table-names = ["column_type_test.dbo.full_types_custom_primary_key"] + base-url = "jdbc:sqlserver://sqlserver-host:1433;databaseName=column_type_test" + + exactly_once = false + } +} + +transform { +} + +sink { + Jdbc { + source_table_name = "customers" + driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver" + url = "jdbc:sqlserver://sqlserver-host:1433;encrypt=false" + user = "sa" + password = "Password!" + generate_sink_sql = true + database = "column_type_test" + table = "dbo.full_types_sink" + batch_size = 1 + primary_keys = ["id"] + } +} \ No newline at end of file