diff --git a/docs/en/connector-v2/sink/Maxcompute.md b/docs/en/connector-v2/sink/Maxcompute.md index de5fc931203..2f428341896 100644 --- a/docs/en/connector-v2/sink/Maxcompute.md +++ b/docs/en/connector-v2/sink/Maxcompute.md @@ -51,6 +51,64 @@ Used to read data from Maxcompute. `overwrite` Whether to overwrite the table or partition, default: false. +### save_mode_create_template + +We use templates to automatically create MaxCompute tables. +The connector generates the corresponding table creation statement from the upstream data types and schema, +and the default template can be modified as needed. Currently this only works in multi-table mode. + +Default template: + +```sql +CREATE TABLE IF NOT EXISTS `${table}` ( +${rowtype_fields} +); +``` + +If a custom field is filled in the template, such as adding an `id` field: + +```sql +CREATE TABLE IF NOT EXISTS `${table}` +( + id, + ${rowtype_fields} +); +``` + +The connector will automatically obtain the corresponding type from the upstream schema to complete the definition, +and remove the `id` field from `rowtype_fields`. This method can be used to customize field types and attributes. + +You can use the following placeholders: + +- database: Used to get the database in the upstream schema +- table: Used to get the table name in the upstream schema +- rowtype_fields: Used to get all the fields in the upstream schema; they are automatically mapped to the field + description of MaxCompute +- rowtype_primary_key: Used to get the primary key in the upstream schema (may be a list) +- rowtype_unique_key: Used to get the unique key in the upstream schema (may be a list) + +### schema_save_mode[Enum] + +Before the synchronization task starts, choose how to handle the existing table structure on the target side. 
+Option introduction: +`RECREATE_SCHEMA`: Creates the table when it does not exist; drops and recreates it when it already exists +`CREATE_SCHEMA_WHEN_NOT_EXIST`: Creates the table when it does not exist; skips creation when it already exists +`ERROR_WHEN_SCHEMA_NOT_EXIST`: Reports an error when the table does not exist +`IGNORE`: Skips any handling of the table + +### data_save_mode[Enum] + +Before the synchronization task starts, choose how to handle data that already exists on the target side. +Option introduction: +`DROP_DATA`: Preserve database structure and delete data +`APPEND_DATA`: Preserve database structure, preserve data +`CUSTOM_PROCESSING`: User defined processing +`ERROR_WHEN_DATA_EXISTS`: When there is data, an error is reported + +### custom_sql[String] + +When `data_save_mode` is `CUSTOM_PROCESSING`, you must set the `custom_sql` parameter. It should contain an executable SQL statement, which is executed before the synchronization task starts. + ### common options Sink plugin common parameters, please refer to [Sink Common Options](../sink-common-options.md) for details. 
@@ -70,10 +128,3 @@ sink { } } ``` - -## Changelog - -### next version - -- [Feature] Add Maxcompute Sink Connector([3640](https://github.com/apache/seatunnel/pull/3640)) - diff --git a/seatunnel-connectors-v2/connector-maxcompute/pom.xml b/seatunnel-connectors-v2/connector-maxcompute/pom.xml index 12029cd0156..3c4261f930e 100644 --- a/seatunnel-connectors-v2/connector-maxcompute/pom.xml +++ b/seatunnel-connectors-v2/connector-maxcompute/pom.xml @@ -30,7 +30,7 @@ SeaTunnel : Connectors V2 : Maxcompute - 0.31.3 + 0.51.0 3.4 diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java index 9bd27081725..f90a5c40862 100644 --- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java @@ -22,29 +22,46 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.table.catalog.Catalog; import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.PreviewResult; +import org.apache.seatunnel.api.table.catalog.SQLPreviewResult; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.catalog.exception.CatalogException; import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException; import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException; import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException; 
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException; +import org.apache.seatunnel.api.table.converter.BasicTypeDefine; +import org.apache.seatunnel.connectors.seatunnel.maxcompute.datatype.MaxComputeTypeConverter; +import org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeUtil; + +import org.apache.commons.lang3.StringUtils; import com.aliyun.odps.Odps; import com.aliyun.odps.OdpsException; +import com.aliyun.odps.PartitionSpec; import com.aliyun.odps.Projects; +import com.aliyun.odps.Table; import com.aliyun.odps.Tables; import com.aliyun.odps.account.Account; import com.aliyun.odps.account.AliyunAccount; +import com.aliyun.odps.task.SQLTask; +import com.aliyun.odps.type.TypeInfo; import lombok.extern.slf4j.Slf4j; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Optional; import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_ID; import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_KEY; import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ENDPOINT; +import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC; import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT; +import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.SAVE_MODE_CREATE_TEMPLATE; +import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument; @Slf4j public class MaxComputeCatalog implements Catalog { @@ -79,11 +96,9 @@ public String getDefaultDatabase() throws CatalogException { @Override public boolean databaseExists(String databaseName) throws CatalogException { - Odps odps = new Odps(account); - odps.setEndpoint(readonlyConfig.get(ENDPOINT)); - odps.setDefaultProject(readonlyConfig.get(PROJECT)); - Projects projects = 
odps.projects(); try { + Odps odps = getOdps(readonlyConfig.get(PROJECT)); + Projects projects = odps.projects(); return projects.exists(databaseName); } catch (OdpsException e) { throw new CatalogException("Check " + databaseName + " exist error", e); @@ -107,9 +122,7 @@ public List listDatabases() throws CatalogException { @Override public List listTables(String databaseName) throws CatalogException, DatabaseNotExistException { - Odps odps = new Odps(account); - odps.setEndpoint(readonlyConfig.get(ENDPOINT)); - odps.setDefaultProject(databaseName); + Odps odps = getOdps(databaseName); Tables tables = odps.tables(); List tableNames = new ArrayList<>(); @@ -122,12 +135,9 @@ public List listTables(String databaseName) @Override public boolean tableExists(TablePath tablePath) throws CatalogException { - Odps odps = new Odps(account); - odps.setEndpoint(readonlyConfig.get(ENDPOINT)); - odps.setDefaultProject(tablePath.getDatabaseName()); - - Tables tables = odps.tables(); try { + Odps odps = getOdps(tablePath.getDatabaseName()); + com.aliyun.odps.Tables tables = odps.tables(); return tables.exists(tablePath.getTableName()); } catch (OdpsException e) { throw new CatalogException("tableExists" + tablePath + " error", e); @@ -137,19 +147,97 @@ public boolean tableExists(TablePath tablePath) throws CatalogException { @Override public CatalogTable getTable(TablePath tablePath) throws CatalogException, TableNotExistException { - throw new UnsupportedOperationException(); + if (!tableExists(tablePath)) { + throw new TableNotExistException(catalogName, tablePath); + } + Table odpsTable; + com.aliyun.odps.TableSchema odpsSchema; + boolean isPartitioned; + try { + Odps odps = getOdps(tablePath.getDatabaseName()); + odpsTable = + MaxcomputeUtil.parseTable( + odps, tablePath.getDatabaseName(), tablePath.getTableName()); + odpsSchema = odpsTable.getSchema(); + isPartitioned = odpsTable.isPartitioned(); + } catch (Exception ex) { + throw new CatalogException(catalogName, ex); + } + 
List partitionKeys = new ArrayList<>(); + TableSchema.Builder builder = TableSchema.builder(); + buildColumnsWithErrorCheck( + tablePath, + builder, + odpsSchema.getColumns().iterator(), + (column) -> { + BasicTypeDefine typeDefine = + BasicTypeDefine.builder() + .name(column.getName()) + .nativeType(column.getTypeInfo()) + .columnType(column.getTypeInfo().getTypeName()) + .dataType(column.getTypeInfo().getTypeName()) + .nullable(column.isNullable()) + .comment(column.getComment()) + .build(); + return MaxComputeTypeConverter.INSTANCE.convert(typeDefine); + }); + if (isPartitioned) { + buildColumnsWithErrorCheck( + tablePath, + builder, + odpsSchema.getPartitionColumns().iterator(), + (column) -> { + BasicTypeDefine typeDefine = + BasicTypeDefine.builder() + .name(column.getName()) + .nativeType(column.getTypeInfo()) + .columnType(column.getTypeInfo().getTypeName()) + .dataType(column.getTypeInfo().getTypeName()) + .nullable(column.isNullable()) + .comment(column.getComment()) + .build(); + partitionKeys.add(column.getName()); + return MaxComputeTypeConverter.INSTANCE.convert(typeDefine); + }); + } + TableSchema tableSchema = builder.build(); + TableIdentifier tableIdentifier = getTableIdentifier(tablePath); + return CatalogTable.of( + tableIdentifier, + tableSchema, + readonlyConfig.toMap(), + partitionKeys, + odpsTable.getComment(), + catalogName); } @Override public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists) throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { - throw new UnsupportedOperationException(); + try { + Odps odps = getOdps(tablePath.getDatabaseName()); + SQLTask.run( + odps, + MaxComputeCatalogUtil.getCreateTableStatement( + readonlyConfig.get(SAVE_MODE_CREATE_TEMPLATE), + tablePath, + table)) + .waitForSuccess(); + } catch (OdpsException e) { + throw new CatalogException("create table error", e); + } } @Override public void dropTable(TablePath tablePath, boolean ignoreIfNotExists) 
throws TableNotExistException, CatalogException { - throw new UnsupportedOperationException(); + try { + Odps odps = getOdps(tablePath.getDatabaseName()); + SQLTask.run(odps, MaxComputeCatalogUtil.getDropTableQuery(tablePath, ignoreIfNotExists)) + .waitForSuccess(); + } catch (OdpsException e) { + throw new CatalogException("drop table error", e); + } } @Override @@ -163,4 +251,70 @@ public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists) throws DatabaseNotExistException, CatalogException { throw new UnsupportedOperationException(); } + + @Override + public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + try { + Odps odps = getOdps(tablePath.getDatabaseName()); + Table odpsTable = odps.tables().get(tablePath.getTableName()); + if (odpsTable.isPartitioned() + && StringUtils.isNotEmpty(readonlyConfig.get(PARTITION_SPEC))) { + PartitionSpec partitionSpec = new PartitionSpec(readonlyConfig.get(PARTITION_SPEC)); + odpsTable.deletePartition(partitionSpec, ignoreIfNotExists); + odpsTable.createPartition(partitionSpec, true); + } else { + odpsTable.truncate(); + } + } catch (Exception e) { + throw new CatalogException("truncate table error", e); + } + } + + @Override + public boolean isExistsData(TablePath tablePath) { + throw new UnsupportedOperationException(); + } + + @Override + public void executeSql(TablePath tablePath, String sql) { + try { + Odps odps = getOdps(tablePath.getDatabaseName()); + SQLTask.run(odps, sql).waitForSuccess(); + } catch (OdpsException e) { + throw new CatalogException("execute sql error", e); + } + } + + @Override + public PreviewResult previewAction( + ActionType actionType, TablePath tablePath, Optional catalogTable) { + if (actionType == ActionType.CREATE_TABLE) { + checkArgument(catalogTable.isPresent(), "CatalogTable cannot be null"); + return new SQLPreviewResult( + MaxComputeCatalogUtil.getCreateTableStatement( + 
readonlyConfig.get(SAVE_MODE_CREATE_TEMPLATE), + tablePath, + catalogTable.get())); + } else if (actionType == ActionType.DROP_TABLE) { + return new SQLPreviewResult(MaxComputeCatalogUtil.getDropTableQuery(tablePath, true)); + } else { + throw new UnsupportedOperationException("Unsupported action type: " + actionType); + } + } + + private Odps getOdps(String project) { + Odps odps = new Odps(account); + odps.setEndpoint(readonlyConfig.get(ENDPOINT)); + odps.setDefaultProject(project); + return odps; + } + + protected TableIdentifier getTableIdentifier(TablePath tablePath) { + return TableIdentifier.of( + catalogName, + tablePath.getDatabaseName(), + tablePath.getSchemaName(), + tablePath.getTableName()); + } } diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalogFactory.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalogFactory.java index dac0d3384fc..dd34731ed93 100644 --- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalogFactory.java +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalogFactory.java @@ -52,7 +52,7 @@ public String factoryIdentifier() { public OptionRule optionRule() { return OptionRule.builder() .required(ACCESS_ID, ACCESS_KEY, ENDPOINT, PROJECT, TABLE_NAME) - .optional(PARTITION_SPEC, SPLIT_ROW, SPLIT_ROW, SCHEMA) + .optional(PARTITION_SPEC, SPLIT_ROW, SCHEMA) .build(); } } diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalogUtil.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalogUtil.java new 
file mode 100644 index 00000000000..8097d95a653 --- /dev/null +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalogUtil.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.maxcompute.catalog; + +import org.apache.seatunnel.api.sink.SaveModePlaceHolder; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.converter.BasicTypeDefine; +import org.apache.seatunnel.api.table.converter.TypeConverter; +import org.apache.seatunnel.connectors.seatunnel.maxcompute.datatype.MaxComputeTypeConverter; +import org.apache.seatunnel.connectors.seatunnel.maxcompute.util.CreateTableParser; + +import org.apache.commons.lang3.StringUtils; + +import com.aliyun.odps.type.TypeInfo; +import lombok.extern.slf4j.Slf4j; + +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull; + +@Slf4j +public class MaxComputeCatalogUtil { + + public static String getDropTableQuery(TablePath tablePath, boolean ignoreIfNotExists) { + return "DROP TABLE " + + (ignoreIfNotExists ? 
"IF EXISTS " : "") + + tablePath.getFullName() + + ";"; + } + + /** + * @param createTableTemplate create table template + * @param catalogTable catalog table + * @return create table stmt + */ + public static String getCreateTableStatement( + String createTableTemplate, TablePath tablePath, CatalogTable catalogTable) { + + String template = createTableTemplate; + if (!createTableTemplate.trim().endsWith(";")) { + template += ";"; + } + TableSchema tableSchema = catalogTable.getTableSchema(); + + String primaryKey = ""; + if (tableSchema.getPrimaryKey() != null) { + List fields = Arrays.asList(catalogTable.getTableSchema().getFieldNames()); + List keys = tableSchema.getPrimaryKey().getColumnNames(); + keys.sort(Comparator.comparingInt(fields::indexOf)); + primaryKey = keys.stream().map(r -> "`" + r + "`").collect(Collectors.joining(",")); + } + String uniqueKey = ""; + if (!tableSchema.getConstraintKeys().isEmpty()) { + uniqueKey = + tableSchema.getConstraintKeys().stream() + .flatMap(c -> c.getColumnNames().stream()) + .map(r -> "`" + r.getColumnName() + "`") + .collect(Collectors.joining(",")); + } + + template = + template.replaceAll( + SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getReplacePlaceHolder(), + primaryKey); + template = + template.replaceAll( + SaveModePlaceHolder.ROWTYPE_UNIQUE_KEY.getReplacePlaceHolder(), uniqueKey); + + Map columnInTemplate = + CreateTableParser.getColumnList(template); + template = mergeColumnInTemplate(columnInTemplate, tableSchema, template); + + String rowTypeFields = + tableSchema.getColumns().stream() + .filter(column -> !columnInTemplate.containsKey(column.getName())) + .map( + x -> + MaxComputeCatalogUtil.columnToMaxComputeType( + x, MaxComputeTypeConverter.INSTANCE)) + .collect(Collectors.joining(",\n")); + + return template.replaceAll( + SaveModePlaceHolder.DATABASE.getReplacePlaceHolder(), + tablePath.getDatabaseName()) + .replaceAll( + SaveModePlaceHolder.TABLE.getReplacePlaceHolder(), tablePath.getTableName()) + 
.replaceAll( + SaveModePlaceHolder.ROWTYPE_FIELDS.getReplacePlaceHolder(), rowTypeFields); + } + + private static String mergeColumnInTemplate( + Map columnInTemplate, + TableSchema tableSchema, + String template) { + int offset = 0; + Map columnMap = + tableSchema.getColumns().stream() + .collect(Collectors.toMap(Column::getName, Function.identity())); + List columnInfosInSeq = + columnInTemplate.values().stream() + .sorted( + Comparator.comparingInt( + CreateTableParser.ColumnInfo::getStartIndex)) + .collect(Collectors.toList()); + for (CreateTableParser.ColumnInfo columnInfo : columnInfosInSeq) { + String col = columnInfo.getName(); + if (StringUtils.isEmpty(columnInfo.getInfo())) { + if (columnMap.containsKey(col)) { + Column column = columnMap.get(col); + String newCol = + columnToMaxComputeType(column, MaxComputeTypeConverter.INSTANCE); + String prefix = template.substring(0, columnInfo.getStartIndex() + offset); + String suffix = template.substring(offset + columnInfo.getEndIndex()); + if (prefix.endsWith("`")) { + prefix = prefix.substring(0, prefix.length() - 1); + offset--; + } + if (suffix.startsWith("`")) { + suffix = suffix.substring(1); + offset--; + } + template = prefix + newCol + suffix; + offset += newCol.length() - columnInfo.getName().length(); + } else { + throw new IllegalArgumentException("Can't find column " + col + " in table."); + } + } + } + return template; + } + + public static String columnToMaxComputeType( + Column column, TypeConverter> typeConverter) { + checkNotNull(column, "The column is required."); + return String.format( + "`%s` %s %s %s", + column.getName(), + typeConverter.reconvert(column).getColumnType(), + column.isNullable() ? "NULL" : "NOT NULL", + StringUtils.isEmpty(column.getComment()) + ? 
"" + : "COMMENT '" + column.getComment() + "'"); + } +} diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertor.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertor.java index 110222ae324..17886b87c35 100644 --- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertor.java +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertor.java @@ -17,34 +17,36 @@ package org.apache.seatunnel.connectors.seatunnel.maxcompute.catalog; +import org.apache.seatunnel.api.table.catalog.Column; import org.apache.seatunnel.api.table.catalog.DataTypeConvertor; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.converter.BasicTypeDefine; import org.apache.seatunnel.api.table.type.ArrayType; import org.apache.seatunnel.api.table.type.BasicType; import org.apache.seatunnel.api.table.type.DecimalType; import org.apache.seatunnel.api.table.type.LocalTimeType; import org.apache.seatunnel.api.table.type.MapType; -import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.exception.CommonError; import org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig; +import org.apache.seatunnel.connectors.seatunnel.maxcompute.datatype.MaxComputeTypeConverter; + +import org.apache.commons.collections4.MapUtils; -import com.aliyun.odps.OdpsType; -import com.aliyun.odps.type.ArrayTypeInfo; -import com.aliyun.odps.type.DecimalTypeInfo; -import com.aliyun.odps.type.MapTypeInfo; 
-import com.aliyun.odps.type.StructTypeInfo; import com.aliyun.odps.type.TypeInfo; -import com.aliyun.odps.type.TypeInfoFactory; import com.google.auto.service.AutoService; -import java.util.ArrayList; -import java.util.List; import java.util.Map; +import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull; + @AutoService(DataTypeConvertor.class) public class MaxComputeDataTypeConvertor implements DataTypeConvertor { + public static final String PRECISION = "precision"; + public static final String SCALE = "scale"; + @Override public SeaTunnelDataType toSeaTunnelType(String field, String connectorDataType) { if (connectorDataType.startsWith("MAP")) { @@ -146,82 +148,17 @@ public SeaTunnelDataType toSeaTunnelType(String field, String connectorDataTy @Override public SeaTunnelDataType toSeaTunnelType( String field, TypeInfo connectorDataType, Map dataTypeProperties) { - switch (connectorDataType.getOdpsType()) { - case MAP: - MapTypeInfo mapTypeInfo = (MapTypeInfo) connectorDataType; - return new MapType( - toSeaTunnelType(field, mapTypeInfo.getKeyTypeInfo(), dataTypeProperties), - toSeaTunnelType(field, mapTypeInfo.getValueTypeInfo(), dataTypeProperties)); - case ARRAY: - ArrayTypeInfo arrayTypeInfo = (ArrayTypeInfo) connectorDataType; - switch (arrayTypeInfo.getElementTypeInfo().getOdpsType()) { - case BOOLEAN: - return ArrayType.BOOLEAN_ARRAY_TYPE; - case INT: - return ArrayType.INT_ARRAY_TYPE; - case BIGINT: - return ArrayType.LONG_ARRAY_TYPE; - case FLOAT: - return ArrayType.FLOAT_ARRAY_TYPE; - case DOUBLE: - return ArrayType.DOUBLE_ARRAY_TYPE; - case STRING: - return ArrayType.STRING_ARRAY_TYPE; - default: - throw CommonError.convertToSeaTunnelTypeError( - MaxcomputeConfig.PLUGIN_NAME, - connectorDataType.getTypeName(), - field); - } - case STRUCT: - StructTypeInfo structTypeInfo = (StructTypeInfo) connectorDataType; - List fields = structTypeInfo.getFieldTypeInfos(); - List fieldNames = new ArrayList<>(fields.size()); - List> 
fieldTypes = new ArrayList<>(fields.size()); - for (TypeInfo f : fields) { - fieldNames.add(f.getTypeName()); - fieldTypes.add(toSeaTunnelType(f.getTypeName(), f, dataTypeProperties)); - } - return new SeaTunnelRowType( - fieldNames.toArray(new String[0]), - fieldTypes.toArray(new SeaTunnelDataType[0])); - case TINYINT: - return BasicType.BYTE_TYPE; - case SMALLINT: - return BasicType.SHORT_TYPE; - case INT: - return BasicType.INT_TYPE; - case BIGINT: - return BasicType.LONG_TYPE; - case BINARY: - return PrimitiveByteArrayType.INSTANCE; - case FLOAT: - return BasicType.FLOAT_TYPE; - case DOUBLE: - return BasicType.DOUBLE_TYPE; - case DECIMAL: - DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) connectorDataType; - return new DecimalType(decimalTypeInfo.getPrecision(), decimalTypeInfo.getScale()); - case VARCHAR: - case CHAR: - case STRING: - return BasicType.STRING_TYPE; - case DATE: - return LocalTimeType.LOCAL_DATE_TYPE; - case DATETIME: - return LocalTimeType.LOCAL_TIME_TYPE; - case TIMESTAMP: - return LocalTimeType.LOCAL_DATE_TIME_TYPE; - case BOOLEAN: - return BasicType.BOOLEAN_TYPE; - case VOID: - return BasicType.VOID_TYPE; - case INTERVAL_DAY_TIME: - case INTERVAL_YEAR_MONTH: - default: - throw CommonError.convertToSeaTunnelTypeError( - MaxcomputeConfig.PLUGIN_NAME, connectorDataType.getTypeName(), field); - } + checkNotNull(connectorDataType, "seaTunnelDataType cannot be null"); + + BasicTypeDefine typeDefine = + BasicTypeDefine.builder() + .name(field) + .columnType(connectorDataType.getTypeName()) + .dataType(connectorDataType.getOdpsType().name()) + .nativeType(connectorDataType) + .build(); + + return MaxComputeTypeConverter.INSTANCE.convert(typeDefine).getDataType(); } @Override @@ -229,62 +166,19 @@ public TypeInfo toConnectorType( String field, SeaTunnelDataType seaTunnelDataType, Map dataTypeProperties) { - switch (seaTunnelDataType.getSqlType()) { - case MAP: - MapType mapType = (MapType) seaTunnelDataType; - return 
TypeInfoFactory.getMapTypeInfo( - toConnectorType(field, mapType.getKeyType(), dataTypeProperties), - toConnectorType(field, mapType.getValueType(), dataTypeProperties)); - case ARRAY: - ArrayType arrayType = (ArrayType) seaTunnelDataType; - return TypeInfoFactory.getArrayTypeInfo( - toConnectorType(field, arrayType.getElementType(), dataTypeProperties)); - case ROW: - SeaTunnelRowType rowType = (SeaTunnelRowType) seaTunnelDataType; - List fieldNames = new ArrayList<>(rowType.getTotalFields()); - List fieldTypes = new ArrayList<>(rowType.getTotalFields()); - for (int i = 0; i < rowType.getTotalFields(); i++) { - fieldNames.add(rowType.getFieldName(i)); - fieldTypes.add( - toConnectorType(field, rowType.getFieldType(i), dataTypeProperties)); - } - return TypeInfoFactory.getStructTypeInfo(fieldNames, fieldTypes); - case TINYINT: - return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.TINYINT); - case SMALLINT: - return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.SMALLINT); - case INT: - return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.INT); - case BIGINT: - return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.BIGINT); - case BYTES: - return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.BINARY); - case FLOAT: - return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.FLOAT); - case DOUBLE: - return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.DOUBLE); - case DECIMAL: - DecimalType decimalType = (DecimalType) seaTunnelDataType; - return TypeInfoFactory.getDecimalTypeInfo( - decimalType.getPrecision(), decimalType.getScale()); - case STRING: - return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.STRING); - case DATE: - return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.DATE); - case TIMESTAMP: - return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.TIMESTAMP); - case TIME: - return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.DATETIME); - case BOOLEAN: - return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.BOOLEAN); - case NULL: - return 
TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.VOID); - default: - throw CommonError.convertToConnectorTypeError( - MaxcomputeConfig.PLUGIN_NAME, - seaTunnelDataType.getSqlType().toString(), - field); - } + checkNotNull(seaTunnelDataType, "seaTunnelDataType cannot be null"); + Long precision = MapUtils.getLong(dataTypeProperties, PRECISION); + Integer scale = MapUtils.getInteger(dataTypeProperties, SCALE); + Column column = + PhysicalColumn.builder() + .name(field) + .dataType(seaTunnelDataType) + .columnLength(precision) + .scale(scale) + .nullable(true) + .build(); + BasicTypeDefine typeDefine = MaxComputeTypeConverter.INSTANCE.reconvert(column); + return typeDefine.getNativeType(); } @Override diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeConfig.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeConfig.java index 84bbccddb79..d3465bd3dba 100644 --- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeConfig.java +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeConfig.java @@ -19,6 +19,9 @@ import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.api.sink.DataSaveMode; +import org.apache.seatunnel.api.sink.SaveModePlaceHolder; +import org.apache.seatunnel.api.sink.SchemaSaveMode; import java.io.Serializable; @@ -69,4 +72,33 @@ public class MaxcomputeConfig implements Serializable { .booleanType() .defaultValue(false) .withDescription("Whether to overwrite the table or partition"); + + public static final Option SCHEMA_SAVE_MODE = + Options.key("schema_save_mode") + .enumType(SchemaSaveMode.class) + 
.defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST) + .withDescription("schema_save_mode"); + + public static final Option DATA_SAVE_MODE = + Options.key("data_save_mode") + .enumType(DataSaveMode.class) + .defaultValue(DataSaveMode.APPEND_DATA) + .withDescription("data_save_mode"); + + public static final Option CUSTOM_SQL = + Options.key("custom_sql").stringType().noDefaultValue().withDescription("custom_sql"); + + // create table + public static final Option SAVE_MODE_CREATE_TEMPLATE = + Options.key("save_mode_create_template") + .stringType() + .defaultValue( + "CREATE TABLE IF NOT EXISTS `" + + SaveModePlaceHolder.TABLE.getPlaceHolder() + + "` (\n" + + SaveModePlaceHolder.ROWTYPE_FIELDS.getPlaceHolder() + + "\n" + + ");") + .withDescription( + "Create table statement template, used to create MaxCompute table"); } diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/datatype/MaxComputeTypeConverter.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/datatype/MaxComputeTypeConverter.java new file mode 100644 index 00000000000..0c81dacc2a3 --- /dev/null +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/datatype/MaxComputeTypeConverter.java @@ -0,0 +1,542 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.maxcompute.datatype; + +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.converter.BasicTypeDefine; +import org.apache.seatunnel.api.table.converter.TypeConverter; +import org.apache.seatunnel.api.table.type.ArrayType; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.DecimalType; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.api.table.type.MapType; +import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.exception.CommonError; +import org.apache.seatunnel.connectors.seatunnel.common.source.TypeDefineUtils; + +import com.aliyun.odps.OdpsType; +import com.aliyun.odps.type.AbstractCharTypeInfo; +import com.aliyun.odps.type.ArrayTypeInfo; +import com.aliyun.odps.type.DecimalTypeInfo; +import com.aliyun.odps.type.MapTypeInfo; +import com.aliyun.odps.type.StructTypeInfo; +import com.aliyun.odps.type.TypeInfo; +import com.aliyun.odps.type.TypeInfoFactory; +import com.google.auto.service.AutoService; +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PLUGIN_NAME; + +/** Refer 
https://help.aliyun.com/zh/maxcompute/user-guide/maxcompute-v2-0-data-type-edition
+ *
+ * <p>Converts MaxCompute (ODPS) {@link TypeInfo} based type definitions to SeaTunnel {@link
+ * Column}s ({@link #convert}) and SeaTunnel columns back to MaxCompute type definitions ({@link
+ * #reconvert}).
+ */
+@Slf4j
+@AutoService(TypeConverter.class)
+public class MaxComputeTypeConverter implements TypeConverter<BasicTypeDefine<TypeInfo>> {
+
+    // ============================data types=====================
+    static final String BOOLEAN = "BOOLEAN";
+
+    // -------------------------number----------------------------
+    static final String TINYINT = "TINYINT";
+    static final String SMALLINT = "SMALLINT";
+    static final String INT = "INT";
+    static final String BIGINT = "BIGINT";
+    static final String DECIMAL = "DECIMAL";
+    static final String FLOAT = "FLOAT";
+    static final String DOUBLE = "DOUBLE";
+
+    // -------------------------string----------------------------
+    public static final String CHAR = "CHAR";
+    public static final String VARCHAR = "VARCHAR";
+    public static final String STRING = "STRING";
+
+    // -------------------------complex----------------------------
+    public static final String JSON = "JSON";
+    public static final String ARRAY = "ARRAY";
+    public static final String MAP = "MAP";
+    public static final String STRUCT = "STRUCT";
+
+    // ------------------------------time-------------------------
+    public static final String DATE = "DATE";
+    public static final String DATETIME = "DATETIME";
+    public static final String TIMESTAMP = "TIMESTAMP";
+    public static final String TIMESTAMP_NTZ = "TIMESTAMP_NTZ";
+
+    // ------------------------------blob-------------------------
+    static final String BINARY = "BINARY";
+
+    // ------------------------------other-------------------------
+    static final String INTERVAL = "INTERVAL";
+
+    public static final int DEFAULT_PRECISION = 38;
+    public static final int MAX_PRECISION = 38;
+    public static final int DEFAULT_SCALE = 18;
+    public static final int MAX_SCALE = 18;
+    public static final int MAX_TIMESTAMP_SCALE = 9;
+
+    // 8MB (2^23 bytes); also used as the fallback length for STRING and BINARY columns
+    public static final long MAX_VARBINARY_LENGTH = 1L << 23;
+
+    public static final MaxComputeTypeConverter INSTANCE = new MaxComputeTypeConverter();
+
+    public MaxComputeTypeConverter() {}
+
+    @Override
+    public String identifier() {
+        return PLUGIN_NAME;
+    }
+
+    /**
+     * Converts a MaxCompute type definition into a SeaTunnel column.
+     *
+     * <p>ARRAY, STRUCT and MAP native types are converted recursively from their element, field
+     * and key/value types. The given {@code typeDefine} is never modified.
+     *
+     * @param typeDefine MaxCompute type definition; its native type selects the conversion
+     * @return the SeaTunnel column describing the same field
+     * @throws RuntimeException as produced by {@link CommonError} when the type, or the element
+     *     type of an array, is not supported
+     */
+    @Override
+    public Column convert(BasicTypeDefine<TypeInfo> typeDefine) {
+        PhysicalColumn.PhysicalColumnBuilder builder =
+                PhysicalColumn.builder()
+                        .name(typeDefine.getName())
+                        .sourceType(typeDefine.getColumnType())
+                        .nullable(typeDefine.isNullable())
+                        .defaultValue(typeDefine.getDefaultValue())
+                        .comment(typeDefine.getComment());
+        TypeInfo nativeType = typeDefine.getNativeType();
+        if (nativeType instanceof ArrayTypeInfo) {
+            // Describe the element with a separate definition instead of rewriting the caller's
+            // typeDefine in place, so the argument still describes the ARRAY column afterwards.
+            TypeInfo elementInfo = ((ArrayTypeInfo) nativeType).getElementTypeInfo();
+            BasicTypeDefine<TypeInfo> elementDefine =
+                    BasicTypeDefine.<TypeInfo>builder()
+                            .name(typeDefine.getName())
+                            .columnType(elementInfo.getTypeName())
+                            .dataType(elementInfo.getOdpsType().name())
+                            .nativeType(elementInfo)
+                            .length(typeDefine.getLength())
+                            .scale(typeDefine.getScale())
+                            .nullable(typeDefine.isNullable())
+                            .defaultValue(typeDefine.getDefaultValue())
+                            .comment(typeDefine.getComment())
+                            .build();
+            Column arrayColumn = convert(elementDefine);
+            SeaTunnelDataType<?> newType;
+            switch (arrayColumn.getDataType().getSqlType()) {
+                case STRING:
+                    newType = ArrayType.STRING_ARRAY_TYPE;
+                    break;
+                case BOOLEAN:
+                    newType = ArrayType.BOOLEAN_ARRAY_TYPE;
+                    break;
+                case TINYINT:
+                    newType = ArrayType.BYTE_ARRAY_TYPE;
+                    break;
+                case SMALLINT:
+                    newType = ArrayType.SHORT_ARRAY_TYPE;
+                    break;
+                case INT:
+                    newType = ArrayType.INT_ARRAY_TYPE;
+                    break;
+                case BIGINT:
+                    newType = ArrayType.LONG_ARRAY_TYPE;
+                    break;
+                case FLOAT:
+                    newType = ArrayType.FLOAT_ARRAY_TYPE;
+                    break;
+                case DOUBLE:
+                    newType = ArrayType.DOUBLE_ARRAY_TYPE;
+                    break;
+                case DATE:
+                    newType = ArrayType.LOCAL_DATE_ARRAY_TYPE;
+                    break;
+                case TIME:
+                    newType = ArrayType.LOCAL_TIME_ARRAY_TYPE;
+                    break;
+                case TIMESTAMP:
+                    newType = ArrayType.LOCAL_DATE_TIME_ARRAY_TYPE;
+                    break;
+                default:
+                    throw CommonError.unsupportedDataType(
+                            PLUGIN_NAME,
+                            arrayColumn.getDataType().getSqlType().toString(),
+                            typeDefine.getName());
+            }
+            return new PhysicalColumn(
+                    arrayColumn.getName(),
+                    newType,
+                    arrayColumn.getColumnLength(),
+                    arrayColumn.getScale(),
+                    arrayColumn.isNullable(),
+                    arrayColumn.getDefaultValue(),
+                    arrayColumn.getComment(),
+                    "ARRAY<" + arrayColumn.getSourceType() + ">",
+                    arrayColumn.getOptions());
+        }
+        if (nativeType instanceof StructTypeInfo) {
+            List<String> names = ((StructTypeInfo) nativeType).getFieldNames();
+            List<SeaTunnelDataType<?>> types = new ArrayList<>();
+            for (TypeInfo typeInfo : ((StructTypeInfo) nativeType).getFieldTypeInfos()) {
+                // types.size() is the index of the field currently being converted
+                String fieldName = names.get(types.size());
+                types.add(convert(typeDefineOf(fieldName, typeInfo)).getDataType());
+            }
+            SeaTunnelRowType rowType =
+                    new SeaTunnelRowType(
+                            names.toArray(new String[0]), types.toArray(new SeaTunnelDataType[0]));
+            return new PhysicalColumn(
+                    typeDefine.getName(),
+                    rowType,
+                    typeDefine.getLength(),
+                    typeDefine.getScale(),
+                    typeDefine.isNullable(),
+                    typeDefine.getDefaultValue(),
+                    typeDefine.getComment(),
+                    typeDefine.getNativeType().getTypeName(),
+                    new HashMap<>());
+        }
+
+        if (nativeType instanceof MapTypeInfo) {
+            MapTypeInfo mapInfo = (MapTypeInfo) nativeType;
+            Column keyColumn = convert(typeDefineOf("key", mapInfo.getKeyTypeInfo()));
+            Column valueColumn = convert(typeDefineOf("value", mapInfo.getValueTypeInfo()));
+            MapType<?, ?> mapType =
+                    new MapType<>(keyColumn.getDataType(), valueColumn.getDataType());
+            return new PhysicalColumn(
+                    typeDefine.getName(),
+                    mapType,
+                    typeDefine.getLength(),
+                    typeDefine.getScale(),
+                    typeDefine.isNullable(),
+                    typeDefine.getDefaultValue(),
+                    typeDefine.getComment(),
+                    typeDefine.getNativeType().getTypeName(),
+                    new HashMap<>());
+        }
+
+        if (nativeType instanceof DecimalTypeInfo) {
+            DecimalTypeInfo decimalInfo = (DecimalTypeInfo) nativeType;
+            DecimalType decimalType;
+            if (decimalInfo.getPrecision() > DEFAULT_PRECISION) {
+                log.warn("{} will probably cause value overflow.", DECIMAL);
+                decimalType = new DecimalType(DEFAULT_PRECISION, DEFAULT_SCALE);
+            } else {
+                decimalType = new DecimalType(decimalInfo.getPrecision(), decimalInfo.getScale());
+            }
+            builder.dataType(decimalType);
+            builder.columnLength((long) decimalType.getPrecision());
+            builder.scale(decimalType.getScale());
+        } else if (nativeType instanceof AbstractCharTypeInfo) {
+            // CHAR(n) or VARCHAR(n)
+            builder.columnLength(
+                    TypeDefineUtils.charTo4ByteLength(
+                            (long) ((AbstractCharTypeInfo) nativeType).getLength()));
+            builder.dataType(BasicType.STRING_TYPE);
+        } else {
+            String dataType = typeDefine.getDataType().toUpperCase();
+            switch (dataType) {
+                case BOOLEAN:
+                    builder.dataType(BasicType.BOOLEAN_TYPE);
+                    break;
+                case TINYINT:
+                    builder.dataType(BasicType.BYTE_TYPE);
+                    break;
+                case SMALLINT:
+                    builder.dataType(BasicType.SHORT_TYPE);
+                    break;
+                case INT:
+                    builder.dataType(BasicType.INT_TYPE);
+                    break;
+                case BIGINT:
+                    builder.dataType(BasicType.LONG_TYPE);
+                    break;
+                case FLOAT:
+                    builder.dataType(BasicType.FLOAT_TYPE);
+                    break;
+                case DOUBLE:
+                    builder.dataType(BasicType.DOUBLE_TYPE);
+                    break;
+                case STRING:
+                    if (typeDefine.getLength() == null || typeDefine.getLength() <= 0) {
+                        builder.columnLength(MAX_VARBINARY_LENGTH);
+                    } else {
+                        builder.columnLength(typeDefine.getLength());
+                    }
+                    builder.dataType(BasicType.STRING_TYPE);
+                    break;
+                case JSON:
+                    builder.dataType(BasicType.STRING_TYPE);
+                    break;
+                case BINARY:
+                    if (typeDefine.getLength() == null || typeDefine.getLength() <= 0) {
+                        builder.columnLength(MAX_VARBINARY_LENGTH);
+                    } else {
+                        builder.columnLength(typeDefine.getLength());
+                    }
+                    builder.dataType(PrimitiveByteArrayType.INSTANCE);
+                    break;
+                case DATE:
+                    builder.dataType(LocalTimeType.LOCAL_DATE_TYPE);
+                    break;
+                case DATETIME:
+                case TIMESTAMP:
+                case TIMESTAMP_NTZ:
+                    builder.dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE);
+                    builder.scale(typeDefine.getScale());
+                    break;
+                case INTERVAL:
+                default:
+                    throw CommonError.convertToSeaTunnelTypeError(
+                            PLUGIN_NAME, dataType, typeDefine.getName());
+            }
+        }
+        return builder.build();
+    }
+
+    /** Builds a minimal type definition (name plus type information) for a nested type. */
+    private static BasicTypeDefine<TypeInfo> typeDefineOf(String name, TypeInfo typeInfo) {
+        BasicTypeDefine<TypeInfo> define = new BasicTypeDefine<>();
+        define.setName(name);
+        define.setColumnType(typeInfo.getTypeName());
+        define.setDataType(typeInfo.getOdpsType().name());
+        define.setNativeType(typeInfo);
+        return define;
+    }
+
+    /**
+     * Converts a SeaTunnel column into a MaxCompute type definition.
+     *
+     * <p>Out-of-range decimal precision/scale and timestamp scale are clamped with a warning.
+     * Strings become CHAR (length up to 255), VARCHAR (length up to 65535) or STRING; timestamps
+     * with a scale of at most 3, or no scale, become DATETIME, otherwise TIMESTAMP.
+     *
+     * @param column SeaTunnel column to describe
+     * @return the MaxCompute type definition, including its native {@link TypeInfo}
+     * @throws RuntimeException as produced by {@link CommonError} for TIME and any other
+     *     unsupported SeaTunnel type
+     */
+    @Override
+    public BasicTypeDefine<TypeInfo> reconvert(Column column) {
+        BasicTypeDefine.BasicTypeDefineBuilder<TypeInfo> builder =
+                BasicTypeDefine.<TypeInfo>builder()
+                        .name(column.getName())
+                        .nullable(column.isNullable())
+                        .comment(column.getComment())
+                        .defaultValue(column.getDefaultValue());
+
+        switch (column.getDataType().getSqlType()) {
+            case NULL:
+                builder.nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.STRING));
+                builder.columnType(STRING);
+                builder.dataType(STRING);
+                break;
+            case BOOLEAN:
+                builder.nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.BOOLEAN));
+                builder.columnType(BOOLEAN);
+                builder.dataType(BOOLEAN);
+                builder.length(1L);
+                break;
+            case TINYINT:
+                builder.nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.TINYINT));
+                builder.columnType(TINYINT);
+                builder.dataType(TINYINT);
+                break;
+            case SMALLINT:
+                builder.nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.SMALLINT));
+                builder.columnType(SMALLINT);
+                builder.dataType(SMALLINT);
+                break;
+            case INT:
+                builder.nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.INT));
+                builder.columnType(INT);
+                builder.dataType(INT);
+                break;
+            case BIGINT:
+                builder.nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.BIGINT));
+                builder.columnType(BIGINT);
+                builder.dataType(BIGINT);
+                break;
+            case FLOAT:
+                builder.nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.FLOAT));
+                builder.columnType(FLOAT);
+                builder.dataType(FLOAT);
+                break;
+            case DOUBLE:
+                builder.nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.DOUBLE));
+                builder.columnType(DOUBLE);
+                builder.dataType(DOUBLE);
+                break;
+            case DECIMAL:
+                DecimalType decimalType = (DecimalType) column.getDataType();
+                long precision = decimalType.getPrecision();
+                int scale = decimalType.getScale();
+                if (precision <= 0) {
+                    precision = DEFAULT_PRECISION;
+                    scale = DEFAULT_SCALE;
+                    log.warn(
+                            "The decimal column {} type decimal({},{}) is out of range, "
+                                    + "which is precision less than 0, "
+                                    + "it will be converted to decimal({},{})",
+                            column.getName(),
+                            decimalType.getPrecision(),
+                            decimalType.getScale(),
+                            precision,
+                            scale);
+                } else if (precision > MAX_PRECISION) {
+                    // shrink the scale by the number of digits cut from the precision
+                    scale = (int) Math.max(0, scale - (precision - MAX_PRECISION));
+                    precision = MAX_PRECISION;
+                    log.warn(
+                            "The decimal column {} type decimal({},{}) is out of range, "
+                                    + "which exceeds the maximum precision of {}, "
+                                    + "it will be converted to decimal({},{})",
+                            column.getName(),
+                            decimalType.getPrecision(),
+                            decimalType.getScale(),
+                            MAX_PRECISION,
+                            precision,
+                            scale);
+                }
+                if (scale < 0) {
+                    scale = 0;
+                    log.warn(
+                            "The decimal column {} type decimal({},{}) is out of range, "
+                                    + "which is scale less than 0, "
+                                    + "it will be converted to decimal({},{})",
+                            column.getName(),
+                            decimalType.getPrecision(),
+                            decimalType.getScale(),
+                            precision,
+                            scale);
+                } else if (scale > MAX_SCALE) {
+                    scale = MAX_SCALE;
+                    log.warn(
+                            "The decimal column {} type decimal({},{}) is out of range, "
+                                    + "which exceeds the maximum scale of {}, "
+                                    + "it will be converted to decimal({},{})",
+                            column.getName(),
+                            decimalType.getPrecision(),
+                            decimalType.getScale(),
+                            MAX_SCALE,
+                            precision,
+                            scale);
+                }
+
+                String decimalTypeStr = String.format("%s(%s,%s)", DECIMAL, precision, scale);
+                builder.nativeType(TypeInfoFactory.getDecimalTypeInfo((int) precision, scale));
+                builder.columnType(decimalTypeStr);
+                builder.dataType(DECIMAL);
+                builder.precision(precision);
+                builder.scale(scale);
+                break;
+            case BYTES:
+                builder.nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.BINARY));
+                builder.columnType(BINARY);
+                builder.dataType(BINARY);
+                if (column.getColumnLength() == null || column.getColumnLength() <= 0) {
+                    builder.length(MAX_VARBINARY_LENGTH);
+                } else {
+                    builder.length(column.getColumnLength());
+                }
+                break;
+            case STRING:
+                if (column.getColumnLength() == null || column.getColumnLength() <= 0) {
+                    builder.nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.STRING));
+                    builder.columnType(STRING);
+                    builder.dataType(STRING);
+                } else if (column.getColumnLength() <= 255) {
+                    builder.nativeType(
+                            TypeInfoFactory.getCharTypeInfo(column.getColumnLength().intValue()));
+                    builder.columnType(String.format("%s(%s)", CHAR, column.getColumnLength()));
+                    builder.dataType(CHAR);
+                    builder.length(column.getColumnLength());
+                } else if (column.getColumnLength() <= 65535) {
+                    builder.nativeType(
+                            TypeInfoFactory.getVarcharTypeInfo(
+                                    column.getColumnLength().intValue()));
+                    builder.columnType(String.format("%s(%s)", VARCHAR, column.getColumnLength()));
+                    builder.dataType(VARCHAR);
+                    builder.length(column.getColumnLength());
+                } else {
+                    builder.nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.STRING));
+                    builder.columnType(STRING);
+                    builder.dataType(STRING);
+                    builder.length(column.getColumnLength());
+                }
+                break;
+            case DATE:
+                builder.nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.DATE));
+                builder.columnType(DATE);
+                builder.dataType(DATE);
+                break;
+            case TIMESTAMP:
+                if (column.getScale() == null || column.getScale() <= 3) {
+                    builder.nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.DATETIME));
+                    builder.dataType(DATETIME);
+                    builder.columnType(DATETIME);
+                } else {
+                    int timestampScale = column.getScale();
+                    if (timestampScale > MAX_TIMESTAMP_SCALE) {
+                        timestampScale = MAX_TIMESTAMP_SCALE;
+                        log.warn(
+                                "The timestamp column {} type timestamp({}) is out of range, "
+                                        + "which exceeds the maximum scale of {}, "
+                                        + "it will be converted to timestamp({})",
+                                column.getName(),
+                                column.getScale(),
+                                MAX_TIMESTAMP_SCALE,
+                                timestampScale);
+                    }
+                    builder.nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.TIMESTAMP));
+                    builder.dataType(TIMESTAMP);
+                    builder.columnType(TIMESTAMP);
+                    builder.scale(timestampScale);
+                }
+                break;
+            case MAP:
+                MapType<?, ?> mapType = (MapType<?, ?>) column.getDataType();
+                SeaTunnelDataType<?> keyType = mapType.getKeyType();
+                SeaTunnelDataType<?> valueType = mapType.getValueType();
+                BasicTypeDefine<TypeInfo> keyDefine =
+                        reconvert(
+                                new PhysicalColumn(
+                                        "key", keyType, null, null, true, null, null, null, null));
+                BasicTypeDefine<TypeInfo> valueDefine =
+                        reconvert(
+                                new PhysicalColumn(
+                                        "value", valueType, null, null, true, null, null, null,
+                                        null));
+                builder.nativeType(
+                        TypeInfoFactory.getMapTypeInfo(
+                                keyDefine.getNativeType(), valueDefine.getNativeType()));
+                builder.columnType(
+                        String.format(
+                                "MAP<%s,%s>",
+                                keyDefine.getColumnType(), valueDefine.getColumnType()));
+                builder.dataType(MAP);
+                break;
+            case ARRAY:
+                ArrayType<?, ?> arrayType = (ArrayType<?, ?>) column.getDataType();
+                SeaTunnelDataType<?> elementType = arrayType.getElementType();
+                BasicTypeDefine<TypeInfo> elementDefine =
+                        reconvert(
+                                new PhysicalColumn(
+                                        "element",
+                                        elementType,
+                                        null,
+                                        null,
+                                        true,
+                                        null,
+                                        null,
+                                        null,
+                                        null));
+
+                builder.nativeType(TypeInfoFactory.getArrayTypeInfo(elementDefine.getNativeType()));
+                builder.columnType(String.format("ARRAY<%s>", elementDefine.getColumnType()));
+                builder.dataType(ARRAY);
+                break;
+            case TIME:
+            default:
+                throw CommonError.convertToConnectorTypeError(
+                        PLUGIN_NAME, column.getDataType().getSqlType().name(), column.getName());
+        }
+
+        return builder.build();
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java index 91e8c12dca3..ddfe1de08b3 100644 --- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java @@ -17,32 +17,42 @@ package org.apache.seatunnel.connectors.seatunnel.maxcompute.sink; -import org.apache.seatunnel.shade.com.typesafe.config.Config; - -import org.apache.seatunnel.api.common.PrepareFailException; -import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.sink.DataSaveMode; +import org.apache.seatunnel.api.sink.DefaultSaveModeHandler; +import org.apache.seatunnel.api.sink.SaveModeHandler; import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.sink.SupportMultiTableSink; +import org.apache.seatunnel.api.sink.SupportSaveMode; import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.factory.CatalogFactory; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; -import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; -import org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeUtil; +import org.apache.seatunnel.connectors.seatunnel.maxcompute.catalog.MaxComputeCatalog; +import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig;
+import org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException;

 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import com.google.auto.service.AutoService;
-
 import java.util.Optional;

+import static org.apache.seatunnel.api.table.factory.FactoryUtil.discoverFactory;
+import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.OVERWRITE;
 import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PLUGIN_NAME;

-@AutoService(SeaTunnelSink.class)
-public class MaxcomputeSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+public class MaxcomputeSink extends AbstractSimpleSink<SeaTunnelRow, Void>
+        implements SupportSaveMode, SupportMultiTableSink {
     private static final Logger LOG = LoggerFactory.getLogger(MaxcomputeSink.class);

-    private Config pluginConfig;
-    private SeaTunnelRowType typeInfo;
+    private final ReadonlyConfig readonlyConfig;
+    private final CatalogTable catalogTable;
+
+    public MaxcomputeSink(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
+        this.readonlyConfig = readonlyConfig;
+        this.catalogTable = catalogTable;
+    }

     @Override
     public String getPluginName() {
@@ -50,19 +60,53 @@ public String getPluginName() {
     }

     @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        this.pluginConfig = pluginConfig;
-        MaxcomputeUtil.initTableOrPartition(pluginConfig);
+    public MaxcomputeWriter createWriter(SinkWriter.Context context) {
+        return new MaxcomputeWriter(this.readonlyConfig, this.catalogTable.getSeaTunnelRowType());
     }

+    /**
+     * Builds the save-mode handler for the target table from the configured schema and data
+     * save modes. When the legacy {@code overwrite} option is true it takes precedence over
+     * {@code data_save_mode} and forces {@link DataSaveMode#DROP_DATA}.
+     *
+     * @throws MaxcomputeConnectorException if the discovered MaxCompute catalog factory is null
+     */
     @Override
-    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
-        this.typeInfo = seaTunnelRowType;
-    }
+    public Optional<SaveModeHandler> getSaveModeHandler() {
+        CatalogFactory catalogFactory =
+                discoverFactory(
+                        Thread.currentThread().getContextClassLoader(),
+                        CatalogFactory.class,
+                        "MaxCompute");
+        if (catalogFactory == null) {
+            throw new MaxcomputeConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    String.format(
+                            "PluginName: %s, PluginType: %s, Message: %s",
+                            getPluginName(),
+                            PluginType.SINK,
+                            "Cannot find MaxCompute catalog factory"));
+        }
+        MaxComputeCatalog catalog =
+                (MaxComputeCatalog)
+                        catalogFactory.createCatalog(
+                                catalogFactory.factoryIdentifier(), readonlyConfig);

-    @Override
-    public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context) {
-        return new MaxcomputeWriter(this.pluginConfig, this.typeInfo);
+        DataSaveMode dataSaveMode = readonlyConfig.get(MaxcomputeConfig.DATA_SAVE_MODE);
+        if (readonlyConfig.get(OVERWRITE)) {
+            // compatible with old version
+            LOG.warn(
+                    "The configuration of 'overwrite' is deprecated, please use 'data_save_mode' instead.");
+            dataSaveMode = DataSaveMode.DROP_DATA;
+        }
+
+        return Optional.of(
+                new DefaultSaveModeHandler(
+                        readonlyConfig.get(MaxcomputeConfig.SCHEMA_SAVE_MODE),
+                        dataSaveMode,
+                        catalog,
+                        catalogTable,
+                        readonlyConfig.get(MaxcomputeConfig.CUSTOM_SQL)));
     }

     @Override
diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java
index faa2f629102..16dededbfe5 100644
--- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java
@@ -18,18 +18,27 @@
 package org.apache.seatunnel.connectors.seatunnel.maxcompute.sink;

 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SinkCommonOptions;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import
org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;

 import com.google.auto.service.AutoService;

 import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_ID;
 import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_KEY;
+import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.CUSTOM_SQL;
+import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.DATA_SAVE_MODE;
 import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ENDPOINT;
 import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.OVERWRITE;
 import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC;
 import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PLUGIN_NAME;
 import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT;
+import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.SAVE_MODE_CREATE_TEMPLATE;
+import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.SCHEMA_SAVE_MODE;
 import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_NAME;

 @AutoService(Factory.class)
@@ -43,7 +52,33 @@ public String factoryIdentifier() {
     public OptionRule optionRule() {
         return OptionRule.builder()
                 .required(ACCESS_ID, ACCESS_KEY, ENDPOINT, PROJECT, TABLE_NAME)
-                .optional(PARTITION_SPEC, OVERWRITE)
+                .optional(
+                        PARTITION_SPEC,
+                        OVERWRITE,
+                        SCHEMA_SAVE_MODE,
+                        DATA_SAVE_MODE,
+                        SAVE_MODE_CREATE_TEMPLATE,
+                        CUSTOM_SQL,
+                        SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
                 .build();
     }
+
+    /**
+     * Returns a lazy sink supplier. The upstream catalog table is re-identified with the
+     * configured PROJECT and TABLE_NAME options, keeping the upstream catalog name, before it
+     * is handed to {@link MaxcomputeSink} together with the sink options.
+     */
+    @Override
+    public TableSink createSink(TableSinkFactoryContext context) {
+        return () -> {
+            CatalogTable upstreamTable = context.getCatalogTable();
+            TableIdentifier targetIdentifier =
+                    TableIdentifier.of(
+                            upstreamTable.getCatalogName(),
+                            context.getOptions().get(PROJECT),
+                            context.getOptions().get(TABLE_NAME));
+            return new MaxcomputeSink(
+                    context.getOptions(), CatalogTable.of(targetIdentifier, upstreamTable));
+        };
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java
index 51492ae5912..f72a3124b0b 100644
--- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java
+++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java
@@ -17,8 +17,8 @@

 package org.apache.seatunnel.connectors.seatunnel.maxcompute.sink;

-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
@@ -42,32 +42,31 @@
 import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_NAME;

 @Slf4j
-public class MaxcomputeWriter extends AbstractSinkWriter {
+public class MaxcomputeWriter extends AbstractSinkWriter
+        implements SupportMultiTableSinkWriter {
     private RecordWriter recordWriter;
     private final TableTunnel.UploadSession session;
     private final TableSchema tableSchema;
     private static final Long BLOCK_0 = 0L;
-    private SeaTunnelRowType rowType;
+    private final
SeaTunnelRowType rowType; - public MaxcomputeWriter(Config pluginConfig, SeaTunnelRowType rowType) { + public MaxcomputeWriter(ReadonlyConfig readonlyConfig, SeaTunnelRowType rowType) { try { this.rowType = rowType; - Table table = MaxcomputeUtil.getTable(pluginConfig); + Table table = MaxcomputeUtil.getTable(readonlyConfig); this.tableSchema = table.getSchema(); - TableTunnel tunnel = MaxcomputeUtil.getTableTunnel(pluginConfig); - if (pluginConfig.hasPath(PARTITION_SPEC.key())) { - PartitionSpec partitionSpec = - new PartitionSpec(pluginConfig.getString(PARTITION_SPEC.key())); + TableTunnel tunnel = MaxcomputeUtil.getTableTunnel(readonlyConfig); + if (readonlyConfig.getOptional(PARTITION_SPEC).isPresent()) { + PartitionSpec partitionSpec = new PartitionSpec(readonlyConfig.get(PARTITION_SPEC)); session = tunnel.createUploadSession( - pluginConfig.getString(PROJECT.key()), - pluginConfig.getString(TABLE_NAME.key()), + readonlyConfig.get(PROJECT), + readonlyConfig.get(TABLE_NAME), partitionSpec); } else { session = tunnel.createUploadSession( - pluginConfig.getString(PROJECT.key()), - pluginConfig.getString(TABLE_NAME.key())); + readonlyConfig.get(PROJECT), readonlyConfig.get(TABLE_NAME)); } this.recordWriter = session.openRecordWriter(BLOCK_0); log.info("open record writer success"); diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceReader.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceReader.java index 71a5683a42d..0c38da1d730 100644 --- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceReader.java +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceReader.java @@ -19,6 +19,7 @@ import 
org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.source.Boundedness; import org.apache.seatunnel.api.source.Collector; import org.apache.seatunnel.api.source.SourceReader; @@ -67,7 +68,8 @@ public void pollNext(Collector output) throws Exception { source -> { try { TableTunnel.DownloadSession session = - MaxcomputeUtil.getDownloadSession(pluginConfig); + MaxcomputeUtil.getDownloadSession( + ReadonlyConfig.fromConfig(pluginConfig)); TunnelRecordReader recordReader = session.openRecordReader(source.getSplitId(), source.getRowNum()); log.info("open record reader success"); diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplitEnumerator.java index 3aa2e5f21a1..cde763add21 100644 --- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplitEnumerator.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.source.SourceSplitEnumerator; import org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeUtil; @@ -99,7 +100,8 @@ public void notifyCheckpointComplete(long checkpointId) {} public void handleSplitRequest(int subtaskId) {} private void discoverySplits() throws TunnelException { - TableTunnel.DownloadSession session = MaxcomputeUtil.getDownloadSession(this.pluginConfig); + TableTunnel.DownloadSession session = + 
MaxcomputeUtil.getDownloadSession(ReadonlyConfig.fromConfig(this.pluginConfig)); long recordCount = session.getRecordCount(); int numReaders = enumeratorContext.currentParallelism(); int splitRowNum = (int) Math.ceil((double) recordCount / numReaders); diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/CreateTableParser.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/CreateTableParser.java new file mode 100644 index 00000000000..41a6f70f2c8 --- /dev/null +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/CreateTableParser.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.maxcompute.util; + +import lombok.Getter; + +import java.util.HashMap; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +/** Utility that extracts the column definitions written explicitly inside the first parenthesized section of a CREATE TABLE statement. */ +public class CreateTableParser { +/** Matches one column segment: an optionally backtick-quoted word (group 1, the column name), optional whitespace, then everything else (group 2). */ + private static final Pattern COLUMN_PATTERN = Pattern.compile("`?(\\w+)`?\\s*([\\w|\\W]*)"); +/** Scans the text after the first opening parenthesis of createTableSql, cuts it at every comma that is not inside parentheses, hands each piece to parseColumn, and stops at the first closing parenthesis that is not inside parentheses. Returns the map filled by parseColumn, keyed by column name. */ + public static Map getColumnList(String createTableSql) { + Map columns = new HashMap<>(); + StringBuilder columnBuilder = new StringBuilder(); + int startIndex = createTableSql.indexOf("("); + createTableSql = createTableSql.substring(startIndex + 1); +/* NOTE(review): insideParentheses is a single flag rather than a depth counter, so with nested parentheses the first inner closing parenthesis clears it - confirm the SQL passed in never nests them. Also, when there is no opening parenthesis indexOf yields -1 and the whole string is scanned. */ + boolean insideParentheses = false; + for (int i = 0; i < createTableSql.length(); i++) { + char c = createTableSql.charAt(i); + if (c == '(') { + insideParentheses = true; + columnBuilder.append(c); + } else if ((c == ',' || c == ')') && !insideParentheses) { + parseColumn(columnBuilder.toString(), columns, startIndex + i + 1); + columnBuilder.setLength(0); + if (c == ')') { + break; + } + } else if (c == ')') { + insideParentheses = false; + columnBuilder.append(c); + } else { + columnBuilder.append(c); + } + } + return columns; + } +/** Parses one piece. If its trimmed text matches COLUMN_PATTERN and the name plus definition does not contain PRIMARY KEY or CREATE TABLE (compared in upper case), a ColumnInfo is stored under the column name. suffixIndex is the offset in the original SQL of the delimiter that ended the piece; startIndex and endIndex are derived from it as the offsets where the first occurrence of the column name in the piece starts and ends (end exclusive). */ + private static void parseColumn( + String columnString, Map columnList, int suffixIndex) { + Matcher matcher = COLUMN_PATTERN.matcher(columnString.trim()); + if (matcher.matches()) { + String columnName = matcher.group(1); + String otherInfo = matcher.group(2).trim(); + StringBuilder columnBuilder = + new StringBuilder(columnName).append(" ").append(otherInfo); + if (columnBuilder.toString().toUpperCase().contains("PRIMARY KEY") + || columnBuilder.toString().toUpperCase().contains("CREATE TABLE")) { + return; + } + int endIndex = + suffixIndex + - columnString + .substring( + columnString.indexOf(columnName) + columnName.length()) + .length(); + int startIndex = + suffixIndex - columnString.substring(columnString.indexOf(columnName)).length(); + columnList.put(columnName, new ColumnInfo(columnName, otherInfo, startIndex, endIndex)); 
+ } + } +/** Holder for one parsed column: name, trimmed remaining definition text, and the two offsets computed by parseColumn. Accessors are generated by the Lombok Getter annotation. */ + @Getter + public static final class ColumnInfo { + + public ColumnInfo(String name, String info, int startIndex, int endIndex) { + this.name = name; + this.info = info; + this.startIndex = startIndex; + this.endIndex = endIndex; + } + + String name; + String info; + int startIndex; + int endIndex; + } +} diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java index 2a3eda909aa..940a2fda120 100644 --- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.table.type.ArrayType; import org.apache.seatunnel.api.table.type.MapType; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; @@ -89,7 +90,7 @@ public static Record getMaxcomputeRowData( } public static SeaTunnelRowType getSeaTunnelRowType(Config pluginConfig) { - Table table = MaxcomputeUtil.getTable(pluginConfig); + Table table = MaxcomputeUtil.getTable(ReadonlyConfig.fromConfig(pluginConfig)); TableSchema tableSchema = table.getSchema(); ArrayList> seaTunnelDataTypes = new ArrayList<>(); ArrayList fieldNames = new ArrayList<>(); @@ -251,16 +252,17 @@ private static Object resolveObject2Maxcompute(Object field, TypeInfo typeInfo) case DOUBLE: case BIGINT: case BOOLEAN: + case DECIMAL: + case TIMESTAMP_NTZ: return field; case BINARY: return new Binary((byte[]) field); - case DECIMAL: - return null; case VARCHAR: return new Varchar((String) 
field); case CHAR: return new Char((String) field); case STRING: + case JSON: if (field instanceof byte[]) { return new String((byte[]) field); } diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeUtil.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeUtil.java index 8d7c7de17b6..59804cc3afc 100644 --- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeUtil.java +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeUtil.java @@ -17,8 +17,7 @@ package org.apache.seatunnel.connectors.seatunnel.maxcompute.util; -import org.apache.seatunnel.shade.com.typesafe.config.Config; - +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; import org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException; @@ -33,53 +32,46 @@ import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_ID; import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_KEY; import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ENDPOINT; -import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.OVERWRITE; import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC; import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT; import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_NAME; @Slf4j public class MaxcomputeUtil { - public static Table getTable(Config pluginConfig) { - Odps odps = 
getOdps(pluginConfig); - Table table = odps.tables().get(pluginConfig.getString(TABLE_NAME.key())); - return table; +/** Builds an Odps client from the config and returns the table handle named by the TABLE_NAME option. */ public static Table getTable(ReadonlyConfig readonlyConfig) { + Odps odps = getOdps(readonlyConfig); + return odps.tables().get(readonlyConfig.get(TABLE_NAME)); } - public static TableTunnel getTableTunnel(Config pluginConfig) { - Odps odps = getOdps(pluginConfig); - TableTunnel tunnel = new TableTunnel(odps); - return tunnel; +/** Builds an Odps client from the config and wraps it in a new TableTunnel. */ public static TableTunnel getTableTunnel(ReadonlyConfig readonlyConfig) { + Odps odps = getOdps(readonlyConfig); + return new TableTunnel(odps); } - public static Odps getOdps(Config pluginConfig) { +/** Creates an Odps client from an AliyunAccount built with the ACCESS_ID and ACCESS_KEY options, then sets ENDPOINT as its endpoint and PROJECT as its default project. */ public static Odps getOdps(ReadonlyConfig readonlyConfig) { Account account = - new AliyunAccount( - pluginConfig.getString(ACCESS_ID.key()), - pluginConfig.getString(ACCESS_KEY.key())); + new AliyunAccount(readonlyConfig.get(ACCESS_ID), readonlyConfig.get(ACCESS_KEY)); Odps odps = new Odps(account); - odps.setEndpoint(pluginConfig.getString(ENDPOINT.key())); - odps.setDefaultProject(pluginConfig.getString(PROJECT.key())); + odps.setEndpoint(readonlyConfig.get(ENDPOINT)); + odps.setDefaultProject(readonlyConfig.get(PROJECT)); return odps; } - public static TableTunnel.DownloadSession getDownloadSession(Config pluginConfig) { - TableTunnel tunnel = getTableTunnel(pluginConfig); +/** Opens a tunnel download session for the PROJECT and TABLE_NAME options; when PARTITION_SPEC is present the session is created for that partition only. An exception raised while creating the session is rethrown as MaxcomputeConnectorException. */ public static TableTunnel.DownloadSession getDownloadSession(ReadonlyConfig readonlyConfig) { + TableTunnel tunnel = getTableTunnel(readonlyConfig); TableTunnel.DownloadSession session; try { - if (pluginConfig.hasPath(PARTITION_SPEC.key())) { - PartitionSpec partitionSpec = - new PartitionSpec(pluginConfig.getString(PARTITION_SPEC.key())); + if (readonlyConfig.getOptional(PARTITION_SPEC).isPresent()) { + PartitionSpec partitionSpec = new PartitionSpec(readonlyConfig.get(PARTITION_SPEC)); session = tunnel.createDownloadSession( - pluginConfig.getString(PROJECT.key()), - pluginConfig.getString(TABLE_NAME.key()), + readonlyConfig.get(PROJECT), + 
readonlyConfig.get(TABLE_NAME), partitionSpec); } else { session = tunnel.createDownloadSession( - pluginConfig.getString(PROJECT.key()), - pluginConfig.getString(TABLE_NAME.key())); + readonlyConfig.get(PROJECT), readonlyConfig.get(TABLE_NAME)); } } catch (Exception e) { throw new MaxcomputeConnectorException( @@ -88,36 +80,18 @@ public static TableTunnel.DownloadSession getDownloadSession(Config pluginConfig return session; } - public static void initTableOrPartition(Config pluginConfig) { - Boolean overwrite = OVERWRITE.defaultValue(); - if (pluginConfig.hasPath(OVERWRITE.key())) { - overwrite = pluginConfig.getBoolean(OVERWRITE.key()); - } +/** Fetches the handle for projectName.tableName from odps, calls reload() on it (presumably to force the metadata fetch - confirm against the ODPS SDK) and returns it. Any exception is wrapped in a MaxcomputeConnectorException with code TABLE_SCHEMA_GET_FAILED, a message naming the table, and the original exception as cause. */ public static Table parseTable(Odps odps, String projectName, String tableName) { try { - Table table = MaxcomputeUtil.getTable(pluginConfig); - if (pluginConfig.hasPath(PARTITION_SPEC.key())) { - PartitionSpec partitionSpec = - new PartitionSpec(pluginConfig.getString(PARTITION_SPEC.key())); - if (overwrite) { - try { - table.deletePartition(partitionSpec, true); - } catch (NullPointerException e) { - log.debug("NullPointerException when delete table partition"); - } - } - table.createPartition(partitionSpec, true); - } else { - if (overwrite) { - try { - table.truncate(); - } catch (NullPointerException e) { - log.debug("NullPointerException when truncate table"); - } - } - } - } catch (Exception e) { + Table table = odps.tables().get(projectName, tableName); + table.reload(); + return table; + } catch (Exception ex) { throw new MaxcomputeConnectorException( - CommonErrorCodeDeprecated.READER_OPERATION_FAILED, e); + CommonErrorCodeDeprecated.TABLE_SCHEMA_GET_FAILED, + String.format( + "get table %s.%s info with exception, error:%s", + projectName, tableName, ex.getMessage()), + ex); } } } diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCreateTableTest.java 
b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCreateTableTest.java new file mode 100644 index 00000000000..efdcd070e3b --- /dev/null +++ b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCreateTableTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.maxcompute.catalog; + +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.ConstraintKey; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.catalog.PrimaryKey; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.type.BasicType; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +/** Checks that MaxComputeCatalogUtil.getCreateTableStatement expands the placeholders of a custom CREATE TABLE template. */ +@Slf4j +public class MaxComputeCreateTableTest { +/** Builds six columns, a primary key on id and age, and unique keys on name and score, then expects the primary-key and unique-key placeholders to expand to those column definitions and rowtype_fields to expand to the only column left over (gender); create_time is declared literally in the template. NOTE(review): the template carries ENGINE=OLAP, PARTITION BY RANGE and DISTRIBUTED BY HASH clauses - confirm they are intended in a MaxCompute test. */ + @Test + public void test() { + + List columns = new ArrayList<>(); + + columns.add(PhysicalColumn.of("id", BasicType.LONG_TYPE, (Long) null, true, null, "")); + columns.add(PhysicalColumn.of("name", BasicType.STRING_TYPE, (Long) null, true, null, "")); + columns.add( + PhysicalColumn.of( + "age", BasicType.INT_TYPE, (Long) null, true, null, "test comment")); + columns.add(PhysicalColumn.of("score", BasicType.INT_TYPE, (Long) null, true, null, "")); + columns.add(PhysicalColumn.of("gender", BasicType.BYTE_TYPE, (Long) null, true, null, "")); + columns.add( + PhysicalColumn.of("create_time", BasicType.LONG_TYPE, (Long) null, true, null, "")); + + String result = + MaxComputeCatalogUtil.getCreateTableStatement( + "CREATE TABLE IF NOT EXISTS `${database}`.`${table}` ( \n" + + "${rowtype_primary_key} , \n" + + "${rowtype_unique_key} , \n" + + "`create_time` DATETIME NOT NULL , \n" + + "${rowtype_fields} \n" + + ") ENGINE=OLAP \n" + + "PRIMARY KEY(${rowtype_primary_key},`create_time`) \n" + + "PARTITION BY RANGE (`create_time`)( \n" + + " PARTITION p20230329 VALUES LESS THAN 
(\"2023-03-29\") \n" + + ") \n" + + "DISTRIBUTED BY HASH (${rowtype_primary_key}) \n" + + "PROPERTIES (\n" + + "\"replication_allocation\" = \"tag.location.default: 1\",\n" + + "\"in_memory\" = \"false\",\n" + + "\"storage_format\" = \"V2\",\n" + + "\"disable_auto_compaction\" = \"false\"\n" + + ");", + TablePath.of("test1.test2"), + CatalogTable.of( + TableIdentifier.of("test", "test1", "test2"), + TableSchema.builder() + .primaryKey(PrimaryKey.of("", Arrays.asList("id", "age"))) + .constraintKey( + Arrays.asList( + ConstraintKey.of( + ConstraintKey.ConstraintType + .UNIQUE_KEY, + "unique_key", + Collections.singletonList( + ConstraintKey + .ConstraintKeyColumn + .of( + "name", + ConstraintKey + .ColumnSortType + .DESC))), + ConstraintKey.of( + ConstraintKey.ConstraintType + .UNIQUE_KEY, + "unique_key2", + Collections.singletonList( + ConstraintKey + .ConstraintKeyColumn + .of( + "score", + ConstraintKey + .ColumnSortType + .ASC))))) + .columns(columns) + .build(), + Collections.emptyMap(), + Collections.emptyList(), + "")); +/* NOTE(review): JUnit assertEquals takes (expected, actual); result is passed first here, so a failure message would label the two values the wrong way round. */ Assertions.assertEquals( + result, + "CREATE TABLE IF NOT EXISTS `test1`.`test2` ( \n" + + "`id` BIGINT NULL ,`age` INT NULL COMMENT 'test comment' , \n" + + "`name` STRING NULL ,`score` INT NULL , \n" + + "`create_time` DATETIME NOT NULL , \n" + + "`gender` TINYINT NULL \n" + + ") ENGINE=OLAP \n" + + "PRIMARY KEY(`id`,`age`,`create_time`) \n" + + "PARTITION BY RANGE (`create_time`)( \n" + + " PARTITION p20230329 VALUES LESS THAN (\"2023-03-29\") \n" + + ") \n" + + "DISTRIBUTED BY HASH (`id`,`age`) \n" + + "PROPERTIES (\n" + + "\"replication_allocation\" = \"tag.location.default: 1\",\n" + + "\"in_memory\" = \"false\",\n" + + "\"storage_format\" = \"V2\",\n" + + "\"disable_auto_compaction\" = \"false\"\n" + + ");"); + } +} diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/PreviewActionTest.java 
b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/PreviewActionTest.java new file mode 100644 index 00000000000..d4ba6799ac8 --- /dev/null +++ b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/PreviewActionTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.maxcompute.catalog; + +import org.apache.seatunnel.shade.com.google.common.collect.Lists; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.Catalog; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.catalog.PreviewResult; +import org.apache.seatunnel.api.table.catalog.PrimaryKey; +import org.apache.seatunnel.api.table.catalog.SQLPreviewResult; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.type.BasicType; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Optional; +/** Checks the SQL text that the catalog created by MaxComputeCatalogFactory returns from previewAction for DROP_TABLE and CREATE_TABLE. */ +public class PreviewActionTest { +/** Fixture table catalog.database.table with a primary key on id, a LONG column id created with nullable=false and a STRING column test created with nullable=true. */ + private static final CatalogTable CATALOG_TABLE = + CatalogTable.of( + TableIdentifier.of("catalog", "database", "table"), + TableSchema.builder() + .primaryKey(PrimaryKey.of("", Lists.newArrayList("id"))) + .columns( + Lists.newArrayList( + PhysicalColumn.of( + "id", + BasicType.LONG_TYPE, + (Long) null, + false, + null, + ""), + PhysicalColumn.of( + "test", + BasicType.STRING_TYPE, + (Long) null, + true, + null, + ""))) + .build(), + Collections.emptyMap(), + Collections.emptyList(), + "comment"); +/** Creates a catalog through MaxComputeCatalogFactory and asserts the previewed DROP TABLE and CREATE TABLE statements. NOTE(review): the method name mentions Doris although the factory under test is the MaxCompute one, and the username and password options look carried over from another connector - confirm and rename. */ + @Test + public void testDorisPreviewAction() { + MaxComputeCatalogFactory factory = new MaxComputeCatalogFactory(); + Catalog catalog = + factory.createCatalog( + "test", + ReadonlyConfig.fromMap( + new HashMap() { + { + put("username", "root"); + put("password", "root"); + } + })); + assertPreviewResult( + catalog, + Catalog.ActionType.DROP_TABLE, + "DROP TABLE IF EXISTS testddatabase.testtable;", + Optional.empty()); + assertPreviewResult( + catalog, + 
Catalog.ActionType.CREATE_TABLE, + "CREATE TABLE IF NOT EXISTS `testtable` (\n" + + "`id` BIGINT NOT NULL ,\n" + + "`test` STRING NULL \n" + + ");", + Optional.of(CATALOG_TABLE)); + } +/** Runs previewAction for actionType against the fixed path testddatabase.testtable and asserts that the result is a SQLPreviewResult whose SQL equals expectedSql. */ + private void assertPreviewResult( + Catalog catalog, + Catalog.ActionType actionType, + String expectedSql, + Optional catalogTable) { + PreviewResult previewResult = + catalog.previewAction( + actionType, TablePath.of("testddatabase.testtable"), catalogTable); + Assertions.assertInstanceOf(SQLPreviewResult.class, previewResult); + Assertions.assertEquals(expectedSql, ((SQLPreviewResult) previewResult).getSql()); + } +} diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/datatype/MaxComputeTypeConvertorTest.java b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/datatype/MaxComputeTypeConvertorTest.java new file mode 100644 index 00000000000..244f7cbe0ed --- /dev/null +++ b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/datatype/MaxComputeTypeConvertorTest.java @@ -0,0 +1,1087 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.maxcompute.datatype; + +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.converter.BasicTypeDefine; +import org.apache.seatunnel.api.table.type.ArrayType; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.DecimalArrayType; +import org.apache.seatunnel.api.table.type.DecimalType; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.api.table.type.MapType; +import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; +import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import com.aliyun.odps.OdpsType; +import com.aliyun.odps.type.TypeInfo; +import com.aliyun.odps.type.TypeInfoFactory; + +import java.util.Locale; + +public class MaxComputeTypeConvertorTest { + + @Test + public void testConvertUnsupported() { + BasicTypeDefine typeDefine = + BasicTypeDefine.builder() + .name("test") + .columnType("aaa") + .dataType("aaa") + .build(); + try { + MaxComputeTypeConverter.INSTANCE.convert(typeDefine); + Assertions.fail(); + } catch (SeaTunnelRuntimeException e) { + // ignore + } catch (Throwable e) { + Assertions.fail(); + } + } + + @Test + public void testConvertTinyint() { + BasicTypeDefine typeDefine = + BasicTypeDefine.builder() + .name("test") + .nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.TINYINT)) + .columnType( + TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.TINYINT) + .getTypeName()) + .dataType(OdpsType.TINYINT.name()) + .length(1L) + .build(); + Column column = MaxComputeTypeConverter.INSTANCE.convert(typeDefine); + Assertions.assertEquals(typeDefine.getName(), column.getName()); + Assertions.assertEquals(BasicType.BYTE_TYPE, column.getDataType()); + 
Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType()); + } + + @Test + public void testConvertSmallint() { + BasicTypeDefine typeDefine = + BasicTypeDefine.builder() + .name("test") + .nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.SMALLINT)) + .columnType( + TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.SMALLINT) + .getTypeName()) + .dataType(OdpsType.SMALLINT.name()) + .build(); + Column column = MaxComputeTypeConverter.INSTANCE.convert(typeDefine); + Assertions.assertEquals(typeDefine.getName(), column.getName()); + Assertions.assertEquals(BasicType.SHORT_TYPE, column.getDataType()); + Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType()); + } + + @Test + public void testConvertInt() { + BasicTypeDefine typeDefine = + BasicTypeDefine.builder() + .name("test") + .nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.INT)) + .columnType( + TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.INT).getTypeName()) + .dataType(OdpsType.INT.name()) + .build(); + Column column = MaxComputeTypeConverter.INSTANCE.convert(typeDefine); + Assertions.assertEquals(typeDefine.getName(), column.getName()); + Assertions.assertEquals(BasicType.INT_TYPE, column.getDataType()); + Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType()); + } + + @Test + public void testConvertBoolean() { + BasicTypeDefine typeDefine = + BasicTypeDefine.builder() + .name("test") + .nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.BOOLEAN)) + .columnType( + TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.BOOLEAN) + .getTypeName()) + .dataType(OdpsType.BOOLEAN.name()) + .build(); + Column column = MaxComputeTypeConverter.INSTANCE.convert(typeDefine); + Assertions.assertEquals(typeDefine.getName(), column.getName()); + Assertions.assertEquals(BasicType.BOOLEAN_TYPE, column.getDataType()); + Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType()); + } + + @Test + public void testConvertBigint() { + 
BasicTypeDefine typeDefine = + BasicTypeDefine.builder() + .name("test") + .nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.BIGINT)) + .columnType( + TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.BIGINT).getTypeName()) + .dataType(OdpsType.BIGINT.name()) + .build(); + Column column = MaxComputeTypeConverter.INSTANCE.convert(typeDefine); + Assertions.assertEquals(typeDefine.getName(), column.getName()); + Assertions.assertEquals(BasicType.LONG_TYPE, column.getDataType()); + Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType()); + } + + @Test + public void testConvertFloat() { + BasicTypeDefine typeDefine = + BasicTypeDefine.builder() + .name("test") + .nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.FLOAT)) + .columnType( + TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.FLOAT).getTypeName()) + .dataType(OdpsType.FLOAT.name()) + .build(); + Column column = MaxComputeTypeConverter.INSTANCE.convert(typeDefine); + Assertions.assertEquals(typeDefine.getName(), column.getName()); + Assertions.assertEquals(BasicType.FLOAT_TYPE, column.getDataType()); + Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType()); + } + + @Test + public void testConvertDouble() { + BasicTypeDefine typeDefine = + BasicTypeDefine.builder() + .name("test") + .nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.DOUBLE)) + .columnType( + TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.DOUBLE).getTypeName()) + .dataType(OdpsType.DOUBLE.name()) + .build(); + Column column = MaxComputeTypeConverter.INSTANCE.convert(typeDefine); + Assertions.assertEquals(typeDefine.getName(), column.getName()); + Assertions.assertEquals(BasicType.DOUBLE_TYPE, column.getDataType()); + Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType()); + } + + @Test + public void testConvertDecimal() { + BasicTypeDefine typeDefine = + BasicTypeDefine.builder() + .name("test") + .nativeType(TypeInfoFactory.getDecimalTypeInfo(9, 2)) + 
.columnType(TypeInfoFactory.getDecimalTypeInfo(9, 2).getTypeName()) + .dataType(OdpsType.DECIMAL.name()) + .build(); + Column column = MaxComputeTypeConverter.INSTANCE.convert(typeDefine); + Assertions.assertEquals(typeDefine.getName(), column.getName()); + Assertions.assertEquals(new DecimalType(9, 2), column.getDataType()); + Assertions.assertEquals(9L, column.getColumnLength()); + Assertions.assertEquals(2, column.getScale()); + Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType()); + } + + @Test + public void testConvertChar() { + BasicTypeDefine typeDefine = + BasicTypeDefine.builder() + .name("test") + .nativeType(TypeInfoFactory.getCharTypeInfo(2)) + .columnType(TypeInfoFactory.getCharTypeInfo(2).getTypeName()) + .dataType(OdpsType.CHAR.name()) + .build(); + Column column = MaxComputeTypeConverter.INSTANCE.convert(typeDefine); + Assertions.assertEquals(typeDefine.getName(), column.getName()); + Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType()); + Assertions.assertEquals(8, column.getColumnLength()); + Assertions.assertEquals( + typeDefine.getColumnType(), column.getSourceType().toUpperCase(Locale.ROOT)); + + typeDefine = + BasicTypeDefine.builder() + .name("test") + .nativeType(TypeInfoFactory.getVarcharTypeInfo(2)) + .columnType(TypeInfoFactory.getVarcharTypeInfo(2).getTypeName()) + .dataType(OdpsType.VARCHAR.name()) + .build(); + column = MaxComputeTypeConverter.INSTANCE.convert(typeDefine); + Assertions.assertEquals(typeDefine.getName(), column.getName()); + Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType()); + Assertions.assertEquals(8, column.getColumnLength()); + Assertions.assertEquals( + typeDefine.getColumnType(), column.getSourceType().toUpperCase(Locale.ROOT)); + } + + @Test + public void testConvertString() { + BasicTypeDefine typeDefine = + BasicTypeDefine.builder() + .name("test") + .nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.STRING)) + .columnType( + 
TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.STRING).getTypeName())
+                        .dataType(OdpsType.STRING.name())
+                        .build();
+        Column column = MaxComputeTypeConverter.INSTANCE.convert(typeDefine);
+        Assertions.assertEquals(typeDefine.getName(), column.getName());
+        Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
+        Assertions.assertEquals(
+                MaxComputeTypeConverter.MAX_VARBINARY_LENGTH, column.getColumnLength());
+        Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType());
+    }
+
+    /** A native JSON column is expected to convert to the SeaTunnel STRING type. */
+    @Test
+    public void testConvertJson() {
+        TypeInfo json = TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.JSON);
+        assertConverted(json, OdpsType.JSON.name(), BasicType.STRING_TYPE);
+    }
+
+    /** A native DATE column is expected to convert to LOCAL_DATE_TYPE. */
+    @Test
+    public void testConvertDate() {
+        TypeInfo date = TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.DATE);
+        assertConverted(date, OdpsType.DATE.name(), LocalTimeType.LOCAL_DATE_TYPE);
+    }
+
+    /** DATETIME, TIMESTAMP and TIMESTAMP_NTZ are all expected to become LOCAL_DATE_TIME_TYPE. */
+    @Test
+    public void testConvertDatetime() {
+        TypeInfo datetime = TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.DATETIME);
+        assertConverted(
+                datetime, OdpsType.DATETIME.name(), LocalTimeType.LOCAL_DATE_TIME_TYPE);
+
+        // For the timestamp flavours the data type name is read back from the TypeInfo itself.
+        for (OdpsType odpsType : new OdpsType[] {OdpsType.TIMESTAMP, OdpsType.TIMESTAMP_NTZ}) {
+            TypeInfo timestamp = TypeInfoFactory.getPrimitiveTypeInfo(odpsType);
+            assertConverted(
+                    timestamp,
+                    timestamp.getOdpsType().name(),
+                    LocalTimeType.LOCAL_DATE_TIME_TYPE);
+        }
+    }
+
+    /** Each native ARRAY element type is expected to map to the matching SeaTunnel array type. */
+    @Test
+    public void testConvertArray() {
+        assertArrayConverted(OdpsType.BOOLEAN, ArrayType.BOOLEAN_ARRAY_TYPE);
+        assertArrayConverted(OdpsType.TINYINT, ArrayType.BYTE_ARRAY_TYPE);
+        assertArrayConverted(OdpsType.SMALLINT, ArrayType.SHORT_ARRAY_TYPE);
+        assertArrayConverted(OdpsType.INT, ArrayType.INT_ARRAY_TYPE);
+        assertArrayConverted(OdpsType.BIGINT, ArrayType.LONG_ARRAY_TYPE);
+        assertArrayConverted(OdpsType.FLOAT, ArrayType.FLOAT_ARRAY_TYPE);
+        assertArrayConverted(OdpsType.DOUBLE, ArrayType.DOUBLE_ARRAY_TYPE);
+        assertArrayConverted(OdpsType.DATE, ArrayType.LOCAL_DATE_ARRAY_TYPE);
+        assertArrayConverted(OdpsType.DATETIME, ArrayType.LOCAL_DATE_TIME_ARRAY_TYPE);
+    }
+
+    /** A native MAP of STRING to BOOLEAN is expected to convert to the equal SeaTunnel MapType. */
+    @Test
+    public void testConvertMap() {
+        TypeInfo typeInfo =
+                TypeInfoFactory.getMapTypeInfo(
+                        TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.STRING),
+                        TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.BOOLEAN));
+        assertConverted(
+                typeInfo,
+                typeInfo.getOdpsType().name(),
+                new MapType<>(BasicType.STRING_TYPE, BasicType.BOOLEAN_TYPE));
+    }
+
+    @Test
+    public void testReconvertBoolean() {
+        Column column =
+                PhysicalColumn.builder().name("test").dataType(BasicType.BOOLEAN_TYPE).build();
+
+        BasicTypeDefine typeDefine =
+                assertReconverted(
+                        column, MaxComputeTypeConverter.BOOLEAN, MaxComputeTypeConverter.BOOLEAN);
+        // BOOLEAN is the only reconvert case here that also checks the reported length.
+        Assertions.assertEquals(1, typeDefine.getLength());
+    }
+
+    @Test
+    public void testReconvertByte() {
+        Column column =
+                PhysicalColumn.builder().name("test").dataType(BasicType.BYTE_TYPE).build();
+
+        assertReconverted(
+                column, MaxComputeTypeConverter.TINYINT, MaxComputeTypeConverter.TINYINT);
+    }
+
+    @Test
+    public void testReconvertShort() {
+        Column column =
+                PhysicalColumn.builder().name("test").dataType(BasicType.SHORT_TYPE).build();
+
+        assertReconverted(
+                column, MaxComputeTypeConverter.SMALLINT, MaxComputeTypeConverter.SMALLINT);
+    }
+
+    @Test
+    public void testReconvertInt() {
+        Column column = PhysicalColumn.builder().name("test").dataType(BasicType.INT_TYPE).build();
+
+        assertReconverted(column, MaxComputeTypeConverter.INT, MaxComputeTypeConverter.INT);
+    }
+
+    @Test
+    public void testReconvertLong() {
+        Column column =
+                PhysicalColumn.builder().name("test").dataType(BasicType.LONG_TYPE).build();
+
+        assertReconverted(column, MaxComputeTypeConverter.BIGINT, MaxComputeTypeConverter.BIGINT);
+    }
+
+    @Test
+    public void testReconvertFloat() {
+        Column column =
+                PhysicalColumn.builder().name("test").dataType(BasicType.FLOAT_TYPE).build();
+
+        assertReconverted(column, MaxComputeTypeConverter.FLOAT, MaxComputeTypeConverter.FLOAT);
+    }
+
+    @Test
+    public void testReconvertDouble() {
+        Column column =
+                PhysicalColumn.builder().name("test").dataType(BasicType.DOUBLE_TYPE).build();
+
+        assertReconverted(column, MaxComputeTypeConverter.DOUBLE, MaxComputeTypeConverter.DOUBLE);
+    }
+
+    @Test
+    public void testReconvertDecimal() {
+        // A (0, 0) decimal is expected to be reported with MAX_PRECISION / MAX_SCALE.
+        Column column =
+                PhysicalColumn.builder().name("test").dataType(new DecimalType(0, 0)).build();
+        assertReconverted(
+                column,
+                String.format(
+                        "%s(%s,%s)",
+                        MaxComputeTypeConverter.DECIMAL,
+                        MaxComputeTypeConverter.MAX_PRECISION,
+                        MaxComputeTypeConverter.MAX_SCALE),
+                MaxComputeTypeConverter.DECIMAL);
+
+        // An explicit precision/scale is expected to be carried through unchanged.
+        column = PhysicalColumn.builder().name("test").dataType(new DecimalType(10, 2)).build();
+        assertReconverted(
+                column,
+                String.format("%s(%s,%s)", MaxComputeTypeConverter.DECIMAL, 10, 2),
+                MaxComputeTypeConverter.DECIMAL);
+    }
+
+    @Test
+    public void testReconvertBytes() {
+        Column column =
+                PhysicalColumn.builder()
+                        .name("test")
+                        .dataType(PrimitiveByteArrayType.INSTANCE)
+                        .columnLength(null)
+                        .build();
+
+        assertReconverted(column, MaxComputeTypeConverter.BINARY, MaxComputeTypeConverter.BINARY);
+    }
+
+    /**
+     * STRING columns are expected to become STRING, CHAR(n) or VARCHAR(n) depending on the column
+     * length. The original contained the first case twice, byte for byte; it is kept once.
+     */
+    @Test
+    public void testReconvertString() {
+        // No length, JSON source type -> STRING.
+        Column column =
+                PhysicalColumn.builder()
+                        .name("test")
+                        .dataType(BasicType.STRING_TYPE)
+                        .columnLength(null)
+                        .sourceType(MaxComputeTypeConverter.JSON)
+                        .build();
+        assertReconverted(column, MaxComputeTypeConverter.STRING, MaxComputeTypeConverter.STRING);
+
+        // Length 255 -> CHAR(255).
+        column =
+                PhysicalColumn.builder()
+                        .name("test")
+                        .dataType(BasicType.STRING_TYPE)
+                        .columnLength(255L)
+                        .build();
+        assertReconverted(
+                column,
+                String.format("%s(%s)", MaxComputeTypeConverter.CHAR, column.getColumnLength()),
+                MaxComputeTypeConverter.CHAR);
+
+        // Length 255 with a VARCHAR source type is still expected to be CHAR(255).
+        column =
+                PhysicalColumn.builder()
+                        .name("test")
+                        .dataType(BasicType.STRING_TYPE)
+                        .columnLength(255L)
+                        .sourceType("VARCHAR(255)")
+                        .build();
+        assertReconverted(
+                column,
+                String.format("%s(%s)", MaxComputeTypeConverter.CHAR, column.getColumnLength()),
+                MaxComputeTypeConverter.CHAR);
+
+        // Length 65533 -> VARCHAR(65533).
+        column =
+                PhysicalColumn.builder()
+                        .name("test")
+                        .dataType(BasicType.STRING_TYPE)
+                        .columnLength(65533L)
+                        .build();
+        assertReconverted(
+                column,
+                String.format("%s(%s)", MaxComputeTypeConverter.VARCHAR, column.getColumnLength()),
+                MaxComputeTypeConverter.VARCHAR);
+
+        // Length 16777215 -> STRING.
+        column =
+                PhysicalColumn.builder()
+                        .name("test")
+                        .dataType(BasicType.STRING_TYPE)
+                        .columnLength(16777215L)
+                        .build();
+        assertReconverted(column, MaxComputeTypeConverter.STRING, MaxComputeTypeConverter.STRING);
+    }
+
+    @Test
+    public void testReconvertDate() {
+        Column column =
+                PhysicalColumn.builder()
+                        .name("test")
+                        .dataType(LocalTimeType.LOCAL_DATE_TYPE)
+                        .build();
+
+        assertReconverted(column, MaxComputeTypeConverter.DATE, MaxComputeTypeConverter.DATE);
+    }
+
+    /** Reconverting a TIME column is expected to fail with the COMMON-19 error message. */
+    @Test
+    public void testReconvertTime() {
+        Column column =
+                PhysicalColumn.builder()
+                        .name("test")
+                        .dataType(LocalTimeType.LOCAL_TIME_TYPE)
+                        .build();
+
+        Exception exception =
+                Assertions.assertThrows(
+                        Exception.class, () -> MaxComputeTypeConverter.INSTANCE.reconvert(column));
+        Assertions.assertTrue(
+                exception
+                        .getMessage()
+                        .contains(
+                                "ErrorCode:[COMMON-19], ErrorDescription:['Maxcompute' unsupported convert SeaTunnel data type 'TIME' of 'test' to connector data type.]"));
+    }
+
+    @Test
+    public void testReconvertDatetime() {
+        // Without a scale only the data type is checked (as in the original test).
+        Column column =
+                PhysicalColumn.builder()
+                        .name("test")
+                        .dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE)
+                        .build();
+        Assertions.assertEquals(
+                MaxComputeTypeConverter.DATETIME, reconvertAndCheckName(column).getDataType());
+
+        // Scale 3 -> DATETIME for both column type and data type.
+        column =
+                PhysicalColumn.builder()
+                        .name("test")
+                        .dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE)
+                        .scale(3)
+                        .build();
+        assertReconverted(
+                column, MaxComputeTypeConverter.DATETIME, MaxComputeTypeConverter.DATETIME);
+
+        // Scale 10 -> TIMESTAMP; again only the data type is checked.
+        column =
+                PhysicalColumn.builder()
+                        .name("test")
+                        .dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE)
+                        .scale(10)
+                        .build();
+        Assertions.assertEquals(
+                MaxComputeTypeConverter.TIMESTAMP, reconvertAndCheckName(column).getDataType());
+    }
+
+    /** Every SeaTunnel array type is expected to reconvert to column type and data type ARRAY. */
+    @Test
+    public void testReconvertArray() {
+        assertArrayReconverted(ArrayType.BOOLEAN_ARRAY_TYPE);
+        assertArrayReconverted(ArrayType.BYTE_ARRAY_TYPE);
+        assertArrayReconverted(ArrayType.STRING_ARRAY_TYPE);
+        assertArrayReconverted(ArrayType.SHORT_ARRAY_TYPE);
+        assertArrayReconverted(ArrayType.INT_ARRAY_TYPE);
+        assertArrayReconverted(ArrayType.LONG_ARRAY_TYPE);
+        assertArrayReconverted(ArrayType.FLOAT_ARRAY_TYPE);
+        assertArrayReconverted(ArrayType.DOUBLE_ARRAY_TYPE);
+        assertArrayReconverted(ArrayType.LOCAL_DATE_ARRAY_TYPE);
+        assertArrayReconverted(ArrayType.LOCAL_DATE_TIME_ARRAY_TYPE);
+
+        // The decimal array case compares the data type against the literal, as before.
+        DecimalArrayType decimalArrayType = new DecimalArrayType(new DecimalType(10, 2));
+        Column column = PhysicalColumn.builder().name("test").dataType(decimalArrayType).build();
+        assertReconverted(column, "ARRAY", "ARRAY");
+    }
+
+    /** Maps with any of the listed key types are expected to reconvert to MAP. */
+    @Test
+    public void testReconvertMap() {
+        // Only the STRING-keyed case compares against the converter constant; the remaining
+        // cases compare against the literal, exactly as the original assertions did.
+        assertMapReconverted(
+                new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE),
+                MaxComputeTypeConverter.MAP);
+        assertMapReconverted(new MapType<>(BasicType.BYTE_TYPE, BasicType.STRING_TYPE), "MAP");
+        assertMapReconverted(new MapType<>(BasicType.SHORT_TYPE, BasicType.STRING_TYPE), "MAP");
+        assertMapReconverted(new MapType<>(BasicType.INT_TYPE, BasicType.STRING_TYPE), "MAP");
+        assertMapReconverted(new MapType<>(BasicType.LONG_TYPE, BasicType.STRING_TYPE), "MAP");
+        assertMapReconverted(new MapType<>(BasicType.FLOAT_TYPE, BasicType.STRING_TYPE), "MAP");
+        assertMapReconverted(new MapType<>(BasicType.DOUBLE_TYPE, BasicType.STRING_TYPE), "MAP");
+        assertMapReconverted(new MapType<>(new DecimalType(10, 2), BasicType.STRING_TYPE), "MAP");
+        assertMapReconverted(
+                new MapType<>(LocalTimeType.LOCAL_DATE_TYPE, BasicType.STRING_TYPE), "MAP");
+        assertMapReconverted(
+                new MapType<>(LocalTimeType.LOCAL_DATE_TIME_TYPE, BasicType.STRING_TYPE), "MAP");
+    }
+
+    /**
+     * Builds the type definition the convert tests feed to the converter: name {@code "test"},
+     * the given native type, and that type's own type name as the column type.
+     *
+     * @param nativeType native MaxCompute type descriptor
+     * @param dataType value to set as the definition's data type
+     * @return the built type definition
+     */
+    private static BasicTypeDefine typeDefineOf(TypeInfo nativeType, String dataType) {
+        return BasicTypeDefine.builder()
+                .name("test")
+                .nativeType(nativeType)
+                .columnType(nativeType.getTypeName())
+                .dataType(dataType)
+                .build();
+    }
+
+    /**
+     * Converts a native type and asserts the column keeps the name, has the expected SeaTunnel
+     * type, and reports the definition's column type as its source type.
+     *
+     * @param nativeType native MaxCompute type descriptor
+     * @param dataType value to set as the definition's data type
+     * @param expectedType SeaTunnel data type the converted column must equal
+     */
+    private static void assertConverted(TypeInfo nativeType, String dataType, Object expectedType) {
+        BasicTypeDefine typeDefine = typeDefineOf(nativeType, dataType);
+        Column column = MaxComputeTypeConverter.INSTANCE.convert(typeDefine);
+        Assertions.assertEquals(typeDefine.getName(), column.getName());
+        Assertions.assertEquals(expectedType, column.getDataType());
+        Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType());
+    }
+
+    /**
+     * Converts a native ARRAY of the given element type and asserts the column keeps the name,
+     * has the expected SeaTunnel array type, and reports {@code "ARRAY"} as its source type.
+     *
+     * @param elementType native element type of the array
+     * @param expectedType SeaTunnel array type the converted column must equal
+     */
+    private static void assertArrayConverted(OdpsType elementType, Object expectedType) {
+        TypeInfo arrayInfo =
+                TypeInfoFactory.getArrayTypeInfo(TypeInfoFactory.getPrimitiveTypeInfo(elementType));
+        BasicTypeDefine typeDefine = typeDefineOf(arrayInfo, arrayInfo.getOdpsType().name());
+        Column column = MaxComputeTypeConverter.INSTANCE.convert(typeDefine);
+        Assertions.assertEquals(typeDefine.getName(), column.getName());
+        Assertions.assertEquals(expectedType, column.getDataType());
+        Assertions.assertEquals("ARRAY", column.getSourceType());
+    }
+
+    /**
+     * Reconverts the column and asserts that the resulting definition keeps the column name.
+     *
+     * @param column SeaTunnel column to reconvert
+     * @return the resulting type definition, for further assertions
+     */
+    private static BasicTypeDefine reconvertAndCheckName(Column column) {
+        BasicTypeDefine typeDefine = MaxComputeTypeConverter.INSTANCE.reconvert(column);
+        Assertions.assertEquals(column.getName(), typeDefine.getName());
+        return typeDefine;
+    }
+
+    /**
+     * Reconverts the column and asserts name, column type and data type, in that order.
+     *
+     * @param column SeaTunnel column to reconvert
+     * @param expectedColumnType expected {@code getColumnType()} of the result
+     * @param expectedDataType expected {@code getDataType()} of the result
+     * @return the resulting type definition, for further assertions
+     */
+    private static BasicTypeDefine assertReconverted(
+            Column column, String expectedColumnType, String expectedDataType) {
+        BasicTypeDefine typeDefine = reconvertAndCheckName(column);
+        Assertions.assertEquals(expectedColumnType, typeDefine.getColumnType());
+        Assertions.assertEquals(expectedDataType, typeDefine.getDataType());
+        return typeDefine;
+    }
+
+    /**
+     * Reconverts a {@code "test"} column of the given array type and asserts column type
+     * {@code "ARRAY"} and data type {@code MaxComputeTypeConverter.ARRAY}.
+     *
+     * @param arrayType SeaTunnel array type of the column
+     */
+    private static void assertArrayReconverted(ArrayType<?, ?> arrayType) {
+        Column column = PhysicalColumn.builder().name("test").dataType(arrayType).build();
+        assertReconverted(column, "ARRAY", MaxComputeTypeConverter.ARRAY);
+    }
+
+    /**
+     * Reconverts a {@code "test"} column of the given map type and asserts column type
+     * {@code "MAP"} and the given data type.
+     *
+     * @param mapType SeaTunnel map type of the column
+     * @param expectedDataType expected {@code getDataType()} of the result
+     */
+    private static void assertMapReconverted(MapType<?, ?> mapType, String expectedDataType) {
+        Column column = PhysicalColumn.builder().name("test").dataType(mapType).build();
+        assertReconverted(column, "MAP", expectedDataType);
+    }
+}