Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use cuco::static_map to build string dictionaries in ORC writer #13580

Merged
merged 59 commits into from
Jul 14, 2023
Merged
Show file tree
Hide file tree
Changes from 52 commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
06130bd
remove unused members
vuule Jun 4, 2023
8e75032
simplify char count
vuule Jun 4, 2023
9db5f13
replace chunk dict char count
vuule Jun 6, 2023
f04871c
remove string_char_count from DictionaryChunk
vuule Jun 6, 2023
23b3a16
allocate dict storage
vuule Jun 6, 2023
3f59f73
add char_count
vuule Jun 6, 2023
25342c4
Merge branch 'branch-23.08' of https://github.com/rapidsai/cudf into …
vuule Jun 7, 2023
29f25a2
init hash maps
vuule Jun 8, 2023
1191626
build dictionaries; decide encode
vuule Jun 8, 2023
1b18ac7
remove debug prints
vuule Jun 8, 2023
ff621f3
fix indexing
vuule Jun 9, 2023
3e564d2
take dict index entry size into account
vuule Jun 9, 2023
1916898
Merge branch 'branch-23.08' of https://github.com/rapidsai/cudf into …
vuule Jun 9, 2023
e36308a
dict_data alloc
vuule Jun 13, 2023
3b00770
collect_map_entries
vuule Jun 13, 2023
1562e4b
get_dictionary_indices
vuule Jun 13, 2023
5a152cc
replace stripe dict counters
vuule Jun 13, 2023
8f7ef1d
fix lifetime
vuule Jun 13, 2023
3a6d95b
full replacement (?)
vuule Jun 14, 2023
19650e6
Merge branch 'branch-23.08' of https://github.com/rapidsai/cudf into …
vuule Jun 14, 2023
b793c02
Merge branch 'branch-23.08' into fea-write-orc-dict
vuule Jun 14, 2023
7bedaa3
Merge branch 'branch-23.08' of https://github.com/rapidsai/cudf into …
vuule Jun 22, 2023
2be21b2
Merge branch 'branch-23.08' of https://github.com/rapidsai/cudf into …
vuule Jun 23, 2023
4935fb5
Merge branch 'branch-23.08' of https://github.com/rapidsai/cudf into …
vuule Jun 26, 2023
0bd10d7
Merge branch 'branch-23.08' of https://github.com/rapidsai/cudf into …
vuule Jun 26, 2023
217b1c9
clean up mem after use
vuule Jun 26, 2023
84b45c2
separate dict build
vuule Jun 26, 2023
8bd1bc0
proper is_enabled
vuule Jun 26, 2023
08d2ab7
Merge branch 'fea-write-orc-dict' of https://github.com/vuule/cudf in…
vuule Jun 26, 2023
6f96e22
remove dict size limit; docs
vuule Jun 27, 2023
bed6883
docs
vuule Jun 27, 2023
22843f5
impl docs
vuule Jun 27, 2023
9500780
comments
vuule Jun 27, 2023
e1b8e94
constttttttttttttt
vuule Jun 28, 2023
588f0f1
namespace
vuule Jun 28, 2023
b978776
header clean up
vuule Jun 28, 2023
5c51670
Merge branch 'branch-23.08' of https://github.com/rapidsai/cudf into …
vuule Jun 28, 2023
54abee1
Merge branch 'branch-23.08' into fea-write-orc-dict
vuule Jun 30, 2023
99cf647
Merge branch 'branch-23.08' into fea-write-orc-dict
vuule Jul 6, 2023
611ae72
Merge branch 'branch-23.08' of https://github.com/rapidsai/cudf into …
vuule Jul 7, 2023
c8c6aab
Apply suggestions from code review
vuule Jul 7, 2023
75203d1
simplify populate_dictionary_hash_maps_kernel
vuule Jul 7, 2023
6ce882e
Merge branch 'fea-write-orc-dict' of https://github.com/vuule/cudf in…
vuule Jul 7, 2023
56bee95
Update cpp/src/io/orc/dict_enc.cu
vuule Jul 7, 2023
80535f6
Merge branch 'fea-write-orc-dict' of https://github.com/vuule/cudf in…
vuule Jul 7, 2023
4b00841
Merge branch 'branch-23.08' of https://github.com/rapidsai/cudf into …
vuule Jul 10, 2023
908186c
populate_dictionary_hash_maps_kernel optimization/clean up
vuule Jul 10, 2023
2ba5c56
Merge branch 'branch-23.08' into fea-write-orc-dict
vuule Jul 11, 2023
6e37d08
group headers
vuule Jul 11, 2023
d9732c9
Merge branch 'branch-23.08' of https://github.com/rapidsai/cudf into …
vuule Jul 11, 2023
5c8112f
Merge branch 'fea-write-orc-dict' of https://github.com/vuule/cudf in…
vuule Jul 11, 2023
a7081f2
Merge branch 'branch-23.08' into fea-write-orc-dict
vuule Jul 12, 2023
b5ef2ff
Merge branch 'branch-23.08' into fea-write-orc-dict
vuule Jul 13, 2023
43145a9
Merge branch 'branch-23.08' of https://github.com/rapidsai/cudf into …
vuule Jul 13, 2023
7d0cd4d
fix sync issue
vuule Jul 13, 2023
2f2c1ce
col ref
vuule Jul 13, 2023
f8b8f6d
Merge branch 'fea-write-orc-dict' of https://github.com/vuule/cudf in…
vuule Jul 13, 2023
d3ac7f9
code review missed
vuule Jul 13, 2023
2704328
bail if no string cols
vuule Jul 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions cpp/src/io/json/json_tree.cu
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@
#include <cudf/utilities/error.hpp>
#include <cudf/utilities/span.hpp>

