Skip to content

Commit

Permalink
Fix Iceberg table partitioned by columns of TimestampType
Browse files Browse the repository at this point in the history
Co-authored-by: Kiersten Stokes <[email protected]>
  • Loading branch information
hantangwangd and kiersten-stokes committed Oct 20, 2023
1 parent 8957e66 commit 5e30a43
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ public Object getObjectValue(SqlFunctionProperties properties, Block block, int
}
}

public TimeUnit getPrecision()
{
return this.precision;
}

@Override
@SuppressWarnings("EqualsWhichDoesntCheckParameterClass")
public boolean equals(Object other)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.facebook.presto.common.type.IntegerType;
import com.facebook.presto.common.type.RealType;
import com.facebook.presto.common.type.SmallintType;
import com.facebook.presto.common.type.TimestampType;
import com.facebook.presto.common.type.TinyintType;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.VarbinaryType;
Expand Down Expand Up @@ -70,6 +71,7 @@
import static java.util.Objects.requireNonNull;
import static java.util.UUID.randomUUID;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class IcebergPageSink
implements ConnectorPageSink
Expand Down Expand Up @@ -334,7 +336,7 @@ private static Object applyTransform(Transform<?, ?> transform, org.apache.icebe
return ((Transform<Object, Object>) transform).bind(icebergType).apply(value);
}

