diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseCatalogFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseCatalogFactory.java index 58dfa5b884a..01d035e1677 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseCatalogFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseCatalogFactory.java @@ -31,6 +31,9 @@ import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.google.auto.service.AutoService; import java.util.Optional; @@ -38,6 +41,8 @@ @AutoService(Factory.class) public class OceanBaseCatalogFactory implements CatalogFactory { + private static final Logger log = LoggerFactory.getLogger(OceanBaseCatalogFactory.class); + @Override public String factoryIdentifier() { return DatabaseIdentifier.OCENABASE; diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java index 58cdb5c4131..08aa0faea08 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java @@ -17,10 +17,44 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oceanbase; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.ConstraintKey; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.exception.CatalogException; +import org.apache.seatunnel.api.table.converter.BasicTypeDefine; import org.apache.seatunnel.common.utils.JdbcUrlUtil; -import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MySqlCatalog; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase.OceanBaseMySqlTypeConverter; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase.OceanBaseMySqlTypeMapper; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase.OceanBaseMysqlType; -public class OceanBaseMySqlCatalog extends MySqlCatalog { +import com.google.common.base.Preconditions; +import lombok.extern.slf4j.Slf4j; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Iterator; +import java.util.List; +import java.util.Locale; + +@Slf4j +public class OceanBaseMySqlCatalog extends AbstractJdbcCatalog { + + private static final String SELECT_COLUMNS_SQL_TEMPLATE = + "SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME ='%s' ORDER BY ORDINAL_POSITION ASC"; + + private static final String SELECT_DATABASE_EXISTS = + "SELECT SCHEMA_NAME FROM information_schema.schemata WHERE SCHEMA_NAME = '%s'"; + + private static final String SELECT_TABLE_EXISTS = + "SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.tables WHERE table_schema = '%s' AND table_name = '%s'"; static { SYS_DATABASES.clear(); @@ -32,8 +66,161 @@ public class OceanBaseMySqlCatalog extends MySqlCatalog { SYS_DATABASES.add("SYS"); } + private OceanBaseMySqlTypeConverter typeConverter; + public OceanBaseMySqlCatalog( String catalogName, String username, String pwd, JdbcUrlUtil.UrlInfo urlInfo) { - super(catalogName, username, pwd, urlInfo); + super(catalogName, username, pwd, urlInfo, null); + this.typeConverter = new OceanBaseMySqlTypeConverter(); + } + + @Override + protected String getDatabaseWithConditionSql(String databaseName) { + return String.format(SELECT_DATABASE_EXISTS, databaseName); + } + + @Override + protected String getTableWithConditionSql(TablePath tablePath) { + return String.format( + SELECT_TABLE_EXISTS, tablePath.getDatabaseName(), tablePath.getTableName()); + } + + @Override + protected String getListDatabaseSql() { + return "SHOW DATABASES;"; + } + + @Override + protected String getListTableSql(String databaseName) { + return "SHOW TABLES;"; + } + + @Override + protected String getTableName(ResultSet rs) throws SQLException { + return rs.getString(1); + } + + @Override + protected String getTableName(TablePath tablePath) { + return tablePath.getTableName(); + } + + @Override + protected String getSelectColumnsSql(TablePath tablePath) { + return String.format( + SELECT_COLUMNS_SQL_TEMPLATE, tablePath.getDatabaseName(), tablePath.getTableName()); + } + + @Override + protected TableIdentifier getTableIdentifier(TablePath tablePath) { + return TableIdentifier.of( + catalogName, tablePath.getDatabaseName(), tablePath.getTableName()); + } + + @Override + protected List getConstraintKeys(DatabaseMetaData metaData, TablePath tablePath) + throws SQLException { + List indexList = + super.getConstraintKeys( + metaData, + tablePath.getDatabaseName(), + tablePath.getSchemaName(), + tablePath.getTableName()); + for (Iterator it = indexList.iterator(); it.hasNext(); ) { + ConstraintKey index = it.next(); + if (ConstraintKey.ConstraintType.UNIQUE_KEY.equals(index.getConstraintType()) + && "PRIMARY".equals(index.getConstraintName())) { + it.remove(); + } + } + return indexList; + } + + @Override + protected Column buildColumn(ResultSet resultSet) throws SQLException { + String columnName = resultSet.getString("COLUMN_NAME"); + // e.g. tinyint(1) unsigned + String columnType = resultSet.getString("COLUMN_TYPE"); + // e.g. tinyint + String dataType = resultSet.getString("DATA_TYPE").toUpperCase(); + String comment = resultSet.getString("COLUMN_COMMENT"); + Object defaultValue = resultSet.getObject("COLUMN_DEFAULT"); + String isNullableStr = resultSet.getString("IS_NULLABLE"); + boolean isNullable = isNullableStr.equals("YES"); + // e.g. `decimal(10, 2)` is 10 + long numberPrecision = resultSet.getInt("NUMERIC_PRECISION"); + // e.g. `decimal(10, 2)` is 2 + int numberScale = resultSet.getInt("NUMERIC_SCALE"); + // e.g. `varchar(10)` is 40 + long charOctetLength = resultSet.getLong("CHARACTER_OCTET_LENGTH"); + // e.g. `timestamp(3)` is 3 + // int timePrecision = + // MySqlVersion.V_5_5.equals(version) ? 0 : + // resultSet.getInt("DATETIME_PRECISION"); + int timePrecision = resultSet.getInt("DATETIME_PRECISION"); + Preconditions.checkArgument(!(numberPrecision > 0 && charOctetLength > 0)); + Preconditions.checkArgument(!(numberScale > 0 && timePrecision > 0)); + + OceanBaseMysqlType oceanbaseMysqlType = OceanBaseMysqlType.getByName(columnType); + boolean unsigned = columnType.toLowerCase(Locale.ROOT).contains("unsigned"); + + BasicTypeDefine typeDefine = + BasicTypeDefine.builder() + .name(columnName) + .columnType(columnType) + .dataType(dataType) + .nativeType(oceanbaseMysqlType) + .unsigned(unsigned) + .length(Math.max(charOctetLength, numberPrecision)) + .precision(numberPrecision) + .scale(Math.max(numberScale, timePrecision)) + .nullable(isNullable) + .defaultValue(defaultValue) + .comment(comment) + .build(); + return typeConverter.convert(typeDefine); + } + + @Override + protected String getCreateTableSql(TablePath tablePath, CatalogTable table) { + return OceanBaseMysqlCreateTableSqlBuilder.builder(tablePath, table, typeConverter) + .build(table.getCatalogName()); + } + + @Override + protected String getDropTableSql(TablePath tablePath) { + return String.format( + "DROP TABLE `%s`.`%s`;", tablePath.getDatabaseName(), tablePath.getTableName()); + } + + @Override + protected String getCreateDatabaseSql(String databaseName) { + return String.format("CREATE DATABASE `%s`;", databaseName); + } + + @Override + protected String getDropDatabaseSql(String databaseName) { + return String.format("DROP DATABASE `%s`;", databaseName); + } + + @Override + public CatalogTable getTable(String sqlQuery) throws SQLException { + Connection defaultConnection = getConnection(defaultUrl); + Statement statement = defaultConnection.createStatement(); + ResultSetMetaData metaData = statement.executeQuery(sqlQuery).getMetaData(); + return CatalogUtils.getCatalogTable( + metaData, new OceanBaseMySqlTypeMapper(typeConverter), sqlQuery); + } + + @Override + protected String getTruncateTableSql(TablePath tablePath) throws CatalogException { + return String.format( + "TRUNCATE TABLE `%s`.`%s`;", tablePath.getDatabaseName(), tablePath.getTableName()); + } + + public String getExistDataSql(TablePath tablePath) { + return String.format( + "SELECT * FROM `%s`.`%s` LIMIT 1;", + tablePath.getDatabaseName(), tablePath.getTableName()); } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMysqlCreateTableSqlBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMysqlCreateTableSqlBuilder.java new file mode 100644 index 00000000000..bc3413dbd82 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMysqlCreateTableSqlBuilder.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oceanbase; + +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.ConstraintKey; +import org.apache.seatunnel.api.table.catalog.PrimaryKey; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.converter.BasicTypeDefine; +import org.apache.seatunnel.api.table.type.SqlType; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase.OceanBaseMySqlTypeConverter; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase.OceanBaseMysqlType; + +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument; +import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull; + +public class OceanBaseMysqlCreateTableSqlBuilder { + + private final String tableName; + private List columns; + + private String comment; + + private String engine; + private String charset; + private String collate; + + private PrimaryKey primaryKey; + + private List constraintKeys; + + private String fieldIde; + + private final OceanBaseMySqlTypeConverter typeConverter; + + private OceanBaseMysqlCreateTableSqlBuilder( + String tableName, OceanBaseMySqlTypeConverter typeConverter) { + checkNotNull(tableName, "tableName must not be null"); + this.tableName = tableName; + this.typeConverter = typeConverter; + } + + public static OceanBaseMysqlCreateTableSqlBuilder builder( + TablePath tablePath, + CatalogTable catalogTable, + OceanBaseMySqlTypeConverter typeConverter) { + checkNotNull(tablePath, "tablePath must not be null"); + checkNotNull(catalogTable, "catalogTable must not be null"); + + TableSchema tableSchema = catalogTable.getTableSchema(); + checkNotNull(tableSchema, "tableSchema must not be null"); + + return new OceanBaseMysqlCreateTableSqlBuilder(tablePath.getTableName(), typeConverter) + .comment(catalogTable.getComment()) + // todo: set charset and collate + .engine(null) + .charset(null) + .primaryKey(tableSchema.getPrimaryKey()) + .constraintKeys(tableSchema.getConstraintKeys()) + .addColumn(tableSchema.getColumns()) + .fieldIde(catalogTable.getOptions().get("fieldIde")); + } + + public OceanBaseMysqlCreateTableSqlBuilder addColumn(List columns) { + checkArgument(CollectionUtils.isNotEmpty(columns), "columns must not be empty"); + this.columns = columns; + return this; + } + + public OceanBaseMysqlCreateTableSqlBuilder primaryKey(PrimaryKey primaryKey) { + this.primaryKey = primaryKey; + return this; + } + + public OceanBaseMysqlCreateTableSqlBuilder fieldIde(String fieldIde) { + this.fieldIde = fieldIde; + return this; + } + + public OceanBaseMysqlCreateTableSqlBuilder constraintKeys(List constraintKeys) { + this.constraintKeys = constraintKeys; + return this; + } + + public OceanBaseMysqlCreateTableSqlBuilder engine(String engine) { + this.engine = engine; + return this; + } + + public OceanBaseMysqlCreateTableSqlBuilder charset(String charset) { + this.charset = charset; + return this; + } + + public OceanBaseMysqlCreateTableSqlBuilder collate(String collate) { + this.collate = collate; + return this; + } + + public OceanBaseMysqlCreateTableSqlBuilder comment(String comment) { + this.comment = comment; + return this; + } + + public String build(String catalogName) { + List sqls = new ArrayList<>(); + sqls.add( + String.format( + "CREATE TABLE %s (\n%s\n)", + CatalogUtils.quoteIdentifier(tableName, fieldIde, "`"), + buildColumnsIdentifySql(catalogName))); + if (engine != null) { + sqls.add("ENGINE = " + engine); + } + if (charset != null) { + sqls.add("DEFAULT CHARSET = " + charset); + } + if (collate != null) { + sqls.add("COLLATE = " + collate); + } + if (comment != null) { + sqls.add("COMMENT = '" + comment + "'"); + } + return String.join(" ", sqls) + ";"; + } + + private String buildColumnsIdentifySql(String catalogName) { + List columnSqls = new ArrayList<>(); + Map columnTypeMap = new HashMap<>(); + for (Column column : columns) { + columnSqls.add("\t" + buildColumnIdentifySql(column, catalogName, columnTypeMap)); + } + if (primaryKey != null) { + columnSqls.add("\t" + buildPrimaryKeySql()); + } + if (CollectionUtils.isNotEmpty(constraintKeys)) { + for (ConstraintKey constraintKey : constraintKeys) { + if (StringUtils.isBlank(constraintKey.getConstraintName())) { + continue; + } + String constraintKeyStr = buildConstraintKeySql(constraintKey, columnTypeMap); + if (StringUtils.isNotBlank(constraintKeyStr)) { + columnSqls.add("\t" + constraintKeyStr); + } + } + } + return String.join(", \n", columnSqls); + } + + private String buildColumnIdentifySql( + Column column, String catalogName, Map columnTypeMap) { + final List columnSqls = new ArrayList<>(); + columnSqls.add(CatalogUtils.quoteIdentifier(column.getName(), fieldIde, "`")); + String type; + if ((SqlType.TIME.equals(column.getDataType().getSqlType()) + || SqlType.TIMESTAMP.equals(column.getDataType().getSqlType())) + && column.getScale() != null) { + BasicTypeDefine typeDefine = typeConverter.reconvert(column); + type = typeDefine.getColumnType(); + } else if (StringUtils.equals(catalogName, DatabaseIdentifier.MYSQL) + && StringUtils.isNotBlank(column.getSourceType())) { + type = column.getSourceType(); + } else { + BasicTypeDefine typeDefine = typeConverter.reconvert(column); + type = typeDefine.getColumnType(); + } + columnSqls.add(type); + columnTypeMap.put(column.getName(), type); + // nullable + if (column.isNullable()) { + columnSqls.add("NULL"); + } else { + columnSqls.add("NOT NULL"); + } + + if (column.getComment() != null) { + columnSqls.add( + "COMMENT '" + + column.getComment().replace("'", "''").replace("\\", "\\\\") + + "'"); + } + + return String.join(" ", columnSqls); + } + + private String buildPrimaryKeySql() { + String key = + primaryKey.getColumnNames().stream() + .map(columnName -> "`" + columnName + "`") + .collect(Collectors.joining(", ")); + // add sort type + return String.format("PRIMARY KEY (%s)", CatalogUtils.quoteIdentifier(key, fieldIde)); + } + + private String buildConstraintKeySql( + ConstraintKey constraintKey, Map columnTypeMap) { + ConstraintKey.ConstraintType constraintType = constraintKey.getConstraintType(); + String indexColumns = + constraintKey.getColumnNames().stream() + .map( + constraintKeyColumn -> { + String columnName = constraintKeyColumn.getColumnName(); + boolean withLength = false; + if (columnTypeMap.containsKey(columnName)) { + String columnType = columnTypeMap.get(columnName); + if (columnType.endsWith("BLOB") + || columnType.endsWith("TEXT")) { + withLength = true; + } + } + if (constraintKeyColumn.getSortType() == null) { + return String.format( + "`%s`%s", + CatalogUtils.getFieldIde(columnName, fieldIde), + withLength ? "(255)" : ""); + } + return String.format( + "`%s`%s %s", + CatalogUtils.getFieldIde(columnName, fieldIde), + withLength ? "(255)" : "", + constraintKeyColumn.getSortType().name()); + }) + .collect(Collectors.joining(", ")); + String keyName = null; + switch (constraintType) { + case INDEX_KEY: + keyName = "KEY"; + break; + case UNIQUE_KEY: + keyName = "UNIQUE KEY"; + break; + case FOREIGN_KEY: + keyName = "FOREIGN KEY"; + // todo: + break; + default: + throw new UnsupportedOperationException( + "Unsupported constraint type: " + constraintType); + } + return String.format( + "%s `%s` (%s)", keyName, constraintKey.getConstraintName(), indexColumns); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java index b3a456870cc..d25d48b4f2c 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java @@ -19,7 +19,6 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory; -import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MysqlDialect; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleDialect; import com.google.auto.service.AutoService; @@ -44,6 +43,6 @@ public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde) { if ("oracle".equalsIgnoreCase(compatibleMode)) { return new OracleDialect(); } - return new MysqlDialect(); + return new OceanBaseMysqlDialect(); } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeConverter.java new file mode 100644 index 00000000000..4e9fa04d0d3 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeConverter.java @@ -0,0 +1,513 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase; + +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.converter.BasicTypeDefine; +import org.apache.seatunnel.api.table.converter.TypeConverter; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.DecimalType; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; +import org.apache.seatunnel.common.exception.CommonError; +import org.apache.seatunnel.connectors.seatunnel.common.source.TypeDefineUtils; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier; + +import com.google.auto.service.AutoService; +import com.google.common.base.Preconditions; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@AutoService(TypeConverter.class) +public class OceanBaseMySqlTypeConverter + implements TypeConverter> { + + // ============================data types===================== + static final String MYSQL_NULL = "NULL"; + static final String MYSQL_BIT = "BIT"; + + // -------------------------number---------------------------- + static final String MYSQL_TINYINT = "TINYINT"; + static final String MYSQL_TINYINT_UNSIGNED = "TINYINT UNSIGNED"; + static final String MYSQL_SMALLINT = "SMALLINT"; + static final String MYSQL_SMALLINT_UNSIGNED = "SMALLINT UNSIGNED"; + static final String MYSQL_MEDIUMINT = "MEDIUMINT"; + static final String MYSQL_MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED"; + static final String MYSQL_INT = "INT"; + static final String MYSQL_INT_UNSIGNED = "INT UNSIGNED"; + static final String MYSQL_INTEGER = "INTEGER"; + static final String MYSQL_INTEGER_UNSIGNED = "INTEGER UNSIGNED"; + static final String MYSQL_BIGINT = "BIGINT"; + static final String MYSQL_BIGINT_UNSIGNED = "BIGINT UNSIGNED"; + static final String MYSQL_DECIMAL = "DECIMAL"; + static final String MYSQL_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED"; + static final String MYSQL_FLOAT = "FLOAT"; + static final String MYSQL_FLOAT_UNSIGNED = "FLOAT UNSIGNED"; + static final String MYSQL_DOUBLE = "DOUBLE"; + static final String MYSQL_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED"; + + // -------------------------string---------------------------- + public static final String MYSQL_CHAR = "CHAR"; + public static final String MYSQL_VARCHAR = "VARCHAR"; + static final String MYSQL_TINYTEXT = "TINYTEXT"; + static final String MYSQL_MEDIUMTEXT = "MEDIUMTEXT"; + static final String MYSQL_TEXT = "TEXT"; + static final String MYSQL_LONGTEXT = "LONGTEXT"; + static final String MYSQL_JSON = "JSON"; + static final String MYSQL_ENUM = "ENUM"; + + // ------------------------------time------------------------- + static final String MYSQL_DATE = "DATE"; + public static final String MYSQL_DATETIME = "DATETIME"; + public static final String MYSQL_TIME = "TIME"; + public static final String MYSQL_TIMESTAMP = "TIMESTAMP"; + static final String MYSQL_YEAR = "YEAR"; + + // ------------------------------blob------------------------- + static final String MYSQL_TINYBLOB = "TINYBLOB"; + static final String MYSQL_MEDIUMBLOB = "MEDIUMBLOB"; + static final String MYSQL_BLOB = "BLOB"; + static final String MYSQL_LONGBLOB = "LONGBLOB"; + static final String MYSQL_BINARY = "BINARY"; + static final String MYSQL_VARBINARY = "VARBINARY"; + static final String MYSQL_GEOMETRY = "GEOMETRY"; + + public static final int DEFAULT_PRECISION = 38; + public static final int MAX_PRECISION = 65; + public static final int DEFAULT_SCALE = 18; + public static final int MAX_SCALE = 30; + public static final int MAX_TIME_SCALE = 6; + public static final int MAX_TIMESTAMP_SCALE = 6; + public static final long POWER_2_8 = (long) Math.pow(2, 8); + public static final long POWER_2_16 = (long) Math.pow(2, 16); + public static final long POWER_2_24 = (long) Math.pow(2, 24); + public static final long POWER_2_32 = (long) Math.pow(2, 32); + public static final long MAX_VARBINARY_LENGTH = POWER_2_16 - 4; + + @Override + public String identifier() { + return DatabaseIdentifier.OCENABASE; + } + + @Override + public Column convert(BasicTypeDefine typeDefine) { + PhysicalColumn.PhysicalColumnBuilder builder = + PhysicalColumn.builder() + .name(typeDefine.getName()) + .sourceType(typeDefine.getColumnType()) + .nullable(typeDefine.isNullable()) + .defaultValue(typeDefine.getDefaultValue()) + .comment(typeDefine.getComment()); + + String mysqlDataType = typeDefine.getDataType().toUpperCase(); + if (typeDefine.isUnsigned() && !(mysqlDataType.endsWith(" UNSIGNED"))) { + mysqlDataType = mysqlDataType + " UNSIGNED"; + } + switch (mysqlDataType) { + case MYSQL_NULL: + builder.dataType(BasicType.VOID_TYPE); + break; + case MYSQL_BIT: + if (typeDefine.getLength() == null || typeDefine.getLength() <= 0) { + builder.dataType(BasicType.BOOLEAN_TYPE); + } else if (typeDefine.getLength() == 1) { + builder.dataType(BasicType.BOOLEAN_TYPE); + } else { + builder.dataType(PrimitiveByteArrayType.INSTANCE); + // BIT(M) -> BYTE(M/8) + long byteLength = typeDefine.getLength() / 8; + byteLength += typeDefine.getLength() % 8 > 0 ? 1 : 0; + builder.columnLength(byteLength); + } + break; + case MYSQL_TINYINT: + if (typeDefine.getColumnType().equalsIgnoreCase("tinyint(1)")) { + builder.dataType(BasicType.BOOLEAN_TYPE); + } else { + builder.dataType(BasicType.BYTE_TYPE); + } + break; + case MYSQL_TINYINT_UNSIGNED: + case MYSQL_SMALLINT: + builder.dataType(BasicType.SHORT_TYPE); + break; + case MYSQL_SMALLINT_UNSIGNED: + case MYSQL_MEDIUMINT: + case MYSQL_MEDIUMINT_UNSIGNED: + case MYSQL_INT: + case MYSQL_INTEGER: + case MYSQL_YEAR: + builder.dataType(BasicType.INT_TYPE); + break; + case MYSQL_INT_UNSIGNED: + case MYSQL_INTEGER_UNSIGNED: + case MYSQL_BIGINT: + builder.dataType(BasicType.LONG_TYPE); + break; + case MYSQL_BIGINT_UNSIGNED: + DecimalType intDecimalType = new DecimalType(20, 0); + builder.dataType(intDecimalType); + builder.columnLength(Long.valueOf(intDecimalType.getPrecision())); + builder.scale(intDecimalType.getScale()); + break; + case MYSQL_FLOAT: + builder.dataType(BasicType.FLOAT_TYPE); + break; + case MYSQL_FLOAT_UNSIGNED: + log.warn("{} will probably cause value overflow.", MYSQL_FLOAT_UNSIGNED); + builder.dataType(BasicType.FLOAT_TYPE); + break; + case MYSQL_DOUBLE: + builder.dataType(BasicType.DOUBLE_TYPE); + break; + case MYSQL_DOUBLE_UNSIGNED: + log.warn("{} will probably cause value overflow.", MYSQL_DOUBLE_UNSIGNED); + builder.dataType(BasicType.DOUBLE_TYPE); + break; + case MYSQL_DECIMAL: + Preconditions.checkArgument(typeDefine.getPrecision() > 0); + + DecimalType decimalType; + if (typeDefine.getPrecision() > DEFAULT_PRECISION) { + log.warn("{} will probably cause value overflow.", MYSQL_DECIMAL); + decimalType = new DecimalType(DEFAULT_PRECISION, DEFAULT_SCALE); + } else { + decimalType = + new DecimalType( + typeDefine.getPrecision().intValue(), + typeDefine.getScale() == null + ? 0 + : typeDefine.getScale().intValue()); + } + builder.dataType(decimalType); + builder.columnLength(Long.valueOf(decimalType.getPrecision())); + builder.scale(decimalType.getScale()); + break; + case MYSQL_DECIMAL_UNSIGNED: + Preconditions.checkArgument(typeDefine.getPrecision() > 0); + + log.warn("{} will probably cause value overflow.", MYSQL_DECIMAL_UNSIGNED); + DecimalType decimalUnsignedType = + new DecimalType( + typeDefine.getPrecision().intValue() + 1, + typeDefine.getScale() == null + ? 0 + : typeDefine.getScale().intValue()); + builder.dataType(decimalUnsignedType); + builder.columnLength(Long.valueOf(decimalUnsignedType.getPrecision())); + builder.scale(decimalUnsignedType.getScale()); + break; + case MYSQL_ENUM: + builder.dataType(BasicType.STRING_TYPE); + if (typeDefine.getLength() == null || typeDefine.getLength() <= 0) { + builder.columnLength(100L); + } else { + builder.columnLength(typeDefine.getLength()); + } + break; + case MYSQL_CHAR: + case MYSQL_VARCHAR: + if (typeDefine.getLength() == null || typeDefine.getLength() <= 0) { + builder.columnLength(TypeDefineUtils.charTo4ByteLength(1L)); + } else { + builder.columnLength(typeDefine.getLength()); + } + builder.dataType(BasicType.STRING_TYPE); + break; + case MYSQL_TINYTEXT: + builder.dataType(BasicType.STRING_TYPE); + builder.columnLength(POWER_2_8 - 1); + break; + case MYSQL_TEXT: + builder.dataType(BasicType.STRING_TYPE); + builder.columnLength(POWER_2_16 - 1); + break; + case MYSQL_MEDIUMTEXT: + builder.dataType(BasicType.STRING_TYPE); + builder.columnLength(POWER_2_24 - 1); + break; + case MYSQL_LONGTEXT: + builder.dataType(BasicType.STRING_TYPE); + builder.columnLength(POWER_2_32 - 1); + break; + case MYSQL_JSON: + builder.dataType(BasicType.STRING_TYPE); + break; + case MYSQL_BINARY: + case MYSQL_VARBINARY: + if (typeDefine.getLength() == null || typeDefine.getLength() <= 0) { + builder.columnLength(1L); + } else { + builder.columnLength(typeDefine.getLength()); + } + builder.dataType(PrimitiveByteArrayType.INSTANCE); + break; + case MYSQL_TINYBLOB: + builder.dataType(PrimitiveByteArrayType.INSTANCE); + builder.columnLength(POWER_2_8 - 1); + break; + case MYSQL_BLOB: + builder.dataType(PrimitiveByteArrayType.INSTANCE); + builder.columnLength(POWER_2_16 - 1); + break; + case MYSQL_MEDIUMBLOB: + builder.dataType(PrimitiveByteArrayType.INSTANCE); + builder.columnLength(POWER_2_24 - 1); + break; + case MYSQL_LONGBLOB: + builder.dataType(PrimitiveByteArrayType.INSTANCE); + builder.columnLength(POWER_2_32 - 1); + break; + case MYSQL_GEOMETRY: + builder.dataType(PrimitiveByteArrayType.INSTANCE); + break; + case MYSQL_DATE: + builder.dataType(LocalTimeType.LOCAL_DATE_TYPE); + break; + case MYSQL_TIME: + builder.dataType(LocalTimeType.LOCAL_TIME_TYPE); + builder.scale(typeDefine.getScale()); + break; + case MYSQL_DATETIME: + case MYSQL_TIMESTAMP: + builder.dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE); + builder.scale(typeDefine.getScale()); + break; + default: + throw CommonError.convertToSeaTunnelTypeError( + DatabaseIdentifier.OCENABASE, mysqlDataType, typeDefine.getName()); + } + return builder.build(); + } + + @Override + public BasicTypeDefine reconvert(Column column) { + BasicTypeDefine.BasicTypeDefineBuilder builder = + BasicTypeDefine.builder() + .name(column.getName()) + .nullable(column.isNullable()) + .comment(column.getComment()) + .defaultValue(column.getDefaultValue()); + switch (column.getDataType().getSqlType()) { + case NULL: + builder.nativeType(OceanBaseMysqlType.NULL); + builder.columnType(MYSQL_NULL); + builder.dataType(MYSQL_NULL); + break; + case BOOLEAN: + builder.nativeType(OceanBaseMysqlType.BOOLEAN); + builder.columnType(String.format("%s(%s)", MYSQL_TINYINT, 1)); + builder.dataType(MYSQL_TINYINT); + builder.length(1L); + break; + case TINYINT: + builder.nativeType(OceanBaseMysqlType.TINYINT); + builder.columnType(MYSQL_TINYINT); + builder.dataType(MYSQL_TINYINT); + break; + case SMALLINT: + builder.nativeType(OceanBaseMysqlType.SMALLINT); + builder.columnType(MYSQL_SMALLINT); + builder.dataType(MYSQL_SMALLINT); + break; + case INT: + builder.nativeType(OceanBaseMysqlType.INT); + builder.columnType(MYSQL_INT); + builder.dataType(MYSQL_INT); + break; + case BIGINT: + builder.nativeType(OceanBaseMysqlType.BIGINT); + builder.columnType(MYSQL_BIGINT); + builder.dataType(MYSQL_BIGINT); + break; + case FLOAT: + builder.nativeType(OceanBaseMysqlType.FLOAT); + builder.columnType(MYSQL_FLOAT); + builder.dataType(MYSQL_FLOAT); + break; + case DOUBLE: + builder.nativeType(OceanBaseMysqlType.DOUBLE); + builder.columnType(MYSQL_DOUBLE); + builder.dataType(MYSQL_DOUBLE); + break; + case DECIMAL: + DecimalType decimalType = (DecimalType) column.getDataType(); + long precision = decimalType.getPrecision(); + int scale = decimalType.getScale(); + if (precision <= 0) { + precision = DEFAULT_PRECISION; + scale = DEFAULT_SCALE; + log.warn( + "The decimal column {} type decimal({},{}) is out of range, " + + "which is precision less than 0, " + + "it will be converted to decimal({},{})", + column.getName(), + decimalType.getPrecision(), + decimalType.getScale(), + precision, + scale); + } else if (precision > MAX_PRECISION) { + scale = (int) Math.max(0, scale - (precision - MAX_PRECISION)); + precision = MAX_PRECISION; + log.warn( + "The decimal column {} type decimal({},{}) is out of range, " + + "which exceeds the maximum precision of {}, " + + "it will be converted to decimal({},{})", + column.getName(), + decimalType.getPrecision(), + decimalType.getScale(), + MAX_PRECISION, + precision, + scale); + } + if (scale < 0) { + scale = 0; + log.warn( + "The decimal column {} type decimal({},{}) is out of range, " + + "which is scale less than 0, " + + "it will be converted to decimal({},{})", + column.getName(), + decimalType.getPrecision(), + decimalType.getScale(), + precision, + scale); + } else if (scale > MAX_SCALE) { + scale = MAX_SCALE; + log.warn( + "The decimal column {} type decimal({},{}) is out of range, " + + "which exceeds the maximum scale of {}, " + + "it will be converted to decimal({},{})", + column.getName(), + decimalType.getPrecision(), + decimalType.getScale(), + MAX_SCALE, + precision, + scale); + } + + builder.nativeType(OceanBaseMysqlType.DECIMAL); + builder.columnType(String.format("%s(%s,%s)", MYSQL_DECIMAL, precision, scale)); + builder.dataType(MYSQL_DECIMAL); + builder.precision(precision); + builder.scale(scale); + break; + case BYTES: + if (column.getColumnLength() == null || column.getColumnLength() <= 0) { + builder.nativeType(OceanBaseMysqlType.VARBINARY); + builder.columnType( + String.format("%s(%s)", MYSQL_VARBINARY, MAX_VARBINARY_LENGTH / 2)); + builder.dataType(MYSQL_VARBINARY); + } else if (column.getColumnLength() < MAX_VARBINARY_LENGTH) { + builder.nativeType(OceanBaseMysqlType.VARBINARY); + builder.columnType( + String.format("%s(%s)", MYSQL_VARBINARY, column.getColumnLength())); + builder.dataType(MYSQL_VARBINARY); + } else if (column.getColumnLength() < POWER_2_24) { + builder.nativeType(OceanBaseMysqlType.MEDIUMBLOB); + builder.columnType(MYSQL_MEDIUMBLOB); + builder.dataType(MYSQL_MEDIUMBLOB); + } else { + builder.nativeType(OceanBaseMysqlType.LONGBLOB); + builder.columnType(MYSQL_LONGBLOB); + builder.dataType(MYSQL_LONGBLOB); + } + break; + case STRING: + if (column.getColumnLength() == null || column.getColumnLength() <= 0) { + builder.nativeType(OceanBaseMysqlType.LONGTEXT); + builder.columnType(MYSQL_LONGTEXT); + builder.dataType(MYSQL_LONGTEXT); + } else if (column.getColumnLength() < POWER_2_8) { + builder.nativeType(OceanBaseMysqlType.VARCHAR); + builder.columnType( + String.format("%s(%s)", MYSQL_VARCHAR, column.getColumnLength())); + builder.dataType(MYSQL_VARCHAR); + } else if (column.getColumnLength() < POWER_2_16) { + builder.nativeType(OceanBaseMysqlType.TEXT); + builder.columnType(MYSQL_TEXT); + builder.dataType(MYSQL_TEXT); + } else if (column.getColumnLength() < POWER_2_24) { + builder.nativeType(OceanBaseMysqlType.MEDIUMTEXT); + builder.columnType(MYSQL_MEDIUMTEXT); + builder.dataType(MYSQL_MEDIUMTEXT); + } else { + builder.nativeType(OceanBaseMysqlType.LONGTEXT); + builder.columnType(MYSQL_LONGTEXT); + builder.dataType(MYSQL_LONGTEXT); + } + break; + case DATE: + builder.nativeType(OceanBaseMysqlType.DATE); + builder.columnType(MYSQL_DATE); + builder.dataType(MYSQL_DATE); + break; + case TIME: + builder.nativeType(OceanBaseMysqlType.TIME); + builder.dataType(MYSQL_TIME); + if (column.getScale() != null && column.getScale() > 0) { + int timeScale = column.getScale(); + if (timeScale > MAX_TIME_SCALE) { + timeScale = MAX_TIME_SCALE; + log.warn( + "The time column {} type time({}) is out of range, " + + "which exceeds the maximum scale of {}, " + + "it will be converted to time({})", + column.getName(), + column.getScale(), + MAX_SCALE, + timeScale); + } + builder.columnType(String.format("%s(%s)", MYSQL_TIME, timeScale)); + builder.scale(timeScale); + } else { + builder.columnType(MYSQL_TIME); + } + break; + case TIMESTAMP: + builder.nativeType(OceanBaseMysqlType.DATETIME); + builder.dataType(MYSQL_DATETIME); + if (column.getScale() != null && column.getScale() > 0) { + int timestampScale = column.getScale(); + if (timestampScale > MAX_TIMESTAMP_SCALE) { + timestampScale = MAX_TIMESTAMP_SCALE; + log.warn( + "The timestamp column {} type timestamp({}) is out of range, " + + "which exceeds the maximum scale of {}, " + + "it will be converted to timestamp({})", + column.getName(), + column.getScale(), + MAX_TIMESTAMP_SCALE, + timestampScale); + } + builder.columnType(String.format("%s(%s)", MYSQL_DATETIME, timestampScale)); + builder.scale(timestampScale); + } else { + builder.columnType(MYSQL_DATETIME); + } + break; + default: + throw CommonError.convertToConnectorTypeError( + DatabaseIdentifier.OCENABASE, + column.getDataType().getSqlType().name(), + column.getName()); + } + + return builder.build(); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeMapper.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeMapper.java new file mode 100644 index 00000000000..e4d6e8b9739 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeMapper.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase; + +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.converter.BasicTypeDefine; +import org.apache.seatunnel.connectors.seatunnel.common.source.TypeDefineUtils; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper; + +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.Arrays; + +public class OceanBaseMySqlTypeMapper implements JdbcDialectTypeMapper { + + private OceanBaseMySqlTypeConverter typeConverter; + + public OceanBaseMySqlTypeMapper() { + this.typeConverter = new OceanBaseMySqlTypeConverter(); + } + + public OceanBaseMySqlTypeMapper(OceanBaseMySqlTypeConverter typeConverter) { + this.typeConverter = typeConverter; + } + + @Override + public Column mappingColumn(BasicTypeDefine typeDefine) { + return typeConverter.convert(typeDefine); + } + + @Override + public Column mappingColumn(ResultSetMetaData metadata, int colIndex) throws SQLException { + String columnName = metadata.getColumnLabel(colIndex); + // e.g. tinyint unsigned + String nativeType = metadata.getColumnTypeName(colIndex); + int isNullable = metadata.isNullable(colIndex); + int precision = metadata.getPrecision(colIndex); + int scale = metadata.getScale(colIndex); + + if (Arrays.asList("CHAR", "VARCHAR", "ENUM").contains(nativeType)) { + long octetLength = TypeDefineUtils.charTo4ByteLength((long) precision); + precision = (int) Math.max(precision, octetLength); + } + + BasicTypeDefine typeDefine = + BasicTypeDefine.builder() + .name(columnName) + .columnType(nativeType) + .dataType(nativeType) + .nullable(isNullable == ResultSetMetaData.columnNullable) + .length((long) precision) + .precision((long) precision) + .scale(scale) + .build(); + return mappingColumn(typeDefine); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlDialect.java new file mode 100644 index 00000000000..83d3220b129 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlDialect.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase; + +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.converter.BasicTypeDefine; +import org.apache.seatunnel.api.table.event.AlterTableColumnEvent; +import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode; +import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.SQLUtils; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum; +import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable; +import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.MysqlDefaultValueUtils; + +import org.apache.commons.lang3.StringUtils; + +import lombok.extern.slf4j.Slf4j; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +@Slf4j +public class OceanBaseMysqlDialect implements JdbcDialect { + + private static final List NOT_SUPPORTED_DEFAULT_VALUES = + Arrays.asList( + OceanBaseMysqlType.BLOB, + OceanBaseMysqlType.TEXT, + OceanBaseMysqlType.JSON, + OceanBaseMysqlType.GEOMETRY); + + public String fieldIde = FieldIdeEnum.ORIGINAL.getValue(); + + public OceanBaseMysqlDialect() {} + + public OceanBaseMysqlDialect(String fieldIde) { + this.fieldIde = fieldIde; + } + + @Override + public String dialectName() { + return DatabaseIdentifier.OCENABASE; + } + + @Override + public JdbcRowConverter getRowConverter() { + return new OceanBaseMysqlJdbcRowConverter(); + } + + @Override + public JdbcDialectTypeMapper getJdbcDialectTypeMapper() { + return new OceanBaseMySqlTypeMapper(); + } + + @Override + public String quoteIdentifier(String identifier) { + return "`" + getFieldIde(identifier, fieldIde) + "`"; + } + + @Override + public String quoteDatabaseIdentifier(String identifier) { + return "`" + identifier + "`"; + } + + @Override + public Optional getUpsertStatement( + String database, String tableName, String[] fieldNames, String[] uniqueKeyFields) { + String updateClause = + Arrays.stream(fieldNames) + .map( + fieldName -> + quoteIdentifier(fieldName) + + "=VALUES(" + + quoteIdentifier(fieldName) + + ")") + .collect(Collectors.joining(", ")); + String upsertSQL = + getInsertIntoStatement(database, tableName, fieldNames) + + " ON DUPLICATE KEY UPDATE " + + updateClause; + return Optional.of(upsertSQL); + } + + @Override + public PreparedStatement creatPreparedStatement( + Connection connection, String queryTemplate, int fetchSize) throws SQLException { + PreparedStatement statement = + connection.prepareStatement( + queryTemplate, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + statement.setFetchSize(Integer.MIN_VALUE); + return statement; + } + + @Override + public String extractTableName(TablePath tablePath) { + return tablePath.getTableName(); + } + + @Override + public Map defaultParameter() { + HashMap map = new HashMap<>(); + map.put("rewriteBatchedStatements", "true"); + return map; + } + + @Override + public TablePath parse(String tablePath) { + return TablePath.of(tablePath, false); + } + + @Override + public Object[] sampleDataFromColumn( + Connection connection, + JdbcSourceTable table, + String columnName, + int samplingRate, + int fetchSize) + throws Exception { + String sampleQuery; + if (StringUtils.isNotBlank(table.getQuery())) { + sampleQuery = + String.format( + "SELECT %s FROM (%s) AS T", + quoteIdentifier(columnName), table.getQuery()); + } else { + sampleQuery = + String.format( + "SELECT %s FROM %s", + quoteIdentifier(columnName), tableIdentifier(table.getTablePath())); + } + + try (Statement stmt = + connection.createStatement( + ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) { + stmt.setFetchSize(Integer.MIN_VALUE); + try (ResultSet rs = stmt.executeQuery(sampleQuery)) { + int count = 0; + List results = new ArrayList<>(); + + while (rs.next()) { + count++; + if (count % samplingRate == 0) { + results.add(rs.getObject(1)); + } + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException("Thread interrupted"); + } + } + Object[] resultsArray = results.toArray(); + Arrays.sort(resultsArray); + return resultsArray; + } + } + } + + @Override + public Long approximateRowCntStatement(Connection connection, JdbcSourceTable table) + throws SQLException { + + // 1. If no query is configured, use TABLE STATUS. + // 2. If a query is configured but does not contain a WHERE clause and tablePath is + // configured , use TABLE STATUS. + // 3. If a query is configured with a WHERE clause, or a query statement is configured but + // tablePath is TablePath.DEFAULT, use COUNT(*). + + boolean useTableStats = + StringUtils.isBlank(table.getQuery()) + || (!table.getQuery().toLowerCase().contains("where") + && table.getTablePath() != null + && !TablePath.DEFAULT + .getFullName() + .equals(table.getTablePath().getFullName())); + + if (useTableStats) { + // The statement used to get approximate row count which is less + // accurate than COUNT(*), but is more efficient for large table. + TablePath tablePath = table.getTablePath(); + String useDatabaseStatement = + String.format("USE %s;", quoteDatabaseIdentifier(tablePath.getDatabaseName())); + String rowCountQuery = + String.format("SHOW TABLE STATUS LIKE '%s';", tablePath.getTableName()); + + try (Statement stmt = connection.createStatement()) { + log.info("Split Chunk, approximateRowCntStatement: {}", useDatabaseStatement); + stmt.execute(useDatabaseStatement); + log.info("Split Chunk, approximateRowCntStatement: {}", rowCountQuery); + try (ResultSet rs = stmt.executeQuery(rowCountQuery)) { + if (!rs.next() || rs.getMetaData().getColumnCount() < 5) { + throw new SQLException( + String.format( + "No result returned after running query [%s]", + rowCountQuery)); + } + return rs.getLong(5); + } + } + } + + return SQLUtils.countForSubquery(connection, table.getQuery()); + } + + @Override + public void refreshTableSchemaBySchemaChangeEvent( + String sourceDialectName, + AlterTableColumnEvent event, + JdbcConnectionProvider refreshTableSchemaConnectionProvider, + TablePath sinkTablePath) { + try (Connection connection = + refreshTableSchemaConnectionProvider.getOrEstablishConnection(); + Statement stmt = connection.createStatement()) { + String alterTableSql = generateAlterTableSql(sourceDialectName, event, sinkTablePath); + log.info("Apply schema change with sql: {}", alterTableSql); + stmt.execute(alterTableSql); + } catch (Exception e) { + throw new JdbcConnectorException( + JdbcConnectorErrorCode.REFRESH_PHYSICAL_TABLESCHEMA_BY_SCHEMA_CHANGE_EVENT, e); + } + } + + @Override + public String decorateWithComment(String basicSql, BasicTypeDefine typeBasicTypeDefine) { + OceanBaseMysqlType nativeType = (OceanBaseMysqlType) typeBasicTypeDefine.getNativeType(); + if (NOT_SUPPORTED_DEFAULT_VALUES.contains(nativeType)) { + return basicSql; + } + return JdbcDialect.super.decorateWithComment(basicSql, typeBasicTypeDefine); + } + + @Override + public boolean needsQuotesWithDefaultValue(String sqlType) { + OceanBaseMysqlType mysqlType = OceanBaseMysqlType.getByName(sqlType); + switch (mysqlType) { + case CHAR: + case VARCHAR: + case TEXT: + case TINYTEXT: + case MEDIUMTEXT: + case LONGTEXT: + case ENUM: + case SET: + case BLOB: + case TINYBLOB: + case MEDIUMBLOB: + case LONGBLOB: + case DATE: + case DATETIME: + case TIMESTAMP: + case TIME: + case YEAR: + return true; + default: + return false; + } + } + + @Override + public boolean isSpecialDefaultValue(Object defaultValue) { + return MysqlDefaultValueUtils.isSpecialDefaultValue(defaultValue); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java new file mode 100644 index 00000000000..2033518108c --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase; + +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier; + +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; + +public class OceanBaseMysqlJdbcRowConverter extends AbstractJdbcRowConverter { + @Override + public String converterName() { + return DatabaseIdentifier.OCENABASE; + } + + @Override + protected void writeTime(PreparedStatement statement, int index, LocalTime time) + throws SQLException { + // Write to time column using timestamp retains milliseconds + statement.setTimestamp( + index, java.sql.Timestamp.valueOf(LocalDateTime.of(LocalDate.now(), time))); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlType.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlType.java new file mode 100644 index 00000000000..01f8141c392 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlType.java @@ -0,0 +1,567 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase; + +import org.apache.commons.lang3.StringUtils; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.sql.Date; +import java.sql.SQLType; +import java.sql.Time; +import java.sql.Timestamp; +import java.sql.Types; +import java.time.LocalDateTime; + +public enum OceanBaseMysqlType implements SQLType { + DECIMAL( + "DECIMAL", + Types.DECIMAL, + BigDecimal.class, + OceanBaseMysqlType.FIELD_FLAG_ZEROFILL, + OceanBaseMysqlType.IS_DECIMAL, + 65L, + "[(M[,D])] [UNSIGNED] [ZEROFILL]"), + + DECIMAL_UNSIGNED( + "DECIMAL UNSIGNED", + Types.DECIMAL, + BigDecimal.class, + OceanBaseMysqlType.FIELD_FLAG_UNSIGNED | OceanBaseMysqlType.FIELD_FLAG_ZEROFILL, + OceanBaseMysqlType.IS_DECIMAL, + 65L, + "[(M[,D])] [UNSIGNED] [ZEROFILL]"), + + TINYINT( + "TINYINT", + Types.TINYINT, + Integer.class, + OceanBaseMysqlType.FIELD_FLAG_ZEROFILL, + OceanBaseMysqlType.IS_DECIMAL, + 3L, + "[(M)] [UNSIGNED] [ZEROFILL]"), + + TINYINT_UNSIGNED( + "TINYINT UNSIGNED", + Types.TINYINT, + Integer.class, + OceanBaseMysqlType.FIELD_FLAG_UNSIGNED | OceanBaseMysqlType.FIELD_FLAG_ZEROFILL, + OceanBaseMysqlType.IS_DECIMAL, + 3L, + "[(M)] [UNSIGNED] [ZEROFILL]"), + + BOOLEAN("BOOLEAN", Types.BOOLEAN, Boolean.class, 0, OceanBaseMysqlType.IS_NOT_DECIMAL, 3L, ""), + + SMALLINT( + "SMALLINT", + Types.SMALLINT, + Integer.class, + OceanBaseMysqlType.FIELD_FLAG_ZEROFILL, + OceanBaseMysqlType.IS_DECIMAL, + 5L, + "[(M)] [UNSIGNED] [ZEROFILL]"), + + SMALLINT_UNSIGNED( + "SMALLINT UNSIGNED", + Types.SMALLINT, + Integer.class, + OceanBaseMysqlType.FIELD_FLAG_UNSIGNED | OceanBaseMysqlType.FIELD_FLAG_ZEROFILL, + OceanBaseMysqlType.IS_DECIMAL, + 5L, + "[(M)] [UNSIGNED] [ZEROFILL]"), + + INT( + "INT", + Types.INTEGER, + Integer.class, + OceanBaseMysqlType.FIELD_FLAG_ZEROFILL, + OceanBaseMysqlType.IS_DECIMAL, + 10L, + "[(M)] [UNSIGNED] [ZEROFILL]"), + + INT_UNSIGNED( + "INT UNSIGNED", + Types.INTEGER, + Long.class, + OceanBaseMysqlType.FIELD_FLAG_UNSIGNED | OceanBaseMysqlType.FIELD_FLAG_ZEROFILL, + OceanBaseMysqlType.IS_DECIMAL, + 10L, + "[(M)] [UNSIGNED] [ZEROFILL]"), + + FLOAT( + "FLOAT", + Types.REAL, + Float.class, + OceanBaseMysqlType.FIELD_FLAG_ZEROFILL, + OceanBaseMysqlType.IS_DECIMAL, + 12L, + "[(M,D)] [UNSIGNED] [ZEROFILL]"), + + FLOAT_UNSIGNED( + "FLOAT UNSIGNED", + Types.REAL, + Float.class, + OceanBaseMysqlType.FIELD_FLAG_UNSIGNED | OceanBaseMysqlType.FIELD_FLAG_ZEROFILL, + OceanBaseMysqlType.IS_DECIMAL, + 12L, + "[(M,D)] [UNSIGNED] [ZEROFILL]"), + + DOUBLE( + "DOUBLE", + Types.DOUBLE, + Double.class, + OceanBaseMysqlType.FIELD_FLAG_ZEROFILL, + OceanBaseMysqlType.IS_DECIMAL, + 22L, + "[(M,D)] [UNSIGNED] [ZEROFILL]"), + + DOUBLE_UNSIGNED( + "DOUBLE UNSIGNED", + Types.DOUBLE, + Double.class, + OceanBaseMysqlType.FIELD_FLAG_UNSIGNED | OceanBaseMysqlType.FIELD_FLAG_ZEROFILL, + OceanBaseMysqlType.IS_DECIMAL, + 22L, + "[(M,D)] [UNSIGNED] [ZEROFILL]"), + /** FIELD_TYPE_NULL = 6 */ + NULL("NULL", Types.NULL, Object.class, 0, OceanBaseMysqlType.IS_NOT_DECIMAL, 0L, ""), + + TIMESTAMP( + "TIMESTAMP", + Types.TIMESTAMP, + Timestamp.class, + 0, + OceanBaseMysqlType.IS_NOT_DECIMAL, + 26L, + "[(fsp)]"), + + BIGINT( + "BIGINT", + Types.BIGINT, + Long.class, + OceanBaseMysqlType.FIELD_FLAG_ZEROFILL, + OceanBaseMysqlType.IS_DECIMAL, + 19L, + "[(M)] [UNSIGNED] [ZEROFILL]"), + + BIGINT_UNSIGNED( + "BIGINT UNSIGNED", + Types.BIGINT, + BigInteger.class, + OceanBaseMysqlType.FIELD_FLAG_UNSIGNED | OceanBaseMysqlType.FIELD_FLAG_ZEROFILL, + OceanBaseMysqlType.IS_DECIMAL, + 20L, + "[(M)] [UNSIGNED] [ZEROFILL]"), + + MEDIUMINT( + "MEDIUMINT", + Types.INTEGER, + Integer.class, + OceanBaseMysqlType.FIELD_FLAG_ZEROFILL, + OceanBaseMysqlType.IS_DECIMAL, + 7L, + "[(M)] [UNSIGNED] [ZEROFILL]"), + + MEDIUMINT_UNSIGNED( + "MEDIUMINT UNSIGNED", + Types.INTEGER, + Integer.class, + OceanBaseMysqlType.FIELD_FLAG_UNSIGNED | OceanBaseMysqlType.FIELD_FLAG_ZEROFILL, + OceanBaseMysqlType.IS_DECIMAL, + 8L, + "[(M)] [UNSIGNED] [ZEROFILL]"), + + DATE("DATE", Types.DATE, Date.class, 0, OceanBaseMysqlType.IS_NOT_DECIMAL, 10L, ""), + + TIME("TIME", Types.TIME, Time.class, 0, OceanBaseMysqlType.IS_NOT_DECIMAL, 16L, "[(fsp)]"), + + DATETIME( + "DATETIME", + Types.TIMESTAMP, + LocalDateTime.class, + 0, + OceanBaseMysqlType.IS_NOT_DECIMAL, + 26L, + "[(fsp)]"), + + YEAR("YEAR", Types.DATE, Date.class, 0, OceanBaseMysqlType.IS_NOT_DECIMAL, 4L, "[(4)]"), + + VARCHAR( + "VARCHAR", + Types.VARCHAR, + String.class, + 0, + OceanBaseMysqlType.IS_NOT_DECIMAL, + 65535L, + "(M) [CHARACTER SET charset_name] [COLLATE collation_name]"), + + VARBINARY( + "VARBINARY", + Types.VARBINARY, + null, + 0, + OceanBaseMysqlType.IS_NOT_DECIMAL, + 65535L, + "(M)"), + + BIT("BIT", Types.BIT, Boolean.class, 0, OceanBaseMysqlType.IS_DECIMAL, 1L, "[(M)]"), + + JSON( + "JSON", + Types.LONGVARCHAR, + String.class, + 0, + OceanBaseMysqlType.IS_NOT_DECIMAL, + 1073741824L, + ""), + + ENUM( + "ENUM", + Types.CHAR, + String.class, + 0, + OceanBaseMysqlType.IS_NOT_DECIMAL, + 65535L, + "('value1','value2',...) [CHARACTER SET charset_name] [COLLATE collation_name]"), + + SET( + "SET", + Types.CHAR, + String.class, + 0, + OceanBaseMysqlType.IS_NOT_DECIMAL, + 64L, + "('value1','value2',...) [CHARACTER SET charset_name] [COLLATE collation_name]"), + + TINYBLOB("TINYBLOB", Types.VARBINARY, null, 0, OceanBaseMysqlType.IS_NOT_DECIMAL, 255L, ""), + + TINYTEXT( + "TINYTEXT", + Types.VARCHAR, + String.class, + 0, + OceanBaseMysqlType.IS_NOT_DECIMAL, + 255L, + " [CHARACTER SET charset_name] [COLLATE collation_name]"), + + MEDIUMBLOB( + "MEDIUMBLOB", + Types.LONGVARBINARY, + null, + 0, + OceanBaseMysqlType.IS_NOT_DECIMAL, + 16777215L, + ""), + + MEDIUMTEXT( + "MEDIUMTEXT", + Types.LONGVARCHAR, + String.class, + 0, + OceanBaseMysqlType.IS_NOT_DECIMAL, + 16777215L, + " [CHARACTER SET charset_name] [COLLATE collation_name]"), + + LONGBLOB( + "LONGBLOB", + Types.LONGVARBINARY, + null, + 0, + OceanBaseMysqlType.IS_NOT_DECIMAL, + 4294967295L, + ""), + + LONGTEXT( + "LONGTEXT", + Types.LONGVARCHAR, + String.class, + 0, + OceanBaseMysqlType.IS_NOT_DECIMAL, + 4294967295L, + " [CHARACTER SET charset_name] [COLLATE collation_name]"), + + BLOB("BLOB", Types.LONGVARBINARY, null, 0, OceanBaseMysqlType.IS_NOT_DECIMAL, 65535L, "[(M)]"), + + TEXT( + "TEXT", + Types.LONGVARCHAR, + String.class, + 0, + OceanBaseMysqlType.IS_NOT_DECIMAL, + 65535L, + "[(M)] [CHARACTER SET charset_name] [COLLATE collation_name]"), + + CHAR( + "CHAR", + Types.CHAR, + String.class, + 0, + OceanBaseMysqlType.IS_NOT_DECIMAL, + 255L, + "[(M)] [CHARACTER SET charset_name] [COLLATE collation_name]"), + + BINARY("BINARY", Types.BINARY, null, 0, OceanBaseMysqlType.IS_NOT_DECIMAL, 255L, "(M)"), + + GEOMETRY("GEOMETRY", Types.BINARY, null, 0, OceanBaseMysqlType.IS_NOT_DECIMAL, 65535L, ""), + // is represented by BLOB + UNKNOWN("UNKNOWN", Types.OTHER, null, 0, OceanBaseMysqlType.IS_NOT_DECIMAL, 65535L, ""); + + private final String name; + protected int jdbcType; + protected final Class javaClass; + private final int flagsMask; + private final boolean isDecimal; + private final Long precision; + private final String createParams; + + private OceanBaseMysqlType( + String oceanBaseMysqlTypeName, + int jdbcType, + Class javaClass, + int allowedFlags, + boolean isDec, + Long precision, + String createParams) { + this.name = oceanBaseMysqlTypeName; + this.jdbcType = jdbcType; + this.javaClass = javaClass; + this.flagsMask = allowedFlags; + this.isDecimal = isDec; + this.precision = precision; + this.createParams = createParams; + } + + public static final int FIELD_FLAG_UNSIGNED = 32; + public static final int FIELD_FLAG_ZEROFILL = 64; + + private static final boolean IS_DECIMAL = true; + private static final boolean IS_NOT_DECIMAL = false; + + public static OceanBaseMysqlType getByName(String fullMysqlTypeName) { + + String typeName = ""; + + if (fullMysqlTypeName.indexOf("(") != -1) { + typeName = fullMysqlTypeName.substring(0, fullMysqlTypeName.indexOf("(")).trim(); + } else { + typeName = fullMysqlTypeName; + } + + // the order of checks is important because some short names could match parts of longer + // names + if (StringUtils.indexOfIgnoreCase(typeName, "DECIMAL") != -1 + || StringUtils.indexOfIgnoreCase(typeName, "DEC") != -1 + || StringUtils.indexOfIgnoreCase(typeName, "NUMERIC") != -1 + || StringUtils.indexOfIgnoreCase(typeName, "FIXED") != -1) { + return StringUtils.indexOfIgnoreCase(fullMysqlTypeName, "UNSIGNED") != -1 + ? DECIMAL_UNSIGNED + : DECIMAL; + + } else if (StringUtils.indexOfIgnoreCase(typeName, "TINYBLOB") != -1) { + // IMPORTANT: "TINYBLOB" must be checked before "TINY" + return TINYBLOB; + + } else if (StringUtils.indexOfIgnoreCase(typeName, "TINYTEXT") != -1) { + // IMPORTANT: "TINYTEXT" must be checked before "TINY" + return TINYTEXT; + + } else if (StringUtils.indexOfIgnoreCase(typeName, "TINYINT") != -1 + || StringUtils.indexOfIgnoreCase(typeName, "TINY") != -1 + || StringUtils.indexOfIgnoreCase(typeName, "INT1") != -1) { + return StringUtils.indexOfIgnoreCase(fullMysqlTypeName, "UNSIGNED") != -1 + || StringUtils.indexOfIgnoreCase(fullMysqlTypeName, "ZEROFILL") != -1 + ? TINYINT_UNSIGNED + : TINYINT; + + } else if (StringUtils.indexOfIgnoreCase(typeName, "MEDIUMINT") != -1 + // IMPORTANT: "INT24" must be checked before "INT2" + || StringUtils.indexOfIgnoreCase(typeName, "INT24") != -1 + || StringUtils.indexOfIgnoreCase(typeName, "INT3") != -1 + || StringUtils.indexOfIgnoreCase(typeName, "MIDDLEINT") != -1) { + return StringUtils.indexOfIgnoreCase(fullMysqlTypeName, "UNSIGNED") != -1 + || StringUtils.indexOfIgnoreCase(fullMysqlTypeName, "ZEROFILL") != -1 + ? MEDIUMINT_UNSIGNED + : MEDIUMINT; + + } else if (StringUtils.indexOfIgnoreCase(typeName, "SMALLINT") != -1 + || StringUtils.indexOfIgnoreCase(typeName, "INT2") != -1) { + return StringUtils.indexOfIgnoreCase(fullMysqlTypeName, "UNSIGNED") != -1 + || StringUtils.indexOfIgnoreCase(fullMysqlTypeName, "ZEROFILL") != -1 + ? SMALLINT_UNSIGNED + : SMALLINT; + + } else if (StringUtils.indexOfIgnoreCase(typeName, "BIGINT") != -1 + || StringUtils.indexOfIgnoreCase(typeName, "SERIAL") != -1 + || StringUtils.indexOfIgnoreCase(typeName, "INT8") != -1) { + // SERIAL is an alias for BIGINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE. + return StringUtils.indexOfIgnoreCase(fullMysqlTypeName, "UNSIGNED") != -1 + || StringUtils.indexOfIgnoreCase(fullMysqlTypeName, "ZEROFILL") != -1 + ? BIGINT_UNSIGNED + : BIGINT; + + } else if (StringUtils.indexOfIgnoreCase(typeName, "POINT") != -1) { + // also covers "MULTIPOINT" + // IMPORTANT: "POINT" must be checked before "INT" + } else if (StringUtils.indexOfIgnoreCase(typeName, "INT") != -1 + || StringUtils.indexOfIgnoreCase(typeName, "INTEGER") != -1 + || StringUtils.indexOfIgnoreCase(typeName, "INT4") != -1) { + // IMPORTANT: "INT" must be checked after all "*INT*" types + return StringUtils.indexOfIgnoreCase(fullMysqlTypeName, "UNSIGNED") != -1 + || StringUtils.indexOfIgnoreCase(fullMysqlTypeName, "ZEROFILL") != -1 + ? INT_UNSIGNED + : INT; + + } else if (StringUtils.indexOfIgnoreCase(typeName, "DOUBLE") != -1 + || StringUtils.indexOfIgnoreCase(typeName, "REAL") != -1 + /* || StringUtils.indexOfIgnoreCase(name, "DOUBLE PRECISION") != -1 is caught by "DOUBLE" check */ + // IMPORTANT: "FLOAT8" must be checked before "FLOAT" + || StringUtils.indexOfIgnoreCase(typeName, "FLOAT8") != -1) { + return StringUtils.indexOfIgnoreCase(fullMysqlTypeName, "UNSIGNED") != -1 + || StringUtils.indexOfIgnoreCase(fullMysqlTypeName, "ZEROFILL") != -1 + ? DOUBLE_UNSIGNED + : DOUBLE; + + } else if (StringUtils.indexOfIgnoreCase(typeName, "FLOAT") != -1 /* + * || StringUtils.indexOfIgnoreCase(name, "FLOAT4") != -1 is caught by + * "FLOAT" check + */) { + return StringUtils.indexOfIgnoreCase(fullMysqlTypeName, "UNSIGNED") != -1 + || StringUtils.indexOfIgnoreCase(fullMysqlTypeName, "ZEROFILL") != -1 + ? FLOAT_UNSIGNED + : FLOAT; + + } else if (StringUtils.indexOfIgnoreCase(typeName, "NULL") != -1) { + return NULL; + + } else if (StringUtils.indexOfIgnoreCase(typeName, "TIMESTAMP") != -1) { + // IMPORTANT: "TIMESTAMP" must be checked before "TIME" + return TIMESTAMP; + + } else if (StringUtils.indexOfIgnoreCase(typeName, "DATETIME") != -1) { + // IMPORTANT: "DATETIME" must be checked before "DATE" and "TIME" + return DATETIME; + + } else if (StringUtils.indexOfIgnoreCase(typeName, "DATE") != -1) { + return DATE; + + } else if (StringUtils.indexOfIgnoreCase(typeName, "TIME") != -1) { + return TIME; + + } else if (StringUtils.indexOfIgnoreCase(typeName, "YEAR") != -1) { + return YEAR; + + } else if (StringUtils.indexOfIgnoreCase(typeName, "LONGBLOB") != -1) { + // IMPORTANT: "LONGBLOB" must be checked before "LONG" and "BLOB" + return LONGBLOB; + + } else if (StringUtils.indexOfIgnoreCase(typeName, "LONGTEXT") != -1) { + // IMPORTANT: "LONGTEXT" must be checked before "LONG" and "TEXT" + return LONGTEXT; + + } else if (StringUtils.indexOfIgnoreCase(typeName, "MEDIUMBLOB") != -1 + || StringUtils.indexOfIgnoreCase(typeName, "LONG VARBINARY") != -1) { + // IMPORTANT: "MEDIUMBLOB" must be checked before "BLOB" + // IMPORTANT: "LONG VARBINARY" must be checked before "LONG" and "VARBINARY" + return MEDIUMBLOB; + + } else if (StringUtils.indexOfIgnoreCase(typeName, "MEDIUMTEXT") != -1 + || StringUtils.indexOfIgnoreCase(typeName, "LONG VARCHAR") != -1 + || StringUtils.indexOfIgnoreCase(typeName, "LONG") != -1) { + // IMPORTANT: "MEDIUMTEXT" must be checked before "TEXT" + // IMPORTANT: "LONG VARCHAR" must be checked before "VARCHAR" + return MEDIUMTEXT; + + } else if (StringUtils.indexOfIgnoreCase(typeName, "VARCHAR") != -1 + || StringUtils.indexOfIgnoreCase(typeName, "NVARCHAR") != -1 + || StringUtils.indexOfIgnoreCase(typeName, "NATIONAL VARCHAR") != -1 + || StringUtils.indexOfIgnoreCase(typeName, "CHARACTER VARYING") != -1) { + // IMPORTANT: "CHARACTER VARYING" must be checked before "CHARACTER" and "CHAR" + return VARCHAR; + + } else if (StringUtils.indexOfIgnoreCase(typeName, "VARBINARY") != -1) { + return VARBINARY; + + } else if (StringUtils.indexOfIgnoreCase(typeName, "BINARY") != -1 + || StringUtils.indexOfIgnoreCase(typeName, "CHAR BYTE") != -1) { + // IMPORTANT: "BINARY" must be checked after all "*BINARY" types + // IMPORTANT: "CHAR BYTE" must be checked before "CHAR" + return BINARY; + + } else if (StringUtils.indexOfIgnoreCase(typeName, "LINESTRING") != -1) { + // also covers "MULTILINESTRING" + // IMPORTANT: "LINESTRING" must be checked before "STRING" + return GEOMETRY; + + } else if (StringUtils.indexOfIgnoreCase(typeName, "STRING") != -1 + // IMPORTANT: "CHAR" must be checked after all "*CHAR*" types + || StringUtils.indexOfIgnoreCase(typeName, "CHAR") != -1 + || StringUtils.indexOfIgnoreCase(typeName, "NCHAR") != -1 + || StringUtils.indexOfIgnoreCase(typeName, "NATIONAL CHAR") != -1 + || StringUtils.indexOfIgnoreCase(typeName, "CHARACTER") != -1) { + return CHAR; + + } else if (StringUtils.indexOfIgnoreCase(typeName, "BOOLEAN") != -1 + || StringUtils.indexOfIgnoreCase(typeName, "BOOL") != -1) { + return BOOLEAN; + + } else if (StringUtils.indexOfIgnoreCase(typeName, "BIT") != -1) { + return BIT; + + } else if (StringUtils.indexOfIgnoreCase(typeName, "JSON") != -1) { + return JSON; + + } else if (StringUtils.indexOfIgnoreCase(typeName, "ENUM") != -1) { + return ENUM; + + } else if (StringUtils.indexOfIgnoreCase(typeName, "SET") != -1) { + return SET; + + } else if (StringUtils.indexOfIgnoreCase(typeName, "BLOB") != -1) { + return BLOB; + + } else if (StringUtils.indexOfIgnoreCase(typeName, "TEXT") != -1) { + return TEXT; + + } else if (StringUtils.indexOfIgnoreCase(typeName, "GEOM") + != -1 // covers "GEOMETRY", "GEOMETRYCOLLECTION" and "GEOMCOLLECTION" + || StringUtils.indexOfIgnoreCase(typeName, "POINT") + != -1 // also covers "MULTIPOINT" + || StringUtils.indexOfIgnoreCase(typeName, "POLYGON") + != -1 // also covers "MULTIPOLYGON" + ) { + return GEOMETRY; + } + + return UNKNOWN; + } + + @Override + public String getVendor() { + return "com.oceanbase"; + } + + @Override + public Integer getVendorTypeNumber() { + return this.jdbcType; + } + + @Override + public String getName() { + return this.name; + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java index a6896322065..860131041a9 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java @@ -393,6 +393,8 @@ private static ReadonlyConfig extractCatalogConfig(JdbcConnectionConfig config) .ifPresent(val -> catalogConfig.put(JdbcCatalogOptions.USERNAME.key(), val)); config.getPassword() .ifPresent(val -> catalogConfig.put(JdbcCatalogOptions.PASSWORD.key(), val)); + Optional.ofNullable(config.getCompatibleMode()) + .ifPresent(val -> catalogConfig.put(JdbcCatalogOptions.COMPATIBLE_MODE.key(), val)); return ReadonlyConfig.fromMap(catalogConfig); } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java index 3208473d619..a747058391b 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java @@ -66,16 +66,10 @@ public class JdbcOceanBaseMysqlIT extends JdbcOceanBaseITBase { "bash", "-c", "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && wget " - + driverUrl() - + " && wget " - + mysqlDriverUrl()); + + driverUrl()); Assertions.assertEquals(0, extraCommands.getExitCode(), extraCommands.getStderr()); }; - String mysqlDriverUrl() { - return "https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.32/mysql-connector-j-8.0.32.jar"; - } - @Override List configFile() { return Lists.newArrayList("/jdbc_oceanbase_mysql_source_and_sink.conf");