Skip to content

Commit

Permalink
fix: make formattimestamp and parsetimestamp default to utc (#6954)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zara Lim authored Feb 11, 2021
1 parent 4b3bc96 commit a5ea98a
Show file tree
Hide file tree
Showing 15 changed files with 854 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public String formatTimestamp(
@UdfParameter(
description = "The format pattern should be in the format expected by"
+ " java.time.format.DateTimeFormatter.") final String formatPattern) {
return formatTimestamp(timestamp, formatPattern, ZoneId.systemDefault().getId());
return formatTimestamp(timestamp, formatPattern, ZoneId.of("GMT").getId());
}

@Udf(description = "Converts a TIMESTAMP value into the"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public Timestamp parseTimestamp(
@UdfParameter(
description = "The format pattern should be in the format expected by"
+ " java.time.format.DateTimeFormatter.") final String formatPattern) {
return parseTimestamp(formattedTimestamp, formatPattern, ZoneId.systemDefault().getId());
return parseTimestamp(formattedTimestamp, formatPattern, ZoneId.of("GMT").getId());
}

@Udf(description = "Converts a string representation of a date at the given time zone"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;
import java.util.stream.IntStream;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -44,10 +45,11 @@ public void shouldConvertTimestampToString() {
// When:
final String result = udf.formatTimestamp(new Timestamp(1638360611123L),
"yyyy-MM-dd HH:mm:ss.SSS");
final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
sdf.setTimeZone(TimeZone.getTimeZone("GMT"));

// Then:
final String expectedResult = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
.format(new Date(1638360611123L));
final String expectedResult = sdf.format(new Date(1638360611123L));
assertThat(result, is(expectedResult));
}

Expand All @@ -71,21 +73,6 @@ public void testPSTTimeZone() {
assertThat(result, is("2018-08-15 10:10:43"));
}

@Test
public void testTimeZoneInLocalTime() {
// Given:
final Timestamp timestamp = new Timestamp(1534353043000L);

// When:
final String localTime = udf.formatTimestamp(timestamp, "yyyy-MM-dd HH:mm:ss zz");

// Then:
final String expected = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss zz")
.format(timestamp);

assertThat(localTime, is(expected));
}

@Test
public void testTimeZoneInPacificTime() {
// Given:
Expand Down Expand Up @@ -134,8 +121,9 @@ public void shouldSupportEmbeddedChars() {
"yyyy-MM-dd'T'HH:mm:ss.SSS'Fred'");

// Then:
final String expectedResult = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Fred'")
.format(new Date(1638360611123L));
final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Fred'");
sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
final String expectedResult = sdf.format(new Date(1638360611123L));
assertThat(result, is(expectedResult));
}

Expand Down Expand Up @@ -170,7 +158,9 @@ public void shouldWorkWithManyDifferentFormatters() {
final String pattern = "yyyy-MM-dd HH:mm:ss.SSS'X" + idx + "'";
final Timestamp timestamp = new Timestamp(1538361611123L + idx);
final String result = udf.formatTimestamp(timestamp, pattern);
final String expectedResult = new SimpleDateFormat(pattern).format(timestamp);
final SimpleDateFormat sdf = new SimpleDateFormat(pattern);
sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
final String expectedResult = sdf.format(timestamp);
assertThat(result, is(expectedResult));
} catch (final Exception e) {
fail(e.getMessage());
Expand All @@ -187,7 +177,9 @@ public void shouldRoundTripWithParseTimestamp() {
.forEach(idx -> {
final Timestamp timestamp = new Timestamp(1538361611123L + idx);
final String result = udf.formatTimestamp(timestamp, pattern);
final String expectedResult = new SimpleDateFormat(pattern).format(timestamp);
final SimpleDateFormat sdf = new SimpleDateFormat(pattern);
sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
final String expectedResult = sdf.format(timestamp);
assertThat(result, is(expectedResult));

final Timestamp roundtripTimestamp = parseTimestamp.parseTimestamp(result, pattern);
Expand Down Expand Up @@ -225,7 +217,9 @@ public void shouldBehaveLikeSimpleDateFormat() {
}

private void assertLikeSimpleDateFormat(final String format) {
final String expected = new SimpleDateFormat(format).format(1538361611123L);
final SimpleDateFormat sdf = new SimpleDateFormat(format);
sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
final String expected = sdf.format(1538361611123L);
final Object result = new FormatTimestamp()
.formatTimestamp(new Timestamp(1538361611123L), format);
assertThat(result, is(expected));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.sql.Timestamp;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.TimeZone;
import java.util.stream.IntStream;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -43,10 +44,11 @@ public void shouldParseTimestamp() throws ParseException {
// When:
final Object result = udf.parseTimestamp("2021-12-01 12:10:11.123",
"yyyy-MM-dd HH:mm:ss.SSS");
final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
sdf.setTimeZone(TimeZone.getTimeZone("GMT"));

// Then:
final Timestamp expectedResult = Timestamp.from(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
.parse("2021-12-01 12:10:11.123").toInstant());
final Timestamp expectedResult = Timestamp.from(sdf.parse("2021-12-01 12:10:11.123").toInstant());
assertThat(result, is(expectedResult));
}

Expand All @@ -56,9 +58,11 @@ public void shouldSupportEmbeddedChars() throws ParseException {
final Object result = udf.parseTimestamp("2021-12-01T12:10:11.123Fred",
"yyyy-MM-dd'T'HH:mm:ss.SSS'Fred'");

final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Fred'");
sdf.setTimeZone(TimeZone.getTimeZone("GMT"));

// Then:
final Timestamp expectedResult = Timestamp.from(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Fred'")
.parse("2021-12-01T12:10:11.123Fred").toInstant());
final Timestamp expectedResult = Timestamp.from(sdf.parse("2021-12-01T12:10:11.123Fred").toInstant());
assertThat(result, is(expectedResult));
}

Expand Down Expand Up @@ -142,7 +146,9 @@ public void shouldWorkWithManyDifferentFormatters() {
final String sourceDate = "2018-12-01 10:12:13.456X" + idx;
final String pattern = "yyyy-MM-dd HH:mm:ss.SSS'X" + idx + "'";
final Timestamp result = udf.parseTimestamp(sourceDate, pattern);
final Timestamp expectedResult = Timestamp.from(new SimpleDateFormat(pattern).parse(sourceDate).toInstant());
final SimpleDateFormat sdf = new SimpleDateFormat(pattern);
sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
final Timestamp expectedResult = Timestamp.from(sdf.parse(sourceDate).toInstant());
assertThat(result, is(expectedResult));
} catch (final Exception e) {
fail(e.getMessage());
Expand Down Expand Up @@ -175,7 +181,9 @@ public void shouldBehaveLikeSimpleDateFormat() throws Exception {
}

private void assertLikeSimpleDateFormat(final String value, final String format) throws Exception {
final Timestamp expected = Timestamp.from(new SimpleDateFormat(format).parse(value).toInstant());
final SimpleDateFormat sdf = new SimpleDateFormat(format);
sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
final Timestamp expected = Timestamp.from(sdf.parse(value).toInstant());
final Object result = new ParseTimestamp().parseTimestamp(value, format);
assertThat(result, is(expected));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
{
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM TEST (K STRING KEY, ID BIGINT, NAME STRING, TIME STRING) WITH (KAFKA_TOPIC='test_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='DELIMITED');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "TEST",
"schema" : "`K` STRING KEY, `ID` BIGINT, `NAME` STRING, `TIME` STRING",
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
}
},
"orReplace" : false
}
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM TS AS SELECT\n TEST.K K,\n TEST.ID ID,\n PARSE_TIMESTAMP(TEST.TIME, 'yyyy-MM-dd''T''HH:mm:ss') TS\nFROM TEST TEST\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "TS",
"schema" : "`K` STRING KEY, `ID` BIGINT, `TS` TIMESTAMP",
"topicName" : "TS",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
}
},
"orReplace" : false
},
"queryPlan" : {
"sources" : [ "TEST" ],
"sink" : "TS",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "TS"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
}
},
"sourceSchema" : "`K` STRING KEY, `ID` BIGINT, `NAME` STRING, `TIME` STRING"
},
"keyColumnNames" : [ "K" ],
"selectExpressions" : [ "ID AS ID", "PARSE_TIMESTAMP(TIME, 'yyyy-MM-dd''T''HH:mm:ss') AS TS" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
}
},
"topicName" : "TS"
},
"queryId" : "CSAS_TS_0"
}
} ],
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"metric.reporters" : "",
"ksql.transient.prefix" : "transient_",
"ksql.query.status.running.threshold.seconds" : "300",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.persistence.default.format.key" : "KAFKA",
"ksql.query.persistent.max.bytes.buffering.total" : "-1",
"ksql.query.error.max.queue.size" : "10",
"ksql.variable.substitution.enable" : "true",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.query.pull.max.qps" : "2147483647",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.query.pull.metrics.enabled" : "false",
"ksql.create.or.replace.enabled" : "true",
"ksql.metrics.extension" : null,
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.cast.strings.preserve.nulls" : "true",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.pull.queries.enable" : "true",
"ksql.suppress.enabled" : "false",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.query.persistent.active.limit" : "2147483647",
"ksql.persistence.wrap.single.values" : null,
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.query.retry.backoff.initial.ms" : "15000",
"ksql.query.transient.max.bytes.buffering.total" : "-1",
"ksql.schema.registry.url" : "",
"ksql.properties.overrides.denylist" : "",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.query.pull.table.scan.enabled" : "false",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.streams.topology.optimization" : "all",
"ksql.query.retry.backoff.max.ms" : "900000",
"ksql.streams.num.stream.threads" : "4",
"ksql.timestamp.throw.on.invalid" : "false",
"ksql.metrics.tags.custom" : "",
"ksql.persistence.default.format.value" : null,
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.udf.collect.metrics" : "false",
"ksql.query.pull.thread.pool.size" : "100",
"ksql.persistent.prefix" : "query_",
"ksql.metastore.backup.location" : "",
"ksql.error.classifier.regex" : "",
"ksql.suppress.buffer.size.bytes" : "-1"
}
}
Loading

0 comments on commit a5ea98a

Please sign in to comment.