From 31c40452e2cfd51982288ff4c551be57f6b3bae6 Mon Sep 17 00:00:00 2001 From: jiayang Date: Thu, 17 Aug 2023 17:23:00 +0800 Subject: [PATCH 1/9] [feature] catalog add indent --- .../api/table/catalog/TablePath.java | 23 +++++++ .../jdbc/catalog/mysql/MySqlCatalog.java | 6 +- .../mysql/MysqlCreateTableSqlBuilder.java | 20 +++--- .../jdbc/catalog/oracle/OracleCatalog.java | 4 +- .../oracle/OracleCreateTableSqlBuilder.java | 54 ++++++++++----- .../jdbc/catalog/psql/PostgresCatalog.java | 4 +- .../psql/PostgresCreateTableSqlBuilder.java | 33 ++++++--- .../catalog/sqlserver/SqlServerCatalog.java | 2 +- .../SqlServerCreateTableSqlBuilder.java | 13 +++- .../jdbc/catalog/utils/CatalogUtils.java | 69 +++++++++++++++++++ .../seatunnel/jdbc/config/JdbcOptions.java | 7 ++ .../dialect/dialectenum/FieldIdeEnum.java | 34 +++++++++ .../seatunnel/jdbc/sink/JdbcSink.java | 12 +++- .../sql/MysqlCreateTableSqlBuilderTest.java | 2 +- 14 files changed, 238 insertions(+), 45 deletions(-) create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dialectenum/FieldIdeEnum.java diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java index 7b2dd6d5533..1b7694754cb 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java @@ -57,6 +57,10 @@ public String getSchemaAndTableName() { return String.format("%s.%s", schemaName, tableName); } + public String getSchemaAndTableName(String quote) { + return String.format("%s%s%s.%s%s%s", quote, schemaName, quote, quote, tableName, quote); + } + public String getFullName() { if (schemaName == null) { return String.format("%s.%s", databaseName, tableName); @@ -78,6 +82,25 @@ public String getFullNameWithQuoted(String quote) { quote, databaseName, quote, quote, schemaName, quote, quote, tableName, quote); } + public String getFullNameWithQuoted(String quoteLeft, String quoteRight) { + if (schemaName == null) { + return String.format( + "%s%s%s.%s%s%s", + quoteLeft, databaseName, quoteRight, quoteLeft, tableName, quoteRight); + } + return String.format( + "%s%s%s.%s%s%s.%s%s%s", + quoteLeft, + databaseName, + quoteRight, + quoteLeft, + schemaName, + quoteRight, + quoteLeft, + tableName, + quoteRight); + } + @Override public String toString() { return getFullName(); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java index 267a68f0eef..fb7f29c3fb6 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java @@ -31,6 +31,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.common.utils.JdbcUrlUtil; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils; import com.mysql.cj.MysqlType; import com.mysql.cj.jdbc.result.ResultSetImpl; @@ -290,7 +291,10 @@ protected boolean createTableInternal(TablePath tablePath, CatalogTable table) String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName()); String createTableSql = - MysqlCreateTableSqlBuilder.builder(tablePath, table).build(table.getCatalogName()); + MysqlCreateTableSqlBuilder.builder(tablePath, table) + .build(table.getCatalogName(), table.getOptions().get("fieldIde")); + createTableSql = + CatalogUtils.getFieldIde(createTableSql, table.getOptions().get("fieldIde")); Connection connection = getConnection(dbUrl); log.info("create table sql: {}", createTableSql); try (PreparedStatement ps = connection.prepareStatement(createTableSql)) { diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java index 608062fc999..c226082d321 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java @@ -25,6 +25,7 @@ import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.type.DecimalType; import org.apache.seatunnel.api.table.type.SqlType; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -115,12 +116,13 @@ public MysqlCreateTableSqlBuilder comment(String comment) { return this; } - public String build(String catalogName) { + public String build(String catalogName, String fieldIde) { List sqls = new ArrayList<>(); sqls.add( String.format( "CREATE TABLE IF NOT EXISTS %s (\n%s\n)", - tableName, buildColumnsIdentifySql(catalogName))); + CatalogUtils.quoteIdentifier(tableName, fieldIde, "`"), + buildColumnsIdentifySql(catalogName, fieldIde))); if (engine != null) { sqls.add("ENGINE = " + engine); } @@ -136,13 +138,13 @@ public String build(String catalogName) { return String.join(" ", sqls) + ";"; } - private String buildColumnsIdentifySql(String catalogName) { + private String buildColumnsIdentifySql(String catalogName, String fieldIde) { List columnSqls = new ArrayList<>(); for (Column column : columns) { - columnSqls.add("\t" + buildColumnIdentifySql(column, catalogName)); + columnSqls.add("\t" + buildColumnIdentifySql(column, catalogName, fieldIde)); } if (primaryKey != null) { - columnSqls.add("\t" + buildPrimaryKeySql()); + columnSqls.add("\t" + buildPrimaryKeySql(fieldIde)); } if (CollectionUtils.isNotEmpty(constraintKeys)) { for (ConstraintKey constraintKey : constraintKeys) { @@ -155,9 +157,9 @@ private String buildColumnsIdentifySql(String catalogName) { return String.join(", \n", columnSqls); } - private String buildColumnIdentifySql(Column column, String catalogName) { + private String buildColumnIdentifySql(Column column, String catalogName, String fieldIde) { final List columnSqls = new ArrayList<>(); - columnSqls.add(column.getName()); + columnSqls.add(CatalogUtils.quoteIdentifier(column.getName(), fieldIde, "`")); if (StringUtils.equals(catalogName, "mysql")) { columnSqls.add(column.getSourceType()); } else { @@ -237,13 +239,13 @@ private String buildColumnIdentifySql(Column column, String catalogName) { return String.join(" ", columnSqls); } - private String buildPrimaryKeySql() { + private String buildPrimaryKeySql(String fieldIde) { String key = primaryKey.getColumnNames().stream() .map(columnName -> "`" + columnName + "`") .collect(Collectors.joining(", ")); // add sort type - return String.format("PRIMARY KEY (%s)", key); + return String.format("PRIMARY KEY (%s)", CatalogUtils.quoteIdentifier(key, fieldIde)); } private String buildConstraintKeySql(ConstraintKey constraintKey) { diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java index 261f4f7fb6f..6fe9e2a0940 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java @@ -148,7 +148,9 @@ public List listDatabases() throws CatalogException { @Override protected boolean createTableInternal(TablePath tablePath, CatalogTable table) throws CatalogException { - String createTableSql = new OracleCreateTableSqlBuilder(table).build(tablePath); + String createTableSql = + new OracleCreateTableSqlBuilder(table) + .build(tablePath, table.getOptions().get("fieldIde")); String[] createTableSqls = createTableSql.split(";"); for (String sql : createTableSqls) { log.info("create table sql: {}", sql); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCreateTableSqlBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCreateTableSqlBuilder.java index 984dd93e6a6..61dee202965 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCreateTableSqlBuilder.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCreateTableSqlBuilder.java @@ -23,6 +23,7 @@ import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.type.DecimalType; import org.apache.seatunnel.api.table.type.SqlType; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils; import org.apache.commons.lang3.StringUtils; @@ -45,20 +46,26 @@ public OracleCreateTableSqlBuilder(CatalogTable catalogTable) { } public String build(TablePath tablePath) { + return build(tablePath, ""); + } + + public String build(TablePath tablePath, String fieldIde) { StringBuilder createTableSql = new StringBuilder(); createTableSql .append("CREATE TABLE ") - .append(tablePath.getSchemaAndTableName()) + .append(tablePath.getSchemaAndTableName("\"")) .append(" (\n"); List columnSqls = - columns.stream().map(this::buildColumnSql).collect(Collectors.toList()); + columns.stream() + .map(column -> CatalogUtils.getFieldIde(buildColumnSql(column), fieldIde)) + .collect(Collectors.toList()); // Add primary key directly in the create table statement if (primaryKey != null && primaryKey.getColumnNames() != null && primaryKey.getColumnNames().size() > 0) { - columnSqls.add(buildPrimaryKeySql(primaryKey)); + columnSqls.add(buildPrimaryKeySql(primaryKey, fieldIde)); } createTableSql.append(String.join(",\n", columnSqls)); @@ -70,7 +77,9 @@ public String build(TablePath tablePath) { .map( column -> buildColumnCommentSql( - column, tablePath.getSchemaAndTableName())) + column, + tablePath.getSchemaAndTableName("\""), + fieldIde)) .collect(Collectors.toList()); if (!commentSqls.isEmpty()) { @@ -83,7 +92,7 @@ public String build(TablePath tablePath) { private String buildColumnSql(Column column) { StringBuilder columnSql = new StringBuilder(); - columnSql.append(column.getName()).append(" "); + columnSql.append("\"").append(column.getName()).append("\" "); String columnType = sourceCatalogName.equals("oracle") @@ -138,9 +147,13 @@ private String buildColumnType(Column column) { } } - private String buildPrimaryKeySql(PrimaryKey primaryKey) { + private String buildPrimaryKeySql(PrimaryKey primaryKey, String fieldIde) { String randomSuffix = UUID.randomUUID().toString().replace("-", "").substring(0, 4); - String columnNamesString = String.join(", ", primaryKey.getColumnNames()); + // String columnNamesString = String.join(", ", primaryKey.getColumnNames()); + String columnNamesString = + primaryKey.getColumnNames().stream() + .map(columnName -> "\"" + columnName + "\"") + .collect(Collectors.joining(", ")); // In Oracle database, the maximum length for an identifier is 30 characters. String primaryKeyStr = primaryKey.getPrimaryKey(); @@ -148,21 +161,26 @@ private String buildPrimaryKeySql(PrimaryKey primaryKey) { primaryKeyStr = primaryKeyStr.substring(0, 25); } - return "CONSTRAINT " - + primaryKeyStr - + "_" - + randomSuffix - + " PRIMARY KEY (" - + columnNamesString - + ")"; + return CatalogUtils.getFieldIde( + "CONSTRAINT " + + primaryKeyStr + + "_" + + randomSuffix + + " PRIMARY KEY (" + + columnNamesString + + ")", + fieldIde); } - private String buildColumnCommentSql(Column column, String tableName) { + private String buildColumnCommentSql(Column column, String tableName, String fieldIde) { StringBuilder columnCommentSql = new StringBuilder(); - columnCommentSql.append("COMMENT ON COLUMN ").append(tableName).append("."); columnCommentSql - .append(column.getName()) - .append(" IS '") + .append(CatalogUtils.quoteIdentifier("COMMENT ON COLUMN ", fieldIde)) + .append(tableName) + .append("."); + columnCommentSql + .append(CatalogUtils.quoteIdentifier(column.getName(), fieldIde, "\"")) + .append(CatalogUtils.quoteIdentifier(" IS '", fieldIde)) .append(column.getComment()) .append("'"); return columnCommentSql.toString(); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java index e3507666d08..b73a7492dc1 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java @@ -331,7 +331,9 @@ private void buildColumn(ResultSet resultSet, TableSchema.Builder builder) throw @Override protected boolean createTableInternal(TablePath tablePath, CatalogTable table) throws CatalogException { - String createTableSql = new PostgresCreateTableSqlBuilder(table).build(tablePath); + String createTableSql = + new PostgresCreateTableSqlBuilder(table) + .build(tablePath, table.getOptions().get("fieldIde")); String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName()); Connection conn = getConnection(dbUrl); log.info("create table sql: {}", createTableSql); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java index 85f4468bef9..d646ed053d7 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java @@ -23,6 +23,7 @@ import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.type.DecimalType; import org.apache.seatunnel.api.table.type.SqlType; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils; import org.apache.commons.lang3.StringUtils; @@ -46,14 +47,23 @@ public PostgresCreateTableSqlBuilder(CatalogTable catalogTable) { } public String build(TablePath tablePath) { + return build(tablePath, ""); + } + + public String build(TablePath tablePath, String fieldIde) { StringBuilder createTableSql = new StringBuilder(); createTableSql - .append("CREATE TABLE IF NOT EXISTS ") - .append(tablePath.getSchemaAndTableName()) + .append(CatalogUtils.quoteIdentifier("CREATE TABLE IF NOT EXISTS ", fieldIde)) + .append(tablePath.getSchemaAndTableName("\"")) .append(" (\n"); List columnSqls = - columns.stream().map(this::buildColumnSql).collect(Collectors.toList()); + columns.stream() + .map( + column -> + CatalogUtils.quoteIdentifier( + buildColumnSql(column), fieldIde)) + .collect(Collectors.toList()); createTableSql.append(String.join(",\n", columnSqls)); createTableSql.append("\n);"); @@ -64,7 +74,9 @@ public String build(TablePath tablePath) { .map( columns -> buildColumnCommentSql( - columns, tablePath.getSchemaAndTableName())) + columns, + tablePath.getSchemaAndTableName("\""), + fieldIde)) .collect(Collectors.toList()); if (!commentSqls.isEmpty()) { @@ -77,7 +89,7 @@ public String build(TablePath tablePath) { private String buildColumnSql(Column column) { StringBuilder columnSql = new StringBuilder(); - columnSql.append(column.getName()).append(" "); + columnSql.append("\"").append(column.getName()).append("\" "); // For simplicity, assume the column type in SeaTunnelDataType is the same as in PostgreSQL String columnType = @@ -131,12 +143,15 @@ private String buildColumnType(Column column) { } } - private String buildColumnCommentSql(Column column, String tableName) { + private String buildColumnCommentSql(Column column, String tableName, String fieldIde) { StringBuilder columnCommentSql = new StringBuilder(); - columnCommentSql.append("COMMENT ON COLUMN ").append(tableName).append("."); columnCommentSql - .append(column.getName()) - .append(" IS '") + .append(CatalogUtils.quoteIdentifier("COMMENT ON COLUMN ", fieldIde)) + .append(tableName) + .append("."); + columnCommentSql + .append(CatalogUtils.quoteIdentifier(column.getName(), fieldIde, "\"")) + .append(CatalogUtils.quoteIdentifier(" IS '", fieldIde)) .append(column.getComment()) .append("'"); return columnCommentSql.toString(); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java index ea04c60bff5..6ea1584ea99 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java @@ -345,7 +345,7 @@ protected boolean dropDatabaseInternal(String databaseName) throws CatalogExcept try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd); PreparedStatement ps = conn.prepareStatement( - String.format("DROP DATABASE IF EXISTS `%s`;", databaseName))) { + String.format("DROP DATABASE IF EXISTS [%s];", databaseName))) { return ps.execute(); } catch (Exception e) { throw new CatalogException( diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilder.java index 0bec148b372..1774518dcdd 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilder.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilder.java @@ -25,6 +25,7 @@ import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.type.DecimalType; import org.apache.seatunnel.api.table.type.SqlType; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -117,7 +118,7 @@ public SqlServerCreateTableSqlBuilder comment(String comment) { public String build(TablePath tablePath, CatalogTable catalogTable) { List sqls = new ArrayList<>(); - String sqlTableName = tablePath.getFullName(); + String sqlTableName = tablePath.getFullNameWithQuoted("[", "]"); Map columnComments = new HashMap<>(); sqls.add( String.format( @@ -137,6 +138,9 @@ public String build(TablePath tablePath, CatalogTable catalogTable) { sqls.add("COLLATE = " + collate); } String sqlTableSql = String.join(" ", sqls) + ";"; + sqlTableSql = + CatalogUtils.quoteIdentifier( + sqlTableSql, catalogTable.getOptions().get("fieldIde")); StringBuilder tableAndColumnComment = new StringBuilder(); if (comment != null) { sqls.add("COMMENT = '" + comment + "'"); @@ -185,7 +189,7 @@ private String buildColumnsIdentifySql(String catalogName, Map c private String buildColumnIdentifySql( Column column, String catalogName, Map columnComments) { final List columnSqls = new ArrayList<>(); - columnSqls.add(column.getName()); + columnSqls.add("[" + column.getName() + "]"); String tyNameDef = ""; if (StringUtils.equals(catalogName, "sqlserver")) { columnSqls.add(column.getSourceType()); @@ -267,7 +271,10 @@ private String buildColumnIdentifySql( private String buildPrimaryKeySql() { // .map(columnName -> "`" + columnName + "`") - String key = String.join(", ", primaryKey.getColumnNames()); + String key = + primaryKey.getColumnNames().stream() + .map(columnName -> "[" + columnName + "]") + .collect(Collectors.joining(", ")); // add sort type return String.format("PRIMARY KEY (%s)", key); } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java new file mode 100644 index 00000000000..72c08ba4bc5 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils; + +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum; + +public class CatalogUtils { + public static String getFieldIde(String identifier, String fieldIde) { + if (fieldIde == null) { + return identifier; + } + switch (FieldIdeEnum.valueOf(fieldIde.toUpperCase())) { + case LOWERCASE: + return identifier.toLowerCase(); + case UPPERCASE: + return identifier.toUpperCase(); + default: + return identifier; + } + } + + public static String quoteIdentifier(String identifier, String fieldIde, String quote) { + if (identifier.contains(".")) { + String[] parts = identifier.split("\\."); + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < parts.length - 1; i++) { + sb.append(quote).append(parts[i]).append(quote).append("."); + } + return sb.append(quote) + .append(getFieldIde(parts[parts.length - 1], fieldIde)) + .append(quote) + .toString(); + } + + return quote + getFieldIde(identifier, fieldIde) + quote; + } + + public static String quoteIdentifier(String identifier, String fieldIde) { + return getFieldIde(identifier, fieldIde); + } + + public static String quoteTableIdentifier(String identifier, String fieldIde) { + if (identifier.contains(".")) { + String[] parts = identifier.split("\\."); + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < parts.length - 1; i++) { + sb.append(parts[i]).append("."); + } + return sb.append(getFieldIde(parts[parts.length - 1], fieldIde)).toString(); + } + + return getFieldIde(identifier, fieldIde); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java index 207995d0b45..85750128591 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum; import java.math.BigDecimal; import java.util.List; @@ -155,4 +156,10 @@ public interface JdbcOptions { .intType() .noDefaultValue() .withDescription("partition num"); + + Option FIELD_IDE = + Options.key("field_ide") + .enumType(FieldIdeEnum.class) + .noDefaultValue() + .withDescription("Whether case conversion is required"); } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dialectenum/FieldIdeEnum.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dialectenum/FieldIdeEnum.java new file mode 100644 index 00000000000..39f95210623 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dialectenum/FieldIdeEnum.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum; + +public enum FieldIdeEnum { + ORIGINAL("original"), // Original string form + UPPERCASE("uppercase"), // Convert to uppercase + LOWERCASE("lowercase"); // Convert to lowercase + + private final String value; + + FieldIdeEnum(String value) { + this.value = value; + } + + public String getValue() { + return value; + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java index c23619b5aad..59f0be60783 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java @@ -38,10 +38,13 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils; +import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions; import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig; import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum; import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcAggregatedCommitInfo; import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState; import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo; @@ -206,14 +209,21 @@ public void handleSaveMode(DataSaveMode saveMode) { catalogFactory.factoryIdentifier(), ReadonlyConfig.fromMap(new HashMap<>(catalogOptions)))) { catalog.open(); + FieldIdeEnum fieldIdeEnum = config.get(JdbcOptions.FIELD_IDE); + String fieldIde = + fieldIdeEnum == null + ? FieldIdeEnum.ORIGINAL.getValue() + : fieldIdeEnum.getValue(); TablePath tablePath = TablePath.of( jdbcSinkConfig.getDatabase() + "." - + jdbcSinkConfig.getTable()); + + CatalogUtils.quoteTableIdentifier( + jdbcSinkConfig.getTable(), fieldIde)); if (!catalog.databaseExists(jdbcSinkConfig.getDatabase())) { catalog.createDatabase(tablePath, true); } + catalogTable.getOptions().put("fieldIde", fieldIde); if (!catalog.tableExists(tablePath)) { catalog.createTable(tablePath, catalogTable, true); } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java index 3de5c65bf8d..dc69c58a028 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java @@ -90,7 +90,7 @@ public void testBuild() { "User table"); String createTableSql = - MysqlCreateTableSqlBuilder.builder(tablePath, catalogTable).build("mysql"); + MysqlCreateTableSqlBuilder.builder(tablePath, catalogTable).build("mysql", ""); // create table sql is change; The old unit tests are no longer applicable String expect = "CREATE TABLE IF NOT EXISTS test_table (\n" From d27107a006bce4f129dbbd3032bcb0e151c09d2e Mon Sep 17 00:00:00 2001 From: jiayang Date: Mon, 21 Aug 2023 18:19:16 +0800 Subject: [PATCH 2/9] [bugfix] field ide is "" --- .../seatunnel/jdbc/catalog/utils/CatalogUtils.java | 4 +++- .../catalog/sql/MysqlCreateTableSqlBuilderTest.java | 12 ++++++------ 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java index 72c08ba4bc5..3dd5aba1e19 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java @@ -19,9 +19,11 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum; +import org.apache.commons.lang3.StringUtils; + public class CatalogUtils { public static String getFieldIde(String identifier, String fieldIde) { - if (fieldIde == null) { + if (StringUtils.isEmpty(fieldIde)) { return identifier; } switch (FieldIdeEnum.valueOf(fieldIde.toUpperCase())) { diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java index dc69c58a028..53f6aaf5170 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java @@ -93,12 +93,12 @@ public void testBuild() { MysqlCreateTableSqlBuilder.builder(tablePath, catalogTable).build("mysql", ""); // create table sql is change; The old unit tests are no longer applicable String expect = - "CREATE TABLE IF NOT EXISTS test_table (\n" - + "\tid null NOT NULL COMMENT 'id', \n" - + "\tname null NOT NULL COMMENT 'name', \n" - + "\tage null NULL COMMENT 'age', \n" - + "\tcreateTime null NULL COMMENT 'createTime', \n" - + "\tlastUpdateTime null NULL COMMENT 'lastUpdateTime', \n" + "CREATE TABLE IF NOT EXISTS `test_table` (\n" + + "\t`id` null NOT NULL COMMENT 'id', \n" + + "\t`name` null NOT NULL COMMENT 'name', \n" + + "\t`age` null NULL COMMENT 'age', \n" + + "\t`createTime` null NULL COMMENT 'createTime', \n" + + "\t`lastUpdateTime` null NULL COMMENT 'lastUpdateTime', \n" + "\tPRIMARY KEY (`id`)\n" + ") COMMENT = 'User table';"; CONSOLE.println(expect); From e298797d5bc4262705d634308de665b149f74af1 Mon Sep 17 00:00:00 2001 From: jiayang Date: Mon, 21 Aug 2023 18:23:04 +0800 Subject: [PATCH 3/9] [bugfix] empty update blank --- .../connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java index 3dd5aba1e19..4b60f92d80a 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java @@ -23,7 +23,7 @@ public class CatalogUtils { public static String getFieldIde(String identifier, String fieldIde) { - if (StringUtils.isEmpty(fieldIde)) { + if (StringUtils.isBlank(fieldIde)) { return identifier; } switch (FieldIdeEnum.valueOf(fieldIde.toUpperCase())) { From 9228126951ed9f6de5f97f0b4821e894c54c94ef Mon Sep 17 00:00:00 2001 From: jiayang Date: Tue, 5 Sep 2023 11:46:00 +0800 Subject: [PATCH 4/9] [bugfix] filed update identifierCase --- .../jdbc/catalog/mysql/MySqlCatalog.java | 6 ++-- .../mysql/MysqlCreateTableSqlBuilder.java | 30 +++++++++++------- .../jdbc/catalog/oracle/OracleCatalog.java | 4 +-- .../oracle/OracleCreateTableSqlBuilder.java | 31 +++++++++---------- .../jdbc/catalog/psql/PostgresCatalog.java | 4 +-- .../psql/PostgresCreateTableSqlBuilder.java | 22 ++++++------- .../SqlServerCreateTableSqlBuilder.java | 14 ++++++--- .../jdbc/catalog/utils/CatalogUtils.java | 24 +++++++------- .../seatunnel/jdbc/config/JdbcOptions.java | 8 ++--- ...{FieldIdeEnum.java => IdentifierCase.java} | 4 +-- .../seatunnel/jdbc/sink/JdbcSink.java | 16 +++++----- .../sql/MysqlCreateTableSqlBuilderTest.java | 2 +- 12 files changed, 85 insertions(+), 80 deletions(-) rename seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dialectenum/{FieldIdeEnum.java => IdentifierCase.java} (94%) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java index fb7f29c3fb6..e364ae22a98 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java @@ -291,10 +291,10 @@ protected boolean createTableInternal(TablePath tablePath, CatalogTable table) String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName()); String createTableSql = - MysqlCreateTableSqlBuilder.builder(tablePath, table) - .build(table.getCatalogName(), table.getOptions().get("fieldIde")); + MysqlCreateTableSqlBuilder.builder(tablePath, table).build(table.getCatalogName()); createTableSql = - CatalogUtils.getFieldIde(createTableSql, table.getOptions().get("fieldIde")); + CatalogUtils.getIdentifierCase( + createTableSql, table.getOptions().get("identifierCase")); Connection connection = getConnection(dbUrl); log.info("create table sql: {}", createTableSql); try (PreparedStatement ps = connection.prepareStatement(createTableSql)) { diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java index c226082d321..a3353f7db1d 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java @@ -56,6 +56,8 @@ public class MysqlCreateTableSqlBuilder { private MysqlDataTypeConvertor mysqlDataTypeConvertor; + private String identifierCase; + private MysqlCreateTableSqlBuilder(String tableName) { checkNotNull(tableName, "tableName must not be null"); this.tableName = tableName; @@ -77,7 +79,8 @@ public static MysqlCreateTableSqlBuilder builder( .charset(null) .primaryKey(tableSchema.getPrimaryKey()) .constraintKeys(tableSchema.getConstraintKeys()) - .addColumn(tableSchema.getColumns()); + .addColumn(tableSchema.getColumns()) + .identifierCase(catalogTable.getOptions().get("identifierCase")); } public MysqlCreateTableSqlBuilder addColumn(List columns) { @@ -91,6 +94,11 @@ public MysqlCreateTableSqlBuilder primaryKey(PrimaryKey primaryKey) { return this; } + public MysqlCreateTableSqlBuilder identifierCase(String identifierCase) { + this.identifierCase = identifierCase; + return this; + } + public MysqlCreateTableSqlBuilder constraintKeys(List constraintKeys) { this.constraintKeys = constraintKeys; return this; @@ -116,13 +124,13 @@ public MysqlCreateTableSqlBuilder comment(String comment) { return this; } - public String build(String catalogName, String fieldIde) { + public String build(String catalogName) { List sqls = new ArrayList<>(); sqls.add( String.format( "CREATE TABLE IF NOT EXISTS %s (\n%s\n)", - CatalogUtils.quoteIdentifier(tableName, fieldIde, "`"), - buildColumnsIdentifySql(catalogName, fieldIde))); + CatalogUtils.quoteIdentifier(tableName, identifierCase, "`"), + buildColumnsIdentifySql(catalogName))); if (engine != null) { sqls.add("ENGINE = " + engine); } @@ -138,13 +146,13 @@ public String build(String catalogName, String fieldIde) { return String.join(" ", sqls) + ";"; } - private String buildColumnsIdentifySql(String catalogName, String fieldIde) { + private String buildColumnsIdentifySql(String catalogName) { List columnSqls = new ArrayList<>(); for (Column column : columns) { - columnSqls.add("\t" + buildColumnIdentifySql(column, catalogName, fieldIde)); + columnSqls.add("\t" + buildColumnIdentifySql(column, catalogName)); } if (primaryKey != null) { - columnSqls.add("\t" + buildPrimaryKeySql(fieldIde)); + columnSqls.add("\t" + buildPrimaryKeySql()); } if (CollectionUtils.isNotEmpty(constraintKeys)) { for (ConstraintKey constraintKey : constraintKeys) { @@ -157,9 +165,9 @@ private String buildColumnsIdentifySql(String catalogName, String fieldIde) { return String.join(", \n", columnSqls); } - private String buildColumnIdentifySql(Column column, String catalogName, String fieldIde) { + private String buildColumnIdentifySql(Column column, String catalogName) { final List columnSqls = new ArrayList<>(); - columnSqls.add(CatalogUtils.quoteIdentifier(column.getName(), fieldIde, "`")); + columnSqls.add(CatalogUtils.quoteIdentifier(column.getName(), identifierCase, "`")); if (StringUtils.equals(catalogName, "mysql")) { columnSqls.add(column.getSourceType()); } else { @@ -239,13 +247,13 @@ private String buildColumnIdentifySql(Column column, String catalogName, String return String.join(" ", columnSqls); } - private String buildPrimaryKeySql(String fieldIde) { + private String buildPrimaryKeySql() { String key = primaryKey.getColumnNames().stream() .map(columnName -> "`" + columnName + "`") .collect(Collectors.joining(", ")); // add sort type - return String.format("PRIMARY KEY (%s)", CatalogUtils.quoteIdentifier(key, fieldIde)); + return String.format("PRIMARY KEY (%s)", CatalogUtils.quoteIdentifier(key, identifierCase)); } private String buildConstraintKeySql(ConstraintKey constraintKey) { diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java index 6fe9e2a0940..261f4f7fb6f 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java @@ -148,9 +148,7 @@ public List listDatabases() throws CatalogException { @Override protected boolean createTableInternal(TablePath tablePath, CatalogTable table) throws CatalogException { - String createTableSql = - new OracleCreateTableSqlBuilder(table) - .build(tablePath, table.getOptions().get("fieldIde")); + String createTableSql = new OracleCreateTableSqlBuilder(table).build(tablePath); String[] createTableSqls = createTableSql.split(";"); for (String sql : createTableSqls) { log.info("create table sql: {}", sql); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCreateTableSqlBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCreateTableSqlBuilder.java index 61dee202965..6cec1fda685 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCreateTableSqlBuilder.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCreateTableSqlBuilder.java @@ -37,19 +37,17 @@ public class OracleCreateTableSqlBuilder { private PrimaryKey primaryKey; private OracleDataTypeConvertor oracleDataTypeConvertor; private String sourceCatalogName; + private String identifierCase; public OracleCreateTableSqlBuilder(CatalogTable catalogTable) { this.columns = catalogTable.getTableSchema().getColumns(); this.primaryKey = catalogTable.getTableSchema().getPrimaryKey(); this.oracleDataTypeConvertor = new OracleDataTypeConvertor(); this.sourceCatalogName = catalogTable.getCatalogName(); + this.identifierCase = catalogTable.getOptions().get("identifierCase"); } public String build(TablePath tablePath) { - return build(tablePath, ""); - } - - public String build(TablePath tablePath, String fieldIde) { StringBuilder createTableSql = new StringBuilder(); createTableSql .append("CREATE TABLE ") @@ -58,14 +56,17 @@ public String build(TablePath tablePath, String fieldIde) { List columnSqls = columns.stream() - .map(column -> CatalogUtils.getFieldIde(buildColumnSql(column), fieldIde)) + .map( + column -> + CatalogUtils.getIdentifierCase( + buildColumnSql(column), identifierCase)) .collect(Collectors.toList()); // Add primary key directly in the create table statement if (primaryKey != null && primaryKey.getColumnNames() != null && primaryKey.getColumnNames().size() > 0) { - columnSqls.add(buildPrimaryKeySql(primaryKey, fieldIde)); + columnSqls.add(buildPrimaryKeySql(primaryKey)); } createTableSql.append(String.join(",\n", columnSqls)); @@ -77,9 +78,7 @@ public String build(TablePath tablePath, String fieldIde) { .map( column -> buildColumnCommentSql( - column, - tablePath.getSchemaAndTableName("\""), - fieldIde)) + column, tablePath.getSchemaAndTableName("\""))) .collect(Collectors.toList()); if (!commentSqls.isEmpty()) { @@ -147,7 +146,7 @@ private String buildColumnType(Column column) { } } - private String buildPrimaryKeySql(PrimaryKey primaryKey, String fieldIde) { + private String buildPrimaryKeySql(PrimaryKey primaryKey) { String randomSuffix = UUID.randomUUID().toString().replace("-", "").substring(0, 4); // String columnNamesString = String.join(", ", primaryKey.getColumnNames()); String columnNamesString = @@ -161,7 +160,7 @@ private String buildPrimaryKeySql(PrimaryKey primaryKey, String fieldIde) { primaryKeyStr = primaryKeyStr.substring(0, 25); } - return CatalogUtils.getFieldIde( + return CatalogUtils.getIdentifierCase( "CONSTRAINT " + primaryKeyStr + "_" @@ -169,18 +168,18 @@ private String buildPrimaryKeySql(PrimaryKey primaryKey, String fieldIde) { + " PRIMARY KEY (" + columnNamesString + ")", - fieldIde); + identifierCase); } - private String buildColumnCommentSql(Column column, String tableName, String fieldIde) { + private String buildColumnCommentSql(Column column, String tableName) { StringBuilder columnCommentSql = new StringBuilder(); columnCommentSql - .append(CatalogUtils.quoteIdentifier("COMMENT ON COLUMN ", fieldIde)) + .append(CatalogUtils.quoteIdentifier("COMMENT ON COLUMN ", identifierCase)) .append(tableName) .append("."); columnCommentSql - .append(CatalogUtils.quoteIdentifier(column.getName(), fieldIde, "\"")) - .append(CatalogUtils.quoteIdentifier(" IS '", fieldIde)) + .append(CatalogUtils.quoteIdentifier(column.getName(), identifierCase, "\"")) + .append(CatalogUtils.quoteIdentifier(" IS '", identifierCase)) .append(column.getComment()) .append("'"); return columnCommentSql.toString(); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java index b73a7492dc1..e3507666d08 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java @@ -331,9 +331,7 @@ private void buildColumn(ResultSet resultSet, TableSchema.Builder builder) throw @Override protected boolean createTableInternal(TablePath tablePath, CatalogTable table) throws CatalogException { - String createTableSql = - new PostgresCreateTableSqlBuilder(table) - .build(tablePath, table.getOptions().get("fieldIde")); + String createTableSql = new PostgresCreateTableSqlBuilder(table).build(tablePath); String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName()); Connection conn = getConnection(dbUrl); log.info("create table sql: {}", createTableSql); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java index d646ed053d7..0e0d4d150fb 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java @@ -38,22 +38,20 @@ public class PostgresCreateTableSqlBuilder { private PrimaryKey primaryKey; private PostgresDataTypeConvertor postgresDataTypeConvertor; private String sourceCatalogName; + private String identifierCase; public PostgresCreateTableSqlBuilder(CatalogTable catalogTable) { this.columns = catalogTable.getTableSchema().getColumns(); this.primaryKey = catalogTable.getTableSchema().getPrimaryKey(); this.postgresDataTypeConvertor = new PostgresDataTypeConvertor(); this.sourceCatalogName = catalogTable.getCatalogName(); + this.identifierCase = catalogTable.getOptions().get("identifierCase"); } public String build(TablePath tablePath) { - return build(tablePath, ""); - } - - public String build(TablePath tablePath, String fieldIde) { StringBuilder createTableSql = new StringBuilder(); createTableSql - .append(CatalogUtils.quoteIdentifier("CREATE TABLE IF NOT EXISTS ", fieldIde)) + .append(CatalogUtils.quoteIdentifier("CREATE TABLE IF NOT EXISTS ", identifierCase)) .append(tablePath.getSchemaAndTableName("\"")) .append(" (\n"); @@ -62,7 +60,7 @@ public String build(TablePath tablePath, String fieldIde) { .map( column -> CatalogUtils.quoteIdentifier( - buildColumnSql(column), fieldIde)) + buildColumnSql(column), identifierCase)) .collect(Collectors.toList()); createTableSql.append(String.join(",\n", columnSqls)); @@ -74,9 +72,7 @@ public String build(TablePath tablePath, String fieldIde) { .map( columns -> buildColumnCommentSql( - columns, - tablePath.getSchemaAndTableName("\""), - fieldIde)) + columns, tablePath.getSchemaAndTableName("\""))) .collect(Collectors.toList()); if (!commentSqls.isEmpty()) { @@ -143,15 +139,15 @@ private String buildColumnType(Column column) { } } - private String buildColumnCommentSql(Column column, String tableName, String fieldIde) { + private String buildColumnCommentSql(Column column, String tableName) { StringBuilder columnCommentSql = new StringBuilder(); columnCommentSql - .append(CatalogUtils.quoteIdentifier("COMMENT ON COLUMN ", fieldIde)) + .append(CatalogUtils.quoteIdentifier("COMMENT ON COLUMN ", identifierCase)) .append(tableName) .append("."); columnCommentSql - .append(CatalogUtils.quoteIdentifier(column.getName(), fieldIde, "\"")) - .append(CatalogUtils.quoteIdentifier(" IS '", fieldIde)) + .append(CatalogUtils.quoteIdentifier(column.getName(), identifierCase, "\"")) + .append(CatalogUtils.quoteIdentifier(" IS '", identifierCase)) .append(column.getComment()) .append("'"); return columnCommentSql.toString(); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilder.java index 1774518dcdd..d3fdaa074b6 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilder.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilder.java @@ -56,6 +56,8 @@ public class SqlServerCreateTableSqlBuilder { private SqlServerDataTypeConvertor sqlServerDataTypeConvertor; + private String identifierCase; + private SqlServerCreateTableSqlBuilder(String tableName) { checkNotNull(tableName, "tableName must not be null"); this.tableName = tableName; @@ -77,7 +79,8 @@ public static SqlServerCreateTableSqlBuilder builder( .charset(null) .primaryKey(tableSchema.getPrimaryKey()) .constraintKeys(tableSchema.getConstraintKeys()) - .addColumn(tableSchema.getColumns()); + .addColumn(tableSchema.getColumns()) + .identifierCase(catalogTable.getOptions().get("identifierCase")); } public SqlServerCreateTableSqlBuilder addColumn(List columns) { @@ -91,6 +94,11 @@ public SqlServerCreateTableSqlBuilder primaryKey(PrimaryKey primaryKey) { return this; } + public SqlServerCreateTableSqlBuilder identifierCase(String identifierCase) { + this.identifierCase = identifierCase; + return this; + } + public SqlServerCreateTableSqlBuilder constraintKeys(List constraintKeys) { this.constraintKeys = constraintKeys; return this; @@ -138,9 +146,7 @@ public String build(TablePath tablePath, CatalogTable catalogTable) { sqls.add("COLLATE = " + collate); } String sqlTableSql = String.join(" ", sqls) + ";"; - sqlTableSql = - CatalogUtils.quoteIdentifier( - sqlTableSql, catalogTable.getOptions().get("fieldIde")); + sqlTableSql = CatalogUtils.quoteIdentifier(sqlTableSql, identifierCase); StringBuilder tableAndColumnComment = new StringBuilder(); if (comment != null) { sqls.add("COMMENT = '" + comment + "'"); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java index 4b60f92d80a..1bcb9acc2e8 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java @@ -17,16 +17,16 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils; -import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.IdentifierCase; import org.apache.commons.lang3.StringUtils; public class CatalogUtils { - public static String getFieldIde(String identifier, String fieldIde) { - if (StringUtils.isBlank(fieldIde)) { + public static String getIdentifierCase(String identifier, String identifierCase) { + if (StringUtils.isBlank(identifierCase)) { return identifier; } - switch (FieldIdeEnum.valueOf(fieldIde.toUpperCase())) { + switch (IdentifierCase.valueOf(identifierCase.toUpperCase())) { case LOWERCASE: return identifier.toLowerCase(); case UPPERCASE: @@ -36,7 +36,7 @@ public static String getFieldIde(String identifier, String fieldIde) { } } - public static String quoteIdentifier(String identifier, String fieldIde, String quote) { + public static String quoteIdentifier(String identifier, String identifierCase, String quote) { if (identifier.contains(".")) { String[] parts = identifier.split("\\."); StringBuilder sb = new StringBuilder(); @@ -44,28 +44,28 @@ public static String quoteIdentifier(String identifier, String fieldIde, String sb.append(quote).append(parts[i]).append(quote).append("."); } return sb.append(quote) - .append(getFieldIde(parts[parts.length - 1], fieldIde)) + .append(getIdentifierCase(parts[parts.length - 1], identifierCase)) .append(quote) .toString(); } - return quote + getFieldIde(identifier, fieldIde) + quote; + return quote + getIdentifierCase(identifier, identifierCase) + quote; } - public static String quoteIdentifier(String identifier, String fieldIde) { - return getFieldIde(identifier, fieldIde); + public static String quoteIdentifier(String identifier, String identifierCase) { + return getIdentifierCase(identifier, identifierCase); } - public static String quoteTableIdentifier(String identifier, String fieldIde) { + public static String quoteTableIdentifier(String identifier, String identifierCase) { if (identifier.contains(".")) { String[] parts = identifier.split("\\."); StringBuilder sb = new StringBuilder(); for (int i = 0; i < parts.length - 1; i++) { sb.append(parts[i]).append("."); } - return sb.append(getFieldIde(parts[parts.length - 1], fieldIde)).toString(); + return sb.append(getIdentifierCase(parts[parts.length - 1], identifierCase)).toString(); } - return getFieldIde(identifier, fieldIde); + return getIdentifierCase(identifier, identifierCase); } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java index b01fc872f31..15aeb051ea9 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java @@ -19,7 +19,7 @@ import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; -import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.IdentifierCase; import java.math.BigDecimal; import java.util.List; @@ -156,9 +156,9 @@ public interface JdbcOptions { .noDefaultValue() .withDescription("partition num"); - Option FIELD_IDE = - Options.key("field_ide") - .enumType(FieldIdeEnum.class) + Option FIELD_IDE = + Options.key("identifier_case") + .enumType(IdentifierCase.class) .noDefaultValue() .withDescription("Whether case conversion is required"); } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dialectenum/FieldIdeEnum.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dialectenum/IdentifierCase.java similarity index 94% rename from seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dialectenum/FieldIdeEnum.java rename to seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dialectenum/IdentifierCase.java index 39f95210623..fcd67f6d1d6 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dialectenum/FieldIdeEnum.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dialectenum/IdentifierCase.java @@ -17,14 +17,14 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum; -public enum FieldIdeEnum { +public enum IdentifierCase { ORIGINAL("original"), // Original string form UPPERCASE("uppercase"), // Convert to uppercase LOWERCASE("lowercase"); // Convert to lowercase private final String value; - FieldIdeEnum(String value) { + IdentifierCase(String value) { this.value = value; } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java index 59f0be60783..6de49940c4c 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java @@ -44,7 +44,7 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader; -import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.IdentifierCase; import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcAggregatedCommitInfo; import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState; import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo; @@ -209,21 +209,21 @@ public void handleSaveMode(DataSaveMode saveMode) { catalogFactory.factoryIdentifier(), ReadonlyConfig.fromMap(new HashMap<>(catalogOptions)))) { catalog.open(); - FieldIdeEnum fieldIdeEnum = config.get(JdbcOptions.FIELD_IDE); - String fieldIde = - fieldIdeEnum == null - ? FieldIdeEnum.ORIGINAL.getValue() - : fieldIdeEnum.getValue(); + IdentifierCase identifierCaseEnum = config.get(JdbcOptions.FIELD_IDE); + String identifierCase = + identifierCaseEnum == null + ? IdentifierCase.ORIGINAL.getValue() + : identifierCaseEnum.getValue(); TablePath tablePath = TablePath.of( jdbcSinkConfig.getDatabase() + "." + CatalogUtils.quoteTableIdentifier( - jdbcSinkConfig.getTable(), fieldIde)); + jdbcSinkConfig.getTable(), identifierCase)); if (!catalog.databaseExists(jdbcSinkConfig.getDatabase())) { catalog.createDatabase(tablePath, true); } - catalogTable.getOptions().put("fieldIde", fieldIde); + catalogTable.getOptions().put("identifierCase", identifierCase); if (!catalog.tableExists(tablePath)) { catalog.createTable(tablePath, catalogTable, true); } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java index 53f6aaf5170..355830258de 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java @@ -90,7 +90,7 @@ public void testBuild() { "User table"); String createTableSql = - MysqlCreateTableSqlBuilder.builder(tablePath, catalogTable).build("mysql", ""); + MysqlCreateTableSqlBuilder.builder(tablePath, catalogTable).build("mysql"); // create table sql is change; The old unit tests are no longer applicable String expect = "CREATE TABLE IF NOT EXISTS `test_table` (\n" From e7326d5feb23b7fdf835cdf24a50cc7e5480c7e5 Mon Sep 17 00:00:00 2001 From: jiayang Date: Tue, 5 Sep 2023 12:06:39 +0800 Subject: [PATCH 5/9] [feature] filed update identifierCase --- .../seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java | 2 +- .../seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java index 15aeb051ea9..55939fcfae6 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java @@ -156,7 +156,7 @@ public interface JdbcOptions { .noDefaultValue() .withDescription("partition num"); - Option FIELD_IDE = + Option IDENTIFIER_CASE = Options.key("identifier_case") .enumType(IdentifierCase.class) .noDefaultValue() diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java index 6de49940c4c..cc59be820dc 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java @@ -209,7 +209,7 @@ public void handleSaveMode(DataSaveMode saveMode) { catalogFactory.factoryIdentifier(), ReadonlyConfig.fromMap(new HashMap<>(catalogOptions)))) { catalog.open(); - IdentifierCase identifierCaseEnum = config.get(JdbcOptions.FIELD_IDE); + IdentifierCase identifierCaseEnum = config.get(JdbcOptions.IDENTIFIER_CASE); String identifierCase = identifierCaseEnum == null ? IdentifierCase.ORIGINAL.getValue() From 59a8a49f0bd5496ec50439266d8ce154476614af Mon Sep 17 00:00:00 2001 From: jiayang Date: Tue, 5 Sep 2023 16:50:04 +0800 Subject: [PATCH 6/9] [fix] remove docs --- .../oracle/OracleCreateTableSqlBuilder.java | 6 ------ .../psql/PostgresCreateTableSqlBuilder.java | 6 ------ .../sqlserver/SqlServerCreateTableSqlBuilder.java | 14 +------------- 3 files changed, 1 insertion(+), 25 deletions(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCreateTableSqlBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCreateTableSqlBuilder.java index 6cec1fda685..6c942283a14 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCreateTableSqlBuilder.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCreateTableSqlBuilder.java @@ -103,11 +103,6 @@ private String buildColumnSql(Column column) { columnSql.append(" NOT NULL"); } - // if (column.getDefaultValue() != null) { - // columnSql.append(" DEFAULT - // '").append(column.getDefaultValue().toString()).append("'"); - // } - return columnSql.toString(); } @@ -148,7 +143,6 @@ private String buildColumnType(Column column) { private String buildPrimaryKeySql(PrimaryKey primaryKey) { String randomSuffix = UUID.randomUUID().toString().replace("-", "").substring(0, 4); - // String columnNamesString = String.join(", ", primaryKey.getColumnNames()); String columnNamesString = primaryKey.getColumnNames().stream() .map(columnName -> "\"" + columnName + "\"") diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java index 0e0d4d150fb..bd10464a19a 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java @@ -104,12 +104,6 @@ private String buildColumnSql(Column column) { columnSql.append(" PRIMARY KEY"); } - // Add default value if exists - // if (column.getDefaultValue() != null) { - // columnSql.append(" DEFAULT - // '").append(column.getDefaultValue().toString()).append("'"); - // } - return columnSql.toString(); } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilder.java index d3fdaa074b6..c19cc1df7bd 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilder.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilder.java @@ -254,19 +254,7 @@ private String buildColumnIdentifySql( } else { columnSqls.add("NOT NULL"); } - // default value - // if (column.getDefaultValue() != null) { - // String defaultValue = "'" + column.getDefaultValue().toString() + "'"; - // if (StringUtils.equals(SqlServerType.BINARY.getName(), tyNameDef) - // && defaultValue.contains("b'")) { - // String rep = defaultValue.replace("b", "").replace("'", ""); - // defaultValue = "0x" + Integer.toHexString(Integer.parseInt(rep)); - // } else if (StringUtils.equals(SqlServerType.BIT.getName(), tyNameDef) - // && defaultValue.contains("b'")) { - // defaultValue = defaultValue.replace("b", "").replace("'", ""); - // } - // columnSqls.add("DEFAULT " + defaultValue); - // } + // comment if (column.getComment() != null) { columnComments.put(column.getName(), column.getComment()); From 130f6fb6af3d3a75d1565ed385d93a3014bb3529 Mon Sep 17 00:00:00 2001 From: jiayang Date: Sat, 9 Sep 2023 12:09:23 +0800 Subject: [PATCH 7/9] [feature] spotless --- .../mysql/MysqlCreateTableSqlBuilder.java | 14 +++++------ .../oracle/OracleCreateTableSqlBuilder.java | 19 +++++++-------- .../psql/PostgresCreateTableSqlBuilder.java | 14 +++++------ .../SqlServerCreateTableSqlBuilder.java | 10 ++++---- .../jdbc/catalog/utils/CatalogUtils.java | 24 +++++++++---------- .../seatunnel/jdbc/config/JdbcOptions.java | 8 +++---- ...{IdentifierCase.java => FieldIdeEnum.java} | 4 ++-- .../seatunnel/jdbc/sink/JdbcSink.java | 16 ++++++------- 8 files changed, 53 insertions(+), 56 deletions(-) rename seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dialectenum/{IdentifierCase.java => FieldIdeEnum.java} (94%) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java index 1262b0653fe..3430de04b5a 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java @@ -56,7 +56,7 @@ public class MysqlCreateTableSqlBuilder { private MysqlDataTypeConvertor mysqlDataTypeConvertor; - private String identifierCase; + private String fieldIde; private MysqlCreateTableSqlBuilder(String tableName) { checkNotNull(tableName, "tableName must not be null"); @@ -80,7 +80,7 @@ public static MysqlCreateTableSqlBuilder builder( .primaryKey(tableSchema.getPrimaryKey()) .constraintKeys(tableSchema.getConstraintKeys()) .addColumn(tableSchema.getColumns()) - .identifierCase(catalogTable.getOptions().get("identifierCase")); + .fieldIde(catalogTable.getOptions().get("fieldIde")); } public MysqlCreateTableSqlBuilder addColumn(List columns) { @@ -94,8 +94,8 @@ public MysqlCreateTableSqlBuilder primaryKey(PrimaryKey primaryKey) { return this; } - public MysqlCreateTableSqlBuilder identifierCase(String identifierCase) { - this.identifierCase = identifierCase; + public MysqlCreateTableSqlBuilder fieldIde(String fieldIde) { + this.fieldIde = fieldIde; return this; } @@ -129,7 +129,7 @@ public String build(String catalogName) { sqls.add( String.format( "CREATE TABLE %s (\n%s\n)", - CatalogUtils.quoteIdentifier(tableName, identifierCase, "`"), + CatalogUtils.quoteIdentifier(tableName, fieldIde, "`"), buildColumnsIdentifySql(catalogName))); if (engine != null) { sqls.add("ENGINE = " + engine); @@ -167,7 +167,7 @@ private String buildColumnsIdentifySql(String catalogName) { private String buildColumnIdentifySql(Column column, String catalogName) { final List columnSqls = new ArrayList<>(); - columnSqls.add(CatalogUtils.quoteIdentifier(column.getName(), identifierCase, "`")); + columnSqls.add(CatalogUtils.quoteIdentifier(column.getName(), fieldIde, "`")); if (StringUtils.equals(catalogName, "mysql")) { columnSqls.add(column.getSourceType()); } else { @@ -253,7 +253,7 @@ private String buildPrimaryKeySql() { .map(columnName -> "`" + columnName + "`") .collect(Collectors.joining(", ")); // add sort type - return String.format("PRIMARY KEY (%s)", CatalogUtils.quoteIdentifier(key, identifierCase)); + return String.format("PRIMARY KEY (%s)", CatalogUtils.quoteIdentifier(key, fieldIde)); } private String buildConstraintKeySql(ConstraintKey constraintKey) { diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCreateTableSqlBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCreateTableSqlBuilder.java index 6c942283a14..4b780131d54 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCreateTableSqlBuilder.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCreateTableSqlBuilder.java @@ -37,14 +37,14 @@ public class OracleCreateTableSqlBuilder { private PrimaryKey primaryKey; private OracleDataTypeConvertor oracleDataTypeConvertor; private String sourceCatalogName; - private String identifierCase; + private String fieldIde; public OracleCreateTableSqlBuilder(CatalogTable catalogTable) { this.columns = catalogTable.getTableSchema().getColumns(); this.primaryKey = catalogTable.getTableSchema().getPrimaryKey(); this.oracleDataTypeConvertor = new OracleDataTypeConvertor(); this.sourceCatalogName = catalogTable.getCatalogName(); - this.identifierCase = catalogTable.getOptions().get("identifierCase"); + this.fieldIde = catalogTable.getOptions().get("fieldIde"); } public String build(TablePath tablePath) { @@ -56,10 +56,7 @@ public String build(TablePath tablePath) { List columnSqls = columns.stream() - .map( - column -> - CatalogUtils.getIdentifierCase( - buildColumnSql(column), identifierCase)) + .map(column -> CatalogUtils.getFieldIde(buildColumnSql(column), fieldIde)) .collect(Collectors.toList()); // Add primary key directly in the create table statement @@ -154,7 +151,7 @@ private String buildPrimaryKeySql(PrimaryKey primaryKey) { primaryKeyStr = primaryKeyStr.substring(0, 25); } - return CatalogUtils.getIdentifierCase( + return CatalogUtils.getFieldIde( "CONSTRAINT " + primaryKeyStr + "_" @@ -162,18 +159,18 @@ private String buildPrimaryKeySql(PrimaryKey primaryKey) { + " PRIMARY KEY (" + columnNamesString + ")", - identifierCase); + fieldIde); } private String buildColumnCommentSql(Column column, String tableName) { StringBuilder columnCommentSql = new StringBuilder(); columnCommentSql - .append(CatalogUtils.quoteIdentifier("COMMENT ON COLUMN ", identifierCase)) + .append(CatalogUtils.quoteIdentifier("COMMENT ON COLUMN ", fieldIde)) .append(tableName) .append("."); columnCommentSql - .append(CatalogUtils.quoteIdentifier(column.getName(), identifierCase, "\"")) - .append(CatalogUtils.quoteIdentifier(" IS '", identifierCase)) + .append(CatalogUtils.quoteIdentifier(column.getName(), fieldIde, "\"")) + .append(CatalogUtils.quoteIdentifier(" IS '", fieldIde)) .append(column.getComment()) .append("'"); return columnCommentSql.toString(); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java index 0da2427a65f..74b684c0e39 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java @@ -38,20 +38,20 @@ public class PostgresCreateTableSqlBuilder { private PrimaryKey primaryKey; private PostgresDataTypeConvertor postgresDataTypeConvertor; private String sourceCatalogName; - private String identifierCase; + private String fieldIde; public PostgresCreateTableSqlBuilder(CatalogTable catalogTable) { this.columns = catalogTable.getTableSchema().getColumns(); this.primaryKey = catalogTable.getTableSchema().getPrimaryKey(); this.postgresDataTypeConvertor = new PostgresDataTypeConvertor(); this.sourceCatalogName = catalogTable.getCatalogName(); - this.identifierCase = catalogTable.getOptions().get("identifierCase"); + this.fieldIde = catalogTable.getOptions().get("fieldIde"); } public String build(TablePath tablePath) { StringBuilder createTableSql = new StringBuilder(); createTableSql - .append(CatalogUtils.quoteIdentifier("CREATE TABLE ", identifierCase)) + .append(CatalogUtils.quoteIdentifier("CREATE TABLE ", fieldIde)) .append(tablePath.getSchemaAndTableName("\"")) .append(" (\n"); @@ -60,7 +60,7 @@ public String build(TablePath tablePath) { .map( column -> CatalogUtils.quoteIdentifier( - buildColumnSql(column), identifierCase)) + buildColumnSql(column), fieldIde)) .collect(Collectors.toList()); createTableSql.append(String.join(",\n", columnSqls)); @@ -136,12 +136,12 @@ private String buildColumnType(Column column) { private String buildColumnCommentSql(Column column, String tableName) { StringBuilder columnCommentSql = new StringBuilder(); columnCommentSql - .append(CatalogUtils.quoteIdentifier("COMMENT ON COLUMN ", identifierCase)) + .append(CatalogUtils.quoteIdentifier("COMMENT ON COLUMN ", fieldIde)) .append(tableName) .append("."); columnCommentSql - .append(CatalogUtils.quoteIdentifier(column.getName(), identifierCase, "\"")) - .append(CatalogUtils.quoteIdentifier(" IS '", identifierCase)) + .append(CatalogUtils.quoteIdentifier(column.getName(), fieldIde, "\"")) + .append(CatalogUtils.quoteIdentifier(" IS '", fieldIde)) .append(column.getComment()) .append("'"); return columnCommentSql.toString(); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilder.java index c19cc1df7bd..86afa6e41e1 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilder.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilder.java @@ -56,7 +56,7 @@ public class SqlServerCreateTableSqlBuilder { private SqlServerDataTypeConvertor sqlServerDataTypeConvertor; - private String identifierCase; + private String fieldIde; private SqlServerCreateTableSqlBuilder(String tableName) { checkNotNull(tableName, "tableName must not be null"); @@ -80,7 +80,7 @@ public static SqlServerCreateTableSqlBuilder builder( .primaryKey(tableSchema.getPrimaryKey()) .constraintKeys(tableSchema.getConstraintKeys()) .addColumn(tableSchema.getColumns()) - .identifierCase(catalogTable.getOptions().get("identifierCase")); + .fieldIde(catalogTable.getOptions().get("fieldIde")); } public SqlServerCreateTableSqlBuilder addColumn(List columns) { @@ -94,8 +94,8 @@ public SqlServerCreateTableSqlBuilder primaryKey(PrimaryKey primaryKey) { return this; } - public SqlServerCreateTableSqlBuilder identifierCase(String identifierCase) { - this.identifierCase = identifierCase; + public SqlServerCreateTableSqlBuilder fieldIde(String fieldIde) { + this.fieldIde = fieldIde; return this; } @@ -146,7 +146,7 @@ public String build(TablePath tablePath, CatalogTable catalogTable) { sqls.add("COLLATE = " + collate); } String sqlTableSql = String.join(" ", sqls) + ";"; - sqlTableSql = CatalogUtils.quoteIdentifier(sqlTableSql, identifierCase); + sqlTableSql = CatalogUtils.quoteIdentifier(sqlTableSql, fieldIde); StringBuilder tableAndColumnComment = new StringBuilder(); if (comment != null) { sqls.add("COMMENT = '" + comment + "'"); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java index 1bcb9acc2e8..4b60f92d80a 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java @@ -17,16 +17,16 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils; -import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.IdentifierCase; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum; import org.apache.commons.lang3.StringUtils; public class CatalogUtils { - public static String getIdentifierCase(String identifier, String identifierCase) { - if (StringUtils.isBlank(identifierCase)) { + public static String getFieldIde(String identifier, String fieldIde) { + if (StringUtils.isBlank(fieldIde)) { return identifier; } - switch (IdentifierCase.valueOf(identifierCase.toUpperCase())) { + switch (FieldIdeEnum.valueOf(fieldIde.toUpperCase())) { case LOWERCASE: return identifier.toLowerCase(); case UPPERCASE: @@ -36,7 +36,7 @@ public static String getIdentifierCase(String identifier, String identifierCase) } } - public static String quoteIdentifier(String identifier, String identifierCase, String quote) { + public static String quoteIdentifier(String identifier, String fieldIde, String quote) { if (identifier.contains(".")) { String[] parts = identifier.split("\\."); StringBuilder sb = new StringBuilder(); @@ -44,28 +44,28 @@ public static String quoteIdentifier(String identifier, String identifierCase, S sb.append(quote).append(parts[i]).append(quote).append("."); } return sb.append(quote) - .append(getIdentifierCase(parts[parts.length - 1], identifierCase)) + .append(getFieldIde(parts[parts.length - 1], fieldIde)) .append(quote) .toString(); } - return quote + getIdentifierCase(identifier, identifierCase) + quote; + return quote + getFieldIde(identifier, fieldIde) + quote; } - public static String quoteIdentifier(String identifier, String identifierCase) { - return getIdentifierCase(identifier, identifierCase); + public static String quoteIdentifier(String identifier, String fieldIde) { + return getFieldIde(identifier, fieldIde); } - public static String quoteTableIdentifier(String identifier, String identifierCase) { + public static String quoteTableIdentifier(String identifier, String fieldIde) { if (identifier.contains(".")) { String[] parts = identifier.split("\\."); StringBuilder sb = new StringBuilder(); for (int i = 0; i < parts.length - 1; i++) { sb.append(parts[i]).append("."); } - return sb.append(getIdentifierCase(parts[parts.length - 1], identifierCase)).toString(); + return sb.append(getFieldIde(parts[parts.length - 1], fieldIde)).toString(); } - return getIdentifierCase(identifier, identifierCase); + return getFieldIde(identifier, fieldIde); } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java index 55939fcfae6..b01fc872f31 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java @@ -19,7 +19,7 @@ import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; -import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.IdentifierCase; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum; import java.math.BigDecimal; import java.util.List; @@ -156,9 +156,9 @@ public interface JdbcOptions { .noDefaultValue() .withDescription("partition num"); - Option IDENTIFIER_CASE = - Options.key("identifier_case") - .enumType(IdentifierCase.class) + Option FIELD_IDE = + Options.key("field_ide") + .enumType(FieldIdeEnum.class) .noDefaultValue() .withDescription("Whether case conversion is required"); } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dialectenum/IdentifierCase.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dialectenum/FieldIdeEnum.java similarity index 94% rename from seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dialectenum/IdentifierCase.java rename to seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dialectenum/FieldIdeEnum.java index fcd67f6d1d6..39f95210623 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dialectenum/IdentifierCase.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dialectenum/FieldIdeEnum.java @@ -17,14 +17,14 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum; -public enum IdentifierCase { +public enum FieldIdeEnum { ORIGINAL("original"), // Original string form UPPERCASE("uppercase"), // Convert to uppercase LOWERCASE("lowercase"); // Convert to lowercase private final String value; - IdentifierCase(String value) { + FieldIdeEnum(String value) { this.value = value; } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java index cc59be820dc..acac2e68371 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java @@ -44,7 +44,7 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader; -import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.IdentifierCase; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum; import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcAggregatedCommitInfo; import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState; import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo; @@ -209,21 +209,21 @@ public void handleSaveMode(DataSaveMode saveMode) { catalogFactory.factoryIdentifier(), ReadonlyConfig.fromMap(new HashMap<>(catalogOptions)))) { catalog.open(); - IdentifierCase identifierCaseEnum = config.get(JdbcOptions.IDENTIFIER_CASE); - String identifierCase = - identifierCaseEnum == null - ? IdentifierCase.ORIGINAL.getValue() - : identifierCaseEnum.getValue(); + FieldIdeEnum fieldIdeEnumEnum = config.get(JdbcOptions.FIELD_IDE); + String fieldIde = + fieldIdeEnumEnum == null + ? FieldIdeEnum.ORIGINAL.getValue() + : fieldIdeEnumEnum.getValue(); TablePath tablePath = TablePath.of( jdbcSinkConfig.getDatabase() + "." + CatalogUtils.quoteTableIdentifier( - jdbcSinkConfig.getTable(), identifierCase)); + jdbcSinkConfig.getTable(), fieldIde)); if (!catalog.databaseExists(jdbcSinkConfig.getDatabase())) { catalog.createDatabase(tablePath, true); } - catalogTable.getOptions().put("identifierCase", identifierCase); + catalogTable.getOptions().put("fieldIde", fieldIde); if (!catalog.tableExists(tablePath)) { catalog.createTable(tablePath, catalogTable, true); } From f7fa31aabed163d15941360cb70f29d5d4a11a74 Mon Sep 17 00:00:00 2001 From: jiayang Date: Mon, 11 Sep 2023 19:03:29 +0800 Subject: [PATCH 8/9] [feature] TablePath reconfiguration --- .../api/table/catalog/TablePath.java | 59 ++++++++++--------- 1 file changed, 31 insertions(+), 28 deletions(-) diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java index 1b7694754cb..358e873b991 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java @@ -22,6 +22,8 @@ import lombok.RequiredArgsConstructor; import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; @Getter @EqualsAndHashCode @@ -54,18 +56,15 @@ public static TablePath of(String databaseName, String schemaName, String tableN } public String getSchemaAndTableName() { - return String.format("%s.%s", schemaName, tableName); + return getNameCommon(null, schemaName, tableName, null, null); } public String getSchemaAndTableName(String quote) { - return String.format("%s%s%s.%s%s%s", quote, schemaName, quote, quote, tableName, quote); + return getNameCommon(null, schemaName, tableName, quote, quote); } public String getFullName() { - if (schemaName == null) { - return String.format("%s.%s", databaseName, tableName); - } - return String.format("%s.%s.%s", databaseName, schemaName, tableName); + return getNameCommon(databaseName, schemaName, tableName, null, null); } public String getFullNameWithQuoted() { @@ -73,32 +72,36 @@ public String getFullNameWithQuoted() { } public String getFullNameWithQuoted(String quote) { - if (schemaName == null) { - return String.format( - "%s%s%s.%s%s%s", quote, databaseName, quote, quote, tableName, quote); - } - return String.format( - "%s%s%s.%s%s%s.%s%s%s", - quote, databaseName, quote, quote, schemaName, quote, quote, tableName, quote); + return getNameCommon(databaseName, schemaName, tableName, quote, quote); } public String getFullNameWithQuoted(String quoteLeft, String quoteRight) { - if (schemaName == null) { - return String.format( - "%s%s%s.%s%s%s", - quoteLeft, databaseName, quoteRight, quoteLeft, tableName, quoteRight); + return getNameCommon(databaseName, schemaName, tableName, quoteLeft, quoteRight); + } + + private String getNameCommon( + String databaseName, + String schemaName, + String tableName, + String quoteLeft, + String quoteRight) { + List joinList = new ArrayList<>(); + quoteLeft = quoteLeft == null ? "" : quoteLeft; + quoteRight = quoteRight == null ? "" : quoteRight; + + if (databaseName != null) { + joinList.add(quoteLeft + databaseName + quoteRight); } - return String.format( - "%s%s%s.%s%s%s.%s%s%s", - quoteLeft, - databaseName, - quoteRight, - quoteLeft, - schemaName, - quoteRight, - quoteLeft, - tableName, - quoteRight); + + if (schemaName != null) { + joinList.add(quoteLeft + schemaName + quoteRight); + } + + if (tableName != null) { + joinList.add(quoteLeft + tableName + quoteRight); + } + + return String.join(".", joinList); } @Override From fc10c1c23c8843fcd1792907876d04382cc5ba88 Mon Sep 17 00:00:00 2001 From: jiayang Date: Tue, 12 Sep 2023 14:56:04 +0800 Subject: [PATCH 9/9] [feature] spotless:apply --- .../seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java | 1 - 1 file changed, 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java index 37ec2fafd49..bbb776e486a 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java @@ -38,7 +38,6 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils; import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions; import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;