From 0d53305825c7fadabbe06988e2b5488a3099de0a Mon Sep 17 00:00:00 2001
From: PHILO-HE
Date: Tue, 7 Mar 2023 09:33:20 +0800
Subject: [PATCH] Port a patch: Refactor Thrift Transport for Parquet Metadata Access #4160

---
 velox/dwio/parquet/tests/CMakeLists.txt       |   1 +
 .../dwio/parquet/tests/thrift/CMakeLists.txt  |  19 +++
 .../tests/thrift/ThriftTransportTest.cpp      | 118 ++++++++++++++++++
 velox/dwio/parquet/thrift/ThriftTransport.h   |  65 +++++++++-
 4 files changed, 199 insertions(+), 4 deletions(-)
 create mode 100644 velox/dwio/parquet/tests/thrift/CMakeLists.txt
 create mode 100644 velox/dwio/parquet/tests/thrift/ThriftTransportTest.cpp

diff --git a/velox/dwio/parquet/tests/CMakeLists.txt b/velox/dwio/parquet/tests/CMakeLists.txt
index 2b7f78ade95b..71d3133e1ac3 100644
--- a/velox/dwio/parquet/tests/CMakeLists.txt
+++ b/velox/dwio/parquet/tests/CMakeLists.txt
@@ -24,6 +24,7 @@ set(TEST_LINK_LIBS
 
 add_subdirectory(duckdb_reader)
 add_subdirectory(reader)
+add_subdirectory(thrift)
 
 add_executable(velox_dwio_parquet_tpch_test ParquetTpchTest.cpp)
 add_test(
diff --git a/velox/dwio/parquet/tests/thrift/CMakeLists.txt b/velox/dwio/parquet/tests/thrift/CMakeLists.txt
new file mode 100644
index 000000000000..4dea14398d9f
--- /dev/null
+++ b/velox/dwio/parquet/tests/thrift/CMakeLists.txt
@@ -0,0 +1,19 @@
+# Copyright (c) Facebook, Inc. and its affiliates.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+add_executable(velox_dwio_parquet_thrift_test ThriftTransportTest.cpp)
+
+add_test(velox_dwio_parquet_thrift_test velox_dwio_parquet_thrift_test)
+target_link_libraries(velox_dwio_parquet_thrift_test arrow thrift
+                      ${VELOX_LINK_LIBS} gtest gtest_main)
diff --git a/velox/dwio/parquet/tests/thrift/ThriftTransportTest.cpp b/velox/dwio/parquet/tests/thrift/ThriftTransportTest.cpp
new file mode 100644
index 000000000000..2fed88512f6d
--- /dev/null
+++ b/velox/dwio/parquet/tests/thrift/ThriftTransportTest.cpp
@@ -0,0 +1,118 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "velox/dwio/parquet/thrift/ThriftTransport.h"
+#include <folly/init/Init.h>
+#include <gtest/gtest.h>
+
+using namespace facebook::velox;
+using namespace facebook::velox::dwio::common;
+using namespace facebook::velox::parquet::thrift;
+
+/// Fixture that pushes a bufferSize_-byte pattern (input_[i] == i) through a
+/// ThriftTransport and collects what the transport hands back in output_.
+class ThriftTransportTest : public testing::Test {
+ protected:
+  void SetUp() override {
+    input_.resize(bufferSize_);
+    output_.resize(bufferSize_);
+    for (size_t i = 0; i < input_.size(); ++i) {
+      input_[i] = static_cast<uint8_t>(i);
+    }
+  }
+
+  /// Points transport_ at a ThriftStreamingTransport that reads input_
+  /// through a SeekableArrayInputStream created with batchSize_.
+  void prepareThriftStreamingTransport() {
+    inputStream_ = std::make_shared<SeekableArrayInputStream>(
+        input_.data(), input_.size(), batchSize_);
+    // Prime the first batch: the transport requires non-null buffer pointers.
+    int32_t firstBatchSize;
+    const void* bufferPointer;
+    if (!inputStream_->Next(&bufferPointer, &firstBatchSize)) {
+      VELOX_CHECK(false, "Reading past end");
+    }
+    bufferStart_ = static_cast<const char*>(bufferPointer);
+    bufferEnd_ = bufferStart_ + firstBatchSize;
+    transport_ = std::make_shared<ThriftStreamingTransport>(
+        inputStream_.get(), bufferStart_, bufferEnd_);
+  }
+
+  /// Points transport_ at a ThriftBufferedTransport reading input_ directly.
+  void prepareThriftBufferedTransport() {
+    transport_ =
+        std::make_shared<ThriftBufferedTransport>(input_.data(), bufferSize_);
+  }
+
+  static constexpr uint32_t bufferSize_ = 200;
+  static constexpr uint32_t batchSize_ = 20;
+  std::vector<uint8_t> input_;
+  std::vector<uint8_t> output_;
+  // The streaming transport holds references to these two pointers.
+  const char* FOLLY_NULLABLE bufferStart_{nullptr};
+  const char* FOLLY_NULLABLE bufferEnd_{nullptr};
+  std::shared_ptr<SeekableInputStream> inputStream_;
+  std::shared_ptr<ThriftTransport> transport_;
+};
+
+TEST_F(ThriftTransportTest, streaming) {
+  prepareThriftStreamingTransport();
+  transport_->read(output_.data(), 10);
+  transport_->read(output_.data() + 10, 50);
+  transport_->read(output_.data() + 60, 140);
+
+  for (size_t i = 0; i < input_.size(); ++i) {
+    VELOX_CHECK_EQ(input_[i], output_[i]);
+  }
+}
+
+TEST_F(ThriftTransportTest, streamingOutOfBoundry) {
+  prepareThriftStreamingTransport();
+  transport_->read(output_.data(), 10);
+  transport_->read(output_.data() + 10, 50);
+  transport_->read(output_.data() + 60, 140);
+
+  // The whole inputStream_ is consumed, so one more byte must fail.
+  EXPECT_ANY_THROW(transport_->read(output_.data() + bufferSize_, 1));
+}
+
+TEST_F(ThriftTransportTest, buffered) {
+  prepareThriftBufferedTransport();
+  transport_->read(output_.data(), 10);
+  transport_->read(output_.data() + 10, 50);
+  transport_->read(output_.data() + 60, 140);
+
+  for (size_t i = 0; i < input_.size(); ++i) {
+    VELOX_CHECK_EQ(input_[i], output_[i]);
+  }
+}
+
+TEST_F(ThriftTransportTest, bufferedOutOfBoundry) {
+  prepareThriftBufferedTransport();
+  transport_->read(output_.data(), 10);
+  transport_->read(output_.data() + 10, 50);
+  transport_->read(output_.data() + 60, 140);
+
+  // The whole input buffer is consumed, so one more byte must fail.
+  EXPECT_ANY_THROW(transport_->read(output_.data() + bufferSize_, 1));
+}
+
+// Define main so that gflags get processed.
+int main(int argc, char** argv) {
+  testing::InitGoogleTest(&argc, argv);
+  folly::init(&argc, &argv, false);
+  return RUN_ALL_TESTS();
+}
diff --git a/velox/dwio/parquet/thrift/ThriftTransport.h b/velox/dwio/parquet/thrift/ThriftTransport.h
index 3e4bf769936f..cee094ce4435 100644
--- a/velox/dwio/parquet/thrift/ThriftTransport.h
+++ b/velox/dwio/parquet/thrift/ThriftTransport.h
@@ -21,12 +21,69 @@
 
 namespace facebook::velox::parquet::thrift {
 
-class ThriftBufferedTransport
-    : public apache::thrift::transport::TVirtualTransport<
-          ThriftBufferedTransport> {
+/// Common base of the Parquet Thrift transports; exposes read() as a virtual
+/// call so callers can hold either implementation through one pointer type.
+class ThriftTransport
+    : public apache::thrift::transport::TVirtualTransport<ThriftTransport> {
+ public:
+  virtual uint32_t read(uint8_t* outputBuf, uint32_t len) = 0;
+  virtual ~ThriftTransport() = default;
+};
+
+/// Transport that pulls bytes from a SeekableInputStream batch by batch.
+/// 'bufferStart' and 'bufferEnd' are kept by reference and advanced in place
+/// by read(), so the caller's pointers must outlive this object. All three
+/// constructor arguments are checked to be non-null.
+class ThriftStreamingTransport : public ThriftTransport {
+ public:
+  ThriftStreamingTransport(
+      dwio::common::SeekableInputStream* inputStream,
+      const char*& bufferStart,
+      const char*& bufferEnd)
+      : inputStream_(inputStream),
+        bufferStart_(bufferStart),
+        bufferEnd_(bufferEnd) {
+    VELOX_CHECK_NOT_NULL(inputStream_);
+    VELOX_CHECK_NOT_NULL(bufferStart_);
+    VELOX_CHECK_NOT_NULL(bufferEnd_);
+  }
+
+  /// Copies exactly 'len' bytes into 'outputBuf', fetching further batches
+  /// from the stream as needed. Fails via VELOX_FAIL if the stream ends first.
+  uint32_t read(uint8_t* outputBuf, uint32_t len) override {
+    uint32_t bytesToRead = len;
+    while (bytesToRead > 0) {
+      if (bufferEnd_ == bufferStart_) {
+        int32_t size;
+        if (!inputStream_->Next(
+                reinterpret_cast<const void**>(&bufferStart_), &size)) {
+          VELOX_FAIL("Reading past the end of the stream");
+        }
+        bufferEnd_ = bufferStart_ + size;
+      }
+
+      uint32_t bytesToReadInBuffer =
+          std::min<uint32_t>(bufferEnd_ - bufferStart_, bytesToRead);
+      memcpy(outputBuf, bufferStart_, bytesToReadInBuffer);
+      bufferStart_ += bytesToReadInBuffer;
+      bytesToRead -= bytesToReadInBuffer;
+      outputBuf += bytesToReadInBuffer;
+    }
+
+    return len;
+  }
+
+ private:
+  dwio::common::SeekableInputStream* inputStream_;
+  const char*& bufferStart_;
+  const char*& bufferEnd_;
+};
+
+class ThriftBufferedTransport : public ThriftTransport {
  public:
   ThriftBufferedTransport(const void* inputBuf, uint64_t len)
-      : inputBuf_(reinterpret_cast<const uint8_t*>(inputBuf)),
+      : ThriftTransport(),
+        inputBuf_(reinterpret_cast<const uint8_t*>(inputBuf)),
         size_(len),
         offset_(0) {}
 