Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API: implement types timestamp_ns and timestamptz_ns #8960

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,14 @@
import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.DateTimeUtil;

/** Expression utility methods. */
public class ExpressionUtil {
private static final Function<Object, Integer> HASH_FUNC =
Transforms.bucket(Integer.MAX_VALUE).bind(Types.StringType.get());
private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
private static final long FIVE_MINUTES_IN_MICROS = TimeUnit.MINUTES.toMicros(5);
private static final long FIVE_MINUTES_IN_MILLIS = TimeUnit.MINUTES.toMillis(5);
private static final long THREE_DAYS_IN_HOURS = TimeUnit.DAYS.toHours(3);
private static final long NINETY_DAYS_IN_HOURS = TimeUnit.DAYS.toHours(90);
private static final Pattern DATE = Pattern.compile("\\d{4}-\\d{2}-\\d{2}");
Expand Down Expand Up @@ -247,13 +248,12 @@ public static <T> UnboundTerm<T> unbind(Term term) {

private static class ExpressionSanitizer
extends ExpressionVisitors.ExpressionVisitor<Expression> {
private final long now;
private final long nowMillis;
private final int today;

private ExpressionSanitizer() {
long nowMillis = System.currentTimeMillis();
OffsetDateTime nowDateTime = Instant.ofEpochMilli(nowMillis).atOffset(ZoneOffset.UTC);
this.now = nowMillis * 1000;
this.nowMillis = System.currentTimeMillis();
OffsetDateTime nowDateTime = Instant.ofEpochMilli(this.nowMillis).atOffset(ZoneOffset.UTC);
this.today = (int) ChronoUnit.DAYS.between(EPOCH, nowDateTime);
}

Expand Down Expand Up @@ -293,13 +293,13 @@ public <T> Expression predicate(BoundPredicate<T> pred) {
return new UnboundPredicate<>(
pred.op(),
unbind(pred.term()),
(T) sanitize(bound.term().type(), bound.literal(), now, today));
(T) sanitize(bound.term().type(), bound.literal(), nowMillis, today));
} else if (pred.isSetPredicate()) {
BoundSetPredicate<T> bound = (BoundSetPredicate<T>) pred;
Iterable<T> iter =
() ->
bound.literalSet().stream()
.map(lit -> (T) sanitize(bound.term().type(), lit, now, today))
.map(lit -> (T) sanitize(bound.term().type(), lit, nowMillis, today))
.iterator();
return new UnboundPredicate<>(pred.op(), unbind(pred.term()), iter);
}
Expand All @@ -326,11 +326,11 @@ public <T> Expression predicate(UnboundPredicate<T> pred) {
case STARTS_WITH:
case NOT_STARTS_WITH:
return new UnboundPredicate<>(
pred.op(), pred.term(), (T) sanitize(pred.literal(), now, today));
pred.op(), pred.term(), (T) sanitize(pred.literal(), nowMillis, today));
case IN:
case NOT_IN:
Iterable<String> iter =
() -> pred.literals().stream().map(lit -> sanitize(lit, now, today)).iterator();
() -> pred.literals().stream().map(lit -> sanitize(lit, nowMillis, today)).iterator();
return new UnboundPredicate<>(pred.op(), pred.term(), (Iterable<T>) iter);
default:
throw new UnsupportedOperationException(
Expand All @@ -340,13 +340,12 @@ public <T> Expression predicate(UnboundPredicate<T> pred) {
}

private static class StringSanitizer extends ExpressionVisitors.ExpressionVisitor<String> {
private final long nowMicros;
private final long nowMillis;
private final int today;

private StringSanitizer() {
long nowMillis = System.currentTimeMillis();
OffsetDateTime nowDateTime = Instant.ofEpochMilli(nowMillis).atOffset(ZoneOffset.UTC);
this.nowMicros = nowMillis * 1000;
this.nowMillis = System.currentTimeMillis();
OffsetDateTime nowDateTime = Instant.ofEpochMilli(this.nowMillis).atOffset(ZoneOffset.UTC);
this.today = (int) ChronoUnit.DAYS.between(EPOCH, nowDateTime);
}

Expand Down Expand Up @@ -376,7 +375,7 @@ public String or(String leftResult, String rightResult) {
}

private String value(BoundLiteralPredicate<?> pred) {
return sanitize(pred.term().type(), pred.literal().value(), nowMicros, today);
return sanitize(pred.term().type(), pred.literal().value(), nowMillis, today);
}

@Override
Expand Down Expand Up @@ -408,7 +407,7 @@ public <T> String predicate(BoundPredicate<T> pred) {
+ " IN "
+ abbreviateValues(
pred.asSetPredicate().literalSet().stream()
.map(lit -> sanitize(pred.term().type(), lit, nowMicros, today))
.map(lit -> sanitize(pred.term().type(), lit, nowMillis, today))
.collect(Collectors.toList()))
.stream()
.collect(Collectors.joining(", ", "(", ")"));
Expand All @@ -417,7 +416,7 @@ public <T> String predicate(BoundPredicate<T> pred) {
+ " NOT IN "
+ abbreviateValues(
pred.asSetPredicate().literalSet().stream()
.map(lit -> sanitize(pred.term().type(), lit, nowMicros, today))
.map(lit -> sanitize(pred.term().type(), lit, nowMillis, today))
.collect(Collectors.toList()))
.stream()
.collect(Collectors.joining(", ", "(", ")"));
Expand All @@ -444,23 +443,23 @@ public <T> String predicate(UnboundPredicate<T> pred) {
case NOT_NAN:
return "not_nan(" + term + ")";
case LT:
return term + " < " + sanitize(pred.literal(), nowMicros, today);
return term + " < " + sanitize(pred.literal(), nowMillis, today);
case LT_EQ:
return term + " <= " + sanitize(pred.literal(), nowMicros, today);
return term + " <= " + sanitize(pred.literal(), nowMillis, today);
case GT:
return term + " > " + sanitize(pred.literal(), nowMicros, today);
return term + " > " + sanitize(pred.literal(), nowMillis, today);
case GT_EQ:
return term + " >= " + sanitize(pred.literal(), nowMicros, today);
return term + " >= " + sanitize(pred.literal(), nowMillis, today);
case EQ:
return term + " = " + sanitize(pred.literal(), nowMicros, today);
return term + " = " + sanitize(pred.literal(), nowMillis, today);
case NOT_EQ:
return term + " != " + sanitize(pred.literal(), nowMicros, today);
return term + " != " + sanitize(pred.literal(), nowMillis, today);
case IN:
return term
+ " IN "
+ abbreviateValues(
pred.literals().stream()
.map(lit -> sanitize(lit, nowMicros, today))
.map(lit -> sanitize(lit, nowMillis, today))
.collect(Collectors.toList()))
.stream()
.collect(Collectors.joining(", ", "(", ")"));
Expand All @@ -469,14 +468,14 @@ public <T> String predicate(UnboundPredicate<T> pred) {
+ " NOT IN "
+ abbreviateValues(
pred.literals().stream()
.map(lit -> sanitize(lit, nowMicros, today))
.map(lit -> sanitize(lit, nowMillis, today))
.collect(Collectors.toList()))
.stream()
.collect(Collectors.joining(", ", "(", ")"));
case STARTS_WITH:
return term + " STARTS WITH " + sanitize(pred.literal(), nowMicros, today);
return term + " STARTS WITH " + sanitize(pred.literal(), nowMillis, today);
case NOT_STARTS_WITH:
return term + " NOT STARTS WITH " + sanitize(pred.literal(), nowMicros, today);
return term + " NOT STARTS WITH " + sanitize(pred.literal(), nowMillis, today);
default:
throw new UnsupportedOperationException(
"Cannot sanitize unsupported predicate type: " + pred.op());
Expand All @@ -501,7 +500,7 @@ private static <T> List<String> abbreviateValues(List<String> sanitizedValues) {
return sanitizedValues;
}

private static String sanitize(Type type, Object value, long now, int today) {
private static String sanitize(Type type, Object value, long nowMillis, int today) {
switch (type.typeId()) {
case INTEGER:
case LONG:
Expand All @@ -514,9 +513,9 @@ private static String sanitize(Type type, Object value, long now, int today) {
case TIME:
return "(time)";
case TIMESTAMP:
return sanitizeTimestamp((long) value, now);
return sanitizeTimestamp(((Types.TimestampType) type).unit(), (long) value, nowMillis);
case STRING:
return sanitizeString((CharSequence) value, now, today);
return sanitizeString((CharSequence) value, nowMillis, today);
case BOOLEAN:
case UUID:
case DECIMAL:
Expand All @@ -529,13 +528,16 @@ private static String sanitize(Type type, Object value, long now, int today) {
String.format("Cannot sanitize value for unsupported type %s: %s", type, value));
}

private static String sanitize(Literal<?> literal, long now, int today) {
private static String sanitize(Literal<?> literal, long nowMillis, int today) {
if (literal instanceof Literals.StringLiteral) {
return sanitizeString(((Literals.StringLiteral) literal).value(), now, today);
return sanitizeString(((Literals.StringLiteral) literal).value(), nowMillis, today);
} else if (literal instanceof Literals.DateLiteral) {
return sanitizeDate(((Literals.DateLiteral) literal).value(), today);
} else if (literal instanceof Literals.TimestampLiteral) {
return sanitizeTimestamp(((Literals.TimestampLiteral) literal).value(), now);
return sanitizeTimestamp(
((Literals.TimestampLiteral) literal).unit(),
((Literals.TimestampLiteral) literal).value(),
nowMillis);
} else if (literal instanceof Literals.TimeLiteral) {
return "(time)";
} else if (literal instanceof Literals.IntegerLiteral) {
Expand Down Expand Up @@ -564,14 +566,26 @@ private static String sanitizeDate(int days, int today) {
return "(date)";
}

private static String sanitizeTimestamp(long micros, long now) {
String isPast = now > micros ? "ago" : "from-now";
long diff = Math.abs(now - micros);
if (diff < FIVE_MINUTES_IN_MICROS) {
private static String sanitizeTimestamp(ChronoUnit unit, long timeUnits, long nowMillis) {
long timeMillis;
switch (unit) {
case MICROS:
timeMillis = DateTimeUtil.microsToMillis(timeUnits);
break;
case NANOS:
timeMillis = DateTimeUtil.nanosToMillis(timeUnits);
break;
default:
throw new UnsupportedOperationException("Unsupported timestamp unit: " + unit);
}

long diff = Math.abs(nowMillis - timeMillis);
if (diff < FIVE_MINUTES_IN_MILLIS) {
return "(timestamp-about-now)";
}

long hours = TimeUnit.MICROSECONDS.toHours(diff);
String isPast = nowMillis > timeMillis ? "ago" : "from-now";
long hours = DateTimeUtil.millisToHours(diff);
if (hours <= THREE_DAYS_IN_HOURS) {
return "(timestamp-" + hours + "-hours-" + isPast + ")";
} else if (hours < NINETY_DAYS_IN_HOURS) {
Expand All @@ -589,17 +603,17 @@ private static String sanitizeNumber(Number value, String type) {
return "(" + numDigits + "-digit-" + type + ")";
}

private static String sanitizeString(CharSequence value, long now, int today) {
private static String sanitizeString(CharSequence value, long nowMillis, int today) {
try {
if (DATE.matcher(value).matches()) {
Literal<Integer> date = Literal.of(value).to(Types.DateType.get());
return sanitizeDate(date.value(), today);
} else if (TIMESTAMP.matcher(value).matches()) {
Literal<Long> ts = Literal.of(value).to(Types.TimestampType.withoutZone());
return sanitizeTimestamp(ts.value(), now);
Literal<Long> ts = Literal.of(value).to(Types.TimestampType.nanosWithoutZone());
return sanitizeTimestamp(ChronoUnit.NANOS, ts.value(), nowMillis);
} else if (TIMESTAMPTZ.matcher(value).matches()) {
Literal<Long> ts = Literal.of(value).to(Types.TimestampType.withZone());
return sanitizeTimestamp(ts.value(), now);
Literal<Long> ts = Literal.of(value).to(Types.TimestampType.nanosWithZone());
return sanitizeTimestamp(ChronoUnit.NANOS, ts.value(), nowMillis);
} else if (TIME.matcher(value).matches()) {
return "(time)";
} else {
Expand Down
62 changes: 48 additions & 14 deletions api/src/main/java/org/apache/iceberg/expressions/Literals.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.TimestampType;
import org.apache.iceberg.util.ByteBuffers;
import org.apache.iceberg.util.DateTimeUtil;
import org.apache.iceberg.util.NaNUtil;

class Literals {
Expand Down Expand Up @@ -298,7 +300,7 @@ public <T> Literal<T> to(Type type) {
case TIME:
return (Literal<T>) new TimeLiteral(value());
case TIMESTAMP:
return (Literal<T>) new TimestampLiteral(value());
return (Literal<T>) new TimestampLiteral(((TimestampType) type).unit(), value());
case DATE:
if ((long) Integer.MAX_VALUE < value()) {
return aboveMax();
Expand Down Expand Up @@ -426,16 +428,40 @@ protected Type.TypeID typeId() {
}

static class TimestampLiteral extends ComparableLiteral<Long> {
TimestampLiteral(Long value) {
private final ChronoUnit unit;

/**
 * Creates a timestamp literal that remembers the unit of its value.
 *
 * @param unit the unit stored alongside the value; presumably the resolution {@code value} is
 *     expressed in (MICROS or NANOS are the units handled by {@code to(Type)}) -- confirm
 * @param value the raw timestamp value, passed to the superclass unchanged
 */
TimestampLiteral(ChronoUnit unit, Long value) {
super(value);
this.unit = unit;
}

@Override
@SuppressWarnings("unchecked")
public <T> Literal<T> to(Type type) {
switch (type.typeId()) {
case TIMESTAMP:
return (Literal<T>) this;
// Convert between timestamp resolutions. The literal's own unit is the source
// resolution and the requested type supplies the target resolution.
ChronoUnit toUnit = ((TimestampType) type).unit();
switch (unit) {
  case MICROS:
    switch (toUnit) {
      case MICROS:
        // Same resolution: no conversion needed.
        return (Literal<T>) this;
      case NANOS:
        // The converted value is in nanoseconds, so the new literal must be
        // tagged with the target unit (toUnit), not the source unit.
        return (Literal<T>)
            new TimestampLiteral(toUnit, DateTimeUtil.microsToNanos(value()));
    }
    // Any other target unit is unsupported; fall through to the enclosing break.
    break;
  case NANOS:
    switch (toUnit) {
      case MICROS:
        // The converted value is in microseconds, so the new literal must be
        // tagged with the target unit (toUnit), not the source unit.
        return (Literal<T>)
            new TimestampLiteral(toUnit, DateTimeUtil.nanosToMicros(value()));
      case NANOS:
        // Same resolution: no conversion needed.
        return (Literal<T>) this;
    }
    // Any other target unit is unsupported; fall through to the enclosing break.
    break;
}
break;
case DATE:
return (Literal<T>)
new DateLiteral(
Expand All @@ -451,6 +477,10 @@ public <T> Literal<T> to(Type type) {
protected Type.TypeID typeId() {
return Type.TypeID.TIMESTAMP;
}

/**
 * Returns the unit this literal was constructed with.
 *
 * @return the {@link ChronoUnit} stored in the {@code unit} field
 */
protected ChronoUnit unit() {
return unit;
}
}

static class DecimalLiteral extends ComparableLiteral<BigDecimal> {
Expand Down Expand Up @@ -501,18 +531,22 @@ public <T> Literal<T> to(Type type) {
return (Literal<T>) new TimeLiteral(timeMicros);

case TIMESTAMP:
if (((Types.TimestampType) type).shouldAdjustToUTC()) {
long timestampMicros =
ChronoUnit.MICROS.between(
EPOCH, OffsetDateTime.parse(value(), DateTimeFormatter.ISO_DATE_TIME));
return (Literal<T>) new TimestampLiteral(timestampMicros);
TimestampType tsType = (TimestampType) type;
if (tsType.shouldAdjustToUTC()) {
long timestampUnits =
tsType
.unit()
.between(EPOCH, OffsetDateTime.parse(value(), DateTimeFormatter.ISO_DATE_TIME));
return (Literal<T>) new TimestampLiteral(tsType.unit(), timestampUnits);
} else {
long timestampMicros =
ChronoUnit.MICROS.between(
EPOCH,
LocalDateTime.parse(value(), DateTimeFormatter.ISO_LOCAL_DATE_TIME)
.atOffset(ZoneOffset.UTC));
return (Literal<T>) new TimestampLiteral(timestampMicros);
long timestampUnits =
tsType
.unit()
.between(
EPOCH,
LocalDateTime.parse(value(), DateTimeFormatter.ISO_LOCAL_DATE_TIME)
.atOffset(ZoneOffset.UTC));
return (Literal<T>) new TimestampLiteral(tsType.unit(), timestampUnits);
}

case STRING:
Expand Down
13 changes: 7 additions & 6 deletions api/src/main/java/org/apache/iceberg/transforms/Days.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.iceberg.transforms;

import java.io.ObjectStreamException;
import java.time.temporal.ChronoUnit;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

Expand All @@ -37,7 +38,7 @@ protected Transform<T, Integer> toEnum(Type type) {
case DATE:
return (Transform<T, Integer>) Dates.DAY;
case TIMESTAMP:
return (Transform<T, Integer>) Timestamps.DAY;
return (Transform<T, Integer>) Timestamps.get((Types.TimestampType) type, ChronoUnit.DAYS);
default:
throw new IllegalArgumentException("Unsupported type: " + type);
}
Expand All @@ -55,14 +56,14 @@ public boolean satisfiesOrderOf(Transform<?, ?> other) {
}

if (other instanceof Timestamps) {
return Timestamps.DAY.satisfiesOrderOf(other);
return ((Timestamps) other).getResultTypeUnit() == ChronoUnit.DAYS
|| ((Timestamps) other).getResultTypeUnit() == ChronoUnit.MONTHS
|| ((Timestamps) other).getResultTypeUnit() == ChronoUnit.YEARS;
} else if (other instanceof Dates) {
return Dates.DAY.satisfiesOrderOf(other);
} else if (other instanceof Days || other instanceof Months || other instanceof Years) {
return true;
} else {
return other instanceof Days || other instanceof Months || other instanceof Years;
}

return false;
}

@Override
Expand Down
Loading