#include <cuco/static_map.cuh>

#include <cub/device/device_radix_sort.cuh>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>
#include <rmm/exec_policy.hpp>
#include <rmm/mr/device/polymorphic_allocator.hpp>

#include <cub/device/device_radix_sort.cuh>

#include <cuco/static_map.cuh>

#include <thrust/binary_search.h>
#include <thrust/copy.h>
#include <thrust/count.h>
Expand Down
630 changes: 210 additions & 420 deletions cpp/src/io/orc/dict_enc.cu

Large diffs are not rendered by default.

20 changes: 14 additions & 6 deletions cpp/src/io/orc/orc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,19 @@

#include <string>

namespace cudf {
namespace io {
namespace orc {
namespace cudf::io::orc {

namespace {
[[nodiscard]] constexpr uint32_t varint_size(uint64_t val)
{
auto len = 1u;
while (val > 0x7f) {
val >>= 7;
++len;
}
return len;
}
} // namespace

uint32_t ProtobufReader::read_field_size(uint8_t const* end)
{
Expand Down Expand Up @@ -515,6 +525,4 @@ void metadata::init_parent_descriptors()
}
}

} // namespace orc
} // namespace io
} // namespace cudf
} // namespace cudf::io::orc
10 changes: 0 additions & 10 deletions cpp/src/io/orc/orc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -465,16 +465,6 @@ class ProtobufWriter {
return l;
}

uint32_t varint_size(uint64_t val)
{
auto len = 1u;
while (val > 0x7f) {
val >>= 7;
++len;
}
return len;
}

