diff --git a/docs/en/transform-v2/sql-functions.md b/docs/en/transform-v2/sql-functions.md index dd8b8dbfdd1..c074fab74fd 100644 --- a/docs/en/transform-v2/sql-functions.md +++ b/docs/en/transform-v2/sql-functions.md @@ -848,6 +848,30 @@ Example: YEAR(CREATED) +### FROM_UNIXTIME + +```FROM_UNIXTIME (unixtime, formatString,timeZone)``` + +Convert the number of seconds from the UNIX epoch (1970-01-01 00:00:00 UTC) to a string representing the timestamp of that moment. + +The most important format characters are: y year, M month, d day, H hour, m minute, s second. For details of the format, see `java.time.format.DateTimeFormatter`. + +`timeZone` is optional, default value is system's time zone. `timezone` value can be a `UTC+ timezone offset`, for example, `UTC+8` represents the Asia/Shanghai time zone, see `java.time.ZoneId` + +This method returns a string. + +Example: + +// use default zone + +CALL FROM_UNIXTIME(1672502400, 'yyyy-MM-dd HH:mm:ss') + +or + +// use given zone + +CALL FROM_UNIXTIME(1672502400, 'yyyy-MM-dd HH:mm:ss','UTC+6') + ## System Functions ### CAST diff --git a/release-note.md b/release-note.md index 1e352b80c71..ee7bb9bd6b4 100644 --- a/release-note.md +++ b/release-note.md @@ -20,7 +20,7 @@ ### Formats - [Canal]Support read canal format message #3950 - [Debezium]Support debezium canal format message #3981 - + ### Connector-V2 - [Json-format] [Canal-Json] Fix json deserialize NPE (#4195) @@ -78,6 +78,7 @@ - [Zeta] Fix cpu load problem (#4828) - [zeta] Fix the deadlock issue with JDBC driver loading (#4878) - [zeta] dynamically replace the value of the variable at runtime (#4950) +- [Zeta] Add from_unixtime function (#5462) - [zeta] Fix CDC task restore throw NPE (#5507) ### E2E diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java index d54a2addaf5..ad3f8edfa84 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java @@ -46,6 +46,9 @@ public void testSQLTransform(TestContainer container) throws IOException, Interr Container.ExecResult sqlFuncSystem = container.executeJob("/sql_transform/func_system.conf"); Assertions.assertEquals(0, sqlFuncSystem.getExitCode()); + Container.ExecResult sqlFuncFromUnixtime = + container.executeJob("/sql_transform/func_from_unixtime.conf"); + Assertions.assertEquals(0, sqlFuncFromUnixtime.getExitCode()); Container.ExecResult sqlCriteriaFilter = container.executeJob("/sql_transform/criteria_filter.conf"); Assertions.assertEquals(0, sqlCriteriaFilter.getExitCode()); diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_from_unixtime.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_from_unixtime.conf new file mode 100644 index 00000000000..c759c7a4086 --- /dev/null +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_from_unixtime.conf @@ -0,0 +1,67 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + execution.parallelism = 1 + job.mode = "BATCH" + checkpoint.interval = 10000 +} + +source { + FakeSource { + result_table_name = "fake" + schema = { + fields { + unixtime = "bigint" + } + } + rows = [ + {fields = [1672502400], kind = INSERT} + ] + } +} + +transform { + Sql { + source_table_name = "fake" + result_table_name = "fake1" + query = "select from_unixtime(unixtime,'yyyy-MM-dd HH:mm:ss','UTC+8') as ts from fake" + } +} + +sink { + Console { + source_table_name = "fake1" + } + Assert { + source_table_name = "fake1" + rules = { + field_rules = [ + { + field_name = "ts" + field_type = "string" + field_value = [ + {equals_to = "2023-01-01 00:00:00"} + ] + } + ] + } + } +} diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java index 0f8dee9d08b..1ed6469b864 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java @@ -155,6 +155,7 @@ public class ZetaSQLFunction { public static final String SECOND = "SECOND"; public static final String WEEK = "WEEK"; public static final String YEAR = "YEAR"; + public static final String FROM_UNIXTIME = "FROM_UNIXTIME"; // -------------------------system functions---------------------------- public static final String COALESCE = "COALESCE"; @@ -377,6 +378,8 @@ public Object executeFunctionExpr(String functionName, List args) { return DateTimeFunction.dayOfWeek(args); case DAY_OF_YEAR: return DateTimeFunction.dayOfYear(args); + case FROM_UNIXTIME: + return DateTimeFunction.fromUnixTime(args); case EXTRACT: return DateTimeFunction.extract(args); case FORMATDATETIME: diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java index 4e62c2f6c7f..4db9b95af27 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java @@ -217,6 +217,7 @@ private SeaTunnelDataType getFunctionType(Function function) { case ZetaSQLFunction.DAYNAME: case ZetaSQLFunction.MONTHNAME: case ZetaSQLFunction.FORMATDATETIME: + case ZetaSQLFunction.FROM_UNIXTIME: return BasicType.STRING_TYPE; case ZetaSQLFunction.ASCII: case ZetaSQLFunction.LOCATE: diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/DateTimeFunction.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/DateTimeFunction.java index a1f226d34cc..2bd8fa28143 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/DateTimeFunction.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/DateTimeFunction.java @@ -23,10 +23,12 @@ import java.text.DateFormatSymbols; import java.time.Duration; +import java.time.Instant; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; import java.time.Period; +import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.time.temporal.Temporal; import java.time.temporal.TemporalAccessor; @@ -541,4 +543,20 @@ public static Integer year(List args) { LocalDate localDate = convertToLocalDate(datetime); return localDate.getYear(); } + + public static String fromUnixTime(List args) { + Long unixTime = (Long) args.get(0); + if (unixTime == null) { + return null; + } + String format = (String) args.get(1); + ZoneId zoneId = ZoneId.systemDefault(); + if (args.size() == 3) { + String timeZone = (String) args.get(2); + zoneId = ZoneId.of(timeZone); + } + DateTimeFormatter df = DateTimeFormatter.ofPattern(format); + LocalDateTime datetime = Instant.ofEpochSecond(unixTime).atZone(zoneId).toLocalDateTime(); + return df.format(datetime); + } } diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/DateTimeFunctionTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/DateTimeFunctionTest.java new file mode 100644 index 00000000000..70b744a04c1 --- /dev/null +++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/DateTimeFunctionTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.transform.sql.zeta; + +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.transform.sql.SQLEngine; +import org.apache.seatunnel.transform.sql.SQLEngineFactory; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class DateTimeFunctionTest { + + @Test + public void testFromUnixtimeFunction() { + + SQLEngine sqlEngine = SQLEngineFactory.getSQLEngine(SQLEngineFactory.EngineType.ZETA); + + SeaTunnelRowType rowType = + new SeaTunnelRowType( + new String[] {"unixtime"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}); + + // 1672502400 means `2023-01-01 12:00:00 UTC+8` in unix time + Long unixTime = 1672545600L; + SeaTunnelRow inputRow = new SeaTunnelRow(new Long[] {unixTime}); + + // transform by `from_unixtime` function + sqlEngine.init( + "test", + null, + rowType, + "select from_unixtime(unixtime,'yyyy-MM-dd') as ts from test"); + SeaTunnelRow outRow = sqlEngine.transformBySQL(inputRow); + Object field = outRow.getField(0); + Assertions.assertNotNull(field.toString()); + + // transform by `from_unixtime` time zone function + sqlEngine.init( + "test", + null, + rowType, + "select from_unixtime(unixtime,'yyyy-MM-dd HH:mm:ss','UTC+6') as ts from test"); + SeaTunnelRow outRow1 = sqlEngine.transformBySQL(inputRow); + Object field1 = outRow1.getField(0); + Assertions.assertEquals("2023-01-01 10:00:00", field1.toString()); + } +}