From a395fa188aa42c934b58f373c14c0f2e2bfc293f Mon Sep 17 00:00:00 2001 From: jnke2016 Date: Thu, 18 Jul 2024 04:32:57 -0700 Subject: [PATCH 1/6] remove p2p as a requirement when initializing the comms --- python/cugraph/cugraph/dask/comms/comms.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/python/cugraph/cugraph/dask/comms/comms.py b/python/cugraph/cugraph/dask/comms/comms.py index 5499b13af03..1e1c28fbbee 100644 --- a/python/cugraph/cugraph/dask/comms/comms.py +++ b/python/cugraph/cugraph/dask/comms/comms.py @@ -146,8 +146,6 @@ def initialize(comms=None, p2p=False, prows=None, pcols=None, partition_type=1): __default_handle = None if comms is None: # Initialize communicator - if not p2p: - raise Exception("Set p2p to True for running mnmg algorithms") __instance = raftComms(comms_p2p=p2p) __instance.init() # Initialize subcommunicator From 17355f836dfd3974710565b8ffcf1b45539a14cc Mon Sep 17 00:00:00 2001 From: jnke2016 Date: Thu, 18 Jul 2024 04:44:26 -0700 Subject: [PATCH 2/6] add p2p as an argument when starting the client --- python/cugraph/cugraph/testing/mg_utils.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/python/cugraph/cugraph/testing/mg_utils.py b/python/cugraph/cugraph/testing/mg_utils.py index 32854652f05..4f60e5f53e2 100644 --- a/python/cugraph/cugraph/testing/mg_utils.py +++ b/python/cugraph/cugraph/testing/mg_utils.py @@ -35,6 +35,7 @@ def start_dask_client( jit_unspill=False, worker_class=None, device_memory_limit=0.8, + p2p=True ): """ Creates a new dask client, and possibly also a cluster, and returns them as @@ -95,6 +96,9 @@ def start_dask_client( dask_cuda.LocalCUDACluster for details. This parameter is ignored if the env var SCHEDULER_FILE is set which implies the dask cluster has already been created. + + p2p : bool, optional (default=False) + Initialize UCX endpoints if True. """ dask_scheduler_file = os.environ.get("SCHEDULER_FILE") dask_local_directory = os.getenv("DASK_LOCAL_DIRECTORY") @@ -164,7 +168,7 @@ def start_dask_client( # FIXME: use proper logging, INFO or DEBUG level print("\nDask client/cluster created using LocalCUDACluster") - Comms.initialize(p2p=True) + Comms.initialize(p2p=p2p) return (client, cluster) From 41474a186fb14f8a43cb242c84979f1e45ded90a Mon Sep 17 00:00:00 2001 From: jnke2016 Date: Thu, 18 Jul 2024 05:55:43 -0700 Subject: [PATCH 3/6] add non p2p dask client --- python/cugraph/cugraph/tests/conftest.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/python/cugraph/cugraph/tests/conftest.py b/python/cugraph/cugraph/tests/conftest.py index cb5755128eb..a672eaf1a1d 100644 --- a/python/cugraph/cugraph/tests/conftest.py +++ b/python/cugraph/cugraph/tests/conftest.py @@ -52,6 +52,21 @@ def dask_client(): stop_dask_client(dask_client, dask_cluster) +@pytest.fixture(scope="module") +def dask_client_non_p2p(): + # start_dask_client will check for the SCHEDULER_FILE and + # DASK_WORKER_DEVICES env vars and use them when creating a client if + # set. start_dask_client will also initialize the Comms singleton. + dask_client, dask_cluster = start_dask_client( + worker_class=IncreasedCloseTimeoutNanny, + p2p=False + ) + + yield dask_client + + stop_dask_client(dask_client, dask_cluster) + + @pytest.fixture(scope="module") def scratch_dir(): # This should always be set if doing MG testing, since temporary From 621a9a886c45edd356e4dbb645f8890d1fec9362 Mon Sep 17 00:00:00 2001 From: jnke2016 Date: Thu, 18 Jul 2024 05:57:34 -0700 Subject: [PATCH 4/6] update docstrings --- python/cugraph/cugraph/testing/mg_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/cugraph/cugraph/testing/mg_utils.py b/python/cugraph/cugraph/testing/mg_utils.py index 4f60e5f53e2..b0ada30f14f 100644 --- a/python/cugraph/cugraph/testing/mg_utils.py +++ b/python/cugraph/cugraph/testing/mg_utils.py @@ -97,7 +97,7 @@ def start_dask_client( the env var SCHEDULER_FILE is set which implies the dask cluster has already been created. - p2p : bool, optional (default=False) + p2p : bool, optional (default=True) Initialize UCX endpoints if True. """ dask_scheduler_file = os.environ.get("SCHEDULER_FILE") From a0cc5b5ac8a8cb3fc0da88964ead1766cd6f7943 Mon Sep 17 00:00:00 2001 From: jnke2016 Date: Thu, 18 Jul 2024 06:23:28 -0700 Subject: [PATCH 5/6] update copyrights --- python/cugraph/cugraph/testing/mg_utils.py | 6 +++--- python/cugraph/cugraph/tests/conftest.py | 5 ++--- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/python/cugraph/cugraph/testing/mg_utils.py b/python/cugraph/cugraph/testing/mg_utils.py index b0ada30f14f..07399b90627 100644 --- a/python/cugraph/cugraph/testing/mg_utils.py +++ b/python/cugraph/cugraph/testing/mg_utils.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -35,7 +35,7 @@ def start_dask_client( jit_unspill=False, worker_class=None, device_memory_limit=0.8, - p2p=True + p2p=True, ): """ Creates a new dask client, and possibly also a cluster, and returns them as @@ -96,7 +96,7 @@ def start_dask_client( dask_cuda.LocalCUDACluster for details. This parameter is ignored if the env var SCHEDULER_FILE is set which implies the dask cluster has already been created. - + p2p : bool, optional (default=True) Initialize UCX endpoints if True. """ diff --git a/python/cugraph/cugraph/tests/conftest.py b/python/cugraph/cugraph/tests/conftest.py index a672eaf1a1d..51f8a05bbe3 100644 --- a/python/cugraph/cugraph/tests/conftest.py +++ b/python/cugraph/cugraph/tests/conftest.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021-2023, NVIDIA CORPORATION. +# Copyright (c) 2021-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -58,8 +58,7 @@ def dask_client_non_p2p(): # DASK_WORKER_DEVICES env vars and use them when creating a client if # set. start_dask_client will also initialize the Comms singleton. dask_client, dask_cluster = start_dask_client( - worker_class=IncreasedCloseTimeoutNanny, - p2p=False + worker_class=IncreasedCloseTimeoutNanny, p2p=False ) yield dask_client From cb13448b751be01a0814adc7f56436a736f0a5d1 Mon Sep 17 00:00:00 2001 From: jnke2016 Date: Thu, 18 Jul 2024 06:24:18 -0700 Subject: [PATCH 6/6] add fixme --- python/cugraph/cugraph/tests/conftest.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/cugraph/cugraph/tests/conftest.py b/python/cugraph/cugraph/tests/conftest.py index 51f8a05bbe3..d31c2968afe 100644 --- a/python/cugraph/cugraph/tests/conftest.py +++ b/python/cugraph/cugraph/tests/conftest.py @@ -52,6 +52,7 @@ def dask_client(): stop_dask_client(dask_client, dask_cluster) +# FIXME: Add tests leveraging this fixture @pytest.fixture(scope="module") def dask_client_non_p2p(): # start_dask_client will check for the SCHEDULER_FILE and