Device memory spill support (LRU-based only) #51

Merged: 16 commits, May 15, 2019

11 changes: 11 additions & 0 deletions ci/gpu/build.sh
@@ -19,6 +19,10 @@ cd $WORKSPACE
export GIT_DESCRIBE_TAG=`git describe --abbrev=0 --tags`
export GIT_DESCRIBE_NUMBER=`git rev-list ${GIT_DESCRIBE_TAG}..HEAD --count`

# Enable NumPy's __array_function__ protocol (needed for NumPy 1.16.x;
# it may be enabled by default starting with 1.17)
export NUMPY_EXPERIMENTAL_ARRAY_FUNCTION=1

################################################################################
# SETUP - Check environment
################################################################################
@@ -38,6 +42,13 @@ conda list
# FIX Added to deal with Anaconda SSL verification issues during conda builds
conda config --set ssl_verify False

################################################################################
# SETUP - Install additional packages
################################################################################

# Install CuPy for tests
pip install cupy-cuda100==6.0.0rc1

################################################################################
# TEST - Run tests
################################################################################
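The `NUMPY_EXPERIMENTAL_ARRAY_FUNCTION=1` export above opts NumPy 1.16.x into the `__array_function__` (NEP-18) protocol, which the spill tests rely on so that NumPy functions dispatch to CuPy arrays. A minimal sketch of what the protocol enables, assuming the CuPy wheel installed above and a visible GPU:

```python
import numpy as np
import cupy  # installed above as cupy-cuda100

x = cupy.arange(10)

# With NUMPY_EXPERIMENTAL_ARRAY_FUNCTION=1 set (NumPy 1.16.x), this NumPy
# call is dispatched to CuPy via __array_function__, so the reduction runs
# on the GPU and the result stays there instead of being copied to host.
s = np.sum(x)
print(type(s))  # a CuPy ndarray (0-d), not a NumPy scalar
```
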
6 changes: 4 additions & 2 deletions conda/recipes/dask-cuda/meta.yaml
@@ -21,8 +21,10 @@ requirements:
- setuptools
run:
- python x.x
- dask-core >=1.1.4
- distributed >=1.25.2
- dask-core >=1.2.1
- distributed >=1.28.0
- numpy >=1.16.0
- numba >=0.40.1

test:
imports:
52 changes: 47 additions & 5 deletions dask_cuda/dask_cuda_worker.py
@@ -8,8 +8,14 @@
import click
from distributed import Nanny, Worker
from distributed.config import config
from distributed.utils import get_ip_interface, parse_timedelta
from distributed.worker import _ncores
from distributed.diskutils import WorkSpace
from distributed.utils import (
get_ip_interface,
parse_timedelta,
parse_bytes,
warn_on_duration,
)
from distributed.worker import _ncores, parse_memory_limit
from distributed.security import Security
from distributed.cli.utils import (
check_python_3,
@@ -23,8 +29,9 @@
enable_proctitle_on_current,
)

from .device_host_file import DeviceHostFile
from .local_cuda_cluster import cuda_visible_devices
from .utils import get_n_gpus
from .utils import get_n_gpus, get_device_total_memory

from toolz import valmap
from tornado.ioloop import IOLoop, TimeoutError
@@ -98,6 +105,16 @@
"string (like 5GB or 5000M), "
"'auto', or zero for no memory management",
)
@click.option(
"--device-memory-limit",
default="auto",
help="Bytes of memory per CUDA device that the worker can use. "
"This can be an integer (bytes), "
"float (fraction of total device memory), "
"string (like 5GB or 5000M), "
"'auto', or zero for no memory management "
"(i.e., allow full device memory usage).",
)
@click.option(
"--reconnect/--no-reconnect",
default=True,
@@ -146,6 +163,7 @@ def main(
nthreads,
name,
memory_limit,
device_memory_limit,
pid_file,
reconnect,
resources,
@@ -175,7 +193,7 @@ def main(
nprocs = get_n_gpus()

if not nthreads:
nthreads = min(1, _ncores // nprocs )
nthreads = min(1, _ncores // nprocs)

if pid_file:
with open(pid_file, "w") as f:
@@ -234,6 +252,18 @@ def del_pid_file():
if death_timeout is not None:
death_timeout = parse_timedelta(death_timeout, "s")

local_dir = kwargs.get("local_dir", "dask-worker-space")
with warn_on_duration(
"1s",
"Creating scratch directories is taking a surprisingly long time. "
"This is often due to running workers on a network file system. "
"Consider specifying a local-directory to point workers to write "
"scratch data to a local disk.",
):
_workspace = WorkSpace(os.path.abspath(local_dir))
_workdir = _workspace.new_work_dir(prefix="worker-")
local_dir = _workdir.dir_path

nannies = [
t(
scheduler,
@@ -252,7 +282,19 @@ def del_pid_file():
contact_address=None,
env={"CUDA_VISIBLE_DEVICES": cuda_visible_devices(i)},
name=name if nprocs == 1 or not name else name + "-" + str(i),
**kwargs
data=(
DeviceHostFile,
{
"device_memory_limit": get_device_total_memory(index=i)
if (device_memory_limit == "auto" or device_memory_limit == int(0))
else parse_bytes(device_memory_limit),
"memory_limit": parse_memory_limit(
memory_limit, nthreads, total_cores=nprocs
),
"local_dir": local_dir,
},
),
Contributor:

I recommend the following instead:

data=(DeviceHostFile, {...})

Otherwise we create the data object immediately here, and then need to pass it down to the worker through a process boundary.

This is implemented here, but it looks like it's not documented anywhere (my mistake):

https://github.com/dask/distributed/blob/8e449d392e91eff0a3454ee98ef362de8f78cc4f/distributed/worker.py#L500-L501

Also, if the user specifies device_memory_limit=0 then we might want something simpler. I can imagine wanting to turn off this behavior if things get complex.

We probably also want the same treatment in LocalCUDACluster, though as a first pass we can also include this ourselves.

Member Author:

I was in the process of doing that and only saw your comment now. Indeed, this is a better solution. Either way, the downside is that I had to create the work directory in dask-cuda-worker, which I'm not particularly happy with, so if you have a suggestion on how to avoid that, it would be great!

I'm already working on LocalCUDACluster; I'm not done yet, but will push those changes soon.

Contributor:

> the downside is that I had to create the work directory in dask-cuda-worker, which I'm not particularly happy with, so if you have a suggestion on how to avoid that, it would be great!

Hrm, yes, I can see how that would be a concern. Unfortunately, I don't have a good solution at the moment; I'll think about it though.

**kwargs,
)
for i in range(nprocs)
]
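For context on the review thread above: passing `data` as a `(DeviceHostFile, {...})` tuple defers construction of the spill mapping to the worker process, instead of building it in the CLI process and shipping it across the Nanny's process boundary. A rough sketch of the two forms and of how the `--device-memory-limit` values resolve to byte counts; values are illustrative, and the tuple behavior assumes distributed's `Worker` accepts a `(callable, kwargs)` pair for `data`, as the linked `worker.py` lines indicate:

```python
from distributed.utils import parse_bytes
from dask_cuda.device_host_file import DeviceHostFile
from dask_cuda.utils import get_device_total_memory

# Eager: the mapping is constructed here and would have to be sent to the
# worker through a process boundary when a Nanny is used.
data = DeviceHostFile(device_memory_limit=parse_bytes("4GB"),
                      memory_limit=parse_bytes("8GB"))

# Lazy (what the PR uses): only the class and its kwargs are shipped; the
# worker instantiates the mapping itself after it starts.
data = (DeviceHostFile, {"device_memory_limit": parse_bytes("4GB"),
                         "memory_limit": parse_bytes("8GB")})

# --device-memory-limit resolution, mirroring the branch in main() above:
#   "auto" or 0           -> total memory of the worker's GPU
#   "4GB", "500 MB", ...  -> parse_bytes(...)
device_memory_limit = "auto"
limit = (get_device_total_memory(index=0)
         if device_memory_limit in ("auto", 0)
         else parse_bytes(device_memory_limit))
```
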
107 changes: 107 additions & 0 deletions dask_cuda/device_host_file.py
@@ -0,0 +1,107 @@
from zict import Buffer, File, Func
from zict.common import ZictBase
from distributed.protocol import deserialize_bytes, serialize_bytes
from distributed.worker import weight

from functools import partial
import os


def _is_device_object(obj):
"""
Check if obj is a device object, i.e., whether it has a
__cuda_array_interface__ attribute
"""
return hasattr(obj, "__cuda_array_interface__")


def _serialize_if_device(obj):
""" Serialize an object if it's a device object """
if _is_device_object(obj):
return serialize_bytes(obj, on_error="raise")
else:
return obj


def _deserialize_if_device(obj):
""" Deserialize an object if it's an instance of bytes """
if isinstance(obj, bytes):
return deserialize_bytes(obj)
else:
return obj


class DeviceHostFile(ZictBase):
""" Manages serialization/deserialization of objects.

Three LRU cache levels are controlled, for device, host and disk.
Each level takes care of serializing objects once its limit has been
reached and passing them on to the subsequent level. Similarly, each
cache may deserialize an object and store it back in the appropriate
cache, depending on the type of object being deserialized.

Parameters
----------
device_memory_limit: int
Number of bytes of CUDA device memory for device LRU cache,
spills to host cache once filled.
memory_limit: int
Number of bytes of host memory for host LRU cache, spills to
disk once filled.
local_dir: path
Path where serialized objects will be stored on disk
"""

def __init__(
self, device_memory_limit=None, memory_limit=None, local_dir="dask-worker-space"
):
path = os.path.join(local_dir, "storage")

self.host_func = dict()
self.disk_func = Func(
partial(serialize_bytes, on_error="raise"), deserialize_bytes, File(path)
)
self.host_buffer = Buffer(
self.host_func, self.disk_func, memory_limit, weight=weight
)

self.device_func = dict()
self.device_host_func = Func(
_serialize_if_device, _deserialize_if_device, self.host_buffer
)
self.device_buffer = Buffer(
self.device_func, self.device_host_func, device_memory_limit, weight=weight
)

self.device = self.device_buffer.fast.d
self.host = self.host_buffer.fast.d
self.disk = self.host_buffer.slow.d

# For Worker compatibility only, where `fast` is host memory buffer
self.fast = self.host_buffer.fast

def __setitem__(self, key, value):
if _is_device_object(value):
self.device_buffer[key] = value
else:
self.host_buffer[key] = value

def __getitem__(self, key):
if key in self.host_buffer:
obj = self.host_buffer[key]
del self.host_buffer[key]
self.device_buffer[key] = _deserialize_if_device(obj)

if key in self.device_buffer:
return self.device_buffer[key]
else:
raise KeyError

def __len__(self):
return len(self.device_buffer)

def __iter__(self):
return iter(self.device_buffer)

def __delitem__(self, i):
del self.device_buffer[i]
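As a rough usage sketch of the new class (assuming CuPy is available; the tiny limits are only there to make spilling easy to trigger):

```python
import numpy as np
import cupy  # assumption: CuPy installed and a GPU visible

from dask_cuda.device_host_file import DeviceHostFile

dhf = DeviceHostFile(
    device_memory_limit=1024,       # bytes in the device LRU before spilling to host
    memory_limit=1024,              # bytes in the host LRU before spilling to disk
    local_dir="dask-worker-space",  # spilled objects are written under .../storage
)

dhf["gpu-array"] = cupy.arange(100)  # exposes __cuda_array_interface__ -> device tier
dhf["cpu-array"] = np.arange(100)    # plain host object -> host tier

# __getitem__ first checks the host buffer (anything that spilled there is
# deserialized and promoted back into the device buffer), then the device buffer.
x = dhf["gpu-array"]

print(list(dhf.device), list(dhf.host), list(dhf.disk))
```
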
70 changes: 67 additions & 3 deletions dask_cuda/local_cuda_cluster.py
@@ -1,11 +1,16 @@
import os
import warnings

from tornado import gen

from dask.distributed import LocalCluster
from distributed.worker import TOTAL_MEMORY
from distributed.diskutils import WorkSpace
from distributed.nanny import Nanny
from distributed.worker import Worker, TOTAL_MEMORY
from distributed.utils import parse_bytes, warn_on_duration

from .utils import get_n_gpus
from .device_host_file import DeviceHostFile
from .utils import get_n_gpus, get_device_total_memory


def cuda_visible_devices(i, visible=None):
@@ -35,7 +40,8 @@ def __init__(
threads_per_worker=1,
processes=True,
memory_limit=None,
**kwargs
device_memory_limit=None,
**kwargs,
):
if n_workers is None:
n_workers = get_n_gpus()
@@ -45,6 +51,9 @@ def __init__(
raise ValueError("Can not specify more processes than GPUs")
if memory_limit is None:
memory_limit = TOTAL_MEMORY / n_workers
self.host_memory_limit = memory_limit
self.device_memory_limit = device_memory_limit

LocalCluster.__init__(
self,
n_workers=n_workers,
@@ -82,3 +91,58 @@ def _start(self, ip=None, n_workers=0):
self.status = "running"

raise gen.Return(self)

@gen.coroutine
def _start_worker(self, death_timeout=60, **kwargs):
if self.status and self.status.startswith("clos"):
warnings.warn("Tried to start a worker while status=='%s'" % self.status)
return

if self.processes:
W = Nanny
kwargs["quiet"] = True
else:
W = Worker

local_dir = kwargs.get("local_dir", "dask-worker-space")
with warn_on_duration(
"1s",
"Creating scratch directories is taking a surprisingly long time. "
"This is often due to running workers on a network file system. "
"Consider specifying a local-directory to point workers to write "
"scratch data to a local disk.",
):
_workspace = WorkSpace(os.path.abspath(local_dir))
_workdir = _workspace.new_work_dir(prefix="worker-")
local_dir = _workdir.dir_path

device_index = int(kwargs["env"]["CUDA_VISIBLE_DEVICES"].split(",")[0])
if self.device_memory_limit is None:
self.device_memory_limit = get_device_total_memory(device_index)
elif isinstance(self.device_memory_limit, str):
self.device_memory_limit = parse_bytes(self.device_memory_limit)
data = DeviceHostFile(
device_memory_limit=self.device_memory_limit,
memory_limit=self.host_memory_limit,
local_dir=local_dir,
)

w = yield W(
self.scheduler.address,
loop=self.loop,
death_timeout=death_timeout,
silence_logs=self.silence_logs,
data=data,
**kwargs,
)

self.workers.append(w)

while w.status != "closed" and w.worker_address not in self.scheduler.workers:
yield gen.sleep(0.01)

if w.status == "closed" and self.scheduler.status == "running":
self.workers.remove(w)
raise gen.TimeoutError("Worker failed to start")

raise gen.Return(w)
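And the cluster-level counterpart, as a sketch: it assumes the package exports `LocalCUDACluster` at the top level and that the string form of `device_memory_limit` is parsed with `parse_bytes`, as in `_start_worker` above.

```python
from dask.distributed import Client
from dask_cuda import LocalCUDACluster

# One worker per visible GPU. Each worker gets a DeviceHostFile, so data
# beyond ~2 GB of device memory per worker spills to host memory, and data
# beyond the host limit spills to disk in the worker's scratch directory.
# Leaving device_memory_limit=None would default to each GPU's total memory.
cluster = LocalCUDACluster(device_memory_limit="2GB")
client = Client(cluster)
```
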
2 changes: 2 additions & 0 deletions dask_cuda/tests/test_dask_cuda_worker.py
@@ -25,6 +25,8 @@ def test_cuda_visible_devices(loop):
"127.0.0.1:9359",
"--host",
"127.0.0.1",
"--device-memory-limit",
"1 MB",
"--no-bokeh",
]
) as worker: