diff --git a/release-note.md b/release-note.md index ed8c2a16d83..3f0bb601738 100644 --- a/release-note.md +++ b/release-note.md @@ -57,6 +57,7 @@ - [ClickHouse] Fix clickhouse write cdc changelog update event #3951 - [ClickHouse] Fix connector source snapshot state NPE #4027 - [Kudu] Fix connector source snapshot state NPE #4027 +- [Maxcompute] Fix some data type parse fail #3894 ### Zeta Engine - [Checkpoint] Fix Checkpoint Continue Trigger After Job CANCELED #3808 diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/com/aliyun/odps/type/SimpleArrayTypeInfo.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/com/aliyun/odps/type/SimpleArrayTypeInfo.java new file mode 100644 index 00000000000..76fddc10c4b --- /dev/null +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/com/aliyun/odps/type/SimpleArrayTypeInfo.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.aliyun.odps.type; + +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException; + +import com.aliyun.odps.OdpsType; + +public class SimpleArrayTypeInfo implements ArrayTypeInfo { + private final TypeInfo valueType; + + SimpleArrayTypeInfo(TypeInfo typeInfo) { + if (typeInfo == null) { + throw new MaxcomputeConnectorException( + CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Invalid element type."); + } else { + this.valueType = typeInfo; + } + } + + public String getTypeName() { + return this.getOdpsType().name() + "<" + this.valueType.getTypeName() + ">"; + } + + public TypeInfo getElementTypeInfo() { + return this.valueType; + } + + public OdpsType getOdpsType() { + return OdpsType.ARRAY; + } + + public String toString() { + return this.getTypeName(); + } +} diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/com/aliyun/odps/type/SimpleMapTypeInfo.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/com/aliyun/odps/type/SimpleMapTypeInfo.java new file mode 100644 index 00000000000..6cedcd22bba --- /dev/null +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/com/aliyun/odps/type/SimpleMapTypeInfo.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.aliyun.odps.type; + +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException; + +import com.aliyun.odps.OdpsType; + +public class SimpleMapTypeInfo implements MapTypeInfo { + private final TypeInfo keyType; + private final TypeInfo valueType; + + SimpleMapTypeInfo(TypeInfo keyType, TypeInfo valueType) { + if (keyType != null && valueType != null) { + this.keyType = keyType; + this.valueType = valueType; + } else { + throw new MaxcomputeConnectorException( + CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Invalid key or value type for map."); + } + } + + public String getTypeName() { + return this.getOdpsType().name() + + "<" + + this.keyType.getTypeName() + + "," + + this.valueType.getTypeName() + + ">"; + } + + public TypeInfo getKeyTypeInfo() { + return this.keyType; + } + + public TypeInfo getValueTypeInfo() { + return this.valueType; + } + + public OdpsType getOdpsType() { + return OdpsType.MAP; + } + + public String toString() { + return this.getTypeName(); + } +} diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/com/aliyun/odps/type/SimpleStructTypeInfo.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/com/aliyun/odps/type/SimpleStructTypeInfo.java new file mode 100644 index 00000000000..a6283552217 --- /dev/null +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/com/aliyun/odps/type/SimpleStructTypeInfo.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.aliyun.odps.type; + +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException; + +import com.aliyun.odps.OdpsType; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +public class SimpleStructTypeInfo implements StructTypeInfo { + private final List<String> fieldNames; + private final List<TypeInfo> fieldTypeInfos; + + SimpleStructTypeInfo(List<String> names, List<TypeInfo> typeInfos) { + this.validateParameters(names, typeInfos); + this.fieldNames = this.toLowerCase(names); + this.fieldTypeInfos = new ArrayList<>(typeInfos); /* defensive copy of the caller's list */ + } + + private List<String> toLowerCase(List<String> names) { + List<String> lowerNames = new ArrayList<>(names.size()); + Iterator<String> var3 = names.iterator(); + + while (var3.hasNext()) { + String name = var3.next(); + lowerNames.add(name.toLowerCase(java.util.Locale.ROOT)); /* Locale.ROOT: folding must not depend on the JVM default locale */ + } + + return lowerNames; + } + + private void validateParameters(List<String> names, List<TypeInfo> typeInfos) { + if (names != null && typeInfos != null && !names.isEmpty() && !typeInfos.isEmpty()) { + if (names.size() != typeInfos.size()) { + throw new MaxcomputeConnectorException( + 
CommonErrorCode.UNSUPPORTED_DATA_TYPE, + "The amount of field names must be equal to the amount of field types."); + } + } else { + throw new MaxcomputeConnectorException( + CommonErrorCode.UNSUPPORTED_DATA_TYPE, + "Invalid name or element type for struct."); + } + } + + public String getTypeName() { + StringBuilder stringBuilder = new StringBuilder(this.getOdpsType().name()); + stringBuilder.append("<"); + + for (int i = 0; i < this.fieldNames.size(); ++i) { + if (i > 0) { + stringBuilder.append(","); + } + + stringBuilder.append((String) this.fieldNames.get(i)); + stringBuilder.append(":"); + stringBuilder.append(((TypeInfo) this.fieldTypeInfos.get(i)).getTypeName()); + } + + stringBuilder.append(">"); + return stringBuilder.toString(); + } + + public List<String> getFieldNames() { + return this.fieldNames; + } + + public List<TypeInfo> getFieldTypeInfos() { + return this.fieldTypeInfos; + } + + public int getFieldCount() { + return this.fieldNames.size(); + } + + public OdpsType getOdpsType() { + return OdpsType.STRUCT; + } + + public String toString() { + return this.getTypeName(); + } +} diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java index 7fce0961184..cf6d639ed12 100644 --- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java @@ -63,6 +63,6 @@ public SeaTunnelDataType<SeaTunnelRow> getConsumedType() { @Override public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context) { - return new MaxcomputeWriter(this.typeInfo, this.pluginConfig); + return new 
MaxcomputeWriter(this.pluginConfig); } } diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java index a737ef992d6..c3d157fbf2f 100644 --- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java @@ -20,7 +20,6 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.exception.CommonErrorCode; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; import org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException; @@ -43,23 +42,18 @@ @Slf4j public class MaxcomputeWriter extends AbstractSinkWriter<SeaTunnelRow, Void> { - private final SeaTunnelRowType seaTunnelRowType; private final RecordWriter recordWriter; private final TableTunnel.UploadSession session; private final TableSchema tableSchema; - private Config pluginConfig; - - public MaxcomputeWriter(SeaTunnelRowType seaTunnelRowType, Config pluginConfig) { - this.seaTunnelRowType = seaTunnelRowType; - this.pluginConfig = pluginConfig; + public MaxcomputeWriter(Config pluginConfig) { try { Table table = MaxcomputeUtil.getTable(pluginConfig); this.tableSchema = table.getSchema(); TableTunnel tunnel = MaxcomputeUtil.getTableTunnel(pluginConfig); - if (this.pluginConfig.hasPath(PARTITION_SPEC.key())) { + if (pluginConfig.hasPath(PARTITION_SPEC.key())) { PartitionSpec partitionSpec = - new 
PartitionSpec(this.pluginConfig.getString(PARTITION_SPEC.key())); + new PartitionSpec(pluginConfig.getString(PARTITION_SPEC.key())); session = tunnel.createUploadSession( pluginConfig.getString(PROJECT.key()), @@ -80,8 +74,7 @@ public MaxcomputeWriter(SeaTunnelRowType seaTunnelRowType, Config pluginConfig) @Override public void write(SeaTunnelRow seaTunnelRow) throws IOException { - Record record = - MaxcomputeTypeMapper.getMaxcomputeRowData(seaTunnelRow, this.seaTunnelRowType); + Record record = MaxcomputeTypeMapper.getMaxcomputeRowData(seaTunnelRow, this.tableSchema); recordWriter.write(record); } diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java index 815b337afe3..ac772140150 100644 --- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java @@ -32,178 +32,58 @@ import org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException; import com.aliyun.odps.Column; -import com.aliyun.odps.OdpsType; import com.aliyun.odps.Table; import com.aliyun.odps.TableSchema; import com.aliyun.odps.data.ArrayRecord; +import com.aliyun.odps.data.Binary; +import com.aliyun.odps.data.Char; import com.aliyun.odps.data.Record; +import com.aliyun.odps.data.SimpleStruct; +import com.aliyun.odps.data.Varchar; import com.aliyun.odps.type.ArrayTypeInfo; import com.aliyun.odps.type.DecimalTypeInfo; import com.aliyun.odps.type.MapTypeInfo; +import com.aliyun.odps.type.SimpleArrayTypeInfo; +import com.aliyun.odps.type.SimpleMapTypeInfo; +import 
com.aliyun.odps.type.SimpleStructTypeInfo; import com.aliyun.odps.type.StructTypeInfo; import com.aliyun.odps.type.TypeInfo; import lombok.extern.slf4j.Slf4j; import java.io.Serializable; -import java.nio.ByteBuffer; -import java.sql.SQLException; -import java.time.Instant; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.time.LocalDate; import java.time.LocalDateTime; import java.time.ZoneId; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; -import static com.aliyun.odps.OdpsType.ARRAY; -import static com.aliyun.odps.OdpsType.BIGINT; -import static com.aliyun.odps.OdpsType.BINARY; -import static com.aliyun.odps.OdpsType.BOOLEAN; -import static com.aliyun.odps.OdpsType.DATE; -import static com.aliyun.odps.OdpsType.DECIMAL; -import static com.aliyun.odps.OdpsType.DOUBLE; -import static com.aliyun.odps.OdpsType.FLOAT; -import static com.aliyun.odps.OdpsType.INT; -import static com.aliyun.odps.OdpsType.MAP; -import static com.aliyun.odps.OdpsType.SMALLINT; -import static com.aliyun.odps.OdpsType.STRING; -import static com.aliyun.odps.OdpsType.TIMESTAMP; -import static com.aliyun.odps.OdpsType.TINYINT; -import static com.aliyun.odps.OdpsType.VOID; - @Slf4j public class MaxcomputeTypeMapper implements Serializable { - private static SeaTunnelDataType<?> maxcomputeType2SeaTunnelType(TypeInfo typeInfo) { - switch (typeInfo.getOdpsType()) { - case BIGINT: - return BasicType.LONG_TYPE; - case DOUBLE: - return BasicType.DOUBLE_TYPE; - case BOOLEAN: - return BasicType.BOOLEAN_TYPE; - case DECIMAL: - return mappingDecimalType((DecimalTypeInfo) typeInfo); - case MAP: - return mappingMapType((MapTypeInfo) typeInfo); - case ARRAY: - return mappingListType((ArrayTypeInfo) typeInfo); - case VOID: - return BasicType.VOID_TYPE; - case TINYINT: - case SMALLINT: - case INT: - return BasicType.INT_TYPE; - case FLOAT: - return BasicType.FLOAT_TYPE; - case CHAR: - case VARCHAR: - case STRING: - 
return BasicType.STRING_TYPE; - case DATE: - return LocalTimeType.LOCAL_DATE_TYPE; - case TIMESTAMP: - case DATETIME: - return LocalTimeType.LOCAL_DATE_TIME_TYPE; - case BINARY: - return PrimitiveByteArrayType.INSTANCE; - case STRUCT: - return mappingStructType((StructTypeInfo) typeInfo); - case INTERVAL_DAY_TIME: - return LocalTimeType.LOCAL_TIME_TYPE; - case INTERVAL_YEAR_MONTH: - default: - throw new MaxcomputeConnectorException( - CommonErrorCode.UNSUPPORTED_DATA_TYPE, - String.format( - "Doesn't support Maxcompute type '%s' .", typeInfo.getTypeName())); - } - } - - private static DecimalType mappingDecimalType(DecimalTypeInfo decimalTypeInfo) { - return new DecimalType(decimalTypeInfo.getPrecision(), decimalTypeInfo.getScale()); - } - - private static MapType mappingMapType(MapTypeInfo mapTypeInfo) { - return new MapType( - maxcomputeType2SeaTunnelType(mapTypeInfo.getKeyTypeInfo()), - maxcomputeType2SeaTunnelType(mapTypeInfo.getValueTypeInfo())); - } - - private static ArrayType mappingListType(ArrayTypeInfo arrayTypeInfo) { - switch (arrayTypeInfo.getOdpsType()) { - case BOOLEAN: - return ArrayType.BOOLEAN_ARRAY_TYPE; - case INT: - return ArrayType.INT_ARRAY_TYPE; - case BIGINT: - return ArrayType.LONG_ARRAY_TYPE; - case FLOAT: - return ArrayType.FLOAT_ARRAY_TYPE; - case DOUBLE: - return ArrayType.DOUBLE_ARRAY_TYPE; - case STRING: - return ArrayType.STRING_ARRAY_TYPE; - default: - throw new MaxcomputeConnectorException( - CommonErrorCode.UNSUPPORTED_DATA_TYPE, - String.format( - "Doesn't support Maxcompute type '%s' .", - arrayTypeInfo.getTypeName())); - } - } - - private static SeaTunnelRowType mappingStructType(StructTypeInfo structType) { - List<TypeInfo> fields = structType.getFieldTypeInfos(); - List<String> fieldNames = new ArrayList<>(fields.size()); - List<SeaTunnelDataType<?>> fieldTypes = new ArrayList<>(fields.size()); - for (TypeInfo field : fields) { - fieldNames.add(field.getTypeName()); - fieldTypes.add(maxcomputeType2SeaTunnelType(field)); + 
public static SeaTunnelRow getSeaTunnelRowData(Record rs, SeaTunnelRowType typeInfo) { + List<Object> fields = new ArrayList<>(); + SeaTunnelDataType<?>[] seaTunnelDataTypes = typeInfo.getFieldTypes(); + for (int i = 0; i < rs.getColumns().length; i++) { + fields.add(resolveObject2SeaTunnel(rs.get(i), seaTunnelDataTypes[i])); } - return new SeaTunnelRowType( - fieldNames.toArray(new String[0]), fieldTypes.toArray(new SeaTunnelDataType[0])); + return new SeaTunnelRow(fields.toArray()); } - private static OdpsType seaTunnelType2MaxcomputeType(SeaTunnelDataType<?> seaTunnelDataType) { - switch (seaTunnelDataType.getSqlType()) { - case ARRAY: - return ARRAY; - case MAP: - return MAP; - case STRING: - return STRING; - case BOOLEAN: - return BOOLEAN; - case TINYINT: - return TINYINT; - case SMALLINT: - return SMALLINT; - case INT: - return INT; - case BIGINT: - return BIGINT; - case FLOAT: - return FLOAT; - case DOUBLE: - return DOUBLE; - case DECIMAL: - return DECIMAL; - case BYTES: - return BINARY; - case DATE: - return DATE; - case TIMESTAMP: - return TIMESTAMP; - case NULL: - return VOID; - case TIME: - default: - throw new MaxcomputeConnectorException( - CommonErrorCode.UNSUPPORTED_DATA_TYPE, - String.format( - "Doesn't support SeaTunnelDataType type '%s' .", - seaTunnelDataType.getSqlType())); + public static Record getMaxcomputeRowData(SeaTunnelRow seaTunnelRow, TableSchema tableSchema) { + ArrayRecord arrayRecord = new ArrayRecord(tableSchema); + List<Column> columns = tableSchema.getColumns(); + for (int i = 0; i < seaTunnelRow.getFields().length; i++) { + arrayRecord.set( + i, + resolveObject2Maxcompute( + seaTunnelRow.getField(i), columns.get(i).getTypeInfo())); } + return arrayRecord; } public static SeaTunnelRowType getSeaTunnelRowType(Config pluginConfig) { @@ -216,7 +96,7 @@ public static SeaTunnelRowType getSeaTunnelRowType(Config pluginConfig) { fieldNames.add(tableSchema.getColumns().get(i).getName()); TypeInfo maxcomputeTypeInfo = 
tableSchema.getColumns().get(i).getTypeInfo(); SeaTunnelDataType<?> seaTunnelDataType = - maxcomputeType2SeaTunnelType(maxcomputeTypeInfo); + maxcompute2SeaTunnelType(maxcomputeTypeInfo); seaTunnelDataTypes.add(seaTunnelDataType); } } catch (Exception e) { @@ -227,26 +107,96 @@ public static SeaTunnelRowType getSeaTunnelRowType(Config pluginConfig) { seaTunnelDataTypes.toArray(new SeaTunnelDataType<?>[seaTunnelDataTypes.size()])); } - public static TableSchema seaTunnelRowType2TableSchema(SeaTunnelRowType seaTunnelRowType) { - TableSchema tableSchema = new TableSchema(); - for (int i = 0; i < seaTunnelRowType.getFieldNames().length; i++) { - OdpsType odpsType = seaTunnelType2MaxcomputeType(seaTunnelRowType.getFieldType(i)); - Column column = new Column(seaTunnelRowType.getFieldName(i), odpsType); - tableSchema.addColumn(column); + private static SeaTunnelDataType<?> maxcompute2SeaTunnelType(TypeInfo typeInfo) { + switch (typeInfo.getOdpsType()) { + case MAP: + MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo; + return new MapType( + maxcompute2SeaTunnelType(mapTypeInfo.getKeyTypeInfo()), + maxcompute2SeaTunnelType(mapTypeInfo.getValueTypeInfo())); + case ARRAY: + ArrayTypeInfo arrayTypeInfo = (ArrayTypeInfo) typeInfo; + switch (arrayTypeInfo.getElementTypeInfo().getOdpsType()) { + case BOOLEAN: + return ArrayType.BOOLEAN_ARRAY_TYPE; + case INT: + return ArrayType.INT_ARRAY_TYPE; + case BIGINT: + return ArrayType.LONG_ARRAY_TYPE; + case FLOAT: + return ArrayType.FLOAT_ARRAY_TYPE; + case DOUBLE: + return ArrayType.DOUBLE_ARRAY_TYPE; + case STRING: + return ArrayType.STRING_ARRAY_TYPE; + default: + throw new MaxcomputeConnectorException( + CommonErrorCode.UNSUPPORTED_DATA_TYPE, + String.format( + "SeaTunnel type not support this type [%s] now", + typeInfo.getTypeName())); + } + case STRUCT: + StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo; + List<TypeInfo> fields = structTypeInfo.getFieldTypeInfos(); + List<String> fieldNames = new 
ArrayList<>(fields.size()); + List<SeaTunnelDataType<?>> fieldTypes = new ArrayList<>(fields.size()); + for (int i = 0; i < fields.size(); i++) { + fieldNames.add(structTypeInfo.getFieldNames().get(i)); /* use the struct's field name, not the field's type name */ + fieldTypes.add(maxcompute2SeaTunnelType(fields.get(i))); + } + return new SeaTunnelRowType( + fieldNames.toArray(new String[0]), + fieldTypes.toArray(new SeaTunnelDataType[0])); + case TINYINT: + return BasicType.BYTE_TYPE; + case SMALLINT: + return BasicType.SHORT_TYPE; + case INT: + return BasicType.INT_TYPE; + case BIGINT: + return BasicType.LONG_TYPE; + case BINARY: + return PrimitiveByteArrayType.INSTANCE; + case FLOAT: + return BasicType.FLOAT_TYPE; + case DOUBLE: + return BasicType.DOUBLE_TYPE; + case DECIMAL: + DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfo; + return new DecimalType(decimalTypeInfo.getPrecision(), decimalTypeInfo.getScale()); + case VARCHAR: + case CHAR: + case STRING: + return BasicType.STRING_TYPE; + case DATE: + return LocalTimeType.LOCAL_DATE_TYPE; + case DATETIME: + case TIMESTAMP: + return LocalTimeType.LOCAL_DATE_TIME_TYPE; + case BOOLEAN: + return BasicType.BOOLEAN_TYPE; + case VOID: + return BasicType.VOID_TYPE; + case INTERVAL_DAY_TIME: + case INTERVAL_YEAR_MONTH: + default: + throw new MaxcomputeConnectorException( + CommonErrorCode.UNSUPPORTED_DATA_TYPE, + String.format( + "SeaTunnel type not support this type [%s] now", + typeInfo.getTypeName())); } - return tableSchema; } - private static Object resolveObject(Object field, SeaTunnelDataType<?> fieldType) { + private static Object resolveObject2SeaTunnel(Object field, SeaTunnelDataType<?> fieldType) { if (field == null) { return null; } switch (fieldType.getSqlType()) { case ARRAY: ArrayList<Object> origArray = new ArrayList<>(); - java.util.Arrays.stream(((Record) field).getColumns()) - .iterator() - .forEachRemaining(origArray::add); + origArray.addAll((List<?>) field); /* accept any List implementation, not only ArrayList */ SeaTunnelDataType<?> elementType = ((ArrayType<?, ?>) fieldType).getElementType(); switch 
(elementType.getSqlType()) { case STRING: @@ -262,13 +212,11 @@ private static Object resolveObject(Object field, SeaTunnelDataType<?> fieldType case DOUBLE: return origArray.toArray(new Double[0]); default: - String errorMsg = - String.format( - "SeaTunnel array type not support this type [%s] now", - fieldType.getSqlType()); throw new MaxcomputeConnectorException( CommonErrorCode.UNSUPPORTED_DATA_TYPE, - "SeaTunnel not support this data type now"); + String.format( + "SeaTunnel type not support this type [%s] now", + fieldType.getSqlType().name())); } case MAP: HashMap<Object, Object> dataMap = new HashMap<>(); @@ -278,60 +226,147 @@ private static Object resolveObject(Object field, SeaTunnelDataType<?> fieldType origDataMap.forEach( (key, value) -> dataMap.put( - resolveObject(key, keyType), - resolveObject(value, valueType))); + resolveObject2SeaTunnel(key, keyType), + resolveObject2SeaTunnel(value, valueType))); return dataMap; - case BOOLEAN: + case ROW: + SeaTunnelDataType<?>[] fieldTypes = ((SeaTunnelRowType) fieldType).getFieldTypes(); + Object[] objects = new Object[fieldTypes.length]; + List<Object> fieldValues = ((SimpleStruct) field).getFieldValues(); + for (int i = 0; i < fieldTypes.length; i++) { + Object object = resolveObject2SeaTunnel(fieldValues.get(i), fieldTypes[i]); + objects[i] = object; + } + return new SeaTunnelRow(objects); + case TINYINT: + case SMALLINT: case INT: - case BIGINT: case FLOAT: case DOUBLE: - case DECIMAL: - case DATE: + case BIGINT: + case BOOLEAN: return field; - case STRING: - return field.toString(); - case TINYINT: - return Byte.parseByte(field.toString()); - case SMALLINT: - return Short.parseShort(field.toString()); - case NULL: - return null; case BYTES: - ByteBuffer buffer = (ByteBuffer) field; - byte[] bytes = new byte[buffer.remaining()]; - buffer.get(bytes, 0, bytes.length); - return bytes; + return ((Binary) field).data(); + case DECIMAL: + return field; /* pass the decimal value through instead of dropping it as null */ + case STRING: + if (field instanceof byte[]) { + return 
new String((byte[]) field, java.nio.charset.StandardCharsets.UTF_8); /* explicit charset, not the platform default */ + } + if (field instanceof Char) { + return rtrim(String.valueOf(field)); + } + return String.valueOf(field); + case DATE: + if (field instanceof LocalDate) { + return field; /* already the LocalDate that the SeaTunnel DATE type carries */ + } + return ((Date) field).toLocalDate(); + case TIME: + return ((Time) field).toLocalTime(); + case TIMESTAMP: + return ((java.util.Date) field) + .toInstant() + .atZone(ZoneId.systemDefault()) + .toLocalDateTime(); + case NULL: default: - // do nothing - // never got in there throw new MaxcomputeConnectorException( CommonErrorCode.UNSUPPORTED_DATA_TYPE, - "SeaTunnel not support this data type now"); + String.format( + "SeaTunnel type not support this type [%s] now", + fieldType.getSqlType().name())); } } - public static SeaTunnelRow getSeaTunnelRowData(Record rs, SeaTunnelRowType typeInfo) - throws SQLException { - List<Object> fields = new ArrayList<>(); - SeaTunnelDataType<?>[] seaTunnelDataTypes = typeInfo.getFieldTypes(); - for (int i = 0; i < rs.getColumns().length; i++) { - fields.add(resolveObject(rs.get(i), seaTunnelDataTypes[i])); + private static Object resolveObject2Maxcompute(Object field, TypeInfo typeInfo) { + if (field == null) { + return null; + } + switch (typeInfo.getOdpsType()) { + case ARRAY: + ArrayList<Object> origArray = new ArrayList<>(); + Arrays.stream((Object[]) field).iterator().forEachRemaining(origArray::add); + switch (((SimpleArrayTypeInfo) typeInfo).getElementTypeInfo().getOdpsType()) { + case STRING: + case BOOLEAN: + case INT: + case BIGINT: + case FLOAT: + case DOUBLE: + return origArray; + default: + throw new MaxcomputeConnectorException( + CommonErrorCode.UNSUPPORTED_DATA_TYPE, + String.format( + "Maxcompute type not support this type [%s] now", + typeInfo.getTypeName())); + } + case MAP: + HashMap<Object, Object> dataMap = new HashMap<>(); + TypeInfo keyTypeInfo = ((SimpleMapTypeInfo) 
typeInfo).getKeyTypeInfo(); + TypeInfo valueTypeInfo = ((SimpleMapTypeInfo) typeInfo).getValueTypeInfo(); + HashMap<Object, Object> origDataMap = (HashMap<Object, Object>) field; + origDataMap.forEach( + (key, value) -> + dataMap.put( + resolveObject2Maxcompute(key, keyTypeInfo), + resolveObject2Maxcompute(value, valueTypeInfo))); + return dataMap; /* return the converted copy, not the unconverted input map */ + case STRUCT: + Object[] fields = ((SeaTunnelRow) field).getFields(); + List<TypeInfo> typeInfos = ((SimpleStructTypeInfo) typeInfo).getFieldTypeInfos(); + ArrayList<Object> origStruct = new ArrayList<>(); + for (int i = 0; i < fields.length; i++) { + origStruct.add(resolveObject2Maxcompute(fields[i], typeInfos.get(i))); + } + return new SimpleStruct((StructTypeInfo) typeInfo, origStruct); + case TINYINT: + case SMALLINT: + case INT: + case FLOAT: + case DOUBLE: + case BIGINT: + case BOOLEAN: + return field; + case BINARY: + return new Binary((byte[]) field); + case DECIMAL: + return field; /* pass the decimal value through instead of writing null */ + case VARCHAR: + return new Varchar((String) field); + case CHAR: + return new Char((String) field); + case STRING: + if (field instanceof byte[]) { + return new String((byte[]) field, java.nio.charset.StandardCharsets.UTF_8); /* explicit charset, not the platform default */ + } + if (field instanceof Char) { + return rtrim(String.valueOf(field)); + } + return String.valueOf(field); + case TIMESTAMP: + return Timestamp.valueOf((LocalDateTime) field); + case DATETIME: + return java.util.Date.from( /* the unqualified Date is java.sql.Date; from(Instant) belongs to java.util.Date */ + ((LocalDateTime) field).atZone(ZoneId.systemDefault()).toInstant()); + case DATE: + return Date.valueOf((LocalDate) field); + default: + throw new MaxcomputeConnectorException( + CommonErrorCode.UNSUPPORTED_DATA_TYPE, + String.format( + "Maxcompute type not support this type [%s] now", + typeInfo.getTypeName())); } - return new SeaTunnelRow(fields.toArray()); } - public static Record getMaxcomputeRowData( - SeaTunnelRow seaTunnelRow, SeaTunnelRowType seaTunnelRowType) { - TableSchema tableSchema = seaTunnelRowType2TableSchema(seaTunnelRowType); - ArrayRecord arrayRecord = new ArrayRecord(tableSchema); - for (int i = 0; i < 
seaTunnelRow.getFields().length; i++) { - arrayRecord.set( - i, resolveObject(seaTunnelRow.getField(i), seaTunnelRowType.getFieldType(i))); + private static String rtrim(String s) { + int i = s.length() - 1; + while (i >= 0 && Character.isWhitespace(s.charAt(i))) { + i--; } - return arrayRecord; + return s.substring(0, i + 1); } } diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/resources/maxcompute_to_maxcompute.conf b/seatunnel-connectors-v2/connector-maxcompute/src/main/resources/maxcompute_to_maxcompute.conf index 4038766194a..71797a6cb57 100644 --- a/seatunnel-connectors-v2/connector-maxcompute/src/main/resources/maxcompute_to_maxcompute.conf +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/resources/maxcompute_to_maxcompute.conf @@ -18,17 +18,44 @@ ###### This config file is a demonstration of streaming processing in seatunnel config ###### +###### +###### Sample of maxcompute data type +###### +# DROP TABLE IF EXISTS fake_source; +# +# CREATE TABLE IF NOT EXISTS fake_source(c1 TINYINT,c2 SMALLINT,c3 INT,c4 BIGINT,c5 FLOAT ,c6 DOUBLE +# ,c7 VARCHAR(10),c8 CHAR(10),c9 STRING,c10 DATE,c11 DATETIME ,c12 TIMESTAMP ,c13 BOOLEAN,c14 BINARY +# ,c15 MAP<STRING,STRING>,c16 ARRAY<INT>,c17 STRUCT<s1:STRING,s2:INT,s3:ARRAY<FLOAT>>); +# +# INSERT INTO fake_source(c1, c2, c3, c4, c5, c6, c7,c8,c9,c10,c11,c12,c13,c14,c15,c16,c17) VALUES ( +# CAST(-128 AS TINYINT ),CAST(-32768 AS SMALLINT ) ,0,10000000000000,0.01,0.0000000000000001 +# ,CAST("varchar" as VARCHAR(10)),CAST("char" as CHAR(10)),"hello0",CAST("2022-12-31" as DATE ) +# ,CAST("2022-12-31 23:59:59" as DATETIME ),CAST("2022-12-31 23:59:59.999" as TIMESTAMP ),FALSE,CAST("bytes" AS BINARY ) +# ,MAP("int",1,"str","hello"),ARRAY("11","22"),named_struct("s1","s1","s2",100,"s3",array(1.1, 2.2))); +# +# SELECT * FROM fake_source; +# +# DROP TABLE IF EXISTS fake_sink; +# +# CREATE TABLE IF NOT EXISTS fake_sink LIKE fake_source; +# +# SELECT * FROM fake_sink; +# + env { - # You can set flink 
configuration here - execution.parallelism = 2 - job.mode = "STREAMING" - #execution.checkpoint.interval = 10000 - #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" + # You can set spark configuration here + # see available properties defined by spark: https://spark.apache.org/docs/latest/configuration.html#available-properties + #job.mode = BATCH + job.name = "SeaTunnel" + spark.executor.instances = 1 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local } source { # This is a example source plugin **only for test and demonstrate the feature source plugin** - MaxcomputeSource { + Maxcompute { accessId="<your access id>" accesskey="<your access Key>" endpoint="<http://service.odps.aliyun.com/api>" @@ -53,12 +80,12 @@ transform { } sink { - MaxcomputeSink { + Maxcompute { accessId="<your access id>" accesskey="<your access Key>" endpoint="<http://service.odps.aliyun.com/api>" project="<your project>" - result_table_name="<your table name>" + table_name="<your table name>" #partition_spec="<your partition spec>" #overwrite = false } diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/BasicTypeToOdpsTypeTest.java b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/BasicTypeToOdpsTypeTest.java index 14392de55d3..87aa33e7b2f 100644 --- a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/BasicTypeToOdpsTypeTest.java +++ b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/BasicTypeToOdpsTypeTest.java @@ -26,20 +26,18 @@ import com.aliyun.odps.Column; import com.aliyun.odps.OdpsType; +import com.aliyun.odps.TableSchema; import com.aliyun.odps.data.ArrayRecord; import com.aliyun.odps.data.Record; import lombok.SneakyThrows; -import java.sql.SQLException; - public class BasicTypeToOdpsTypeTest { private static void testType( String fieldName, SeaTunnelDataType<?> seaTunnelDataType, OdpsType odpsType, - Object object) - throws SQLException { + Object object) { SeaTunnelRowType typeInfo 
= new SeaTunnelRowType( new String[] {fieldName}, new SeaTunnelDataType<?>[] {seaTunnelDataType}); @@ -47,8 +45,13 @@ private static void testType( ArrayRecord record = new ArrayRecord(new Column[] {new Column(fieldName, odpsType)}); record.set(fieldName, object); + TableSchema tableSchema = new TableSchema(); + for (Column column : record.getColumns()) { + tableSchema.addColumn(column); + } + SeaTunnelRow seaTunnelRow = MaxcomputeTypeMapper.getSeaTunnelRowData(record, typeInfo); - Record tRecord = MaxcomputeTypeMapper.getMaxcomputeRowData(seaTunnelRow, typeInfo); + Record tRecord = MaxcomputeTypeMapper.getMaxcomputeRowData(seaTunnelRow, tableSchema); for (int i = 0; i < tRecord.getColumns().length; i++) { Assertions.assertEquals(record.get(i), tRecord.get(i)); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/pom.xml new file mode 100644 index 00000000000..35e9619b2d4 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/pom.xml @@ -0,0 +1,37 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.seatunnel</groupId> + <artifactId>seatunnel-connector-v2-e2e</artifactId> + <version>${revision}</version> + </parent> + + <artifactId>connector-maxcompute-e2e</artifactId> + + <dependencies> + <!-- SeaTunnel connectors --> + <dependency> + <groupId>org.apache.seatunnel</groupId> + <artifactId>connector-maxcompute</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + </dependencies> + +</project> diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/java/org/apache/seatunnel/e2e/connector/maxcompute/MaxcomputeIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/java/org/apache/seatunnel/e2e/connector/maxcompute/MaxcomputeIT.java new file mode 100644 index 00000000000..749d6c1b5f6 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/java/org/apache/seatunnel/e2e/connector/maxcompute/MaxcomputeIT.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.connector.maxcompute; + +import org.apache.seatunnel.e2e.common.TestResource; +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.TestContainer; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.Container; + +import java.io.IOException; + +@Disabled("Disabled because it needs user's personal maxcompute account to run this test") +public class MaxcomputeIT extends TestSuiteBase implements TestResource { + + @BeforeEach + @Override + public void startUp() throws Exception {} + + @AfterEach + @Override + public void tearDown() throws Exception {} + + @TestTemplate + public void testMaxcompute(TestContainer container) throws IOException, InterruptedException { + Container.ExecResult execResult = container.executeJob("/maxcompute_to_fake.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/resources/maxcompute_to_fake.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/resources/maxcompute_to_fake.conf new file mode 100644 index 00000000000..646722435e8 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/resources/maxcompute_to_fake.conf @@ -0,0 +1,54 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +###### +###### This config file is a demonstration of batch processing in SeaTunnel config +###### + +env { + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + Maxcompute { + parallelism = 3 + accessId="xxx" + accesskey="xxx" + endpoint="xxx" + project="xxx" + table_name="xxx" + #partition_spec="ds='20220101'" + split_row = 3 + } + + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/category/source-v2 +} + +transform { +} + +sink { + Console { + parallelism = 2 + } + + # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/category/sink-v2 +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml index 83754eae7b0..efeb0b7fcd9 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml @@ -53,6 +53,7 @@ <module>connector-mongodb-e2e</module> <module>connector-selectdb-cloud-e2e</module> <module>connector-hbase-e2e</module> + <module>connector-maxcompute-e2e</module> </modules> <dependencies>