[REVIEW] Updates to support nightly MG test automation #1308

Merged (6 commits) on Jan 4, 2021
43 changes: 42 additions & 1 deletion python/cugraph/dask/common/mg_utils.py
@@ -10,9 +10,17 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from cugraph.raft.dask.common.utils import default_client
+
+import os
+
+import numba.cuda
+
+from dask_cuda import LocalCUDACluster
+from dask.distributed import Client
+
+from cugraph.raft.dask.common.utils import default_client
 import cugraph.comms as Comms


 # FIXME: We currently look for the default client from dask, as such is the
 # if there is a dask client running without any GPU we will still try
@@ -41,3 +49,36 @@ def is_single_gpu():
         return False
     else:
         return True
+
+
+def get_visible_devices():
+    _visible_devices = os.environ.get("CUDA_VISIBLE_DEVICES")
+    if _visible_devices is None:
+        # FIXME: We assume that if the variable is unset there is only one GPU
+        visible_devices = ["0"]
+    else:
+        visible_devices = _visible_devices.strip().split(",")
+    return visible_devices
+
+
+def setup_local_dask_cluster(p2p=True):
+    """
+    Performs steps to setup a Dask cluster using LocalCUDACluster and returns
+    the LocalCUDACluster and corresponding client instance.
+    """
+    cluster = LocalCUDACluster()
+    client = Client(cluster)
+    client.wait_for_workers(len(get_visible_devices()))
+    Comms.initialize(p2p=p2p)
+
+    return (cluster, client)
+
+
+def teardown_local_dask_cluster(cluster, client):
+    """
+    Performs steps to destroy a Dask cluster and a corresponding client
+    instance.
+    """
+    Comms.destroy()
+    client.close()
+    cluster.close()
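The two helpers added above are meant to be used as a pair. Below is a minimal sketch (mine, not part of the diff) of the fixture pattern the test modules later in this PR adopt:

```python
# Minimal sketch of the intended pairing of the new mg_utils helpers.
# This mirrors the client_connection fixtures changed in the test files below.
import pytest

from cugraph.dask.common.mg_utils import (setup_local_dask_cluster,
                                          teardown_local_dask_cluster)


@pytest.fixture(scope="module")
def client_connection():
    # Starts a LocalCUDACluster, waits for one Dask worker per visible GPU,
    # and initializes cugraph.comms (optionally with p2p enabled).
    (cluster, client) = setup_local_dask_cluster(p2p=True)
    yield client
    # Destroys comms first, then closes the client and the cluster.
    teardown_local_dask_cluster(cluster, client)
```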
28 changes: 11 additions & 17 deletions python/cugraph/tests/dask/mg_context.py
@@ -12,12 +12,15 @@
 # limitations under the License.

 import time
-import os
+
+import pytest

 from dask.distributed import Client
+
+from cugraph.dask.common.mg_utils import get_visible_devices
 from dask_cuda import LocalCUDACluster as CUDACluster
 import cugraph.comms as Comms
-import pytest


 # Maximal number of verifications of the number of workers
 DEFAULT_MAX_ATTEMPT = 100
@@ -26,22 +29,13 @@
 DEFAULT_WAIT_TIME = 0.5


-def get_visible_devices():
-    _visible_devices = os.environ.get("CUDA_VISIBLE_DEVICES")
-    if _visible_devices is None:
-        # FIXME: We assume that if the variable is unset there is only one GPU
-        visible_devices = ["0"]
-    else:
-        visible_devices = _visible_devices.strip().split(",")
-    return visible_devices
-
-
 def skip_if_not_enough_devices(required_devices):
-    visible_devices = get_visible_devices()
-    number_of_visible_devices = len(visible_devices)
-    if required_devices > number_of_visible_devices:
-        pytest.skip("Not enough devices available to "
-                    "test MG({})".format(required_devices))
+    if required_devices is not None:
+        visible_devices = get_visible_devices()
+        number_of_visible_devices = len(visible_devices)
+        if required_devices > number_of_visible_devices:
+            pytest.skip("Not enough devices available to "
+                        "test MG({})".format(required_devices))


 class MGContext:
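The `required_devices is not None` guard above is what lets the nightly automation pass `None` to mean "run on however many GPUs are visible" without skipping. A small illustrative sketch; the import path is an assumption based on the file's location, not stated in the PR:

```python
# Illustrative sketch of the new None convention; the import path below is
# assumed from python/cugraph/tests/dask/mg_context.py.
from cugraph.tests.dask.mg_context import skip_if_not_enough_devices


def test_fixed_gpu_count():
    skip_if_not_enough_devices(4)  # skips unless at least 4 GPUs are visible


def test_auto_gpu_count():
    skip_if_not_enough_devices(None)  # never skips; sized by what is visible
```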
@@ -37,7 +37,11 @@
 # Parameters
 # =============================================================================
 DATASETS = ["../datasets/karate.csv"]
-MG_DEVICE_COUNT_OPTIONS = [1, 2, 3, 4]
+MG_DEVICE_COUNT_OPTIONS = [pytest.param(1, marks=pytest.mark.preset_gpu_count),
+                           pytest.param(2, marks=pytest.mark.preset_gpu_count),
+                           pytest.param(3, marks=pytest.mark.preset_gpu_count),
+                           pytest.param(4, marks=pytest.mark.preset_gpu_count),
+                           None]
 RESULT_DTYPE_OPTIONS = [np.float64]


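Marking the fixed device counts with `preset_gpu_count` while adding a `None` entry lets nightly runs drop the preset counts via pytest's standard marker deselection, e.g. `pytest -m "not preset_gpu_count"`, leaving only the `None` case that sizes itself to the machine. A self-contained sketch of those mechanics (the test body is hypothetical, and the marker would normally also be registered in the project's pytest config to silence warnings):

```python
# Self-contained sketch: run `pytest -m "not preset_gpu_count"` against this
# file and only the None parameter survives collection. Hypothetical test.
import pytest

MG_DEVICE_COUNT_OPTIONS = [pytest.param(2, marks=pytest.mark.preset_gpu_count),
                           pytest.param(4, marks=pytest.mark.preset_gpu_count),
                           None]


@pytest.mark.parametrize("device_count", MG_DEVICE_COUNT_OPTIONS)
def test_device_count_param(device_count):
    # None means "use all visible GPUs"; an integer requests a fixed count.
    assert device_count is None or device_count in (2, 4)
```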
@@ -37,7 +37,11 @@
 # Parameters
 # =============================================================================
 DATASETS = ["../datasets/karate.csv"]
-MG_DEVICE_COUNT_OPTIONS = [1, 2, 4]
+MG_DEVICE_COUNT_OPTIONS = [pytest.param(1, marks=pytest.mark.preset_gpu_count),
+                           pytest.param(2, marks=pytest.mark.preset_gpu_count),
+                           pytest.param(3, marks=pytest.mark.preset_gpu_count),
+                           pytest.param(4, marks=pytest.mark.preset_gpu_count),
+                           None]
 RESULT_DTYPE_OPTIONS = [np.float64]


19 changes: 6 additions & 13 deletions python/cugraph/tests/dask/test_mg_bfs.py
@@ -12,28 +12,21 @@
 # limitations under the License.

 import cugraph.dask as dcg
-import cugraph.comms as Comms
-from dask.distributed import Client
 import gc
 import pytest
 import cugraph
 import dask_cudf
 import cudf
-from dask_cuda import LocalCUDACluster
-from cugraph.dask.common.mg_utils import is_single_gpu
+from cugraph.dask.common.mg_utils import (is_single_gpu,
+                                          setup_local_dask_cluster,
+                                          teardown_local_dask_cluster)


-@pytest.fixture
+@pytest.fixture(scope="module")
 def client_connection():
-    cluster = LocalCUDACluster()
-    client = Client(cluster)
-    Comms.initialize(p2p=True)
-
+    (cluster, client) = setup_local_dask_cluster(p2p=True)
     yield client
-
-    Comms.destroy()
-    client.close()
-    cluster.close()
+    teardown_local_dask_cluster(cluster, client)


 @pytest.mark.skipif(
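The change from the default function scope to `scope="module"` is easy to miss but significant: the Dask cluster is now created once per test file rather than once per test. A standalone sketch (illustrative names) of the pytest semantics being relied on:

```python
# Standalone sketch of module-scoped fixtures: the fixture body runs once per
# module, so every test in the file shares one expensive resource.
import pytest

SETUP_CALLS = []


@pytest.fixture(scope="module")
def shared_cluster():
    SETUP_CALLS.append("setup")  # runs once for the whole module
    yield "cluster-handle"


def test_first(shared_cluster):
    assert SETUP_CALLS.count("setup") == 1


def test_second(shared_cluster):
    # Still one setup call: the module-scoped fixture instance was reused.
    assert SETUP_CALLS.count("setup") == 1
```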
19 changes: 6 additions & 13 deletions python/cugraph/tests/dask/test_mg_comms.py
@@ -12,28 +12,21 @@
 # limitations under the License.

 import cugraph.dask as dcg
-import cugraph.comms as Comms
-from dask.distributed import Client
 import gc
 import pytest
 import cugraph
 import dask_cudf
 import cudf
-from dask_cuda import LocalCUDACluster
-from cugraph.dask.common.mg_utils import is_single_gpu
+from cugraph.dask.common.mg_utils import (is_single_gpu,
+                                          setup_local_dask_cluster,
+                                          teardown_local_dask_cluster)


-@pytest.fixture
+@pytest.fixture(scope="module")
 def client_connection():
-    cluster = LocalCUDACluster()
-    client = Client(cluster)
-    Comms.initialize(p2p=True)
-
+    (cluster, client) = setup_local_dask_cluster(p2p=True)
     yield client
-
-    Comms.destroy()
-    client.close()
-    cluster.close()
+    teardown_local_dask_cluster(cluster, client)


 @pytest.mark.skipif(
21 changes: 6 additions & 15 deletions python/cugraph/tests/dask/test_mg_degree.py
@@ -11,30 +11,21 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-from dask.distributed import Client
 import gc
 import pytest
 import cudf
-import cugraph.comms as Comms
 import cugraph
 import dask_cudf
-from cugraph.dask.common.mg_utils import is_single_gpu
+from cugraph.dask.common.mg_utils import (is_single_gpu,
+                                          setup_local_dask_cluster,
+                                          teardown_local_dask_cluster)

-# Move to conftest
-from dask_cuda import LocalCUDACluster
-

-@pytest.fixture
+@pytest.fixture(scope="module")
 def client_connection():
-    cluster = LocalCUDACluster()
-    client = Client(cluster)
-    Comms.initialize(p2p=True)
-
+    (cluster, client) = setup_local_dask_cluster(p2p=True)
     yield client
-
-    Comms.destroy()
-    client.close()
-    cluster.close()
+    teardown_local_dask_cluster(cluster, client)


 @pytest.mark.skipif(
22 changes: 6 additions & 16 deletions python/cugraph/tests/dask/test_mg_katz_centrality.py
@@ -14,30 +14,20 @@
 # import numpy as np
 import pytest
 import cugraph.dask as dcg
-import cugraph.comms as Comms
-from dask.distributed import Client
 import gc
 import cugraph
 import dask_cudf
 import cudf
-from dask_cuda import LocalCUDACluster
-from cugraph.dask.common.mg_utils import is_single_gpu
+from cugraph.dask.common.mg_utils import (is_single_gpu,
+                                          setup_local_dask_cluster,
+                                          teardown_local_dask_cluster)

-# The function selects personalization_perc% of accessible vertices in graph M
-# and randomly assigns them personalization values
-

-@pytest.fixture
+@pytest.fixture(scope="module")
 def client_connection():
-    cluster = LocalCUDACluster()
-    client = Client(cluster)
-    Comms.initialize(p2p=True)
-
+    (cluster, client) = setup_local_dask_cluster(p2p=True)
     yield client
-
-    Comms.destroy()
-    client.close()
-    cluster.close()
+    teardown_local_dask_cluster(cluster, client)


 @pytest.mark.skipif(
19 changes: 5 additions & 14 deletions python/cugraph/tests/dask/test_mg_louvain.py
@@ -14,13 +14,12 @@
 import pytest

 import cugraph.dask as dcg
-import cugraph.comms as Comms
-from dask.distributed import Client
 import cugraph
 import dask_cudf
-from dask_cuda import LocalCUDACluster
 from cugraph.tests import utils
-from cugraph.dask.common.mg_utils import is_single_gpu
+from cugraph.dask.common.mg_utils import (is_single_gpu,
+                                          setup_local_dask_cluster,
+                                          teardown_local_dask_cluster)

 try:
     from rapids_pytest_benchmark import setFixtureParamNames
@@ -44,17 +43,9 @@ def setFixtureParamNames(*args, **kwargs):
 # Fixtures
 @pytest.fixture(scope="module")
 def client_connection():
-    # setup
-    cluster = LocalCUDACluster()
-    client = Client(cluster)
-    Comms.initialize(p2p=True)
-
+    (cluster, client) = setup_local_dask_cluster(p2p=True)
     yield client
-
-    # teardown
-    Comms.destroy()
-    client.close()
-    cluster.close()
+    teardown_local_dask_cluster(cluster, client)


 @pytest.mark.skipif(
21 changes: 7 additions & 14 deletions python/cugraph/tests/dask/test_mg_pagerank.py
@@ -13,19 +13,18 @@
 import numpy as np
 import pytest
 import cugraph.dask as dcg
-import cugraph.comms as Comms
-from dask.distributed import Client
 import gc
 import cugraph
 import dask_cudf
 import cudf
-from dask_cuda import LocalCUDACluster
-from cugraph.dask.common.mg_utils import is_single_gpu
+from cugraph.dask.common.mg_utils import (is_single_gpu,
+                                          setup_local_dask_cluster,
+                                          teardown_local_dask_cluster)


 # The function selects personalization_perc% of accessible vertices in graph M
 # and randomly assigns them personalization values


 def personalize(vertices, personalization_perc):
     personalization = None
     if personalization_perc != 0:
@@ -52,17 +51,11 @@ def personalize(vertices, personalization_perc):
 PERSONALIZATION_PERC = [0, 10, 50]


-@pytest.fixture
+@pytest.fixture(scope="module")
 def client_connection():
-    cluster = LocalCUDACluster()
-    client = Client(cluster)
-    Comms.initialize(p2p=True)
-
+    (cluster, client) = setup_local_dask_cluster(p2p=True)
     yield client
-
-    Comms.destroy()
-    client.close()
-    cluster.close()
+    teardown_local_dask_cluster(cluster, client)


 @pytest.mark.skipif(
19 changes: 6 additions & 13 deletions python/cugraph/tests/dask/test_mg_renumber.py
@@ -20,29 +20,22 @@
 import numpy as np

 import cugraph.dask as dcg
-import cugraph.comms as Comms
-from dask.distributed import Client
 import cugraph
 import dask_cudf
 import dask
 import cudf
-from dask_cuda import LocalCUDACluster
 from cugraph.tests import utils
 from cugraph.structure.number_map import NumberMap
-from cugraph.dask.common.mg_utils import is_single_gpu
+from cugraph.dask.common.mg_utils import (is_single_gpu,
+                                          setup_local_dask_cluster,
+                                          teardown_local_dask_cluster)


-@pytest.fixture
+@pytest.fixture(scope="module")
 def client_connection():
-    cluster = LocalCUDACluster()
-    client = Client(cluster)
-    Comms.initialize(p2p=True)
-
+    (cluster, client) = setup_local_dask_cluster(p2p=True)
     yield client
-
-    Comms.destroy()
-    client.close()
-    cluster.close()
+    teardown_local_dask_cluster(cluster, client)


 # Test all combinations of default/managed and pooled/non-pooled allocation