diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterFactory.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterFactory.java index 4206e4fdc65..04b33f33648 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterFactory.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterFactory.java @@ -19,8 +19,6 @@ import org.apache.seatunnel.api.table.converter.BasicTypeDefine; import org.apache.seatunnel.api.table.converter.TypeConverter; -import org.apache.seatunnel.common.exception.CommonError; -import org.apache.seatunnel.connectors.doris.config.DorisConfig; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; @@ -37,7 +35,7 @@ public static TypeConverter getTypeConverter(@NonNull String do || dorisVersion.toLowerCase(Locale.ROOT).startsWith("selectdb-doris-2.")) { return DorisTypeConverterV2.INSTANCE; } else { - throw CommonError.unsupportedVersion(DorisConfig.IDENTIFIER, dorisVersion); + return DorisTypeConverterV2.INSTANCE; } } } diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowConverter.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowConverter.java deleted file mode 100644 index 0fd8e27306c..00000000000 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowConverter.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.connectors.doris.serialize; - -import org.apache.seatunnel.api.table.type.ArrayType; -import org.apache.seatunnel.api.table.type.DecimalArrayType; -import org.apache.seatunnel.api.table.type.MapType; -import org.apache.seatunnel.api.table.type.SeaTunnelDataType; -import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; -import org.apache.seatunnel.common.utils.DateTimeUtils; -import org.apache.seatunnel.common.utils.DateUtils; -import org.apache.seatunnel.common.utils.TimeUtils; -import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException; - -import lombok.Builder; - -import java.math.BigDecimal; -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.LocalTime; -import java.util.LinkedHashMap; -import java.util.Map; - -public class SeaTunnelRowConverter { - @Builder.Default private DateUtils.Formatter dateFormatter = DateUtils.Formatter.YYYY_MM_DD; - - @Builder.Default - private DateTimeUtils.Formatter dateTimeFormatter = - DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS_SSSSSS; - - @Builder.Default private TimeUtils.Formatter timeFormatter = TimeUtils.Formatter.HH_MM_SS; - - protected Object convert(SeaTunnelDataType dataType, Object val) { - if (val == null) { - return null; - } - switch (dataType.getSqlType()) { - case TINYINT: - case SMALLINT: - case INT: - case BIGINT: - case FLOAT: - case DOUBLE: - case DECIMAL: - case BOOLEAN: - case STRING: - return val; - case DATE: - return DateUtils.toString((LocalDate) val, dateFormatter); - case TIME: - return TimeUtils.toString((LocalTime) val, timeFormatter); - case TIMESTAMP: - return DateTimeUtils.toString((LocalDateTime) val, dateTimeFormatter); - case ARRAY: - return convertArray(dataType, val); - case MAP: - return convertMap(dataType, val); - case BYTES: - return new String((byte[]) val); - default: - throw new DorisConnectorException( - CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, - dataType + " is not supported "); - } - } - - public Object[] convertArray(SeaTunnelDataType dataType, Object val) { - if (dataType instanceof DecimalArrayType) { - return (BigDecimal[]) val; - } - - SeaTunnelDataType elementType = ((ArrayType) dataType).getElementType(); - Object[] realValue = (Object[]) val; - Object[] newArrayValue = new Object[realValue.length]; - for (int i = 0; i < realValue.length; i++) { - newArrayValue[i] = convert(elementType, realValue[i]); - } - return newArrayValue; - } - - public Map convertMap(SeaTunnelDataType dataType, Object val) { - MapType valueMapType = (MapType) dataType; - Map realValue = (Map) val; - Map newMapValue = new LinkedHashMap<>(); - for (Map.Entry entry : realValue.entrySet()) { - newMapValue.put( - convert(valueMapType.getKeyType(), entry.getKey()), - convert(valueMapType.getValueType(), entry.getValue())); - } - return newMapValue; - } -} diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowSerializer.java index 4bfc148d86e..0c5b9c0c420 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowSerializer.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowSerializer.java @@ -17,27 +17,28 @@ package org.apache.seatunnel.connectors.doris.serialize; +import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonGenerator; +import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper; + import org.apache.seatunnel.api.table.type.RowKind; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.doris.sink.writer.LoadConstants; - -import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.seatunnel.format.json.JsonSerializationSchema; +import org.apache.seatunnel.format.text.TextSerializationSchema; import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.Map; -import java.util.StringJoiner; +import java.util.Arrays; +import java.util.List; -import static com.google.common.base.Preconditions.checkState; +import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE; import static org.apache.seatunnel.connectors.doris.sink.writer.LoadConstants.CSV; import static org.apache.seatunnel.connectors.doris.sink.writer.LoadConstants.JSON; import static org.apache.seatunnel.connectors.doris.sink.writer.LoadConstants.NULL_VALUE; -public class SeaTunnelRowSerializer extends SeaTunnelRowConverter implements DorisSerializer { +public class SeaTunnelRowSerializer implements DorisSerializer { String type; - private ObjectMapper objectMapper; private final SeaTunnelRowType seaTunnelRowType; private final String fieldDelimiter; private final boolean enableDelete; @@ -51,48 +52,29 @@ public SeaTunnelRowSerializer( this.seaTunnelRowType = seaTunnelRowType; this.fieldDelimiter = fieldDelimiter; this.enableDelete = enableDelete; - if (JSON.equals(type)) { - objectMapper = new ObjectMapper(); - } } - @Override - public byte[] serialize(SeaTunnelRow seaTunnelRow) throws IOException { - String valString; - if (JSON.equals(type)) { - valString = buildJsonString(seaTunnelRow); - } else if (CSV.equals(type)) { - valString = buildCSVString(seaTunnelRow); - } else { - throw new IllegalArgumentException("The type " + type + " is not supported!"); - } - return valString.getBytes(StandardCharsets.UTF_8); + public byte[] buildJsonString(SeaTunnelRow row, SeaTunnelRowType seaTunnelRowType) + throws IOException { + + JsonSerializationSchema jsonSerializationSchema = + new JsonSerializationSchema(seaTunnelRowType, NULL_VALUE); + ObjectMapper mapper = jsonSerializationSchema.getMapper(); + mapper.configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, true); + return jsonSerializationSchema.serialize(row); } - public String buildJsonString(SeaTunnelRow row) throws IOException { - Map rowMap = new HashMap<>(row.getFields().length); + public byte[] buildCSVString(SeaTunnelRow row, SeaTunnelRowType seaTunnelRowType) + throws IOException { - for (int i = 0; i < row.getFields().length; i++) { - Object value = convert(seaTunnelRowType.getFieldType(i), row.getField(i)); - rowMap.put(seaTunnelRowType.getFieldName(i), value); - } - if (enableDelete) { - rowMap.put(LoadConstants.DORIS_DELETE_SIGN, parseDeleteSign(row.getRowKind())); - } - return objectMapper.writeValueAsString(rowMap); - } + TextSerializationSchema build = + TextSerializationSchema.builder() + .seaTunnelRowType(seaTunnelRowType) + .delimiter(fieldDelimiter) + .nullValue(NULL_VALUE) + .build(); - public String buildCSVString(SeaTunnelRow row) throws IOException { - StringJoiner joiner = new StringJoiner(fieldDelimiter); - for (int i = 0; i < row.getFields().length; i++) { - Object field = convert(seaTunnelRowType.getFieldType(i), row.getField(i)); - String value = field != null ? field.toString() : NULL_VALUE; - joiner.add(value); - } - if (enableDelete) { - joiner.add(parseDeleteSign(row.getRowKind())); - } - return joiner.toString(); + return build.serialize(row); } public String parseDeleteSign(RowKind rowKind) { @@ -105,46 +87,40 @@ public String parseDeleteSign(RowKind rowKind) { } } - public static Builder builder() { - return new Builder(); - } - - /** Builder for RowDataSerializer. */ - public static class Builder { - private SeaTunnelRowType seaTunnelRowType; - private String type; - private String fieldDelimiter; - private boolean deletable; - - public Builder setType(String type) { - this.type = type; - return this; - } + @Override + public void open() throws IOException {} - public Builder setSeaTunnelRowType(SeaTunnelRowType seaTunnelRowType) { - this.seaTunnelRowType = seaTunnelRowType; - return this; - } + @Override + public byte[] serialize(SeaTunnelRow seaTunnelRow) throws IOException { - public Builder setFieldDelimiter(String fieldDelimiter) { - this.fieldDelimiter = fieldDelimiter; - return this; - } + List fieldNames = Arrays.asList(seaTunnelRowType.getFieldNames()); + List> fieldTypes = Arrays.asList(seaTunnelRowType.getFieldTypes()); - public Builder enableDelete(boolean deletable) { - this.deletable = deletable; - return this; + if (enableDelete) { + SeaTunnelRow seaTunnelRowEnableDelete = seaTunnelRow.copy(); + seaTunnelRowEnableDelete.setField( + seaTunnelRow.getFields().length, parseDeleteSign(seaTunnelRow.getRowKind())); + fieldNames.add(LoadConstants.DORIS_DELETE_SIGN); + fieldTypes.add(STRING_TYPE); } - public SeaTunnelRowSerializer build() { - checkState(CSV.equals(type) && fieldDelimiter != null || JSON.equals(type)); - return new SeaTunnelRowSerializer(type, seaTunnelRowType, fieldDelimiter, deletable); + if (JSON.equals(type)) { + return buildJsonString( + seaTunnelRow, + new SeaTunnelRowType( + fieldNames.toArray(new String[0]), + fieldTypes.toArray(new SeaTunnelDataType[0]))); + } else if (CSV.equals(type)) { + return buildCSVString( + seaTunnelRow, + new SeaTunnelRowType( + fieldNames.toArray(new String[0]), + fieldTypes.toArray(new SeaTunnelDataType[0]))); + } else { + throw new IllegalArgumentException("The type " + type + " is not supported!"); } } - @Override - public void open() throws IOException {} - @Override public void close() throws IOException {} } diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/serialization/RowBatch.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/serialization/RowBatch.java index a569e2b285e..930e83c5686 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/serialization/RowBatch.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/serialization/RowBatch.java @@ -20,6 +20,7 @@ import org.apache.seatunnel.shade.org.apache.arrow.memory.RootAllocator; import org.apache.seatunnel.shade.org.apache.arrow.vector.BigIntVector; import org.apache.seatunnel.shade.org.apache.arrow.vector.BitVector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.DateDayVector; import org.apache.seatunnel.shade.org.apache.arrow.vector.DecimalVector; import org.apache.seatunnel.shade.org.apache.arrow.vector.FieldVector; import org.apache.seatunnel.shade.org.apache.arrow.vector.FixedSizeBinaryVector; @@ -27,6 +28,7 @@ import org.apache.seatunnel.shade.org.apache.arrow.vector.Float8Vector; import org.apache.seatunnel.shade.org.apache.arrow.vector.IntVector; import org.apache.seatunnel.shade.org.apache.arrow.vector.SmallIntVector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.TimeStampMicroVector; import org.apache.seatunnel.shade.org.apache.arrow.vector.TinyIntVector; import org.apache.seatunnel.shade.org.apache.arrow.vector.VarCharVector; import org.apache.seatunnel.shade.org.apache.arrow.vector.VectorSchemaRoot; @@ -46,6 +48,8 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.api.table.type.SqlType; +import org.apache.seatunnel.common.utils.DateTimeUtils; +import org.apache.seatunnel.common.utils.DateUtils; import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode; import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException; @@ -71,21 +75,21 @@ @Slf4j public class RowBatch { + SeaTunnelDataType[] fieldTypes; + private final ArrowStreamReader arrowStreamReader; + private final String DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss"; + private final String DATETIMEV2_PATTERN = "yyyy-MM-dd HH:mm:ss.SSSSSS"; + private final DateTimeFormatter dateTimeV2Formatter = + DateTimeFormatter.ofPattern(DATETIMEV2_PATTERN); + private final DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd"); // offset for iterate the rowBatch private int offsetInRowBatch = 0; private int rowCountInOneBatch = 0; private int readRowCount = 0; - SeaTunnelDataType[] fieldTypes; private List seatunnelRowBatch = new ArrayList<>(); - private final ArrowStreamReader arrowStreamReader; private VectorSchemaRoot root; private List fieldVectors; private RootAllocator rootAllocator; - private final String DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss"; - private final String DATETIMEV2_PATTERN = "yyyy-MM-dd HH:mm:ss.SSSSSS"; - private final DateTimeFormatter dateTimeV2Formatter = - DateTimeFormatter.ofPattern(DATETIMEV2_PATTERN); - private final DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd"); public RowBatch(TScanBatchResult nextResult, SeaTunnelRowType seaTunnelRowType) { this.rootAllocator = new RootAllocator(Integer.MAX_VALUE); @@ -293,6 +297,19 @@ private void convertArrowValue( return new BigDecimal(new BigInteger(bytes), 0); }); break; + } else if (fieldVector instanceof VarCharVector) { + VarCharVector varCharVector = (VarCharVector) fieldVector; + Preconditions.checkArgument( + minorType.equals(Types.MinorType.VARCHAR), + typeMismatchMessage(currentType, minorType)); + addValueToRowForAllRows( + col, + rowIndex -> + varCharVector.isNull(rowIndex) + ? null + : new BigDecimal( + new String(varCharVector.get(rowIndex)))); + break; } DecimalVector decimalVector = (DecimalVector) fieldVector; Preconditions.checkArgument( @@ -307,6 +324,21 @@ private void convertArrowValue( break; case "DATE": case "DATEV2": + if (fieldVector instanceof DateDayVector) { + DateDayVector dateVector = (DateDayVector) fieldVector; + Preconditions.checkArgument( + minorType.equals(Types.MinorType.DATEDAY), + typeMismatchMessage(currentType, minorType)); + addValueToRowForAllRows( + col, + rowIndex -> { + if (dateVector.isNull(rowIndex)) { + return null; + } + return LocalDate.ofEpochDay(dateVector.get(rowIndex)); + }); + break; + } VarCharVector dateVector = (VarCharVector) fieldVector; Preconditions.checkArgument( minorType.equals(Types.MinorType.VARCHAR), @@ -322,6 +354,22 @@ private void convertArrowValue( }); break; case "TIMESTAMP": + if (fieldVector instanceof TimeStampMicroVector) { + TimeStampMicroVector timestampVector = (TimeStampMicroVector) fieldVector; + + addValueToRowForAllRows( + col, + rowIndex -> { + if (timestampVector.isNull(rowIndex)) { + return null; + } + String stringValue = timestampVector.getObject(rowIndex).toString(); + stringValue = completeMilliseconds(stringValue); + + return DateTimeUtils.parse(stringValue); + }); + break; + } VarCharVector timestampVector = (VarCharVector) fieldVector; Preconditions.checkArgument( minorType.equals(Types.MinorType.VARCHAR), @@ -499,6 +547,9 @@ private Object getDataFromVector(Object vectorObject, SqlType sqlType) { } if (vectorObject instanceof Integer) { + if (sqlType.equals(SqlType.DATE)) { + return LocalDate.ofEpochDay((int) vectorObject); + } return Integer.valueOf(vectorObject.toString()); } @@ -520,6 +571,8 @@ private Object getDataFromVector(Object vectorObject, SqlType sqlType) { return LocalDateTime.parse(stringValue, dateTimeV2Formatter); } else if (sqlType.equals(SqlType.DATE)) { return LocalDate.parse(vectorObject.toString(), dateFormatter); + } else if (sqlType.equals(SqlType.DECIMAL)) { + return new BigDecimal(vectorObject.toString()); } return vectorObject.toString(); } @@ -540,6 +593,13 @@ private Object getDataFromVector(Object vectorObject, SqlType sqlType) { } return new BigDecimal(new BigInteger(bytes), 0); } + if (vectorObject instanceof LocalDate) { + return DateUtils.parse(vectorObject.toString()); + } + + if (vectorObject instanceof LocalDateTime) { + return DateTimeUtils.parse(vectorObject.toString()); + } return vectorObject.toString(); } diff --git a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowConverterTest.java b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowConverterTest.java deleted file mode 100644 index 5755beb3f74..00000000000 --- a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowConverterTest.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.connectors.doris.serialize; - -import org.apache.seatunnel.api.table.type.LocalTimeType; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -import java.time.LocalDateTime; - -public class SeaTunnelRowConverterTest { - - private static final SeaTunnelRowConverter seaTunnelRowConverter = new SeaTunnelRowConverter(); - - @Test - void testDateTimeWithNano() { - Assertions.assertEquals( - "2021-01-01 00:00:00.123456", - seaTunnelRowConverter.convert( - LocalTimeType.LOCAL_DATE_TIME_TYPE, - LocalDateTime.of(2021, 1, 1, 0, 0, 0, 123456789))); - Assertions.assertEquals( - "2021-01-01 00:00:00.000000", - seaTunnelRowConverter.convert( - LocalTimeType.LOCAL_DATE_TIME_TYPE, - LocalDateTime.of(2021, 1, 1, 0, 0, 0, 0))); - Assertions.assertEquals( - "2021-01-01 00:00:00.000001", - seaTunnelRowConverter.convert( - LocalTimeType.LOCAL_DATE_TIME_TYPE, - LocalDateTime.of(2021, 1, 1, 0, 0, 0, 1000))); - Assertions.assertEquals( - "2021-01-01 00:00:00.000123", - seaTunnelRowConverter.convert( - LocalTimeType.LOCAL_DATE_TIME_TYPE, - LocalDateTime.of(2021, 1, 1, 0, 0, 0, 123456))); - } -} diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonSerializationSchema.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonSerializationSchema.java index 4e2e98317b4..b35710b3a0c 100644 --- a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonSerializationSchema.java +++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonSerializationSchema.java @@ -59,6 +59,13 @@ public JsonSerializationSchema(SeaTunnelRowType rowType, Charset charset) { this.charset = charset; } + public JsonSerializationSchema(SeaTunnelRowType rowType, String nullValue) { + this.rowType = rowType; + this.runtimeConverter = + new RowToJsonConverters().createConverter(checkNotNull(rowType), nullValue); + this.charset = StandardCharsets.UTF_8; + } + @Override public byte[] serialize(SeaTunnelRow row) { if (node == null) { diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java index 575b5bace14..2cf8ae092e7 100644 --- a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java +++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java @@ -49,15 +49,25 @@ public class RowToJsonConverters implements Serializable { private static final long serialVersionUID = 6988876688930916940L; + private String nullValue; + public RowToJsonConverter createConverter(SeaTunnelDataType type) { return wrapIntoNullableConverter(createNotNullConverter(type)); } + public RowToJsonConverter createConverter(SeaTunnelDataType type, String nullValue) { + this.nullValue = nullValue; + return createConverter(type); + } + private RowToJsonConverter wrapIntoNullableConverter(RowToJsonConverter converter) { return new RowToJsonConverter() { @Override public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) { if (value == null) { + if (nullValue != null) { + return mapper.getNodeFactory().textNode(nullValue); + } return mapper.getNodeFactory().nullNode(); } return converter.convert(mapper, reuse, value); @@ -74,7 +84,9 @@ private RowToJsonConverter createNotNullConverter(SeaTunnelDataType type) { return new RowToJsonConverter() { @Override public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) { - return null; + return nullValue == null + ? null + : mapper.getNodeFactory().textNode((String) value); } }; case BOOLEAN: @@ -175,8 +187,7 @@ public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) { return createArrayConverter((ArrayType) type); case MAP: MapType mapType = (MapType) type; - return createMapConverter( - mapType.toString(), mapType.getKeyType(), mapType.getValueType()); + return createMapConverter(mapType.getKeyType(), mapType.getValueType()); default: throw new SeaTunnelJsonFormatException( CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, @@ -258,15 +269,10 @@ public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) { } private RowToJsonConverter createMapConverter( - String typeSummary, SeaTunnelDataType keyType, SeaTunnelDataType valueType) { - if (!SqlType.STRING.equals(keyType.getSqlType())) { - throw new SeaTunnelJsonFormatException( - CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, - "JSON format doesn't support non-string as key type of map. The type is: " - + typeSummary); - } - + SeaTunnelDataType keyType, SeaTunnelDataType valueType) { + final RowToJsonConverter keyConverter = createConverter(keyType); final RowToJsonConverter valueConverter = createConverter(valueType); + return new RowToJsonConverter() { @Override public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) { @@ -280,9 +286,12 @@ public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) { node.removeAll(); } - Map mapData = (Map) value; - for (Map.Entry entry : mapData.entrySet()) { - String fieldName = entry.getKey(); + Map mapData = (Map) value; + for (Map.Entry entry : mapData.entrySet()) { + // Convert the key to a string using the key converter + JsonNode keyNode = keyConverter.convert(mapper, null, entry.getKey()); + String fieldName = keyNode.isTextual() ? keyNode.asText() : keyNode.toString(); + node.set( fieldName, valueConverter.convert(mapper, node.get(fieldName), entry.getValue())); diff --git a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java index ff1bb820056..fb6fd9da767 100644 --- a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java +++ b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java @@ -601,4 +601,81 @@ public void testParseUnsupportedDateTimeFormat() throws IOException { "ErrorCode:[COMMON-33], ErrorDescription:[The datetime format '2022-09-24-22:45:00' of field 'timestamp_field' is not supported. Please check the datetime format.]", exception2.getCause().getCause().getMessage()); } + + @Test + public void testSerializationWithNullValue() { + SeaTunnelRowType schema = + new SeaTunnelRowType( + new String[] { + "bool", "int", "longValue", "float", "name", "date", "time", "timestamp" + }, + new SeaTunnelDataType[] { + BOOLEAN_TYPE, + INT_TYPE, + LONG_TYPE, + FLOAT_TYPE, + STRING_TYPE, + LocalTimeType.LOCAL_DATE_TYPE, + LocalTimeType.LOCAL_TIME_TYPE, + LocalTimeType.LOCAL_DATE_TIME_TYPE + }); + + Object[] fields = new Object[] {null, null, null, null, null, null, null, null}; + SeaTunnelRow expected = new SeaTunnelRow(fields); + assertEquals( + "{\"bool\":\"\\\\N\",\"int\":\"\\\\N\",\"longValue\":\"\\\\N\",\"float\":\"\\\\N\",\"name\":\"\\\\N\",\"date\":\"\\\\N\",\"time\":\"\\\\N\",\"timestamp\":\"\\\\N\"}", + new String(new JsonSerializationSchema(schema, "\\N").serialize(expected))); + } + + @Test + public void testSerializationWithMapHasNonStringKey() { + SeaTunnelRowType schema = + new SeaTunnelRowType( + new String[] {"mapii", "mapbb"}, + new SeaTunnelDataType[] { + new MapType(INT_TYPE, INT_TYPE), new MapType(BOOLEAN_TYPE, INT_TYPE) + }); + Map mapII = new HashMap<>(); + mapII.put(1, 2); + + Map mapBI = new HashMap<>(); + mapBI.put(true, 3); + + Object[] fields = new Object[] {mapII, mapBI}; + SeaTunnelRow expected = new SeaTunnelRow(fields); + assertEquals( + "{\"mapii\":{\"1\":2},\"mapbb\":{\"true\":3}}", + new String(new JsonSerializationSchema(schema, "\\N").serialize(expected))); + } + + @Test + public void testSerializationWithTimestamp() { + SeaTunnelRowType schema = + new SeaTunnelRowType( + new String[] {"timestamp"}, + new SeaTunnelDataType[] {LocalTimeType.LOCAL_DATE_TIME_TYPE}); + LocalDateTime timestamp = LocalDateTime.of(2022, 9, 24, 22, 45, 0, 123456000); + SeaTunnelRow row = new SeaTunnelRow(new Object[] {timestamp}); + assertEquals( + "{\"timestamp\":\"2022-09-24T22:45:00.123456\"}", + new String(new JsonSerializationSchema(schema, "\\N").serialize(row))); + + timestamp = LocalDateTime.of(2022, 9, 24, 22, 45, 0, 0); + row = new SeaTunnelRow(new Object[] {timestamp}); + assertEquals( + "{\"timestamp\":\"2022-09-24T22:45:00\"}", + new String(new JsonSerializationSchema(schema, "\\N").serialize(row))); + + timestamp = LocalDateTime.of(2022, 9, 24, 22, 45, 0, 1000); + row = new SeaTunnelRow(new Object[] {timestamp}); + assertEquals( + "{\"timestamp\":\"2022-09-24T22:45:00.000001\"}", + new String(new JsonSerializationSchema(schema, "\\N").serialize(row))); + + timestamp = LocalDateTime.of(2022, 9, 24, 22, 45, 0, 123456); + row = new SeaTunnelRow(new Object[] {timestamp}); + assertEquals( + "{\"timestamp\":\"2022-09-24T22:45:00.000123456\"}", + new String(new JsonSerializationSchema(schema, "\\N").serialize(row))); + } } diff --git a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextSerializationSchema.java b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextSerializationSchema.java index 6f108ee295d..01ca981a11d 100644 --- a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextSerializationSchema.java +++ b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextSerializationSchema.java @@ -48,6 +48,7 @@ public class TextSerializationSchema implements SerializationSchema { private final DateTimeUtils.Formatter dateTimeFormatter; private final TimeUtils.Formatter timeFormatter; private final Charset charset; + private final String nullValue; private TextSerializationSchema( @NonNull SeaTunnelRowType seaTunnelRowType, @@ -55,13 +56,15 @@ private TextSerializationSchema( DateUtils.Formatter dateFormatter, DateTimeUtils.Formatter dateTimeFormatter, TimeUtils.Formatter timeFormatter, - Charset charset) { + Charset charset, + String nullValue) { this.seaTunnelRowType = seaTunnelRowType; this.separators = separators; this.dateFormatter = dateFormatter; this.dateTimeFormatter = dateTimeFormatter; this.timeFormatter = timeFormatter; this.charset = charset; + this.nullValue = nullValue; } public static Builder builder() { @@ -76,6 +79,7 @@ public static class Builder { DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS; private TimeUtils.Formatter timeFormatter = TimeUtils.Formatter.HH_MM_SS; private Charset charset = StandardCharsets.UTF_8; + private String nullValue = ""; private Builder() {} @@ -114,6 +118,11 @@ public Builder charset(Charset charset) { return this; } + public Builder nullValue(String nullValue) { + this.nullValue = nullValue; + return this; + } + public TextSerializationSchema build() { return new TextSerializationSchema( seaTunnelRowType, @@ -121,7 +130,8 @@ public TextSerializationSchema build() { dateFormatter, dateTimeFormatter, timeFormatter, - charset); + charset, + nullValue); } } @@ -141,7 +151,7 @@ public byte[] serialize(SeaTunnelRow element) { private String convert(Object field, SeaTunnelDataType fieldType, int level) { if (field == null) { - return ""; + return nullValue; } switch (fieldType.getSqlType()) { case DOUBLE: diff --git a/seatunnel-formats/seatunnel-format-text/src/test/java/org/apache/seatunnel/format/text/CsvTextFormatSchemaTest.java b/seatunnel-formats/seatunnel-format-text/src/test/java/org/apache/seatunnel/format/text/CsvTextFormatSchemaTest.java index 0f58e32f145..77c80a4bb81 100644 --- a/seatunnel-formats/seatunnel-format-text/src/test/java/org/apache/seatunnel/format/text/CsvTextFormatSchemaTest.java +++ b/seatunnel-formats/seatunnel-format-text/src/test/java/org/apache/seatunnel/format/text/CsvTextFormatSchemaTest.java @@ -25,6 +25,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.utils.DateTimeUtils.Formatter; import org.apache.seatunnel.format.text.splitor.CsvLineSplitor; import org.junit.jupiter.api.Assertions; @@ -34,9 +35,12 @@ import java.io.IOException; import java.math.BigDecimal; import java.time.LocalDate; +import java.time.LocalDateTime; import java.util.Arrays; import java.util.Map; +import static org.junit.jupiter.api.Assertions.assertEquals; + public class CsvTextFormatSchemaTest { public String content = "\"mess,age\"," @@ -150,4 +154,40 @@ public void testParse() throws IOException { Assertions.assertEquals(((Map) (seaTunnelRow.getField(15))).get("tyrantlucifer"), 18); Assertions.assertEquals(((Map) (seaTunnelRow.getField(15))).get("Kris"), 21); } + + @Test + public void testSerializationWithTimestamp() { + String delimiter = ","; + + SeaTunnelRowType schema = + new SeaTunnelRowType( + new String[] {"timestamp"}, + new SeaTunnelDataType[] {LocalTimeType.LOCAL_DATE_TIME_TYPE}); + LocalDateTime timestamp = LocalDateTime.of(2022, 9, 24, 22, 45, 0, 123456000); + TextSerializationSchema textSerializationSchema = + TextSerializationSchema.builder() + .seaTunnelRowType(schema) + .dateTimeFormatter(Formatter.YYYY_MM_DD_HH_MM_SS_SSSSSS) + .delimiter(delimiter) + .build(); + SeaTunnelRow row = new SeaTunnelRow(new Object[] {timestamp}); + + assertEquals( + "2022-09-24 22:45:00.123456", new String(textSerializationSchema.serialize(row))); + + timestamp = LocalDateTime.of(2022, 9, 24, 22, 45, 0, 0); + row = new SeaTunnelRow(new Object[] {timestamp}); + assertEquals( + "2022-09-24 22:45:00.000000", new String(textSerializationSchema.serialize(row))); + + timestamp = LocalDateTime.of(2022, 9, 24, 22, 45, 0, 1000); + row = new SeaTunnelRow(new Object[] {timestamp}); + assertEquals( + "2022-09-24 22:45:00.000001", new String(textSerializationSchema.serialize(row))); + + timestamp = LocalDateTime.of(2022, 9, 24, 22, 45, 0, 123456); + row = new SeaTunnelRow(new Object[] {timestamp}); + assertEquals( + "2022-09-24 22:45:00.000123", new String(textSerializationSchema.serialize(row))); + } } diff --git a/seatunnel-formats/seatunnel-format-text/src/test/java/org/apache/seatunnel/format/text/TextFormatSchemaTest.java b/seatunnel-formats/seatunnel-format-text/src/test/java/org/apache/seatunnel/format/text/TextFormatSchemaTest.java index 45574392d23..a8ab6decfa4 100644 --- a/seatunnel-formats/seatunnel-format-text/src/test/java/org/apache/seatunnel/format/text/TextFormatSchemaTest.java +++ b/seatunnel-formats/seatunnel-format-text/src/test/java/org/apache/seatunnel/format/text/TextFormatSchemaTest.java @@ -36,6 +36,13 @@ import java.util.Arrays; import java.util.Map; +import static org.apache.seatunnel.api.table.type.BasicType.BOOLEAN_TYPE; +import static org.apache.seatunnel.api.table.type.BasicType.FLOAT_TYPE; +import static org.apache.seatunnel.api.table.type.BasicType.INT_TYPE; +import static org.apache.seatunnel.api.table.type.BasicType.LONG_TYPE; +import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE; +import static org.junit.jupiter.api.Assertions.assertEquals; + public class TextFormatSchemaTest { public String content = String.join("\u0002", Arrays.asList("1", "2", "3", "4", "5", "6")) @@ -187,4 +194,38 @@ public void testParseUnsupportedDateTimeFormat() throws IOException { "ErrorCode:[COMMON-33], ErrorDescription:[The datetime format '2022-09-24-22:45:00' of field 'timestamp_field' is not supported. Please check the datetime format.]", exception2.getMessage()); } + + @Test + public void testSerializationWithNullValue() throws Exception { + SeaTunnelRowType schema = + new SeaTunnelRowType( + new String[] { + "bool", "int", "longValue", "float", "name", "date", "time", "timestamp" + }, + new SeaTunnelDataType[] { + BOOLEAN_TYPE, + INT_TYPE, + LONG_TYPE, + FLOAT_TYPE, + STRING_TYPE, + LocalTimeType.LOCAL_DATE_TYPE, + LocalTimeType.LOCAL_TIME_TYPE, + LocalTimeType.LOCAL_DATE_TIME_TYPE + }); + + Object[] fields = new Object[] {null, null, null, null, null, null, null, null}; + SeaTunnelRow expected = new SeaTunnelRow(fields); + + TextSerializationSchema textSerializationSchema = + TextSerializationSchema.builder() + .seaTunnelRowType(schema) + .delimiter("\u0001") + .nullValue("\\N") + .build(); + + System.out.println(new String(textSerializationSchema.serialize(expected))); + assertEquals( + "\\N\u0001\\N\u0001\\N\u0001\\N\u0001\\N\u0001\\N\u0001\\N\u0001\\N", + new String(textSerializationSchema.serialize(expected))); + } }