[Feature][Jdbc] Support read multiple tables #5581

Merged
merged 42 commits into from
Oct 25, 2023
cbaeee8
[Feature][Jdbc] Support read table list through table-path/query-sql
hailin0 Sep 28, 2023
578db41
fix
hailin0 Oct 8, 2023
f95f4d3
Update CatalogUtils.java
hailin0 Oct 9, 2023
2f8bbe0
Merge branch 'dev' into dev-jdbc-support-read-tables
hailin0 Oct 9, 2023
c18fd61
add docs
hailin0 Oct 10, 2023
7cb0053
fix
hailin0 Oct 10, 2023
34e6d56
fix
hailin0 Oct 10, 2023
d3eb655
fix
hailin0 Oct 11, 2023
e632e97
fix
hailin0 Oct 11, 2023
237889e
fix
hailin0 Oct 16, 2023
c08fc43
Merge branch 'apache:dev' into dev-jdbc-support-read-tables
hailin0 Oct 16, 2023
8f52007
fix
hailin0 Oct 19, 2023
eb2de8f
Merge branch 'dev' into dev-jdbc-support-read-tables
hailin0 Oct 19, 2023
9c07b4d
fix
hailin0 Oct 19, 2023
139d334
fix
hailin0 Oct 19, 2023
a0628a3
fix
hailin0 Oct 19, 2023
10210dd
Merge branch 'apache:dev' into dev-jdbc-support-read-tables
hailin0 Oct 19, 2023
b264b16
fix
hailin0 Oct 19, 2023
369023c
fix
hailin0 Oct 19, 2023
f1e1c02
fix
hailin0 Oct 19, 2023
d260043
Merge branch 'apache:dev' into dev-jdbc-support-read-tables
hailin0 Oct 20, 2023
9ac7a29
Update connector-v2-features.md
hailin0 Oct 20, 2023
b24cd68
Update Jdbc.md
hailin0 Oct 20, 2023
fad1456
fix
hailin0 Oct 20, 2023
2ee320e
fix
hailin0 Oct 20, 2023
5a110eb
fix
hailin0 Oct 20, 2023
e0af13c
fix
hailin0 Oct 21, 2023
694ac06
fix catalog-name/dialectName
hailin0 Oct 21, 2023
b44612f
fix testcase
hailin0 Oct 22, 2023
01c8f1b
fix
hailin0 Oct 22, 2023
bc1303e
fix
hailin0 Oct 22, 2023
9f19cbf
fix
hailin0 Oct 22, 2023
19d43c1
fix
hailin0 Oct 23, 2023
e22cbc1
fix
hailin0 Oct 23, 2023
26c4298
add where_condition
hailin0 Oct 23, 2023
39aaa1f
fix mysql version
hailin0 Oct 23, 2023
68d059c
fix
hailin0 Oct 23, 2023
6789dba
add test
hailin0 Oct 23, 2023
7165a62
fix
hailin0 Oct 24, 2023
0c7d9a9
add testlog
hailin0 Oct 24, 2023
010148f
remove testlog
hailin0 Oct 24, 2023
b1905aa
update AbstractJdbcCatalog
hailin0 Oct 24, 2023
Merge branch 'dev' into dev-jdbc-support-read-tables
hailin0 committed Oct 19, 2023
commit eb2de8ff0f491c4305b2e4842f6ccc6838fddc2a
10 changes: 10 additions & 0 deletions docs/en/connector-v2/source/Jdbc.md
@@ -43,6 +43,7 @@ supports query SQL and can achieve projection effect.
| partition_lower_bound | Long | No | - |
| partition_num | Int | No | job parallelism |
| fetch_size | Int | No | 0 |
| properties | Map | No | - |
| table_path | String | No | - |
| table_list | Array | No | - |
| split.size | Int | No | 8096 |
@@ -101,6 +102,8 @@ The number of partition count, only support positive integer. default value is j
For queries that return a large number of objects, you can configure the row fetch size used in the query to
improve performance by reducing the number of database hits required to satisfy the selection criteria. Zero means the JDBC default value is used.

<<<<<<< HEAD

### table_path

The full path of the table. You can use this configuration instead of `query`.
@@ -148,6 +151,13 @@ This configuration specifies the threshold of estimated shard count to trigger t
### split.inverse-sampling.rate

The inverse of the sampling rate used in the sample sharding strategy. For example, if this value is set to 1000, it means a 1/1000 sampling rate is applied during the sampling process. This option provides flexibility in controlling the granularity of the sampling, thus affecting the final number of shards. It's especially useful when dealing with very large datasets where a lower sampling rate is preferred. The default value is 1000.
=======================================================================================================================================================================================================================================================================================================================================================================================================================================================

### properties

Additional connection configuration parameters. When `properties` and the URL contain the same parameter, the priority is determined by the specific implementation of the driver. For example, in MySQL, properties take precedence over the URL.

>>>>>>> dev

### common options

1 change: 1 addition & 0 deletions docs/en/connector-v2/source/Mysql.md
@@ -74,6 +74,7 @@ Read external data source data through JDBC.
| partition_upper_bound | BigDecimal | No | - | The maximum value of partition_column for the scan. If not set, SeaTunnel queries the database to get the maximum value. |
| partition_num | Int | No | job parallelism | The number of partitions; only positive integers are supported. The default value is the job parallelism. |
| fetch_size | Int | No | 0 | For queries that return a large number of objects, you can configure<br/> the row fetch size used in the query to improve performance by<br/> reducing the number of database hits required to satisfy the selection criteria.<br/> Zero means the JDBC default value is used. |
| properties | Map | No | - | Additional connection configuration parameters. When `properties` and the URL contain the same parameter, the priority is determined by the <br/>specific implementation of the driver. For example, in MySQL, properties take precedence over the URL. |
| table_path | String | No | - | The full path of the table. You can use this configuration instead of `query`. <br/>examples: <br/>mysql: "testdb.table1" <br/>oracle: "test_schema.table1" <br/>sqlserver: "testdb.test_schema.table1" <br/>postgresql: "testdb.test_schema.table1" |
| table_list | Array | No | - | The list of tables to be read. You can use this configuration instead of `table_path`. Example: ```[{ table_path = "testdb.table1"}, {table_path = "testdb.table2", query = "select id, name from testdb.table2"}]``` |
| split.size | Int | No | 8096 | The split size (number of rows) of a table. Captured tables are divided into multiple splits when they are read. |
@@ -52,14 +52,6 @@ public static TablePath of(String fullName, boolean schemaFirst) {
            }
            return of(paths[0], null, paths[1]);
        }

        if (paths.length == 2) {
            if (schemaFirst) {
                return of(null, paths[0], paths[1]);
            }
            return of(paths[0], null, paths[1]);
        }

        if (paths.length == 3) {
            return of(paths[0], paths[1], paths[2]);
        }
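The hunk above drops a duplicated `paths.length == 2` branch from `TablePath.of`. As a reading aid, here is a minimal sketch of the splitting rule that remains. `TablePathSketch` is a hypothetical stand-in, not the SeaTunnel class, and the handling of one-part names and of names with more than three parts is an assumption, since the diff does not show those cases.

```java
/** Hypothetical, simplified stand-in for TablePath; not the SeaTunnel class. */
final class TablePathSketch {
    final String database;
    final String schema;
    final String table;

    private TablePathSketch(String database, String schema, String table) {
        this.database = database;
        this.schema = schema;
        this.table = table;
    }

    /** Splits a dotted name; schemaFirst decides how a two-part name is read. */
    static TablePathSketch of(String fullName, boolean schemaFirst) {
        String[] paths = fullName.split("\\.");
        if (paths.length == 1) {
            // Assumption: a bare name is treated as a table name only.
            return new TablePathSketch(null, null, paths[0]);
        }
        if (paths.length == 2) {
            // "schema.table" when schemaFirst is true, otherwise "database.table".
            return schemaFirst
                    ? new TablePathSketch(null, paths[0], paths[1])
                    : new TablePathSketch(paths[0], null, paths[1]);
        }
        if (paths.length == 3) {
            return new TablePathSketch(paths[0], paths[1], paths[2]);
        }
        // Assumption: anything longer is rejected.
        throw new IllegalArgumentException("Cannot parse table path: " + fullName);
    }

    @Override
    public String toString() {
        return database + "|" + schema + "|" + table;
    }

    public static void main(String[] args) {
        System.out.println(of("testdb.table1", false));             // testdb|null|table1
        System.out.println(of("test_schema.table1", true));         // null|test_schema|table1
        System.out.println(of("testdb.test_schema.table1", false)); // testdb|test_schema|table1
    }
}
```

The two-part case is the only one where `schemaFirst` matters, which is why a dialect can choose the reading that suits it by overriding `parse`.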
@@ -33,7 +33,9 @@
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

@@ -244,6 +246,20 @@ default String getFieldIde(String identifier, String fieldIde) {
}
}

    default Map<String, String> defaultParameter() {
        return new HashMap<>();
    }

    default void connectionUrlParse(
            String url, Map<String, String> info, Map<String, String> defaultParameter) {
        defaultParameter.forEach(
                (key, value) -> {
                    if (!url.contains(key) && !info.containsKey(key)) {
                        info.put(key, value);
                    }
                });
    }

    default TablePath parse(String tablePath) {
        return TablePath.of(tablePath);
    }
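To make the merge rule in `connectionUrlParse` concrete, the sketch below reproduces the same condition in a standalone class. `DefaultParameterSketch` and `applyDefaults` are hypothetical names used only for illustration; the URLs are made-up examples. Note that the check is a plain substring match on the URL, so a key that appears anywhere in the URL text suppresses the default.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration of the default-parameter merge; not SeaTunnel code. */
final class DefaultParameterSketch {

    /** Adds a default only when neither the URL text nor the properties mention the key. */
    static void applyDefaults(
            String url, Map<String, String> info, Map<String, String> defaults) {
        defaults.forEach(
                (key, value) -> {
                    if (!url.contains(key) && !info.containsKey(key)) {
                        info.put(key, value);
                    }
                });
    }

    public static void main(String[] args) {
        Map<String, String> defaults = new HashMap<>();
        defaults.put("rewriteBatchedStatements", "true");

        // Key absent from both URL and properties: the default is applied.
        Map<String, String> plain = new HashMap<>();
        applyDefaults("jdbc:mysql://localhost:3306/test", plain, defaults);
        System.out.println(plain); // {rewriteBatchedStatements=true}

        // Key already present in the URL: the properties are left untouched.
        Map<String, String> fromUrl = new HashMap<>();
        applyDefaults(
                "jdbc:mysql://localhost:3306/test?rewriteBatchedStatements=false",
                fromUrl,
                defaults);
        System.out.println(fromUrl); // {}
    }
}
```

A user-supplied value therefore always wins over the dialect default, whether it comes from the URL or from `properties`.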
@@ -106,6 +106,13 @@ public String extractTableName(TablePath tablePath) {
        return tablePath.getTableName();
    }

    @Override
    public Map<String, String> defaultParameter() {
        HashMap<String, String> map = new HashMap<>();
        map.put("rewriteBatchedStatements", "true");
        return map;
    }

    @Override
    public TablePath parse(String tablePath) {
        return TablePath.of(tablePath, false);
@@ -79,7 +79,6 @@ public String getPluginName() {
    public void prepare(Config pluginConfig) throws PrepareFailException {
        ReadonlyConfig config = ReadonlyConfig.fromConfig(pluginConfig);
        ConfigValidator.of(config).validate(new JdbcSourceFactory().optionRule());

        this.jdbcSourceConfig = JdbcSourceConfig.of(config);
        this.jdbcSourceTables =
                JdbcCatalogUtils.getTables(
@@ -25,6 +25,8 @@
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader;

import com.google.auto.service.AutoService;
import lombok.extern.slf4j.Slf4j;
@@ -64,6 +66,14 @@ public String factoryIdentifier() {
    public <T, SplitT extends SourceSplit, StateT extends Serializable>
            TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
        JdbcSourceConfig config = JdbcSourceConfig.of(context.getOptions());
        JdbcDialect jdbcDialect =
                JdbcDialectLoader.load(
                        config.getJdbcConnectionConfig().getUrl(),
                        config.getJdbcConnectionConfig().getCompatibleMode());
        jdbcDialect.connectionUrlParse(
                config.getJdbcConnectionConfig().getUrl(),
                config.getJdbcConnectionConfig().getProperties(),
                jdbcDialect.defaultParameter());
        return () -> (SeaTunnelSource<T, SplitT, StateT>) new JdbcSource(config);
    }

@@ -81,6 +91,7 @@ public OptionRule optionRule() {
PARTITION_LOWER_BOUND,
PARTITION_NUM,
COMPATIBLE_MODE,
PROPERTIES,
QUERY,
TABLE_PATH,
TABLE_LIST,
@@ -26,10 +26,9 @@
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogFactorySelector;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceTableConfig;
@@ -76,7 +75,7 @@ public static Map<TablePath, JdbcSourceTable> getTables(
        JdbcDialect jdbcDialect =
                JdbcDialectLoader.load(
                        jdbcConnectionConfig.getUrl(), jdbcConnectionConfig.getCompatibleMode());
        Optional<Catalog> catalog = findCatalog(jdbcConnectionConfig);
        Optional<Catalog> catalog = findCatalog(jdbcConnectionConfig, jdbcDialect);
        if (catalog.isPresent()) {
            try (AbstractJdbcCatalog jdbcCatalog = (AbstractJdbcCatalog) catalog.get()) {
                log.info("Loading catalog tables for catalog : {}", jdbcCatalog.getClass());
@@ -310,28 +309,6 @@ private static CatalogTable getCatalogTable(
        return CatalogUtils.getCatalogTable(resultSetMetaData);
    }

    private static Optional<Catalog> findCatalog(JdbcConnectionConfig config) {
        Optional<CatalogFactory> catalogFactory = CatalogFactorySelector.select(config.getUrl());
        if (catalogFactory.isPresent()) {
            ReadonlyConfig catalogConfig = extractCatalogConfig(config);
            Catalog catalog = catalogFactory.get().createCatalog(DEFAULT_CATALOG, catalogConfig);
            return Optional.of(catalog);
        }

        log.debug("No catalog found for jdbc url: {}", config.getUrl());
        return Optional.empty();
    }

    private static ReadonlyConfig extractCatalogConfig(JdbcConnectionConfig config) {
        Map<String, Object> catalogConfig = new HashMap<>();
        catalogConfig.put(JdbcCatalogOptions.BASE_URL.key(), config.getUrl());
        config.getUsername()
                .ifPresent(val -> catalogConfig.put(JdbcCatalogOptions.USERNAME.key(), val));
        config.getPassword()
                .ifPresent(val -> catalogConfig.put(JdbcCatalogOptions.PASSWORD.key(), val));
        return ReadonlyConfig.fromMap(catalogConfig);
    }

    private static TableIdentifier convert(TablePath tablePath) {
        return convert(DEFAULT_CATALOG, tablePath);
    }
@@ -351,4 +328,23 @@ private static Connection getConnection(JdbcConnectionConfig config) throws SQLE
        }
        return DriverManager.getConnection(config.getUrl());
    }

    public static Optional<Catalog> findCatalog(JdbcConnectionConfig config, JdbcDialect dialect) {
        ReadonlyConfig catalogConfig = extractCatalogConfig(config);
        return FactoryUtil.createOptionalCatalog(
                dialect.dialectName(),
                catalogConfig,
                JdbcCatalogUtils.class.getClassLoader(),
                dialect.dialectName());
    }

    private static ReadonlyConfig extractCatalogConfig(JdbcConnectionConfig config) {
        Map<String, Object> catalogConfig = new HashMap<>();
        catalogConfig.put(JdbcCatalogOptions.BASE_URL.key(), config.getUrl());
        config.getUsername()
                .ifPresent(val -> catalogConfig.put(JdbcCatalogOptions.USERNAME.key(), val));
        config.getPassword()
                .ifPresent(val -> catalogConfig.put(JdbcCatalogOptions.PASSWORD.key(), val));
        return ReadonlyConfig.fromMap(catalogConfig);
    }
}
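The `extractCatalogConfig` helper above copies the URL unconditionally and the credentials only when they are set. A minimal standalone sketch of that pattern follows; `CatalogConfigSketch` is a hypothetical class, and the literal keys `"base-url"`, `"username"` and `"password"` are assumptions standing in for the `JdbcCatalogOptions` keys, whose actual string values are not shown in this diff.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical illustration of building catalog options; not SeaTunnel code. */
final class CatalogConfigSketch {

    /** The URL is always copied; credentials are copied only when present. */
    static Map<String, Object> extract(
            String url, Optional<String> username, Optional<String> password) {
        Map<String, Object> catalogConfig = new HashMap<>();
        catalogConfig.put("base-url", url); // assumed key name
        username.ifPresent(val -> catalogConfig.put("username", val)); // assumed key name
        password.ifPresent(val -> catalogConfig.put("password", val)); // assumed key name
        return catalogConfig;
    }

    public static void main(String[] args) {
        Map<String, Object> anonymous =
                extract("jdbc:mysql://localhost:3306/test", Optional.empty(), Optional.empty());
        System.out.println(anonymous.size()); // 1

        Map<String, Object> withUser =
                extract("jdbc:mysql://localhost:3306/test", Optional.of("root"), Optional.empty());
        System.out.println(withUser.containsKey("password")); // false
    }
}
```

Leaving absent credentials out of the map, rather than storing nulls, lets the catalog factory distinguish "not configured" from "configured as empty".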