
MTMG multi node #3932

Merged: 19 commits, Nov 6, 2023
Changes from 14 commits

cpp/CMakeLists.txt (4 changes: 3 additions & 1 deletion)
@@ -181,7 +181,9 @@ endif()

set(CUGRAPH_SOURCES
src/detail/shuffle_vertices.cu
src/detail/shuffle_vertex_pairs.cu
src/detail/shuffle_vertex_pairs_int32_int32.cu
src/detail/shuffle_vertex_pairs_int32_int64.cu
src/detail/shuffle_vertex_pairs_int64_int64.cu

Contributor:
What's the rationale behind this? Is this to cut compile time?

Contributor:
We should be more consistent on this to keep our codebase maintainable.

Collaborator Author:
Yes, this was to cut compile time. It was the one file that was constantly being recompiled while I was testing this, and it took a long time to build.

I have been splitting these files up as I come across them. I think ultimately we need to re-evaluate how we are doing type dispatching so that we can generally improve compile time.

Contributor:
Yeah... we should re-evaluate this, but AFAIK this isn't the file with the longest compile time, and splitting just this file looks a bit odd. No problem doing this for local development, but pushing this change to the main repo (without really agreeing on how to tackle the compile-time issue) sounds a bit premature.

src/detail/collect_local_vertex_values.cu
src/detail/groupby_and_count.cu
src/sampling/random_walks_mg.cu
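
For context on the compile-time discussion above, a minimal sketch of the explicit-instantiation split (file layout and signatures here are hypothetical, not the actual cuGraph sources): the templated implementation lives in a shared header, and each translation unit instantiates exactly one vertex/edge type combination, so rebuilding one combination no longer recompiles the others.

// shuffle_vertex_pairs_impl.cuh -- shared templated implementation (hypothetical name)
#include <cstdint>
#include <vector>

template <typename vertex_t, typename edge_t>
void shuffle_vertex_pairs(std::vector<vertex_t>& majors,
                          std::vector<vertex_t>& minors,
                          std::vector<edge_t>& edge_ids)
{
  // ... heavy templated (CUDA) implementation, compiled once per explicit instantiation ...
}

// shuffle_vertex_pairs_int32_int32.cu -- one explicit instantiation per translation unit
#include "shuffle_vertex_pairs_impl.cuh"
template void shuffle_vertex_pairs<int32_t, int32_t>(std::vector<int32_t>&,
                                                     std::vector<int32_t>&,
                                                     std::vector<int32_t>&);

// shuffle_vertex_pairs_int64_int64.cu
#include "shuffle_vertex_pairs_impl.cuh"
template void shuffle_vertex_pairs<int64_t, int64_t>(std::vector<int64_t>&,
                                                     std::vector<int64_t>&,
                                                     std::vector<int64_t>&);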

cpp/include/cugraph/mtmg/handle.hpp (8 changes: 0 additions & 8 deletions)
@@ -111,14 +111,6 @@ class handle_t {
*/
int get_size() const { return raft_handle_.get_comms().get_size(); }

/**
* @brief Get number of local gpus
*
* @return number of local gpus
*/
// FIXME: wrong for multi-node
int get_local_size() const { return raft_handle_.get_comms().get_size(); }

/**
* @brief Get gpu rank
*

cpp/include/cugraph/mtmg/instance_manager.hpp (9 changes: 3 additions & 6 deletions)
@@ -37,13 +37,11 @@ class instance_manager_t {
*/
instance_manager_t(std::vector<std::unique_ptr<raft::handle_t>>&& handles,
std::vector<std::unique_ptr<ncclComm_t>>&& nccl_comms,
std::vector<rmm::cuda_device_id>&& device_ids,
int local_gpu_count)
std::vector<rmm::cuda_device_id>&& device_ids)
: thread_counter_{0},
raft_handle_{std::move(handles)},
nccl_comms_{std::move(nccl_comms)},
device_ids_{std::move(device_ids)},
local_gpu_count_{local_gpu_count}
device_ids_{std::move(device_ids)}
{
}

@@ -79,7 +77,7 @@
/**
* @brief Number of local GPUs in the instance
*/
int get_local_gpu_count() { return local_gpu_count_; }
int get_local_gpu_count() { return static_cast<int>(raft_handle_.size()); }

private:
// FIXME: Should this be an std::map<> where the key is the rank?
@@ -89,7 +87,6 @@
std::vector<std::unique_ptr<raft::handle_t>> raft_handle_{};
std::vector<std::unique_ptr<ncclComm_t>> nccl_comms_{};

Contributor:
Why are we directly managing NCCL? Shouldn't this be an implementation detail of raft?

Collaborator Author:
We can look at updating raft for this. I'll talk with Corey.

std::vector<rmm::cuda_device_id> device_ids_{};
int local_gpu_count_{};

std::atomic<int> thread_counter_{0};
};
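
As background for the NCCL discussion above: managing the communicators directly means the resource manager has to use NCCL's grouped-initialization pattern, because a single process creates one communicator per local GPU. A minimal sketch of that pattern (hypothetical helper; error handling and NCCL_TRY/RAFT_CUDA_TRY wrappers omitted):

#include <cuda_runtime.h>
#include <nccl.h>
#include <cstddef>
#include <vector>

// Initialize one NCCL communicator per local GPU from a single process.
// ncclCommInitRank() blocks until every rank of the clique has joined, so the
// per-GPU calls must be wrapped in ncclGroupStart()/ncclGroupEnd(); otherwise
// the first call would wait forever for the ranks this process has not
// started yet.
std::vector<ncclComm_t> init_local_comms(ncclUniqueId id,
                                         int global_size,
                                         std::vector<int> const& global_ranks,
                                         std::vector<int> const& cuda_devices)
{
  std::vector<ncclComm_t> comms(global_ranks.size());
  ncclGroupStart();
  for (std::size_t i = 0; i < global_ranks.size(); ++i) {
    cudaSetDevice(cuda_devices[i]);
    ncclCommInitRank(&comms[i], global_size, id, global_ranks[i]);
  }
  ncclGroupEnd();
  return comms;
}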

cpp/include/cugraph/mtmg/resource_manager.hpp (96 changes: 65 additions & 31 deletions)
@@ -70,6 +70,8 @@ class resource_manager_t {
{
std::lock_guard<std::mutex> lock(lock_);

CUGRAPH_EXPECTS(remote_rank_set_.find(rank) == remote_rank_set_.end(),

Contributor:
Just to double-check, is rank still local? I assume remote_rank_set_ stores global ranks; can we query remote_rank_set_ with a local rank?

Contributor:
Or, from now on, is rank global while device_id serves as the local GPU rank?

Collaborator Author:
In my implementation here (which tries to somewhat mimic MPI), rank is global and device_id is local.

Contributor:
Yeah... in that case, we need to review the documentation.

  /**
   * @brief add a local GPU to the resource manager.
   *
   * @param rank       The rank to assign to the local GPU
   * @param device_id  The device_id corresponding to this rank
   */

It is not clear from the documentation alone that rank is global and device_id is local.

Collaborator Author:
Updated the parameter names and documentation to be global_rank and local_device_id. Added a small improvement in the class documentation to identify the global rank concept.

"cannot register same rank as local and remote");
CUGRAPH_EXPECTS(local_rank_map_.find(rank) == local_rank_map_.end(),
"cannot register same rank multiple times");

@@ -89,6 +91,9 @@
// There is a deprecated environment variable: NCCL_LAUNCH_MODE=GROUP
// which should temporarily work around this problem.
//
// Further NOTE: multi-node requires the NCCL_LAUNCH_MODE=GROUP feature
// to be enabled even with the pool memory resource.
//
// Ultimately there should be some RMM parameters passed into this function
// (or the constructor of the object) to configure this behavior
#if 0
@@ -108,6 +113,23 @@
rmm::mr::set_per_device_resource(device_id, per_device_it.first->second.get());
}

/**
* @brief add a remote GPU to the resource manager.
*
* @param rank The rank to assign to the remote GPU
*/
void register_remote_gpu(int rank)
{
std::lock_guard<std::mutex> lock(lock_);

CUGRAPH_EXPECTS(local_rank_map_.find(rank) == local_rank_map_.end(),
"cannot register same rank as local and remote");
CUGRAPH_EXPECTS(remote_rank_set_.find(rank) == remote_rank_set_.end(),
"cannot register same rank multiple times");

remote_rank_set_.insert(rank);
}
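
To make the global-rank vs. local-device convention discussed above concrete, here is a hypothetical two-node usage sketch of the register_local_gpu / register_remote_gpu pair (the exact signatures and parameter names in the merged code may differ):

#include <cugraph/mtmg/resource_manager.hpp>

// Two nodes with two GPUs each (hypothetical layout): node 0 owns global ranks
// 0-1, node 1 owns global ranks 2-3. Every process registers its own GPUs as
// local (global rank -> local CUDA device) and every other rank as remote, so
// all processes agree on the full set of registered ranks.
void register_all_gpus(int node_rank,
                       int gpus_per_node,
                       int world_size,
                       cugraph::mtmg::resource_manager_t& resource_manager)
{
  for (int global_rank = 0; global_rank < world_size; ++global_rank) {
    if (global_rank / gpus_per_node == node_rank) {
      // local GPU: global rank plus the local CUDA device it maps to
      resource_manager.register_local_gpu(global_rank,
                                          rmm::cuda_device_id{global_rank % gpus_per_node});
    } else {
      // remote GPU: only the global rank is known on this node
      resource_manager.register_remote_gpu(global_rank);
    }
  }
}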

/**
* @brief Create an instance using a subset of the registered resources
*
@@ -130,29 +152,40 @@
ncclUniqueId instance_manager_id,
size_t n_streams = 16) const
{
std::for_each(
ranks_to_include.begin(), ranks_to_include.end(), [local_ranks = local_rank_map_](int rank) {
CUGRAPH_EXPECTS(local_ranks.find(rank) != local_ranks.end(),
"requesting inclusion of an invalid rank");
});
std::vector<int> local_ranks_to_include;

std::for_each(ranks_to_include.begin(),
ranks_to_include.end(),
[&local_ranks = local_rank_map_,
&remote_ranks = remote_rank_set_,
&local_ranks_to_include](int rank) {
if (local_ranks.find(rank) == local_ranks.end()) {
CUGRAPH_EXPECTS(remote_ranks.find(rank) != remote_ranks.end(),
"requesting inclusion of an invalid rank");
} else {
local_ranks_to_include.push_back(rank);
}
});

Contributor:
I think using std::copy_if with std::back_inserter better documents the intention.

Collaborator Author:
I will look at this change.

Collaborator Author:
Updated in next push.
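
For reference, a sketch of the std::copy_if form suggested above, assuming the same member names as in this diff (needs <algorithm> and <iterator>; the rank validation against remote_rank_set_ would still happen separately):

// Keep only the ranks that this process hosts locally.
std::vector<int> local_ranks_to_include;
std::copy_if(ranks_to_include.begin(),
             ranks_to_include.end(),
             std::back_inserter(local_ranks_to_include),
             [&local_ranks = local_rank_map_](int rank) {
               return local_ranks.find(rank) != local_ranks.end();
             });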


std::vector<std::unique_ptr<ncclComm_t>> nccl_comms{};
std::vector<std::unique_ptr<raft::handle_t>> handles{};
std::vector<rmm::cuda_device_id> device_ids{};

nccl_comms.reserve(ranks_to_include.size());
handles.reserve(ranks_to_include.size());
device_ids.reserve(ranks_to_include.size());
nccl_comms.reserve(local_ranks_to_include.size());
handles.reserve(local_ranks_to_include.size());
device_ids.reserve(local_ranks_to_include.size());

// FIXME: not quite right for multi-node
auto gpu_row_comm_size = static_cast<int>(sqrt(static_cast<double>(ranks_to_include.size())));
while (ranks_to_include.size() % gpu_row_comm_size != 0) {
--gpu_row_comm_size;
}

// FIXME: not quite right for multi-node
for (size_t i = 0; i < ranks_to_include.size(); ++i) {
int rank = ranks_to_include[i];
int current_device{};
RAFT_CUDA_TRY(cudaGetDevice(&current_device));
NCCL_TRY(ncclGroupStart());

for (size_t i = 0; i < local_ranks_to_include.size(); ++i) {
int rank = local_ranks_to_include[i];
auto pos = local_rank_map_.find(rank);
RAFT_CUDA_TRY(cudaSetDevice(pos->second.value()));

@@ -162,36 +195,37 @@
std::make_shared<rmm::cuda_stream_pool>(n_streams),
per_device_rmm_resources_.find(rank)->second));
device_ids.push_back(pos->second);

NCCL_TRY(
ncclCommInitRank(nccl_comms[i].get(), ranks_to_include.size(), instance_manager_id, rank));
raft::comms::build_comms_nccl_only(
handles[i].get(), *nccl_comms[i], ranks_to_include.size(), rank);
}
NCCL_TRY(ncclGroupEnd());
RAFT_CUDA_TRY(cudaSetDevice(current_device));

std::vector<std::thread> running_threads;

for (size_t i = 0; i < ranks_to_include.size(); ++i) {
for (size_t i = 0; i < local_ranks_to_include.size(); ++i) {
running_threads.emplace_back([instance_manager_id,
idx = i,
gpu_row_comm_size,
comm_size = ranks_to_include.size(),
&ranks_to_include,
&local_rank_map = local_rank_map_,
&local_ranks_to_include,
&device_ids,
&nccl_comms,
&handles]() {
int rank = ranks_to_include[idx];
auto pos = local_rank_map.find(rank);
RAFT_CUDA_TRY(cudaSetDevice(pos->second.value()));

NCCL_TRY(ncclCommInitRank(nccl_comms[idx].get(), comm_size, instance_manager_id, rank));

raft::comms::build_comms_nccl_only(handles[idx].get(), *nccl_comms[idx], comm_size, rank);
int rank = local_ranks_to_include[idx];
RAFT_CUDA_TRY(cudaSetDevice(device_ids[idx].value()));

cugraph::partition_manager::init_subcomm(*handles[idx], gpu_row_comm_size);
});
}

std::for_each(running_threads.begin(), running_threads.end(), [](auto& t) { t.join(); });

// FIXME: Update for multi-node
return std::make_unique<instance_manager_t>(
std::move(handles), std::move(nccl_comms), std::move(device_ids), ranks_to_include.size());
std::move(handles), std::move(nccl_comms), std::move(device_ids));
}

/**
@@ -203,24 +237,24 @@
{
std::lock_guard<std::mutex> lock(lock_);

//
// C++20 mechanism:
// return std::vector<int>{ std::views::keys(local_rank_map_).begin(),
// std::views::keys(local_rank_map_).end() };
// Would need to be a bit more complicated to also handle remote_rank_set_
//
std::vector<int> registered_ranks(local_rank_map_.size());
std::vector<int> registered_ranks(local_rank_map_.size() + remote_rank_set_.size());
std::transform(
local_rank_map_.begin(), local_rank_map_.end(), registered_ranks.begin(), [](auto pair) {
return pair.first;
});

std::copy(remote_rank_set_.begin(),
remote_rank_set_.end(),
registered_ranks.begin() + local_rank_map_.size());

std::sort(registered_ranks.begin(), registered_ranks.end());
return registered_ranks;
}

private:
mutable std::mutex lock_{};
std::map<int, rmm::cuda_device_id> local_rank_map_{};
std::set<int> remote_rank_set_{};
std::map<int, std::shared_ptr<rmm::mr::device_memory_resource>> per_device_rmm_resources_{};
};

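
As an aside on the C++20 comment inside get_registered_ranks() above, a hedged sketch of what a ranges-based version covering both the local map and the remote set might look like (assumes C++20, which is not currently required here; rmm::cuda_device_id is swapped for int to keep the sketch self-contained):

#include <algorithm>
#include <iterator>
#include <map>
#include <ranges>
#include <set>
#include <vector>

std::vector<int> registered_ranks(std::map<int, int> const& local_rank_map,
                                  std::set<int> const& remote_rank_set)
{
  std::vector<int> ranks;
  ranks.reserve(local_rank_map.size() + remote_rank_set.size());
  // std::views::keys yields the map keys directly, replacing the transform lambda
  std::ranges::copy(std::views::keys(local_rank_map), std::back_inserter(ranks));
  std::ranges::copy(remote_rank_set, std::back_inserter(ranks));
  std::ranges::sort(ranks);
  return ranks;
}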

cpp/include/cugraph/utilities/shuffle_comm.cuh (1 change: 1 addition & 0 deletions)
@@ -828,6 +828,7 @@ auto shuffle_values(raft::comms::comms_t const& comm,
std::vector<size_t> rx_counts{};
std::vector<size_t> rx_offsets{};
std::vector<int> rx_src_ranks{};

std::tie(tx_counts, tx_offsets, tx_dst_ranks, rx_counts, rx_offsets, rx_src_ranks) =
detail::compute_tx_rx_counts_offsets_ranks(comm, d_tx_value_counts, stream_view);
