diff --git a/ci/gpu/build.sh b/ci/gpu/build.sh index 2a7e0c4cd..f640358f8 100755 --- a/ci/gpu/build.sh +++ b/ci/gpu/build.sh @@ -19,6 +19,10 @@ cd $WORKSPACE export GIT_DESCRIBE_TAG=`git describe --abbrev=0 --tags` export GIT_DESCRIBE_NUMBER=`git rev-list ${GIT_DESCRIBE_TAG}..HEAD --count` +# Enable NumPy's __array_function__ protocol (needed for NumPy 1.16.x, +# will possibly be enabled by default starting on 1.17) +export NUMPY_EXPERIMENTAL_ARRAY_FUNCTION=1 + ################################################################################ # SETUP - Check environment ################################################################################ @@ -38,6 +42,13 @@ conda list # FIX Added to deal with Anancoda SSL verification issues during conda builds conda config --set ssl_verify False +################################################################################ +# SETUP - Install additional packages +################################################################################ + +# Install CuPy for tests +pip install cupy-cuda100==6.0.0rc1 + ################################################################################ # TEST - Run tests ################################################################################ diff --git a/conda/recipes/dask-cuda/meta.yaml b/conda/recipes/dask-cuda/meta.yaml index 5764cbc84..6be19fc57 100644 --- a/conda/recipes/dask-cuda/meta.yaml +++ b/conda/recipes/dask-cuda/meta.yaml @@ -21,8 +21,10 @@ requirements: - setuptools run: - python x.x - - dask-core >=1.1.4 - - distributed >=1.25.2 + - dask-core >=1.2.1 + - distributed >=1.28.0 + - numpy >=1.16.0 + - numba >=0.40.1 test: imports: diff --git a/dask_cuda/dask_cuda_worker.py b/dask_cuda/dask_cuda_worker.py index 7667c137d..277092993 100755 --- a/dask_cuda/dask_cuda_worker.py +++ b/dask_cuda/dask_cuda_worker.py @@ -8,8 +8,14 @@ import click from distributed import Nanny, Worker from distributed.config import config -from distributed.utils import get_ip_interface, parse_timedelta -from distributed.worker import _ncores +from distributed.diskutils import WorkSpace +from distributed.utils import ( + get_ip_interface, + parse_timedelta, + parse_bytes, + warn_on_duration, +) +from distributed.worker import _ncores, parse_memory_limit from distributed.security import Security from distributed.cli.utils import ( check_python_3, @@ -23,8 +29,9 @@ enable_proctitle_on_current, ) +from .device_host_file import DeviceHostFile from .local_cuda_cluster import cuda_visible_devices -from .utils import get_n_gpus +from .utils import get_n_gpus, get_device_total_memory from toolz import valmap from tornado.ioloop import IOLoop, TimeoutError @@ -98,6 +105,16 @@ "string (like 5GB or 5000M), " "'auto', or zero for no memory management", ) +@click.option( + "--device-memory-limit", + default="auto", + help="Bytes of memory per CUDA device that the worker can use. 
" + "This can be an integer (bytes), " + "float (fraction of total device memory), " + "string (like 5GB or 5000M), " + "'auto', or zero for no memory management " + "(i.e., allow full device memory usage).", +) @click.option( "--reconnect/--no-reconnect", default=True, @@ -146,6 +163,7 @@ def main( nthreads, name, memory_limit, + device_memory_limit, pid_file, reconnect, resources, @@ -175,7 +193,7 @@ def main( nprocs = get_n_gpus() if not nthreads: - nthreads = min(1, _ncores // nprocs ) + nthreads = min(1, _ncores // nprocs) if pid_file: with open(pid_file, "w") as f: @@ -234,6 +252,18 @@ def del_pid_file(): if death_timeout is not None: death_timeout = parse_timedelta(death_timeout, "s") + local_dir = kwargs.get("local_dir", "dask-worker-space") + with warn_on_duration( + "1s", + "Creating scratch directories is taking a surprisingly long time. " + "This is often due to running workers on a network file system. " + "Consider specifying a local-directory to point workers to write " + "scratch data to a local disk.", + ): + _workspace = WorkSpace(os.path.abspath(local_dir)) + _workdir = _workspace.new_work_dir(prefix="worker-") + local_dir = _workdir.dir_path + nannies = [ t( scheduler, @@ -252,7 +282,19 @@ def del_pid_file(): contact_address=None, env={"CUDA_VISIBLE_DEVICES": cuda_visible_devices(i)}, name=name if nprocs == 1 or not name else name + "-" + str(i), - **kwargs + data=( + DeviceHostFile, + { + "device_memory_limit": get_device_total_memory(index=i) + if (device_memory_limit == "auto" or device_memory_limit == int(0)) + else parse_bytes(device_memory_limit), + "memory_limit": parse_memory_limit( + memory_limit, nthreads, total_cores=nprocs + ), + "local_dir": local_dir, + }, + ), + **kwargs, ) for i in range(nprocs) ] diff --git a/dask_cuda/device_host_file.py b/dask_cuda/device_host_file.py new file mode 100644 index 000000000..39785e978 --- /dev/null +++ b/dask_cuda/device_host_file.py @@ -0,0 +1,107 @@ +from zict import Buffer, File, Func +from zict.common import ZictBase +from distributed.protocol import deserialize_bytes, serialize_bytes +from distributed.worker import weight + +from functools import partial +import os + + +def _is_device_object(obj): + """ + Check if obj is a device object, by checking if it has a + __cuda_array_interface__ attributed + """ + return hasattr(obj, "__cuda_array_interface__") + + +def _serialize_if_device(obj): + """ Serialize an object if it's a device object """ + if _is_device_object(obj): + return serialize_bytes(obj, on_error="raise") + else: + return obj + + +def _deserialize_if_device(obj): + """ Deserialize an object if it's an instance of bytes """ + if isinstance(obj, bytes): + return deserialize_bytes(obj) + else: + return obj + + +class DeviceHostFile(ZictBase): + """ Manages serialization/deserialization of objects. + + Three LRU cache levels are controlled, for device, host and disk. + Each level takes care of serializing objects once its limit has been + reached and pass it to the subsequent level. Similarly, each cache + may deserialize the object, but storing it back in the appropriate + cache, depending on the type of object being deserialized. + + Parameters + ---------- + device_memory_limit: int + Number of bytes of CUDA device memory for device LRU cache, + spills to host cache once filled. + memory_limit: int + Number of bytes of host memory for host LRU cache, spills to + disk once filled. 
+ local_dir: path + Path where to store serialized objects on disk + """ + + def __init__( + self, device_memory_limit=None, memory_limit=None, local_dir="dask-worker-space" + ): + path = os.path.join(local_dir, "storage") + + self.host_func = dict() + self.disk_func = Func( + partial(serialize_bytes, on_error="raise"), deserialize_bytes, File(path) + ) + self.host_buffer = Buffer( + self.host_func, self.disk_func, memory_limit, weight=weight + ) + + self.device_func = dict() + self.device_host_func = Func( + _serialize_if_device, _deserialize_if_device, self.host_buffer + ) + self.device_buffer = Buffer( + self.device_func, self.device_host_func, device_memory_limit, weight=weight + ) + + self.device = self.device_buffer.fast.d + self.host = self.host_buffer.fast.d + self.disk = self.host_buffer.slow.d + + # For Worker compatibility only, where `fast` is host memory buffer + self.fast = self.host_buffer.fast + + def __setitem__(self, key, value): + if _is_device_object(value): + self.device_buffer[key] = value + else: + self.host_buffer[key] = value + + def __getitem__(self, key): + if key in self.host_buffer: + obj = self.host_buffer[key] + del self.host_buffer[key] + self.device_buffer[key] = _deserialize_if_device(obj) + + if key in self.device_buffer: + return self.device_buffer[key] + else: + raise KeyError + + def __len__(self): + return len(self.device_buffer) + + def __iter__(self): + return iter(self.device_buffer) + + def __delitem__(self, i): + del self.device_buffer[i] diff --git a/dask_cuda/local_cuda_cluster.py b/dask_cuda/local_cuda_cluster.py index 11cb88716..31adadf8f 100644 --- a/dask_cuda/local_cuda_cluster.py +++ b/dask_cuda/local_cuda_cluster.py @@ -1,11 +1,16 @@ import os +import warnings from tornado import gen from dask.distributed import LocalCluster -from distributed.worker import TOTAL_MEMORY +from distributed.diskutils import WorkSpace +from distributed.nanny import Nanny +from distributed.worker import Worker, TOTAL_MEMORY +from distributed.utils import parse_bytes, warn_on_duration -from .utils import get_n_gpus +from .device_host_file import DeviceHostFile +from .utils import get_n_gpus, get_device_total_memory def cuda_visible_devices(i, visible=None): @@ -35,7 +40,8 @@ def __init__( threads_per_worker=1, processes=True, memory_limit=None, - **kwargs + device_memory_limit=None, + **kwargs, ): if n_workers is None: n_workers = get_n_gpus() @@ -45,6 +51,9 @@ def __init__( raise ValueError("Can not specify more processes than GPUs") if memory_limit is None: memory_limit = TOTAL_MEMORY / n_workers + self.host_memory_limit = memory_limit + self.device_memory_limit = device_memory_limit + LocalCluster.__init__( self, n_workers=n_workers, @@ -82,3 +91,58 @@ def _start(self, ip=None, n_workers=0): self.status = "running" raise gen.Return(self) + + @gen.coroutine + def _start_worker(self, death_timeout=60, **kwargs): + if self.status and self.status.startswith("clos"): + warnings.warn("Tried to start a worker while status=='%s'" % self.status) + return + + if self.processes: + W = Nanny + kwargs["quiet"] = True + else: + W = Worker + + local_dir = kwargs.get("local_dir", "dask-worker-space") + with warn_on_duration( + "1s", + "Creating scratch directories is taking a surprisingly long time. " + "This is often due to running workers on a network file system. 
" + "Consider specifying a local-directory to point workers to write " + "scratch data to a local disk.", + ): + _workspace = WorkSpace(os.path.abspath(local_dir)) + _workdir = _workspace.new_work_dir(prefix="worker-") + local_dir = _workdir.dir_path + + device_index = int(kwargs["env"]["CUDA_VISIBLE_DEVICES"].split(",")[0]) + if self.device_memory_limit is None: + self.device_memory_limit = get_device_total_memory(device_index) + elif isinstance(self.device_memory_limit, str): + self.device_memory_limit = parse_bytes(self.device_memory_limit) + data = DeviceHostFile( + device_memory_limit=self.device_memory_limit, + memory_limit=self.host_memory_limit, + local_dir=local_dir, + ) + + w = yield W( + self.scheduler.address, + loop=self.loop, + death_timeout=death_timeout, + silence_logs=self.silence_logs, + data=data, + **kwargs, + ) + + self.workers.append(w) + + while w.status != "closed" and w.worker_address not in self.scheduler.workers: + yield gen.sleep(0.01) + + if w.status == "closed" and self.scheduler.status == "running": + self.workers.remove(w) + raise gen.TimeoutError("Worker failed to start") + + raise gen.Return(w) diff --git a/dask_cuda/tests/test_dask_cuda_worker.py b/dask_cuda/tests/test_dask_cuda_worker.py index 2b72c73a4..bcfdfd4b6 100644 --- a/dask_cuda/tests/test_dask_cuda_worker.py +++ b/dask_cuda/tests/test_dask_cuda_worker.py @@ -25,6 +25,8 @@ def test_cuda_visible_devices(loop): "127.0.0.1:9359", "--host", "127.0.0.1", + "--device-memory-limit", + "1 MB", "--no-bokeh", ] ) as worker: diff --git a/dask_cuda/tests/test_device_host_file.py b/dask_cuda/tests/test_device_host_file.py new file mode 100644 index 000000000..c4cacb30b --- /dev/null +++ b/dask_cuda/tests/test_device_host_file.py @@ -0,0 +1,112 @@ +import numpy as np +import cupy +from dask_cuda.device_host_file import DeviceHostFile +from random import randint + +import pytest +from cupy.testing import assert_array_equal + + +@pytest.mark.parametrize("num_host_arrays", [1, 10, 100]) +@pytest.mark.parametrize("num_device_arrays", [1, 10, 100]) +@pytest.mark.parametrize("array_size_range", [(1, 1000), (100, 100), (1000, 1000)]) +def test_device_host_file_short( + tmp_path, num_device_arrays, num_host_arrays, array_size_range +): + tmpdir = tmp_path / "storage" + tmpdir.mkdir() + dhf = DeviceHostFile( + device_memory_limit=1024 * 16, memory_limit=1024 * 16, local_dir=tmpdir + ) + + host = [ + ("x-%d" % i, np.random.random(randint(*array_size_range))) + for i in range(num_host_arrays) + ] + device = [ + ("dx-%d" % i, cupy.random.random(randint(*array_size_range))) + for i in range(num_device_arrays) + ] + + import random + + full = host + device + random.shuffle(full) + + for i in full: + dhf[i[0]] = i[1] + + random.shuffle(full) + + for i in full: + assert_array_equal(i[1], dhf[i[0]]) + del dhf[i[0]] + + assert set(dhf.device.keys()) == set() + assert set(dhf.host.keys()) == set() + assert set(dhf.disk.keys()) == set() + + +def test_device_host_file_step_by_step(tmp_path): + tmpdir = tmp_path / "storage" + tmpdir.mkdir() + dhf = DeviceHostFile( + device_memory_limit=1024 * 16, memory_limit=1024 * 16, local_dir=tmpdir + ) + + a = np.random.random(1000) + b = cupy.random.random(1000) + + dhf["a1"] = a + + assert set(dhf.device.keys()) == set() + assert set(dhf.host.keys()) == set(["a1"]) + assert set(dhf.disk.keys()) == set() + + dhf["b1"] = b + + assert set(dhf.device.keys()) == set(["b1"]) + assert set(dhf.host.keys()) == set(["a1"]) + assert set(dhf.disk.keys()) == set() + + dhf["b2"] = b + assert 
set(dhf.device.keys()) == set(["b1", "b2"]) + assert set(dhf.host.keys()) == set(["a1"]) + assert set(dhf.disk.keys()) == set() + + dhf["b3"] = b + assert set(dhf.device.keys()) == set(["b2", "b3"]) + assert set(dhf.host.keys()) == set(["a1", "b1"]) + assert set(dhf.disk.keys()) == set() + + dhf["a2"] = a + assert set(dhf.device.keys()) == set(["b2", "b3"]) + assert set(dhf.host.keys()) == set(["a2", "b1"]) + assert set(dhf.disk.keys()) == set(["a1"]) + + dhf["b4"] = b + assert set(dhf.device.keys()) == set(["b3", "b4"]) + assert set(dhf.host.keys()) == set(["a2", "b2"]) + assert set(dhf.disk.keys()) == set(["a1", "b1"]) + + dhf["b4"] = b + assert set(dhf.device.keys()) == set(["b3", "b4"]) + assert set(dhf.host.keys()) == set(["a2", "b2"]) + assert set(dhf.disk.keys()) == set(["a1", "b1"]) + + assert_array_equal(dhf["a1"], a) + del dhf["a1"] + assert_array_equal(dhf["a2"], a) + del dhf["a2"] + assert_array_equal(dhf["b1"], b) + del dhf["b1"] + assert_array_equal(dhf["b2"], b) + del dhf["b2"] + assert_array_equal(dhf["b3"], b) + del dhf["b3"] + assert_array_equal(dhf["b4"], b) + del dhf["b4"] + + assert set(dhf.device.keys()) == set() + assert set(dhf.host.keys()) == set() + assert set(dhf.disk.keys()) == set() diff --git a/dask_cuda/tests/test_local_cuda_cluster.py b/dask_cuda/tests/test_local_cuda_cluster.py index 295518994..e17304fa2 100644 --- a/dask_cuda/tests/test_local_cuda_cluster.py +++ b/dask_cuda/tests/test_local_cuda_cluster.py @@ -11,7 +11,10 @@ @gen_test(timeout=20) async def test_local_cuda_cluster(): async with LocalCUDACluster( - scheduler_port=0, asynchronous=True, diagnostics_port=None + scheduler_port=0, + asynchronous=True, + diagnostics_port=None, + device_memory_limit=1, ) as cluster: async with Client(cluster, asynchronous=True) as client: assert len(cluster.workers) == utils.get_n_gpus() @@ -37,7 +40,10 @@ async def test_with_subset_of_cuda_visible_devices(): os.environ["CUDA_VISIBLE_DEVICES"] = "2,3,7,8" try: async with LocalCUDACluster( - scheduler_port=0, asynchronous=True, diagnostics_port=None + scheduler_port=0, + asynchronous=True, + diagnostics_port=None, + device_memory_limit=1, ) as cluster: async with Client(cluster, asynchronous=True) as client: assert len(cluster.workers) == 4 diff --git a/dask_cuda/tests/test_spill.py b/dask_cuda/tests/test_spill.py new file mode 100644 index 000000000..bc16923a3 --- /dev/null +++ b/dask_cuda/tests/test_spill.py @@ -0,0 +1,149 @@ +from distributed.utils_test import gen_cluster, loop, gen_test +from distributed.worker import Worker +from distributed import Client, get_worker, wait +from dask_cuda import LocalCUDACluster +from dask_cuda.device_host_file import DeviceHostFile +import dask.array as da +import cupy +import pytest +import os +from zict.file import _safe_key as safe_key + + +def assert_device_host_file_size(dhf, total_bytes, chunk_overhead=1024): + byte_sum = dhf.device_buffer.fast.total_weight + dhf.host_buffer.fast.total_weight + file_path = [os.path.join(dhf.disk.directory, safe_key(k)) for k in dhf.disk.keys()] + file_size = [os.path.getsize(f) for f in file_path] + byte_sum += sum(file_size) + + # Allow up to chunk_overhead bytes overhead per chunk on disk + host_overhead = len(dhf.host) * chunk_overhead + disk_overhead = len(dhf.disk) * chunk_overhead + assert ( + byte_sum >= total_bytes + and byte_sum <= total_bytes + host_overhead + disk_overhead + ) + + +@pytest.mark.parametrize( + "params", + [ + { + "device_memory_limit": 2e9, + "memory_limit": 4e9, + "host_target": 0.6, + "host_spill": 0.7, + 
"spills_to_disk": False, + }, + { + "device_memory_limit": 1e9, + "memory_limit": 1e9, + "host_target": 0.3, + "host_spill": 0.3, + "spills_to_disk": True, + }, + ], +) +def test_device_spill(params): + @gen_cluster( + client=True, + ncores=[("127.0.0.1", 1)], + Worker=Worker, + worker_kwargs={ + "memory_limit": params["memory_limit"], + "data": DeviceHostFile( + device_memory_limit=params["device_memory_limit"], + memory_limit=params["memory_limit"], + ), + }, + config={ + "distributed.worker.memory.target": params["host_target"], + "distributed.worker.memory.spill": params["host_spill"], + }, + ) + def test_device_spill(client, scheduler, worker): + rs = da.random.RandomState(RandomState=cupy.random.RandomState) + x = rs.random(int(250e6), chunks=10e6) + yield wait(x) + + xx = x.persist() + yield wait(xx) + + # Allow up to 1024 bytes overhead per chunk serialized + assert_device_host_file_size(worker.data, x.nbytes, 1024) + + y = client.compute(x.sum()) + res = yield y + + assert (abs(res / x.size) - 0.5) < 1e-3 + + assert_device_host_file_size(worker.data, x.nbytes, 1024) + if params["spills_to_disk"]: + assert len(worker.data.disk) > 0 + else: + assert len(worker.data.disk) == 0 + + test_device_spill() + + +@pytest.mark.parametrize( + "params", + [ + { + "device_memory_limit": 2e9, + "memory_limit": 4e9, + "host_target": 0.6, + "host_spill": 0.7, + "spills_to_disk": False, + }, + { + "device_memory_limit": 1e9, + "memory_limit": 1e9, + "host_target": 0.3, + "host_spill": 0.3, + "spills_to_disk": True, + }, + ], +) +def test_cluster_device_spill(loop, params): + @gen_test() + def test_cluster_device_spill(): + cluster = yield LocalCUDACluster( + 1, + scheduler_port=0, + processes=True, + silence_logs=False, + dashboard_address=None, + asynchronous=True, + device_memory_limit=params["device_memory_limit"], + memory_limit=params["memory_limit"], + memory_target_fraction=params["host_target"], + memory_spill_fraction=params["host_spill"], + ) + client = yield Client(cluster, asynchronous=True) + + rs = da.random.RandomState(RandomState=cupy.random.RandomState) + x = rs.random(int(250e6), chunks=10e6) + yield wait(x) + + xx = x.persist() + yield wait(xx) + + def get_data(worker, total_size): + assert_device_host_file_size(get_worker().data, total_size) + + # Allow up to 1024 bytes overhead per chunk serialized + yield client.run(get_data, cluster.workers[0].Worker, x.nbytes) + + y = client.compute(x.sum()) + res = yield y + + assert (abs(res / x.size) - 0.5) < 1e-3 + + yield client.run(get_data, cluster.workers[0].Worker, x.nbytes) + disk_chunks = yield client.run(lambda: len(get_worker().data.disk)) + for dc in disk_chunks.values(): + if params["spills_to_disk"]: + assert dc > 0 + else: + assert dc == 0 diff --git a/dask_cuda/utils.py b/dask_cuda/utils.py index 054e679c9..3809f90f1 100644 --- a/dask_cuda/utils.py +++ b/dask_cuda/utils.py @@ -1,5 +1,6 @@ import toolz import os +from numba import cuda def get_n_gpus(): @@ -12,3 +13,11 @@ def get_n_gpus(): @toolz.memoize def _n_gpus_from_nvidia_smi(): return len(os.popen("nvidia-smi -L").read().strip().split("\n")) + + +def get_device_total_memory(index=0): + """ + Return total memory of CUDA device with index + """ + with cuda.gpus[index]: + return cuda.current_context().get_memory_info()[1] diff --git a/requirements.txt b/requirements.txt index ed5f31b5d..1ea6bcd55 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,4 @@ -dask>=1.1.4 -distributed>=1.25.2 +dask>=1.2.1 +distributed>=1.28.0 +numpy>=1.16.0 +numba>=0.40.1 diff --git 
a/setup.py b/setup.py index 027f9cb35..993206163 100644 --- a/setup.py +++ b/setup.py @@ -7,13 +7,13 @@ with open(os.path.join(os.path.dirname(__file__), "README.md")) as f: long_description = f.read() -version = os.environ.get('GIT_DESCRIBE_TAG', '0.0.0.dev0').lstrip('v') +version = os.environ.get("GIT_DESCRIBE_TAG", "0.0.0.dev0").lstrip("v") setup( name="dask-cuda", version=version, description="Utilities for Dask and CUDA interactions", long_description=long_description, - long_description_content_type='text/markdown', + long_description_content_type="text/markdown", url="https://github.com/rapidsai/dask-cuda", author="RAPIDS development team", author_email="mrocklin@nvidia.com", @@ -27,9 +27,9 @@ "Programming Language :: Python :: 3.7", ], packages=find_packages(exclude=["docs", "tests", "tests.*", "docs.*"]), - install_requires=open('requirements.txt').read().strip().split('\n'), - entry_points=''' + install_requires=open("requirements.txt").read().strip().split("\n"), + entry_points=""" [console_scripts] dask-cuda-worker=dask_cuda.dask_cuda_worker:go - ''', + """, )
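
Usage sketch (not part of the diff itself): the snippet below is a minimal, standalone illustration of the spilling behavior that the new DeviceHostFile provides, closely mirroring test_device_host_file_step_by_step above. It assumes numpy, cupy, and zict are installed and a CUDA device is available; the key names and array sizes are arbitrary.

import numpy as np
import cupy
from dask_cuda.device_host_file import DeviceHostFile

# Small 16 KB limits for both tiers so spilling is easy to trigger.
dhf = DeviceHostFile(
    device_memory_limit=1024 * 16,
    memory_limit=1024 * 16,
    local_dir="dask-worker-space",
)

# Host (NumPy) objects go straight to the host buffer; device (CuPy) objects
# are detected through __cuda_array_interface__ and go to the device buffer.
dhf["a1"] = np.random.random(1000)    # ~8 KB, host tier
dhf["b1"] = cupy.random.random(1000)  # ~8 KB, device tier
dhf["b2"] = cupy.random.random(1000)  # ~8 KB, device tier is now nearly full
dhf["b3"] = cupy.random.random(1000)  # exceeds the device limit: the least
                                      # recently used device array is serialized
                                      # and spilled to the host buffer, which may
                                      # in turn spill its oldest entry to disk

print(set(dhf.device.keys()))  # arrays still resident on the GPU
print(set(dhf.host.keys()))    # host arrays plus spilled device arrays
print(set(dhf.disk.keys()))    # entries evicted from host memory

# Reading a spilled key deserializes it and moves it back to the device buffer.
b1 = dhf["b1"]

The same limits are exposed per worker through the new --device-memory-limit option of dask-cuda-worker and the device_memory_limit argument of LocalCUDACluster, both added in this patch.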