Skip to content

Commit

Permalink
support reading timestamp type Parquet data (facebookincubator#17)
Browse files Browse the repository at this point in the history
* support reading timestamp type Parquet data

* fix

* fix

* fix

* Update PageReader.cpp
  • Loading branch information
yukkit authored Dec 27, 2024
1 parent 7880c7c commit 976d779
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 0 deletions.
50 changes: 50 additions & 0 deletions velox/dwio/parquet/reader/PageReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

#include "velox/dwio/parquet/reader/PageReader.h"

#include "common/base/Exceptions.h"
#include "dwio/parquet/thrift/ParquetThriftTypes.h"
#include "velox/common/testutil/TestValue.h"
#include "velox/dwio/common/BufferUtil.h"
#include "velox/dwio/common/ColumnVisitors.h"
Expand Down Expand Up @@ -334,6 +336,54 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
pageHeader.uncompressed_page_size);
}

if (type_->logicalType_->__isset.TIMESTAMP) {
auto timestampAdjustment = 1;
if (type_->logicalType_->TIMESTAMP.unit.__isset.MILLIS) {
timestampAdjustment = Timestamp::kMillisecondsInSecond;
} else if (type_->logicalType_->TIMESTAMP.unit.__isset.MICROS) {
timestampAdjustment = Timestamp::kMicrosecondsInMillisecond * Timestamp::kMillisecondsInSecond;
} else if (type_->logicalType_->TIMESTAMP.unit.__isset.NANOS) {
timestampAdjustment = Timestamp::kNanosInSecond;
} else {
std::stringstream oss;
type_->logicalType_->TIMESTAMP.unit.printTo(oss);
VELOX_USER_FAIL(
"Unsupported timestamp unit for dictionary encoding: {}",
oss.str());
}

auto parquetTypeSize = sizeof(int64_t);
auto numVeloxBytes = dictionary_.numValues * sizeof(Timestamp);
dictionary_.values = AlignedBuffer::allocate<char>(numVeloxBytes, &pool_);
auto numBytes = dictionary_.numValues * parquetTypeSize;
if (pageData_) {
memcpy(dictionary_.values->asMutable<char>(), pageData_, numBytes);
} else {
dwio::common::readBytes(
numBytes,
inputStream_.get(),
dictionary_.values->asMutable<char>(),
bufferStart_,
bufferEnd_);
}
// Expand the Parquet type length values to Velox type length.
// We start from the end to allow in-place expansion.
auto values = dictionary_.values->asMutable<Timestamp>();
auto parquetValues = dictionary_.values->asMutable<char>();

for (auto i = dictionary_.numValues - 1; i >= 0; --i) {
// Convert the timestamp into seconds and nanos since the Unix epoch,
// 00:00:00.000000 on 1 January 1970.
int64_t val;
memcpy(&val, parquetValues + i * parquetTypeSize, sizeof(int64_t));

int64_t seconds = val / timestampAdjustment;
uint64_t nanos = (val % timestampAdjustment) * (Timestamp::kNanosInSecond / timestampAdjustment);
values[i] = Timestamp(seconds, nanos);
}
return;
}

auto parquetType = type_->parquetType_.value();
switch (parquetType) {
case thrift::Type::INT32:
Expand Down
4 changes: 4 additions & 0 deletions velox/dwio/parquet/reader/ParquetReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,10 @@ TypePtr ReaderBase::convertType(
case thrift::Type::type::INT32:
return INTEGER();
case thrift::Type::type::INT64:
if (schemaElement.logicalType.__isset.TIMESTAMP) {
return TIMESTAMP();
}

return BIGINT();
case thrift::Type::type::INT96:
return TIMESTAMP(); // INT96 only maps to a timestamp
Expand Down
Binary file not shown.
25 changes: 25 additions & 0 deletions velox/dwio/parquet/tests/reader/ParquetReaderTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/

#include "type/Timestamp.h"
#include "velox/dwio/parquet/tests/ParquetTestBase.h"
#include "velox/expression/ExprToSubfieldFilter.h"
#include "velox/vector/tests/utils/VectorMaker.h"
Expand Down Expand Up @@ -1289,3 +1290,27 @@ TEST_F(ParquetReaderTest, testEmptyV2DataPage) {
assertReadWithReaderAndExpected(
outputRowType, *rowReader, expected, *leafPool_);
}

TEST_F(ParquetReaderTest, testReadTimestamp) {
const std::string sample(getExampleFilePath("test_read_timestamp.parquet"));

dwio::common::ReaderOptions readerOptions{leafPool_.get()};
auto reader = createReader(sample, readerOptions);
EXPECT_EQ(reader->numberOfRows(), 3ULL);

auto outputRowType = ROW({"c0"}, {TIMESTAMP()});
EXPECT_EQ(*(reader->typeWithId()->type()), *outputRowType);

auto rowReaderOpts = getReaderOpts(outputRowType);
rowReaderOpts.setScanSpec(makeScanSpec(outputRowType));
auto rowReader = reader->createRowReader(rowReaderOpts);

auto expected = makeRowVector({makeFlatVector<Timestamp>({
Timestamp(0, 1 * Timestamp::kNanosecondsInMillisecond),
Timestamp(7 * 60 * 60, 1 * Timestamp::kNanosecondsInMillisecond),
Timestamp(1732693111 + 8 * 60 * 60, 0),
})});

assertReadWithReaderAndExpected(
outputRowType, *rowReader, expected, *leafPool_);
}

0 comments on commit 976d779

Please sign in to comment.