Skip to content

Commit

Permalink
Merge pull request #4087 from nvdbaranec/parquet_large_files
Browse files Browse the repository at this point in the history
[REVIEW] Support for large parquet files via chunked writes.
  • Loading branch information
OlivierNV authored Feb 15, 2020
2 parents 8e6cbba + 110057a commit 6fe4c47
Show file tree
Hide file tree
Showing 13 changed files with 878 additions and 90 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
- PR #3891 Port NVStrings (r)split_record to contiguous_(r)split_record
- PR #4072 Allow round_robin_partition to single partition
- PR #4064 Add cudaGetDeviceCount to JNI layer
- PR #4087 Add support for writing large Parquet files in a chunked manner.
- PR #3716 Update cudf.to_parquet to use new GPU accelerated Parquet writer
- PR #4083 Use two partitions in test_groupby_multiindex_reset_index
- PR #4071 Add Java bindings for round robin partition
Expand Down
8 changes: 8 additions & 0 deletions cpp/benchmarks/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -175,3 +175,11 @@ set(NULLMASK_BENCH_SRC
"${CMAKE_CURRENT_SOURCE_DIR}/null_mask/set_null_mask_benchmark.cpp")

ConfigureBench(NULLMASK_BENCH "${NULLMASK_BENCH_SRC}")

###################################################################################################
# - parquet writer benchmark ----------------------------------------------------------------------

set(PARQUET_WRITER_BENCH_SRC
"${CMAKE_CURRENT_SOURCE_DIR}/io/parquet_writer_benchmark.cu")

ConfigureBench(PARQUET_WRITER_BENCH "${PARQUET_WRITER_BENCH_SRC}")
141 changes: 141 additions & 0 deletions cpp/benchmarks/io/parquet_writer_benchmark.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <benchmark/benchmark.h>

#include <cudf/column/column.hpp>
#include <cudf/table/table.hpp>

#include <tests/utilities/base_fixture.hpp>
#include <tests/utilities/column_utilities.hpp>
#include <tests/utilities/column_wrapper.hpp>

#include <benchmarks/fixture/benchmark_fixture.hpp>
#include <benchmarks/synchronization/synchronization.hpp>

#include <cudf/io/functions.hpp>

// to enable, run cmake with -DBUILD_BENCHMARKS=ON

namespace cudf_io = cudf::experimental::io;

// Google-benchmark fixtures: one for the single-shot writer, one for the
// chunked writer. The actual benchmark bodies are PQ_write / PQ_write_chunked.
class ParquetWrite: public cudf::benchmark {};
class ParquetWriteChunked: public cudf::benchmark {};

// Build a table of `num_columns` fixed-width columns, each with `num_rows`
// pseudo-random values. When `include_validity` is set, every odd row is
// marked null. Caller seeds rand() beforehand for reproducibility.
template<typename T>
std::unique_ptr<cudf::experimental::table> create_random_fixed_table(cudf::size_type num_columns, cudf::size_type num_rows, bool include_validity)
{
   // Alternating validity: even rows valid, odd rows null.
   auto validity = cudf::test::make_counting_transform_iterator(0, [](auto row){ return row % 2 == 0; });

   std::vector<cudf::test::fixed_width_column_wrapper<T>> wrappers(num_columns);
   for(cudf::size_type col = 0; col < num_columns; col++){
      auto values = cudf::test::make_counting_transform_iterator(0, [](T){ return rand(); });
      if(include_validity){
         wrappers[col] = cudf::test::fixed_width_column_wrapper<T>(values, values + num_rows, validity);
      } else {
         wrappers[col] = cudf::test::fixed_width_column_wrapper<T>(values, values + num_rows);
      }
   }

   std::vector<std::unique_ptr<cudf::column>> columns;
   columns.reserve(num_columns);
   for(auto& wrapper : wrappers){
      auto col = wrapper.release();
      // has_nulls() presumably computes/caches the null count so that work
      // happens here rather than inside a timed region — TODO confirm.
      col->has_nulls();
      columns.push_back(std::move(col));
   }
   return std::make_unique<cudf::experimental::table>(std::move(columns));
}

