Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][CDC][SqlServer] Support multi-table read #4377

Merged
merged 1 commit into from
Mar 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 15 additions & 19 deletions docs/en/connector-v2/source/SqlServer-CDC.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@ describes how to setup the SqlServer CDC connector to run SQL queries against Sq

| name | type | required | default value |
|------------------------------------------------|----------|----------|---------------|
| hostname | String | Yes | - |
| port | Integer | No | 3306 |
| username | String | Yes | - |
| password | String | Yes | - |
| database-name | String | Yes | - |
| table-name | String | Yes | - |
| database-names | List | Yes | - |
| table-names | List | Yes | - |
| base-url | String | Yes | - |
| startup.mode | Enum | No | INITIAL |
| startup.timestamp | Long | No | - |
| startup.specific-offset.file | String | No | - |
Expand All @@ -47,14 +46,6 @@ describes how to setup the SqlServer CDC connector to run SQL queries against Sq
| format | Enum | No | DEFAULT |
| common-options | | no | - |

### hostname [String]

IP address or hostname of the database server.

### port [Integer]

Integer port number of the database server.

### username [String]

Name of the database to use when connecting to the database server.
Expand All @@ -63,13 +54,17 @@ Name of the database to use when connecting to the database server.

Password to use when connecting to the database server.

### database-name [String]
### database-names [List]

Database name of the database to monitor.

### table-name [String]
### table-names [List]

Table name is a combination of schema name and table name (databaseName.schemaName.tableName).

### base-url [String]

Table name is a combination of schema name and table name (schemaName.tableName).
URL has to be with database, like "jdbc:sqlserver://localhost:1433;databaseName=test".

### startup.mode [Enum]

Expand Down Expand Up @@ -178,12 +173,12 @@ Source plugin common parameters, please refer to [Source Common Options](common-
source {
SqlServer-CDC {
result_table_name = "customers"
hostname = "sqlserver-host"
port = "1433"

username = "sa"
password = "Password!"
database-name = "column_type_test"
table-name = "dbo.full_types"
database-names = ["exampledb"]
table-names = ["exampledb.dbo.table_x"]
base-url="jdbc:sqlserver://localhost:1433;databaseName=exampledb"
}
}
```
Expand All @@ -194,4 +189,5 @@ source {

- Add SqlServer CDC Source Connector
- [Doc] Add SqlServer CDC Source Connector document ([3993](https://github.com/apache/incubator-seatunnel/pull/3993))
- [Feature] Support multi-table read ([4377](https://github.com/apache/incubator-seatunnel/pull/4377))

Original file line number Diff line number Diff line change
Expand Up @@ -23,43 +23,70 @@
public final class TablePath implements Serializable {
private static final long serialVersionUID = 1L;
private final String databaseName;
private final String schemaName;
private final String tableName;

private TablePath(String databaseName, String tableName) {
this(databaseName, null, tableName);
}

private TablePath(String databaseName, String schemaName, String tableName) {
this.databaseName = databaseName;
this.schemaName = schemaName;
this.tableName = tableName;
}

public static TablePath of(String fullName) {
String[] paths = fullName.split("\\.");

if (paths.length != 2) {
throw new IllegalArgumentException(
String.format(
"Cannot get split '%s' to get databaseName and tableName", fullName));
if (paths.length == 2) {
return new TablePath(paths[0], paths[1]);
}

return new TablePath(paths[0], paths[1]);
if (paths.length == 3) {
return new TablePath(paths[0], paths[1], paths[2]);
}
throw new IllegalArgumentException(
String.format("Cannot get split '%s' to get databaseName and tableName", fullName));
}

public static TablePath of(String databaseName, String tableName) {
return new TablePath(databaseName, tableName);
return of(databaseName, null, tableName);
}

public static TablePath of(String databaseName, String schemaName, String tableName) {
return new TablePath(databaseName, schemaName, tableName);
}

public String getDatabaseName() {
return databaseName;
}

public String getTableName() {
return tableName;
if (schemaName == null) {
return tableName;
}
return String.format("%s.%s", schemaName, tableName);
}

public String getFullName() {
return String.format("%s.%s", databaseName, tableName);
if (schemaName == null) {
return String.format("%s.%s", databaseName, tableName);
}
return String.format("%s.%s.%s", databaseName, schemaName, tableName);
}

public String getFullNameWithQuoted() {
return String.format("`%s`.`%s`", databaseName, tableName);
return getFullNameWithQuoted("`");
}

public String getFullNameWithQuoted(String quote) {
if (schemaName == null) {
return String.format(
"%s%s%s.%s%s%s", quote, databaseName, quote, quote, tableName, quote);
}
return String.format(
"%s%s%s.%s%s%s.%s%s%s",
quote, databaseName, quote, quote, schemaName, quote, quote, tableName, quote);
}

@Override
Expand All @@ -75,12 +102,13 @@ public boolean equals(Object o) {
TablePath that = (TablePath) o;

return Objects.equals(databaseName, that.databaseName)
&& Objects.equals(schemaName, that.schemaName)
&& Objects.equals(tableName, that.tableName);
}

