Skip to content

Commit

Permalink
Port upstream patch to fix parquet reader issue on long min/max strin…
Browse files Browse the repository at this point in the history
…g data (oap-project#145)

* Port a patch: Refactor Thrift Transport for Parquet Metadata Access facebookincubator#4160

* Port a patch: Read Parquet Page Header with ThriftStreamingTransport to Fix the Incorrect Header Length facebookincubator#4108
  • Loading branch information
PHILO-HE authored and zhejiangxiaomai committed Mar 8, 2023
1 parent 74643aa commit 4a1b2ee
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 46 deletions.
47 changes: 10 additions & 37 deletions velox/dwio/parquet/reader/PageReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ void PageReader::seekToPage(int64_t row) {
numRowsInPage_ = 0;
break;
}
PageHeader pageHeader = readPageHeader(chunkSize_ - pageStart_);
PageHeader pageHeader = readPageHeader();
pageStart_ = pageDataStart_ + pageHeader.compressed_page_size;

switch (pageHeader.type) {
Expand Down Expand Up @@ -75,52 +75,25 @@ void PageReader::seekToPage(int64_t row) {
}
}

PageHeader PageReader::readPageHeader(int64_t remainingSize) {
// Note that sizeof(PageHeader) may be longer than actually read
std::shared_ptr<thrift::ThriftBufferedTransport> transport;
std::unique_ptr<apache::thrift::protocol::TCompactProtocolT<
thrift::ThriftBufferedTransport>>
protocol;
char copy[sizeof(PageHeader)];
bool wasInBuffer = false;
PageHeader PageReader::readPageHeader() {
if (bufferEnd_ == bufferStart_) {
const void* buffer;
int32_t size;
inputStream_->Next(&buffer, &size);
bufferStart_ = reinterpret_cast<const char*>(buffer);
bufferEnd_ = bufferStart_ + size;
}
if (bufferEnd_ - bufferStart_ >= sizeof(PageHeader)) {
wasInBuffer = true;
transport = std::make_shared<thrift::ThriftBufferedTransport>(
bufferStart_, sizeof(PageHeader));
protocol = std::make_unique<apache::thrift::protocol::TCompactProtocolT<
thrift::ThriftBufferedTransport>>(transport);
} else {
dwio::common::readBytes(
std::min<int64_t>(remainingSize, sizeof(PageHeader)),
inputStream_.get(),
&copy,
bufferStart_,
bufferEnd_);

transport = std::make_shared<thrift::ThriftBufferedTransport>(
copy, sizeof(PageHeader));
protocol = std::make_unique<apache::thrift::protocol::TCompactProtocolT<
thrift::ThriftBufferedTransport>>(transport);
}
std::shared_ptr<thrift::ThriftTransport> transport =
std::make_shared<thrift::ThriftStreamingTransport>(
inputStream_.get(), bufferStart_, bufferEnd_);
apache::thrift::protocol::TCompactProtocolT<thrift::ThriftTransport> protocol(
transport);
PageHeader pageHeader;
uint64_t readBytes = pageHeader.read(protocol.get());
uint64_t readBytes;
readBytes = pageHeader.read(&protocol);

pageDataStart_ = pageStart_ + readBytes;
// Unread the bytes that were not consumed.
if (wasInBuffer) {
bufferStart_ += readBytes;
} else {
std::vector<uint64_t> start = {pageDataStart_};
dwio::common::PositionProvider position(start);
inputStream_->seekToPosition(position);
bufferStart_ = bufferEnd_ = nullptr;
}
return pageHeader;
}

Expand Down
23 changes: 19 additions & 4 deletions velox/dwio/parquet/reader/PageReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,21 @@ class PageReader {
type_->makeLevelInfo(leafInfo_);
}

// This PageReader constructor is for unit test only.
PageReader(
std::unique_ptr<dwio::common::SeekableInputStream> stream,
memory::MemoryPool& pool,
thrift::CompressionCodec::type codec,
int64_t chunkSize)
: pool_(pool),
inputStream_(std::move(stream)),
maxRepeat_(0),
maxDefine_(1),
isTopLevel_(maxRepeat_ == 0 && maxDefine_ <= 1),
codec_(codec),
chunkSize_(chunkSize),
nullConcatenation_(pool_) {}

/// Advances 'numRows' top level rows.
void skip(int64_t numRows);

Expand Down Expand Up @@ -111,6 +126,10 @@ class PageReader {
return {repDefBegin_, repDefEnd_};
}

// Parses the PageHeader at 'inputStream_', and move the bufferStart_ and
// bufferEnd_ to the corresponding positions.
thrift::PageHeader readPageHeader();

private:
// Indicates that we only want the repdefs for the next page. Used when
// prereading repdefs with seekToPage.
Expand Down Expand Up @@ -162,10 +181,6 @@ class PageReader {
// next page.
void updateRowInfoAfterPageSkipped();

// Parses the PageHeader at 'inputStream_'. Will not read more than
// 'remainingBytes' since there could be less data left in the
// ColumnChunk than the full header size.
thrift::PageHeader readPageHeader(int64_t remainingSize);
void prepareDataPageV1(const thrift::PageHeader& pageHeader, int64_t row);
void prepareDataPageV2(const thrift::PageHeader& pageHeader, int64_t row);
void prepareDictionary(const thrift::PageHeader& pageHeader);
Expand Down
11 changes: 6 additions & 5 deletions velox/dwio/parquet/reader/ParquetReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,12 @@ void ReaderBase::loadFileMetaData() {
missingLength, stream.get(), copy.data(), bufferStart, bufferEnd);
}

auto thriftTransport = std::make_shared<thrift::ThriftBufferedTransport>(
copy.data() + footerOffsetInBuffer, footerLength);
auto thriftProtocol =
std::make_unique<apache::thrift::protocol::TCompactProtocolT<
thrift::ThriftBufferedTransport>>(thriftTransport);
std::shared_ptr<thrift::ThriftTransport> thriftTransport =
std::make_shared<thrift::ThriftBufferedTransport>(
copy.data() + footerOffsetInBuffer, footerLength);
auto thriftProtocol = std::make_unique<
apache::thrift::protocol::TCompactProtocolT<thrift::ThriftTransport>>(
thriftTransport);
fileMetaData_ = std::make_unique<thrift::FileMetaData>();
fileMetaData_->read(thriftProtocol.get());
}
Expand Down

0 comments on commit 4a1b2ee

Please sign in to comment.