diff --git a/docs/en/connector-v2/source/SqlServer-CDC.md b/docs/en/connector-v2/source/SqlServer-CDC.md index deb4f8574d2..5b310a9471c 100644 --- a/docs/en/connector-v2/source/SqlServer-CDC.md +++ b/docs/en/connector-v2/source/SqlServer-CDC.md @@ -20,12 +20,11 @@ describes how to setup the SqlServer CDC connector to run SQL queries against Sq | name | type | required | default value | |------------------------------------------------|----------|----------|---------------| -| hostname | String | Yes | - | -| port | Integer | No | 3306 | | username | String | Yes | - | | password | String | Yes | - | -| database-name | String | Yes | - | -| table-name | String | Yes | - | +| database-names | List | Yes | - | +| table-names | List | Yes | - | +| base-url | String | Yes | - | | startup.mode | Enum | No | INITIAL | | startup.timestamp | Long | No | - | | startup.specific-offset.file | String | No | - | @@ -47,14 +46,6 @@ describes how to setup the SqlServer CDC connector to run SQL queries against Sq | format | Enum | No | DEFAULT | | common-options | | no | - | -### hostname [String] - -IP address or hostname of the database server. - -### port [Integer] - -Integer port number of the database server. - ### username [String] Name of the database to use when connecting to the database server. @@ -63,13 +54,17 @@ Name of the database to use when connecting to the database server. Password to use when connecting to the database server. -### database-name [String] +### database-names [List] Database name of the database to monitor. -### table-name [String] +### table-names [List] + +Table name is a combination of schema name and table name (databaseName.schemaName.tableName). + +### base-url [String] -Table name is a combination of schema name and table name (schemaName.tableName). +URL has to be with database, like "jdbc:sqlserver://localhost:1433;databaseName=test". ### startup.mode [Enum] @@ -178,12 +173,12 @@ Source plugin common parameters, please refer to [Source Common Options](common- source { SqlServer-CDC { result_table_name = "customers" - hostname = "sqlserver-host" - port = "1433" + username = "sa" password = "Password!" - database-name = "column_type_test" - table-name = "dbo.full_types" + database-names = ["exampledb"] + table-names = ["exampledb.dbo.table_x"] + base-url="jdbc:sqlserver://localhost:1433;databaseName=exampledb" } } ``` @@ -194,4 +189,5 @@ source { - Add SqlServer CDC Source Connector - [Doc] Add SqlServer CDC Source Connector document ([3993](https://github.com/apache/incubator-seatunnel/pull/3993)) +- [Feature] Support multi-table read ([4377](https://github.com/apache/incubator-seatunnel/pull/4377)) diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java index 0de425e63ab..7969e14349d 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java @@ -23,27 +23,38 @@ public final class TablePath implements Serializable { private static final long serialVersionUID = 1L; private final String databaseName; + private final String schemaName; private final String tableName; private TablePath(String databaseName, String tableName) { + this(databaseName, null, tableName); + } + + private TablePath(String databaseName, String schemaName, String tableName) { this.databaseName = databaseName; + this.schemaName = schemaName; this.tableName = tableName; } public static TablePath of(String fullName) { String[] paths = fullName.split("\\."); - if (paths.length != 2) { - throw new IllegalArgumentException( - String.format( - "Cannot get split '%s' to get databaseName and tableName", fullName)); + if (paths.length == 2) { + return new TablePath(paths[0], paths[1]); } - - return new TablePath(paths[0], paths[1]); + if (paths.length == 3) { + return new TablePath(paths[0], paths[1], paths[2]); + } + throw new IllegalArgumentException( + String.format("Cannot get split '%s' to get databaseName and tableName", fullName)); } public static TablePath of(String databaseName, String tableName) { - return new TablePath(databaseName, tableName); + return of(databaseName, null, tableName); + } + + public static TablePath of(String databaseName, String schemaName, String tableName) { + return new TablePath(databaseName, schemaName, tableName); } public String getDatabaseName() { @@ -51,15 +62,31 @@ public String getDatabaseName() { } public String getTableName() { - return tableName; + if (schemaName == null) { + return tableName; + } + return String.format("%s.%s", schemaName, tableName); } public String getFullName() { - return String.format("%s.%s", databaseName, tableName); + if (schemaName == null) { + return String.format("%s.%s", databaseName, tableName); + } + return String.format("%s.%s.%s", databaseName, schemaName, tableName); } public String getFullNameWithQuoted() { - return String.format("`%s`.`%s`", databaseName, tableName); + return getFullNameWithQuoted("`"); + } + + public String getFullNameWithQuoted(String quote) { + if (schemaName == null) { + return String.format( + "%s%s%s.%s%s%s", quote, databaseName, quote, quote, tableName, quote); + } + return String.format( + "%s%s%s.%s%s%s.%s%s%s", + quote, databaseName, quote, quote, schemaName, quote, quote, tableName, quote); } @Override @@ -75,12 +102,13 @@ public boolean equals(Object o) { TablePath that = (TablePath) o; return Objects.equals(databaseName, that.databaseName) + && Objects.equals(schemaName, that.schemaName) && Objects.equals(tableName, that.tableName); } @Override public int hashCode() { - return Objects.hash(databaseName, tableName); + return Objects.hash(databaseName, schemaName, tableName); } @Override diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/SourceRecordUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/SourceRecordUtils.java index 8893347059d..e172b389b4a 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/SourceRecordUtils.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/SourceRecordUtils.java @@ -65,7 +65,7 @@ public static Object[] rowToArray(ResultSet rs, int size) throws SQLException { public static Long getMessageTimestamp(SourceRecord record) { Schema schema = record.valueSchema(); Struct value = (Struct) record.value(); - if (schema.field(Envelope.FieldName.SOURCE) == null) { + if (schema == null || schema.field(Envelope.FieldName.SOURCE) == null) { return null; } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java index 855656c3c32..d9cdf2f5051 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java @@ -122,7 +122,13 @@ public void deserialize(SourceRecord record, Collector collector) Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE); String databaseName = sourceStruct.getString(AbstractSourceInfo.DATABASE_NAME_KEY); String tableName = sourceStruct.getString(AbstractSourceInfo.TABLE_NAME_KEY); - String tableId = TablePath.of(databaseName, tableName).toString(); + String schemaName = null; + try { + schemaName = sourceStruct.getString(AbstractSourceInfo.SCHEMA_NAME_KEY); + } catch (Throwable e) { + // ignore + } + String tableId = TablePath.of(databaseName, schemaName, tableName).toString(); SeaTunnelRowDebeziumDeserializationConverters converters; if (!multipleTableRowConverters.isEmpty()) { converters = multipleTableRowConverters.get(tableId); diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/pom.xml b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/pom.xml index b351b180cbb..3d54cd4b1b9 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/pom.xml +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/pom.xml @@ -58,6 +58,12 @@ io.debezium debezium-connector-sqlserver + + + org.apache.seatunnel + connector-jdbc + ${project.version} + diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/config/SqlServerSourceConfigFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/config/SqlServerSourceConfigFactory.java index c54af62606d..4cb63a26e69 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/config/SqlServerSourceConfigFactory.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/config/SqlServerSourceConfigFactory.java @@ -66,14 +66,11 @@ public SqlServerSourceConfig create(int subtask) { } if (tableList != null) { // SqlServer identifier is of the form schemaName.tableName - props.setProperty( - "table.include.list", + String tableIncludeList = tableList.stream() - .map( - tableStr -> { - return tableStr.substring(tableStr.indexOf(".") + 1); - }) - .collect(Collectors.joining(","))); + .map(table -> table.substring(table.indexOf(".") + 1)) + .collect(Collectors.joining(",")); + props.setProperty("table.include.list", tableIncludeList); } if (dbzProperties != null) { diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java index 91408cd1d1c..a92d8831e24 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java @@ -21,7 +21,9 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.source.SupportParallelism; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.common.utils.JdbcUrlUtil; import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig; import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig; import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect; @@ -37,23 +39,32 @@ import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.config.SqlServerSourceConfig; import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.config.SqlServerSourceConfigFactory; import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.offset.LsnOffsetFactory; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver.SqlServerURLParser; import com.google.auto.service.AutoService; import io.debezium.connector.sqlserver.SqlServerConnection; import io.debezium.relational.Table; import io.debezium.relational.TableId; +import lombok.NoArgsConstructor; import java.time.ZoneId; import static org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerConnectionUtils.createSqlServerConnection; import static org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerTypeUtils.convertFromTable; +@NoArgsConstructor @AutoService(SeaTunnelSource.class) public class SqlServerIncrementalSource extends IncrementalSource implements SupportParallelism { static final String IDENTIFIER = "SqlServer-CDC"; + public SqlServerIncrementalSource( + ReadonlyConfig options, SeaTunnelDataType dataType) { + super(options, dataType); + } + @Override public String getPluginName() { return IDENTIFIER; @@ -75,6 +86,11 @@ public SourceConfig.Factory createSourceConfigFactory(Readonly configFactory.fromReadonlyConfig(readonlyConfig); configFactory.startupOptions(startupConfig); configFactory.stopOptions(stopConfig); + JdbcUrlUtil.UrlInfo urlInfo = + SqlServerURLParser.parse(config.get(JdbcCatalogOptions.BASE_URL)); + configFactory.originUrl(urlInfo.getOrigin()); + configFactory.hostname(urlInfo.getHost()); + configFactory.port(urlInfo.getPort()); return configFactory; } @@ -89,26 +105,27 @@ public DebeziumDeserializationSchema createDebeziumDeserializationSchema( config.get(JdbcSourceOptions.DEBEZIUM_PROPERTIES)); } - SqlServerSourceConfig sqlServerSourceConfig = - (SqlServerSourceConfig) this.configFactory.create(0); - TableId tableId = - this.dataSourceDialect.discoverDataCollections(sqlServerSourceConfig).get(0); - - SqlServerConnection sqlServerConnection = - createSqlServerConnection(sqlServerSourceConfig.getDbzConfiguration()); - - Table table = - ((SqlServerDialect) dataSourceDialect) - .queryTableSchema(sqlServerConnection, tableId) - .getTable(); - - SeaTunnelRowType seaTunnelRowType = convertFromTable(table); - + SeaTunnelDataType physicalRowType; + if (dataType == null) { + SqlServerSourceConfig sqlServerSourceConfig = + (SqlServerSourceConfig) this.configFactory.create(0); + TableId tableId = + this.dataSourceDialect.discoverDataCollections(sqlServerSourceConfig).get(0); + SqlServerConnection sqlServerConnection = + createSqlServerConnection(sqlServerSourceConfig.getDbzConfiguration()); + Table table = + ((SqlServerDialect) dataSourceDialect) + .queryTableSchema(sqlServerConnection, tableId) + .getTable(); + physicalRowType = convertFromTable(table); + } else { + physicalRowType = dataType; + } String zoneId = config.get(JdbcSourceOptions.SERVER_TIME_ZONE); return (DebeziumDeserializationSchema) SeaTunnelRowDebeziumDeserializeSchema.builder() - .setPhysicalRowType(seaTunnelRowType) - .setResultTypeInfo(seaTunnelRowType) + .setPhysicalRowType(physicalRowType) + .setResultTypeInfo(physicalRowType) .setServerTimeZone(ZoneId.of(zoneId)) .build(); } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java index b476ab0138c..a7dca4c3319 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java @@ -19,14 +19,33 @@ import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceSplit; import org.apache.seatunnel.api.table.catalog.CatalogOptions; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.connector.TableSource; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.SupportMultipleTable; +import org.apache.seatunnel.api.table.factory.TableFactoryContext; import org.apache.seatunnel.api.table.factory.TableSourceFactory; +import org.apache.seatunnel.api.table.type.MultipleRowType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions; import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions; import org.apache.seatunnel.connectors.cdc.base.option.StartupMode; import org.apache.seatunnel.connectors.cdc.base.option.StopMode; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions; -public class SqlServerIncrementalSourceFactory implements TableSourceFactory { +import com.google.auto.service.AutoService; + +import java.io.Serializable; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +@AutoService(Factory.class) +public class SqlServerIncrementalSourceFactory implements TableSourceFactory, SupportMultipleTable { @Override public String factoryIdentifier() { @@ -37,13 +56,12 @@ public String factoryIdentifier() { public OptionRule optionRule() { return JdbcSourceOptions.getBaseRule() .required( - JdbcSourceOptions.HOSTNAME, JdbcSourceOptions.USERNAME, JdbcSourceOptions.PASSWORD, - CatalogOptions.TABLE_NAMES) + CatalogOptions.TABLE_NAMES, + JdbcCatalogOptions.BASE_URL) .optional( JdbcSourceOptions.DATABASE_NAMES, - JdbcSourceOptions.PORT, JdbcSourceOptions.SERVER_TIME_ZONE, JdbcSourceOptions.CONNECT_TIMEOUT_MS, JdbcSourceOptions.CONNECT_MAX_RETRIES, @@ -72,4 +90,29 @@ public OptionRule optionRule() { public Class getSourceClass() { return SqlServerIncrementalSource.class; } + + @Override + public + TableSource createSource(TableFactoryContext context) { + return () -> { + SeaTunnelDataType dataType; + if (context.getCatalogTables().size() == 1) { + dataType = + context.getCatalogTables().get(0).getTableSchema().toPhysicalRowDataType(); + } else { + Map rowTypeMap = new HashMap<>(); + for (CatalogTable catalogTable : context.getCatalogTables()) { + String tableId = catalogTable.getTableId().toTablePath().toString(); + rowTypeMap.put(tableId, catalogTable.getTableSchema().toPhysicalRowDataType()); + } + dataType = new MultipleRowType(rowTypeMap); + } + return new SqlServerIncrementalSource(context.getOptions(), dataType); + }; + } + + @Override + public SupportMultipleTable.Result applyTables(TableFactoryContext context) { + return SupportMultipleTable.Result.of(context.getCatalogTables(), Collections.emptyList()); + } } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotSplitReadTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotSplitReadTask.java index 132f7c7fbc7..8995ef4f5a4 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotSplitReadTask.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotSplitReadTask.java @@ -122,7 +122,7 @@ protected SnapshotResult doExecute( ctx.offset = offsetContext; final LsnOffset lowWatermark = SqlServerUtils.currentLsn(jdbcConnection); - log.info( + log.debug( "Snapshot step 1 - Determining low watermark {} for split {}", lowWatermark, snapshotSplit); @@ -130,11 +130,11 @@ protected SnapshotResult doExecute( dispatcher.dispatchWatermarkEvent( offsetContext.getPartition(), snapshotSplit, lowWatermark, WatermarkKind.LOW); - log.info("Snapshot step 2 - Snapshotting data"); + log.debug("Snapshot step 2 - Snapshotting data"); createDataEvents(ctx, snapshotSplit.getTableId()); final LsnOffset highWatermark = SqlServerUtils.currentLsn(jdbcConnection); - log.info( + log.debug( "Snapshot step 3 - Determining high watermark {} for split {}", highWatermark, snapshotSplit); @@ -174,7 +174,8 @@ private void createDataEventsForTable( throws InterruptedException { long exportStart = clock.currentTimeInMillis(); - log.info("Exporting data from split '{}' of table {}", snapshotSplit.splitId(), table.id()); + log.debug( + "Exporting data from split '{}' of table {}", snapshotSplit.splitId(), table.id()); final String selectSql = SqlServerUtils.buildSplitScanQuery( @@ -182,7 +183,7 @@ private void createDataEventsForTable( snapshotSplit.getSplitKeyType(), snapshotSplit.getSplitStart() == null, snapshotSplit.getSplitEnd() == null); - log.info( + log.debug( "For split '{}' of table {} using select statement: '{}'", snapshotSplit.splitId(), table.id(), @@ -213,7 +214,7 @@ private void createDataEventsForTable( } if (logTimer.expired()) { long stop = clock.currentTimeInMillis(); - log.info( + log.debug( "Exported {} records for split '{}' after {}", rows, snapshotSplit.splitId(), @@ -226,7 +227,7 @@ private void createDataEventsForTable( getChangeRecordEmitter(snapshotContext, table.id(), row), snapshotReceiver); } - log.info( + log.debug( "Finished exporting {} records for split '{}', total duration '{}'", rows, snapshotSplit.splitId(), diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java new file mode 100644 index 00000000000..9f110ced782 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver; + +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.ConstraintKey; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.catalog.PrimaryKey; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.catalog.exception.CatalogException; +import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException; +import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.common.utils.JdbcUrlUtil; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog; + +import org.apache.commons.lang3.tuple.Pair; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +public class SqlServerCatalog extends AbstractJdbcCatalog { + + private static final Set SYS_DATABASES = new HashSet<>(4); + + static { + SYS_DATABASES.add("master"); + SYS_DATABASES.add("tempdb"); + SYS_DATABASES.add("model"); + SYS_DATABASES.add("msdb"); + } + + public SqlServerCatalog( + String catalogName, String username, String pwd, JdbcUrlUtil.UrlInfo urlInfo) { + super(catalogName, username, pwd, urlInfo); + } + + @Override + public List listDatabases() throws CatalogException { + try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd); + PreparedStatement ps = conn.prepareStatement("SELECT NAME FROM SYS.DATABASES")) { + + List databases = new ArrayList<>(); + ResultSet rs = ps.executeQuery(); + + while (rs.next()) { + String databaseName = rs.getString(1); + if (!SYS_DATABASES.contains(databaseName)) { + databases.add(databaseName); + } + } + + return databases; + } catch (Exception e) { + throw new CatalogException( + String.format("Failed listing database in catalog %s", this.catalogName), e); + } + } + + @Override + public List listTables(String databaseName) + throws CatalogException, DatabaseNotExistException { + if (!databaseExists(databaseName)) { + throw new DatabaseNotExistException(this.catalogName, databaseName); + } + + String dbUrl = getUrlFromDatabaseName(databaseName); + try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd); + PreparedStatement ps = + conn.prepareStatement( + "SELECT TABLE_SCHEMA, TABLE_NAME FROM " + + databaseName + + ".INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE'")) { + + ResultSet rs = ps.executeQuery(); + + List tables = new ArrayList<>(); + + while (rs.next()) { + tables.add(rs.getString(1) + "." + rs.getString(2)); + } + + return tables; + } catch (Exception e) { + throw new CatalogException( + String.format("Failed listing database in catalog %s", catalogName), e); + } + } + + @Override + public CatalogTable getTable(TablePath tablePath) + throws CatalogException, TableNotExistException { + if (!tableExists(tablePath)) { + throw new TableNotExistException(catalogName, tablePath); + } + + String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName()); + try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) { + DatabaseMetaData metaData = conn.getMetaData(); + Optional primaryKey = + getPrimaryKey(metaData, tablePath.getDatabaseName(), tablePath.getTableName()); + List constraintKeys = + getConstraintKeys( + metaData, tablePath.getDatabaseName(), tablePath.getTableName()); + + try (PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT * FROM %s WHERE 1 = 0;", + tablePath.getFullNameWithQuoted("\"")))) { + ResultSetMetaData tableMetaData = ps.getMetaData(); + TableSchema.Builder builder = TableSchema.builder(); + // add column + for (int i = 1; i <= tableMetaData.getColumnCount(); i++) { + String columnName = tableMetaData.getColumnName(i); + SeaTunnelDataType type = fromJdbcType(tableMetaData, i); + int columnDisplaySize = tableMetaData.getColumnDisplaySize(i); + String comment = tableMetaData.getColumnLabel(i); + boolean isNullable = + tableMetaData.isNullable(i) == ResultSetMetaData.columnNullable; + Object defaultValue = + getColumnDefaultValue(metaData, tablePath.getTableName(), columnName) + .orElse(null); + + PhysicalColumn physicalColumn = + PhysicalColumn.of( + columnName, + type, + columnDisplaySize, + isNullable, + defaultValue, + comment); + builder.column(physicalColumn); + } + // add primary key + primaryKey.ifPresent(builder::primaryKey); + // add constraint key + constraintKeys.forEach(builder::constraintKey); + TableIdentifier tableIdentifier = + TableIdentifier.of( + catalogName, tablePath.getDatabaseName(), tablePath.getTableName()); + return CatalogTable.of( + tableIdentifier, + builder.build(), + buildConnectorOptions(tablePath), + Collections.emptyList(), + ""); + } + + } catch (Exception e) { + throw new CatalogException( + String.format("Failed getting table %s", tablePath.getFullName()), e); + } + } + + @Override + protected boolean createTableInternal(TablePath tablePath, CatalogTable table) + throws CatalogException { + throw new UnsupportedOperationException("Unsupported create table"); + } + + @Override + protected boolean dropTableInternal(TablePath tablePath) throws CatalogException { + String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName()); + try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd); + PreparedStatement ps = + conn.prepareStatement( + String.format("DROP TABLE IF EXIST %s", tablePath.getFullName()))) { + // Will there exist concurrent drop for one table? + return ps.execute(); + } catch (SQLException e) { + throw new CatalogException( + String.format("Failed dropping table %s", tablePath.getFullName()), e); + } + } + + @Override + protected boolean createDatabaseInternal(String databaseName) throws CatalogException { + try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd); + PreparedStatement ps = + conn.prepareStatement( + String.format("CREATE DATABASE `%s`", databaseName))) { + return ps.execute(); + } catch (Exception e) { + throw new CatalogException( + String.format( + "Failed creating database %s in catalog %s", + databaseName, this.catalogName), + e); + } + } + + @Override + protected boolean dropDatabaseInternal(String databaseName) throws CatalogException { + try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd); + PreparedStatement ps = + conn.prepareStatement( + String.format("DROP DATABASE IF EXISTS `%s`;", databaseName))) { + return ps.execute(); + } catch (Exception e) { + throw new CatalogException( + String.format( + "Failed dropping database %s in catalog %s", + databaseName, this.catalogName), + e); + } + } + + @SuppressWarnings("unchecked") + private SeaTunnelDataType fromJdbcType(ResultSetMetaData metadata, int colIndex) + throws SQLException { + Pair> pair = + SqlServerType.parse(metadata.getColumnTypeName(colIndex)); + Map dataTypeProperties = new HashMap<>(); + dataTypeProperties.put( + SqlServerDataTypeConvertor.PRECISION, metadata.getPrecision(colIndex)); + dataTypeProperties.put(SqlServerDataTypeConvertor.SCALE, metadata.getScale(colIndex)); + return new SqlServerDataTypeConvertor().toSeaTunnelType(pair.getLeft(), dataTypeProperties); + } + + @SuppressWarnings("MagicNumber") + private Map buildConnectorOptions(TablePath tablePath) { + Map options = new HashMap<>(8); + options.put("connector", "jdbc"); + options.put("url", getUrlFromDatabaseName(tablePath.getDatabaseName())); + options.put("table-name", tablePath.getFullName()); + options.put("username", username); + options.put("password", pwd); + return options; + } + + private String getUrlFromDatabaseName(String databaseName) { + return baseUrl + ";databaseName=" + databaseName + ";" + suffix; + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogFactory.java new file mode 100644 index 00000000000..a59b7e399f3 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogFactory.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.configuration.util.OptionValidationException; +import org.apache.seatunnel.api.table.catalog.Catalog; +import org.apache.seatunnel.api.table.factory.CatalogFactory; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.common.utils.JdbcUrlUtil; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions; + +import com.google.auto.service.AutoService; + +import java.util.Optional; + +@AutoService(Factory.class) +public class SqlServerCatalogFactory implements CatalogFactory { + + @Override + public String factoryIdentifier() { + return "SqlServer"; + } + + @Override + public Catalog createCatalog(String catalogName, ReadonlyConfig options) { + String url = options.get(JdbcCatalogOptions.BASE_URL); + JdbcUrlUtil.UrlInfo urlInfo = SqlServerURLParser.parse(url); + Optional defaultDatabase = urlInfo.getDefaultDatabase(); + if (!defaultDatabase.isPresent()) { + throw new OptionValidationException(JdbcCatalogOptions.BASE_URL); + } + return new SqlServerCatalog( + catalogName, + options.get(JdbcCatalogOptions.USERNAME), + options.get(JdbcCatalogOptions.PASSWORD), + urlInfo); + } + + @Override + public OptionRule optionRule() { + return JdbcCatalogOptions.BASE_RULE.build(); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerDataTypeConvertor.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerDataTypeConvertor.java new file mode 100644 index 00000000000..e04be54a56b --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerDataTypeConvertor.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver; + +import org.apache.seatunnel.api.table.catalog.DataTypeConvertException; +import org.apache.seatunnel.api.table.catalog.DataTypeConvertor; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.DecimalType; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SqlType; +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException; + +import org.apache.commons.lang3.tuple.Pair; + +import com.google.auto.service.AutoService; +import lombok.NonNull; + +import java.util.Map; + +@AutoService(DataTypeConvertor.class) +public class SqlServerDataTypeConvertor implements DataTypeConvertor { + public static final String PRECISION = "precision"; + public static final String SCALE = "scale"; + public static final Integer DEFAULT_PRECISION = 10; + public static final Integer DEFAULT_SCALE = 0; + + @Override + public SeaTunnelDataType toSeaTunnelType(@NonNull String connectorDataType) { + Pair> sqlServerType = + SqlServerType.parse(connectorDataType); + return toSeaTunnelType(sqlServerType.getLeft(), sqlServerType.getRight()); + } + + @Override + public SeaTunnelDataType toSeaTunnelType( + @NonNull SqlServerType connectorDataType, Map dataTypeProperties) + throws DataTypeConvertException { + switch (connectorDataType) { + case BIT: + return BasicType.BOOLEAN_TYPE; + case TINYINT: + case SMALLINT: + return BasicType.SHORT_TYPE; + case INTEGER: + return BasicType.INT_TYPE; + case BIGINT: + return BasicType.LONG_TYPE; + case DECIMAL: + case NUMERIC: + case MONEY: + case SMALLMONEY: + int precision = (int) dataTypeProperties.getOrDefault(PRECISION, DEFAULT_PRECISION); + int scale = (int) dataTypeProperties.getOrDefault(SCALE, DEFAULT_SCALE); + return new DecimalType(precision, scale); + case REAL: + return BasicType.FLOAT_TYPE; + case FLOAT: + return BasicType.DOUBLE_TYPE; + case CHAR: + case NCHAR: + case VARCHAR: + case NTEXT: + case NVARCHAR: + case TEXT: + return BasicType.STRING_TYPE; + case DATE: + return LocalTimeType.LOCAL_DATE_TYPE; + case TIME: + return LocalTimeType.LOCAL_TIME_TYPE; + case DATETIME: + case DATETIME2: + case SMALLDATETIME: + case DATETIMEOFFSET: + return LocalTimeType.LOCAL_DATE_TIME_TYPE; + case TIMESTAMP: + case BINARY: + case VARBINARY: + case IMAGE: + return PrimitiveByteArrayType.INSTANCE; + case UNKNOWN: + default: + throw new JdbcConnectorException( + CommonErrorCode.UNSUPPORTED_OPERATION, + String.format("Doesn't support SQLSERVER type '%s'", connectorDataType)); + } + } + + @Override + public SqlServerType toConnectorType( + SeaTunnelDataType seaTunnelDataType, Map dataTypeProperties) + throws DataTypeConvertException { + SqlType sqlType = seaTunnelDataType.getSqlType(); + switch (sqlType) { + case STRING: + return SqlServerType.VARCHAR; + case BOOLEAN: + return SqlServerType.BIT; + case TINYINT: + return SqlServerType.TINYINT; + case SMALLINT: + return SqlServerType.SMALLINT; + case INT: + return SqlServerType.INTEGER; + case BIGINT: + return SqlServerType.BIGINT; + case FLOAT: + return SqlServerType.REAL; + case DOUBLE: + return SqlServerType.FLOAT; + case DECIMAL: + return SqlServerType.DECIMAL; + case BYTES: + return SqlServerType.BINARY; + case DATE: + return SqlServerType.DATE; + case TIME: + return SqlServerType.DATETIME; + case TIMESTAMP: + return SqlServerType.TIMESTAMP; + default: + throw new JdbcConnectorException( + CommonErrorCode.UNSUPPORTED_DATA_TYPE, + String.format("Doesn't support SqlServer type '%s' yet", sqlType)); + } + } + + @Override + public String getIdentity() { + return "SqlServer"; + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerType.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerType.java new file mode 100644 index 00000000000..e848498c93d --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerType.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver; + +import org.apache.commons.lang3.tuple.Pair; + +import com.google.common.collect.ImmutableMap; + +import java.math.BigDecimal; +import java.sql.SQLType; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +public enum SqlServerType implements SQLType { + UNKNOWN("unknown", 999, Object.class), + TINYINT("tinyint", java.sql.Types.TINYINT, Short.class), + BIT("bit", java.sql.Types.BIT, Boolean.class), + SMALLINT("smallint", java.sql.Types.SMALLINT, Short.class), + INTEGER("int", java.sql.Types.INTEGER, Integer.class), + BIGINT("bigint", java.sql.Types.BIGINT, Long.class), + FLOAT("float", java.sql.Types.DOUBLE, Double.class), + REAL("real", java.sql.Types.REAL, Float.class), + SMALLDATETIME("smalldatetime", microsoft.sql.Types.SMALLDATETIME, java.sql.Timestamp.class), + DATETIME("datetime", microsoft.sql.Types.DATETIME, java.sql.Timestamp.class), + DATE("date", java.sql.Types.DATE, java.sql.Date.class), + TIME("time", java.sql.Types.TIME, java.sql.Time.class), + DATETIME2("datetime2", java.sql.Types.TIMESTAMP, java.sql.Timestamp.class), + DATETIMEOFFSET( + "datetimeoffset", + microsoft.sql.Types.DATETIMEOFFSET, + microsoft.sql.DateTimeOffset.class), + SMALLMONEY("smallmoney", microsoft.sql.Types.SMALLMONEY, BigDecimal.class), + MONEY("money", microsoft.sql.Types.MONEY, BigDecimal.class), + CHAR("char", java.sql.Types.CHAR, String.class), + VARCHAR("varchar", java.sql.Types.VARCHAR, String.class), + VARCHARMAX("varchar", java.sql.Types.LONGVARCHAR, String.class), + TEXT("text", java.sql.Types.LONGVARCHAR, String.class), + NCHAR("nchar", -15, String.class), + NVARCHAR("nvarchar", -9, String.class), + NVARCHARMAX("nvarchar", -16, String.class), + NTEXT("ntext", -16, String.class), + BINARY("binary", java.sql.Types.BINARY, byte[].class), + VARBINARY("varbinary", java.sql.Types.VARBINARY, byte[].class), + VARBINARYMAX("varbinary", java.sql.Types.LONGVARBINARY, byte[].class), + IMAGE("image", java.sql.Types.LONGVARBINARY, byte[].class), + DECIMAL("decimal", java.sql.Types.DECIMAL, BigDecimal.class, true, true), + NUMERIC("numeric", java.sql.Types.NUMERIC, BigDecimal.class), + GUID("uniqueidentifier", microsoft.sql.Types.GUID, String.class), + SQL_VARIANT("sql_variant", microsoft.sql.Types.SQL_VARIANT, Object.class), + UDT("udt", java.sql.Types.VARBINARY, byte[].class), + XML("xml", -16, String.class), + TIMESTAMP("timestamp", java.sql.Types.BINARY, byte[].class), + GEOMETRY("geometry", microsoft.sql.Types.GEOMETRY, Object.class), + GEOGRAPHY("geography", microsoft.sql.Types.GEOMETRY, Object.class); + + private static final String PRECISION = "precision"; + private static final String SCALE = "scale"; + private static final String LENGTH = "length"; + + private final String name; + private final int jdbcType; + private final Class javaClass; + private final boolean isDecimal; + private final boolean hasLength; + + SqlServerType(String sqlServerTypeName, int jdbcType, Class javaClass) { + this(sqlServerTypeName, jdbcType, javaClass, false, false); + } + + SqlServerType( + String sqlServerTypeName, + int jdbcType, + Class javaClass, + boolean isDec, + boolean hasLength) { + this.name = sqlServerTypeName; + this.jdbcType = jdbcType; + this.javaClass = javaClass; + this.isDecimal = isDec; + this.hasLength = hasLength; + } + + @Override + public String getName() { + return name; + } + + @Override + public String getVendor() { + return "com.microsoft.sqlserver.jdbc"; + } + + @Override + public Integer getVendorTypeNumber() { + return jdbcType; + } + + public String getSqlTypeName(Map params) { + if (isDecimal) { + Object precision = params.get(PRECISION); + Object scale = params.get(SCALE); + return String.format("%s(%s, %s)", getName(), precision, scale); + } + if (hasLength) { + Object length = params.get(LENGTH); + return String.format("%s(%s)", getName(), length); + } + return getName(); + } + + public String getSqlTypeName() { + return getSqlTypeName(Collections.emptyMap()); + } + + public String getSqlTypeName(long length) { + return getSqlTypeName(Collections.singletonMap(LENGTH, length)); + } + + public String getSqlTypeName(long precision, long scale) { + return getSqlTypeName(ImmutableMap.of(PRECISION, precision, SCALE, scale)); + } + + public static Pair> parse(String fullTypeName) { + Map params = new HashMap<>(); + String typeName = fullTypeName; + if (fullTypeName.indexOf("(") != -1) { + typeName = fullTypeName.substring(0, fullTypeName.indexOf("(")).trim(); + String paramsStr = + fullTypeName.substring( + fullTypeName.indexOf("(") + 1, fullTypeName.indexOf(")")); + if (DECIMAL.getName().equalsIgnoreCase(typeName)) { + String[] precisionAndScale = paramsStr.split(","); + params.put(PRECISION, precisionAndScale[0].trim()); + params.put(SCALE, precisionAndScale[1].trim()); + } else { + params.put(LENGTH, paramsStr.trim()); + } + } + + SqlServerType sqlServerType = null; + for (SqlServerType type : SqlServerType.values()) { + if (type.getName().equalsIgnoreCase(typeName)) { + sqlServerType = type; + break; + } + } + Objects.requireNonNull(sqlServerType); + return Pair.of(sqlServerType, params); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerURLParser.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerURLParser.java new file mode 100644 index 00000000000..f8c20582717 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerURLParser.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver; + +import org.apache.seatunnel.common.utils.JdbcUrlUtil; + +import java.io.UnsupportedEncodingException; +import java.net.URLDecoder; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +public class SqlServerURLParser { + private static final int DEFAULT_PORT = 1433; + + public static JdbcUrlUtil.UrlInfo parse(String url) { + String serverName = ""; + Integer port = DEFAULT_PORT; + String dbInstance = null; + int hostIndex = url.indexOf("://"); + if (hostIndex <= 0) { + return null; + } + + Map props = Collections.emptyMap(); + String[] split = url.split(";", 2); + if (split.length > 1) { + props = parseQueryParams(split[1], ";"); + serverName = props.get("serverName"); + dbInstance = props.get("databaseName"); + if (props.containsKey("portNumber")) { + String portNumber = props.get("portNumber"); + try { + port = Integer.parseInt(portNumber); + } catch (NumberFormatException e) { + } + } + } + + String urlServerName = split[0].substring(hostIndex + 3); + if (!urlServerName.isEmpty()) { + serverName = urlServerName; + } + + int portLoc = serverName.indexOf(":"); + if (portLoc > 1) { + port = Integer.parseInt(serverName.substring(portLoc + 1)); + serverName = serverName.substring(0, portLoc); + } + + int instanceLoc = serverName.indexOf("\\"); + if (instanceLoc > 1) { + serverName = serverName.substring(0, instanceLoc); + } + + if (serverName.isEmpty()) { + return null; + } + + String suffix = + props.entrySet().stream() + .map(e -> e.getKey() + "=" + e.getValue()) + .collect(Collectors.joining(";", ";", "")); + suffix = Optional.ofNullable(suffix).orElse(""); + return new JdbcUrlUtil.UrlInfo( + url, + String.format("jdbc:sqlserver://%s:%s", serverName, port) + suffix, + serverName, + port, + dbInstance, + suffix); + } + + private static Map parseQueryParams(String query, String separator) { + if (query == null || query.isEmpty()) { + return Collections.emptyMap(); + } + Map queryParams = new LinkedHashMap<>(); + String[] pairs = query.split(separator); + for (String pair : pairs) { + try { + int idx = pair.indexOf("="); + String key = + idx > 0 + ? URLDecoder.decode( + pair.substring(0, idx), StandardCharsets.UTF_8.name()) + : pair; + if (!queryParams.containsKey(key)) { + String value = + idx > 0 && pair.length() > idx + 1 + ? URLDecoder.decode( + pair.substring(idx + 1), StandardCharsets.UTF_8.name()) + : null; + queryParams.put(key, value); + } + } catch (UnsupportedEncodingException e) { + // Ignore. + } + } + return queryParams; + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/sqlservercdc_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/sqlservercdc_to_console.conf index dce455b4159..6dee20a1791 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/sqlservercdc_to_console.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/sqlservercdc_to_console.conf @@ -29,12 +29,11 @@ source { # This is a example source plugin **only for test and demonstrate the feature source plugin** SqlServer-CDC { result_table_name = "customers" - hostname = "sqlserver-host" - port = "1433" username = "sa" password = "Password!" - database-name = "column_type_test" - table-name = "dbo.full_types" + database-names = ["column_type_test"] + table-names = ["column_type_test.dbo.full_types"] + base-url = "jdbc:sqlserver://sqlserver-host:1433;databaseName=column_type_test" } }