
Variable precision timestamp support for Hive write operations
aalbu committed Sep 25, 2020
1 parent 741bb2d commit 83da853
Showing 16 changed files with 479 additions and 141 deletions.
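In effect, Hive writes now honor the connector's configured timestamp precision end to end instead of rejecting everything but millis. A minimal sketch of opting into nanosecond writes in a test context, mirroring the test changes at the bottom of this diff (helper names as used there):

    // precision follows the connector's timestamp-precision setting
    ConnectorSession session = getHiveSession(
            new HiveConfig().setTimestampPrecision(HiveTimestampPrecision.NANOSECONDS));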

@@ -827,7 +827,6 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
throw new PrestoException(NOT_SUPPORTED, "Bucketing/Partitioning columns not supported when Avro schema url is set");
}

-        validateTimestampColumns(tableMetadata.getColumns());
List<HiveColumnHandle> columnHandles = getColumnHandles(tableMetadata, ImmutableSet.copyOf(partitionedBy));
HiveStorageFormat hiveStorageFormat = getHiveStorageFormat(tableMetadata.getProperties());
Map<String, String> tableProperties = getEmptyTableProperties(tableMetadata, bucketProperty, new HdfsContext(session, schemaName, tableName));
@@ -2322,7 +2321,6 @@ public Optional<ConnectorNewTableLayout> getInsertLayout(ConnectorSession sessio
@Override
public Optional<ConnectorNewTableLayout> getNewTableLayout(ConnectorSession session, ConnectorTableMetadata tableMetadata)
{
-        validateTimestampColumns(tableMetadata.getColumns());
validatePartitionColumns(tableMetadata);
validateBucketColumns(tableMetadata);
validateColumns(tableMetadata);
@@ -2377,11 +2375,12 @@ public TableStatisticsMetadata getStatisticsCollectionMetadata(ConnectorSession

private TableStatisticsMetadata getStatisticsCollectionMetadata(List<ColumnMetadata> columns, List<String> partitionedBy, Optional<Set<String>> analyzeColumns, boolean includeRowCount)
{
-        validateTimestampColumns(columns);
Set<ColumnStatisticMetadata> columnStatistics = columns.stream()
.filter(column -> !partitionedBy.contains(column.getName()))
.filter(column -> !column.isHidden())
.filter(column -> analyzeColumns.isEmpty() || analyzeColumns.get().contains(column.getName()))
+                // we only support stats collection at default precision for now
+                .filter(column -> !(column.getType() instanceof TimestampType) || column.getType().equals(TIMESTAMP_MILLIS))
.map(this::getColumnStatisticMetadata)
.flatMap(List::stream)
.collect(toImmutableSet());
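The new stream filter narrows statistics collection to default-precision timestamp columns instead of failing the whole operation. A hedged sketch of the predicate's effect (column names hypothetical):

    // timestamp(3) columns still get stats; higher precisions are silently skipped
    ColumnMetadata createdAt = new ColumnMetadata("created_at", TIMESTAMP_MILLIS);  // kept
    ColumnMetadata eventTime = new ColumnMetadata("event_time", TIMESTAMP_NANOS);   // filtered out
    ColumnMetadata userName = new ColumnMetadata("user_name", VarcharType.VARCHAR); // kept: not a timestamp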
@@ -2595,19 +2594,6 @@ private static void validateColumns(ConnectorTableMetadata tableMetadata)
}
}

-    // temporary, until variable precision timestamps are supported on write
-    private static void validateTimestampColumns(List<ColumnMetadata> columns)
-    {
-        for (ColumnMetadata column : columns) {
-            Type type = column.getType();
-            if (type instanceof TimestampType) {
-                if (type != TIMESTAMP_MILLIS) {
-                    throw new PrestoException(NOT_SUPPORTED, "CREATE TABLE, INSERT and ANALYZE are not supported with requested timestamp precision: " + type);
-                }
-            }
-        }
-    }
-
private static Function<HiveColumnHandle, ColumnMetadata> columnMetadataGetter(Table table)
{
ImmutableList.Builder<String> columnNames = ImmutableList.builder();

@@ -83,6 +83,7 @@
import static io.prestosql.plugin.hive.HiveSessionProperties.getCompressionCodec;
import static io.prestosql.plugin.hive.HiveSessionProperties.getInsertExistingPartitionsBehavior;
import static io.prestosql.plugin.hive.HiveSessionProperties.getTemporaryStagingDirectoryPath;
+import static io.prestosql.plugin.hive.HiveSessionProperties.getTimestampPrecision;
import static io.prestosql.plugin.hive.HiveSessionProperties.isTemporaryStagingDirectoryEnabled;
import static io.prestosql.plugin.hive.LocationHandle.WriteMode.DIRECT_TO_TARGET_EXISTING_DIRECTORY;
import static io.prestosql.plugin.hive.metastore.MetastoreUtil.getHiveSchema;
@@ -522,7 +523,7 @@ else if (insertExistingPartitionsBehavior == InsertExistingPartitionsBehavior.ER
}

List<Type> types = dataColumns.stream()
-                .map(column -> column.getHiveType().getType(typeManager))
+                .map(column -> column.getHiveType().getType(typeManager, getTimestampPrecision(session).getPrecision()))
.collect(toImmutableList());

Map<String, Integer> columnIndexes = new HashMap<>();
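Each writer path in this commit swaps the one-argument hiveType.getType(typeManager) for the precision-aware overload, so the Presto-side type matches what the session asked for. A hedged sketch of the difference for a Hive timestamp column (typeManager and session as in the surrounding code):

    HiveType hiveType = HiveType.HIVE_TIMESTAMP;
    Type fixed = hiveType.getType(typeManager); // old behavior: always timestamp(3)
    Type scoped = hiveType.getType(typeManager, getTimestampPrecision(session).getPrecision());
    // now timestamp(3), timestamp(6), or timestamp(9), per the session

The same one-line substitution repeats in each of the writer factories in the following file sections.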

@@ -46,6 +46,7 @@
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITE_VALIDATION_FAILED;
import static io.prestosql.plugin.hive.HiveMetadata.PRESTO_QUERY_ID_NAME;
import static io.prestosql.plugin.hive.HiveMetadata.PRESTO_VERSION_NAME;
+import static io.prestosql.plugin.hive.HiveSessionProperties.getTimestampPrecision;
import static io.prestosql.plugin.hive.HiveSessionProperties.isRcfileOptimizedWriterValidate;
import static io.prestosql.plugin.hive.rcfile.RcFilePageSourceFactory.createTextVectorEncoding;
import static io.prestosql.plugin.hive.util.HiveUtil.getColumnNames;
@@ -117,7 +118,7 @@ else if (ColumnarSerDe.class.getName().equals(storageFormat.getSerDe())) {
// an index to rearrange columns in the proper order
List<String> fileColumnNames = getColumnNames(schema);
List<Type> fileColumnTypes = getColumnTypes(schema).stream()
-                .map(hiveType -> hiveType.getType(typeManager))
+                .map(hiveType -> hiveType.getType(typeManager, getTimestampPrecision(session).getPrecision()))
.collect(toList());

int[] fileInputColumnIndexes = fileColumnNames.stream()

@@ -45,6 +45,7 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_CLOSE_ERROR;
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR;
+import static io.prestosql.plugin.hive.HiveSessionProperties.getTimestampPrecision;
import static io.prestosql.plugin.hive.util.HiveUtil.getColumnNames;
import static io.prestosql.plugin.hive.util.HiveUtil.getColumnTypes;
import static io.prestosql.plugin.hive.util.HiveWriteUtils.createRecordWriter;
@@ -90,7 +91,7 @@ public RecordFileWriter(
// existing tables may have columns in a different order
List<String> fileColumnNames = getColumnNames(schema);
List<Type> fileColumnTypes = getColumnTypes(schema).stream()
-                .map(hiveType -> hiveType.getType(typeManager))
+                .map(hiveType -> hiveType.getType(typeManager, getTimestampPrecision(session).getPrecision()))
.collect(toList());

fieldCount = fileColumnNames.size();

@@ -61,6 +61,7 @@
import static io.prestosql.plugin.hive.HiveSessionProperties.getOrcOptimizedWriterMinStripeSize;
import static io.prestosql.plugin.hive.HiveSessionProperties.getOrcOptimizedWriterValidateMode;
import static io.prestosql.plugin.hive.HiveSessionProperties.getOrcStringStatisticsLimit;
+import static io.prestosql.plugin.hive.HiveSessionProperties.getTimestampPrecision;
import static io.prestosql.plugin.hive.HiveSessionProperties.isOrcOptimizedWriterValidate;
import static io.prestosql.plugin.hive.util.HiveUtil.getColumnNames;
import static io.prestosql.plugin.hive.util.HiveUtil.getColumnTypes;
@@ -140,7 +141,7 @@ public Optional<FileWriter> createFileWriter(
// an index to rearrange columns in the proper order
List<String> fileColumnNames = getColumnNames(schema);
List<Type> fileColumnTypes = getColumnTypes(schema).stream()
-                .map(hiveType -> hiveType.getType(typeManager))
+                .map(hiveType -> hiveType.getType(typeManager, getTimestampPrecision(session).getPrecision()))
.collect(toList());

int[] fileInputColumnIndexes = fileColumnNames.stream()

@@ -40,6 +40,7 @@
import java.util.concurrent.Callable;

import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_OPEN_ERROR;
+import static io.prestosql.plugin.hive.HiveSessionProperties.getTimestampPrecision;
import static io.prestosql.plugin.hive.util.HiveUtil.getColumnNames;
import static io.prestosql.plugin.hive.util.HiveUtil.getColumnTypes;
import static java.util.Objects.requireNonNull;
@@ -86,7 +87,7 @@ public Optional<FileWriter> createFileWriter(

List<String> fileColumnNames = getColumnNames(schema);
List<Type> fileColumnTypes = getColumnTypes(schema).stream()
-                .map(hiveType -> hiveType.getType(typeManager))
+                .map(hiveType -> hiveType.getType(typeManager, getTimestampPrecision(session).getPrecision()))
.collect(toList());

int[] fileInputColumnIndexes = fileColumnNames.stream()

@@ -24,8 +24,10 @@
import io.prestosql.spi.type.DecimalType;
import io.prestosql.spi.type.DoubleType;
import io.prestosql.spi.type.IntegerType;
+import io.prestosql.spi.type.LongTimestamp;
import io.prestosql.spi.type.RealType;
import io.prestosql.spi.type.SmallintType;
+import io.prestosql.spi.type.TimestampType;
import io.prestosql.spi.type.TinyintType;
import io.prestosql.spi.type.Type;
import io.prestosql.spi.type.VarbinaryType;
@@ -51,15 +53,22 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.function.IntSupplier;

import static io.prestosql.plugin.hive.util.HiveUtil.isArrayType;
import static io.prestosql.plugin.hive.util.HiveUtil.isMapType;
import static io.prestosql.plugin.hive.util.HiveUtil.isRowType;
import static io.prestosql.plugin.hive.util.HiveWriteUtils.getHiveDecimal;
-import static io.prestosql.spi.type.TimestampType.TIMESTAMP_MILLIS;
+import static io.prestosql.spi.type.TimestampType.TIMESTAMP_MICROS;
+import static io.prestosql.spi.type.TimestampType.TIMESTAMP_NANOS;
import static io.prestosql.spi.type.Timestamps.MICROSECONDS_PER_MILLISECOND;
+import static io.prestosql.spi.type.Timestamps.MICROSECONDS_PER_SECOND;
+import static io.prestosql.spi.type.Timestamps.NANOSECONDS_PER_MICROSECOND;
+import static io.prestosql.spi.type.Timestamps.PICOSECONDS_PER_NANOSECOND;
+import static io.prestosql.spi.type.Timestamps.roundDiv;
import static java.lang.Float.intBitsToFloat;
import static java.lang.Math.floorDiv;
+import static java.lang.Math.floorMod;
import static java.lang.Math.toIntExact;
import static java.util.Objects.requireNonNull;

@@ -118,8 +127,13 @@ public FieldSetter create(SettableStructObjectInspector rowInspector, Object row
return new DateFieldSetter(rowInspector, row, field);
}

-        if (type.equals(TIMESTAMP_MILLIS)) {
-            return new TimestampFieldSetter(rowInspector, row, field, timeZone);
+        if (type instanceof TimestampType) {
+            if (((TimestampType) type).isShort()) {
+                return new ShortTimestampFieldSetter(rowInspector, row, field, timeZone);
+            }
+            else {
+                return new LongTimestampFieldSetter(rowInspector, row, field, timeZone);
+            }
}

if (type instanceof DecimalType) {
@@ -361,24 +375,60 @@ public void setField(Block block, int position)
}
}

-    private static class TimestampFieldSetter
+    private abstract static class TimestampFieldSetter
extends FieldSetter
{
-        private final DateTimeZone timeZone;
-        private final TimestampWritableV2 value = new TimestampWritableV2();
+        protected final DateTimeZone timeZone;
+        protected final TimestampWritableV2 value = new TimestampWritableV2();

public TimestampFieldSetter(SettableStructObjectInspector rowInspector, Object row, StructField field, DateTimeZone timeZone)
{
super(rowInspector, row, field);
this.timeZone = requireNonNull(timeZone, "timeZone is null");
}

+        protected Timestamp getTimestamp(long micros, IntSupplier picosSupplier)
+        {
+            long epochMillis = floorDiv(micros, MICROSECONDS_PER_MILLISECOND);
+            epochMillis = timeZone.convertLocalToUTC(epochMillis, false);
+            int nanosFromMicros = floorMod(micros, MICROSECONDS_PER_SECOND) * NANOSECONDS_PER_MICROSECOND;
+            int nanosFromPicos = (int) roundDiv(picosSupplier.getAsInt(), PICOSECONDS_PER_NANOSECOND);
+            return Timestamp.ofEpochMilli(epochMillis, nanosFromMicros + nanosFromPicos);
+        }
+    }
+
+    private static class ShortTimestampFieldSetter
+            extends TimestampFieldSetter
+    {
+        public ShortTimestampFieldSetter(SettableStructObjectInspector rowInspector, Object row, StructField field, DateTimeZone timeZone)
+        {
+            super(rowInspector, row, field, timeZone);
+        }
+
+        @Override
+        public void setField(Block block, int position)
+        {
+            long micros = TIMESTAMP_MICROS.getLong(block, position);
+            Timestamp timestamp = getTimestamp(micros, () -> 0);
+            value.set(timestamp);
+            rowInspector.setStructFieldData(row, field, value);
+        }
+    }
+
+    private static class LongTimestampFieldSetter
+            extends TimestampFieldSetter
+    {
+        public LongTimestampFieldSetter(SettableStructObjectInspector rowInspector, Object row, StructField field, DateTimeZone timeZone)
+        {
+            super(rowInspector, row, field, timeZone);
+        }
+
@Override
public void setField(Block block, int position)
{
-            long epochMilli = floorDiv(TIMESTAMP_MILLIS.getLong(block, position), MICROSECONDS_PER_MILLISECOND);
-            epochMilli = timeZone.convertLocalToUTC(epochMilli, false);
-            value.set(Timestamp.ofEpochMilli(epochMilli));
+            LongTimestamp longTimestamp = (LongTimestamp) TIMESTAMP_NANOS.getObject(block, position);
+            Timestamp timestamp = getTimestamp(longTimestamp.getEpochMicros(), longTimestamp::getPicosOfMicro);
+            value.set(timestamp);
rowInspector.setStructFieldData(row, field, value);
}
}
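The shared getTimestamp helper above is where precision handling converges: it splits epoch micros into the millisecond and nano-of-second parts that Hive's Timestamp expects, folding in any picos carried by a LongTimestamp. A worked sketch with concrete numbers (time-zone adjustment omitted; values illustrative):

    long micros = 1_600_000_000_123_456L;                // epoch microseconds
    long epochMillis = Math.floorDiv(micros, 1_000L);    // 1_600_000_000_123
    int nanosOfSecond = (int) Math.floorMod(micros, 1_000_000L) * 1_000; // 123_456_000
    int picosOfMicro = 600;                              // hypothetical LongTimestamp remainder
    int extraNanos = (picosOfMicro + 500) / 1_000;       // roundDiv rounds half up: 1
    // Timestamp.ofEpochMilli(epochMillis, nanosOfSecond + extraNanos)

Short timestamps (precision up to 6) arrive as a single long of epoch micros, so ShortTimestampFieldSetter passes a constant zero picos supplier; long timestamps carry the extra digits in LongTimestamp.getPicosOfMicro().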

@@ -18,6 +18,7 @@
import io.prestosql.spi.type.CharType;
import io.prestosql.spi.type.DecimalType;
import io.prestosql.spi.type.NamedTypeSignature;
+import io.prestosql.spi.type.TimestampType;
import io.prestosql.spi.type.Type;
import io.prestosql.spi.type.TypeSignatureParameter;
import io.prestosql.spi.type.VarcharType;
@@ -49,7 +50,6 @@
import static io.prestosql.spi.type.IntegerType.INTEGER;
import static io.prestosql.spi.type.RealType.REAL;
import static io.prestosql.spi.type.SmallintType.SMALLINT;
-import static io.prestosql.spi.type.TimestampType.TIMESTAMP_MILLIS;
import static io.prestosql.spi.type.TinyintType.TINYINT;
import static io.prestosql.spi.type.VarbinaryType.VARBINARY;
import static java.lang.String.format;
@@ -113,7 +113,7 @@ public static TypeInfo translate(Type type)
if (DATE.equals(type)) {
return HIVE_DATE.getTypeInfo();
}
-        if (TIMESTAMP_MILLIS.equals(type)) {
+        if (type instanceof TimestampType) {
return HIVE_TIMESTAMP.getTypeInfo();
}
if (type instanceof DecimalType) {
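Hive itself has a single timestamp type, so every Presto timestamp precision now translates to the same Hive TypeInfo on write. A hedged illustration:

    // all of these now yield HIVE_TIMESTAMP's TypeInfo
    translate(TIMESTAMP_MILLIS); // timestamp(3)
    translate(TIMESTAMP_MICROS); // timestamp(6)
    translate(TIMESTAMP_NANOS);  // timestamp(9)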

@@ -47,6 +47,7 @@
import io.prestosql.spi.type.IntegerType;
import io.prestosql.spi.type.RealType;
import io.prestosql.spi.type.SmallintType;
+import io.prestosql.spi.type.TimestampType;
import io.prestosql.spi.type.TinyintType;
import io.prestosql.spi.type.Type;
import io.prestosql.spi.type.VarbinaryType;
@@ -702,7 +703,7 @@ public static ObjectInspector getRowColumnInspector(Type type)
return writableDateObjectInspector;
}

-        if (type.equals(TIMESTAMP_MILLIS)) {
+        if (type instanceof TimestampType) {
return writableTimestampObjectInspector;
}

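Likewise, Hive's writable timestamp inspector holds nanosecond-resolution Timestamp values, so one inspector can back every precision; the field setters above do the down-conversion. A hedged sketch (assuming the inspector is the usual shared instance):

    // the same writable timestamp inspector serves timestamp(3), (6), and (9) columns
    ObjectInspector millisInspector = getRowColumnInspector(TIMESTAMP_MILLIS);
    ObjectInspector nanosInspector = getRowColumnInspector(TIMESTAMP_NANOS);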

@@ -85,6 +85,9 @@
import static io.prestosql.plugin.hive.HiveTestUtils.createGenericHiveRecordCursorProvider;
import static io.prestosql.plugin.hive.HiveTestUtils.getHiveSession;
import static io.prestosql.plugin.hive.HiveTestUtils.getTypes;
+import static io.prestosql.plugin.hive.HiveTimestampPrecision.MICROSECONDS;
+import static io.prestosql.plugin.hive.HiveTimestampPrecision.MILLISECONDS;
+import static io.prestosql.plugin.hive.HiveTimestampPrecision.NANOSECONDS;
import static io.prestosql.testing.StructuralTestUtil.rowBlockOf;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
@@ -291,16 +294,15 @@ public void testRcBinaryOptimizedWriter(int rowCount)
public void testOrc(int rowCount, long fileSizePadding)
throws Exception
{
-        // Hive binary writers are broken for timestamps
-        List<TestColumn> testColumns = TEST_COLUMNS.stream()
-                .filter(TestHiveFileFormats::withoutTimestamps)
-                .collect(toImmutableList());
-
-        assertThatFileFormat(ORC)
-                .withColumns(testColumns)
-                .withRowsCount(rowCount)
-                .withFileSizePadding(fileSizePadding)
-                .isReadableByPageSource(new OrcPageSourceFactory(new OrcReaderOptions(), HDFS_ENVIRONMENT, STATS, UTC));
+        for (HiveTimestampPrecision timestampPrecision : List.of(MILLISECONDS, MICROSECONDS, NANOSECONDS)) {
+            ConnectorSession session = getHiveSession(new HiveConfig().setTimestampPrecision(timestampPrecision));
+            assertThatFileFormat(ORC)
+                    .withColumns(TEST_COLUMNS)
+                    .withRowsCount(rowCount)
+                    .withFileSizePadding(fileSizePadding)
+                    .withSession(session)
+                    .isReadableByPageSource(new OrcPageSourceFactory(new OrcReaderOptions(), HDFS_ENVIRONMENT, STATS, UTC));
+        }
}

@Test(dataProvider = "validRowAndFileSizePadding")
(Diffs for the remaining changed files were not loaded.)