diff --git a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertRuleParser.java b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertRuleParser.java index 5d90591af7c..7681f67cde2 100644 --- a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertRuleParser.java +++ b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertRuleParser.java @@ -91,6 +91,12 @@ private List assembleFieldValueRules( } private SeaTunnelDataType getFieldType(String fieldTypeStr) { + if (fieldTypeStr.toLowerCase().startsWith("decimal(")) { + String lengthAndScale = + fieldTypeStr.toLowerCase().replace("decimal(", "").replace(")", ""); + String[] split = lengthAndScale.split(","); + return new DecimalType(Integer.valueOf(split[0]), Integer.valueOf(split[1])); + } return TYPES.get(fieldTypeStr.toLowerCase()); } @@ -110,6 +116,5 @@ private SeaTunnelDataType getFieldType(String fieldTypeStr) { TYPES.put("datetime", LocalTimeType.LOCAL_DATE_TIME_TYPE); TYPES.put("date", LocalTimeType.LOCAL_DATE_TYPE); TYPES.put("time", LocalTimeType.LOCAL_TIME_TYPE); - TYPES.put("decimal", new DecimalType(38, 18)); } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_hive_source_and_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_hive_source_and_assert.conf index 04b0240a3bf..cd1b32c14e5 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_hive_source_and_assert.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_hive_source_and_assert.conf @@ -127,12 +127,12 @@ sink{ }, { field_name = hive_e2e_source_table.decimal_column - field_type = decimal + field_type = "decimal(10,2)" field_value = [{equals_to = 42.12}] }, { field_name = hive_e2e_source_table.numeric_column - field_type = decimal + field_type = "decimal(10,2)" field_value = [{equals_to = 42.12}] }, ] diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/json_path_transform/json_path_basic_type_test.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/json_path_transform/json_path_basic_type_test.conf index 97fa1ae036c..c6f9a15f54a 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/json_path_transform/json_path_basic_type_test.conf +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/json_path_transform/json_path_basic_type_test.conf @@ -159,7 +159,7 @@ sink { }, { field_name = c1_decimal - field_type = decimal + field_type = "decimal(4,2)" field_value = [ { rule_type = NOT_NULL diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_system.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_system.conf index c28849e4406..2c924419ec6 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_system.conf +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_system.conf @@ -34,10 +34,13 @@ source { c1 = "string" c2 = "timestamp" c3 = "string" + c4 = "bigint" + c5 = "int" + c6 = "int" } } rows = [ - {fields = [1, "Joy Ding", "12.4", "2012-12-21T12:34:56", null], kind = INSERT} + {fields = [1, "Joy Ding", "12.4", "2012-12-21T12:34:56", null, 1687747869032, 20230625, 235109], kind = INSERT} ] } } @@ -46,7 +49,7 @@ transform { Sql { source_table_name = "fake" result_table_name = "fake1" - query = "select cast(id as STRING) as id, cast(id as INT) as id2, cast(id as DOUBLE) as id3 , cast(c1 as double) as c1_1, cast(c1 as DECIMAL(10,2)) as c1_2, cast(c2 as DATE) as c2_1, coalesce(c3,'Unknown') c3_1, ifnull(c3,'Unknown') c3_2, ifnull(nullif(name,'Joy Ding'),'NULL') name1, nullif(name,'Joy Ding_') name2 from fake" + query = "select cast(id as STRING) as id, cast(id as INT) as id2, cast(id as DOUBLE) as id3 , cast(c1 as double) as c1_1, cast(c1 as DECIMAL(10,2)) as c1_2, cast(c2 as DATE) as c2_1, coalesce(c3,'Unknown') c3_1, ifnull(c3,'Unknown') c3_2, ifnull(nullif(name,'Joy Ding'),'NULL') name1, nullif(name,'Joy Ding_') name2, cast(c4 as timestamp) as c4_1, cast(c4 as decimal(17,4)) as c4_2, cast(c5 as date) as c5, cast(c6 as time) as c6 from fake" } } @@ -86,6 +89,13 @@ sink { {equals_to = 12.4} ] }, + { + field_name = "c1_2" + field_type = "decimal(10,2)" + field_value = [ + {equals_to = "12.40"} + ] + }, { field_name = "c2_1" field_type = "date" @@ -120,6 +130,34 @@ sink { field_value = [ {equals_to = "Joy Ding"} ] + }, + { + field_name = "c4_1" + field_type = "timestamp" + field_value = [ + {equals_to = "2023-06-26T02:51:09.032"} + ] + }, + { + field_name = "c4_2" + field_type = "decimal(17,4)" + field_value = [ + {equals_to = "1687747869032.0000"} + ] + }, + { + field_name = "c5" + field_type = "date" + field_value = [ + {equals_to = "2023-06-25"} + ] + }, + { + field_name = "c6" + field_type = "time" + field_value = [ + {equals_to = "23:51:09"} + ] } ] } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/SystemFunction.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/SystemFunction.java index 36ead478891..b9eeb48b405 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/SystemFunction.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/SystemFunction.java @@ -22,9 +22,11 @@ import java.math.BigDecimal; import java.math.RoundingMode; +import java.time.Instant; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; +import java.time.ZoneId; import java.util.List; public class SystemFunction { @@ -96,6 +98,14 @@ public static Object castAs(List args) { if (v1 instanceof LocalTime) { return LocalDateTime.of(LocalDate.now(), (LocalTime) v1); } + if (v1 instanceof Long) { + Instant instant = Instant.ofEpochMilli(((Long) v1).longValue()); + ZoneId zone = ZoneId.systemDefault(); + return LocalDateTime.ofInstant(instant, zone); + } + throw new TransformException( + CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION, + String.format("Unsupported CAST AS type: %s", v2)); case "DATE": if (v1 instanceof LocalDateTime) { return ((LocalDateTime) v1).toLocalDate(); @@ -103,16 +113,36 @@ public static Object castAs(List args) { if (v1 instanceof LocalDate) { return v1; } + if (v1 instanceof Integer) { + int dateValue = ((Integer) v1).intValue(); + int year = dateValue / 10000; + int month = (dateValue / 100) % 100; + int day = dateValue % 100; + return LocalDate.of(year, month, day); + } + throw new TransformException( + CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION, + String.format("Unsupported CAST AS type: %s", v2)); case "TIME": if (v1 instanceof LocalDateTime) { return ((LocalDateTime) v1).toLocalTime(); } if (v1 instanceof LocalDate) { - return LocalDateTime.of((LocalDate) v1, LocalTime.of(0, 0, 0)); + return LocalTime.of(0, 0, 0); } if (v1 instanceof LocalTime) { - return LocalDateTime.of(LocalDate.now(), (LocalTime) v1); + return v1; + } + if (v1 instanceof Integer) { + int intTime = ((Integer) v1).intValue(); + int hour = intTime / 10000; + int minute = (intTime / 100) % 100; + int second = intTime % 100; + return LocalTime.of(hour, minute, second); } + throw new TransformException( + CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION, + String.format("Unsupported CAST AS type: %s", v2)); case "DECIMAL": BigDecimal bigDecimal = new BigDecimal(v1.toString()); Integer scale = (Integer) args.get(3);