public static Object getIcebergValue(Block block, int position, Type type)
private static Object getIcebergValue(Block block, int position, Type type)
{
if (block.isNull(position)) {
return null;
Expand Down Expand Up @@ -363,6 +365,10 @@ public static Object getIcebergValue(Block block, int position, Type type)
if (type instanceof VarcharType) {
return type.getSlice(block, position).toStringUtf8();
}
if (type instanceof TimestampType) {
long timestamp = type.getLong(block, position);
return ((TimestampType) type).getPrecision() == MILLISECONDS ? MILLISECONDS.toMicros(timestamp) : timestamp;
}
throw new UnsupportedOperationException("Type not supported as partition column: " + type.getDisplayName());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.facebook.presto.common.type.DecimalType;
import com.facebook.presto.common.type.Decimals;
import com.facebook.presto.common.type.TimeZoneKey;
import com.facebook.presto.common.type.TimestampType;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.VarbinaryType;
import com.facebook.presto.common.type.VarcharType;
Expand Down Expand Up @@ -56,6 +57,8 @@
import static java.lang.Long.parseLong;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class IcebergPageSource
implements ConnectorPageSource
Expand Down Expand Up @@ -84,7 +87,7 @@ public IcebergPageSource(
HivePartitionKey icebergPartition = partitionKeys.get(column.getId());
Type type = column.getType();
Object prefilledValue = deserializePartitionValue(type, icebergPartition.getValue().orElse(null), column.getName(), timeZoneKey);
prefilledBlocks[outputIndex] = Utils.nativeValueToBlock(type, prefilledValue);
prefilledBlocks[outputIndex] = nativeValueToBlock(type, prefilledValue);
delegateIndexes[outputIndex] = -1;
}
else {
Expand Down Expand Up @@ -242,4 +245,12 @@ private static Object deserializePartitionValue(Type type, String valueString, S
// Iceberg tables don't partition by non-primitive-type columns.
throw new PrestoException(GENERIC_INTERNAL_ERROR, "Invalid partition type " + type.toString());
}

private Block nativeValueToBlock(Type type, Object prefilledValue)
{
if (prefilledValue != null && type instanceof TimestampType && ((TimestampType) type).getPrecision() == MILLISECONDS) {
return Utils.nativeValueToBlock(type, MICROSECONDS.toMillis((long) prefilledValue));
}
return Utils.nativeValueToBlock(type, prefilledValue);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.facebook.presto.common.block.BlockBuilder;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.RowType;
import com.facebook.presto.common.type.TimestampType;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.common.type.TypeUtils;
import com.facebook.presto.spi.ColumnMetadata;
Expand Down Expand Up @@ -56,6 +57,8 @@
import static com.facebook.presto.iceberg.TypeConverter.toPrestoType;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.stream.Collectors.toSet;

public class PartitionTable
Expand Down Expand Up @@ -272,7 +275,7 @@ private Map<Integer, Object> toMap(Map<Integer, ByteBuffer> idToMetricMap)
return Partition.toMap(idToTypeMapping, idToMetricMap);
}

public static Object convert(Object value, Type type)
private Object convert(Object value, Type type)
{
if (value == null) {
return null;
Expand All @@ -287,6 +290,12 @@ public static Object convert(Object value, Type type)
if (type instanceof Types.FloatType) {
return Float.floatToIntBits((Float) value);
}
if (type instanceof Types.TimestampType) {
com.facebook.presto.common.type.Type prestoType = toPrestoType(type, typeManager);
if (prestoType instanceof TimestampType && ((TimestampType) prestoType).getPrecision() == MILLISECONDS) {
return MICROSECONDS.toMillis((long) value);
}
}
return value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.intellij.lang.annotations.Language;
import org.testng.annotations.Test;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -194,6 +196,43 @@ public void testShowColumns()
assertEquals(actual, expectedParametrizedVarchar);
}

@Test
public void testPartitionedByTimestampType()
{
// create iceberg table partitioned by column of TimestampType, and insert some data
assertQuerySucceeds("create table test_partition_columns(a bigint, b timestamp) with (partitioning = ARRAY['b'])");
assertQuerySucceeds("insert into test_partition_columns values(1, timestamp '1984-12-08 00:10:00'), (2, timestamp '2001-01-08 12:01:01')");

// validate return data of TimestampType
List<Object> timestampColumnDatas = getQueryRunner().execute("select b from test_partition_columns order by a asc").getOnlyColumn().collect(Collectors.toList());
assertEquals(timestampColumnDatas.size(), 2);
assertEquals(timestampColumnDatas.get(0), LocalDateTime.parse("1984-12-08 00:10:00", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
assertEquals(timestampColumnDatas.get(1), LocalDateTime.parse("2001-01-08 12:01:01", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));

// validate column of TimestampType exists in query filter
assertEquals(getQueryRunner().execute("select b from test_partition_columns where b = timestamp '1984-12-08 00:10:00'").getOnlyValue(),
LocalDateTime.parse("1984-12-08 00:10:00", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
assertEquals(getQueryRunner().execute("select b from test_partition_columns where b = timestamp '2001-01-08 12:01:01'").getOnlyValue(),
LocalDateTime.parse("2001-01-08 12:01:01", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));

// validate column of TimestampType in system table "partitions"
assertEquals(getQueryRunner().execute("select count(*) FROM \"test_partition_columns$partitions\"").getOnlyValue(), 2L);
assertEquals(getQueryRunner().execute("select row_count from \"test_partition_columns$partitions\" where b = timestamp '1984-12-08 00:10:00'").getOnlyValue(), 1L);
assertEquals(getQueryRunner().execute("select row_count from \"test_partition_columns$partitions\" where b = timestamp '2001-01-08 12:01:01'").getOnlyValue(), 1L);

// validate column of TimestampType exists in delete filter
assertUpdate("delete from test_partition_columns WHERE b = timestamp '2001-01-08 12:01:01'", 1);
timestampColumnDatas = getQueryRunner().execute("select b from test_partition_columns order by a asc").getOnlyColumn().collect(Collectors.toList());
assertEquals(timestampColumnDatas.size(), 1);
assertEquals(timestampColumnDatas.get(0), LocalDateTime.parse("1984-12-08 00:10:00", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
assertEquals(getQueryRunner().execute("select b FROM test_partition_columns where b = timestamp '1984-12-08 00:10:00'").getOnlyValue(),
LocalDateTime.parse("1984-12-08 00:10:00", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
assertEquals(getQueryRunner().execute("select count(*) from \"test_partition_columns$partitions\"").getOnlyValue(), 1L);
assertEquals(getQueryRunner().execute("select row_count from \"test_partition_columns$partitions\" where b = timestamp '1984-12-08 00:10:00'").getOnlyValue(), 1L);

assertQuerySucceeds("drop table test_partition_columns");
}

@Override
public void testDescribeOutput()
{
Expand Down

0 comments on commit 5e30a43

Please sign in to comment.