Skip to content

Commit

Permalink
fix PaimonIT error
Browse files Browse the repository at this point in the history
  • Loading branch information
liunaijie committed Aug 6, 2023
1 parent 669aba4 commit 7498907
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.seatunnel.connectors.seatunnel.paimon.sink;

import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
Expand All @@ -39,6 +41,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -115,6 +118,72 @@ public void write(SeaTunnelRow element) throws IOException {
}
}

private String internalRowToString(InternalRow rowData) {
StringBuilder stringBuilder = new StringBuilder();
SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
for (int i = 0; i < fieldTypes.length; i++) {
// judge the field is or not equals null
if (rowData.isNullAt(i)) {
stringBuilder.append("null value ");
}
stringBuilder.append(", ");
switch (fieldTypes[i].getSqlType()) {
case TINYINT:
stringBuilder.append(rowData.getByte(i));
break;
case SMALLINT:
stringBuilder.append(rowData.getShort(i));
break;
case INT:
stringBuilder.append(rowData.getInt(i));
break;
case BIGINT:
stringBuilder.append(rowData.getLong(i));
break;
case FLOAT:
stringBuilder.append(rowData.getFloat(i));
break;
case DOUBLE:
stringBuilder.append(rowData.getDouble(i));
break;
case DECIMAL:
DecimalType fieldType = (DecimalType) seaTunnelRowType.getFieldType(i);
log.info("decimal type: " + fieldType);
stringBuilder.append(
rowData.getDecimal(i, fieldType.getPrecision(), fieldType.getScale()));
break;
case STRING:
stringBuilder.append(rowData.getString(i));
break;
case BYTES:
stringBuilder.append(Arrays.toString(rowData.getBinary(i)));
break;
case BOOLEAN:
stringBuilder.append(rowData.getBoolean(i));
break;
case DATE:
stringBuilder.append(rowData.getTimestamp(i, 3));
break;
case TIMESTAMP:
stringBuilder.append(rowData.getTimestamp(i, 9));
break;
case MAP:
stringBuilder.append(rowData.getMap(i).toString());
break;
case ARRAY:
stringBuilder.append(rowData.getArray(i));
break;
case ROW:
SeaTunnelRowType rowType = (SeaTunnelRowType) seaTunnelRowType.getFieldType(i);
InternalRow internalRow = rowData.getRow(i, rowType.getTotalFields());
String string = internalRowToString(internalRow);
stringBuilder.append(string);
break;
}
}
return stringBuilder.toString();
}

@Override
public Optional<PaimonCommitInfo> prepareCommit() throws IOException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

import lombok.extern.slf4j.Slf4j;

import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
Expand All @@ -53,6 +55,7 @@
import java.util.Map;

/** The converter for converting {@link InternalRow} and {@link SeaTunnelRow} */
@Slf4j
public class RowConverter {

private RowConverter() {}
Expand Down Expand Up @@ -359,13 +362,17 @@ public static InternalRow convert(
break;
case DECIMAL:
DecimalType fieldType = (DecimalType) seaTunnelRowType.getFieldType(i);
binaryWriter.writeDecimal(
i,
Decimal convertedDecimal =
Decimal.fromBigDecimal(
(BigDecimal) seaTunnelRow.getField(i),
fieldType.getPrecision(),
fieldType.getScale()),
fieldType.getPrecision());
fieldType.getScale());
binaryWriter.writeDecimal(i, convertedDecimal, fieldType.getPrecision());
log.error("convertedDecimal is " + convertedDecimal);
log.error(
"writtenDecimal is "
+ binaryRow.getDecimal(
i, fieldType.getPrecision(), fieldType.getScale()));
break;
case STRING:
binaryWriter.writeString(
Expand Down
2 changes: 1 addition & 1 deletion seatunnel-dist/release-docs/LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ The text of each license is the standard Apache 2.0 license.
(Apache License 2.0) aircompressor (io.airlift:aircompressor:0.10 - http://github.com/airlift/aircompressor)
(Apache License, Version 2.0) Apache Yetus - Audience Annotations (org.apache.yetus:audience-annotations:0.11.0 - https://yetus.apache.org/audience-annotations)
(The Apache Software License, Version 2.0) Apache Avro (org.apache.avro:avro:1.8.2 - http://avro.apache.org)
(The Apache Software License, Version 2.0) Apache Avro (org.apache.avro:avro:1.10.2 - http://avro.apache.org)
(The Apache Software License, Version 2.0) Apache Avro (org.apache.avro:avro:1.11.1 - http://avro.apache.org)
(Apache License, Version 2.0) Apache Commons Codec (commons-codec:commons-codec:1.13 - https://commons.apache.org/proper/commons-codec/)
(Apache License, Version 2.0) Apache Commons Collections (org.apache.commons:commons-collections4:4.4 - https://commons.apache.org/proper/commons-collections/)
(Apache License, Version 2.0) Apache Commons Compress (org.apache.commons:commons-compress:1.20 - https://commons.apache.org/proper/commons-compress/)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import java.nio.file.Path;

@DisabledOnContainer(
value = TestContainerId.FLINK_1_13,
value = {TestContainerId.FLINK_1_13, TestContainerId.FLINK_1_16},
disabledReason = "Paimon does not support flink 1.13")
public class PaimonIT extends TestSuiteBase {

Expand Down
3 changes: 2 additions & 1 deletion seatunnel-formats/seatunnel-format-avro/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@
<name>SeaTunnel : Formats : Avro</name>

<properties>
<avro.version>1.10.2</avro.version>
<avro.version>1.11.1</avro.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-api</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
Expand Down
2 changes: 1 addition & 1 deletion tools/dependencies/known-dependencies.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,4 @@ listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar
accessors-smart-2.4.7.jar
asm-9.1.jar
json-smart-2.4.7.jar
avro-1.10.2.jar
avro-1.11.1.jar

0 comments on commit 7498907

Please sign in to comment.