[gpuCI] Forward-merge branch-22.06 to branch-22.08 [skip gpuci] #10953

Merged 1 commit on May 24, 2022
156 changes: 148 additions & 8 deletions cpp/include/cudf/io/parquet.hpp
@@ -39,6 +39,8 @@ namespace io {

constexpr size_t default_row_group_size_bytes = 128 * 1024 * 1024; // 128MB
constexpr size_type default_row_group_size_rows = 1000000;
constexpr size_t default_max_page_size_bytes = 512 * 1024;
constexpr size_type default_max_page_size_rows = 20000;

/**
* @brief Builds parquet_reader_options to use for `read_parquet()`.
@@ -382,6 +384,10 @@ class parquet_writer_options {
size_t _row_group_size_bytes = default_row_group_size_bytes;
// Maximum number of rows in row group (unless smaller than a single page)
size_type _row_group_size_rows = default_row_group_size_rows;
// Maximum size of each page (uncompressed)
size_t _max_page_size_bytes = default_max_page_size_bytes;
// Maximum number of rows in a page
size_type _max_page_size_rows = default_max_page_size_rows;

/**
* @brief Constructor from sink and table.
@@ -482,6 +488,24 @@ class parquet_writer_options {
*/
auto get_row_group_size_rows() const { return _row_group_size_rows; }

/**
* @brief Returns the maximum uncompressed page size, in bytes. If set larger than the row group
* size, then this will return the row group size.
*/
auto get_max_page_size_bytes() const
{
return std::min(_max_page_size_bytes, get_row_group_size_bytes());
}

/**
* @brief Returns maximum page size, in rows. If set larger than the row group size, then this
* will return the row group size.
*/
auto get_max_page_size_rows() const
{
return std::min(_max_page_size_rows, get_row_group_size_rows());
}

/**
* @brief Sets partitions.
*
@@ -555,8 +579,8 @@ class parquet_writer_options {
void set_row_group_size_bytes(size_t size_bytes)
{
CUDF_EXPECTS(
size_bytes >= 512 * 1024,
"The maximum row group size cannot be smaller than the page size, which is 512KB.");
size_bytes >= 4 * 1024,
"The maximum row group size cannot be smaller than the minimum page size, which is 4KB.");
_row_group_size_bytes = size_bytes;
}

@@ -567,9 +591,29 @@
{
CUDF_EXPECTS(
size_rows >= 5000,
"The maximum row group size cannot be smaller than the page size, which is 5000 rows.");
"The maximum row group size cannot be smaller than the fragment size, which is 5000 rows.");
_row_group_size_rows = size_rows;
}

/**
* @brief Sets the maximum uncompressed page size, in bytes.
*/
void set_max_page_size_bytes(size_t size_bytes)
{
CUDF_EXPECTS(size_bytes >= 4 * 1024, "The maximum page size cannot be smaller than 4KB.");
_max_page_size_bytes = size_bytes;
}

/**
* @brief Sets the maximum page size, in rows.
*/
void set_max_page_size_rows(size_type size_rows)
{
CUDF_EXPECTS(
size_rows >= 5000,
"The maximum page size cannot be smaller than the fragment size, which is 5000 rows.");
_max_page_size_rows = size_rows;
}
};

class parquet_writer_options_builder {
@@ -690,7 +734,7 @@ class parquet_writer_options_builder {
/**
* @brief Sets the maximum number of rows in output row groups.
*
* @param val maximum number or rows
* @param val maximum number of rows
* @return this for chaining.
*/
parquet_writer_options_builder& row_group_size_rows(size_type val)
@@ -699,6 +743,33 @@
return *this;
}

/**
* @brief Sets the maximum uncompressed page size, in bytes. Serves as a hint to the writer,
* and can be exceeded under certain circumstances. Cannot be larger than the row group size in
* bytes, and will be adjusted to match if it is.
*
* @param val maximum page size
* @return this for chaining.
*/
parquet_writer_options_builder& max_page_size_bytes(size_t val)
{
options.set_max_page_size_bytes(val);
return *this;
}

/**
* @brief Sets the maximum page size, in rows. Counts only top-level rows, ignoring any nesting.
* Cannot be larger than the row group size in rows, and will be adjusted to match if it is.
*
* @param val maximum rows per page
* @return this for chaining.
*/
parquet_writer_options_builder& max_page_size_rows(size_type val)
{
options.set_max_page_size_rows(val);
return *this;
}

/**
* @brief Sets whether int96 timestamps are written or not in parquet_writer_options.
*
@@ -783,6 +854,10 @@ class chunked_parquet_writer_options {
size_t _row_group_size_bytes = default_row_group_size_bytes;
// Maximum number of rows in row group (unless smaller than a single page)
size_type _row_group_size_rows = default_row_group_size_rows;
// Maximum size of each page (uncompressed)
size_t _max_page_size_bytes = default_max_page_size_bytes;
// Maximum number of rows in a page
size_type _max_page_size_rows = default_max_page_size_rows;

/**
* @brief Constructor from sink.
@@ -844,6 +919,24 @@ class chunked_parquet_writer_options {
*/
auto get_row_group_size_rows() const { return _row_group_size_rows; }

/**
* @brief Returns maximum uncompressed page size, in bytes. If set larger than the row group size,
* then this will return the row group size.
*/
auto get_max_page_size_bytes() const
{
return std::min(_max_page_size_bytes, get_row_group_size_bytes());
}

/**
* @brief Returns maximum page size, in rows. If set larger than the row group size, then this
* will return the row group size.
*/
auto get_max_page_size_rows() const
{
return std::min(_max_page_size_rows, get_row_group_size_rows());
}

/**
* @brief Sets metadata.
*
@@ -891,8 +984,8 @@ class chunked_parquet_writer_options {
void set_row_group_size_bytes(size_t size_bytes)
{
CUDF_EXPECTS(
size_bytes >= 512 * 1024,
"The maximum row group size cannot be smaller than the page size, which is 512KB.");
size_bytes >= 4 * 1024,
"The maximum row group size cannot be smaller than the minimum page size, which is 4KB.");
_row_group_size_bytes = size_bytes;
}

@@ -903,10 +996,30 @@
{
CUDF_EXPECTS(
size_rows >= 5000,
"The maximum row group size cannot be smaller than the page size, which is 5000 rows.");
"The maximum row group size cannot be smaller than the fragment size, which is 5000 rows.");
_row_group_size_rows = size_rows;
}

/**
* @brief Sets the maximum uncompressed page size, in bytes.
*/
void set_max_page_size_bytes(size_t size_bytes)
{
CUDF_EXPECTS(size_bytes >= 4 * 1024, "The maximum page size cannot be smaller than 4KB.");
_max_page_size_bytes = size_bytes;
}

/**
* @brief Sets the maximum page size, in rows.
*/
void set_max_page_size_rows(size_type size_rows)
{
CUDF_EXPECTS(
size_rows >= 5000,
"The maximum page size cannot be smaller than the fragment size, which is 5000 rows.");
_max_page_size_rows = size_rows;
}

/**
* @brief creates builder to build chunked_parquet_writer_options.
*
@@ -1016,7 +1129,7 @@ class chunked_parquet_writer_options_builder {
/**
* @brief Sets the maximum number of rows in output row groups.
*
* @param val maximum number or rows
* @param val maximum number of rows
* @return this for chaining.
*/
chunked_parquet_writer_options_builder& row_group_size_rows(size_type val)
@@ -1025,6 +1138,33 @@
return *this;
}

/**
* @brief Sets the maximum uncompressed page size, in bytes. Serves as a hint to the writer,
* and can be exceeded under certain circumstances. Cannot be larger than the row group size in
* bytes, and will be adjusted to match if it is.
*
* @param val maximum page size
* @return this for chaining.
*/
chunked_parquet_writer_options_builder& max_page_size_bytes(size_t val)
{
options.set_max_page_size_bytes(val);
return *this;
}

/**
* @brief Sets the maximum page size, in rows. Counts only top-level rows, ignoring any nesting.
* Cannot be larger than the row group size in rows, and will be adjusted to match if it is.
*
* @param val maximum rows per page
* @return this for chaining.
*/
chunked_parquet_writer_options_builder& max_page_size_rows(size_type val)
{
options.set_max_page_size_rows(val);
return *this;
}

/**
* @brief move chunked_parquet_writer_options member once it's built.
*/
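The two builders in this header only differ in which options object they configure; the new page-size knobs behave the same in both. As a rough usage sketch (not part of this diff) of the single-shot path, assuming the existing cudf::io::write_parquet entry point and a prepared table_view; the file path and the limit values are placeholders:

#include <cudf/io/parquet.hpp>
#include <cudf/table/table_view.hpp>

#include <string>

void write_with_small_pages(cudf::table_view const& tbl, std::string const& path)
{
  auto opts = cudf::io::parquet_writer_options::builder(cudf::io::sink_info{path}, tbl)
                .row_group_size_rows(100000)     // row-group cap, must be >= the 5000-row fragment size
                .max_page_size_bytes(64 * 1024)  // page-size hint, must be >= the 4KB minimum
                .max_page_size_rows(10000)       // page row cap, must be >= 5000
                .build();
  cudf::io::write_parquet(opts);
}

Because the getters clamp against the row-group limits, a page-size request larger than the row group effectively collapses to the row-group size.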
30 changes: 23 additions & 7 deletions cpp/src/io/parquet/page_enc.cu
@@ -240,7 +240,9 @@
statistics_merge_group* page_grstats,
statistics_merge_group* chunk_grstats,
size_t max_page_comp_data_size,
int32_t num_columns)
int32_t num_columns,
size_t max_page_size_bytes,
size_type max_page_size_rows)
{
// TODO: All writing seems to be done by thread 0. Could be replaced by thrust foreach
__shared__ __align__(8) parquet_column_device_view col_g;
@@ -334,11 +336,16 @@
? frag_g.num_leaf_values * 2 // Assume worst-case of 2-bytes per dictionary index
: frag_g.fragment_data_size;
// TODO (dm): this convoluted logic to limit page size needs refactoring
uint32_t max_page_size = (values_in_page * 2 >= ck_g.num_values) ? 256 * 1024
: (values_in_page * 3 >= ck_g.num_values) ? 384 * 1024
: 512 * 1024;
size_t this_max_page_size = (values_in_page * 2 >= ck_g.num_values) ? 256 * 1024
: (values_in_page * 3 >= ck_g.num_values) ? 384 * 1024
: 512 * 1024;

// override this_max_page_size if the requested size is smaller
this_max_page_size = min(this_max_page_size, max_page_size_bytes);

if (num_rows >= ck_g.num_rows ||
(values_in_page > 0 && (page_size + fragment_data_size > max_page_size))) {
(values_in_page > 0 && (page_size + fragment_data_size > this_max_page_size)) ||
rows_in_page > max_page_size_rows) {
if (ck_g.use_dictionary) {
page_size =
1 + 5 + ((values_in_page * ck_g.dict_rle_bits + 7) >> 3) + (values_in_page >> 8);
@@ -1927,15 +1934,24 @@ void InitEncoderPages(device_2dspan<EncColumnChunk> chunks,
device_span<gpu::EncPage> pages,
device_span<parquet_column_device_view const> col_desc,
int32_t num_columns,
size_t max_page_size_bytes,
size_type max_page_size_rows,
statistics_merge_group* page_grstats,
statistics_merge_group* chunk_grstats,
size_t max_page_comp_data_size,
rmm::cuda_stream_view stream)
{
auto num_rowgroups = chunks.size().first;
dim3 dim_grid(num_columns, num_rowgroups); // 1 threadblock per rowgroup
gpuInitPages<<<dim_grid, 128, 0, stream.value()>>>(
chunks, pages, col_desc, page_grstats, chunk_grstats, max_page_comp_data_size, num_columns);
gpuInitPages<<<dim_grid, 128, 0, stream.value()>>>(chunks,
pages,
col_desc,
page_grstats,
chunk_grstats,
max_page_comp_data_size,
num_columns,
max_page_size_bytes,
max_page_size_rows);
}

void EncodePages(device_span<gpu::EncPage> pages,
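In short, gpuInitPages now clamps its internal byte-size heuristic by the user-supplied hint and adds the row count as a second page-break trigger. A host-side restatement of that rule, for illustration only (the actual logic is the device code above; parameter names are paraphrased):

#include <algorithm>
#include <cstddef>

bool should_close_page(std::size_t page_size, std::size_t fragment_data_size,
                       std::size_t values_in_page, std::size_t chunk_num_values,
                       std::size_t rows_in_page, std::size_t num_rows, std::size_t chunk_num_rows,
                       std::size_t max_page_size_bytes, std::size_t max_page_size_rows)
{
  // The heuristic cap shrinks when the page already holds a large share of the chunk...
  std::size_t this_max_page_size = (values_in_page * 2 >= chunk_num_values)   ? 256 * 1024
                                   : (values_in_page * 3 >= chunk_num_values) ? 384 * 1024
                                                                              : 512 * 1024;
  // ...and is overridden whenever the requested maximum is smaller.
  this_max_page_size = std::min(this_max_page_size, max_page_size_bytes);

  // Close the page when the chunk's rows are exhausted, the byte budget is hit,
  // or the new row limit is exceeded.
  return num_rows >= chunk_num_rows ||
         (values_in_page > 0 && page_size + fragment_data_size > this_max_page_size) ||
         rows_in_page > max_page_size_rows;
}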
2 changes: 2 additions & 0 deletions cpp/src/io/parquet/parquet_gpu.hpp
@@ -575,6 +575,8 @@ void InitEncoderPages(cudf::detail::device_2dspan<EncColumnChunk> chunks,
device_span<gpu::EncPage> pages,
device_span<parquet_column_device_view const> col_desc,
int32_t num_columns,
size_t max_page_size_bytes,
size_type max_page_size_rows,
statistics_merge_group* page_grstats,
statistics_merge_group* chunk_grstats,
size_t max_page_comp_data_size,
17 changes: 16 additions & 1 deletion cpp/src/io/parquet/writer_impl.cu
@@ -859,7 +859,16 @@ void writer::impl::init_page_sizes(hostdevice_2dvector<gpu::EncColumnChunk>& chu
uint32_t num_columns)
{
chunks.host_to_device(stream);
gpu::InitEncoderPages(chunks, {}, col_desc, num_columns, nullptr, nullptr, 0, stream);
gpu::InitEncoderPages(chunks,
{},
col_desc,
num_columns,
max_page_size_bytes,
max_page_size_rows,
nullptr,
nullptr,
0,
stream);
chunks.device_to_host(stream, true);
}

@@ -965,6 +974,8 @@ void writer::impl::init_encoder_pages(hostdevice_2dvector<gpu::EncColumnChunk>&
pages,
col_desc,
num_columns,
max_page_size_bytes,
max_page_size_rows,
(num_stats_bfr) ? page_stats_mrg.data() : nullptr,
(num_stats_bfr > num_pages) ? page_stats_mrg.data() + num_pages : nullptr,
max_page_comp_data_size,
@@ -1122,6 +1133,8 @@ writer::impl::impl(std::vector<std::unique_ptr<data_sink>> sinks,
stream(stream),
max_row_group_size{options.get_row_group_size_bytes()},
max_row_group_rows{options.get_row_group_size_rows()},
max_page_size_bytes(options.get_max_page_size_bytes()),
max_page_size_rows(options.get_max_page_size_rows()),
compression_(to_parquet_compression(options.get_compression())),
stats_granularity_(options.get_stats_level()),
int96_timestamps(options.is_enabled_int96_timestamps()),
@@ -1144,6 +1157,8 @@ writer::impl::impl(std::vector<std::unique_ptr<data_sink>> sinks,
stream(stream),
max_row_group_size{options.get_row_group_size_bytes()},
max_row_group_rows{options.get_row_group_size_rows()},
max_page_size_bytes(options.get_max_page_size_bytes()),
max_page_size_rows(options.get_max_page_size_rows()),
compression_(to_parquet_compression(options.get_compression())),
stats_granularity_(options.get_stats_level()),
int96_timestamps(options.is_enabled_int96_timestamps()),
2 changes: 2 additions & 0 deletions cpp/src/io/parquet/writer_impl.hpp
@@ -210,6 +210,8 @@ class writer::impl {

size_t max_row_group_size = default_row_group_size_bytes;
size_type max_row_group_rows = default_row_group_size_rows;
size_t max_page_size_bytes = default_max_page_size_bytes;
size_type max_page_size_rows = default_max_page_size_rows;
Compression compression_ = Compression::UNCOMPRESSED;
statistics_freq stats_granularity_ = statistics_freq::STATISTICS_NONE;
bool int96_timestamps = false;