Skip to content

Commit

Permalink
Formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
davidsloan committed Sep 13, 2024
1 parent 3b86cc9 commit 49e5aec
Show file tree
Hide file tree
Showing 11 changed files with 355 additions and 319 deletions.
126 changes: 65 additions & 61 deletions src/main/java/io/lenses/connect/smt/header/MultiDateTimeFormatter.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at: http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
* for the specific language governing permissions and limitations under the License.
*/
package io.lenses.connect.smt.header;

import org.apache.kafka.common.config.ConfigException;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
Expand All @@ -10,81 +18,77 @@
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigException;

class MultiDateTimeFormatter {

private final List<DateTimeFormatter> formatters;
private final List<String> patterns;
private final Boolean returnNowIfNull;
private final List<DateTimeFormatter> formatters;
private final List<String> patterns;
private final Boolean returnNowIfNull;

public MultiDateTimeFormatter(
List<String> patterns,
List<DateTimeFormatter> formatters,
Boolean returnNowIfNull
) {
this.patterns = patterns;
this.formatters = formatters;
this.returnNowIfNull = returnNowIfNull;
}
public MultiDateTimeFormatter(
List<String> patterns, List<DateTimeFormatter> formatters, Boolean returnNowIfNull) {
this.patterns = patterns;
this.formatters = formatters;
this.returnNowIfNull = returnNowIfNull;
}

public Instant format(String value, ZoneId zoneId) {
if (value == null && returnNowIfNull) {
return LocalDateTime.now().atZone(zoneId).toInstant();
} else if (value == null) {
throw new DateTimeParseException("No valid date time provided", "null", 0);
}
for (DateTimeFormatter formatter : formatters) {
try {
LocalDateTime localDateTime = LocalDateTime.parse( value, formatter);
return localDateTime.atZone(zoneId).toInstant();
} catch (DateTimeParseException dtpe) {
// ignore exception and use fallback
}
}
throw new DateTimeParseException("Cannot parse date with any formats", value, 0);
public Instant format(String value, ZoneId zoneId) {
if (value == null && returnNowIfNull) {
return LocalDateTime.now().atZone(zoneId).toInstant();
} else if (value == null) {
throw new DateTimeParseException("No valid date time provided", "null", 0);
}


public String getDisplayPatterns() {
return String.join(", ", patterns);
for (DateTimeFormatter formatter : formatters) {
try {
LocalDateTime localDateTime = LocalDateTime.parse(value, formatter);
return localDateTime.atZone(zoneId).toInstant();
} catch (DateTimeParseException dtpe) {
// ignore exception and use fallback
}
}
throw new DateTimeParseException("Cannot parse date with any formats", value, 0);
}

public String getDisplayPatterns() {
return String.join(", ", patterns);
}

private static DateTimeFormatter createFormatter(String pattern, String configName, Locale locale, ZoneId zoneId) {
private static DateTimeFormatter createFormatter(
String pattern, String configName, Locale locale, ZoneId zoneId) {
try {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
if (locale != null) {
formatter = formatter.withLocale(locale);
}
if (zoneId != null) {
formatter = formatter.withZone(zoneId);
}
return formatter;
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
if (locale != null) {
formatter = formatter.withLocale(locale);
}
if (zoneId != null) {
formatter = formatter.withZone(zoneId);
}
return formatter;
} catch (IllegalArgumentException e) {
throw new ConfigException("Configuration '" + configName + "' is not a valid date format.");
throw new ConfigException("Configuration '" + configName + "' is not a valid date format.");
}
}
}

public static MultiDateTimeFormatter createDateTimeFormatter(
List<String> patternConfigs, String configName, Locale locale) {
public static MultiDateTimeFormatter createDateTimeFormatter(
List<String> patternConfigs, String configName, Locale locale) {

return new MultiDateTimeFormatter(
patternConfigs,
patternConfigs.stream()
.map(patternConfig -> createFormatter(patternConfig, configName, locale, null))
.collect(Collectors.toUnmodifiableList()),
false
);
}
patternConfigs,
patternConfigs.stream()
.map(patternConfig -> createFormatter(patternConfig, configName, locale, null))
.collect(Collectors.toUnmodifiableList()),
false);
}

public static MultiDateTimeFormatter createDateTimeFormatter(
List<String> patternConfigs, String configName, ZoneId zoneId) {
public static MultiDateTimeFormatter createDateTimeFormatter(
List<String> patternConfigs, String configName, ZoneId zoneId) {

return new MultiDateTimeFormatter(
patternConfigs,
patternConfigs.stream()
.map(patternConfig -> createFormatter(patternConfig, configName, null, zoneId))
.collect(Collectors.toUnmodifiableList()),
true);
}
patternConfigs,
patternConfigs.stream()
.map(patternConfig -> createFormatter(patternConfig, configName, null, zoneId))
.collect(Collectors.toUnmodifiableList()),
true);
}
}
58 changes: 30 additions & 28 deletions src/main/java/io/lenses/connect/smt/header/PropsFormatter.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,39 +10,41 @@
*/
package io.lenses.connect.smt.header;

import org.apache.kafka.connect.transforms.util.SimpleConfig;

import java.util.Comparator;
import java.util.Map;
import org.apache.kafka.connect.transforms.util.SimpleConfig;

