Support for dictionary encoded INT96 timestamp in parquet files #4680

Closed
wants to merge 3 commits
10 changes: 10 additions & 0 deletions velox/connectors/hive/HiveConfig.cpp
@@ -286,6 +286,16 @@ bool HiveConfig::s3UseProxyFromEnv() const {
return config_->get<bool>(kS3UseProxyFromEnv, false);
}

uint8_t HiveConfig::readTimestampUnit(const Config* session) const {
Collaborator commented:

The unit should be read from the Parquet logical type for this column, not set by the user as a config property. See https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#timestamp

@Yuhta (Contributor) commented on Jul 5, 2024:

This corresponds to the timestamp unit that the compute engine can handle (usually milliseconds for Presto and possibly other values for Spark); it is not related to the type in the file. The reader should use the coarser of the two.

Collaborator (Author) commented:

@yingsu00 In a Parquet file, the unit of INT96 is fixed because it is made up of days and nanos, unlike an INT64 timestamp, which can have different units. We parse days and nanos from Parquet, while the compute engine may need a different timestamp unit, e.g. Presto needs millis while Spark needs micros. This config allows us to adjust the timestamp precision according to the user's requirement.

Without this change, the filter result could be incorrect. For example, take the Spark filter a == 2000-09-12 22:36:29.000000. If a is stored with nanosecond unit in Velox, then when a is 2000-09-12 22:36:29.000000111, Velox returns false but Spark expects true because it only compares down to the microsecond digits.

Therefore, we need to truncate the value, and this logic is also needed for the INT64 timestamp reader. Does this make sense? Thanks.

Reference for Int96 in Parquet: https://github.com/apache/parquet-format/pull/49/files#diff-0e877db0daf579f98a11e5e113b29250a2dcae3decb1e83a88db1e6f092bee96R149-R150
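
To make the truncation concrete, here is a minimal, self-contained sketch (illustrative only, not code from this PR; the precision codes 3/6/9 follow the HiveConfig convention):

```cpp
#include <cstdint>
#include <iostream>

// Truncate a nanoseconds-of-second value to the requested precision:
// 3 = milliseconds, 6 = microseconds, 9 = nanoseconds (no-op).
uint64_t truncateNanos(uint64_t nanos, uint8_t unit) {
  switch (unit) {
    case 3:
      return nanos / 1'000'000 * 1'000'000;
    case 6:
      return nanos / 1'000 * 1'000;
    default:
      return nanos;
  }
}

int main() {
  // The Spark filter example: a stored value of ...29.000000111 should
  // compare equal to the literal ...29.000000 once truncated to micros.
  std::cout << (truncateNanos(111, 6) == 0) << std::endl; // prints 1
  // Without truncation (nanosecond precision), the comparison fails.
  std::cout << (truncateNanos(111, 9) == 0) << std::endl; // prints 0
}
```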

Collaborator commented:

Thanks @rui-mo for explaining. Sorry I didn't check the INT96 Timestamp spec. Just approved this PR.

Collaborator (Author) commented:

Thank you for helping review this PR.

const auto unit = session->get<uint8_t>(
kReadTimestampUnitSession,
config_->get<uint8_t>(kReadTimestampUnit, 3 /*milli*/));
VELOX_CHECK(
unit == 3 || unit == 6 /*micro*/ || unit == 9 /*nano*/,
"Invalid timestamp unit.");
return unit;
}

uint8_t HiveConfig::parquetWriteTimestampUnit(const Config* session) const {
const auto unit = session->get<uint8_t>(
kParquetWriteTimestampUnitSession,
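For reference, a self-contained sketch of the lookup order readTimestampUnit() implements (session property first, then the connector config, then the default of 3); FakeConfig is a hypothetical stand-in for Velox's Config:

```cpp
#include <cstdint>
#include <optional>
#include <stdexcept>

// Hypothetical stand-in for Velox's Config: return the stored value if
// present, otherwise the caller-supplied fallback.
struct FakeConfig {
  std::optional<uint8_t> timestampUnit; // e.g. parsed from
                                        // "hive.reader.timestamp_unit"
  uint8_t get(uint8_t fallback) const {
    return timestampUnit.value_or(fallback);
  }
};

uint8_t readTimestampUnit(const FakeConfig& session, const FakeConfig& config) {
  // The session property wins, then the connector config, then 3 (milli).
  const uint8_t unit = session.get(config.get(3));
  if (unit != 3 && unit != 6 && unit != 9) {
    throw std::invalid_argument("Invalid timestamp unit.");
  }
  return unit;
}
```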
9 changes: 9 additions & 0 deletions velox/connectors/hive/HiveConfig.h
@@ -231,6 +231,12 @@ class HiveConfig {
static constexpr const char* kS3UseProxyFromEnv =
"hive.s3.use-proxy-from-env";

/// The unit for reading timestamps from files. 3: milli (default),
/// 6: micro, 9: nano.
static constexpr const char* kReadTimestampUnit =
"hive.reader.timestamp-unit";
static constexpr const char* kReadTimestampUnitSession =
"hive.reader.timestamp_unit";

/// Timestamp unit for Parquet write through Arrow bridge.
static constexpr const char* kParquetWriteTimestampUnit =
"hive.parquet.writer.timestamp-unit";
@@ -333,6 +339,9 @@ class HiveConfig {

bool s3UseProxyFromEnv() const;

/// Returns the timestamp unit used when reading timestamps from files.
/// 3: milli, 6: micro, 9: nano.
uint8_t readTimestampUnit(const Config* session) const;

/// Returns the timestamp unit used when writing timestamps into Parquet
/// through Arrow bridge. 0: second, 3: milli, 6: micro, 9: nano.
uint8_t parquetWriteTimestampUnit(const Config* session) const;
8 changes: 7 additions & 1 deletion velox/connectors/hive/HiveConnectorUtil.cpp
@@ -572,7 +572,9 @@ void configureRowReaderOptions(
const std::shared_ptr<common::ScanSpec>& scanSpec,
std::shared_ptr<common::MetadataFilter> metadataFilter,
const RowTypePtr& rowType,
const std::shared_ptr<const HiveConnectorSplit>& hiveSplit) {
const std::shared_ptr<const HiveConnectorSplit>& hiveSplit,
const std::shared_ptr<const HiveConfig>& hiveConfig,
const Config* sessionProperties) {
auto skipRowsIt =
tableParameters.find(dwio::common::TableParameter::kSkipHeaderLineCount);
if (skipRowsIt != tableParameters.end()) {
@@ -582,6 +584,10 @@
rowReaderOptions.setMetadataFilter(std::move(metadataFilter));
rowReaderOptions.setRequestedType(rowType);
rowReaderOptions.range(hiveSplit->start, hiveSplit->length);
if (hiveConfig && sessionProperties) {
rowReaderOptions.setTimestampPrecision(static_cast<TimestampPrecision>(
hiveConfig->readTimestampUnit(sessionProperties)));
}
}

namespace {
4 changes: 3 additions & 1 deletion velox/connectors/hive/HiveConnectorUtil.h
@@ -81,7 +81,9 @@ void configureRowReaderOptions(
const std::shared_ptr<common::ScanSpec>& scanSpec,
std::shared_ptr<common::MetadataFilter> metadataFilter,
const RowTypePtr& rowType,
const std::shared_ptr<const HiveConnectorSplit>& hiveSplit);
const std::shared_ptr<const HiveConnectorSplit>& hiveSplit,
const std::shared_ptr<const HiveConfig>& hiveConfig = nullptr,
const Config* sessionProperties = nullptr);

bool testFilters(
const common::ScanSpec* scanSpec,
4 changes: 3 additions & 1 deletion velox/connectors/hive/SplitReader.cpp
@@ -272,7 +272,9 @@ void SplitReader::createReader(
scanSpec_,
std::move(metadataFilter),
ROW(std::move(columnNames), std::move(columnTypes)),
hiveSplit_);
hiveSplit_,
hiveConfig_,
connectorQueryCtx_->sessionProperties());
}

bool SplitReader::checkIfSplitIsEmpty(
44 changes: 43 additions & 1 deletion velox/dwio/parquet/reader/PageReader.cpp
@@ -31,6 +31,11 @@ namespace facebook::velox::parquet {
using thrift::Encoding;
using thrift::PageHeader;

// A Parquet INT96 timestamp occupies 12 bytes on disk: 8 bytes of
// nanoseconds within the day followed by a 4-byte day number. This packed
// struct is used only for its 12-byte size; the fields are decoded via
// memcpy at the matching offsets below.
struct __attribute__((__packed__)) Int96Timestamp {
int32_t days;
uint64_t nanos;
};

void PageReader::seekToPage(int64_t row) {
defineDecoder_.reset();
repeatDecoder_.reset();
@@ -371,6 +376,42 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
}
break;
}
case thrift::Type::INT96: {
auto numVeloxBytes = dictionary_.numValues * sizeof(Timestamp);
dictionary_.values = AlignedBuffer::allocate<char>(numVeloxBytes, &pool_);
auto numBytes = dictionary_.numValues * sizeof(Int96Timestamp);
if (pageData_) {
memcpy(dictionary_.values->asMutable<char>(), pageData_, numBytes);
} else {
dwio::common::readBytes(
numBytes,
inputStream_.get(),
dictionary_.values->asMutable<char>(),
bufferStart_,
bufferEnd_);
}
// Expand the 12-byte Parquet values to 16-byte Velox Timestamp values.
// We iterate from the end so the expansion can be done in place without
// overwriting bytes that have not been read yet.
auto values = dictionary_.values->asMutable<Timestamp>();
auto parquetValues = dictionary_.values->asMutable<char>();

for (auto i = dictionary_.numValues - 1; i >= 0; --i) {
// Convert the timestamp into seconds and nanos since the Unix epoch,
// 00:00:00.000000 on 1 January 1970.
int64_t nanos;
memcpy(
&nanos,
parquetValues + i * sizeof(Int96Timestamp),
sizeof(int64_t));
int32_t days;
memcpy(
&days,
parquetValues + i * sizeof(Int96Timestamp) + sizeof(int64_t),
sizeof(int32_t));
values[i] = Timestamp::fromDaysAndNanos(days, nanos);
}
break;
}
case thrift::Type::BYTE_ARRAY: {
dictionary_.values =
AlignedBuffer::allocate<StringView>(dictionary_.numValues, &pool_);
@@ -461,7 +502,6 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
VELOX_UNSUPPORTED(
"Parquet type {} not supported for dictionary", parquetType);
}
case thrift::Type::INT96:
default:
VELOX_UNSUPPORTED(
"Parquet type {} not supported for dictionary", parquetType);
@@ -488,6 +528,8 @@ int32_t parquetTypeBytes(thrift::Type::type type) {
case thrift::Type::INT64:
case thrift::Type::DOUBLE:
return 8;
case thrift::Type::INT96:
return 12;
default:
VELOX_FAIL("Type does not have a byte width {}", type);
}
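To summarize the INT96 handling above: on disk each value is 12 bytes, with the nanoseconds-within-day in the first 8 bytes and the day number in the last 4 (hence parquetTypeBytes() returning 12). A small sketch of the decode, mirroring the memcpy offsets in prepareDictionary() and assuming a little-endian host as the reader does:

```cpp
#include <cstdint>
#include <cstring>

// Decode one raw 12-byte Parquet INT96 value into its components:
// bytes 0..7 hold nanoseconds within the day, bytes 8..11 the day number.
void decodeInt96(const char* raw, int32_t& days, int64_t& nanos) {
  std::memcpy(&nanos, raw, sizeof(int64_t));
  std::memcpy(&days, raw + sizeof(int64_t), sizeof(int32_t));
}
```

Because a Velox Timestamp (16 bytes) is wider than the on-disk value (12 bytes), the dictionary is expanded in place starting from the last entry, so no source bytes are overwritten before they are read.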
5 changes: 5 additions & 0 deletions velox/dwio/parquet/reader/ParquetColumnReader.cpp
@@ -27,6 +27,7 @@
#include "velox/dwio/parquet/reader/RepeatedColumnReader.h"
#include "velox/dwio/parquet/reader/StringColumnReader.h"
#include "velox/dwio/parquet/reader/StructColumnReader.h"
#include "velox/dwio/parquet/reader/TimestampColumnReader.h"

namespace facebook::velox::parquet {

@@ -74,6 +75,10 @@ std::unique_ptr<dwio::common::SelectiveColumnReader> ParquetColumnReader::build(
return std::make_unique<BooleanColumnReader>(
requestedType, fileType, params, scanSpec);

case TypeKind::TIMESTAMP:
return std::make_unique<TimestampColumnReader>(
requestedType, fileType, params, scanSpec);

default:
VELOX_FAIL(
"buildReader unhandled type: " +
11 changes: 9 additions & 2 deletions velox/dwio/parquet/reader/ParquetData.h
@@ -36,17 +36,24 @@ class ParquetParams : public dwio::common::FormatParams {
memory::MemoryPool& pool,
dwio::common::ColumnReaderStatistics& stats,
const FileMetaDataPtr metaData,
const date::time_zone* sessionTimezone)
const date::time_zone* sessionTimezone,
TimestampPrecision timestampPrecision)
: FormatParams(pool, stats),
metaData_(metaData),
sessionTimezone_(sessionTimezone) {}
sessionTimezone_(sessionTimezone),
timestampPrecision_(timestampPrecision) {}
std::unique_ptr<dwio::common::FormatData> toFormatData(
const std::shared_ptr<const dwio::common::TypeWithId>& type,
const common::ScanSpec& scanSpec) override;

TimestampPrecision timestampPrecision() const {
return timestampPrecision_;
}

private:
const FileMetaDataPtr metaData_;
const date::time_zone* sessionTimezone_;
const TimestampPrecision timestampPrecision_;
};

/// Format-specific data created for each leaf column of a Parquet rowgroup.
3 changes: 2 additions & 1 deletion velox/dwio/parquet/reader/ParquetReader.cpp
@@ -764,7 +764,8 @@ class ParquetRowReader::Impl {
pool_,
columnReaderStats_,
readerBase_->fileMetaData(),
readerBase->sessionTimezone());
readerBase->sessionTimezone(),
options_.timestampPrecision());
requestedType_ = options_.requestedType() ? options_.requestedType()
: readerBase_->schema();
columnReader_ = ParquetColumnReader::build(
85 changes: 85 additions & 0 deletions velox/dwio/parquet/reader/TimestampColumnReader.h
@@ -0,0 +1,85 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "velox/dwio/parquet/reader/IntegerColumnReader.h"
#include "velox/dwio/parquet/reader/ParquetColumnReader.h"

namespace facebook::velox::parquet {

class TimestampColumnReader : public IntegerColumnReader {
public:
TimestampColumnReader(
const TypePtr& requestedType,
std::shared_ptr<const dwio::common::TypeWithId> fileType,
ParquetParams& params,
common::ScanSpec& scanSpec)
: IntegerColumnReader(requestedType, fileType, params, scanSpec),
timestampPrecision_(params.timestampPrecision()) {}

bool hasBulkPath() const override {
return false;
}

void getValues(RowSet rows, VectorPtr* result) override {
getFlatValues<Timestamp, Timestamp>(rows, result, requestedType_);
if (allNull_) {
return;
}

// Adjust timestamp nanos to the requested precision.
VectorPtr resultVector = *result;
auto rawValues =
resultVector->asUnchecked<FlatVector<Timestamp>>()->mutableRawValues();
for (auto i = 0; i < numValues_; ++i) {
if (resultVector->isNullAt(i)) {
continue;
}
const auto timestamp = rawValues[i];
uint64_t nanos = timestamp.getNanos();
switch (timestampPrecision_) {
case TimestampPrecision::kMilliseconds:
nanos = nanos / 1'000'000 * 1'000'000;
break;
case TimestampPrecision::kMicroseconds:
nanos = nanos / 1'000 * 1'000;
break;
case TimestampPrecision::kNanoseconds:
break;
}
rawValues[i] = Timestamp(timestamp.getSeconds(), nanos);
}
}

void read(
vector_size_t offset,
RowSet rows,
const uint64_t* /*incomingNulls*/) override {
auto& data = formatData_->as<ParquetData>();
// Use int128_t as a workaround because Timestamp in Velox occupies 16 bytes.
prepareRead<int128_t>(offset, rows, nullptr);
readCommon<IntegerColumnReader, true>(rows);
readOffset_ += rows.back() + 1;
}

private:
// The timestamp precision requested via HiveConfig for reading timestamps
// from Parquet files.
TimestampPrecision timestampPrecision_;
};

} // namespace facebook::velox::parquet
Binary file not shown.
13 changes: 13 additions & 0 deletions velox/dwio/parquet/tests/reader/E2EFilterTest.cpp
@@ -256,6 +256,19 @@ TEST_F(E2EFilterTest, integerDictionary) {
20);
}

TEST_F(E2EFilterTest, timestampDictionary) {
options_.dataPageSize = 4 * 1024;
options_.writeInt96AsTimestamp = true;

testWithTypes(
"timestamp_val_0:timestamp,"
"timestamp_val_1:timestamp",
[&]() {},
true,
{"timestamp_val_0", "timestamp_val_1"},
20);
}

TEST_F(E2EFilterTest, floatAndDoubleDirect) {
options_.enableDictionary = false;
options_.dataPageSize = 4 * 1024;