uint32_t put_int(int64_t v)
{
int64_t s = (v < 0);
Expand Down
139 changes: 82 additions & 57 deletions cpp/src/io/orc/orc_gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@

#include <rmm/cuda_stream_view.hpp>

#include <cuco/static_map.cuh>

namespace cudf {
namespace io {
namespace orc {
Expand All @@ -39,6 +41,19 @@ namespace gpu {
using cudf::detail::device_2dspan;
using cudf::detail::host_2dspan;

auto constexpr KEY_SENTINEL = size_type{-1};
auto constexpr VALUE_SENTINEL = size_type{-1};

using map_type = cuco::static_map<size_type, size_type>;

/**
* @brief The alias of `map_type::pair_atomic_type` class.
*
* Declare this struct by trivial subclassing instead of type aliasing so we can have forward
* declaration of this struct somewhere else.
*/
struct slot_type : public map_type::slot_type {};

struct CompressedStreamInfo {
CompressedStreamInfo() = default;
explicit constexpr CompressedStreamInfo(uint8_t const* compressed_data_, size_t compressed_size_)
Expand Down Expand Up @@ -172,36 +187,63 @@ struct StripeStream {
};

/**
* @brief Struct to describe a dictionary chunk
* @brief Struct to describe a stripe dictionary
*/
struct DictionaryChunk {
uint32_t* dict_data; // dictionary data (index of non-null rows)
uint32_t* dict_index; // row indices of corresponding string (row from dictionary index)
uint32_t start_row; // start row of this chunk
uint32_t num_rows; // num rows in this chunk
uint32_t num_strings; // number of strings in this chunk
uint32_t
string_char_count; // total size of string data (NOTE: assumes less than 4G bytes per chunk)
uint32_t num_dict_strings; // number of strings in dictionary
uint32_t dict_char_count; // size of dictionary string data for this chunk

orc_column_device_view const* leaf_column; //!< Pointer to string column
struct stripe_dictionary {
// input
device_span<slot_type> map_slots; // hash map storage
uint32_t column_idx = 0; // column index
size_type start_row = 0; // first row in the stripe
size_type start_rowgroup = 0; // first rowgroup in the stripe
size_type num_rows = 0; // number of rows in the stripe

// output
device_span<uint32_t> data; // index of elements in the column to include in the dictionary
device_span<uint32_t> index; // index into the dictionary for each row in the column
size_type entry_count = 0; // number of entries in the dictionary
size_type char_count = 0; // number of characters in the dictionary
bool is_enabled = false; // true if dictionary encoding is enabled for this stripe
};

/**
* @brief Struct to describe a dictionary
* @brief Initializes the hash maps storage for dictionary encoding to sentinel values.
*
* @param dictionaries Dictionary descriptors
* @param stream CUDA stream used for device memory operations and kernel launches
*/
void initialize_dictionary_hash_maps(device_2dspan<stripe_dictionary> dictionaries,
rmm::cuda_stream_view stream);

/**
* @brief Populates the hash maps with unique values from the stripe.
*
* @param dictionaries Dictionary descriptors
* @param columns Pre-order flattened device array of ORC column views
* @param stream CUDA stream used for device memory operations and kernel launches
*/
struct StripeDictionary {
uint32_t* dict_data; // row indices of corresponding string (row from dictionary index)
uint32_t* dict_index; // dictionary index from row index
uint32_t column_id; // real column id
uint32_t start_chunk; // first chunk in stripe
uint32_t num_chunks; // number of chunks in the stripe
uint32_t num_strings; // number of unique strings in the dictionary
uint32_t dict_char_count; // total size of dictionary string data

orc_column_device_view const* leaf_column; //!< Pointer to string column
};
void populate_dictionary_hash_maps(device_2dspan<stripe_dictionary> dictionaries,
device_span<orc_column_device_view const> columns,
rmm::cuda_stream_view stream);

/**
* @brief Stores the indices of the hash map entries in the dictionary data buffer.
*
* @param dictionaries Dictionary descriptors
* @param stream CUDA stream used for device memory operations and kernel launches
*/
void collect_map_entries(device_2dspan<stripe_dictionary> dictionaries,
rmm::cuda_stream_view stream);

/**
* @brief Stores the corresponding dictionary indices for each row in the column.
*
* @param dictionaries Dictionary descriptors
* @param columns Pre-order flattened device array of ORC column views
* @param stream CUDA stream used for device memory operations and kernel launches
*/
void get_dictionary_indices(device_2dspan<stripe_dictionary> dictionaries,
device_span<orc_column_device_view const> columns,
rmm::cuda_stream_view stream);

constexpr uint32_t encode_block_size = 512;

Expand Down Expand Up @@ -317,14 +359,16 @@ void EncodeOrcColumnData(device_2dspan<EncChunk const> chunks,
/**
* @brief Launches kernel for encoding column dictionaries
*
* @param[in] stripes Stripe dictionaries device array [stripe][string_column]
* @param[in] stripes Stripe dictionaries device array
* @param[in] columns Pre-order flattened device array of ORC column views
* @param[in] chunks encoder chunk device array [column][rowgroup]
* @param[in] num_string_columns Number of string columns
* @param[in] num_stripes Number of stripes
* @param[in,out] enc_streams chunk streams device array [column][rowgroup]
* @param[in] stream CUDA stream used for device memory operations and kernel launches
*/
void EncodeStripeDictionaries(StripeDictionary const* stripes,
void EncodeStripeDictionaries(stripe_dictionary const* stripes,
device_span<orc_column_device_view const> columns,
device_2dspan<EncChunk const> chunks,
uint32_t num_string_columns,
uint32_t num_stripes,
Expand Down Expand Up @@ -373,38 +417,19 @@ std::optional<writer_compression_statistics> CompressOrcDataStreams(
rmm::cuda_stream_view stream);

/**
* @brief Launches kernel for initializing dictionary chunks
*
* @param[in] orc_columns Pre-order flattened device array of ORC column views
* @param[in,out] chunks DictionaryChunk device array [rowgroup][column]
* @param[in] dict_data dictionary data (index of non-null rows)
* @param[in] dict_index row indices of corresponding string (row from dictionary index)
* @param[in] tmp_indices Temporary buffer for dictionary indices
* @param[in] rowgroup_bounds Ranges of rows in each rowgroup [rowgroup][column]
* @param[in] str_col_indexes List of columns that are strings type
* @param[in] stream CUDA stream used for device memory operations and kernel launches
*/
void InitDictionaryIndices(device_span<orc_column_device_view const> orc_columns,
device_2dspan<DictionaryChunk> chunks,
device_span<device_span<uint32_t>> dict_data,
device_span<device_span<uint32_t>> dict_index,
device_span<device_span<uint32_t>> tmp_indices,
device_2dspan<rowgroup_rows const> rowgroup_bounds,
device_span<uint32_t const> str_col_indexes,
rmm::cuda_stream_view stream);

/**
* @brief Launches kernel for building stripe dictionaries
* @brief Counts the number of characters in each rowgroup of each string column.
*
* @param[in] d_stripes StripeDictionary device 2D array [stripe][column]
* @param[in] h_stripes StripeDictionary host 2D array [stripe][column]
* @param[in] chunks DictionaryChunk device array [rowgroup][column]
* @param[in] stream CUDA stream used for device memory operations and kernel launches
* @param counts Output array of character counts [column][rowgroup]
* @param orc_columns Pre-order flattened device array of ORC column views
* @param rowgroup_bounds Ranges of rows in each rowgroup [rowgroup][column]
* @param str_col_indexes Indexes of string columns in orc_columns
* @param stream CUDA stream used for device memory operations and kernel launches
*/
void BuildStripeDictionaries(device_2dspan<StripeDictionary> d_stripes,
host_2dspan<StripeDictionary const> h_stripes,
device_2dspan<DictionaryChunk const> chunks,
rmm::cuda_stream_view stream);
void rowgroup_char_counts(device_2dspan<size_type> counts,
device_span<orc_column_device_view const> orc_columns,
device_2dspan<rowgroup_rows const> rowgroup_bounds,
device_span<uint32_t const> str_col_indexes,
rmm::cuda_stream_view stream);

/**
* @brief Launches kernels to initialize statistics collection
Expand Down
29 changes: 16 additions & 13 deletions cpp/src/io/orc/stripe_enc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ struct orcenc_state_s {
byterle_enc_state_s byterle;
intrle_enc_state_s intrle;
strdata_enc_state_s strenc;
StripeDictionary dict_stripe;
stripe_dictionary const* dict_stripe;
} u;
union {
uint8_t u8[scratch_buffer_size]; // gblock_vminscratch buffer
Expand Down Expand Up @@ -996,14 +996,16 @@ __global__ void __launch_bounds__(block_size)
/**
* @brief Encode column dictionaries
*
* @param[in] stripes Stripe dictionaries device array [stripe][string_column]
* @param[in] stripes Stripe dictionaries device array
* @param[in] columns Pre-order flattened device array of ORC column views
* @param[in] chunks EncChunk device array [rowgroup][column]
* @param[in] num_columns Number of columns
*/
// blockDim {512,1,1}
template <int block_size>
__global__ void __launch_bounds__(block_size)
gpuEncodeStringDictionaries(StripeDictionary const* stripes,
gpuEncodeStringDictionaries(stripe_dictionary const* stripes,
device_span<orc_column_device_view const> columns,
device_2dspan<EncChunk const> chunks,
device_2dspan<encoder_chunk_streams> streams)
{
Expand All @@ -1015,20 +1017,20 @@ __global__ void __launch_bounds__(block_size)
uint32_t cid = (blockIdx.y) ? CI_DICTIONARY : CI_DATA2;
int t = threadIdx.x;

if (t == 0) s->u.dict_stripe = stripes[stripe_id];
if (t == 0) s->u.dict_stripe = &stripes[stripe_id];

__syncthreads();
auto const strm_ptr = &streams[s->u.dict_stripe.column_id][s->u.dict_stripe.start_chunk];
auto const strm_ptr = &streams[s->u.dict_stripe->column_idx][s->u.dict_stripe->start_rowgroup];
if (t == 0) {
s->chunk = chunks[s->u.dict_stripe.column_id][s->u.dict_stripe.start_chunk];
s->chunk = chunks[s->u.dict_stripe->column_idx][s->u.dict_stripe->start_rowgroup];
s->stream = *strm_ptr;
s->strm_pos[cid] = 0;
s->numlengths = 0;
s->nrows = s->u.dict_stripe.num_strings;
s->nrows = s->u.dict_stripe->entry_count;
s->cur_row = 0;
}
auto const string_column = s->u.dict_stripe.leaf_column;
auto const dict_data = s->u.dict_stripe.dict_data;
auto const string_column = columns[s->u.dict_stripe->column_idx];
auto const dict_data = s->u.dict_stripe->data;
__syncthreads();
if (s->chunk.encoding_kind != DICTIONARY_V2) {
return; // This column isn't using dictionary encoding -> bail out
Expand All @@ -1042,7 +1044,7 @@ __global__ void __launch_bounds__(block_size)
char const* ptr = nullptr;
uint32_t count = 0;
if (t < numvals) {
auto string_val = string_column->element<string_view>(string_idx);
auto string_val = string_column.element<string_view>(string_idx);
ptr = string_val.data();
count = string_val.size_bytes();
}
Expand All @@ -1056,7 +1058,7 @@ __global__ void __launch_bounds__(block_size)
// Encoding string lengths
uint32_t count =
(t < numvals)
? static_cast<uint32_t>(string_column->element<string_view>(string_idx).size_bytes())
? static_cast<uint32_t>(string_column.element<string_view>(string_idx).size_bytes())
: 0;
uint32_t nz_idx = (s->cur_row + t) & 0x3ff;
if (t < numvals) s->lengths.u32[nz_idx] = count;
Expand Down Expand Up @@ -1268,7 +1270,8 @@ void EncodeOrcColumnData(device_2dspan<EncChunk const> chunks,
<<<dim_grid, dim_block, 0, stream.value()>>>(chunks, streams);
}

void EncodeStripeDictionaries(StripeDictionary const* stripes,
void EncodeStripeDictionaries(stripe_dictionary const* stripes,
device_span<orc_column_device_view const> columns,
device_2dspan<EncChunk const> chunks,
uint32_t num_string_columns,
uint32_t num_stripes,
Expand All @@ -1278,7 +1281,7 @@ void EncodeStripeDictionaries(StripeDictionary const* stripes,
dim3 dim_block(512, 1); // 512 threads per dictionary
dim3 dim_grid(num_string_columns * num_stripes, 2);
gpuEncodeStringDictionaries<512>
<<<dim_grid, dim_block, 0, stream.value()>>>(stripes, chunks, enc_streams);
<<<dim_grid, dim_block, 0, stream.value()>>>(stripes, columns, chunks, enc_streams);
}

void CompactOrcDataStreams(device_2dspan<StripeStream> strm_desc,
Expand Down
Loading