From 4a1b2ee171397515222b06d00ada4763db564fb2 Mon Sep 17 00:00:00 2001 From: PHILO-HE Date: Wed, 8 Mar 2023 09:58:45 +0800 Subject: [PATCH] Port upstream patch to fix parquet reader issue on long min/max string data (#145) * Port a patch: Refactor Thrift Transport for Parquet Metadata Access #4160 * Port a patch: Read Parquet Page Header with ThriftStreamingTransport to Fix the Incorrect Header Length #4108 --- velox/dwio/parquet/reader/PageReader.cpp | 47 +++++---------------- velox/dwio/parquet/reader/PageReader.h | 23 ++++++++-- velox/dwio/parquet/reader/ParquetReader.cpp | 11 ++--- 3 files changed, 35 insertions(+), 46 deletions(-) diff --git a/velox/dwio/parquet/reader/PageReader.cpp b/velox/dwio/parquet/reader/PageReader.cpp index 5349afe94180..ee81b4bf2351 100644 --- a/velox/dwio/parquet/reader/PageReader.cpp +++ b/velox/dwio/parquet/reader/PageReader.cpp @@ -44,7 +44,7 @@ void PageReader::seekToPage(int64_t row) { numRowsInPage_ = 0; break; } - PageHeader pageHeader = readPageHeader(chunkSize_ - pageStart_); + PageHeader pageHeader = readPageHeader(); pageStart_ = pageDataStart_ + pageHeader.compressed_page_size; switch (pageHeader.type) { @@ -75,14 +75,7 @@ void PageReader::seekToPage(int64_t row) { } } -PageHeader PageReader::readPageHeader(int64_t remainingSize) { - // Note that sizeof(PageHeader) may be longer than actually read - std::shared_ptr transport; - std::unique_ptr> - protocol; - char copy[sizeof(PageHeader)]; - bool wasInBuffer = false; +PageHeader PageReader::readPageHeader() { if (bufferEnd_ == bufferStart_) { const void* buffer; int32_t size; @@ -90,37 +83,17 @@ PageHeader PageReader::readPageHeader(int64_t remainingSize) { bufferStart_ = reinterpret_cast(buffer); bufferEnd_ = bufferStart_ + size; } - if (bufferEnd_ - bufferStart_ >= sizeof(PageHeader)) { - wasInBuffer = true; - transport = std::make_shared( - bufferStart_, sizeof(PageHeader)); - protocol = std::make_unique>(transport); - } else { - dwio::common::readBytes( - std::min(remainingSize, sizeof(PageHeader)), - inputStream_.get(), - ©, - bufferStart_, - bufferEnd_); - transport = std::make_shared( - copy, sizeof(PageHeader)); - protocol = std::make_unique>(transport); - } + std::shared_ptr transport = + std::make_shared( + inputStream_.get(), bufferStart_, bufferEnd_); + apache::thrift::protocol::TCompactProtocolT protocol( + transport); PageHeader pageHeader; - uint64_t readBytes = pageHeader.read(protocol.get()); + uint64_t readBytes; + readBytes = pageHeader.read(&protocol); + pageDataStart_ = pageStart_ + readBytes; - // Unread the bytes that were not consumed. - if (wasInBuffer) { - bufferStart_ += readBytes; - } else { - std::vector start = {pageDataStart_}; - dwio::common::PositionProvider position(start); - inputStream_->seekToPosition(position); - bufferStart_ = bufferEnd_ = nullptr; - } return pageHeader; } diff --git a/velox/dwio/parquet/reader/PageReader.h b/velox/dwio/parquet/reader/PageReader.h index 09d50f01de81..336ea5897e74 100644 --- a/velox/dwio/parquet/reader/PageReader.h +++ b/velox/dwio/parquet/reader/PageReader.h @@ -51,6 +51,21 @@ class PageReader { type_->makeLevelInfo(leafInfo_); } + // This PageReader constructor is for unit test only. + PageReader( + std::unique_ptr stream, + memory::MemoryPool& pool, + thrift::CompressionCodec::type codec, + int64_t chunkSize) + : pool_(pool), + inputStream_(std::move(stream)), + maxRepeat_(0), + maxDefine_(1), + isTopLevel_(maxRepeat_ == 0 && maxDefine_ <= 1), + codec_(codec), + chunkSize_(chunkSize), + nullConcatenation_(pool_) {} + /// Advances 'numRows' top level rows. void skip(int64_t numRows); @@ -111,6 +126,10 @@ class PageReader { return {repDefBegin_, repDefEnd_}; } + // Parses the PageHeader at 'inputStream_', and move the bufferStart_ and + // bufferEnd_ to the corresponding positions. + thrift::PageHeader readPageHeader(); + private: // Indicates that we only want the repdefs for the next page. Used when // prereading repdefs with seekToPage. @@ -162,10 +181,6 @@ class PageReader { // next page. void updateRowInfoAfterPageSkipped(); - // Parses the PageHeader at 'inputStream_'. Will not read more than - // 'remainingBytes' since there could be less data left in the - // ColumnChunk than the full header size. - thrift::PageHeader readPageHeader(int64_t remainingSize); void prepareDataPageV1(const thrift::PageHeader& pageHeader, int64_t row); void prepareDataPageV2(const thrift::PageHeader& pageHeader, int64_t row); void prepareDictionary(const thrift::PageHeader& pageHeader); diff --git a/velox/dwio/parquet/reader/ParquetReader.cpp b/velox/dwio/parquet/reader/ParquetReader.cpp index 6b86e39f5181..cfb0778822ea 100644 --- a/velox/dwio/parquet/reader/ParquetReader.cpp +++ b/velox/dwio/parquet/reader/ParquetReader.cpp @@ -77,11 +77,12 @@ void ReaderBase::loadFileMetaData() { missingLength, stream.get(), copy.data(), bufferStart, bufferEnd); } - auto thriftTransport = std::make_shared( - copy.data() + footerOffsetInBuffer, footerLength); - auto thriftProtocol = - std::make_unique>(thriftTransport); + std::shared_ptr thriftTransport = + std::make_shared( + copy.data() + footerOffsetInBuffer, footerLength); + auto thriftProtocol = std::make_unique< + apache::thrift::protocol::TCompactProtocolT>( + thriftTransport); fileMetaData_ = std::make_unique(); fileMetaData_->read(thriftProtocol.get()); }