From 9e37bd266f9cb48ea89a1179382fba2f3dd81ea8 Mon Sep 17 00:00:00 2001
From: Peter Andreas Entschev
Date: Tue, 14 May 2019 22:56:34 +0200
Subject: [PATCH 01/16] Add DeviceHostFile class to handle memory-spilling in LRU fashion

---
 dask_cuda/device_host_file.py | 104 ++++++++++++++++++++++++++++++++++
 1 file changed, 104 insertions(+)
 create mode 100644 dask_cuda/device_host_file.py

diff --git a/dask_cuda/device_host_file.py b/dask_cuda/device_host_file.py
new file mode 100644
index 000000000..36a1e023b
--- /dev/null
+++ b/dask_cuda/device_host_file.py
@@ -0,0 +1,104 @@
+from zict import Buffer, File, Func
+from zict.common import ZictBase
+from distributed.protocol import deserialize_bytes, serialize_bytes
+from distributed.worker import weight
+
+from functools import partial
+import os
+
+
+def _is_device_object(obj):
+    """
+    Check if obj is a device object, by checking if it has a
+    __cuda_array_interface__ attribute
+    """
+    return hasattr(obj, "__cuda_array_interface__")
+
+
+def _serialize_if_device(obj):
+    """ Serialize an object if it's a device object """
+    if _is_device_object(obj):
+        return serialize_bytes(obj, on_error="raise")
+    else:
+        return obj
+
+
+def _deserialize_if_device(obj):
+    """ Deserialize an object if it's an instance of bytes """
+    if isinstance(obj, bytes):
+        return deserialize_bytes(obj)
+    else:
+        return obj
+
+
+class DeviceHostFile(ZictBase):
+    """ Manages serialization/deserialization of objects.
+
+    Three LRU cache levels are controlled, for device, host and disk.
+    Each level takes care of serializing objects once its limit has been
+    reached and passes them to the subsequent level. Similarly, each cache
+    may deserialize an object and store it back in the appropriate
+    cache, depending on the type of object being deserialized.
+
+    Parameters
+    ----------
+    device_memory_limit: int
+        Number of bytes of CUDA device memory for device LRU cache,
+        spills to host cache once filled.
+    memory_limit: int
+        Number of bytes of host memory for host LRU cache, spills to
+        disk once filled.
+    local_dir: path
+        Path where to store serialized objects on disk
+    """
+
+    def __init__(
+        self, device_memory_limit=None, memory_limit=None, local_dir="dask-worker-space"
+    ):
+        path = os.path.join(local_dir, "storage")
+
+        self.host_func = dict()
+        self.disk_func = Func(
+            partial(serialize_bytes, on_error="raise"), deserialize_bytes, File(path)
+        )
+        self.host_buffer = Buffer(
+            self.host_func, self.disk_func, memory_limit, weight=weight
+        )
+
+        self.device_func = dict()
+        self.device_host_func = Func(
+            _serialize_if_device, _deserialize_if_device, self.host_buffer
+        )
+        self.device_buffer = Buffer(
+            self.device_func, self.device_host_func, device_memory_limit, weight=weight
+        )
+
+        self.device = self.device_buffer.fast.d
+        self.host = self.host_buffer.fast.d
+        self.disk = self.host_buffer.slow.d
+
+    def __setitem__(self, key, value):
+        if _is_device_object(value):
+            self.device_buffer[key] = value
+        else:
+            self.host_buffer[key] = value
+
+    def __getitem__(self, key):
+        if key in self.host_buffer:
+            obj = self.host_buffer[key]
+            del self.host_buffer[key]
+            self.device_buffer[key] = _deserialize_if_device(obj)
+
+        if key in self.device_buffer:
+            return self.device_buffer[key]
+        else:
+            raise KeyError
+
+    def __len__(self):
+        return len(self.device_buffer)
+
+    def __iter__(self):
+        return iter(self.device_buffer)
+
+    def __delitem__(self, i):
+        del self.device_buffer[i]
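A minimal usage sketch of the class added above may help make the three-level spilling concrete. It is not part of the patch; it assumes NumPy, CuPy and zict are installed, and the byte limits are purely illustrative:

    import numpy as np
    import cupy
    from dask_cuda.device_host_file import DeviceHostFile

    # Deliberately tiny limits so that spilling is easy to observe
    dhf = DeviceHostFile(
        device_memory_limit=16000, memory_limit=16000, local_dir="dask-worker-space"
    )

    dhf["d1"] = cupy.arange(1000)  # exposes __cuda_array_interface__ -> device buffer
    dhf["h1"] = np.arange(1000)    # host object -> host buffer

    # Inserting more device data than device_memory_limit holds evicts the least
    # recently used device entries into the host buffer (serialized), and host
    # pressure in turn spills entries to files under local_dir/storage.
    dhf["d2"] = cupy.arange(1000)
    dhf["d3"] = cupy.arange(1000)
    print(set(dhf.device), set(dhf.host), set(dhf.disk))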
From eae1bf8e08310919a6370100bef95f936ae730ae Mon Sep 17 00:00:00 2001
From: Peter Andreas Entschev
Date: Tue, 14 May 2019 22:57:09 +0200
Subject: [PATCH 02/16] Add DeviceHostFile tests

---
 dask_cuda/tests/test_device_host_file.py | 112 +++++++++++++++++++++++
 1 file changed, 112 insertions(+)
 create mode 100644 dask_cuda/tests/test_device_host_file.py

diff --git a/dask_cuda/tests/test_device_host_file.py b/dask_cuda/tests/test_device_host_file.py
new file mode 100644
index 000000000..c4cacb30b
--- /dev/null
+++ b/dask_cuda/tests/test_device_host_file.py
@@ -0,0 +1,112 @@
+import numpy as np
+import cupy
+from dask_cuda.device_host_file import DeviceHostFile
+from random import randint
+
+import pytest
+from cupy.testing import assert_array_equal
+
+
+@pytest.mark.parametrize("num_host_arrays", [1, 10, 100])
+@pytest.mark.parametrize("num_device_arrays", [1, 10, 100])
+@pytest.mark.parametrize("array_size_range", [(1, 1000), (100, 100), (1000, 1000)])
+def test_device_host_file_short(
+    tmp_path, num_device_arrays, num_host_arrays, array_size_range
+):
+    tmpdir = tmp_path / "storage"
+    tmpdir.mkdir()
+    dhf = DeviceHostFile(
+        device_memory_limit=1024 * 16, memory_limit=1024 * 16, local_dir=tmpdir
+    )
+
+    host = [
+        ("x-%d" % i, np.random.random(randint(*array_size_range)))
+        for i in range(num_host_arrays)
+    ]
+    device = [
+        ("dx-%d" % i, cupy.random.random(randint(*array_size_range)))
+        for i in range(num_device_arrays)
+    ]
+
+    import random
+
+    full = host + device
+    random.shuffle(full)
+
+    for i in full:
+        dhf[i[0]] = i[1]
+
+    random.shuffle(full)
+
+    for i in full:
+        assert_array_equal(i[1], dhf[i[0]])
+        del dhf[i[0]]
+
+    assert set(dhf.device.keys()) == set()
+    assert set(dhf.host.keys()) == set()
+    assert set(dhf.disk.keys()) == set()
+
+
+def test_device_host_file_step_by_step(tmp_path):
+    tmpdir = tmp_path / "storage"
+    tmpdir.mkdir()
+    dhf = DeviceHostFile(
+        device_memory_limit=1024 * 16, memory_limit=1024 * 16, local_dir=tmpdir
+    )
+
+    a = np.random.random(1000)
+    b = cupy.random.random(1000)
+
+    dhf["a1"] = a
+
+    assert set(dhf.device.keys()) == set()
+    assert set(dhf.host.keys()) == set(["a1"])
+    assert set(dhf.disk.keys()) == set()
+
+    dhf["b1"] = b
+
+    assert set(dhf.device.keys()) == set(["b1"])
+    assert set(dhf.host.keys()) == set(["a1"])
+    assert set(dhf.disk.keys()) == set()
+
+    dhf["b2"] = b
+    assert set(dhf.device.keys()) == set(["b1", "b2"])
+    assert set(dhf.host.keys()) == set(["a1"])
+    assert set(dhf.disk.keys()) == set()
+
+    dhf["b3"] = b
+    assert set(dhf.device.keys()) == set(["b2", "b3"])
+    assert set(dhf.host.keys()) == set(["a1", "b1"])
+    assert set(dhf.disk.keys()) == set()
+
+    dhf["a2"] = a
+    assert set(dhf.device.keys()) == set(["b2", "b3"])
+    assert set(dhf.host.keys()) == set(["a2", "b1"])
+    assert set(dhf.disk.keys()) == set(["a1"])
+
+    dhf["b4"] = b
+    assert set(dhf.device.keys()) == set(["b3", "b4"])
+    assert set(dhf.host.keys()) == set(["a2", "b2"])
+    assert set(dhf.disk.keys()) == set(["a1", "b1"])
+
+    dhf["b4"] = b
+    assert set(dhf.device.keys()) == set(["b3", "b4"])
+    assert set(dhf.host.keys()) == set(["a2", "b2"])
+    assert set(dhf.disk.keys()) == set(["a1", "b1"])
+
+    assert_array_equal(dhf["a1"], a)
+    del dhf["a1"]
+    assert_array_equal(dhf["a2"], a)
+    del dhf["a2"]
+    assert_array_equal(dhf["b1"], b)
+    del dhf["b1"]
+    assert_array_equal(dhf["b2"], b)
+    del dhf["b2"]
+    assert_array_equal(dhf["b3"], b)
+    del dhf["b3"]
+    assert_array_equal(dhf["b4"], b)
+    del dhf["b4"]
+
+    assert set(dhf.device.keys()) == set()
+    assert set(dhf.host.keys()) == set()
+    assert set(dhf.disk.keys()) == set()
From 813830594b8c9f50628777ee9f70ae8370555924 Mon Sep 17 00:00:00 2001
From: Peter Andreas Entschev
Date: Tue, 14 May 2019 23:00:02 +0200
Subject: [PATCH 03/16] Pass DeviceHostFile to Worker via data argument

---
 dask_cuda/dask_cuda_worker.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/dask_cuda/dask_cuda_worker.py b/dask_cuda/dask_cuda_worker.py
index 7667c137d..9652cfef5 100755
--- a/dask_cuda/dask_cuda_worker.py
+++ b/dask_cuda/dask_cuda_worker.py
@@ -23,6 +23,7 @@
     enable_proctitle_on_current,
 )

+from .device_host_file import DeviceHostFile
 from .local_cuda_cluster import cuda_visible_devices
 from .utils import get_n_gpus

@@ -252,6 +253,7 @@ def del_pid_file():
             contact_address=None,
             env={"CUDA_VISIBLE_DEVICES": cuda_visible_devices(i)},
             name=name if nprocs == 1 or not name else name + "-" + str(i),
+            data=DeviceHostFile
             **kwargs
         )
         for i in range(nprocs)

From 49733d72519abb495588b8d47fdcd07d37577642 Mon Sep 17 00:00:00 2001
From: Peter Andreas Entschev
Date: Tue, 14 May 2019 23:05:42 +0200
Subject: [PATCH 04/16] Add CuPy and enable __array_function__ in CI build

---
 ci/gpu/build.sh | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/ci/gpu/build.sh b/ci/gpu/build.sh
index 2a7e0c4cd..f640358f8 100755
--- a/ci/gpu/build.sh
+++ b/ci/gpu/build.sh
@@ -19,6 +19,10 @@ cd $WORKSPACE
 export GIT_DESCRIBE_TAG=`git describe --abbrev=0 --tags`
 export GIT_DESCRIBE_NUMBER=`git rev-list ${GIT_DESCRIBE_TAG}..HEAD --count`

+# Enable NumPy's __array_function__ protocol (needed for NumPy 1.16.x,
+# will possibly be enabled by default starting on 1.17)
+export NUMPY_EXPERIMENTAL_ARRAY_FUNCTION=1
+
 ################################################################################
 # SETUP - Check environment
 ################################################################################
@@ -38,6 +42,13 @@ conda list
 # FIX Added to deal with Anancoda SSL verification issues during conda builds
 conda config --set ssl_verify False

+################################################################################
+# SETUP - Install additional packages
+################################################################################
+
+# Install CuPy for tests
+pip install cupy-cuda100==6.0.0rc1
+
 ################################################################################
 # TEST - Run tests
 ################################################################################
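For background on the NUMPY_EXPERIMENTAL_ARRAY_FUNCTION export above, the sketch below shows the dispatch it enables. It is not part of the patch and assumes NumPy 1.16 and CuPy >= 6.0; the variable must be set before NumPy is imported:

    import os

    os.environ["NUMPY_EXPERIMENTAL_ARRAY_FUNCTION"] = "1"  # before importing numpy

    import numpy as np
    import cupy

    x = cupy.random.random(1000)
    # With the protocol enabled, np.sum() dispatches to CuPy's implementation,
    # so the result stays on the GPU rather than being coerced to a host array.
    s = np.sum(x)
    print(type(s))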
From 7ebed48faf696a54e1d9ee6564882e8d77e6903d Mon Sep 17 00:00:00 2001
From: Peter Andreas Entschev
Date: Tue, 14 May 2019 23:09:48 +0200
Subject: [PATCH 05/16] Update version requirements of dask, distributed and numpy

---
 conda/recipes/dask-cuda/meta.yaml | 5 +++--
 requirements.txt                  | 5 +++--
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/conda/recipes/dask-cuda/meta.yaml b/conda/recipes/dask-cuda/meta.yaml
index 5764cbc84..2b747affd 100644
--- a/conda/recipes/dask-cuda/meta.yaml
+++ b/conda/recipes/dask-cuda/meta.yaml
@@ -21,8 +21,9 @@ requirements:
     - setuptools
   run:
     - python x.x
-    - dask-core >=1.1.4
-    - distributed >=1.25.2
+    - dask-core >=1.2.1
+    - distributed >=1.28.0
+    - numpy >=1.16.0

 test:
   imports:
diff --git a/requirements.txt b/requirements.txt
index ed5f31b5d..6d6c69dc1 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,2 +1,3 @@
-dask>=1.1.4
-distributed>=1.25.2
+dask>=1.2.1
+distributed>=1.28.0
+numpy>=1.16.0

From 5f47afacd9c8ddd46a9a0df24680dac4c6c40545 Mon Sep 17 00:00:00 2001
From: Peter Andreas Entschev
Date: Wed, 15 May 2019 12:35:21 +0200
Subject: [PATCH 06/16] Add get_device_total_memory utility function

---
 dask_cuda/utils.py | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/dask_cuda/utils.py b/dask_cuda/utils.py
index 054e679c9..3809f90f1 100644
--- a/dask_cuda/utils.py
+++ b/dask_cuda/utils.py
@@ -1,5 +1,6 @@
 import toolz
 import os
+from numba import cuda


 def get_n_gpus():
@@ -12,3 +13,11 @@ def get_n_gpus():
 @toolz.memoize
 def _n_gpus_from_nvidia_smi():
     return len(os.popen("nvidia-smi -L").read().strip().split("\n"))
+
+
+def get_device_total_memory(index=0):
+    """
+    Return total memory of CUDA device with index
+    """
+    with cuda.gpus[index]:
+        return cuda.current_context().get_memory_info()[1]
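As a quick illustration (not part of the patch), the new utility can be combined with the existing get_n_gpus to report the capacity of every visible device; it assumes Numba and at least one CUDA device are available:

    from dask_cuda.utils import get_n_gpus, get_device_total_memory

    # Print the total memory of each visible CUDA device
    for i in range(get_n_gpus()):
        total = get_device_total_memory(index=i)
        print("GPU %d: %.1f GB total" % (i, total / 1e9))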
From c570360449aa1660072e818651a9c71313a6b926 Mon Sep 17 00:00:00 2001
From: Peter Andreas Entschev
Date: Wed, 15 May 2019 12:36:52 +0200
Subject: [PATCH 07/16] Pass pre-constructed DeviceHostFile to Worker

---
 dask_cuda/dask_cuda_worker.py | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/dask_cuda/dask_cuda_worker.py b/dask_cuda/dask_cuda_worker.py
index 9652cfef5..e4b20cc51 100755
--- a/dask_cuda/dask_cuda_worker.py
+++ b/dask_cuda/dask_cuda_worker.py
@@ -9,7 +9,7 @@
 from distributed import Nanny, Worker
 from distributed.config import config
 from distributed.utils import get_ip_interface, parse_timedelta
-from distributed.worker import _ncores
+from distributed.worker import _ncores, parse_memory_limit
 from distributed.security import Security
 from distributed.cli.utils import (
     check_python_3,
@@ -25,7 +25,7 @@
 from .device_host_file import DeviceHostFile
 from .local_cuda_cluster import cuda_visible_devices
-from .utils import get_n_gpus
+from .utils import get_n_gpus, get_device_total_memory

 from toolz import valmap
 from tornado.ioloop import IOLoop, TimeoutError

@@ -176,7 +176,7 @@
         nprocs = get_n_gpus()

     if not nthreads:
-        nthreads = min(1, _ncores // nprocs )
+        nthreads = min(1, _ncores // nprocs)

     if pid_file:
         with open(pid_file, "w") as f:
@@ -253,7 +253,12 @@ def del_pid_file():
             contact_address=None,
             env={"CUDA_VISIBLE_DEVICES": cuda_visible_devices(i)},
             name=name if nprocs == 1 or not name else name + "-" + str(i),
-            data=DeviceHostFile
+            data=DeviceHostFile(
+                device_memory_limit=get_device_total_memory(index=i),
+                memory_limit=parse_memory_limit(
+                    memory_limit, nthreads, total_cores=nprocs
+                ),
+            ),
             **kwargs
         )
         for i in range(nprocs)
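A hedged sketch (not from the patch) of the per-worker limits this change computes, using the same helpers called above; the single-threaded, one-process-per-GPU setup mirrors the defaults:

    from distributed.worker import parse_memory_limit
    from dask_cuda.utils import get_n_gpus, get_device_total_memory

    nprocs = get_n_gpus()  # one worker process per GPU
    nthreads = 1

    for i in range(nprocs):
        device_limit = get_device_total_memory(index=i)
        # "auto" host memory is divided among the worker processes
        host_limit = parse_memory_limit("auto", nthreads, total_cores=nprocs)
        print(i, device_limit, host_limit)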
From a37d7f61fbc8ef51409c5a723f1f3206b9106e42 Mon Sep 17 00:00:00 2001
From: Peter Andreas Entschev
Date: Wed, 15 May 2019 14:46:21 +0200
Subject: [PATCH 08/16] Add memory spilling test

---
 dask_cuda/tests/test_spill.py | 92 +++++++++++++++++++++++++++++++++++
 1 file changed, 92 insertions(+)
 create mode 100644 dask_cuda/tests/test_spill.py

diff --git a/dask_cuda/tests/test_spill.py b/dask_cuda/tests/test_spill.py
new file mode 100644
index 000000000..e9138fffd
--- /dev/null
+++ b/dask_cuda/tests/test_spill.py
@@ -0,0 +1,92 @@
+from distributed.utils_test import gen_cluster
+from distributed.worker import Worker
+from distributed import wait
+from dask_cuda.device_host_file import DeviceHostFile
+import dask.array as da
+import pytest
+import os
+from zict.file import _safe_key as safe_key
+
+
+def assert_device_host_file_size(dhf, total_bytes, chunk_overhead=1024):
+    byte_sum = dhf.device_buffer.fast.total_weight + dhf.host_buffer.fast.total_weight
+    file_path = [os.path.join(dhf.disk.directory, safe_key(k)) for k in dhf.disk.keys()]
+    file_size = [os.path.getsize(f) for f in file_path]
+    byte_sum += sum(file_size)
+
+    # Allow up to chunk_overhead bytes overhead per chunk on disk
+    host_overhead = len(dhf.host) * chunk_overhead
+    disk_overhead = len(dhf.disk) * chunk_overhead
+    assert (
+        byte_sum >= total_bytes
+        and byte_sum <= total_bytes + host_overhead + disk_overhead
+    )
+
+
+@pytest.mark.parametrize(
+    "params",
+    [
+        {
+            "device_memory_limit": 2e9,
+            "memory_limit": 4e9,
+            "host_target": 0.6,
+            "host_spill": 0.7,
+            "spills_to_disk": False,
+        },
+        {
+            "device_memory_limit": 1e9,
+            "memory_limit": 1e9,
+            "host_target": 0.3,
+            "host_spill": 0.3,
+            "spills_to_disk": True,
+        },
+    ],
+)
+def test_device_spill(params):
+    @gen_cluster(
+        client=True,
+        ncores=[("127.0.0.1", 1)],
+        Worker=Worker,
+        worker_kwargs={
+            "memory_limit": params["memory_limit"],
+            "data": DeviceHostFile(
+                device_memory_limit=params["device_memory_limit"],
+                memory_limit=params["memory_limit"],
+            ),
+        },
+        config={
+            "distributed.worker.memory.target": params["host_target"],
+            "distributed.worker.memory.spill": params["host_spill"],
+        },
+    )
+    def test_device_spill(client, scheduler, worker):
+        import cupy
+
+        cupy.cuda.set_allocator(None)
+        cupy.cuda.set_pinned_memory_allocator(None)
+
+        rs = da.random.RandomState(RandomState=cupy.random.RandomState)
+        x = rs.random(int(250e6), chunks=10e6)
+        yield wait(x)
+
+        xx = x.persist()
+        yield wait(xx)
+
+        print(worker.data.device_buffer)
+
+        # Allow up to 1024 bytes overhead per chunk serialized
+        assert_device_host_file_size(worker.data, x.nbytes, 1024)
+
+        y = client.compute(x.sum())
+        res = yield y
+        print(worker.data.device_buffer)
+
+        assert (abs(res / x.size) - 0.5) < 1e-3
+
+        assert_device_host_file_size(worker.data, x.nbytes, 1024)
+        if params["spills_to_disk"]:
+            assert len(worker.data.disk) > 0
+        else:
+            assert len(worker.data.disk) == 0
+
+    test_device_spill()

From 18ced446904b174f342eaa872765ab9b93a6541c Mon Sep 17 00:00:00 2001
From: Peter Andreas Entschev
Date: Wed, 15 May 2019 14:54:32 +0200
Subject: [PATCH 09/16] Add numba as a requirement

---
 conda/recipes/dask-cuda/meta.yaml | 1 +
 requirements.txt                  | 1 +
 2 files changed, 2 insertions(+)

diff --git a/conda/recipes/dask-cuda/meta.yaml b/conda/recipes/dask-cuda/meta.yaml
index 2b747affd..6be19fc57 100644
--- a/conda/recipes/dask-cuda/meta.yaml
+++ b/conda/recipes/dask-cuda/meta.yaml
@@ -24,6 +24,7 @@ requirements:
     - dask-core >=1.2.1
     - distributed >=1.28.0
     - numpy >=1.16.0
+    - numba >=0.40.1

 test:
   imports:
diff --git a/requirements.txt b/requirements.txt
index 6d6c69dc1..1ea6bcd55 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,4 @@
 dask>=1.2.1
 distributed>=1.28.0
 numpy>=1.16.0
+numba>=0.40.1
From ac56871c972f1f822e7dd086526193288786c5c6 Mon Sep 17 00:00:00 2001
From: Peter Andreas Entschev
Date: Wed, 15 May 2019 16:17:44 +0200
Subject: [PATCH 10/16] Add --device-memory-limit parameter to dask-cuda-worker

---
 dask_cuda/dask_cuda_worker.py            | 17 +++++++++++++++--
 dask_cuda/tests/test_dask_cuda_worker.py |  2 ++
 2 files changed, 17 insertions(+), 2 deletions(-)

diff --git a/dask_cuda/dask_cuda_worker.py b/dask_cuda/dask_cuda_worker.py
index e4b20cc51..49d7f0cf4 100755
--- a/dask_cuda/dask_cuda_worker.py
+++ b/dask_cuda/dask_cuda_worker.py
@@ -8,7 +8,7 @@
 import click
 from distributed import Nanny, Worker
 from distributed.config import config
-from distributed.utils import get_ip_interface, parse_timedelta
+from distributed.utils import get_ip_interface, parse_timedelta, parse_bytes
 from distributed.worker import _ncores, parse_memory_limit
 from distributed.security import Security
 from distributed.cli.utils import (
@@ -99,6 +99,16 @@
     "string (like 5GB or 5000M), "
     "'auto', or zero for no memory management",
 )
+@click.option(
+    "--device-memory-limit",
+    default="auto",
+    help="Bytes of memory per CUDA device that the worker can use. "
+    "This can be an integer (bytes), "
+    "float (fraction of total device memory), "
+    "string (like 5GB or 5000M), "
+    "'auto', or zero for no memory management "
+    "(i.e., allow full device memory usage).",
+)
 @click.option(
     "--reconnect/--no-reconnect",
     default=True,
@@ -147,6 +157,7 @@ def main(
     nthreads,
     name,
     memory_limit,
+    device_memory_limit,
     pid_file,
     reconnect,
     resources,
@@ -254,7 +265,9 @@ def del_pid_file():
             env={"CUDA_VISIBLE_DEVICES": cuda_visible_devices(i)},
             name=name if nprocs == 1 or not name else name + "-" + str(i),
             data=DeviceHostFile(
-                device_memory_limit=get_device_total_memory(index=i),
+                device_memory_limit=get_device_total_memory(index=i)
+                if (device_memory_limit == "auto" or device_memory_limit == int(0))
+                else parse_bytes(device_memory_limit),
                 memory_limit=parse_memory_limit(
                     memory_limit, nthreads, total_cores=nprocs
                 ),
diff --git a/dask_cuda/tests/test_dask_cuda_worker.py b/dask_cuda/tests/test_dask_cuda_worker.py
index 2b72c73a4..bcfdfd4b6 100644
--- a/dask_cuda/tests/test_dask_cuda_worker.py
+++ b/dask_cuda/tests/test_dask_cuda_worker.py
@@ -25,6 +25,8 @@ def test_cuda_visible_devices(loop):
             "127.0.0.1:9359",
             "--host",
             "127.0.0.1",
+            "--device-memory-limit",
+            "1 MB",
             "--no-bokeh",
         ]
     ) as worker:
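A brief sketch (not part of the patch) of how the new --device-memory-limit values are interpreted, mirroring the logic above; the helper name is illustrative only:

    from distributed.utils import parse_bytes
    from dask_cuda.utils import get_device_total_memory

    def resolve_device_memory_limit(value, index=0):
        # "auto" or 0 fall back to the full memory of the selected GPU
        if value == "auto" or value == 0:
            return get_device_total_memory(index=index)
        return parse_bytes(value)  # e.g. "1 MB" -> 1000000, "5GB" -> 5000000000

    print(resolve_device_memory_limit("5GB"))
    print(resolve_device_memory_limit("auto"))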
From f7acb76b515c5199ef1be9bd36e8f175692814de Mon Sep 17 00:00:00 2001
From: Peter Andreas Entschev
Date: Wed, 15 May 2019 16:58:20 +0200
Subject: [PATCH 11/16] Create work directory before DeviceHostFile

---
 dask_cuda/dask_cuda_worker.py | 40 +++++++++++++++++++++++++++--------
 1 file changed, 31 insertions(+), 9 deletions(-)

diff --git a/dask_cuda/dask_cuda_worker.py b/dask_cuda/dask_cuda_worker.py
index 49d7f0cf4..277092993 100755
--- a/dask_cuda/dask_cuda_worker.py
+++ b/dask_cuda/dask_cuda_worker.py
@@ -8,7 +8,13 @@
 import click
 from distributed import Nanny, Worker
 from distributed.config import config
-from distributed.utils import get_ip_interface, parse_timedelta, parse_bytes
+from distributed.diskutils import WorkSpace
+from distributed.utils import (
+    get_ip_interface,
+    parse_timedelta,
+    parse_bytes,
+    warn_on_duration,
+)
 from distributed.worker import _ncores, parse_memory_limit
 from distributed.security import Security
 from distributed.cli.utils import (
@@ -246,6 +252,18 @@ def del_pid_file():
     if death_timeout is not None:
         death_timeout = parse_timedelta(death_timeout, "s")

+    local_dir = kwargs.get("local_dir", "dask-worker-space")
+    with warn_on_duration(
+        "1s",
+        "Creating scratch directories is taking a surprisingly long time. "
+        "This is often due to running workers on a network file system. "
+        "Consider specifying a local-directory to point workers to write "
+        "scratch data to a local disk.",
+    ):
+        _workspace = WorkSpace(os.path.abspath(local_dir))
+        _workdir = _workspace.new_work_dir(prefix="worker-")
+        local_dir = _workdir.dir_path
+
     nannies = [
         t(
             scheduler,
@@ -264,15 +282,19 @@ def del_pid_file():
             contact_address=None,
             env={"CUDA_VISIBLE_DEVICES": cuda_visible_devices(i)},
             name=name if nprocs == 1 or not name else name + "-" + str(i),
-            data=DeviceHostFile(
-                device_memory_limit=get_device_total_memory(index=i)
-                if (device_memory_limit == "auto" or device_memory_limit == int(0))
-                else parse_bytes(device_memory_limit),
-                memory_limit=parse_memory_limit(
-                    memory_limit, nthreads, total_cores=nprocs
-                ),
+            data=(
+                DeviceHostFile,
+                {
+                    "device_memory_limit": get_device_total_memory(index=i)
+                    if (device_memory_limit == "auto" or device_memory_limit == int(0))
+                    else parse_bytes(device_memory_limit),
+                    "memory_limit": parse_memory_limit(
+                        memory_limit, nthreads, total_cores=nprocs
+                    ),
+                    "local_dir": local_dir,
+                },
             ),
-            **kwargs
+            **kwargs,
         )
         for i in range(nprocs)
     ]

From c9b283fbedbb612ef1dd83f04a21777b1fd6bab6 Mon Sep 17 00:00:00 2001
From: Peter Andreas Entschev
Date: Wed, 15 May 2019 21:35:55 +0200
Subject: [PATCH 12/16] Add DeviceHostFile support for LocalCUDACluster

---
 dask_cuda/local_cuda_cluster.py | 53 +++++++++++++++++++++++++++++++--
 1 file changed, 50 insertions(+), 3 deletions(-)

diff --git a/dask_cuda/local_cuda_cluster.py b/dask_cuda/local_cuda_cluster.py
index 11cb88716..7eba04bbf 100644
--- a/dask_cuda/local_cuda_cluster.py
+++ b/dask_cuda/local_cuda_cluster.py
@@ -1,11 +1,14 @@
 import os
+import warnings

 from tornado import gen

 from dask.distributed import LocalCluster
-from distributed.worker import TOTAL_MEMORY
+from distributed.nanny import Nanny
+from distributed.worker import Worker, TOTAL_MEMORY

-from .utils import get_n_gpus
+from .device_host_file import DeviceHostFile
+from .utils import get_n_gpus, get_device_total_memory


 def cuda_visible_devices(i, visible=None):
@@ -35,7 +38,8 @@ def __init__(
         threads_per_worker=1,
         processes=True,
         memory_limit=None,
-        **kwargs
+        device_memory_limit=None,
+        **kwargs,
     ):
         if n_workers is None:
             n_workers = get_n_gpus()
@@ -45,6 +49,9 @@ def __init__(
             raise ValueError("Can not specify more processes than GPUs")
         if memory_limit is None:
             memory_limit = TOTAL_MEMORY / n_workers
+        self.host_memory_limit = memory_limit
+        self.device_memory_limit = device_memory_limit
+
         LocalCluster.__init__(
             self,
             n_workers=n_workers,
@@ -82,3 +89,43 @@ def _start(self, ip=None, n_workers=0):
         self.status = "running"

         raise gen.Return(self)
+
+    @gen.coroutine
+    def _start_worker(self, death_timeout=60, **kwargs):
+        if self.status and self.status.startswith("clos"):
+            warnings.warn("Tried to start a worker while status=='%s'" % self.status)
+            return
+
+        if self.processes:
+            W = Nanny
+            kwargs["quiet"] = True
+        else:
+            W = Worker
+
+        device_index = int(kwargs["env"]["CUDA_VISIBLE_DEVICES"].split(",")[0])
+        if self.device_memory_limit is None:
+            self.device_memory_limit = get_device_total_memory(device_index)
+        data = DeviceHostFile(
+            device_memory_limit=self.device_memory_limit,
+            memory_limit=self.host_memory_limit,
+        )
+
+        w = yield W(
+            self.scheduler.address,
+            loop=self.loop,
+            death_timeout=death_timeout,
+            silence_logs=self.silence_logs,
+            data=data,
+            **kwargs,
+        )
+
+        self.workers.append(w)
+
+        while w.status != "closed" and w.worker_address not in self.scheduler.workers:
+            yield gen.sleep(0.01)
+
+        if w.status == "closed" and self.scheduler.status == "running":
+            self.workers.remove(w)
+            raise gen.TimeoutError("Worker failed to start")
+
+        raise gen.Return(w)
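To put the new cluster support in context, here is a usage sketch (not part of the patch). The numeric limits are illustrative; passing strings such as "2GB" for device_memory_limit only works after the parsing fix in a later commit of this series:

    from dask.distributed import Client
    from dask_cuda import LocalCUDACluster

    # One worker per visible GPU; each worker gets a DeviceHostFile so device
    # data spills to host memory and then to disk instead of exhausting the GPU.
    cluster = LocalCUDACluster(device_memory_limit=int(2e9), memory_limit=int(4e9))
    client = Client(cluster)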
From 6428b5478820f1e95097ca3a7e18ca6cf1fe0684 Mon Sep 17 00:00:00 2001
From: Peter Andreas Entschev
Date: Wed, 15 May 2019 21:37:48 +0200
Subject: [PATCH 13/16] Add LocalCUDACluster device spilling test

---
 dask_cuda/tests/test_spill.py | 77 ++++++++++++++++++++++++++++++-----
 1 file changed, 67 insertions(+), 10 deletions(-)

diff --git a/dask_cuda/tests/test_spill.py b/dask_cuda/tests/test_spill.py
index e9138fffd..bc16923a3 100644
--- a/dask_cuda/tests/test_spill.py
+++ b/dask_cuda/tests/test_spill.py
@@ -1,8 +1,10 @@
-from distributed.utils_test import gen_cluster
+from distributed.utils_test import gen_cluster, loop, gen_test
 from distributed.worker import Worker
-from distributed import wait
+from distributed import Client, get_worker, wait
+from dask_cuda import LocalCUDACluster
 from dask_cuda.device_host_file import DeviceHostFile
 import dask.array as da
+import cupy
 import pytest
 import os
 from zict.file import _safe_key as safe_key
@@ -60,11 +62,6 @@ def test_device_spill(params):
         },
     )
     def test_device_spill(client, scheduler, worker):
-        import cupy
-
-        cupy.cuda.set_allocator(None)
-        cupy.cuda.set_pinned_memory_allocator(None)
-
         rs = da.random.RandomState(RandomState=cupy.random.RandomState)
         x = rs.random(int(250e6), chunks=10e6)
         yield wait(x)
@@ -72,14 +69,11 @@ def test_device_spill(client, scheduler, worker):
         xx = x.persist()
         yield wait(xx)

-        print(worker.data.device_buffer)
-
         # Allow up to 1024 bytes overhead per chunk serialized
         assert_device_host_file_size(worker.data, x.nbytes, 1024)

         y = client.compute(x.sum())
         res = yield y
-        print(worker.data.device_buffer)

         assert (abs(res / x.size) - 0.5) < 1e-3

@@ -90,3 +84,66 @@ def test_device_spill(client, scheduler, worker):
             assert len(worker.data.disk) == 0

     test_device_spill()
+
+
+@pytest.mark.parametrize(
+    "params",
+    [
+        {
+            "device_memory_limit": 2e9,
+            "memory_limit": 4e9,
+            "host_target": 0.6,
+            "host_spill": 0.7,
+            "spills_to_disk": False,
+        },
+        {
+            "device_memory_limit": 1e9,
+            "memory_limit": 1e9,
+            "host_target": 0.3,
+            "host_spill": 0.3,
+            "spills_to_disk": True,
+        },
+    ],
+)
+def test_cluster_device_spill(loop, params):
+    @gen_test()
+    def test_cluster_device_spill():
+        cluster = yield LocalCUDACluster(
+            1,
+            scheduler_port=0,
+            processes=True,
+            silence_logs=False,
+            dashboard_address=None,
+            asynchronous=True,
+            device_memory_limit=params["device_memory_limit"],
+            memory_limit=params["memory_limit"],
+            memory_target_fraction=params["host_target"],
+            memory_spill_fraction=params["host_spill"],
+        )
+        client = yield Client(cluster, asynchronous=True)
+
+        rs = da.random.RandomState(RandomState=cupy.random.RandomState)
+        x = rs.random(int(250e6), chunks=10e6)
+        yield wait(x)
+
+        xx = x.persist()
+        yield wait(xx)
+
+        def get_data(worker, total_size):
+            assert_device_host_file_size(get_worker().data, total_size)
+
+        # Allow up to 1024 bytes overhead per chunk serialized
+        yield client.run(get_data, cluster.workers[0].Worker, x.nbytes)
+
+        y = client.compute(x.sum())
+        res = yield y
+
+        assert (abs(res / x.size) - 0.5) < 1e-3
+
+        yield client.run(get_data, cluster.workers[0].Worker, x.nbytes)
+        disk_chunks = yield client.run(lambda: len(get_worker().data.disk))
+        for dc in disk_chunks.values():
+            if params["spills_to_disk"]:
+                assert dc > 0
+            else:
+                assert dc == 0

From be3710db6b3d6f0ec471aec9017332b3285481c2 Mon Sep 17 00:00:00 2001
From: Peter Andreas Entschev
Date: Wed, 15 May 2019 21:38:36 +0200
Subject: [PATCH 14/16] Add `fast` to DeviceHostFile for Worker compatibility

---
 dask_cuda/device_host_file.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/dask_cuda/device_host_file.py b/dask_cuda/device_host_file.py
index 36a1e023b..39785e978 100644
--- a/dask_cuda/device_host_file.py
+++ b/dask_cuda/device_host_file.py
@@ -77,6 +77,9 @@ def __init__(
         self.host = self.host_buffer.fast.d
         self.disk = self.host_buffer.slow.d

+        # For Worker compatibility only, where `fast` is host memory buffer
+        self.fast = self.host_buffer.fast
+
     def __setitem__(self, key, value):
         if _is_device_object(value):
             self.device_buffer[key] = value

From 45d9a268a5abf5113d41e5b36a1738678bb65dd4 Mon Sep 17 00:00:00 2001
From: Peter Andreas Entschev
Date: Wed, 15 May 2019 21:39:27 +0200
Subject: [PATCH 15/16] Fix some setup.py formatting

---
 setup.py | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/setup.py b/setup.py
index 027f9cb35..993206163 100644
--- a/setup.py
+++ b/setup.py
@@ -7,13 +7,13 @@
 with open(os.path.join(os.path.dirname(__file__), "README.md")) as f:
     long_description = f.read()

-version = os.environ.get('GIT_DESCRIBE_TAG', '0.0.0.dev0').lstrip('v')
+version = os.environ.get("GIT_DESCRIBE_TAG", "0.0.0.dev0").lstrip("v")
 setup(
     name="dask-cuda",
     version=version,
     description="Utilities for Dask and CUDA interactions",
     long_description=long_description,
-    long_description_content_type='text/markdown',
+    long_description_content_type="text/markdown",
     url="https://github.com/rapidsai/dask-cuda",
     author="RAPIDS development team",
     author_email="mrocklin@nvidia.com",
@@ -27,9 +27,9 @@
         "Programming Language :: Python :: 3.7",
     ],
     packages=find_packages(exclude=["docs", "tests", "tests.*", "docs.*"]),
-    install_requires=open('requirements.txt').read().strip().split('\n'),
-    entry_points='''
+    install_requires=open("requirements.txt").read().strip().split("\n"),
+    entry_points="""
        [console_scripts]
        dask-cuda-worker=dask_cuda.dask_cuda_worker:go
-    ''',
+    """,
 )
From 93adc53f6751d01b7df1a6393c3a2d1a6ce57e4f Mon Sep 17 00:00:00 2001
From: Peter Andreas Entschev
Date: Wed, 15 May 2019 21:52:06 +0200
Subject: [PATCH 16/16] Fix LocalCUDACluster device_memory_limit parsing and local dir creation

---
 dask_cuda/local_cuda_cluster.py            | 17 +++++++++++++++++
 dask_cuda/tests/test_local_cuda_cluster.py | 10 ++++++++--
 2 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/dask_cuda/local_cuda_cluster.py b/dask_cuda/local_cuda_cluster.py
index 7eba04bbf..31adadf8f 100644
--- a/dask_cuda/local_cuda_cluster.py
+++ b/dask_cuda/local_cuda_cluster.py
@@ -4,8 +4,10 @@
 from tornado import gen

 from dask.distributed import LocalCluster
+from distributed.diskutils import WorkSpace
 from distributed.nanny import Nanny
 from distributed.worker import Worker, TOTAL_MEMORY
+from distributed.utils import parse_bytes, warn_on_duration

 from .device_host_file import DeviceHostFile
 from .utils import get_n_gpus, get_device_total_memory
@@ -102,12 +104,27 @@ def _start_worker(self, death_timeout=60, **kwargs):
         else:
             W = Worker

+        local_dir = kwargs.get("local_dir", "dask-worker-space")
+        with warn_on_duration(
+            "1s",
+            "Creating scratch directories is taking a surprisingly long time. "
+            "This is often due to running workers on a network file system. "
+            "Consider specifying a local-directory to point workers to write "
+            "scratch data to a local disk.",
+        ):
+            _workspace = WorkSpace(os.path.abspath(local_dir))
+            _workdir = _workspace.new_work_dir(prefix="worker-")
+            local_dir = _workdir.dir_path
+
         device_index = int(kwargs["env"]["CUDA_VISIBLE_DEVICES"].split(",")[0])
         if self.device_memory_limit is None:
             self.device_memory_limit = get_device_total_memory(device_index)
+        elif isinstance(self.device_memory_limit, str):
+            self.device_memory_limit = parse_bytes(self.device_memory_limit)
         data = DeviceHostFile(
             device_memory_limit=self.device_memory_limit,
             memory_limit=self.host_memory_limit,
+            local_dir=local_dir,
         )

         w = yield W(
diff --git a/dask_cuda/tests/test_local_cuda_cluster.py b/dask_cuda/tests/test_local_cuda_cluster.py
index 295518994..e17304fa2 100644
--- a/dask_cuda/tests/test_local_cuda_cluster.py
+++ b/dask_cuda/tests/test_local_cuda_cluster.py
@@ -11,7 +11,10 @@
 @gen_test(timeout=20)
 async def test_local_cuda_cluster():
     async with LocalCUDACluster(
-        scheduler_port=0, asynchronous=True, diagnostics_port=None
+        scheduler_port=0,
+        asynchronous=True,
+        diagnostics_port=None,
+        device_memory_limit=1,
     ) as cluster:
         async with Client(cluster, asynchronous=True) as client:
             assert len(cluster.workers) == utils.get_n_gpus()
@@ -37,7 +40,10 @@ async def test_with_subset_of_cuda_visible_devices():
     os.environ["CUDA_VISIBLE_DEVICES"] = "2,3,7,8"
     try:
         async with LocalCUDACluster(
-            scheduler_port=0, asynchronous=True, diagnostics_port=None
+            scheduler_port=0,
+            asynchronous=True,
+            diagnostics_port=None,
+            device_memory_limit=1,
         ) as cluster:
             async with Client(cluster, asynchronous=True) as client:
                 assert len(cluster.workers) == 4