
Commit

Fix too many small row groups in Velox parquet writer (oap-project#326)

Co-authored-by: youxiduo <[email protected]>
2 people authored and zhejiangxiaomai committed Jun 26, 2023
1 parent e09cc23 commit 03fa725
Showing 2 changed files with 80 additions and 27 deletions.
89 changes: 66 additions & 23 deletions velox/dwio/parquet/writer/Writer.cpp
@@ -21,52 +21,95 @@

 namespace facebook::velox::parquet {
 
+void Writer::flush() {
+  if (stagingRows_ > 0) {
+    if (!arrowWriter_) {
+      stream_ = std::make_shared<DataBufferSink>(
+          finalSink_.get(),
+          pool_,
+          queryCtx_->queryConfig().dataBufferGrowRatio());
+      auto arrowProperties = ::parquet::ArrowWriterProperties::Builder().build();
+      PARQUET_ASSIGN_OR_THROW(
+          arrowWriter_,
+          ::parquet::arrow::FileWriter::Open(
+              *(schema_.get()),
+              arrow::default_memory_pool(),
+              stream_,
+              properties_,
+              arrowProperties));
+    }
+
+    auto fields = schema_->fields();
+    std::vector<std::shared_ptr<arrow::ChunkedArray>> chunks;
+    for (int colIdx = 0; colIdx < fields.size(); colIdx++) {
+      auto dataType = fields.at(colIdx)->type();
+      auto chunk = arrow::ChunkedArray::Make(std::move(stagingChunks_.at(colIdx)), dataType).ValueOrDie();
+      chunks.push_back(chunk);
+    }
+    auto table = arrow::Table::Make(schema_, std::move(chunks), stagingRows_);
+    PARQUET_THROW_NOT_OK(arrowWriter_->WriteTable(*table, maxRowGroupRows_));
+    if (queryCtx_->queryConfig().dataBufferGrowRatio() > 1) {
+      PARQUET_THROW_NOT_OK(stream_->Flush());
+    }
+    for (auto& chunk : stagingChunks_) {
+      chunk.clear();
+    }
+    stagingRows_ = 0;
+    stagingBytes_ = 0;
+  }
+}
+
+/**
+ * Caches the input `ColumnarBatch` so that row groups grow to a target size.
+ * The staged data is flushed when either:
+ * - the cached number of rows would exceed `maxRowGroupRows_`, or
+ * - the cached bytes would exceed `maxRowGroupBytes_`.
+ *
+ * Assumes every input `ColumnarBatch` has the same schema.
+ */
 void Writer::write(const RowVectorPtr& data) {
   ArrowArray array;
   ArrowSchema schema;
   exportToArrow(data, array, &pool_);
   exportToArrow(data, schema);
   PARQUET_ASSIGN_OR_THROW(
       auto recordBatch, arrow::ImportRecordBatch(&array, &schema));
-  auto table = arrow::Table::Make(
-      recordBatch->schema(), recordBatch->columns(), data->size());
-  if (!arrowWriter_) {
-    stream_ = std::make_shared<DataBufferSink>(
-        finalSink_.get(),
-        pool_,
-        queryCtx_->queryConfig().dataBufferGrowRatio());
-    auto arrowProperties = ::parquet::ArrowWriterProperties::Builder().build();
-    PARQUET_ASSIGN_OR_THROW(
-        arrowWriter_,
-        ::parquet::arrow::FileWriter::Open(
-            *recordBatch->schema(),
-            arrow::default_memory_pool(),
-            stream_,
-            properties_,
-            arrowProperties));
+  if (!schema_) {
+    schema_ = recordBatch->schema();
+    for (int colIdx = 0; colIdx < schema_->num_fields(); colIdx++) {
+      stagingChunks_.push_back(std::vector<std::shared_ptr<arrow::Array>>());
+    }
   }
 
-  PARQUET_THROW_NOT_OK(arrowWriter_->WriteTable(*table, 10000));
-
-  if (queryCtx_->queryConfig().dataBufferGrowRatio() > 1) {
-    flush(); // No performance drop on 1TB dataset.
+  auto bytes = data->estimateFlatSize();
+  auto numRows = data->size();
+  if (stagingBytes_ + bytes > maxRowGroupBytes_ || stagingRows_ + numRows > maxRowGroupRows_) {
+    flush();
   }
-}
 
-void Writer::flush() {
-  PARQUET_THROW_NOT_OK(stream_->Flush());
+  for (int colIdx = 0; colIdx < recordBatch->num_columns(); colIdx++) {
+    auto array = recordBatch->column(colIdx);
+    stagingChunks_.at(colIdx).push_back(array);
+  }
+  stagingRows_ += numRows;
+  stagingBytes_ += bytes;
 }
 
 void Writer::newRowGroup(int32_t numRows) {
   PARQUET_THROW_NOT_OK(arrowWriter_->NewRowGroup(numRows));
 }
 
 void Writer::close() {
+  flush();
+
   if (arrowWriter_) {
     PARQUET_THROW_NOT_OK(arrowWriter_->Close());
     arrowWriter_.reset();
   }
 
   PARQUET_THROW_NOT_OK(stream_->Close());
+
+  stagingChunks_.clear();
 }
 
 } // namespace facebook::velox::parquet
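
The staging policy above is the heart of the fix: instead of emitting one row group per incoming batch, batches are buffered per column and written out as a single row group only once the accumulated rows or bytes would cross a threshold. Below is a minimal, self-contained sketch of that policy in plain C++; the Batch struct and the printed "row group" line are hypothetical stand-ins for Velox's RowVector and the Arrow writer, and the threshold values are illustrative only.

#include <cstdint>
#include <iostream>
#include <vector>

// Hypothetical stand-in for a columnar batch. In the real writer this role
// is played by Velox's RowVector / the imported Arrow record batch.
struct Batch {
  int64_t numRows;
  int64_t numBytes;
};

class StagingWriter {
 public:
  StagingWriter(int64_t maxRowGroupRows, int64_t maxRowGroupBytes)
      : maxRowGroupRows_(maxRowGroupRows), maxRowGroupBytes_(maxRowGroupBytes) {}

  // Mirrors Writer::write(): flush first if staging this batch would
  // overflow either threshold, then stage it.
  void write(const Batch& batch) {
    if (stagingBytes_ + batch.numBytes > maxRowGroupBytes_ ||
        stagingRows_ + batch.numRows > maxRowGroupRows_) {
      flush();
    }
    staged_.push_back(batch);
    stagingRows_ += batch.numRows;
    stagingBytes_ += batch.numBytes;
  }

  // Mirrors Writer::flush(): emit everything staged as one row group and
  // reset the counters.
  void flush() {
    if (stagingRows_ == 0) {
      return;
    }
    std::cout << "row group: " << stagingRows_ << " rows, " << stagingBytes_
              << " bytes, built from " << staged_.size() << " batches\n";
    staged_.clear();
    stagingRows_ = 0;
    stagingBytes_ = 0;
  }

 private:
  const int64_t maxRowGroupRows_;
  const int64_t maxRowGroupBytes_;
  std::vector<Batch> staged_;
  int64_t stagingRows_ = 0;
  int64_t stagingBytes_ = 0;
};

int main() {
  // 1M rows or 128MB per row group: three hundred 10k-row, 1MB batches
  // coalesce into three large row groups instead of three hundred tiny ones.
  StagingWriter writer(1000000, int64_t{128} << 20);
  for (int i = 0; i < 300; i++) {
    writer.write({10000, int64_t{1} << 20});
  }
  writer.flush(); // the close() path flushes whatever is still staged
  return 0;
}

Note that the check runs before staging, so a flush emits only previously staged data and the incoming batch starts the next row group; Writer::close() performs the final flush for the tail.
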
18 changes: 14 additions & 4 deletions velox/dwio/parquet/writer/Writer.h
@@ -90,18 +90,19 @@ class DataBufferSink : public arrow::io::OutputStream {
 class Writer {
  public:
   // Constructs a writer with output to 'sink'. A new row group is
-  // started every 'rowsInRowGroup' top level rows. 'pool' is used for
+  // started when the staged data reaches 'maxRowGroupBytes' bytes. 'pool' is used for
   // temporary memory. 'properties' specifies Parquet-specific
   // options.
   Writer(
       std::unique_ptr<dwio::common::DataSink> sink,
       memory::MemoryPool& pool,
-      int32_t rowsInRowGroup,
+      int64_t maxRowGroupBytes,
       std::shared_ptr<::parquet::WriterProperties> properties =
           ::parquet::WriterProperties::Builder().build(),
       std::shared_ptr<velox::core::QueryCtx> queryCtx =
           std::make_shared<velox::core::QueryCtx>(nullptr))
-      : rowsInRowGroup_(rowsInRowGroup),
+      : maxRowGroupBytes_(maxRowGroupBytes),
+        maxRowGroupRows_(properties->max_row_group_length()),
         pool_(pool),
         finalSink_(std::move(sink)),
         properties_(std::move(properties)),
@@ -121,11 +122,20 @@ class Writer {
   void close();
 
  private:
-  const int32_t rowsInRowGroup_;
+  const int64_t maxRowGroupBytes_;
+  const int64_t maxRowGroupRows_;
+
+  int64_t stagingRows_ = 0;
+  int64_t stagingBytes_ = 0;
 
   // Pool for 'stream_'.
   memory::MemoryPool& pool_;
 
+  std::shared_ptr<arrow::Schema> schema_;
+
+  // Staged arrays: the outer vector is indexed by column, the inner vector
+  // holds the chunks staged for that column.
+  std::vector<std::vector<std::shared_ptr<arrow::Array>>> stagingChunks_;
+
   // Final destination of output.
   std::unique_ptr<dwio::common::DataSink> finalSink_;
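
After this change, callers size row groups by bytes, and the per-group row cap comes from the Parquet writer properties (max_row_group_length()) rather than a constructor argument. A hedged usage sketch of the new signature follows; it compiles only inside a Velox tree, and the 128MB target and the helper function are illustrative, not part of the commit.

// Sketch only: assumes the Velox headers and types referenced by this diff.
#include "velox/dwio/parquet/writer/Writer.h"

#include <memory>
#include <vector>

using namespace facebook::velox;

void writeBatchesAsParquet(
    std::unique_ptr<dwio::common::DataSink> sink,
    memory::MemoryPool& pool,
    const std::vector<RowVectorPtr>& batches) {
  // Target ~128MB row groups; the row cap defaults to
  // properties->max_row_group_length() per the new constructor.
  parquet::Writer writer(std::move(sink), pool, int64_t{128} << 20);
  for (const auto& batch : batches) {
    writer.write(batch); // stages; flushes only when a threshold is hit
  }
  writer.close(); // flushes any remaining staged rows
}
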
