Skip to content

Commit

Permalink
Minor refactor to cleanup teardown steps for the local dask cluster b…
Browse files Browse the repository at this point in the history
…y adding a teardown utility.
  • Loading branch information
rlratzel committed Dec 30, 2020
1 parent c97ffad commit 640b3e1
Show file tree
Hide file tree
Showing 11 changed files with 55 additions and 91 deletions.
16 changes: 15 additions & 1 deletion python/cugraph/dask/common/mg_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,23 @@ def get_visible_devices():


def setup_local_dask_cluster(p2p=True):
"""
Performs steps to setup a Dask cluster using LocalCUDACluster and returns
the LocalCUDACluster and corresponding client instance.
"""
cluster = LocalCUDACluster()
client = Client(cluster)
client.wait_for_workers(len(get_visible_devices()))
Comms.initialize(p2p)

return (Comms, client, cluster)
return (cluster, client)


def teardown_local_dask_cluster(cluster, client):
"""
Performs steps to destroy a Dask cluster and a corresponding client
instance.
"""
Comms.destroy()
client.close()
cluster.close()
13 changes: 4 additions & 9 deletions python/cugraph/tests/dask/test_mg_bfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,15 @@
import dask_cudf
import cudf
from cugraph.dask.common.mg_utils import (is_single_gpu,
setup_local_dask_cluster)
setup_local_dask_cluster,
teardown_local_dask_cluster)


@pytest.fixture(scope="module")
def client_connection():
# setup
(comms, client, cluster) = setup_local_dask_cluster(p2p=True)

(cluster, client) = setup_local_dask_cluster(p2p=True)
yield client

# teardown
comms.destroy()
client.close()
cluster.close()
teardown_local_dask_cluster(cluster, client)


@pytest.mark.skipif(
Expand Down
13 changes: 4 additions & 9 deletions python/cugraph/tests/dask/test_mg_comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,15 @@
import dask_cudf
import cudf
from cugraph.dask.common.mg_utils import (is_single_gpu,
setup_local_dask_cluster)
setup_local_dask_cluster,
teardown_local_dask_cluster)


@pytest.fixture(scope="module")
def client_connection():
# setup
(comms, client, cluster) = setup_local_dask_cluster(p2p=True)

(cluster, client) = setup_local_dask_cluster(p2p=True)
yield client

# teardown
comms.destroy()
client.close()
cluster.close()
teardown_local_dask_cluster(cluster, client)


@pytest.mark.skipif(
Expand Down
13 changes: 4 additions & 9 deletions python/cugraph/tests/dask/test_mg_degree.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,15 @@
import cugraph
import dask_cudf
from cugraph.dask.common.mg_utils import (is_single_gpu,
setup_local_dask_cluster)
setup_local_dask_cluster,
teardown_local_dask_cluster)


@pytest.fixture(scope="module")
def client_connection():
# setup
(comms, client, cluster) = setup_local_dask_cluster(p2p=True)

(cluster, client) = setup_local_dask_cluster(p2p=True)
yield client

# teardown
comms.destroy()
client.close()
cluster.close()
teardown_local_dask_cluster(cluster, client)


@pytest.mark.skipif(
Expand Down
13 changes: 4 additions & 9 deletions python/cugraph/tests/dask/test_mg_katz_centrality.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,15 @@
import dask_cudf
import cudf
from cugraph.dask.common.mg_utils import (is_single_gpu,
setup_local_dask_cluster)
setup_local_dask_cluster,
teardown_local_dask_cluster)


@pytest.fixture(scope="module")
def client_connection():
# setup
(comms, client, cluster) = setup_local_dask_cluster(p2p=True)

(cluster, client) = setup_local_dask_cluster(p2p=True)
yield client

# teardown
comms.destroy()
client.close()
cluster.close()
teardown_local_dask_cluster(cluster, client)


@pytest.mark.skipif(
Expand Down
13 changes: 4 additions & 9 deletions python/cugraph/tests/dask/test_mg_louvain.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
import dask_cudf
from cugraph.tests import utils
from cugraph.dask.common.mg_utils import (is_single_gpu,
setup_local_dask_cluster)
setup_local_dask_cluster,
teardown_local_dask_cluster)

try:
from rapids_pytest_benchmark import setFixtureParamNames
Expand All @@ -42,15 +43,9 @@ def setFixtureParamNames(*args, **kwargs):
# Fixtures
@pytest.fixture(scope="module")
def client_connection():
# setup
(comms, client, cluster) = setup_local_dask_cluster(p2p=True)