// Benchmark a single (non-chunked) parquet write of one table.
//   state.range(0): total bytes to write
//   state.range(1): number of columns
// Table construction happens outside the timed region; only the write is timed.
void PQ_write(benchmark::State& state)
{
   int64_t total_desired_bytes = state.range(0);
   cudf::size_type num_cols = state.range(1);

   // Every column holds 4-byte elements, so derive rows from the byte budget.
   cudf::size_type el_size = 4;
   int64_t num_rows = total_desired_bytes / (num_cols * el_size);

   srand(31337);
   auto input_table = create_random_fixed_table<int>(num_cols, num_rows, true);
   cudf::table_view input_view = input_table->view();

   for(auto _ : state){
      cuda_event_timer timer(state, true); // flush_l2_cache = true, stream = 0
      cudf_io::write_parquet_args write_args{cudf_io::sink_info(), input_view};
      cudf_io::write_parquet(write_args);
   }

   state.SetBytesProcessed(static_cast<int64_t>(state.iterations()) * state.range(0));
}

// Benchmark a chunked parquet write: the input is pre-split into
// `num_tables` equally-sized tables, which are streamed through the
// write_parquet_chunked_begin/write/end API inside the timed region.
//   state.range(0): total bytes to write
//   state.range(1): number of columns
//   state.range(2): number of chunks (sub-tables)
void PQ_write_chunked(benchmark::State& state)
{
   int64_t total_desired_bytes = state.range(0);
   cudf::size_type num_cols = state.range(1);
   cudf::size_type num_tables = state.range(2);

   // Every column holds 4-byte elements; split the byte budget across chunks.
   cudf::size_type el_size = 4;
   int64_t num_rows = (total_desired_bytes / (num_cols * el_size)) / num_tables;

   srand(31337);
   std::vector<std::unique_ptr<cudf::experimental::table>> tables;
   for(cudf::size_type idx=0; idx<num_tables; idx++){
      tables.push_back(create_random_fixed_table<int>(num_cols, num_rows, true));
   }

   for(auto _ : state){
      cuda_event_timer raii(state, true); // flush_l2_cache = true, stream = 0
      cudf_io::write_parquet_chunked_args args{cudf_io::sink_info()};

      // Named `writer_state` (not `state`) so it does not shadow the
      // benchmark::State parameter of this function.
      auto writer_state = cudf_io::write_parquet_chunked_begin(args);
      std::for_each(tables.begin(), tables.end(),
                    [&writer_state](std::unique_ptr<cudf::experimental::table> const& tbl){
                       cudf_io::write_parquet_chunked(*tbl, writer_state);
                    });
      cudf_io::write_parquet_chunked_end(writer_state);
   }

   state.SetBytesProcessed(
      static_cast<int64_t>(state.iterations())*state.range(0));
}


// Define and register one non-chunked write benchmark.
//   size        : total bytes to write (state.range(0))
//   num_columns : table width (state.range(1))
// UseManualTime() because iteration time comes from cuda_event_timer,
// not wall-clock — TODO confirm it calls SetIterationTime internally.
// (No comments inside the macro: line-splicing happens before comment
// removal, so a // comment would swallow the continuation lines.)
#define PWBM_BENCHMARK_DEFINE(name, size, num_columns) \
BENCHMARK_DEFINE_F(ParquetWrite, name)(::benchmark::State& state) { \
PQ_write(state); \
} \
BENCHMARK_REGISTER_F(ParquetWrite, name)->Args({size, num_columns}) \
->Unit(benchmark::kMillisecond)->UseManualTime() \
->Iterations(4)

// 3 GiB total, wide vs. narrow tables.
PWBM_BENCHMARK_DEFINE(3Gb8Cols, (int64_t)3 * 1024 * 1024 * 1024, 8);
PWBM_BENCHMARK_DEFINE(3Gb1024Cols, (int64_t)3 * 1024 * 1024 * 1024, 1024);


