diff --git a/docs/en/connector-v2/sink/Maxcompute.md b/docs/en/connector-v2/sink/Maxcompute.md
index de5fc931203..2f428341896 100644
--- a/docs/en/connector-v2/sink/Maxcompute.md
+++ b/docs/en/connector-v2/sink/Maxcompute.md
@@ -51,6 +51,64 @@ Used to read data from Maxcompute.
`overwrite` Whether to overwrite the table or partition, default: false.
+### save_mode_create_template
+
+We use templates to automatically create MaxCompute tables,
+which will create corresponding table creation statements based on the type of upstream data and schema type,
+and the default template can be modified according to the situation. Only work on multi-table mode at now.
+
+Default template:
+
+```sql
+CREATE TABLE IF NOT EXISTS `${table}` (
+${rowtype_fields}
+);
+```
+
+If a custom field is filled in the template, such as adding an `id` field
+
+```sql
+CREATE TABLE IF NOT EXISTS `${table}`
+(
+ id,
+ ${rowtype_fields}
+);
+```
+
+The connector will automatically obtain the corresponding type from the upstream to complete the filling,
+and remove the id field from `rowtype_fields`. This method can be used to customize the modification of field types and attributes.
+
+You can use the following placeholders
+
+- database: Used to get the database in the upstream schema
+- table_name: Used to get the table name in the upstream schema
+- rowtype_fields: Used to get all the fields in the upstream schema, we will automatically map to the field
+ description of MaxCompute
+- rowtype_primary_key: Used to get the primary key in the upstream schema (maybe a list)
+- rowtype_unique_key: Used to get the unique key in the upstream schema (maybe a list)
+
+### schema_save_mode[Enum]
+
+Before the synchronous task is turned on, different treatment schemes are selected for the existing surface structure of the target side.
+Option introduction:
+`RECREATE_SCHEMA` :Will create when the table does not exist, delete and rebuild when the table is saved
+`CREATE_SCHEMA_WHEN_NOT_EXIST` :Will Created when the table does not exist, skipped when the table is saved
+`ERROR_WHEN_SCHEMA_NOT_EXIST` :Error will be reported when the table does not exist
+`IGNORE` :Ignore the treatment of the table
+
+### data_save_mode[Enum]
+
+Before the synchronous task is turned on, different processing schemes are selected for data existing data on the target side.
+Option introduction:
+`DROP_DATA`: Preserve database structure and delete data
+`APPEND_DATA`:Preserve database structure, preserve data
+`CUSTOM_PROCESSING`:User defined processing
+`ERROR_WHEN_DATA_EXISTS`:When there is data, an error is reported
+
+### custom_sql[String]
+
+When data_save_mode selects CUSTOM_PROCESSING, you should fill in the CUSTOM_SQL parameter. This parameter usually fills in a SQL that can be executed. SQL will be executed before synchronization tasks.
+
### common options
Sink plugin common parameters, please refer to [Sink Common Options](../sink-common-options.md) for details.
@@ -70,10 +128,3 @@ sink {
}
}
```
-
-## Changelog
-
-### next version
-
-- [Feature] Add Maxcompute Sink Connector([3640](https://github.com/apache/seatunnel/pull/3640))
-
diff --git a/seatunnel-connectors-v2/connector-maxcompute/pom.xml b/seatunnel-connectors-v2/connector-maxcompute/pom.xml
index 12029cd0156..3c4261f930e 100644
--- a/seatunnel-connectors-v2/connector-maxcompute/pom.xml
+++ b/seatunnel-connectors-v2/connector-maxcompute/pom.xml
@@ -30,7 +30,7 @@
SeaTunnel : Connectors V2 : Maxcompute
- 0.31.3
+ 0.51.0
3.4
diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java
index 9bd27081725..f90a5c40862 100644
--- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java
+++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java
@@ -22,29 +22,46 @@
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PreviewResult;
+import org.apache.seatunnel.api.table.catalog.SQLPreviewResult;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import org.apache.seatunnel.connectors.seatunnel.maxcompute.datatype.MaxComputeTypeConverter;
+import org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeUtil;
+
+import org.apache.commons.lang3.StringUtils;
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsException;
+import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.Projects;
+import com.aliyun.odps.Table;
import com.aliyun.odps.Tables;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
+import com.aliyun.odps.task.SQLTask;
+import com.aliyun.odps.type.TypeInfo;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_ID;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_KEY;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ENDPOINT;
+import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT;
+import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.SAVE_MODE_CREATE_TEMPLATE;
+import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
@Slf4j
public class MaxComputeCatalog implements Catalog {
@@ -79,11 +96,9 @@ public String getDefaultDatabase() throws CatalogException {
@Override
public boolean databaseExists(String databaseName) throws CatalogException {
- Odps odps = new Odps(account);
- odps.setEndpoint(readonlyConfig.get(ENDPOINT));
- odps.setDefaultProject(readonlyConfig.get(PROJECT));
- Projects projects = odps.projects();
try {
+ Odps odps = getOdps(readonlyConfig.get(PROJECT));
+ Projects projects = odps.projects();
return projects.exists(databaseName);
} catch (OdpsException e) {
throw new CatalogException("Check " + databaseName + " exist error", e);
@@ -107,9 +122,7 @@ public List listDatabases() throws CatalogException {
@Override
public List listTables(String databaseName)
throws CatalogException, DatabaseNotExistException {
- Odps odps = new Odps(account);
- odps.setEndpoint(readonlyConfig.get(ENDPOINT));
- odps.setDefaultProject(databaseName);
+ Odps odps = getOdps(databaseName);
Tables tables = odps.tables();
List tableNames = new ArrayList<>();
@@ -122,12 +135,9 @@ public List listTables(String databaseName)
@Override
public boolean tableExists(TablePath tablePath) throws CatalogException {
- Odps odps = new Odps(account);
- odps.setEndpoint(readonlyConfig.get(ENDPOINT));
- odps.setDefaultProject(tablePath.getDatabaseName());
-
- Tables tables = odps.tables();
try {
+ Odps odps = getOdps(tablePath.getDatabaseName());
+ com.aliyun.odps.Tables tables = odps.tables();
return tables.exists(tablePath.getTableName());
} catch (OdpsException e) {
throw new CatalogException("tableExists" + tablePath + " error", e);
@@ -137,19 +147,97 @@ public boolean tableExists(TablePath tablePath) throws CatalogException {
@Override
public CatalogTable getTable(TablePath tablePath)
throws CatalogException, TableNotExistException {
- throw new UnsupportedOperationException();
+ if (!tableExists(tablePath)) {
+ throw new TableNotExistException(catalogName, tablePath);
+ }
+ Table odpsTable;
+ com.aliyun.odps.TableSchema odpsSchema;
+ boolean isPartitioned;
+ try {
+ Odps odps = getOdps(tablePath.getDatabaseName());
+ odpsTable =
+ MaxcomputeUtil.parseTable(
+ odps, tablePath.getDatabaseName(), tablePath.getTableName());
+ odpsSchema = odpsTable.getSchema();
+ isPartitioned = odpsTable.isPartitioned();
+ } catch (Exception ex) {
+ throw new CatalogException(catalogName, ex);
+ }
+ List partitionKeys = new ArrayList<>();
+ TableSchema.Builder builder = TableSchema.builder();
+ buildColumnsWithErrorCheck(
+ tablePath,
+ builder,
+ odpsSchema.getColumns().iterator(),
+ (column) -> {
+ BasicTypeDefine typeDefine =
+ BasicTypeDefine.builder()
+ .name(column.getName())
+ .nativeType(column.getTypeInfo())
+ .columnType(column.getTypeInfo().getTypeName())
+ .dataType(column.getTypeInfo().getTypeName())
+ .nullable(column.isNullable())
+ .comment(column.getComment())
+ .build();
+ return MaxComputeTypeConverter.INSTANCE.convert(typeDefine);
+ });
+ if (isPartitioned) {
+ buildColumnsWithErrorCheck(
+ tablePath,
+ builder,
+ odpsSchema.getPartitionColumns().iterator(),
+ (column) -> {
+ BasicTypeDefine typeDefine =
+ BasicTypeDefine.builder()
+ .name(column.getName())
+ .nativeType(column.getTypeInfo())
+ .columnType(column.getTypeInfo().getTypeName())
+ .dataType(column.getTypeInfo().getTypeName())
+ .nullable(column.isNullable())
+ .comment(column.getComment())
+ .build();
+ partitionKeys.add(column.getName());
+ return MaxComputeTypeConverter.INSTANCE.convert(typeDefine);
+ });
+ }
+ TableSchema tableSchema = builder.build();
+ TableIdentifier tableIdentifier = getTableIdentifier(tablePath);
+ return CatalogTable.of(
+ tableIdentifier,
+ tableSchema,
+ readonlyConfig.toMap(),
+ partitionKeys,
+ odpsTable.getComment(),
+ catalogName);
}
@Override
public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
- throw new UnsupportedOperationException();
+ try {
+ Odps odps = getOdps(tablePath.getDatabaseName());
+ SQLTask.run(
+ odps,
+ MaxComputeCatalogUtil.getCreateTableStatement(
+ readonlyConfig.get(SAVE_MODE_CREATE_TEMPLATE),
+ tablePath,
+ table))
+ .waitForSuccess();
+ } catch (OdpsException e) {
+ throw new CatalogException("create table error", e);
+ }
}
@Override
public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
- throw new UnsupportedOperationException();
+ try {
+ Odps odps = getOdps(tablePath.getDatabaseName());
+ SQLTask.run(odps, MaxComputeCatalogUtil.getDropTableQuery(tablePath, ignoreIfNotExists))
+ .waitForSuccess();
+ } catch (OdpsException e) {
+ throw new CatalogException("drop table error", e);
+ }
}
@Override
@@ -163,4 +251,70 @@ public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
throws DatabaseNotExistException, CatalogException {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ try {
+ Odps odps = getOdps(tablePath.getDatabaseName());
+ Table odpsTable = odps.tables().get(tablePath.getTableName());
+ if (odpsTable.isPartitioned()
+ && StringUtils.isNotEmpty(readonlyConfig.get(PARTITION_SPEC))) {
+ PartitionSpec partitionSpec = new PartitionSpec(readonlyConfig.get(PARTITION_SPEC));
+ odpsTable.deletePartition(partitionSpec, ignoreIfNotExists);
+ odpsTable.createPartition(partitionSpec, true);
+ } else {
+ odpsTable.truncate();
+ }
+ } catch (Exception e) {
+ throw new CatalogException("truncate table error", e);
+ }
+ }
+
+ @Override
+ public boolean isExistsData(TablePath tablePath) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void executeSql(TablePath tablePath, String sql) {
+ try {
+ Odps odps = getOdps(tablePath.getDatabaseName());
+ SQLTask.run(odps, sql).waitForSuccess();
+ } catch (OdpsException e) {
+ throw new CatalogException("execute sql error", e);
+ }
+ }
+
+ @Override
+ public PreviewResult previewAction(
+ ActionType actionType, TablePath tablePath, Optional catalogTable) {
+ if (actionType == ActionType.CREATE_TABLE) {
+ checkArgument(catalogTable.isPresent(), "CatalogTable cannot be null");
+ return new SQLPreviewResult(
+ MaxComputeCatalogUtil.getCreateTableStatement(
+ readonlyConfig.get(SAVE_MODE_CREATE_TEMPLATE),
+ tablePath,
+ catalogTable.get()));
+ } else if (actionType == ActionType.DROP_TABLE) {
+ return new SQLPreviewResult(MaxComputeCatalogUtil.getDropTableQuery(tablePath, true));
+ } else {
+ throw new UnsupportedOperationException("Unsupported action type: " + actionType);
+ }
+ }
+
+ private Odps getOdps(String project) {
+ Odps odps = new Odps(account);
+ odps.setEndpoint(readonlyConfig.get(ENDPOINT));
+ odps.setDefaultProject(project);
+ return odps;
+ }
+
+ protected TableIdentifier getTableIdentifier(TablePath tablePath) {
+ return TableIdentifier.of(
+ catalogName,
+ tablePath.getDatabaseName(),
+ tablePath.getSchemaName(),
+ tablePath.getTableName());
+ }
}
diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalogFactory.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalogFactory.java
index dac0d3384fc..dd34731ed93 100644
--- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalogFactory.java
+++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalogFactory.java
@@ -52,7 +52,7 @@ public String factoryIdentifier() {
public OptionRule optionRule() {
return OptionRule.builder()
.required(ACCESS_ID, ACCESS_KEY, ENDPOINT, PROJECT, TABLE_NAME)
- .optional(PARTITION_SPEC, SPLIT_ROW, SPLIT_ROW, SCHEMA)
+ .optional(PARTITION_SPEC, SPLIT_ROW, SCHEMA)
.build();
}
}
diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalogUtil.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalogUtil.java
new file mode 100644
index 00000000000..8097d95a653
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalogUtil.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.catalog;
+
+import org.apache.seatunnel.api.sink.SaveModePlaceHolder;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import org.apache.seatunnel.api.table.converter.TypeConverter;
+import org.apache.seatunnel.connectors.seatunnel.maxcompute.datatype.MaxComputeTypeConverter;
+import org.apache.seatunnel.connectors.seatunnel.maxcompute.util.CreateTableParser;
+
+import org.apache.commons.lang3.StringUtils;
+
+import com.aliyun.odps.type.TypeInfo;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
+
+@Slf4j
+public class MaxComputeCatalogUtil {
+
+ public static String getDropTableQuery(TablePath tablePath, boolean ignoreIfNotExists) {
+ return "DROP TABLE "
+ + (ignoreIfNotExists ? "IF EXISTS " : "")
+ + tablePath.getFullName()
+ + ";";
+ }
+
+ /**
+ * @param createTableTemplate create table template
+ * @param catalogTable catalog table
+ * @return create table stmt
+ */
+ public static String getCreateTableStatement(
+ String createTableTemplate, TablePath tablePath, CatalogTable catalogTable) {
+
+ String template = createTableTemplate;
+ if (!createTableTemplate.trim().endsWith(";")) {
+ template += ";";
+ }
+ TableSchema tableSchema = catalogTable.getTableSchema();
+
+ String primaryKey = "";
+ if (tableSchema.getPrimaryKey() != null) {
+ List fields = Arrays.asList(catalogTable.getTableSchema().getFieldNames());
+ List keys = tableSchema.getPrimaryKey().getColumnNames();
+ keys.sort(Comparator.comparingInt(fields::indexOf));
+ primaryKey = keys.stream().map(r -> "`" + r + "`").collect(Collectors.joining(","));
+ }
+ String uniqueKey = "";
+ if (!tableSchema.getConstraintKeys().isEmpty()) {
+ uniqueKey =
+ tableSchema.getConstraintKeys().stream()
+ .flatMap(c -> c.getColumnNames().stream())
+ .map(r -> "`" + r.getColumnName() + "`")
+ .collect(Collectors.joining(","));
+ }
+
+ template =
+ template.replaceAll(
+ SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getReplacePlaceHolder(),
+ primaryKey);
+ template =
+ template.replaceAll(
+ SaveModePlaceHolder.ROWTYPE_UNIQUE_KEY.getReplacePlaceHolder(), uniqueKey);
+
+ Map columnInTemplate =
+ CreateTableParser.getColumnList(template);
+ template = mergeColumnInTemplate(columnInTemplate, tableSchema, template);
+
+ String rowTypeFields =
+ tableSchema.getColumns().stream()
+ .filter(column -> !columnInTemplate.containsKey(column.getName()))
+ .map(
+ x ->
+ MaxComputeCatalogUtil.columnToMaxComputeType(
+ x, MaxComputeTypeConverter.INSTANCE))
+ .collect(Collectors.joining(",\n"));
+
+ return template.replaceAll(
+ SaveModePlaceHolder.DATABASE.getReplacePlaceHolder(),
+ tablePath.getDatabaseName())
+ .replaceAll(
+ SaveModePlaceHolder.TABLE.getReplacePlaceHolder(), tablePath.getTableName())
+ .replaceAll(
+ SaveModePlaceHolder.ROWTYPE_FIELDS.getReplacePlaceHolder(), rowTypeFields);
+ }
+
+ private static String mergeColumnInTemplate(
+ Map columnInTemplate,
+ TableSchema tableSchema,
+ String template) {
+ int offset = 0;
+ Map columnMap =
+ tableSchema.getColumns().stream()
+ .collect(Collectors.toMap(Column::getName, Function.identity()));
+ List columnInfosInSeq =
+ columnInTemplate.values().stream()
+ .sorted(
+ Comparator.comparingInt(
+ CreateTableParser.ColumnInfo::getStartIndex))
+ .collect(Collectors.toList());
+ for (CreateTableParser.ColumnInfo columnInfo : columnInfosInSeq) {
+ String col = columnInfo.getName();
+ if (StringUtils.isEmpty(columnInfo.getInfo())) {
+ if (columnMap.containsKey(col)) {
+ Column column = columnMap.get(col);
+ String newCol =
+ columnToMaxComputeType(column, MaxComputeTypeConverter.INSTANCE);
+ String prefix = template.substring(0, columnInfo.getStartIndex() + offset);
+ String suffix = template.substring(offset + columnInfo.getEndIndex());
+ if (prefix.endsWith("`")) {
+ prefix = prefix.substring(0, prefix.length() - 1);
+ offset--;
+ }
+ if (suffix.startsWith("`")) {
+ suffix = suffix.substring(1);
+ offset--;
+ }
+ template = prefix + newCol + suffix;
+ offset += newCol.length() - columnInfo.getName().length();
+ } else {
+ throw new IllegalArgumentException("Can't find column " + col + " in table.");
+ }
+ }
+ }
+ return template;
+ }
+
+ public static String columnToMaxComputeType(
+ Column column, TypeConverter> typeConverter) {
+ checkNotNull(column, "The column is required.");
+ return String.format(
+ "`%s` %s %s %s",
+ column.getName(),
+ typeConverter.reconvert(column).getColumnType(),
+ column.isNullable() ? "NULL" : "NOT NULL",
+ StringUtils.isEmpty(column.getComment())
+ ? ""
+ : "COMMENT '" + column.getComment() + "'");
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertor.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertor.java
index 110222ae324..17886b87c35 100644
--- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertor.java
+++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertor.java
@@ -17,34 +17,36 @@
package org.apache.seatunnel.connectors.seatunnel.maxcompute.catalog;
+import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.MapType;
-import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig;
+import org.apache.seatunnel.connectors.seatunnel.maxcompute.datatype.MaxComputeTypeConverter;
+
+import org.apache.commons.collections4.MapUtils;
-import com.aliyun.odps.OdpsType;
-import com.aliyun.odps.type.ArrayTypeInfo;
-import com.aliyun.odps.type.DecimalTypeInfo;
-import com.aliyun.odps.type.MapTypeInfo;
-import com.aliyun.odps.type.StructTypeInfo;
import com.aliyun.odps.type.TypeInfo;
-import com.aliyun.odps.type.TypeInfoFactory;
import com.google.auto.service.AutoService;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
+import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
+
@AutoService(DataTypeConvertor.class)
public class MaxComputeDataTypeConvertor implements DataTypeConvertor {
+ public static final String PRECISION = "precision";
+ public static final String SCALE = "scale";
+
@Override
public SeaTunnelDataType> toSeaTunnelType(String field, String connectorDataType) {
if (connectorDataType.startsWith("MAP")) {
@@ -146,82 +148,17 @@ public SeaTunnelDataType> toSeaTunnelType(String field, String connectorDataTy
@Override
public SeaTunnelDataType> toSeaTunnelType(
String field, TypeInfo connectorDataType, Map dataTypeProperties) {
- switch (connectorDataType.getOdpsType()) {
- case MAP:
- MapTypeInfo mapTypeInfo = (MapTypeInfo) connectorDataType;
- return new MapType(
- toSeaTunnelType(field, mapTypeInfo.getKeyTypeInfo(), dataTypeProperties),
- toSeaTunnelType(field, mapTypeInfo.getValueTypeInfo(), dataTypeProperties));
- case ARRAY:
- ArrayTypeInfo arrayTypeInfo = (ArrayTypeInfo) connectorDataType;
- switch (arrayTypeInfo.getElementTypeInfo().getOdpsType()) {
- case BOOLEAN:
- return ArrayType.BOOLEAN_ARRAY_TYPE;
- case INT:
- return ArrayType.INT_ARRAY_TYPE;
- case BIGINT:
- return ArrayType.LONG_ARRAY_TYPE;
- case FLOAT:
- return ArrayType.FLOAT_ARRAY_TYPE;
- case DOUBLE:
- return ArrayType.DOUBLE_ARRAY_TYPE;
- case STRING:
- return ArrayType.STRING_ARRAY_TYPE;
- default:
- throw CommonError.convertToSeaTunnelTypeError(
- MaxcomputeConfig.PLUGIN_NAME,
- connectorDataType.getTypeName(),
- field);
- }
- case STRUCT:
- StructTypeInfo structTypeInfo = (StructTypeInfo) connectorDataType;
- List fields = structTypeInfo.getFieldTypeInfos();
- List fieldNames = new ArrayList<>(fields.size());
- List> fieldTypes = new ArrayList<>(fields.size());
- for (TypeInfo f : fields) {
- fieldNames.add(f.getTypeName());
- fieldTypes.add(toSeaTunnelType(f.getTypeName(), f, dataTypeProperties));
- }
- return new SeaTunnelRowType(
- fieldNames.toArray(new String[0]),
- fieldTypes.toArray(new SeaTunnelDataType[0]));
- case TINYINT:
- return BasicType.BYTE_TYPE;
- case SMALLINT:
- return BasicType.SHORT_TYPE;
- case INT:
- return BasicType.INT_TYPE;
- case BIGINT:
- return BasicType.LONG_TYPE;
- case BINARY:
- return PrimitiveByteArrayType.INSTANCE;
- case FLOAT:
- return BasicType.FLOAT_TYPE;
- case DOUBLE:
- return BasicType.DOUBLE_TYPE;
- case DECIMAL:
- DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) connectorDataType;
- return new DecimalType(decimalTypeInfo.getPrecision(), decimalTypeInfo.getScale());
- case VARCHAR:
- case CHAR:
- case STRING:
- return BasicType.STRING_TYPE;
- case DATE:
- return LocalTimeType.LOCAL_DATE_TYPE;
- case DATETIME:
- return LocalTimeType.LOCAL_TIME_TYPE;
- case TIMESTAMP:
- return LocalTimeType.LOCAL_DATE_TIME_TYPE;
- case BOOLEAN:
- return BasicType.BOOLEAN_TYPE;
- case VOID:
- return BasicType.VOID_TYPE;
- case INTERVAL_DAY_TIME:
- case INTERVAL_YEAR_MONTH:
- default:
- throw CommonError.convertToSeaTunnelTypeError(
- MaxcomputeConfig.PLUGIN_NAME, connectorDataType.getTypeName(), field);
- }
+ checkNotNull(connectorDataType, "seaTunnelDataType cannot be null");
+
+ BasicTypeDefine typeDefine =
+ BasicTypeDefine.builder()
+ .name(field)
+ .columnType(connectorDataType.getTypeName())
+ .dataType(connectorDataType.getOdpsType().name())
+ .nativeType(connectorDataType)
+ .build();
+
+ return MaxComputeTypeConverter.INSTANCE.convert(typeDefine).getDataType();
}
@Override
@@ -229,62 +166,19 @@ public TypeInfo toConnectorType(
String field,
SeaTunnelDataType> seaTunnelDataType,
Map dataTypeProperties) {
- switch (seaTunnelDataType.getSqlType()) {
- case MAP:
- MapType mapType = (MapType) seaTunnelDataType;
- return TypeInfoFactory.getMapTypeInfo(
- toConnectorType(field, mapType.getKeyType(), dataTypeProperties),
- toConnectorType(field, mapType.getValueType(), dataTypeProperties));
- case ARRAY:
- ArrayType arrayType = (ArrayType) seaTunnelDataType;
- return TypeInfoFactory.getArrayTypeInfo(
- toConnectorType(field, arrayType.getElementType(), dataTypeProperties));
- case ROW:
- SeaTunnelRowType rowType = (SeaTunnelRowType) seaTunnelDataType;
- List fieldNames = new ArrayList<>(rowType.getTotalFields());
- List fieldTypes = new ArrayList<>(rowType.getTotalFields());
- for (int i = 0; i < rowType.getTotalFields(); i++) {
- fieldNames.add(rowType.getFieldName(i));
- fieldTypes.add(
- toConnectorType(field, rowType.getFieldType(i), dataTypeProperties));
- }
- return TypeInfoFactory.getStructTypeInfo(fieldNames, fieldTypes);
- case TINYINT:
- return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.TINYINT);
- case SMALLINT:
- return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.SMALLINT);
- case INT:
- return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.INT);
- case BIGINT:
- return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.BIGINT);
- case BYTES:
- return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.BINARY);
- case FLOAT:
- return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.FLOAT);
- case DOUBLE:
- return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.DOUBLE);
- case DECIMAL:
- DecimalType decimalType = (DecimalType) seaTunnelDataType;
- return TypeInfoFactory.getDecimalTypeInfo(
- decimalType.getPrecision(), decimalType.getScale());
- case STRING:
- return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.STRING);
- case DATE:
- return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.DATE);
- case TIMESTAMP:
- return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.TIMESTAMP);
- case TIME:
- return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.DATETIME);
- case BOOLEAN:
- return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.BOOLEAN);
- case NULL:
- return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.VOID);
- default:
- throw CommonError.convertToConnectorTypeError(
- MaxcomputeConfig.PLUGIN_NAME,
- seaTunnelDataType.getSqlType().toString(),
- field);
- }
+ checkNotNull(seaTunnelDataType, "seaTunnelDataType cannot be null");
+ Long precision = MapUtils.getLong(dataTypeProperties, PRECISION);
+ Integer scale = MapUtils.getInteger(dataTypeProperties, SCALE);
+ Column column =
+ PhysicalColumn.builder()
+ .name(field)
+ .dataType(seaTunnelDataType)
+ .columnLength(precision)
+ .scale(scale)
+ .nullable(true)
+ .build();
+ BasicTypeDefine typeDefine = MaxComputeTypeConverter.INSTANCE.reconvert(column);
+ return typeDefine.getNativeType();
}
@Override
diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeConfig.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeConfig.java
index 84bbccddb79..d3465bd3dba 100644
--- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeConfig.java
+++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeConfig.java
@@ -19,6 +19,9 @@
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.SaveModePlaceHolder;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
import java.io.Serializable;
@@ -69,4 +72,33 @@ public class MaxcomputeConfig implements Serializable {
.booleanType()
.defaultValue(false)
.withDescription("Whether to overwrite the table or partition");
+
+ public static final Option SCHEMA_SAVE_MODE =
+ Options.key("schema_save_mode")
+ .enumType(SchemaSaveMode.class)
+ .defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST)
+ .withDescription("schema_save_mode");
+
+ public static final Option DATA_SAVE_MODE =
+ Options.key("data_save_mode")
+ .enumType(DataSaveMode.class)
+ .defaultValue(DataSaveMode.APPEND_DATA)
+ .withDescription("data_save_mode");
+
+ public static final Option CUSTOM_SQL =
+ Options.key("custom_sql").stringType().noDefaultValue().withDescription("custom_sql");
+
+ // create table
+ public static final Option SAVE_MODE_CREATE_TEMPLATE =
+ Options.key("save_mode_create_template")
+ .stringType()
+ .defaultValue(
+ "CREATE TABLE IF NOT EXISTS `"
+ + SaveModePlaceHolder.TABLE.getPlaceHolder()
+ + "` (\n"
+ + SaveModePlaceHolder.ROWTYPE_FIELDS.getPlaceHolder()
+ + "\n"
+ + ");")
+ .withDescription(
+ "Create table statement template, used to create MaxCompute table");
}
diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/datatype/MaxComputeTypeConverter.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/datatype/MaxComputeTypeConverter.java
new file mode 100644
index 00000000000..0c81dacc2a3
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/datatype/MaxComputeTypeConverter.java
@@ -0,0 +1,542 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.datatype;
+
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import org.apache.seatunnel.api.table.converter.TypeConverter;
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.connectors.seatunnel.common.source.TypeDefineUtils;
+
+import com.aliyun.odps.OdpsType;
+import com.aliyun.odps.type.AbstractCharTypeInfo;
+import com.aliyun.odps.type.ArrayTypeInfo;
+import com.aliyun.odps.type.DecimalTypeInfo;
+import com.aliyun.odps.type.MapTypeInfo;
+import com.aliyun.odps.type.StructTypeInfo;
+import com.aliyun.odps.type.TypeInfo;
+import com.aliyun.odps.type.TypeInfoFactory;
+import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PLUGIN_NAME;
+
+/** Refer https://help.aliyun.com/zh/maxcompute/user-guide/maxcompute-v2-0-data-type-edition */
+@Slf4j
+@AutoService(TypeConverter.class)
+public class MaxComputeTypeConverter implements TypeConverter> {
+
+ // ============================data types=====================
+ static final String BOOLEAN = "BOOLEAN";
+
+ // -------------------------number----------------------------
+ static final String TINYINT = "TINYINT";
+ static final String SMALLINT = "SMALLINT";
+ static final String INT = "INT";
+ static final String BIGINT = "BIGINT";
+ static final String DECIMAL = "DECIMAL";
+ static final String FLOAT = "FLOAT";
+ static final String DOUBLE = "DOUBLE";
+
+ // -------------------------string----------------------------
+ public static final String CHAR = "CHAR";
+ public static final String VARCHAR = "VARCHAR";
+ public static final String STRING = "STRING";
+
+ // -------------------------complex----------------------------
+ public static final String JSON = "JSON";
+ public static final String ARRAY = "ARRAY";
+ public static final String MAP = "MAP";
+ public static final String STRUCT = "STRUCT";
+
+ // ------------------------------time-------------------------
+ public static final String DATE = "DATE";
+ public static final String DATETIME = "DATETIME";
+ public static final String TIMESTAMP = "TIMESTAMP";
+ public static final String TIMESTAMP_NTZ = "TIMESTAMP_NTZ";
+
+ // ------------------------------blob-------------------------
+ static final String BINARY = "BINARY";
+
+ // ------------------------------other-------------------------
+ static final String INTERVAL = "INTERVAL";
+
+ public static final int DEFAULT_PRECISION = 38;
+ public static final int MAX_PRECISION = 38;
+ public static final int DEFAULT_SCALE = 18;
+ public static final int MAX_SCALE = 18;
+ public static final int MAX_TIMESTAMP_SCALE = 9;
+
+ // 8MB
+ public static final long MAX_VARBINARY_LENGTH = (long) Math.pow(2, 23);
+
+ public static final MaxComputeTypeConverter INSTANCE = new MaxComputeTypeConverter();
+
+ public MaxComputeTypeConverter() {}
+
+ @Override
+ public String identifier() {
+ return PLUGIN_NAME;
+ }
+
+ @Override
+ public Column convert(BasicTypeDefine typeDefine) {
+ PhysicalColumn.PhysicalColumnBuilder builder =
+ PhysicalColumn.builder()
+ .name(typeDefine.getName())
+ .sourceType(typeDefine.getColumnType())
+ .nullable(typeDefine.isNullable())
+ .defaultValue(typeDefine.getDefaultValue())
+ .comment(typeDefine.getComment());
+ TypeInfo nativeType = typeDefine.getNativeType();
+ if (nativeType instanceof ArrayTypeInfo) {
+ typeDefine.setColumnType(
+ ((ArrayTypeInfo) nativeType).getElementTypeInfo().getTypeName());
+ typeDefine.setDataType(
+ ((ArrayTypeInfo) nativeType).getElementTypeInfo().getOdpsType().name());
+ typeDefine.setNativeType(((ArrayTypeInfo) nativeType).getElementTypeInfo());
+ Column arrayColumn = convert(typeDefine);
+ SeaTunnelDataType> newType;
+ switch (arrayColumn.getDataType().getSqlType()) {
+ case STRING:
+ newType = ArrayType.STRING_ARRAY_TYPE;
+ break;
+ case BOOLEAN:
+ newType = ArrayType.BOOLEAN_ARRAY_TYPE;
+ break;
+ case TINYINT:
+ newType = ArrayType.BYTE_ARRAY_TYPE;
+ break;
+ case SMALLINT:
+ newType = ArrayType.SHORT_ARRAY_TYPE;
+ break;
+ case INT:
+ newType = ArrayType.INT_ARRAY_TYPE;
+ break;
+ case BIGINT:
+ newType = ArrayType.LONG_ARRAY_TYPE;
+ break;
+ case FLOAT:
+ newType = ArrayType.FLOAT_ARRAY_TYPE;
+ break;
+ case DOUBLE:
+ newType = ArrayType.DOUBLE_ARRAY_TYPE;
+ break;
+ case DATE:
+ newType = ArrayType.LOCAL_DATE_ARRAY_TYPE;
+ break;
+ case TIME:
+ newType = ArrayType.LOCAL_TIME_ARRAY_TYPE;
+ break;
+ case TIMESTAMP:
+ newType = ArrayType.LOCAL_DATE_TIME_ARRAY_TYPE;
+ break;
+ default:
+ throw CommonError.unsupportedDataType(
+ PLUGIN_NAME,
+ arrayColumn.getDataType().getSqlType().toString(),
+ typeDefine.getName());
+ }
+ return new PhysicalColumn(
+ arrayColumn.getName(),
+ newType,
+ arrayColumn.getColumnLength(),
+ arrayColumn.getScale(),
+ arrayColumn.isNullable(),
+ arrayColumn.getDefaultValue(),
+ arrayColumn.getComment(),
+ "ARRAY<" + arrayColumn.getSourceType() + ">",
+ arrayColumn.getOptions());
+ }
+ if (nativeType instanceof StructTypeInfo) {
+ List names = ((StructTypeInfo) nativeType).getFieldNames();
+ List> types = new ArrayList<>();
+ for (TypeInfo typeInfo : ((StructTypeInfo) nativeType).getFieldTypeInfos()) {
+ BasicTypeDefine fieldDefine = new BasicTypeDefine<>();
+ fieldDefine.setName(names.get(types.size()));
+ fieldDefine.setColumnType(typeInfo.getTypeName());
+ fieldDefine.setDataType(typeInfo.getOdpsType().name());
+ fieldDefine.setNativeType(typeInfo);
+ types.add(convert(fieldDefine).getDataType());
+ }
+ SeaTunnelRowType rowType =
+ new SeaTunnelRowType(
+ names.toArray(new String[0]), types.toArray(new SeaTunnelDataType[0]));
+ return new PhysicalColumn(
+ typeDefine.getName(),
+ rowType,
+ typeDefine.getLength(),
+ typeDefine.getScale(),
+ typeDefine.isNullable(),
+ typeDefine.getDefaultValue(),
+ typeDefine.getComment(),
+ typeDefine.getNativeType().getTypeName(),
+ new HashMap<>());
+ }
+
+ if (nativeType instanceof MapTypeInfo) {
+ BasicTypeDefine keyDefine = new BasicTypeDefine<>();
+ keyDefine.setName("key");
+ keyDefine.setColumnType(((MapTypeInfo) nativeType).getKeyTypeInfo().getTypeName());
+ keyDefine.setDataType(((MapTypeInfo) nativeType).getKeyTypeInfo().getOdpsType().name());
+ keyDefine.setNativeType(((MapTypeInfo) nativeType).getKeyTypeInfo());
+ Column keyColumn = convert(keyDefine);
+ BasicTypeDefine valueDefine = new BasicTypeDefine<>();
+ valueDefine.setName("value");
+ valueDefine.setColumnType(((MapTypeInfo) nativeType).getValueTypeInfo().getTypeName());
+ valueDefine.setDataType(
+ ((MapTypeInfo) nativeType).getValueTypeInfo().getOdpsType().name());
+ valueDefine.setNativeType(((MapTypeInfo) nativeType).getValueTypeInfo());
+ Column valueColumn = convert(valueDefine);
+ MapType mapType = new MapType(keyColumn.getDataType(), valueColumn.getDataType());
+ return new PhysicalColumn(
+ typeDefine.getName(),
+ mapType,
+ typeDefine.getLength(),
+ typeDefine.getScale(),
+ typeDefine.isNullable(),
+ typeDefine.getDefaultValue(),
+ typeDefine.getComment(),
+ typeDefine.getNativeType().getTypeName(),
+ new HashMap<>());
+ }
+
+ if (typeDefine.getNativeType() instanceof DecimalTypeInfo) {
+ DecimalType decimalType;
+ if (((DecimalTypeInfo) typeDefine.getNativeType()).getPrecision() > DEFAULT_PRECISION) {
+ log.warn("{} will probably cause value overflow.", DECIMAL);
+ decimalType = new DecimalType(DEFAULT_PRECISION, DEFAULT_SCALE);
+ } else {
+ decimalType =
+ new DecimalType(
+ ((DecimalTypeInfo) typeDefine.getNativeType()).getPrecision(),
+ ((DecimalTypeInfo) typeDefine.getNativeType()).getScale());
+ }
+ builder.dataType(decimalType);
+ builder.columnLength((long) decimalType.getPrecision());
+ builder.scale(decimalType.getScale());
+ } else if (typeDefine.getNativeType() instanceof AbstractCharTypeInfo) {
+ // CHAR(n) or VARCHAR(n)
+ builder.columnLength(
+ TypeDefineUtils.charTo4ByteLength(
+ (long)
+ ((AbstractCharTypeInfo) typeDefine.getNativeType())
+ .getLength()));
+ builder.dataType(BasicType.STRING_TYPE);
+ } else {
+ String dataType = typeDefine.getDataType().toUpperCase();
+ switch (dataType) {
+ case BOOLEAN:
+ builder.dataType(BasicType.BOOLEAN_TYPE);
+ break;
+ case TINYINT:
+ builder.dataType(BasicType.BYTE_TYPE);
+ break;
+ case SMALLINT:
+ builder.dataType(BasicType.SHORT_TYPE);
+ break;
+ case INT:
+ builder.dataType(BasicType.INT_TYPE);
+ break;
+ case BIGINT:
+ builder.dataType(BasicType.LONG_TYPE);
+ break;
+ case FLOAT:
+ builder.dataType(BasicType.FLOAT_TYPE);
+ break;
+ case DOUBLE:
+ builder.dataType(BasicType.DOUBLE_TYPE);
+ break;
+ case STRING:
+ if (typeDefine.getLength() == null || typeDefine.getLength() <= 0) {
+ builder.columnLength(MAX_VARBINARY_LENGTH);
+ } else {
+ builder.columnLength(typeDefine.getLength());
+ }
+ builder.dataType(BasicType.STRING_TYPE);
+ break;
+ case JSON:
+ builder.dataType(BasicType.STRING_TYPE);
+ break;
+ case BINARY:
+ if (typeDefine.getLength() == null || typeDefine.getLength() <= 0) {
+ builder.columnLength(MAX_VARBINARY_LENGTH);
+ } else {
+ builder.columnLength(typeDefine.getLength());
+ }
+ builder.dataType(PrimitiveByteArrayType.INSTANCE);
+ break;
+ case DATE:
+ builder.dataType(LocalTimeType.LOCAL_DATE_TYPE);
+ break;
+ case DATETIME:
+ case TIMESTAMP:
+ case TIMESTAMP_NTZ:
+ builder.dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE);
+ builder.scale(typeDefine.getScale());
+ break;
+ case INTERVAL:
+ default:
+ throw CommonError.convertToSeaTunnelTypeError(
+ PLUGIN_NAME, dataType, typeDefine.getName());
+ }
+ }
+ return builder.build();
+ }
+
+ @Override
+ public BasicTypeDefine reconvert(Column column) {
+ BasicTypeDefine.BasicTypeDefineBuilder builder =
+ BasicTypeDefine.builder()
+ .name(column.getName())
+ .nullable(column.isNullable())
+ .comment(column.getComment())
+ .defaultValue(column.getDefaultValue());
+
+ switch (column.getDataType().getSqlType()) {
+ case NULL:
+ builder.nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.STRING));
+ builder.columnType(STRING);
+ builder.dataType(STRING);
+ break;
+ case BOOLEAN:
+ builder.nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.BOOLEAN));
+ builder.columnType(BOOLEAN);
+ builder.dataType(BOOLEAN);
+ builder.length(1L);
+ break;
+ case TINYINT:
+ builder.nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.TINYINT));
+ builder.columnType(TINYINT);
+ builder.dataType(TINYINT);
+ break;
+ case SMALLINT:
+ builder.nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.SMALLINT));
+ builder.columnType(SMALLINT);
+ builder.dataType(SMALLINT);
+ break;
+ case INT:
+ builder.nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.INT));
+ builder.columnType(INT);
+ builder.dataType(INT);
+ break;
+ case BIGINT:
+ builder.nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.BIGINT));
+ builder.columnType(BIGINT);
+ builder.dataType(BIGINT);
+ break;
+ case FLOAT:
+ builder.nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.FLOAT));
+ builder.columnType(FLOAT);
+ builder.dataType(FLOAT);
+ break;
+ case DOUBLE:
+ builder.nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.DOUBLE));
+ builder.columnType(DOUBLE);
+ builder.dataType(DOUBLE);
+ break;
+ case DECIMAL:
+ DecimalType decimalType = (DecimalType) column.getDataType();
+ long precision = decimalType.getPrecision();
+ int scale = decimalType.getScale();
+ if (precision <= 0) {
+ precision = DEFAULT_PRECISION;
+ scale = DEFAULT_SCALE;
+ log.warn(
+ "The decimal column {} type decimal({},{}) is out of range, "
+ + "which is precision less than 0, "
+ + "it will be converted to decimal({},{})",
+ column.getName(),
+ decimalType.getPrecision(),
+ decimalType.getScale(),
+ precision,
+ scale);
+ } else if (precision > MAX_PRECISION) {
+ scale = (int) Math.max(0, scale - (precision - MAX_PRECISION));
+ precision = MAX_PRECISION;
+ log.warn(
+ "The decimal column {} type decimal({},{}) is out of range, "
+ + "which exceeds the maximum precision of {}, "
+ + "it will be converted to decimal({},{})",
+ column.getName(),
+ decimalType.getPrecision(),
+ decimalType.getScale(),
+ MAX_PRECISION,
+ precision,
+ scale);
+ }
+ if (scale < 0) {
+ scale = 0;
+ log.warn(
+ "The decimal column {} type decimal({},{}) is out of range, "
+ + "which is scale less than 0, "
+ + "it will be converted to decimal({},{})",
+ column.getName(),
+ decimalType.getPrecision(),
+ decimalType.getScale(),
+ precision,
+ scale);
+ } else if (scale > MAX_SCALE) {
+ scale = MAX_SCALE;
+ log.warn(
+ "The decimal column {} type decimal({},{}) is out of range, "
+ + "which exceeds the maximum scale of {}, "
+ + "it will be converted to decimal({},{})",
+ column.getName(),
+ decimalType.getPrecision(),
+ decimalType.getScale(),
+ MAX_SCALE,
+ precision,
+ scale);
+ }
+
+ String decimalTypeStr = String.format("%s(%s,%s)", DECIMAL, precision, scale);
+ builder.nativeType(TypeInfoFactory.getDecimalTypeInfo((int) precision, scale));
+ builder.columnType(decimalTypeStr);
+ builder.dataType(DECIMAL);
+ builder.precision(precision);
+ builder.scale(scale);
+ break;
+ case BYTES:
+ builder.nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.BINARY));
+ builder.columnType(BINARY);
+ builder.dataType(BINARY);
+ if (column.getColumnLength() == null || column.getColumnLength() <= 0) {
+ builder.length(MAX_VARBINARY_LENGTH);
+ } else {
+ builder.length(column.getColumnLength());
+ }
+ break;
+ case STRING:
+ if (column.getColumnLength() == null || column.getColumnLength() <= 0) {
+ builder.nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.STRING));
+ builder.columnType(STRING);
+ builder.dataType(STRING);
+ } else if (column.getColumnLength() <= 255) {
+ builder.nativeType(
+ TypeInfoFactory.getCharTypeInfo(column.getColumnLength().intValue()));
+ builder.columnType(String.format("%s(%s)", CHAR, column.getColumnLength()));
+ builder.dataType(CHAR);
+ builder.length(column.getColumnLength());
+ } else if (column.getColumnLength() <= 65535) {
+ builder.nativeType(
+ TypeInfoFactory.getVarcharTypeInfo(
+ column.getColumnLength().intValue()));
+ builder.columnType(String.format("%s(%s)", VARCHAR, column.getColumnLength()));
+ builder.dataType(VARCHAR);
+ builder.length(column.getColumnLength());
+ } else {
+ builder.nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.STRING));
+ builder.columnType(STRING);
+ builder.dataType(STRING);
+ builder.length(column.getColumnLength());
+ }
+ break;
+ case DATE:
+ builder.nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.DATE));
+ builder.columnType(DATE);
+ builder.dataType(DATE);
+ break;
+ case TIMESTAMP:
+ if (column.getScale() == null || column.getScale() <= 3) {
+ builder.nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.DATETIME));
+ builder.dataType(DATETIME);
+ builder.columnType(DATETIME);
+ } else {
+ int timestampScale = column.getScale();
+ if (timestampScale > MAX_TIMESTAMP_SCALE) {
+ timestampScale = MAX_TIMESTAMP_SCALE;
+ log.warn(
+ "The timestamp column {} type timestamp({}) is out of range, "
+ + "which exceeds the maximum scale of {}, "
+ + "it will be converted to timestamp({})",
+ column.getName(),
+ column.getScale(),
+ MAX_TIMESTAMP_SCALE,
+ timestampScale);
+ }
+ builder.nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.TIMESTAMP));
+ builder.dataType(TIMESTAMP);
+ builder.columnType(TIMESTAMP);
+ builder.scale(timestampScale);
+ }
+ break;
+ case MAP:
+ MapType mapType = (MapType) column.getDataType();
+ SeaTunnelDataType> keyType = mapType.getKeyType();
+ SeaTunnelDataType> valueType = mapType.getValueType();
+ BasicTypeDefine keyDefine =
+ reconvert(
+ new PhysicalColumn(
+ "key", keyType, null, null, true, null, null, null, null));
+ BasicTypeDefine valueDefine =
+ reconvert(
+ new PhysicalColumn(
+ "value", valueType, null, null, true, null, null, null,
+ null));
+ builder.nativeType(
+ TypeInfoFactory.getMapTypeInfo(
+ keyDefine.getNativeType(), valueDefine.getNativeType()));
+ builder.columnType(
+ String.format(
+ "MAP<%s,%s>",
+ keyDefine.getColumnType(), valueDefine.getColumnType()));
+ builder.dataType(MAP);
+ break;
+ case ARRAY:
+ ArrayType, ?> arrayType = (ArrayType, ?>) column.getDataType();
+ SeaTunnelDataType> elementType = arrayType.getElementType();
+ BasicTypeDefine elementDefine =
+ reconvert(
+ new PhysicalColumn(
+ "element",
+ elementType,
+ null,
+ null,
+ true,
+ null,
+ null,
+ null,
+ null));
+
+ builder.nativeType(TypeInfoFactory.getArrayTypeInfo(elementDefine.getNativeType()));
+ builder.columnType(String.format("ARRAY<%s>", elementDefine.getColumnType()));
+ builder.dataType(ARRAY);
+ break;
+ case TIME:
+ default:
+ throw CommonError.convertToConnectorTypeError(
+ PLUGIN_NAME, column.getDataType().getSqlType().name(), column.getName());
+ }
+
+ return builder.build();
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
index 91e8c12dca3..ddfe1de08b3 100644
--- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
+++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
@@ -17,32 +17,42 @@
package org.apache.seatunnel.connectors.seatunnel.maxcompute.sink;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
+import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
+import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
-import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
-import org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeUtil;
+import org.apache.seatunnel.connectors.seatunnel.maxcompute.catalog.MaxComputeCatalog;
+import org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig;
+import org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.auto.service.AutoService;
-
import java.util.Optional;
+import static org.apache.seatunnel.api.table.factory.FactoryUtil.discoverFactory;
+import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.OVERWRITE;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PLUGIN_NAME;
-@AutoService(SeaTunnelSink.class)
-public class MaxcomputeSink extends AbstractSimpleSink {
+public class MaxcomputeSink extends AbstractSimpleSink
+ implements SupportSaveMode, SupportMultiTableSink {
private static final Logger LOG = LoggerFactory.getLogger(MaxcomputeSink.class);
- private Config pluginConfig;
- private SeaTunnelRowType typeInfo;
+ private final ReadonlyConfig readonlyConfig;
+ private final CatalogTable catalogTable;
+
+ public MaxcomputeSink(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
+ this.readonlyConfig = readonlyConfig;
+ this.catalogTable = catalogTable;
+ }
@Override
public String getPluginName() {
@@ -50,19 +60,46 @@ public String getPluginName() {
}
@Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- this.pluginConfig = pluginConfig;
- MaxcomputeUtil.initTableOrPartition(pluginConfig);
+ public MaxcomputeWriter createWriter(SinkWriter.Context context) {
+ return new MaxcomputeWriter(this.readonlyConfig, this.catalogTable.getSeaTunnelRowType());
}
@Override
- public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
- this.typeInfo = seaTunnelRowType;
- }
+ public Optional getSaveModeHandler() {
+ CatalogFactory catalogFactory =
+ discoverFactory(
+ Thread.currentThread().getContextClassLoader(),
+ CatalogFactory.class,
+ "MaxCompute");
+ if (catalogFactory == null) {
+ throw new MaxcomputeConnectorException(
+ SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format(
+ "PluginName: %s, PluginType: %s, Message: %s",
+ getPluginName(),
+ PluginType.SINK,
+ "Cannot find MaxCompute catalog factory"));
+ }
+ MaxComputeCatalog catalog =
+ (MaxComputeCatalog)
+ catalogFactory.createCatalog(
+ catalogFactory.factoryIdentifier(), readonlyConfig);
- @Override
- public AbstractSinkWriter createWriter(SinkWriter.Context context) {
- return new MaxcomputeWriter(this.pluginConfig, this.typeInfo);
+ DataSaveMode dataSaveMode = readonlyConfig.get(MaxcomputeConfig.DATA_SAVE_MODE);
+ if (readonlyConfig.get(OVERWRITE)) {
+ // compatible with old version
+ LOG.warn(
+ "The configuration of 'overwrite' is deprecated, please use 'data_save_mode' instead.");
+ dataSaveMode = DataSaveMode.DROP_DATA;
+ }
+
+ return Optional.of(
+ new DefaultSaveModeHandler(
+ readonlyConfig.get(MaxcomputeConfig.SCHEMA_SAVE_MODE),
+ dataSaveMode,
+ catalog,
+ catalogTable,
+ readonlyConfig.get(MaxcomputeConfig.CUSTOM_SQL)));
}
@Override
diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java
index faa2f629102..16dededbfe5 100644
--- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java
@@ -18,18 +18,27 @@
package org.apache.seatunnel.connectors.seatunnel.maxcompute.sink;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SinkCommonOptions;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import com.google.auto.service.AutoService;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_ID;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_KEY;
+import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.CUSTOM_SQL;
+import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.DATA_SAVE_MODE;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ENDPOINT;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.OVERWRITE;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PLUGIN_NAME;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT;
+import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.SAVE_MODE_CREATE_TEMPLATE;
+import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.SCHEMA_SAVE_MODE;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_NAME;
@AutoService(Factory.class)
@@ -43,7 +52,27 @@ public String factoryIdentifier() {
public OptionRule optionRule() {
return OptionRule.builder()
.required(ACCESS_ID, ACCESS_KEY, ENDPOINT, PROJECT, TABLE_NAME)
- .optional(PARTITION_SPEC, OVERWRITE)
+ .optional(
+ PARTITION_SPEC,
+ OVERWRITE,
+ SCHEMA_SAVE_MODE,
+ DATA_SAVE_MODE,
+ SAVE_MODE_CREATE_TEMPLATE,
+ CUSTOM_SQL,
+ SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
.build();
}
+
+ @Override
+ public TableSink createSink(TableSinkFactoryContext context) {
+ return () ->
+ new MaxcomputeSink(
+ context.getOptions(),
+ CatalogTable.of(
+ TableIdentifier.of(
+ context.getCatalogTable().getCatalogName(),
+ context.getOptions().get(PROJECT),
+ context.getOptions().get(TABLE_NAME)),
+ context.getCatalogTable()));
+ }
}
diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java
index 51492ae5912..f72a3124b0b 100644
--- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java
+++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java
@@ -17,8 +17,8 @@
package org.apache.seatunnel.connectors.seatunnel.maxcompute.sink;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
@@ -42,32 +42,31 @@
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_NAME;
@Slf4j
-public class MaxcomputeWriter extends AbstractSinkWriter {
+public class MaxcomputeWriter extends AbstractSinkWriter
+ implements SupportMultiTableSinkWriter {
private RecordWriter recordWriter;
private final TableTunnel.UploadSession session;
private final TableSchema tableSchema;
private static final Long BLOCK_0 = 0L;
- private SeaTunnelRowType rowType;
+ private final SeaTunnelRowType rowType;
- public MaxcomputeWriter(Config pluginConfig, SeaTunnelRowType rowType) {
+ public MaxcomputeWriter(ReadonlyConfig readonlyConfig, SeaTunnelRowType rowType) {
try {
this.rowType = rowType;
- Table table = MaxcomputeUtil.getTable(pluginConfig);
+ Table table = MaxcomputeUtil.getTable(readonlyConfig);
this.tableSchema = table.getSchema();
- TableTunnel tunnel = MaxcomputeUtil.getTableTunnel(pluginConfig);
- if (pluginConfig.hasPath(PARTITION_SPEC.key())) {
- PartitionSpec partitionSpec =
- new PartitionSpec(pluginConfig.getString(PARTITION_SPEC.key()));
+ TableTunnel tunnel = MaxcomputeUtil.getTableTunnel(readonlyConfig);
+ if (readonlyConfig.getOptional(PARTITION_SPEC).isPresent()) {
+ PartitionSpec partitionSpec = new PartitionSpec(readonlyConfig.get(PARTITION_SPEC));
session =
tunnel.createUploadSession(
- pluginConfig.getString(PROJECT.key()),
- pluginConfig.getString(TABLE_NAME.key()),
+ readonlyConfig.get(PROJECT),
+ readonlyConfig.get(TABLE_NAME),
partitionSpec);
} else {
session =
tunnel.createUploadSession(
- pluginConfig.getString(PROJECT.key()),
- pluginConfig.getString(TABLE_NAME.key()));
+ readonlyConfig.get(PROJECT), readonlyConfig.get(TABLE_NAME));
}
this.recordWriter = session.openRecordWriter(BLOCK_0);
log.info("open record writer success");
diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceReader.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceReader.java
index 71a5683a42d..0c38da1d730 100644
--- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceReader.java
+++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceReader.java
@@ -19,6 +19,7 @@
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
@@ -67,7 +68,8 @@ public void pollNext(Collector output) throws Exception {
source -> {
try {
TableTunnel.DownloadSession session =
- MaxcomputeUtil.getDownloadSession(pluginConfig);
+ MaxcomputeUtil.getDownloadSession(
+ ReadonlyConfig.fromConfig(pluginConfig));
TunnelRecordReader recordReader =
session.openRecordReader(source.getSplitId(), source.getRowNum());
log.info("open record reader success");
diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplitEnumerator.java
index 3aa2e5f21a1..cde763add21 100644
--- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplitEnumerator.java
@@ -19,6 +19,7 @@
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeUtil;
@@ -99,7 +100,8 @@ public void notifyCheckpointComplete(long checkpointId) {}
public void handleSplitRequest(int subtaskId) {}
private void discoverySplits() throws TunnelException {
- TableTunnel.DownloadSession session = MaxcomputeUtil.getDownloadSession(this.pluginConfig);
+ TableTunnel.DownloadSession session =
+ MaxcomputeUtil.getDownloadSession(ReadonlyConfig.fromConfig(this.pluginConfig));
long recordCount = session.getRecordCount();
int numReaders = enumeratorContext.currentParallelism();
int splitRowNum = (int) Math.ceil((double) recordCount / numReaders);
diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/CreateTableParser.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/CreateTableParser.java
new file mode 100644
index 00000000000..41a6f70f2c8
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/CreateTableParser.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.util;
+
+import lombok.Getter;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class CreateTableParser {
+
+ private static final Pattern COLUMN_PATTERN = Pattern.compile("`?(\\w+)`?\\s*([\\w|\\W]*)");
+
+ public static Map getColumnList(String createTableSql) {
+ Map columns = new HashMap<>();
+ StringBuilder columnBuilder = new StringBuilder();
+ int startIndex = createTableSql.indexOf("(");
+ createTableSql = createTableSql.substring(startIndex + 1);
+
+ boolean insideParentheses = false;
+ for (int i = 0; i < createTableSql.length(); i++) {
+ char c = createTableSql.charAt(i);
+ if (c == '(') {
+ insideParentheses = true;
+ columnBuilder.append(c);
+ } else if ((c == ',' || c == ')') && !insideParentheses) {
+ parseColumn(columnBuilder.toString(), columns, startIndex + i + 1);
+ columnBuilder.setLength(0);
+ if (c == ')') {
+ break;
+ }
+ } else if (c == ')') {
+ insideParentheses = false;
+ columnBuilder.append(c);
+ } else {
+ columnBuilder.append(c);
+ }
+ }
+ return columns;
+ }
+
+ private static void parseColumn(
+ String columnString, Map columnList, int suffixIndex) {
+ Matcher matcher = COLUMN_PATTERN.matcher(columnString.trim());
+ if (matcher.matches()) {
+ String columnName = matcher.group(1);
+ String otherInfo = matcher.group(2).trim();
+ StringBuilder columnBuilder =
+ new StringBuilder(columnName).append(" ").append(otherInfo);
+ if (columnBuilder.toString().toUpperCase().contains("PRIMARY KEY")
+ || columnBuilder.toString().toUpperCase().contains("CREATE TABLE")) {
+ return;
+ }
+ int endIndex =
+ suffixIndex
+ - columnString
+ .substring(
+ columnString.indexOf(columnName) + columnName.length())
+ .length();
+ int startIndex =
+ suffixIndex - columnString.substring(columnString.indexOf(columnName)).length();
+ columnList.put(columnName, new ColumnInfo(columnName, otherInfo, startIndex, endIndex));
+ }
+ }
+
+ @Getter
+ public static final class ColumnInfo {
+
+ public ColumnInfo(String name, String info, int startIndex, int endIndex) {
+ this.name = name;
+ this.info = info;
+ this.startIndex = startIndex;
+ this.endIndex = endIndex;
+ }
+
+ String name;
+ String info;
+ int startIndex;
+ int endIndex;
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java
index 2a3eda909aa..940a2fda120 100644
--- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java
+++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java
@@ -19,6 +19,7 @@
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -89,7 +90,7 @@ public static Record getMaxcomputeRowData(
}
public static SeaTunnelRowType getSeaTunnelRowType(Config pluginConfig) {
- Table table = MaxcomputeUtil.getTable(pluginConfig);
+ Table table = MaxcomputeUtil.getTable(ReadonlyConfig.fromConfig(pluginConfig));
TableSchema tableSchema = table.getSchema();
ArrayList> seaTunnelDataTypes = new ArrayList<>();
ArrayList fieldNames = new ArrayList<>();
@@ -251,16 +252,17 @@ private static Object resolveObject2Maxcompute(Object field, TypeInfo typeInfo)
case DOUBLE:
case BIGINT:
case BOOLEAN:
+ case DECIMAL:
+ case TIMESTAMP_NTZ:
return field;
case BINARY:
return new Binary((byte[]) field);
- case DECIMAL:
- return null;
case VARCHAR:
return new Varchar((String) field);
case CHAR:
return new Char((String) field);
case STRING:
+ case JSON:
if (field instanceof byte[]) {
return new String((byte[]) field);
}
diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeUtil.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeUtil.java
index 8d7c7de17b6..59804cc3afc 100644
--- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeUtil.java
+++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeUtil.java
@@ -17,8 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.maxcompute.util;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException;
@@ -33,53 +32,46 @@
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_ID;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_KEY;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ENDPOINT;
-import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.OVERWRITE;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_NAME;
@Slf4j
public class MaxcomputeUtil {
- public static Table getTable(Config pluginConfig) {
- Odps odps = getOdps(pluginConfig);
- Table table = odps.tables().get(pluginConfig.getString(TABLE_NAME.key()));
- return table;
+ public static Table getTable(ReadonlyConfig readonlyConfig) {
+ Odps odps = getOdps(readonlyConfig);
+ return odps.tables().get(readonlyConfig.get(TABLE_NAME));
}
- public static TableTunnel getTableTunnel(Config pluginConfig) {
- Odps odps = getOdps(pluginConfig);
- TableTunnel tunnel = new TableTunnel(odps);
- return tunnel;
+ public static TableTunnel getTableTunnel(ReadonlyConfig readonlyConfig) {
+ Odps odps = getOdps(readonlyConfig);
+ return new TableTunnel(odps);
}
- public static Odps getOdps(Config pluginConfig) {
+ public static Odps getOdps(ReadonlyConfig readonlyConfig) {
Account account =
- new AliyunAccount(
- pluginConfig.getString(ACCESS_ID.key()),
- pluginConfig.getString(ACCESS_KEY.key()));
+ new AliyunAccount(readonlyConfig.get(ACCESS_ID), readonlyConfig.get(ACCESS_KEY));
Odps odps = new Odps(account);
- odps.setEndpoint(pluginConfig.getString(ENDPOINT.key()));
- odps.setDefaultProject(pluginConfig.getString(PROJECT.key()));
+ odps.setEndpoint(readonlyConfig.get(ENDPOINT));
+ odps.setDefaultProject(readonlyConfig.get(PROJECT));
return odps;
}
- public static TableTunnel.DownloadSession getDownloadSession(Config pluginConfig) {
- TableTunnel tunnel = getTableTunnel(pluginConfig);
+ public static TableTunnel.DownloadSession getDownloadSession(ReadonlyConfig readonlyConfig) {
+ TableTunnel tunnel = getTableTunnel(readonlyConfig);
TableTunnel.DownloadSession session;
try {
- if (pluginConfig.hasPath(PARTITION_SPEC.key())) {
- PartitionSpec partitionSpec =
- new PartitionSpec(pluginConfig.getString(PARTITION_SPEC.key()));
+ if (readonlyConfig.getOptional(PARTITION_SPEC).isPresent()) {
+ PartitionSpec partitionSpec = new PartitionSpec(readonlyConfig.get(PARTITION_SPEC));
session =
tunnel.createDownloadSession(
- pluginConfig.getString(PROJECT.key()),
- pluginConfig.getString(TABLE_NAME.key()),
+ readonlyConfig.get(PROJECT),
+ readonlyConfig.get(TABLE_NAME),
partitionSpec);
} else {
session =
tunnel.createDownloadSession(
- pluginConfig.getString(PROJECT.key()),
- pluginConfig.getString(TABLE_NAME.key()));
+ readonlyConfig.get(PROJECT), readonlyConfig.get(TABLE_NAME));
}
} catch (Exception e) {
throw new MaxcomputeConnectorException(
@@ -88,36 +80,18 @@ public static TableTunnel.DownloadSession getDownloadSession(Config pluginConfig
return session;
}
- public static void initTableOrPartition(Config pluginConfig) {
- Boolean overwrite = OVERWRITE.defaultValue();
- if (pluginConfig.hasPath(OVERWRITE.key())) {
- overwrite = pluginConfig.getBoolean(OVERWRITE.key());
- }
+ public static Table parseTable(Odps odps, String projectName, String tableName) {
try {
- Table table = MaxcomputeUtil.getTable(pluginConfig);
- if (pluginConfig.hasPath(PARTITION_SPEC.key())) {
- PartitionSpec partitionSpec =
- new PartitionSpec(pluginConfig.getString(PARTITION_SPEC.key()));
- if (overwrite) {
- try {
- table.deletePartition(partitionSpec, true);
- } catch (NullPointerException e) {
- log.debug("NullPointerException when delete table partition");
- }
- }
- table.createPartition(partitionSpec, true);
- } else {
- if (overwrite) {
- try {
- table.truncate();
- } catch (NullPointerException e) {
- log.debug("NullPointerException when truncate table");
- }
- }
- }
- } catch (Exception e) {
+ Table table = odps.tables().get(projectName, tableName);
+ table.reload();
+ return table;
+ } catch (Exception ex) {
throw new MaxcomputeConnectorException(
- CommonErrorCodeDeprecated.READER_OPERATION_FAILED, e);
+ CommonErrorCodeDeprecated.TABLE_SCHEMA_GET_FAILED,
+ String.format(
+ "get table %s.%s info with exception, error:%s",
+ projectName, tableName, ex.getMessage()),
+ ex);
}
}
}
diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCreateTableTest.java b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCreateTableTest.java
new file mode 100644
index 00000000000..efdcd070e3b
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCreateTableTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.catalog;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+@Slf4j
+public class MaxComputeCreateTableTest {
+
+ @Test
+ public void test() {
+
+ List columns = new ArrayList<>();
+
+ columns.add(PhysicalColumn.of("id", BasicType.LONG_TYPE, (Long) null, true, null, ""));
+ columns.add(PhysicalColumn.of("name", BasicType.STRING_TYPE, (Long) null, true, null, ""));
+ columns.add(
+ PhysicalColumn.of(
+ "age", BasicType.INT_TYPE, (Long) null, true, null, "test comment"));
+ columns.add(PhysicalColumn.of("score", BasicType.INT_TYPE, (Long) null, true, null, ""));
+ columns.add(PhysicalColumn.of("gender", BasicType.BYTE_TYPE, (Long) null, true, null, ""));
+ columns.add(
+ PhysicalColumn.of("create_time", BasicType.LONG_TYPE, (Long) null, true, null, ""));
+
+ String result =
+ MaxComputeCatalogUtil.getCreateTableStatement(
+ "CREATE TABLE IF NOT EXISTS `${database}`.`${table}` ( \n"
+ + "${rowtype_primary_key} , \n"
+ + "${rowtype_unique_key} , \n"
+ + "`create_time` DATETIME NOT NULL , \n"
+ + "${rowtype_fields} \n"
+ + ") ENGINE=OLAP \n"
+ + "PRIMARY KEY(${rowtype_primary_key},`create_time`) \n"
+ + "PARTITION BY RANGE (`create_time`)( \n"
+ + " PARTITION p20230329 VALUES LESS THAN (\"2023-03-29\") \n"
+ + ") \n"
+ + "DISTRIBUTED BY HASH (${rowtype_primary_key}) \n"
+ + "PROPERTIES (\n"
+ + "\"replication_allocation\" = \"tag.location.default: 1\",\n"
+ + "\"in_memory\" = \"false\",\n"
+ + "\"storage_format\" = \"V2\",\n"
+ + "\"disable_auto_compaction\" = \"false\"\n"
+ + ");",
+ TablePath.of("test1.test2"),
+ CatalogTable.of(
+ TableIdentifier.of("test", "test1", "test2"),
+ TableSchema.builder()
+ .primaryKey(PrimaryKey.of("", Arrays.asList("id", "age")))
+ .constraintKey(
+ Arrays.asList(
+ ConstraintKey.of(
+ ConstraintKey.ConstraintType
+ .UNIQUE_KEY,
+ "unique_key",
+ Collections.singletonList(
+ ConstraintKey
+ .ConstraintKeyColumn
+ .of(
+ "name",
+ ConstraintKey
+ .ColumnSortType
+ .DESC))),
+ ConstraintKey.of(
+ ConstraintKey.ConstraintType
+ .UNIQUE_KEY,
+ "unique_key2",
+ Collections.singletonList(
+ ConstraintKey
+ .ConstraintKeyColumn
+ .of(
+ "score",
+ ConstraintKey
+ .ColumnSortType
+ .ASC)))))
+ .columns(columns)
+ .build(),
+ Collections.emptyMap(),
+ Collections.emptyList(),
+ ""));
+ Assertions.assertEquals(
+ result,
+ "CREATE TABLE IF NOT EXISTS `test1`.`test2` ( \n"
+ + "`id` BIGINT NULL ,`age` INT NULL COMMENT 'test comment' , \n"
+ + "`name` STRING NULL ,`score` INT NULL , \n"
+ + "`create_time` DATETIME NOT NULL , \n"
+ + "`gender` TINYINT NULL \n"
+ + ") ENGINE=OLAP \n"
+ + "PRIMARY KEY(`id`,`age`,`create_time`) \n"
+ + "PARTITION BY RANGE (`create_time`)( \n"
+ + " PARTITION p20230329 VALUES LESS THAN (\"2023-03-29\") \n"
+ + ") \n"
+ + "DISTRIBUTED BY HASH (`id`,`age`) \n"
+ + "PROPERTIES (\n"
+ + "\"replication_allocation\" = \"tag.location.default: 1\",\n"
+ + "\"in_memory\" = \"false\",\n"
+ + "\"storage_format\" = \"V2\",\n"
+ + "\"disable_auto_compaction\" = \"false\"\n"
+ + ");");
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/PreviewActionTest.java b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/PreviewActionTest.java
new file mode 100644
index 00000000000..d4ba6799ac8
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/PreviewActionTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.catalog;
+
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PreviewResult;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.SQLPreviewResult;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Optional;
+
+public class PreviewActionTest {
+
+ private static final CatalogTable CATALOG_TABLE =
+ CatalogTable.of(
+ TableIdentifier.of("catalog", "database", "table"),
+ TableSchema.builder()
+ .primaryKey(PrimaryKey.of("", Lists.newArrayList("id")))
+ .columns(
+ Lists.newArrayList(
+ PhysicalColumn.of(
+ "id",
+ BasicType.LONG_TYPE,
+ (Long) null,
+ false,
+ null,
+ ""),
+ PhysicalColumn.of(
+ "test",
+ BasicType.STRING_TYPE,
+ (Long) null,
+ true,
+ null,
+ "")))
+ .build(),
+ Collections.emptyMap(),
+ Collections.emptyList(),
+ "comment");
+
+ @Test
+ public void testDorisPreviewAction() {
+ MaxComputeCatalogFactory factory = new MaxComputeCatalogFactory();
+ Catalog catalog =
+ factory.createCatalog(
+ "test",
+ ReadonlyConfig.fromMap(
+ new HashMap() {
+ {
+ put("username", "root");
+ put("password", "root");
+ }
+ }));
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.DROP_TABLE,
+ "DROP TABLE IF EXISTS testddatabase.testtable;",
+ Optional.empty());
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.CREATE_TABLE,
+ "CREATE TABLE IF NOT EXISTS `testtable` (\n"
+ + "`id` BIGINT NOT NULL ,\n"
+ + "`test` STRING NULL \n"
+ + ");",
+ Optional.of(CATALOG_TABLE));
+ }
+
+ private void assertPreviewResult(
+ Catalog catalog,
+ Catalog.ActionType actionType,
+ String expectedSql,
+ Optional catalogTable) {
+ PreviewResult previewResult =
+ catalog.previewAction(
+ actionType, TablePath.of("testddatabase.testtable"), catalogTable);
+ Assertions.assertInstanceOf(SQLPreviewResult.class, previewResult);
+ Assertions.assertEquals(expectedSql, ((SQLPreviewResult) previewResult).getSql());
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/datatype/MaxComputeTypeConvertorTest.java b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/datatype/MaxComputeTypeConvertorTest.java
new file mode 100644
index 00000000000..244f7cbe0ed
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/datatype/MaxComputeTypeConvertorTest.java
@@ -0,0 +1,1087 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.datatype;
+
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalArrayType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import com.aliyun.odps.OdpsType;
+import com.aliyun.odps.type.TypeInfo;
+import com.aliyun.odps.type.TypeInfoFactory;
+
+import java.util.Locale;
+
+public class MaxComputeTypeConvertorTest {
+
+ @Test
+ public void testConvertUnsupported() {
+ BasicTypeDefine typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .columnType("aaa")
+ .dataType("aaa")
+ .build();
+ try {
+ MaxComputeTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.fail();
+ } catch (SeaTunnelRuntimeException e) {
+ // ignore
+ } catch (Throwable e) {
+ Assertions.fail();
+ }
+ }
+
+ @Test
+ public void testConvertTinyint() {
+ BasicTypeDefine typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.TINYINT))
+ .columnType(
+ TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.TINYINT)
+ .getTypeName())
+ .dataType(OdpsType.TINYINT.name())
+ .length(1L)
+ .build();
+ Column column = MaxComputeTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(BasicType.BYTE_TYPE, column.getDataType());
+ Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType());
+ }
+
+ @Test
+ public void testConvertSmallint() {
+ BasicTypeDefine typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.SMALLINT))
+ .columnType(
+ TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.SMALLINT)
+ .getTypeName())
+ .dataType(OdpsType.SMALLINT.name())
+ .build();
+ Column column = MaxComputeTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(BasicType.SHORT_TYPE, column.getDataType());
+ Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType());
+ }
+
+ @Test
+ public void testConvertInt() {
+ BasicTypeDefine typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.INT))
+ .columnType(
+ TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.INT).getTypeName())
+ .dataType(OdpsType.INT.name())
+ .build();
+ Column column = MaxComputeTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(BasicType.INT_TYPE, column.getDataType());
+ Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType());
+ }
+
+ @Test
+ public void testConvertBoolean() {
+ BasicTypeDefine typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.BOOLEAN))
+ .columnType(
+ TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.BOOLEAN)
+ .getTypeName())
+ .dataType(OdpsType.BOOLEAN.name())
+ .build();
+ Column column = MaxComputeTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(BasicType.BOOLEAN_TYPE, column.getDataType());
+ Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType());
+ }
+
+ @Test
+ public void testConvertBigint() {
+ BasicTypeDefine typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.BIGINT))
+ .columnType(
+ TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.BIGINT).getTypeName())
+ .dataType(OdpsType.BIGINT.name())
+ .build();
+ Column column = MaxComputeTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(BasicType.LONG_TYPE, column.getDataType());
+ Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType());
+ }
+
+ @Test
+ public void testConvertFloat() {
+ BasicTypeDefine typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.FLOAT))
+ .columnType(
+ TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.FLOAT).getTypeName())
+ .dataType(OdpsType.FLOAT.name())
+ .build();
+ Column column = MaxComputeTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(BasicType.FLOAT_TYPE, column.getDataType());
+ Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType());
+ }
+
+ @Test
+ public void testConvertDouble() {
+ BasicTypeDefine typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.DOUBLE))
+ .columnType(
+ TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.DOUBLE).getTypeName())
+ .dataType(OdpsType.DOUBLE.name())
+ .build();
+ Column column = MaxComputeTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(BasicType.DOUBLE_TYPE, column.getDataType());
+ Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType());
+ }
+
+ @Test
+ public void testConvertDecimal() {
+ BasicTypeDefine typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .nativeType(TypeInfoFactory.getDecimalTypeInfo(9, 2))
+ .columnType(TypeInfoFactory.getDecimalTypeInfo(9, 2).getTypeName())
+ .dataType(OdpsType.DECIMAL.name())
+ .build();
+ Column column = MaxComputeTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(new DecimalType(9, 2), column.getDataType());
+ Assertions.assertEquals(9L, column.getColumnLength());
+ Assertions.assertEquals(2, column.getScale());
+ Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType());
+ }
+
+ @Test
+ public void testConvertChar() {
+ BasicTypeDefine typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .nativeType(TypeInfoFactory.getCharTypeInfo(2))
+ .columnType(TypeInfoFactory.getCharTypeInfo(2).getTypeName())
+ .dataType(OdpsType.CHAR.name())
+ .build();
+ Column column = MaxComputeTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
+ Assertions.assertEquals(8, column.getColumnLength());
+ Assertions.assertEquals(
+ typeDefine.getColumnType(), column.getSourceType().toUpperCase(Locale.ROOT));
+
+ typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .nativeType(TypeInfoFactory.getVarcharTypeInfo(2))
+ .columnType(TypeInfoFactory.getVarcharTypeInfo(2).getTypeName())
+ .dataType(OdpsType.VARCHAR.name())
+ .build();
+ column = MaxComputeTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
+ Assertions.assertEquals(8, column.getColumnLength());
+ Assertions.assertEquals(
+ typeDefine.getColumnType(), column.getSourceType().toUpperCase(Locale.ROOT));
+ }
+
+ @Test
+ public void testConvertString() {
+ BasicTypeDefine typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.STRING))
+ .columnType(
+ TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.STRING).getTypeName())
+ .dataType(OdpsType.STRING.name())
+ .build();
+ Column column = MaxComputeTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
+ Assertions.assertEquals(
+ MaxComputeTypeConverter.MAX_VARBINARY_LENGTH, column.getColumnLength());
+ Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType());
+ }
+
+ @Test
+ public void testConvertJson() {
+ BasicTypeDefine typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.JSON))
+ .columnType(
+ TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.JSON).getTypeName())
+ .dataType(OdpsType.JSON.name())
+ .build();
+ Column column = MaxComputeTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
+ Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType());
+ }
+
+ @Test
+ public void testConvertDate() {
+ BasicTypeDefine typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.DATE))
+ .columnType(
+ TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.DATE).getTypeName())
+ .dataType(OdpsType.DATE.name())
+ .build();
+ Column column = MaxComputeTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(LocalTimeType.LOCAL_DATE_TYPE, column.getDataType());
+ Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType());
+ }
+
+ @Test
+ public void testConvertDatetime() {
+ BasicTypeDefine typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.DATETIME))
+ .columnType(
+ TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.DATETIME)
+ .getTypeName())
+ .dataType(OdpsType.DATETIME.name())
+ .build();
+ Column column = MaxComputeTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(LocalTimeType.LOCAL_DATE_TIME_TYPE, column.getDataType());
+ Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType());
+
+ typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.TIMESTAMP))
+ .columnType(
+ TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.TIMESTAMP)
+ .getTypeName())
+ .dataType(
+ TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.TIMESTAMP)
+ .getOdpsType()
+ .name())
+ .build();
+ column = MaxComputeTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(LocalTimeType.LOCAL_DATE_TIME_TYPE, column.getDataType());
+ Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType());
+
+ typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .nativeType(TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.TIMESTAMP_NTZ))
+ .columnType(
+ TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.TIMESTAMP_NTZ)
+ .getTypeName())
+ .dataType(
+ TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.TIMESTAMP_NTZ)
+ .getOdpsType()
+ .name())
+ .build();
+ column = MaxComputeTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(LocalTimeType.LOCAL_DATE_TIME_TYPE, column.getDataType());
+ Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType());
+ }
+
+ @Test
+ public void testConvertArray() {
+ BasicTypeDefine typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .nativeType(
+ TypeInfoFactory.getArrayTypeInfo(
+ TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.BOOLEAN)))
+ .columnType(
+ TypeInfoFactory.getArrayTypeInfo(
+ TypeInfoFactory.getPrimitiveTypeInfo(
+ OdpsType.BOOLEAN))
+ .getTypeName())
+ .dataType(
+ TypeInfoFactory.getArrayTypeInfo(
+ TypeInfoFactory.getPrimitiveTypeInfo(
+ OdpsType.BOOLEAN))
+ .getOdpsType()
+ .name())
+ .build();
+ Column column = MaxComputeTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(ArrayType.BOOLEAN_ARRAY_TYPE, column.getDataType());
+ Assertions.assertEquals("ARRAY", column.getSourceType());
+
+ typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .nativeType(
+ TypeInfoFactory.getArrayTypeInfo(
+ TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.TINYINT)))
+ .columnType(
+ TypeInfoFactory.getArrayTypeInfo(
+ TypeInfoFactory.getPrimitiveTypeInfo(
+ OdpsType.TINYINT))
+ .getTypeName())
+ .dataType(
+ TypeInfoFactory.getArrayTypeInfo(
+ TypeInfoFactory.getPrimitiveTypeInfo(
+ OdpsType.TINYINT))
+ .getOdpsType()
+ .name())
+ .build();
+ column = MaxComputeTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(ArrayType.BYTE_ARRAY_TYPE, column.getDataType());
+ Assertions.assertEquals("ARRAY", column.getSourceType());
+
+ typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .nativeType(
+ TypeInfoFactory.getArrayTypeInfo(
+ TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.SMALLINT)))
+ .columnType(
+ TypeInfoFactory.getArrayTypeInfo(
+ TypeInfoFactory.getPrimitiveTypeInfo(
+ OdpsType.SMALLINT))
+ .getTypeName())
+ .dataType(
+ TypeInfoFactory.getArrayTypeInfo(
+ TypeInfoFactory.getPrimitiveTypeInfo(
+ OdpsType.SMALLINT))
+ .getOdpsType()
+ .name())
+ .build();
+ column = MaxComputeTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(ArrayType.SHORT_ARRAY_TYPE, column.getDataType());
+ Assertions.assertEquals("ARRAY", column.getSourceType());
+
+ typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .nativeType(
+ TypeInfoFactory.getArrayTypeInfo(
+ TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.INT)))
+ .columnType(
+ TypeInfoFactory.getArrayTypeInfo(
+ TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.INT))
+ .getTypeName())
+ .dataType(
+ TypeInfoFactory.getArrayTypeInfo(
+ TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.INT))
+ .getOdpsType()
+ .name())
+ .build();
+ column = MaxComputeTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(ArrayType.INT_ARRAY_TYPE, column.getDataType());
+ Assertions.assertEquals("ARRAY", column.getSourceType());
+
+ typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .nativeType(
+ TypeInfoFactory.getArrayTypeInfo(
+ TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.BIGINT)))
+ .columnType(
+ TypeInfoFactory.getArrayTypeInfo(
+ TypeInfoFactory.getPrimitiveTypeInfo(
+ OdpsType.BIGINT))
+ .getTypeName())
+ .dataType(
+ TypeInfoFactory.getArrayTypeInfo(
+ TypeInfoFactory.getPrimitiveTypeInfo(
+ OdpsType.BIGINT))
+ .getOdpsType()
+ .name())
+ .build();
+ column = MaxComputeTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(ArrayType.LONG_ARRAY_TYPE, column.getDataType());
+ Assertions.assertEquals("ARRAY", column.getSourceType());
+
+ typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .nativeType(
+ TypeInfoFactory.getArrayTypeInfo(
+ TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.FLOAT)))
+ .columnType(
+ TypeInfoFactory.getArrayTypeInfo(
+ TypeInfoFactory.getPrimitiveTypeInfo(
+ OdpsType.FLOAT))
+ .getTypeName())
+ .dataType(
+ TypeInfoFactory.getArrayTypeInfo(
+ TypeInfoFactory.getPrimitiveTypeInfo(
+ OdpsType.FLOAT))
+ .getOdpsType()
+ .name())
+ .build();
+ column = MaxComputeTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(ArrayType.FLOAT_ARRAY_TYPE, column.getDataType());
+ Assertions.assertEquals("ARRAY", column.getSourceType());
+
+ typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .nativeType(
+ TypeInfoFactory.getArrayTypeInfo(
+ TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.DOUBLE)))
+ .columnType(
+ TypeInfoFactory.getArrayTypeInfo(
+ TypeInfoFactory.getPrimitiveTypeInfo(
+ OdpsType.DOUBLE))
+ .getTypeName())
+ .dataType(
+ TypeInfoFactory.getArrayTypeInfo(
+ TypeInfoFactory.getPrimitiveTypeInfo(
+ OdpsType.DOUBLE))
+ .getOdpsType()
+ .name())
+ .build();
+ column = MaxComputeTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(ArrayType.DOUBLE_ARRAY_TYPE, column.getDataType());
+ Assertions.assertEquals("ARRAY", column.getSourceType());
+
+ typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .nativeType(
+ TypeInfoFactory.getArrayTypeInfo(
+ TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.DATE)))
+ .columnType(
+ TypeInfoFactory.getArrayTypeInfo(
+ TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.DATE))
+ .getTypeName())
+ .dataType(
+ TypeInfoFactory.getArrayTypeInfo(
+ TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.DATE))
+ .getOdpsType()
+ .name())
+ .build();
+ column = MaxComputeTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(ArrayType.LOCAL_DATE_ARRAY_TYPE, column.getDataType());
+ Assertions.assertEquals("ARRAY", column.getSourceType());
+
+ typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .nativeType(
+ TypeInfoFactory.getArrayTypeInfo(
+ TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.DATETIME)))
+ .columnType(
+ TypeInfoFactory.getArrayTypeInfo(
+ TypeInfoFactory.getPrimitiveTypeInfo(
+ OdpsType.DATETIME))
+ .getTypeName())
+ .dataType(
+ TypeInfoFactory.getArrayTypeInfo(
+ TypeInfoFactory.getPrimitiveTypeInfo(
+ OdpsType.DATETIME))
+ .getOdpsType()
+ .name())
+ .build();
+ column = MaxComputeTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(ArrayType.LOCAL_DATE_TIME_ARRAY_TYPE, column.getDataType());
+ Assertions.assertEquals("ARRAY", column.getSourceType());
+ }
+
+ @Test
+ public void testConvertMap() {
+ TypeInfo typeInfo =
+ TypeInfoFactory.getMapTypeInfo(
+ TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.STRING),
+ TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.BOOLEAN));
+ BasicTypeDefine typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .nativeType(typeInfo)
+ .columnType(typeInfo.getTypeName())
+ .dataType(typeInfo.getOdpsType().name())
+ .build();
+ Column column = MaxComputeTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ MapType mapType = new MapType<>(BasicType.STRING_TYPE, BasicType.BOOLEAN_TYPE);
+ Assertions.assertEquals(mapType, column.getDataType());
+ Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType());
+ }
+
+ @Test
+ public void testReconvertBoolean() {
+ Column column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(BasicType.BOOLEAN_TYPE)
+ .build();
+
+ BasicTypeDefine typeDefine = MaxComputeTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals(MaxComputeTypeConverter.BOOLEAN, typeDefine.getColumnType());
+ Assertions.assertEquals(MaxComputeTypeConverter.BOOLEAN, typeDefine.getDataType());
+ Assertions.assertEquals(1, typeDefine.getLength());
+ }
+
+ @Test
+ public void testReconvertByte() {
+ Column column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(BasicType.BYTE_TYPE)
+ .build();
+
+ BasicTypeDefine typeDefine = MaxComputeTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals(MaxComputeTypeConverter.TINYINT, typeDefine.getColumnType());
+ Assertions.assertEquals(MaxComputeTypeConverter.TINYINT, typeDefine.getDataType());
+ }
+
+ @Test
+ public void testReconvertShort() {
+ Column column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(BasicType.SHORT_TYPE)
+ .build();
+
+ BasicTypeDefine typeDefine = MaxComputeTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals(MaxComputeTypeConverter.SMALLINT, typeDefine.getColumnType());
+ Assertions.assertEquals(MaxComputeTypeConverter.SMALLINT, typeDefine.getDataType());
+ }
+
+ @Test
+ public void testReconvertInt() {
+ Column column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(BasicType.INT_TYPE)
+ .build();
+
+ BasicTypeDefine typeDefine = MaxComputeTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals(MaxComputeTypeConverter.INT, typeDefine.getColumnType());
+ Assertions.assertEquals(MaxComputeTypeConverter.INT, typeDefine.getDataType());
+ }
+
+ @Test
+ public void testReconvertLong() {
+ Column column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(BasicType.LONG_TYPE)
+ .build();
+
+ BasicTypeDefine typeDefine = MaxComputeTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals(MaxComputeTypeConverter.BIGINT, typeDefine.getColumnType());
+ Assertions.assertEquals(MaxComputeTypeConverter.BIGINT, typeDefine.getDataType());
+ }
+
+ @Test
+ public void testReconvertFloat() {
+ Column column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(BasicType.FLOAT_TYPE)
+ .build();
+
+ BasicTypeDefine typeDefine = MaxComputeTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals(MaxComputeTypeConverter.FLOAT, typeDefine.getColumnType());
+ Assertions.assertEquals(MaxComputeTypeConverter.FLOAT, typeDefine.getDataType());
+ }
+
+ @Test
+ public void testReconvertDouble() {
+ Column column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(BasicType.DOUBLE_TYPE)
+ .build();
+
+ BasicTypeDefine typeDefine = MaxComputeTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals(MaxComputeTypeConverter.DOUBLE, typeDefine.getColumnType());
+ Assertions.assertEquals(MaxComputeTypeConverter.DOUBLE, typeDefine.getDataType());
+ }
+
+ @Test
+ public void testReconvertDecimal() {
+ Column column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(new DecimalType(0, 0))
+ .build();
+
+ BasicTypeDefine typeDefine = MaxComputeTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals(
+ String.format(
+ "%s(%s,%s)",
+ MaxComputeTypeConverter.DECIMAL,
+ MaxComputeTypeConverter.MAX_PRECISION,
+ MaxComputeTypeConverter.MAX_SCALE),
+ typeDefine.getColumnType());
+ Assertions.assertEquals(MaxComputeTypeConverter.DECIMAL, typeDefine.getDataType());
+
+ column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(new DecimalType(10, 2))
+ .build();
+
+ typeDefine = MaxComputeTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals(MaxComputeTypeConverter.DECIMAL, typeDefine.getDataType());
+ Assertions.assertEquals(
+ String.format("%s(%s,%s)", MaxComputeTypeConverter.DECIMAL, 10, 2),
+ typeDefine.getColumnType());
+ }
+
+ @Test
+ public void testReconvertBytes() {
+ Column column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(PrimitiveByteArrayType.INSTANCE)
+ .columnLength(null)
+ .build();
+
+ BasicTypeDefine typeDefine = MaxComputeTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals(MaxComputeTypeConverter.BINARY, typeDefine.getColumnType());
+ Assertions.assertEquals(MaxComputeTypeConverter.BINARY, typeDefine.getDataType());
+ }
+
+ @Test
+ public void testReconvertString() {
+ Column column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(BasicType.STRING_TYPE)
+ .columnLength(null)
+ .sourceType(MaxComputeTypeConverter.JSON)
+ .build();
+
+ BasicTypeDefine typeDefine = MaxComputeTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals(MaxComputeTypeConverter.STRING, typeDefine.getColumnType());
+ Assertions.assertEquals(MaxComputeTypeConverter.STRING, typeDefine.getDataType());
+
+ column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(BasicType.STRING_TYPE)
+ .columnLength(null)
+ .sourceType(MaxComputeTypeConverter.JSON)
+ .build();
+
+ typeDefine = MaxComputeTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals(MaxComputeTypeConverter.STRING, typeDefine.getColumnType());
+ Assertions.assertEquals(MaxComputeTypeConverter.STRING, typeDefine.getDataType());
+
+ column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(BasicType.STRING_TYPE)
+ .columnLength(255L)
+ .build();
+
+ typeDefine = MaxComputeTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals(
+ String.format("%s(%s)", MaxComputeTypeConverter.CHAR, column.getColumnLength()),
+ typeDefine.getColumnType());
+ Assertions.assertEquals(MaxComputeTypeConverter.CHAR, typeDefine.getDataType());
+
+ column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(BasicType.STRING_TYPE)
+ .columnLength(255L)
+ .sourceType("VARCHAR(255)")
+ .build();
+
+ typeDefine = MaxComputeTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals(
+ String.format("%s(%s)", MaxComputeTypeConverter.CHAR, column.getColumnLength()),
+ typeDefine.getColumnType());
+ Assertions.assertEquals(MaxComputeTypeConverter.CHAR, typeDefine.getDataType());
+
+ column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(BasicType.STRING_TYPE)
+ .columnLength(65533L)
+ .build();
+
+ typeDefine = MaxComputeTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals(
+ String.format("%s(%s)", MaxComputeTypeConverter.VARCHAR, column.getColumnLength()),
+ typeDefine.getColumnType());
+ Assertions.assertEquals(MaxComputeTypeConverter.VARCHAR, typeDefine.getDataType());
+
+ column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(BasicType.STRING_TYPE)
+ .columnLength(16777215L)
+ .build();
+
+ typeDefine = MaxComputeTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals(MaxComputeTypeConverter.STRING, typeDefine.getColumnType());
+ Assertions.assertEquals(MaxComputeTypeConverter.STRING, typeDefine.getDataType());
+ }
+
+ @Test
+ public void testReconvertDate() {
+ Column column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(LocalTimeType.LOCAL_DATE_TYPE)
+ .build();
+
+ BasicTypeDefine typeDefine = MaxComputeTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals(MaxComputeTypeConverter.DATE, typeDefine.getColumnType());
+ Assertions.assertEquals(MaxComputeTypeConverter.DATE, typeDefine.getDataType());
+ }
+
+ @Test
+ public void testReconvertTime() {
+ Column column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(LocalTimeType.LOCAL_TIME_TYPE)
+ .build();
+
+ Exception exception =
+ Assertions.assertThrows(
+ Exception.class, () -> MaxComputeTypeConverter.INSTANCE.reconvert(column));
+ Assertions.assertTrue(
+ exception
+ .getMessage()
+ .contains(
+ "ErrorCode:[COMMON-19], ErrorDescription:['Maxcompute' unsupported convert SeaTunnel data type 'TIME' of 'test' to connector data type.]"));
+ }
+
+ @Test
+ public void testReconvertDatetime() {
+ Column column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE)
+ .build();
+
+ BasicTypeDefine typeDefine = MaxComputeTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals(MaxComputeTypeConverter.DATETIME, typeDefine.getDataType());
+
+ column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE)
+ .scale(3)
+ .build();
+
+ typeDefine = MaxComputeTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals(MaxComputeTypeConverter.DATETIME, typeDefine.getColumnType());
+ Assertions.assertEquals(MaxComputeTypeConverter.DATETIME, typeDefine.getDataType());
+
+ column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE)
+ .scale(10)
+ .build();
+
+ typeDefine = MaxComputeTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals(MaxComputeTypeConverter.TIMESTAMP, typeDefine.getDataType());
+ }
+
+ @Test
+ public void testReconvertArray() {
+ Column column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(ArrayType.BOOLEAN_ARRAY_TYPE)
+ .build();
+
+ BasicTypeDefine typeDefine = MaxComputeTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals("ARRAY", typeDefine.getColumnType());
+ Assertions.assertEquals(MaxComputeTypeConverter.ARRAY, typeDefine.getDataType());
+
+ column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(ArrayType.BYTE_ARRAY_TYPE)
+ .build();
+
+ typeDefine = MaxComputeTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals("ARRAY", typeDefine.getColumnType());
+ Assertions.assertEquals(MaxComputeTypeConverter.ARRAY, typeDefine.getDataType());
+
+ column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(ArrayType.STRING_ARRAY_TYPE)
+ .build();
+
+ typeDefine = MaxComputeTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals("ARRAY", typeDefine.getColumnType());
+ Assertions.assertEquals(MaxComputeTypeConverter.ARRAY, typeDefine.getDataType());
+
+ column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(ArrayType.SHORT_ARRAY_TYPE)
+ .build();
+
+ typeDefine = MaxComputeTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals("ARRAY", typeDefine.getColumnType());
+ Assertions.assertEquals(MaxComputeTypeConverter.ARRAY, typeDefine.getDataType());
+
+ column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(ArrayType.INT_ARRAY_TYPE)
+ .build();
+
+ typeDefine = MaxComputeTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals("ARRAY", typeDefine.getColumnType());
+ Assertions.assertEquals(MaxComputeTypeConverter.ARRAY, typeDefine.getDataType());
+
+ column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(ArrayType.LONG_ARRAY_TYPE)
+ .build();
+
+ typeDefine = MaxComputeTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals("ARRAY", typeDefine.getColumnType());
+ Assertions.assertEquals(MaxComputeTypeConverter.ARRAY, typeDefine.getDataType());
+
+ column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(ArrayType.FLOAT_ARRAY_TYPE)
+ .build();
+
+ typeDefine = MaxComputeTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals("ARRAY", typeDefine.getColumnType());
+ Assertions.assertEquals(MaxComputeTypeConverter.ARRAY, typeDefine.getDataType());
+
+ column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(ArrayType.DOUBLE_ARRAY_TYPE)
+ .build();
+
+ typeDefine = MaxComputeTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals("ARRAY", typeDefine.getColumnType());
+ Assertions.assertEquals(MaxComputeTypeConverter.ARRAY, typeDefine.getDataType());
+
+ column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(ArrayType.LOCAL_DATE_ARRAY_TYPE)
+ .build();
+
+ typeDefine = MaxComputeTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals("ARRAY", typeDefine.getColumnType());
+ Assertions.assertEquals(MaxComputeTypeConverter.ARRAY, typeDefine.getDataType());
+
+ column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(ArrayType.LOCAL_DATE_TIME_ARRAY_TYPE)
+ .build();
+
+ typeDefine = MaxComputeTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals("ARRAY", typeDefine.getColumnType());
+ Assertions.assertEquals(MaxComputeTypeConverter.ARRAY, typeDefine.getDataType());
+
+ DecimalArrayType decimalArrayType = new DecimalArrayType(new DecimalType(10, 2));
+ column = PhysicalColumn.builder().name("test").dataType(decimalArrayType).build();
+
+ typeDefine = MaxComputeTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals("ARRAY", typeDefine.getColumnType());
+ Assertions.assertEquals("ARRAY", typeDefine.getDataType());
+ }
+
+ @Test
+ public void testReconvertMap() {
+ Column column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE))
+ .build();
+
+ BasicTypeDefine typeDefine = MaxComputeTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals("MAP", typeDefine.getColumnType());
+ Assertions.assertEquals(MaxComputeTypeConverter.MAP, typeDefine.getDataType());
+
+ column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(new MapType<>(BasicType.BYTE_TYPE, BasicType.STRING_TYPE))
+ .build();
+
+ typeDefine = MaxComputeTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals("MAP", typeDefine.getColumnType());
+ Assertions.assertEquals("MAP", typeDefine.getDataType());
+
+ column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(new MapType<>(BasicType.SHORT_TYPE, BasicType.STRING_TYPE))
+ .build();
+
+ typeDefine = MaxComputeTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals("MAP", typeDefine.getColumnType());
+ Assertions.assertEquals("MAP", typeDefine.getDataType());
+
+ column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(new MapType<>(BasicType.INT_TYPE, BasicType.STRING_TYPE))
+ .build();
+
+ typeDefine = MaxComputeTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals("MAP", typeDefine.getColumnType());
+ Assertions.assertEquals("MAP", typeDefine.getDataType());
+
+ column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(new MapType<>(BasicType.LONG_TYPE, BasicType.STRING_TYPE))
+ .build();
+
+ typeDefine = MaxComputeTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals("MAP", typeDefine.getColumnType());
+ Assertions.assertEquals("MAP", typeDefine.getDataType());
+
+ column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(new MapType<>(BasicType.FLOAT_TYPE, BasicType.STRING_TYPE))
+ .build();
+
+ typeDefine = MaxComputeTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals("MAP", typeDefine.getColumnType());
+ Assertions.assertEquals("MAP", typeDefine.getDataType());
+
+ column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(new MapType<>(BasicType.DOUBLE_TYPE, BasicType.STRING_TYPE))
+ .build();
+
+ typeDefine = MaxComputeTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals("MAP", typeDefine.getColumnType());
+ Assertions.assertEquals("MAP", typeDefine.getDataType());
+
+ column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(new MapType<>(new DecimalType(10, 2), BasicType.STRING_TYPE))
+ .build();
+
+ typeDefine = MaxComputeTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals("MAP", typeDefine.getColumnType());
+ Assertions.assertEquals("MAP", typeDefine.getDataType());
+
+ column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(
+ new MapType<>(LocalTimeType.LOCAL_DATE_TYPE, BasicType.STRING_TYPE))
+ .build();
+
+ typeDefine = MaxComputeTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals("MAP", typeDefine.getColumnType());
+ Assertions.assertEquals("MAP", typeDefine.getDataType());
+
+ column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(
+ new MapType<>(
+ LocalTimeType.LOCAL_DATE_TIME_TYPE, BasicType.STRING_TYPE))
+ .build();
+
+ typeDefine = MaxComputeTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals("MAP", typeDefine.getColumnType());
+ Assertions.assertEquals("MAP", typeDefine.getDataType());
+ }
+}