Skip to content

Commit

Permalink
Merge pull request #10953 from rapidsai/branch-22.06
Browse files Browse the repository at this point in the history
[gpuCI] Forward-merge branch-22.06 to branch-22.08 [skip gpuci]
  • Loading branch information
GPUtester authored May 24, 2022
2 parents f111891 + 379cc9f commit c75847a
Show file tree
Hide file tree
Showing 6 changed files with 237 additions and 18 deletions.
156 changes: 148 additions & 8 deletions cpp/include/cudf/io/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ namespace io {

constexpr size_t default_row_group_size_bytes = 128 * 1024 * 1024; // 128MB
constexpr size_type default_row_group_size_rows = 1000000;  // 1 million rows per row group by default
constexpr size_t default_max_page_size_bytes = 512 * 1024;  // 512KB default cap per (uncompressed) page
constexpr size_type default_max_page_size_rows = 20000;  // 20k rows per page by default

/**
* @brief Builds parquet_reader_options to use for `read_parquet()`.
Expand Down Expand Up @@ -382,6 +384,10 @@ class parquet_writer_options {
size_t _row_group_size_bytes = default_row_group_size_bytes;
// Maximum number of rows in row group (unless smaller than a single page)
size_type _row_group_size_rows = default_row_group_size_rows;
// Maximum size of each page (uncompressed)
size_t _max_page_size_bytes = default_max_page_size_bytes;
// Maximum number of rows in a page
size_type _max_page_size_rows = default_max_page_size_rows;

/**
* @brief Constructor from sink and table.
Expand Down Expand Up @@ -482,6 +488,24 @@ class parquet_writer_options {
*/
auto get_row_group_size_rows() const { return _row_group_size_rows; }

/**
 * @brief Returns the maximum uncompressed page size, in bytes.
 *
 * If the configured value exceeds the row group size, the row group size is
 * returned instead, since a page can never be larger than its row group.
 */
auto get_max_page_size_bytes() const
{
  auto const row_group_bytes = get_row_group_size_bytes();
  return _max_page_size_bytes < row_group_bytes ? _max_page_size_bytes : row_group_bytes;
}

/**
 * @brief Returns the maximum page size, in rows.
 *
 * If the configured value exceeds the row group size in rows, the row group
 * size is returned instead, since a page can never span more rows than its row group.
 */
auto get_max_page_size_rows() const
{
  auto const row_group_rows = get_row_group_size_rows();
  return _max_page_size_rows < row_group_rows ? _max_page_size_rows : row_group_rows;
}

/**
* @brief Sets partitions.
*
Expand Down Expand Up @@ -555,8 +579,8 @@ class parquet_writer_options {
/**
 * @brief Sets the maximum row group size, in bytes.
 *
 * @param size_bytes maximum row group size; must be at least 4KB (the minimum page size)
 */
void set_row_group_size_bytes(size_t size_bytes)
{
  // NOTE(review): the flattened diff left both the removed (>= 512KB) and added
  // (>= 4KB) validation lines in place; only the added lines are kept here.
  CUDF_EXPECTS(
    size_bytes >= 4 * 1024,
    "The maximum row group size cannot be smaller than the minimum page size, which is 4KB.");
  _row_group_size_bytes = size_bytes;
}

Expand All @@ -567,9 +591,29 @@ class parquet_writer_options {
{
CUDF_EXPECTS(
size_rows >= 5000,
"The maximum row group size cannot be smaller than the page size, which is 5000 rows.");
"The maximum row group size cannot be smaller than the fragment size, which is 5000 rows.");
_row_group_size_rows = size_rows;
}

/**
 * @brief Sets the maximum uncompressed page size, in bytes.
 *
 * @param size_bytes maximum page size; must be at least 4KB
 */
void set_max_page_size_bytes(size_t size_bytes)
{
  constexpr size_t minimum_page_size_bytes = 4 * 1024;
  CUDF_EXPECTS(size_bytes >= minimum_page_size_bytes,
               "The maximum page size cannot be smaller than 4KB.");
  _max_page_size_bytes = size_bytes;
}

/**
 * @brief Sets the maximum page size, in rows.
 *
 * @param size_rows maximum rows per page; must be at least 5000 (the fragment size)
 */
void set_max_page_size_rows(size_type size_rows)
{
  constexpr size_type minimum_fragment_rows = 5000;
  CUDF_EXPECTS(
    size_rows >= minimum_fragment_rows,
    "The maximum page size cannot be smaller than the fragment size, which is 5000 rows.");
  _max_page_size_rows = size_rows;
}
};

