From a552fb26f5ea982687b26d19610200ddc4169e85 Mon Sep 17 00:00:00 2001 From: PHILO-HE Date: Wed, 8 Mar 2023 09:58:45 +0800 Subject: [PATCH] Port upstream patch to fix parquet reader issue on long min/max string data (#145) * Port a patch: Refactor Thrift Transport for Parquet Metadata Access #4160 * Port a patch: Read Parquet Page Header with ThriftStreamingTransport to Fix the Incorrect Header Length #4108 --- velox/dwio/parquet/reader/PageReader.cpp | 47 ++------ velox/dwio/parquet/reader/PageReader.h | 23 +++- velox/dwio/parquet/reader/ParquetReader.cpp | 11 +- velox/dwio/parquet/tests/CMakeLists.txt | 1 + .../dwio/parquet/tests/thrift/CMakeLists.txt | 19 +++ .../tests/thrift/ThriftTransportTest.cpp | 111 ++++++++++++++++++ velox/dwio/parquet/thrift/ThriftTransport.h | 57 ++++++++- 7 files changed, 219 insertions(+), 50 deletions(-) create mode 100644 velox/dwio/parquet/tests/thrift/CMakeLists.txt create mode 100644 velox/dwio/parquet/tests/thrift/ThriftTransportTest.cpp diff --git a/velox/dwio/parquet/reader/PageReader.cpp b/velox/dwio/parquet/reader/PageReader.cpp index d00fdcf6bca7..de7038aeb380 100644 --- a/velox/dwio/parquet/reader/PageReader.cpp +++ b/velox/dwio/parquet/reader/PageReader.cpp @@ -44,7 +44,7 @@ void PageReader::seekToPage(int64_t row) { numRowsInPage_ = 0; break; } - PageHeader pageHeader = readPageHeader(chunkSize_ - pageStart_); + PageHeader pageHeader = readPageHeader(); pageStart_ = pageDataStart_ + pageHeader.compressed_page_size; switch (pageHeader.type) { @@ -75,14 +75,7 @@ void PageReader::seekToPage(int64_t row) { } } -PageHeader PageReader::readPageHeader(int64_t remainingSize) { - // Note that sizeof(PageHeader) may be longer than actually read - std::shared_ptr transport; - std::unique_ptr> - protocol; - char copy[sizeof(PageHeader)]; - bool wasInBuffer = false; +PageHeader PageReader::readPageHeader() { if (bufferEnd_ == bufferStart_) { const void* buffer; int32_t size; @@ -90,37 +83,17 @@ PageHeader PageReader::readPageHeader(int64_t remainingSize) { bufferStart_ = reinterpret_cast(buffer); bufferEnd_ = bufferStart_ + size; } - if (bufferEnd_ - bufferStart_ >= sizeof(PageHeader)) { - wasInBuffer = true; - transport = std::make_shared( - bufferStart_, sizeof(PageHeader)); - protocol = std::make_unique>(transport); - } else { - dwio::common::readBytes( - std::min(remainingSize, sizeof(PageHeader)), - inputStream_.get(), - ©, - bufferStart_, - bufferEnd_); - transport = std::make_shared( - copy, sizeof(PageHeader)); - protocol = std::make_unique>(transport); - } + std::shared_ptr transport = + std::make_shared( + inputStream_.get(), bufferStart_, bufferEnd_); + apache::thrift::protocol::TCompactProtocolT protocol( + transport); PageHeader pageHeader; - uint64_t readBytes = pageHeader.read(protocol.get()); + uint64_t readBytes; + readBytes = pageHeader.read(&protocol); + pageDataStart_ = pageStart_ + readBytes; - // Unread the bytes that were not consumed. - if (wasInBuffer) { - bufferStart_ += readBytes; - } else { - std::vector start = {pageDataStart_}; - dwio::common::PositionProvider position(start); - inputStream_->seekToPosition(position); - bufferStart_ = bufferEnd_ = nullptr; - } return pageHeader; } diff --git a/velox/dwio/parquet/reader/PageReader.h b/velox/dwio/parquet/reader/PageReader.h index 65c523b552d1..316215f98ab7 100644 --- a/velox/dwio/parquet/reader/PageReader.h +++ b/velox/dwio/parquet/reader/PageReader.h @@ -50,6 +50,21 @@ class PageReader { type_->makeLevelInfo(leafInfo_); } + // This PageReader constructor is for unit test only. + PageReader( + std::unique_ptr stream, + memory::MemoryPool& pool, + thrift::CompressionCodec::type codec, + int64_t chunkSize) + : pool_(pool), + inputStream_(std::move(stream)), + maxRepeat_(0), + maxDefine_(1), + isTopLevel_(maxRepeat_ == 0 && maxDefine_ <= 1), + codec_(codec), + chunkSize_(chunkSize), + nullConcatenation_(pool_) {} + /// Advances 'numRows' top level rows. void skip(int64_t numRows); @@ -110,6 +125,10 @@ class PageReader { return {repDefBegin_, repDefEnd_}; } + // Parses the PageHeader at 'inputStream_', and move the bufferStart_ and + // bufferEnd_ to the corresponding positions. + thrift::PageHeader readPageHeader(); + private: // Indicates that we only want the repdefs for the next page. Used when // prereading repdefs with seekToPage. @@ -161,10 +180,6 @@ class PageReader { // next page. void updateRowInfoAfterPageSkipped(); - // Parses the PageHeader at 'inputStream_'. Will not read more than - // 'remainingBytes' since there could be less data left in the - // ColumnChunk than the full header size. - thrift::PageHeader readPageHeader(int64_t remainingSize); void prepareDataPageV1(const thrift::PageHeader& pageHeader, int64_t row); void prepareDataPageV2(const thrift::PageHeader& pageHeader, int64_t row); void prepareDictionary(const thrift::PageHeader& pageHeader); diff --git a/velox/dwio/parquet/reader/ParquetReader.cpp b/velox/dwio/parquet/reader/ParquetReader.cpp index 8ae88b8349a8..8a5fdec10fb2 100644 --- a/velox/dwio/parquet/reader/ParquetReader.cpp +++ b/velox/dwio/parquet/reader/ParquetReader.cpp @@ -75,11 +75,12 @@ void ReaderBase::loadFileMetaData() { missingLength, stream.get(), copy.data(), bufferStart, bufferEnd); } - auto thriftTransport = std::make_shared( - copy.data() + footerOffsetInBuffer, footerLength); - auto thriftProtocol = - std::make_unique>(thriftTransport); + std::shared_ptr thriftTransport = + std::make_shared( + copy.data() + footerOffsetInBuffer, footerLength); + auto thriftProtocol = std::make_unique< + apache::thrift::protocol::TCompactProtocolT>( + thriftTransport); fileMetaData_ = std::make_unique(); fileMetaData_->read(thriftProtocol.get()); } diff --git a/velox/dwio/parquet/tests/CMakeLists.txt b/velox/dwio/parquet/tests/CMakeLists.txt index 2b7f78ade95b..71d3133e1ac3 100644 --- a/velox/dwio/parquet/tests/CMakeLists.txt +++ b/velox/dwio/parquet/tests/CMakeLists.txt @@ -24,6 +24,7 @@ set(TEST_LINK_LIBS add_subdirectory(duckdb_reader) add_subdirectory(reader) +add_subdirectory(thrift) add_executable(velox_dwio_parquet_tpch_test ParquetTpchTest.cpp) add_test( diff --git a/velox/dwio/parquet/tests/thrift/CMakeLists.txt b/velox/dwio/parquet/tests/thrift/CMakeLists.txt new file mode 100644 index 000000000000..4dea14398d9f --- /dev/null +++ b/velox/dwio/parquet/tests/thrift/CMakeLists.txt @@ -0,0 +1,19 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +add_executable(velox_dwio_parquet_thrift_test ThriftTransportTest.cpp) + +add_test(velox_dwio_parquet_thrift_test velox_dwio_parquet_thrift_test) +target_link_libraries(velox_dwio_parquet_thrift_test arrow thrift + ${VELOX_LINK_LIBS} gtest gtest_main) diff --git a/velox/dwio/parquet/tests/thrift/ThriftTransportTest.cpp b/velox/dwio/parquet/tests/thrift/ThriftTransportTest.cpp new file mode 100644 index 000000000000..2fed88512f6d --- /dev/null +++ b/velox/dwio/parquet/tests/thrift/ThriftTransportTest.cpp @@ -0,0 +1,111 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/dwio/parquet/thrift/ThriftTransport.h" +#include +#include + +using namespace facebook::velox; +using namespace facebook::velox::dwio::common; +using namespace facebook::velox::parquet::thrift; + +class ThriftTransportTest : public testing::Test { + protected: + void SetUp() override { + input_.resize(bufferSize_); + output_.resize(bufferSize_); + for (size_t i = 0; i < input_.size(); ++i) { + input_[i] = static_cast(i); + } + } + + void prepareThriftStreamingTransport() { + inputStream_ = std::make_shared( + input_.data(), input_.size(), 20); + int32_t batchSize_; + const void* bufferPointer; + if (!inputStream_->Next(&bufferPointer, &batchSize_)) { + VELOX_CHECK(false, "Reading past end"); + } + bufferStart_ = static_cast(bufferPointer); + bufferEnd_ = bufferStart_ + batchSize_; + transport_ = std::make_shared( + inputStream_.get(), bufferStart_, bufferEnd_); + } + + void prepareThriftBufferedTransport() { + transport_ = + std::make_shared(input_.data(), bufferSize_); + } + + static constexpr uint32_t bufferSize_ = 200; + static constexpr uint32_t batchSize_ = 20; + std::vector input_; + std::vector output_; + const char* FOLLY_NULLABLE bufferStart_{nullptr}; + const char* FOLLY_NULLABLE bufferEnd_{nullptr}; + std::shared_ptr inputStream_; + std::shared_ptr transport_; +}; + +TEST_F(ThriftTransportTest, streaming) { + prepareThriftStreamingTransport(); + transport_->read(output_.data(), 10); + transport_->read(output_.data() + 10, 50); + transport_->read(output_.data() + 60, 140); + + for (size_t i = 0; i < input_.size(); ++i) { + VELOX_CHECK_EQ(input_[i], output_[i]); + } +} + +TEST_F(ThriftTransportTest, streamingOutOfBoundry) { + prepareThriftStreamingTransport(); + transport_->read(output_.data(), 10); + transport_->read(output_.data() + 10, 50); + transport_->read(output_.data() + 60, 140); + + // The whole inputStream_ is consumed. + EXPECT_ANY_THROW(transport_->read(output_.data() + bufferSize_, 1)); +} + +TEST_F(ThriftTransportTest, buffered) { + prepareThriftBufferedTransport(); + transport_->read(output_.data(), 10); + transport_->read(output_.data() + 10, 50); + transport_->read(output_.data() + 60, 140); + + for (size_t i = 0; i < input_.size(); ++i) { + VELOX_CHECK_EQ(input_[i], output_[i]); + } +} + +TEST_F(ThriftTransportTest, bufferedOutOfBoundry) { + prepareThriftStreamingTransport(); + transport_->read(output_.data(), 10); + transport_->read(output_.data() + 10, 50); + transport_->read(output_.data() + 60, 140); + + // The whole inputStream_ is consumed. + EXPECT_ANY_THROW(transport_->read(output_.data() + bufferSize_, 1)); +} + +// Define main so that gflags get processed. +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + folly::init(&argc, &argv, false); + return RUN_ALL_TESTS(); +} diff --git a/velox/dwio/parquet/thrift/ThriftTransport.h b/velox/dwio/parquet/thrift/ThriftTransport.h index 3e4bf769936f..cee094ce4435 100644 --- a/velox/dwio/parquet/thrift/ThriftTransport.h +++ b/velox/dwio/parquet/thrift/ThriftTransport.h @@ -21,12 +21,61 @@ namespace facebook::velox::parquet::thrift { -class ThriftBufferedTransport - : public apache::thrift::transport::TVirtualTransport< - ThriftBufferedTransport> { +class ThriftTransport + : public apache::thrift::transport::TVirtualTransport { + public: + virtual uint32_t read(uint8_t* outputBuf, uint32_t len) = 0; + virtual ~ThriftTransport() = default; +}; + +class ThriftStreamingTransport : public ThriftTransport { + public: + ThriftStreamingTransport( + dwio::common::SeekableInputStream* inputStream, + const char*& bufferStart, + const char*& bufferEnd) + : inputStream_(inputStream), + bufferStart_(bufferStart), + bufferEnd_(bufferEnd) { + VELOX_CHECK_NOT_NULL(inputStream_); + VELOX_CHECK_NOT_NULL(bufferStart_); + VELOX_CHECK_NOT_NULL(bufferEnd_); + } + + uint32_t read(uint8_t* outputBuf, uint32_t len) { + uint32_t bytesToRead = len; + while (bytesToRead > 0) { + if (bufferEnd_ == bufferStart_) { + int32_t size; + if (!inputStream_->Next( + reinterpret_cast(&bufferStart_), &size)) { + VELOX_FAIL("Reading past the end of the stream"); + } + bufferEnd_ = bufferStart_ + size; + } + + uint32_t bytesToReadInBuffer = + std::min(bufferEnd_ - bufferStart_, bytesToRead); + memcpy(outputBuf, bufferStart_, bytesToReadInBuffer); + bufferStart_ += bytesToReadInBuffer; + bytesToRead -= bytesToReadInBuffer; + outputBuf += bytesToReadInBuffer; + } + + return len; + } + + private: + dwio::common::SeekableInputStream* inputStream_; + const char*& bufferStart_; + const char*& bufferEnd_; +}; + +class ThriftBufferedTransport : public ThriftTransport { public: ThriftBufferedTransport(const void* inputBuf, uint64_t len) - : inputBuf_(reinterpret_cast(inputBuf)), + : ThriftTransport(), + inputBuf_(reinterpret_cast(inputBuf)), size_(len), offset_(0) {}