From 16d705740dc7cc666eb6f1227147e2695e880b68 Mon Sep 17 00:00:00 2001 From: Charles Hastings Date: Mon, 2 Oct 2023 17:49:26 -0700 Subject: [PATCH 01/15] created MNMG/MTMG test code --- cpp/tests/CMakeLists.txt | 6 + cpp/tests/mtmg/multi_node_threaded_test.cu | 603 +++++++++++++++++++++ 2 files changed, 609 insertions(+) create mode 100644 cpp/tests/mtmg/multi_node_threaded_test.cu diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index 2a4bb8ab2a5..45c59149f7f 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -427,6 +427,12 @@ target_link_libraries(MTMG_TEST UCP::UCP ) +ConfigureTest(MTMG_MULTINODE_TEST mtmg/multi_node_threaded_test.cu) +target_link_libraries(MTMG_MULTINODE_TEST + PRIVATE + UCP::UCP + ) + ################################################################################################### # - MG tests -------------------------------------------------------------------------------------- diff --git a/cpp/tests/mtmg/multi_node_threaded_test.cu b/cpp/tests/mtmg/multi_node_threaded_test.cu new file mode 100644 index 00000000000..c4d83764ee1 --- /dev/null +++ b/cpp/tests/mtmg/multi_node_threaded_test.cu @@ -0,0 +1,603 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include + +#include + +#include + +#include +#include +#include + +#include +#include + +struct Multithreaded_Usecase { + bool test_weighted{false}; + bool check_correctness{true}; +}; + +std::string g_comms_dir_name{}; +int g_node_rank{}; +int g_num_nodes{}; +int g_execution_id{0}; + +template +class Tests_Multithreaded + : public ::testing::TestWithParam> { + public: + Tests_Multithreaded() {} + + static void SetUpTestCase() {} + static void TearDownTestCase() {} + + virtual void SetUp() {} + virtual void TearDown() {} + + std::vector get_gpu_list() + { + int num_gpus_per_node{1}; + RAFT_CUDA_TRY(cudaGetDeviceCount(&num_gpus_per_node)); + + std::vector gpu_list(num_gpus_per_node); + std::iota(gpu_list.begin(), gpu_list.end(), 0); + + return gpu_list; + } + + void wait_for_directory(std::string directory_name, int max_tries = 60) + { + while (max_tries > 0) { + if (std::filesystem::is_directory(directory_name)) break; + sleep(1); + --max_tries; + } + + CUGRAPH_EXPECTS(std::filesystem::is_directory(directory_name), + "Timed out waiting for directory to be created"); + } + + std::ifstream wait_for_file(std::string file_name, int max_tries = 60) + { + while (max_tries > 0) { + if (std::filesystem::is_regular_file(file_name)) break; + sleep(1); + --max_tries; + } + + CUGRAPH_EXPECTS(std::filesystem::is_regular_file(file_name), + "Timed out waiting for file to be created"); + + return std::ifstream(file_name, std::ios::binary); + } + + template + void run_current_test( + std::tuple const& param, + std::vector gpu_list) + { + using edge_type_t = int32_t; + + constexpr bool renumber = true; + constexpr bool do_expensive_check = false; + + auto [multithreaded_usecase, input_usecase] = param; + + raft::handle_t handle{}; + + result_t constexpr alpha{0.85}; + result_t constexpr epsilon{1e-6}; + + size_t device_buffer_size{64 * 1024 * 1024}; + size_t thread_buffer_size{4 * 1024 * 1024}; + + int num_gpus = gpu_list.size(); + int num_threads = num_gpus * 4; + + // + // This is intended to mimic a multi-node host application (non-MPI) integrating + // with MTMG library. This is a simple implementation using a shared file system + // to pass configuration messages. + // + ncclUniqueId instance_manager_id; + int execution_id = g_execution_id++; + + std::ostringstream comms_dir_name; + comms_dir_name << g_comms_dir_name << "_" << execution_id; + + if (g_node_rank == 0) { + ncclGetUniqueId(&instance_manager_id); + + // Create directory for configuration files + std::filesystem::create_directory(comms_dir_name.str()); + std::ofstream instance_manager_file(comms_dir_name.str() + "/instance_manager", + std::ios::binary); + instance_manager_file.write(reinterpret_cast(&instance_manager_id), + sizeof(instance_manager_id)); + instance_manager_file.close(); + } else { + // Wait for node rank 0 to create directory + wait_for_directory(comms_dir_name.str()); + + auto instance_manager_file = wait_for_file(comms_dir_name.str() + "/instance_manager"); + instance_manager_file.read(reinterpret_cast(&instance_manager_id), + sizeof(instance_manager_id)); + instance_manager_file.close(); + } + + // Create a file for this process (rank) to identify how many GPUs + std::ostringstream filename_creator; + + filename_creator << comms_dir_name.str() << "/gpu_count_" << g_node_rank; + { + std::ofstream num_gpus_file(filename_creator.str(), std::ios::binary); + int num_gpus_on_this_node = static_cast(gpu_list.size()); + num_gpus_file.write(reinterpret_cast(&num_gpus_on_this_node), sizeof(int)); + num_gpus_file.close(); + std::cout << "wrote to filename: " << filename_creator.str() << std::endl; + } + + cugraph::mtmg::resource_manager_t resource_manager; + + std::for_each(gpu_list.begin(), gpu_list.end(), [&resource_manager](int gpu_id) { + resource_manager.register_local_gpu(gpu_id, rmm::cuda_device_id{gpu_id}); + }); + + for (int i = 0; i < g_num_nodes; ++i) { + if (i != g_node_rank) { + filename_creator.str(comms_dir_name.str()); + filename_creator << "/gpu_count_" << i; + auto num_gpus_file = wait_for_file(filename_creator.str()); + int num_gpus_this_node{0}; + num_gpus_file.read(reinterpret_cast(&num_gpus_this_node), sizeof(int)); + num_gpus_file.close(); + + for (int j = 0; j < num_gpus_this_node; ++j) { + std::cout << "register remote resource" << std::endl; + // resource_manager.register_remote_gpu(j, XXX); + } + } + } + + auto instance_manager = resource_manager.create_instance_manager( + resource_manager.registered_ranks(), instance_manager_id); + + cugraph::mtmg::edgelist_t edgelist; + cugraph::mtmg::graph_t graph; + cugraph::mtmg::graph_view_t graph_view; + cugraph::mtmg::vertex_result_t pageranks; + std::optional> renumber_map = + std::make_optional>(); + + auto edge_weights = multithreaded_usecase.test_weighted + ? std::make_optional, + weight_t>>() + : std::nullopt; + + // + // Simulate graph creation by spawning threads to walk through the + // local COO and add edges + // + std::vector running_threads; + + // Initialize shared edgelist object, one per GPU + for (int i = 0; i < num_gpus; ++i) { + running_threads.emplace_back([&instance_manager, + &edgelist, + device_buffer_size, + use_weight = true, + use_edge_id = false, + use_edge_type = false]() { + auto thread_handle = instance_manager->get_handle(); + + edgelist.set(thread_handle, device_buffer_size, use_weight, use_edge_id, use_edge_type); + }); + } + + // Wait for CPU threads to complete + std::for_each(running_threads.begin(), running_threads.end(), [](auto& t) { t.join(); }); + running_threads.resize(0); + instance_manager->reset_threads(); + + // Load SG edge list + auto [d_src_v, d_dst_v, d_weights_v, d_vertices_v, is_symmetric] = + input_usecase.template construct_edgelist( + handle, multithreaded_usecase.test_weighted, false, false); + + auto h_src_v = cugraph::test::to_host(handle, d_src_v); + auto h_dst_v = cugraph::test::to_host(handle, d_dst_v); + auto h_weights_v = cugraph::test::to_host(handle, d_weights_v); + auto unique_vertices = cugraph::test::to_host(handle, d_vertices_v); + + // Load edgelist from different threads. We'll use more threads than GPUs here + for (int i = 0; i < num_threads; ++i) { + running_threads.emplace_back([&instance_manager, + thread_buffer_size, + &edgelist, + &h_src_v, + &h_dst_v, + &h_weights_v, + i, + num_threads]() { + auto thread_handle = instance_manager->get_handle(); + cugraph::mtmg::per_thread_edgelist_t + per_thread_edgelist(edgelist.get(thread_handle), thread_buffer_size); + + for (size_t j = i; j < h_src_v.size(); j += num_threads) { +#if 0 + if (h_weights_v) { + thread_edgelist.append( + thread_handle, h_src_v[j], h_dst_v[j], (*h_weights_v)[j], std::nullopt, std::nullopt); + } else { + thread_edgelist.append( + thread_handle, h_src_v[j], h_dst_v[j], std::nullopt, std::nullopt, std::nullopt); + } +#endif + per_thread_edgelist.append( + thread_handle, + h_src_v[j], + h_dst_v[j], + h_weights_v ? std::make_optional((*h_weights_v)[j]) : std::nullopt, + std::nullopt, + std::nullopt); + } + + per_thread_edgelist.flush(thread_handle); + }); + } + + // Wait for CPU threads to complete + std::for_each(running_threads.begin(), running_threads.end(), [](auto& t) { t.join(); }); + running_threads.resize(0); + instance_manager->reset_threads(); + + for (int i = 0; i < num_gpus; ++i) { + running_threads.emplace_back([&instance_manager, + &graph, + &edge_weights, + &edgelist, + &renumber_map, + &pageranks, + is_symmetric = is_symmetric, + renumber, + do_expensive_check]() { + auto thread_handle = instance_manager->get_handle(); + + if (thread_handle.get_thread_rank() > 0) return; + + std::optional, + edge_t>> + edge_ids{std::nullopt}; + std::optional, + int32_t>> + edge_types{std::nullopt}; + + edgelist.finalize_buffer(thread_handle); + edgelist.consolidate_and_shuffle(thread_handle, true); + + cugraph::mtmg:: + create_graph_from_edgelist( + thread_handle, + edgelist, + cugraph::graph_properties_t{is_symmetric, true}, + renumber, + graph, + edge_weights, + edge_ids, + edge_types, + renumber_map, + do_expensive_check); + }); + } + + // Wait for CPU threads to complete + std::for_each(running_threads.begin(), running_threads.end(), [](auto& t) { t.join(); }); + running_threads.resize(0); + instance_manager->reset_threads(); + + graph_view = graph.view(); + + for (int i = 0; i < num_threads; ++i) { + running_threads.emplace_back( + [&instance_manager, &graph_view, &edge_weights, &pageranks, alpha, epsilon]() { + auto thread_handle = instance_manager->get_handle(); + + if (thread_handle.get_thread_rank() > 0) return; + + auto [local_pageranks, metadata] = + cugraph::pagerank( + thread_handle.raft_handle(), + graph_view.get(thread_handle), + edge_weights ? std::make_optional(edge_weights->get(thread_handle).view()) + : std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt, + alpha, + epsilon, + 500, + true); + + pageranks.set(thread_handle, std::move(local_pageranks)); + }); + } + + // Wait for CPU threads to complete + std::for_each(running_threads.begin(), running_threads.end(), [](auto& t) { t.join(); }); + running_threads.resize(0); + instance_manager->reset_threads(); + + std::vector, std::vector>> computed_pageranks_v; + std::mutex computed_pageranks_lock{}; + + auto pageranks_view = pageranks.view(); + auto renumber_map_view = renumber_map ? std::make_optional(renumber_map->view()) : std::nullopt; + + // Load computed_pageranks from different threads. + for (int i = 0; i < num_gpus; ++i) { + running_threads.emplace_back([&instance_manager, + &graph_view, + &renumber_map_view, + &pageranks_view, + &computed_pageranks_lock, + &computed_pageranks_v, + &h_src_v, + &h_dst_v, + &h_weights_v, + &unique_vertices, + i, + num_threads]() { + auto thread_handle = instance_manager->get_handle(); + + auto number_of_vertices = unique_vertices->size(); + + std::vector my_vertex_list; + my_vertex_list.reserve((number_of_vertices + num_threads - 1) / num_threads); + + for (size_t j = i; j < number_of_vertices; j += num_threads) { + my_vertex_list.push_back((*unique_vertices)[j]); + } + + rmm::device_uvector d_my_vertex_list(my_vertex_list.size(), + thread_handle.raft_handle().get_stream()); + raft::update_device(d_my_vertex_list.data(), + my_vertex_list.data(), + my_vertex_list.size(), + thread_handle.raft_handle().get_stream()); + + auto d_my_pageranks = pageranks_view.gather( + thread_handle, + raft::device_span{d_my_vertex_list.data(), d_my_vertex_list.size()}, + graph_view, + renumber_map_view); + + std::vector my_pageranks(d_my_pageranks.size()); + raft::update_host(my_pageranks.data(), + d_my_pageranks.data(), + d_my_pageranks.size(), + thread_handle.raft_handle().get_stream()); + + { + std::lock_guard lock(computed_pageranks_lock); + computed_pageranks_v.push_back( + std::make_tuple(std::move(my_vertex_list), std::move(my_pageranks))); + } + }); + } + + // Wait for CPU threads to complete + std::for_each(running_threads.begin(), running_threads.end(), [](auto& t) { t.join(); }); + running_threads.resize(0); + instance_manager->reset_threads(); + + if (multithreaded_usecase.check_correctness) { + // Want to compare the results in computed_pageranks_v with SG results + cugraph::graph_t sg_graph(handle); + std::optional< + cugraph::edge_property_t, weight_t>> + sg_edge_weights{std::nullopt}; + std::optional> sg_renumber_map{std::nullopt}; + + std::tie(sg_graph, sg_edge_weights, std::ignore, std::ignore, sg_renumber_map) = cugraph:: + create_graph_from_edgelist( + handle, + std::nullopt, + std::move(d_src_v), + std::move(d_dst_v), + std::move(d_weights_v), + std::nullopt, + std::nullopt, + cugraph::graph_properties_t{is_symmetric, true}, + true); + + auto [sg_pageranks, meta] = cugraph::pagerank( + handle, + sg_graph.view(), + sg_edge_weights ? std::make_optional(sg_edge_weights->view()) : std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt, + alpha, + epsilon); + + auto h_sg_pageranks = cugraph::test::to_host(handle, sg_pageranks); + auto h_sg_renumber_map = cugraph::test::to_host(handle, sg_renumber_map); + auto compare_functor = cugraph::test::nearly_equal{ + weight_t{1e-3}, + weight_t{(weight_t{1} / static_cast(h_sg_pageranks.size())) * weight_t{1e-3}}}; + + std::for_each( + computed_pageranks_v.begin(), + computed_pageranks_v.end(), + [h_sg_pageranks, compare_functor, h_sg_renumber_map](auto t1) { + std::for_each( + thrust::make_zip_iterator(std::get<0>(t1).begin(), std::get<1>(t1).begin()), + thrust::make_zip_iterator(std::get<0>(t1).end(), std::get<1>(t1).end()), + [h_sg_pageranks, compare_functor, h_sg_renumber_map](auto t2) { + vertex_t v = thrust::get<0>(t2); + weight_t pr = thrust::get<1>(t2); + + auto pos = std::find(h_sg_renumber_map->begin(), h_sg_renumber_map->end(), v); + auto offset = std::distance(h_sg_renumber_map->begin(), pos); + + ASSERT_TRUE(compare_functor(pr, h_sg_pageranks[offset])) + << "vertex " << v << ", SG result = " << h_sg_pageranks[offset] + << ", mtmg result = " << pr << ", renumber map = " << (*h_sg_renumber_map)[offset]; + }); + }); + } + } +}; + +using Tests_Multithreaded_File = Tests_Multithreaded; +using Tests_Multithreaded_Rmat = Tests_Multithreaded; + +// FIXME: add tests for type combinations +TEST_P(Tests_Multithreaded_File, CheckInt32Int32FloatFloat) +{ + run_current_test( + override_File_Usecase_with_cmd_line_arguments(GetParam()), std::vector{{0, 1}}); +} + +TEST_P(Tests_Multithreaded_Rmat, CheckInt32Int32FloatFloat) +{ + run_current_test( + override_Rmat_Usecase_with_cmd_line_arguments(GetParam()), std::vector{{0, 1}}); +} + +INSTANTIATE_TEST_SUITE_P(file_test, + Tests_Multithreaded_File, + ::testing::Combine( + // enable correctness checks + ::testing::Values(Multithreaded_Usecase{false, true}, + Multithreaded_Usecase{true, true}), + ::testing::Values(cugraph::test::File_Usecase("karate.csv"), + cugraph::test::File_Usecase("dolphins.csv")))); + +INSTANTIATE_TEST_SUITE_P( + rmat_small_test, + Tests_Multithreaded_Rmat, + ::testing::Combine( + // enable correctness checks + ::testing::Values(Multithreaded_Usecase{false, true}, Multithreaded_Usecase{true, true}), + ::testing::Values(cugraph::test::Rmat_Usecase(10, 16, 0.57, 0.19, 0.19, 0, false, false)))); + +INSTANTIATE_TEST_SUITE_P( + file_benchmark_test, /* note that the test filename can be overridden in benchmarking (with + --gtest_filter to select only the file_benchmark_test with a specific + vertex & edge type combination) by command line arguments and do not + include more than one File_Usecase that differ only in filename + (to avoid running same benchmarks more than once) */ + Tests_Multithreaded_File, + ::testing::Combine( + // disable correctness checks + ::testing::Values(Multithreaded_Usecase{false, false}, Multithreaded_Usecase{true, false}), + ::testing::Values(cugraph::test::File_Usecase("test/datasets/karate.mtx")))); + +INSTANTIATE_TEST_SUITE_P( + rmat_benchmark_test, /* note that scale & edge factor can be overridden in benchmarking (with + --gtest_filter to select only the rmat_benchmark_test with a specific + vertex & edge type combination) by command line arguments and do not + include more than one Rmat_Usecase that differ only in scale or edge + factor (to avoid running same benchmarks more than once) */ + Tests_Multithreaded_Rmat, + ::testing::Combine( + // disable correctness checks for large graphs + ::testing::Values(Multithreaded_Usecase{false, false}, Multithreaded_Usecase{true, false}), + ::testing::Values(cugraph::test::Rmat_Usecase(10, 16, 0.57, 0.19, 0.19, 0, false, false)))); + +inline auto local_parse_test_options(int argc, char** argv) +{ + try { + cxxopts::Options options(argv[0], " - cuGraph tests command line options"); + options.allow_unrecognised_options().add_options()( + "rmm_mode", "RMM allocation mode", cxxopts::value()->default_value("pool"))( + "perf", "enalbe performance measurements", cxxopts::value()->default_value("false"))( + "rmat_scale", "override the hardcoded R-mat scale", cxxopts::value())( + "rmat_edge_factor", "override the hardcoded R-mat edge factor", cxxopts::value())( + "node_rank", "rank of this process on multi-node configuration", cxxopts::value())( + "num_nodes", "number of nodes in this multi-node configuration", cxxopts::value())( + "comms_dir_name", + "directory where comms data is stored (shared)", + cxxopts::value())( + "test_file_name", "override the hardcoded test filename", cxxopts::value()); + + return options.parse(argc, argv); + } catch (const cxxopts::OptionException& e) { + CUGRAPH_FAIL("Error parsing command line options"); + } +} + +// +// Need to customize the test configuration to support multi-node comms not using MPI +// +int main(int argc, char** argv) +{ + ::testing::InitGoogleTest(&argc, argv); + auto const cmd_opts = local_parse_test_options(argc, argv); + auto const rmm_mode = cmd_opts["rmm_mode"].as(); + auto resource = cugraph::test::create_memory_resource(rmm_mode); + rmm::mr::set_current_device_resource(resource.get()); + cugraph::test::g_perf = cmd_opts["perf"].as(); + cugraph::test::g_rmat_scale = (cmd_opts.count("rmat_scale") > 0) + ? std::make_optional(cmd_opts["rmat_scale"].as()) + : std::nullopt; + cugraph::test::g_rmat_edge_factor = + (cmd_opts.count("rmat_edge_factor") > 0) + ? std::make_optional(cmd_opts["rmat_edge_factor"].as()) + : std::nullopt; + cugraph::test::g_test_file_name = + (cmd_opts.count("test_file_name") > 0) + ? std::make_optional(cmd_opts["test_file_name"].as()) + : std::nullopt; + + g_comms_dir_name = (cmd_opts.count("comms_dir_name") > 0) + ? cmd_opts["comms_dir_name"].as() + : "COMMS_DIR"; + + CUGRAPH_EXPECTS(cmd_opts.count("node_rank") > 0, "node_rank not specified"); + CUGRAPH_EXPECTS(cmd_opts.count("num_nodes") > 0, "num_nodes not specified"); + g_node_rank = cmd_opts["node_rank"].as(); + g_num_nodes = cmd_opts["num_nodes"].as(); + + return RUN_ALL_TESTS(); +} From 002632f7926dc7064135e06aa470bd8b4bb75c22 Mon Sep 17 00:00:00 2001 From: Charles Hastings Date: Tue, 3 Oct 2023 13:52:25 -0700 Subject: [PATCH 02/15] first try at implementation --- cpp/include/cugraph/mtmg/instance_manager.hpp | 9 +-- cpp/include/cugraph/mtmg/resource_manager.hpp | 67 +++++++++++++------ cpp/tests/mtmg/multi_node_threaded_test.cu | 18 ++--- 3 files changed, 59 insertions(+), 35 deletions(-) diff --git a/cpp/include/cugraph/mtmg/instance_manager.hpp b/cpp/include/cugraph/mtmg/instance_manager.hpp index 8bf62b56f4b..78d41039fd6 100644 --- a/cpp/include/cugraph/mtmg/instance_manager.hpp +++ b/cpp/include/cugraph/mtmg/instance_manager.hpp @@ -37,13 +37,11 @@ class instance_manager_t { */ instance_manager_t(std::vector>&& handles, std::vector>&& nccl_comms, - std::vector&& device_ids, - int local_gpu_count) + std::vector&& device_ids) : thread_counter_{0}, raft_handle_{std::move(handles)}, nccl_comms_{std::move(nccl_comms)}, - device_ids_{std::move(device_ids)}, - local_gpu_count_{local_gpu_count} + device_ids_{std::move(device_ids)} { } @@ -79,7 +77,7 @@ class instance_manager_t { /** * @brief Number of local GPUs in the instance */ - int get_local_gpu_count() { return local_gpu_count_; } + int get_local_gpu_count() { return static_cast(raft_handle_.size()); } private: // FIXME: Should this be an std::map<> where the key is the rank? @@ -89,7 +87,6 @@ class instance_manager_t { std::vector> raft_handle_{}; std::vector> nccl_comms_{}; std::vector device_ids_{}; - int local_gpu_count_{}; std::atomic thread_counter_{0}; }; diff --git a/cpp/include/cugraph/mtmg/resource_manager.hpp b/cpp/include/cugraph/mtmg/resource_manager.hpp index b4633626e7c..37cc4f87711 100644 --- a/cpp/include/cugraph/mtmg/resource_manager.hpp +++ b/cpp/include/cugraph/mtmg/resource_manager.hpp @@ -108,6 +108,22 @@ class resource_manager_t { rmm::mr::set_per_device_resource(device_id, per_device_it.first->second.get()); } + /** + * @brief add a remote GPU to the resource manager. + * + * @param rank The rank to assign to the local GPU + * @param remode_node_rank The rank assigned to the remote node + */ + void register_remote_gpu(int rank, int remote_node_rank) + { + std::lock_guard lock(lock_); + + CUGRAPH_EXPECTS(remote_rank_map_.find(rank) == remote_rank_map_.end(), + "cannot register same rank multiple times"); + + remote_rank_map_.insert(std::pair(rank, remote_node_rank)); + } + /** * @brief Create an instance using a subset of the registered resources * @@ -127,29 +143,36 @@ class resource_manager_t { std::unique_ptr create_instance_manager( std::vector ranks_to_include, ncclUniqueId instance_manager_id) const { - std::for_each( - ranks_to_include.begin(), ranks_to_include.end(), [local_ranks = local_rank_map_](int rank) { - CUGRAPH_EXPECTS(local_ranks.find(rank) != local_ranks.end(), - "requesting inclusion of an invalid rank"); - }); + std::vector local_ranks_to_include; + + std::for_each(ranks_to_include.begin(), + ranks_to_include.end(), + [&local_ranks = local_rank_map_, + &remote_ranks = remote_rank_map_, + &local_ranks_to_include](int rank) { + if (local_ranks.find(rank) == local_ranks.end()) { + CUGRAPH_EXPECTS(remote_ranks.find(rank) != remote_ranks.end(), + "requesting inclusion of an invalid rank"); + } else { + local_ranks_to_include.push_back(rank); + } + }); std::vector> nccl_comms{}; std::vector> handles{}; std::vector device_ids{}; - nccl_comms.reserve(ranks_to_include.size()); - handles.reserve(ranks_to_include.size()); - device_ids.reserve(ranks_to_include.size()); + nccl_comms.reserve(local_ranks_to_include.size()); + handles.reserve(local_ranks_to_include.size()); + device_ids.reserve(local_ranks_to_include.size()); - // FIXME: not quite right for multi-node auto gpu_row_comm_size = static_cast(sqrt(static_cast(ranks_to_include.size()))); while (ranks_to_include.size() % gpu_row_comm_size != 0) { --gpu_row_comm_size; } - // FIXME: not quite right for multi-node - for (size_t i = 0; i < ranks_to_include.size(); ++i) { - int rank = ranks_to_include[i]; + for (size_t i = 0; i < local_ranks_to_include.size(); ++i) { + int rank = local_ranks_to_include[i]; auto pos = local_rank_map_.find(rank); RAFT_CUDA_TRY(cudaSetDevice(pos->second.value())); @@ -163,18 +186,17 @@ class resource_manager_t { std::vector running_threads; - for (size_t i = 0; i < ranks_to_include.size(); ++i) { + for (size_t i = 0; i < local_ranks_to_include.size(); ++i) { running_threads.emplace_back([instance_manager_id, idx = i, gpu_row_comm_size, comm_size = ranks_to_include.size(), - &ranks_to_include, - &local_rank_map = local_rank_map_, + &local_ranks_to_include, + &device_ids, &nccl_comms, &handles]() { - int rank = ranks_to_include[idx]; - auto pos = local_rank_map.find(rank); - RAFT_CUDA_TRY(cudaSetDevice(pos->second.value())); + int rank = local_ranks_to_include[idx]; + RAFT_CUDA_TRY(cudaSetDevice(device_ids[idx].value())); NCCL_TRY(ncclCommInitRank(nccl_comms[idx].get(), comm_size, instance_manager_id, rank)); @@ -188,7 +210,7 @@ class resource_manager_t { // FIXME: Update for multi-node return std::make_unique( - std::move(handles), std::move(nccl_comms), std::move(device_ids), ranks_to_include.size()); + std::move(handles), std::move(nccl_comms), std::move(device_ids)); } /** @@ -206,11 +228,15 @@ class resource_manager_t { // std::views::keys(local_rank_map_).end() }; // Would need a bit more complicated to handle remote_rank_map_ also // - std::vector registered_ranks(local_rank_map_.size()); + std::vector registered_ranks(local_rank_map_.size() + remote_rank_map_.size()); std::transform( local_rank_map_.begin(), local_rank_map_.end(), registered_ranks.begin(), [](auto pair) { return pair.first; }); + std::transform(remote_rank_map_.begin(), + remote_rank_map_.end(), + registered_ranks.begin() + local_rank_map_.size(), + [](auto pair) { return pair.first; }); return registered_ranks; } @@ -218,6 +244,7 @@ class resource_manager_t { private: mutable std::mutex lock_{}; std::map local_rank_map_{}; + std::map remote_rank_map_{}; std::map> per_device_rmm_resources_{}; }; diff --git a/cpp/tests/mtmg/multi_node_threaded_test.cu b/cpp/tests/mtmg/multi_node_threaded_test.cu index c4d83764ee1..15e2c0bcfc3 100644 --- a/cpp/tests/mtmg/multi_node_threaded_test.cu +++ b/cpp/tests/mtmg/multi_node_threaded_test.cu @@ -170,28 +170,28 @@ class Tests_Multithreaded int num_gpus_on_this_node = static_cast(gpu_list.size()); num_gpus_file.write(reinterpret_cast(&num_gpus_on_this_node), sizeof(int)); num_gpus_file.close(); - std::cout << "wrote to filename: " << filename_creator.str() << std::endl; } cugraph::mtmg::resource_manager_t resource_manager; - - std::for_each(gpu_list.begin(), gpu_list.end(), [&resource_manager](int gpu_id) { - resource_manager.register_local_gpu(gpu_id, rmm::cuda_device_id{gpu_id}); - }); + int node_rank{0}; for (int i = 0; i < g_num_nodes; ++i) { if (i != g_node_rank) { - filename_creator.str(comms_dir_name.str()); - filename_creator << "/gpu_count_" << i; + filename_creator.str(""); + filename_creator << comms_dir_name.str() << "/gpu_count_" << i; auto num_gpus_file = wait_for_file(filename_creator.str()); int num_gpus_this_node{0}; num_gpus_file.read(reinterpret_cast(&num_gpus_this_node), sizeof(int)); num_gpus_file.close(); for (int j = 0; j < num_gpus_this_node; ++j) { - std::cout << "register remote resource" << std::endl; - // resource_manager.register_remote_gpu(j, XXX); + resource_manager.register_remote_gpu(node_rank++, i); } + } else { + std::for_each( + gpu_list.begin(), gpu_list.end(), [&resource_manager, &node_rank](int gpu_id) { + resource_manager.register_local_gpu(node_rank++, rmm::cuda_device_id{gpu_id}); + }); } } From 99c19b245d9b543426ceb307ee1a787c15d26154 Mon Sep 17 00:00:00 2001 From: Charles Hastings Date: Wed, 4 Oct 2023 10:29:34 -0700 Subject: [PATCH 03/15] debugging --- cpp/include/cugraph/mtmg/resource_manager.hpp | 26 +++++++-- cpp/tests/mtmg/multi_node_threaded_test.cu | 53 ++++++++++++------- 2 files changed, 55 insertions(+), 24 deletions(-) diff --git a/cpp/include/cugraph/mtmg/resource_manager.hpp b/cpp/include/cugraph/mtmg/resource_manager.hpp index 37cc4f87711..be8749aeeaf 100644 --- a/cpp/include/cugraph/mtmg/resource_manager.hpp +++ b/cpp/include/cugraph/mtmg/resource_manager.hpp @@ -184,6 +184,22 @@ class resource_manager_t { device_ids.push_back(pos->second); } + std::cout << "calling ncclCommInitRank, comm_size = " << ranks_to_include.size() << std::endl; + + int current_device{}; + RAFT_CUDA_TRY(cudaGetDevice(¤t_device)); + ncclGroupStart(); + for (size_t i = 0; i < local_ranks_to_include.size(); ++i) { + int rank = local_ranks_to_include[i]; + RAFT_CUDA_TRY(cudaSetDevice(device_ids[i].value())); + NCCL_TRY( + ncclCommInitRank(nccl_comms[i].get(), ranks_to_include.size(), instance_manager_id, rank)); + raft::comms::build_comms_nccl_only( + handles[i].get(), *nccl_comms[i], ranks_to_include.size(), rank); + } + ncclGroupEnd(); + RAFT_CUDA_TRY(cudaSetDevice(current_device)); + std::vector running_threads; for (size_t i = 0; i < local_ranks_to_include.size(); ++i) { @@ -198,17 +214,16 @@ class resource_manager_t { int rank = local_ranks_to_include[idx]; RAFT_CUDA_TRY(cudaSetDevice(device_ids[idx].value())); - NCCL_TRY(ncclCommInitRank(nccl_comms[idx].get(), comm_size, instance_manager_id, rank)); - - raft::comms::build_comms_nccl_only(handles[idx].get(), *nccl_comms[idx], comm_size, rank); - + std::cout << "calling init_subcomm, rank = " << rank << ", comm_size = " << comm_size + << ", gpu_row_comm_size = " << gpu_row_comm_size << std::endl; + // This one requires paralellism, I believe cugraph::partition_manager::init_subcomm(*handles[idx], gpu_row_comm_size); + std::cout << "finished initialization thread, rank = " << rank << std::endl; }); } std::for_each(running_threads.begin(), running_threads.end(), [](auto& t) { t.join(); }); - // FIXME: Update for multi-node return std::make_unique( std::move(handles), std::move(nccl_comms), std::move(device_ids)); } @@ -238,6 +253,7 @@ class resource_manager_t { registered_ranks.begin() + local_rank_map_.size(), [](auto pair) { return pair.first; }); + std::sort(registered_ranks.begin(), registered_ranks.end()); return registered_ranks; } diff --git a/cpp/tests/mtmg/multi_node_threaded_test.cu b/cpp/tests/mtmg/multi_node_threaded_test.cu index 15e2c0bcfc3..ea80167527d 100644 --- a/cpp/tests/mtmg/multi_node_threaded_test.cu +++ b/cpp/tests/mtmg/multi_node_threaded_test.cu @@ -195,9 +195,17 @@ class Tests_Multithreaded } } + std::cout << "creating instance manager" << std::endl; + raft::print_host_vector(" registered ranks", + resource_manager.registered_ranks().data(), + resource_manager.registered_ranks().size(), + std::cout); + auto instance_manager = resource_manager.create_instance_manager( resource_manager.registered_ranks(), instance_manager_id); + std::cout << "done creating instance manager" << std::endl; + cugraph::mtmg::edgelist_t edgelist; cugraph::mtmg::graph_t graph; cugraph::mtmg::graph_view_t graph_view; @@ -465,25 +473,32 @@ class Tests_Multithreaded weight_t{1e-3}, weight_t{(weight_t{1} / static_cast(h_sg_pageranks.size())) * weight_t{1e-3}}}; - std::for_each( - computed_pageranks_v.begin(), - computed_pageranks_v.end(), - [h_sg_pageranks, compare_functor, h_sg_renumber_map](auto t1) { - std::for_each( - thrust::make_zip_iterator(std::get<0>(t1).begin(), std::get<1>(t1).begin()), - thrust::make_zip_iterator(std::get<0>(t1).end(), std::get<1>(t1).end()), - [h_sg_pageranks, compare_functor, h_sg_renumber_map](auto t2) { - vertex_t v = thrust::get<0>(t2); - weight_t pr = thrust::get<1>(t2); - - auto pos = std::find(h_sg_renumber_map->begin(), h_sg_renumber_map->end(), v); - auto offset = std::distance(h_sg_renumber_map->begin(), pos); - - ASSERT_TRUE(compare_functor(pr, h_sg_pageranks[offset])) - << "vertex " << v << ", SG result = " << h_sg_pageranks[offset] - << ", mtmg result = " << pr << ", renumber map = " << (*h_sg_renumber_map)[offset]; - }); - }); + std::for_each(computed_pageranks_v.begin(), + computed_pageranks_v.end(), + [h_sg_pageranks, compare_functor, h_sg_renumber_map](auto t1) { + std::for_each( + thrust::make_zip_iterator(std::get<0>(t1).begin(), std::get<1>(t1).begin()), + thrust::make_zip_iterator(std::get<0>(t1).end(), std::get<1>(t1).end()), + [h_sg_pageranks, compare_functor, h_sg_renumber_map](auto t2) { + vertex_t v = thrust::get<0>(t2); + weight_t pr = thrust::get<1>(t2); + + auto pos = + std::find(h_sg_renumber_map->begin(), h_sg_renumber_map->end(), v); + auto offset = std::distance(h_sg_renumber_map->begin(), pos); + + if (pos == h_sg_renumber_map->end()) { + ASSERT_TRUE(compare_functor(pr, weight_t{0})) + << "vertex " << v << ", SG result = " << h_sg_pageranks[offset] + << ", mtmg result = " << pr << ", not in renumber map"; + } else { + ASSERT_TRUE(compare_functor(pr, h_sg_pageranks[offset])) + << "vertex " << v << ", SG result = " << h_sg_pageranks[offset] + << ", mtmg result = " << pr + << ", renumber map = " << (*h_sg_renumber_map)[offset]; + } + }); + }); } } }; From fe6ed6157c0341b26330eba28ce694332ee13eb4 Mon Sep 17 00:00:00 2001 From: Charles Hastings Date: Wed, 4 Oct 2023 14:58:38 -0700 Subject: [PATCH 04/15] debuggin --- .../cugraph/mtmg/detail/per_device_edgelist.hpp | 2 ++ cpp/include/cugraph/mtmg/resource_manager.hpp | 15 +++++---------- cpp/tests/mtmg/multi_node_threaded_test.cu | 16 ++++++++++++++++ 3 files changed, 23 insertions(+), 10 deletions(-) diff --git a/cpp/include/cugraph/mtmg/detail/per_device_edgelist.hpp b/cpp/include/cugraph/mtmg/detail/per_device_edgelist.hpp index 8011146ee4f..ba4afb2228e 100644 --- a/cpp/include/cugraph/mtmg/detail/per_device_edgelist.hpp +++ b/cpp/include/cugraph/mtmg/detail/per_device_edgelist.hpp @@ -205,6 +205,8 @@ class per_device_edgelist_t { auto tmp_edge_type = edge_type_ ? std::make_optional(std::move((*edge_type_)[0])) : std::nullopt; + std::cout << "calling detail::shuffle... rank = " << handle.get_rank() + << ", size = " << src_[0].size() << std::endl; std::tie(store_transposed ? dst_[0] : src_[0], store_transposed ? src_[0] : dst_[0], tmp_wgt, diff --git a/cpp/include/cugraph/mtmg/resource_manager.hpp b/cpp/include/cugraph/mtmg/resource_manager.hpp index be8749aeeaf..ff9c8e485ae 100644 --- a/cpp/include/cugraph/mtmg/resource_manager.hpp +++ b/cpp/include/cugraph/mtmg/resource_manager.hpp @@ -171,6 +171,10 @@ class resource_manager_t { --gpu_row_comm_size; } + int current_device{}; + RAFT_CUDA_TRY(cudaGetDevice(¤t_device)); + NCCL_TRY(ncclGroupStart()); + for (size_t i = 0; i < local_ranks_to_include.size(); ++i) { int rank = local_ranks_to_include[i]; auto pos = local_rank_map_.find(rank); @@ -182,22 +186,13 @@ class resource_manager_t { handles.push_back( std::make_unique(tmp_handle, per_device_rmm_resources_.find(rank)->second)); device_ids.push_back(pos->second); - } - - std::cout << "calling ncclCommInitRank, comm_size = " << ranks_to_include.size() << std::endl; - int current_device{}; - RAFT_CUDA_TRY(cudaGetDevice(¤t_device)); - ncclGroupStart(); - for (size_t i = 0; i < local_ranks_to_include.size(); ++i) { - int rank = local_ranks_to_include[i]; - RAFT_CUDA_TRY(cudaSetDevice(device_ids[i].value())); NCCL_TRY( ncclCommInitRank(nccl_comms[i].get(), ranks_to_include.size(), instance_manager_id, rank)); raft::comms::build_comms_nccl_only( handles[i].get(), *nccl_comms[i], ranks_to_include.size(), rank); } - ncclGroupEnd(); + NCCL_TRY(ncclGroupEnd()); RAFT_CUDA_TRY(cudaSetDevice(current_device)); std::vector running_threads; diff --git a/cpp/tests/mtmg/multi_node_threaded_test.cu b/cpp/tests/mtmg/multi_node_threaded_test.cu index ea80167527d..b4e63ea8df1 100644 --- a/cpp/tests/mtmg/multi_node_threaded_test.cu +++ b/cpp/tests/mtmg/multi_node_threaded_test.cu @@ -219,6 +219,8 @@ class Tests_Multithreaded weight_t>>() : std::nullopt; + std::cout << "create edge list" << std::endl; + // // Simulate graph creation by spawning threads to walk through the // local COO and add edges @@ -244,6 +246,8 @@ class Tests_Multithreaded running_threads.resize(0); instance_manager->reset_threads(); + std::cout << "load edges" << std::endl; + // Load SG edge list auto [d_src_v, d_dst_v, d_weights_v, d_vertices_v, is_symmetric] = input_usecase.template construct_edgelist( @@ -296,6 +300,8 @@ class Tests_Multithreaded running_threads.resize(0); instance_manager->reset_threads(); + std::cout << "create graph" << std::endl; + for (int i = 0; i < num_gpus; ++i) { running_threads.emplace_back([&instance_manager, &graph, @@ -319,9 +325,17 @@ class Tests_Multithreaded int32_t>> edge_types{std::nullopt}; + std::cout << "calling finalize_buffer, rank = " << thread_handle.get_rank() << std::endl; + edgelist.finalize_buffer(thread_handle); + + std::cout << "calling consolidate_and_shuffle, rank = " << thread_handle.get_rank() + << std::endl; edgelist.consolidate_and_shuffle(thread_handle, true); + std::cout << "calling create_graph_from_edgelist, rank = " << thread_handle.get_rank() + << std::endl; + cugraph::mtmg:: create_graph_from_edgelist( thread_handle, @@ -342,6 +356,8 @@ class Tests_Multithreaded running_threads.resize(0); instance_manager->reset_threads(); + std::cout << "call pagerank" << std::endl; + graph_view = graph.view(); for (int i = 0; i < num_threads; ++i) { From 72092cc4055958f73d1a22f3a8e4da7b315f1424 Mon Sep 17 00:00:00 2001 From: Charles Hastings Date: Thu, 5 Oct 2023 11:57:35 -0700 Subject: [PATCH 05/15] debuggin --- cpp/tests/mtmg/multi_node_threaded_test.cu | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cpp/tests/mtmg/multi_node_threaded_test.cu b/cpp/tests/mtmg/multi_node_threaded_test.cu index b4e63ea8df1..081e24522a5 100644 --- a/cpp/tests/mtmg/multi_node_threaded_test.cu +++ b/cpp/tests/mtmg/multi_node_threaded_test.cu @@ -309,6 +309,7 @@ class Tests_Multithreaded &edgelist, &renumber_map, &pageranks, + &h_src_v, // debugging is_symmetric = is_symmetric, renumber, do_expensive_check]() { @@ -330,7 +331,7 @@ class Tests_Multithreaded edgelist.finalize_buffer(thread_handle); std::cout << "calling consolidate_and_shuffle, rank = " << thread_handle.get_rank() - << std::endl; + << ", total edges = " << h_src_v.size() << std::endl; edgelist.consolidate_and_shuffle(thread_handle, true); std::cout << "calling create_graph_from_edgelist, rank = " << thread_handle.get_rank() From 256157f8aba33fb64b3cad62a8e6289c02b4ed9c Mon Sep 17 00:00:00 2001 From: Charles Hastings Date: Thu, 5 Oct 2023 13:10:39 -0700 Subject: [PATCH 06/15] debugging --- cpp/tests/mtmg/multi_node_threaded_test.cu | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/cpp/tests/mtmg/multi_node_threaded_test.cu b/cpp/tests/mtmg/multi_node_threaded_test.cu index 081e24522a5..4fc553dffca 100644 --- a/cpp/tests/mtmg/multi_node_threaded_test.cu +++ b/cpp/tests/mtmg/multi_node_threaded_test.cu @@ -127,8 +127,8 @@ class Tests_Multithreaded size_t device_buffer_size{64 * 1024 * 1024}; size_t thread_buffer_size{4 * 1024 * 1024}; - int num_gpus = gpu_list.size(); - int num_threads = num_gpus * 4; + int num_local_gpus = gpu_list.size(); + int num_threads = num_local_gpus * 4; // // This is intended to mimic a multi-node host application (non-MPI) integrating @@ -228,7 +228,7 @@ class Tests_Multithreaded std::vector running_threads; // Initialize shared edgelist object, one per GPU - for (int i = 0; i < num_gpus; ++i) { + for (int i = 0; i < num_local_gpus; ++i) { running_threads.emplace_back([&instance_manager, &edgelist, device_buffer_size, @@ -266,13 +266,13 @@ class Tests_Multithreaded &h_src_v, &h_dst_v, &h_weights_v, - i, - num_threads]() { + starting_edge_offset = g_node_rank * num_threads + i, + stride = num_threads]() { auto thread_handle = instance_manager->get_handle(); cugraph::mtmg::per_thread_edgelist_t per_thread_edgelist(edgelist.get(thread_handle), thread_buffer_size); - for (size_t j = i; j < h_src_v.size(); j += num_threads) { + for (size_t j = starting_edge_offset; j < h_src_v.size(); j += stride) { #if 0 if (h_weights_v) { thread_edgelist.append( @@ -302,7 +302,7 @@ class Tests_Multithreaded std::cout << "create graph" << std::endl; - for (int i = 0; i < num_gpus; ++i) { + for (int i = 0; i < num_local_gpus; ++i) { running_threads.emplace_back([&instance_manager, &graph, &edge_weights, @@ -398,7 +398,7 @@ class Tests_Multithreaded auto renumber_map_view = renumber_map ? std::make_optional(renumber_map->view()) : std::nullopt; // Load computed_pageranks from different threads. - for (int i = 0; i < num_gpus; ++i) { + for (int i = 0; i < num_local_gpus; ++i) { running_threads.emplace_back([&instance_manager, &graph_view, &renumber_map_view, From de2922c8fdf02abd286ef718cd7ee473719bd06b Mon Sep 17 00:00:00 2001 From: Charles Hastings Date: Thu, 5 Oct 2023 13:20:00 -0700 Subject: [PATCH 07/15] debugging --- cpp/tests/mtmg/multi_node_threaded_test.cu | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/tests/mtmg/multi_node_threaded_test.cu b/cpp/tests/mtmg/multi_node_threaded_test.cu index 4fc553dffca..5f9a184eec6 100644 --- a/cpp/tests/mtmg/multi_node_threaded_test.cu +++ b/cpp/tests/mtmg/multi_node_threaded_test.cu @@ -267,7 +267,7 @@ class Tests_Multithreaded &h_dst_v, &h_weights_v, starting_edge_offset = g_node_rank * num_threads + i, - stride = num_threads]() { + stride = g_num_nodes * num_threads]() { auto thread_handle = instance_manager->get_handle(); cugraph::mtmg::per_thread_edgelist_t per_thread_edgelist(edgelist.get(thread_handle), thread_buffer_size); From a80fe0eae2c511e4d64cd676e943fc215be4d24d Mon Sep 17 00:00:00 2001 From: Charles Hastings Date: Mon, 9 Oct 2023 09:46:56 -0700 Subject: [PATCH 08/15] debugging, split shuffle_vertex_pairs to reduce compile time --- cpp/CMakeLists.txt | 4 +- cpp/include/cugraph/mtmg/handle.hpp | 8 - cpp/src/detail/shuffle_vertex_pairs.cuh | 382 ++++++++++++++++++ .../shuffle_vertex_pairs_int32_int32.cu | 75 ++++ .../shuffle_vertex_pairs_int32_int64.cu | 76 ++++ .../shuffle_vertex_pairs_int64_int64.cu | 76 ++++ 6 files changed, 612 insertions(+), 9 deletions(-) create mode 100644 cpp/src/detail/shuffle_vertex_pairs.cuh create mode 100644 cpp/src/detail/shuffle_vertex_pairs_int32_int32.cu create mode 100644 cpp/src/detail/shuffle_vertex_pairs_int32_int64.cu create mode 100644 cpp/src/detail/shuffle_vertex_pairs_int64_int64.cu diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 41870cbc92b..25e50ed0188 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -181,7 +181,9 @@ endif() set(CUGRAPH_SOURCES src/detail/shuffle_vertices.cu - src/detail/shuffle_vertex_pairs.cu + src/detail/shuffle_vertex_pairs_int32_int32.cu + src/detail/shuffle_vertex_pairs_int32_int64.cu + src/detail/shuffle_vertex_pairs_int64_int64.cu src/detail/collect_local_vertex_values.cu src/detail/groupby_and_count.cu src/sampling/random_walks_mg.cu diff --git a/cpp/include/cugraph/mtmg/handle.hpp b/cpp/include/cugraph/mtmg/handle.hpp index f23bce5aeac..79a0a703912 100644 --- a/cpp/include/cugraph/mtmg/handle.hpp +++ b/cpp/include/cugraph/mtmg/handle.hpp @@ -78,14 +78,6 @@ class handle_t { */ int get_size() const { return raft_handle_.get_comms().get_size(); } - /** - * @brief Get number of local gpus - * - * @return number of local gpus - */ - // FIXME: wrong for multi-node - int get_local_size() const { return raft_handle_.get_comms().get_size(); } - /** * @brief Get gpu rank * diff --git a/cpp/src/detail/shuffle_vertex_pairs.cuh b/cpp/src/detail/shuffle_vertex_pairs.cuh new file mode 100644 index 00000000000..8bfe86a905e --- /dev/null +++ b/cpp/src/detail/shuffle_vertex_pairs.cuh @@ -0,0 +1,382 @@ +/* + * Copyright (c) 2021-2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +#include +#include +#include +#include +#include + +#include +#include + +#include + +namespace cugraph { + +namespace { + +template +std::tuple, + rmm::device_uvector, + std::optional>, + std::optional>, + std::optional>> +shuffle_vertex_pairs_with_values_by_gpu_id_impl( + raft::handle_t const& handle, + rmm::device_uvector&& majors, + rmm::device_uvector&& minors, + std::optional>&& weights, + std::optional>&& edge_ids, + std::optional>&& edge_types, + func_t func) +{ + auto& comm = handle.get_comms(); + auto const comm_size = comm.get_size(); + + auto total_global_mem = handle.get_device_properties().totalGlobalMem; + auto element_size = sizeof(vertex_t) * 2 + (weights ? sizeof(weight_t) : size_t{0}) + + (edge_ids ? sizeof(edge_t) : size_t{0}) + + (edge_types ? sizeof(edge_type_t) : size_t{0}); + auto constexpr mem_frugal_ratio = + 0.1; // if the expected temporary buffer size exceeds the mem_frugal_ratio of the + // total_global_mem, switch to the memory frugal approach (thrust::sort is used to + // group-by by default, and thrust::sort requires temporary buffer comparable to the input + // data size) + auto mem_frugal_threshold = + static_cast(static_cast(total_global_mem / element_size) * mem_frugal_ratio); + +#if 0 + auto mem_frugal_flag = + host_scalar_allreduce(comm, + majors.size() > mem_frugal_threshold ? int{1} : int{0}, + raft::comms::op_t::MAX, + handle.get_stream()); +#else + bool mem_frugal_flag{true}; +#endif + + // invoke groupby_and_count and shuffle values to pass mem_frugal_threshold instead of directly + // calling groupby_gpu_id_and_shuffle_values there is no benefit in reducing peak memory as we + // need to allocate a receive buffer anyways) but this reduces the maximum memory allocation size + // by half or more (thrust::sort used inside the groupby_and_count allocates the entire temporary + // buffer in a single chunk, and the pool allocator often cannot handle a large single allocation + // (due to fragmentation) even when the remaining free memory in aggregate is significantly larger + // than the requested size). + + // FIXME: Consider a generic function that takes a value tuple of optionals + // to eliminate this complexity + rmm::device_uvector d_tx_value_counts(0, handle.get_stream()); + + if (weights) { + if (edge_ids) { + if (edge_types) { + d_tx_value_counts = cugraph::groupby_and_count( + thrust::make_zip_iterator(majors.begin(), + minors.begin(), + weights->begin(), + edge_ids->begin(), + edge_types->begin()), + thrust::make_zip_iterator( + majors.end(), minors.end(), weights->end(), edge_ids->end(), edge_types->end()), + [func] __device__(auto val) { return func(thrust::get<0>(val), thrust::get<1>(val)); }, + comm_size, + mem_frugal_threshold, + handle.get_stream()); + } else { + d_tx_value_counts = cugraph::groupby_and_count( + thrust::make_zip_iterator( + majors.begin(), minors.begin(), weights->begin(), edge_ids->begin()), + thrust::make_zip_iterator(majors.end(), minors.end(), weights->end(), edge_ids->end()), + [func] __device__(auto val) { return func(thrust::get<0>(val), thrust::get<1>(val)); }, + comm_size, + mem_frugal_threshold, + handle.get_stream()); + } + } else { + if (edge_types) { + d_tx_value_counts = cugraph::groupby_and_count( + thrust::make_zip_iterator( + majors.begin(), minors.begin(), weights->begin(), edge_types->begin()), + thrust::make_zip_iterator(majors.end(), minors.end(), weights->end(), edge_types->end()), + [func] __device__(auto val) { return func(thrust::get<0>(val), thrust::get<1>(val)); }, + comm_size, + mem_frugal_threshold, + handle.get_stream()); + } else { + std::cout << " just weights, calling group_by_and_count, rank = " << comm.get_rank() + << std::endl; + d_tx_value_counts = cugraph::groupby_and_count( + thrust::make_zip_iterator(majors.begin(), minors.begin(), weights->begin()), + thrust::make_zip_iterator(majors.end(), minors.end(), weights->end()), + [func] __device__(auto val) { return func(thrust::get<0>(val), thrust::get<1>(val)); }, + comm_size, + mem_frugal_threshold, + handle.get_stream()); + std::cout << " just weights finished, rank = " << comm.get_rank() << std::endl; + } + } + } else { + if (edge_ids) { + if (edge_types) { + d_tx_value_counts = cugraph::groupby_and_count( + thrust::make_zip_iterator( + majors.begin(), minors.begin(), edge_ids->begin(), edge_types->begin()), + thrust::make_zip_iterator(majors.end(), minors.end(), edge_ids->end(), edge_types->end()), + [func] __device__(auto val) { return func(thrust::get<0>(val), thrust::get<1>(val)); }, + comm_size, + mem_frugal_threshold, + handle.get_stream()); + } else { + d_tx_value_counts = cugraph::groupby_and_count( + thrust::make_zip_iterator(majors.begin(), minors.begin(), edge_ids->begin()), + thrust::make_zip_iterator(majors.end(), minors.end(), edge_ids->end()), + [func] __device__(auto val) { return func(thrust::get<0>(val), thrust::get<1>(val)); }, + comm_size, + mem_frugal_threshold, + handle.get_stream()); + } + } else { + if (edge_types) { + d_tx_value_counts = cugraph::groupby_and_count( + thrust::make_zip_iterator(majors.begin(), minors.begin(), edge_types->begin()), + thrust::make_zip_iterator(majors.end(), minors.end(), edge_types->end()), + [func] __device__(auto val) { return func(thrust::get<0>(val), thrust::get<1>(val)); }, + comm_size, + mem_frugal_threshold, + handle.get_stream()); + } else { + std::cout << " no weights, calling group_by_and_count, rank = " << comm.get_rank() + << std::endl; + d_tx_value_counts = cugraph::groupby_and_count( + thrust::make_zip_iterator(majors.begin(), minors.begin()), + thrust::make_zip_iterator(majors.end(), minors.end()), + [func] __device__(auto val) { return func(thrust::get<0>(val), thrust::get<1>(val)); }, + comm_size, + mem_frugal_threshold, + handle.get_stream()); + } + } + } + + std::vector h_tx_value_counts(d_tx_value_counts.size()); + raft::update_host(h_tx_value_counts.data(), + d_tx_value_counts.data(), + d_tx_value_counts.size(), + handle.get_stream()); + handle.sync_stream(); + + std::cout << " mem_frugal_flag check, rank = " << comm.get_rank() << std::endl; + + if (mem_frugal_flag) { // trade-off potential parallelism to lower peak memory + std::cout << " mem_frugal_flag true, shuffle values, rank = " << comm.get_rank() << std::endl; + + std::tie(majors, std::ignore) = + shuffle_values(comm, majors.begin(), h_tx_value_counts, handle.get_stream()); + + std::tie(minors, std::ignore) = + shuffle_values(comm, minors.begin(), h_tx_value_counts, handle.get_stream()); + + if (weights) { + std::tie(weights, std::ignore) = + shuffle_values(comm, (*weights).begin(), h_tx_value_counts, handle.get_stream()); + } + + if (edge_ids) { + std::tie(edge_ids, std::ignore) = + shuffle_values(comm, (*edge_ids).begin(), h_tx_value_counts, handle.get_stream()); + } + + if (edge_types) { + std::tie(edge_types, std::ignore) = + shuffle_values(comm, (*edge_types).begin(), h_tx_value_counts, handle.get_stream()); + } + } else { + if (weights) { + if (edge_ids) { + if (edge_types) { + std::forward_as_tuple(std::tie(majors, minors, weights, edge_ids, edge_types), + std::ignore) = + shuffle_values(comm, + thrust::make_zip_iterator(majors.begin(), + minors.begin(), + weights->begin(), + edge_ids->begin(), + edge_types->begin()), + h_tx_value_counts, + handle.get_stream()); + } else { + std::forward_as_tuple(std::tie(majors, minors, weights, edge_ids), std::ignore) = + shuffle_values(comm, + thrust::make_zip_iterator( + majors.begin(), minors.begin(), weights->begin(), edge_ids->begin()), + h_tx_value_counts, + handle.get_stream()); + } + } else { + if (edge_types) { + std::forward_as_tuple(std::tie(majors, minors, weights, edge_types), std::ignore) = + shuffle_values(comm, + thrust::make_zip_iterator( + majors.begin(), minors.begin(), weights->begin(), edge_types->begin()), + h_tx_value_counts, + handle.get_stream()); + } else { + std::cout << " mem_frugal_flag false, shuffle values, rank = " << comm.get_rank() + << std::endl; + std::forward_as_tuple(std::tie(majors, minors, weights), std::ignore) = shuffle_values( + comm, + thrust::make_zip_iterator(majors.begin(), minors.begin(), weights->begin()), + h_tx_value_counts, + handle.get_stream()); + } + } + } else { + if (edge_ids) { + if (edge_types) { + std::forward_as_tuple(std::tie(majors, minors, edge_ids, edge_types), std::ignore) = + shuffle_values( + comm, + thrust::make_zip_iterator( + majors.begin(), minors.begin(), edge_ids->begin(), edge_types->begin()), + h_tx_value_counts, + handle.get_stream()); + } else { + std::forward_as_tuple(std::tie(majors, minors, edge_ids), std::ignore) = shuffle_values( + comm, + thrust::make_zip_iterator(majors.begin(), minors.begin(), edge_ids->begin()), + h_tx_value_counts, + handle.get_stream()); + } + } else { + if (edge_types) { + std::forward_as_tuple(std::tie(majors, minors, edge_types), std::ignore) = shuffle_values( + comm, + thrust::make_zip_iterator(majors.begin(), minors.begin(), edge_types->begin()), + h_tx_value_counts, + handle.get_stream()); + } else { + std::cout << " mem_frugal_flag false, shuffle values no weights, rank = " + << comm.get_rank() << std::endl; + std::forward_as_tuple(std::tie(majors, minors), std::ignore) = + shuffle_values(comm, + thrust::make_zip_iterator(majors.begin(), minors.begin()), + h_tx_value_counts, + handle.get_stream()); + } + } + } + } + + std::cout << " make tuple, rank = " << comm.get_rank() << std::endl; + return std::make_tuple(std::move(majors), + std::move(minors), + std::move(weights), + std::move(edge_ids), + std::move(edge_types)); +} + +} // namespace + +namespace detail { + +template +std::tuple, + rmm::device_uvector, + std::optional>, + std::optional>, + std::optional>> +shuffle_ext_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( + raft::handle_t const& handle, + rmm::device_uvector&& majors, + rmm::device_uvector&& minors, + std::optional>&& weights, + std::optional>&& edge_ids, + std::optional>&& edge_types) +{ + auto& comm = handle.get_comms(); + auto const comm_size = comm.get_size(); + auto& major_comm = handle.get_subcomm(cugraph::partition_manager::major_comm_name()); + auto const major_comm_size = major_comm.get_size(); + auto& minor_comm = handle.get_subcomm(cugraph::partition_manager::minor_comm_name()); + auto const minor_comm_size = minor_comm.get_size(); + + std::cout << " in shuffle... rank = " << comm.get_rank() << std::endl; + + return shuffle_vertex_pairs_with_values_by_gpu_id_impl( + handle, + std::move(majors), + std::move(minors), + std::move(weights), + std::move(edge_ids), + std::move(edge_types), + cugraph::detail::compute_gpu_id_from_ext_edge_endpoints_t{ + comm_size, major_comm_size, minor_comm_size}); +} + +template +std::tuple, + rmm::device_uvector, + std::optional>, + std::optional>, + std::optional>> +shuffle_int_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( + raft::handle_t const& handle, + rmm::device_uvector&& majors, + rmm::device_uvector&& minors, + std::optional>&& weights, + std::optional>&& edge_ids, + std::optional>&& edge_types, + std::vector const& vertex_partition_range_lasts) +{ + auto& comm = handle.get_comms(); + auto const comm_size = comm.get_size(); + auto& major_comm = handle.get_subcomm(cugraph::partition_manager::major_comm_name()); + auto const major_comm_size = major_comm.get_size(); + auto& minor_comm = handle.get_subcomm(cugraph::partition_manager::minor_comm_name()); + auto const minor_comm_size = minor_comm.get_size(); + + rmm::device_uvector d_vertex_partition_range_lasts(vertex_partition_range_lasts.size(), + handle.get_stream()); + raft::update_device(d_vertex_partition_range_lasts.data(), + vertex_partition_range_lasts.data(), + vertex_partition_range_lasts.size(), + handle.get_stream()); + + return shuffle_vertex_pairs_with_values_by_gpu_id_impl( + handle, + std::move(majors), + std::move(minors), + std::move(weights), + std::move(edge_ids), + std::move(edge_types), + cugraph::detail::compute_gpu_id_from_int_edge_endpoints_t{ + raft::device_span(d_vertex_partition_range_lasts.data(), + d_vertex_partition_range_lasts.size()), + comm_size, + major_comm_size, + minor_comm_size}); +} + +} // namespace detail +} // namespace cugraph diff --git a/cpp/src/detail/shuffle_vertex_pairs_int32_int32.cu b/cpp/src/detail/shuffle_vertex_pairs_int32_int32.cu new file mode 100644 index 00000000000..e88fb7d5a91 --- /dev/null +++ b/cpp/src/detail/shuffle_vertex_pairs_int32_int32.cu @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2021-2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include + +namespace cugraph { +namespace detail { + +template std::tuple, + rmm::device_uvector, + std::optional>, + std::optional>, + std::optional>> +shuffle_ext_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( + raft::handle_t const& handle, + rmm::device_uvector&& majors, + rmm::device_uvector&& minors, + std::optional>&& weights, + std::optional>&& edge_ids, + std::optional>&& edge_types); + +template std::tuple, + rmm::device_uvector, + std::optional>, + std::optional>, + std::optional>> +shuffle_ext_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( + raft::handle_t const& handle, + rmm::device_uvector&& majors, + rmm::device_uvector&& minors, + std::optional>&& weights, + std::optional>&& edge_ids, + std::optional>&& edge_types); + +template std::tuple, + rmm::device_uvector, + std::optional>, + std::optional>, + std::optional>> +shuffle_int_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( + raft::handle_t const& handle, + rmm::device_uvector&& majors, + rmm::device_uvector&& minors, + std::optional>&& weights, + std::optional>&& edge_ids, + std::optional>&& edge_types, + std::vector const& vertex_partition_range_lasts); + +template std::tuple, + rmm::device_uvector, + std::optional>, + std::optional>, + std::optional>> +shuffle_int_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( + raft::handle_t const& handle, + rmm::device_uvector&& majors, + rmm::device_uvector&& minors, + std::optional>&& weights, + std::optional>&& edge_ids, + std::optional>&& edge_types, + std::vector const& vertex_partition_range_lasts); +} // namespace detail +} // namespace cugraph diff --git a/cpp/src/detail/shuffle_vertex_pairs_int32_int64.cu b/cpp/src/detail/shuffle_vertex_pairs_int32_int64.cu new file mode 100644 index 00000000000..b6a92bd0db9 --- /dev/null +++ b/cpp/src/detail/shuffle_vertex_pairs_int32_int64.cu @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2021-2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include + +namespace cugraph { +namespace detail { + +template std::tuple, + rmm::device_uvector, + std::optional>, + std::optional>, + std::optional>> +shuffle_ext_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( + raft::handle_t const& handle, + rmm::device_uvector&& majors, + rmm::device_uvector&& minors, + std::optional>&& weights, + std::optional>&& edge_ids, + std::optional>&& edge_types); + +template std::tuple, + rmm::device_uvector, + std::optional>, + std::optional>, + std::optional>> +shuffle_ext_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( + raft::handle_t const& handle, + rmm::device_uvector&& majors, + rmm::device_uvector&& minors, + std::optional>&& weights, + std::optional>&& edge_ids, + std::optional>&& edge_types); + +template std::tuple, + rmm::device_uvector, + std::optional>, + std::optional>, + std::optional>> +shuffle_int_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( + raft::handle_t const& handle, + rmm::device_uvector&& majors, + rmm::device_uvector&& minors, + std::optional>&& weights, + std::optional>&& edge_ids, + std::optional>&& edge_types, + std::vector const& vertex_partition_range_lasts); + +template std::tuple, + rmm::device_uvector, + std::optional>, + std::optional>, + std::optional>> +shuffle_int_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( + raft::handle_t const& handle, + rmm::device_uvector&& majors, + rmm::device_uvector&& minors, + std::optional>&& weights, + std::optional>&& edge_ids, + std::optional>&& edge_types, + std::vector const& vertex_partition_range_lasts); + +} // namespace detail +} // namespace cugraph diff --git a/cpp/src/detail/shuffle_vertex_pairs_int64_int64.cu b/cpp/src/detail/shuffle_vertex_pairs_int64_int64.cu new file mode 100644 index 00000000000..5b2b271456a --- /dev/null +++ b/cpp/src/detail/shuffle_vertex_pairs_int64_int64.cu @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2021-2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include + +namespace cugraph { +namespace detail { + +template std::tuple, + rmm::device_uvector, + std::optional>, + std::optional>, + std::optional>> +shuffle_ext_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( + raft::handle_t const& handle, + rmm::device_uvector&& majors, + rmm::device_uvector&& minors, + std::optional>&& weights, + std::optional>&& edge_ids, + std::optional>&& edge_types); + +template std::tuple, + rmm::device_uvector, + std::optional>, + std::optional>, + std::optional>> +shuffle_ext_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( + raft::handle_t const& handle, + rmm::device_uvector&& majors, + rmm::device_uvector&& minors, + std::optional>&& weights, + std::optional>&& edge_ids, + std::optional>&& edge_types); + +template std::tuple, + rmm::device_uvector, + std::optional>, + std::optional>, + std::optional>> +shuffle_int_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( + raft::handle_t const& handle, + rmm::device_uvector&& majors, + rmm::device_uvector&& minors, + std::optional>&& weights, + std::optional>&& edge_ids, + std::optional>&& edge_types, + std::vector const& vertex_partition_range_lasts); + +template std::tuple, + rmm::device_uvector, + std::optional>, + std::optional>, + std::optional>> +shuffle_int_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( + raft::handle_t const& handle, + rmm::device_uvector&& majors, + rmm::device_uvector&& minors, + std::optional>&& weights, + std::optional>&& edge_ids, + std::optional>&& edge_types, + std::vector const& vertex_partition_range_lasts); + +} // namespace detail +} // namespace cugraph From 3d576932062ad422da145212b79f72af289bcdfb Mon Sep 17 00:00:00 2001 From: Charles Hastings Date: Thu, 12 Oct 2023 11:57:19 -0700 Subject: [PATCH 09/15] test first working version --- .../mtmg/detail/per_device_edgelist.hpp | 2 - cpp/include/cugraph/mtmg/resource_manager.hpp | 7 +- .../cugraph/utilities/shuffle_comm.cuh | 1 + cpp/src/detail/shuffle_vertex_pairs.cu | 522 ------------------ cpp/src/detail/shuffle_vertex_pairs.cuh | 20 - cpp/tests/mtmg/multi_node_threaded_test.cu | 28 +- 6 files changed, 6 insertions(+), 574 deletions(-) delete mode 100644 cpp/src/detail/shuffle_vertex_pairs.cu diff --git a/cpp/include/cugraph/mtmg/detail/per_device_edgelist.hpp b/cpp/include/cugraph/mtmg/detail/per_device_edgelist.hpp index ba4afb2228e..8011146ee4f 100644 --- a/cpp/include/cugraph/mtmg/detail/per_device_edgelist.hpp +++ b/cpp/include/cugraph/mtmg/detail/per_device_edgelist.hpp @@ -205,8 +205,6 @@ class per_device_edgelist_t { auto tmp_edge_type = edge_type_ ? std::make_optional(std::move((*edge_type_)[0])) : std::nullopt; - std::cout << "calling detail::shuffle... rank = " << handle.get_rank() - << ", size = " << src_[0].size() << std::endl; std::tie(store_transposed ? dst_[0] : src_[0], store_transposed ? src_[0] : dst_[0], tmp_wgt, diff --git a/cpp/include/cugraph/mtmg/resource_manager.hpp b/cpp/include/cugraph/mtmg/resource_manager.hpp index ff9c8e485ae..7f244128522 100644 --- a/cpp/include/cugraph/mtmg/resource_manager.hpp +++ b/cpp/include/cugraph/mtmg/resource_manager.hpp @@ -89,6 +89,9 @@ class resource_manager_t { // There is a deprecated environment variable: NCCL_LAUNCH_MODE=GROUP // which should temporarily work around this problem. // + // Further NOTE: multi-node requires the NCCL_LAUNCH_MODE=GROUP feature + // to be enabled even with the pool memory resource. + // // Ultimately there should be some RMM parameters passed into this function // (or the constructor of the object) to configure this behavior #if 0 @@ -209,11 +212,7 @@ class resource_manager_t { int rank = local_ranks_to_include[idx]; RAFT_CUDA_TRY(cudaSetDevice(device_ids[idx].value())); - std::cout << "calling init_subcomm, rank = " << rank << ", comm_size = " << comm_size - << ", gpu_row_comm_size = " << gpu_row_comm_size << std::endl; - // This one requires paralellism, I believe cugraph::partition_manager::init_subcomm(*handles[idx], gpu_row_comm_size); - std::cout << "finished initialization thread, rank = " << rank << std::endl; }); } diff --git a/cpp/include/cugraph/utilities/shuffle_comm.cuh b/cpp/include/cugraph/utilities/shuffle_comm.cuh index 6a260144324..f338b676138 100644 --- a/cpp/include/cugraph/utilities/shuffle_comm.cuh +++ b/cpp/include/cugraph/utilities/shuffle_comm.cuh @@ -828,6 +828,7 @@ auto shuffle_values(raft::comms::comms_t const& comm, std::vector rx_counts{}; std::vector rx_offsets{}; std::vector rx_src_ranks{}; + std::tie(tx_counts, tx_offsets, tx_dst_ranks, rx_counts, rx_offsets, rx_src_ranks) = detail::compute_tx_rx_counts_offsets_ranks(comm, d_tx_value_counts, stream_view); diff --git a/cpp/src/detail/shuffle_vertex_pairs.cu b/cpp/src/detail/shuffle_vertex_pairs.cu deleted file mode 100644 index eb81e21c017..00000000000 --- a/cpp/src/detail/shuffle_vertex_pairs.cu +++ /dev/null @@ -1,522 +0,0 @@ -/* - * Copyright (c) 2021-2023, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include - -#include -#include -#include -#include -#include - -#include -#include - -#include - -namespace cugraph { - -namespace { - -template -std::tuple, - rmm::device_uvector, - std::optional>, - std::optional>, - std::optional>> -shuffle_vertex_pairs_with_values_by_gpu_id_impl( - raft::handle_t const& handle, - rmm::device_uvector&& majors, - rmm::device_uvector&& minors, - std::optional>&& weights, - std::optional>&& edge_ids, - std::optional>&& edge_types, - func_t func) -{ - auto& comm = handle.get_comms(); - auto const comm_size = comm.get_size(); - - auto total_global_mem = handle.get_device_properties().totalGlobalMem; - auto element_size = sizeof(vertex_t) * 2 + (weights ? sizeof(weight_t) : size_t{0}) + - (edge_ids ? sizeof(edge_t) : size_t{0}) + - (edge_types ? sizeof(edge_type_t) : size_t{0}); - auto constexpr mem_frugal_ratio = - 0.1; // if the expected temporary buffer size exceeds the mem_frugal_ratio of the - // total_global_mem, switch to the memory frugal approach (thrust::sort is used to - // group-by by default, and thrust::sort requires temporary buffer comparable to the input - // data size) - auto mem_frugal_threshold = - static_cast(static_cast(total_global_mem / element_size) * mem_frugal_ratio); - - auto mem_frugal_flag = - host_scalar_allreduce(comm, - majors.size() > mem_frugal_threshold ? int{1} : int{0}, - raft::comms::op_t::MAX, - handle.get_stream()); - - // invoke groupby_and_count and shuffle values to pass mem_frugal_threshold instead of directly - // calling groupby_gpu_id_and_shuffle_values there is no benefit in reducing peak memory as we - // need to allocate a receive buffer anyways) but this reduces the maximum memory allocation size - // by half or more (thrust::sort used inside the groupby_and_count allocates the entire temporary - // buffer in a single chunk, and the pool allocator often cannot handle a large single allocation - // (due to fragmentation) even when the remaining free memory in aggregate is significantly larger - // than the requested size). - - // FIXME: Consider a generic function that takes a value tuple of optionals - // to eliminate this complexity - rmm::device_uvector d_tx_value_counts(0, handle.get_stream()); - - if (weights) { - if (edge_ids) { - if (edge_types) { - d_tx_value_counts = cugraph::groupby_and_count( - thrust::make_zip_iterator(majors.begin(), - minors.begin(), - weights->begin(), - edge_ids->begin(), - edge_types->begin()), - thrust::make_zip_iterator( - majors.end(), minors.end(), weights->end(), edge_ids->end(), edge_types->end()), - [func] __device__(auto val) { return func(thrust::get<0>(val), thrust::get<1>(val)); }, - comm_size, - mem_frugal_threshold, - handle.get_stream()); - } else { - d_tx_value_counts = cugraph::groupby_and_count( - thrust::make_zip_iterator( - majors.begin(), minors.begin(), weights->begin(), edge_ids->begin()), - thrust::make_zip_iterator(majors.end(), minors.end(), weights->end(), edge_ids->end()), - [func] __device__(auto val) { return func(thrust::get<0>(val), thrust::get<1>(val)); }, - comm_size, - mem_frugal_threshold, - handle.get_stream()); - } - } else { - if (edge_types) { - d_tx_value_counts = cugraph::groupby_and_count( - thrust::make_zip_iterator( - majors.begin(), minors.begin(), weights->begin(), edge_types->begin()), - thrust::make_zip_iterator(majors.end(), minors.end(), weights->end(), edge_types->end()), - [func] __device__(auto val) { return func(thrust::get<0>(val), thrust::get<1>(val)); }, - comm_size, - mem_frugal_threshold, - handle.get_stream()); - } else { - d_tx_value_counts = cugraph::groupby_and_count( - thrust::make_zip_iterator(majors.begin(), minors.begin(), weights->begin()), - thrust::make_zip_iterator(majors.end(), minors.end(), weights->end()), - [func] __device__(auto val) { return func(thrust::get<0>(val), thrust::get<1>(val)); }, - comm_size, - mem_frugal_threshold, - handle.get_stream()); - } - } - } else { - if (edge_ids) { - if (edge_types) { - d_tx_value_counts = cugraph::groupby_and_count( - thrust::make_zip_iterator( - majors.begin(), minors.begin(), edge_ids->begin(), edge_types->begin()), - thrust::make_zip_iterator(majors.end(), minors.end(), edge_ids->end(), edge_types->end()), - [func] __device__(auto val) { return func(thrust::get<0>(val), thrust::get<1>(val)); }, - comm_size, - mem_frugal_threshold, - handle.get_stream()); - } else { - d_tx_value_counts = cugraph::groupby_and_count( - thrust::make_zip_iterator(majors.begin(), minors.begin(), edge_ids->begin()), - thrust::make_zip_iterator(majors.end(), minors.end(), edge_ids->end()), - [func] __device__(auto val) { return func(thrust::get<0>(val), thrust::get<1>(val)); }, - comm_size, - mem_frugal_threshold, - handle.get_stream()); - } - } else { - if (edge_types) { - d_tx_value_counts = cugraph::groupby_and_count( - thrust::make_zip_iterator(majors.begin(), minors.begin(), edge_types->begin()), - thrust::make_zip_iterator(majors.end(), minors.end(), edge_types->end()), - [func] __device__(auto val) { return func(thrust::get<0>(val), thrust::get<1>(val)); }, - comm_size, - mem_frugal_threshold, - handle.get_stream()); - } else { - d_tx_value_counts = cugraph::groupby_and_count( - thrust::make_zip_iterator(majors.begin(), minors.begin()), - thrust::make_zip_iterator(majors.end(), minors.end()), - [func] __device__(auto val) { return func(thrust::get<0>(val), thrust::get<1>(val)); }, - comm_size, - mem_frugal_threshold, - handle.get_stream()); - } - } - } - - std::vector h_tx_value_counts(d_tx_value_counts.size()); - raft::update_host(h_tx_value_counts.data(), - d_tx_value_counts.data(), - d_tx_value_counts.size(), - handle.get_stream()); - handle.sync_stream(); - - if (mem_frugal_flag) { // trade-off potential parallelism to lower peak memory - std::tie(majors, std::ignore) = - shuffle_values(comm, majors.begin(), h_tx_value_counts, handle.get_stream()); - - std::tie(minors, std::ignore) = - shuffle_values(comm, minors.begin(), h_tx_value_counts, handle.get_stream()); - - if (weights) { - std::tie(weights, std::ignore) = - shuffle_values(comm, (*weights).begin(), h_tx_value_counts, handle.get_stream()); - } - - if (edge_ids) { - std::tie(edge_ids, std::ignore) = - shuffle_values(comm, (*edge_ids).begin(), h_tx_value_counts, handle.get_stream()); - } - - if (edge_types) { - std::tie(edge_types, std::ignore) = - shuffle_values(comm, (*edge_types).begin(), h_tx_value_counts, handle.get_stream()); - } - } else { - if (weights) { - if (edge_ids) { - if (edge_types) { - std::forward_as_tuple(std::tie(majors, minors, weights, edge_ids, edge_types), - std::ignore) = - shuffle_values(comm, - thrust::make_zip_iterator(majors.begin(), - minors.begin(), - weights->begin(), - edge_ids->begin(), - edge_types->begin()), - h_tx_value_counts, - handle.get_stream()); - } else { - std::forward_as_tuple(std::tie(majors, minors, weights, edge_ids), std::ignore) = - shuffle_values(comm, - thrust::make_zip_iterator( - majors.begin(), minors.begin(), weights->begin(), edge_ids->begin()), - h_tx_value_counts, - handle.get_stream()); - } - } else { - if (edge_types) { - std::forward_as_tuple(std::tie(majors, minors, weights, edge_types), std::ignore) = - shuffle_values(comm, - thrust::make_zip_iterator( - majors.begin(), minors.begin(), weights->begin(), edge_types->begin()), - h_tx_value_counts, - handle.get_stream()); - } else { - std::forward_as_tuple(std::tie(majors, minors, weights), std::ignore) = shuffle_values( - comm, - thrust::make_zip_iterator(majors.begin(), minors.begin(), weights->begin()), - h_tx_value_counts, - handle.get_stream()); - } - } - } else { - if (edge_ids) { - if (edge_types) { - std::forward_as_tuple(std::tie(majors, minors, edge_ids, edge_types), std::ignore) = - shuffle_values( - comm, - thrust::make_zip_iterator( - majors.begin(), minors.begin(), edge_ids->begin(), edge_types->begin()), - h_tx_value_counts, - handle.get_stream()); - } else { - std::forward_as_tuple(std::tie(majors, minors, edge_ids), std::ignore) = shuffle_values( - comm, - thrust::make_zip_iterator(majors.begin(), minors.begin(), edge_ids->begin()), - h_tx_value_counts, - handle.get_stream()); - } - } else { - if (edge_types) { - std::forward_as_tuple(std::tie(majors, minors, edge_types), std::ignore) = shuffle_values( - comm, - thrust::make_zip_iterator(majors.begin(), minors.begin(), edge_types->begin()), - h_tx_value_counts, - handle.get_stream()); - } else { - std::forward_as_tuple(std::tie(majors, minors), std::ignore) = - shuffle_values(comm, - thrust::make_zip_iterator(majors.begin(), minors.begin()), - h_tx_value_counts, - handle.get_stream()); - } - } - } - } - - return std::make_tuple(std::move(majors), - std::move(minors), - std::move(weights), - std::move(edge_ids), - std::move(edge_types)); -} - -} // namespace - -namespace detail { - -template -std::tuple, - rmm::device_uvector, - std::optional>, - std::optional>, - std::optional>> -shuffle_ext_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( - raft::handle_t const& handle, - rmm::device_uvector&& majors, - rmm::device_uvector&& minors, - std::optional>&& weights, - std::optional>&& edge_ids, - std::optional>&& edge_types) -{ - auto& comm = handle.get_comms(); - auto const comm_size = comm.get_size(); - auto& major_comm = handle.get_subcomm(cugraph::partition_manager::major_comm_name()); - auto const major_comm_size = major_comm.get_size(); - auto& minor_comm = handle.get_subcomm(cugraph::partition_manager::minor_comm_name()); - auto const minor_comm_size = minor_comm.get_size(); - - return shuffle_vertex_pairs_with_values_by_gpu_id_impl( - handle, - std::move(majors), - std::move(minors), - std::move(weights), - std::move(edge_ids), - std::move(edge_types), - cugraph::detail::compute_gpu_id_from_ext_edge_endpoints_t{ - comm_size, major_comm_size, minor_comm_size}); -} - -template -std::tuple, - rmm::device_uvector, - std::optional>, - std::optional>, - std::optional>> -shuffle_int_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( - raft::handle_t const& handle, - rmm::device_uvector&& majors, - rmm::device_uvector&& minors, - std::optional>&& weights, - std::optional>&& edge_ids, - std::optional>&& edge_types, - std::vector const& vertex_partition_range_lasts) -{ - auto& comm = handle.get_comms(); - auto const comm_size = comm.get_size(); - auto& major_comm = handle.get_subcomm(cugraph::partition_manager::major_comm_name()); - auto const major_comm_size = major_comm.get_size(); - auto& minor_comm = handle.get_subcomm(cugraph::partition_manager::minor_comm_name()); - auto const minor_comm_size = minor_comm.get_size(); - - rmm::device_uvector d_vertex_partition_range_lasts(vertex_partition_range_lasts.size(), - handle.get_stream()); - raft::update_device(d_vertex_partition_range_lasts.data(), - vertex_partition_range_lasts.data(), - vertex_partition_range_lasts.size(), - handle.get_stream()); - - return shuffle_vertex_pairs_with_values_by_gpu_id_impl( - handle, - std::move(majors), - std::move(minors), - std::move(weights), - std::move(edge_ids), - std::move(edge_types), - cugraph::detail::compute_gpu_id_from_int_edge_endpoints_t{ - raft::device_span(d_vertex_partition_range_lasts.data(), - d_vertex_partition_range_lasts.size()), - comm_size, - major_comm_size, - minor_comm_size}); -} - -template std::tuple, - rmm::device_uvector, - std::optional>, - std::optional>, - std::optional>> -shuffle_ext_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( - raft::handle_t const& handle, - rmm::device_uvector&& majors, - rmm::device_uvector&& minors, - std::optional>&& weights, - std::optional>&& edge_ids, - std::optional>&& edge_types); - -template std::tuple, - rmm::device_uvector, - std::optional>, - std::optional>, - std::optional>> -shuffle_ext_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( - raft::handle_t const& handle, - rmm::device_uvector&& majors, - rmm::device_uvector&& minors, - std::optional>&& weights, - std::optional>&& edge_ids, - std::optional>&& edge_types); - -template std::tuple, - rmm::device_uvector, - std::optional>, - std::optional>, - std::optional>> -shuffle_ext_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( - raft::handle_t const& handle, - rmm::device_uvector&& majors, - rmm::device_uvector&& minors, - std::optional>&& weights, - std::optional>&& edge_ids, - std::optional>&& edge_types); - -template std::tuple, - rmm::device_uvector, - std::optional>, - std::optional>, - std::optional>> -shuffle_ext_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( - raft::handle_t const& handle, - rmm::device_uvector&& majors, - rmm::device_uvector&& minors, - std::optional>&& weights, - std::optional>&& edge_ids, - std::optional>&& edge_types); - -template std::tuple, - rmm::device_uvector, - std::optional>, - std::optional>, - std::optional>> -shuffle_ext_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( - raft::handle_t const& handle, - rmm::device_uvector&& majors, - rmm::device_uvector&& minors, - std::optional>&& weights, - std::optional>&& edge_ids, - std::optional>&& edge_types); - -template std::tuple, - rmm::device_uvector, - std::optional>, - std::optional>, - std::optional>> -shuffle_ext_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( - raft::handle_t const& handle, - rmm::device_uvector&& majors, - rmm::device_uvector&& minors, - std::optional>&& weights, - std::optional>&& edge_ids, - std::optional>&& edge_types); - -template std::tuple, - rmm::device_uvector, - std::optional>, - std::optional>, - std::optional>> -shuffle_int_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( - raft::handle_t const& handle, - rmm::device_uvector&& majors, - rmm::device_uvector&& minors, - std::optional>&& weights, - std::optional>&& edge_ids, - std::optional>&& edge_types, - std::vector const& vertex_partition_range_lasts); - -template std::tuple, - rmm::device_uvector, - std::optional>, - std::optional>, - std::optional>> -shuffle_int_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( - raft::handle_t const& handle, - rmm::device_uvector&& majors, - rmm::device_uvector&& minors, - std::optional>&& weights, - std::optional>&& edge_ids, - std::optional>&& edge_types, - std::vector const& vertex_partition_range_lasts); - -template std::tuple, - rmm::device_uvector, - std::optional>, - std::optional>, - std::optional>> -shuffle_int_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( - raft::handle_t const& handle, - rmm::device_uvector&& majors, - rmm::device_uvector&& minors, - std::optional>&& weights, - std::optional>&& edge_ids, - std::optional>&& edge_types, - std::vector const& vertex_partition_range_lasts); - -template std::tuple, - rmm::device_uvector, - std::optional>, - std::optional>, - std::optional>> -shuffle_int_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( - raft::handle_t const& handle, - rmm::device_uvector&& majors, - rmm::device_uvector&& minors, - std::optional>&& weights, - std::optional>&& edge_ids, - std::optional>&& edge_types, - std::vector const& vertex_partition_range_lasts); - -template std::tuple, - rmm::device_uvector, - std::optional>, - std::optional>, - std::optional>> -shuffle_int_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( - raft::handle_t const& handle, - rmm::device_uvector&& majors, - rmm::device_uvector&& minors, - std::optional>&& weights, - std::optional>&& edge_ids, - std::optional>&& edge_types, - std::vector const& vertex_partition_range_lasts); - -template std::tuple, - rmm::device_uvector, - std::optional>, - std::optional>, - std::optional>> -shuffle_int_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( - raft::handle_t const& handle, - rmm::device_uvector&& majors, - rmm::device_uvector&& minors, - std::optional>&& weights, - std::optional>&& edge_ids, - std::optional>&& edge_types, - std::vector const& vertex_partition_range_lasts); - -} // namespace detail -} // namespace cugraph diff --git a/cpp/src/detail/shuffle_vertex_pairs.cuh b/cpp/src/detail/shuffle_vertex_pairs.cuh index 8bfe86a905e..4d036fd85f9 100644 --- a/cpp/src/detail/shuffle_vertex_pairs.cuh +++ b/cpp/src/detail/shuffle_vertex_pairs.cuh @@ -66,15 +66,11 @@ shuffle_vertex_pairs_with_values_by_gpu_id_impl( auto mem_frugal_threshold = static_cast(static_cast(total_global_mem / element_size) * mem_frugal_ratio); -#if 0 auto mem_frugal_flag = host_scalar_allreduce(comm, majors.size() > mem_frugal_threshold ? int{1} : int{0}, raft::comms::op_t::MAX, handle.get_stream()); -#else - bool mem_frugal_flag{true}; -#endif // invoke groupby_and_count and shuffle values to pass mem_frugal_threshold instead of directly // calling groupby_gpu_id_and_shuffle_values there is no benefit in reducing peak memory as we @@ -124,8 +120,6 @@ shuffle_vertex_pairs_with_values_by_gpu_id_impl( mem_frugal_threshold, handle.get_stream()); } else { - std::cout << " just weights, calling group_by_and_count, rank = " << comm.get_rank() - << std::endl; d_tx_value_counts = cugraph::groupby_and_count( thrust::make_zip_iterator(majors.begin(), minors.begin(), weights->begin()), thrust::make_zip_iterator(majors.end(), minors.end(), weights->end()), @@ -133,7 +127,6 @@ shuffle_vertex_pairs_with_values_by_gpu_id_impl( comm_size, mem_frugal_threshold, handle.get_stream()); - std::cout << " just weights finished, rank = " << comm.get_rank() << std::endl; } } } else { @@ -166,8 +159,6 @@ shuffle_vertex_pairs_with_values_by_gpu_id_impl( mem_frugal_threshold, handle.get_stream()); } else { - std::cout << " no weights, calling group_by_and_count, rank = " << comm.get_rank() - << std::endl; d_tx_value_counts = cugraph::groupby_and_count( thrust::make_zip_iterator(majors.begin(), minors.begin()), thrust::make_zip_iterator(majors.end(), minors.end()), @@ -186,11 +177,7 @@ shuffle_vertex_pairs_with_values_by_gpu_id_impl( handle.get_stream()); handle.sync_stream(); - std::cout << " mem_frugal_flag check, rank = " << comm.get_rank() << std::endl; - if (mem_frugal_flag) { // trade-off potential parallelism to lower peak memory - std::cout << " mem_frugal_flag true, shuffle values, rank = " << comm.get_rank() << std::endl; - std::tie(majors, std::ignore) = shuffle_values(comm, majors.begin(), h_tx_value_counts, handle.get_stream()); @@ -242,8 +229,6 @@ shuffle_vertex_pairs_with_values_by_gpu_id_impl( h_tx_value_counts, handle.get_stream()); } else { - std::cout << " mem_frugal_flag false, shuffle values, rank = " << comm.get_rank() - << std::endl; std::forward_as_tuple(std::tie(majors, minors, weights), std::ignore) = shuffle_values( comm, thrust::make_zip_iterator(majors.begin(), minors.begin(), weights->begin()), @@ -276,8 +261,6 @@ shuffle_vertex_pairs_with_values_by_gpu_id_impl( h_tx_value_counts, handle.get_stream()); } else { - std::cout << " mem_frugal_flag false, shuffle values no weights, rank = " - << comm.get_rank() << std::endl; std::forward_as_tuple(std::tie(majors, minors), std::ignore) = shuffle_values(comm, thrust::make_zip_iterator(majors.begin(), minors.begin()), @@ -288,7 +271,6 @@ shuffle_vertex_pairs_with_values_by_gpu_id_impl( } } - std::cout << " make tuple, rank = " << comm.get_rank() << std::endl; return std::make_tuple(std::move(majors), std::move(minors), std::move(weights), @@ -321,8 +303,6 @@ shuffle_ext_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( auto& minor_comm = handle.get_subcomm(cugraph::partition_manager::minor_comm_name()); auto const minor_comm_size = minor_comm.get_size(); - std::cout << " in shuffle... rank = " << comm.get_rank() << std::endl; - return shuffle_vertex_pairs_with_values_by_gpu_id_impl( handle, std::move(majors), diff --git a/cpp/tests/mtmg/multi_node_threaded_test.cu b/cpp/tests/mtmg/multi_node_threaded_test.cu index 5f9a184eec6..7793575e3c9 100644 --- a/cpp/tests/mtmg/multi_node_threaded_test.cu +++ b/cpp/tests/mtmg/multi_node_threaded_test.cu @@ -195,17 +195,9 @@ class Tests_Multithreaded } } - std::cout << "creating instance manager" << std::endl; - raft::print_host_vector(" registered ranks", - resource_manager.registered_ranks().data(), - resource_manager.registered_ranks().size(), - std::cout); - auto instance_manager = resource_manager.create_instance_manager( resource_manager.registered_ranks(), instance_manager_id); - std::cout << "done creating instance manager" << std::endl; - cugraph::mtmg::edgelist_t edgelist; cugraph::mtmg::graph_t graph; cugraph::mtmg::graph_view_t graph_view; @@ -219,8 +211,6 @@ class Tests_Multithreaded weight_t>>() : std::nullopt; - std::cout << "create edge list" << std::endl; - // // Simulate graph creation by spawning threads to walk through the // local COO and add edges @@ -246,8 +236,6 @@ class Tests_Multithreaded running_threads.resize(0); instance_manager->reset_threads(); - std::cout << "load edges" << std::endl; - // Load SG edge list auto [d_src_v, d_dst_v, d_weights_v, d_vertices_v, is_symmetric] = input_usecase.template construct_edgelist( @@ -300,8 +288,6 @@ class Tests_Multithreaded running_threads.resize(0); instance_manager->reset_threads(); - std::cout << "create graph" << std::endl; - for (int i = 0; i < num_local_gpus; ++i) { running_threads.emplace_back([&instance_manager, &graph, @@ -326,17 +312,9 @@ class Tests_Multithreaded int32_t>> edge_types{std::nullopt}; - std::cout << "calling finalize_buffer, rank = " << thread_handle.get_rank() << std::endl; - edgelist.finalize_buffer(thread_handle); - - std::cout << "calling consolidate_and_shuffle, rank = " << thread_handle.get_rank() - << ", total edges = " << h_src_v.size() << std::endl; edgelist.consolidate_and_shuffle(thread_handle, true); - std::cout << "calling create_graph_from_edgelist, rank = " << thread_handle.get_rank() - << std::endl; - cugraph::mtmg:: create_graph_from_edgelist( thread_handle, @@ -357,8 +335,6 @@ class Tests_Multithreaded running_threads.resize(0); instance_manager->reset_threads(); - std::cout << "call pagerank" << std::endl; - graph_view = graph.view(); for (int i = 0; i < num_threads; ++i) { @@ -527,13 +503,13 @@ using Tests_Multithreaded_Rmat = Tests_Multithreaded( - override_File_Usecase_with_cmd_line_arguments(GetParam()), std::vector{{0, 1}}); + override_File_Usecase_with_cmd_line_arguments(GetParam()), get_gpu_list()); } TEST_P(Tests_Multithreaded_Rmat, CheckInt32Int32FloatFloat) { run_current_test( - override_Rmat_Usecase_with_cmd_line_arguments(GetParam()), std::vector{{0, 1}}); + override_Rmat_Usecase_with_cmd_line_arguments(GetParam()), get_gpu_list()); } INSTANTIATE_TEST_SUITE_P(file_test, From d72bdedc9146b09f10a026e9327bfe89bba7af54 Mon Sep 17 00:00:00 2001 From: Charles Hastings Date: Thu, 12 Oct 2023 14:14:07 -0700 Subject: [PATCH 10/15] a little code cleanup --- cpp/include/cugraph/mtmg/resource_manager.hpp | 31 +++++++++---------- cpp/tests/mtmg/multi_node_threaded_test.cu | 2 +- 2 files changed, 16 insertions(+), 17 deletions(-) diff --git a/cpp/include/cugraph/mtmg/resource_manager.hpp b/cpp/include/cugraph/mtmg/resource_manager.hpp index 7f244128522..de619ee9b7b 100644 --- a/cpp/include/cugraph/mtmg/resource_manager.hpp +++ b/cpp/include/cugraph/mtmg/resource_manager.hpp @@ -114,17 +114,16 @@ class resource_manager_t { /** * @brief add a remote GPU to the resource manager. * - * @param rank The rank to assign to the local GPU - * @param remode_node_rank The rank assigned to the remote node + * @param rank The rank to assign to the remote GPU */ - void register_remote_gpu(int rank, int remote_node_rank) + void register_remote_gpu(int rank) { std::lock_guard lock(lock_); - CUGRAPH_EXPECTS(remote_rank_map_.find(rank) == remote_rank_map_.end(), + CUGRAPH_EXPECTS(remote_rank_set_.find(rank) == remote_rank_set_.end(), "cannot register same rank multiple times"); - remote_rank_map_.insert(std::pair(rank, remote_node_rank)); + remote_rank_set_.insert(rank); } /** @@ -151,7 +150,7 @@ class resource_manager_t { std::for_each(ranks_to_include.begin(), ranks_to_include.end(), [&local_ranks = local_rank_map_, - &remote_ranks = remote_rank_map_, + &remote_ranks = remote_rank_set_, &local_ranks_to_include](int rank) { if (local_ranks.find(rank) == local_ranks.end()) { CUGRAPH_EXPECTS(remote_ranks.find(rank) != remote_ranks.end(), @@ -231,21 +230,21 @@ class resource_manager_t { { std::lock_guard lock(lock_); - // - // C++20 mechanism: - // return std::vector{ std::views::keys(local_rank_map_).begin(), - // std::views::keys(local_rank_map_).end() }; - // Would need a bit more complicated to handle remote_rank_map_ also - // - std::vector registered_ranks(local_rank_map_.size() + remote_rank_map_.size()); + std::vector registered_ranks(local_rank_map_.size() + remote_rank_set_.size()); std::transform( local_rank_map_.begin(), local_rank_map_.end(), registered_ranks.begin(), [](auto pair) { return pair.first; }); - std::transform(remote_rank_map_.begin(), - remote_rank_map_.end(), +#if 0 + std::transform(remote_rank_set_.begin(), + remote_rank_set_.end(), registered_ranks.begin() + local_rank_map_.size(), [](auto pair) { return pair.first; }); +#else + std::copy(remote_rank_set_.begin(), + remote_rank_set_.end(), + registered_ranks.begin() + local_rank_map_.size()); +#endif std::sort(registered_ranks.begin(), registered_ranks.end()); return registered_ranks; @@ -254,7 +253,7 @@ class resource_manager_t { private: mutable std::mutex lock_{}; std::map local_rank_map_{}; - std::map remote_rank_map_{}; + std::set remote_rank_set_{}; std::map> per_device_rmm_resources_{}; }; diff --git a/cpp/tests/mtmg/multi_node_threaded_test.cu b/cpp/tests/mtmg/multi_node_threaded_test.cu index 7793575e3c9..79ce299972c 100644 --- a/cpp/tests/mtmg/multi_node_threaded_test.cu +++ b/cpp/tests/mtmg/multi_node_threaded_test.cu @@ -185,7 +185,7 @@ class Tests_Multithreaded num_gpus_file.close(); for (int j = 0; j < num_gpus_this_node; ++j) { - resource_manager.register_remote_gpu(node_rank++, i); + resource_manager.register_remote_gpu(node_rank++); } } else { std::for_each( From c413f38a96e72e465b39395fa34be39c74855022 Mon Sep 17 00:00:00 2001 From: Charles Hastings Date: Thu, 12 Oct 2023 14:24:53 -0700 Subject: [PATCH 11/15] add extra error checks --- cpp/include/cugraph/mtmg/resource_manager.hpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cpp/include/cugraph/mtmg/resource_manager.hpp b/cpp/include/cugraph/mtmg/resource_manager.hpp index de619ee9b7b..f396dfc8149 100644 --- a/cpp/include/cugraph/mtmg/resource_manager.hpp +++ b/cpp/include/cugraph/mtmg/resource_manager.hpp @@ -70,6 +70,8 @@ class resource_manager_t { { std::lock_guard lock(lock_); + CUGRAPH_EXPECTS(remote_rank_set_.find(rank) == remote_rank_set_.end(), + "cannot register same rank as local and remote"); CUGRAPH_EXPECTS(local_rank_map_.find(rank) == local_rank_map_.end(), "cannot register same rank multiple times"); @@ -120,6 +122,8 @@ class resource_manager_t { { std::lock_guard lock(lock_); + CUGRAPH_EXPECTS(local_rank_map_.find(rank) == local_rank_map_.end(), + "cannot register same rank as local and remote"); CUGRAPH_EXPECTS(remote_rank_set_.find(rank) == remote_rank_set_.end(), "cannot register same rank multiple times"); From 006cf0b9262d88dda8720060a0b54b7f61d32509 Mon Sep 17 00:00:00 2001 From: Charles Hastings Date: Tue, 24 Oct 2023 11:05:12 -0700 Subject: [PATCH 12/15] clean up a few things --- cpp/include/cugraph/mtmg/resource_manager.hpp | 8 +------- cpp/tests/mtmg/multi_node_threaded_test.cu | 12 ++---------- 2 files changed, 3 insertions(+), 17 deletions(-) diff --git a/cpp/include/cugraph/mtmg/resource_manager.hpp b/cpp/include/cugraph/mtmg/resource_manager.hpp index f396dfc8149..53eed0c18a4 100644 --- a/cpp/include/cugraph/mtmg/resource_manager.hpp +++ b/cpp/include/cugraph/mtmg/resource_manager.hpp @@ -239,16 +239,10 @@ class resource_manager_t { local_rank_map_.begin(), local_rank_map_.end(), registered_ranks.begin(), [](auto pair) { return pair.first; }); -#if 0 - std::transform(remote_rank_set_.begin(), - remote_rank_set_.end(), - registered_ranks.begin() + local_rank_map_.size(), - [](auto pair) { return pair.first; }); -#else + std::copy(remote_rank_set_.begin(), remote_rank_set_.end(), registered_ranks.begin() + local_rank_map_.size()); -#endif std::sort(registered_ranks.begin(), registered_ranks.end()); return registered_ranks; diff --git a/cpp/tests/mtmg/multi_node_threaded_test.cu b/cpp/tests/mtmg/multi_node_threaded_test.cu index 79ce299972c..b0f6d9e95c9 100644 --- a/cpp/tests/mtmg/multi_node_threaded_test.cu +++ b/cpp/tests/mtmg/multi_node_threaded_test.cu @@ -133,7 +133,8 @@ class Tests_Multithreaded // // This is intended to mimic a multi-node host application (non-MPI) integrating // with MTMG library. This is a simple implementation using a shared file system - // to pass configuration messages. + // to pass configuration messages. Terribly inefficient, but should mimic + // expected behavior. // ncclUniqueId instance_manager_id; int execution_id = g_execution_id++; @@ -261,15 +262,6 @@ class Tests_Multithreaded per_thread_edgelist(edgelist.get(thread_handle), thread_buffer_size); for (size_t j = starting_edge_offset; j < h_src_v.size(); j += stride) { -#if 0 - if (h_weights_v) { - thread_edgelist.append( - thread_handle, h_src_v[j], h_dst_v[j], (*h_weights_v)[j], std::nullopt, std::nullopt); - } else { - thread_edgelist.append( - thread_handle, h_src_v[j], h_dst_v[j], std::nullopt, std::nullopt, std::nullopt); - } -#endif per_thread_edgelist.append( thread_handle, h_src_v[j], From 6178f36dffe9c195e4ed61f18073da4799f18ed6 Mon Sep 17 00:00:00 2001 From: Charles Hastings Date: Thu, 2 Nov 2023 10:28:50 -0700 Subject: [PATCH 13/15] address PR comments --- cpp/CMakeLists.txt | 4 +- cpp/include/cugraph/mtmg/instance_manager.hpp | 15 +- cpp/include/cugraph/mtmg/resource_manager.hpp | 74 ++++---- ...rtex_pairs.cuh => shuffle_vertex_pairs.cu} | 164 ++++++++++++++++- .../shuffle_vertex_pairs_int32_int32.cu | 75 -------- .../shuffle_vertex_pairs_int32_int64.cu | 76 -------- .../shuffle_vertex_pairs_int64_int64.cu | 76 -------- cpp/tests/CMakeLists.txt | 4 +- cpp/tests/mtmg/multi_node_threaded_test.cu | 173 +++++------------- 9 files changed, 261 insertions(+), 400 deletions(-) rename cpp/src/detail/{shuffle_vertex_pairs.cuh => shuffle_vertex_pairs.cu} (65%) delete mode 100644 cpp/src/detail/shuffle_vertex_pairs_int32_int32.cu delete mode 100644 cpp/src/detail/shuffle_vertex_pairs_int32_int64.cu delete mode 100644 cpp/src/detail/shuffle_vertex_pairs_int64_int64.cu diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 25e50ed0188..41870cbc92b 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -181,9 +181,7 @@ endif() set(CUGRAPH_SOURCES src/detail/shuffle_vertices.cu - src/detail/shuffle_vertex_pairs_int32_int32.cu - src/detail/shuffle_vertex_pairs_int32_int64.cu - src/detail/shuffle_vertex_pairs_int64_int64.cu + src/detail/shuffle_vertex_pairs.cu src/detail/collect_local_vertex_values.cu src/detail/groupby_and_count.cu src/sampling/random_walks_mg.cu diff --git a/cpp/include/cugraph/mtmg/instance_manager.hpp b/cpp/include/cugraph/mtmg/instance_manager.hpp index 56dc450723d..4e0a5f21d0a 100644 --- a/cpp/include/cugraph/mtmg/instance_manager.hpp +++ b/cpp/include/cugraph/mtmg/instance_manager.hpp @@ -18,7 +18,7 @@ #include -#include +#include #include @@ -45,6 +45,19 @@ class instance_manager_t { { } + ~instance_manager_t() + { + int current_device{}; + RAFT_CUDA_TRY(cudaGetDevice(¤t_device)); + + for (size_t i = 0; i < nccl_comms_.size(); ++i) { + RAFT_CUDA_TRY(cudaSetDevice(device_ids_[i].value())); + RAFT_NCCL_TRY(ncclCommDestroy(*nccl_comms_[i])); + } + + RAFT_CUDA_TRY(cudaSetDevice(current_device)); + } + /** * @brief Get handle * diff --git a/cpp/include/cugraph/mtmg/resource_manager.hpp b/cpp/include/cugraph/mtmg/resource_manager.hpp index 02bddd3ca98..842f7b6f56b 100644 --- a/cpp/include/cugraph/mtmg/resource_manager.hpp +++ b/cpp/include/cugraph/mtmg/resource_manager.hpp @@ -41,6 +41,11 @@ namespace mtmg { * register_local_gpu (or register_remote_gpu once we support a multi-node * configuration) to allocate resources that can be used in the mtmg space. * + * Each GPU in the cluster should be given a unique global rank, an integer + * that will be used to reference the GPU within the resource manager. It + * is recommended that the GPUs be numbered sequentially from 0, although this + * is not required. + * * When we want to execute some graph computations, we need to create an instance for execution. * Based on how big a subset of the desired compute resources is desired, we can allocate some * number of GPUs to the problem (up to the total set of managed resources). @@ -48,7 +53,7 @@ namespace mtmg { * The returned instance can be used to create a graph, execute one or more algorithms, etc. Once * we are done the caller can delete the instance. * - * At the moment, the caller is assumed to be responsible for scheduling use of the resources. + * The caller is assumed to be responsible for scheduling use of the resources. * * For our first release, we will only consider a single node multi-GPU configuration, so the remote * GPU methods are currently disabled via ifdef. @@ -63,27 +68,28 @@ class resource_manager_t { /** * @brief add a local GPU to the resource manager. * - * @param rank The rank to assign to the local GPU - * @param device_id The device_id corresponding to this rank + * @param global_rank The global rank to assign to the local GPU + * @param local_device_id The local device_id corresponding to this rank */ - void register_local_gpu(int rank, rmm::cuda_device_id device_id) + void register_local_gpu(int global_rank, rmm::cuda_device_id local_device_id) { std::lock_guard lock(lock_); - CUGRAPH_EXPECTS(remote_rank_set_.find(rank) == remote_rank_set_.end(), - "cannot register same rank as local and remote"); - CUGRAPH_EXPECTS(local_rank_map_.find(rank) == local_rank_map_.end(), - "cannot register same rank multiple times"); + CUGRAPH_EXPECTS(remote_rank_set_.find(global_rank) == remote_rank_set_.end(), + "cannot register same global_rank as local and remote"); + CUGRAPH_EXPECTS(local_rank_map_.find(global_rank) == local_rank_map_.end(), + "cannot register same global_rank multiple times"); int num_gpus_this_node; RAFT_CUDA_TRY(cudaGetDeviceCount(&num_gpus_this_node)); - CUGRAPH_EXPECTS((device_id.value() >= 0) && (device_id.value() < num_gpus_this_node), - "device id out of range"); + CUGRAPH_EXPECTS( + (local_device_id.value() >= 0) && (local_device_id.value() < num_gpus_this_node), + "local device id out of range"); - local_rank_map_.insert(std::pair(rank, device_id)); + local_rank_map_.insert(std::pair(global_rank, local_device_id)); - RAFT_CUDA_TRY(cudaSetDevice(device_id.value())); + RAFT_CUDA_TRY(cudaSetDevice(local_device_id.value())); // FIXME: There is a bug in the cuda_memory_resource that results in a Hang. // using the pool resource as a work-around. @@ -98,36 +104,36 @@ class resource_manager_t { // (or the constructor of the object) to configure this behavior #if 0 auto per_device_it = per_device_rmm_resources_.insert( - std::pair{rank, std::make_shared()}); + std::pair{global_rank, std::make_shared()}); #else auto const [free, total] = rmm::detail::available_device_memory(); auto const min_alloc = rmm::detail::align_down(std::min(free, total / 6), rmm::detail::CUDA_ALLOCATION_ALIGNMENT); auto per_device_it = per_device_rmm_resources_.insert( - std::pair{rank, + std::pair{global_rank, rmm::mr::make_owning_wrapper( std::make_shared(), min_alloc)}); #endif - rmm::mr::set_per_device_resource(device_id, per_device_it.first->second.get()); + rmm::mr::set_per_device_resource(local_device_id, per_device_it.first->second.get()); } /** * @brief add a remote GPU to the resource manager. * - * @param rank The rank to assign to the remote GPU + * @param global_rank The global rank to assign to the remote GPU */ - void register_remote_gpu(int rank) + void register_remote_gpu(int global_rank) { std::lock_guard lock(lock_); - CUGRAPH_EXPECTS(local_rank_map_.find(rank) == local_rank_map_.end(), - "cannot register same rank as local and remote"); - CUGRAPH_EXPECTS(remote_rank_set_.find(rank) == remote_rank_set_.end(), - "cannot register same rank multiple times"); + CUGRAPH_EXPECTS(local_rank_map_.find(global_rank) == local_rank_map_.end(), + "cannot register same global_rank as local and remote"); + CUGRAPH_EXPECTS(remote_rank_set_.find(global_rank) == remote_rank_set_.end(), + "cannot register same global_rank multiple times"); - remote_rank_set_.insert(rank); + remote_rank_set_.insert(global_rank); } /** @@ -154,18 +160,12 @@ class resource_manager_t { { std::vector local_ranks_to_include; - std::for_each(ranks_to_include.begin(), - ranks_to_include.end(), - [&local_ranks = local_rank_map_, - &remote_ranks = remote_rank_set_, - &local_ranks_to_include](int rank) { - if (local_ranks.find(rank) == local_ranks.end()) { - CUGRAPH_EXPECTS(remote_ranks.find(rank) != remote_ranks.end(), - "requesting inclusion of an invalid rank"); - } else { - local_ranks_to_include.push_back(rank); - } - }); + std::copy_if(ranks_to_include.begin(), + ranks_to_include.end(), + std::back_inserter(local_ranks_to_include), + [&local_ranks = local_rank_map_](int rank) { + return (local_ranks.find(rank) != local_ranks.end()); + }); std::vector> nccl_comms{}; std::vector> handles{}; @@ -182,7 +182,7 @@ class resource_manager_t { int current_device{}; RAFT_CUDA_TRY(cudaGetDevice(¤t_device)); - NCCL_TRY(ncclGroupStart()); + RAFT_NCCL_TRY(ncclGroupStart()); for (size_t i = 0; i < local_ranks_to_include.size(); ++i) { int rank = local_ranks_to_include[i]; @@ -196,12 +196,12 @@ class resource_manager_t { per_device_rmm_resources_.find(rank)->second)); device_ids.push_back(pos->second); - NCCL_TRY( + RAFT_NCCL_TRY( ncclCommInitRank(nccl_comms[i].get(), ranks_to_include.size(), instance_manager_id, rank)); raft::comms::build_comms_nccl_only( handles[i].get(), *nccl_comms[i], ranks_to_include.size(), rank); } - NCCL_TRY(ncclGroupEnd()); + RAFT_NCCL_TRY(ncclGroupEnd()); RAFT_CUDA_TRY(cudaSetDevice(current_device)); std::vector running_threads; diff --git a/cpp/src/detail/shuffle_vertex_pairs.cuh b/cpp/src/detail/shuffle_vertex_pairs.cu similarity index 65% rename from cpp/src/detail/shuffle_vertex_pairs.cuh rename to cpp/src/detail/shuffle_vertex_pairs.cu index 4d036fd85f9..eb81e21c017 100644 --- a/cpp/src/detail/shuffle_vertex_pairs.cuh +++ b/cpp/src/detail/shuffle_vertex_pairs.cu @@ -13,8 +13,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#pragma once - #include #include @@ -358,5 +356,167 @@ shuffle_int_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( minor_comm_size}); } +template std::tuple, + rmm::device_uvector, + std::optional>, + std::optional>, + std::optional>> +shuffle_ext_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( + raft::handle_t const& handle, + rmm::device_uvector&& majors, + rmm::device_uvector&& minors, + std::optional>&& weights, + std::optional>&& edge_ids, + std::optional>&& edge_types); + +template std::tuple, + rmm::device_uvector, + std::optional>, + std::optional>, + std::optional>> +shuffle_ext_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( + raft::handle_t const& handle, + rmm::device_uvector&& majors, + rmm::device_uvector&& minors, + std::optional>&& weights, + std::optional>&& edge_ids, + std::optional>&& edge_types); + +template std::tuple, + rmm::device_uvector, + std::optional>, + std::optional>, + std::optional>> +shuffle_ext_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( + raft::handle_t const& handle, + rmm::device_uvector&& majors, + rmm::device_uvector&& minors, + std::optional>&& weights, + std::optional>&& edge_ids, + std::optional>&& edge_types); + +template std::tuple, + rmm::device_uvector, + std::optional>, + std::optional>, + std::optional>> +shuffle_ext_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( + raft::handle_t const& handle, + rmm::device_uvector&& majors, + rmm::device_uvector&& minors, + std::optional>&& weights, + std::optional>&& edge_ids, + std::optional>&& edge_types); + +template std::tuple, + rmm::device_uvector, + std::optional>, + std::optional>, + std::optional>> +shuffle_ext_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( + raft::handle_t const& handle, + rmm::device_uvector&& majors, + rmm::device_uvector&& minors, + std::optional>&& weights, + std::optional>&& edge_ids, + std::optional>&& edge_types); + +template std::tuple, + rmm::device_uvector, + std::optional>, + std::optional>, + std::optional>> +shuffle_ext_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( + raft::handle_t const& handle, + rmm::device_uvector&& majors, + rmm::device_uvector&& minors, + std::optional>&& weights, + std::optional>&& edge_ids, + std::optional>&& edge_types); + +template std::tuple, + rmm::device_uvector, + std::optional>, + std::optional>, + std::optional>> +shuffle_int_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( + raft::handle_t const& handle, + rmm::device_uvector&& majors, + rmm::device_uvector&& minors, + std::optional>&& weights, + std::optional>&& edge_ids, + std::optional>&& edge_types, + std::vector const& vertex_partition_range_lasts); + +template std::tuple, + rmm::device_uvector, + std::optional>, + std::optional>, + std::optional>> +shuffle_int_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( + raft::handle_t const& handle, + rmm::device_uvector&& majors, + rmm::device_uvector&& minors, + std::optional>&& weights, + std::optional>&& edge_ids, + std::optional>&& edge_types, + std::vector const& vertex_partition_range_lasts); + +template std::tuple, + rmm::device_uvector, + std::optional>, + std::optional>, + std::optional>> +shuffle_int_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( + raft::handle_t const& handle, + rmm::device_uvector&& majors, + rmm::device_uvector&& minors, + std::optional>&& weights, + std::optional>&& edge_ids, + std::optional>&& edge_types, + std::vector const& vertex_partition_range_lasts); + +template std::tuple, + rmm::device_uvector, + std::optional>, + std::optional>, + std::optional>> +shuffle_int_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( + raft::handle_t const& handle, + rmm::device_uvector&& majors, + rmm::device_uvector&& minors, + std::optional>&& weights, + std::optional>&& edge_ids, + std::optional>&& edge_types, + std::vector const& vertex_partition_range_lasts); + +template std::tuple, + rmm::device_uvector, + std::optional>, + std::optional>, + std::optional>> +shuffle_int_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( + raft::handle_t const& handle, + rmm::device_uvector&& majors, + rmm::device_uvector&& minors, + std::optional>&& weights, + std::optional>&& edge_ids, + std::optional>&& edge_types, + std::vector const& vertex_partition_range_lasts); + +template std::tuple, + rmm::device_uvector, + std::optional>, + std::optional>, + std::optional>> +shuffle_int_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( + raft::handle_t const& handle, + rmm::device_uvector&& majors, + rmm::device_uvector&& minors, + std::optional>&& weights, + std::optional>&& edge_ids, + std::optional>&& edge_types, + std::vector const& vertex_partition_range_lasts); + } // namespace detail } // namespace cugraph diff --git a/cpp/src/detail/shuffle_vertex_pairs_int32_int32.cu b/cpp/src/detail/shuffle_vertex_pairs_int32_int32.cu deleted file mode 100644 index e88fb7d5a91..00000000000 --- a/cpp/src/detail/shuffle_vertex_pairs_int32_int32.cu +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright (c) 2021-2023, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include - -namespace cugraph { -namespace detail { - -template std::tuple, - rmm::device_uvector, - std::optional>, - std::optional>, - std::optional>> -shuffle_ext_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( - raft::handle_t const& handle, - rmm::device_uvector&& majors, - rmm::device_uvector&& minors, - std::optional>&& weights, - std::optional>&& edge_ids, - std::optional>&& edge_types); - -template std::tuple, - rmm::device_uvector, - std::optional>, - std::optional>, - std::optional>> -shuffle_ext_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( - raft::handle_t const& handle, - rmm::device_uvector&& majors, - rmm::device_uvector&& minors, - std::optional>&& weights, - std::optional>&& edge_ids, - std::optional>&& edge_types); - -template std::tuple, - rmm::device_uvector, - std::optional>, - std::optional>, - std::optional>> -shuffle_int_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( - raft::handle_t const& handle, - rmm::device_uvector&& majors, - rmm::device_uvector&& minors, - std::optional>&& weights, - std::optional>&& edge_ids, - std::optional>&& edge_types, - std::vector const& vertex_partition_range_lasts); - -template std::tuple, - rmm::device_uvector, - std::optional>, - std::optional>, - std::optional>> -shuffle_int_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( - raft::handle_t const& handle, - rmm::device_uvector&& majors, - rmm::device_uvector&& minors, - std::optional>&& weights, - std::optional>&& edge_ids, - std::optional>&& edge_types, - std::vector const& vertex_partition_range_lasts); -} // namespace detail -} // namespace cugraph diff --git a/cpp/src/detail/shuffle_vertex_pairs_int32_int64.cu b/cpp/src/detail/shuffle_vertex_pairs_int32_int64.cu deleted file mode 100644 index b6a92bd0db9..00000000000 --- a/cpp/src/detail/shuffle_vertex_pairs_int32_int64.cu +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright (c) 2021-2023, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include - -namespace cugraph { -namespace detail { - -template std::tuple, - rmm::device_uvector, - std::optional>, - std::optional>, - std::optional>> -shuffle_ext_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( - raft::handle_t const& handle, - rmm::device_uvector&& majors, - rmm::device_uvector&& minors, - std::optional>&& weights, - std::optional>&& edge_ids, - std::optional>&& edge_types); - -template std::tuple, - rmm::device_uvector, - std::optional>, - std::optional>, - std::optional>> -shuffle_ext_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( - raft::handle_t const& handle, - rmm::device_uvector&& majors, - rmm::device_uvector&& minors, - std::optional>&& weights, - std::optional>&& edge_ids, - std::optional>&& edge_types); - -template std::tuple, - rmm::device_uvector, - std::optional>, - std::optional>, - std::optional>> -shuffle_int_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( - raft::handle_t const& handle, - rmm::device_uvector&& majors, - rmm::device_uvector&& minors, - std::optional>&& weights, - std::optional>&& edge_ids, - std::optional>&& edge_types, - std::vector const& vertex_partition_range_lasts); - -template std::tuple, - rmm::device_uvector, - std::optional>, - std::optional>, - std::optional>> -shuffle_int_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( - raft::handle_t const& handle, - rmm::device_uvector&& majors, - rmm::device_uvector&& minors, - std::optional>&& weights, - std::optional>&& edge_ids, - std::optional>&& edge_types, - std::vector const& vertex_partition_range_lasts); - -} // namespace detail -} // namespace cugraph diff --git a/cpp/src/detail/shuffle_vertex_pairs_int64_int64.cu b/cpp/src/detail/shuffle_vertex_pairs_int64_int64.cu deleted file mode 100644 index 5b2b271456a..00000000000 --- a/cpp/src/detail/shuffle_vertex_pairs_int64_int64.cu +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright (c) 2021-2023, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include - -namespace cugraph { -namespace detail { - -template std::tuple, - rmm::device_uvector, - std::optional>, - std::optional>, - std::optional>> -shuffle_ext_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( - raft::handle_t const& handle, - rmm::device_uvector&& majors, - rmm::device_uvector&& minors, - std::optional>&& weights, - std::optional>&& edge_ids, - std::optional>&& edge_types); - -template std::tuple, - rmm::device_uvector, - std::optional>, - std::optional>, - std::optional>> -shuffle_ext_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( - raft::handle_t const& handle, - rmm::device_uvector&& majors, - rmm::device_uvector&& minors, - std::optional>&& weights, - std::optional>&& edge_ids, - std::optional>&& edge_types); - -template std::tuple, - rmm::device_uvector, - std::optional>, - std::optional>, - std::optional>> -shuffle_int_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( - raft::handle_t const& handle, - rmm::device_uvector&& majors, - rmm::device_uvector&& minors, - std::optional>&& weights, - std::optional>&& edge_ids, - std::optional>&& edge_types, - std::vector const& vertex_partition_range_lasts); - -template std::tuple, - rmm::device_uvector, - std::optional>, - std::optional>, - std::optional>> -shuffle_int_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( - raft::handle_t const& handle, - rmm::device_uvector&& majors, - rmm::device_uvector&& minors, - std::optional>&& weights, - std::optional>&& edge_ids, - std::optional>&& edge_types, - std::vector const& vertex_partition_range_lasts); - -} // namespace detail -} // namespace cugraph diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index 45c59149f7f..c13ab05aa53 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -427,9 +427,11 @@ target_link_libraries(MTMG_TEST UCP::UCP ) -ConfigureTest(MTMG_MULTINODE_TEST mtmg/multi_node_threaded_test.cu) + # FIXME... should use MG library +ConfigureTest(MTMG_MULTINODE_TEST mtmg/multi_node_threaded_test.cu utilities/mg_utilities.cpp) target_link_libraries(MTMG_MULTINODE_TEST PRIVATE + MPI::MPI_CXX UCP::UCP ) diff --git a/cpp/tests/mtmg/multi_node_threaded_test.cu b/cpp/tests/mtmg/multi_node_threaded_test.cu index b0f6d9e95c9..0ac1813c9db 100644 --- a/cpp/tests/mtmg/multi_node_threaded_test.cu +++ b/cpp/tests/mtmg/multi_node_threaded_test.cu @@ -29,6 +29,7 @@ #include #include +#include #include #include @@ -49,10 +50,10 @@ struct Multithreaded_Usecase { bool check_correctness{true}; }; -std::string g_comms_dir_name{}; -int g_node_rank{}; -int g_num_nodes{}; -int g_execution_id{0}; +// Global variable defining resource manager +static cugraph::mtmg::resource_manager_t g_resource_manager{}; +static int g_node_rank{-1}; +static int g_num_nodes{-1}; template class Tests_Multithreaded @@ -60,9 +61,6 @@ class Tests_Multithreaded public: Tests_Multithreaded() {} - static void SetUpTestCase() {} - static void TearDownTestCase() {} - virtual void SetUp() {} virtual void TearDown() {} @@ -77,32 +75,6 @@ class Tests_Multithreaded return gpu_list; } - void wait_for_directory(std::string directory_name, int max_tries = 60) - { - while (max_tries > 0) { - if (std::filesystem::is_directory(directory_name)) break; - sleep(1); - --max_tries; - } - - CUGRAPH_EXPECTS(std::filesystem::is_directory(directory_name), - "Timed out waiting for directory to be created"); - } - - std::ifstream wait_for_file(std::string file_name, int max_tries = 60) - { - while (max_tries > 0) { - if (std::filesystem::is_regular_file(file_name)) break; - sleep(1); - --max_tries; - } - - CUGRAPH_EXPECTS(std::filesystem::is_regular_file(file_name), - "Timed out waiting for file to be created"); - - return std::ifstream(file_name, std::ios::binary); - } - template (&instance_manager_id), - sizeof(instance_manager_id)); - instance_manager_file.close(); - } else { - // Wait for node rank 0 to create directory - wait_for_directory(comms_dir_name.str()); - - auto instance_manager_file = wait_for_file(comms_dir_name.str() + "/instance_manager"); - instance_manager_file.read(reinterpret_cast(&instance_manager_id), - sizeof(instance_manager_id)); - instance_manager_file.close(); - } - - // Create a file for this process (rank) to identify how many GPUs - std::ostringstream filename_creator; + ncclUniqueId instance_manager_id{}; - filename_creator << comms_dir_name.str() << "/gpu_count_" << g_node_rank; - { - std::ofstream num_gpus_file(filename_creator.str(), std::ios::binary); - int num_gpus_on_this_node = static_cast(gpu_list.size()); - num_gpus_file.write(reinterpret_cast(&num_gpus_on_this_node), sizeof(int)); - num_gpus_file.close(); - } - - cugraph::mtmg::resource_manager_t resource_manager; - int node_rank{0}; + if (g_node_rank == 0) RAFT_NCCL_TRY(ncclGetUniqueId(&instance_manager_id)); - for (int i = 0; i < g_num_nodes; ++i) { - if (i != g_node_rank) { - filename_creator.str(""); - filename_creator << comms_dir_name.str() << "/gpu_count_" << i; - auto num_gpus_file = wait_for_file(filename_creator.str()); - int num_gpus_this_node{0}; - num_gpus_file.read(reinterpret_cast(&num_gpus_this_node), sizeof(int)); - num_gpus_file.close(); - - for (int j = 0; j < num_gpus_this_node; ++j) { - resource_manager.register_remote_gpu(node_rank++); - } - } else { - std::for_each( - gpu_list.begin(), gpu_list.end(), [&resource_manager, &node_rank](int gpu_id) { - resource_manager.register_local_gpu(node_rank++, rmm::cuda_device_id{gpu_id}); - }); - } - } + RAFT_MPI_TRY( + MPI_Bcast(&instance_manager_id, sizeof(instance_manager_id), MPI_CHAR, 0, MPI_COMM_WORLD)); - auto instance_manager = resource_manager.create_instance_manager( - resource_manager.registered_ranks(), instance_manager_id); + auto instance_manager = g_resource_manager.create_instance_manager( + g_resource_manager.registered_ranks(), instance_manager_id); cugraph::mtmg::edgelist_t edgelist; cugraph::mtmg::graph_t graph; @@ -545,35 +458,17 @@ INSTANTIATE_TEST_SUITE_P( ::testing::Values(Multithreaded_Usecase{false, false}, Multithreaded_Usecase{true, false}), ::testing::Values(cugraph::test::Rmat_Usecase(10, 16, 0.57, 0.19, 0.19, 0, false, false)))); -inline auto local_parse_test_options(int argc, char** argv) -{ - try { - cxxopts::Options options(argv[0], " - cuGraph tests command line options"); - options.allow_unrecognised_options().add_options()( - "rmm_mode", "RMM allocation mode", cxxopts::value()->default_value("pool"))( - "perf", "enalbe performance measurements", cxxopts::value()->default_value("false"))( - "rmat_scale", "override the hardcoded R-mat scale", cxxopts::value())( - "rmat_edge_factor", "override the hardcoded R-mat edge factor", cxxopts::value())( - "node_rank", "rank of this process on multi-node configuration", cxxopts::value())( - "num_nodes", "number of nodes in this multi-node configuration", cxxopts::value())( - "comms_dir_name", - "directory where comms data is stored (shared)", - cxxopts::value())( - "test_file_name", "override the hardcoded test filename", cxxopts::value()); - - return options.parse(argc, argv); - } catch (const cxxopts::OptionException& e) { - CUGRAPH_FAIL("Error parsing command line options"); - } -} - // // Need to customize the test configuration to support multi-node comms not using MPI // int main(int argc, char** argv) { + cugraph::test::initialize_mpi(argc, argv); + auto comm_rank = cugraph::test::query_mpi_comm_world_rank(); + auto comm_size = cugraph::test::query_mpi_comm_world_size(); + ::testing::InitGoogleTest(&argc, argv); - auto const cmd_opts = local_parse_test_options(argc, argv); + auto const cmd_opts = parse_test_options(argc, argv); auto const rmm_mode = cmd_opts["rmm_mode"].as(); auto resource = cugraph::test::create_memory_resource(rmm_mode); rmm::mr::set_current_device_resource(resource.get()); @@ -590,14 +485,34 @@ int main(int argc, char** argv) ? std::make_optional(cmd_opts["test_file_name"].as()) : std::nullopt; - g_comms_dir_name = (cmd_opts.count("comms_dir_name") > 0) - ? cmd_opts["comms_dir_name"].as() - : "COMMS_DIR"; + // + // Set global values for the test. Need to know the rank of this process, + // the comm size, number of GPUs per node, and the NCCL Id for rank 0. + // + int num_gpus_this_node{-1}; + std::vector num_gpus_per_node{}; + + g_node_rank = comm_rank; + g_num_nodes = comm_size; + + num_gpus_per_node.resize(comm_size); - CUGRAPH_EXPECTS(cmd_opts.count("node_rank") > 0, "node_rank not specified"); - CUGRAPH_EXPECTS(cmd_opts.count("num_nodes") > 0, "num_nodes not specified"); - g_node_rank = cmd_opts["node_rank"].as(); - g_num_nodes = cmd_opts["num_nodes"].as(); + RAFT_CUDA_TRY(cudaGetDeviceCount(&num_gpus_this_node)); + RAFT_MPI_TRY(MPI_Allgather( + &num_gpus_this_node, 1, MPI_INT, num_gpus_per_node.data(), 1, MPI_INT, MPI_COMM_WORLD)); + + int node_rank{0}; + + for (int i = 0; i < comm_size; ++i) { + for (int j = 0; j < num_gpus_per_node[i]; ++j) { + if (i != comm_rank) + g_resource_manager.register_remote_gpu(node_rank++); + else + g_resource_manager.register_local_gpu(node_rank++, rmm::cuda_device_id{j}); + } + } - return RUN_ALL_TESTS(); + auto result = RUN_ALL_TESTS(); + cugraph::test::finalize_mpi(); + return result; } From 7502ed57eefe5d0424633a150f5ac26bdc73362a Mon Sep 17 00:00:00 2001 From: Charles Hastings Date: Thu, 2 Nov 2023 14:20:48 -0700 Subject: [PATCH 14/15] remove inadvertently added blank line --- cpp/include/cugraph/utilities/shuffle_comm.cuh | 1 - 1 file changed, 1 deletion(-) diff --git a/cpp/include/cugraph/utilities/shuffle_comm.cuh b/cpp/include/cugraph/utilities/shuffle_comm.cuh index f338b676138..6a260144324 100644 --- a/cpp/include/cugraph/utilities/shuffle_comm.cuh +++ b/cpp/include/cugraph/utilities/shuffle_comm.cuh @@ -828,7 +828,6 @@ auto shuffle_values(raft::comms::comms_t const& comm, std::vector rx_counts{}; std::vector rx_offsets{}; std::vector rx_src_ranks{}; - std::tie(tx_counts, tx_offsets, tx_dst_ranks, rx_counts, rx_offsets, rx_src_ranks) = detail::compute_tx_rx_counts_offsets_ranks(comm, d_tx_value_counts, stream_view); From 63d157fb0fadc965187b21a26f057d2c04b92561 Mon Sep 17 00:00:00 2001 From: Charles Hastings Date: Fri, 3 Nov 2023 11:31:52 -0700 Subject: [PATCH 15/15] add some FIXME comments, move multi-node MTMG test into MG test block in cmake to ensure MPI is available --- cpp/include/cugraph/mtmg/instance_manager.hpp | 3 +++ cpp/include/cugraph/mtmg/resource_manager.hpp | 2 ++ cpp/tests/CMakeLists.txt | 18 ++++++++++-------- cpp/tests/mtmg/multi_node_threaded_test.cu | 2 -- 4 files changed, 15 insertions(+), 10 deletions(-) diff --git a/cpp/include/cugraph/mtmg/instance_manager.hpp b/cpp/include/cugraph/mtmg/instance_manager.hpp index 4e0a5f21d0a..f819a5a0abe 100644 --- a/cpp/include/cugraph/mtmg/instance_manager.hpp +++ b/cpp/include/cugraph/mtmg/instance_manager.hpp @@ -98,6 +98,9 @@ class instance_manager_t { // (or no) GPUs, so mapping rank to a handle might be a challenge // std::vector> raft_handle_{}; + + // FIXME: Explore what RAFT changes might be desired to allow the ncclComm_t + // to be managed by RAFT instead of cugraph::mtmg std::vector> nccl_comms_{}; std::vector device_ids_{}; diff --git a/cpp/include/cugraph/mtmg/resource_manager.hpp b/cpp/include/cugraph/mtmg/resource_manager.hpp index 842f7b6f56b..127944cf7ba 100644 --- a/cpp/include/cugraph/mtmg/resource_manager.hpp +++ b/cpp/include/cugraph/mtmg/resource_manager.hpp @@ -167,6 +167,8 @@ class resource_manager_t { return (local_ranks.find(rank) != local_ranks.end()); }); + // FIXME: Explore what RAFT changes might be desired to allow the ncclComm_t + // to be managed by RAFT instead of cugraph::mtmg std::vector> nccl_comms{}; std::vector> handles{}; std::vector device_ids{}; diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index 0a44b393d23..2f69cf9cb0d 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -423,14 +423,6 @@ target_link_libraries(MTMG_TEST UCP::UCP ) - # FIXME... should use MG library -ConfigureTest(MTMG_MULTINODE_TEST mtmg/multi_node_threaded_test.cu utilities/mg_utilities.cpp) -target_link_libraries(MTMG_MULTINODE_TEST - PRIVATE - MPI::MPI_CXX - UCP::UCP - ) - ################################################################################################### # - MG tests -------------------------------------------------------------------------------------- @@ -688,6 +680,16 @@ if(BUILD_CUGRAPH_MG_TESTS) ConfigureCTestMG(MG_CAPI_TWO_HOP_NEIGHBORS_TEST c_api/mg_two_hop_neighbors_test.c) rapids_test_install_relocatable(INSTALL_COMPONENT_SET testing_mg DESTINATION bin/gtests/libcugraph_mg) + + ############################################################################################### + # - Multi-node MTMG tests --------------------------------------------------------------------- + ConfigureTest(MTMG_MULTINODE_TEST mtmg/multi_node_threaded_test.cu utilities/mg_utilities.cpp) + target_link_libraries(MTMG_MULTINODE_TEST + PRIVATE + cugraphmgtestutil + UCP::UCP + ) + endif() ################################################################################################### diff --git a/cpp/tests/mtmg/multi_node_threaded_test.cu b/cpp/tests/mtmg/multi_node_threaded_test.cu index 0ac1813c9db..e5a7de07781 100644 --- a/cpp/tests/mtmg/multi_node_threaded_test.cu +++ b/cpp/tests/mtmg/multi_node_threaded_test.cu @@ -36,8 +36,6 @@ #include -#include - #include #include #include