Skip to content

Commit

Permalink
[Feature][CDC] Support custom table primary key (#6106)
Browse files Browse the repository at this point in the history
* [Feature][CDC] Support custom table primary key

* Update seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java

Co-authored-by: TaoZex <[email protected]>

* Update seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/CatalogTableUtils.java

Co-authored-by: TaoZex <[email protected]>

---------

Co-authored-by: TaoZex <[email protected]>
  • Loading branch information
hailin0 and TaoZex authored Jan 3, 2024
1 parent edcaace commit 1312a1d
Show file tree
Hide file tree
Showing 19 changed files with 873 additions and 15 deletions.
35 changes: 32 additions & 3 deletions docs/en/connector-v2/source/MySQL-CDC.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ When an initial consistent snapshot is made for large databases, your establishe
| password | String | Yes | - | Password to use when connecting to the database server. |
| database-names | List | No | - | Database name of the database to monitor. |
| table-names | List | Yes | - | Table name of the database to monitor. The table name needs to include the database name, for example: `database_name.table_name` |
| table-names-config | List | No | - | Table config list. for example: [{"table": "db1.schema1.table1","primaryKeys":["key1"]}] |
| startup.mode | Enum | No | INITIAL | Optional startup mode for MySQL CDC consumer, valid enumerations are `initial`, `earliest`, `latest` and `specific`. <br/> `initial`: Synchronize historical data at startup, and then synchronize incremental data.<br/> `earliest`: Startup from the earliest offset possible.<br/> `latest`: Startup from the latest offset.<br/> `specific`: Startup from user-supplied specific offsets. |
| startup.specific-offset.file | String | No | - | Start from the specified binlog file name. **Note, This option is required when the `startup.mode` option used `specific`.** |
| startup.specific-offset.pos | Long | No | - | Start from the specified binlog file position. **Note, This option is required when the `startup.mode` option used `specific`.** |
Expand Down Expand Up @@ -190,9 +191,6 @@ env {
source {
MySQL-CDC {
catalog = {
factory = MySQL
}
base-url = "jdbc:mysql://localhost:3306/testdb"
username = "root"
password = "root@123"
Expand All @@ -212,6 +210,37 @@ sink {

> Must be used with kafka connector sink, see [compatible debezium format](../formats/cdc-compatible-debezium-json.md) for details
### Support custom primary key for table

```
env {
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 10000
}
source {
MySQL-CDC {
base-url = "jdbc:mysql://localhost:3306/testdb"
username = "root"
password = "root@123"
table-names = ["testdb.table1", "testdb.table2"]
table-names-config = [
{
table = "testdb.table2"
primaryKeys = ["id"]
}
]
}
}
sink {
Console {
}
}
```

## Changelog

- Add MySQL CDC Source Connector
Expand Down
32 changes: 32 additions & 0 deletions docs/en/connector-v2/source/SqlServer-CDC.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ Please download and put SqlServer driver in `${SEATUNNEL_HOME}/lib/` dir. For ex
| password | String | Yes | - | Password to use when connecting to the database server. |
| database-names | List | Yes | - | Database name of the database to monitor. |
| table-names | List | Yes | - | Table name is a combination of schema name and table name (databaseName.schemaName.tableName). |
| table-names-config | List | No | - | Table config list. for example: [{"table": "db1.schema1.table1","primaryKeys":["key1"]}] |
| base-url | String | Yes | - | URL has to be with database, like "jdbc:sqlserver://localhost:1433;databaseName=test". |
| startup.mode | Enum | No | INITIAL | Optional startup mode for SqlServer CDC consumer, valid enumerations are "initial", "earliest", "latest" and "specific". |
| startup.timestamp | Long | No | - | Start from the specified epoch timestamp (in milliseconds).<br/> **Note, This option is required when** the **"startup.mode" option used `'timestamp'`.** |
Expand Down Expand Up @@ -186,3 +187,34 @@ sink {
}
```

### Support custom primary key for table

```
env {
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}
source {
SqlServer-CDC {
base-url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
username = "sa"
password = "Y.sa123456"
database-names = ["column_type_test"]
table-names = ["column_type_test.dbo.simple_types", "column_type_test.dbo.full_types"]
table-names-config = [
{
table = "column_type_test.dbo.full_types"
primaryKeys = ["id"]
}
]
}
}
sink {
console {
}
```

Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.cdc.base.config;

import lombok.Data;

import java.io.Serializable;
import java.util.List;

@Data
public class JdbcSourceTableConfig implements Serializable {
private String table;
private List<String> primaryKeys;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceTableConfig;
import org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource;

import java.time.ZoneId;
Expand Down Expand Up @@ -141,4 +142,17 @@ public class JdbcSourceOptions extends SourceOptions {
+ "The value represents the denominator of the sampling rate fraction. "
+ "For example, a value of 1000 means a sampling rate of 1/1000. "
+ "This parameter is used when the sample sharding strategy is triggered.");

public static final Option<List<JdbcSourceTableConfig>> TABLE_NAMES_CONFIG =
Options.key("table-names-config")
.listType(JdbcSourceTableConfig.class)
.noDefaultValue()
.withDescription(
"Config table configs. Example: "
+ "["
+ " {"
+ " \"table\": \"db1.schema1.table1\","
+ " \"primaryKeys\": [\"key1\",\"key2\"]"
+ " }"
+ "]");
}
Loading

0 comments on commit 1312a1d

Please sign in to comment.