Skip to content

Commit

Permalink
[Feature][CDC][SqlServer] Support multi-table read
Browse files Browse the repository at this point in the history
  • Loading branch information
hailin0 committed Mar 22, 2023
1 parent e1f6d3b commit 7800f06
Show file tree
Hide file tree
Showing 15 changed files with 927 additions and 72 deletions.
34 changes: 15 additions & 19 deletions docs/en/connector-v2/source/SqlServer-CDC.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@ describes how to setup the SqlServer CDC connector to run SQL queries against Sq

| name | type | required | default value |
|------------------------------------------------|----------|----------|---------------|
| hostname | String | Yes | - |
| port | Integer | No | 3306 |
| username | String | Yes | - |
| password | String | Yes | - |
| database-name | String | Yes | - |
| table-name | String | Yes | - |
| database-names | List | Yes | - |
| table-names | List | Yes | - |
| base-url | String | Yes | - |
| startup.mode | Enum | No | INITIAL |
| startup.timestamp | Long | No | - |
| startup.specific-offset.file | String | No | - |
Expand All @@ -47,14 +46,6 @@ describes how to setup the SqlServer CDC connector to run SQL queries against Sq
| format | Enum | No | DEFAULT |
| common-options | | no | - |

### hostname [String]

IP address or hostname of the database server.

### port [Integer]

Integer port number of the database server.

### username [String]

Name of the database to use when connecting to the database server.
Expand All @@ -63,13 +54,17 @@ Name of the database to use when connecting to the database server.

Password to use when connecting to the database server.

### database-name [String]
### database-names [List]

Database name of the database to monitor.

### table-name [String]
### table-names [List]

Table name is a combination of schema name and table name (databaseName.schemaName.tableName).

### base-url [String]

Table name is a combination of schema name and table name (schemaName.tableName).
URL has to be with database, like "jdbc:sqlserver://localhost:1433;databaseName=test".

### startup.mode [Enum]

Expand Down Expand Up @@ -178,12 +173,12 @@ Source plugin common parameters, please refer to [Source Common Options](common-
source {
SqlServer-CDC {
result_table_name = "customers"
hostname = "sqlserver-host"
port = "1433"
username = "sa"
password = "Password!"
database-name = "column_type_test"
table-name = "dbo.full_types"
database-names = ["exampledb"]
table-names = ["exampledb.dbo.table_x"]
base-url="jdbc:sqlserver://localhost:1433;databaseName=exampledb"
}
}
```
Expand All @@ -194,4 +189,5 @@ source {

- Add SqlServer CDC Source Connector
- [Doc] Add SqlServer CDC Source Connector document ([3993](https://github.com/apache/incubator-seatunnel/pull/3993))
- [Feature] Support multi-table read ([4377](https://github.com/apache/incubator-seatunnel/pull/4377))

Original file line number Diff line number Diff line change
Expand Up @@ -23,43 +23,70 @@
public final class TablePath implements Serializable {
private static final long serialVersionUID = 1L;
private final String databaseName;
private final String schemaName;
private final String tableName;

private TablePath(String databaseName, String tableName) {
this(databaseName, null, tableName);
}

private TablePath(String databaseName, String schemaName, String tableName) {
this.databaseName = databaseName;
this.schemaName = schemaName;
this.tableName = tableName;
}

public static TablePath of(String fullName) {
String[] paths = fullName.split("\\.");

if (paths.length != 2) {
throw new IllegalArgumentException(
String.format(
"Cannot get split '%s' to get databaseName and tableName", fullName));
if (paths.length == 2) {
return new TablePath(paths[0], paths[1]);
}

return new TablePath(paths[0], paths[1]);
if (paths.length == 3) {
return new TablePath(paths[0], paths[1], paths[2]);
}
throw new IllegalArgumentException(
String.format("Cannot get split '%s' to get databaseName and tableName", fullName));
}

public static TablePath of(String databaseName, String tableName) {
return new TablePath(databaseName, tableName);
return of(databaseName, null, tableName);
}

public static TablePath of(String databaseName, String schemaName, String tableName) {
return new TablePath(databaseName, schemaName, tableName);
}

public String getDatabaseName() {
return databaseName;
}

public String getTableName() {
return tableName;
if (schemaName == null) {
return tableName;
}
return String.format("%s.%s", schemaName, tableName);
}

public String getFullName() {
return String.format("%s.%s", databaseName, tableName);
if (schemaName == null) {
return String.format("%s.%s", databaseName, tableName);
}
return String.format("%s.%s.%s", databaseName, schemaName, tableName);
}

public String getFullNameWithQuoted() {
return String.format("`%s`.`%s`", databaseName, tableName);
return getFullNameWithQuoted("`");
}

public String getFullNameWithQuoted(String quote) {
if (schemaName == null) {
return String.format(
"%s%s%s.%s%s%s", quote, databaseName, quote, quote, tableName, quote);
}
return String.format(
"%s%s%s.%s%s%s.%s%s%s",
quote, databaseName, quote, quote, schemaName, quote, quote, tableName, quote);
}

@Override
Expand All @@ -75,12 +102,13 @@ public boolean equals(Object o) {
TablePath that = (TablePath) o;

return Objects.equals(databaseName, that.databaseName)
&& Objects.equals(schemaName, that.schemaName)
&& Objects.equals(tableName, that.tableName);
}

