Skip to content

Commit

Permalink
documentation and cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
karthikeyann committed Sep 17, 2022
1 parent 439cfa0 commit 222cd60
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 79 deletions.
151 changes: 72 additions & 79 deletions cpp/src/io/json/json_tree.cu
Original file line number Diff line number Diff line change
Expand Up @@ -284,24 +284,49 @@ tree_meta_t get_tree_representation(device_span<PdaTokenT const> tokens,
std::move(node_range_end)};
}

// JSON tree traversal for record orient. (list of structs)
// returns col_id of each node, and row_offset(TODO)
std::tuple<rmm::device_uvector<NodeIndexT>, rmm::device_uvector<size_type>>
records_orient_tree_traversal(device_span<SymbolT const> d_input,
tree_meta_t& d_tree,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();
/**
This algorithm assigns a unique column id to each node in the tree.
The row offset is the row index of the node in that column id.
Algorithm:
1. Convert node_category+fieldname to node_type.
a. Create a hashmap to hash field name and assign unique node id as values.
b. Convert the node categories to node types.
Node type is defined as node category enum value if it is not a field node,
otherwise it is the unique node id assigned by the hashmap (value shifted by #NUM_CATEGORY).
2. Preprocessing: Translate parent node ids after sorting by level.
a. sort by level
b. get gather map of sorted indices
c. translate parent_node_ids to new sorted indices
3. Find level boundaries.
copy_if index of first unique values of sorted levels.
4. Per-Level Processing: Propagate parent node ids for each level.
For each level,
a. gather col_id from previous level results. input=col_id, gather_map is parent_indices.
b. stable sort by {parent_col_id, node_type}
c. scan sum of unique {parent_col_id, node_type}
d. scatter the col_id back to stable node_level order (using scatter_indices)
Restore original node_id order
5. Generate row_offset.
a. stable_sort by parent_col_id.
   b. scan_by_key {parent_col_id} (required only on nodes whose parent is a list)
c. propagate to non-list leaves from parent list node by recursion
**/
// GPU version
// 3. convert node_category+fieldname to node_type!
// 1. Convert node_category+fieldname to node_type.
using hash_table_allocator_type = rmm::mr::stream_allocator_adaptor<default_allocator<char>>;
using hash_map_type =
cuco::static_map<size_type, size_type, cuda::thread_scope_device, hash_table_allocator_type>;

constexpr size_type empty_node_index_sentinel = std::numeric_limits<size_type>::max();
auto num_nodes = d_tree.node_categories.size();
hash_map_type key_map{compute_hash_table_size(num_nodes),
hash_map_type key_map{compute_hash_table_size(num_nodes), // TODO reduce oversubscription
cuco::sentinel::empty_key{empty_node_index_sentinel},
cuco::sentinel::empty_value{empty_node_index_sentinel},
hash_table_allocator_type{default_allocator<char>{}, stream},
Expand Down Expand Up @@ -359,8 +384,9 @@ records_orient_tree_traversal(device_span<SymbolT const> d_input,
else
return static_cast<size_type>(node_categories[node_id]);
});
// TODO delete hash_map after node_type creation
print_vec(cudf::detail::make_std_vector_async(node_type, stream), "node_type");
// 1. Preprocessing: Translate parent node ids after sorting by level.
// 2. Preprocessing: Translate parent node ids after sorting by level.
// a. sort by level
// b. get gather map of sorted indices
// c. translate parent_node_ids to sorted indices
Expand All @@ -373,12 +399,9 @@ records_orient_tree_traversal(device_span<SymbolT const> d_input,
print_vec(cudf::detail::make_std_vector_async(node_type, stream), "gpu.node_type");
print_vec(cudf::detail::make_std_vector_async(d_tree.node_levels, stream), "gpu.node_levels");
rmm::device_uvector<NodeIndexT> parent_node_ids(d_tree.parent_node_ids, stream); // make a copy
auto out_pid = thrust::make_zip_iterator(scatter_indices.data(),
// d_tree.node_levels.data(),
parent_node_ids.data(),
node_type.data());
// d_tree.node_categories.data());
// TODO: use cub radix sort.
auto out_pid =
thrust::make_zip_iterator(scatter_indices.data(), parent_node_ids.data(), node_type.data());
// Uses cub radix sort.
thrust::stable_sort_by_key(rmm::exec_policy(stream),
d_tree.node_levels.data(),
d_tree.node_levels.data() + num_nodes,
Expand All @@ -402,7 +425,7 @@ records_orient_tree_traversal(device_span<SymbolT const> d_input,
print_vec(cudf::detail::make_std_vector_async(parent_indices, stream), "parent_indices");
print_vec(cudf::detail::make_std_vector_async(parent_node_ids, stream),
"parent_node_ids (restored)");
// 2. Find level boundaries.
// 3. Find level boundaries.
hostdevice_vector<size_type> level_boundaries(num_nodes + 1, stream);
auto level_end = thrust::copy_if(
rmm::exec_policy(stream),
Expand All @@ -416,7 +439,6 @@ records_orient_tree_traversal(device_span<SymbolT const> d_input,
print_vec(level_boundaries, "level_boundaries");
auto num_levels = level_end - level_boundaries.d_begin();
std::cout << "num_levels: " << num_levels << std::endl;
// level_boundaries[num_levels] = num_nodes;

auto print_level_data = [stream](auto level,
auto start,
Expand Down Expand Up @@ -450,70 +472,59 @@ records_orient_tree_traversal(device_span<SymbolT const> d_input,
printf("%3d ", col_id.element(n, stream));
printf(" col_id-%ld\n", level);
};
#define PRINT_LEVEL_DATA(level) \
print_level_data(level, \
level_boundaries[level - 1], \
level_boundaries[level], \
scatter_indices, \
parent_indices, \
parent_col_id, \
node_type, \
d_tree.node_levels, \
col_id);

// 4. Propagate parent node ids for each level.
// For each level,
// a. gather col_id from previous level results. input=col_id, gather_map is parent_indices.
// b. sort by {col_id, type}
// c. scan sum of unique {parent_col_id, type}
// cross check outputs.
// Calculate row offsets too.
// b. stable sort by {parent_col_id, node_type}
// c. scan sum of unique {parent_col_id, node_type}
// d. scatter the col_id back to stable node_level order (using scatter_indices)
rmm::device_uvector<NodeIndexT> col_id(num_nodes, stream);
rmm::device_uvector<size_type> parent_col_id(num_nodes, stream);
rmm::device_uvector<NodeIndexT> parent_col_id(num_nodes, stream);
thrust::uninitialized_fill(rmm::exec_policy(stream),
parent_col_id.begin(),
parent_col_id.end(),
0); // XXX: is this needed?
thrust::uninitialized_fill(rmm::exec_policy(stream), col_id.begin(), col_id.end(), 1); ///
thrust::device_pointer_cast(col_id.data())[0] =
0; // TODO: Could initialize to 0 and scatter to level_boundaries
thrust::device_pointer_cast(parent_col_id.data())[0] = -1;
// fill with 1, useful for scan later
thrust::uninitialized_fill(rmm::exec_policy(stream), col_id.begin(), col_id.end(), 1);
// TODO: Could initialize to 0 and scatter to level_boundaries
thrust::device_pointer_cast(col_id.data())[0] = 0; // Initialize First node col_id to 0
thrust::device_pointer_cast(parent_col_id.data())[0] =
-1; // Initialize First node parent_col_id to -1 sentinel
for (decltype(num_levels) level = 1; level < num_levels; level++) {
// std::cout << level << ".before gather\n";
// PRINT_LEVEL_DATA(level);
thrust::gather(rmm::exec_policy(stream),
parent_indices.data() +
level_boundaries[level - 1], // FIXME: might be wrong. might be a bug here.
parent_indices.data() + level_boundaries[level],
col_id.data(), // + level_boundaries[level - 1],
parent_col_id.data() + level_boundaries[level - 1]);
// std::cout << level << ".after gather\n";
// print_level_data(level,
// level_boundaries[level - 1],
// level_boundaries[level],
// scatter_indices,
// parent_indices,
// parent_col_id,
// node_type,
// d_tree.node_levels,
// col_id);
// std::cout << level << ".before sort\n";
// PRINT_LEVEL_DATA(level);
// TODO probably sort_by_key value should be a gather/scatter index to restore original order.
thrust::stable_sort_by_key(
rmm::exec_policy(stream),
thrust::make_zip_iterator(parent_col_id.begin() + level_boundaries[level - 1],
node_type.data() + level_boundaries[level - 1]),
thrust::make_zip_iterator(parent_col_id.begin() + level_boundaries[level],
node_type.data() + level_boundaries[level]),
thrust::make_zip_iterator(
scatter_indices.begin() +
level_boundaries[level - 1] //, // is this required?
// gather_indices.begin() + level_boundaries[level - 1],
// parent_indices.begin() + level_boundaries[level - 1]
));
thrust::make_zip_iterator(scatter_indices.begin() + level_boundaries[level - 1]));
// std::cout << level << ".after sort\n";
// print_level_data(level,
// level_boundaries[level - 1],
// level_boundaries[level],
// scatter_indices,
// parent_indices,
// parent_col_id,
// node_type,
// d_tree.node_levels,
// col_id);
// PRINT_LEVEL_DATA(level);
auto start_it = thrust::make_zip_iterator(parent_col_id.begin() + level_boundaries[level - 1],
node_type.data() + level_boundaries[level - 1]);
auto adjacent_pair_it = thrust::make_zip_iterator(start_it - 1, start_it);
// std::cout << level << ".before transform\n";
thrust::transform(rmm::exec_policy(stream),
adjacent_pair_it + 1,
adjacent_pair_it + level_boundaries[level] - level_boundaries[level - 1],
Expand All @@ -524,39 +535,23 @@ records_orient_tree_traversal(device_span<SymbolT const> d_input,
return lhs != rhs ? 1 : 0;
});
// std::cout << level << ".before scan\n";
// // includes previous level last col_id to continue the index.
// includes previous level last col_id to continue the index.
thrust::inclusive_scan(
rmm::exec_policy(stream),
col_id.data() + level_boundaries[level - 1], // FIXME: This is where the bug is.
col_id.data() + level_boundaries[level] + 1, // TODO: +1 only for not-last-levels.
col_id.data() + level_boundaries[level - 1]);
// // print node_id, parent_node_idx, parent_col_id, node_type, level.
// std::cout << level << ".after scan\n";
// print_level_data(level,
// level_boundaries[level - 1],
// level_boundaries[level],
// scatter_indices,
// parent_indices,
// parent_col_id,
// node_type,
// d_tree.node_levels,
// col_id);
// PRINT_LEVEL_DATA(level);
// TODO scatter/gather to restore original order. (scatter will be faster.)
thrust::stable_sort_by_key(
thrust::sort_by_key(
rmm::exec_policy(stream),
scatter_indices.begin() + level_boundaries[level - 1],
scatter_indices.begin() + level_boundaries[level],
thrust::make_zip_iterator(col_id.begin() + level_boundaries[level - 1],
parent_col_id.data() + level_boundaries[level - 1]));
// print_level_data(level,
// level_boundaries[level - 1],
// level_boundaries[level],
// scatter_indices,
// parent_indices,
// parent_col_id,
// node_type,
// d_tree.node_levels,
// col_id);
// std::cout << level << ".after restore order\n";
// PRINT_LEVEL_DATA(level);
}
// FIXME: to make parent_col_id of last level correct, do we need a gather here?
thrust::gather(rmm::exec_policy(stream),
Expand All @@ -577,7 +572,7 @@ records_orient_tree_traversal(device_span<SymbolT const> d_input,
}
return new_col_ids;
};
// restore original order of col_id.
// restore original order of col_id and of the reused d_tree members
// TODO can we do this with scatter instead of sort?
thrust::sort_by_key(rmm::exec_policy(stream),
scatter_indices.begin(),
Expand Down Expand Up @@ -607,9 +602,9 @@ records_orient_tree_traversal(device_span<SymbolT const> d_input,
// printf(" (JSON)\n");

// 5. Generate row_offset.
// stable_sort by parent_col_id.
// scan_by_key of col_id on nodes whose parent is a list.
// propagate to leaves! how?
// a. stable_sort by parent_col_id.
// b. scan_by_key {parent_col_id} (required only on nodes whose parent is a list)
// c. propagate to non-list leaves from parent list node by recursion
thrust::stable_sort_by_key(
rmm::exec_policy(stream), parent_col_id.begin(), parent_col_id.end(), scatter_indices.begin());
rmm::device_uvector<size_type> row_offsets(num_nodes, stream);
Expand All @@ -622,10 +617,10 @@ records_orient_tree_traversal(device_span<SymbolT const> d_input,
row_offsets.begin());
print_vec(cudf::detail::make_std_vector_async(parent_col_id, stream), "parent_col_id");
print_vec(cudf::detail::make_std_vector_async(row_offsets, stream), "row_offsets (generated)");
thrust::stable_sort_by_key(rmm::exec_policy(stream),
scatter_indices.begin(),
scatter_indices.end(),
thrust::make_zip_iterator(parent_col_id.begin(), row_offsets.begin()));
thrust::sort_by_key(rmm::exec_policy(stream),
scatter_indices.begin(),
scatter_indices.end(),
thrust::make_zip_iterator(parent_col_id.begin(), row_offsets.begin()));
thrust::transform_if(
rmm::exec_policy(stream),
thrust::make_counting_iterator<size_type>(0),
Expand All @@ -652,8 +647,6 @@ records_orient_tree_traversal(device_span<SymbolT const> d_input,
// condition for JSON_lines)
});
print_vec(cudf::detail::make_std_vector_async(row_offsets, stream), "row_offsets (generated)");
// TODO check if d_tree is back to original order.
// return col_id, row_offset of each node.
return std::tuple{std::move(col_id), std::move(row_offsets)};
}

Expand Down
11 changes: 11 additions & 0 deletions cpp/src/io/json/nested_json.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,17 @@ tree_meta_t get_tree_representation(
rmm::cuda_stream_view stream = cudf::default_stream_value,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Traverse the tree representation of the JSON input in records orient format and populate
* the output columns indices and row offsets within that column.
*
* @param d_input The JSON input
* @param d_tree A tree representation of the input JSON string as vectors of node type, parent
* index, level, begin index, and end index in the input JSON string
* @param stream The CUDA stream to which kernels are dispatched
* @param mr Optional, resource with which to allocate
* @return A tuple of the output column indices and the row offsets within each column for each node
*/
std::tuple<rmm::device_uvector<NodeIndexT>, rmm::device_uvector<size_type>>
records_orient_tree_traversal(
device_span<SymbolT const> d_input,
Expand Down

0 comments on commit 222cd60

Please sign in to comment.