From 1b34e264cab785db88dab2ea0dea7349ea326674 Mon Sep 17 00:00:00 2001
From: Seunghwa Kang <45857425+seunghwak@users.noreply.github.com>
Date: Wed, 7 Apr 2021 09:04:48 -0400
Subject: [PATCH] Improve graph primitives performance on graphs with widely varying vertex degrees (#1447)

Partially addresses Issue #1442.

Update the graph primitives used by PageRank, Katz Centrality, BFS, and SSSP to launch 3 different kernels based on vertex degrees to address the thread divergence issue. In addition, cut the memory footprint of the VertexFrontier class used by BFS & SSSP.

The following highlights the performance improvement from this optimization.

R-mat, 2^25 vertices, 2^25 * 32 edges
PageRank: 7.66, 7.42, 8.83, 8.83 seconds (the first two unweighted, the last two weighted, first & third without personalization) => 1.07, 1.08, 1.36, 1.39 seconds
Katz: 1.08, 1.94 seconds (unweighted, weighted) => 0.243, 0.275
BFS: 1.32 seconds => 0.251

R-mat, 2^25 vertices, 2^25 * 16 edges
SSSP: 1.89 seconds (memory allocation fails with the edge factor of 32) => 0.317

With the memory footprint improvement, SSSP now also works with 2^25 vertices and 2^25 * 32 edges, taking 0.514 seconds.

Additional optimizations are still needed to reach the target performance:

1. Add BFS & SSSP specific optimizations (the current implementation assumes general reduction operations, while BFS can pick any source vertex if a vertex is discovered by multiple source vertices and SSSP picks the one with the minimum edge weight; these pure-function reduction operations allow additional optimizations).
2. Launch the 3 different kernels in multiple streams to recover parallelism when the frontier size is relatively small (currently the three kernels are queued in a single stream, and this leads to up to a 3x decrease in parallelism).

Authors:
  - Seunghwa Kang (https://github.com/seunghwak)

Approvers:
  - Alex Fender (https://github.com/afender)
  - Chuck Hastings (https://github.com/ChuckHastings)
  - Brad Rees (https://github.com/BradReesWork)

URL: https://github.com/rapidsai/cugraph/pull/1447
---
 cpp/include/experimental/graph.hpp | 11 +-
 cpp/include/experimental/graph_functions.hpp | 2 +
 cpp/include/experimental/graph_view.hpp | 27 +-
 .../copy_v_transform_reduce_in_out_nbr.cuh | 256 ++++----
 ...ransform_reduce_key_aggregated_out_nbr.cuh | 2 +-
 cpp/include/patterns/count_if_e.cuh | 179 +-----
 cpp/include/patterns/edge_op_utils.cuh | 38 +-
 ...orm_reduce_by_adj_matrix_row_col_key_e.cuh | 2 +-
 cpp/include/patterns/transform_reduce_e.cuh | 260 ++++--
 .../update_frontier_v_push_if_out_nbr.cuh | 603 +++++++++++-------
 cpp/include/patterns/vertex_frontier.cuh | 344 +++++-----
 cpp/include/utilities/dataframe_buffer.cuh | 36 ++
 cpp/src/experimental/bfs.cu | 34 +-
 cpp/src/experimental/graph.cu | 20 +-
 cpp/src/experimental/graph_view.cu | 28 +-
 cpp/src/experimental/sssp.cu | 34 +-
 cpp/tests/experimental/bfs_test.cpp | 24 +-
 .../experimental/katz_centrality_test.cpp | 24 +-
 cpp/tests/experimental/mg_bfs_test.cpp | 35 +-
 .../experimental/mg_katz_centrality_test.cpp | 35 +-
 cpp/tests/experimental/mg_sssp_test.cpp | 35 +-
 cpp/tests/experimental/pagerank_test.cpp | 24 +-
 cpp/tests/experimental/sssp_test.cpp | 26 +-
 cpp/tests/pagerank/mg_pagerank_test.cpp | 29 +-
 .../utilities/generate_graph_from_edgelist.cu | 13 +-
 25 files changed, 1276 insertions(+), 845 deletions(-)

diff --git a/cpp/include/experimental/graph.hpp b/cpp/include/experimental/graph.hpp
index a380200ea1f..27f766b8593 100644
--- a/cpp/include/experimental/graph.hpp
+++ b/cpp/include/experimental/graph.hpp
@@ -88,12 
+88,12 @@ class graph_tget_number_of_vertices(), this->get_number_of_edges(), this->get_graph_properties(), - vertex_partition_segment_offsets_.size() > 0, + adj_matrix_partition_segment_offsets_.size() > 0, false); } @@ -105,9 +105,10 @@ class graph_t partition_{}; std::vector - vertex_partition_segment_offsets_{}; // segment offsets within the vertex partition based on - // vertex degree, relevant only if - // sorted_by_global_degree_within_vertex_partition is true + adj_matrix_partition_segment_offsets_{}; // segment offsets within the vertex partition based + // on vertex degree, relevant only if + // sorted_by_global_degree_within_vertex_partition is + // true }; // single-GPU version diff --git a/cpp/include/experimental/graph_functions.hpp b/cpp/include/experimental/graph_functions.hpp index 100742adccd..b48dc6da136 100644 --- a/cpp/include/experimental/graph_functions.hpp +++ b/cpp/include/experimental/graph_functions.hpp @@ -251,6 +251,8 @@ void unrenumber_local_int_vertices( vertex_t local_int_vertex_last, bool do_expensive_check = false); +// FIXME: We may add unrenumber_int_rows(or cols) as this will require communication only within a +// sub-communicator and potentially be more efficient. /** * @brief Unrenumber (possibly non-local) internal vertices to external vertices based on the * providied @p renumber_map_labels. diff --git a/cpp/include/experimental/graph_view.hpp b/cpp/include/experimental/graph_view.hpp index 47c93b42ca9..e9593b70ddb 100644 --- a/cpp/include/experimental/graph_view.hpp +++ b/cpp/include/experimental/graph_view.hpp @@ -301,7 +301,7 @@ class graph_view_t const& adj_matrix_partition_offsets, std::vector const& adj_matrix_partition_indices, std::vector const& adj_matrix_partition_weights, - std::vector const& vertex_partition_segment_offsets, + std::vector const& adj_matrix_partition_segment_offsets, partition_t const& partition, vertex_t number_of_vertices, edge_t number_of_edges, @@ -431,6 +431,17 @@ class graph_view_t get_local_adj_matrix_partition_segment_offsets(size_t partition_idx) const + { + return adj_matrix_partition_segment_offsets_.size() > 0 + ? std::vector( + adj_matrix_partition_segment_offsets_.begin() + + partition_idx * (detail::num_segments_per_vertex_partition + 1), + adj_matrix_partition_segment_offsets_.begin() + + (partition_idx + 1) * (detail::num_segments_per_vertex_partition + 1)) + : std::vector{}; + } + // FIXME: this function is not part of the public stable API. This function is mainly for pattern // accelerator implementation. This function is currently public to support the legacy // implementations directly accessing CSR/CSC data, but this function will eventually become @@ -499,9 +510,10 @@ class graph_view_t partition_{}; std::vector - vertex_partition_segment_offsets_{}; // segment offsets within the vertex partition based on - // vertex degree, relevant only if - // sorted_by_global_degree_within_vertex_partition is true + adj_matrix_partition_segment_offsets_{}; // segment offsets within the vertex partition based + // on vertex degree, relevant only if + // sorted_by_global_degree_within_vertex_partition is + // true }; // single-GPU version @@ -612,6 +624,13 @@ class graph_view_t get_local_adj_matrix_partition_segment_offsets( + size_t adj_matrix_partition_idx) const + { + assert(adj_matrix_partition_idx == 0); + return segment_offsets_.size() > 0 ? segment_offsets_ : std::vector{}; + } + // FIXME: this function is not part of the public stable API.This function is mainly for pattern // accelerator implementation. 
This function is currently public to support the legacy // implementations directly accessing CSR/CSC data, but this function will eventually become diff --git a/cpp/include/patterns/copy_v_transform_reduce_in_out_nbr.cuh b/cpp/include/patterns/copy_v_transform_reduce_in_out_nbr.cuh index e6a73a874ae..6d828dab513 100644 --- a/cpp/include/patterns/copy_v_transform_reduce_in_out_nbr.cuh +++ b/cpp/include/patterns/copy_v_transform_reduce_in_out_nbr.cuh @@ -42,23 +42,7 @@ namespace experimental { namespace detail { -// FIXME: block size requires tuning -int32_t constexpr copy_v_transform_reduce_nbr_for_all_block_size = 128; - -#if 0 -// FIXME: delete this once we verify that the thrust replace in for_all_major_for_all_nbr_low_degree is no slower than the original for loop based imoplementation -template -__device__ std::enable_if_t accumulate_edge_op_result(T& lhs, T const& rhs) -{ - lhs = plus_edge_op_result(lhs, rhs); -} - -template -__device__ std::enable_if_t accumulate_edge_op_result(T& lhs, T const& rhs) -{ - atomic_add(&lhs, rhs); -} -#endif +int32_t constexpr copy_v_transform_reduce_nbr_for_all_block_size = 512; template (tid); while (idx < static_cast(major_last - major_first)) { + auto major_offset = major_start_offset + idx; vertex_t const* indices{nullptr}; weight_t const* weights{nullptr}; edge_t local_degree{}; - auto major_offset = major_start_offset + idx; thrust::tie(indices, weights, local_degree) = matrix_partition.get_local_edges(static_cast(major_offset)); -#if 1 auto transform_op = [&matrix_partition, &adj_matrix_row_value_input_first, &adj_matrix_col_value_input_first, @@ -148,44 +131,6 @@ __global__ void for_all_major_for_all_nbr_low_degree( atomic_accumulate_edge_op_result(result_value_output_first + minor_offset, e_op_result); }); } -#else - // FIXME: delete this once we verify that the code above is not slower than this. - e_op_result_t e_op_result_sum{init}; // relevent only if update_major == true - for (edge_t i = 0; i < local_degree; ++i) { - auto minor = indices[i]; - auto weight = weights != nullptr ? weights[i] : weight_t{1.0}; - auto minor_offset = matrix_partition.get_minor_offset_from_minor_nocheck(minor); - auto row = GraphViewType::is_adj_matrix_transposed - ? minor - : matrix_partition.get_major_from_major_offset_nocheck(major_offset); - auto col = GraphViewType::is_adj_matrix_transposed - ? matrix_partition.get_major_from_major_offset_nocheck(major_offset) - : minor; - auto row_offset = GraphViewType::is_adj_matrix_transposed - ? minor_offset - : static_cast(major_offset); - auto col_offset = GraphViewType::is_adj_matrix_transposed - ? 
static_cast(major_offset) - : minor_offset; - auto e_op_result = evaluate_edge_op() - .compute(row, - col, - weight, - *(adj_matrix_row_value_input_first + row_offset), - *(adj_matrix_col_value_input_first + col_offset), - e_op); - if (update_major) { - accumulate_edge_op_result(e_op_result_sum, e_op_result); - } else { - accumulate_edge_op_result(*(result_value_output_first + minor_offset), - e_op_result); - } - } - if (update_major) { *(result_value_output_first + idx) = e_op_result_sum; } -#endif idx += gridDim.x * blockDim.x; } } @@ -219,14 +164,14 @@ __global__ void for_all_major_for_all_nbr_mid_degree( auto idx = static_cast(tid / raft::warp_size()); while (idx < static_cast(major_last - major_first)) { + auto major_offset = major_start_offset + idx; vertex_t const* indices{nullptr}; weight_t const* weights{nullptr}; edge_t local_degree{}; - auto major_offset = major_start_offset + idx; thrust::tie(indices, weights, local_degree) = matrix_partition.get_local_edges(major_offset); auto e_op_result_sum = lane_id == 0 ? init : e_op_result_t{}; // relevent only if update_major == true - for (edge_t i = lane_id; i < local_degree; i += raft::warp_size) { + for (edge_t i = lane_id; i < local_degree; i += raft::warp_size()) { auto minor = indices[i]; auto weight = weights != nullptr ? weights[i] : weight_t{1.0}; auto minor_offset = matrix_partition.get_minor_offset_from_minor_nocheck(minor); @@ -293,10 +238,10 @@ __global__ void for_all_major_for_all_nbr_high_degree( auto idx = static_cast(blockIdx.x); while (idx < static_cast(major_last - major_first)) { + auto major_offset = major_start_offset + idx; vertex_t const* indices{nullptr}; weight_t const* weights{nullptr}; edge_t local_degree{}; - auto major_offset = major_start_offset + idx; thrust::tie(indices, weights, local_degree) = matrix_partition.get_local_edges(major_offset); auto e_op_result_sum = threadIdx.x == 0 ? init : e_op_result_t{}; // relevent only if update_major == true @@ -358,7 +303,8 @@ void copy_v_transform_reduce_nbr(raft::handle_t const& handle, T init, VertexValueOutputIterator vertex_value_output_first) { - using vertex_t = typename GraphViewType::vertex_type; + constexpr auto update_major = (in == GraphViewType::is_adj_matrix_transposed); + using vertex_t = typename GraphViewType::vertex_type; static_assert(is_arithmetic_or_thrust_tuple_of_arithmetic::value); @@ -398,15 +344,13 @@ void copy_v_transform_reduce_nbr(raft::handle_t const& handle, matrix_partition_device_t matrix_partition(graph_view, i); auto major_tmp_buffer_size = - GraphViewType::is_multi_gpu && (in == GraphViewType::is_adj_matrix_transposed) - ? matrix_partition.get_major_size() - : vertex_t{0}; + GraphViewType::is_multi_gpu && update_major ? 
matrix_partition.get_major_size() : vertex_t{0}; auto major_tmp_buffer = allocate_dataframe_buffer(major_tmp_buffer_size, handle.get_stream()); auto major_buffer_first = get_dataframe_buffer_begin(major_tmp_buffer); auto major_init = T{}; - if (in == GraphViewType::is_adj_matrix_transposed) { + if (update_major) { if (GraphViewType::is_multi_gpu) { auto& col_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().col_name()); auto const col_comm_rank = col_comm.get_rank(); @@ -416,60 +360,142 @@ void copy_v_transform_reduce_nbr(raft::handle_t const& handle, } } - int comm_root_rank = 0; - if (GraphViewType::is_multi_gpu) { - auto& row_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().row_name()); - auto const row_comm_rank = row_comm.get_rank(); - auto const row_comm_size = row_comm.get_size(); - auto& col_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().col_name()); - auto const col_comm_rank = col_comm.get_rank(); - comm_root_rank = i * row_comm_size + row_comm_rank; - } - - if (graph_view.get_vertex_partition_size(comm_root_rank) > 0) { - raft::grid_1d_thread_t update_grid(graph_view.get_vertex_partition_size(comm_root_rank), + auto row_value_input_offset = GraphViewType::is_adj_matrix_transposed + ? vertex_t{0} + : matrix_partition.get_major_value_start_offset(); + auto col_value_input_offset = GraphViewType::is_adj_matrix_transposed + ? matrix_partition.get_major_value_start_offset() + : vertex_t{0}; + auto segment_offsets = graph_view.get_local_adj_matrix_partition_segment_offsets(i); + if (segment_offsets.size() > 0) { + // FIXME: we may further improve performance by 1) concurrently running kernels on different + // segments; 2) individually tuning block sizes for different segments; and 3) adding one more + // segment for very high degree vertices and running segmented reduction + static_assert(detail::num_segments_per_vertex_partition == 3); + if (segment_offsets[1] > 0) { + raft::grid_1d_block_t update_grid(segment_offsets[1], + detail::copy_v_transform_reduce_nbr_for_all_block_size, + handle.get_device_properties().maxGridSize[0]); + // FIXME: with C++17 we can collapse the if-else statement below with a functor with "if + // constexpr" that returns either a multi-GPU output buffer or a single-GPU output buffer. + if (GraphViewType::is_multi_gpu) { + detail::for_all_major_for_all_nbr_high_degree + <<>>( + matrix_partition, + matrix_partition.get_major_first(), + matrix_partition.get_major_first() + segment_offsets[1], + adj_matrix_row_value_input_first + row_value_input_offset, + adj_matrix_col_value_input_first + col_value_input_offset, + update_major ? 
major_buffer_first : minor_buffer_first, + e_op, + major_init); + } else { + detail::for_all_major_for_all_nbr_high_degree + <<>>( + matrix_partition, + matrix_partition.get_major_first(), + matrix_partition.get_major_first() + segment_offsets[1], + adj_matrix_row_value_input_first + row_value_input_offset, + adj_matrix_col_value_input_first + col_value_input_offset, + vertex_value_output_first, + e_op, + major_init); + } + } + if (segment_offsets[2] - segment_offsets[1] > 0) { + raft::grid_1d_warp_t update_grid(segment_offsets[2] - segment_offsets[1], detail::copy_v_transform_reduce_nbr_for_all_block_size, handle.get_device_properties().maxGridSize[0]); - - if (GraphViewType::is_multi_gpu) { - auto& row_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().row_name()); - auto const row_comm_size = row_comm.get_size(); - auto& col_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().col_name()); - auto const col_comm_rank = col_comm.get_rank(); - - auto row_value_input_offset = GraphViewType::is_adj_matrix_transposed - ? vertex_t{0} - : matrix_partition.get_major_value_start_offset(); - auto col_value_input_offset = GraphViewType::is_adj_matrix_transposed - ? matrix_partition.get_major_value_start_offset() - : vertex_t{0}; - - detail::for_all_major_for_all_nbr_low_degree - <<>>( - matrix_partition, - graph_view.get_vertex_partition_first(comm_root_rank), - graph_view.get_vertex_partition_last(comm_root_rank), - adj_matrix_row_value_input_first + row_value_input_offset, - adj_matrix_col_value_input_first + col_value_input_offset, - (in == GraphViewType::is_adj_matrix_transposed) ? major_buffer_first - : minor_buffer_first, - e_op, - major_init); - } else { - detail::for_all_major_for_all_nbr_low_degree - <<>>( - matrix_partition, - graph_view.get_vertex_partition_first(comm_root_rank), - graph_view.get_vertex_partition_last(comm_root_rank), - adj_matrix_row_value_input_first, - adj_matrix_col_value_input_first, - vertex_value_output_first, - e_op, - major_init); + // FIXME: with C++17 we can collapse the if-else statement below with a functor with "if + // constexpr" that returns either a multi-GPU output buffer or a single-GPU output buffer. + if (GraphViewType::is_multi_gpu) { + detail::for_all_major_for_all_nbr_mid_degree + <<>>( + matrix_partition, + matrix_partition.get_major_first() + segment_offsets[1], + matrix_partition.get_major_first() + segment_offsets[2], + adj_matrix_row_value_input_first + row_value_input_offset, + adj_matrix_col_value_input_first + col_value_input_offset, + update_major ? major_buffer_first + segment_offsets[1] : minor_buffer_first, + e_op, + major_init); + } else { + detail::for_all_major_for_all_nbr_mid_degree + <<>>( + matrix_partition, + matrix_partition.get_major_first() + segment_offsets[1], + matrix_partition.get_major_first() + segment_offsets[2], + adj_matrix_row_value_input_first + row_value_input_offset, + adj_matrix_col_value_input_first + col_value_input_offset, + vertex_value_output_first + (update_major ? segment_offsets[1] : vertex_t{0}), + e_op, + major_init); + } + } + if (segment_offsets[3] - segment_offsets[2] > 0) { + raft::grid_1d_thread_t update_grid(segment_offsets[3] - segment_offsets[2], + detail::copy_v_transform_reduce_nbr_for_all_block_size, + handle.get_device_properties().maxGridSize[0]); + // FIXME: with C++17 we can collapse the if-else statement below with a functor with "if + // constexpr" that returns either a multi-GPU output buffer or a single-GPU output buffer. 
+ if (GraphViewType::is_multi_gpu) { + detail::for_all_major_for_all_nbr_low_degree + <<>>( + matrix_partition, + matrix_partition.get_major_first() + segment_offsets[2], + matrix_partition.get_major_last(), + adj_matrix_row_value_input_first + row_value_input_offset, + adj_matrix_col_value_input_first + col_value_input_offset, + update_major ? major_buffer_first + segment_offsets[2] : minor_buffer_first, + e_op, + major_init); + } else { + detail::for_all_major_for_all_nbr_low_degree + <<>>( + matrix_partition, + matrix_partition.get_major_first() + segment_offsets[2], + matrix_partition.get_major_last(), + adj_matrix_row_value_input_first + row_value_input_offset, + adj_matrix_col_value_input_first + col_value_input_offset, + vertex_value_output_first + (update_major ? segment_offsets[2] : vertex_t{0}), + e_op, + major_init); + } + } + } else { + if (matrix_partition.get_major_size() > 0) { + raft::grid_1d_thread_t update_grid(matrix_partition.get_major_size(), + detail::copy_v_transform_reduce_nbr_for_all_block_size, + handle.get_device_properties().maxGridSize[0]); + // FIXME: with C++17 we can collapse the if-else statement below with a functor with "if + // constexpr" that returns either a multi-GPU output buffer or a single-GPU output buffer. + if (GraphViewType::is_multi_gpu) { + detail::for_all_major_for_all_nbr_low_degree + <<>>( + matrix_partition, + matrix_partition.get_major_first(), + matrix_partition.get_major_last(), + adj_matrix_row_value_input_first + row_value_input_offset, + adj_matrix_col_value_input_first + col_value_input_offset, + update_major ? major_buffer_first : minor_buffer_first, + e_op, + major_init); + } else { + detail::for_all_major_for_all_nbr_low_degree + <<>>( + matrix_partition, + matrix_partition.get_major_first(), + matrix_partition.get_major_last(), + adj_matrix_row_value_input_first + row_value_input_offset, + adj_matrix_col_value_input_first + col_value_input_offset, + vertex_value_output_first, + e_op, + major_init); + } } } - if (GraphViewType::is_multi_gpu && (in == GraphViewType::is_adj_matrix_transposed)) { + if (GraphViewType::is_multi_gpu && update_major) { auto& row_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().row_name()); auto const row_comm_rank = row_comm.get_rank(); auto const row_comm_size = row_comm.get_size(); @@ -487,7 +513,7 @@ void copy_v_transform_reduce_nbr(raft::handle_t const& handle, } } - if (GraphViewType::is_multi_gpu && (in != GraphViewType::is_adj_matrix_transposed)) { + if (GraphViewType::is_multi_gpu && !update_major) { auto& comm = handle.get_comms(); auto const comm_rank = comm.get_rank(); auto& row_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().row_name()); diff --git a/cpp/include/patterns/copy_v_transform_reduce_key_aggregated_out_nbr.cuh b/cpp/include/patterns/copy_v_transform_reduce_key_aggregated_out_nbr.cuh index 22dc2041793..f904c35ef9e 100644 --- a/cpp/include/patterns/copy_v_transform_reduce_key_aggregated_out_nbr.cuh +++ b/cpp/include/patterns/copy_v_transform_reduce_key_aggregated_out_nbr.cuh @@ -60,10 +60,10 @@ __global__ void for_all_major_for_all_nbr_low_degree( auto idx = static_cast(tid); while (idx < static_cast(major_last - major_first)) { + auto major_offset = major_start_offset + idx; vertex_t const* indices{nullptr}; weight_t const* weights{nullptr}; edge_t local_degree{}; - auto major_offset = major_start_offset + idx; thrust::tie(indices, weights, local_degree) = matrix_partition.get_local_edges(static_cast(major_offset)); if (local_degree > 0) { diff --git 
a/cpp/include/patterns/count_if_e.cuh b/cpp/include/patterns/count_if_e.cuh index 99bfc80f643..4eb3fea24c4 100644 --- a/cpp/include/patterns/count_if_e.cuh +++ b/cpp/include/patterns/count_if_e.cuh @@ -16,132 +16,16 @@ #pragma once #include -#include #include -#include -#include +#include -#include -#include #include -#include -#include - #include -#include namespace cugraph { namespace experimental { -namespace detail { - -// FIXME: block size requires tuning -int32_t constexpr count_if_e_for_all_block_size = 128; - -// FIXME: function names conflict if included with transform_reduce_e.cuh -template -__global__ void for_all_major_for_all_nbr_low_degree( - matrix_partition_device_t matrix_partition, - AdjMatrixRowValueInputIterator adj_matrix_row_value_input_first, - AdjMatrixColValueInputIterator adj_matrix_col_value_input_first, - typename GraphViewType::edge_type* block_counts, - EdgeOp e_op) -{ - using vertex_t = typename GraphViewType::vertex_type; - using edge_t = typename GraphViewType::edge_type; - using weight_t = typename GraphViewType::weight_type; - - auto const tid = threadIdx.x + blockIdx.x * blockDim.x; - auto idx = static_cast(tid); - - edge_t count{0}; - while (idx < static_cast(matrix_partition.get_major_size())) { - vertex_t const* indices{nullptr}; - weight_t const* weights{nullptr}; - edge_t local_degree{}; - thrust::tie(indices, weights, local_degree) = matrix_partition.get_local_edges(idx); -#if 1 - count += thrust::count_if( - thrust::seq, - thrust::make_counting_iterator(edge_t{0}), - thrust::make_counting_iterator(local_degree), - [&matrix_partition, - &adj_matrix_row_value_input_first, - &adj_matrix_col_value_input_first, - &e_op, - idx, - indices, - weights] __device__(auto i) { - auto minor = indices[i]; - auto weight = weights != nullptr ? weights[i] : 1.0; - auto minor_offset = matrix_partition.get_minor_offset_from_minor_nocheck(minor); - auto row = GraphViewType::is_adj_matrix_transposed - ? minor - : matrix_partition.get_major_from_major_offset_nocheck(idx); - auto col = GraphViewType::is_adj_matrix_transposed - ? matrix_partition.get_major_from_major_offset_nocheck(idx) - : minor; - auto row_offset = - GraphViewType::is_adj_matrix_transposed ? minor_offset : static_cast(idx); - auto col_offset = - GraphViewType::is_adj_matrix_transposed ? static_cast(idx) : minor_offset; - auto e_op_result = evaluate_edge_op() - .compute(row, - col, - weight, - *(adj_matrix_row_value_input_first + row_offset), - *(adj_matrix_col_value_input_first + col_offset), - e_op); - - return e_op_result; - }); -#else - // FIXME: delete this once we verify that the code above is not slower than this. - for (vertex_t i = 0; i < local_degree; ++i) { - auto minor = indices[i]; - auto weight = weights != nullptr ? weights[i] : 1.0; - auto minor_offset = matrix_partition.get_minor_offset_from_minor_nocheck(minor); - auto row = GraphViewType::is_adj_matrix_transposed - ? minor - : matrix_partition.get_major_from_major_offset_nocheck(idx); - auto col = GraphViewType::is_adj_matrix_transposed - ? matrix_partition.get_major_from_major_offset_nocheck(idx) - : minor; - auto row_offset = - GraphViewType::is_adj_matrix_transposed ? minor_offset : static_cast(idx); - auto col_offset = - GraphViewType::is_adj_matrix_transposed ? 
static_cast(idx) : minor_offset; - auto e_op_result = evaluate_edge_op() - .compute(row, - col, - weight, - *(adj_matrix_row_value_input_first + row_offset), - *(adj_matrix_col_value_input_first + col_offset), - e_op); - if (e_op_result) { count++; } - } -#endif - idx += gridDim.x * blockDim.x; - } - - using BlockReduce = cub::BlockReduce; - __shared__ typename BlockReduce::TempStorage temp_storage; - count = BlockReduce(temp_storage).Sum(count); - if (threadIdx.x == 0) { *(block_counts + blockIdx.x) = count; } -} - -} // namespace detail - /** * @brief Count the number of edges that satisfies the given predicate. * @@ -182,55 +66,18 @@ typename GraphViewType::edge_type count_if_e( AdjMatrixColValueInputIterator adj_matrix_col_value_input_first, EdgeOp e_op) { - using vertex_t = typename GraphViewType::vertex_type; - using edge_t = typename GraphViewType::edge_type; - - edge_t count{0}; - for (size_t i = 0; i < graph_view.get_number_of_local_adj_matrix_partitions(); ++i) { - matrix_partition_device_t matrix_partition(graph_view, i); - - if (matrix_partition.get_major_size() > 0) { - auto row_value_input_offset = GraphViewType::is_adj_matrix_transposed - ? vertex_t{0} - : matrix_partition.get_major_value_start_offset(); - auto col_value_input_offset = GraphViewType::is_adj_matrix_transposed - ? matrix_partition.get_major_value_start_offset() - : vertex_t{0}; - - raft::grid_1d_thread_t update_grid(matrix_partition.get_major_size(), - detail::count_if_e_for_all_block_size, - handle.get_device_properties().maxGridSize[0]); - - rmm::device_uvector block_counts(update_grid.num_blocks, handle.get_stream()); - - detail::for_all_major_for_all_nbr_low_degree<<>>( - matrix_partition, - adj_matrix_row_value_input_first + row_value_input_offset, - adj_matrix_col_value_input_first + col_value_input_offset, - block_counts.data(), - e_op); - - // FIXME: we have several options to implement this. With cooperative group support - // (https://devblogs.nvidia.com/cooperative-groups/), we can run this synchronization within - // the previous kernel. Using atomics at the end of the previous kernel is another option - // (sequentialization due to atomics may not be bad as different blocks may reach the - // synchronization point in varying timings and the number of SMs is not very big) - count += thrust::reduce(rmm::exec_policy(handle.get_stream())->on(handle.get_stream()), - block_counts.begin(), - block_counts.end(), - edge_t{0}, - thrust::plus()); - } - } - - if (GraphViewType::is_multi_gpu) { - count = host_scalar_allreduce(handle.get_comms(), count, handle.get_stream()); - } - - return count; + using edge_t = typename GraphViewType::edge_type; + + return transform_reduce_e(handle, + graph_view, + adj_matrix_row_value_input_first, + adj_matrix_col_value_input_first, + cast_edge_op_bool_to_integer{e_op}, + edge_t{0}); } } // namespace experimental diff --git a/cpp/include/patterns/edge_op_utils.cuh b/cpp/include/patterns/edge_op_utils.cuh index 58fb31c7605..198c1880ff4 100644 --- a/cpp/include/patterns/edge_op_utils.cuh +++ b/cpp/include/patterns/edge_op_utils.cuh @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -77,6 +77,42 @@ struct evaluate_edge_op { } }; +template +struct cast_edge_op_bool_to_integer { + static_assert(std::is_integral::value); + using vertex_type = typename GraphViewType::vertex_type; + using weight_type = typename GraphViewType::weight_type; + using row_value_type = typename std::iterator_traits::value_type; + using col_value_type = typename std::iterator_traits::value_type; + + EdgeOp e_op{}; + + template + __device__ std::enable_if_t>::valid, T> + operator()(V r, V c, W w, R rv, C cv) + { + return e_op(r, c, w, rv, cv) ? T{1} : T{0}; + } + + template + __device__ std::enable_if_t>::valid, T> + operator()(V r, V c, R rv, C cv) + { + return e_op(r, c, rv, cv) ? T{1} : T{0}; + } +}; + template __host__ __device__ std::enable_if_t::value, T> plus_edge_op_result( T const& lhs, T const& rhs) diff --git a/cpp/include/patterns/transform_reduce_by_adj_matrix_row_col_key_e.cuh b/cpp/include/patterns/transform_reduce_by_adj_matrix_row_col_key_e.cuh index 34721c75e31..9848aa21f88 100644 --- a/cpp/include/patterns/transform_reduce_by_adj_matrix_row_col_key_e.cuh +++ b/cpp/include/patterns/transform_reduce_by_adj_matrix_row_col_key_e.cuh @@ -62,10 +62,10 @@ __global__ void for_all_major_for_all_nbr_low_degree( auto idx = static_cast(tid); while (idx < static_cast(major_last - major_first)) { + auto major_offset = major_start_offset + idx; vertex_t const* indices{nullptr}; weight_t const* weights{nullptr}; edge_t local_degree{}; - auto major_offset = major_start_offset + idx; thrust::tie(indices, weights, local_degree) = matrix_partition.get_local_edges(static_cast(major_offset)); if (local_degree > 0) { diff --git a/cpp/include/patterns/transform_reduce_e.cuh b/cpp/include/patterns/transform_reduce_e.cuh index 1f59777bc35..b95e036d460 100644 --- a/cpp/include/patterns/transform_reduce_e.cuh +++ b/cpp/include/patterns/transform_reduce_e.cuh @@ -41,31 +41,34 @@ int32_t constexpr transform_reduce_e_for_all_block_size = 128; template __global__ void for_all_major_for_all_nbr_low_degree( matrix_partition_device_t matrix_partition, + typename GraphViewType::vertex_type major_first, + typename GraphViewType::vertex_type major_last, AdjMatrixRowValueInputIterator adj_matrix_row_value_input_first, AdjMatrixColValueInputIterator adj_matrix_col_value_input_first, - BlockResultIterator block_result_first, + ResultIterator result_iter /* size 1 */, EdgeOp e_op) { using vertex_t = typename GraphViewType::vertex_type; using edge_t = typename GraphViewType::edge_type; using weight_t = typename GraphViewType::weight_type; - using e_op_result_t = typename std::iterator_traits::value_type; + using e_op_result_t = typename std::iterator_traits::value_type; - auto const tid = threadIdx.x + blockIdx.x * blockDim.x; - size_t idx = static_cast(tid); + auto const tid = threadIdx.x + blockIdx.x * blockDim.x; + auto major_start_offset = static_cast(major_first - matrix_partition.get_major_first()); + size_t idx = static_cast(tid); e_op_result_t e_op_result_sum{}; - while (idx < static_cast(matrix_partition.get_major_size())) { + while (idx < static_cast(major_last - major_first)) { + auto major_offset = major_start_offset + idx; vertex_t const* indices{nullptr}; weight_t const* weights{nullptr}; edge_t local_degree{}; - thrust::tie(indices, weights, local_degree) = matrix_partition.get_local_edges(idx); -#if 1 - auto sum = thrust::transform_reduce( + thrust::tie(indices, weights, local_degree) = matrix_partition.get_local_edges(major_offset); + auto sum = thrust::transform_reduce( thrust::seq, 
thrust::make_counting_iterator(edge_t{0}), thrust::make_counting_iterator(local_degree), @@ -104,9 +107,112 @@ __global__ void for_all_major_for_all_nbr_low_degree( [] __device__(auto lhs, auto rhs) { return plus_edge_op_result(lhs, rhs); }); e_op_result_sum = plus_edge_op_result(e_op_result_sum, sum); -#else - // FIXME: delete this once we verify that the code above is not slower than this. - for (vertex_t i = 0; i < local_degree; ++i) { + idx += gridDim.x * blockDim.x; + } + + e_op_result_sum = + block_reduce_edge_op_result().compute( + e_op_result_sum); + if (threadIdx.x == 0) { atomic_accumulate_edge_op_result(result_iter, e_op_result_sum); } +} + +template +__global__ void for_all_major_for_all_nbr_mid_degree( + matrix_partition_device_t matrix_partition, + typename GraphViewType::vertex_type major_first, + typename GraphViewType::vertex_type major_last, + AdjMatrixRowValueInputIterator adj_matrix_row_value_input_first, + AdjMatrixColValueInputIterator adj_matrix_col_value_input_first, + ResultIterator result_iter /* size 1 */, + EdgeOp e_op) +{ + using vertex_t = typename GraphViewType::vertex_type; + using edge_t = typename GraphViewType::edge_type; + using weight_t = typename GraphViewType::weight_type; + using e_op_result_t = typename std::iterator_traits::value_type; + + auto const tid = threadIdx.x + blockIdx.x * blockDim.x; + static_assert(transform_reduce_e_for_all_block_size % raft::warp_size() == 0); + auto const lane_id = tid % raft::warp_size(); + auto major_start_offset = static_cast(major_first - matrix_partition.get_major_first()); + size_t idx = static_cast(tid / raft::warp_size()); + + e_op_result_t e_op_result_sum{}; + while (idx < static_cast(major_last - major_first)) { + auto major_offset = major_start_offset + idx; + vertex_t const* indices{nullptr}; + weight_t const* weights{nullptr}; + edge_t local_degree{}; + thrust::tie(indices, weights, local_degree) = matrix_partition.get_local_edges(major_offset); + for (edge_t i = lane_id; i < local_degree; i += raft::warp_size()) { + auto minor = indices[i]; + auto weight = weights != nullptr ? weights[i] : weight_t{1.0}; + auto minor_offset = matrix_partition.get_minor_offset_from_minor_nocheck(minor); + auto row = GraphViewType::is_adj_matrix_transposed + ? minor + : matrix_partition.get_major_from_major_offset_nocheck(idx); + auto col = GraphViewType::is_adj_matrix_transposed + ? matrix_partition.get_major_from_major_offset_nocheck(idx) + : minor; + auto row_offset = + GraphViewType::is_adj_matrix_transposed ? minor_offset : static_cast(idx); + auto col_offset = + GraphViewType::is_adj_matrix_transposed ? 
static_cast(idx) : minor_offset; + auto e_op_result = evaluate_edge_op() + .compute(row, + col, + weight, + *(adj_matrix_row_value_input_first + row_offset), + *(adj_matrix_col_value_input_first + col_offset), + e_op); + e_op_result_sum = plus_edge_op_result(e_op_result_sum, e_op_result); + } + idx += gridDim.x * (blockDim.x / raft::warp_size()); + } + + e_op_result_sum = + block_reduce_edge_op_result().compute( + e_op_result_sum); + if (threadIdx.x == 0) { atomic_accumulate_edge_op_result(result_iter, e_op_result_sum); } +} + +template +__global__ void for_all_major_for_all_nbr_high_degree( + matrix_partition_device_t matrix_partition, + typename GraphViewType::vertex_type major_first, + typename GraphViewType::vertex_type major_last, + AdjMatrixRowValueInputIterator adj_matrix_row_value_input_first, + AdjMatrixColValueInputIterator adj_matrix_col_value_input_first, + ResultIterator result_iter /* size 1 */, + EdgeOp e_op) +{ + using vertex_t = typename GraphViewType::vertex_type; + using edge_t = typename GraphViewType::edge_type; + using weight_t = typename GraphViewType::weight_type; + using e_op_result_t = typename std::iterator_traits::value_type; + + auto major_start_offset = static_cast(major_first - matrix_partition.get_major_first()); + size_t idx = static_cast(blockIdx.x); + + e_op_result_t e_op_result_sum{}; + while (idx < static_cast(major_last - major_first)) { + auto major_offset = major_start_offset + idx; + vertex_t const* indices{nullptr}; + weight_t const* weights{nullptr}; + edge_t local_degree{}; + thrust::tie(indices, weights, local_degree) = matrix_partition.get_local_edges(major_offset); + for (edge_t i = threadIdx.x; i < local_degree; i += blockDim.x) { auto minor = indices[i]; auto weight = weights != nullptr ? weights[i] : weight_t{1.0}; auto minor_offset = matrix_partition.get_minor_offset_from_minor_nocheck(minor); @@ -132,14 +238,13 @@ __global__ void for_all_major_for_all_nbr_low_degree( e_op); e_op_result_sum = plus_edge_op_result(e_op_result_sum, e_op_result); } -#endif - idx += gridDim.x * blockDim.x; + idx += gridDim.x; } e_op_result_sum = block_reduce_edge_op_result().compute( e_op_result_sum); - if (threadIdx.x == 0) { *(block_result_first + blockIdx.x) = e_op_result_sum; } + if (threadIdx.x == 0) { atomic_accumulate_edge_op_result(result_iter, e_op_result_sum); } } } // namespace detail @@ -190,51 +295,106 @@ T transform_reduce_e(raft::handle_t const& handle, using vertex_t = typename GraphViewType::vertex_type; - T result{}; + auto result_buffer = allocate_dataframe_buffer(1, handle.get_stream()); + thrust::fill(rmm::exec_policy(handle.get_stream())->on(handle.get_stream()), + get_dataframe_buffer_begin(result_buffer), + get_dataframe_buffer_begin(result_buffer) + 1, + T{}); + for (size_t i = 0; i < graph_view.get_number_of_local_adj_matrix_partitions(); ++i) { matrix_partition_device_t matrix_partition(graph_view, i); - if (matrix_partition.get_major_size() > 0) { - auto row_value_input_offset = GraphViewType::is_adj_matrix_transposed - ? vertex_t{0} - : matrix_partition.get_major_value_start_offset(); - auto col_value_input_offset = GraphViewType::is_adj_matrix_transposed - ? matrix_partition.get_major_value_start_offset() - : vertex_t{0}; + auto row_value_input_offset = GraphViewType::is_adj_matrix_transposed + ? vertex_t{0} + : matrix_partition.get_major_value_start_offset(); + auto col_value_input_offset = GraphViewType::is_adj_matrix_transposed + ? 
matrix_partition.get_major_value_start_offset() + : vertex_t{0}; + auto segment_offsets = graph_view.get_local_adj_matrix_partition_segment_offsets(i); + if (segment_offsets.size() > 0) { + // FIXME: we may further improve performance by 1) concurrently running kernels on different + // segments; 2) individually tuning block sizes for different segments; and 3) adding one more + // segment for very high degree vertices and running segmented reduction + static_assert(detail::num_segments_per_vertex_partition == 3); + if (segment_offsets[1] > 0) { + raft::grid_1d_block_t update_grid(segment_offsets[1], + detail::transform_reduce_e_for_all_block_size, + handle.get_device_properties().maxGridSize[0]); - raft::grid_1d_thread_t update_grid(matrix_partition.get_major_size(), + detail::for_all_major_for_all_nbr_high_degree<<>>( + matrix_partition, + matrix_partition.get_major_first(), + matrix_partition.get_major_first() + segment_offsets[1], + adj_matrix_row_value_input_first + row_value_input_offset, + adj_matrix_col_value_input_first + col_value_input_offset, + get_dataframe_buffer_begin(result_buffer), + e_op); + } + if (segment_offsets[2] - segment_offsets[1] > 0) { + raft::grid_1d_warp_t update_grid(segment_offsets[2] - segment_offsets[1], detail::transform_reduce_e_for_all_block_size, handle.get_device_properties().maxGridSize[0]); - auto block_result_buffer = - allocate_dataframe_buffer(update_grid.num_blocks, handle.get_stream()); - - detail::for_all_major_for_all_nbr_low_degree<<>>( - matrix_partition, - adj_matrix_row_value_input_first + row_value_input_offset, - adj_matrix_col_value_input_first + col_value_input_offset, - get_dataframe_buffer_begin(block_result_buffer), - e_op); - - // FIXME: we have several options to implement this. With cooperative group support - // (https://devblogs.nvidia.com/cooperative-groups/), we can run this synchronization within - // the previous kernel. 
Using atomics at the end of the previous kernel is another option - // (sequentialization due to atomics may not be bad as different blocks may reach the - // synchronization point in varying timings and the number of SMs is not very big) - auto partial_result = - thrust::reduce(rmm::exec_policy(handle.get_stream())->on(handle.get_stream()), - get_dataframe_buffer_begin(block_result_buffer), - get_dataframe_buffer_begin(block_result_buffer) + update_grid.num_blocks, - T(), - [] __device__(T lhs, T rhs) { return plus_edge_op_result(lhs, rhs); }); - - result = plus_edge_op_result(result, partial_result); + detail::for_all_major_for_all_nbr_mid_degree<<>>( + matrix_partition, + matrix_partition.get_major_first() + segment_offsets[1], + matrix_partition.get_major_first() + segment_offsets[2], + adj_matrix_row_value_input_first + row_value_input_offset, + adj_matrix_col_value_input_first + col_value_input_offset, + get_dataframe_buffer_begin(result_buffer), + e_op); + } + if (segment_offsets[3] - segment_offsets[2] > 0) { + raft::grid_1d_thread_t update_grid(segment_offsets[3] - segment_offsets[2], + detail::transform_reduce_e_for_all_block_size, + handle.get_device_properties().maxGridSize[0]); + + detail::for_all_major_for_all_nbr_low_degree<<>>( + matrix_partition, + matrix_partition.get_major_first() + segment_offsets[2], + matrix_partition.get_major_last(), + adj_matrix_row_value_input_first + row_value_input_offset, + adj_matrix_col_value_input_first + col_value_input_offset, + get_dataframe_buffer_begin(result_buffer), + e_op); + } + } else { + if (matrix_partition.get_major_size() > 0) { + raft::grid_1d_thread_t update_grid(matrix_partition.get_major_size(), + detail::transform_reduce_e_for_all_block_size, + handle.get_device_properties().maxGridSize[0]); + + detail::for_all_major_for_all_nbr_low_degree<<>>( + matrix_partition, + matrix_partition.get_major_first(), + matrix_partition.get_major_last(), + adj_matrix_row_value_input_first + row_value_input_offset, + adj_matrix_col_value_input_first + col_value_input_offset, + get_dataframe_buffer_begin(result_buffer), + e_op); + } } } + auto result = + thrust::reduce(rmm::exec_policy(handle.get_stream())->on(handle.get_stream()), + get_dataframe_buffer_begin(result_buffer), + get_dataframe_buffer_begin(result_buffer) + 1, + T{}, + [] __device__(T lhs, T rhs) { return plus_edge_op_result(lhs, rhs); }); + if (GraphViewType::is_multi_gpu) { result = host_scalar_allreduce(handle.get_comms(), result, handle.get_stream()); } diff --git a/cpp/include/patterns/update_frontier_v_push_if_out_nbr.cuh b/cpp/include/patterns/update_frontier_v_push_if_out_nbr.cuh index 4d557b97a30..3d87f19969e 100644 --- a/cpp/include/patterns/update_frontier_v_push_if_out_nbr.cuh +++ b/cpp/include/patterns/update_frontier_v_push_if_out_nbr.cuh @@ -15,7 +15,6 @@ */ #pragma once -#include #include #include #include @@ -37,13 +36,15 @@ #include #include #include -#include +#include +#include #include #include #include #include #include +#include #include #include #include @@ -55,9 +56,7 @@ namespace experimental { namespace detail { -// FIXME: block size requires tuning -int32_t constexpr update_frontier_v_push_if_out_nbr_for_all_block_size = 128; -int32_t constexpr update_frontier_v_push_if_out_nbr_update_block_size = 128; +int32_t constexpr update_frontier_v_push_if_out_nbr_for_all_block_size = 512; template (thrust::distance(row_first, row_last)); auto const tid = threadIdx.x + blockIdx.x * blockDim.x; - size_t idx = tid; + auto idx = static_cast(tid); - while (idx < 
num_rows) { + while (idx < static_cast(thrust::distance(row_first, row_last))) { vertex_t row = *(row_first + idx); auto row_offset = matrix_partition.get_major_offset_from_major_nocheck(row); vertex_t const* indices{nullptr}; weight_t const* weights{nullptr}; edge_t local_out_degree{}; thrust::tie(indices, weights, local_out_degree) = matrix_partition.get_local_edges(row_offset); - for (vertex_t i = 0; i < local_out_degree; ++i) { + for (edge_t i = 0; i < local_out_degree; ++i) { auto col = indices[i]; auto weight = weights != nullptr ? weights[i] : 1.0; auto col_offset = matrix_partition.get_minor_offset_from_minor_nocheck(col); @@ -125,12 +123,145 @@ __global__ void for_all_frontier_row_for_all_nbr_low_degree( } } +template +__global__ void for_all_frontier_row_for_all_nbr_mid_degree( + matrix_partition_device_t matrix_partition, + RowIterator row_first, + RowIterator row_last, + AdjMatrixRowValueInputIterator adj_matrix_row_value_input_first, + AdjMatrixColValueInputIterator adj_matrix_col_value_input_first, + BufferKeyOutputIterator buffer_key_output_first, + BufferPayloadOutputIterator buffer_payload_output_first, + size_t* buffer_idx_ptr, + EdgeOp e_op) +{ + using vertex_t = typename GraphViewType::vertex_type; + using edge_t = typename GraphViewType::edge_type; + using weight_t = typename GraphViewType::weight_type; + + static_assert(!GraphViewType::is_adj_matrix_transposed, + "GraphViewType should support the push model."); + + auto const tid = threadIdx.x + blockIdx.x * blockDim.x; + static_assert(update_frontier_v_push_if_out_nbr_for_all_block_size % raft::warp_size() == 0); + auto const lane_id = tid % raft::warp_size(); + auto idx = static_cast(tid / raft::warp_size()); + + while (idx < static_cast(thrust::distance(row_first, row_last))) { + vertex_t row = *(row_first + idx); + auto row_offset = matrix_partition.get_major_offset_from_major_nocheck(row); + vertex_t const* indices{nullptr}; + weight_t const* weights{nullptr}; + edge_t local_out_degree{}; + thrust::tie(indices, weights, local_out_degree) = matrix_partition.get_local_edges(row_offset); + for (edge_t i = lane_id; i < local_out_degree; i += raft::warp_size()) { + auto col = indices[i]; + auto weight = weights != nullptr ? weights[i] : 1.0; + auto col_offset = matrix_partition.get_minor_offset_from_minor_nocheck(col); + auto e_op_result = evaluate_edge_op() + .compute(row, + col, + weight, + *(adj_matrix_row_value_input_first + row_offset), + *(adj_matrix_col_value_input_first + col_offset), + e_op); + if (thrust::get<0>(e_op_result) == true) { + // FIXME: This atomicAdd serializes execution. If we renumber vertices to insure that rows + // within a partition are sorted by their out-degree in decreasing order, we can compute + // a tight uppper bound for the maximum number of pushes per warp/block and use shared + // memory buffer to reduce the number of atomicAdd operations. 
+ static_assert(sizeof(unsigned long long int) == sizeof(size_t)); + auto buffer_idx = atomicAdd(reinterpret_cast(buffer_idx_ptr), + static_cast(1)); + *(buffer_key_output_first + buffer_idx) = col; + *(buffer_payload_output_first + buffer_idx) = thrust::get<1>(e_op_result); + } + } + + idx += gridDim.x * (blockDim.x / raft::warp_size()); + } +} + +template +__global__ void for_all_frontier_row_for_all_nbr_high_degree( + matrix_partition_device_t matrix_partition, + RowIterator row_first, + RowIterator row_last, + AdjMatrixRowValueInputIterator adj_matrix_row_value_input_first, + AdjMatrixColValueInputIterator adj_matrix_col_value_input_first, + BufferKeyOutputIterator buffer_key_output_first, + BufferPayloadOutputIterator buffer_payload_output_first, + size_t* buffer_idx_ptr, + EdgeOp e_op) +{ + using vertex_t = typename GraphViewType::vertex_type; + using edge_t = typename GraphViewType::edge_type; + using weight_t = typename GraphViewType::weight_type; + + static_assert(!GraphViewType::is_adj_matrix_transposed, + "GraphViewType should support the push model."); + + auto idx = static_cast(blockIdx.x); + + while (idx < static_cast(thrust::distance(row_first, row_last))) { + vertex_t row = *(row_first + idx); + auto row_offset = matrix_partition.get_major_offset_from_major_nocheck(row); + vertex_t const* indices{nullptr}; + weight_t const* weights{nullptr}; + edge_t local_out_degree{}; + thrust::tie(indices, weights, local_out_degree) = matrix_partition.get_local_edges(row_offset); + for (edge_t i = threadIdx.x; i < local_out_degree; i += blockDim.x) { + auto col = indices[i]; + auto weight = weights != nullptr ? weights[i] : 1.0; + auto col_offset = matrix_partition.get_minor_offset_from_minor_nocheck(col); + auto e_op_result = evaluate_edge_op() + .compute(row, + col, + weight, + *(adj_matrix_row_value_input_first + row_offset), + *(adj_matrix_col_value_input_first + col_offset), + e_op); + if (thrust::get<0>(e_op_result) == true) { + // FIXME: This atomicAdd serializes execution. If we renumber vertices to insure that rows + // within a partition are sorted by their out-degree in decreasing order, we can compute + // a tight uppper bound for the maximum number of pushes per warp/block and use shared + // memory buffer to reduce the number of atomicAdd operations. 
+ static_assert(sizeof(unsigned long long int) == sizeof(size_t)); + auto buffer_idx = atomicAdd(reinterpret_cast(buffer_idx_ptr), + static_cast(1)); + *(buffer_key_output_first + buffer_idx) = col; + *(buffer_payload_output_first + buffer_idx) = thrust::get<1>(e_op_result); + } + } + + idx += gridDim.x; + } +} + template -size_t reduce_buffer_elements(raft::handle_t const& handle, - BufferKeyOutputIterator buffer_key_output_first, - BufferPayloadOutputIterator buffer_payload_output_first, - size_t num_buffer_elements, - ReduceOp reduce_op) +size_t sort_and_reduce_buffer_elements(raft::handle_t const& handle, + BufferKeyOutputIterator buffer_key_output_first, + BufferPayloadOutputIterator buffer_payload_output_first, + size_t num_buffer_elements, + ReduceOp reduce_op) { thrust::sort_by_key(rmm::exec_policy(handle.get_stream())->on(handle.get_stream()), buffer_key_output_first, @@ -182,92 +313,6 @@ size_t reduce_buffer_elements(raft::handle_t const& handle, } } -template -__global__ void update_frontier_and_vertex_output_values( - vertex_partition_device_t vertex_partition, - BufferKeyInputIterator buffer_key_input_first, - BufferPayloadInputIterator buffer_payload_input_first, - size_t num_buffer_elements, - VertexValueInputIterator vertex_value_input_first, - VertexValueOutputIterator vertex_value_output_first, - vertex_t** bucket_ptrs, - size_t* bucket_sizes_ptr, - size_t invalid_bucket_idx, - vertex_t invalid_vertex, - VertexOp v_op) -{ - static_assert(std::is_same::value_type, - vertex_t>::value); - auto const tid = threadIdx.x + blockIdx.x * blockDim.x; - size_t idx = tid; - size_t block_idx = blockIdx.x; - // FIXME: it might be more performant to process more than one element per thread - auto num_blocks = (num_buffer_elements + blockDim.x - 1) / blockDim.x; - - using BlockScan = - cub::BlockScan; - __shared__ typename BlockScan::TempStorage temp_storage; - - __shared__ size_t bucket_block_start_offsets[num_buckets]; - - size_t bucket_block_local_offsets[num_buckets]; - size_t bucket_block_aggregate_sizes[num_buckets]; - - while (block_idx < num_blocks) { - for (size_t i = 0; i < num_buckets; ++i) { bucket_block_local_offsets[i] = 0; } - - size_t selected_bucket_idx{invalid_bucket_idx}; - vertex_t key{invalid_vertex}; - - if (idx < num_buffer_elements) { - key = *(buffer_key_input_first + idx); - auto key_offset = vertex_partition.get_local_vertex_offset_from_vertex_nocheck(key); - auto v_val = *(vertex_value_input_first + key_offset); - auto payload = *(buffer_payload_input_first + idx); - auto v_op_result = v_op(v_val, payload); - selected_bucket_idx = thrust::get<0>(v_op_result); - if (selected_bucket_idx != invalid_bucket_idx) { - *(vertex_value_output_first + key_offset) = thrust::get<1>(v_op_result); - bucket_block_local_offsets[selected_bucket_idx] = 1; - } - } - - for (size_t i = 0; i < num_buckets; ++i) { - BlockScan(temp_storage) - .ExclusiveSum(bucket_block_local_offsets[i], - bucket_block_local_offsets[i], - bucket_block_aggregate_sizes[i]); - } - - if (threadIdx.x == 0) { - for (size_t i = 0; i < num_buckets; ++i) { - static_assert(sizeof(unsigned long long int) == sizeof(size_t)); - bucket_block_start_offsets[i] = - atomicAdd(reinterpret_cast(bucket_sizes_ptr + i), - static_cast(bucket_block_aggregate_sizes[i])); - } - } - - __syncthreads(); - - // FIXME: better use shared memory buffer to aggreaget global memory writes - if (selected_bucket_idx != invalid_bucket_idx) { - bucket_ptrs[selected_bucket_idx][bucket_block_start_offsets[selected_bucket_idx] + - 
bucket_block_local_offsets[selected_bucket_idx]] = key; - } - - idx += gridDim.x * blockDim.x; - block_idx += gridDim.x; - } -} - } // namespace detail /** @@ -289,10 +334,12 @@ __global__ void update_frontier_and_vertex_output_values( * @param handle RAFT handle object to encapsulate resources (e.g. CUDA stream, communicator, and * handles to various CUDA libraries) to run graph algorithms. * @param graph_view Non-owning graph object. - * @param vertex_first Iterator pointing to the first (inclusive) vertex in the current frontier. v - * in [vertex_first, vertex_last) should be distinct (and should belong to this process in - * multi-GPU), otherwise undefined behavior - * @param vertex_last Iterator pointing to the last (exclusive) vertex in the current frontier. + * @param vertex_frontier VertexFrontier class object for vertex frontier managements. This object + * includes multiple bucket objects. + * @param cur_fontier_bucket_idx Index of the VertexFrontier bucket holding vertices for the current + * iteration. + * @param next_frontier_bucket_indices Indices of the VertexFrontier buckets to store new frontier + * vertices for the next iteration. * @param adj_matrix_row_value_input_first Iterator pointing to the adjacency matrix row input * properties for the first (inclusive) row (assigned to this process in multi-GPU). * `adj_matrix_row_value_input_last` (exclusive) is deduced as @p adj_matrix_row_value_input_first + @@ -314,35 +361,33 @@ __global__ void update_frontier_and_vertex_output_values( * (inclusive) vertex (assigned to tihs process in multi-GPU). `vertex_value_output_last` * (exclusive) is deduced as @p vertex_value_output_first + @p * graph_view.get_number_of_local_vertices(). - * @param vertex_frontier vertex frontier class object for vertex frontier managements. This object - * includes multiple bucket objects. * @param v_op Binary operator takes *(@p vertex_value_input_first + i) (where i is [0, @p * graph_view.get_number_of_local_vertices())) and reduced value of the @p e_op outputs for * this vertex and returns the target bucket index (for frontier update) and new verrtex property - * values (to update *(@p vertex_value_output_first + i)). + * values (to update *(@p vertex_value_output_first + i)). The target bucket index should either be + * VertexFrontier::kInvalidBucketIdx or an index in @p next_frontier_bucket_indices. */ template void update_frontier_v_push_if_out_nbr( raft::handle_t const& handle, GraphViewType const& graph_view, - VertexIterator vertex_first, - VertexIterator vertex_last, + VertexFrontierType& vertex_frontier, + size_t cur_frontier_bucket_idx, + std::vector const& next_frontier_bucket_indices, AdjMatrixRowValueInputIterator adj_matrix_row_value_input_first, AdjMatrixColValueInputIterator adj_matrix_col_value_input_first, EdgeOp e_op, ReduceOp reduce_op, VertexValueInputIterator vertex_value_input_first, VertexValueOutputIterator vertex_value_output_first, - VertexFrontierType& vertex_frontier, VertexOp v_op) { static_assert(!GraphViewType::is_adj_matrix_transposed, @@ -353,6 +398,9 @@ void update_frontier_v_push_if_out_nbr( using weight_t = typename GraphViewType::weight_type; using payload_t = typename ReduceOp::type; + auto cur_frontier_vertex_first = vertex_frontier.get_bucket(cur_frontier_bucket_idx).begin(); + auto cur_frontier_vertex_last = vertex_frontier.get_bucket(cur_frontier_bucket_idx).end(); + // 1. 
fill the buffer rmm::device_uvector keys(size_t{0}, handle.get_stream()); @@ -361,57 +409,55 @@ void update_frontier_v_push_if_out_nbr( for (size_t i = 0; i < graph_view.get_number_of_local_adj_matrix_partitions(); ++i) { matrix_partition_device_t matrix_partition(graph_view, i); - rmm::device_uvector frontier_rows( - 0, handle.get_stream()); // relevant only if GraphViewType::is_multi_gpu is true - - size_t frontier_size{}; + rmm::device_uvector frontier_rows(0, handle.get_stream()); if (GraphViewType::is_multi_gpu) { - auto& row_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().row_name()); - auto const row_comm_rank = row_comm.get_rank(); - auto const row_comm_size = row_comm.get_size(); auto& col_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().col_name()); auto const col_comm_rank = col_comm.get_rank(); - auto sub_comm_rank = col_comm_rank; - frontier_size = host_scalar_bcast(col_comm, - (static_cast(sub_comm_rank) == i) - ? thrust::distance(vertex_first, vertex_last) - : size_t{0}, - i, - handle.get_stream()); - if (static_cast(sub_comm_rank) != i) { - frontier_rows.resize(frontier_size, handle.get_stream()); + auto frontier_size = + host_scalar_bcast(col_comm, + (static_cast(col_comm_rank) == i) + ? thrust::distance(cur_frontier_vertex_first, cur_frontier_vertex_last) + : size_t{0} /* dummy */, + i, + handle.get_stream()); + frontier_rows.resize(frontier_size, handle.get_stream()); + + if (static_cast(col_comm_rank) == i) { + thrust::copy(rmm::exec_policy(handle.get_stream())->on(handle.get_stream()), + cur_frontier_vertex_first, + cur_frontier_vertex_last, + frontier_rows.begin()); } - device_bcast( - col_comm, vertex_first, frontier_rows.begin(), frontier_size, i, handle.get_stream()); + + device_bcast(col_comm, + cur_frontier_vertex_first, + frontier_rows.begin(), + frontier_size, + i, + handle.get_stream()); } else { - frontier_size = thrust::distance(vertex_first, vertex_last); + frontier_rows.resize(thrust::distance(cur_frontier_vertex_first, cur_frontier_vertex_last), + handle.get_stream()); + thrust::copy(rmm::exec_policy(handle.get_stream())->on(handle.get_stream()), + cur_frontier_vertex_first, + cur_frontier_vertex_last, + frontier_rows.begin()); } - auto max_pushes = - frontier_size > 0 - ? frontier_rows.size() > 0 - ? thrust::transform_reduce( - rmm::exec_policy(handle.get_stream())->on(handle.get_stream()), - frontier_rows.begin(), - frontier_rows.end(), - [matrix_partition] __device__(auto row) { - auto row_offset = matrix_partition.get_major_offset_from_major_nocheck(row); - return matrix_partition.get_local_degree(row_offset); - }, - edge_t{0}, - thrust::plus()) - : thrust::transform_reduce( - rmm::exec_policy(handle.get_stream())->on(handle.get_stream()), - vertex_first, - vertex_last, - [matrix_partition] __device__(auto row) { - auto row_offset = matrix_partition.get_major_offset_from_major_nocheck(row); - return matrix_partition.get_local_degree(row_offset); - }, - edge_t{0}, - thrust::plus()) - : edge_t{0}; + auto max_pushes = frontier_rows.size() > 0 + ? 
thrust::transform_reduce( + rmm::exec_policy(handle.get_stream())->on(handle.get_stream()), + frontier_rows.begin(), + frontier_rows.end(), + [matrix_partition] __device__(auto row) { + auto row_offset = + matrix_partition.get_major_offset_from_major_nocheck(row); + return matrix_partition.get_local_degree(row_offset); + }, + edge_t{0}, + thrust::plus()) + : edge_t{0}; // FIXME: This is highly pessimistic for single GPU (and multi-GPU as well if we maintain // additional per column data for filtering in e_op). If we can pause & resume execution if @@ -433,23 +479,80 @@ void update_frontier_v_push_if_out_nbr( auto row_value_input_offset = GraphViewType::is_adj_matrix_transposed ? vertex_t{0} : matrix_partition.get_major_value_start_offset(); - - // FIXME: This is highly inefficeint for graphs with high-degree vertices. If we renumber - // vertices to insure that rows within a partition are sorted by their out-degree in decreasing - // order, we will apply this kernel only to low out-degree vertices. - if (frontier_size > 0) { - raft::grid_1d_thread_t for_all_low_degree_grid( - frontier_size, - detail::update_frontier_v_push_if_out_nbr_for_all_block_size, - handle.get_device_properties().maxGridSize[0]); - - if (frontier_rows.size() > 0) { - detail::for_all_frontier_row_for_all_nbr_low_degree<< 0) { + static_assert(detail::num_segments_per_vertex_partition == 3); + std::vector h_thresholds(detail::num_segments_per_vertex_partition - 1); + h_thresholds[0] = matrix_partition.get_major_first() + segment_offsets[1]; + h_thresholds[1] = matrix_partition.get_major_first() + segment_offsets[2]; + rmm::device_uvector d_thresholds(h_thresholds.size(), handle.get_stream()); + raft::update_device( + d_thresholds.data(), h_thresholds.data(), h_thresholds.size(), handle.get_stream()); + rmm::device_uvector d_offsets(d_thresholds.size(), handle.get_stream()); + thrust::lower_bound(rmm::exec_policy(handle.get_stream())->on(handle.get_stream()), + frontier_rows.begin(), + frontier_rows.end(), + d_thresholds.begin(), + d_thresholds.end(), + d_offsets.begin()); + std::vector h_offsets(d_offsets.size()); + raft::update_host(h_offsets.data(), d_offsets.data(), d_offsets.size(), handle.get_stream()); + CUDA_TRY(cudaStreamSynchronize(handle.get_stream())); + // FIXME: we may further improve performance by 1) concurrently running kernels on different + // segments; 2) individually tuning block sizes for different segments; and 3) adding one more + // segment for very high degree vertices and running segmented reduction + if (h_offsets[0] > 0) { + raft::grid_1d_block_t update_grid( + h_offsets[0], + detail::update_frontier_v_push_if_out_nbr_for_all_block_size, + handle.get_device_properties().maxGridSize[0]); + + detail::for_all_frontier_row_for_all_nbr_high_degree<<>>( + matrix_partition, + frontier_rows.begin(), + frontier_rows.begin() + h_offsets[0], + adj_matrix_row_value_input_first + row_value_input_offset, + adj_matrix_col_value_input_first, + keys.begin(), + get_dataframe_buffer_begin(payload_buffer), + buffer_idx.data(), + e_op); + } + if (h_offsets[1] - h_offsets[0] > 0) { + raft::grid_1d_warp_t update_grid( + h_offsets[1] - h_offsets[0], + detail::update_frontier_v_push_if_out_nbr_for_all_block_size, + handle.get_device_properties().maxGridSize[0]); + + detail::for_all_frontier_row_for_all_nbr_mid_degree<<>>( matrix_partition, - frontier_rows.begin(), + frontier_rows.begin() + h_offsets[0], + frontier_rows.begin() + h_offsets[1], + adj_matrix_row_value_input_first + row_value_input_offset, + 
adj_matrix_col_value_input_first, + keys.begin(), + get_dataframe_buffer_begin(payload_buffer), + buffer_idx.data(), + e_op); + } + if (frontier_rows.size() - h_offsets[1] > 0) { + raft::grid_1d_thread_t update_grid( + frontier_rows.size() - h_offsets[1], + detail::update_frontier_v_push_if_out_nbr_for_all_block_size, + handle.get_device_properties().maxGridSize[0]); + + detail::for_all_frontier_row_for_all_nbr_low_degree<<>>( + matrix_partition, + frontier_rows.begin() + h_offsets[1], frontier_rows.end(), adj_matrix_row_value_input_first + row_value_input_offset, adj_matrix_col_value_input_first, @@ -457,14 +560,21 @@ void update_frontier_v_push_if_out_nbr( get_dataframe_buffer_begin(payload_buffer), buffer_idx.data(), e_op); - } else { - detail::for_all_frontier_row_for_all_nbr_low_degree<< 0) { + raft::grid_1d_thread_t update_grid( + frontier_rows.size(), + detail::update_frontier_v_push_if_out_nbr_for_all_block_size, + handle.get_device_properties().maxGridSize[0]); + + detail::for_all_frontier_row_for_all_nbr_low_degree<<>>( matrix_partition, - vertex_first, - vertex_last, + frontier_rows.begin(), + frontier_rows.end(), adj_matrix_row_value_input_first + row_value_input_offset, adj_matrix_col_value_input_first, keys.begin(), @@ -478,12 +588,13 @@ void update_frontier_v_push_if_out_nbr( // 2. reduce the buffer auto num_buffer_elements = - detail::reduce_buffer_elements(handle, - keys.begin(), - get_dataframe_buffer_begin(payload_buffer), - buffer_idx.value(handle.get_stream()), - reduce_op); + detail::sort_and_reduce_buffer_elements(handle, + keys.begin(), + get_dataframe_buffer_begin(payload_buffer), + buffer_idx.value(handle.get_stream()), + reduce_op); if (GraphViewType::is_multi_gpu) { + // FIXME: this step is unnecessary if row_comm_size== 1 auto& comm = handle.get_comms(); auto const comm_rank = comm.get_rank(); auto& row_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().row_name()); @@ -533,49 +644,113 @@ void update_frontier_v_push_if_out_nbr( payload_buffer = std::move(rx_payload_buffer); num_buffer_elements = - detail::reduce_buffer_elements(handle, - keys.begin(), - get_dataframe_buffer_begin(payload_buffer), - keys.size(), - reduce_op); + detail::sort_and_reduce_buffer_elements(handle, + keys.begin(), + get_dataframe_buffer_begin(payload_buffer), + keys.size(), + reduce_op); } // 3. 
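The hunks above contain the core of the optimization: instead of a single thread-per-vertex kernel over the whole frontier, the sorted frontier rows are split into high-, mid-, and low-degree segments using the per-partition segment offsets, and a block-per-vertex, warp-per-vertex, or thread-per-vertex kernel is launched on each sub-range; the renamed sort_and_reduce_buffer_elements then combines duplicate keys produced by the pushes. A minimal host-side sketch of the segmentation step (std::vector and std::lower_bound standing in for rmm::device_uvector and thrust::lower_bound; it assumes vertices are renumbered in decreasing-degree order so each segment is a contiguous range of the sorted frontier):

#include <algorithm>
#include <cstdint>
#include <vector>

// Returns the two split points of a sorted frontier: [0, r[0]) are high-degree rows
// (block per vertex), [r[0], r[1]) mid-degree rows (warp per vertex), and
// [r[1], size) low-degree rows (thread per vertex).
std::vector<size_t> segment_frontier(std::vector<int32_t> const& sorted_frontier_rows,
                                     int32_t major_first,
                                     std::vector<int32_t> const& segment_offsets /* size 4 */)
{
  std::vector<int32_t> thresholds{major_first + segment_offsets[1],
                                  major_first + segment_offsets[2]};
  std::vector<size_t> result(thresholds.size());
  for (size_t i = 0; i < thresholds.size(); ++i) {
    result[i] = static_cast<size_t>(
      std::lower_bound(sorted_frontier_rows.begin(), sorted_frontier_rows.end(), thresholds[i]) -
      sorted_frontier_rows.begin());
  }
  return result;
}

As the FIXME above notes, the three kernels are currently queued on a single stream; running the segments concurrently is listed in the commit message as a remaining optimization.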
update vertex properties if (num_buffer_elements > 0) { - raft::grid_1d_thread_t update_grid(num_buffer_elements, - detail::update_frontier_v_push_if_out_nbr_update_block_size, - handle.get_device_properties().maxGridSize[0]); - - auto constexpr invalid_vertex = invalid_vertex_id::value; + static_assert(VertexFrontierType::kNumBuckets <= std::numeric_limits::max()); + rmm::device_uvector bucket_indices(num_buffer_elements, handle.get_stream()); vertex_partition_device_t vertex_partition(graph_view); - auto bucket_and_bucket_size_device_ptrs = - vertex_frontier.get_bucket_and_bucket_size_device_pointers(); - detail::update_frontier_and_vertex_output_values - <<>>( - vertex_partition, - keys.begin(), - get_dataframe_buffer_begin(payload_buffer), - num_buffer_elements, - vertex_value_input_first, - vertex_value_output_first, - std::get<0>(bucket_and_bucket_size_device_ptrs), - std::get<1>(bucket_and_bucket_size_device_ptrs), - VertexFrontierType::kInvalidBucketIdx, - invalid_vertex, - v_op); - - auto bucket_sizes_device_ptr = std::get<1>(bucket_and_bucket_size_device_ptrs); - std::vector bucket_sizes(VertexFrontierType::kNumBuckets); - raft::update_host(bucket_sizes.data(), - bucket_sizes_device_ptr, - VertexFrontierType::kNumBuckets, - handle.get_stream()); - CUDA_TRY(cudaStreamSynchronize(handle.get_stream())); - for (size_t i = 0; i < VertexFrontierType::kNumBuckets; ++i) { - vertex_frontier.get_bucket(i).set_size(bucket_sizes[i]); + auto key_payload_pair_first = thrust::make_zip_iterator( + thrust::make_tuple(keys.begin(), get_dataframe_buffer_begin(payload_buffer))); + thrust::transform( + rmm::exec_policy(handle.get_stream())->on(handle.get_stream()), + key_payload_pair_first, + key_payload_pair_first + num_buffer_elements, + bucket_indices.begin(), + [vertex_value_input_first, + vertex_value_output_first, + v_op, + vertex_partition, + invalid_bucket_idx = VertexFrontierType::kInvalidBucketIdx] __device__(auto pair) { + auto key = thrust::get<0>(pair); + auto payload = thrust::get<1>(pair); + auto key_offset = vertex_partition.get_local_vertex_offset_from_vertex_nocheck(key); + auto v_val = *(vertex_value_input_first + key_offset); + auto v_op_result = v_op(v_val, payload); + auto bucket_idx = thrust::get<0>(v_op_result); + if (bucket_idx != invalid_bucket_idx) { + *(vertex_value_output_first + key_offset) = thrust::get<1>(v_op_result); + return static_cast(bucket_idx); + } else { + return std::numeric_limits::max(); + } + }); + + resize_dataframe_buffer(payload_buffer, size_t{0}, handle.get_stream()); + shrink_to_fit_dataframe_buffer(payload_buffer, handle.get_stream()); + + auto bucket_key_pair_first = + thrust::make_zip_iterator(thrust::make_tuple(bucket_indices.begin(), keys.begin())); + keys.resize(thrust::distance( + bucket_key_pair_first, + thrust::remove_if(rmm::exec_policy(handle.get_stream())->on(handle.get_stream()), + bucket_key_pair_first, + bucket_key_pair_first + num_buffer_elements, + [] __device__(auto pair) { + return thrust::get<0>(pair) == + std::numeric_limits::max(); + })), + handle.get_stream()); + bucket_indices.resize(keys.size(), handle.get_stream()); + keys.shrink_to_fit(handle.get_stream()); + bucket_indices.shrink_to_fit(handle.get_stream()); + + bucket_key_pair_first = + thrust::make_zip_iterator(thrust::make_tuple(bucket_indices.begin(), keys.begin())); + if (next_frontier_bucket_indices.size() == 1) { + vertex_frontier.get_bucket(next_frontier_bucket_indices[0]).insert(keys.begin(), keys.size()); + } else if (next_frontier_bucket_indices.size() == 2) { + 
auto first_bucket_size = thrust::distance( + bucket_key_pair_first, + thrust::stable_partition( // stalbe_partition to maintain sorted order within each bucket + rmm::exec_policy(handle.get_stream())->on(handle.get_stream()), + bucket_key_pair_first, + bucket_key_pair_first + bucket_indices.size(), + [first_bucket_idx = static_cast(next_frontier_bucket_indices[0])] __device__( + auto pair) { return thrust::get<0>(pair) == first_bucket_idx; })); + vertex_frontier.get_bucket(next_frontier_bucket_indices[0]) + .insert(keys.begin(), first_bucket_size); + vertex_frontier.get_bucket(next_frontier_bucket_indices[1]) + .insert(keys.begin() + first_bucket_size, + thrust::distance(keys.begin() + first_bucket_size, keys.end())); + } else { + thrust::sort(rmm::exec_policy(handle.get_stream())->on(handle.get_stream()), + bucket_key_pair_first, + bucket_key_pair_first + bucket_indices.size()); + rmm::device_uvector d_indices(next_frontier_bucket_indices.size(), + handle.get_stream()); + rmm::device_uvector d_counts(d_indices.size(), handle.get_stream()); + auto it = + thrust::reduce_by_key(rmm::exec_policy(handle.get_stream())->on(handle.get_stream()), + bucket_indices.begin(), + bucket_indices.end(), + thrust::make_constant_iterator(size_t{1}), + d_indices.begin(), + d_counts.begin()); + d_indices.resize(thrust::distance(d_indices.begin(), thrust::get<0>(it)), + handle.get_stream()); + d_counts.resize(d_indices.size(), handle.get_stream()); + std::vector h_indices(d_indices.size()); + std::vector h_counts(h_indices.size()); + raft::update_host(h_indices.data(), d_indices.data(), d_indices.size(), handle.get_stream()); + raft::update_host(h_counts.data(), d_counts.data(), d_counts.size(), handle.get_stream()); + handle.get_stream_view().synchronize(); + std::vector h_offsets(h_indices.size(), 0); + std::partial_sum(h_counts.begin(), h_counts.end() - 1, h_offsets.begin() + 1); + for (size_t i = 0; i < h_indices.size(); ++i) { + if (h_counts[i] > 0) { + vertex_frontier.get_bucket(h_indices[i]).insert(keys.begin() + h_offsets[i], h_counts[i]); + } + } } } } diff --git a/cpp/include/patterns/vertex_frontier.cuh b/cpp/include/patterns/vertex_frontier.cuh index 375ec097850..4758334e9fc 100644 --- a/cpp/include/patterns/vertex_frontier.cuh +++ b/cpp/include/patterns/vertex_frontier.cuh @@ -24,8 +24,7 @@ #include #include -#include -#include +#include #include #include @@ -37,129 +36,80 @@ namespace cugraph { namespace experimental { -namespace detail { - -// FIXME: block size requires tuning -int32_t constexpr move_and_invalidate_if_block_size = 128; - -// FIXME: better move to another file for reusability -inline size_t round_up(size_t number_to_round, size_t modulus) -{ - return ((number_to_round + (modulus - 1)) / modulus) * modulus; -} - -template -__global__ void move_and_invalidate_if(RowIterator row_first, - RowIterator row_last, - vertex_t** bucket_ptrs, - size_t* bucket_sizes_ptr, - size_t this_bucket_idx, - size_t invalid_bucket_idx, - vertex_t invalid_vertex, - SplitOp split_op) -{ - static_assert( - std::is_same::value_type, vertex_t>::value); - auto const tid = threadIdx.x + blockIdx.x * blockDim.x; - size_t idx = tid; - size_t block_idx = blockIdx.x; - auto num_elements = thrust::distance(row_first, row_last); - // FIXME: it might be more performant to process more than one element per thread - auto num_blocks = (num_elements + blockDim.x - 1) / blockDim.x; - - using BlockScan = cub::BlockScan; - __shared__ typename BlockScan::TempStorage temp_storage; - - __shared__ size_t 
bucket_block_start_offsets[num_buckets]; - - size_t bucket_block_local_offsets[num_buckets]; - size_t bucket_block_aggregate_sizes[num_buckets]; - - while (block_idx < num_blocks) { - for (size_t i = 0; i < num_buckets; ++i) { bucket_block_local_offsets[i] = 0; } - - size_t selected_bucket_idx{invalid_bucket_idx}; - vertex_t key{invalid_vertex}; - - if (idx < num_elements) { - key = *(row_first + idx); - selected_bucket_idx = split_op(key); - if (selected_bucket_idx != this_bucket_idx) { - *(row_first + idx) = invalid_vertex; - if (selected_bucket_idx != invalid_bucket_idx) { - bucket_block_local_offsets[selected_bucket_idx] = 1; - } - } - } - - for (size_t i = 0; i < num_buckets; ++i) { - BlockScan(temp_storage) - .ExclusiveSum(bucket_block_local_offsets[i], - bucket_block_local_offsets[i], - bucket_block_aggregate_sizes[i]); - } - - if (threadIdx.x == 0) { - for (size_t i = 0; i < num_buckets; ++i) { - static_assert(sizeof(unsigned long long int) == sizeof(size_t)); - bucket_block_start_offsets[i] = - atomicAdd(reinterpret_cast(bucket_sizes_ptr + i), - static_cast(bucket_block_aggregate_sizes[i])); - } - } - - __syncthreads(); - - // FIXME: better use shared memory buffer to aggreaget global memory writes - if ((selected_bucket_idx != this_bucket_idx) && (selected_bucket_idx != invalid_bucket_idx)) { - bucket_ptrs[selected_bucket_idx][bucket_block_start_offsets[selected_bucket_idx] + - bucket_block_local_offsets[selected_bucket_idx]] = key; - } - - idx += gridDim.x * blockDim.x; - block_idx += gridDim.x; - } -} - -} // namespace detail - template -class Bucket { +class SortedUniqueElementBucket { public: - Bucket(raft::handle_t const& handle, size_t capacity) - : handle_ptr_(&handle), elements_(capacity, handle.get_stream()) + SortedUniqueElementBucket(raft::handle_t const& handle) + : handle_ptr_(&handle), elements_(0, handle.get_stream()) { - thrust::fill(rmm::exec_policy(handle_ptr_->get_stream())->on(handle_ptr_->get_stream()), - elements_.begin(), - elements_.end(), - invalid_vertex_id::value); } void insert(vertex_t v) { - raft::update_device(elements_.data() + size_, &v, 1, handle_ptr_->get_stream()); - ++size_; + if (elements_.size() > 0) { + rmm::device_scalar vertex(v, handle_ptr_->get_stream()); + insert(vertex.data(), vertex_t{1}); + } else { + elements_.resize(1, handle_ptr_->get_stream()); + raft::update_device(elements_.data(), &v, size_t{1}, handle_ptr_->get_stream()); + } } - size_t size() const { return size_; } + /** + * @ brief insert a list of vertices to the bucket + * + * @param sorted_unique_vertices Device pointer to the array storing the vertex list. + * @param num_sorted_unique_vertices Size of the vertex list to insert. 
+ */ + void insert(vertex_t const* sorted_unique_vertices, vertex_t num_sorted_unique_vertices) + { + if (elements_.size() > 0) { + rmm::device_uvector merged_vertices(elements_.size() + num_sorted_unique_vertices, + handle_ptr_->get_stream()); + thrust::merge(rmm::exec_policy(handle_ptr_->get_stream())->on(handle_ptr_->get_stream()), + elements_.begin(), + elements_.end(), + sorted_unique_vertices, + sorted_unique_vertices + num_sorted_unique_vertices, + merged_vertices.begin()); + merged_vertices.resize( + thrust::distance( + merged_vertices.begin(), + thrust::unique(rmm::exec_policy(handle_ptr_->get_stream())->on(handle_ptr_->get_stream()), + merged_vertices.begin(), + merged_vertices.end())), + handle_ptr_->get_stream()); + merged_vertices.shrink_to_fit(handle_ptr_->get_stream()); + elements_ = std::move(merged_vertices); + } else { + elements_.resize(num_sorted_unique_vertices, handle_ptr_->get_stream()); + thrust::copy(rmm::exec_policy(handle_ptr_->get_stream())->on(handle_ptr_->get_stream()), + sorted_unique_vertices, + sorted_unique_vertices + num_sorted_unique_vertices, + elements_.begin()); + } + } - void set_size(size_t size) { size_ = size; } + size_t size() const { return elements_.size(); } template std::enable_if_t aggregate_size() const { - return host_scalar_allreduce(handle_ptr_->get_comms(), size_, handle_ptr_->get_stream()); + return host_scalar_allreduce( + handle_ptr_->get_comms(), elements_.size(), handle_ptr_->get_stream()); } template std::enable_if_t aggregate_size() const { - return size_; + return elements_.size(); } - void clear() { size_ = 0; } + void resize(size_t size) { elements_.resize(size, handle_ptr_->get_stream()); } + + void clear() { elements_.resize(0, handle_ptr_->get_stream()); } - size_t capacity() const { return elements_.size(); } + void shrink_to_fit() { elements_.shrink_to_fit(handle_ptr_->get_stream()); } auto const data() const { return elements_.data(); } @@ -169,14 +119,13 @@ class Bucket { auto begin() { return elements_.begin(); } - auto const end() const { return elements_.begin() + size_; } + auto const end() const { return elements_.end(); } - auto end() { return elements_.begin() + size_; } + auto end() { return elements_.end(); } private: raft::handle_t const* handle_ptr_{nullptr}; rmm::device_uvector elements_; - size_t size_{0}; }; template @@ -185,29 +134,17 @@ class VertexFrontier { static size_t constexpr kNumBuckets = num_buckets; static size_t constexpr kInvalidBucketIdx{std::numeric_limits::max()}; - VertexFrontier(raft::handle_t const& handle, std::vector bucket_capacities) - : handle_ptr_(&handle), - tmp_bucket_ptrs_(num_buckets, handle.get_stream()), - tmp_bucket_sizes_(num_buckets, handle.get_stream()) + VertexFrontier(raft::handle_t const& handle) : handle_ptr_(&handle) { - CUGRAPH_EXPECTS(bucket_capacities.size() == num_buckets, - "invalid input argument bucket_capacities (size mismatch)"); - thrust::fill(rmm::exec_policy(handle_ptr_->get_stream())->on(handle_ptr_->get_stream()), - tmp_bucket_ptrs_.begin(), - tmp_bucket_ptrs_.end(), - static_cast(nullptr)); - thrust::fill(rmm::exec_policy(handle_ptr_->get_stream())->on(handle_ptr_->get_stream()), - tmp_bucket_sizes_.begin(), - tmp_bucket_sizes_.end(), - size_t{0}); - for (size_t i = 0; i < num_buckets; ++i) { - buckets_.emplace_back(handle, bucket_capacities[i]); - } + for (size_t i = 0; i < num_buckets; ++i) { buckets_.emplace_back(handle); } } - Bucket& get_bucket(size_t bucket_idx) { return buckets_[bucket_idx]; } + SortedUniqueElementBucket& get_bucket(size_t 
bucket_idx) + { + return buckets_[bucket_idx]; + } - Bucket const& get_bucket(size_t bucket_idx) const + SortedUniqueElementBucket const& get_bucket(size_t bucket_idx) const { return buckets_[bucket_idx]; } @@ -218,78 +155,111 @@ class VertexFrontier { } template - void split_bucket(size_t bucket_idx, SplitOp split_op) + void split_bucket(size_t this_bucket_idx, + std::vector const& move_to_bucket_indices, + SplitOp split_op) { - auto constexpr invalid_vertex = invalid_vertex_id::value; - - auto bucket_and_bucket_size_device_ptrs = get_bucket_and_bucket_size_device_pointers(); - - auto& this_bucket = get_bucket(bucket_idx); + auto& this_bucket = get_bucket(this_bucket_idx); if (this_bucket.size() > 0) { - raft::grid_1d_thread_t move_and_invalidate_if_grid( - this_bucket.size(), - detail::move_and_invalidate_if_block_size, - handle_ptr_->get_device_properties().maxGridSize[0]); - - detail::move_and_invalidate_if - <<get_stream()>>>(this_bucket.begin(), - this_bucket.end(), - std::get<0>(bucket_and_bucket_size_device_ptrs), - std::get<1>(bucket_and_bucket_size_device_ptrs), - bucket_idx, - kInvalidBucketIdx, - invalid_vertex, - split_op); - } + static_assert(kNumBuckets <= std::numeric_limits::max()); + rmm::device_uvector bucket_indices(this_bucket.size(), handle_ptr_->get_stream()); + thrust::transform( + rmm::exec_policy(handle_ptr_->get_stream())->on(handle_ptr_->get_stream()), + this_bucket.begin(), + this_bucket.end(), + bucket_indices.begin(), + [split_op] __device__(auto v) { return static_cast(split_op(v)); }); + + auto pair_first = + thrust::make_zip_iterator(thrust::make_tuple(bucket_indices.begin(), this_bucket.begin())); + this_bucket.resize(thrust::distance( + pair_first, + thrust::remove_if( + rmm::exec_policy(handle_ptr_->get_stream())->on(handle_ptr_->get_stream()), + pair_first, + pair_first + bucket_indices.size(), + [invalid_bucket_idx = static_cast(kInvalidBucketIdx)] __device__(auto pair) { + return thrust::get<0>(pair) == invalid_bucket_idx; + }))); + bucket_indices.resize(this_bucket.size(), handle_ptr_->get_stream()); + this_bucket.shrink_to_fit(); + bucket_indices.shrink_to_fit(handle_ptr_->get_stream()); + + pair_first = + thrust::make_zip_iterator(thrust::make_tuple(bucket_indices.begin(), this_bucket.begin())); + auto new_this_bucket_size = thrust::distance( + pair_first, + thrust::stable_partition( // stalbe_partition to maintain sorted order within each bucket + rmm::exec_policy(handle_ptr_->get_stream())->on(handle_ptr_->get_stream()), + pair_first, + pair_first + bucket_indices.size(), + [this_bucket_idx = static_cast(this_bucket_idx)] __device__(auto pair) { + return thrust::get<0>(pair) == this_bucket_idx; + })); + + if (move_to_bucket_indices.size() == 1) { + get_bucket(move_to_bucket_indices[0]) + .insert(this_bucket.begin() + new_this_bucket_size, + thrust::distance(this_bucket.begin() + new_this_bucket_size, this_bucket.end())); + } else if (move_to_bucket_indices.size() == 2) { + auto next_bucket_size = thrust::distance( + pair_first + new_this_bucket_size, + thrust::stable_partition( // stalbe_partition to maintain sorted order within each bucket + rmm::exec_policy(handle_ptr_->get_stream())->on(handle_ptr_->get_stream()), + pair_first + new_this_bucket_size, + pair_first + bucket_indices.size(), + [next_bucket_idx = static_cast(move_to_bucket_indices[0])] __device__( + auto pair) { return thrust::get<0>(pair) == next_bucket_idx; })); + get_bucket(move_to_bucket_indices[0]) + .insert(this_bucket.begin() + new_this_bucket_size, next_bucket_size); + 
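SortedUniqueElementBucket above replaces the old fixed-capacity Bucket: a bucket now owns a sorted, duplicate-free vertex list that grows on demand (this is the memory-footprint reduction mentioned in the commit message), and insert() merges an already sorted-and-unique range into it. A minimal host-side sketch of that insert path (std::vector and std algorithms standing in for rmm::device_uvector and the thrust calls):

#include <algorithm>
#include <cstdint>
#include <vector>

void insert_sorted_unique(std::vector<int32_t>& elements,
                          std::vector<int32_t> const& sorted_unique_vertices)
{
  if (elements.empty()) {  // first insertion: just copy
    elements = sorted_unique_vertices;
    return;
  }
  std::vector<int32_t> merged(elements.size() + sorted_unique_vertices.size());
  std::merge(elements.begin(), elements.end(),
             sorted_unique_vertices.begin(), sorted_unique_vertices.end(),
             merged.begin());
  merged.erase(std::unique(merged.begin(), merged.end()), merged.end());  // drop duplicates
  elements = std::move(merged);
}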
get_bucket(move_to_bucket_indices[1]) + .insert(this_bucket.begin() + new_this_bucket_size + next_bucket_size, + thrust::distance(this_bucket.begin() + new_this_bucket_size + next_bucket_size, + this_bucket.end())); + } else { + thrust::sort(rmm::exec_policy(handle_ptr_->get_stream())->on(handle_ptr_->get_stream()), + pair_first + new_this_bucket_size, + pair_first + bucket_indices.size()); + rmm::device_uvector d_indices(move_to_bucket_indices.size(), + handle_ptr_->get_stream()); + rmm::device_uvector d_counts(d_indices.size(), handle_ptr_->get_stream()); + auto it = thrust::reduce_by_key( + rmm::exec_policy(handle_ptr_->get_stream())->on(handle_ptr_->get_stream()), + bucket_indices.begin() + new_this_bucket_size, + bucket_indices.end(), + thrust::make_constant_iterator(size_t{1}), + d_indices.begin(), + d_counts.begin()); + d_indices.resize(thrust::distance(d_indices.begin(), thrust::get<0>(it)), + handle_ptr_->get_stream()); + d_counts.resize(d_indices.size(), handle_ptr_->get_stream()); + std::vector h_indices(d_indices.size()); + std::vector h_counts(h_indices.size()); + raft::update_host( + h_indices.data(), d_indices.data(), d_indices.size(), handle_ptr_->get_stream()); + raft::update_host( + h_counts.data(), d_counts.data(), d_counts.size(), handle_ptr_->get_stream()); + handle_ptr_->get_stream_view().synchronize(); + std::vector h_offsets(h_indices.size(), 0); + std::partial_sum(h_counts.begin(), h_counts.end() - 1, h_offsets.begin() + 1); + for (size_t i = 0; i < h_indices.size(); ++i) { + if (h_counts[i] > 0) { + get_bucket(h_indices[i]) + .insert(this_bucket.begin() + new_this_bucket_size + h_offsets[i], h_counts[i]); + } + } + } - // FIXME: if we adopt CUDA cooperative group https://devblogs.nvidia.com/cooperative-groups - // and global sync(), we can merge this step with the above kernel (and rename the above kernel - // to move_if) - auto it = - thrust::remove_if(rmm::exec_policy(handle_ptr_->get_stream())->on(handle_ptr_->get_stream()), - get_bucket(bucket_idx).begin(), - get_bucket(bucket_idx).end(), - [] __device__(auto value) { return value == invalid_vertex; }); - - auto bucket_sizes_device_ptr = std::get<1>(bucket_and_bucket_size_device_ptrs); - std::vector bucket_sizes(kNumBuckets); - raft::update_host( - bucket_sizes.data(), bucket_sizes_device_ptr, kNumBuckets, handle_ptr_->get_stream()); - CUDA_TRY(cudaStreamSynchronize(handle_ptr_->get_stream())); - for (size_t i = 0; i < kNumBuckets; ++i) { - if (i != bucket_idx) { get_bucket(i).set_size(bucket_sizes[i]); } + this_bucket.resize(new_this_bucket_size); + this_bucket.shrink_to_fit(); } - auto size = thrust::distance(get_bucket(bucket_idx).begin(), it); - get_bucket(bucket_idx).set_size(size); - return; } - auto get_bucket_and_bucket_size_device_pointers() - { - std::vector tmp_ptrs(buckets_.size(), nullptr); - std::vector tmp_sizes(buckets_.size(), 0); - for (size_t i = 0; i < buckets_.size(); ++i) { - tmp_ptrs[i] = get_bucket(i).data(); - tmp_sizes[i] = get_bucket(i).size(); - } - raft::update_device( - tmp_bucket_ptrs_.data(), tmp_ptrs.data(), tmp_ptrs.size(), handle_ptr_->get_stream()); - raft::update_device( - tmp_bucket_sizes_.data(), tmp_sizes.data(), tmp_sizes.size(), handle_ptr_->get_stream()); - CUDA_TRY(cudaStreamSynchronize(handle_ptr_->get_stream())); - return std::make_tuple(tmp_bucket_ptrs_.data(), tmp_bucket_sizes_.data()); - } - private: raft::handle_t const* handle_ptr_{nullptr}; - std::vector> buckets_{}; - rmm::device_uvector tmp_bucket_ptrs_; - rmm::device_uvector tmp_bucket_sizes_; + 
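split_bucket above likewise drops the old scatter-through-device-pointers kernel: it tags each element with its target bucket via SplitOp, removes entries mapped to kInvalidBucketIdx, stable-partitions the rest so elements staying in this bucket come first (stable partitioning keeps each part sorted), and hands the tail to the destination bucket(s). A host-side sketch of the partition-and-hand-off idea (illustrative only; a plain value threshold stands in for the user-supplied SplitOp, and the returned tail would be fed to the destination bucket's merge-based insert):

#include <algorithm>
#include <cstdint>
#include <vector>

// Keep elements with v < threshold in this_bucket (stable_partition preserves the
// sorted order of both parts) and return the moved-out tail for the destination bucket.
std::vector<int32_t> split_by_threshold(std::vector<int32_t>& this_bucket, int32_t threshold)
{
  auto split_point = std::stable_partition(this_bucket.begin(), this_bucket.end(),
                                           [threshold](int32_t v) { return v < threshold; });
  std::vector<int32_t> moved(split_point, this_bucket.end());
  this_bucket.erase(split_point, this_bucket.end());
  return moved;
}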
std::vector> buckets_{}; }; } // namespace experimental diff --git a/cpp/include/utilities/dataframe_buffer.cuh b/cpp/include/utilities/dataframe_buffer.cuh index e59b12f2a80..b0e9c1ebfec 100644 --- a/cpp/include/utilities/dataframe_buffer.cuh +++ b/cpp/include/utilities/dataframe_buffer.cuh @@ -61,6 +61,21 @@ struct resize_dataframe_buffer_tuple_iterator_element_impl +struct shrink_to_fit_dataframe_buffer_tuple_iterator_element_impl { + void run(BufferType& buffer, cudaStream_t stream) + { + std::get(buffer).shrink_to_fit(stream); + shrink_to_fit_dataframe_buffer_tuple_iterator_element_impl() + .run(buffer, stream); + } +}; + +template +struct shrink_to_fit_dataframe_buffer_tuple_iterator_element_impl { + void run(BufferType& buffer, cudaStream_t stream) {} +}; + template auto get_dataframe_buffer_begin_tuple_element_impl(BufferType& buffer) { @@ -111,6 +126,27 @@ void resize_dataframe_buffer(BufferType& buffer, size_t new_buffer_size, cudaStr .run(buffer, new_buffer_size, stream); } +template ::value>* = nullptr> +void shrink_to_fit_dataframe_buffer(BufferType& buffer, cudaStream_t stream) +{ + buffer.shrink_to_fit(stream); +} + +template ::value>* = nullptr> +void shrink_to_fit_dataframe_buffer(BufferType& buffer, cudaStream_t stream) +{ + size_t constexpr tuple_size = thrust::tuple_size::value; + detail::shrink_to_fit_dataframe_buffer_tuple_iterator_element_impl() + .run(buffer, stream); +} + template ::value>* = nullptr> diff --git a/cpp/src/experimental/bfs.cu b/cpp/src/experimental/bfs.cu index 9145e3737b6..2a703c1c85e 100644 --- a/cpp/src/experimental/bfs.cu +++ b/cpp/src/experimental/bfs.cu @@ -90,11 +90,9 @@ void bfs(raft::handle_t const &handle, // 3. initialize BFS frontier - enum class Bucket { cur, num_buckets }; - std::vector bucket_sizes(static_cast(Bucket::num_buckets), - push_graph_view.get_number_of_local_vertices()); + enum class Bucket { cur, next, num_buckets }; VertexFrontier(Bucket::num_buckets)> - vertex_frontier(handle, bucket_sizes); + vertex_frontier(handle); if (push_graph_view.is_local_vertex_nocheck(source_vertex)) { vertex_frontier.get_bucket(static_cast(Bucket::cur)).insert(source_vertex); @@ -103,23 +101,18 @@ void bfs(raft::handle_t const &handle, // 4. BFS iteration vertex_t depth{0}; - auto cur_local_vertex_frontier_first = - vertex_frontier.get_bucket(static_cast(Bucket::cur)).begin(); - auto cur_vertex_frontier_aggregate_size = - vertex_frontier.get_bucket(static_cast(Bucket::cur)).aggregate_size(); while (true) { if (direction_optimizing) { CUGRAPH_FAIL("unimplemented."); } else { vertex_partition_device_t vertex_partition(push_graph_view); - auto cur_local_vertex_frontier_last = - vertex_frontier.get_bucket(static_cast(Bucket::cur)).end(); update_frontier_v_push_if_out_nbr( handle, push_graph_view, - cur_local_vertex_frontier_first, - cur_local_vertex_frontier_last, + vertex_frontier, + static_cast(Bucket::cur), + std::vector{static_cast(Bucket::next)}, thrust::make_constant_iterator(0) /* dummy */, thrust::make_constant_iterator(0) /* dummy */, [vertex_partition, distances] __device__( @@ -135,20 +128,19 @@ void bfs(raft::handle_t const &handle, reduce_op::any(), distances, thrust::make_zip_iterator(thrust::make_tuple(distances, predecessor_first)), - vertex_frontier, [depth] __device__(auto v_val, auto pushed_val) { - auto idx = (v_val == invalid_distance) ? static_cast(Bucket::cur) + auto idx = (v_val == invalid_distance) ? 
static_cast(Bucket::next) : VertexFrontier::kInvalidBucketIdx; return thrust::make_tuple(idx, thrust::make_tuple(depth + 1, pushed_val)); }); - auto new_vertex_frontier_aggregate_size = - vertex_frontier.get_bucket(static_cast(Bucket::cur)).aggregate_size() - - cur_vertex_frontier_aggregate_size; - if (new_vertex_frontier_aggregate_size == 0) { break; } - - cur_local_vertex_frontier_first = cur_local_vertex_frontier_last; - cur_vertex_frontier_aggregate_size += new_vertex_frontier_aggregate_size; + vertex_frontier.get_bucket(static_cast(Bucket::cur)).clear(); + vertex_frontier.get_bucket(static_cast(Bucket::cur)).shrink_to_fit(); + vertex_frontier.swap_buckets(static_cast(Bucket::cur), + static_cast(Bucket::next)); + if (vertex_frontier.get_bucket(static_cast(Bucket::cur)).aggregate_size() == 0) { + break; + } } depth++; diff --git a/cpp/src/experimental/graph.cu b/cpp/src/experimental/graph.cu index 47c41cb3426..18db57a737f 100644 --- a/cpp/src/experimental/graph.cu +++ b/cpp/src/experimental/graph.cu @@ -295,8 +295,8 @@ graph_t::max())); rmm::device_uvector d_thresholds(detail::num_segments_per_vertex_partition - 1, default_stream); - std::vector h_thresholds = {static_cast(detail::low_degree_threshold), - static_cast(detail::mid_degree_threshold)}; + std::vector h_thresholds = {static_cast(detail::mid_degree_threshold), + static_cast(detail::low_degree_threshold)}; raft::update_device( d_thresholds.data(), h_thresholds.data(), h_thresholds.size(), default_stream); @@ -317,7 +317,8 @@ graph_t{}); rmm::device_uvector aggregate_segment_offsets(col_comm_size * segment_offsets.size(), default_stream); @@ -326,8 +327,8 @@ graph_t::max())); rmm::device_uvector d_thresholds(detail::num_segments_per_vertex_partition - 1, default_stream); - std::vector h_thresholds = {static_cast(detail::low_degree_threshold), - static_cast(detail::mid_degree_threshold)}; + std::vector h_thresholds = {static_cast(detail::mid_degree_threshold), + static_cast(detail::low_degree_threshold)}; raft::update_device( d_thresholds.data(), h_thresholds.data(), h_thresholds.size(), default_stream); @@ -462,7 +463,8 @@ graph_tget_number_of_vertices(), d_thresholds.begin(), d_thresholds.end(), - segment_offsets.begin() + 1); + segment_offsets.begin() + 1, + thrust::greater{}); segment_offsets_.resize(segment_offsets.size()); raft::update_host( diff --git a/cpp/src/experimental/graph_view.cu b/cpp/src/experimental/graph_view.cu index c6f39a44333..67603ae260b 100644 --- a/cpp/src/experimental/graph_view.cu +++ b/cpp/src/experimental/graph_view.cu @@ -161,7 +161,7 @@ graph_view_t const& adj_matrix_partition_offsets, std::vector const& adj_matrix_partition_indices, std::vector const& adj_matrix_partition_weights, - std::vector const& vertex_partition_segment_offsets, + std::vector const& adj_matrix_partition_segment_offsets, partition_t const& partition, vertex_t number_of_vertices, edge_t number_of_edges, @@ -176,7 +176,7 @@ graph_view_t bucket_sizes(static_cast(Bucket::num_buckets), - push_graph_view.get_number_of_local_vertices()); + enum class Bucket { cur_near, next_near, far, num_buckets }; VertexFrontier(Bucket::num_buckets)> - vertex_frontier(handle, bucket_sizes); + vertex_frontier(handle); // 5. 
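With these primitives in place, bfs.cu above switches to a cur/next bucket pair: each iteration pushes newly discovered vertices into next, clears (and shrinks) cur, swaps the two buckets, and terminates once the current frontier is globally empty. A host-side sketch of that rotation (illustrative; expand() stands in for update_frontier_v_push_if_out_nbr):

#include <cstdint>
#include <utility>
#include <vector>

template <typename ExpandFn>
void bfs_like_loop(std::vector<int32_t> cur, ExpandFn expand)
{
  std::vector<int32_t> next;
  while (!cur.empty()) {
    expand(cur, next);     // discovers new vertices and appends them to next
    cur.clear();
    std::swap(cur, next);  // next becomes the frontier of the following iteration
  }
}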
SSSP iteration @@ -172,8 +169,9 @@ void sssp(raft::handle_t const &handle, update_frontier_v_push_if_out_nbr( handle, push_graph_view, - vertex_frontier.get_bucket(static_cast(Bucket::cur_near)).begin(), - vertex_frontier.get_bucket(static_cast(Bucket::cur_near)).end(), + vertex_frontier, + static_cast(Bucket::cur_near), + std::vector{static_cast(Bucket::next_near), static_cast(Bucket::far)}, row_distances, thrust::make_constant_iterator(0) /* dummy */, [vertex_partition, distances, cutoff] __device__( @@ -193,30 +191,31 @@ void sssp(raft::handle_t const &handle, reduce_op::min>(), distances, thrust::make_zip_iterator(thrust::make_tuple(distances, predecessor_first)), - vertex_frontier, [near_far_threshold] __device__(auto v_val, auto pushed_val) { auto new_dist = thrust::get<0>(pushed_val); auto idx = new_dist < v_val - ? (new_dist < near_far_threshold ? static_cast(Bucket::new_near) + ? (new_dist < near_far_threshold ? static_cast(Bucket::next_near) : static_cast(Bucket::far)) : VertexFrontier::kInvalidBucketIdx; return thrust::make_tuple(idx, pushed_val); }); vertex_frontier.get_bucket(static_cast(Bucket::cur_near)).clear(); - if (vertex_frontier.get_bucket(static_cast(Bucket::new_near)).aggregate_size() > 0) { + vertex_frontier.get_bucket(static_cast(Bucket::cur_near)).shrink_to_fit(); + if (vertex_frontier.get_bucket(static_cast(Bucket::next_near)).aggregate_size() > 0) { vertex_frontier.swap_buckets(static_cast(Bucket::cur_near), - static_cast(Bucket::new_near)); + static_cast(Bucket::next_near)); } else if (vertex_frontier.get_bucket(static_cast(Bucket::far)).aggregate_size() > 0) { // near queue is empty, split the far queue auto old_near_far_threshold = near_far_threshold; near_far_threshold += delta; - size_t new_near_size{0}; - size_t new_far_size{0}; + size_t near_size{0}; + size_t far_size{0}; while (true) { vertex_frontier.split_bucket( static_cast(Bucket::far), + std::vector{static_cast(Bucket::cur_near)}, [vertex_partition, distances, old_near_far_threshold, near_far_threshold] __device__( auto v) { auto dist = @@ -229,17 +228,16 @@ void sssp(raft::handle_t const &handle, return static_cast(Bucket::far); } }); - new_near_size = + near_size = vertex_frontier.get_bucket(static_cast(Bucket::cur_near)).aggregate_size(); - new_far_size = - vertex_frontier.get_bucket(static_cast(Bucket::far)).aggregate_size(); - if ((new_near_size > 0) || (new_far_size == 0)) { + far_size = vertex_frontier.get_bucket(static_cast(Bucket::far)).aggregate_size(); + if ((near_size > 0) || (far_size == 0)) { break; } else { near_far_threshold += delta; } } - if ((new_near_size == 0) && (new_far_size == 0)) { break; } + if ((near_size == 0) && (far_size == 0)) { break; } } else { break; } diff --git a/cpp/tests/experimental/bfs_test.cpp b/cpp/tests/experimental/bfs_test.cpp index 8fce9488d8a..ded57dd1855 100644 --- a/cpp/tests/experimental/bfs_test.cpp +++ b/cpp/tests/experimental/bfs_test.cpp @@ -14,6 +14,7 @@ * limitations under the License. 
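sssp.cu above keeps the near/far (delta-stepping) structure but now expresses it with buckets: when the next_near bucket comes up empty, the near/far threshold is raised by delta repeatedly and split_bucket peels vertices whose tentative distance now falls below the threshold out of the far bucket back into the near bucket. A host-side sketch of that refill loop (illustrative; dist holds tentative distances, and already-settled vertices are ignored for simplicity):

#include <algorithm>
#include <cstdint>
#include <vector>

std::vector<int32_t> refill_near_from_far(std::vector<int32_t>& far,
                                          std::vector<float> const& dist,
                                          float& near_far_threshold,
                                          float delta)
{
  std::vector<int32_t> near;
  while (near.empty() && !far.empty()) {
    near_far_threshold += delta;
    auto it = std::stable_partition(far.begin(), far.end(), [&](int32_t v) {
      return dist[v] < near_far_threshold;  // now-near vertices move to the front
    });
    near.assign(far.begin(), it);
    far.erase(far.begin(), it);
  }
  return near;  // empty only if the far bucket is also exhausted
}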
*/ +#include #include #include #include @@ -153,11 +154,22 @@ class Tests_BFS : public ::testing::TestWithParam { using weight_t = float; raft::handle_t handle{}; + HighResClock hr_clock{}; + if (PERF) { + CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + hr_clock.start(); + } cugraph::experimental::graph_t graph(handle); rmm::device_uvector d_renumber_map_labels(0, handle.get_stream()); std::tie(graph, d_renumber_map_labels) = read_graph(handle, configuration, renumber); + if (PERF) { + CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + double elapsed_time{0.0}; + hr_clock.stop(&elapsed_time); + std::cout << "read_graph took " << elapsed_time * 1e-6 << " s.\n"; + } auto graph_view = graph.view(); ASSERT_TRUE(static_cast(configuration.source) >= 0 && @@ -169,7 +181,10 @@ class Tests_BFS : public ::testing::TestWithParam { rmm::device_uvector d_predecessors(graph_view.get_number_of_vertices(), handle.get_stream()); - CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + if (PERF) { + CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + hr_clock.start(); + } cugraph::experimental::bfs(handle, graph_view, @@ -179,7 +194,12 @@ class Tests_BFS : public ::testing::TestWithParam { false, std::numeric_limits::max()); - CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + if (PERF) { + CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + double elapsed_time{0.0}; + hr_clock.stop(&elapsed_time); + std::cout << "BFS took " << elapsed_time * 1e-6 << " s.\n"; + } if (configuration.check_correctness) { cugraph::experimental::graph_t unrenumbered_graph( diff --git a/cpp/tests/experimental/katz_centrality_test.cpp b/cpp/tests/experimental/katz_centrality_test.cpp index 71011f3d018..c7756699acd 100644 --- a/cpp/tests/experimental/katz_centrality_test.cpp +++ b/cpp/tests/experimental/katz_centrality_test.cpp @@ -14,6 +14,7 @@ * limitations under the License. */ +#include #include #include #include @@ -171,11 +172,22 @@ class Tests_KatzCentrality : public ::testing::TestWithParam graph(handle); rmm::device_uvector d_renumber_map_labels(0, handle.get_stream()); std::tie(graph, d_renumber_map_labels) = read_graph(handle, configuration, renumber); + if (PERF) { + CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + double elapsed_time{0.0}; + hr_clock.stop(&elapsed_time); + std::cout << "read_graph took " << elapsed_time * 1e-6 << " s.\n"; + } auto graph_view = graph.view(); auto degrees = graph_view.compute_in_degrees(handle); @@ -191,7 +203,10 @@ class Tests_KatzCentrality : public ::testing::TestWithParam d_katz_centralities(graph_view.get_number_of_vertices(), handle.get_stream()); - CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + if (PERF) { + CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + hr_clock.start(); + } cugraph::experimental::katz_centrality(handle, graph_view, @@ -204,7 +219,12 @@ class Tests_KatzCentrality : public ::testing::TestWithParam unrenumbered_graph( diff --git a/cpp/tests/experimental/mg_bfs_test.cpp b/cpp/tests/experimental/mg_bfs_test.cpp index 76ccb5d9de3..64ffedd2492 100644 --- a/cpp/tests/experimental/mg_bfs_test.cpp +++ b/cpp/tests/experimental/mg_bfs_test.cpp @@ -14,6 +14,7 @@ * limitations under the License. 
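The test changes in the remainder of the patch all apply the same pattern: graph construction and each algorithm call are bracketed by cudaDeviceSynchronize() and a HighResClock start/stop pair, gated on the PERF flag so that correctness-only runs skip the extra synchronization and output. A sketch of that pattern using standard C++ timing (illustrative; the tests themselves use HighResClock and device synchronization):

#include <chrono>
#include <iostream>

template <typename F>
void run_and_maybe_time(bool perf, char const* label, F&& f)
{
  if (!perf) {
    f();
    return;
  }
  // the CUDA tests call cudaDeviceSynchronize() here and again after f()
  auto start = std::chrono::steady_clock::now();
  f();
  auto stop = std::chrono::steady_clock::now();
  std::cout << label << " took "
            << std::chrono::duration<double>(stop - start).count() << " s.\n";
}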
*/ +#include #include #include #include @@ -34,6 +35,11 @@ #include +// do the perf measurements +// enabled by command line parameter s'--perf' +// +static int PERF = 0; + typedef struct BFS_Usecase_t { cugraph::test::input_graph_specifier_t input_graph_specifier{}; @@ -117,6 +123,7 @@ class Tests_MGBFS : public ::testing::TestWithParam { // 1. initialize handle raft::handle_t handle{}; + HighResClock hr_clock{}; raft::comms::initialize_mpi_comms(&handle, MPI_COMM_WORLD); auto& comm = handle.get_comms(); @@ -130,10 +137,20 @@ class Tests_MGBFS : public ::testing::TestWithParam { // 2. create MG graph + if (PERF) { + CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + hr_clock.start(); + } cugraph::experimental::graph_t mg_graph(handle); rmm::device_uvector d_mg_renumber_map_labels(0, handle.get_stream()); std::tie(mg_graph, d_mg_renumber_map_labels) = read_graph(handle, configuration, true); + if (PERF) { + CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + double elapsed_time{0.0}; + hr_clock.stop(&elapsed_time); + std::cout << "MG read_graph took " << elapsed_time * 1e-6 << " s.\n"; + } auto mg_graph_view = mg_graph.view(); @@ -149,7 +166,10 @@ class Tests_MGBFS : public ::testing::TestWithParam { rmm::device_uvector d_mg_predecessors(mg_graph_view.get_number_of_local_vertices(), handle.get_stream()); - CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + if (PERF) { + CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + hr_clock.start(); + } cugraph::experimental::bfs(handle, mg_graph_view, @@ -157,10 +177,14 @@ class Tests_MGBFS : public ::testing::TestWithParam { d_mg_predecessors.data(), static_cast(configuration.source), false, - std::numeric_limits::max(), - true); + std::numeric_limits::max()); - CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + if (PERF) { + CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + double elapsed_time{0.0}; + hr_clock.stop(&elapsed_time); + std::cout << "MG BFS took " << elapsed_time * 1e-6 << " s.\n"; + } // 5. copmare SG & MG results @@ -204,8 +228,7 @@ class Tests_MGBFS : public ::testing::TestWithParam { d_sg_predecessors.data(), unrenumbered_source, false, - std::numeric_limits::max(), - true); + std::numeric_limits::max()); // 5-3. compare diff --git a/cpp/tests/experimental/mg_katz_centrality_test.cpp b/cpp/tests/experimental/mg_katz_centrality_test.cpp index e3033af3771..937bd33472b 100644 --- a/cpp/tests/experimental/mg_katz_centrality_test.cpp +++ b/cpp/tests/experimental/mg_katz_centrality_test.cpp @@ -14,6 +14,7 @@ * limitations under the License. 
*/ +#include #include #include #include @@ -31,6 +32,11 @@ #include +// do the perf measurements +// enabled by command line parameter s'--perf' +// +static int PERF = 0; + typedef struct KatzCentrality_Usecase_t { cugraph::test::input_graph_specifier_t input_graph_specifier{}; @@ -117,6 +123,7 @@ class Tests_MGKatzCentrality : public ::testing::TestWithParam mg_graph(handle); rmm::device_uvector d_mg_renumber_map_labels(0, handle.get_stream()); std::tie(mg_graph, d_mg_renumber_map_labels) = read_graph(handle, configuration, true); + if (PERF) { + CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + double elapsed_time{0.0}; + hr_clock.stop(&elapsed_time); + std::cout << "MG read_graph took " << elapsed_time * 1e-6 << " s.\n"; + } auto mg_graph_view = mg_graph.view(); @@ -150,7 +167,10 @@ class Tests_MGKatzCentrality : public ::testing::TestWithParam d_mg_katz_centralities( mg_graph_view.get_number_of_local_vertices(), handle.get_stream()); - CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + if (PERF) { + CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + hr_clock.start(); + } cugraph::experimental::katz_centrality(handle, mg_graph_view, @@ -160,10 +180,14 @@ class Tests_MGKatzCentrality : public ::testing::TestWithParam::max(), - false, - true); + false); - CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + if (PERF) { + CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + double elapsed_time{0.0}; + hr_clock.stop(&elapsed_time); + std::cout << "MG Katz Centrality took " << elapsed_time * 1e-6 << " s.\n"; + } // 5. copmare SG & MG results @@ -189,8 +213,7 @@ class Tests_MGKatzCentrality : public ::testing::TestWithParam::max(), // max_iterations - false, - true); + false); // 5-4. compare diff --git a/cpp/tests/experimental/mg_sssp_test.cpp b/cpp/tests/experimental/mg_sssp_test.cpp index 48e4dc869f4..de39b8da128 100644 --- a/cpp/tests/experimental/mg_sssp_test.cpp +++ b/cpp/tests/experimental/mg_sssp_test.cpp @@ -14,6 +14,7 @@ * limitations under the License. */ +#include #include #include #include @@ -34,6 +35,11 @@ #include +// do the perf measurements +// enabled by command line parameter s'--perf' +// +static int PERF = 0; + typedef struct SSSP_Usecase_t { cugraph::test::input_graph_specifier_t input_graph_specifier{}; @@ -115,6 +121,7 @@ class Tests_MGSSSP : public ::testing::TestWithParam { // 1. initialize handle raft::handle_t handle{}; + HighResClock hr_clock{}; raft::comms::initialize_mpi_comms(&handle, MPI_COMM_WORLD); auto& comm = handle.get_comms(); @@ -128,10 +135,20 @@ class Tests_MGSSSP : public ::testing::TestWithParam { // 2. 
create MG graph + if (PERF) { + CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + hr_clock.start(); + } cugraph::experimental::graph_t mg_graph(handle); rmm::device_uvector d_mg_renumber_map_labels(0, handle.get_stream()); std::tie(mg_graph, d_mg_renumber_map_labels) = read_graph(handle, configuration, true); + if (PERF) { + CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + double elapsed_time{0.0}; + hr_clock.stop(&elapsed_time); + std::cout << "MG read_graph took " << elapsed_time * 1e-6 << " s.\n"; + } auto mg_graph_view = mg_graph.view(); @@ -147,7 +164,10 @@ class Tests_MGSSSP : public ::testing::TestWithParam { rmm::device_uvector d_mg_predecessors(mg_graph_view.get_number_of_local_vertices(), handle.get_stream()); - CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + if (PERF) { + CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + hr_clock.start(); + } // FIXME: disable do_expensive_check cugraph::experimental::sssp(handle, @@ -155,10 +175,14 @@ class Tests_MGSSSP : public ::testing::TestWithParam { d_mg_distances.data(), d_mg_predecessors.data(), static_cast(configuration.source), - std::numeric_limits::max(), - true); + std::numeric_limits::max()); - CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + if (PERF) { + CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + double elapsed_time{0.0}; + hr_clock.stop(&elapsed_time); + std::cout << "MG SSSP took " << elapsed_time * 1e-6 << " s.\n"; + } // 5. copmare SG & MG results @@ -202,8 +226,7 @@ class Tests_MGSSSP : public ::testing::TestWithParam { d_sg_distances.data(), d_sg_predecessors.data(), unrenumbered_source, - std::numeric_limits::max(), - true); + std::numeric_limits::max()); // 5-3. compare diff --git a/cpp/tests/experimental/pagerank_test.cpp b/cpp/tests/experimental/pagerank_test.cpp index 649fe11d805..0340140d14b 100644 --- a/cpp/tests/experimental/pagerank_test.cpp +++ b/cpp/tests/experimental/pagerank_test.cpp @@ -14,6 +14,7 @@ * limitations under the License. 
*/ +#include #include #include #include @@ -213,11 +214,22 @@ class Tests_PageRank : public ::testing::TestWithParam { constexpr bool renumber = true; raft::handle_t handle{}; + HighResClock hr_clock{}; + if (PERF) { + CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + hr_clock.start(); + } cugraph::experimental::graph_t graph(handle); rmm::device_uvector d_renumber_map_labels(0, handle.get_stream()); std::tie(graph, d_renumber_map_labels) = read_graph(handle, configuration, renumber); + if (PERF) { + CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + double elapsed_time{0.0}; + hr_clock.stop(&elapsed_time); + std::cout << "read_graph took " << elapsed_time * 1e-6 << " s.\n"; + } auto graph_view = graph.view(); std::vector h_personalization_vertices{}; @@ -271,7 +283,10 @@ class Tests_PageRank : public ::testing::TestWithParam { rmm::device_uvector d_pageranks(graph_view.get_number_of_vertices(), handle.get_stream()); - CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + if (PERF) { + CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + hr_clock.start(); + } cugraph::experimental::pagerank(handle, graph_view, @@ -286,7 +301,12 @@ class Tests_PageRank : public ::testing::TestWithParam { false, false); - CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + if (PERF) { + CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + double elapsed_time{0.0}; + hr_clock.stop(&elapsed_time); + std::cout << "PageRank took " << elapsed_time * 1e-6 << " s.\n"; + } if (configuration.check_correctness) { cugraph::experimental::graph_t unrenumbered_graph( diff --git a/cpp/tests/experimental/sssp_test.cpp b/cpp/tests/experimental/sssp_test.cpp index 9364d261dec..e8ab3ec5426 100644 --- a/cpp/tests/experimental/sssp_test.cpp +++ b/cpp/tests/experimental/sssp_test.cpp @@ -14,6 +14,7 @@ * limitations under the License. 
*/ +#include #include #include #include @@ -157,11 +158,22 @@ class Tests_SSSP : public ::testing::TestWithParam { constexpr bool renumber = true; raft::handle_t handle{}; + HighResClock hr_clock{}; + if (PERF) { + CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + hr_clock.start(); + } cugraph::experimental::graph_t graph(handle); rmm::device_uvector d_renumber_map_labels(0, handle.get_stream()); std::tie(graph, d_renumber_map_labels) = read_graph(handle, configuration, renumber); + if (PERF) { + CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + double elapsed_time{0.0}; + hr_clock.stop(&elapsed_time); + std::cout << "read_graph took " << elapsed_time * 1e-6 << " s.\n"; + } auto graph_view = graph.view(); ASSERT_TRUE(static_cast(configuration.source) >= 0 && @@ -172,7 +184,10 @@ class Tests_SSSP : public ::testing::TestWithParam { rmm::device_uvector d_predecessors(graph_view.get_number_of_vertices(), handle.get_stream()); - CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + if (PERF) { + CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + hr_clock.start(); + } cugraph::experimental::sssp(handle, graph_view, @@ -182,7 +197,12 @@ class Tests_SSSP : public ::testing::TestWithParam { std::numeric_limits::max(), false); - CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + if (PERF) { + CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + double elapsed_time{0.0}; + hr_clock.stop(&elapsed_time); + std::cout << "SSSP took " << elapsed_time * 1e-6 << " s.\n"; + } if (configuration.check_correctness) { cugraph::experimental::graph_t unrenumbered_graph( @@ -323,7 +343,7 @@ INSTANTIATE_TEST_CASE_P( SSSP_Usecase("test/datasets/wiki2003.mtx", 1000), SSSP_Usecase(cugraph::test::rmat_params_t{10, 16, 0.57, 0.19, 0.19, 0, false, false}, 0), // disable correctness checks for large graphs - SSSP_Usecase(cugraph::test::rmat_params_t{20, 16, 0.57, 0.19, 0.19, 0, false, false}, + SSSP_Usecase(cugraph::test::rmat_params_t{20, 32, 0.57, 0.19, 0.19, 0, false, false}, 0, false))); diff --git a/cpp/tests/pagerank/mg_pagerank_test.cpp b/cpp/tests/pagerank/mg_pagerank_test.cpp index f7b1e8dfbb4..bbc80a60a3d 100644 --- a/cpp/tests/pagerank/mg_pagerank_test.cpp +++ b/cpp/tests/pagerank/mg_pagerank_test.cpp @@ -14,6 +14,7 @@ * limitations under the License. */ +#include #include #include #include @@ -34,6 +35,11 @@ #include +// do the perf measurements +// enabled by command line parameter s'--perf' +// +static int PERF = 0; + typedef struct PageRank_Usecase_t { cugraph::test::input_graph_specifier_t input_graph_specifier{}; @@ -127,6 +133,7 @@ class Tests_MGPageRank : public ::testing::TestWithParam { // 1. initialize handle raft::handle_t handle{}; + HighResClock hr_clock{}; raft::comms::initialize_mpi_comms(&handle, MPI_COMM_WORLD); auto& comm = handle.get_comms(); @@ -140,10 +147,20 @@ class Tests_MGPageRank : public ::testing::TestWithParam { // 2. 
create MG graph + if (PERF) { + CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + hr_clock.start(); + } cugraph::experimental::graph_t mg_graph(handle); rmm::device_uvector d_mg_renumber_map_labels(0, handle.get_stream()); std::tie(mg_graph, d_mg_renumber_map_labels) = read_graph(handle, configuration, true); + if (PERF) { + CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + double elapsed_time{0.0}; + hr_clock.stop(&elapsed_time); + std::cout << "MG read_graph took " << elapsed_time * 1e-6 << " s.\n"; + } auto mg_graph_view = mg_graph.view(); @@ -195,7 +212,10 @@ class Tests_MGPageRank : public ::testing::TestWithParam { rmm::device_uvector d_mg_pageranks(mg_graph_view.get_number_of_local_vertices(), handle.get_stream()); - CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + if (PERF) { + CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + hr_clock.start(); + } cugraph::experimental::pagerank(handle, mg_graph_view, @@ -209,7 +229,12 @@ class Tests_MGPageRank : public ::testing::TestWithParam { std::numeric_limits::max(), false); - CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + if (PERF) { + CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + double elapsed_time{0.0}; + hr_clock.stop(&elapsed_time); + std::cout << "MG PageRank took " << elapsed_time * 1e-6 << " s.\n"; + } // 5. copmare SG & MG results diff --git a/cpp/tests/utilities/generate_graph_from_edgelist.cu b/cpp/tests/utilities/generate_graph_from_edgelist.cu index a9df392d2fb..5f41e0e5ce0 100644 --- a/cpp/tests/utilities/generate_graph_from_edgelist.cu +++ b/cpp/tests/utilities/generate_graph_from_edgelist.cu @@ -109,7 +109,6 @@ generate_graph_from_edgelist_impl(raft::handle_t const& handle, (store_transposed ? edgelist_rows.begin() : edgelist_cols.begin()) + h_displacements[i]; counts[i] = static_cast(h_edge_counts[i]); } - // FIXME: set do_expensive_check to false once validated std::tie(renumber_map_labels, partition, number_of_vertices, number_of_edges) = cugraph::experimental::renumber_edgelist( handle, @@ -117,8 +116,7 @@ generate_graph_from_edgelist_impl(raft::handle_t const& handle, static_cast(vertices.size()), major_ptrs, minor_ptrs, - counts, - true); + counts); } // 4. create a graph @@ -142,7 +140,6 @@ generate_graph_from_edgelist_impl(raft::handle_t const& handle, number_of_vertices, number_of_edges, cugraph::experimental::graph_properties_t{is_symmetric, false, test_weighted}, - true, true), std::move(renumber_map_labels)); } @@ -168,7 +165,6 @@ generate_graph_from_edgelist_impl(raft::handle_t const& handle, { vertex_t number_of_vertices = static_cast(vertices.size()); - // FIXME: set do_expensive_check to false once validated auto renumber_map_labels = renumber ? cugraph::experimental::renumber_edgelist( handle, @@ -176,11 +172,9 @@ generate_graph_from_edgelist_impl(raft::handle_t const& handle, static_cast(vertices.size()), store_transposed ? edgelist_cols.data() : edgelist_rows.data(), store_transposed ? 
edgelist_rows.data() : edgelist_cols.data(), - static_cast(edgelist_rows.size()), - true) + static_cast(edgelist_rows.size())) : rmm::device_uvector(0, handle.get_stream()); - // FIXME: set do_expensive_check to false once validated return std::make_tuple( cugraph::experimental::graph_t( handle, @@ -191,8 +185,7 @@ generate_graph_from_edgelist_impl(raft::handle_t const& handle, static_cast(edgelist_rows.size())}, number_of_vertices, cugraph::experimental::graph_properties_t{is_symmetric, false, test_weighted}, - renumber ? true : false, - true), + renumber ? true : false), std::move(renumber_map_labels)); }