Skip to content

Commit

Permalink
Fix a bug of Spark transform when has a timestamp/date type filed in …
Browse files Browse the repository at this point in the history
…source
  • Loading branch information
mcy committed Feb 26, 2023
1 parent 2d2876e commit d633bc8
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@

import java.io.IOException;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
Expand Down Expand Up @@ -190,12 +191,18 @@ private static Object reconvert(Object field, SeaTunnelDataType<?> dataType) {
case ROW:
return reconvert((InternalRow) field, (SeaTunnelRowType) dataType);
case DATE:
if (field instanceof Date) {
return ((Date) field).toLocalDate();
}
return LocalDate.ofEpochDay((int) field);
case TIME:
// TODO: Support TIME Type
throw new RuntimeException(
"SeaTunnel not support time type, it will be supported in the future.");
case TIMESTAMP:
if (field instanceof Timestamp) {
return ((Timestamp) field).toLocalDateTime();
}
return Timestamp.from(InstantConverterUtils.ofEpochMicro((long) field))
.toLocalDateTime();
case MAP:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@

import java.io.IOException;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
Expand Down Expand Up @@ -190,12 +191,18 @@ private static Object reconvert(Object field, SeaTunnelDataType<?> dataType) {
case ROW:
return reconvert((InternalRow) field, (SeaTunnelRowType) dataType);
case DATE:
if (field instanceof Date) {
return ((Date) field).toLocalDate();
}
return LocalDate.ofEpochDay((int) field);
case TIME:
// TODO: Support TIME Type
throw new RuntimeException(
"SeaTunnel not support time type, it will be supported in the future.");
case TIMESTAMP:
if (field instanceof Timestamp) {
return ((Timestamp) field).toLocalDateTime();
}
return Timestamp.from(InstantConverterUtils.ofEpochMicro((long) field))
.toLocalDateTime();
case MAP:
Expand Down

0 comments on commit d633bc8

Please sign in to comment.