From 0e90f5a0006b53095ac86ee55e6f6fa396a6d06a Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud <65791736+ahmedabu98@users.noreply.github.com> Date: Thu, 10 Oct 2024 17:19:16 +0300 Subject: [PATCH] [Managed Iceberg] Add support for TIMESTAMP, TIME, and DATE types (#32688) * support timestamp, time, date types * add to changes md * always write java time LocalDateTime for iceberg TIMESTAMP * update java doc * add timezone support with Strings * clean up; reading iceberg timestamptz will return sqltype.datetime * support string, long, sql.datetime, and datetime; timestamp returns sql.datetime and timestamptTZ returns datetime --- .../IO_Iceberg_Integration_Tests.json | 2 +- CHANGES.md | 1 + .../io/iceberg/hive/IcebergHiveCatalogIT.java | 12 ++ .../apache/beam/sdk/io/iceberg/IcebergIO.java | 34 +++- .../beam/sdk/io/iceberg/IcebergUtils.java | 166 ++++++++++++++++-- .../beam/sdk/io/iceberg/IcebergIOIT.java | 15 +- .../beam/sdk/io/iceberg/IcebergUtilsTest.java | 166 ++++++++++++++++-- 7 files changed, 355 insertions(+), 41 deletions(-) diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json index 3f63c0c9975f..bbdc3a3910ef 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 2 + "modification": 3 } diff --git a/CHANGES.md b/CHANGES.md index 774abefcb066..6a70a49b2ab1 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -64,6 +64,7 @@ * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). * [Managed Iceberg] Support creating tables if needed ([#32686](https://github.com/apache/beam/pull/32686)) +* [Managed Iceberg] Add support for TIMESTAMP, TIME, and DATE types ([#32688](https://github.com/apache/beam/pull/32688)) ## New Features / Improvements diff --git a/sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/IcebergHiveCatalogIT.java b/sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/IcebergHiveCatalogIT.java index 54a4998d37fb..ca4d862c2c72 100644 --- a/sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/IcebergHiveCatalogIT.java +++ b/sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/IcebergHiveCatalogIT.java @@ -32,6 +32,7 @@ import org.apache.beam.sdk.io.iceberg.hive.testutils.HiveMetastoreExtension; import org.apache.beam.sdk.managed.Managed; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; @@ -64,7 +65,10 @@ import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.util.DateTimeUtil; import org.apache.thrift.TException; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Rule; @@ -100,6 +104,10 @@ public class IcebergHiveCatalogIT { .addArrayField("arr_long", Schema.FieldType.INT64) .addRowField("row", NESTED_ROW_SCHEMA) .addNullableRowField("nullable_row", NESTED_ROW_SCHEMA) + .addDateTimeField("datetime_tz") + .addLogicalTypeField("datetime", SqlTypes.DATETIME) + .addLogicalTypeField("date", SqlTypes.DATE) + .addLogicalTypeField("time", SqlTypes.TIME) .build(); private static final SimpleFunction ROW_FUNC = @@ -127,6 +135,10 @@ public Row apply(Long num) { .addValue(LongStream.range(1, num % 10).boxed().collect(Collectors.toList())) .addValue(nestedRow) .addValue(num % 2 == 0 ? null : nestedRow) + .addValue(new DateTime(num).withZone(DateTimeZone.forOffsetHoursMinutes(3, 25))) + .addValue(DateTimeUtil.timestampFromMicros(num)) + .addValue(DateTimeUtil.dateFromDays(Integer.parseInt(strNum))) + .addValue(DateTimeUtil.timeFromMicros(num)) .build(); } }; diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java index d12f8914a338..fa4ff9714c7f 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java @@ -150,7 +150,16 @@ * DOUBLE DOUBLE * * - * DATETIME STRING + * SqlTypes.DATETIME TIMESTAMP + * + * + * DATETIME TIMESTAMPTZ + * + * + * SqlTypes.DATE DATE + * + * + * SqlTypes.TIME TIME * * * ITERABLE LIST @@ -166,6 +175,29 @@ * * * + *

Note: {@code SqlTypes} are Beam logical types. + * + *

Note on timestamps

+ * + *

For an existing table, the following Beam types are supported for both {@code timestamp} and + * {@code timestamptz}: + * + *

+ * + *

Note: If you expect Beam to create the Iceberg table at runtime, please provide {@code + * SqlTypes.DATETIME} for a {@code timestamp} column and {@code DATETIME} for a {@code timestamptz} + * column. If the table does not exist, Beam will treat {@code STRING} and {@code INT64} at + * face-value and create equivalent column types. + * + *

For Iceberg reads, the connector will produce Beam {@code SqlTypes.DATETIME} types for + * Iceberg's {@code timestamp} and {@code DATETIME} types for {@code timestamptz}. + * *

Dynamic Destinations

* *

Managed Iceberg supports writing to dynamic destinations. To do so, please provide an diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java index acd9b25a6a5e..ef19a5881366 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java @@ -20,12 +20,18 @@ import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; import java.nio.ByteBuffer; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.UUID; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.util.Preconditions; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; @@ -34,15 +40,13 @@ import org.apache.iceberg.data.Record; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.DateTimeUtil; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.DateTime; -import org.joda.time.DateTimeZone; +import org.joda.time.Instant; -/** Utilities for converting between Beam and Iceberg types. */ +/** Utilities for converting between Beam and Iceberg types, made public for user's convenience. */ public class IcebergUtils { - // This is made public for users convenience, as many may have more experience working with - // Iceberg types. - private IcebergUtils() {} private static final Map BEAM_TYPES_TO_ICEBERG_TYPES = @@ -54,6 +58,14 @@ private IcebergUtils() {} .put(Schema.TypeName.DOUBLE, Types.DoubleType.get()) .put(Schema.TypeName.STRING, Types.StringType.get()) .put(Schema.TypeName.BYTES, Types.BinaryType.get()) + .put(Schema.TypeName.DATETIME, Types.TimestampType.withZone()) + .build(); + + private static final Map BEAM_LOGICAL_TYPES_TO_ICEBERG_TYPES = + ImmutableMap.builder() + .put(SqlTypes.DATE.getIdentifier(), Types.DateType.get()) + .put(SqlTypes.TIME.getIdentifier(), Types.TimeType.get()) + .put(SqlTypes.DATETIME.getIdentifier(), Types.TimestampType.withoutZone()) .build(); private static Schema.FieldType icebergTypeToBeamFieldType(final Type type) { @@ -69,9 +81,15 @@ private static Schema.FieldType icebergTypeToBeamFieldType(final Type type) { case DOUBLE: return Schema.FieldType.DOUBLE; case DATE: + return Schema.FieldType.logicalType(SqlTypes.DATE); case TIME: - case TIMESTAMP: // TODO: Logical types? - return Schema.FieldType.DATETIME; + return Schema.FieldType.logicalType(SqlTypes.TIME); + case TIMESTAMP: + Types.TimestampType ts = (Types.TimestampType) type.asPrimitiveType(); + if (ts.shouldAdjustToUTC()) { + return Schema.FieldType.DATETIME; + } + return Schema.FieldType.logicalType(SqlTypes.DATETIME); case STRING: return Schema.FieldType.STRING; case UUID: @@ -151,6 +169,14 @@ static TypeAndMaxId beamFieldTypeToIcebergFieldType( // other types. return new TypeAndMaxId( --nestedFieldId, BEAM_TYPES_TO_ICEBERG_TYPES.get(beamType.getTypeName())); + } else if (beamType.getTypeName().isLogicalType()) { + String logicalTypeIdentifier = + checkArgumentNotNull(beamType.getLogicalType()).getIdentifier(); + @Nullable Type type = BEAM_LOGICAL_TYPES_TO_ICEBERG_TYPES.get(logicalTypeIdentifier); + if (type == null) { + throw new RuntimeException("Unsupported Beam logical type " + logicalTypeIdentifier); + } + return new TypeAndMaxId(--nestedFieldId, type); } else if (beamType.getTypeName().isCollectionType()) { // ARRAY or ITERABLE Schema.FieldType beamCollectionType = Preconditions.checkArgumentNotNull(beamType.getCollectionElementType()); @@ -227,8 +253,6 @@ static TypeAndMaxId beamFieldTypeToIcebergFieldType( * *

The following unsupported Beam types will be defaulted to {@link Types.StringType}: *

  • {@link Schema.TypeName.DECIMAL} - *
  • {@link Schema.TypeName.DATETIME} - *
  • {@link Schema.TypeName.LOGICAL_TYPE} */ public static org.apache.iceberg.Schema beamSchemaToIcebergSchema(final Schema schema) { List fields = new ArrayList<>(schema.getFieldCount()); @@ -282,12 +306,20 @@ private static void copyFieldIntoRecord(Record rec, Types.NestedField field, Row Optional.ofNullable(value.getDouble(name)).ifPresent(v -> rec.setField(name, v)); break; case DATE: - throw new UnsupportedOperationException("Date fields not yet supported"); + Optional.ofNullable(value.getLogicalTypeValue(name, LocalDate.class)) + .ifPresent(v -> rec.setField(name, v)); + break; case TIME: - throw new UnsupportedOperationException("Time fields not yet supported"); + Optional.ofNullable(value.getLogicalTypeValue(name, LocalTime.class)) + .ifPresent(v -> rec.setField(name, v)); + break; case TIMESTAMP: - Optional.ofNullable(value.getDateTime(name)) - .ifPresent(v -> rec.setField(name, v.getMillis())); + Object val = value.getValue(name); + if (val == null) { + break; + } + Types.TimestampType ts = (Types.TimestampType) field.type().asPrimitiveType(); + rec.setField(name, getIcebergTimestampValue(val, ts.shouldAdjustToUTC())); break; case STRING: Optional.ofNullable(value.getString(name)).ifPresent(v -> rec.setField(name, v)); @@ -322,6 +354,55 @@ private static void copyFieldIntoRecord(Record rec, Types.NestedField field, Row } } + /** + * Returns the appropriate value for an Iceberg timestamp field + * + *

    If `timestamp`, we resolve incoming values to a {@link LocalDateTime}. + * + *

    If `timestamptz`, we resolve to a UTC {@link OffsetDateTime}. Iceberg already resolves all + * incoming timestamps to UTC, so there is no harm in doing it from our side. + * + *

    Valid types are: + * + *

      + *
    • {@link SqlTypes.DATETIME} --> {@link LocalDateTime} + *
    • {@link Schema.FieldType.DATETIME} --> {@link Instant} + *
    • {@link Schema.FieldType.INT64} --> {@link Long} + *
    • {@link Schema.FieldType.STRING} --> {@link String} + *
    + */ + private static Object getIcebergTimestampValue(Object beamValue, boolean shouldAdjustToUtc) { + // timestamptz + if (shouldAdjustToUtc) { + if (beamValue instanceof LocalDateTime) { // SqlTypes.DATETIME + return OffsetDateTime.of((LocalDateTime) beamValue, ZoneOffset.UTC); + } else if (beamValue instanceof Instant) { // FieldType.DATETIME + return DateTimeUtil.timestamptzFromMicros(((Instant) beamValue).getMillis() * 1000L); + } else if (beamValue instanceof Long) { // FieldType.INT64 + return DateTimeUtil.timestamptzFromMicros((Long) beamValue); + } else if (beamValue instanceof String) { // FieldType.STRING + return OffsetDateTime.parse((String) beamValue).withOffsetSameInstant(ZoneOffset.UTC); + } else { + throw new UnsupportedOperationException( + "Unsupported Beam type for Iceberg timestamp with timezone: " + beamValue.getClass()); + } + } + + // timestamp + if (beamValue instanceof LocalDateTime) { // SqlType.DATETIME + return beamValue; + } else if (beamValue instanceof Instant) { // FieldType.DATETIME + return DateTimeUtil.timestampFromMicros(((Instant) beamValue).getMillis() * 1000L); + } else if (beamValue instanceof Long) { // FieldType.INT64 + return DateTimeUtil.timestampFromMicros((Long) beamValue); + } else if (beamValue instanceof String) { // FieldType.STRING + return LocalDateTime.parse((String) beamValue); + } else { + throw new UnsupportedOperationException( + "Unsupported Beam type for Iceberg timestamp with timezone: " + beamValue.getClass()); + } + } + /** Converts an Iceberg {@link Record} to a Beam {@link Row}. */ public static Row icebergRecordToBeamRow(Schema schema, Record record) { Row.Builder rowBuilder = Row.withSchema(schema); @@ -345,16 +426,17 @@ public static Row icebergRecordToBeamRow(Schema schema, Record record) { case FLOAT: // Iceberg and Beam both use float case DOUBLE: // Iceberg and Beam both use double case STRING: // Iceberg and Beam both use String - case BOOLEAN: // Iceberg and Beam both use String + case BOOLEAN: // Iceberg and Beam both use boolean case ARRAY: case ITERABLE: case MAP: rowBuilder.addValue(icebergValue); break; case DATETIME: - // Iceberg uses a long for millis; Beam uses joda time DateTime - long millis = (long) icebergValue; - rowBuilder.addValue(new DateTime(millis, DateTimeZone.UTC)); + // Iceberg uses a long for micros. + // Beam DATETIME uses joda's DateTime, which only supports millis, + // so we do lose some precision here + rowBuilder.addValue(getBeamDateTimeValue(icebergValue)); break; case BYTES: // Iceberg uses ByteBuffer; Beam uses byte[] @@ -369,8 +451,8 @@ public static Row icebergRecordToBeamRow(Schema schema, Record record) { rowBuilder.addValue(icebergRecordToBeamRow(nestedSchema, nestedRecord)); break; case LOGICAL_TYPE: - throw new UnsupportedOperationException( - "Cannot convert iceberg field to Beam logical type"); + rowBuilder.addValue(getLogicalTypeValue(icebergValue, field.getType())); + break; default: throw new UnsupportedOperationException( "Unsupported Beam type: " + field.getType().getTypeName()); @@ -378,4 +460,50 @@ public static Row icebergRecordToBeamRow(Schema schema, Record record) { } return rowBuilder.build(); } + + private static DateTime getBeamDateTimeValue(Object icebergValue) { + long micros; + if (icebergValue instanceof OffsetDateTime) { + micros = DateTimeUtil.microsFromTimestamptz((OffsetDateTime) icebergValue); + } else if (icebergValue instanceof LocalDateTime) { + micros = DateTimeUtil.microsFromTimestamp((LocalDateTime) icebergValue); + } else if (icebergValue instanceof Long) { + micros = (long) icebergValue; + } else if (icebergValue instanceof String) { + return DateTime.parse((String) icebergValue); + } else { + throw new UnsupportedOperationException( + "Unsupported Iceberg type for Beam type DATETIME: " + icebergValue.getClass()); + } + return new DateTime(micros / 1000L); + } + + private static Object getLogicalTypeValue(Object icebergValue, Schema.FieldType type) { + if (icebergValue instanceof String) { + String strValue = (String) icebergValue; + if (type.isLogicalType(SqlTypes.DATE.getIdentifier())) { + return LocalDate.parse(strValue); + } else if (type.isLogicalType(SqlTypes.TIME.getIdentifier())) { + return LocalTime.parse(strValue); + } else if (type.isLogicalType(SqlTypes.DATETIME.getIdentifier())) { + return LocalDateTime.parse(strValue); + } + } else if (icebergValue instanceof Long) { + if (type.isLogicalType(SqlTypes.TIME.getIdentifier())) { + return DateTimeUtil.timeFromMicros((Long) icebergValue); + } else if (type.isLogicalType(SqlTypes.DATETIME.getIdentifier())) { + return DateTimeUtil.timestampFromMicros((Long) icebergValue); + } + } else if (icebergValue instanceof Integer + && type.isLogicalType(SqlTypes.DATE.getIdentifier())) { + return DateTimeUtil.dateFromDays((Integer) icebergValue); + } else if (icebergValue instanceof OffsetDateTime + && type.isLogicalType(SqlTypes.DATETIME.getIdentifier())) { + return ((OffsetDateTime) icebergValue) + .withOffsetSameInstant(ZoneOffset.UTC) + .toLocalDateTime(); + } + // LocalDateTime, LocalDate, LocalTime + return icebergValue; + } } diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java index a5c034ac901a..5df8604699a3 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io.iceberg; +import static org.apache.beam.sdk.schemas.Schema.FieldType; import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -36,6 +37,7 @@ import java.util.stream.Stream; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.managed.Managed; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; @@ -72,7 +74,10 @@ import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.util.DateTimeUtil; import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Before; @@ -107,9 +112,13 @@ public class IcebergIOIT implements Serializable { .addBooleanField("bool") .addInt32Field("int") .addRowField("row", NESTED_ROW_SCHEMA) - .addArrayField("arr_long", org.apache.beam.sdk.schemas.Schema.FieldType.INT64) + .addArrayField("arr_long", FieldType.INT64) .addNullableRowField("nullable_row", NESTED_ROW_SCHEMA) .addNullableInt64Field("nullable_long") + .addDateTimeField("datetime_tz") + .addLogicalTypeField("datetime", SqlTypes.DATETIME) + .addLogicalTypeField("date", SqlTypes.DATE) + .addLogicalTypeField("time", SqlTypes.TIME) .build(); private static final SimpleFunction ROW_FUNC = @@ -139,6 +148,10 @@ public Row apply(Long num) { .addValue(LongStream.range(0, num % 10).boxed().collect(Collectors.toList())) .addValue(num % 2 == 0 ? null : nestedRow) .addValue(num) + .addValue(new DateTime(num).withZone(DateTimeZone.forOffsetHoursMinutes(3, 25))) + .addValue(DateTimeUtil.timestampFromMicros(num)) + .addValue(DateTimeUtil.dateFromDays(Integer.parseInt(strNum))) + .addValue(DateTimeUtil.timeFromMicros(num)) .build(); } }; diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java index a20d5b7c8f59..134f05c34bfb 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java @@ -28,16 +28,21 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; import java.util.Arrays; import java.util.List; import java.util.Map; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.DateTimeUtil; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.junit.Test; @@ -45,6 +50,7 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +/** Test class for {@link IcebergUtils}. */ @RunWith(Enclosed.class) public class IcebergUtilsTest { @@ -102,21 +108,87 @@ public void testDouble() { } @Test - public void testDate() {} + public void testDate() { + checkRowValueToRecordValue( + Schema.FieldType.logicalType(SqlTypes.DATE), + Types.DateType.get(), + DateTimeUtil.dateFromDays(12345)); + } @Test - public void testTime() {} + public void testTime() { + checkRowValueToRecordValue( + Schema.FieldType.logicalType(SqlTypes.TIME), + Types.TimeType.get(), + DateTimeUtil.timeFromMicros(12345678L)); + } @Test public void testTimestamp() { + // SqlTypes.DATETIME + checkRowValueToRecordValue( + Schema.FieldType.logicalType(SqlTypes.DATETIME), + Types.TimestampType.withoutZone(), + DateTimeUtil.timestampFromMicros(123456789L)); + + // Schema.FieldType.DATETIME DateTime dateTime = new DateTime().withDate(1979, 03, 14).withTime(1, 2, 3, 4).withZone(DateTimeZone.UTC); - checkRowValueToRecordValue( Schema.FieldType.DATETIME, - dateTime.toInstant(), + dateTime, + Types.TimestampType.withoutZone(), + DateTimeUtil.timestampFromMicros(dateTime.getMillis() * 1000L)); + + // Schema.FieldType.INT64 + long micros = 1234567890L; + checkRowValueToRecordValue( + Schema.FieldType.INT64, + micros, Types.TimestampType.withoutZone(), - dateTime.getMillis()); + DateTimeUtil.timestampFromMicros(micros)); + + // Schema.FieldType.STRING + String val = "2024-10-08T13:18:20.053"; + LocalDateTime localDateTime = LocalDateTime.of(2024, 10, 8, 13, 18, 20, 53_000_000); + checkRowValueToRecordValue( + Schema.FieldType.STRING, val, Types.TimestampType.withoutZone(), localDateTime); + } + + @Test + public void testTimestampWithZone() { + String val = "2024-10-08T13:18:20.053+03:27"; + DateTime dateTime = DateTime.parse(val); + OffsetDateTime offsetDateTime = OffsetDateTime.parse(val); + LocalDateTime localDateTime = + offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC).toLocalDateTime(); + // SqlTypes.DATETIME + checkRowValueToRecordValue( + Schema.FieldType.logicalType(SqlTypes.DATETIME), + localDateTime, + Types.TimestampType.withZone(), + offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC)); + + // Schema.FieldType.DATETIME + checkRowValueToRecordValue( + Schema.FieldType.DATETIME, + dateTime, + Types.TimestampType.withZone(), + offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC)); + + // Schema.FieldType.INT64 + checkRowValueToRecordValue( + Schema.FieldType.INT64, + DateTimeUtil.microsFromTimestamptz(offsetDateTime), + Types.TimestampType.withZone(), + offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC)); + + // Schema.FieldType.STRING + checkRowValueToRecordValue( + Schema.FieldType.STRING, + val, + Types.TimestampType.withZone(), + offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC)); } @Test @@ -190,7 +262,7 @@ private void checkRecordValueToRowValue( Row row = IcebergUtils.icebergRecordToBeamRow(beamSchema, record); - assertThat(row.getBaseValue("v"), equalTo(destValue)); + assertThat(row.getValue("v"), equalTo(destValue)); } @Test @@ -224,21 +296,75 @@ public void testDouble() { } @Test - public void testDate() {} + public void testDate() { + checkRecordValueToRowValue( + Types.DateType.get(), + Schema.FieldType.logicalType(SqlTypes.DATE), + DateTimeUtil.dateFromDays(12345)); + } @Test - public void testTime() {} + public void testTime() { + checkRecordValueToRowValue( + Types.TimeType.get(), + Schema.FieldType.logicalType(SqlTypes.TIME), + DateTimeUtil.timeFromMicros(1234567L)); + } @Test public void testTimestamp() { + // SqlTypes.DATETIME + checkRecordValueToRowValue( + Types.TimestampType.withoutZone(), + Schema.FieldType.logicalType(SqlTypes.DATETIME), + DateTimeUtil.timestampFromMicros(123456789L)); + + // Schema.FieldType.DATETIME DateTime dateTime = new DateTime().withDate(1979, 03, 14).withTime(1, 2, 3, 4).withZone(DateTimeZone.UTC); - checkRecordValueToRowValue( Types.TimestampType.withoutZone(), - dateTime.getMillis(), + dateTime.getMillis() * 1000L, + Schema.FieldType.DATETIME, + dateTime); + } + + @Test + public void testTimestampWithZone() { + String timestamp = "2024-10-08T13:18:20.053+03:27"; + OffsetDateTime offsetDateTime = OffsetDateTime.parse(timestamp); + LocalDateTime localDateTime = + offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC).toLocalDateTime(); + // SqlTypes.DATETIME + checkRecordValueToRowValue( + Types.TimestampType.withZone(), + offsetDateTime, + Schema.FieldType.logicalType(SqlTypes.DATETIME), + localDateTime); + checkRecordValueToRowValue( + Types.TimestampType.withZone(), + localDateTime, + Schema.FieldType.logicalType(SqlTypes.DATETIME), + localDateTime); + checkRecordValueToRowValue( + Types.TimestampType.withZone(), + DateTimeUtil.microsFromTimestamptz(offsetDateTime), + Schema.FieldType.logicalType(SqlTypes.DATETIME), + localDateTime); + + // Schema.FieldType.DATETIME + DateTime dateTime = DateTime.parse(timestamp).withZone(DateTimeZone.UTC); + checkRecordValueToRowValue( + Types.TimestampType.withZone(), offsetDateTime, Schema.FieldType.DATETIME, dateTime); + checkRecordValueToRowValue( + Types.TimestampType.withZone(), localDateTime, Schema.FieldType.DATETIME, dateTime); + checkRecordValueToRowValue( + Types.TimestampType.withZone(), + DateTimeUtil.microsFromTimestamptz(offsetDateTime), Schema.FieldType.DATETIME, - dateTime.toInstant()); + dateTime); + checkRecordValueToRowValue( + Types.TimestampType.withZone(), timestamp, Schema.FieldType.DATETIME, dateTime); } @Test @@ -425,7 +551,7 @@ public void testStructBeamFieldTypeToIcebergFieldType() { new BeamFieldTypeTestCase( 1, Schema.FieldType.row(BEAM_SCHEMA_PRIMITIVE), - 7, + 11, Types.StructType.of(ICEBERG_SCHEMA_PRIMITIVE.columns())), new BeamFieldTypeTestCase( 15, @@ -537,6 +663,10 @@ public void testMapBeamFieldTypeToIcebergFieldType() { .addNullableStringField("str") .addNullableBooleanField("bool") .addByteArrayField("bytes") + .addDateTimeField("datetime_tz") + .addLogicalTypeField("datetime", SqlTypes.DATETIME) + .addLogicalTypeField("time", SqlTypes.TIME) + .addLogicalTypeField("date", SqlTypes.DATE) .build(); static final org.apache.iceberg.Schema ICEBERG_SCHEMA_PRIMITIVE = @@ -547,16 +677,17 @@ public void testMapBeamFieldTypeToIcebergFieldType() { required(4, "long", Types.LongType.get()), optional(5, "str", Types.StringType.get()), optional(6, "bool", Types.BooleanType.get()), - required(7, "bytes", Types.BinaryType.get())); + required(7, "bytes", Types.BinaryType.get()), + required(8, "datetime_tz", Types.TimestampType.withZone()), + required(9, "datetime", Types.TimestampType.withoutZone()), + required(10, "time", Types.TimeType.get()), + required(11, "date", Types.DateType.get())); @Test public void testPrimitiveBeamSchemaToIcebergSchema() { org.apache.iceberg.Schema convertedIcebergSchema = IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA_PRIMITIVE); - System.out.println(convertedIcebergSchema); - System.out.println(ICEBERG_SCHEMA_PRIMITIVE); - assertTrue(convertedIcebergSchema.sameSchema(ICEBERG_SCHEMA_PRIMITIVE)); } @@ -591,9 +722,6 @@ public void testArrayBeamSchemaToIcebergSchema() { public void testArrayIcebergSchemaToBeamSchema() { Schema convertedBeamSchema = IcebergUtils.icebergSchemaToBeamSchema(ICEBERG_SCHEMA_LIST); - System.out.println(convertedBeamSchema); - System.out.println(BEAM_SCHEMA_LIST); - assertEquals(BEAM_SCHEMA_LIST, convertedBeamSchema); }