Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JSON tree algorithms refactor I: CSR data structure for column tree #15979 #13

Open
wants to merge 58 commits into
base: branch-24.10
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
1ec9617
added csr data struct
shrshi Jun 11, 2024
022d7ce
formatting
shrshi Jun 11, 2024
382633f
added test
shrshi Jun 25, 2024
1823854
formatting
shrshi Jun 25, 2024
4a7e2a5
Merge branch 'branch-24.08' into json-tree-refactor-ii
shrshi Jun 25, 2024
8d5ddfb
Merge branch 'branch-24.08' into json-tree-refactor
shrshi Jun 26, 2024
84a7749
fixing csr construction
shrshi Jun 28, 2024
810c389
moving the csr algorithms
shrshi Jun 28, 2024
6a1a415
formatting
shrshi Jun 28, 2024
85c197d
Merge branch 'branch-24.08' into json-tree-refactor
shrshi Jun 28, 2024
996c6dd
Merge branch 'json-tree-refactor' of github.com:shrshi/cudf into json…
shrshi Jun 28, 2024
4bba629
moving to experimental namespace
shrshi Jul 15, 2024
25530f6
Merge branch 'branch-24.08' into json-tree-refactor
shrshi Jul 15, 2024
df9e65b
formatting
shrshi Jul 15, 2024
d1588c8
removed node properties from csr struct - will be introduced in stage…
shrshi Jul 15, 2024
7e1a756
merging branch 24.08 into current branch
shrshi Jul 24, 2024
5541b93
partial commit
shrshi Jul 24, 2024
1490ce9
Merge branch 'branch-24.10' into json-tree-refactor
shrshi Jul 24, 2024
d05e670
better csr construction
shrshi Jul 30, 2024
1ce88be
formatting
shrshi Jul 30, 2024
d6d724c
exec policy is no sync
shrshi Jul 30, 2024
2622d6b
fix copyright year
shrshi Jul 30, 2024
9498372
fixing max row offsets
shrshi Jul 31, 2024
4339b0a
formatting
shrshi Jul 31, 2024
e61288b
Merge branch 'branch-24.10' into json-tree-refactor
shrshi Jul 31, 2024
9b6b7ff
struct docs
shrshi Jul 31, 2024
53db174
Merge branch 'json-tree-refactor' of github.com:shrshi/cudf into json…
shrshi Jul 31, 2024
85608eb
cudf exports!
shrshi Jul 31, 2024
f451c40
Merge branch 'branch-24.10' into json-tree-refactor
shrshi Sep 6, 2024
e29656d
deduplicating code
shrshi Sep 6, 2024
e6eda41
formatting
shrshi Sep 6, 2024
bf4f191
addressing reviews - 1
shrshi Sep 6, 2024
55e943a
addressing reviews - 2
shrshi Sep 6, 2024
4e00526
tsk tsk should have run compute sanitizer sooner
shrshi Sep 6, 2024
ca7a5f3
addressing reviews - 3
shrshi Sep 6, 2024
14664db
addressing reviews - 4
shrshi Sep 6, 2024
63eec8a
Merge branch 'branch-24.10' into json-tree-refactor
shrshi Sep 11, 2024
5f4aca6
adding more tests; debugging on the way
shrshi Sep 13, 2024
e6a9941
formatting
shrshi Sep 13, 2024
82c9ebe
added more tests; fixed bugs
shrshi Sep 17, 2024
8dd6877
formatting
shrshi Sep 17, 2024
0c63f22
finally tests passing
shrshi Sep 18, 2024
2d4861e
fixed all bugs hopefully
shrshi Sep 19, 2024
7759a91
formatting
shrshi Sep 19, 2024
e5d4a35
pr reviews
shrshi Sep 20, 2024
3cdc211
exec policy sync -> nosync
shrshi Sep 20, 2024
9ca7b5e
pr reviews
shrshi Sep 20, 2024
29be430
cleanup
shrshi Sep 20, 2024
023a4a8
moving steps to lambdas to handle intermediate vectors
shrshi Sep 20, 2024
ded2c5e
formatting
shrshi Sep 20, 2024
b2d11dd
more lambdas
shrshi Sep 20, 2024
7260ae6
formatting
shrshi Sep 20, 2024
14ba59e
move back reduce_to_column_tree
karthikeyann Sep 21, 2024
9e582d5
cleanup: remove unused header
karthikeyann Sep 21, 2024
cd69fd1
undo merging of reduce_by_key
karthikeyann Sep 21, 2024
534a1d3
Merge branch 'branch-24.10' of github.com:rapidsai/cudf into json-tre…
karthikeyann Sep 21, 2024
4d4ce13
move debug flags
karthikeyann Sep 21, 2024
7de2ce3
use result from reduce_to_column_tree for csr
karthikeyann Sep 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ add_library(
src/io/functions.cpp
src/io/json/host_tree_algorithms.cu
src/io/json/json_column.cu
src/io/json/column_tree_construction.cu
src/io/json/json_normalization.cu
src/io/json/json_tree.cu
src/io/json/nested_json_gpu.cu
Expand Down
554 changes: 554 additions & 0 deletions cpp/src/io/json/column_tree_construction.cu

Large diffs are not rendered by default.

97 changes: 97 additions & 0 deletions cpp/src/io/json/host_tree_algorithms.cu
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@

#include <algorithm>

// NOTE(review): this unconditionally defines CSR_DEBUG_EQ, which turns on the
// CSR-vs-tree_meta_t equality check (check_equality) on every call to
// make_device_json_column. That is debug-only validation work; gate it behind a
// build option or remove it before merging so release builds do not pay for it.
#ifndef CSR_DEBUG_EQ
#define CSR_DEBUG_EQ
#endif
namespace cudf::io::json::detail {

/**
Expand Down Expand Up @@ -221,6 +224,89 @@ struct json_column_data {
bitmask_type* validity;
};


// Host-side copy of the per-node property arrays of tree_meta_t
// (structure-of-arrays node tree). Used only by the CSR_DEBUG_EQ host
// comparison in check_equality; not part of the production data path.
struct h_tree_meta_t {
  std::vector<NodeT> node_categories;           // node category per node (e.g. NC_STRUCT, NC_LIST)
  std::vector<NodeIndexT> parent_node_ids;      // parent node id per node
  std::vector<SymbolOffsetT> node_range_begin;  // begin offset of the node's span in the input
  std::vector<SymbolOffsetT> node_range_end;    // end offset of the node's span in the input
};

// Host-side copy of the experimental CSR column tree plus the per-column
// properties needed by the CSR_DEBUG_EQ host comparison in check_equality.
struct h_column_tree {
  // concatenated adjacency list
  std::vector<NodeIndexT> rowidx;  // CSR row offsets; neighbors of node u are colidx[rowidx[u]..rowidx[u+1])
  std::vector<NodeIndexT> colidx;  // CSR neighbor indices
  // node properties
  std::vector<NodeT> categories;       // node category per CSR node
  std::vector<NodeIndexT> column_ids;  // maps CSR node position back to the original column id
};

#ifdef CSR_DEBUG_EQ
/**
 * @brief Debug-only check that the experimental CSR column tree agrees with the
 * original tree_meta_t representation of the same column tree.
 *
 * Copies both representations to the host and verifies, under the CSR-position ->
 * column-id mapping (`column_ids`):
 *  - the CSR adjacency is consistent with `parent_node_ids` (for the root, all
 *    stored neighbors are children; for other nodes, the first stored neighbor is
 *    the parent and the rest are children),
 *  - per-column node categories match, and
 *  - per-column max row offsets match.
 *
 * @param d_a Column tree in tree_meta_t form (device)
 * @param d_a_max_row_offsets Max row offset per column for the tree_meta_t form
 * @param d_b_csr Column tree adjacency in CSR form (device)
 * @param d_b_ctp Per-column properties of the CSR form (categories, max row
 * offsets, mapped column ids)
 * @param stream CUDA stream on which the device-to-host copies are performed
 * @return true when the two representations agree, false otherwise
 */
bool check_equality(tree_meta_t& d_a,
                    cudf::device_span<cudf::size_type const> d_a_max_row_offsets,
                    experimental::compressed_sparse_row& d_b_csr,
                    experimental::column_tree_properties& d_b_ctp,
                    rmm::cuda_stream_view stream)
{
  // convert from tree_meta_t to column_tree_csr
  stream.synchronize();

  // Async device-to-host copies; all are fenced by the synchronize() below
  // before any host-side access.
  h_tree_meta_t a{cudf::detail::make_std_vector_async(d_a.node_categories, stream),
                  cudf::detail::make_std_vector_async(d_a.parent_node_ids, stream),
                  cudf::detail::make_std_vector_async(d_a.node_range_begin, stream),
                  cudf::detail::make_std_vector_async(d_a.node_range_end, stream)};

  h_column_tree b{cudf::detail::make_std_vector_async(d_b_csr.rowidx, stream),
                  cudf::detail::make_std_vector_async(d_b_csr.colidx, stream),
                  cudf::detail::make_std_vector_async(d_b_ctp.categories, stream),
                  cudf::detail::make_std_vector_async(d_b_ctp.mapped_ids, stream)};

  auto a_max_row_offsets = cudf::detail::make_std_vector_async(d_a_max_row_offsets, stream);
  auto b_max_row_offsets = cudf::detail::make_std_vector_async(d_b_ctp.max_row_offsets, stream);

  stream.synchronize();

  auto num_nodes = a.parent_node_ids.size();
  if (num_nodes > 1) {
    // CSR row-offsets array has one entry per node plus the terminating offset.
    if (b.rowidx.size() != num_nodes + 1) { return false; }

    // Root (u == 0): every stored neighbor is a child, so each neighbor's parent
    // must map back to the root's column id.
    for (auto pos = b.rowidx[0]; pos < b.rowidx[1]; pos++) {
      auto v = b.colidx[pos];
      if (a.parent_node_ids[b.column_ids[v]] != b.column_ids[0]) { return false; }
    }
    for (size_t u = 1; u < num_nodes; u++) {
      // Non-root node: the first stored neighbor is u's parent ...
      auto v = b.colidx[b.rowidx[u]];
      if (a.parent_node_ids[b.column_ids[u]] != b.column_ids[v]) { return false; }

      // ... and the remaining neighbors are u's children.
      for (auto pos = b.rowidx[u] + 1; pos < b.rowidx[u + 1]; pos++) {
        v = b.colidx[pos];
        if (a.parent_node_ids[b.column_ids[v]] != b.column_ids[u]) { return false; }
      }
    }
    // Node categories must match under the column-id mapping.
    for (size_t u = 0; u < num_nodes; u++) {
      if (a.node_categories[b.column_ids[u]] != b.categories[u]) { return false; }
    }

    // Max row offsets must match under the column-id mapping.
    for (size_t u = 0; u < num_nodes; u++) {
      if (a_max_row_offsets[b.column_ids[u]] != b_max_row_offsets[u]) { return false; }
    }
  } else if (num_nodes == 1) {
    if (b.rowidx.size() != num_nodes + 1) { return false; }

    // Single-node tree: expected shape is rowidx == {0, 1} with an EMPTY colidx.
    // NOTE(review): rowidx[1] == 1 alongside an empty colidx is not standard CSR
    // (the final offset normally equals colidx.size()); presumably a deliberate
    // convention of the experimental builder — confirm against
    // column_tree_construction.cu.
    if (b.rowidx[0] != 0 || b.rowidx[1] != 1) return false;
    if (!b.colidx.empty()) return false;
    for (size_t u = 0; u < num_nodes; u++) {
      if (a.node_categories[b.column_ids[u]] != b.categories[u]) { return false; }
    }

    for (size_t u = 0; u < num_nodes; u++) {
      if (a_max_row_offsets[b.column_ids[u]] != b_max_row_offsets[u]) { return false; }
    }
  }
  return true;
}
#endif

std::pair<cudf::detail::host_vector<uint8_t>,
std::unordered_map<NodeIndexT, std::reference_wrapper<device_json_column>>>
build_tree(device_json_column& root,
Expand Down Expand Up @@ -301,6 +387,17 @@ void make_device_json_column(device_span<SymbolT const> input,
is_array_of_arrays,
row_array_parent_col_id,
stream);
#ifdef CSR_DEBUG_EQ
auto [d_column_tree_csr, d_column_tree_properties] =
cudf::io::json::experimental::detail::reduce_to_column_tree(
d_column_tree, d_unique_col_ids, d_max_row_offsets, is_array_of_arrays, row_array_parent_col_id, stream);

auto iseq = check_equality(
d_column_tree, d_max_row_offsets, d_column_tree_csr, d_column_tree_properties, stream);
// assert equality between csr and meta formats
CUDF_EXPECTS(iseq, "OH NO!");
#endif

auto num_columns = d_unique_col_ids.size();
std::vector<std::string> column_names = copy_strings_to_host_sync(
input, d_column_tree.node_range_begin, d_column_tree.node_range_end, stream);
Expand Down
67 changes: 43 additions & 24 deletions cpp/src/io/json/json_column.cu
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
namespace cudf::io::json::detail {

// DEBUG prints

auto to_cat = [](auto v) -> std::string {
switch (v) {
case NC_STRUCT: return " S";
Expand Down Expand Up @@ -106,25 +107,25 @@ void print_tree(host_span<SymbolT const> input,
*/
std::tuple<tree_meta_t, rmm::device_uvector<NodeIndexT>, rmm::device_uvector<size_type>>
reduce_to_column_tree(tree_meta_t& tree,
device_span<NodeIndexT> original_col_ids,
device_span<NodeIndexT> sorted_col_ids,
device_span<NodeIndexT> ordered_node_ids,
device_span<size_type> row_offsets,
device_span<NodeIndexT const> original_col_ids,
device_span<NodeIndexT const> sorted_col_ids,
device_span<NodeIndexT const> ordered_node_ids,
device_span<size_type const> row_offsets,
bool is_array_of_arrays,
NodeIndexT const row_array_parent_col_id,
rmm::cuda_stream_view stream)
{
CUDF_FUNC_RANGE();
// 1. column count for allocation
auto const num_columns =
thrust::unique_count(rmm::exec_policy(stream), sorted_col_ids.begin(), sorted_col_ids.end());
auto const num_columns = thrust::unique_count(
rmm::exec_policy_nosync(stream), sorted_col_ids.begin(), sorted_col_ids.end());

// 2. reduce_by_key {col_id}, {row_offset}, max.
rmm::device_uvector<NodeIndexT> unique_col_ids(num_columns, stream);
rmm::device_uvector<size_type> max_row_offsets(num_columns, stream);
auto ordered_row_offsets =
thrust::make_permutation_iterator(row_offsets.begin(), ordered_node_ids.begin());
thrust::reduce_by_key(rmm::exec_policy(stream),
thrust::reduce_by_key(rmm::exec_policy_nosync(stream),
sorted_col_ids.begin(),
sorted_col_ids.end(),
ordered_row_offsets,
Expand All @@ -136,7 +137,7 @@ reduce_to_column_tree(tree_meta_t& tree,
// 3. reduce_by_key {col_id}, {node_categories} - custom opp (*+v=*, v+v=v, *+#=E)
rmm::device_uvector<NodeT> column_categories(num_columns, stream);
thrust::reduce_by_key(
rmm::exec_policy(stream),
rmm::exec_policy_nosync(stream),
sorted_col_ids.begin(),
sorted_col_ids.end(),
thrust::make_permutation_iterator(tree.node_categories.begin(), ordered_node_ids.begin()),
Expand All @@ -160,32 +161,33 @@ reduce_to_column_tree(tree_meta_t& tree,
// *+#=E
return NC_ERR;
});

// 4. unique_copy parent_node_ids, ranges
rmm::device_uvector<TreeDepthT> column_levels(0, stream); // not required
rmm::device_uvector<TreeDepthT> column_levels(num_columns, stream); // required for CSR
rmm::device_uvector<NodeIndexT> parent_col_ids(num_columns, stream);
rmm::device_uvector<SymbolOffsetT> col_range_begin(num_columns, stream); // Field names
rmm::device_uvector<SymbolOffsetT> col_range_end(num_columns, stream);
rmm::device_uvector<size_type> unique_node_ids(num_columns, stream);
thrust::unique_by_key_copy(rmm::exec_policy(stream),
thrust::unique_by_key_copy(rmm::exec_policy_nosync(stream),
sorted_col_ids.begin(),
sorted_col_ids.end(),
ordered_node_ids.begin(),
thrust::make_discard_iterator(),
unique_node_ids.begin());

thrust::copy_n(
rmm::exec_policy(stream),
rmm::exec_policy_nosync(stream),
thrust::make_zip_iterator(
thrust::make_permutation_iterator(tree.parent_node_ids.begin(), unique_node_ids.begin()),
thrust::make_permutation_iterator(tree.node_range_begin.begin(), unique_node_ids.begin()),
thrust::make_permutation_iterator(tree.node_range_end.begin(), unique_node_ids.begin())),
thrust::make_permutation_iterator(tree.node_range_end.begin(), unique_node_ids.begin()),
thrust::make_permutation_iterator(tree.node_levels.begin(), unique_node_ids.begin())),
unique_node_ids.size(),
thrust::make_zip_iterator(
parent_col_ids.begin(), col_range_begin.begin(), col_range_end.begin()));
parent_col_ids.begin(), col_range_begin.begin(), col_range_end.begin(), column_levels.begin()));

// convert parent_node_ids to parent_col_ids
thrust::transform(
rmm::exec_policy(stream),
rmm::exec_policy_nosync(stream),
parent_col_ids.begin(),
parent_col_ids.end(),
parent_col_ids.begin(),
Expand All @@ -194,6 +196,11 @@ reduce_to_column_tree(tree_meta_t& tree,
: col_ids[parent_node_id];
});

#ifdef CSR_DEBUG_PRINT
print<NodeIndexT>(unique_col_ids, "h_unique_col_ids", stream);
print<NodeIndexT>(parent_col_ids, "h_parent_col_ids", stream);
#endif

// condition is true if parent is not a list, or sentinel/root
// Special case to return true if parent is a list and is_array_of_arrays is true
auto is_non_list_parent = [column_categories = column_categories.begin(),
Expand All @@ -203,18 +210,21 @@ reduce_to_column_tree(tree_meta_t& tree,
column_categories[parent_col_id] == NC_LIST &&
(!is_array_of_arrays || parent_col_id != row_array_parent_col_id));
};

#ifdef CSR_DEBUG_PRINT
print<row_offset_t>(max_row_offsets, "h_max_row_offsets", stream);
#endif

// Mixed types in List children go to different columns,
// so all immediate children of list column should have same max_row_offsets.
// create list's children max_row_offsets array. (initialize to zero)
// atomicMax on children max_row_offsets array.
// gather the max_row_offsets from children row offset array.
{
rmm::device_uvector<NodeIndexT> list_parents_children_max_row_offsets(num_columns, stream);
thrust::fill(rmm::exec_policy(stream),
list_parents_children_max_row_offsets.begin(),
list_parents_children_max_row_offsets.end(),
0);
thrust::for_each(rmm::exec_policy(stream),
auto list_parents_children_max_row_offsets =
cudf::detail::make_zeroed_device_uvector_async<NodeIndexT>(
static_cast<std::size_t>(num_columns), stream, cudf::get_current_device_resource_ref());
thrust::for_each(rmm::exec_policy_nosync(stream),
unique_col_ids.begin(),
unique_col_ids.end(),
[column_categories = column_categories.begin(),
Expand All @@ -230,8 +240,9 @@ reduce_to_column_tree(tree_meta_t& tree,
ref.fetch_max(max_row_offsets[col_id], cuda::std::memory_order_relaxed);
}
});

thrust::gather_if(
rmm::exec_policy(stream),
rmm::exec_policy_nosync(stream),
parent_col_ids.begin(),
parent_col_ids.end(),
parent_col_ids.begin(),
Expand All @@ -243,10 +254,14 @@ reduce_to_column_tree(tree_meta_t& tree,
});
}

#ifdef CSR_DEBUG_PRINT
print<row_offset_t>(max_row_offsets, "h_max_row_offsets", stream);
#endif

// copy lists' max_row_offsets to children.
// all structs should have same size.
thrust::transform_if(
rmm::exec_policy(stream),
rmm::exec_policy_nosync(stream),
unique_col_ids.begin(),
unique_col_ids.end(),
max_row_offsets.begin(),
Expand All @@ -270,9 +285,13 @@ reduce_to_column_tree(tree_meta_t& tree,
return is_non_list_parent(parent_col_id);
});

#ifdef CSR_DEBUG_PRINT
print<row_offset_t>(max_row_offsets, "h_max_row_offsets", stream);
#endif

// For Struct and List (to avoid copying entire strings when mixed type as string is enabled)
thrust::transform_if(
rmm::exec_policy(stream),
rmm::exec_policy_nosync(stream),
col_range_begin.begin(),
col_range_begin.end(),
column_categories.begin(),
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/io/json/json_tree.cu
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ std::pair<rmm::device_uvector<KeyType>, rmm::device_uvector<IndexType>> stable_s
nullptr, temp_storage_bytes, keys_buffer, order_buffer, keys.size());
rmm::device_buffer d_temp_storage(temp_storage_bytes, stream);

thrust::copy(rmm::exec_policy(stream), keys.begin(), keys.end(), keys_buffer1.begin());
thrust::sequence(rmm::exec_policy(stream), order_buffer1.begin(), order_buffer1.end());
thrust::copy(rmm::exec_policy_nosync(stream), keys.begin(), keys.end(), keys_buffer1.begin());
thrust::sequence(rmm::exec_policy_nosync(stream), order_buffer1.begin(), order_buffer1.end());

cub::DeviceRadixSort::SortPairs(d_temp_storage.data(),
temp_storage_bytes,
Expand Down
74 changes: 70 additions & 4 deletions cpp/src/io/json/nested_json.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,71 @@ struct device_json_column {
}
};

namespace experimental {
/**
 * @brief Sparse graph adjacency matrix stored in Compressed Sparse Row (CSR) format.
 */
struct compressed_sparse_row {
  rmm::device_uvector<NodeIndexT> rowidx;  ///< Row offsets; neighbors of node u span colidx[rowidx[u]..rowidx[u+1])
  rmm::device_uvector<NodeIndexT> colidx;  ///< Concatenated neighbor (column) indices
};

/**
 * @brief Auxiliary column tree properties that are required to construct the device json
 * column subtree, but not required for the final cudf column construction.
 */
struct column_tree_properties {
  rmm::device_uvector<NodeT> categories;           ///< Node category per column
  rmm::device_uvector<size_type> max_row_offsets;  ///< Maximum row index per column
  rmm::device_uvector<NodeIndexT> mapped_ids;      ///< Column identifier per CSR node position
};

/**
 * @brief Unverified column tree stored in Compressed Sparse Row (CSR) format. The device json
 * column subtree - the subgraph that conforms to column tree properties - is extracted and further
 * processed according to the JSON reader options passed. Only the final processed subgraph is
 * annotated with information required to construct cuDF columns.
 */
struct column_tree {
  // concatenated adjacency list
  compressed_sparse_row adjacency;
  // device_json_column properties
  using row_offset_t = size_type;
  // Indicator array for the device column subtree
  // Stores the number of rows in the column if the node is part of device column subtree
  // Stores zero otherwise
  rmm::device_uvector<row_offset_t> subtree_nrows;
  // Per-column string data (presumably mirrors device_json_column's
  // string_offsets/string_lengths — confirm against device_json_column)
  rmm::device_uvector<row_offset_t> string_offsets;
  rmm::device_uvector<row_offset_t> string_lengths;
  // Row offsets
  rmm::device_uvector<row_offset_t> child_offsets;
  // Validity bitmap
  rmm::device_buffer validity;
};

namespace detail {
/**
 * @brief Reduce node tree into column tree by aggregating each property of column.
 *
 * @param tree json node tree to reduce (modified in-place, but restored to original state)
 * @param original_col_ids column id of each node (read-only)
 * @param row_offsets row offset of each node (read-only)
 * @param is_array_of_arrays true if the JSON input's rows are arrays rather than objects
 * @param row_array_parent_col_id column id of the row-array parent node (relevant when
 * is_array_of_arrays is true)
 * @param stream The CUDA stream to which kernels are dispatched
 * @return A tuple containing the column tree in CSR form and its column properties
 * (categories, identifier for each column, and the maximum row index in each column)
 */
CUDF_EXPORT
std::tuple<compressed_sparse_row, column_tree_properties> reduce_to_column_tree(
  tree_meta_t& tree,
  device_span<NodeIndexT const> original_col_ids,
  device_span<size_type const> row_offsets,
  bool is_array_of_arrays,
  NodeIndexT row_array_parent_col_id,
  rmm::cuda_stream_view stream);

} // namespace detail
} // namespace experimental

namespace detail {

// TODO: return device_uvector instead of passing pre-allocated memory
Expand Down Expand Up @@ -314,12 +379,13 @@ get_array_children_indices(TreeDepthT row_array_children_level,
* @return A tuple of column tree representation of JSON string, column ids of columns, and
* max row offsets of columns
*/
CUDF_EXPORT
std::tuple<tree_meta_t, rmm::device_uvector<NodeIndexT>, rmm::device_uvector<size_type>>
reduce_to_column_tree(tree_meta_t& tree,
device_span<NodeIndexT> original_col_ids,
device_span<NodeIndexT> sorted_col_ids,
device_span<NodeIndexT> ordered_node_ids,
device_span<size_type> row_offsets,
device_span<NodeIndexT const> original_col_ids,
device_span<NodeIndexT const> sorted_col_ids,
device_span<NodeIndexT const> ordered_node_ids,
device_span<size_type const> row_offsets,
bool is_array_of_arrays,
NodeIndexT const row_array_parent_col_id,
rmm::cuda_stream_view stream);
Expand Down
Loading
Loading