Skip to content

Commit

Permalink
Fix a bug of Spark transform when has a timestamp/date type filed in …
Browse files Browse the repository at this point in the history
…source
  • Loading branch information
mcy committed Feb 26, 2023
1 parent d633bc8 commit 30d6e51
Showing 1 changed file with 23 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
import org.apache.seatunnel.translation.spark.serialization.InternalRowConverter;
import org.apache.seatunnel.translation.spark.utils.InstantConverterUtils;
import org.apache.seatunnel.translation.spark.utils.TypeConverterUtils;

import org.apache.spark.sql.Dataset;
Expand All @@ -35,13 +36,18 @@
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
import org.apache.spark.sql.catalyst.expressions.MutableValue;
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.URL;
import java.sql.Date;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
Expand Down Expand Up @@ -139,12 +145,23 @@ private Dataset<Row> sparkTransform(SeaTunnelTransform transform, Dataset<Row> s
continue;
}
InternalRow internalRow = outputRowConverter.convert(seaTunnelRow);
outputRows.add(
new GenericRowWithSchema(
Arrays.stream(((SpecificInternalRow) internalRow).values())
.map(MutableValue::boxed)
.toArray(),
structType));

Object[] fields =
Arrays.stream(((SpecificInternalRow) internalRow).values())
.map(MutableValue::boxed)
.toArray();
for (int i = 0; i < structType.fields().length; i++) {
DataType dataType = structType.fields()[i].dataType();
Object field = fields[i];
if (dataType == DataTypes.TimestampType && field instanceof Long) {
fields[i] = Timestamp.from(InstantConverterUtils.ofEpochMicro((long) field));
}
if (dataType == DataTypes.DateType && field instanceof Integer) {
fields[i] = Date.valueOf(LocalDate.ofEpochDay((int) field));
}
}

outputRows.add(new GenericRowWithSchema(fields, structType));
}
return sparkRuntimeEnvironment.getSparkSession().createDataFrame(outputRows, structType);
}
Expand Down

0 comments on commit 30d6e51

Please sign in to comment.