Skip to content

Commit

Permalink
[Feature][Zeta] add from_unixtime function (#5462)
Browse files Browse the repository at this point in the history
  • Loading branch information
chovy-3012 authored Nov 20, 2023
1 parent 69f79af commit fb24f8e
Show file tree
Hide file tree
Showing 8 changed files with 183 additions and 1 deletion.
24 changes: 24 additions & 0 deletions docs/en/transform-v2/sql-functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -848,6 +848,30 @@ Example:

YEAR(CREATED)

### FROM_UNIXTIME

```FROM_UNIXTIME (unixtime, formatString,timeZone)```

Convert the number of seconds from the UNIX epoch (1970-01-01 00:00:00 UTC) to a string representing the timestamp of that moment.

The most important format characters are: y year, M month, d day, H hour, m minute, s second. For details of the format, see `java.time.format.DateTimeFormatter`.

`timeZone` is optional, default value is system's time zone. `timezone` value can be a `UTC+ timezone offset`, for example, `UTC+8` represents the Asia/Shanghai time zone, see `java.time.ZoneId`

This method returns a string.

Example:

// use default zone

CALL FROM_UNIXTIME(1672502400, 'yyyy-MM-dd HH:mm:ss')

or

// use given zone

CALL FROM_UNIXTIME(1672502400, 'yyyy-MM-dd HH:mm:ss','UTC+6')

## System Functions

### CAST
Expand Down
3 changes: 2 additions & 1 deletion release-note.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
### Formats
- [Canal]Support read canal format message #3950
- [Debezium]Support debezium canal format message #3981

### Connector-V2

- [Json-format] [Canal-Json] Fix json deserialize NPE (#4195)
Expand Down Expand Up @@ -78,6 +78,7 @@
- [Zeta] Fix cpu load problem (#4828)
- [zeta] Fix the deadlock issue with JDBC driver loading (#4878)
- [zeta] dynamically replace the value of the variable at runtime (#4950)
- [Zeta] Add from_unixtime function (#5462)
- [zeta] Fix CDC task restore throw NPE (#5507)

### E2E
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ public void testSQLTransform(TestContainer container) throws IOException, Interr
Container.ExecResult sqlFuncSystem =
container.executeJob("/sql_transform/func_system.conf");
Assertions.assertEquals(0, sqlFuncSystem.getExitCode());
Container.ExecResult sqlFuncFromUnixtime =
container.executeJob("/sql_transform/func_from_unixtime.conf");
Assertions.assertEquals(0, sqlFuncFromUnixtime.getExitCode());
Container.ExecResult sqlCriteriaFilter =
container.executeJob("/sql_transform/criteria_filter.conf");
Assertions.assertEquals(0, sqlCriteriaFilter.getExitCode());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in seatunnel config
######

env {
execution.parallelism = 1
job.mode = "BATCH"
checkpoint.interval = 10000
}

source {
FakeSource {
result_table_name = "fake"
schema = {
fields {
unixtime = "bigint"
}
}
rows = [
{fields = [1672502400], kind = INSERT}
]
}
}

transform {
Sql {
source_table_name = "fake"
result_table_name = "fake1"
query = "select from_unixtime(unixtime,'yyyy-MM-dd HH:mm:ss','UTC+8') as ts from fake"
}
}

sink {
Console {
source_table_name = "fake1"
}
Assert {
source_table_name = "fake1"
rules = {
field_rules = [
{
field_name = "ts"
field_type = "string"
field_value = [
{equals_to = "2023-01-01 00:00:00"}
]
}
]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ public class ZetaSQLFunction {
public static final String SECOND = "SECOND";
public static final String WEEK = "WEEK";
public static final String YEAR = "YEAR";
public static final String FROM_UNIXTIME = "FROM_UNIXTIME";

// -------------------------system functions----------------------------
public static final String COALESCE = "COALESCE";
Expand Down Expand Up @@ -377,6 +378,8 @@ public Object executeFunctionExpr(String functionName, List<Object> args) {
return DateTimeFunction.dayOfWeek(args);
case DAY_OF_YEAR:
return DateTimeFunction.dayOfYear(args);
case FROM_UNIXTIME:
return DateTimeFunction.fromUnixTime(args);
case EXTRACT:
return DateTimeFunction.extract(args);
case FORMATDATETIME:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ private SeaTunnelDataType<?> getFunctionType(Function function) {
case ZetaSQLFunction.DAYNAME:
case ZetaSQLFunction.MONTHNAME:
case ZetaSQLFunction.FORMATDATETIME:
case ZetaSQLFunction.FROM_UNIXTIME:
return BasicType.STRING_TYPE;
case ZetaSQLFunction.ASCII:
case ZetaSQLFunction.LOCATE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@

import java.text.DateFormatSymbols;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.Period;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.temporal.Temporal;
import java.time.temporal.TemporalAccessor;
Expand Down Expand Up @@ -541,4 +543,20 @@ public static Integer year(List<Object> args) {
LocalDate localDate = convertToLocalDate(datetime);
return localDate.getYear();
}

public static String fromUnixTime(List<Object> args) {
Long unixTime = (Long) args.get(0);
if (unixTime == null) {
return null;
}
String format = (String) args.get(1);
ZoneId zoneId = ZoneId.systemDefault();
if (args.size() == 3) {
String timeZone = (String) args.get(2);
zoneId = ZoneId.of(timeZone);
}
DateTimeFormatter df = DateTimeFormatter.ofPattern(format);
LocalDateTime datetime = Instant.ofEpochSecond(unixTime).atZone(zoneId).toLocalDateTime();
return df.format(datetime);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.transform.sql.zeta;

import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.transform.sql.SQLEngine;
import org.apache.seatunnel.transform.sql.SQLEngineFactory;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class DateTimeFunctionTest {

@Test
public void testFromUnixtimeFunction() {

SQLEngine sqlEngine = SQLEngineFactory.getSQLEngine(SQLEngineFactory.EngineType.ZETA);

SeaTunnelRowType rowType =
new SeaTunnelRowType(
new String[] {"unixtime"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE});

// 1672502400 means `2023-01-01 12:00:00 UTC+8` in unix time
Long unixTime = 1672545600L;
SeaTunnelRow inputRow = new SeaTunnelRow(new Long[] {unixTime});

// transform by `from_unixtime` function
sqlEngine.init(
"test",
null,
rowType,
"select from_unixtime(unixtime,'yyyy-MM-dd') as ts from test");
SeaTunnelRow outRow = sqlEngine.transformBySQL(inputRow);
Object field = outRow.getField(0);
Assertions.assertNotNull(field.toString());

// transform by `from_unixtime` time zone function
sqlEngine.init(
"test",
null,
rowType,
"select from_unixtime(unixtime,'yyyy-MM-dd HH:mm:ss','UTC+6') as ts from test");
SeaTunnelRow outRow1 = sqlEngine.transformBySQL(inputRow);
Object field1 = outRow1.getField(0);
Assertions.assertEquals("2023-01-01 10:00:00", field1.toString());
}
}

0 comments on commit fb24f8e

Please sign in to comment.