/**
* This class is responsible for formatting properties from a SimpleConfig object.
* It converts the properties into a string representation in a json-like format.
* This class is responsible for formatting properties from a SimpleConfig object. It converts the
* properties into a string representation in a json-like format.
*/
public class PropsFormatter {

private final SimpleConfig simpleConfig;
private final SimpleConfig simpleConfig;

/**
* Constructs a new PropsFormatter with the given SimpleConfig.
*
* @param simpleConfig the SimpleConfig object containing the properties to be formatted
*/
public PropsFormatter(SimpleConfig simpleConfig) {
this.simpleConfig = simpleConfig;
}
/**
* Constructs a new PropsFormatter with the given SimpleConfig.
*
* @param simpleConfig the SimpleConfig object containing the properties to be formatted
*/
public PropsFormatter(SimpleConfig simpleConfig) {
this.simpleConfig = simpleConfig;
}

/**
* Formats the properties from the SimpleConfig object into a string.
* The properties are represented as key-value pairs in the format: "key: "value"".
* All properties are enclosed in curly braces.
*
* @return a string representation of the properties
*/
public String apply() {
StringBuilder sb = new StringBuilder("{");
simpleConfig.originalsStrings().entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(entry -> sb.append(entry.getKey()).append(": \"").append(entry.getValue()).append("\", "));
sb.delete(sb.length() - 2, sb.length());
return sb.append("}").toString();
}
}
/**
* Formats the properties from the SimpleConfig object into a string. The properties are
* represented as key-value pairs in the format: "key: "value"". All properties are enclosed in
* curly braces.
*
* @return a string representation of the properties
*/
public String apply() {
StringBuilder sb = new StringBuilder("{");
simpleConfig.originalsStrings().entrySet().stream()
.sorted(Map.Entry.comparingByKey())
.forEach(
entry ->
sb.append(entry.getKey()).append(": \"").append(entry.getValue()).append("\", "));
sb.delete(sb.length() - 2, sb.length());
return sb.append("}").toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Locale;
import java.util.Optional;
Expand Down Expand Up @@ -110,7 +109,8 @@ public Instant getInstant(R r) {
+ " instead.");
}

return convertToTimestamp(extractedValue, unixPrecision, fromPattern, timeZone, propsFormatter);
return convertToTimestamp(
extractedValue, unixPrecision, fromPattern, timeZone, propsFormatter);
}
}

Expand Down Expand Up @@ -155,7 +155,12 @@ public static <R extends ConnectRecord<R>> RecordFieldTimestamp<R> create(
MultiDateTimeFormatter.createDateTimeFormatter(
patterns, FORMAT_FROM_CONFIG, locale));

return new RecordFieldTimestamp<>(fieldTypeAndFields, fromPattern, unixPrecision, zoneId, Optional.of(new PropsFormatter(config)));
return new RecordFieldTimestamp<>(
fieldTypeAndFields,
fromPattern,
unixPrecision,
zoneId,
Optional.of(new PropsFormatter(config)));
}

public static ConfigDef extendConfigDef(ConfigDef from) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,10 +430,12 @@ public void configure(Map<String, ?> configs) {
throw new ConfigException("TimestampConverter requires header key to be specified");
}

MultiDateTimeFormatter fromPattern = Optional
.ofNullable(simpleConfig.getList(FORMAT_FROM_CONFIG))
.map(fromFormatPattern -> MultiDateTimeFormatter.createDateTimeFormatter(
fromFormatPattern, FORMAT_FROM_CONFIG, Constants.UTC.toZoneId()))
MultiDateTimeFormatter fromPattern =
Optional.ofNullable(simpleConfig.getList(FORMAT_FROM_CONFIG))
.map(
fromFormatPattern ->
MultiDateTimeFormatter.createDateTimeFormatter(
fromFormatPattern, FORMAT_FROM_CONFIG, Constants.UTC.toZoneId()))
.orElse(null);

String toFormatPattern = simpleConfig.getString(FORMAT_TO_CONFIG);
Expand Down
18 changes: 13 additions & 5 deletions src/main/java/io/lenses/connect/smt/header/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,26 @@
import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Date;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;

class Utils {

static Instant convertToTimestamp(
Object value, String unixPrecision, Optional<MultiDateTimeFormatter> fromPattern, ZoneId zoneId, Optional<PropsFormatter> propsFormatter) {
static Instant convertToTimestamp(
Object value,
String unixPrecision,
Optional<MultiDateTimeFormatter> fromPattern,
ZoneId zoneId,
Optional<PropsFormatter> propsFormatter) {
if (value == null) {
return Instant.now();
}
Expand Down Expand Up @@ -74,7 +76,13 @@ static Instant convertToTimestamp(
try {
return Instant.ofEpochMilli(Long.parseLong((String) value));
} catch (NumberFormatException e) {
throw new DataException("Expected a long, but found " + value + ". Props: " + propsFormatter.map(PropsFormatter::apply).orElse("(No props formatter)"));
throw new DataException(
"Expected a long, but found "
+ value
+ ". Props: "
+ propsFormatter
.map(PropsFormatter::apply)
.orElse("(No props formatter)"));
}
});
}
Expand Down
Loading

0 comments on commit 49e5aec

Please sign in to comment.