API: implement types timestamp_ns and timestamptz_ns #9008

Merged on Sep 3, 2024
42 commits
0e098f0
API: implement types timestamp_ns and timestamptz_ns
jacobmarble Feb 20, 2024
b283a5a
Redo as separate type
epgif Mar 26, 2024
a114044
Try to hew closer to the original satisfiesOrderOf logic.
epgif Mar 26, 2024
0ebdf9f
Test that Avro produces a value within 1 micro
epgif Mar 26, 2024
bea3487
address most comments
epgif Apr 11, 2024
753aed5
Bucket timestamp and timestamp_ns the same.
epgif Apr 12, 2024
19605d6
Fix bug caught by TestPartitionSpecParser#testTransforms .
epgif Apr 25, 2024
d0b4627
address review comments
epgif Jun 5, 2024
2f71da9
address style improvements
epgif Jun 6, 2024
c3c1288
test DateTimeUtil.convertNanos on negative input
epgif Jun 6, 2024
ec01d68
use Math.toIntExact in Timestamps NANOS conversion
epgif Jun 6, 2024
08e123c
Merge branch 'main' into jgm-timestamp-nanos-api
epgif Jun 18, 2024
eb5e382
address review comments
epgif Jul 4, 2024
421ed86
Adjust tests as requested.
epgif Jul 8, 2024
d7a1326
add nanosecond tests to TestTimestamps
jacobmarble Jul 8, 2024
068b18b
add timestamptz and timestampns_tz to fromPrimitiveString test
jacobmarble Jul 8, 2024
269b3e9
correct TestYears - was testing days
jacobmarble Jul 8, 2024
8462374
test Avro timestamp conversion precisely
jacobmarble Jul 8, 2024
16319d8
Add requested additional tests.
epgif Jul 8, 2024
e38f68a
Remove redundant and erroneous tests.
epgif Jul 8, 2024
654fc61
Merge remote-tracking branch 'jacobmarble/jgm-timestamp-nanos-api' in…
epgif Jul 11, 2024
d34daad
Update TestBucketing as requested.
epgif Jul 11, 2024
42ca8a4
Convert long to TimestampLiteral and then that to TimestampNanoLiteral.
epgif Jul 31, 2024
0cbdeb8
Use DateTimeUtil conversion instead of /.
rdblue Aug 23, 2024
48626e5
Update Literals to use DateTimeUtil, add new tests.
rdblue Aug 23, 2024
743e872
Fix test for DateTimeUtil.isoTimestampToNanos.
rdblue Aug 23, 2024
dedeb19
Fix TestDateTimeUtil and add test cases.
rdblue Aug 23, 2024
c9f4273
Simplify Timestamps transform get.
rdblue Aug 23, 2024
97489ab
Remove ChronoUnit wrapper enum.
rdblue Aug 23, 2024
8c3cc67
Restore Timestamps as enum and simplify boilerplate.
rdblue Aug 25, 2024
6f25c99
Minor fix to bucket transform.
rdblue Aug 25, 2024
f006cb2
Fix style
rdblue Aug 25, 2024
066c955
Fix typos in TestTimestamps.
rdblue Aug 25, 2024
4d77202
Add a comment to clarify conversion test.
rdblue Aug 25, 2024
9a3d16f
Split timestamp and timestamp_ns comparator test cases.
rdblue Aug 25, 2024
274de56
Fix spec update to specify microsecond hashing.
rdblue Aug 25, 2024
20e7085
Merge pull request #1 from rdblue/jgm-timestamp-nanos-api
jacobmarble Aug 26, 2024
0bbd3d6
Run :iceberg-api:spotlessApply
epgif Aug 26, 2024
58f11a3
fix testTimestampWithZoneHumanString
epgif Aug 30, 2024
8ea5777
Prevent creating table metadata with nanosecond timestamps before v3.
rdblue Sep 2, 2024
1321952
Merge pull request #2 from rdblue/jgm-timestamp-nanos-api
Sep 3, 2024
47d4b64
fix merge conflict
jacobmarble Sep 3, 2024
25 changes: 25 additions & 0 deletions api/src/main/java/org/apache/iceberg/Schema.java
@@ -54,6 +54,8 @@ public class Schema implements Serializable {
private static final Joiner NEWLINE = Joiner.on('\n');
private static final String ALL_COLUMNS = "*";
private static final int DEFAULT_SCHEMA_ID = 0;
private static final Map<Type.TypeID, Integer> MIN_FORMAT_VERSIONS =
ImmutableMap.of(Type.TypeID.TIMESTAMP_NANO, 3);

private final StructType struct;
private final int schemaId;
@@ -573,4 +575,27 @@ private List<NestedField> reassignIds(List<NestedField> columns, TypeUtil.GetID
});
return res.asStructType().fields();
}

/**
* Check the compatibility of the schema with a format version.
*
* <p>This validates that the schema does not contain types that were released in later format
* versions.
*
* @param schema a Schema
* @param formatVersion table format version
*/
public static void checkCompatibility(Schema schema, int formatVersion) {
// check the type in each field
for (NestedField field : schema.lazyIdToField().values()) {
Review comment (Contributor):

Hm. Now that I'm thinking about this more, we may want to accumulate a full set of problems and then show them in one message. That can be done as a follow-up though.

Integer minFormatVersion = MIN_FORMAT_VERSIONS.get(field.type().typeId());
Preconditions.checkState(
minFormatVersion == null || formatVersion >= minFormatVersion,
"Invalid type in v%s schema: %s %s is not supported until v%s",
formatVersion,
schema.findColumnName(field.fieldId()),
field.type(),
minFormatVersion);
}
}
}
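The follow-up suggested in the review comment above (collect every incompatible field, then report them together) could look roughly like the sketch below. It is a simplified stand-in, not Iceberg code: `FormatVersionCheck`, `findProblems`, and the string-keyed type map are hypothetical names used only for illustration; the one fact taken from the diff is that nanosecond timestamps require format version 3.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for Schema.checkCompatibility; not Iceberg's API.
class FormatVersionCheck {
  // Minimum format version per type name (assumption: types are keyed by name here,
  // whereas the diff keys them by Type.TypeID).
  static final Map<String, Integer> MIN_FORMAT_VERSIONS = Map.of("timestamp_ns", 3);

  // Returns one message per column whose type needs a newer format version,
  // instead of failing on the first offending column.
  static List<String> findProblems(Map<String, String> columnTypes, int formatVersion) {
    List<String> problems = new ArrayList<>();
    for (Map.Entry<String, String> column : columnTypes.entrySet()) {
      Integer minFormatVersion = MIN_FORMAT_VERSIONS.get(column.getValue());
      if (minFormatVersion != null && formatVersion < minFormatVersion) {
        problems.add(
            String.format(
                "%s %s is not supported until v%s",
                column.getKey(), column.getValue(), minFormatVersion));
      }
    }
    return problems;
  }

  public static void main(String[] args) {
    Map<String, String> columns = new LinkedHashMap<>();
    columns.put("id", "long");
    columns.put("ts", "timestamp_ns");
    System.out.println(findProblems(columns, 2)); // one problem, for column "ts"
    System.out.println(findProblems(columns, 3)); // no problems
  }
}
```

A caller could then raise a single exception whose message joins the returned list, which is the behavior the comment asks for.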
Original file line number Diff line number Diff line change
@@ -31,6 +31,7 @@ public class BoundLiteralPredicate<T> extends BoundPredicate<T> {
Type.TypeID.LONG,
Type.TypeID.DATE,
Type.TypeID.TIME,
Type.TypeID.TIMESTAMP_NANO,
Type.TypeID.TIMESTAMP);

private static long toLong(Literal<?> lit) {
Original file line number Diff line number Diff line change
@@ -36,13 +36,15 @@
import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.DateTimeUtil;

/** Expression utility methods. */
public class ExpressionUtil {
private static final Function<Object, Integer> HASH_FUNC =
Transforms.bucket(Integer.MAX_VALUE).bind(Types.StringType.get());
private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
private static final long FIVE_MINUTES_IN_MICROS = TimeUnit.MINUTES.toMicros(5);
private static final long FIVE_MINUTES_IN_NANOS = TimeUnit.MINUTES.toNanos(5);
private static final long THREE_DAYS_IN_HOURS = TimeUnit.DAYS.toHours(3);
private static final long NINETY_DAYS_IN_HOURS = TimeUnit.DAYS.toHours(90);
private static final Pattern DATE = Pattern.compile("\\d{4}-\\d{2}-\\d{2}");
@@ -52,6 +54,12 @@ public class ExpressionUtil {
private static final Pattern TIMESTAMPTZ =
Pattern.compile(
"\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}(:\\d{2}(.\\d{1,9})?)?([-+]\\d{2}:\\d{2}|Z)");
private static final Pattern TIMESTAMPNS =
Pattern.compile("\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}(:\\d{2}(.\\d{7,9})?)?");
private static final Pattern TIMESTAMPTZNS =
Pattern.compile(
"\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}(:\\d{2}(.\\d{7,9})?)?([-+]\\d{2}:\\d{2}|Z)");

static final int LONG_IN_PREDICATE_ABBREVIATION_THRESHOLD = 10;
private static final int LONG_IN_PREDICATE_ABBREVIATION_MIN_GAIN = 5;
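A note on how the new patterns interact: the nanosecond variants require 7 to 9 fractional digits, but the fraction itself is optional, so a string with no fraction also matches them. The sketch below copies the two zoned patterns from the hunk above verbatim (including the unescaped `.`) and reports which one matches first when the nanosecond pattern is tried first. `TimestampPatterns` and `firstMatch` are hypothetical names for illustration only.

```java
import java.util.regex.Pattern;

class TimestampPatterns {
  // Copied from the diff above, including the unescaped '.' before the fraction.
  static final Pattern TIMESTAMPTZ =
      Pattern.compile(
          "\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}(:\\d{2}(.\\d{1,9})?)?([-+]\\d{2}:\\d{2}|Z)");
  static final Pattern TIMESTAMPTZNS =
      Pattern.compile(
          "\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}(:\\d{2}(.\\d{7,9})?)?([-+]\\d{2}:\\d{2}|Z)");

  // Hypothetical helper: names the first pattern that matches, trying nanos first.
  static String firstMatch(CharSequence value) {
    if (TIMESTAMPTZNS.matcher(value).matches()) {
      return "TIMESTAMPTZNS";
    } else if (TIMESTAMPTZ.matcher(value).matches()) {
      return "TIMESTAMPTZ";
    }
    return "none";
  }

  public static void main(String[] args) {
    System.out.println(firstMatch("2017-11-16T22:31:08.000001001Z")); // TIMESTAMPTZNS
    System.out.println(firstMatch("2017-11-16T22:31:08.000001Z"));    // TIMESTAMPTZ
    System.out.println(firstMatch("2017-11-16T22:31:08Z"));           // TIMESTAMPTZNS
  }
}
```

Because sanitization reduces both precisions to microseconds before comparing against "now", which pattern wins for a fraction-less string should not change the sanitized output.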

@@ -515,6 +523,8 @@ private static String sanitize(Type type, Object value, long now, int today) {
return "(time)";
case TIMESTAMP:
return sanitizeTimestamp((long) value, now);
case TIMESTAMP_NANO:
return sanitizeTimestamp(DateTimeUtil.nanosToMicros((long) value), now);
case STRING:
return sanitizeString((CharSequence) value, now, today);
case BOOLEAN:
@@ -536,6 +546,9 @@ private static String sanitize(Literal<?> literal, long now, int today) {
return sanitizeDate(((Literals.DateLiteral) literal).value(), today);
} else if (literal instanceof Literals.TimestampLiteral) {
return sanitizeTimestamp(((Literals.TimestampLiteral) literal).value(), now);
} else if (literal instanceof Literals.TimestampNanoLiteral) {
return sanitizeTimestamp(
DateTimeUtil.nanosToMicros(((Literals.TimestampNanoLiteral) literal).value()), now);
} else if (literal instanceof Literals.TimeLiteral) {
return "(time)";
} else if (literal instanceof Literals.IntegerLiteral) {
@@ -594,6 +607,12 @@ private static String sanitizeString(CharSequence value, long now, int today) {
if (DATE.matcher(value).matches()) {
Literal<Integer> date = Literal.of(value).to(Types.DateType.get());
return sanitizeDate(date.value(), today);
} else if (TIMESTAMPNS.matcher(value).matches()) {
Literal<Long> ts = Literal.of(value).to(Types.TimestampNanoType.withoutZone());
return sanitizeTimestamp(DateTimeUtil.nanosToMicros(ts.value()), now);
} else if (TIMESTAMPTZNS.matcher(value).matches()) {
Literal<Long> ts = Literal.of(value).to(Types.TimestampNanoType.withZone());
return sanitizeTimestamp(DateTimeUtil.nanosToMicros(ts.value()), now);
} else if (TIMESTAMP.matcher(value).matches()) {
Literal<Long> ts = Literal.of(value).to(Types.TimestampType.withoutZone());
return sanitizeTimestamp(ts.value(), now);
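Every nanosecond path in this file reduces the value to microseconds exactly once before calling `sanitizeTimestamp`. A minimal sketch of that conversion follows, assuming `DateTimeUtil.nanosToMicros` is a floor division by 1,000 (its body is not part of this excerpt, so that is an assumption). The `NanosToMicros` class is a hypothetical stand-in.

```java
class NanosToMicros {
  static final long NANOS_PER_MICRO = 1_000L;

  // Assumed behavior: floor division, so pre-epoch (negative) values round
  // toward the earlier microsecond rather than toward zero.
  static long nanosToMicros(long nanos) {
    return Math.floorDiv(nanos, NANOS_PER_MICRO);
  }

  public static void main(String[] args) {
    long nanos = 1_510_871_468_000_001_001L;
    System.out.println(nanosToMicros(nanos));        // 1510871468000001
    System.out.println(nanosToMicros(nanos / 1000)); // 1510871468000 (scaled twice: wrong unit)
    System.out.println(nanosToMicros(-1L));          // -1, whereas -1 / 1000 == 0
  }
}
```

The second call shows why a value must not be divided by 1,000 before it is handed to the conversion: the result would be milliseconds interpreted as microseconds.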
57 changes: 43 additions & 14 deletions api/src/main/java/org/apache/iceberg/expressions/Literals.java
@@ -24,7 +24,6 @@
 import java.nio.ByteBuffer;
 import java.time.Instant;
 import java.time.LocalDate;
-import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.time.OffsetDateTime;
 import java.time.ZoneOffset;
@@ -40,6 +39,7 @@
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ByteBuffers;
import org.apache.iceberg.util.DateTimeUtil;
import org.apache.iceberg.util.NaNUtil;

class Literals {
@@ -299,6 +299,9 @@ public <T> Literal<T> to(Type type) {
return (Literal<T>) new TimeLiteral(value());
case TIMESTAMP:
return (Literal<T>) new TimestampLiteral(value());
case TIMESTAMP_NANO:
// assume micros and convert to nanos to match the behavior in the timestamp case above
return new TimestampLiteral(value()).to(type);
case DATE:
if ((long) Integer.MAX_VALUE < value()) {
return aboveMax();
@@ -437,11 +440,9 @@ public <T> Literal<T> to(Type type) {
         case TIMESTAMP:
           return (Literal<T>) this;
         case DATE:
-          return (Literal<T>)
-              new DateLiteral(
-                  (int)
-                      ChronoUnit.DAYS.between(
-                          EPOCH_DAY, EPOCH.plus(value(), ChronoUnit.MICROS).toLocalDate()));
+          return (Literal<T>) new DateLiteral(DateTimeUtil.microsToDays(value()));
+        case TIMESTAMP_NANO:
+          return (Literal<T>) new TimestampNanoLiteral(DateTimeUtil.microsToNanos(value()));
         default:
       }
       return null;
@@ -453,6 +454,32 @@ protected Type.TypeID typeId() {
}
}

static class TimestampNanoLiteral extends ComparableLiteral<Long> {
TimestampNanoLiteral(Long value) {
super(value);
}

@Override
@SuppressWarnings("unchecked")
public <T> Literal<T> to(Type type) {
switch (type.typeId()) {
case DATE:
return (Literal<T>) new DateLiteral(DateTimeUtil.nanosToDays(value()));
case TIMESTAMP:
return (Literal<T>) new TimestampLiteral(DateTimeUtil.nanosToMicros(value()));
case TIMESTAMP_NANO:
return (Literal<T>) this;
default:
}
return null;
}

@Override
protected Type.TypeID typeId() {
return Type.TypeID.TIMESTAMP_NANO;
}
}

static class DecimalLiteral extends ComparableLiteral<BigDecimal> {
DecimalLiteral(BigDecimal value) {
super(value);
@@ -502,19 +529,21 @@ public <T> Literal<T> to(Type type) {

         case TIMESTAMP:
           if (((Types.TimestampType) type).shouldAdjustToUTC()) {
-            long timestampMicros =
-                ChronoUnit.MICROS.between(
-                    EPOCH, OffsetDateTime.parse(value(), DateTimeFormatter.ISO_DATE_TIME));
+            long timestampMicros = DateTimeUtil.isoTimestamptzToMicros(value().toString());
             return (Literal<T>) new TimestampLiteral(timestampMicros);
           } else {
-            long timestampMicros =
-                ChronoUnit.MICROS.between(
-                    EPOCH,
-                    LocalDateTime.parse(value(), DateTimeFormatter.ISO_LOCAL_DATE_TIME)
-                        .atOffset(ZoneOffset.UTC));
+            long timestampMicros = DateTimeUtil.isoTimestampToMicros(value().toString());
             return (Literal<T>) new TimestampLiteral(timestampMicros);
           }

+        case TIMESTAMP_NANO:
+          if (((Types.TimestampNanoType) type).shouldAdjustToUTC()) {
+            return (Literal<T>)
+                new TimestampNanoLiteral(DateTimeUtil.isoTimestamptzToNanos(value()));
+          } else {
+            return (Literal<T>) new TimestampNanoLiteral(DateTimeUtil.isoTimestampToNanos(value()));
+          }
+
         case STRING:
           return (Literal<T>) this;

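The string-to-nanosecond conversions delegated to `DateTimeUtil` above are not shown in this excerpt. A plausible sketch, modeled on the `ChronoUnit.MICROS.between(EPOCH, ...)` calls that this diff removes but with `ChronoUnit.NANOS`, is given below. Treat the bodies as an assumption about what `isoTimestampToNanos` and `isoTimestamptzToNanos` do, not as the actual implementation; `IsoToNanos` is a hypothetical class name.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

class IsoToNanos {
  static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);

  // Zoned string (offset or Z required): nanoseconds from the epoch in UTC.
  static long isoTimestamptzToNanos(CharSequence ts) {
    return ChronoUnit.NANOS.between(
        EPOCH, OffsetDateTime.parse(ts, DateTimeFormatter.ISO_DATE_TIME));
  }

  // Zone-less string: interpreted as if it were UTC.
  static long isoTimestampToNanos(CharSequence ts) {
    return ChronoUnit.NANOS.between(
        EPOCH,
        LocalDateTime.parse(ts, DateTimeFormatter.ISO_LOCAL_DATE_TIME).atOffset(ZoneOffset.UTC));
  }

  public static void main(String[] args) {
    // Both lines print 1510871468000001001.
    System.out.println(isoTimestampToNanos("2017-11-16T22:31:08.000001001"));
    System.out.println(isoTimestamptzToNanos("2017-11-16T14:31:08.000001001-08:00"));
  }
}
```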
18 changes: 18 additions & 0 deletions api/src/main/java/org/apache/iceberg/transforms/Bucket.java
@@ -33,6 +33,7 @@
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.BucketUtil;
import org.apache.iceberg.util.DateTimeUtil;
import org.apache.iceberg.util.SerializableFunction;

class Bucket<T> implements Transform<T, Integer>, Serializable {
@@ -63,6 +64,8 @@ static <T, B extends Bucket<T> & SerializableFunction<T, Integer>> B get(
case FIXED:
case BINARY:
return (B) new BucketByteBuffer(numBuckets);
case TIMESTAMP_NANO:
return (B) new BucketTimestampNano(numBuckets);
case UUID:
return (B) new BucketUUID(numBuckets);
default:
@@ -107,6 +110,7 @@ public boolean canTransform(Type type) {
case DATE:
case TIME:
case TIMESTAMP:
case TIMESTAMP_NANO:
case STRING:
case BINARY:
case FIXED:
@@ -214,6 +218,20 @@ protected int hash(Long value) {
}
}

// In order to bucket TimestampNano the same as Timestamp, convert to micros before hashing.
private static class BucketTimestampNano extends Bucket<Long>
implements SerializableFunction<Long, Integer> {

private BucketTimestampNano(int numBuckets) {
super(numBuckets);
}

@Override
protected int hash(Long nanos) {
return BucketUtil.hash(DateTimeUtil.nanosToMicros(nanos));
}
}

private static class BucketString extends Bucket<CharSequence>
implements SerializableFunction<CharSequence, Integer> {

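The point of `BucketTimestampNano` is that the same instant lands in the same bucket whether it is stored as `timestamp` or `timestamp_ns`. The sketch below illustrates that property only. It uses `Long.hashCode` as a stand-in hash, which is NOT the hash used by `BucketUtil`, and the sign-masking-then-modulo step is likewise an assumption about how a hash is reduced to a bucket. `BucketNanosLikeMicros` is a hypothetical name.

```java
class BucketNanosLikeMicros {
  // Stand-in hash for illustration; the real BucketUtil.hash is a different function.
  static int hash(long value) {
    return Long.hashCode(value);
  }

  // Assumed reduction of a hash to a bucket ordinal in [0, numBuckets).
  static int bucketMicros(long micros, int numBuckets) {
    return (hash(micros) & Integer.MAX_VALUE) % numBuckets;
  }

  // Convert to micros first, so nanosecond values bucket like their microsecond equivalents.
  static int bucketNanos(long nanos, int numBuckets) {
    return bucketMicros(Math.floorDiv(nanos, 1_000L), numBuckets);
  }

  public static void main(String[] args) {
    long micros = 1_510_871_468_000_001L;
    // All three print the same bucket ordinal.
    System.out.println(bucketMicros(micros, 16));
    System.out.println(bucketNanos(micros * 1_000L, 16));
    System.out.println(bucketNanos(micros * 1_000L + 999L, 16));
  }
}
```

One consequence of hashing at microsecond precision is that sub-microsecond differences never change the bucket.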
14 changes: 9 additions & 5 deletions api/src/main/java/org/apache/iceberg/transforms/Dates.java
@@ -97,6 +97,10 @@ public Type getResultType(Type sourceType) {
return Types.IntegerType.get();
}

ChronoUnit granularity() {
return granularity;
}

@Override
public boolean preservesOrder() {
return true;
@@ -109,11 +113,11 @@ public boolean satisfiesOrderOf(Transform<?, ?> other) {
     }

     if (other instanceof Dates) {
-      // test the granularity, in days. day(ts) => 1 day, months(ts) => 30 days, and day satisfies
-      // the order of months
-      Dates otherTransform = (Dates) other;
-      return granularity.getDuration().toDays()
-          <= otherTransform.granularity.getDuration().toDays();
+      return TransformUtil.satisfiesOrderOf(granularity, ((Dates) other).granularity());
+    } else if (other instanceof Timestamps) {
+      return TransformUtil.satisfiesOrderOf(granularity, ((Timestamps) other).granularity());
+    } else if (other instanceof TimeTransform) {
+      return TransformUtil.satisfiesOrderOf(granularity, ((TimeTransform<?>) other).granularity());
     }

     return false;
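`TransformUtil.satisfiesOrderOf` is not included in this excerpt. Judging from the comparison it replaces (`granularity.getDuration().toDays() <= other...toDays()`), it presumably answers whether the left granularity is at least as fine as the right one. The sketch below is a guess at that contract using `ChronoUnit` durations directly; `GranularityOrder` is a hypothetical name.

```java
import java.time.temporal.ChronoUnit;

class GranularityOrder {
  // Assumed contract: a transform with finer-or-equal granularity satisfies the
  // ordering of a coarser one (hours satisfy days, days satisfy months, ...).
  static boolean satisfiesOrderOf(ChronoUnit left, ChronoUnit right) {
    return left.getDuration().compareTo(right.getDuration()) <= 0;
  }

  public static void main(String[] args) {
    System.out.println(satisfiesOrderOf(ChronoUnit.HOURS, ChronoUnit.DAYS));  // true
    System.out.println(satisfiesOrderOf(ChronoUnit.DAYS, ChronoUnit.DAYS));   // true
    System.out.println(satisfiesOrderOf(ChronoUnit.MONTHS, ChronoUnit.DAYS)); // false
  }
}
```

Comparing full durations rather than whole days also lets hour-level granularity take part in the comparison, which the removed `toDays()` form could not distinguish from zero days.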
34 changes: 8 additions & 26 deletions api/src/main/java/org/apache/iceberg/transforms/Days.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.transforms;

import java.io.ObjectStreamException;
import java.time.temporal.ChronoUnit;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

@@ -31,38 +32,19 @@ static <T> Days<T> get() {
   }

   @Override
-  @SuppressWarnings("unchecked")
-  protected Transform<T, Integer> toEnum(Type type) {
-    switch (type.typeId()) {
-      case DATE:
-        return (Transform<T, Integer>) Dates.DAY;
-      case TIMESTAMP:
-        return (Transform<T, Integer>) Timestamps.DAY;
-      default:
-        throw new IllegalArgumentException("Unsupported type: " + type);
-    }
+  protected ChronoUnit granularity() {
+    return ChronoUnit.DAYS;
   }

   @Override
-  public Type getResultType(Type sourceType) {
-    return Types.DateType.get();
+  protected Transform<T, Integer> toEnum(Type type) {
+    return (Transform<T, Integer>)
+        fromSourceType(type, Dates.DAY, Timestamps.MICROS_TO_DAY, Timestamps.NANOS_TO_DAY);
   }

   @Override
-  public boolean satisfiesOrderOf(Transform<?, ?> other) {
-    if (this == other) {
-      return true;
-    }
-
-    if (other instanceof Timestamps) {
-      return Timestamps.DAY.satisfiesOrderOf(other);
-    } else if (other instanceof Dates) {
-      return Dates.DAY.satisfiesOrderOf(other);
-    } else if (other instanceof Days || other instanceof Months || other instanceof Years) {
-      return true;
-    }
-
-    return false;
+  public Type getResultType(Type sourceType) {
+    return Types.DateType.get();
   }

   @Override
33 changes: 9 additions & 24 deletions api/src/main/java/org/apache/iceberg/transforms/Hours.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.transforms;

import java.io.ObjectStreamException;
import java.time.temporal.ChronoUnit;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

@@ -30,44 +31,28 @@ static <T> Hours<T> get() {
     return (Hours<T>) INSTANCE;
   }

+  @Override
+  protected ChronoUnit granularity() {
+    return ChronoUnit.HOURS;
+  }
+
   @Override
   @SuppressWarnings("unchecked")
   protected Transform<T, Integer> toEnum(Type type) {
-    if (type.typeId() == Type.TypeID.TIMESTAMP) {
-      return (Transform<T, Integer>) Timestamps.HOUR;
-    }
-
-    throw new IllegalArgumentException("Unsupported type: " + type);
+    return (Transform<T, Integer>)
+        fromSourceType(type, null, Timestamps.MICROS_TO_HOUR, Timestamps.NANOS_TO_HOUR);
   }

   @Override
   public boolean canTransform(Type type) {
-    return type.typeId() == Type.TypeID.TIMESTAMP;
+    return type.typeId() == Type.TypeID.TIMESTAMP || type.typeId() == Type.TypeID.TIMESTAMP_NANO;
   }

   @Override
   public Type getResultType(Type sourceType) {
     return Types.IntegerType.get();
   }

-  @Override
-  public boolean satisfiesOrderOf(Transform<?, ?> other) {
-    if (this == other) {
-      return true;
-    }
-
-    if (other instanceof Timestamps) {
-      return other == Timestamps.HOUR;
-    } else if (other instanceof Hours
-        || other instanceof Days
-        || other instanceof Months
-        || other instanceof Years) {
-      return true;
-    }
-
-    return false;
-  }
-
   @Override
   public String toHumanString(Type alwaysInt, Integer value) {
     return value != null ? TransformUtil.humanHour(value) : "null";
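`Timestamps.MICROS_TO_HOUR` and `Timestamps.NANOS_TO_HOUR` are referenced above but defined outside this excerpt. The sketch below shows what such a pair would need to guarantee: the same instant yields the same hour ordinal at either precision. The floor division and the `Math.toIntExact` narrowing follow the commit messages in this PR ("use Math.toIntExact in Timestamps NANOS conversion"); the method bodies themselves are an assumption, and `HourOrdinal` is a hypothetical name.

```java
import java.util.concurrent.TimeUnit;

class HourOrdinal {
  // Hours since the epoch for a microsecond timestamp (floor, so pre-epoch values are negative).
  static int microsToHours(long micros) {
    return Math.toIntExact(Math.floorDiv(micros, TimeUnit.HOURS.toMicros(1)));
  }

  // Hours since the epoch for a nanosecond timestamp; fails loudly instead of overflowing int.
  static int nanosToHours(long nanos) {
    return Math.toIntExact(Math.floorDiv(nanos, TimeUnit.HOURS.toNanos(1)));
  }

  public static void main(String[] args) {
    System.out.println(microsToHours(1_510_871_468_000_001L));    // 419686
    System.out.println(nanosToHours(1_510_871_468_000_001_001L)); // 419686
    System.out.println(nanosToHours(-1L));                        // -1
  }
}
```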