From f6a92fc915e0e48b3ac425d38ce37e17dffde609 Mon Sep 17 00:00:00 2001 From: Kumar Aatish Date: Wed, 16 Feb 2022 15:59:42 -0500 Subject: [PATCH] Multi gpu sample edges utilities (#2064) Add utilities to enable multi gpu gathering to be used for mnmg sampling. Authors: - Kumar Aatish (https://github.com/kaatish) Approvers: - Chuck Hastings (https://github.com/ChuckHastings) - Seunghwa Kang (https://github.com/seunghwak) URL: https://github.com/rapidsai/cugraph/pull/2064 --- cpp/CMakeLists.txt | 1 + .../cugraph/detail/graph_functions.cuh | 192 +++++++ cpp/include/cugraph/graph_view.hpp | 15 +- cpp/src/sampling/detail/gather_utils_impl.cu | 268 ++++++++++ cpp/src/sampling/detail/gather_utils_impl.cuh | 488 +++++++++++++++++ cpp/src/structure/graph_impl.cuh | 139 +---- cpp/src/structure/graph_view_impl.cuh | 182 ++++++- cpp/tests/CMakeLists.txt | 6 +- cpp/tests/sampling/detail/mg_gather_utils.cu | 496 ++++++++++++++++++ 9 files changed, 1643 insertions(+), 144 deletions(-) create mode 100644 cpp/include/cugraph/detail/graph_functions.cuh create mode 100644 cpp/src/sampling/detail/gather_utils_impl.cu create mode 100644 cpp/src/sampling/detail/gather_utils_impl.cuh create mode 100644 cpp/tests/sampling/detail/mg_gather_utils.cu diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index b1c50959c88..d6de28f0d7d 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -182,6 +182,7 @@ add_library(cugraph SHARED src/community/legacy/extract_subgraph_by_vertex.cu src/community/legacy/egonet.cu src/sampling/random_walks.cu + src/sampling/detail/gather_utils_impl.cu src/cores/legacy/core_number.cu src/cores/core_number_sg.cu src/cores/core_number_mg.cu diff --git a/cpp/include/cugraph/detail/graph_functions.cuh b/cpp/include/cugraph/detail/graph_functions.cuh new file mode 100644 index 00000000000..402c12362ed --- /dev/null +++ b/cpp/include/cugraph/detail/graph_functions.cuh @@ -0,0 +1,192 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include + +#include +#include + +namespace cugraph { + +namespace detail { + +/** + * @brief Compute local out degrees of the sources belonging to the adjacency matrices + * stored on each gpu + * + * Iterate through partitions and store their local degrees + * + * @tparam GraphViewType Type of the passed non-owning graph object. + * @param handle RAFT handle object to encapsulate resources (e.g. CUDA stream, communicator, and + * handles to various CUDA libraries) to run graph algorithms. + * @param graph_view Non-owning graph object. 
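+ *
+ * A minimal usage sketch (illustrative only; assumes `handle` and a non-transposed multi-gpu
+ * graph view obtained from graph.view() are already available):
+ * @code
+ * auto local_degrees = cugraph::detail::compute_local_major_degrees(handle, graph_view);
+ * // one entry per local major (row) across all local adjacency matrix partitions
+ * @endcode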
+ * @return A single vector containing the local out degrees of the sources belonging to the
+ * adjacency matrices
+ */
+template <typename GraphViewType>
+rmm::device_uvector<typename GraphViewType::edge_type> compute_local_major_degrees(
+  raft::handle_t const& handle, GraphViewType const& graph_view);
+
+/**
+ * @brief Calculate global degree information for all vertices represented by the current gpu
+ *
+ * Calculate local degrees and perform a row-wise exclusive scan over all gpus in the column
+ * communicator.
+ *
+ * @tparam GraphViewType Type of the passed non-owning graph object.
+ * @param handle RAFT handle object to encapsulate resources (e.g. CUDA stream, communicator, and
+ * handles to various CUDA libraries) to run graph algorithms.
+ * @param graph_view Non-owning graph object.
+ * @return Tuple of two device vectors. The first contains the per-source edge count encountered
+ * by gpus in the column communicator before the current gpu. The second contains the global out
+ * degree of every source represented by the current gpu.
+ */
+template <typename GraphViewType>
+std::tuple<rmm::device_uvector<typename GraphViewType::edge_type>,
+           rmm::device_uvector<typename GraphViewType::edge_type>>
+get_global_degree_information(raft::handle_t const& handle, GraphViewType const& graph_view);
+
+/**
+ * @brief Gather active sources and associated client gpu ids across gpus in a
+ * column communicator
+ *
+ * Collect all the vertex ids and client gpu ids to be processed by every gpu in
+ * the column communicator and sort the collected list.
+ *
+ * @tparam GraphViewType Type of the passed non-owning graph object.
+ * @tparam VertexIterator Type of the iterator for vertex identifiers.
+ * @tparam GPUIdIterator Type of the iterator for gpu id identifiers.
+ * @param handle RAFT handle object to encapsulate resources (e.g. CUDA stream, communicator, and
+ * handles to various CUDA libraries) to run graph algorithms.
+ * @param graph_view Non-owning graph object.
+ * @param vertex_input_first Iterator pointing to the first vertex id to be processed
+ * @param vertex_input_last Iterator pointing to the last (exclusive) vertex id to be processed
+ * @param gpu_id_first Iterator pointing to the first gpu id to be processed
+ * @return Tuple of two device vectors containing all the vertices to be processed by every gpu
+ * in the column communicator and their associated client gpu ids
+ */
+template <typename GraphViewType, typename VertexIterator, typename GPUIdIterator>
+std::tuple<rmm::device_uvector<typename GraphViewType::vertex_type>,
+           rmm::device_uvector<typename std::iterator_traits<GPUIdIterator>::value_type>>
+gather_active_sources_in_row(raft::handle_t const& handle,
+                             GraphViewType const& graph_view,
+                             VertexIterator vertex_input_first,
+                             VertexIterator vertex_input_last,
+                             GPUIdIterator gpu_id_first);
+
+/**
+ * @brief Return global out degrees of active sources
+ *
+ * Get partition information of all graph partitions on the gpu and select
+ * global degrees of all active sources
+ *
+ * @tparam GraphViewType Type of the passed non-owning graph object.
+ * @param handle RAFT handle object to encapsulate resources (e.g. CUDA stream, communicator, and
+ * handles to various CUDA libraries) to run graph algorithms.
+ * @param graph_view Non-owning graph object.
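+ *
+ * Illustrative call sequence (a sketch mirroring the MG test added in this PR; the input
+ * vectors are assumed to come from gather_active_sources_in_row and
+ * get_global_degree_information):
+ * @code
+ * auto [global_degree_offsets, global_out_degrees] =
+ *   cugraph::detail::get_global_degree_information(handle, graph_view);
+ * auto active_major_degrees = cugraph::detail::get_active_major_global_degrees(
+ *   handle, graph_view, active_majors, global_out_degrees);
+ * @endcode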
+ * @param active_majors Device vector containing all the vertex ids that are processed by
+ * gpus in the column communicator
+ * @param global_out_degrees Global out degrees for every source represented by the current gpu
+ * @return Global out degrees of all sources in active_majors
+ */
+template <typename GraphViewType>
+rmm::device_uvector<typename GraphViewType::edge_type> get_active_major_global_degrees(
+  raft::handle_t const& handle,
+  GraphViewType const& graph_view,
+  const rmm::device_uvector<typename GraphViewType::vertex_type>& active_majors,
+  const rmm::device_uvector<typename GraphViewType::edge_type>& global_out_degrees);
+
+/**
+ * @brief Return partition information of all vertex ids of all the partitions belonging to a gpu
+ *
+ * Iterate through partitions and store the starting vertex ids, the exclusive scan of vertex
+ * counts, and the offsets and indices of each partition's CSR structure
+ *
+ * @tparam GraphViewType Type of the passed non-owning graph object.
+ * @param handle RAFT handle object to encapsulate resources (e.g. CUDA stream, communicator, and
+ * handles to various CUDA libraries) to run graph algorithms.
+ * @param graph_view Non-owning graph object.
+ * @return Tuple of device vectors. The first vector contains all the partitions related to the
+ * gpu. The second and third vectors contain starting and ending vertex ids of all the partitions
+ * belonging to the gpu. The fourth vector contains the starting vertex id of the hypersparse
+ * region in each partition. The fifth vector denotes the vertex count offset (how many vertices
+ * are dealt with by the previous partitions).
+ */
+template <typename GraphViewType>
+std::tuple<rmm::device_uvector<matrix_partition_device_view_t<typename GraphViewType::vertex_type,
+                                                              typename GraphViewType::edge_type,
+                                                              typename GraphViewType::weight_type,
+                                                              GraphViewType::is_multi_gpu>>,
+           rmm::device_uvector<typename GraphViewType::vertex_type>,
+           rmm::device_uvector<typename GraphViewType::vertex_type>,
+           rmm::device_uvector<typename GraphViewType::vertex_type>,
+           rmm::device_uvector<typename GraphViewType::vertex_type>>
+partition_information(raft::handle_t const& handle, GraphViewType const& graph_view);
+
+/**
+ * @brief Gather valid edges present on the current gpu
+ *
+ * Collect all the edges that are present in the adjacency lists on the current gpu
+ *
+ * @tparam GraphViewType Type of the passed non-owning graph object.
+ * @tparam EdgeIndexIterator Type of the iterator for edge indices.
+ * @tparam GPUIdIterator Type of the iterator for gpu id identifiers.
+ * @param handle RAFT handle object to encapsulate resources (e.g. CUDA stream, communicator, and
+ * handles to various CUDA libraries) to run graph algorithms.
+ * @param graph_view Non-owning graph object.
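+ *
+ * Illustrative call (a sketch mirroring the MG test added in this PR; the active-source
+ * vectors, sampled edge indices, and degree offsets are assumed to come from the utilities
+ * documented above):
+ * @code
+ * auto [majors, minors, gpu_ids] =
+ *   cugraph::detail::gather_local_edges(handle,
+ *                                       graph_view,
+ *                                       active_majors_in_row,
+ *                                       active_major_gpu_ids,
+ *                                       edge_index_first,
+ *                                       indices_per_major,
+ *                                       global_degree_offsets);
+ * @endcode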
+ * @param active_majors_in_row Device vector containing all the vertex id that are processed by + * gpus in the column communicator + * @param active_major_gpu_ids Device vector containing the gpu id associated by every vertex + * present in active_majors_in_row + * @param edge_index_first Iterator pointing to the first destination index + * @param indices_per_source Number of indices supplied for every source in the range + * [vertex_input_first, vertex_input_last) + * @param global_degree_offset Global degree offset to local adjacency list for every source + * represented by current gpu + * @return A tuple of device vector containing the majors, minors and gpu_ids gathered locally + */ +template +std::tuple, + rmm::device_uvector, + rmm::device_uvector> +gather_local_edges( + raft::handle_t const& handle, + GraphViewType const& graph_view, + const rmm::device_uvector& active_majors_in_row, + const rmm::device_uvector& active_major_gpu_ids, + EdgeIndexIterator edge_index_first, + typename GraphViewType::edge_type indices_per_major, + const rmm::device_uvector& global_degree_offsets); + +} // namespace detail + +} // namespace cugraph diff --git a/cpp/include/cugraph/graph_view.hpp b/cpp/include/cugraph/graph_view.hpp index c0b1c66ea9f..f9b10c0cd53 100644 --- a/cpp/include/cugraph/graph_view.hpp +++ b/cpp/include/cugraph/graph_view.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + #pragma once #include @@ -632,6 +633,12 @@ class graph_view_t, + rmm::device_uvector, + std::optional>> + decompress_to_edgelist(raft::handle_t const& handle, + std::optional> const& renumber_map) const; + private: std::vector adj_matrix_partition_offsets_{}; std::vector adj_matrix_partition_indices_{}; @@ -859,6 +866,12 @@ class graph_view_t, + rmm::device_uvector, + std::optional>> + decompress_to_edgelist(raft::handle_t const& handle, + std::optional> const& renumber_map) const; + private: edge_t const* offsets_{nullptr}; vertex_t const* indices_{nullptr}; diff --git a/cpp/src/sampling/detail/gather_utils_impl.cu b/cpp/src/sampling/detail/gather_utils_impl.cu new file mode 100644 index 00000000000..fd70b402540 --- /dev/null +++ b/cpp/src/sampling/detail/gather_utils_impl.cu @@ -0,0 +1,268 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include + +namespace cugraph { + +namespace detail { + +template rmm::device_uvector compute_local_major_degrees( + raft::handle_t const& handle, + graph_view_t const& graph_view); + +template rmm::device_uvector compute_local_major_degrees( + raft::handle_t const& handle, + graph_view_t const& graph_view); + +template rmm::device_uvector compute_local_major_degrees( + raft::handle_t const& handle, + graph_view_t const& graph_view); + +template rmm::device_uvector compute_local_major_degrees( + raft::handle_t const& handle, + graph_view_t const& graph_view); + +template rmm::device_uvector compute_local_major_degrees( + raft::handle_t const& handle, + graph_view_t const& graph_view); + +template rmm::device_uvector compute_local_major_degrees( + raft::handle_t const& handle, + graph_view_t const& graph_view); + +template std::tuple, rmm::device_uvector> +get_global_degree_information(raft::handle_t const& handle, + graph_view_t const& graph_view); + +template std::tuple, rmm::device_uvector> +get_global_degree_information( + raft::handle_t const& handle, + graph_view_t const& graph_view); + +template std::tuple, rmm::device_uvector> +get_global_degree_information(raft::handle_t const& handle, + graph_view_t const& graph_view); + +template std::tuple, rmm::device_uvector> +get_global_degree_information( + raft::handle_t const& handle, + graph_view_t const& graph_view); + +template std::tuple, rmm::device_uvector> +get_global_degree_information(raft::handle_t const& handle, + graph_view_t const& graph_view); + +template std::tuple, rmm::device_uvector> +get_global_degree_information( + raft::handle_t const& handle, + graph_view_t const& graph_view); + +template std::tuple, rmm::device_uvector> +gather_active_sources_in_row(raft::handle_t const& handle, + graph_view_t const& graph_view, + int32_t const* vertex_input_first, + int32_t const* vertex_input_last, + int32_t const* gpu_id_first); + +template std::tuple, rmm::device_uvector> +gather_active_sources_in_row(raft::handle_t const& handle, + graph_view_t const& graph_view, + int32_t const* vertex_input_first, + int32_t const* vertex_input_last, + int32_t const* gpu_id_first); + +template std::tuple, rmm::device_uvector> +gather_active_sources_in_row(raft::handle_t const& handle, + graph_view_t const& graph_view, + int32_t const* vertex_input_first, + int32_t const* vertex_input_last, + int32_t const* gpu_id_first); + +template std::tuple, rmm::device_uvector> +gather_active_sources_in_row(raft::handle_t const& handle, + graph_view_t const& graph_view, + int32_t const* vertex_input_first, + int32_t const* vertex_input_last, + int32_t const* gpu_id_first); + +template std::tuple, rmm::device_uvector> +gather_active_sources_in_row(raft::handle_t const& handle, + graph_view_t const& graph_view, + int64_t const* vertex_input_first, + int64_t const* vertex_input_last, + int32_t const* gpu_id_first); + +template std::tuple, rmm::device_uvector> +gather_active_sources_in_row(raft::handle_t const& handle, + graph_view_t const& graph_view, + int64_t const* vertex_input_first, + int64_t const* vertex_input_last, + int32_t const* gpu_id_first); + +template rmm::device_uvector get_active_major_global_degrees( + raft::handle_t const& handle, + graph_view_t const& graph_view, + const rmm::device_uvector& active_majors, + const rmm::device_uvector& global_out_degrees); + +template rmm::device_uvector get_active_major_global_degrees( + raft::handle_t const& handle, + graph_view_t const& graph_view, + const rmm::device_uvector& active_majors, + 
const rmm::device_uvector& global_out_degrees); + +template rmm::device_uvector get_active_major_global_degrees( + raft::handle_t const& handle, + graph_view_t const& graph_view, + const rmm::device_uvector& active_majors, + const rmm::device_uvector& global_out_degrees); + +template rmm::device_uvector get_active_major_global_degrees( + raft::handle_t const& handle, + graph_view_t const& graph_view, + const rmm::device_uvector& active_majors, + const rmm::device_uvector& global_out_degrees); + +template rmm::device_uvector get_active_major_global_degrees( + raft::handle_t const& handle, + graph_view_t const& graph_view, + const rmm::device_uvector& active_majors, + const rmm::device_uvector& global_out_degrees); + +template rmm::device_uvector get_active_major_global_degrees( + raft::handle_t const& handle, + graph_view_t const& graph_view, + const rmm::device_uvector& active_majors, + const rmm::device_uvector& global_out_degrees); + +template std::tuple< + rmm::device_uvector>, + rmm::device_uvector, + rmm::device_uvector, + rmm::device_uvector, + rmm::device_uvector> +partition_information(raft::handle_t const& handle, + graph_view_t const& graph_view); + +template std::tuple< + rmm::device_uvector>, + rmm::device_uvector, + rmm::device_uvector, + rmm::device_uvector, + rmm::device_uvector> +partition_information(raft::handle_t const& handle, + graph_view_t const& graph_view); + +template std::tuple< + rmm::device_uvector>, + rmm::device_uvector, + rmm::device_uvector, + rmm::device_uvector, + rmm::device_uvector> +partition_information(raft::handle_t const& handle, + graph_view_t const& graph_view); + +template std::tuple< + rmm::device_uvector>, + rmm::device_uvector, + rmm::device_uvector, + rmm::device_uvector, + rmm::device_uvector> +partition_information(raft::handle_t const& handle, + graph_view_t const& graph_view); + +template std::tuple< + rmm::device_uvector>, + rmm::device_uvector, + rmm::device_uvector, + rmm::device_uvector, + rmm::device_uvector> +partition_information(raft::handle_t const& handle, + graph_view_t const& graph_view); + +template std::tuple< + rmm::device_uvector>, + rmm::device_uvector, + rmm::device_uvector, + rmm::device_uvector, + rmm::device_uvector> +partition_information(raft::handle_t const& handle, + graph_view_t const& graph_view); + +template std:: + tuple, rmm::device_uvector, rmm::device_uvector> + gather_local_edges(raft::handle_t const& handle, + graph_view_t const& graph_view, + const rmm::device_uvector& active_majors_in_row, + const rmm::device_uvector& active_major_gpu_ids, + int32_t const* edge_index_first, + int32_t indices_per_major, + const rmm::device_uvector& global_degree_offsets); + +template std:: + tuple, rmm::device_uvector, rmm::device_uvector> + gather_local_edges(raft::handle_t const& handle, + graph_view_t const& graph_view, + const rmm::device_uvector& active_majors_in_row, + const rmm::device_uvector& active_major_gpu_ids, + int32_t const* edge_index_first, + int32_t indices_per_major, + const rmm::device_uvector& global_degree_offsets); + +template std:: + tuple, rmm::device_uvector, rmm::device_uvector> + gather_local_edges(raft::handle_t const& handle, + graph_view_t const& graph_view, + const rmm::device_uvector& active_majors_in_row, + const rmm::device_uvector& active_major_gpu_ids, + int64_t const* edge_index_first, + int64_t indices_per_major, + const rmm::device_uvector& global_degree_offsets); + +template std:: + tuple, rmm::device_uvector, rmm::device_uvector> + gather_local_edges(raft::handle_t const& handle, 
+ graph_view_t const& graph_view, + const rmm::device_uvector& active_majors_in_row, + const rmm::device_uvector& active_major_gpu_ids, + int64_t const* edge_index_first, + int64_t indices_per_major, + const rmm::device_uvector& global_degree_offsets); + +template std:: + tuple, rmm::device_uvector, rmm::device_uvector> + gather_local_edges(raft::handle_t const& handle, + graph_view_t const& graph_view, + const rmm::device_uvector& active_majors_in_row, + const rmm::device_uvector& active_major_gpu_ids, + int64_t const* edge_index_first, + int64_t indices_per_major, + const rmm::device_uvector& global_degree_offsets); + +template std:: + tuple, rmm::device_uvector, rmm::device_uvector> + gather_local_edges(raft::handle_t const& handle, + graph_view_t const& graph_view, + const rmm::device_uvector& active_majors_in_row, + const rmm::device_uvector& active_major_gpu_ids, + int64_t const* edge_index_first, + int64_t indices_per_major, + const rmm::device_uvector& global_degree_offsets); + +} // namespace detail + +} // namespace cugraph diff --git a/cpp/src/sampling/detail/gather_utils_impl.cuh b/cpp/src/sampling/detail/gather_utils_impl.cuh new file mode 100644 index 00000000000..4b8efac7b01 --- /dev/null +++ b/cpp/src/sampling/detail/gather_utils_impl.cuh @@ -0,0 +1,488 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include + +#include +#include + +namespace cugraph { + +namespace detail { + +template +rmm::device_uvector compute_local_major_degrees( + raft::handle_t const& handle, GraphViewType const& graph_view) +{ + static_assert(GraphViewType::is_adj_matrix_transposed == false); + using vertex_t = typename GraphViewType::vertex_type; + using edge_t = typename GraphViewType::edge_type; + using weight_t = typename GraphViewType::weight_type; + + rmm::device_uvector local_degrees( + GraphViewType::is_adj_matrix_transposed + ? graph_view.get_number_of_local_adj_matrix_partition_cols() + : graph_view.get_number_of_local_adj_matrix_partition_rows(), + handle.get_stream()); + + // FIXME optimize for communication + // get_number_of_local_adj_matrix_partition_rows == summation of get_major_size() of all + // partitions belonging to the gpu + vertex_t partial_offset{0}; + for (size_t i = 0; i < graph_view.get_number_of_local_adj_matrix_partitions(); ++i) { + auto matrix_partition = + matrix_partition_device_view_t( + graph_view.get_matrix_partition_view(i)); + + // Check if hypersparse segment is present in the partition + auto segment_offsets = graph_view.get_local_adj_matrix_partition_segment_offsets(i); + auto use_dcs = segment_offsets + ? 
((*segment_offsets).size() > (num_sparse_segments_per_vertex_partition + 1)) + : false; + + if (use_dcs) { + auto major_hypersparse_first = matrix_partition.get_major_first() + + (*segment_offsets)[num_sparse_segments_per_vertex_partition]; + // Calculate degrees in sparse region + auto sparse_begin = local_degrees.begin() + partial_offset; + auto sparse_end = local_degrees.begin() + partial_offset + + (major_hypersparse_first - matrix_partition.get_major_first()); + ; + + thrust::tabulate(handle.get_thrust_policy(), + sparse_begin, + sparse_end, + [offsets = matrix_partition.get_offsets()] __device__(auto i) { + return offsets[i + 1] - offsets[i]; + }); + + // Calculate degrees in hypersparse region + auto dcs_nzd_vertex_count = *(matrix_partition.get_dcs_nzd_vertex_count()); + // Initialize hypersparse region degrees as 0 + thrust::fill(handle.get_thrust_policy(), + sparse_end, + sparse_begin + matrix_partition.get_major_size(), + edge_t{0}); + thrust::for_each(handle.get_thrust_policy(), + thrust::make_counting_iterator(vertex_t{0}), + thrust::make_counting_iterator(dcs_nzd_vertex_count), + [major_hypersparse_first, + major_first = matrix_partition.get_major_first(), + vertex_ids = *(matrix_partition.get_dcs_nzd_vertices()), + offsets = matrix_partition.get_offsets(), + local_degrees = thrust::raw_pointer_cast(sparse_begin)] __device__(auto i) { + auto d = offsets[(major_hypersparse_first - major_first) + i + 1] - + offsets[(major_hypersparse_first - major_first) + i]; + auto v = vertex_ids[i]; + local_degrees[v - major_first] = d; + }); + } else { + auto sparse_begin = local_degrees.begin() + partial_offset; + auto sparse_end = local_degrees.begin() + partial_offset + matrix_partition.get_major_size(); + thrust::tabulate(handle.get_thrust_policy(), + sparse_begin, + sparse_end, + [offsets = matrix_partition.get_offsets()] __device__(auto i) { + return offsets[i + 1] - offsets[i]; + }); + } + partial_offset += matrix_partition.get_major_size(); + } + return local_degrees; +} + +template +std::tuple, + rmm::device_uvector> +get_global_degree_information(raft::handle_t const& handle, GraphViewType const& graph_view) +{ + static_assert(GraphViewType::is_multi_gpu == true); + using edge_t = typename GraphViewType::edge_type; + auto local_degrees = compute_local_major_degrees(handle, graph_view); + + auto& col_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().col_name()); + auto const col_size = col_comm.get_size(); + auto const col_rank = col_comm.get_rank(); + auto& row_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().row_name()); + auto const row_size = row_comm.get_size(); + + auto& comm = handle.get_comms(); + auto const comm_size = comm.get_size(); + auto const comm_rank = comm.get_rank(); + + rmm::device_uvector temp_input(local_degrees.size(), handle.get_stream()); + raft::update_device( + temp_input.data(), local_degrees.data(), local_degrees.size(), handle.get_stream()); + + rmm::device_uvector recv_data(local_degrees.size(), handle.get_stream()); + if (col_rank == 0) { + thrust::fill(handle.get_thrust_policy(), recv_data.begin(), recv_data.end(), edge_t{0}); + } + for (int i = 0; i < col_size - 1; ++i) { + if (col_rank == i) { + comm.device_send( + temp_input.begin(), temp_input.size(), comm_rank + row_size, handle.get_stream()); + } + if (col_rank == i + 1) { + comm.device_recv( + recv_data.begin(), recv_data.size(), comm_rank - row_size, handle.get_stream()); + thrust::transform(handle.get_thrust_policy(), + temp_input.begin(), + temp_input.end(), + 
recv_data.begin(), + temp_input.begin(), + thrust::plus()); + } + col_comm.barrier(); + } + // Get global degrees + device_bcast(col_comm, + temp_input.begin(), + temp_input.begin(), + temp_input.size(), + col_size - 1, + handle.get_stream()); + + return std::make_tuple(std::move(recv_data), std::move(temp_input)); +} + +template +std::tuple, + rmm::device_uvector::value_type>> +gather_active_sources_in_row(raft::handle_t const& handle, + GraphViewType const& graph_view, + VertexIterator vertex_input_first, + VertexIterator vertex_input_last, + GPUIdIterator gpu_id_first) +{ + static_assert(GraphViewType::is_multi_gpu == true); + static_assert(GraphViewType::is_adj_matrix_transposed == false); + using gpu_t = typename std::iterator_traits::value_type; + using vertex_t = typename GraphViewType::vertex_type; + + auto const& col_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().col_name()); + size_t source_count = thrust::distance(vertex_input_first, vertex_input_last); + auto external_source_counts = + cugraph::host_scalar_allgather(col_comm, source_count, handle.get_stream()); + auto total_external_source_count = + std::accumulate(external_source_counts.begin(), external_source_counts.end(), size_t{0}); + std::vector displacements(external_source_counts.size(), size_t{0}); + std::exclusive_scan( + external_source_counts.begin(), external_source_counts.end(), displacements.begin(), size_t{0}); + + rmm::device_uvector active_majors(total_external_source_count, handle.get_stream()); + rmm::device_uvector active_major_gpu_ids(total_external_source_count, handle.get_stream()); + // Get the sources other gpus on the same row are working on + // FIXME : replace with device_bcast for better scaling + device_allgatherv(col_comm, + vertex_input_first, + active_majors.data(), + external_source_counts, + displacements, + handle.get_stream()); + device_allgatherv(col_comm, + gpu_id_first, + active_major_gpu_ids.data(), + external_source_counts, + displacements, + handle.get_stream()); + thrust::sort_by_key(handle.get_thrust_policy(), + active_majors.begin(), + active_majors.end(), + active_major_gpu_ids.begin()); + return std::make_tuple(std::move(active_majors), std::move(active_major_gpu_ids)); +} + +template +rmm::device_uvector get_active_major_global_degrees( + raft::handle_t const& handle, + GraphViewType const& graph_view, + const rmm::device_uvector& active_majors, + const rmm::device_uvector& global_out_degrees) +{ + using vertex_t = typename GraphViewType::vertex_type; + using edge_t = typename GraphViewType::edge_type; + using partition_t = matrix_partition_device_view_t; + rmm::device_uvector active_major_degrees(active_majors.size(), handle.get_stream()); + + std::vector id_begin; + std::vector id_end; + std::vector count_offsets; + id_begin.reserve(graph_view.get_number_of_local_adj_matrix_partitions()); + id_end.reserve(graph_view.get_number_of_local_adj_matrix_partitions()); + count_offsets.reserve(graph_view.get_number_of_local_adj_matrix_partitions()); + vertex_t counter{0}; + for (size_t i = 0; i < graph_view.get_number_of_local_adj_matrix_partitions(); ++i) { + auto matrix_partition = partition_t(graph_view.get_matrix_partition_view(i)); + // Starting vertex ids of each partition + id_begin.push_back(matrix_partition.get_major_first()); + id_end.push_back(matrix_partition.get_major_last()); + count_offsets.push_back(counter); + counter += matrix_partition.get_major_size(); + } + rmm::device_uvector vertex_id_begin(id_begin.size(), handle.get_stream()); + 
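+  // vertex_id_begin/vertex_id_end hold the per-partition [first, last) major vertex ranges and
+  // vertex_count_offsets the exclusive scan of per-partition major counts; they are copied to
+  // the device so the transform below can map each active major to its slot in the
+  // concatenated global_out_degrees array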
rmm::device_uvector vertex_id_end(id_end.size(), handle.get_stream()); + rmm::device_uvector vertex_count_offsets(count_offsets.size(), handle.get_stream()); + raft::update_device( + vertex_id_begin.data(), id_begin.data(), id_begin.size(), handle.get_stream()); + raft::update_device(vertex_id_end.data(), id_end.data(), id_end.size(), handle.get_stream()); + raft::update_device( + vertex_count_offsets.data(), count_offsets.data(), count_offsets.size(), handle.get_stream()); + + thrust::transform(handle.get_thrust_policy(), + active_majors.begin(), + active_majors.end(), + active_major_degrees.begin(), + [id_begin = vertex_id_begin.data(), + id_end = vertex_id_end.data(), + global_out_degrees = global_out_degrees.data(), + vertex_count_offsets = vertex_count_offsets.data(), + count = vertex_id_end.size()] __device__(auto v) { + // Find which partition id did the vertex belong to + auto partition_id = thrust::distance( + id_end, thrust::upper_bound(thrust::seq, id_end, id_end + count, v)); + // starting position of the segment within global_degree_offset + // where the information for partition (partition_id) starts + // vertex_count_offsets[partition_id] + // The relative location of offset information for vertex id v within + // the segment + // v - id_end[partition_id] + auto location_in_segment = v - id_begin[partition_id]; + // read location of global_degree_offset needs to take into account the + // partition offsets because it is a concatenation of all the offsets + // across all partitions + auto location = location_in_segment + vertex_count_offsets[partition_id]; + return global_out_degrees[location]; + }); + return active_major_degrees; +} + +template +std::tuple>, + rmm::device_uvector, + rmm::device_uvector, + rmm::device_uvector, + rmm::device_uvector> +partition_information(raft::handle_t const& handle, GraphViewType const& graph_view) +{ + using vertex_t = typename GraphViewType::vertex_type; + using edge_t = typename GraphViewType::edge_type; + using partition_t = matrix_partition_device_view_t; + + std::vector partitions; + std::vector id_begin; + std::vector id_end; + std::vector hypersparse_begin; + std::vector vertex_count_offsets; + + partitions.reserve(graph_view.get_number_of_local_adj_matrix_partitions()); + id_begin.reserve(graph_view.get_number_of_local_adj_matrix_partitions()); + id_end.reserve(graph_view.get_number_of_local_adj_matrix_partitions()); + hypersparse_begin.reserve(graph_view.get_number_of_local_adj_matrix_partitions()); + vertex_count_offsets.reserve(graph_view.get_number_of_local_adj_matrix_partitions()); + + vertex_t counter{0}; + for (size_t i = 0; i < graph_view.get_number_of_local_adj_matrix_partitions(); ++i) { + partitions.emplace_back(graph_view.get_matrix_partition_view(i)); + auto& matrix_partition = partitions.back(); + + // Starting vertex ids of each partition + id_begin.push_back(matrix_partition.get_major_first()); + id_end.push_back(matrix_partition.get_major_last()); + + auto segment_offsets = graph_view.get_local_adj_matrix_partition_segment_offsets(i); + auto use_dcs = segment_offsets + ? 
((*segment_offsets).size() > (num_sparse_segments_per_vertex_partition + 1)) + : false; + if (use_dcs) { + auto major_hypersparse_first = matrix_partition.get_major_first() + + (*segment_offsets)[num_sparse_segments_per_vertex_partition]; + hypersparse_begin.push_back(major_hypersparse_first); + } else { + hypersparse_begin.push_back(matrix_partition.get_major_last()); + } + + // Count of relative position of the vertices + vertex_count_offsets.push_back(counter); + + counter += matrix_partition.get_major_size(); + } + + // Allocate device memory for transfer + rmm::device_uvector matrix_partitions( + graph_view.get_number_of_local_adj_matrix_partitions(), handle.get_stream()); + + rmm::device_uvector major_begin(id_begin.size(), handle.get_stream()); + rmm::device_uvector minor_end(id_end.size(), handle.get_stream()); + rmm::device_uvector hs_begin(hypersparse_begin.size(), handle.get_stream()); + rmm::device_uvector vc_offsets(vertex_count_offsets.size(), handle.get_stream()); + + // Transfer data + raft::update_device( + matrix_partitions.data(), partitions.data(), partitions.size(), handle.get_stream()); + raft::update_device(major_begin.data(), id_begin.data(), id_begin.size(), handle.get_stream()); + raft::update_device(minor_end.data(), id_end.data(), id_end.size(), handle.get_stream()); + raft::update_device(vc_offsets.data(), + vertex_count_offsets.data(), + vertex_count_offsets.size(), + handle.get_stream()); + raft::update_device( + hs_begin.data(), hypersparse_begin.data(), hypersparse_begin.size(), handle.get_stream()); + + return std::make_tuple(std::move(matrix_partitions), + std::move(major_begin), + std::move(minor_end), + std::move(hs_begin), + std::move(vc_offsets)); +} + +template +std::tuple, + rmm::device_uvector, + rmm::device_uvector> +gather_local_edges( + raft::handle_t const& handle, + GraphViewType const& graph_view, + const rmm::device_uvector& active_majors_in_row, + const rmm::device_uvector& active_major_gpu_ids, + EdgeIndexIterator edge_index_first, + typename GraphViewType::edge_type indices_per_major, + const rmm::device_uvector& global_degree_offsets) +{ + static_assert(GraphViewType::is_multi_gpu == true); + using vertex_t = typename GraphViewType::vertex_type; + using edge_t = typename GraphViewType::edge_type; + auto edge_count = active_majors_in_row.size() * indices_per_major; + rmm::device_uvector majors(edge_count, handle.get_stream()); + rmm::device_uvector minors(edge_count, handle.get_stream()); + rmm::device_uvector minor_gpu_ids(edge_count, handle.get_stream()); + vertex_t invalid_vertex_id = graph_view.get_number_of_vertices(); + + auto [partitions, id_begin, id_end, hypersparse_begin, vertex_count_offsets] = + partition_information(handle, graph_view); + + thrust::for_each( + handle.get_thrust_policy(), + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(edge_count), + [edge_index_first, + active_majors = active_majors_in_row.data(), + active_major_gpu_ids = active_major_gpu_ids.data(), + id_begin = id_begin.data(), + id_end = id_end.data(), + id_seg_count = id_begin.size(), + vertex_count_offsets = vertex_count_offsets.data(), + global_degree_offsets = global_degree_offsets.data(), + majors = majors.data(), + minors = minors.data(), + dst_gpu_ids = minor_gpu_ids.data(), + partitions = partitions.data(), + hypersparse_begin = hypersparse_begin.data(), + invalid_vertex_id, + indices_per_major] __device__(auto index) { + // major which this edge index refers to + auto loc = index / indices_per_major; + auto major = 
active_majors[loc]; + majors[index] = major; + dst_gpu_ids[index] = active_major_gpu_ids[loc]; + + // Find which partition id did the major belong to + auto partition_id = thrust::distance( + id_end, thrust::upper_bound(thrust::seq, id_end, id_end + id_seg_count, major)); + // starting position of the segment within global_degree_offset + // where the information for partition (partition_id) starts + // vertex_count_offsets[partition_id] + // The relative location of offset information for vertex id v within + // the segment + // v - seg[partition_id] + vertex_t location_in_segment; + if (major < hypersparse_begin[partition_id]) { + location_in_segment = major - id_begin[partition_id]; + } else { + auto row_hypersparse_idx = + partitions[partition_id].get_major_hypersparse_idx_from_major_nocheck(major); + if (row_hypersparse_idx) { + location_in_segment = *(row_hypersparse_idx)-id_begin[partition_id]; + } else { + minors[index] = invalid_vertex_id; + return; + } + } + + // csr offset value for vertex v that belongs to partition (partition_id) + auto offset_ptr = partitions[partition_id].get_offsets(); + auto sparse_offset = offset_ptr[location_in_segment]; + auto local_out_degree = offset_ptr[location_in_segment + 1] - sparse_offset; + vertex_t const* adjacency_list = partitions[partition_id].get_indices() + sparse_offset; + // read location of global_degree_offset needs to take into account the + // partition offsets because it is a concatenation of all the offsets + // across all partitions + auto location = location_in_segment + vertex_count_offsets[partition_id]; + auto g_degree_offset = global_degree_offsets[location]; + auto g_dst_index = edge_index_first[index]; + if ((g_dst_index >= g_degree_offset) && (g_dst_index < g_degree_offset + local_out_degree)) { + minors[index] = adjacency_list[g_dst_index - g_degree_offset]; + } else { + minors[index] = invalid_vertex_id; + } + }); + auto input_iter = thrust::make_zip_iterator( + thrust::make_tuple(majors.begin(), minors.begin(), minor_gpu_ids.begin())); + + auto compacted_length = thrust::distance( + input_iter, + thrust::remove_if( + handle.get_thrust_policy(), + input_iter, + input_iter + minors.size(), + minors.begin(), + [invalid_vertex_id] __device__(auto dst) { return (dst == invalid_vertex_id); })); + majors.resize(compacted_length, handle.get_stream()); + minors.resize(compacted_length, handle.get_stream()); + minor_gpu_ids.resize(compacted_length, handle.get_stream()); + return std::make_tuple(std::move(majors), std::move(minors), std::move(minor_gpu_ids)); +} + +} // namespace detail + +} // namespace cugraph diff --git a/cpp/src/structure/graph_impl.cuh b/cpp/src/structure/graph_impl.cuh index ef64e60ac2f..72748fc2e61 100644 --- a/cpp/src/structure/graph_impl.cuh +++ b/cpp/src/structure/graph_impl.cuh @@ -13,9 +13,9 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + #pragma once -#include #include #include #include @@ -1210,113 +1210,11 @@ graph_t> const& renumber_map, bool destroy) { - auto& comm = handle.get_comms(); - auto const comm_size = comm.get_size(); - auto& row_comm = - this->get_handle_ptr()->get_subcomm(cugraph::partition_2d::key_naming_t().row_name()); - auto const row_comm_size = row_comm.get_size(); - auto& col_comm = - this->get_handle_ptr()->get_subcomm(cugraph::partition_2d::key_naming_t().col_name()); - auto const col_comm_rank = col_comm.get_rank(); - - auto graph_view = this->view(); - - std::vector edgelist_edge_counts(graph_view.get_number_of_local_adj_matrix_partitions(), - size_t{0}); - for (size_t i = 0; i < edgelist_edge_counts.size(); ++i) { - edgelist_edge_counts[i] = - static_cast(graph_view.get_number_of_local_adj_matrix_partition_edges(i)); - } - auto number_of_local_edges = - std::reduce(edgelist_edge_counts.begin(), edgelist_edge_counts.end()); - auto vertex_partition_lasts = graph_view.get_vertex_partition_lasts(); - - rmm::device_uvector edgelist_majors(number_of_local_edges, handle.get_stream()); - rmm::device_uvector edgelist_minors(edgelist_majors.size(), handle.get_stream()); - auto edgelist_weights = this->is_weighted() ? std::make_optional>( - edgelist_majors.size(), handle.get_stream()) - : std::nullopt; - - size_t cur_size{0}; - for (size_t i = 0; i < edgelist_edge_counts.size(); ++i) { - detail::decompress_matrix_partition_to_edgelist( - handle, - matrix_partition_device_view_t( - graph_view.get_matrix_partition_view(i)), - edgelist_majors.data() + cur_size, - edgelist_minors.data() + cur_size, - edgelist_weights ? std::optional{(*edgelist_weights).data() + cur_size} - : std::nullopt, - graph_view.get_local_adj_matrix_partition_segment_offsets(i)); - cur_size += edgelist_edge_counts[i]; - } - - auto local_vertex_first = graph_view.get_local_vertex_first(); - auto local_vertex_last = graph_view.get_local_vertex_last(); + auto result = this->view().decompress_to_edgelist(handle, renumber_map); if (destroy) { *this = graph_t(handle); } - if (renumber_map) { - std::vector h_thresholds(row_comm_size - 1, vertex_t{0}); - for (int i = 0; i < row_comm_size - 1; ++i) { - h_thresholds[i] = graph_view.get_vertex_partition_last(col_comm_rank * row_comm_size + i); - } - rmm::device_uvector d_thresholds(h_thresholds.size(), handle.get_stream()); - raft::update_device( - d_thresholds.data(), h_thresholds.data(), h_thresholds.size(), handle.get_stream()); - - std::vector major_ptrs(edgelist_edge_counts.size()); - std::vector minor_ptrs(major_ptrs.size()); - auto edgelist_intra_partition_segment_offsets = - std::make_optional>>( - major_ptrs.size(), std::vector(row_comm_size + 1, size_t{0})); - size_t cur_size{0}; - for (size_t i = 0; i < graph_view.get_number_of_local_adj_matrix_partitions(); ++i) { - major_ptrs[i] = edgelist_majors.data() + cur_size; - minor_ptrs[i] = edgelist_minors.data() + cur_size; - if (edgelist_weights) { - thrust::sort_by_key(handle.get_thrust_policy(), - minor_ptrs[i], - minor_ptrs[i] + edgelist_edge_counts[i], - thrust::make_zip_iterator(thrust::make_tuple( - major_ptrs[i], (*edgelist_weights).data() + cur_size))); - } else { - thrust::sort_by_key(handle.get_thrust_policy(), - minor_ptrs[i], - minor_ptrs[i] + edgelist_edge_counts[i], - major_ptrs[i]); - } - rmm::device_uvector d_segment_offsets(d_thresholds.size(), handle.get_stream()); - thrust::lower_bound(handle.get_thrust_policy(), - minor_ptrs[i], - minor_ptrs[i] + edgelist_edge_counts[i], - d_thresholds.begin(), - d_thresholds.end(), 
- d_segment_offsets.begin(), - thrust::less{}); - (*edgelist_intra_partition_segment_offsets)[i][0] = size_t{0}; - (*edgelist_intra_partition_segment_offsets)[i].back() = edgelist_edge_counts[i]; - raft::update_host((*edgelist_intra_partition_segment_offsets)[i].data() + 1, - d_segment_offsets.data(), - d_segment_offsets.size(), - handle.get_stream()); - handle.sync_stream(); - cur_size += edgelist_edge_counts[i]; - } - - unrenumber_local_int_edges( - handle, - store_transposed ? minor_ptrs : major_ptrs, - store_transposed ? major_ptrs : minor_ptrs, - edgelist_edge_counts, - (*renumber_map).data(), - vertex_partition_lasts, - edgelist_intra_partition_segment_offsets); - } - - return std::make_tuple(store_transposed ? std::move(edgelist_minors) : std::move(edgelist_majors), - store_transposed ? std::move(edgelist_majors) : std::move(edgelist_minors), - std::move(edgelist_weights)); + return result; } template > const& renumber_map, bool destroy) { - auto graph_view = this->view(); - - rmm::device_uvector edgelist_majors( - graph_view.get_number_of_local_adj_matrix_partition_edges(), handle.get_stream()); - rmm::device_uvector edgelist_minors(edgelist_majors.size(), handle.get_stream()); - auto edgelist_weights = this->is_weighted() ? std::make_optional>( - edgelist_majors.size(), handle.get_stream()) - : std::nullopt; - detail::decompress_matrix_partition_to_edgelist( - handle, - matrix_partition_device_view_t( - graph_view.get_matrix_partition_view()), - edgelist_majors.data(), - edgelist_minors.data(), - edgelist_weights ? std::optional{(*edgelist_weights).data()} : std::nullopt, - graph_view.get_local_adj_matrix_partition_segment_offsets()); + auto result = this->view().decompress_to_edgelist(handle, renumber_map); if (destroy) { *this = graph_t(handle); } - if (renumber_map) { - unrenumber_local_int_edges( - handle, - store_transposed ? edgelist_minors.data() : edgelist_majors.data(), - store_transposed ? edgelist_majors.data() : edgelist_minors.data(), - edgelist_majors.size(), - (*renumber_map).data(), - (*renumber_map).size()); - } - - return std::make_tuple(store_transposed ? std::move(edgelist_minors) : std::move(edgelist_majors), - store_transposed ? std::move(edgelist_majors) : std::move(edgelist_minors), - std::move(edgelist_weights)); + return result; } } // namespace cugraph diff --git a/cpp/src/structure/graph_view_impl.cuh b/cpp/src/structure/graph_view_impl.cuh index b282a898e5a..1eef31ae806 100644 --- a/cpp/src/structure/graph_view_impl.cuh +++ b/cpp/src/structure/graph_view_impl.cuh @@ -13,9 +13,12 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + #pragma once +#include #include +#include #include #include #include @@ -329,10 +332,10 @@ edge_t count_matrix_partition_multi_edges( count_matrix_partition_multi_edges_block_size, handle.get_device_properties().maxGridSize[0]); - for_all_major_for_all_nbr_high_degree<<>>( + cugraph::for_all_major_for_all_nbr_high_degree<<>>( matrix_partition, matrix_partition.get_major_first(), matrix_partition.get_major_first() + (*segment_offsets)[1], @@ -343,10 +346,10 @@ edge_t count_matrix_partition_multi_edges( count_matrix_partition_multi_edges_block_size, handle.get_device_properties().maxGridSize[0]); - for_all_major_for_all_nbr_mid_degree<<>>( + cugraph::for_all_major_for_all_nbr_mid_degree<<>>( matrix_partition, matrix_partition.get_major_first() + (*segment_offsets)[1], matrix_partition.get_major_first() + (*segment_offsets)[2], @@ -949,4 +952,167 @@ edge_t graph_view_tget_local_adj_matrix_partition_segment_offsets()); } +template +std::tuple, + rmm::device_uvector, + std::optional>> +graph_view_t>:: + decompress_to_edgelist(raft::handle_t const& handle, + std::optional> const& renumber_map) const +{ + auto& comm = handle.get_comms(); + auto const comm_size = comm.get_size(); + auto& row_comm = + this->get_handle_ptr()->get_subcomm(cugraph::partition_2d::key_naming_t().row_name()); + auto const row_comm_size = row_comm.get_size(); + auto& col_comm = + this->get_handle_ptr()->get_subcomm(cugraph::partition_2d::key_naming_t().col_name()); + auto const col_comm_rank = col_comm.get_rank(); + + std::vector edgelist_edge_counts(get_number_of_local_adj_matrix_partitions(), size_t{0}); + for (size_t i = 0; i < edgelist_edge_counts.size(); ++i) { + edgelist_edge_counts[i] = + static_cast(get_number_of_local_adj_matrix_partition_edges(i)); + } + auto number_of_local_edges = + std::reduce(edgelist_edge_counts.begin(), edgelist_edge_counts.end()); + auto vertex_partition_lasts = get_vertex_partition_lasts(); + + rmm::device_uvector edgelist_majors(number_of_local_edges, handle.get_stream()); + rmm::device_uvector edgelist_minors(edgelist_majors.size(), handle.get_stream()); + auto edgelist_weights = this->is_weighted() ? std::make_optional>( + edgelist_majors.size(), handle.get_stream()) + : std::nullopt; + + size_t cur_size{0}; + for (size_t i = 0; i < edgelist_edge_counts.size(); ++i) { + detail::decompress_matrix_partition_to_edgelist( + handle, + matrix_partition_device_view_t( + get_matrix_partition_view(i)), + edgelist_majors.data() + cur_size, + edgelist_minors.data() + cur_size, + edgelist_weights ? 
std::optional{(*edgelist_weights).data() + cur_size} + : std::nullopt, + get_local_adj_matrix_partition_segment_offsets(i)); + cur_size += edgelist_edge_counts[i]; + } + + auto local_vertex_first = get_local_vertex_first(); + auto local_vertex_last = get_local_vertex_last(); + + if (renumber_map) { + std::vector h_thresholds(row_comm_size - 1, vertex_t{0}); + for (int i = 0; i < row_comm_size - 1; ++i) { + h_thresholds[i] = get_vertex_partition_last(col_comm_rank * row_comm_size + i); + } + rmm::device_uvector d_thresholds(h_thresholds.size(), handle.get_stream()); + raft::update_device( + d_thresholds.data(), h_thresholds.data(), h_thresholds.size(), handle.get_stream()); + + std::vector major_ptrs(edgelist_edge_counts.size()); + std::vector minor_ptrs(major_ptrs.size()); + auto edgelist_intra_partition_segment_offsets = + std::make_optional>>( + major_ptrs.size(), std::vector(row_comm_size + 1, size_t{0})); + size_t cur_size{0}; + for (size_t i = 0; i < get_number_of_local_adj_matrix_partitions(); ++i) { + major_ptrs[i] = edgelist_majors.data() + cur_size; + minor_ptrs[i] = edgelist_minors.data() + cur_size; + if (edgelist_weights) { + thrust::sort_by_key(handle.get_thrust_policy(), + minor_ptrs[i], + minor_ptrs[i] + edgelist_edge_counts[i], + thrust::make_zip_iterator(thrust::make_tuple( + major_ptrs[i], (*edgelist_weights).data() + cur_size))); + } else { + thrust::sort_by_key(handle.get_thrust_policy(), + minor_ptrs[i], + minor_ptrs[i] + edgelist_edge_counts[i], + major_ptrs[i]); + } + rmm::device_uvector d_segment_offsets(d_thresholds.size(), handle.get_stream()); + thrust::lower_bound(handle.get_thrust_policy(), + minor_ptrs[i], + minor_ptrs[i] + edgelist_edge_counts[i], + d_thresholds.begin(), + d_thresholds.end(), + d_segment_offsets.begin(), + thrust::less{}); + (*edgelist_intra_partition_segment_offsets)[i][0] = size_t{0}; + (*edgelist_intra_partition_segment_offsets)[i].back() = edgelist_edge_counts[i]; + raft::update_host((*edgelist_intra_partition_segment_offsets)[i].data() + 1, + d_segment_offsets.data(), + d_segment_offsets.size(), + handle.get_stream()); + handle.sync_stream(); + cur_size += edgelist_edge_counts[i]; + } + + unrenumber_local_int_edges( + handle, + store_transposed ? minor_ptrs : major_ptrs, + store_transposed ? major_ptrs : minor_ptrs, + edgelist_edge_counts, + (*renumber_map).data(), + vertex_partition_lasts, + edgelist_intra_partition_segment_offsets); + } + + return std::make_tuple(store_transposed ? std::move(edgelist_minors) : std::move(edgelist_majors), + store_transposed ? std::move(edgelist_majors) : std::move(edgelist_minors), + std::move(edgelist_weights)); +} + +template +std::tuple, + rmm::device_uvector, + std::optional>> +graph_view_t>:: + decompress_to_edgelist(raft::handle_t const& handle, + std::optional> const& renumber_map) const +{ + rmm::device_uvector edgelist_majors(get_number_of_local_adj_matrix_partition_edges(), + handle.get_stream()); + rmm::device_uvector edgelist_minors(edgelist_majors.size(), handle.get_stream()); + auto edgelist_weights = this->is_weighted() ? std::make_optional>( + edgelist_majors.size(), handle.get_stream()) + : std::nullopt; + detail::decompress_matrix_partition_to_edgelist( + handle, + matrix_partition_device_view_t( + get_matrix_partition_view()), + edgelist_majors.data(), + edgelist_minors.data(), + edgelist_weights ? 
std::optional{(*edgelist_weights).data()} : std::nullopt, + get_local_adj_matrix_partition_segment_offsets()); + + if (renumber_map) { + unrenumber_local_int_edges( + handle, + store_transposed ? edgelist_minors.data() : edgelist_majors.data(), + store_transposed ? edgelist_majors.data() : edgelist_minors.data(), + edgelist_majors.size(), + (*renumber_map).data(), + (*renumber_map).size()); + } + + return std::make_tuple(store_transposed ? std::move(edgelist_minors) : std::move(edgelist_majors), + store_transposed ? std::move(edgelist_majors) : std::move(edgelist_minors), + std::move(edgelist_weights)); +} + } // namespace cugraph diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index d37c0bb2edc..ff5af6ba2e4 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -1,4 +1,4 @@ -# +#============================================================================= # Copyright (c) 2019-2022, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -575,6 +575,10 @@ if(BUILD_CUGRAPH_MG_TESTS) ########################################################################################### # - MG PRIMS EXTRACT_IF_E tests ----------------------------------------------------------- ConfigureTestMG(MG_EXTRACT_IF_E_TEST prims/mg_extract_if_e.cu) + + ########################################################################################### + # - MG GATHER_UTILS tests ----------------------------------------------------------------- + ConfigureTestMG(MG_GATHER_UTILS_TEST sampling/detail/mg_gather_utils.cu) else() message(FATAL_ERROR "OpenMPI NOT found, cannot build MG tests.") endif() diff --git a/cpp/tests/sampling/detail/mg_gather_utils.cu b/cpp/tests/sampling/detail/mg_gather_utils.cu new file mode 100644 index 00000000000..2c9b0486ea6 --- /dev/null +++ b/cpp/tests/sampling/detail/mg_gather_utils.cu @@ -0,0 +1,496 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include + +template +rmm::device_uvector random_vertex_ids(raft::handle_t const& handle, + vertex_t begin, + vertex_t end, + vertex_t count, + int repetitions_per_vertex = 0) +{ + auto& comm = handle.get_comms(); + auto const comm_rank = comm.get_rank(); + vertex_t number_of_vertices = end - begin; + + rmm::device_uvector vertices( + std::max((repetitions_per_vertex + 1) * number_of_vertices, count), handle.get_stream()); + thrust::tabulate( + handle.get_thrust_policy(), + vertices.begin(), + vertices.end(), + [begin, number_of_vertices] __device__(auto v) { return begin + (v % number_of_vertices); }); + thrust::default_random_engine g; + g.seed(comm_rank); + thrust::shuffle(handle.get_thrust_policy(), vertices.begin(), vertices.end(), g); + vertices.resize(count, handle.get_stream()); + return vertices; +} + +template +std::tuple, rmm::device_uvector, rmm::device_uvector> +create_segmented_data(raft::handle_t const& handle, + vertex_t invalid_vertex_id, + rmm::device_uvector const& out_degrees) +{ + rmm::device_uvector offset(out_degrees.size() + 1, handle.get_stream()); + // no need for sync since gather call is on stream + offset.set_element_to_zero_async(0, handle.get_stream()); + thrust::inclusive_scan( + handle.get_thrust_policy(), out_degrees.begin(), out_degrees.end(), offset.begin() + 1); + auto total_edge_count = offset.back_element(handle.get_stream()); + rmm::device_uvector segmented_sources(total_edge_count, handle.get_stream()); + rmm::device_uvector segmented_sequence(total_edge_count, handle.get_stream()); + thrust::fill( + handle.get_thrust_policy(), segmented_sources.begin(), segmented_sources.end(), vertex_t{0}); + thrust::fill( + handle.get_thrust_policy(), segmented_sequence.begin(), segmented_sequence.end(), edge_t{1}); + thrust::for_each(handle.get_thrust_policy(), + thrust::counting_iterator(0), + thrust::counting_iterator(offset.size()), + [offset = offset.data(), + source_count = out_degrees.size(), + src = segmented_sources.data(), + seq = segmented_sequence.data()] __device__(auto index) { + auto location = offset[index]; + if (index == 0) { + seq[location] = edge_t{0}; + } else { + seq[location] = offset[index - 1] - offset[index] + 1; + } + if (index < source_count) { src[location] = index; } + }); + thrust::inclusive_scan(handle.get_thrust_policy(), + segmented_sequence.begin(), + segmented_sequence.end(), + segmented_sequence.begin()); + thrust::inclusive_scan(handle.get_thrust_policy(), + segmented_sources.begin(), + segmented_sources.end(), + segmented_sources.begin(), + thrust::maximum()); + return std::make_tuple( + std::move(offset), std::move(segmented_sources), std::move(segmented_sequence)); +} + +template +std::tuple, + rmm::device_uvector> +sg_gather_edges(raft::handle_t const& handle, + GraphViewType const& graph_view, + VertexIterator vertex_input_first, + VertexIterator vertex_input_last, + EdgeIndexIterator edge_index_first, + typename GraphViewType::vertex_type invalid_vertex_id, + typename GraphViewType::edge_type indices_per_source) +{ + static_assert(GraphViewType::is_adj_matrix_transposed == false); + using vertex_t = typename GraphViewType::vertex_type; + using edge_t = typename GraphViewType::edge_type; + using weight_t = typename 
GraphViewType::weight_type; + auto source_count = thrust::distance(vertex_input_first, vertex_input_last); + auto edge_count = source_count * indices_per_source; + rmm::device_uvector sources(edge_count, handle.get_stream()); + rmm::device_uvector destinations(edge_count, handle.get_stream()); + auto matrix_partition = + cugraph::matrix_partition_device_view_t( + graph_view.get_matrix_partition_view()); + thrust::for_each(handle.get_thrust_policy(), + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(edge_count), + [vertex_input_first, + indices_per_source, + edge_index_first, + sources = sources.data(), + destinations = destinations.data(), + offsets = matrix_partition.get_offsets(), + indices = matrix_partition.get_indices(), + invalid_vertex_id] __device__(auto index) { + auto source = vertex_input_first[index / indices_per_source]; + sources[index] = source; + auto source_offset = offsets[source]; + auto degree = offsets[source + 1] - source_offset; + auto e_index = edge_index_first[index]; + if (e_index < degree) { + destinations[index] = indices[source_offset + e_index]; + } else { + destinations[index] = invalid_vertex_id; + } + }); + auto input_iter = + thrust::make_zip_iterator(thrust::make_tuple(sources.begin(), destinations.begin())); + auto compacted_length = thrust::distance( + input_iter, + thrust::remove_if( + handle.get_thrust_policy(), + input_iter, + input_iter + destinations.size(), + destinations.begin(), + [invalid_vertex_id] __device__(auto dst) { return (dst == invalid_vertex_id); })); + sources.resize(compacted_length, handle.get_stream()); + destinations.resize(compacted_length, handle.get_stream()); + return std::make_tuple(std::move(sources), std::move(destinations)); +} + +template +void sort_coo(raft::handle_t const& handle, + rmm::device_uvector& src, + rmm::device_uvector& dst) +{ + thrust::sort_by_key(handle.get_thrust_policy(), dst.begin(), dst.end(), src.begin()); + thrust::sort_by_key(handle.get_thrust_policy(), src.begin(), src.end(), dst.begin()); +} + +template +rmm::device_uvector generate_random_destination_indices( + raft::handle_t const& handle, + const rmm::device_uvector& out_degrees, + vertex_t invalid_vertex_id, + edge_t invalid_destination_index, + edge_t indices_per_source) +{ + auto [random_source_offsets, segmented_source_ids, segmented_sequence] = + create_segmented_data(handle, invalid_vertex_id, out_degrees); + // Generate random weights to shuffle sequence of destination indices + rmm::device_uvector random_weights(segmented_sequence.size(), handle.get_stream()); + auto& row_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().row_name()); + auto const row_rank = row_comm.get_rank(); + auto& comm = handle.get_comms(); + auto const comm_rank = comm.get_rank(); + auto force_seed = 0; + thrust::transform(handle.get_thrust_policy(), + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(random_weights.size()), + random_weights.begin(), + [force_seed] __device__(auto index) { + thrust::default_random_engine g; + g.seed(force_seed); + thrust::uniform_int_distribution dist; + g.discard(index); + return dist(g); + }); + thrust::sort_by_key( + handle.get_thrust_policy(), + thrust::make_zip_iterator( + thrust::make_tuple(segmented_source_ids.begin(), random_weights.begin())), + thrust::make_zip_iterator(thrust::make_tuple(segmented_source_ids.end(), random_weights.end())), + segmented_sequence.begin(), + [] __device__(auto left, auto right) { return left < right; }); + + rmm::device_uvector 
+
+template <typename vertex_t, typename edge_t>
+rmm::device_uvector<edge_t> generate_random_destination_indices(
+  raft::handle_t const& handle,
+  rmm::device_uvector<edge_t> const& out_degrees,
+  vertex_t invalid_vertex_id,
+  edge_t invalid_destination_index,
+  edge_t indices_per_source)
+{
+  auto [random_source_offsets, segmented_source_ids, segmented_sequence] =
+    create_segmented_data(handle, invalid_vertex_id, out_degrees);
+  // Generate random weights to shuffle the sequence of destination indices;
+  // a fixed seed keeps the weights reproducible across runs
+  rmm::device_uvector<edge_t> random_weights(segmented_sequence.size(), handle.get_stream());
+  auto force_seed = 0;
+  thrust::transform(handle.get_thrust_policy(),
+                    thrust::make_counting_iterator<size_t>(0),
+                    thrust::make_counting_iterator<size_t>(random_weights.size()),
+                    random_weights.begin(),
+                    [force_seed] __device__(auto index) {
+                      thrust::default_random_engine g;
+                      g.seed(force_seed);
+                      thrust::uniform_int_distribution<edge_t> dist;
+                      g.discard(index);
+                      return dist(g);
+                    });
+  thrust::sort_by_key(
+    handle.get_thrust_policy(),
+    thrust::make_zip_iterator(
+      thrust::make_tuple(segmented_source_ids.begin(), random_weights.begin())),
+    thrust::make_zip_iterator(thrust::make_tuple(segmented_source_ids.end(), random_weights.end())),
+    segmented_sequence.begin(),
+    [] __device__(auto left, auto right) { return left < right; });
+
+  rmm::device_uvector<edge_t> dst_index(indices_per_source * out_degrees.size(),
+                                        handle.get_stream());
+  thrust::for_each(handle.get_thrust_policy(),
+                   thrust::counting_iterator<size_t>(0),
+                   thrust::counting_iterator<size_t>(out_degrees.size()),
+                   [offset    = random_source_offsets.data(),
+                    dst_index = dst_index.data(),
+                    seg_seq   = segmented_sequence.data(),
+                    k         = indices_per_source,
+                    invalid_destination_index] __device__(auto index) {
+                     auto length =
+                       thrust::minimum<edge_t>()(offset[index + 1] - offset[index], k);
+                     auto source_offset = offset[index];
+                     // copy the first k valid destination indices; if k exceeds
+                     // the out degree, stop at the out degree to avoid an out of
+                     // bounds access
+                     for (edge_t i = 0; i < length; ++i) {
+                       dst_index[index * k + i] = seg_seq[source_offset + i];
+                     }
+                     // if the requested number of destination indices exceeds
+                     // the out degree, pad with the invalid destination index
+                     for (edge_t i = length; i < k; ++i) {
+                       dst_index[index * k + i] = invalid_destination_index;
+                     }
+                   });
+  return dst_index;
+}
+
+struct Prims_Usecase {
+  bool check_correctness{true};
+};
+
+template <typename input_usecase_t>
+class Tests_MG_GatherEdges
+  : public ::testing::TestWithParam<std::tuple<Prims_Usecase, input_usecase_t>> {
+ public:
+  Tests_MG_GatherEdges() {}
+  static void SetupTestCase() {}
+  static void TearDownTestCase() {}
+
+  virtual void SetUp() {}
+  virtual void TearDown() {}
+
+  template <typename vertex_t, typename edge_t, typename weight_t>
+  void run_current_test(Prims_Usecase const& prims_usecase, input_usecase_t const& input_usecase)
+  {
+    // 1. initialize handle
+
+    raft::handle_t handle{};
+    HighResClock hr_clock{};
+
+    raft::comms::initialize_mpi_comms(&handle, MPI_COMM_WORLD);
+    auto& comm           = handle.get_comms();
+    auto const comm_size = comm.get_size();
+    auto const comm_rank = comm.get_rank();
+
+    auto row_comm_size = static_cast<int>(sqrt(static_cast<double>(comm_size)));
+    while (comm_size % row_comm_size != 0) {
+      --row_comm_size;
+    }
+    cugraph::partition_2d::subcomm_factory_t<cugraph::partition_2d::key_naming_t, vertex_t>
+      subcomm_factory(handle, row_comm_size);
+
+    // 2. create MG graph
+
+    if (cugraph::test::g_perf) {
+      RAFT_CUDA_TRY(cudaDeviceSynchronize());  // for consistent performance measurement
+      handle.get_comms().barrier();
+      hr_clock.start();
+    }
+
+    constexpr bool sort_adjacency_list = true;
+
+    auto [mg_graph, mg_renumber_map_labels] =
+      cugraph::test::construct_graph<vertex_t, edge_t, weight_t, false, true>(
+        handle, input_usecase, true, true, false, sort_adjacency_list);
+
+    if (cugraph::test::g_perf) {
+      RAFT_CUDA_TRY(cudaDeviceSynchronize());  // for consistent performance measurement
+      handle.get_comms().barrier();
+      double elapsed_time{0.0};
+      hr_clock.stop(&elapsed_time);
+      std::cout << "MG construct_graph took " << elapsed_time * 1e-6 << " s.\n";
+    }
+
+    auto mg_graph_view                        = mg_graph.view();
+    constexpr edge_t indices_per_source       = 2;
+    constexpr vertex_t repetitions_per_vertex = 5;
+    constexpr vertex_t source_sample_count    = 3;
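+
+    // With these values every gpu samples 3 source vertices (drawn from a
+    // pool holding up to 5 repetitions of each local vertex) and requests 2
+    // destination indices per source; indices past a source's out degree are
+    // padded with the invalid index and later dropped by the gather.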
+
+    // 3. run the MNMG gather
+    // Generate random vertex ids in the range of the current gpu
+
+    auto [global_degree_offset, global_out_degrees] =
+      cugraph::detail::get_global_degree_information(handle, mg_graph_view);
+
+    // Generate random sources to gather on
+    auto random_sources = random_vertex_ids(handle,
+                                            mg_graph_view.get_local_vertex_first(),
+                                            mg_graph_view.get_local_vertex_last(),
+                                            source_sample_count,
+                                            repetitions_per_vertex);
+    rmm::device_uvector<int> random_source_gpu_ids(random_sources.size(), handle.get_stream());
+    thrust::fill(handle.get_thrust_policy(),
+                 random_source_gpu_ids.begin(),
+                 random_source_gpu_ids.end(),
+                 comm_rank);
+
+    auto [active_sources_in_row, active_source_gpu_ids] =
+      cugraph::detail::gather_active_sources_in_row(handle,
+                                                    mg_graph_view,
+                                                    random_sources.cbegin(),
+                                                    random_sources.cend(),
+                                                    random_source_gpu_ids.cbegin());
+
+    // get the sources' global out degrees to generate destination indices
+    auto active_source_degrees = cugraph::detail::get_active_major_global_degrees(
+      handle, mg_graph_view, active_sources_in_row, global_out_degrees);
+
+    auto random_destination_indices =
+      generate_random_destination_indices(handle,
+                                          active_source_degrees,
+                                          mg_graph_view.get_number_of_vertices(),
+                                          mg_graph_view.get_number_of_edges(),
+                                          indices_per_source);
+
+    auto [src, dst, gpu_ids] =
+      cugraph::detail::gather_local_edges(handle,
+                                          mg_graph_view,
+                                          active_sources_in_row,
+                                          active_source_gpu_ids,
+                                          random_destination_indices.cbegin(),
+                                          indices_per_source,
+                                          global_degree_offset);
+
+    if (prims_usecase.check_correctness) {
+      // Gather outputs
+      auto mg_out_srcs = cugraph::test::device_gatherv(handle, src.data(), src.size());
+      auto mg_out_dsts = cugraph::test::device_gatherv(handle, dst.data(), dst.size());
+
+      // Gather inputs
+      auto& col_comm      = handle.get_subcomm(cugraph::partition_2d::key_naming_t().col_name());
+      auto const col_rank = col_comm.get_rank();
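+      // active_sources_in_row and random_destination_indices are replicated
+      // across the gpus of each column communicator (see
+      // gather_active_sources_in_row), so only col_rank 0 contributes its copy
+      // to the gathers below; the other ranks contribute zero elements to
+      // avoid duplicating inputs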
+      auto sg_random_srcs = cugraph::test::device_gatherv(
+        handle, active_sources_in_row.data(), col_rank == 0 ? active_sources_in_row.size() : 0);
+      auto sg_random_dst_indices =
+        cugraph::test::device_gatherv(handle,
+                                      random_destination_indices.data(),
+                                      col_rank == 0 ? random_destination_indices.size() : 0);
+
+      // Gather the input graph edgelist
+      rmm::device_uvector<vertex_t> sg_src(0, handle.get_stream());
+      rmm::device_uvector<vertex_t> sg_dst(0, handle.get_stream());
+      std::tie(sg_src, sg_dst, std::ignore) =
+        mg_graph_view.decompress_to_edgelist(handle, std::nullopt);
+
+      auto aggregated_sg_src = cugraph::test::device_gatherv(handle, sg_src.begin(), sg_src.size());
+      auto aggregated_sg_dst = cugraph::test::device_gatherv(handle, sg_dst.begin(), sg_dst.size());
+
+      sort_coo(handle, mg_out_srcs, mg_out_dsts);
+
+      if (handle.get_comms().get_rank() == int{0}) {
+        cugraph::graph_t<vertex_t, edge_t, weight_t, false, false> sg_graph(handle);
+        auto aggregated_edge_iter = thrust::make_zip_iterator(
+          thrust::make_tuple(aggregated_sg_src.begin(), aggregated_sg_dst.begin()));
+        thrust::sort(handle.get_thrust_policy(),
+                     aggregated_edge_iter,
+                     aggregated_edge_iter + aggregated_sg_src.size());
+        auto sg_graph_properties =
+          cugraph::graph_properties_t{mg_graph_view.is_symmetric(), mg_graph_view.is_multigraph()};
+
+        std::tie(sg_graph, std::ignore) =
+          cugraph::create_graph_from_edgelist<vertex_t, edge_t, weight_t, false, false>(
+            handle,
+            std::nullopt,
+            std::move(aggregated_sg_src),
+            std::move(aggregated_sg_dst),
+            std::nullopt,
+            sg_graph_properties,
+            false);
+        auto sg_graph_view = sg_graph.view();
+        // Call single gpu gather with the same sources and indices
+        auto [sg_out_srcs, sg_out_dsts] = sg_gather_edges(handle,
+                                                          sg_graph_view,
+                                                          sg_random_srcs.begin(),
+                                                          sg_random_srcs.end(),
+                                                          sg_random_dst_indices.begin(),
+                                                          sg_graph_view.get_number_of_vertices(),
+                                                          indices_per_source);
+        sort_coo(handle, sg_out_srcs, sg_out_dsts);
+
+        // compare sizes first so thrust::equal cannot read past the end of the
+        // shorter result
+        ASSERT_EQ(sg_out_srcs.size(), mg_out_srcs.size());
+        ASSERT_EQ(sg_out_dsts.size(), mg_out_dsts.size());
+        auto passed = thrust::equal(
+          handle.get_thrust_policy(), sg_out_srcs.begin(), sg_out_srcs.end(), mg_out_srcs.begin());
+        passed &= thrust::equal(
+          handle.get_thrust_policy(), sg_out_dsts.begin(), sg_out_dsts.end(), mg_out_dsts.begin());
+        ASSERT_TRUE(passed);
+      }
+    }
+  }
+};
+
+using Tests_MG_GatherEdges_File = Tests_MG_GatherEdges<cugraph::test::File_Usecase>;
+
+using Tests_MG_GatherEdges_Rmat = Tests_MG_GatherEdges<cugraph::test::Rmat_Usecase>;
+
+TEST_P(Tests_MG_GatherEdges_File, CheckInt32Int32Float)
+{
+  auto param = GetParam();
+  run_current_test<int32_t, int32_t, float>(std::get<0>(param), std::get<1>(param));
+}
+
+TEST_P(Tests_MG_GatherEdges_File, CheckInt32Int64Float)
+{
+  auto param = GetParam();
+  run_current_test<int32_t, int64_t, float>(std::get<0>(param), std::get<1>(param));
+}
+
+TEST_P(Tests_MG_GatherEdges_File, CheckInt64Int64Float)
+{
+  auto param = GetParam();
+  run_current_test<int64_t, int64_t, float>(std::get<0>(param), std::get<1>(param));
+}
+
+TEST_P(Tests_MG_GatherEdges_Rmat, CheckInt32Int32Float)
+{
+  auto param = GetParam();
+  run_current_test<int32_t, int32_t, float>(std::get<0>(param), std::get<1>(param));
+}
+
+TEST_P(Tests_MG_GatherEdges_Rmat, CheckInt32Int64Float)
+{
+  auto param = GetParam();
+  run_current_test<int32_t, int64_t, float>(std::get<0>(param), std::get<1>(param));
+}
+
+TEST_P(Tests_MG_GatherEdges_Rmat, CheckInt64Int64Float)
+{
+  auto param = GetParam();
+  run_current_test<int64_t, int64_t, float>(std::get<0>(param), std::get<1>(param));
+}
+
+INSTANTIATE_TEST_SUITE_P(
+  file_test,
+  Tests_MG_GatherEdges_File,
+  ::testing::Combine(
+    ::testing::Values(Prims_Usecase{true}),
+    ::testing::Values(cugraph::test::File_Usecase("test/datasets/karate.mtx"),
+                      cugraph::test::File_Usecase("test/datasets/web-Google.mtx"),
+                      cugraph::test::File_Usecase("test/datasets/ljournal-2008.mtx"),
+                      cugraph::test::File_Usecase("test/datasets/webbase-1M.mtx"))));
+
+INSTANTIATE_TEST_SUITE_P(
+  rmat_small_test,
+  Tests_MG_GatherEdges_Rmat,
+  ::testing::Combine(::testing::Values(Prims_Usecase{false}),
+                     ::testing::Values(cugraph::test::Rmat_Usecase(
+                       10, 16, 0.57, 0.19, 0.19, 0, false, false, 0, true))));
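+
+// rmat_small_test above draws a scale-10 R-mat graph (2^10 vertices, edge
+// factor 16, partition probabilities a = 0.57, b = c = 0.19), and
+// rmat_benchmark_test below scales the same distribution to scale 20 with
+// edge factor 32.  Both pass Prims_Usecase{false}, presumably because building
+// and comparing against the aggregated single gpu reference would dominate the
+// run time at benchmark scale.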
+
+INSTANTIATE_TEST_SUITE_P(
+  rmat_benchmark_test, /* note that scale & edge factor can be overridden in benchmarking (with
+                          --gtest_filter to select only the rmat_benchmark_test with a specific
+                          vertex & edge type combination) by command line arguments; do not
+                          include more than one Rmat_Usecase that differs only in scale or edge
+                          factor (to avoid running the same benchmark more than once) */
+  Tests_MG_GatherEdges_Rmat,
+  ::testing::Combine(::testing::Values(Prims_Usecase{false}),
+                     ::testing::Values(cugraph::test::Rmat_Usecase(
+                       20, 32, 0.57, 0.19, 0.19, 0, false, false, 0, true))));
+
+CUGRAPH_MG_TEST_PROGRAM_MAIN()