Skip to content

Commit

Permalink
[Fix][Connector-V2][OceanBase] Remove OceanBase catalog's dependency …
Browse files Browse the repository at this point in the history
…on mysql driver (apache#7311)
xxsc0529 authored Aug 9, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent 9216627 commit 3130ae0
Showing 11 changed files with 1,954 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -31,13 +31,18 @@

import org.apache.commons.lang3.StringUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.auto.service.AutoService;

import java.util.Optional;

@AutoService(Factory.class)
public class OceanBaseCatalogFactory implements CatalogFactory {

private static final Logger log = LoggerFactory.getLogger(OceanBaseCatalogFactory.class);

@Override
public String factoryIdentifier() {
return DatabaseIdentifier.OCENABASE;
Original file line number Diff line number Diff line change
@@ -17,10 +17,44 @@

package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oceanbase;

import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MySqlCatalog;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase.OceanBaseMySqlTypeConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase.OceanBaseMySqlTypeMapper;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase.OceanBaseMysqlType;

public class OceanBaseMySqlCatalog extends MySqlCatalog {
import com.google.common.base.Preconditions;
import lombok.extern.slf4j.Slf4j;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;

@Slf4j
public class OceanBaseMySqlCatalog extends AbstractJdbcCatalog {

private static final String SELECT_COLUMNS_SQL_TEMPLATE =
"SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME ='%s' ORDER BY ORDINAL_POSITION ASC";

private static final String SELECT_DATABASE_EXISTS =
"SELECT SCHEMA_NAME FROM information_schema.schemata WHERE SCHEMA_NAME = '%s'";

private static final String SELECT_TABLE_EXISTS =
"SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.tables WHERE table_schema = '%s' AND table_name = '%s'";

static {
SYS_DATABASES.clear();
@@ -32,8 +66,161 @@ public class OceanBaseMySqlCatalog extends MySqlCatalog {
SYS_DATABASES.add("SYS");
}

private OceanBaseMySqlTypeConverter typeConverter;

public OceanBaseMySqlCatalog(
String catalogName, String username, String pwd, JdbcUrlUtil.UrlInfo urlInfo) {
super(catalogName, username, pwd, urlInfo);
super(catalogName, username, pwd, urlInfo, null);
this.typeConverter = new OceanBaseMySqlTypeConverter();
}

@Override
protected String getDatabaseWithConditionSql(String databaseName) {
return String.format(SELECT_DATABASE_EXISTS, databaseName);
}

@Override
protected String getTableWithConditionSql(TablePath tablePath) {
return String.format(
SELECT_TABLE_EXISTS, tablePath.getDatabaseName(), tablePath.getTableName());
}

@Override
protected String getListDatabaseSql() {
return "SHOW DATABASES;";
}

@Override
protected String getListTableSql(String databaseName) {
return "SHOW TABLES;";
}

@Override
protected String getTableName(ResultSet rs) throws SQLException {
return rs.getString(1);
}

@Override
protected String getTableName(TablePath tablePath) {
return tablePath.getTableName();
}

@Override
protected String getSelectColumnsSql(TablePath tablePath) {
return String.format(
SELECT_COLUMNS_SQL_TEMPLATE, tablePath.getDatabaseName(), tablePath.getTableName());
}

@Override
protected TableIdentifier getTableIdentifier(TablePath tablePath) {
return TableIdentifier.of(
catalogName, tablePath.getDatabaseName(), tablePath.getTableName());
}

@Override
protected List<ConstraintKey> getConstraintKeys(DatabaseMetaData metaData, TablePath tablePath)
throws SQLException {
List<ConstraintKey> indexList =
super.getConstraintKeys(
metaData,
tablePath.getDatabaseName(),
tablePath.getSchemaName(),
tablePath.getTableName());
for (Iterator<ConstraintKey> it = indexList.iterator(); it.hasNext(); ) {
ConstraintKey index = it.next();
if (ConstraintKey.ConstraintType.UNIQUE_KEY.equals(index.getConstraintType())
&& "PRIMARY".equals(index.getConstraintName())) {
it.remove();
}
}
return indexList;
}

@Override
protected Column buildColumn(ResultSet resultSet) throws SQLException {
String columnName = resultSet.getString("COLUMN_NAME");
// e.g. tinyint(1) unsigned
String columnType = resultSet.getString("COLUMN_TYPE");
// e.g. tinyint
String dataType = resultSet.getString("DATA_TYPE").toUpperCase();
String comment = resultSet.getString("COLUMN_COMMENT");
Object defaultValue = resultSet.getObject("COLUMN_DEFAULT");
String isNullableStr = resultSet.getString("IS_NULLABLE");
boolean isNullable = isNullableStr.equals("YES");
// e.g. `decimal(10, 2)` is 10
long numberPrecision = resultSet.getInt("NUMERIC_PRECISION");
// e.g. `decimal(10, 2)` is 2
int numberScale = resultSet.getInt("NUMERIC_SCALE");
// e.g. `varchar(10)` is 40
long charOctetLength = resultSet.getLong("CHARACTER_OCTET_LENGTH");
// e.g. `timestamp(3)` is 3
// int timePrecision =
// MySqlVersion.V_5_5.equals(version) ? 0 :
// resultSet.getInt("DATETIME_PRECISION");
int timePrecision = resultSet.getInt("DATETIME_PRECISION");
Preconditions.checkArgument(!(numberPrecision > 0 && charOctetLength > 0));
Preconditions.checkArgument(!(numberScale > 0 && timePrecision > 0));

OceanBaseMysqlType oceanbaseMysqlType = OceanBaseMysqlType.getByName(columnType);
boolean unsigned = columnType.toLowerCase(Locale.ROOT).contains("unsigned");

BasicTypeDefine<OceanBaseMysqlType> typeDefine =
BasicTypeDefine.<OceanBaseMysqlType>builder()
.name(columnName)
.columnType(columnType)
.dataType(dataType)
.nativeType(oceanbaseMysqlType)
.unsigned(unsigned)
.length(Math.max(charOctetLength, numberPrecision))
.precision(numberPrecision)
.scale(Math.max(numberScale, timePrecision))
.nullable(isNullable)
.defaultValue(defaultValue)
.comment(comment)
.build();
return typeConverter.convert(typeDefine);
}

@Override
protected String getCreateTableSql(TablePath tablePath, CatalogTable table) {
return OceanBaseMysqlCreateTableSqlBuilder.builder(tablePath, table, typeConverter)
.build(table.getCatalogName());
}

@Override
protected String getDropTableSql(TablePath tablePath) {
return String.format(
"DROP TABLE `%s`.`%s`;", tablePath.getDatabaseName(), tablePath.getTableName());
}

@Override
protected String getCreateDatabaseSql(String databaseName) {
return String.format("CREATE DATABASE `%s`;", databaseName);
}

@Override
protected String getDropDatabaseSql(String databaseName) {
return String.format("DROP DATABASE `%s`;", databaseName);
}

@Override
public CatalogTable getTable(String sqlQuery) throws SQLException {
Connection defaultConnection = getConnection(defaultUrl);
Statement statement = defaultConnection.createStatement();
ResultSetMetaData metaData = statement.executeQuery(sqlQuery).getMetaData();
return CatalogUtils.getCatalogTable(
metaData, new OceanBaseMySqlTypeMapper(typeConverter), sqlQuery);
}

@Override
protected String getTruncateTableSql(TablePath tablePath) throws CatalogException {
return String.format(
"TRUNCATE TABLE `%s`.`%s`;", tablePath.getDatabaseName(), tablePath.getTableName());
}

public String getExistDataSql(TablePath tablePath) {
return String.format(
"SELECT * FROM `%s`.`%s` LIMIT 1;",
tablePath.getDatabaseName(), tablePath.getTableName());
}
}
Loading

0 comments on commit 3130ae0

Please sign in to comment.