@Override
public int hashCode() {
return Objects.hash(databaseName, tableName);
return Objects.hash(databaseName, schemaName, tableName);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public static Object[] rowToArray(ResultSet rs, int size) throws SQLException {
public static Long getMessageTimestamp(SourceRecord record) {
Schema schema = record.valueSchema();
Struct value = (Struct) record.value();
if (schema.field(Envelope.FieldName.SOURCE) == null) {
if (schema == null || schema.field(Envelope.FieldName.SOURCE) == null) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,13 @@ public void deserialize(SourceRecord record, Collector<SeaTunnelRow> collector)
Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
String databaseName = sourceStruct.getString(AbstractSourceInfo.DATABASE_NAME_KEY);
String tableName = sourceStruct.getString(AbstractSourceInfo.TABLE_NAME_KEY);
String tableId = TablePath.of(databaseName, tableName).toString();
String schemaName = null;
try {
schemaName = sourceStruct.getString(AbstractSourceInfo.SCHEMA_NAME_KEY);
} catch (Throwable e) {
// ignore
}
String tableId = TablePath.of(databaseName, schemaName, tableName).toString();
SeaTunnelRowDebeziumDeserializationConverters converters;
if (!multipleTableRowConverters.isEmpty()) {
converters = multipleTableRowConverters.get(tableId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-sqlserver</artifactId>
</dependency>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-jdbc</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,11 @@ public SqlServerSourceConfig create(int subtask) {
}
if (tableList != null) {
// SqlServer identifier is of the form schemaName.tableName
props.setProperty(
"table.include.list",
String tableIncludeList =
tableList.stream()
.map(
tableStr -> {
return tableStr.substring(tableStr.indexOf(".") + 1);
})
.collect(Collectors.joining(",")));
.map(table -> table.substring(table.indexOf(".") + 1))
.collect(Collectors.joining(","));
props.setProperty("table.include.list", tableIncludeList);
}

if (dbzProperties != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
Expand All @@ -37,23 +39,32 @@
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.config.SqlServerSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.config.SqlServerSourceConfigFactory;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.offset.LsnOffsetFactory;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver.SqlServerURLParser;

import com.google.auto.service.AutoService;
import io.debezium.connector.sqlserver.SqlServerConnection;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import lombok.NoArgsConstructor;

import java.time.ZoneId;

import static org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerConnectionUtils.createSqlServerConnection;
import static org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerTypeUtils.convertFromTable;

@NoArgsConstructor
@AutoService(SeaTunnelSource.class)
public class SqlServerIncrementalSource<T> extends IncrementalSource<T, JdbcSourceConfig>
implements SupportParallelism {

static final String IDENTIFIER = "SqlServer-CDC";

public SqlServerIncrementalSource(
ReadonlyConfig options, SeaTunnelDataType<SeaTunnelRow> dataType) {
super(options, dataType);
}

@Override
public String getPluginName() {
return IDENTIFIER;
Expand All @@ -75,6 +86,11 @@ public SourceConfig.Factory<JdbcSourceConfig> createSourceConfigFactory(Readonly
configFactory.fromReadonlyConfig(readonlyConfig);
configFactory.startupOptions(startupConfig);
configFactory.stopOptions(stopConfig);
JdbcUrlUtil.UrlInfo urlInfo =
SqlServerURLParser.parse(config.get(JdbcCatalogOptions.BASE_URL));
configFactory.originUrl(urlInfo.getOrigin());
configFactory.hostname(urlInfo.getHost());
configFactory.port(urlInfo.getPort());
return configFactory;
}

Expand All @@ -89,26 +105,27 @@ public DebeziumDeserializationSchema<T> createDebeziumDeserializationSchema(
config.get(JdbcSourceOptions.DEBEZIUM_PROPERTIES));
}

SqlServerSourceConfig sqlServerSourceConfig =
(SqlServerSourceConfig) this.configFactory.create(0);
TableId tableId =
this.dataSourceDialect.discoverDataCollections(sqlServerSourceConfig).get(0);

SqlServerConnection sqlServerConnection =
createSqlServerConnection(sqlServerSourceConfig.getDbzConfiguration());

Table table =
((SqlServerDialect) dataSourceDialect)
.queryTableSchema(sqlServerConnection, tableId)
.getTable();

SeaTunnelRowType seaTunnelRowType = convertFromTable(table);

SeaTunnelDataType<SeaTunnelRow> physicalRowType;
if (dataType == null) {
SqlServerSourceConfig sqlServerSourceConfig =
(SqlServerSourceConfig) this.configFactory.create(0);
TableId tableId =
this.dataSourceDialect.discoverDataCollections(sqlServerSourceConfig).get(0);
SqlServerConnection sqlServerConnection =
createSqlServerConnection(sqlServerSourceConfig.getDbzConfiguration());
Table table =
((SqlServerDialect) dataSourceDialect)
.queryTableSchema(sqlServerConnection, tableId)
.getTable();
physicalRowType = convertFromTable(table);
} else {
physicalRowType = dataType;
}
String zoneId = config.get(JdbcSourceOptions.SERVER_TIME_ZONE);
return (DebeziumDeserializationSchema<T>)
SeaTunnelRowDebeziumDeserializeSchema.builder()
.setPhysicalRowType(seaTunnelRowType)
.setResultTypeInfo(seaTunnelRowType)
.setPhysicalRowType(physicalRowType)
.setResultTypeInfo(physicalRowType)
.setServerTimeZone(ZoneId.of(zoneId))
.build();
}
Expand Down
Loading

0 comments on commit 7800f06

Please sign in to comment.