
[Feature][Connector-V2] Support MaxCompute save mode #8277

Merged
merged 13 commits on Dec 16, 2024
65 changes: 58 additions & 7 deletions docs/en/connector-v2/sink/Maxcompute.md
@@ -51,6 +51,64 @@ Used to read data from Maxcompute.

`overwrite` Whether to overwrite the table or partition, default: false.

### save_mode_create_template

We use templates to automatically create MaxCompute tables.
The connector generates a CREATE TABLE statement from the type and schema of the upstream data,
and the default template can be adjusted as needed. This currently only works in multi-table mode.

Default template:

```sql
CREATE TABLE IF NOT EXISTS `${table}` (
${rowtype_fields}
);
```

If a custom field is added to the template, such as an `id` field:

```sql
CREATE TABLE IF NOT EXISTS `${table}`
(
id,
${rowtype_fields}
);
```

The connector will automatically obtain the corresponding type from the upstream schema to complete the statement,
and remove the `id` field from `rowtype_fields`. This method can be used to customize field types and attributes.
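For example, given the default template and a hypothetical upstream table `test_table` with an `id` column and a `name` column, the generated statement could look like the following (the column names and types here are illustrative, not taken from a real run; the connector renders each field as `` `name` type NULL/NOT NULL [COMMENT '...'] ``):

```sql
-- Hypothetical rendering of the default template:
CREATE TABLE IF NOT EXISTS `test_table` (
`id` INT NOT NULL,
`name` STRING NULL
);
```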

You can use the following placeholders:

- database: used to get the database from the upstream schema
- table_name: used to get the table name from the upstream schema
- rowtype_fields: used to get all fields from the upstream schema; these are automatically mapped to
  MaxCompute field definitions
- rowtype_primary_key: used to get the primary key from the upstream schema (may be a list)
- rowtype_unique_key: used to get the unique key from the upstream schema (may be a list)
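As a sketch, a custom template that pins the primary key could look like the following. Note this is an illustrative example only: whether a `PRIMARY KEY` clause is accepted depends on your MaxCompute project supporting transactional (primary-key) tables, so adjust the template to your environment.

```sql
CREATE TABLE IF NOT EXISTS `${table}` (
${rowtype_fields},
PRIMARY KEY (${rowtype_primary_key})
);
```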

### schema_save_mode[Enum]

Before the synchronization task starts, select how to handle the existing table structure on the target side.
Option introduction:
`RECREATE_SCHEMA`: create the table when it does not exist; drop and recreate it when it already exists
`CREATE_SCHEMA_WHEN_NOT_EXIST`: create the table when it does not exist; skip when it already exists
`ERROR_WHEN_SCHEMA_NOT_EXIST`: report an error when the table does not exist
`IGNORE`: ignore handling of the table
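A minimal sink configuration sketch using this option (the connection values are placeholders, and the option names follow the Maxcompute sink docs; verify them against your SeaTunnel version):

```hocon
sink {
  Maxcompute {
    accessId = "<your_access_id>"
    accesskey = "<your_access_key>"
    endpoint = "<http://service.odps.aliyun.com/api>"
    project = "<your_project>"
    table_name = "<your_table_name>"
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
  }
}
```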

### data_save_mode[Enum]

Before the synchronization task starts, select how to handle existing data on the target side.
Option introduction:
`DROP_DATA`: preserve the table structure and delete the data
`APPEND_DATA`: preserve the table structure and the data
`CUSTOM_PROCESSING`: user-defined processing
`ERROR_WHEN_DATA_EXISTS`: report an error when data exists

### custom_sql[String]

When `data_save_mode` is set to `CUSTOM_PROCESSING`, you must fill in the `custom_sql` parameter with an executable SQL statement. The SQL is executed before the synchronization task starts.
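A configuration sketch combining the two options (the SQL statement is illustrative; any statement executable in your MaxCompute project works):

```hocon
sink {
  Maxcompute {
    # ...connection options as in the examples above...
    data_save_mode = "CUSTOM_PROCESSING"
    custom_sql = "TRUNCATE TABLE my_table;"
  }
}
```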

### common options

Sink plugin common parameters, please refer to [Sink Common Options](../sink-common-options.md) for details.
@@ -70,10 +128,3 @@ sink {
}
}
```

## Changelog

### next version

- [Feature] Add Maxcompute Sink Connector([3640](https://github.com/apache/seatunnel/pull/3640))

2 changes: 1 addition & 1 deletion seatunnel-connectors-v2/connector-maxcompute/pom.xml
@@ -30,7 +30,7 @@
<name>SeaTunnel : Connectors V2 : Maxcompute</name>

<properties>
<maxcompute.version>0.31.3</maxcompute.version>
<maxcompute.version>0.51.0</maxcompute.version>
<commons.lang3.version>3.4</commons.lang3.version>
</properties>

@@ -22,29 +22,46 @@
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.PreviewResult;
import org.apache.seatunnel.api.table.catalog.SQLPreviewResult;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.datatype.MaxComputeTypeConverter;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeUtil;

import org.apache.commons.lang3.StringUtils;

import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.Projects;
import com.aliyun.odps.Table;
import com.aliyun.odps.Tables;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.task.SQLTask;
import com.aliyun.odps.type.TypeInfo;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_ID;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_KEY;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ENDPOINT;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.SAVE_MODE_CREATE_TEMPLATE;
import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;

@Slf4j
public class MaxComputeCatalog implements Catalog {
@@ -79,11 +96,9 @@ public String getDefaultDatabase() throws CatalogException {

@Override
public boolean databaseExists(String databaseName) throws CatalogException {
Odps odps = new Odps(account);
odps.setEndpoint(readonlyConfig.get(ENDPOINT));
odps.setDefaultProject(readonlyConfig.get(PROJECT));
Projects projects = odps.projects();
try {
Odps odps = getOdps(readonlyConfig.get(PROJECT));
Projects projects = odps.projects();
return projects.exists(databaseName);
} catch (OdpsException e) {
throw new CatalogException("Check " + databaseName + " exist error", e);
@@ -107,9 +122,7 @@ public List<String> listDatabases() throws CatalogException {
@Override
public List<String> listTables(String databaseName)
throws CatalogException, DatabaseNotExistException {
Odps odps = new Odps(account);
odps.setEndpoint(readonlyConfig.get(ENDPOINT));
odps.setDefaultProject(databaseName);
Odps odps = getOdps(databaseName);

Tables tables = odps.tables();
List<String> tableNames = new ArrayList<>();
@@ -122,12 +135,9 @@ public List<String> listTables(String databaseName)

@Override
public boolean tableExists(TablePath tablePath) throws CatalogException {
Odps odps = new Odps(account);
odps.setEndpoint(readonlyConfig.get(ENDPOINT));
odps.setDefaultProject(tablePath.getDatabaseName());

Tables tables = odps.tables();
try {
Odps odps = getOdps(tablePath.getDatabaseName());
Tables tables = odps.tables();
return tables.exists(tablePath.getTableName());
} catch (OdpsException e) {
throw new CatalogException("tableExists " + tablePath + " error", e);
@@ -137,19 +147,97 @@ public boolean tableExists(TablePath tablePath) throws CatalogException {
@Override
public CatalogTable getTable(TablePath tablePath)
throws CatalogException, TableNotExistException {
throw new UnsupportedOperationException();
if (!tableExists(tablePath)) {
throw new TableNotExistException(catalogName, tablePath);
}
Table odpsTable;
com.aliyun.odps.TableSchema odpsSchema;
boolean isPartitioned;
try {
Odps odps = getOdps(tablePath.getDatabaseName());
odpsTable =
MaxcomputeUtil.parseTable(
odps, tablePath.getDatabaseName(), tablePath.getTableName());
odpsSchema = odpsTable.getSchema();
isPartitioned = odpsTable.isPartitioned();
} catch (Exception ex) {
throw new CatalogException(catalogName, ex);
}
List<String> partitionKeys = new ArrayList<>();
TableSchema.Builder builder = TableSchema.builder();
buildColumnsWithErrorCheck(
tablePath,
builder,
odpsSchema.getColumns().iterator(),
(column) -> {
BasicTypeDefine<TypeInfo> typeDefine =
BasicTypeDefine.<TypeInfo>builder()
.name(column.getName())
.nativeType(column.getTypeInfo())
.columnType(column.getTypeInfo().getTypeName())
.dataType(column.getTypeInfo().getTypeName())
.nullable(column.isNullable())
.comment(column.getComment())
.build();
return MaxComputeTypeConverter.INSTANCE.convert(typeDefine);
});
if (isPartitioned) {
buildColumnsWithErrorCheck(
tablePath,
builder,
odpsSchema.getPartitionColumns().iterator(),
(column) -> {
BasicTypeDefine<TypeInfo> typeDefine =
BasicTypeDefine.<TypeInfo>builder()
.name(column.getName())
.nativeType(column.getTypeInfo())
.columnType(column.getTypeInfo().getTypeName())
.dataType(column.getTypeInfo().getTypeName())
.nullable(column.isNullable())
.comment(column.getComment())
.build();
partitionKeys.add(column.getName());
return MaxComputeTypeConverter.INSTANCE.convert(typeDefine);
});
}
TableSchema tableSchema = builder.build();
TableIdentifier tableIdentifier = getTableIdentifier(tablePath);
return CatalogTable.of(
tableIdentifier,
tableSchema,
readonlyConfig.toMap(),
partitionKeys,
odpsTable.getComment(),
catalogName);
}

@Override
public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
throw new UnsupportedOperationException();
try {
Odps odps = getOdps(tablePath.getDatabaseName());
SQLTask.run(
odps,
MaxComputeCatalogUtil.getCreateTableStatement(
readonlyConfig.get(SAVE_MODE_CREATE_TEMPLATE),
tablePath,
table))
.waitForSuccess();
} catch (OdpsException e) {
throw new CatalogException("create table error", e);
}
}

@Override
public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
throw new UnsupportedOperationException();
try {
Odps odps = getOdps(tablePath.getDatabaseName());
SQLTask.run(odps, MaxComputeCatalogUtil.getDropTableQuery(tablePath, ignoreIfNotExists))
.waitForSuccess();
} catch (OdpsException e) {
throw new CatalogException("drop table error", e);
}
}

@Override
@@ -163,4 +251,70 @@ public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
throws DatabaseNotExistException, CatalogException {
throw new UnsupportedOperationException();
}

@Override
public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
try {
Odps odps = getOdps(tablePath.getDatabaseName());
Table odpsTable = odps.tables().get(tablePath.getTableName());
if (odpsTable.isPartitioned()
&& StringUtils.isNotEmpty(readonlyConfig.get(PARTITION_SPEC))) {
PartitionSpec partitionSpec = new PartitionSpec(readonlyConfig.get(PARTITION_SPEC));
odpsTable.deletePartition(partitionSpec, ignoreIfNotExists);
odpsTable.createPartition(partitionSpec, true);
} else {
odpsTable.truncate();
}
} catch (Exception e) {
throw new CatalogException("truncate table error", e);
}
}

@Override
public boolean isExistsData(TablePath tablePath) {
throw new UnsupportedOperationException();
}

@Override
public void executeSql(TablePath tablePath, String sql) {
try {
Odps odps = getOdps(tablePath.getDatabaseName());
SQLTask.run(odps, sql).waitForSuccess();
} catch (OdpsException e) {
throw new CatalogException("execute sql error", e);
}
}

@Override
public PreviewResult previewAction(
ActionType actionType, TablePath tablePath, Optional<CatalogTable> catalogTable) {
if (actionType == ActionType.CREATE_TABLE) {
checkArgument(catalogTable.isPresent(), "CatalogTable cannot be null");
return new SQLPreviewResult(
MaxComputeCatalogUtil.getCreateTableStatement(
readonlyConfig.get(SAVE_MODE_CREATE_TEMPLATE),
tablePath,
catalogTable.get()));
} else if (actionType == ActionType.DROP_TABLE) {
return new SQLPreviewResult(MaxComputeCatalogUtil.getDropTableQuery(tablePath, true));
} else {
throw new UnsupportedOperationException("Unsupported action type: " + actionType);
}
}

private Odps getOdps(String project) {
Odps odps = new Odps(account);
odps.setEndpoint(readonlyConfig.get(ENDPOINT));
odps.setDefaultProject(project);
return odps;
}

protected TableIdentifier getTableIdentifier(TablePath tablePath) {
return TableIdentifier.of(
catalogName,
tablePath.getDatabaseName(),
tablePath.getSchemaName(),
tablePath.getTableName());
}
}
@@ -52,7 +52,7 @@ public String factoryIdentifier() {
public OptionRule optionRule() {
return OptionRule.builder()
.required(ACCESS_ID, ACCESS_KEY, ENDPOINT, PROJECT, TABLE_NAME)
.optional(PARTITION_SPEC, SPLIT_ROW, SPLIT_ROW, SCHEMA)
.optional(PARTITION_SPEC, SPLIT_ROW, SCHEMA)
.build();
}
}
@@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.maxcompute.catalog;

import org.apache.seatunnel.api.sink.SaveModePlaceHolder;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.api.table.converter.TypeConverter;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.datatype.MaxComputeTypeConverter;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.util.CreateTableParser;

import org.apache.commons.lang3.StringUtils;

import com.aliyun.odps.type.TypeInfo;
import lombok.extern.slf4j.Slf4j;

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;

@Slf4j
public class MaxComputeCatalogUtil {

public static String getDropTableQuery(TablePath tablePath, boolean ignoreIfNotExists) {
return "DROP TABLE "
+ (ignoreIfNotExists ? "IF EXISTS " : "")
+ tablePath.getFullName()
+ ";";
}

/**
* @param createTableTemplate create table template
* @param catalogTable catalog table
* @return create table stmt
*/
public static String getCreateTableStatement(
String createTableTemplate, TablePath tablePath, CatalogTable catalogTable) {

String template = createTableTemplate;
if (!createTableTemplate.trim().endsWith(";")) {
template += ";";
}
TableSchema tableSchema = catalogTable.getTableSchema();

String primaryKey = "";
if (tableSchema.getPrimaryKey() != null) {
List<String> fields = Arrays.asList(catalogTable.getTableSchema().getFieldNames());
List<String> keys = tableSchema.getPrimaryKey().getColumnNames();
keys.sort(Comparator.comparingInt(fields::indexOf));
primaryKey = keys.stream().map(r -> "`" + r + "`").collect(Collectors.joining(","));
}
String uniqueKey = "";
if (!tableSchema.getConstraintKeys().isEmpty()) {
uniqueKey =
tableSchema.getConstraintKeys().stream()
.flatMap(c -> c.getColumnNames().stream())
.map(r -> "`" + r.getColumnName() + "`")
.collect(Collectors.joining(","));
}

template =
template.replaceAll(
SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getReplacePlaceHolder(),
primaryKey);
template =
template.replaceAll(
SaveModePlaceHolder.ROWTYPE_UNIQUE_KEY.getReplacePlaceHolder(), uniqueKey);

Map<String, CreateTableParser.ColumnInfo> columnInTemplate =
CreateTableParser.getColumnList(template);
template = mergeColumnInTemplate(columnInTemplate, tableSchema, template);

String rowTypeFields =
tableSchema.getColumns().stream()
.filter(column -> !columnInTemplate.containsKey(column.getName()))
.map(
x ->
MaxComputeCatalogUtil.columnToMaxComputeType(
x, MaxComputeTypeConverter.INSTANCE))
.collect(Collectors.joining(",\n"));

return template.replaceAll(
SaveModePlaceHolder.DATABASE.getReplacePlaceHolder(),
tablePath.getDatabaseName())
.replaceAll(
SaveModePlaceHolder.TABLE.getReplacePlaceHolder(), tablePath.getTableName())
.replaceAll(
SaveModePlaceHolder.ROWTYPE_FIELDS.getReplacePlaceHolder(), rowTypeFields);
}

private static String mergeColumnInTemplate(
Map<String, CreateTableParser.ColumnInfo> columnInTemplate,
TableSchema tableSchema,
String template) {
int offset = 0;
Map<String, Column> columnMap =
tableSchema.getColumns().stream()
.collect(Collectors.toMap(Column::getName, Function.identity()));
List<CreateTableParser.ColumnInfo> columnInfosInSeq =
columnInTemplate.values().stream()
.sorted(
Comparator.comparingInt(
CreateTableParser.ColumnInfo::getStartIndex))
.collect(Collectors.toList());
for (CreateTableParser.ColumnInfo columnInfo : columnInfosInSeq) {
String col = columnInfo.getName();
if (StringUtils.isEmpty(columnInfo.getInfo())) {
if (columnMap.containsKey(col)) {
Column column = columnMap.get(col);
String newCol =
columnToMaxComputeType(column, MaxComputeTypeConverter.INSTANCE);
String prefix = template.substring(0, columnInfo.getStartIndex() + offset);
String suffix = template.substring(offset + columnInfo.getEndIndex());
if (prefix.endsWith("`")) {
prefix = prefix.substring(0, prefix.length() - 1);
offset--;
}
if (suffix.startsWith("`")) {
suffix = suffix.substring(1);
offset--;
}
template = prefix + newCol + suffix;
offset += newCol.length() - columnInfo.getName().length();
} else {
throw new IllegalArgumentException("Can't find column " + col + " in table.");
}
}
}
return template;
}

public static String columnToMaxComputeType(
Column column, TypeConverter<BasicTypeDefine<TypeInfo>> typeConverter) {
checkNotNull(column, "The column is required.");
return String.format(
"`%s` %s %s %s",
column.getName(),
typeConverter.reconvert(column).getColumnType(),
column.isNullable() ? "NULL" : "NOT NULL",
StringUtils.isEmpty(column.getComment())
? ""
: "COMMENT '" + column.getComment() + "'");
}
}
@@ -17,34 +17,36 @@

package org.apache.seatunnel.connectors.seatunnel.maxcompute.catalog;

import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.datatype.MaxComputeTypeConverter;

import org.apache.commons.collections4.MapUtils;

import com.aliyun.odps.OdpsType;
import com.aliyun.odps.type.ArrayTypeInfo;
import com.aliyun.odps.type.DecimalTypeInfo;
import com.aliyun.odps.type.MapTypeInfo;
import com.aliyun.odps.type.StructTypeInfo;
import com.aliyun.odps.type.TypeInfo;
import com.aliyun.odps.type.TypeInfoFactory;
import com.google.auto.service.AutoService;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;

@AutoService(DataTypeConvertor.class)
public class MaxComputeDataTypeConvertor implements DataTypeConvertor<TypeInfo> {

public static final String PRECISION = "precision";
public static final String SCALE = "scale";

@Override
public SeaTunnelDataType<?> toSeaTunnelType(String field, String connectorDataType) {
if (connectorDataType.startsWith("MAP")) {
@@ -146,145 +148,37 @@ public SeaTunnelDataType<?> toSeaTunnelType(String field, String connectorDataTy
@Override
public SeaTunnelDataType<?> toSeaTunnelType(
String field, TypeInfo connectorDataType, Map<String, Object> dataTypeProperties) {
switch (connectorDataType.getOdpsType()) {
case MAP:
MapTypeInfo mapTypeInfo = (MapTypeInfo) connectorDataType;
return new MapType(
toSeaTunnelType(field, mapTypeInfo.getKeyTypeInfo(), dataTypeProperties),
toSeaTunnelType(field, mapTypeInfo.getValueTypeInfo(), dataTypeProperties));
case ARRAY:
ArrayTypeInfo arrayTypeInfo = (ArrayTypeInfo) connectorDataType;
switch (arrayTypeInfo.getElementTypeInfo().getOdpsType()) {
case BOOLEAN:
return ArrayType.BOOLEAN_ARRAY_TYPE;
case INT:
return ArrayType.INT_ARRAY_TYPE;
case BIGINT:
return ArrayType.LONG_ARRAY_TYPE;
case FLOAT:
return ArrayType.FLOAT_ARRAY_TYPE;
case DOUBLE:
return ArrayType.DOUBLE_ARRAY_TYPE;
case STRING:
return ArrayType.STRING_ARRAY_TYPE;
default:
throw CommonError.convertToSeaTunnelTypeError(
MaxcomputeConfig.PLUGIN_NAME,
connectorDataType.getTypeName(),
field);
}
case STRUCT:
StructTypeInfo structTypeInfo = (StructTypeInfo) connectorDataType;
List<TypeInfo> fields = structTypeInfo.getFieldTypeInfos();
List<String> fieldNames = new ArrayList<>(fields.size());
List<SeaTunnelDataType<?>> fieldTypes = new ArrayList<>(fields.size());
for (TypeInfo f : fields) {
fieldNames.add(f.getTypeName());
fieldTypes.add(toSeaTunnelType(f.getTypeName(), f, dataTypeProperties));
}
return new SeaTunnelRowType(
fieldNames.toArray(new String[0]),
fieldTypes.toArray(new SeaTunnelDataType[0]));
case TINYINT:
return BasicType.BYTE_TYPE;
case SMALLINT:
return BasicType.SHORT_TYPE;
case INT:
return BasicType.INT_TYPE;
case BIGINT:
return BasicType.LONG_TYPE;
case BINARY:
return PrimitiveByteArrayType.INSTANCE;
case FLOAT:
return BasicType.FLOAT_TYPE;
case DOUBLE:
return BasicType.DOUBLE_TYPE;
case DECIMAL:
DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) connectorDataType;
return new DecimalType(decimalTypeInfo.getPrecision(), decimalTypeInfo.getScale());
case VARCHAR:
case CHAR:
case STRING:
return BasicType.STRING_TYPE;
case DATE:
return LocalTimeType.LOCAL_DATE_TYPE;
case DATETIME:
return LocalTimeType.LOCAL_TIME_TYPE;
case TIMESTAMP:
return LocalTimeType.LOCAL_DATE_TIME_TYPE;
case BOOLEAN:
return BasicType.BOOLEAN_TYPE;
case VOID:
return BasicType.VOID_TYPE;
case INTERVAL_DAY_TIME:
case INTERVAL_YEAR_MONTH:
default:
throw CommonError.convertToSeaTunnelTypeError(
MaxcomputeConfig.PLUGIN_NAME, connectorDataType.getTypeName(), field);
}
checkNotNull(connectorDataType, "connectorDataType cannot be null");

BasicTypeDefine<TypeInfo> typeDefine =
BasicTypeDefine.<TypeInfo>builder()
.name(field)
.columnType(connectorDataType.getTypeName())
.dataType(connectorDataType.getOdpsType().name())
.nativeType(connectorDataType)
.build();

return MaxComputeTypeConverter.INSTANCE.convert(typeDefine).getDataType();
}

@Override
public TypeInfo toConnectorType(
String field,
SeaTunnelDataType<?> seaTunnelDataType,
Map<String, Object> dataTypeProperties) {
switch (seaTunnelDataType.getSqlType()) {
case MAP:
MapType mapType = (MapType) seaTunnelDataType;
return TypeInfoFactory.getMapTypeInfo(
toConnectorType(field, mapType.getKeyType(), dataTypeProperties),
toConnectorType(field, mapType.getValueType(), dataTypeProperties));
case ARRAY:
ArrayType arrayType = (ArrayType) seaTunnelDataType;
return TypeInfoFactory.getArrayTypeInfo(
toConnectorType(field, arrayType.getElementType(), dataTypeProperties));
case ROW:
SeaTunnelRowType rowType = (SeaTunnelRowType) seaTunnelDataType;
List<String> fieldNames = new ArrayList<>(rowType.getTotalFields());
List<TypeInfo> fieldTypes = new ArrayList<>(rowType.getTotalFields());
for (int i = 0; i < rowType.getTotalFields(); i++) {
fieldNames.add(rowType.getFieldName(i));
fieldTypes.add(
toConnectorType(field, rowType.getFieldType(i), dataTypeProperties));
}
return TypeInfoFactory.getStructTypeInfo(fieldNames, fieldTypes);
case TINYINT:
return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.TINYINT);
case SMALLINT:
return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.SMALLINT);
case INT:
return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.INT);
case BIGINT:
return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.BIGINT);
case BYTES:
return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.BINARY);
case FLOAT:
return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.FLOAT);
case DOUBLE:
return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.DOUBLE);
case DECIMAL:
DecimalType decimalType = (DecimalType) seaTunnelDataType;
return TypeInfoFactory.getDecimalTypeInfo(
decimalType.getPrecision(), decimalType.getScale());
case STRING:
return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.STRING);
case DATE:
return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.DATE);
case TIMESTAMP:
return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.TIMESTAMP);
case TIME:
return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.DATETIME);
case BOOLEAN:
return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.BOOLEAN);
case NULL:
return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.VOID);
default:
throw CommonError.convertToConnectorTypeError(
MaxcomputeConfig.PLUGIN_NAME,
seaTunnelDataType.getSqlType().toString(),
field);
}
checkNotNull(seaTunnelDataType, "seaTunnelDataType cannot be null");
Long precision = MapUtils.getLong(dataTypeProperties, PRECISION);
Integer scale = MapUtils.getInteger(dataTypeProperties, SCALE);
Column column =
PhysicalColumn.builder()
.name(field)
.dataType(seaTunnelDataType)
.columnLength(precision)
.scale(scale)
.nullable(true)
.build();
BasicTypeDefine<TypeInfo> typeDefine = MaxComputeTypeConverter.INSTANCE.reconvert(column);
return typeDefine.getNativeType();
}

@Override
@@ -19,6 +19,9 @@

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SaveModePlaceHolder;
import org.apache.seatunnel.api.sink.SchemaSaveMode;

import java.io.Serializable;

@@ -69,4 +72,33 @@ public class MaxcomputeConfig implements Serializable {
.booleanType()
.defaultValue(false)
.withDescription("Whether to overwrite the table or partition");

public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
Options.key("schema_save_mode")
.enumType(SchemaSaveMode.class)
.defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST)
.withDescription("schema_save_mode");

public static final Option<DataSaveMode> DATA_SAVE_MODE =
Options.key("data_save_mode")
.enumType(DataSaveMode.class)
.defaultValue(DataSaveMode.APPEND_DATA)
.withDescription("data_save_mode");

public static final Option<String> CUSTOM_SQL =
Options.key("custom_sql").stringType().noDefaultValue().withDescription("custom_sql");

// create table
public static final Option<String> SAVE_MODE_CREATE_TEMPLATE =
Options.key("save_mode_create_template")
.stringType()
.defaultValue(
"CREATE TABLE IF NOT EXISTS `"
+ SaveModePlaceHolder.TABLE.getPlaceHolder()
+ "` (\n"
+ SaveModePlaceHolder.ROWTYPE_FIELDS.getPlaceHolder()
+ "\n"
+ ");")
.withDescription(
"Create table statement template, used to create MaxCompute table");
}


@@ -17,52 +17,89 @@

package org.apache.seatunnel.connectors.seatunnel.maxcompute.sink;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeUtil;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.catalog.MaxComputeCatalog;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.auto.service.AutoService;

import java.util.Optional;

import static org.apache.seatunnel.api.table.factory.FactoryUtil.discoverFactory;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.OVERWRITE;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PLUGIN_NAME;

@AutoService(SeaTunnelSink.class)
public class MaxcomputeSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
public class MaxcomputeSink extends AbstractSimpleSink<SeaTunnelRow, Void>
implements SupportSaveMode, SupportMultiTableSink {
private static final Logger LOG = LoggerFactory.getLogger(MaxcomputeSink.class);
private Config pluginConfig;
private SeaTunnelRowType typeInfo;
private final ReadonlyConfig readonlyConfig;
private final CatalogTable catalogTable;

public MaxcomputeSink(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
this.readonlyConfig = readonlyConfig;
this.catalogTable = catalogTable;
}

@Override
public String getPluginName() {
return PLUGIN_NAME;
}

@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
this.pluginConfig = pluginConfig;
MaxcomputeUtil.initTableOrPartition(pluginConfig);
public MaxcomputeWriter createWriter(SinkWriter.Context context) {
return new MaxcomputeWriter(this.readonlyConfig, this.catalogTable.getSeaTunnelRowType());
}

@Override
public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
this.typeInfo = seaTunnelRowType;
}
public Optional<SaveModeHandler> getSaveModeHandler() {
CatalogFactory catalogFactory =
discoverFactory(
Thread.currentThread().getContextClassLoader(),
CatalogFactory.class,
"MaxCompute");
if (catalogFactory == null) {
throw new MaxcomputeConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format(
"PluginName: %s, PluginType: %s, Message: %s",
getPluginName(),
PluginType.SINK,
"Cannot find MaxCompute catalog factory"));
}
MaxComputeCatalog catalog =
(MaxComputeCatalog)
catalogFactory.createCatalog(
catalogFactory.factoryIdentifier(), readonlyConfig);

DataSaveMode dataSaveMode = readonlyConfig.get(MaxcomputeConfig.DATA_SAVE_MODE);
if (readonlyConfig.get(OVERWRITE)) {
// compatible with old version
LOG.warn(
"The configuration of 'overwrite' is deprecated, please use 'data_save_mode' instead.");
dataSaveMode = DataSaveMode.DROP_DATA;
}

return Optional.of(
new DefaultSaveModeHandler(
readonlyConfig.get(MaxcomputeConfig.SCHEMA_SAVE_MODE),
dataSaveMode,
catalog,
catalogTable,
readonlyConfig.get(MaxcomputeConfig.CUSTOM_SQL)));
}

@Override
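The backward-compatibility branch in `getSaveModeHandler` reduces to a single rule: a legacy `overwrite = true` always forces `DROP_DATA`. A minimal standalone sketch of that rule (the enum here only mirrors a subset of SeaTunnel's `DataSaveMode` values, and `resolveDataSaveMode` is a hypothetical helper, not connector API):

```java
// Sketch of the deprecation fallback: overwrite=true overrides the configured mode.
public class SaveModeFallbackSketch {
    // Illustrative subset of SeaTunnel's DataSaveMode values.
    enum DataSaveMode { APPEND_DATA, DROP_DATA, ERROR_WHEN_DATA_EXISTS }

    static DataSaveMode resolveDataSaveMode(DataSaveMode configured, boolean overwrite) {
        // compatible with old version: overwrite=true always drops existing data first
        return overwrite ? DataSaveMode.DROP_DATA : configured;
    }

    public static void main(String[] args) {
        System.out.println(resolveDataSaveMode(DataSaveMode.APPEND_DATA, true));  // DROP_DATA
        System.out.println(resolveDataSaveMode(DataSaveMode.APPEND_DATA, false)); // APPEND_DATA
    }
}
```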
@@ -18,18 +18,27 @@
package org.apache.seatunnel.connectors.seatunnel.maxcompute.sink;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;

import com.google.auto.service.AutoService;

import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_ID;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_KEY;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.CUSTOM_SQL;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.DATA_SAVE_MODE;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ENDPOINT;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.OVERWRITE;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PLUGIN_NAME;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.SAVE_MODE_CREATE_TEMPLATE;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.SCHEMA_SAVE_MODE;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_NAME;

@AutoService(Factory.class)
@@ -43,7 +52,27 @@ public String factoryIdentifier() {
public OptionRule optionRule() {
return OptionRule.builder()
.required(ACCESS_ID, ACCESS_KEY, ENDPOINT, PROJECT, TABLE_NAME)
.optional(
PARTITION_SPEC,
OVERWRITE,
SCHEMA_SAVE_MODE,
DATA_SAVE_MODE,
SAVE_MODE_CREATE_TEMPLATE,
CUSTOM_SQL,
SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
.build();
}

@Override
public TableSink createSink(TableSinkFactoryContext context) {
return () ->
new MaxcomputeSink(
context.getOptions(),
CatalogTable.of(
TableIdentifier.of(
context.getCatalogTable().getCatalogName(),
context.getOptions().get(PROJECT),
context.getOptions().get(TABLE_NAME)),
context.getCatalogTable()));
}
}
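For reference, the expanded option rule corresponds to a sink configuration along these lines. This is a hedged sketch: the connection values are placeholders, the key names follow the connector's existing docs, and the enum values follow SeaTunnel's standard save-mode enums, which may differ by version:

```hocon
sink {
  Maxcompute {
    accessId = "your_access_id"
    accesskey = "your_access_key"
    endpoint = "http://service.odps.aliyun.com/api"
    project = "test_project"
    table_name = "test_table"
    # save-mode options added by this PR
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    data_save_mode = "APPEND_DATA"
    save_mode_create_template = "CREATE TABLE IF NOT EXISTS `${table}` (${rowtype_fields});"
    # custom_sql applies when data_save_mode = CUSTOM_PROCESSING
    # custom_sql = "truncate table test_table"
  }
}
```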
@@ -17,8 +17,8 @@

package org.apache.seatunnel.connectors.seatunnel.maxcompute.sink;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
@@ -42,32 +42,31 @@
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_NAME;

@Slf4j
public class MaxcomputeWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
implements SupportMultiTableSinkWriter<Void> {
private RecordWriter recordWriter;
private final TableTunnel.UploadSession session;
private final TableSchema tableSchema;
private static final Long BLOCK_0 = 0L;
private final SeaTunnelRowType rowType;

public MaxcomputeWriter(ReadonlyConfig readonlyConfig, SeaTunnelRowType rowType) {
try {
this.rowType = rowType;
Table table = MaxcomputeUtil.getTable(readonlyConfig);
this.tableSchema = table.getSchema();
TableTunnel tunnel = MaxcomputeUtil.getTableTunnel(readonlyConfig);
if (readonlyConfig.getOptional(PARTITION_SPEC).isPresent()) {
PartitionSpec partitionSpec = new PartitionSpec(readonlyConfig.get(PARTITION_SPEC));
session =
tunnel.createUploadSession(
readonlyConfig.get(PROJECT),
readonlyConfig.get(TABLE_NAME),
partitionSpec);
} else {
session =
tunnel.createUploadSession(
readonlyConfig.get(PROJECT), readonlyConfig.get(TABLE_NAME));
}
this.recordWriter = session.openRecordWriter(BLOCK_0);
log.info("open record writer success");
@@ -19,6 +19,7 @@

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
@@ -67,7 +68,8 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
source -> {
try {
TableTunnel.DownloadSession session =
MaxcomputeUtil.getDownloadSession(
ReadonlyConfig.fromConfig(pluginConfig));
TunnelRecordReader recordReader =
session.openRecordReader(source.getSplitId(), source.getRowNum());
log.info("open record reader success");
@@ -19,6 +19,7 @@

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeUtil;

@@ -99,7 +100,8 @@ public void notifyCheckpointComplete(long checkpointId) {}
public void handleSplitRequest(int subtaskId) {}

private void discoverySplits() throws TunnelException {
TableTunnel.DownloadSession session =
MaxcomputeUtil.getDownloadSession(ReadonlyConfig.fromConfig(this.pluginConfig));
long recordCount = session.getRecordCount();
int numReaders = enumeratorContext.currentParallelism();
int splitRowNum = (int) Math.ceil((double) recordCount / numReaders);
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.maxcompute.util;

import lombok.Getter;

import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CreateTableParser {

private static final Pattern COLUMN_PATTERN = Pattern.compile("`?(\\w+)`?\\s*([\\w|\\W]*)");

public static Map<String, ColumnInfo> getColumnList(String createTableSql) {
Map<String, ColumnInfo> columns = new HashMap<>();
StringBuilder columnBuilder = new StringBuilder();
int startIndex = createTableSql.indexOf("(");
createTableSql = createTableSql.substring(startIndex + 1);

boolean insideParentheses = false;
for (int i = 0; i < createTableSql.length(); i++) {
char c = createTableSql.charAt(i);
if (c == '(') {
insideParentheses = true;
columnBuilder.append(c);
} else if ((c == ',' || c == ')') && !insideParentheses) {
parseColumn(columnBuilder.toString(), columns, startIndex + i + 1);
columnBuilder.setLength(0);
if (c == ')') {
break;
}
} else if (c == ')') {
insideParentheses = false;
columnBuilder.append(c);
} else {
columnBuilder.append(c);
}
}
return columns;
}

private static void parseColumn(
String columnString, Map<String, ColumnInfo> columnList, int suffixIndex) {
Matcher matcher = COLUMN_PATTERN.matcher(columnString.trim());
if (matcher.matches()) {
String columnName = matcher.group(1);
String otherInfo = matcher.group(2).trim();
StringBuilder columnBuilder =
new StringBuilder(columnName).append(" ").append(otherInfo);
if (columnBuilder.toString().toUpperCase().contains("PRIMARY KEY")
|| columnBuilder.toString().toUpperCase().contains("CREATE TABLE")) {
return;
}
int endIndex =
suffixIndex
- columnString
.substring(
columnString.indexOf(columnName) + columnName.length())
.length();
int startIndex =
suffixIndex - columnString.substring(columnString.indexOf(columnName)).length();
columnList.put(columnName, new ColumnInfo(columnName, otherInfo, startIndex, endIndex));
}
}

@Getter
public static final class ColumnInfo {

public ColumnInfo(String name, String info, int startIndex, int endIndex) {
this.name = name;
this.info = info;
this.startIndex = startIndex;
this.endIndex = endIndex;
}

String name;
String info;
int startIndex;
int endIndex;
}
}
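To make the splitting behavior of `getColumnList` concrete, here is a self-contained sketch of the same top-level-comma scan (the class name `ColumnSplitSketch` is illustrative; it tracks nesting with a depth counter rather than a boolean flag, so nested parentheses such as `DECIMAL(10,2)` stay intact):

```java
import java.util.ArrayList;
import java.util.List;

public class ColumnSplitSketch {

    // Split the body of a CREATE TABLE statement on commas at parenthesis
    // depth 0, keeping type arguments such as DECIMAL(10,2) whole.
    static List<String> splitColumns(String createTableSql) {
        String body = createTableSql.substring(createTableSql.indexOf('(') + 1);
        List<String> parts = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        int depth = 0;
        for (char c : body.toCharArray()) {
            if (c == '(') {
                depth++;
                current.append(c);
            } else if (c == ')' && depth > 0) {
                depth--;
                current.append(c);
            } else if (c == ',' || c == ')') { // depth == 0 here
                if (current.toString().trim().length() > 0) {
                    parts.add(current.toString().trim());
                }
                current.setLength(0);
                if (c == ')') {
                    break; // closing parenthesis of the column list
                }
            } else {
                current.append(c);
            }
        }
        return parts;
    }

    public static void main(String[] args) {
        for (String col : splitColumns(
                "CREATE TABLE t (`id` BIGINT, `price` DECIMAL(10,2), PRIMARY KEY(id))")) {
            System.out.println(col);
        }
    }
}
```

The `PRIMARY KEY(id)` fragment survives the split here; in the real parser, `parseColumn` then filters out `PRIMARY KEY` and `CREATE TABLE` fragments before recording column info.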
@@ -19,6 +19,7 @@

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -89,7 +90,7 @@ public static Record getMaxcomputeRowData(
}

public static SeaTunnelRowType getSeaTunnelRowType(Config pluginConfig) {
Table table = MaxcomputeUtil.getTable(ReadonlyConfig.fromConfig(pluginConfig));
TableSchema tableSchema = table.getSchema();
ArrayList<SeaTunnelDataType<?>> seaTunnelDataTypes = new ArrayList<>();
ArrayList<String> fieldNames = new ArrayList<>();
@@ -251,16 +252,17 @@ private static Object resolveObject2Maxcompute(Object field, TypeInfo typeInfo)
case DOUBLE:
case BIGINT:
case BOOLEAN:
case TIMESTAMP_NTZ:
return field;
case BINARY:
return new Binary((byte[]) field);
case DECIMAL:
return null;
case VARCHAR:
return new Varchar((String) field);
case CHAR:
return new Char((String) field);
case STRING:
case JSON:
if (field instanceof byte[]) {
return new String((byte[]) field);
}
@@ -17,8 +17,7 @@

package org.apache.seatunnel.connectors.seatunnel.maxcompute.util;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException;

@@ -33,53 +32,46 @@
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_ID;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_KEY;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ENDPOINT;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_NAME;

@Slf4j
public class MaxcomputeUtil {
public static Table getTable(ReadonlyConfig readonlyConfig) {
Odps odps = getOdps(readonlyConfig);
return odps.tables().get(readonlyConfig.get(TABLE_NAME));
}

public static TableTunnel getTableTunnel(ReadonlyConfig readonlyConfig) {
Odps odps = getOdps(readonlyConfig);
return new TableTunnel(odps);
}

public static Odps getOdps(ReadonlyConfig readonlyConfig) {
Account account =
new AliyunAccount(
new AliyunAccount(readonlyConfig.get(ACCESS_ID), readonlyConfig.get(ACCESS_KEY));
Odps odps = new Odps(account);
odps.setEndpoint(readonlyConfig.get(ENDPOINT));
odps.setDefaultProject(readonlyConfig.get(PROJECT));
return odps;
}

public static TableTunnel.DownloadSession getDownloadSession(ReadonlyConfig readonlyConfig) {
TableTunnel tunnel = getTableTunnel(readonlyConfig);
TableTunnel.DownloadSession session;
try {
if (readonlyConfig.getOptional(PARTITION_SPEC).isPresent()) {
PartitionSpec partitionSpec = new PartitionSpec(readonlyConfig.get(PARTITION_SPEC));
session =
tunnel.createDownloadSession(
readonlyConfig.get(PROJECT),
readonlyConfig.get(TABLE_NAME),
partitionSpec);
} else {
session =
tunnel.createDownloadSession(
readonlyConfig.get(PROJECT), readonlyConfig.get(TABLE_NAME));
}
} catch (Exception e) {
throw new MaxcomputeConnectorException(
@@ -88,36 +80,18 @@ public static TableTunnel.DownloadSession getDownloadSession(Config pluginConfig
return session;
}

public static Table parseTable(Odps odps, String projectName, String tableName) {
try {
Table table = odps.tables().get(projectName, tableName);
table.reload();
return table;
} catch (Exception ex) {
throw new MaxcomputeConnectorException(
CommonErrorCodeDeprecated.TABLE_SCHEMA_GET_FAILED,
String.format(
"get table %s.%s info with exception, error:%s",
projectName, tableName, ex.getMessage()),
ex);
}
}
}
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.maxcompute.catalog;

import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.BasicType;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

@Slf4j
public class MaxComputeCreateTableTest {

@Test
public void test() {

List<Column> columns = new ArrayList<>();

columns.add(PhysicalColumn.of("id", BasicType.LONG_TYPE, (Long) null, true, null, ""));
columns.add(PhysicalColumn.of("name", BasicType.STRING_TYPE, (Long) null, true, null, ""));
columns.add(
PhysicalColumn.of(
"age", BasicType.INT_TYPE, (Long) null, true, null, "test comment"));
columns.add(PhysicalColumn.of("score", BasicType.INT_TYPE, (Long) null, true, null, ""));
columns.add(PhysicalColumn.of("gender", BasicType.BYTE_TYPE, (Long) null, true, null, ""));
columns.add(
PhysicalColumn.of("create_time", BasicType.LONG_TYPE, (Long) null, true, null, ""));

String result =
MaxComputeCatalogUtil.getCreateTableStatement(
"CREATE TABLE IF NOT EXISTS `${database}`.`${table}` ( \n"
+ "${rowtype_primary_key} , \n"
+ "${rowtype_unique_key} , \n"
+ "`create_time` DATETIME NOT NULL , \n"
+ "${rowtype_fields} \n"
+ ") ENGINE=OLAP \n"
+ "PRIMARY KEY(${rowtype_primary_key},`create_time`) \n"
+ "PARTITION BY RANGE (`create_time`)( \n"
+ " PARTITION p20230329 VALUES LESS THAN (\"2023-03-29\") \n"
+ ") \n"
+ "DISTRIBUTED BY HASH (${rowtype_primary_key}) \n"
+ "PROPERTIES (\n"
+ "\"replication_allocation\" = \"tag.location.default: 1\",\n"
+ "\"in_memory\" = \"false\",\n"
+ "\"storage_format\" = \"V2\",\n"
+ "\"disable_auto_compaction\" = \"false\"\n"
+ ");",
TablePath.of("test1.test2"),
CatalogTable.of(
TableIdentifier.of("test", "test1", "test2"),
TableSchema.builder()
.primaryKey(PrimaryKey.of("", Arrays.asList("id", "age")))
.constraintKey(
Arrays.asList(
ConstraintKey.of(
ConstraintKey.ConstraintType
.UNIQUE_KEY,
"unique_key",
Collections.singletonList(
ConstraintKey
.ConstraintKeyColumn
.of(
"name",
ConstraintKey
.ColumnSortType
.DESC))),
ConstraintKey.of(
ConstraintKey.ConstraintType
.UNIQUE_KEY,
"unique_key2",
Collections.singletonList(
ConstraintKey
.ConstraintKeyColumn
.of(
"score",
ConstraintKey
.ColumnSortType
.ASC)))))
.columns(columns)
.build(),
Collections.emptyMap(),
Collections.emptyList(),
""));
Assertions.assertEquals(
result,
"CREATE TABLE IF NOT EXISTS `test1`.`test2` ( \n"
+ "`id` BIGINT NULL ,`age` INT NULL COMMENT 'test comment' , \n"
+ "`name` STRING NULL ,`score` INT NULL , \n"
+ "`create_time` DATETIME NOT NULL , \n"
+ "`gender` TINYINT NULL \n"
+ ") ENGINE=OLAP \n"
+ "PRIMARY KEY(`id`,`age`,`create_time`) \n"
+ "PARTITION BY RANGE (`create_time`)( \n"
+ " PARTITION p20230329 VALUES LESS THAN (\"2023-03-29\") \n"
+ ") \n"
+ "DISTRIBUTED BY HASH (`id`,`age`) \n"
+ "PROPERTIES (\n"
+ "\"replication_allocation\" = \"tag.location.default: 1\",\n"
+ "\"in_memory\" = \"false\",\n"
+ "\"storage_format\" = \"V2\",\n"
+ "\"disable_auto_compaction\" = \"false\"\n"
+ ");");
}
}
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.maxcompute.catalog;

import org.apache.seatunnel.shade.com.google.common.collect.Lists;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.PreviewResult;
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.api.table.catalog.SQLPreviewResult;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.BasicType;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.HashMap;
import java.util.Optional;

public class PreviewActionTest {

private static final CatalogTable CATALOG_TABLE =
CatalogTable.of(
TableIdentifier.of("catalog", "database", "table"),
TableSchema.builder()
.primaryKey(PrimaryKey.of("", Lists.newArrayList("id")))
.columns(
Lists.newArrayList(
PhysicalColumn.of(
"id",
BasicType.LONG_TYPE,
(Long) null,
false,
null,
""),
PhysicalColumn.of(
"test",
BasicType.STRING_TYPE,
(Long) null,
true,
null,
"")))
.build(),
Collections.emptyMap(),
Collections.emptyList(),
"comment");

@Test
public void testDorisPreviewAction() {
MaxComputeCatalogFactory factory = new MaxComputeCatalogFactory();
Catalog catalog =
factory.createCatalog(
"test",
ReadonlyConfig.fromMap(
new HashMap<String, Object>() {
{
put("username", "root");
put("password", "root");
}
}));
assertPreviewResult(
catalog,
Catalog.ActionType.DROP_TABLE,
"DROP TABLE IF EXISTS testddatabase.testtable;",
Optional.empty());
assertPreviewResult(
catalog,
Catalog.ActionType.CREATE_TABLE,
"CREATE TABLE IF NOT EXISTS `testtable` (\n"
+ "`id` BIGINT NOT NULL ,\n"
+ "`test` STRING NULL \n"
+ ");",
Optional.of(CATALOG_TABLE));
}

private void assertPreviewResult(
Catalog catalog,
Catalog.ActionType actionType,
String expectedSql,
Optional<CatalogTable> catalogTable) {
PreviewResult previewResult =
catalog.previewAction(
actionType, TablePath.of("testddatabase.testtable"), catalogTable);
Assertions.assertInstanceOf(SQLPreviewResult.class, previewResult);
Assertions.assertEquals(expectedSql, ((SQLPreviewResult) previewResult).getSql());
}
}
