From 1a86f5ba817f29bffc45bd488b336c14f4d5d0e2 Mon Sep 17 00:00:00 2001 From: Kumar Aatish Date: Tue, 31 Aug 2021 14:55:45 -0400 Subject: [PATCH 1/4] First commit --- .../cugraph/prims/transform_reduce_e.cuh | 1 + cpp/tests/CMakeLists.txt | 4 + cpp/tests/community/mg_louvain_helper.cu | 29 +- cpp/tests/community/mg_louvain_test.cpp | 1 + cpp/tests/prims/mg_transform_reduce_e.cu | 445 ++++++++++++++++++ 5 files changed, 467 insertions(+), 13 deletions(-) create mode 100644 cpp/tests/prims/mg_transform_reduce_e.cu diff --git a/cpp/include/cugraph/prims/transform_reduce_e.cuh b/cpp/include/cugraph/prims/transform_reduce_e.cuh index 000800a9862..f28fbe47581 100644 --- a/cpp/include/cugraph/prims/transform_reduce_e.cuh +++ b/cpp/include/cugraph/prims/transform_reduce_e.cuh @@ -19,6 +19,7 @@ #include #include #include +#include #include #include diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index cf8cfd94c1c..ce33d563f08 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -474,6 +474,10 @@ if(BUILD_CUGRAPH_MG_TESTS) ########################################################################################### # - MG PRIMS TRANSFORM_REDUCE_V tests ----------------------------------------------------- ConfigureTestMG(MG_TRANSFORM_REDUCE_V_TEST prims/mg_transform_reduce_v.cu) + + ########################################################################################### + # - MG PRIMS TRANSFORM_REDUCE_E tests ----------------------------------------------------- + ConfigureTestMG(MG_TRANSFORM_REDUCE_E_TEST prims/mg_transform_reduce_e.cu) else() message(FATAL_ERROR "OpenMPI NOT found, cannot build MG tests.") endif() diff --git a/cpp/tests/community/mg_louvain_helper.cu b/cpp/tests/community/mg_louvain_helper.cu index 5909ab177cd..43d382b4041 100644 --- a/cpp/tests/community/mg_louvain_helper.cu +++ b/cpp/tests/community/mg_louvain_helper.cu @@ -64,13 +64,14 @@ template std::tuple, rmm::device_uvector, std::optional>> -compressed_sparse_to_edgelist(edge_t const* compressed_sparse_offsets, +compressed_sparse_to_edgelist(raft::handle_t const& handle, + edge_t const* compressed_sparse_offsets, vertex_t const* compressed_sparse_indices, std::optional compressed_sparse_weights, vertex_t major_first, - vertex_t major_last, - cudaStream_t stream) + vertex_t major_last) { + cudaStream_t stream = handle.get_stream(); edge_t number_of_edges{0}; raft::update_host( &number_of_edges, compressed_sparse_offsets + (major_last - major_first), 1, stream); @@ -114,11 +115,12 @@ compressed_sparse_to_edgelist(edge_t const* compressed_sparse_offsets, template void sort_and_coarsen_edgelist( + raft::handle_t const& handle, rmm::device_uvector& edgelist_major_vertices /* [INOUT] */, rmm::device_uvector& edgelist_minor_vertices /* [INOUT] */, - std::optional>& edgelist_weights /* [INOUT] */, - cudaStream_t stream) + std::optional>& edgelist_weights /* [INOUT] */) { + cudaStream_t stream = handle.get_stream(); auto pair_first = thrust::make_zip_iterator( thrust::make_tuple(edgelist_major_vertices.begin(), edgelist_minor_vertices.begin())); @@ -171,6 +173,7 @@ std::tuple, rmm::device_uvector, std::optional>> compressed_sparse_to_relabeled_and_sorted_and_coarsened_edgelist( + raft::handle_t const& handle, edge_t const* compressed_sparse_offsets, vertex_t const* compressed_sparse_indices, std::optional compressed_sparse_weights, @@ -179,19 +182,19 @@ compressed_sparse_to_relabeled_and_sorted_and_coarsened_edgelist( vertex_t major_first, vertex_t major_last, vertex_t minor_first, - vertex_t 
minor_last, - cudaStream_t stream) + vertex_t minor_last) { + cudaStream_t stream = handle.get_stream(); // FIXME: it might be possible to directly create relabled & coarsened edgelist from the // compressed sparse format to save memory auto [edgelist_major_vertices, edgelist_minor_vertices, edgelist_weights] = - compressed_sparse_to_edgelist(compressed_sparse_offsets, + compressed_sparse_to_edgelist(handle, + compressed_sparse_offsets, compressed_sparse_indices, compressed_sparse_weights, major_first, - major_last, - stream); + major_last); auto pair_first = thrust::make_zip_iterator( thrust::make_tuple(edgelist_major_vertices.begin(), edgelist_minor_vertices.begin())); @@ -206,7 +209,7 @@ compressed_sparse_to_relabeled_and_sorted_and_coarsened_edgelist( }); sort_and_coarsen_edgelist( - edgelist_major_vertices, edgelist_minor_vertices, edgelist_weights, stream); + handle, edgelist_major_vertices, edgelist_minor_vertices, edgelist_weights); return std::make_tuple(std::move(edgelist_major_vertices), std::move(edgelist_minor_vertices), @@ -226,6 +229,7 @@ coarsen_graph( coarsened_edgelist_minor_vertices, coarsened_edgelist_weights] = compressed_sparse_to_relabeled_and_sorted_and_coarsened_edgelist( + handle, graph_view.get_matrix_partition_view().get_offsets(), graph_view.get_matrix_partition_view().get_indices(), graph_view.get_matrix_partition_view().get_weights(), @@ -234,8 +238,7 @@ coarsen_graph( vertex_t{0}, graph_view.get_number_of_vertices(), vertex_t{0}, - graph_view.get_number_of_vertices(), - handle.get_stream()); + graph_view.get_number_of_vertices()); cugraph::edgelist_t edgelist{}; edgelist.p_src_vertices = store_transposed ? coarsened_edgelist_minor_vertices.data() diff --git a/cpp/tests/community/mg_louvain_test.cpp b/cpp/tests/community/mg_louvain_test.cpp index 4ceacba2acd..05a7fd902d6 100644 --- a/cpp/tests/community/mg_louvain_test.cpp +++ b/cpp/tests/community/mg_louvain_test.cpp @@ -30,6 +30,7 @@ #include #include +#include #include diff --git a/cpp/tests/prims/mg_transform_reduce_e.cu b/cpp/tests/prims/mg_transform_reduce_e.cu new file mode 100644 index 00000000000..db4474041d3 --- /dev/null +++ b/cpp/tests/prims/mg_transform_reduce_e.cu @@ -0,0 +1,445 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include + +#include + +template +struct property_transform : public thrust::unary_function> { + int mod{}; + property_transform(int mod_count) : mod(mod_count) {} + constexpr __device__ auto operator()(const vertex_t& val) + { + cuco::detail::MurmurHash3_32 hash_func{}; + auto value = hash_func(val) % mod; + return thrust::make_tuple(static_cast(value)...); + } +}; +template typename Tuple, typename... 
Args> +struct property_transform> : public property_transform { +}; + +template +auto make_iterator_tuple(Tuple& data, std::index_sequence) +{ + return thrust::make_tuple((std::get(data).begin())...); +} + +template +auto get_zip_iterator(std::tuple& data) +{ + return thrust::make_zip_iterator(make_iterator_tuple( + data, std::make_index_sequence>::value>())); +} + +template +auto get_property_iterator(std::tuple& data) +{ + return (std::get<0>(data)).begin(); +} + +template +auto get_property_iterator(std::tuple& data) +{ + return get_zip_iterator(data); +} + +template +struct generate_impl { + static thrust::tuple initial_value(int init) + { + return thrust::make_tuple(static_cast(init)...); + } + template + static std::tuple...> vertex_property(rmm::device_uvector& labels, + int hash_bin_count, + raft::handle_t const& handle) + { + auto data = std::make_tuple(rmm::device_uvector(labels.size(), handle.get_stream())...); + auto zip = get_zip_iterator(data); + thrust::transform(handle.get_thrust_policy(), + labels.begin(), + labels.end(), + zip, + property_transform(hash_bin_count)); + return data; + } + template + static std::tuple...> vertex_property(thrust::counting_iterator begin, + thrust::counting_iterator end, + int hash_bin_count, + raft::handle_t const& handle) + { + auto length = thrust::distance(begin, end); + auto data = std::make_tuple(rmm::device_uvector(length, handle.get_stream())...); + auto zip = get_zip_iterator(data); + thrust::transform(handle.get_thrust_policy(), + begin, + end, + zip, + property_transform(hash_bin_count)); + return data; + } + template + static constexpr void copy_property_impl(Op&& op, T1&& t1, T2&& t2, std::index_sequence) + { + (op(std::get(t1), std::get(t2)), ...); + } + + template + static void + copy_property(Tuple const& property, + Tuple& output_property, + Op op) + { + copy_property_impl(op, property, output_property, std::make_index_sequence::value>()); + } + + template + static std::tuple...> + column_property(raft::handle_t const& handle, + GraphViewType const& graph_view, + std::tuple...>& property) + { + if (true) { + std::cerr<<"ERR DEBUG MESSAGE "<(graph_view.get_number_of_local_adj_matrix_partition_cols(), handle.get_stream())...); + copy_property(property, output_property, + [&handle, &graph_view] (const auto& in, auto& out) { + copy_to_adj_matrix_col(handle, graph_view, in.begin(), out.begin()); + } + ); + return output_property; + } + + template + static std::tuple...> + row_property(raft::handle_t const& handle, + GraphViewType const& graph_view, + std::tuple...>& property) + { + if (true) { + std::cerr<<"ERR DEBUG MESSAGE "<(graph_view.get_number_of_local_adj_matrix_partition_rows(), handle.get_stream())...); + copy_property(property, output_property, + [&handle, &graph_view] (const auto& in, auto& out) { + copy_to_adj_matrix_row(handle, graph_view, in.begin(), out.begin()); + } + ); + return output_property; + } + +}; + +template +struct result_compare { + static constexpr double threshold_ratio{1e-3}; + constexpr auto operator()(const T& t1, const T& t2) + { + if constexpr (std::is_floating_point_v) { + return std::abs(t1 - t2) < (std::max(t1, t2) * threshold_ratio); + } + return t1 == t2; + } +}; + +template +struct result_compare> { + static constexpr double threshold_ratio{1e-3}; + + using Type = thrust::tuple; + constexpr auto operator()(const Type& t1, const Type& t2) + { + return equality_impl(t1, t2, std::make_index_sequence::value>()); + } + + private: + template + constexpr bool equal(T t1, T t2) + { + if constexpr 
(std::is_floating_point_v) { + return std::abs(t1 - t2) < (std::max(t1, t2) * threshold_ratio); + } + return t1 == t2; + } + template + constexpr auto equality_impl(T& t1, T& t2, std::index_sequence) + { + return (... && (equal(thrust::get(t1), thrust::get(t2)))); + } +}; + +template +struct generate : public generate_impl { + static T initial_value(int init) { return static_cast(init); } +}; +template +struct generate> : public generate_impl { +}; + +struct Prims_Usecase { + bool check_correctness{true}; + bool test_weighted{false}; +}; + +template +class Tests_MG_TransformReduceE + : public ::testing::TestWithParam> { + public: + Tests_MG_TransformReduceE() {} + static void SetupTestCase() {} + static void TearDownTestCase() {} + + virtual void SetUp() {} + virtual void TearDown() {} + + // Compare the results of reduce_if_v primitive and thrust reduce on a single GPU + template + void run_current_test(Prims_Usecase const& prims_usecase, input_usecase_t const& input_usecase) + { + // 1. initialize handle + + raft::handle_t handle{}; + HighResClock hr_clock{}; + + raft::comms::initialize_mpi_comms(&handle, MPI_COMM_WORLD); + auto& comm = handle.get_comms(); + auto const comm_size = comm.get_size(); + auto const comm_rank = comm.get_rank(); + + auto row_comm_size = static_cast(sqrt(static_cast(comm_size))); + while (comm_size % row_comm_size != 0) { + --row_comm_size; + } + cugraph::partition_2d::subcomm_factory_t + subcomm_factory(handle, row_comm_size); + + // 2. create MG graph + + if (cugraph::test::g_perf) { + CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + handle.get_comms().barrier(); + hr_clock.start(); + } + auto [mg_graph, d_mg_renumber_map_labels] = + input_usecase.template construct_graph( + handle, prims_usecase.test_weighted, true); + + if (cugraph::test::g_perf) { + CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + handle.get_comms().barrier(); + double elapsed_time{0.0}; + hr_clock.stop(&elapsed_time); + std::cout << "MG construct_graph took " << elapsed_time * 1e-6 << " s.\n"; + } + + auto mg_graph_view = mg_graph.view(); + + // 3. 
run MG transform reduce + + const int hash_bin_count = 5; + const int initial_value = 10; + + auto property_initial_value = generate::initial_value(initial_value); + using property_t = decltype(property_initial_value); + auto vertex_property_data = + generate::vertex_property((*d_mg_renumber_map_labels), hash_bin_count, handle); + auto col_prop = generate::column_property(handle, mg_graph_view, vertex_property_data); + auto row_prop = generate::row_property(handle, mg_graph_view, vertex_property_data); + auto col_property_iter = get_property_iterator(col_prop); + auto row_property_iter = get_property_iterator(row_prop); + + if (cugraph::test::g_perf) { + CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + handle.get_comms().barrier(); + hr_clock.start(); + } + + auto result = transform_reduce_e( + handle, mg_graph_view, + row_property_iter, + col_property_iter, + [] __device__(auto row, auto col, weight_t wt, auto row_property, auto col_property) { + if (row_property < col_property) { + return row_property; + } else { + return col_property; + } + }, + property_initial_value); + + if (cugraph::test::g_perf) { + CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + handle.get_comms().barrier(); + double elapsed_time{0.0}; + hr_clock.stop(&elapsed_time); + std::cout << "MG transform reduce took " << elapsed_time * 1e-6 << " s.\n"; + } + + //// 4. compare SG & MG results + + if (prims_usecase.check_correctness) { +// auto [sg_graph, d_sg_renumber_map_labels] = +// input_usecase.template construct_graph( +// handle, true, false); +// auto sg_graph_view = sg_graph.view(); +// +// auto sg_vertex_property_data = +// generate::vertex_property((*d_sg_renumber_map_labels), hash_bin_count, handle); +// auto sg_col_prop = generate::column_property(handle, sg_graph_view, sg_vertex_property_data); +// auto sg_row_prop = generate::row_property(handle, sg_graph_view, sg_vertex_property_data); +// auto sg_col_property_iter = get_property_iterator(sg_col_prop); +// auto sg_row_property_iter = get_property_iterator(sg_row_prop); +// +// auto expected_result = transform_reduce_e( +// handle, sg_graph_view, +// sg_row_property_iter, +// sg_col_property_iter, +// [] __device__(auto row, auto col, weight_t wt, auto row_property, auto col_property) { +// if (row_property < col_property) { +// return row_property; +// } else { +// return col_property; +// } +// }, +// property_initial_value); +// result_compare compare{}; +// ASSERT_TRUE(compare(expected_result, result)); +// + } + } +}; + +using Tests_MG_TransformReduceE_File = Tests_MG_TransformReduceE; +using Tests_MG_TransformReduceE_Rmat = Tests_MG_TransformReduceE; + +TEST_P(Tests_MG_TransformReduceE_File, CheckInt32Int32FloatTupleIntFloatTransposeFalse) +{ + auto param = GetParam(); + run_current_test, false>(std::get<0>(param), + std::get<1>(param)); +} + +TEST_P(Tests_MG_TransformReduceE_Rmat, CheckInt32Int32FloatTupleIntFloatTransposeFalse) +{ + auto param = GetParam(); + run_current_test, false>( + std::get<0>(param), + cugraph::test::override_Rmat_Usecase_with_cmd_line_arguments(std::get<1>(param))); +} + +TEST_P(Tests_MG_TransformReduceE_File, CheckInt32Int32FloatTupleIntFloatTransposeTrue) +{ + auto param = GetParam(); + run_current_test, true>(std::get<0>(param), + std::get<1>(param)); +} + +TEST_P(Tests_MG_TransformReduceE_Rmat, CheckInt32Int32FloatTupleIntFloatTransposeTrue) +{ + auto param = GetParam(); + run_current_test, true>( + std::get<0>(param), + 
cugraph::test::override_Rmat_Usecase_with_cmd_line_arguments(std::get<1>(param))); +} + +TEST_P(Tests_MG_TransformReduceE_File, CheckInt32Int32FloatTransposeFalse) +{ + auto param = GetParam(); + run_current_test(std::get<0>(param), std::get<1>(param)); +} + +TEST_P(Tests_MG_TransformReduceE_Rmat, CheckInt32Int32FloatTransposeFalse) +{ + auto param = GetParam(); + run_current_test( + std::get<0>(param), + cugraph::test::override_Rmat_Usecase_with_cmd_line_arguments(std::get<1>(param))); +} + +TEST_P(Tests_MG_TransformReduceE_File, CheckInt32Int32FloatTransposeTrue) +{ + auto param = GetParam(); + run_current_test(std::get<0>(param), std::get<1>(param)); +} + +TEST_P(Tests_MG_TransformReduceE_Rmat, CheckInt32Int32FloatTransposeTrue) +{ + auto param = GetParam(); + run_current_test( + std::get<0>(param), + cugraph::test::override_Rmat_Usecase_with_cmd_line_arguments(std::get<1>(param))); +} + +INSTANTIATE_TEST_SUITE_P( + file_test, + Tests_MG_TransformReduceE_File, + ::testing::Combine( + ::testing::Values(Prims_Usecase{true}), + ::testing::Values(cugraph::test::File_Usecase("test/datasets/karate.mtx"), + cugraph::test::File_Usecase("test/datasets/web-Google.mtx"), + cugraph::test::File_Usecase("test/datasets/ljournal-2008.mtx"), + cugraph::test::File_Usecase("test/datasets/webbase-1M.mtx")))); + +INSTANTIATE_TEST_SUITE_P( + rmat_small_test, + Tests_MG_TransformReduceE_Rmat, + ::testing::Combine(::testing::Values(Prims_Usecase{true}), + ::testing::Values(cugraph::test::Rmat_Usecase( + 10, 16, 0.57, 0.19, 0.19, 0, false, false, 0, true)))); + +INSTANTIATE_TEST_SUITE_P( + rmat_large_test, + Tests_MG_TransformReduceE_Rmat, + ::testing::Combine(::testing::Values(Prims_Usecase{false}), + ::testing::Values(cugraph::test::Rmat_Usecase( + 20, 32, 0.57, 0.19, 0.19, 0, false, false, 0, true)))); + +CUGRAPH_MG_TEST_PROGRAM_MAIN() From ce4633f2bb3bfc5854a4a662511c0f1ca2049d55 Mon Sep 17 00:00:00 2001 From: Kumar Aatish Date: Fri, 3 Sep 2021 16:07:02 -0400 Subject: [PATCH 2/4] Fix block reduce of thrust tuples Templating fixes in thrust tuple utils --- .../prims/copy_to_adj_matrix_row_col.cuh | 6 +- .../copy_v_transform_reduce_in_out_nbr.cuh | 4 +- ...ransform_reduce_key_aggregated_out_nbr.cuh | 21 +- .../cugraph/prims/property_op_utils.cuh | 4 +- ...orm_reduce_by_adj_matrix_row_col_key_e.cuh | 30 +-- .../cugraph/prims/transform_reduce_e.cuh | 53 ++-- .../update_frontier_v_push_if_out_nbr.cuh | 103 ++++--- .../cugraph/utilities/collect_comm.cuh | 8 +- .../cugraph/utilities/dataframe_buffer.cuh | 164 ++++-------- .../cugraph/utilities/shuffle_comm.cuh | 66 +++-- .../cugraph/utilities/thrust_tuple_utils.cuh | 46 +++- cpp/src/community/louvain.cuh | 36 ++- .../components/weakly_connected_components.cu | 61 ++--- cpp/tests/community/mg_louvain_helper.cu | 2 +- cpp/tests/community/mg_louvain_test.cpp | 2 +- cpp/tests/prims/mg_transform_reduce_e.cu | 251 +++++++++--------- 16 files changed, 396 insertions(+), 461 deletions(-) diff --git a/cpp/include/cugraph/prims/copy_to_adj_matrix_row_col.cuh b/cpp/include/cugraph/prims/copy_to_adj_matrix_row_col.cuh index af5081a33d1..09deafa344b 100644 --- a/cpp/include/cugraph/prims/copy_to_adj_matrix_row_col.cuh +++ b/cpp/include/cugraph/prims/copy_to_adj_matrix_row_col.cuh @@ -156,8 +156,7 @@ void copy_to_matrix_major(raft::handle_t const& handle, auto rx_tmp_buffer = allocate_dataframe_buffer< typename std::iterator_traits::value_type>(rx_counts[i], handle.get_stream()); - auto rx_value_first = get_dataframe_buffer_begin< - typename 
std::iterator_traits::value_type>(rx_tmp_buffer); + auto rx_value_first = get_dataframe_buffer_begin(rx_tmp_buffer); if (col_comm_rank == i) { auto vertex_partition = @@ -347,8 +346,7 @@ void copy_to_matrix_minor(raft::handle_t const& handle, auto rx_tmp_buffer = allocate_dataframe_buffer< typename std::iterator_traits::value_type>(rx_counts[i], handle.get_stream()); - auto rx_value_first = get_dataframe_buffer_begin< - typename std::iterator_traits::value_type>(rx_tmp_buffer); + auto rx_value_first = get_dataframe_buffer_begin(rx_tmp_buffer); if (row_comm_rank == i) { auto vertex_partition = diff --git a/cpp/include/cugraph/prims/copy_v_transform_reduce_in_out_nbr.cuh b/cpp/include/cugraph/prims/copy_v_transform_reduce_in_out_nbr.cuh index 117e7525c25..11a2856f133 100644 --- a/cpp/include/cugraph/prims/copy_v_transform_reduce_in_out_nbr.cuh +++ b/cpp/include/cugraph/prims/copy_v_transform_reduce_in_out_nbr.cuh @@ -428,7 +428,7 @@ void copy_v_transform_reduce_nbr(raft::handle_t const& handle, : graph_view.get_number_of_local_adj_matrix_partition_cols() : vertex_t{0}; auto minor_tmp_buffer = allocate_dataframe_buffer(minor_tmp_buffer_size, handle.get_stream()); - auto minor_buffer_first = get_dataframe_buffer_begin(minor_tmp_buffer); + auto minor_buffer_first = get_dataframe_buffer_begin(minor_tmp_buffer); if (in != GraphViewType::is_adj_matrix_transposed) { auto minor_init = init; @@ -463,7 +463,7 @@ void copy_v_transform_reduce_nbr(raft::handle_t const& handle, GraphViewType::is_multi_gpu && update_major ? matrix_partition.get_major_size() : vertex_t{0}; auto major_tmp_buffer = allocate_dataframe_buffer(major_tmp_buffer_size, handle.get_stream()); - auto major_buffer_first = get_dataframe_buffer_begin(major_tmp_buffer); + auto major_buffer_first = get_dataframe_buffer_begin(major_tmp_buffer); auto major_init = T{}; if (update_major) { diff --git a/cpp/include/cugraph/prims/copy_v_transform_reduce_key_aggregated_out_nbr.cuh b/cpp/include/cugraph/prims/copy_v_transform_reduce_key_aggregated_out_nbr.cuh index f7f9dae9dd7..3a1e41c45ea 100644 --- a/cpp/include/cugraph/prims/copy_v_transform_reduce_key_aggregated_out_nbr.cuh +++ b/cpp/include/cugraph/prims/copy_v_transform_reduce_key_aggregated_out_nbr.cuh @@ -334,7 +334,7 @@ void copy_v_transform_reduce_key_aggregated_out_nbr( handle.get_stream()); device_bcast(row_comm, map_value_first, - get_dataframe_buffer_begin(map_value_buffer) + map_displacements[i], + get_dataframe_buffer_begin(map_value_buffer) + map_displacements[i], map_counts[i], i, handle.get_stream()); @@ -346,11 +346,10 @@ void copy_v_transform_reduce_key_aggregated_out_nbr( map_key_first, map_key_last, map_keys.begin() + map_displacements[row_comm_rank]); - thrust::copy( - execution_policy, - map_value_first, - map_value_first + thrust::distance(map_key_first, map_key_last), - get_dataframe_buffer_begin(map_value_buffer) + map_displacements[row_comm_rank]); + thrust::copy(execution_policy, + map_value_first, + map_value_first + thrust::distance(map_key_first, map_key_last), + get_dataframe_buffer_begin(map_value_buffer) + map_displacements[row_comm_rank]); handle.get_stream_view().synchronize(); // cuco::static_map currently does not take stream @@ -366,7 +365,7 @@ void copy_v_transform_reduce_key_aggregated_out_nbr( stream_adapter); auto pair_first = thrust::make_zip_iterator( - thrust::make_tuple(map_keys.begin(), get_dataframe_buffer_begin(map_value_buffer))); + thrust::make_tuple(map_keys.begin(), get_dataframe_buffer_begin(map_value_buffer))); 
kv_map_ptr->insert(pair_first, pair_first + map_keys.size()); } else { handle.get_stream_view().synchronize(); // cuco::static_map currently does not take stream @@ -544,7 +543,7 @@ void copy_v_transform_reduce_key_aggregated_out_nbr( auto tmp_e_op_result_buffer = allocate_dataframe_buffer(tmp_major_vertices.size(), handle.get_stream()); - auto tmp_e_op_result_buffer_first = get_dataframe_buffer_begin(tmp_e_op_result_buffer); + auto tmp_e_op_result_buffer_first = get_dataframe_buffer_begin(tmp_e_op_result_buffer); auto triplet_first = thrust::make_zip_iterator(thrust::make_tuple( tmp_major_vertices.begin(), tmp_minor_keys.begin(), tmp_key_aggregated_edge_weights.begin())); @@ -603,7 +602,7 @@ void copy_v_transform_reduce_key_aggregated_out_nbr( handle.get_stream()); device_gatherv(col_comm, tmp_e_op_result_buffer_first, - get_dataframe_buffer_begin(rx_tmp_e_op_result_buffer), + get_dataframe_buffer_begin(rx_tmp_e_op_result_buffer), tmp_major_vertices.size(), rx_sizes, rx_displs, @@ -643,7 +642,7 @@ void copy_v_transform_reduce_key_aggregated_out_nbr( thrust::sort_by_key(execution_policy, major_vertices.begin(), major_vertices.end(), - get_dataframe_buffer_begin(e_op_result_buffer)); + get_dataframe_buffer_begin(e_op_result_buffer)); auto num_uniques = thrust::count_if( execution_policy, @@ -671,7 +670,7 @@ void copy_v_transform_reduce_key_aggregated_out_nbr( execution_policy, major_vertices.begin(), major_vertices.end(), - get_dataframe_buffer_begin(e_op_result_buffer), + get_dataframe_buffer_begin(e_op_result_buffer), thrust::make_discard_iterator(), thrust::make_permutation_iterator( vertex_value_output_first, diff --git a/cpp/include/cugraph/prims/property_op_utils.cuh b/cpp/include/cugraph/prims/property_op_utils.cuh index e164b14ecf2..ceb7ed82571 100644 --- a/cpp/include/cugraph/prims/property_op_utils.cuh +++ b/cpp/include/cugraph/prims/property_op_utils.cuh @@ -133,13 +133,13 @@ struct property_add> private: template - __device__ constexpr auto sum_impl(T& t1, T& t2, std::index_sequence) + __host__ __device__ constexpr auto sum_impl(T& t1, T& t2, std::index_sequence) { return thrust::make_tuple((thrust::get(t1) + thrust::get(t2))...); } public: - __device__ constexpr auto operator()(const Type& t1, const Type& t2) + __host__ __device__ constexpr auto operator()(const Type& t1, const Type& t2) { return sum_impl(t1, t2, std::make_index_sequence::value>()); } diff --git a/cpp/include/cugraph/prims/transform_reduce_by_adj_matrix_row_col_key_e.cuh b/cpp/include/cugraph/prims/transform_reduce_by_adj_matrix_row_col_key_e.cuh index 70a9afa32c0..2657f274e5b 100644 --- a/cpp/include/cugraph/prims/transform_reduce_by_adj_matrix_row_col_key_e.cuh +++ b/cpp/include/cugraph/prims/transform_reduce_by_adj_matrix_row_col_key_e.cuh @@ -319,10 +319,8 @@ template std::tuple, BufferType> reduce_to_unique_kv_pairs( rmm::device_uvector&& keys, BufferType&& value_buffer, cudaStream_t stream) { - thrust::sort_by_key(rmm::exec_policy(stream), - keys.begin(), - keys.end(), - get_dataframe_buffer_begin(value_buffer)); + thrust::sort_by_key( + rmm::exec_policy(stream), keys.begin(), keys.end(), get_dataframe_buffer_begin(value_buffer)); auto num_uniques = thrust::count_if(rmm::exec_policy(stream), thrust::make_counting_iterator(size_t{0}), @@ -336,9 +334,9 @@ std::tuple, BufferType> reduce_to_unique_kv_pairs( thrust::reduce_by_key(rmm::exec_policy(stream), keys.begin(), keys.end(), - get_dataframe_buffer_begin(value_buffer), + get_dataframe_buffer_begin(value_buffer), unique_keys.begin(), - 
get_dataframe_buffer_begin(value_for_unique_key_buffer)); + get_dataframe_buffer_begin(value_for_unique_key_buffer)); return std::make_tuple(std::move(unique_keys), std::move(value_for_unique_key_buffer)); } @@ -420,7 +418,7 @@ transform_reduce_by_adj_matrix_row_col_key_e( (adj_matrix_row_key ? row_value_input_offset : col_value_input_offset), e_op, tmp_keys.data(), - get_dataframe_buffer_begin(tmp_value_buffer)); + get_dataframe_buffer_begin(tmp_value_buffer)); } if ((*segment_offsets)[2] - (*segment_offsets)[1] > 0) { raft::grid_1d_warp_t update_grid( @@ -438,7 +436,7 @@ transform_reduce_by_adj_matrix_row_col_key_e( (adj_matrix_row_key ? row_value_input_offset : col_value_input_offset), e_op, tmp_keys.data(), - get_dataframe_buffer_begin(tmp_value_buffer)); + get_dataframe_buffer_begin(tmp_value_buffer)); } if ((*segment_offsets)[3] - (*segment_offsets)[2] > 0) { raft::grid_1d_thread_t update_grid( @@ -456,7 +454,7 @@ transform_reduce_by_adj_matrix_row_col_key_e( (adj_matrix_row_key ? row_value_input_offset : col_value_input_offset), e_op, tmp_keys.data(), - get_dataframe_buffer_begin(tmp_value_buffer)); + get_dataframe_buffer_begin(tmp_value_buffer)); } if (matrix_partition.get_dcs_nzd_vertex_count() && (*(matrix_partition.get_dcs_nzd_vertex_count()) > 0)) { @@ -474,7 +472,7 @@ transform_reduce_by_adj_matrix_row_col_key_e( (adj_matrix_row_key ? row_value_input_offset : col_value_input_offset), e_op, tmp_keys.data(), - get_dataframe_buffer_begin(tmp_value_buffer)); + get_dataframe_buffer_begin(tmp_value_buffer)); } } else { raft::grid_1d_thread_t update_grid( @@ -493,7 +491,7 @@ transform_reduce_by_adj_matrix_row_col_key_e( (adj_matrix_row_key ? row_value_input_offset : col_value_input_offset), e_op, tmp_keys.data(), - get_dataframe_buffer_begin(tmp_value_buffer)); + get_dataframe_buffer_begin(tmp_value_buffer)); } } std::tie(tmp_keys, tmp_value_buffer) = reduce_to_unique_kv_pairs( @@ -510,7 +508,7 @@ transform_reduce_by_adj_matrix_row_col_key_e( comm, tmp_keys.begin(), tmp_keys.end(), - get_dataframe_buffer_begin(tmp_value_buffer), + get_dataframe_buffer_begin(tmp_value_buffer), [key_func = detail::compute_gpu_id_from_vertex_t{comm_size}] __device__( auto val) { return key_func(val); }, handle.get_stream()); @@ -528,14 +526,14 @@ transform_reduce_by_adj_matrix_row_col_key_e( // can reserve address space to avoid expensive reallocation. 
// https://devblogs.nvidia.com/introducing-low-level-gpu-virtual-memory-management keys.resize(cur_size + tmp_keys.size(), handle.get_stream()); - resize_dataframe_buffer(value_buffer, keys.size(), handle.get_stream()); + resize_dataframe_buffer(value_buffer, keys.size(), handle.get_stream()); auto execution_policy = handle.get_thrust_policy(); thrust::copy(execution_policy, tmp_keys.begin(), tmp_keys.end(), keys.begin() + cur_size); thrust::copy(execution_policy, - get_dataframe_buffer_begin(tmp_value_buffer), - get_dataframe_buffer_begin(tmp_value_buffer) + tmp_keys.size(), - get_dataframe_buffer_begin(value_buffer) + cur_size); + get_dataframe_buffer_begin(tmp_value_buffer), + get_dataframe_buffer_begin(tmp_value_buffer) + tmp_keys.size(), + get_dataframe_buffer_begin(value_buffer) + cur_size); } } diff --git a/cpp/include/cugraph/prims/transform_reduce_e.cuh b/cpp/include/cugraph/prims/transform_reduce_e.cuh index f28fbe47581..7c28d67c3a8 100644 --- a/cpp/include/cugraph/prims/transform_reduce_e.cuh +++ b/cpp/include/cugraph/prims/transform_reduce_e.cuh @@ -16,10 +16,10 @@ #pragma once #include +#include #include #include #include -#include #include #include @@ -65,6 +65,9 @@ __global__ void for_all_major_for_all_nbr_hypersparse( auto dcs_nzd_vertex_count = *(matrix_partition.get_dcs_nzd_vertex_count()); + using BlockReduce = cub::BlockReduce; + __shared__ typename BlockReduce::TempStorage temp_storage; + property_add edge_property_add{}; e_op_result_t e_op_result_sum{}; while (idx < static_cast(dcs_nzd_vertex_count)) { @@ -118,9 +121,7 @@ __global__ void for_all_major_for_all_nbr_hypersparse( idx += gridDim.x * blockDim.x; } - e_op_result_sum = - block_reduce_edge_op_result().compute( - e_op_result_sum); + e_op_result_sum = BlockReduce(temp_storage).Reduce(e_op_result_sum, edge_property_add); if (threadIdx.x == 0) { atomic_accumulate_edge_op_result(result_iter, e_op_result_sum); } } @@ -150,6 +151,9 @@ __global__ void for_all_major_for_all_nbr_low_degree( auto major_start_offset = static_cast(major_first - matrix_partition.get_major_first()); size_t idx = static_cast(tid); + using BlockReduce = cub::BlockReduce; + __shared__ typename BlockReduce::TempStorage temp_storage; + property_add edge_property_add{}; e_op_result_t e_op_result_sum{}; while (idx < static_cast(major_last - major_first)) { @@ -203,9 +207,7 @@ __global__ void for_all_major_for_all_nbr_low_degree( idx += gridDim.x * blockDim.x; } - e_op_result_sum = - block_reduce_edge_op_result().compute( - e_op_result_sum); + e_op_result_sum = BlockReduce(temp_storage).Reduce(e_op_result_sum, edge_property_add); if (threadIdx.x == 0) { atomic_accumulate_edge_op_result(result_iter, e_op_result_sum); } } @@ -237,6 +239,8 @@ __global__ void for_all_major_for_all_nbr_mid_degree( auto major_start_offset = static_cast(major_first - matrix_partition.get_major_first()); size_t idx = static_cast(tid / raft::warp_size()); + using BlockReduce = cub::BlockReduce; + __shared__ typename BlockReduce::TempStorage temp_storage; property_add edge_property_add{}; e_op_result_t e_op_result_sum{}; while (idx < static_cast(major_last - major_first)) { @@ -277,9 +281,7 @@ __global__ void for_all_major_for_all_nbr_mid_degree( idx += gridDim.x * (blockDim.x / raft::warp_size()); } - e_op_result_sum = - block_reduce_edge_op_result().compute( - e_op_result_sum); + e_op_result_sum = BlockReduce(temp_storage).Reduce(e_op_result_sum, edge_property_add); if (threadIdx.x == 0) { atomic_accumulate_edge_op_result(result_iter, e_op_result_sum); } } @@ -308,6 +310,8 
@@ __global__ void for_all_major_for_all_nbr_high_degree( auto major_start_offset = static_cast(major_first - matrix_partition.get_major_first()); size_t idx = static_cast(blockIdx.x); + using BlockReduce = cub::BlockReduce; + __shared__ typename BlockReduce::TempStorage temp_storage; property_add edge_property_add{}; e_op_result_t e_op_result_sum{}; while (idx < static_cast(major_last - major_first)) { @@ -348,9 +352,7 @@ __global__ void for_all_major_for_all_nbr_high_degree( idx += gridDim.x; } - e_op_result_sum = - block_reduce_edge_op_result().compute( - e_op_result_sum); + e_op_result_sum = BlockReduce(temp_storage).Reduce(e_op_result_sum, edge_property_add); if (threadIdx.x == 0) { atomic_accumulate_edge_op_result(result_iter, e_op_result_sum); } } @@ -407,10 +409,11 @@ T transform_reduce_e(raft::handle_t const& handle, property_add edge_property_add{}; auto result_buffer = allocate_dataframe_buffer(1, handle.get_stream()); - thrust::fill(handle.get_thrust_policy(), - get_dataframe_buffer_begin(result_buffer), - get_dataframe_buffer_begin(result_buffer) + 1, - T{}); + thrust::fill( + handle.get_thrust_policy(), + get_dataframe_buffer_begin(result_buffer), + get_dataframe_buffer_begin(result_buffer) + 1, + ((GraphViewType::is_multi_gpu) && (handle.get_comms().get_rank() == 0)) ? init : T{}); for (size_t i = 0; i < graph_view.get_number_of_local_adj_matrix_partitions(); ++i) { auto matrix_partition = @@ -440,7 +443,7 @@ T transform_reduce_e(raft::handle_t const& handle, matrix_partition.get_major_first() + (*segment_offsets)[1], adj_matrix_row_value_input_first + row_value_input_offset, adj_matrix_col_value_input_first + col_value_input_offset, - get_dataframe_buffer_begin(result_buffer), + get_dataframe_buffer_begin(result_buffer), e_op); } if ((*segment_offsets)[2] - (*segment_offsets)[1] > 0) { @@ -454,7 +457,7 @@ T transform_reduce_e(raft::handle_t const& handle, matrix_partition.get_major_first() + (*segment_offsets)[2], adj_matrix_row_value_input_first + row_value_input_offset, adj_matrix_col_value_input_first + col_value_input_offset, - get_dataframe_buffer_begin(result_buffer), + get_dataframe_buffer_begin(result_buffer), e_op); } if ((*segment_offsets)[3] - (*segment_offsets)[2] > 0) { @@ -468,7 +471,7 @@ T transform_reduce_e(raft::handle_t const& handle, matrix_partition.get_major_first() + (*segment_offsets)[3], adj_matrix_row_value_input_first + row_value_input_offset, adj_matrix_col_value_input_first + col_value_input_offset, - get_dataframe_buffer_begin(result_buffer), + get_dataframe_buffer_begin(result_buffer), e_op); } if (matrix_partition.get_dcs_nzd_vertex_count() && @@ -482,7 +485,7 @@ T transform_reduce_e(raft::handle_t const& handle, matrix_partition.get_major_first() + (*segment_offsets)[3], adj_matrix_row_value_input_first + row_value_input_offset, adj_matrix_col_value_input_first + col_value_input_offset, - get_dataframe_buffer_begin(result_buffer), + get_dataframe_buffer_begin(result_buffer), e_op); } } else { @@ -498,15 +501,15 @@ T transform_reduce_e(raft::handle_t const& handle, matrix_partition.get_major_last(), adj_matrix_row_value_input_first + row_value_input_offset, adj_matrix_col_value_input_first + col_value_input_offset, - get_dataframe_buffer_begin(result_buffer), + get_dataframe_buffer_begin(result_buffer), e_op); } } } auto result = thrust::reduce(handle.get_thrust_policy(), - get_dataframe_buffer_begin(result_buffer), - get_dataframe_buffer_begin(result_buffer) + 1, + get_dataframe_buffer_begin(result_buffer), + 
get_dataframe_buffer_begin(result_buffer) + 1, T{}, edge_property_add); @@ -514,7 +517,7 @@ T transform_reduce_e(raft::handle_t const& handle, result = host_scalar_allreduce(handle.get_comms(), result, handle.get_stream()); } - return edge_property_add(init, result); + return result; } } // namespace cugraph diff --git a/cpp/include/cugraph/prims/update_frontier_v_push_if_out_nbr.cuh b/cpp/include/cugraph/prims/update_frontier_v_push_if_out_nbr.cuh index ffa15663376..d06ae30a07e 100644 --- a/cpp/include/cugraph/prims/update_frontier_v_push_if_out_nbr.cuh +++ b/cpp/include/cugraph/prims/update_frontier_v_push_if_out_nbr.cuh @@ -104,7 +104,7 @@ auto get_optional_payload_buffer_begin( std::add_lvalue_reference_t( size_t{0}, cudaStream_t{nullptr}))> optional_payload_buffer) { - return get_dataframe_buffer_begin(optional_payload_buffer); + return get_dataframe_buffer_begin(optional_payload_buffer); } #else auto allocate_optional_payload_buffer = [](size_t size, cudaStream_t stream) { @@ -119,7 +119,7 @@ auto get_optional_payload_buffer_begin = [](auto& optional_payload_buffer) { if constexpr (std::is_same_v) { return static_cast(nullptr); } else { - return get_dataframe_buffer_begin(optional_payload_buffer); + return get_dataframe_buffer_begin(optional_payload_buffer); } }; #endif @@ -571,7 +571,7 @@ size_t sort_and_reduce_buffer_elements(raft::handle_t const& handle, buffer_key_output_first + num_buffer_elements, buffer_payload_output_first, keys.begin(), - get_dataframe_buffer_begin(value_buffer), + get_dataframe_buffer_begin(value_buffer), thrust::equal_to(), reduce_op); num_reduced_buffer_elements = @@ -582,8 +582,8 @@ size_t sort_and_reduce_buffer_elements(raft::handle_t const& handle, keys.begin() + num_reduced_buffer_elements, buffer_key_output_first); thrust::copy(execution_policy, - get_dataframe_buffer_begin(value_buffer), - get_dataframe_buffer_begin(value_buffer) + num_reduced_buffer_elements, + get_dataframe_buffer_begin(value_buffer), + get_dataframe_buffer_begin(value_buffer) + num_reduced_buffer_elements, buffer_payload_output_first); } @@ -890,44 +890,43 @@ void update_frontier_v_push_if_out_nbr( auto& col_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().col_name()); auto const col_comm_rank = col_comm.get_rank(); - resize_dataframe_buffer( + resize_dataframe_buffer( matrix_partition_frontier_key_buffer, matrix_partition_frontier_size, handle.get_stream()); if (static_cast(col_comm_rank) == i) { thrust::copy(handle.get_thrust_policy(), frontier_key_first, frontier_key_last, - get_dataframe_buffer_begin(matrix_partition_frontier_key_buffer)); + get_dataframe_buffer_begin(matrix_partition_frontier_key_buffer)); } device_bcast(col_comm, frontier_key_first, - get_dataframe_buffer_begin(matrix_partition_frontier_key_buffer), + get_dataframe_buffer_begin(matrix_partition_frontier_key_buffer), matrix_partition_frontier_size, i, handle.get_stream()); } else { - resize_dataframe_buffer( + resize_dataframe_buffer( matrix_partition_frontier_key_buffer, matrix_partition_frontier_size, handle.get_stream()); thrust::copy(handle.get_thrust_policy(), frontier_key_first, frontier_key_last, - get_dataframe_buffer_begin(matrix_partition_frontier_key_buffer)); + get_dataframe_buffer_begin(matrix_partition_frontier_key_buffer)); } vertex_t const* matrix_partition_frontier_row_first{nullptr}; vertex_t const* matrix_partition_frontier_row_last{nullptr}; if constexpr (std::is_same_v) { matrix_partition_frontier_row_first = - 
get_dataframe_buffer_begin(matrix_partition_frontier_key_buffer); + get_dataframe_buffer_begin(matrix_partition_frontier_key_buffer); matrix_partition_frontier_row_last = - get_dataframe_buffer_end(matrix_partition_frontier_key_buffer); + get_dataframe_buffer_end(matrix_partition_frontier_key_buffer); } else { - matrix_partition_frontier_row_first = - thrust::get<0>(get_dataframe_buffer_begin(matrix_partition_frontier_key_buffer) - .get_iterator_tuple()); + matrix_partition_frontier_row_first = thrust::get<0>( + get_dataframe_buffer_begin(matrix_partition_frontier_key_buffer).get_iterator_tuple()); matrix_partition_frontier_row_last = thrust::get<0>( - get_dataframe_buffer_end(matrix_partition_frontier_key_buffer).get_iterator_tuple()); + get_dataframe_buffer_end(matrix_partition_frontier_key_buffer).get_iterator_tuple()); } auto segment_offsets = graph_view.get_local_adj_matrix_partition_segment_offsets(i); @@ -989,9 +988,9 @@ void update_frontier_v_push_if_out_nbr( // FIXME: if i != 0, this will require costly reallocation if we don't use the new CUDA feature // to reserve address space. auto new_buffer_size = buffer_idx.value(handle.get_stream()) + max_pushes; - resize_dataframe_buffer(key_buffer, new_buffer_size, handle.get_stream()); + resize_dataframe_buffer(key_buffer, new_buffer_size, handle.get_stream()); if constexpr (!std::is_same_v) { - resize_dataframe_buffer(payload_buffer, new_buffer_size, handle.get_stream()); + resize_dataframe_buffer(payload_buffer, new_buffer_size, handle.get_stream()); } auto row_value_input_offset = GraphViewType::is_adj_matrix_transposed @@ -1029,11 +1028,11 @@ void update_frontier_v_push_if_out_nbr( detail::for_all_frontier_row_for_all_nbr_high_degree <<>>( matrix_partition, - get_dataframe_buffer_begin(matrix_partition_frontier_key_buffer), - get_dataframe_buffer_begin(matrix_partition_frontier_key_buffer) + h_offsets[0], + get_dataframe_buffer_begin(matrix_partition_frontier_key_buffer), + get_dataframe_buffer_begin(matrix_partition_frontier_key_buffer) + h_offsets[0], adj_matrix_row_value_input_first + row_value_input_offset, adj_matrix_col_value_input_first, - get_dataframe_buffer_begin(key_buffer), + get_dataframe_buffer_begin(key_buffer), detail::get_optional_payload_buffer_begin(payload_buffer), buffer_idx.data(), e_op); @@ -1046,11 +1045,11 @@ void update_frontier_v_push_if_out_nbr( detail::for_all_frontier_row_for_all_nbr_mid_degree <<>>( matrix_partition, - get_dataframe_buffer_begin(matrix_partition_frontier_key_buffer) + h_offsets[0], - get_dataframe_buffer_begin(matrix_partition_frontier_key_buffer) + h_offsets[1], + get_dataframe_buffer_begin(matrix_partition_frontier_key_buffer) + h_offsets[0], + get_dataframe_buffer_begin(matrix_partition_frontier_key_buffer) + h_offsets[1], adj_matrix_row_value_input_first + row_value_input_offset, adj_matrix_col_value_input_first, - get_dataframe_buffer_begin(key_buffer), + get_dataframe_buffer_begin(key_buffer), detail::get_optional_payload_buffer_begin(payload_buffer), buffer_idx.data(), e_op); @@ -1063,11 +1062,11 @@ void update_frontier_v_push_if_out_nbr( detail::for_all_frontier_row_for_all_nbr_low_degree <<>>( matrix_partition, - get_dataframe_buffer_begin(matrix_partition_frontier_key_buffer) + h_offsets[1], - get_dataframe_buffer_begin(matrix_partition_frontier_key_buffer) + h_offsets[2], + get_dataframe_buffer_begin(matrix_partition_frontier_key_buffer) + h_offsets[1], + get_dataframe_buffer_begin(matrix_partition_frontier_key_buffer) + h_offsets[2], adj_matrix_row_value_input_first + 
row_value_input_offset, adj_matrix_col_value_input_first, - get_dataframe_buffer_begin(key_buffer), + get_dataframe_buffer_begin(key_buffer), detail::get_optional_payload_buffer_begin(payload_buffer), buffer_idx.data(), e_op); @@ -1081,11 +1080,11 @@ void update_frontier_v_push_if_out_nbr( <<>>( matrix_partition, matrix_partition.get_major_first() + (*segment_offsets)[3], - get_dataframe_buffer_begin(matrix_partition_frontier_key_buffer) + h_offsets[2], - get_dataframe_buffer_begin(matrix_partition_frontier_key_buffer) + h_offsets[3], + get_dataframe_buffer_begin(matrix_partition_frontier_key_buffer) + h_offsets[2], + get_dataframe_buffer_begin(matrix_partition_frontier_key_buffer) + h_offsets[3], adj_matrix_row_value_input_first + row_value_input_offset, adj_matrix_col_value_input_first, - get_dataframe_buffer_begin(key_buffer), + get_dataframe_buffer_begin(key_buffer), detail::get_optional_payload_buffer_begin(payload_buffer), buffer_idx.data(), e_op); @@ -1100,11 +1099,11 @@ void update_frontier_v_push_if_out_nbr( detail::for_all_frontier_row_for_all_nbr_low_degree <<>>( matrix_partition, - get_dataframe_buffer_begin(matrix_partition_frontier_key_buffer), - get_dataframe_buffer_end(matrix_partition_frontier_key_buffer), + get_dataframe_buffer_begin(matrix_partition_frontier_key_buffer), + get_dataframe_buffer_end(matrix_partition_frontier_key_buffer), adj_matrix_row_value_input_first + row_value_input_offset, adj_matrix_col_value_input_first, - get_dataframe_buffer_begin(key_buffer), + get_dataframe_buffer_begin(key_buffer), detail::get_optional_payload_buffer_begin(payload_buffer), buffer_idx.data(), e_op); @@ -1131,7 +1130,7 @@ void update_frontier_v_push_if_out_nbr( auto num_buffer_elements = detail::sort_and_reduce_buffer_elements( handle, - get_dataframe_buffer_begin(key_buffer), + get_dataframe_buffer_begin(key_buffer), detail::get_optional_payload_buffer_begin(payload_buffer), buffer_idx.value(handle.get_stream()), reduce_op); @@ -1166,10 +1165,9 @@ void update_frontier_v_push_if_out_nbr( handle.get_stream()); vertex_t const* row_first{nullptr}; if constexpr (std::is_same_v) { - row_first = get_dataframe_buffer_begin(key_buffer); + row_first = get_dataframe_buffer_begin(key_buffer); } else { - row_first = - thrust::get<0>(get_dataframe_buffer_begin(key_buffer).get_iterator_tuple()); + row_first = thrust::get<0>(get_dataframe_buffer_begin(key_buffer).get_iterator_tuple()); } thrust::lower_bound(handle.get_thrust_policy(), row_first, @@ -1189,24 +1187,21 @@ void update_frontier_v_push_if_out_nbr( auto rx_key_buffer = allocate_dataframe_buffer(size_t{0}, handle.get_stream()); std::tie(rx_key_buffer, std::ignore) = shuffle_values( - row_comm, get_dataframe_buffer_begin(key_buffer), tx_counts, handle.get_stream()); + row_comm, get_dataframe_buffer_begin(key_buffer), tx_counts, handle.get_stream()); key_buffer = std::move(rx_key_buffer); if constexpr (!std::is_same_v) { auto rx_payload_buffer = allocate_dataframe_buffer(size_t{0}, handle.get_stream()); - std::tie(rx_payload_buffer, std::ignore) = - shuffle_values(row_comm, - get_dataframe_buffer_begin(payload_buffer), - tx_counts, - handle.get_stream()); + std::tie(rx_payload_buffer, std::ignore) = shuffle_values( + row_comm, get_dataframe_buffer_begin(payload_buffer), tx_counts, handle.get_stream()); payload_buffer = std::move(rx_payload_buffer); } num_buffer_elements = detail::sort_and_reduce_buffer_elements( handle, - get_dataframe_buffer_begin(key_buffer), + get_dataframe_buffer_begin(key_buffer), 
detail::get_optional_payload_buffer_begin(payload_buffer), - size_dataframe_buffer(key_buffer), + size_dataframe_buffer(key_buffer), reduce_op); // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between @@ -1232,7 +1227,7 @@ void update_frontier_v_push_if_out_nbr( if constexpr (!std::is_same_v) { auto key_payload_pair_first = thrust::make_zip_iterator( - thrust::make_tuple(get_dataframe_buffer_begin(key_buffer), + thrust::make_tuple(get_dataframe_buffer_begin(key_buffer), detail::get_optional_payload_buffer_begin(payload_buffer))); thrust::transform( handle.get_thrust_policy(), @@ -1263,13 +1258,13 @@ void update_frontier_v_push_if_out_nbr( } }); - resize_dataframe_buffer(payload_buffer, size_t{0}, handle.get_stream()); - shrink_to_fit_dataframe_buffer(payload_buffer, handle.get_stream()); + resize_dataframe_buffer(payload_buffer, size_t{0}, handle.get_stream()); + shrink_to_fit_dataframe_buffer(payload_buffer, handle.get_stream()); } else { thrust::transform( handle.get_thrust_policy(), - get_dataframe_buffer_begin(key_buffer), - get_dataframe_buffer_begin(key_buffer) + num_buffer_elements, + get_dataframe_buffer_begin(key_buffer), + get_dataframe_buffer_begin(key_buffer) + num_buffer_elements, bucket_indices.begin(), detail::call_v_op_t(key_buffer))); + thrust::make_tuple(bucket_indices.begin(), get_dataframe_buffer_begin(key_buffer))); bucket_indices.resize( thrust::distance(bucket_key_pair_first, thrust::remove_if(handle.get_thrust_policy(), @@ -1292,13 +1287,13 @@ void update_frontier_v_push_if_out_nbr( bucket_key_pair_first + num_buffer_elements, detail::check_invalid_bucket_idx_t())), handle.get_stream()); - resize_dataframe_buffer(key_buffer, bucket_indices.size(), handle.get_stream()); + resize_dataframe_buffer(key_buffer, bucket_indices.size(), handle.get_stream()); bucket_indices.shrink_to_fit(handle.get_stream()); - shrink_to_fit_dataframe_buffer(key_buffer, handle.get_stream()); + shrink_to_fit_dataframe_buffer(key_buffer, handle.get_stream()); frontier.insert_to_buckets(bucket_indices.begin(), bucket_indices.end(), - get_dataframe_buffer_begin(key_buffer), + get_dataframe_buffer_begin(key_buffer), next_frontier_bucket_indices); } } diff --git a/cpp/include/cugraph/utilities/collect_comm.cuh b/cpp/include/cugraph/utilities/collect_comm.cuh index cabd9fa2273..d1f85148f44 100644 --- a/cpp/include/cugraph/utilities/collect_comm.cuh +++ b/cpp/include/cugraph/utilities/collect_comm.cuh @@ -143,8 +143,7 @@ collect_values_for_keys(raft::comms::comms_t const& comm, auto value_buffer = allocate_dataframe_buffer( thrust::distance(collect_key_first, collect_key_last), stream_view); - kv_map_ptr->find( - collect_key_first, collect_key_last, get_dataframe_buffer_begin(value_buffer)); + kv_map_ptr->find(collect_key_first, collect_key_last, get_dataframe_buffer_begin(value_buffer)); return value_buffer; } @@ -254,9 +253,8 @@ collect_values_for_unique_keys(raft::comms::comms_t const& comm, auto value_buffer = allocate_dataframe_buffer( thrust::distance(collect_unique_key_first, collect_unique_key_last), stream_view); - kv_map_ptr->find(collect_unique_key_first, - collect_unique_key_last, - get_dataframe_buffer_begin(value_buffer)); + kv_map_ptr->find( + collect_unique_key_first, collect_unique_key_last, get_dataframe_buffer_begin(value_buffer)); return value_buffer; } diff --git a/cpp/include/cugraph/utilities/dataframe_buffer.cuh b/cpp/include/cugraph/utilities/dataframe_buffer.cuh index 04c5db91d89..3301f06f238 100644 --- 
a/cpp/include/cugraph/utilities/dataframe_buffer.cuh +++ b/cpp/include/cugraph/utilities/dataframe_buffer.cuh @@ -30,66 +30,25 @@ namespace cugraph { namespace detail { -template -auto allocate_dataframe_buffer_tuple_element_impl(size_t buffer_size, - rmm::cuda_stream_view stream_view) -{ - using element_t = typename thrust::tuple_element::type; - return rmm::device_uvector(buffer_size, stream_view); -} - template auto allocate_dataframe_buffer_tuple_impl(std::index_sequence, size_t buffer_size, rmm::cuda_stream_view stream_view) { - return std::make_tuple( - allocate_dataframe_buffer_tuple_element_impl(buffer_size, stream_view)...); + return std::make_tuple(rmm::device_uvector::type>( + buffer_size, stream_view)...); } -template -struct resize_dataframe_buffer_tuple_iterator_element_impl { - void run(BufferType& buffer, size_t new_buffer_size, rmm::cuda_stream_view stream_view) - { - std::get(buffer).resize(new_buffer_size, stream_view); - resize_dataframe_buffer_tuple_iterator_element_impl().run( - buffer, new_buffer_size, stream_view); - } -}; - -template -struct resize_dataframe_buffer_tuple_iterator_element_impl { - void run(BufferType& buffer, size_t new_buffer_size, rmm::cuda_stream_view stream_view) {} -}; - -template -struct shrink_to_fit_dataframe_buffer_tuple_iterator_element_impl { - void run(BufferType& buffer, rmm::cuda_stream_view stream_view) - { - std::get(buffer).shrink_to_fit(stream_view); - shrink_to_fit_dataframe_buffer_tuple_iterator_element_impl() - .run(buffer, stream_view); - } -}; - -template -struct shrink_to_fit_dataframe_buffer_tuple_iterator_element_impl { - void run(BufferType& buffer, rmm::cuda_stream_view stream_view) {} -}; - -template -auto get_dataframe_buffer_begin_tuple_element_impl(BufferType& buffer) +template +auto get_dataframe_buffer_begin_tuple_impl(std::index_sequence, TupleType& buffer) { - using element_t = typename thrust::tuple_element::type; - return std::get(buffer).begin(); + return thrust::make_zip_iterator(thrust::make_tuple((std::get(buffer).begin())...)); } -template -auto get_dataframe_buffer_begin_tuple_impl(std::index_sequence, BufferType& buffer) +template +auto get_dataframe_buffer_end_tuple_impl(std::index_sequence, TupleType& buffer) { - // thrust::make_tuple instead of std::make_tuple as this is fed to thrust::make_zip_iterator. 
- return thrust::make_tuple( - get_dataframe_buffer_begin_tuple_element_impl(buffer)...); + return thrust::make_zip_iterator(thrust::make_tuple((std::get(buffer).end())...)); } template @@ -122,100 +81,73 @@ auto allocate_dataframe_buffer(size_t buffer_size, rmm::cuda_stream_view stream_ std::make_index_sequence(), buffer_size, stream_view); } -template ::value>* = nullptr> -void resize_dataframe_buffer(BufferType& buffer, - size_t new_buffer_size, - rmm::cuda_stream_view stream_view) -{ - buffer.resize(new_buffer_size, stream_view); -} - -template ::value>* = nullptr> -void resize_dataframe_buffer(BufferType& buffer, +template +void resize_dataframe_buffer(Type& buffer, size_t new_buffer_size, rmm::cuda_stream_view stream_view) { - size_t constexpr tuple_size = thrust::tuple_size::value; - detail:: - resize_dataframe_buffer_tuple_iterator_element_impl() - .run(buffer, new_buffer_size, stream_view); -} - -template ::value>* = nullptr> -void shrink_to_fit_dataframe_buffer(BufferType& buffer, rmm::cuda_stream_view stream_view) -{ - buffer.shrink_to_fit(stream_view); -} - -template ::value>* = nullptr> -void shrink_to_fit_dataframe_buffer(BufferType& buffer, rmm::cuda_stream_view stream_view) -{ - size_t constexpr tuple_size = thrust::tuple_size::value; - detail::shrink_to_fit_dataframe_buffer_tuple_iterator_element_impl() - .run(buffer, stream_view); + if constexpr (is_std_tuple_of_arithmetic_vectors::value) { + std::apply([new_buffer_size, + stream_view](auto&&... args) { (args.resize(new_buffer_size, stream_view), ...); }, + buffer); + } else if constexpr (is_arithmetic_vector::value) { + buffer.resize(new_buffer_size, stream_view); + } } -template ::value>* = nullptr> -size_t size_dataframe_buffer(BufferType& buffer) +template +void shrink_to_fit_dataframe_buffer(Type& buffer, rmm::cuda_stream_view stream_view) { - return buffer.size(); + if constexpr (is_std_tuple_of_arithmetic_vectors::value) { + std::apply([stream_view](auto&&... 
args) { (args.shrink_to_fit(stream_view), ...); }, buffer); + } else if constexpr (is_arithmetic_vector::value) { + buffer.shrink_to_fit(stream_view); + } } -template ::value>* = nullptr> -size_t size_dataframe_buffer(BufferType& buffer) +template +size_t size_dataframe_buffer(Type& buffer) { - return std::get<0>(buffer).size(); + if constexpr (is_std_tuple_of_arithmetic_vectors::value) { + return std::get<0>(buffer).size(); + } else if constexpr (is_arithmetic_vector::value) { + return buffer.size(); + } + return size_t{}; } -template ::value>* = nullptr> +template ::value>* = + nullptr> auto get_dataframe_buffer_begin(BufferType& buffer) { return buffer.begin(); } -template ::value>* = nullptr> +template < + typename BufferType, + typename std::enable_if_t::value>* = nullptr> auto get_dataframe_buffer_begin(BufferType& buffer) { - size_t constexpr tuple_size = thrust::tuple_size::value; - return thrust::make_zip_iterator(detail::get_dataframe_buffer_begin_tuple_impl( - std::make_index_sequence(), buffer)); + return detail::get_dataframe_buffer_begin_tuple_impl( + std::make_index_sequence::value>(), buffer); } -template ::value>* = nullptr> +template ::value>* = + nullptr> auto get_dataframe_buffer_end(BufferType& buffer) { return buffer.end(); } -template ::value>* = nullptr> +template < + typename BufferType, + typename std::enable_if_t::value>* = nullptr> auto get_dataframe_buffer_end(BufferType& buffer) { - size_t constexpr tuple_size = thrust::tuple_size::value; - return thrust::make_zip_iterator( - detail::get_dataframe_buffer_end_tuple_impl(std::make_index_sequence(), buffer)); + return detail::get_dataframe_buffer_end_tuple_impl( + std::make_index_sequence::value>(), buffer); } } // namespace cugraph diff --git a/cpp/include/cugraph/utilities/shuffle_comm.cuh b/cpp/include/cugraph/utilities/shuffle_comm.cuh index 6c4ea3641c0..5fd78dc00ee 100644 --- a/cpp/include/cugraph/utilities/shuffle_comm.cuh +++ b/cpp/include/cugraph/utilities/shuffle_comm.cuh @@ -216,18 +216,16 @@ auto shuffle_values(raft::comms::comms_t const& comm, // FIXME: this needs to be replaced with AlltoAll once NCCL 2.8 is released // (if num_tx_dst_ranks == num_rx_src_ranks == comm_size). - device_multicast_sendrecv( - comm, - tx_value_first, - tx_counts, - tx_offsets, - tx_dst_ranks, - get_dataframe_buffer_begin::value_type>( - rx_value_buffer), - rx_counts, - rx_offsets, - rx_src_ranks, - stream_view); + device_multicast_sendrecv(comm, + tx_value_first, + tx_counts, + tx_offsets, + tx_dst_ranks, + get_dataframe_buffer_begin(rx_value_buffer), + rx_counts, + rx_offsets, + rx_src_ranks, + stream_view); if (rx_counts.size() < static_cast(comm_size)) { std::vector tmp_rx_counts(comm_size, size_t{0}); @@ -268,18 +266,16 @@ auto groupby_gpuid_and_shuffle_values(raft::comms::comms_t const& comm, // FIXME: this needs to be replaced with AlltoAll once NCCL 2.8 is released // (if num_tx_dst_ranks == num_rx_src_ranks == comm_size). 
- device_multicast_sendrecv( - comm, - tx_value_first, - tx_counts, - tx_offsets, - tx_dst_ranks, - get_dataframe_buffer_begin::value_type>( - rx_value_buffer), - rx_counts, - rx_offsets, - rx_src_ranks, - stream_view); + device_multicast_sendrecv(comm, + tx_value_first, + tx_counts, + tx_offsets, + tx_dst_ranks, + get_dataframe_buffer_begin(rx_value_buffer), + rx_counts, + rx_offsets, + rx_src_ranks, + stream_view); if (rx_counts.size() < static_cast(comm_size)) { std::vector tmp_rx_counts(comm_size, size_t{0}); @@ -335,18 +331,16 @@ auto groupby_gpuid_and_shuffle_kv_pairs(raft::comms::comms_t const& comm, // FIXME: this needs to be replaced with AlltoAll once NCCL 2.8 is released // (if num_tx_dst_ranks == num_rx_src_ranks == comm_size). - device_multicast_sendrecv( - comm, - tx_value_first, - tx_counts, - tx_offsets, - tx_dst_ranks, - get_dataframe_buffer_begin::value_type>( - rx_value_buffer), - rx_counts, - rx_offsets, - rx_src_ranks, - stream_view); + device_multicast_sendrecv(comm, + tx_value_first, + tx_counts, + tx_offsets, + tx_dst_ranks, + get_dataframe_buffer_begin(rx_value_buffer), + rx_counts, + rx_offsets, + rx_src_ranks, + stream_view); if (rx_counts.size() < static_cast(comm_size)) { std::vector tmp_rx_counts(comm_size, size_t{0}); diff --git a/cpp/include/cugraph/utilities/thrust_tuple_utils.cuh b/cpp/include/cugraph/utilities/thrust_tuple_utils.cuh index a46db93f6b3..7f5edaa91d6 100644 --- a/cpp/include/cugraph/utilities/thrust_tuple_utils.cuh +++ b/cpp/include/cugraph/utilities/thrust_tuple_utils.cuh @@ -17,6 +17,7 @@ #include #include +#include #include #include @@ -132,19 +133,44 @@ template struct is_thrust_tuple> : std::true_type { }; -template +template struct is_thrust_tuple_of_arithmetic : std::false_type { }; -template -struct is_thrust_tuple_of_arithmetic::value>> { - static constexpr bool value = - detail::is_thrust_tuple_of_arithemetic_impl( - thrust::tuple_size::value)>() - .evaluate(); +template +struct is_thrust_tuple_of_arithmetic> { + private: + template + static constexpr bool is_valid = std::is_arithmetic_v || std::is_same_v; + + public: + static constexpr bool value = (... && is_valid); +}; + +template +struct is_std_tuple : std::false_type { +}; + +template +struct is_std_tuple> : std::true_type { +}; + +template typename Vector> +struct is_arithmetic_vector : std::false_type { +}; + +template
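
The block-wide reduction pattern that [PATCH 2/4] switches to (a per-thread thrust::tuple accumulator reduced with cub::BlockReduce and the property_add tuple-sum functor, whose operator() the patch makes __host__ __device__) can be sketched in isolation as follows. This is only an illustrative, standalone sketch: result_t, tuple_sum, block_reduce_tuple_kernel and block_size are placeholder names, not identifiers from the patch.

#include <cub/cub.cuh>
#include <thrust/tuple.h>

// stand-in for the e_op result type, e.g. thrust::tuple<int, float>
using result_t = thrust::tuple<int, float>;

// element-wise tuple sum; __host__ __device__ so CUB can invoke it on device,
// mirroring the qualifier change made to property_add in this series
struct tuple_sum {
  __host__ __device__ result_t operator()(result_t const& a, result_t const& b) const
  {
    return thrust::make_tuple(thrust::get<0>(a) + thrust::get<0>(b),
                              thrust::get<1>(a) + thrust::get<1>(b));
  }
};

template <int block_size>
__global__ void block_reduce_tuple_kernel(result_t* block_results)
{
  // per-thread partial result (placeholder for the per-edge accumulation loop)
  result_t thread_sum = thrust::make_tuple(static_cast<int>(threadIdx.x), 1.0f);

  // block-wide reduction; only thread 0 holds the valid aggregate afterwards
  using BlockReduce = cub::BlockReduce<result_t, block_size>;
  __shared__ typename BlockReduce::TempStorage temp_storage;
  result_t block_sum = BlockReduce(temp_storage).Reduce(thread_sum, tuple_sum{});

  if (threadIdx.x == 0) { block_results[blockIdx.x] = block_sum; }
}

In the transform_reduce_e kernels touched by this series the same block aggregate is not written out per block as above, but atomically accumulated into a single-element dataframe buffer via atomic_accumulate_edge_op_result.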