Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support non p2p configuration when initializing the comms #4543

Merged
merged 11 commits into from
Jul 31, 2024
2 changes: 0 additions & 2 deletions python/cugraph/cugraph/dask/comms/comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,6 @@ def initialize(comms=None, p2p=False, prows=None, pcols=None, partition_type=1):
__default_handle = None
if comms is None:
# Initialize communicator
if not p2p:
raise Exception("Set p2p to True for running mnmg algorithms")
__instance = raftComms(comms_p2p=p2p)
__instance.init()
# Initialize subcommunicator
Expand Down
8 changes: 6 additions & 2 deletions python/cugraph/cugraph/testing/mg_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2022-2023, NVIDIA CORPORATION.
# Copyright (c) 2022-2024, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand Down Expand Up @@ -35,6 +35,7 @@ def start_dask_client(
jit_unspill=False,
worker_class=None,
device_memory_limit=0.8,
p2p=True,
):
"""
Creates a new dask client, and possibly also a cluster, and returns them as
Expand Down Expand Up @@ -95,6 +96,9 @@ def start_dask_client(
dask_cuda.LocalCUDACluster for details. This parameter is ignored if
the env var SCHEDULER_FILE is set which implies the dask cluster has
already been created.

p2p : bool, optional (default=True)
Initialize UCX endpoints if True.
"""
dask_scheduler_file = os.environ.get("SCHEDULER_FILE")
dask_local_directory = os.getenv("DASK_LOCAL_DIRECTORY")
Expand Down Expand Up @@ -164,7 +168,7 @@ def start_dask_client(
# FIXME: use proper logging, INFO or DEBUG level
print("\nDask client/cluster created using LocalCUDACluster")

Comms.initialize(p2p=True)
Comms.initialize(p2p=p2p)

return (client, cluster)

Expand Down
17 changes: 16 additions & 1 deletion python/cugraph/cugraph/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2021-2023, NVIDIA CORPORATION.
# Copyright (c) 2021-2024, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand Down Expand Up @@ -52,6 +52,21 @@ def dask_client():
stop_dask_client(dask_client, dask_cluster)


# FIXME: Add tests leveraging this fixture
@pytest.fixture(scope="module")
def dask_client_non_p2p():
# start_dask_client will check for the SCHEDULER_FILE and
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we know if we'd like to run tests that don't create UCX endpoints by hardcoding a different fixture, or would it be better to pass a CLI option, or something else? I think it's okay to add this fixture now if we're going to use it, but I wonder if we should think of an alternative if something like a CLI option to pytest would be better.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should eliminate the p2p feature entirely. IIRC, the only reason we required the P2P to be configured was because at the time we initially implemented it was required to properly set up the NCCL communication that the C++ library uses. NCCL has been improved so that the P2P feature is no longer required to make this configuration. Joseph's changes here and his testing have validated that belief.

I'd be inclined to merge this PR and create a new issue to address dropping the P2P configuration option and convert all of the tests to configure with the non-P2P configuration.

# DASK_WORKER_DEVICES env vars and use them when creating a client if
# set. start_dask_client will also initialize the Comms singleton.
dask_client, dask_cluster = start_dask_client(
worker_class=IncreasedCloseTimeoutNanny, p2p=False
)

yield dask_client

stop_dask_client(dask_client, dask_cluster)


@pytest.fixture(scope="module")
def scratch_dir():
# This should always be set if doing MG testing, since temporary
Expand Down
Loading