Updates to support nightly MG test automation (#1308)
This PR includes various updates to support nightly automated MG test runs, including:
* Added a `preset_gpu_count` marker to test cases that are parametrized with a hardcoded number of GPUs. Nightly scripts deselect those cases and run on all visible GPUs instead, since the scripts control the number of GPUs used in the tests by setting the `CUDA_VISIBLE_DEVICES` env var (see the sketch after this list).
  * In the nightly scripts, the marker is used like so: `pytest -m "not preset_gpu_count" ...`
* Added a `client.wait_for_workers()` call to various setup routines, both to match the approach taken by the `MGContext` class and to ensure workers are actually running before tests start. This seemed to increase reliability in the test runs.
  * _side note: we should decide on using either the `MGContext` class or the `client_connection` pytest fixture in these tests, since both aim to accomplish the same thing._
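
A minimal sketch of how the marker mechanism fits together, based on the parametrization changes in this diff. The marker registration mentioned in the final comment is an assumption and is not part of this commit:

```python
import pytest

# Device counts pinned by a test are marked so nightly runs can deselect them;
# the unmarked None entry means "run on all visible GPUs".
MG_DEVICE_COUNT_OPTIONS = [pytest.param(1, marks=pytest.mark.preset_gpu_count),
                           pytest.param(2, marks=pytest.mark.preset_gpu_count),
                           None]

# Nightly scripts then set the GPU count externally and deselect pinned cases:
#   CUDA_VISIBLE_DEVICES=0,1 pytest -m "not preset_gpu_count" ...
# Assumption: the marker would also be registered (e.g. in setup.cfg under
# [tool:pytest] with `markers = preset_gpu_count: ...`) to avoid pytest's
# unknown-marker warnings; that registration is not shown in this diff.
```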

Authors:
  - Rick Ratzel <[email protected]>

Approvers:
  - null

URL: #1308
rlratzel authored Jan 4, 2021
1 parent 70d9abd commit 049b088
Showing 16 changed files with 130 additions and 158 deletions.
43 changes: 42 additions & 1 deletion python/cugraph/dask/common/mg_utils.py
@@ -10,9 +10,17 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from cugraph.raft.dask.common.utils import default_client

import os

import numba.cuda

from dask_cuda import LocalCUDACluster
from dask.distributed import Client

from cugraph.raft.dask.common.utils import default_client
import cugraph.comms as Comms


# FIXME: We currently look for the default client from dask, as such is the
# if there is a dask client running without any GPU we will still try
@@ -41,3 +49,36 @@ def is_single_gpu():
        return False
    else:
        return True


def get_visible_devices():
    _visible_devices = os.environ.get("CUDA_VISIBLE_DEVICES")
    if _visible_devices is None:
        # FIXME: We assume that if the variable is unset there is only one GPU
        visible_devices = ["0"]
    else:
        visible_devices = _visible_devices.strip().split(",")
    return visible_devices


def setup_local_dask_cluster(p2p=True):
    """
    Performs steps to set up a Dask cluster using LocalCUDACluster and returns
    the LocalCUDACluster and corresponding client instance.
    """
    cluster = LocalCUDACluster()
    client = Client(cluster)
    client.wait_for_workers(len(get_visible_devices()))
    Comms.initialize(p2p=p2p)

    return (cluster, client)


def teardown_local_dask_cluster(cluster, client):
    """
    Performs steps to destroy a Dask cluster and a corresponding client
    instance.
    """
    Comms.destroy()
    client.close()
    cluster.close()
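
For reference, a minimal sketch of how these helpers are consumed, mirroring the module-scoped `client_connection` fixtures updated in the test files below (illustrative only, not an additional change in this commit):

```python
import pytest

from cugraph.dask.common.mg_utils import (setup_local_dask_cluster,
                                          teardown_local_dask_cluster)


@pytest.fixture(scope="module")
def client_connection():
    # Start a LocalCUDACluster, block until one worker per visible GPU has
    # connected, then initialize comms with p2p enabled.
    (cluster, client) = setup_local_dask_cluster(p2p=True)
    yield client
    # Destroy comms and shut down the client and cluster once the module's
    # tests finish.
    teardown_local_dask_cluster(cluster, client)
```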
28 changes: 11 additions & 17 deletions python/cugraph/tests/dask/mg_context.py
@@ -12,12 +12,15 @@
# limitations under the License.

import time
import os

import pytest

from dask.distributed import Client

from cugraph.dask.common.mg_utils import get_visible_devices
from dask_cuda import LocalCUDACluster as CUDACluster
import cugraph.comms as Comms
import pytest


# Maximum number of times to verify the number of workers
DEFAULT_MAX_ATTEMPT = 100
@@ -26,22 +29,13 @@
DEFAULT_WAIT_TIME = 0.5


def get_visible_devices():
    _visible_devices = os.environ.get("CUDA_VISIBLE_DEVICES")
    if _visible_devices is None:
        # FIXME: We assume that if the variable is unset there is only one GPU
        visible_devices = ["0"]
    else:
        visible_devices = _visible_devices.strip().split(",")
    return visible_devices


def skip_if_not_enough_devices(required_devices):
    visible_devices = get_visible_devices()
    number_of_visible_devices = len(visible_devices)
    if required_devices > number_of_visible_devices:
        pytest.skip("Not enough devices available to "
                    "test MG({})".format(required_devices))
    if required_devices is not None:
        visible_devices = get_visible_devices()
        number_of_visible_devices = len(visible_devices)
        if required_devices > number_of_visible_devices:
            pytest.skip("Not enough devices available to "
                        "test MG({})".format(required_devices))


class MGContext:
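To illustrate the new `None` handling above: a hypothetical fixture (the fixture name and import path here are assumptions, not code from this commit) could pass the parametrized device count straight through, with `None` meaning "no preset count":

```python
import pytest

# Assumed import path for the helper shown above in mg_context.py.
from cugraph.tests.dask.mg_context import skip_if_not_enough_devices

MG_DEVICE_COUNT_OPTIONS = [pytest.param(2, marks=pytest.mark.preset_gpu_count),
                           None]


@pytest.fixture(params=MG_DEVICE_COUNT_OPTIONS)
def mg_device_count(request):
    # A pinned count skips when too few GPUs are visible; None is a no-op in
    # skip_if_not_enough_devices(), so that case runs on all visible GPUs.
    skip_if_not_enough_devices(request.param)
    return request.param
```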
@@ -37,7 +37,11 @@
# Parameters
# =============================================================================
DATASETS = ["../datasets/karate.csv"]
MG_DEVICE_COUNT_OPTIONS = [1, 2, 3, 4]
MG_DEVICE_COUNT_OPTIONS = [pytest.param(1, marks=pytest.mark.preset_gpu_count),
                           pytest.param(2, marks=pytest.mark.preset_gpu_count),
                           pytest.param(3, marks=pytest.mark.preset_gpu_count),
                           pytest.param(4, marks=pytest.mark.preset_gpu_count),
                           None]
RESULT_DTYPE_OPTIONS = [np.float64]


@@ -37,7 +37,11 @@
# Parameters
# =============================================================================
DATASETS = ["../datasets/karate.csv"]
MG_DEVICE_COUNT_OPTIONS = [1, 2, 4]
MG_DEVICE_COUNT_OPTIONS = [pytest.param(1, marks=pytest.mark.preset_gpu_count),
                           pytest.param(2, marks=pytest.mark.preset_gpu_count),
                           pytest.param(3, marks=pytest.mark.preset_gpu_count),
                           pytest.param(4, marks=pytest.mark.preset_gpu_count),
                           None]
RESULT_DTYPE_OPTIONS = [np.float64]


19 changes: 6 additions & 13 deletions python/cugraph/tests/dask/test_mg_bfs.py
@@ -12,28 +12,21 @@
# limitations under the License.

import cugraph.dask as dcg
import cugraph.comms as Comms
from dask.distributed import Client
import gc
import pytest
import cugraph
import dask_cudf
import cudf
from dask_cuda import LocalCUDACluster
from cugraph.dask.common.mg_utils import is_single_gpu
from cugraph.dask.common.mg_utils import (is_single_gpu,
                                          setup_local_dask_cluster,
                                          teardown_local_dask_cluster)


@pytest.fixture
@pytest.fixture(scope="module")
def client_connection():
    cluster = LocalCUDACluster()
    client = Client(cluster)
    Comms.initialize(p2p=True)

    (cluster, client) = setup_local_dask_cluster(p2p=True)
    yield client

    Comms.destroy()
    client.close()
    cluster.close()
    teardown_local_dask_cluster(cluster, client)


@pytest.mark.skipif(
19 changes: 6 additions & 13 deletions python/cugraph/tests/dask/test_mg_comms.py
@@ -12,28 +12,21 @@
# limitations under the License.

import cugraph.dask as dcg
import cugraph.comms as Comms
from dask.distributed import Client
import gc
import pytest
import cugraph
import dask_cudf
import cudf
from dask_cuda import LocalCUDACluster
from cugraph.dask.common.mg_utils import is_single_gpu
from cugraph.dask.common.mg_utils import (is_single_gpu,
                                          setup_local_dask_cluster,
                                          teardown_local_dask_cluster)


@pytest.fixture
@pytest.fixture(scope="module")
def client_connection():
    cluster = LocalCUDACluster()
    client = Client(cluster)
    Comms.initialize(p2p=True)

    (cluster, client) = setup_local_dask_cluster(p2p=True)
    yield client

    Comms.destroy()
    client.close()
    cluster.close()
    teardown_local_dask_cluster(cluster, client)


@pytest.mark.skipif(
21 changes: 6 additions & 15 deletions python/cugraph/tests/dask/test_mg_degree.py
@@ -11,30 +11,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from dask.distributed import Client
import gc
import pytest
import cudf
import cugraph.comms as Comms
import cugraph
import dask_cudf
from cugraph.dask.common.mg_utils import is_single_gpu
from cugraph.dask.common.mg_utils import (is_single_gpu,
                                          setup_local_dask_cluster,
                                          teardown_local_dask_cluster)

# Move to conftest
from dask_cuda import LocalCUDACluster


@pytest.fixture
@pytest.fixture(scope="module")
def client_connection():
    cluster = LocalCUDACluster()
    client = Client(cluster)
    Comms.initialize(p2p=True)

    (cluster, client) = setup_local_dask_cluster(p2p=True)
    yield client

    Comms.destroy()
    client.close()
    cluster.close()
    teardown_local_dask_cluster(cluster, client)


@pytest.mark.skipif(
22 changes: 6 additions & 16 deletions python/cugraph/tests/dask/test_mg_katz_centrality.py
@@ -14,30 +14,20 @@
# import numpy as np
import pytest
import cugraph.dask as dcg
import cugraph.comms as Comms
from dask.distributed import Client
import gc
import cugraph
import dask_cudf
import cudf
from dask_cuda import LocalCUDACluster
from cugraph.dask.common.mg_utils import is_single_gpu
from cugraph.dask.common.mg_utils import (is_single_gpu,
                                          setup_local_dask_cluster,
                                          teardown_local_dask_cluster)

# The function selects personalization_perc% of accessible vertices in graph M
# and randomly assigns them personalization values


@pytest.fixture
@pytest.fixture(scope="module")
def client_connection():
    cluster = LocalCUDACluster()
    client = Client(cluster)
    Comms.initialize(p2p=True)

    (cluster, client) = setup_local_dask_cluster(p2p=True)
    yield client

    Comms.destroy()
    client.close()
    cluster.close()
    teardown_local_dask_cluster(cluster, client)


@pytest.mark.skipif(
19 changes: 5 additions & 14 deletions python/cugraph/tests/dask/test_mg_louvain.py
@@ -14,13 +14,12 @@
import pytest

import cugraph.dask as dcg
import cugraph.comms as Comms
from dask.distributed import Client
import cugraph
import dask_cudf
from dask_cuda import LocalCUDACluster
from cugraph.tests import utils
from cugraph.dask.common.mg_utils import is_single_gpu
from cugraph.dask.common.mg_utils import (is_single_gpu,
                                          setup_local_dask_cluster,
                                          teardown_local_dask_cluster)

try:
    from rapids_pytest_benchmark import setFixtureParamNames
@@ -44,17 +43,9 @@ def setFixtureParamNames(*args, **kwargs):
# Fixtures
@pytest.fixture(scope="module")
def client_connection():
    # setup
    cluster = LocalCUDACluster()
    client = Client(cluster)
    Comms.initialize(p2p=True)

    (cluster, client) = setup_local_dask_cluster(p2p=True)
    yield client

    # teardown
    Comms.destroy()
    client.close()
    cluster.close()
    teardown_local_dask_cluster(cluster, client)


@pytest.mark.skipif(
21 changes: 7 additions & 14 deletions python/cugraph/tests/dask/test_mg_pagerank.py
@@ -13,19 +13,18 @@
import numpy as np
import pytest
import cugraph.dask as dcg
import cugraph.comms as Comms
from dask.distributed import Client
import gc
import cugraph
import dask_cudf
import cudf
from dask_cuda import LocalCUDACluster
from cugraph.dask.common.mg_utils import is_single_gpu
from cugraph.dask.common.mg_utils import (is_single_gpu,
                                          setup_local_dask_cluster,
                                          teardown_local_dask_cluster)


# The function selects personalization_perc% of accessible vertices in graph M
# and randomly assigns them personalization values


def personalize(vertices, personalization_perc):
    personalization = None
    if personalization_perc != 0:
@@ -52,17 +51,11 @@ def personalize(vertices, personalization_perc):
PERSONALIZATION_PERC = [0, 10, 50]


@pytest.fixture
@pytest.fixture(scope="module")
def client_connection():
    cluster = LocalCUDACluster()
    client = Client(cluster)
    Comms.initialize(p2p=True)

    (cluster, client) = setup_local_dask_cluster(p2p=True)
    yield client

    Comms.destroy()
    client.close()
    cluster.close()
    teardown_local_dask_cluster(cluster, client)


@pytest.mark.skipif(
19 changes: 6 additions & 13 deletions python/cugraph/tests/dask/test_mg_renumber.py
@@ -20,29 +20,22 @@
import numpy as np

import cugraph.dask as dcg
import cugraph.comms as Comms
from dask.distributed import Client
import cugraph
import dask_cudf
import dask
import cudf
from dask_cuda import LocalCUDACluster
from cugraph.tests import utils
from cugraph.structure.number_map import NumberMap
from cugraph.dask.common.mg_utils import is_single_gpu
from cugraph.dask.common.mg_utils import (is_single_gpu,
                                          setup_local_dask_cluster,
                                          teardown_local_dask_cluster)


@pytest.fixture
@pytest.fixture(scope="module")
def client_connection():
    cluster = LocalCUDACluster()
    client = Client(cluster)
    Comms.initialize(p2p=True)

    (cluster, client) = setup_local_dask_cluster(p2p=True)
    yield client

    Comms.destroy()
    client.close()
    cluster.close()
    teardown_local_dask_cluster(cluster, client)


# Test all combinations of default/managed and pooled/non-pooled allocation