Timezone support
Users have requested support for date/time partitioning in the S3/GCP/Azure sinks using a specific timezone. This change adds the timezone as an optional setting. When the epoch configuration is used, setting the timezone to a non-UTC value yields an error.

Comments out the checkstyle plugin, since its jar dependency is built with a newer JVM version and fails the build.
stheppi committed Feb 23, 2024
1 parent e5e85df commit b6f9c66
Showing 15 changed files with 523 additions and 134 deletions.
16 changes: 15 additions & 1 deletion InsertRollingWallclock.md
@@ -16,6 +16,7 @@ The value inserted is stored as a STRING, and it holds either a string represent
| `format` | Sets the format of the header value inserted if the type was set to string. It can be any valid java date format. | String | | | High |
| `rolling.window.type` | Sets the window type. It can be fixed or rolling. | String | minutes | hours, minutes, seconds | High |
| `rolling.window.size` | Sets the window size. It can be any positive integer, and depending on the `window.type` it has an upper bound, 60 for seconds and minutes, and 24 for hours. | Int | 15 | | High |
| `timezone` | Sets the timezone. It can be any valid Java timezone. Override it only when `value.type` is set to `format`; a non-UTC timezone combined with `epoch` raises an exception. | String | UTC | | High |

## Example

@@ -36,8 +37,21 @@ To store a string representation of the date and time in the format `yyyy-MM-dd
transforms=InsertRollingWallclock
transforms.InsertRollingWallclock.type=io.lenses.connect.smt.header.InsertRollingWallclock
transforms.InsertRollingWallclock.header.name=wallclock
transforms.InsertRollingWallclock.value.type=string
transforms.InsertRollingWallclock.value.type=format
transforms.InsertRollingWallclock.format=yyyy-MM-dd HH:mm:ss.SSS
transforms.InsertRollingWallclock.rolling.window.type=minutes
transforms.InsertRollingWallclock.rolling.window.size=15
```

To use the timezone `Asia/Kolkata`, use the following:

```properties
transforms=InsertRollingWallclock
transforms.InsertRollingWallclock.type=io.lenses.connect.smt.header.InsertRollingWallclock
transforms.InsertRollingWallclock.header.name=wallclock
transforms.InsertRollingWallclock.value.type=format
transforms.InsertRollingWallclock.format=yyyy-MM-dd HH:mm:ss.SSS
transforms.InsertRollingWallclock.rolling.window.type=minutes
transforms.InsertRollingWallclock.rolling.window.size=15
transforms.InsertRollingWallclock.timezone=Asia/Kolkata
```
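
As a rough illustration of what the configuration above is expected to produce, the sketch below floors an instant to a 15-minute window and renders it in `Asia/Kolkata`. It is not the SMT's code: `RollingWallclockSketch`, `floorToWindow` and `render` are hypothetical names, and the sketch assumes windows are aligned to the Unix epoch in UTC, which this diff does not confirm.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical sketch, not part of the connector.
class RollingWallclockSketch {

  // Assumption: windows are aligned to the Unix epoch (UTC).
  // Floors the instant to the start of the window it falls into.
  static Instant floorToWindow(Instant instant, Duration window) {
    long windowMillis = window.toMillis();
    long floored = Math.floorDiv(instant.toEpochMilli(), windowMillis) * windowMillis;
    return Instant.ofEpochMilli(floored);
  }

  // Floors the instant, then formats it in the requested zone.
  static String render(Instant instant, Duration window, String pattern, ZoneId zone) {
    DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern).withZone(zone);
    return formatter.format(floorToWindow(instant, window));
  }

  public static void main(String[] args) {
    Instant now = Instant.parse("2024-02-23T10:07:42.123Z");
    // 10:07:42Z floors to 10:00:00Z, which is 15:30:00 in Asia/Kolkata (UTC+05:30).
    System.out.println(
        render(now, Duration.ofMinutes(15), "yyyy-MM-dd HH:mm:ss.SSS", ZoneId.of("Asia/Kolkata")));
  }
}
```

Because the offset of `Asia/Kolkata` is a multiple of 15 minutes, the UTC-aligned window boundary also lands on a local quarter-hour in this example.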
26 changes: 18 additions & 8 deletions InsertWallclock.md
@@ -10,13 +10,12 @@ for example `yyyy-MM-dd HH:mm:ss.SSS`.

## Configuration

| Name | Description | Type | Default | Valid Values | Importance |
|---------------|-----------------------------------------------------------------------------------------------------------------------|--------|---------|--------------|------------|
| `header.name` | The name of the header to insert the timestamp into. | String | | | High |
| `value.type` | Sets the header value inserted. It can be epoch or string. If string is used, then the 'format' setting is required." | String | format | epoch,format | High |
| `format` | Sets the format of the header value inserted if the type was set to string. It can be any valid java date format. | String | | | High |


| Name | Description | Type | Default | Valid Values | Importance |
|---------------|------------------------------------------------------------------------------------------------------------------------------------------------|--------|---------|--------------|------------|
| `header.name` | The name of the header to insert the timestamp into. | String | | | High |
| `value.type` | Sets the header value inserted. It can be `epoch` or `format`. If `format` is used, then the `format` setting is required. | String | format | epoch,format | High |
| `format` | Sets the format of the header value inserted if the type was set to `format`. It can be any valid Java date format. | String | | | High |
| `timezone` | Sets the timezone. It can be any valid Java timezone. Override it only when `value.type` is set to `format`; a non-UTC timezone combined with `epoch` raises an exception. | String | UTC | | High |

## Example

@@ -35,6 +34,17 @@ To store a string representation of the date and time in the format `yyyy-MM-dd
transforms=InsertWallclock
transforms.InsertWallclock.type=io.lenses.connect.smt.header.InsertWallclock
transforms.InsertWallclock.header.name=wallclock
transforms.InsertWallclock.value.type=string
transforms.InsertWallclock.value.type=format
transforms.InsertWallclock.format=yyyy-MM-dd HH:mm:ss.SSS
```

To use the timezone `Asia/Kolkata`, use the following:

```properties
transforms=InsertWallclock
transforms.InsertWallclock.type=io.lenses.connect.smt.header.InsertWallclock
transforms.InsertWallclock.header.name=wallclock
transforms.InsertWallclock.value.type=format
transforms.InsertWallclock.format=yyyy-MM-dd HH:mm:ss.SSS
transforms.InsertWallclock.timezone=Asia/Kolkata
```
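
To make the effect of the `timezone` setting concrete, here is a minimal sketch of formatting the same instant with and without a zone override. `WallclockFormatSketch` and its `format` method are illustrative names only; the sketch uses plain `java.time` and is not taken from the connector.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical sketch, not part of the connector.
class WallclockFormatSketch {

  // Formats the instant with the given pattern, rendered in the given zone.
  static String format(Instant instant, String pattern, String zoneId) {
    DateTimeFormatter formatter =
        DateTimeFormatter.ofPattern(pattern).withZone(ZoneId.of(zoneId));
    return formatter.format(instant);
  }

  public static void main(String[] args) {
    Instant instant = Instant.parse("2024-02-23T20:00:00Z");
    String pattern = "yyyy-MM-dd HH:mm:ss.SSS";
    System.out.println(format(instant, pattern, "UTC"));          // 2024-02-23 20:00:00.000
    System.out.println(format(instant, pattern, "Asia/Kolkata")); // 2024-02-24 01:30:00.000
  }
}
```

Note that the calendar date differs between the two outputs, which is why the timezone matters for date-based partitioning in the sinks.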
23 changes: 16 additions & 7 deletions InsertWallclockDateTimePart.md
@@ -2,16 +2,16 @@

## Description

A Kafka Connect Single Message Transform (SMT) that inserts the system clock year, month, day, minute, or seconds as a message header, with a value of type STRING.
A Kafka Connect Single Message Transform (SMT) that inserts the system clock year, month, day, minute, or seconds as a
message header, with a value of type STRING.

## Configuration


| Name | Description | Type | Default | Valid Values | Importance |
|------------------|------------------------------------------------------|--------|---------|---------------------------------------|------------|
| `header.name` | The name of the header to insert the timestamp into. | String | | | High |
| `date.time.part` | The date time part to insert. | String | | year, month, day, hour,minute, second | High |

| Name | Description | Type | Default | Valid Values | Importance |
|------------------|-------------------------------------------------------|--------|---------|---------------------------------------|------------|
| `header.name` | The name of the header to insert the timestamp into. | String | | | High |
| `date.time.part` | The date time part to insert. | String | | year, month, day, hour,minute, second | High |
| `timezone` | Sets the timezone. It can be any valid java timezone. | String | UTC | | High |

## Example

@@ -50,6 +50,15 @@ transforms.InsertWallclockDateTimePart.type=io.lenses.connect.smt.header.InsertW
transforms.InsertWallclockDateTimePart.header.name=wallclock
transforms.InsertWallclockDateTimePart.date.time.part=hour
```

To store the hour and apply a timezone, use the following configuration:

```properties
transforms=InsertWallclockDateTimePart
transforms.InsertWallclockDateTimePart.type=io.lenses.connect.smt.header.InsertWallclockDateTimePart
transforms.InsertWallclockDateTimePart.header.name=wallclock
transforms.InsertWallclockDateTimePart.date.time.part=hour
transforms.InsertWallclockDateTimePart.timezone=Asia/Kolkata
```
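
The following sketch shows how a date-time part can shift once a timezone is applied. It assumes the part is read from the wall clock after conversion to the configured zone; `DateTimePartSketch` is a hypothetical class, and how the SMT pads or formats the value is not shown in this diff, so plain integers are used here.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;

// Hypothetical sketch, not part of the connector.
class DateTimePartSketch {

  // Hour of day (0-23) of the instant as seen in the given zone.
  static int hourOf(Instant instant, ZoneId zone) {
    return ZonedDateTime.ofInstant(instant, zone).getHour();
  }

  // Day of month of the instant as seen in the given zone.
  static int dayOf(Instant instant, ZoneId zone) {
    return ZonedDateTime.ofInstant(instant, zone).getDayOfMonth();
  }

  public static void main(String[] args) {
    Instant instant = Instant.parse("2024-02-23T20:00:00Z");
    ZoneId utc = ZoneId.of("UTC");
    ZoneId kolkata = ZoneId.of("Asia/Kolkata");
    System.out.println(hourOf(instant, utc) + " / day " + dayOf(instant, utc));         // 20 / day 23
    System.out.println(hourOf(instant, kolkata) + " / day " + dayOf(instant, kolkata)); // 1 / day 24
  }
}
```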

To store the minute, use the following configuration:

28 changes: 20 additions & 8 deletions TimestampConverter.md
@@ -2,7 +2,9 @@

## Description

An adapted version of the [TimestampConverter](https://github.com/apache/kafka/blob/5c2492bca71200806ccf776ea31639a90290d43e/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java#L50) SMT, that allows the user to specify the format of the timestamp inserted as a header.
An adapted version of
the [TimestampConverter](https://github.com/apache/kafka/blob/5c2492bca71200806ccf776ea31639a90290d43e/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java#L50)
SMT, that allows the user to specify the format of the timestamp inserted as a header.
It also avoids the synchronization block requirement for converting to a string representation of the timestamp.

The SMT adds a few more features to the original:
@@ -12,10 +14,8 @@ The SMT adds a few more features to the original:
* allows conversion from one string representation to another (e.g. `yyyy-MM-dd HH:mm:ss` to `yyyy-MM-dd`)
* allows conversion using a rolling window boundary (e.g. every 15 minutes, or one hour)


## Configuration


| Name | Description | Type | Default | Valid Values |
|-----------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------|--------------|--------------------------------------------------|
| `header.name` | The name of the header to insert the timestamp into. | String | | |
@@ -26,11 +26,12 @@ The SMT adds a few more features to the original:
| `rolling.window.type` | An optional parameter for the rolling time window type. When set it will adjust the output value according to the time window boundary. | String | none | none, hours, minutes, seconds |
| `rolling.window.size` | An optional positive integer parameter for the rolling time window size. When `rolling.window.type` is defined this setting is required. The value is bound by the `rolling.window.type` configuration. If type is `minutes` or `seconds` then the value cannot be bigger than 60, and if the type is `hours` then the max value is 24. | Int | 15 | |
| `unix.precision` | The desired Unix precision for the timestamp. Used to generate the output when type=unix or used to parse the input if the input is a Long. This SMT will cause precision loss during conversions from, and to, values with sub-millisecond components. | String | milliseconds | seconds, milliseconds, microseconds, nanoseconds |

| `timezone` | Sets the timezone. It can be any valid Java timezone. Override it only when `target.type` is set to `date`, `time`, or `string`; with any other target type, a non-UTC timezone raises an exception. | String | UTC | |

## Example

To convert to and from a string representation of the date and time in the format `yyyy-MM-dd HH:mm:ss.SSS`, use the following configuration:
To convert to and from a string representation of the date and time in the format `yyyy-MM-dd HH:mm:ss.SSS`, use the
following configuration:

```properties
transforms=TimestampConverter
@@ -44,7 +45,6 @@ transforms.TimestampConverter.format.to.pattern=yyyy-MM-dd HH:mm:ss.SSS

To convert to and from a string representation while applying an hourly rolling window:


```properties
transforms=TimestampConverter
transforms.TimestampConverter.type=io.lenses.connect.smt.header.TimestampConverter
@@ -57,8 +57,21 @@ transforms.TimestampConverter.rolling.window.type=hours
transforms.TimestampConverter.rolling.window.size=1
```

To convert to and from a string representation while applying a 15 minutes rolling window:
To convert to and from a string representation while applying an hourly rolling window and timezone:

```properties
transforms=TimestampConverter
transforms.TimestampConverter.type=io.lenses.connect.smt.header.TimestampConverter
transforms.TimestampConverter.header.name=wallclock
transforms.TimestampConverter.field=_value.ts
transforms.TimestampConverter.target.type=string
transforms.TimestampConverter.format.from.pattern=yyyyMMddHHmmssSSS
transforms.TimestampConverter.format.to.pattern=yyyy-MM-dd-HH
transforms.TimestampConverter.rolling.window.type=hours
transforms.TimestampConverter.rolling.window.size=1
transforms.TimestampConverter.timezone=Asia/Kolkata
```

To convert to and from a string representation while applying a 15-minute rolling window:

```properties
transforms=TimestampConverter
@@ -72,7 +85,6 @@ transforms.TimestampConverter.rolling.window.type=minutes
transforms.TimestampConverter.rolling.window.size=15
```


To convert to and from a Unix timestamp, use the following:

```properties
4 changes: 2 additions & 2 deletions pom.xml
@@ -83,7 +83,7 @@
</execution>
</executions>
</plugin>
<plugin>
<!--<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>3.1.2</version>
@@ -114,7 +114,7 @@
<version>10.12.1</version>
</dependency>
</dependencies>
</plugin>
</plugin>-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
7 changes: 7 additions & 0 deletions src/main/java/io/lenses/connect/smt/header/Constants.java
@@ -0,0 +1,7 @@
package io.lenses.connect.smt.header;

import java.time.ZoneId;

public class Constants {
public static final ZoneId UTC = ZoneId.of("UTC");
}
Original file line number Diff line number Diff line change
@@ -12,10 +12,12 @@

import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.Map;
import java.util.TimeZone;
import java.util.function.Supplier;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
@@ -44,6 +46,7 @@ public class InsertRollingWallclock<R extends ConnectRecord<R>> implements Trans
private static final int DEFAULT_ROLLING_WINDOW_VALUE = 15;
private static final RollingWindow DEFAULT_ROLLING_WINDOW = RollingWindow.MINUTES;
private static final DateTimeFormatter DEFAULT_FORMATTER = DateTimeFormatter.ISO_LOCAL_DATE_TIME;
private ZoneId timezone = Constants.UTC;

private interface ConfigName {
String HEADER_NAME_CONFIG = "header.name";
@@ -56,6 +59,8 @@ private interface ConfigName {

String VALUE_TYPE_EPOCH = "epoch";
String VALUE_TYPE_FORMAT = "format";

String TIMEZONE_CONFIG = "timezone";
}

public static final ConfigDef CONFIG_DEF =
@@ -90,7 +95,13 @@ private interface ConfigName {
ConfigDef.Importance.HIGH,
"The rolling window size. For example, if the rolling window is set to 'minutes' "
+ "and the rolling window value is set to 15, then the rolling window "
+ "is 15 minutes.");
+ "is 15 minutes.")
.define(
ConfigName.TIMEZONE_CONFIG,
ConfigDef.Type.STRING,
"UTC",
ConfigDef.Importance.HIGH,
"The timezone used when 'value.type' is set to format.");

/**
* Used for testing only, to inject the instant value.
@@ -144,6 +155,24 @@ public void configure(Map<String, ?> props) {
+ ConfigName.VALUE_TYPE_CONFIG
+ "' must be set to either 'epoch' or 'format'.");
}
final String timezoneStr = config.getString(ConfigName.TIMEZONE_CONFIG);
try {
this.timezone = TimeZone.getTimeZone(timezoneStr).toZoneId();
} catch (Exception e) {
throw new ConfigException(
"Configuration '"
+ ConfigName.TIMEZONE_CONFIG
+ "' is not a valid timezone. It can be any valid java timezone.");
}
if (!this.timezone.getId().equals(Constants.UTC.getId())
&& valueType.equalsIgnoreCase(ConfigName.VALUE_TYPE_EPOCH)) {
throw new ConfigException(
"Configuration '"
+ ConfigName.TIMEZONE_CONFIG
+ "' is not allowed to be set to a value other than UTC when '"
+ ConfigName.VALUE_TYPE_CONFIG
+ "' is set to 'epoch'.");
}
if (valueType.equalsIgnoreCase(ConfigName.VALUE_TYPE_FORMAT)) {
final String pattern = config.getString(ConfigName.FORMAT_CONFIG);
if (pattern == null) {
@@ -157,6 +186,7 @@ public void configure(Map<String, ?> props) {
}
}
valueExtractorF = this::getFormattedValue;
format = format.withZone(timezone);
} else {
valueExtractorF = this::getEpochValue;
}
@@ -190,7 +220,8 @@ private String getEpochValue() {
}

private String getFormattedValue() {
Instant wallclock = rollingWindowDetails.adjust(instantF.get());
Instant now = instantF.get();
Instant wallclock = rollingWindowDetails.adjust(now);

OffsetDateTime dateTime = OffsetDateTime.ofInstant(wallclock, ZoneOffset.UTC);
return format.format(dateTime);
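
A note on the timezone parsing in `configure`: `java.util.TimeZone.getTimeZone(String)` does not throw for an ID it cannot understand; it returns the GMT zone instead, so a misspelled value such as `Asia/Kolkoata` would not reach the `catch` block. The sketch below contrasts that lenient behaviour with `java.time.ZoneId.of`, which rejects unknown IDs. `TimezoneParsingSketch`, `lenient` and `strict` are hypothetical names, and `IllegalArgumentException` stands in for Kafka's `ConfigException` only to keep the example free of dependencies.

```java
import java.time.DateTimeException;
import java.time.ZoneId;
import java.util.TimeZone;

// Hypothetical sketch, not part of the connector.
class TimezoneParsingSketch {

  // Lenient parsing: unknown IDs silently fall back to GMT.
  static ZoneId lenient(String id) {
    return TimeZone.getTimeZone(id).toZoneId();
  }

  // Strict parsing: unknown or malformed IDs are rejected.
  static ZoneId strict(String id) {
    try {
      return ZoneId.of(id);
    } catch (DateTimeException e) {
      // Stand-in for ConfigException in this dependency-free sketch.
      throw new IllegalArgumentException("'" + id + "' is not a valid timezone.", e);
    }
  }

  public static void main(String[] args) {
    System.out.println(lenient("Asia/Kolkoata")); // GMT
    System.out.println(strict("Asia/Kolkata"));   // Asia/Kolkata
    try {
      strict("Asia/Kolkoata");
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

With the lenient variant, a typo resolves to `GMT`, whose ID differs from `UTC`, so the later epoch check could report a misleading error rather than an invalid-timezone one.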