// Define and register one chunked write benchmark.
//   size        : total bytes to write (state.range(0))
//   num_columns : table width (state.range(1))
//   num_chunks  : number of sub-tables streamed through the chunked API (state.range(2))
#define PWCBM_BENCHMARK_DEFINE(name, size, num_columns, num_chunks) \
BENCHMARK_DEFINE_F(ParquetWriteChunked, name)(::benchmark::State& state) { \
PQ_write_chunked(state); \
} \
BENCHMARK_REGISTER_F(ParquetWriteChunked, name)->Args({size, num_columns, num_chunks}) \
->Unit(benchmark::kMillisecond)->UseManualTime() \
->Iterations(4)

// 3 GiB total, wide vs. narrow tables, 64 chunks.
PWCBM_BENCHMARK_DEFINE(3Gb8Cols64Chunks, (int64_t)3 * 1024 * 1024 * 1024, 8, 64);
PWCBM_BENCHMARK_DEFINE(3Gb1024Cols64Chunks, (int64_t)3 * 1024 * 1024 * 1024, 1024, 64);

// Same budgets, 128 chunks — measures per-chunk overhead.
PWCBM_BENCHMARK_DEFINE(3Gb8Cols128Chunks, (int64_t)3 * 1024 * 1024 * 1024, 8, 128);
PWCBM_BENCHMARK_DEFINE(3Gb1024Cols128Chunks, (int64_t)3 * 1024 * 1024 * 1024, 1024, 128);
82 changes: 81 additions & 1 deletion cpp/include/cudf/io/functions.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019, NVIDIA CORPORATION.
* Copyright (c) 2019-2020, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -427,6 +427,86 @@ void write_parquet(write_parquet_args const& args, rmm::mr::device_memory_resour
rmm::mr::get_default_resource());


/**
 * @brief Settings to use for `write_parquet_chunked()`
 */
struct write_parquet_chunked_args {
/// Specify the sink to use for writer output
sink_info sink;
/// Specify the compression format to use
compression_type compression;
/// Specify the level of statistics in the output file
statistics_freq stats_level;
/// Optional associated metadata; nullptr means no user-supplied metadata.
const table_metadata_with_nullability *metadata;

explicit write_parquet_chunked_args(sink_info const& sink_,
const table_metadata_with_nullability *metadata_ = nullptr,
compression_type compression_ = compression_type::AUTO,
statistics_freq stats_lvl_ = statistics_freq::STATISTICS_ROWGROUP)
// Initializer list follows member declaration order (sink, compression,
// stats_level, metadata): members are always constructed in declaration
// order, and a mismatched list triggers -Wreorder.
: sink(sink_), compression(compression_), stats_level(stats_lvl_), metadata(metadata_) {}
};

/**
 * @brief Forward declaration of anonymous chunked-writer state struct.
 *
 * The struct is intentionally left incomplete here: callers only ever hold it
 * through the opaque shared_ptr returned by write_parquet_chunked_begin().
 */
namespace detail {
namespace parquet {
struct pq_chunked_state;
}; // namespace parquet
}; // namespace detail

/**
 * @brief Begin the process of writing a parquet file in a chunked/stream form.
 *
 * The intent of the write_parquet_chunked_ path is to allow writing of an
 * arbitrarily large / arbitrary number of rows to a parquet file in multiple passes.
 *
 * The following code snippet demonstrates how to write a single parquet file containing
 * one logical table by writing a series of individual cudf::tables.
 * @code
 * #include <cudf.h>
 * ...
 * std::string filepath = "dataset.parquet";
 * cudf::experimental::io::write_parquet_chunked_args args{cudf::experimental::io::sink_info(filepath)};
 * ...
 * auto state = cudf::experimental::io::write_parquet_chunked_begin(args);
 * cudf::experimental::io::write_parquet_chunked(table0, state);
 * cudf::experimental::io::write_parquet_chunked(table1, state);
 * ...
 * cudf::experimental::io::write_parquet_chunked_end(state);
 * @endcode
 *
 * @param[in] args Settings for controlling writing behavior
 * @param[in] mr Optional resource to use for device memory allocation
 *
 * @returns pointer to an anonymous state structure storing information about the chunked write. this
 * pointer must be passed to all subsequent write_parquet_chunked() and write_parquet_chunked_end()
 * calls.
 */