class parquet_writer_options_builder {
Expand Down Expand Up @@ -690,7 +734,7 @@ class parquet_writer_options_builder {
/**
* @brief Sets the maximum number of rows in output row groups.
*
* @param val maximum number of rows
* @return this for chaining.
*/
parquet_writer_options_builder& row_group_size_rows(size_type val)
Expand All @@ -699,6 +743,33 @@ class parquet_writer_options_builder {
return *this;
}

/**
 * @brief Sets the maximum uncompressed page size, in bytes.
 *
 * Acts as a hint to the writer and may be exceeded in some circumstances;
 * values larger than the row group size in bytes are clamped to it.
 *
 * @param size_bytes maximum page size, in bytes
 * @return this for chaining.
 */
parquet_writer_options_builder& max_page_size_bytes(size_t size_bytes)
{
  options.set_max_page_size_bytes(size_bytes);
  return *this;
}

/**
 * @brief Sets the maximum page size, in rows.
 *
 * Only top-level rows are counted (nesting is ignored); values larger than the
 * row group size in rows are clamped to it.
 *
 * @param num_rows maximum rows per page
 * @return this for chaining.
 */
parquet_writer_options_builder& max_page_size_rows(size_type num_rows)
{
  options.set_max_page_size_rows(num_rows);
  return *this;
}

/**
* @brief Sets whether int96 timestamps are written or not in parquet_writer_options.
*
Expand Down Expand Up @@ -783,6 +854,10 @@ class chunked_parquet_writer_options {
size_t _row_group_size_bytes = default_row_group_size_bytes;
// Maximum number of rows in row group (unless smaller than a single page)
size_type _row_group_size_rows = default_row_group_size_rows;
// Maximum size of each page (uncompressed)
size_t _max_page_size_bytes = default_max_page_size_bytes;
// Maximum number of rows in a page
size_type _max_page_size_rows = default_max_page_size_rows;

/**
* @brief Constructor from sink.
Expand Down Expand Up @@ -844,6 +919,24 @@ class chunked_parquet_writer_options {
*/
auto get_row_group_size_rows() const { return _row_group_size_rows; }

/**
 * @brief Returns the maximum uncompressed page size, in bytes.
 *
 * If the configured value exceeds the row group size, the row group size is
 * returned instead, since a page can never be larger than its row group.
 */
auto get_max_page_size_bytes() const
{
  auto const row_group_bytes = get_row_group_size_bytes();
  return _max_page_size_bytes < row_group_bytes ? _max_page_size_bytes : row_group_bytes;
}

/**
 * @brief Returns the maximum page size, in rows.
 *
 * If the configured value exceeds the row group size in rows, the row group
 * size is returned instead, since a page can never span more rows than its row group.
 */
auto get_max_page_size_rows() const
{
  auto const row_group_rows = get_row_group_size_rows();
  return _max_page_size_rows < row_group_rows ? _max_page_size_rows : row_group_rows;
}

/**
* @brief Sets metadata.
*
Expand Down Expand Up @@ -891,8 +984,8 @@ class chunked_parquet_writer_options {
/**
 * @brief Sets the maximum row group size, in bytes.
 *
 * @param size_bytes maximum row group size; must be at least 4KB (the minimum page size)
 */
void set_row_group_size_bytes(size_t size_bytes)
{
  // NOTE(review): the flattened diff left both the removed (>= 512KB) and added
  // (>= 4KB) validation lines in place; only the added lines are kept here.
  CUDF_EXPECTS(
    size_bytes >= 4 * 1024,
    "The maximum row group size cannot be smaller than the minimum page size, which is 4KB.");
  _row_group_size_bytes = size_bytes;
}

Expand All @@ -903,10 +996,30 @@ class chunked_parquet_writer_options {
{
CUDF_EXPECTS(
size_rows >= 5000,
"The maximum row group size cannot be smaller than the page size, which is 5000 rows.");
"The maximum row group size cannot be smaller than the fragment size, which is 5000 rows.");
_row_group_size_rows = size_rows;
}

/**
 * @brief Sets the maximum uncompressed page size, in bytes.
 *
 * @param size_bytes maximum page size; must be at least 4KB
 */
void set_max_page_size_bytes(size_t size_bytes)
{
  constexpr size_t minimum_page_size_bytes = 4 * 1024;
  CUDF_EXPECTS(size_bytes >= minimum_page_size_bytes,
               "The maximum page size cannot be smaller than 4KB.");
  _max_page_size_bytes = size_bytes;
}

/**
 * @brief Sets the maximum page size, in rows.
 *
 * @param size_rows maximum rows per page; must be at least 5000 (the fragment size)
 */
void set_max_page_size_rows(size_type size_rows)
{
  constexpr size_type minimum_fragment_rows = 5000;
  CUDF_EXPECTS(
    size_rows >= minimum_fragment_rows,
    "The maximum page size cannot be smaller than the fragment size, which is 5000 rows.");
  _max_page_size_rows = size_rows;
}

/**
* @brief creates builder to build chunked_parquet_writer_options.
*
Expand Down Expand Up @@ -1016,7 +1129,7 @@ class chunked_parquet_writer_options_builder {
/**
* @brief Sets the maximum number of rows in output row groups.
*
* @param val maximum number of rows
* @return this for chaining.
*/
chunked_parquet_writer_options_builder& row_group_size_rows(size_type val)
Expand All @@ -1025,6 +1138,33 @@ class chunked_parquet_writer_options_builder {
return *this;
}

/**
 * @brief Sets the maximum uncompressed page size, in bytes.
 *
 * Acts as a hint to the writer and may be exceeded in some circumstances;
 * values larger than the row group size in bytes are clamped to it.
 *
 * @param size_bytes maximum page size, in bytes
 * @return this for chaining.
 */
chunked_parquet_writer_options_builder& max_page_size_bytes(size_t size_bytes)
{
  options.set_max_page_size_bytes(size_bytes);
  return *this;
}

/**
 * @brief Sets the maximum page size, in rows.
 *
 * Only top-level rows are counted (nesting is ignored); values larger than the
 * row group size in rows are clamped to it.
 *
 * @param num_rows maximum rows per page
 * @return this for chaining.
 */
chunked_parquet_writer_options_builder& max_page_size_rows(size_type num_rows)
{
  options.set_max_page_size_rows(num_rows);
  return *this;
}

/**
* @brief move chunked_parquet_writer_options member once it's built.
*/
Expand Down
30 changes: 23 additions & 7 deletions cpp/src/io/parquet/page_enc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,9 @@ __global__ void __launch_bounds__(128)
statistics_merge_group* page_grstats,
statistics_merge_group* chunk_grstats,
size_t max_page_comp_data_size,
int32_t num_columns)
int32_t num_columns,
size_t max_page_size_bytes,
size_type max_page_size_rows)
{
// TODO: All writing seems to be done by thread 0. Could be replaced by thrust foreach
__shared__ __align__(8) parquet_column_device_view col_g;
Expand Down Expand Up @@ -334,11 +336,16 @@ __global__ void __launch_bounds__(128)
? frag_g.num_leaf_values * 2 // Assume worst-case of 2-bytes per dictionary index
: frag_g.fragment_data_size;
// TODO (dm): this convoluted logic to limit page size needs refactoring
uint32_t max_page_size = (values_in_page * 2 >= ck_g.num_values) ? 256 * 1024
: (values_in_page * 3 >= ck_g.num_values) ? 384 * 1024
: 512 * 1024;
size_t this_max_page_size = (values_in_page * 2 >= ck_g.num_values) ? 256 * 1024
: (values_in_page * 3 >= ck_g.num_values) ? 384 * 1024
: 512 * 1024;

// override this_max_page_size if the requested size is smaller
this_max_page_size = min(this_max_page_size, max_page_size_bytes);

if (num_rows >= ck_g.num_rows ||
(values_in_page > 0 && (page_size + fragment_data_size > max_page_size))) {
(values_in_page > 0 && (page_size + fragment_data_size > this_max_page_size)) ||
rows_in_page > max_page_size_rows) {
if (ck_g.use_dictionary) {
page_size =
1 + 5 + ((values_in_page * ck_g.dict_rle_bits + 7) >> 3) + (values_in_page >> 8);
Expand Down Expand Up @@ -1927,15 +1934,24 @@ void InitEncoderPages(device_2dspan<EncColumnChunk> chunks,
device_span<gpu::EncPage> pages,
device_span<parquet_column_device_view const> col_desc,
int32_t num_columns,
size_t max_page_size_bytes,
size_type max_page_size_rows,
statistics_merge_group* page_grstats,
statistics_merge_group* chunk_grstats,
size_t max_page_comp_data_size,
rmm::cuda_stream_view stream)
{
auto num_rowgroups = chunks.size().first;
dim3 dim_grid(num_columns, num_rowgroups); // 1 threadblock per rowgroup
gpuInitPages<<<dim_grid, 128, 0, stream.value()>>>(
chunks, pages, col_desc, page_grstats, chunk_grstats, max_page_comp_data_size, num_columns);
gpuInitPages<<<dim_grid, 128, 0, stream.value()>>>(chunks,
pages,
col_desc,
page_grstats,
chunk_grstats,
max_page_comp_data_size,
num_columns,
max_page_size_bytes,
max_page_size_rows);
}

void EncodePages(device_span<gpu::EncPage> pages,
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/io/parquet/parquet_gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,8 @@ void InitEncoderPages(cudf::detail::device_2dspan<EncColumnChunk> chunks,
device_span<gpu::EncPage> pages,
device_span<parquet_column_device_view const> col_desc,
int32_t num_columns,
size_t max_page_size_bytes,
size_type max_page_size_rows,
statistics_merge_group* page_grstats,
statistics_merge_group* chunk_grstats,
size_t max_page_comp_data_size,
Expand Down
17 changes: 16 additions & 1 deletion cpp/src/io/parquet/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -859,7 +859,16 @@ void writer::impl::init_page_sizes(hostdevice_2dvector<gpu::EncColumnChunk>& chu
uint32_t num_columns)
{
chunks.host_to_device(stream);
gpu::InitEncoderPages(chunks, {}, col_desc, num_columns, nullptr, nullptr, 0, stream);
gpu::InitEncoderPages(chunks,
{},
col_desc,
num_columns,
max_page_size_bytes,
max_page_size_rows,
nullptr,
nullptr,
0,
stream);
chunks.device_to_host(stream, true);
}

Expand Down Expand Up @@ -965,6 +974,8 @@ void writer::impl::init_encoder_pages(hostdevice_2dvector<gpu::EncColumnChunk>&
pages,
col_desc,
num_columns,
max_page_size_bytes,
max_page_size_rows,
(num_stats_bfr) ? page_stats_mrg.data() : nullptr,
(num_stats_bfr > num_pages) ? page_stats_mrg.data() + num_pages : nullptr,
max_page_comp_data_size,
Expand Down Expand Up @@ -1122,6 +1133,8 @@ writer::impl::impl(std::vector<std::unique_ptr<data_sink>> sinks,
stream(stream),
max_row_group_size{options.get_row_group_size_bytes()},
max_row_group_rows{options.get_row_group_size_rows()},
max_page_size_bytes(options.get_max_page_size_bytes()),
max_page_size_rows(options.get_max_page_size_rows()),
compression_(to_parquet_compression(options.get_compression())),
stats_granularity_(options.get_stats_level()),
int96_timestamps(options.is_enabled_int96_timestamps()),
Expand All @@ -1144,6 +1157,8 @@ writer::impl::impl(std::vector<std::unique_ptr<data_sink>> sinks,
stream(stream),
max_row_group_size{options.get_row_group_size_bytes()},
max_row_group_rows{options.get_row_group_size_rows()},
max_page_size_bytes(options.get_max_page_size_bytes()),
max_page_size_rows(options.get_max_page_size_rows()),
compression_(to_parquet_compression(options.get_compression())),
stats_granularity_(options.get_stats_level()),
int96_timestamps(options.is_enabled_int96_timestamps()),
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/io/parquet/writer_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ class writer::impl {

size_t max_row_group_size = default_row_group_size_bytes;
size_type max_row_group_rows = default_row_group_size_rows;
size_t max_page_size_bytes = default_max_page_size_bytes;
size_type max_page_size_rows = default_max_page_size_rows;
Compression compression_ = Compression::UNCOMPRESSED;
statistics_freq stats_granularity_ = statistics_freq::STATISTICS_NONE;
bool int96_timestamps = false;
Expand Down
Loading

0 comments on commit c75847a

Please sign in to comment.