(cluster, client) = setup_local_dask_cluster(p2p=True)
yield client

# teardown
comms.destroy()
client.close()
cluster.close()
teardown_local_dask_cluster(cluster, client)


@pytest.mark.skipif(
Expand Down
13 changes: 4 additions & 9 deletions python/cugraph/tests/dask/test_mg_pagerank.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
import dask_cudf
import cudf
from cugraph.dask.common.mg_utils import (is_single_gpu,
setup_local_dask_cluster)
setup_local_dask_cluster,
teardown_local_dask_cluster)


# The function selects personalization_perc% of accessible vertices in graph M
Expand Down Expand Up @@ -52,15 +53,9 @@ def personalize(vertices, personalization_perc):

@pytest.fixture(scope="module")
def client_connection():
# setup
(comms, client, cluster) = setup_local_dask_cluster(p2p=True)

(cluster, client) = setup_local_dask_cluster(p2p=True)
yield client

# teardown
comms.destroy()
client.close()
cluster.close()
teardown_local_dask_cluster(cluster, client)


@pytest.mark.skipif(
Expand Down
13 changes: 4 additions & 9 deletions python/cugraph/tests/dask/test_mg_renumber.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,15 @@
from cugraph.tests import utils
from cugraph.structure.number_map import NumberMap
from cugraph.dask.common.mg_utils import (is_single_gpu,
setup_local_dask_cluster)
setup_local_dask_cluster,
teardown_local_dask_cluster)


@pytest.fixture(scope="module")
def client_connection():
# setup
(comms, client, cluster) = setup_local_dask_cluster(p2p=True)

(cluster, client) = setup_local_dask_cluster(p2p=True)
yield client

# teardown
comms.destroy()
client.close()
cluster.close()
teardown_local_dask_cluster(cluster, client)


# Test all combinations of default/managed and pooled/non-pooled allocation
Expand Down
13 changes: 4 additions & 9 deletions python/cugraph/tests/dask/test_mg_sssp.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,15 @@
import dask_cudf
import cudf
from cugraph.dask.common.mg_utils import (is_single_gpu,
setup_local_dask_cluster)
setup_local_dask_cluster,
teardown_local_dask_cluster)


@pytest.fixture(scope="module")
def client_connection():
# setup
(comms, client, cluster) = setup_local_dask_cluster(p2p=True)

(cluster, client) = setup_local_dask_cluster(p2p=True)
yield client

# teardown
comms.destroy()
client.close()
cluster.close()
teardown_local_dask_cluster(cluster, client)


@pytest.mark.skipif(
Expand Down
13 changes: 4 additions & 9 deletions python/cugraph/tests/dask/test_mg_utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
from cugraph.dask.common.part_utils import concat_within_workers
from cugraph.dask.common.read_utils import get_n_workers
from cugraph.dask.common.mg_utils import (is_single_gpu,
setup_local_dask_cluster)
setup_local_dask_cluster,
teardown_local_dask_cluster)
import os
import time
import numpy as np
Expand All @@ -37,15 +38,9 @@ def setup_function():

@pytest.fixture(scope="module")
def client_connection():
# setup
(comms, client, cluster) = setup_local_dask_cluster(p2p=True)

(cluster, client) = setup_local_dask_cluster(p2p=True)
yield client

# teardown
comms.destroy()
client.close()
cluster.close()
teardown_local_dask_cluster(cluster, client)


@pytest.mark.skipif(
Expand Down
13 changes: 4 additions & 9 deletions python/cugraph/tests/test_symmetrize.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
import cugraph
from cugraph.tests import utils
from cugraph.dask.common.mg_utils import (is_single_gpu,
setup_local_dask_cluster)
setup_local_dask_cluster,
teardown_local_dask_cluster)


def test_version():
Expand Down Expand Up @@ -188,15 +189,9 @@ def test_symmetrize_weighted(graph_file):

@pytest.fixture(scope="module")
def client_connection():
# setup
(comms, client, cluster) = setup_local_dask_cluster(p2p=True)

(cluster, client) = setup_local_dask_cluster(p2p=True)
yield client

# teardown
comms.destroy()
client.close()
cluster.close()
teardown_local_dask_cluster(cluster, client)


@pytest.mark.skipif(
Expand Down

0 comments on commit 640b3e1

Please sign in to comment.