From 976d779cc959b05b5786a799c50da19b8c929627 Mon Sep 17 00:00:00 2001 From: "yujie.zhang (he/him)" Date: Fri, 27 Dec 2024 18:53:33 +0800 Subject: [PATCH] support reading timestamp type Parquet data (#17) * support reading timestamp type Parquet data * fix * fix * fix * Update PageReader.cpp --- velox/dwio/parquet/reader/PageReader.cpp | 50 ++++++++++++++++++ velox/dwio/parquet/reader/ParquetReader.cpp | 4 ++ .../examples/test_read_timestamp.parquet | Bin 0 -> 402 bytes .../tests/reader/ParquetReaderTest.cpp | 25 +++++++++ 4 files changed, 79 insertions(+) create mode 100644 velox/dwio/parquet/tests/examples/test_read_timestamp.parquet diff --git a/velox/dwio/parquet/reader/PageReader.cpp b/velox/dwio/parquet/reader/PageReader.cpp index cf46fdb58184..e6817f4d7f5e 100644 --- a/velox/dwio/parquet/reader/PageReader.cpp +++ b/velox/dwio/parquet/reader/PageReader.cpp @@ -16,6 +16,8 @@ #include "velox/dwio/parquet/reader/PageReader.h" +#include "common/base/Exceptions.h" +#include "dwio/parquet/thrift/ParquetThriftTypes.h" #include "velox/common/testutil/TestValue.h" #include "velox/dwio/common/BufferUtil.h" #include "velox/dwio/common/ColumnVisitors.h" @@ -334,6 +336,54 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) { pageHeader.uncompressed_page_size); } + if (type_->logicalType_->__isset.TIMESTAMP) { + auto timestampAdjustment = 1; + if (type_->logicalType_->TIMESTAMP.unit.__isset.MILLIS) { + timestampAdjustment = Timestamp::kMillisecondsInSecond; + } else if (type_->logicalType_->TIMESTAMP.unit.__isset.MICROS) { + timestampAdjustment = Timestamp::kMicrosecondsInMillisecond * Timestamp::kMillisecondsInSecond; + } else if (type_->logicalType_->TIMESTAMP.unit.__isset.NANOS) { + timestampAdjustment = Timestamp::kNanosInSecond; + } else { + std::stringstream oss; + type_->logicalType_->TIMESTAMP.unit.printTo(oss); + VELOX_USER_FAIL( + "Unsupported timestamp unit for dictionary encoding: {}", + oss.str()); + } + + auto parquetTypeSize = sizeof(int64_t); + auto numVeloxBytes = dictionary_.numValues * sizeof(Timestamp); + dictionary_.values = AlignedBuffer::allocate(numVeloxBytes, &pool_); + auto numBytes = dictionary_.numValues * parquetTypeSize; + if (pageData_) { + memcpy(dictionary_.values->asMutable(), pageData_, numBytes); + } else { + dwio::common::readBytes( + numBytes, + inputStream_.get(), + dictionary_.values->asMutable(), + bufferStart_, + bufferEnd_); + } + // Expand the Parquet type length values to Velox type length. + // We start from the end to allow in-place expansion. + auto values = dictionary_.values->asMutable(); + auto parquetValues = dictionary_.values->asMutable(); + + for (auto i = dictionary_.numValues - 1; i >= 0; --i) { + // Convert the timestamp into seconds and nanos since the Unix epoch, + // 00:00:00.000000 on 1 January 1970. + int64_t val; + memcpy(&val, parquetValues + i * parquetTypeSize, sizeof(int64_t)); + + int64_t seconds = val / timestampAdjustment; + uint64_t nanos = (val % timestampAdjustment) * (Timestamp::kNanosInSecond / timestampAdjustment); + values[i] = Timestamp(seconds, nanos); + } + return; + } + auto parquetType = type_->parquetType_.value(); switch (parquetType) { case thrift::Type::INT32: diff --git a/velox/dwio/parquet/reader/ParquetReader.cpp b/velox/dwio/parquet/reader/ParquetReader.cpp index 635366f19f13..0633b7fe38e0 100644 --- a/velox/dwio/parquet/reader/ParquetReader.cpp +++ b/velox/dwio/parquet/reader/ParquetReader.cpp @@ -664,6 +664,10 @@ TypePtr ReaderBase::convertType( case thrift::Type::type::INT32: return INTEGER(); case thrift::Type::type::INT64: + if (schemaElement.logicalType.__isset.TIMESTAMP) { + return TIMESTAMP(); + } + return BIGINT(); case thrift::Type::type::INT96: return TIMESTAMP(); // INT96 only maps to a timestamp diff --git a/velox/dwio/parquet/tests/examples/test_read_timestamp.parquet b/velox/dwio/parquet/tests/examples/test_read_timestamp.parquet new file mode 100644 index 0000000000000000000000000000000000000000..002fd43fe2b27427d59716384dcf200039e10502 GIT binary patch literal 402 zcmWG=3^EjD5j7BX@)2bdWe{Ru(AcW~S3%++1A~JTKLZFjERqg=Ee2#f+i*VhF1Lgz zgQ$q8nhsF801&guNN~VpBsgGt#27TNhyu+3o6Z%;z`z9LurV?*t1zgwG0BLsNSZRR z2}nvXB^!wHh_Q*?VifzwsFtS2L5lH`IzX>5h%zw%2>~F*zyLH?$CN>WtvER&H8&9? z!=lOrvLvTRNX7=JfPq1bO;ScuhL(;1d4^4m1Lz?UU;qdfBo-Bxrk3a?7Zm80rRL;U QNXp0n-LV>o1Awss0EPfblK=n! literal 0 HcmV?d00001 diff --git a/velox/dwio/parquet/tests/reader/ParquetReaderTest.cpp b/velox/dwio/parquet/tests/reader/ParquetReaderTest.cpp index e46dfadfa1ef..b650ea464536 100644 --- a/velox/dwio/parquet/tests/reader/ParquetReaderTest.cpp +++ b/velox/dwio/parquet/tests/reader/ParquetReaderTest.cpp @@ -14,6 +14,7 @@ * limitations under the License. */ +#include "type/Timestamp.h" #include "velox/dwio/parquet/tests/ParquetTestBase.h" #include "velox/expression/ExprToSubfieldFilter.h" #include "velox/vector/tests/utils/VectorMaker.h" @@ -1289,3 +1290,27 @@ TEST_F(ParquetReaderTest, testEmptyV2DataPage) { assertReadWithReaderAndExpected( outputRowType, *rowReader, expected, *leafPool_); } + +TEST_F(ParquetReaderTest, testReadTimestamp) { + const std::string sample(getExampleFilePath("test_read_timestamp.parquet")); + + dwio::common::ReaderOptions readerOptions{leafPool_.get()}; + auto reader = createReader(sample, readerOptions); + EXPECT_EQ(reader->numberOfRows(), 3ULL); + + auto outputRowType = ROW({"c0"}, {TIMESTAMP()}); + EXPECT_EQ(*(reader->typeWithId()->type()), *outputRowType); + + auto rowReaderOpts = getReaderOpts(outputRowType); + rowReaderOpts.setScanSpec(makeScanSpec(outputRowType)); + auto rowReader = reader->createRowReader(rowReaderOpts); + + auto expected = makeRowVector({makeFlatVector({ + Timestamp(0, 1 * Timestamp::kNanosecondsInMillisecond), + Timestamp(7 * 60 * 60, 1 * Timestamp::kNanosecondsInMillisecond), + Timestamp(1732693111 + 8 * 60 * 60, 0), + })}); + + assertReadWithReaderAndExpected( + outputRowType, *rowReader, expected, *leafPool_); +}