std::shared_ptr<detail::parquet::pq_chunked_state> write_parquet_chunked_begin(
write_parquet_chunked_args const& args,
rmm::mr::device_memory_resource* mr = rmm::mr::get_default_resource());
/**
 * @brief Write a single table as a subtable of a larger logical parquet file/table.
 *
 * All tables passed into multiple calls of this function must contain the same number of
 * columns and have columns of the same type.
 *
 * @param[in] table The table data to be written.
 * @param[in] state Opaque state information about the writer process. Must be the same pointer returned
 * from write_parquet_chunked_begin()
 */
void write_parquet_chunked(table_view const& table, std::shared_ptr<detail::parquet::pq_chunked_state> state);

/**
 * @brief Finish writing a chunked/stream parquet file.
 *
 * Must be called exactly once, after all write_parquet_chunked() calls.
 * NOTE(review): `state` is taken by non-const reference — presumably the
 * implementation releases/resets it here; confirm before reusing the pointer.
 *
 * @param[in] state Opaque state information about the writer process. Must be the same pointer returned
 * from write_parquet_chunked_begin()
 */
void write_parquet_chunked_end(std::shared_ptr<detail::parquet::pq_chunked_state>& state);

} // namespace io
} // namespace experimental
} // namespace cudf
28 changes: 23 additions & 5 deletions cpp/include/cudf/io/types.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019, NVIDIA CORPORATION.
* Copyright (c) 2019-2020, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -62,6 +62,7 @@ enum class io_type {
FILEPATH, ///< Input/output is a file path
HOST_BUFFER, ///< Input/output is a buffer in host memory,
ARROW_RANDOM_ACCESS_FILE, ///< Input/output is an arrow::io::RandomAccessFile
VOID, ///< Input/output is nothing. No work is done. Useful for benchmarking
};

/**
Expand Down Expand Up @@ -99,8 +100,25 @@ enum statistics_freq {
* f5 f6
*/
struct table_metadata {
std::vector<std::string> column_names; //!< Names of columns contained in the table
std::map<std::string, std::string> user_data; //!< Format-dependent metadata as key-values pairs
std::vector<std::string> column_names; //!< Names of columns contained in the table
std::map<std::string, std::string> user_data; //!< Format-dependent metadata as key-values pairs
};

/**
 * @brief Derived class of table_metadata which includes nullability information per column of input.
 *
 * This information is used as an optimization for chunked writes. If the caller leaves
 * column_nullable uninitialized, the writer code will assume the worst case: that all
 * columns are nullable.
 *
 * If the column_nullable field is not empty, it is expected to have a length equal to the
 * number of columns in the table being written.
 *
 * In the case where column nullability is known, pass `true` if the corresponding column
 * could contain nulls in one or more subtables to be written, otherwise `false`.
 */
struct table_metadata_with_nullability : public table_metadata {
std::vector<bool> column_nullable; //!< Per-column nullability information.
};

/**
Expand Down Expand Up @@ -136,7 +154,7 @@ struct source_info {
* @brief Destination information for write interfaces
*/
struct sink_info {
io_type type = io_type::FILEPATH;
io_type type = io_type::VOID;
std::string filepath;
std::vector<char>* buffer = nullptr;

Expand All @@ -146,7 +164,7 @@ struct sink_info {
: type(io_type::FILEPATH), filepath(file_path) {}

explicit sink_info(std::vector<char>* buffer)
: type(io_type::HOST_BUFFER), buffer(buffer) {}
: type(io_type::HOST_BUFFER), buffer(buffer) {}
};

} // namespace io
Expand Down
Loading

0 comments on commit 6fe4c47

Please sign in to comment.