Skip to content

Commit

Permalink
API: implement types timestamp_ns and timestamptz_ns
Browse files Browse the repository at this point in the history
Helps #8657

This change adds field `ChronoUnit unit` to `TimestampType`, such that
`TimestampType` now represents four specified types:
- `timestamp` (existing)
- `timestamptz` (existing)
- `timestamp_ns` (new #8683)
- `timestamptz_ns` (new #8683)

Note that TimestampType.with[out]Zone() are marked as deprecated in this
change. In future PRs, I'll remove usage of these static methods.
  • Loading branch information
jacobmarble authored and epgif committed Jan 30, 2024
1 parent 20ff1ab commit f3dad15
Show file tree
Hide file tree
Showing 35 changed files with 911 additions and 280 deletions.
111 changes: 67 additions & 44 deletions api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,28 @@
import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.DateTimeUtil;

/** Expression utility methods. */
public class ExpressionUtil {
private static final Function<Object, Integer> HASH_FUNC =
Transforms.bucket(Integer.MAX_VALUE).bind(Types.StringType.get());
private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
private static final long FIVE_MINUTES_IN_MICROS = TimeUnit.MINUTES.toMicros(5);
private static final long FIVE_MINUTES_IN_MILLIS = TimeUnit.MINUTES.toMillis(5);
private static final long THREE_DAYS_IN_HOURS = TimeUnit.DAYS.toHours(3);
private static final long NINETY_DAYS_IN_HOURS = TimeUnit.DAYS.toHours(90);
private static final Pattern DATE = Pattern.compile("\\d{4}-\\d{2}-\\d{2}");
private static final Pattern TIME = Pattern.compile("\\d{2}:\\d{2}(:\\d{2}(.\\d{1,9})?)?");
private static final Pattern TIMESTAMP =
Pattern.compile("\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}(:\\d{2}(.\\d{1,9})?)?");
Pattern.compile("\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}(:\\d{2}(.\\d{1,6})?)?");
private static final Pattern TIMESTAMPNS =
Pattern.compile("\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}(:\\d{2}(.\\d{7,9})?)?");
private static final Pattern TIMESTAMPTZ =
Pattern.compile(
"\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}(:\\d{2}(.\\d{1,9})?)?([-+]\\d{2}:\\d{2}|Z)");
"\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}(:\\d{2}(.\\d{1,6})?)?([-+]\\d{2}:\\d{2}|Z)");
private static final Pattern TIMESTAMPTZNS =
Pattern.compile(
"\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}(:\\d{2}(.\\d{7,9})?)?([-+]\\d{2}:\\d{2}|Z)");
static final int LONG_IN_PREDICATE_ABBREVIATION_THRESHOLD = 10;
private static final int LONG_IN_PREDICATE_ABBREVIATION_MIN_GAIN = 5;

Expand Down Expand Up @@ -247,13 +253,12 @@ public static <T> UnboundTerm<T> unbind(Term term) {

private static class ExpressionSanitizer
extends ExpressionVisitors.ExpressionVisitor<Expression> {
private final long now;
private final long nowMillis;
private final int today;

private ExpressionSanitizer() {
long nowMillis = System.currentTimeMillis();
OffsetDateTime nowDateTime = Instant.ofEpochMilli(nowMillis).atOffset(ZoneOffset.UTC);
this.now = nowMillis * 1000;
this.nowMillis = System.currentTimeMillis();
OffsetDateTime nowDateTime = Instant.ofEpochMilli(this.nowMillis).atOffset(ZoneOffset.UTC);
this.today = (int) ChronoUnit.DAYS.between(EPOCH, nowDateTime);
}

Expand Down Expand Up @@ -293,13 +298,13 @@ public <T> Expression predicate(BoundPredicate<T> pred) {
return new UnboundPredicate<>(
pred.op(),
unbind(pred.term()),
(T) sanitize(bound.term().type(), bound.literal(), now, today));
(T) sanitize(bound.term().type(), bound.literal(), nowMillis, today));
} else if (pred.isSetPredicate()) {
BoundSetPredicate<T> bound = (BoundSetPredicate<T>) pred;
Iterable<T> iter =
() ->
bound.literalSet().stream()
.map(lit -> (T) sanitize(bound.term().type(), lit, now, today))
.map(lit -> (T) sanitize(bound.term().type(), lit, nowMillis, today))
.iterator();
return new UnboundPredicate<>(pred.op(), unbind(pred.term()), iter);
}
Expand All @@ -326,11 +331,11 @@ public <T> Expression predicate(UnboundPredicate<T> pred) {
case STARTS_WITH:
case NOT_STARTS_WITH:
return new UnboundPredicate<>(
pred.op(), pred.term(), (T) sanitize(pred.literal(), now, today));
pred.op(), pred.term(), (T) sanitize(pred.literal(), nowMillis, today));
case IN:
case NOT_IN:
Iterable<String> iter =
() -> pred.literals().stream().map(lit -> sanitize(lit, now, today)).iterator();
() -> pred.literals().stream().map(lit -> sanitize(lit, nowMillis, today)).iterator();
return new UnboundPredicate<>(pred.op(), pred.term(), (Iterable<T>) iter);
default:
throw new UnsupportedOperationException(
Expand All @@ -340,13 +345,12 @@ public <T> Expression predicate(UnboundPredicate<T> pred) {
}

private static class StringSanitizer extends ExpressionVisitors.ExpressionVisitor<String> {
private final long nowMicros;
private final long nowMillis;
private final int today;

private StringSanitizer() {
long nowMillis = System.currentTimeMillis();
OffsetDateTime nowDateTime = Instant.ofEpochMilli(nowMillis).atOffset(ZoneOffset.UTC);
this.nowMicros = nowMillis * 1000;
this.nowMillis = System.currentTimeMillis();
OffsetDateTime nowDateTime = Instant.ofEpochMilli(this.nowMillis).atOffset(ZoneOffset.UTC);
this.today = (int) ChronoUnit.DAYS.between(EPOCH, nowDateTime);
}

Expand Down Expand Up @@ -376,7 +380,7 @@ public String or(String leftResult, String rightResult) {
}

private String value(BoundLiteralPredicate<?> pred) {
return sanitize(pred.term().type(), pred.literal().value(), nowMicros, today);
return sanitize(pred.term().type(), pred.literal().value(), nowMillis, today);
}

@Override
Expand Down Expand Up @@ -408,7 +412,7 @@ public <T> String predicate(BoundPredicate<T> pred) {
+ " IN "
+ abbreviateValues(
pred.asSetPredicate().literalSet().stream()
.map(lit -> sanitize(pred.term().type(), lit, nowMicros, today))
.map(lit -> sanitize(pred.term().type(), lit, nowMillis, today))
.collect(Collectors.toList()))
.stream()
.collect(Collectors.joining(", ", "(", ")"));
Expand All @@ -417,7 +421,7 @@ public <T> String predicate(BoundPredicate<T> pred) {
+ " NOT IN "
+ abbreviateValues(
pred.asSetPredicate().literalSet().stream()
.map(lit -> sanitize(pred.term().type(), lit, nowMicros, today))
.map(lit -> sanitize(pred.term().type(), lit, nowMillis, today))
.collect(Collectors.toList()))
.stream()
.collect(Collectors.joining(", ", "(", ")"));
Expand All @@ -444,23 +448,23 @@ public <T> String predicate(UnboundPredicate<T> pred) {
case NOT_NAN:
return "not_nan(" + term + ")";
case LT:
return term + " < " + sanitize(pred.literal(), nowMicros, today);
return term + " < " + sanitize(pred.literal(), nowMillis, today);
case LT_EQ:
return term + " <= " + sanitize(pred.literal(), nowMicros, today);
return term + " <= " + sanitize(pred.literal(), nowMillis, today);
case GT:
return term + " > " + sanitize(pred.literal(), nowMicros, today);
return term + " > " + sanitize(pred.literal(), nowMillis, today);
case GT_EQ:
return term + " >= " + sanitize(pred.literal(), nowMicros, today);
return term + " >= " + sanitize(pred.literal(), nowMillis, today);
case EQ:
return term + " = " + sanitize(pred.literal(), nowMicros, today);
return term + " = " + sanitize(pred.literal(), nowMillis, today);
case NOT_EQ:
return term + " != " + sanitize(pred.literal(), nowMicros, today);
return term + " != " + sanitize(pred.literal(), nowMillis, today);
case IN:
return term
+ " IN "
+ abbreviateValues(
pred.literals().stream()
.map(lit -> sanitize(lit, nowMicros, today))
.map(lit -> sanitize(lit, nowMillis, today))
.collect(Collectors.toList()))
.stream()
.collect(Collectors.joining(", ", "(", ")"));
Expand All @@ -469,14 +473,14 @@ public <T> String predicate(UnboundPredicate<T> pred) {
+ " NOT IN "
+ abbreviateValues(
pred.literals().stream()
.map(lit -> sanitize(lit, nowMicros, today))
.map(lit -> sanitize(lit, nowMillis, today))
.collect(Collectors.toList()))
.stream()
.collect(Collectors.joining(", ", "(", ")"));
case STARTS_WITH:
return term + " STARTS WITH " + sanitize(pred.literal(), nowMicros, today);
return term + " STARTS WITH " + sanitize(pred.literal(), nowMillis, today);
case NOT_STARTS_WITH:
return term + " NOT STARTS WITH " + sanitize(pred.literal(), nowMicros, today);
return term + " NOT STARTS WITH " + sanitize(pred.literal(), nowMillis, today);
default:
throw new UnsupportedOperationException(
"Cannot sanitize unsupported predicate type: " + pred.op());
Expand All @@ -501,7 +505,7 @@ private static <T> List<String> abbreviateValues(List<String> sanitizedValues) {
return sanitizedValues;
}

private static String sanitize(Type type, Object value, long now, int today) {
private static String sanitize(Type type, Object value, long nowMillis, int today) {
switch (type.typeId()) {
case INTEGER:
case LONG:
Expand All @@ -514,9 +518,9 @@ private static String sanitize(Type type, Object value, long now, int today) {
case TIME:
return "(time)";
case TIMESTAMP:
return sanitizeTimestamp((long) value, now);
return sanitizeTimestamp(((Types.TimestampType) type).unit(), (long) value, nowMillis);
case STRING:
return sanitizeString((CharSequence) value, now, today);
return sanitizeString((CharSequence) value, nowMillis, today);
case BOOLEAN:
case UUID:
case DECIMAL:
Expand All @@ -529,13 +533,14 @@ private static String sanitize(Type type, Object value, long now, int today) {
String.format("Cannot sanitize value for unsupported type %s: %s", type, value));
}

private static String sanitize(Literal<?> literal, long now, int today) {
private static String sanitize(Literal<?> literal, long nowMillis, int today) {
if (literal instanceof Literals.StringLiteral) {
return sanitizeString(((Literals.StringLiteral) literal).value(), now, today);
return sanitizeString(((Literals.StringLiteral) literal).value(), nowMillis, today);
} else if (literal instanceof Literals.DateLiteral) {
return sanitizeDate(((Literals.DateLiteral) literal).value(), today);
} else if (literal instanceof Literals.TimestampLiteral) {
return sanitizeTimestamp(((Literals.TimestampLiteral) literal).value(), now);
Literals.TimestampLiteral tsLiteral = ((Literals.TimestampLiteral) literal);
return sanitizeTimestamp(tsLiteral.unit(), tsLiteral.value(), nowMillis);
} else if (literal instanceof Literals.TimeLiteral) {
return "(time)";
} else if (literal instanceof Literals.IntegerLiteral) {
Expand Down Expand Up @@ -564,14 +569,26 @@ private static String sanitizeDate(int days, int today) {
return "(date)";
}

private static String sanitizeTimestamp(long micros, long now) {
String isPast = now > micros ? "ago" : "from-now";
long diff = Math.abs(now - micros);
if (diff < FIVE_MINUTES_IN_MICROS) {
private static String sanitizeTimestamp(ChronoUnit unit, long timeUnits, long nowMillis) {
long timeMillis;
switch (unit) {
case MICROS:
timeMillis = DateTimeUtil.microsToMillis(timeUnits);
break;
case NANOS:
timeMillis = DateTimeUtil.nanosToMillis(timeUnits);
break;
default:
throw new UnsupportedOperationException("Unsupported timestamp unit: " + unit);
}

long diff = Math.abs(nowMillis - timeMillis);
if (diff < FIVE_MINUTES_IN_MILLIS) {
return "(timestamp-about-now)";
}

long hours = TimeUnit.MICROSECONDS.toHours(diff);
String isPast = nowMillis > timeMillis ? "ago" : "from-now";
long hours = DateTimeUtil.millisToHours(diff);
if (hours <= THREE_DAYS_IN_HOURS) {
return "(timestamp-" + hours + "-hours-" + isPast + ")";
} else if (hours < NINETY_DAYS_IN_HOURS) {
Expand All @@ -589,17 +606,23 @@ private static String sanitizeNumber(Number value, String type) {
return "(" + numDigits + "-digit-" + type + ")";
}

private static String sanitizeString(CharSequence value, long now, int today) {
private static String sanitizeString(CharSequence value, long nowMillis, int today) {
try {
if (DATE.matcher(value).matches()) {
Literal<Integer> date = Literal.of(value).to(Types.DateType.get());
return sanitizeDate(date.value(), today);
} else if (TIMESTAMP.matcher(value).matches()) {
Literal<Long> ts = Literal.of(value).to(Types.TimestampType.withoutZone());
return sanitizeTimestamp(ts.value(), now);
Literal<Long> ts = Literal.of(value).to(Types.TimestampType.microsWithoutZone());
return sanitizeTimestamp(ChronoUnit.MICROS, ts.value(), nowMillis);
} else if (TIMESTAMPNS.matcher(value).matches()) {
Literal<Long> ts = Literal.of(value).to(Types.TimestampType.nanosWithoutZone());
return sanitizeTimestamp(ChronoUnit.NANOS, ts.value(), nowMillis);
} else if (TIMESTAMPTZ.matcher(value).matches()) {
Literal<Long> ts = Literal.of(value).to(Types.TimestampType.withZone());
return sanitizeTimestamp(ts.value(), now);
Literal<Long> ts = Literal.of(value).to(Types.TimestampType.microsWithZone());
return sanitizeTimestamp(ChronoUnit.MICROS, ts.value(), nowMillis);
} else if (TIMESTAMPTZNS.matcher(value).matches()) {
Literal<Long> ts = Literal.of(value).to(Types.TimestampType.nanosWithZone());
return sanitizeTimestamp(ChronoUnit.NANOS, ts.value(), nowMillis);
} else if (TIME.matcher(value).matches()) {
return "(time)";
} else {
Expand Down
82 changes: 62 additions & 20 deletions api/src/main/java/org/apache/iceberg/expressions/Literals.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.TimestampType;
import org.apache.iceberg.util.ByteBuffers;
import org.apache.iceberg.util.DateTimeUtil;
import org.apache.iceberg.util.NaNUtil;

class Literals {
Expand Down Expand Up @@ -298,7 +300,7 @@ public <T> Literal<T> to(Type type) {
case TIME:
return (Literal<T>) new TimeLiteral(value());
case TIMESTAMP:
return (Literal<T>) new TimestampLiteral(value());
return (Literal<T>) new TimestampLiteral(((TimestampType) type).unit(), value());
case DATE:
if ((long) Integer.MAX_VALUE < value()) {
return aboveMax();
Expand Down Expand Up @@ -426,23 +428,55 @@ protected Type.TypeID typeId() {
}

static class TimestampLiteral extends ComparableLiteral<Long> {
TimestampLiteral(Long value) {
private final ChronoUnit unit;

TimestampLiteral(ChronoUnit unit, Long value) {
super(value);
this.unit = unit;
}

@Override
@SuppressWarnings("unchecked")
public <T> Literal<T> to(Type type) {
switch (type.typeId()) {
case TIMESTAMP:
return (Literal<T>) this;
ChronoUnit toUnit = ((TimestampType) type).unit();
switch (unit) {
case MICROS:
switch (toUnit) {
case MICROS:
return (Literal<T>) this;
case NANOS:
return (Literal<T>)
new TimestampLiteral(unit, DateTimeUtil.microsToNanos(value()));
}
break;
case NANOS:
switch (toUnit) {
case MICROS:
return (Literal<T>)
new TimestampLiteral(unit, DateTimeUtil.nanosToMicros(value()));
case NANOS:
return (Literal<T>) this;
}
break;
}
break;
case DATE:
return (Literal<T>)
new DateLiteral(
(int)
ChronoUnit.DAYS.between(
EPOCH_DAY, EPOCH.plus(value(), ChronoUnit.MICROS).toLocalDate()));
default:
switch (unit) {
case MICROS:
return (Literal<T>)
new DateLiteral(
(int)
ChronoUnit.DAYS.between(
EPOCH_DAY, EPOCH.plus(value(), ChronoUnit.MICROS).toLocalDate()));
case NANOS:
return (Literal<T>)
new DateLiteral(
(int)
ChronoUnit.DAYS.between(
EPOCH_DAY, EPOCH.plusNanos(value()).toLocalDate()));
}
}
return null;
}
Expand All @@ -451,6 +485,10 @@ public <T> Literal<T> to(Type type) {
protected Type.TypeID typeId() {
return Type.TypeID.TIMESTAMP;
}

protected ChronoUnit unit() {
return unit;
}
}

static class DecimalLiteral extends ComparableLiteral<BigDecimal> {
Expand Down Expand Up @@ -501,18 +539,22 @@ public <T> Literal<T> to(Type type) {
return (Literal<T>) new TimeLiteral(timeMicros);

case TIMESTAMP:
if (((Types.TimestampType) type).shouldAdjustToUTC()) {
long timestampMicros =
ChronoUnit.MICROS.between(
EPOCH, OffsetDateTime.parse(value(), DateTimeFormatter.ISO_DATE_TIME));
return (Literal<T>) new TimestampLiteral(timestampMicros);
TimestampType tsType = (TimestampType) type;
if (tsType.shouldAdjustToUTC()) {
long timestampUnits =
tsType
.unit()
.between(EPOCH, OffsetDateTime.parse(value(), DateTimeFormatter.ISO_DATE_TIME));
return (Literal<T>) new TimestampLiteral(tsType.unit(), timestampUnits);
} else {
long timestampMicros =
ChronoUnit.MICROS.between(
EPOCH,
LocalDateTime.parse(value(), DateTimeFormatter.ISO_LOCAL_DATE_TIME)
.atOffset(ZoneOffset.UTC));
return (Literal<T>) new TimestampLiteral(timestampMicros);
long timestampUnits =
tsType
.unit()
.between(
EPOCH,
LocalDateTime.parse(value(), DateTimeFormatter.ISO_LOCAL_DATE_TIME)
.atOffset(ZoneOffset.UTC));
return (Literal<T>) new TimestampLiteral(tsType.unit(), timestampUnits);
}

case STRING:
Expand Down
Loading

0 comments on commit f3dad15

Please sign in to comment.