@Override
public int hashCode() {
return Objects.hash(databaseName, tableName);
return Objects.hash(databaseName, schemaName, tableName);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public static Object[] rowToArray(ResultSet rs, int size) throws SQLException {
public static Long getMessageTimestamp(SourceRecord record) {
Schema schema = record.valueSchema();
Struct value = (Struct) record.value();
if (schema.field(Envelope.FieldName.SOURCE) == null) {
if (schema == null || schema.field(Envelope.FieldName.SOURCE) == null) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,13 @@ public void deserialize(SourceRecord record, Collector<SeaTunnelRow> collector)
Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
String databaseName = sourceStruct.getString(AbstractSourceInfo.DATABASE_NAME_KEY);
String tableName = sourceStruct.getString(AbstractSourceInfo.TABLE_NAME_KEY);
String tableId = TablePath.of(databaseName, tableName).toString();
String schemaName = null;
try {
schemaName = sourceStruct.getString(AbstractSourceInfo.SCHEMA_NAME_KEY);
} catch (Throwable e) {
// ignore
}
String tableId = TablePath.of(databaseName, schemaName, tableName).toString();
SeaTunnelRowDebeziumDeserializationConverters converters;
if (!multipleTableRowConverters.isEmpty()) {
converters = multipleTableRowConverters.get(tableId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-sqlserver</artifactId>
</dependency>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-jdbc</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,11 @@ public SqlServerSourceConfig create(int subtask) {
}
if (tableList != null) {
// SqlServer identifier is of the form schemaName.tableName
props.setProperty(
"table.include.list",
String tableIncludeList =
tableList.stream()
.map(
tableStr -> {
return tableStr.substring(tableStr.indexOf(".") + 1);
})
.collect(Collectors.joining(",")));
.map(table -> table.substring(table.indexOf(".") + 1))
.collect(Collectors.joining(","));
props.setProperty("table.include.list", tableIncludeList);
}

if (dbzProperties != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
Expand All @@ -37,23 +39,32 @@
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.config.SqlServerSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.config.SqlServerSourceConfigFactory;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.offset.LsnOffsetFactory;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver.SqlServerURLParser;

import com.google.auto.service.AutoService;
import io.debezium.connector.sqlserver.SqlServerConnection;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import lombok.NoArgsConstructor;

import java.time.ZoneId;

import static org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerConnectionUtils.createSqlServerConnection;
import static org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerTypeUtils.convertFromTable;

@NoArgsConstructor
@AutoService(SeaTunnelSource.class)
public class SqlServerIncrementalSource<T> extends IncrementalSource<T, JdbcSourceConfig>
implements SupportParallelism {

static final String IDENTIFIER = "SqlServer-CDC";

public SqlServerIncrementalSource(
ReadonlyConfig options, SeaTunnelDataType<SeaTunnelRow> dataType) {
super(options, dataType);
}

@Override
public String getPluginName() {
return IDENTIFIER;
Expand All @@ -75,6 +86,11 @@ public SourceConfig.Factory<JdbcSourceConfig> createSourceConfigFactory(Readonly
configFactory.fromReadonlyConfig(readonlyConfig);
configFactory.startupOptions(startupConfig);
configFactory.stopOptions(stopConfig);
JdbcUrlUtil.UrlInfo urlInfo =
SqlServerURLParser.parse(config.get(JdbcCatalogOptions.BASE_URL));
configFactory.originUrl(urlInfo.getOrigin());
configFactory.hostname(urlInfo.getHost());
configFactory.port(urlInfo.getPort());
return configFactory;
}

Expand All @@ -89,26 +105,27 @@ public DebeziumDeserializationSchema<T> createDebeziumDeserializationSchema(
config.get(JdbcSourceOptions.DEBEZIUM_PROPERTIES));
}

SqlServerSourceConfig sqlServerSourceConfig =
(SqlServerSourceConfig) this.configFactory.create(0);
TableId tableId =
this.dataSourceDialect.discoverDataCollections(sqlServerSourceConfig).get(0);

SqlServerConnection sqlServerConnection =
createSqlServerConnection(sqlServerSourceConfig.getDbzConfiguration());

Table table =
((SqlServerDialect) dataSourceDialect)
.queryTableSchema(sqlServerConnection, tableId)
.getTable();

SeaTunnelRowType seaTunnelRowType = convertFromTable(table);

SeaTunnelDataType<SeaTunnelRow> physicalRowType;
if (dataType == null) {
SqlServerSourceConfig sqlServerSourceConfig =
(SqlServerSourceConfig) this.configFactory.create(0);
TableId tableId =
this.dataSourceDialect.discoverDataCollections(sqlServerSourceConfig).get(0);
SqlServerConnection sqlServerConnection =
createSqlServerConnection(sqlServerSourceConfig.getDbzConfiguration());
Table table =
((SqlServerDialect) dataSourceDialect)
.queryTableSchema(sqlServerConnection, tableId)
.getTable();
physicalRowType = convertFromTable(table);
} else {
physicalRowType = dataType;
}
String zoneId = config.get(JdbcSourceOptions.SERVER_TIME_ZONE);
return (DebeziumDeserializationSchema<T>)
SeaTunnelRowDebeziumDeserializeSchema.builder()
.setPhysicalRowType(seaTunnelRowType)
.setResultTypeInfo(seaTunnelRowType)
.setPhysicalRowType(physicalRowType)
.setResultTypeInfo(physicalRowType)
.setServerTimeZone(ZoneId.of(zoneId))
.build();
}
Expand Down
Loading