Device memory spill support #35

Closed
wants to merge 47 commits
Changes from 38 commits
Commits (47)
1aa68a8
Add support for yaml configurations
pentschev Apr 18, 2019
7349974
Add DeviceHostFile class to handle memory-spilling in LRU fashion
pentschev Apr 18, 2019
377368a
Add CUDAWorker class to handle device memory
pentschev Apr 18, 2019
197e653
Use CUDAWorker by default, add --device-memory-limit parameter
pentschev Apr 18, 2019
de5c3e9
Rename configurations dask-cuda -> cuda
pentschev Apr 18, 2019
42c0686
Use CUDAWorker on LocalCUDACluster
pentschev Apr 23, 2019
c92bbc9
Add scheduler_ip argument to CUDAWorker
pentschev Apr 23, 2019
ce4ba37
Add test_device_spill
pentschev Apr 23, 2019
b3cbf2c
Add assert_device_host_file_size in utils_test module
pentschev Apr 23, 2019
855876c
Temporarily build with dask-distributed master
pentschev Apr 23, 2019
07fccfa
Install CuPy for CI builds
pentschev Apr 23, 2019
6d48805
Fix DeviceHostFile transfer between host/device dicts
pentschev Apr 25, 2019
1dd2c16
Add DeviceHostFile test
pentschev Apr 25, 2019
df6cb95
Update numpy and numba requirements
pentschev Apr 25, 2019
5a7ceef
Use Dask master and enable __array_function__ for CI builds
pentschev Apr 25, 2019
8659fb6
Fixes for GPU CI
pentschev Apr 29, 2019
2c24f03
Fix test_spill, extend it to multiple parameters
pentschev Apr 29, 2019
1d06c75
Minor device_host_file and utils_test fixes
pentschev Apr 29, 2019
deff18f
Add CUDANanny, wraps Nanny but defaults to CUDAWorker
pentschev Apr 29, 2019
2d9c150
Fix GPU indices for test_cuda_visible_devices
pentschev Apr 30, 2019
ce5c650
Add single-GPU test for visible devices
pentschev Apr 30, 2019
c5fbb6f
Improve documentation for CUDANanny, DeviceHostFile
pentschev Apr 30, 2019
dcc6a6a
Fix defaults for device_memory_limit
pentschev May 1, 2019
9bc21e7
Fix test_with_subset_of_cuda_visible_devices
pentschev May 1, 2019
9eb2dfd
Pass *args to CUDANanny and CUDAWorker
pentschev May 2, 2019
4324439
Increase required dask, distributed versions
pentschev May 2, 2019
388c677
Use yaml.safe_load
mrocklin May 2, 2019
c942438
Fix CUDA worker not pausing issue, print correct resume message
pentschev May 3, 2019
d968b0f
Support protocol= keyword in LocalCUDACluster (#39)
mrocklin May 3, 2019
1abb1eb
Merge branch 'branch-0.7' into device-memory-spill
mrocklin May 6, 2019
eb70191
Merge branch 'device-memory-spill' of github.com:pentschev/dask-cuda …
mrocklin May 6, 2019
806ac8b
Minor style changes
mrocklin May 6, 2019
f35826e
Change config namespace from cuda to distributed
mrocklin May 6, 2019
f308943
Rename files to remove cuda suffix
mrocklin May 6, 2019
1b9d38b
Fix style
pentschev May 6, 2019
6d5b714
Add device, host, disk LRU aliases to simplify user queries
pentschev May 7, 2019
358f194
Default pausing due to device memory usage to 0.0 (disable)
pentschev May 7, 2019
c81f763
Small doc fix in dask_cuda_worker.py
mrocklin May 7, 2019
f28cbd1
Avoid modifying DeviceHostFile internals in test_device_host_file
pentschev May 7, 2019
6163359
Merge remote-tracking branch 'origin/device-memory-spill' into device…
pentschev May 7, 2019
a3c89fb
Allow choosing worker-class when starting dask-cuda-worker
pentschev May 7, 2019
8f59141
Add back original test_cuda_visible_devices, using Worker class
pentschev May 7, 2019
1e3acce
Aliasing device, host and disk to dict instead of LRU
pentschev May 7, 2019
882bf31
Remove utils_test, use simple naming instead of gen_random_key()
pentschev May 7, 2019
1960b79
Use tmp_path instead of tempfile, fix flake8/black issues
pentschev May 7, 2019
eb86d5d
Add back 'loop' to test_cuda_visible_devices, despite flake8 complaints
pentschev May 7, 2019
8153b27
Add missing pause argument to test_spill
pentschev May 7, 2019
11 changes: 11 additions & 0 deletions ci/gpu/build.sh
@@ -19,6 +19,10 @@ cd $WORKSPACE
export GIT_DESCRIBE_TAG=`git describe --abbrev=0 --tags`
export GIT_DESCRIBE_NUMBER=`git rev-list ${GIT_DESCRIBE_TAG}..HEAD --count`

# Enable NumPy's __array_function__ protocol (needed for NumPy 1.16.x,
# will possibly be enabled by default starting on 1.17)
export NUMPY_EXPERIMENTAL_ARRAY_FUNCTION=1

################################################################################
# SETUP - Check environment
################################################################################
@@ -38,6 +42,13 @@ conda list
# FIX Added to deal with Anaconda SSL verification issues during conda builds
conda config --set ssl_verify False

################################################################################
# SETUP - Install additional packages
################################################################################

# Install CuPy for tests
pip install cupy-cuda100==6.0.0rc1

################################################################################
# TEST - Run tests
################################################################################
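
For context on the NUMPY_EXPERIMENTAL_ARRAY_FUNCTION export above, a quick illustration of what the protocol enables (assumes CuPy 6.x; the variable must be exported before Python/NumPy starts, as build.sh does):

import numpy as np
import cupy

# With __array_function__ enabled, NumPy functions dispatch to CuPy's
# implementations instead of trying to coerce device arrays to host memory.
x = cupy.arange(10)
y = np.sum(x)  # computed by cupy.sum, result stays on the GPU
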
6 changes: 4 additions & 2 deletions conda/recipes/dask-cuda/meta.yaml
@@ -21,8 +21,10 @@ requirements:
- setuptools
run:
- python x.x
- dask-core >=1.1.4
- distributed >=1.25.2
- dask-core >=1.2.1
- distributed >=1.27.1
- numpy >=1.16.0
- numba >=0.40.1

test:
imports:
1 change: 1 addition & 0 deletions dask_cuda/__init__.py
@@ -1 +1,2 @@
from .local_cuda_cluster import LocalCUDACluster
from . import config
14 changes: 14 additions & 0 deletions dask_cuda/config.py
@@ -0,0 +1,14 @@
import yaml
import os

import dask

config = dask.config.config


fn = os.path.join(os.path.dirname(__file__), "cuda.yaml")
dask.config.ensure_file(source=fn)
with open(fn) as f:
dask_cuda_defaults = yaml.safe_load(f)

dask.config.update_defaults(dask_cuda_defaults)
8 changes: 8 additions & 0 deletions dask_cuda/cuda.yaml
@@ -0,0 +1,8 @@
distributed:
worker:
# Fractions of device memory at which we take action to avoid memory blowup
# Set any of the lower three values to False to turn off the behavior entirely
device-memory:
target: 0.60 # target fraction to stay below
spill: 0.70 # fraction at which we spill to host
pause: 0.0 # fraction at which we pause worker threads - Disabled (0.0) by default, preventing worker from stalling when third-party memory managers are used and can't be disabled
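
For reference, once dask_cuda.config above registers these defaults, they can be read back through Dask's configuration system. A minimal sketch (keys follow cuda.yaml above; values assume no user overrides):

import dask
import dask_cuda  # importing dask_cuda runs dask_cuda.config, which registers cuda.yaml

target = dask.config.get("distributed.worker.device-memory.target")  # 0.60
spill = dask.config.get("distributed.worker.device-memory.spill")    # 0.70
pause = dask.config.get("distributed.worker.device-memory.pause")    # 0.0, pausing disabled
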
19 changes: 15 additions & 4 deletions dask_cuda/dask_cuda_worker.py
@@ -3,10 +3,9 @@
import atexit
import logging
import os
from sys import exit

import click
from distributed import Nanny, Worker
from distributed import Nanny
from distributed.config import config
from distributed.utils import get_ip_interface, parse_timedelta
from distributed.worker import _ncores
@@ -16,13 +15,13 @@
uri_from_host_port,
install_signal_handlers,
)
from distributed.comm import get_address_host_port
from distributed.preloading import validate_preload_argv
from distributed.proctitle import (
enable_proctitle_on_children,
enable_proctitle_on_current,
)

from .worker import CUDAWorker
from .local_cuda_cluster import cuda_visible_devices
from .utils import get_n_gpus

@@ -98,6 +97,15 @@
"string (like 5GB or 5000M), "
"'auto', or zero for no memory management",
)
@click.option(
"--device-memory-limit",
default="auto",
help="Bytes of memory per CUDA device that the worker can use. "
"This can be an integer (bytes), "
"float (fraction of total device memory), "
"string (like 5GB or 5000M), "
"'auto', or zero for no memory management",
)
Contributor:

I would prefer to avoid this until someone asks for it if possible. I'm somewhat against an API like this because it forces us to enumerate the possible options in code. If we were to do something like this I think that we would probably provide the full namespace of the class and then try to import it.

However there is enough uncertainty here that, for maintenance reasons, I'd prefer that we not promise anything until someone is asking us for this.

Contributor:

For context, the worker_class keyword exists because different groups have their own custom Worker classes that they use. Exposing a keyword like this but making no move to allow them to plug in seems against the spirit of the keyword.

Member Author:

For things like the memory monitor, this may not be an issue, both because we have disabled it temporarily, and because it may just not be necessary with the kind of data used on GPUs.

That's not true. We solely disabled pausing the worker; to control spilling from the device, we still need to monitor the device memory. And this is why I needed to subclass it.

I would prefer to avoid this until someone asks for it if possible. I'm somewhat against an API like this because it forces us to enumerate the possible options in code. If we were to do something like this I think that we would probably provide the full namespace of the class and then try to import it.

Unfortunately, this is necessary for us to reenable the old CUDA_VISIBLE_DEVICES test. Using CUDAWorker, which was hardcoded before, prevents us from launching a process on a single-GPU machine to mock-test as if it had several GPUs and check the ordering of GPUs for each process.

Contributor:

For things like the memory monitor, this may not be an issue, both because we have disabled it temporarily, and because it may just not be necessary with the kind of data used on GPUs.

That's not true. We solely disabled pausing the worker; to control spilling from the device, we still need to monitor the device memory. And this is why I needed to subclass it.

Why do we need to monitor device memory externally from the use of sizeof? If objects reliably tell us how much device memory they take up then we may not need to track device memory separately.

Dask workers operated this way for a long time before we finally needed to give in and have a separate periodic callback that tracked system memory. I'm inclined to try the simpler approach first and see if it breaks down or not.

Member Author:

Sure, they do track how much memory they take. However, tracking the device memory lets us decide when it's time to spill memory. Isn't that what memory_target_fraction and memory_spill_fraction (prepend device_ for the cases in this PR) are for?

The block https://github.com/rapidsai/dask-cuda/pull/35/files#diff-a77f0c6f19d8d34d59aede5e31455719R282 controls the spilling, and this is why we needed to subclass Worker.

Contributor:

Yes, something like that diff. You'll also want to add the data= keyword to LocalCUDACluster.

Member Author:

Thanks for the explanation on memory monitor.

Yes, something like that diff. You'll also want to add the data= keyword to LocalCUDACluster.

Yes, and I also want to create the object before. :)

But ok, I can probably have it quickly done by tomorrow. There are a few more things that need to be ported to allow it to work (like finding out how much memory the device has in total), and also some test(s), which shouldn't be too difficult now that there's already one that works with the monitoring mechanism and I can base it on that.

Contributor:

Yes, and I also want to create the object before. :)

It's actually valid to pass just the class. Dask will construct it. I think that this is explained in the Worker docstring. This is better because you're using a Nanny and don't want to pass it through a process boundary.

like finding out how much memory the device has in total)

I recommend that we start with just using the full memory or a config value by default and not mess with any user inputs (which will get messy).

Member Author:

It's actually valid to pass just the class. Dask will construct it. I think that this is explained in the Worker docstring. This is better because you're using a Nanny and don't want to pass it through a process boundary.

Ok, I'll check that.

I recommend that we start with just using the full memory or a config value by default and not mess with any user inputs (which will get messy).

We need to identify how much memory there is available for the device, regardless. I can probably use the same numba code from before.

Contributor:

Right, mostly I want to say let's not add a new device_memory_foo= keyword yet if we can avoid it.
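
For illustration, the "provide the full namespace of the class and then try to import it" approach suggested earlier in this thread could look roughly like the sketch below; the helper name is hypothetical and not part of this PR:

import importlib

def resolve_worker_class(qualified_name):
    # Turn e.g. "distributed.Worker" or "dask_cuda.worker.CUDAWorker" into the
    # class object, so the CLI would not need to enumerate the allowed options.
    module_name, _, class_name = qualified_name.rpartition(".")
    module = importlib.import_module(module_name)
    return getattr(module, class_name)

worker_class = resolve_worker_class("distributed.Worker")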

@click.option(
"--reconnect/--no-reconnect",
default=True,
@@ -146,6 +154,7 @@ def main(
nthreads,
name,
memory_limit,
device_memory_limit,
pid_file,
reconnect,
resources,
@@ -175,7 +184,7 @@
nprocs = get_n_gpus()

if not nthreads:
nthreads = min(1, _ncores // nprocs )
nthreads = min(1, _ncores // nprocs)

if pid_file:
with open(pid_file, "w") as f:
@@ -243,6 +252,7 @@ def del_pid_file():
loop=loop,
resources=resources,
memory_limit=memory_limit,
device_memory_limit=device_memory_limit,
reconnect=reconnect,
local_dir=local_directory,
death_timeout=death_timeout,
@@ -252,6 +262,7 @@
contact_address=None,
env={"CUDA_VISIBLE_DEVICES": cuda_visible_devices(i)},
name=name if nprocs == 1 or not name else name + "-" + str(i),
worker_class=CUDAWorker,
**kwargs
)
for i in range(nprocs)
104 changes: 104 additions & 0 deletions dask_cuda/device_host_file.py
@@ -0,0 +1,104 @@
from zict import Buffer, File, Func
from zict.common import ZictBase
from distributed.protocol import deserialize_bytes, serialize_bytes
from distributed.worker import weight

from functools import partial
import os


def _is_device_object(obj):
"""
Check if obj is a device object, by checking if it has a
__cuda_array_interface__ attribute
"""
return hasattr(obj, "__cuda_array_interface__")


def _serialize_if_device(obj):
""" Serialize an object if it's a device object """
if _is_device_object(obj):
return serialize_bytes(obj, on_error="raise")
else:
return obj


def _deserialize_if_device(obj):
""" Deserialize an object if it's an instance of bytes """
if isinstance(obj, bytes):
return deserialize_bytes(obj)
else:
return obj


class DeviceHostFile(ZictBase):
""" Manages serialization/deserialization of objects.

Three LRU cache levels are controlled, for device, host and disk.
Each level takes care of serializing objects once its limit has been
reached and passes them on to the subsequent level. Similarly, each cache
may deserialize an object, storing it back in the appropriate cache,
depending on the type of object being deserialized.

Parameters
----------
device_memory_limit: int
Number of bytes of CUDA device memory for device LRU cache,
spills to host cache once filled.
memory_limit: int
Number of bytes of host memory for host LRU cache, spills to
disk once filled.
local_dir: path
Path where to store serialized objects on disk
"""

def __init__(
self, device_memory_limit=None, memory_limit=None, local_dir="dask-worker-space"
):
path = os.path.join(local_dir, "storage")

self.host_func = dict()
self.disk_func = Func(
partial(serialize_bytes, on_error="raise"), deserialize_bytes, File(path)
)
self.host_buffer = Buffer(
self.host_func, self.disk_func, memory_limit, weight=weight
)

self.device_func = dict()
self.device_host_func = Func(
_serialize_if_device, _deserialize_if_device, self.host_buffer
)
self.device_buffer = Buffer(
self.device_func, self.device_host_func, device_memory_limit, weight=weight
)

self.device = self.device_buffer.fast
self.host = self.host_buffer.fast
self.disk = self.host_buffer.slow
Contributor:

It looks like these are LRU objects. Maybe:

        self.device = self.device_buffer.fast.d
        self.host = self.host_buffer.fast.d
        self.disk = self.host_buffer.slow

Member Author:

I'm sorry, I thought the LRU is what we wanted to give the users access to, no? They can give more information, such as the used and total memory from the LRU, not just what objects are in there.

Member Author:

By the way, self.host (or CUDAWorker.data.host) should be equivalent to Worker.data.fast; I don't think there was any alias such as Worker.data.fast = Worker.data.fast.d. Therefore, I think the way it is right now is closer to the behavior of the original Worker class.

Contributor:

I think that most users will be somewhat confused by the LRU object and find it unfamiliar. I would just hand them the dict. If they want fancier information they can dive through the full object.

Open to counter-arguments though. I don't have strong confidence in what will happen.

Users are confused today by Worker.data.fast (it's not at all clear by the name that this refers to the in-memory data). I think that we can improve upon the current Worker.data approach rather than copy it.

Member Author:

I think that most users will be somewhat confused by the LRU object and find it unfamiliar. I would just hand them the dict. If they want fancier information they can dive through the full object.

Open to counter-arguments though. I don't have strong confidence in what will happen.

I see your point, and I think there are two benefits of keeping it the way it is:

  1. Users can read the LRU information;
  2. Users can still get the dict by simply adding .d.

Getting the LRU when they only have the dict is not possible; therefore, I would rather add another alias for the dict itself (device_dict, for example) than make the LRU inaccessible. However, I have no strong objections to not exposing the LRU, I just think it may be useful for some people.

Users are confused today by Worker.data.fast (it's not at all clear by the name that this refers to the in-memory data). I think that we can improve upon the current Worker.data approach rather than copy it.

I understand that now (there was no documentation for this). In that sense, I would suggest having the same aliases (host and disk) there too.

Contributor:

Users can still get the dict by simply adding .d.

I think it's unlikely that the average user will know to add .d. We haven't told them, and, even if we did, they wouldn't read about it.

If they want LRU information (which I think will be the uncommon case) then they can go up to the full device_buffer object and navigate through. So the sophisticated results are still available to sophisticated users, but the simple results are simple to interpret for a simple user.

I understand that now (there was no documentation for this). In that sense, I would suggest having the same aliases (host and disk) there too.

Agreed. (though I would rename host to memory for most users). Unfortunately we can't add this to the Buffer class (which is a bit more generic) but we could monkey-patch them in this worker code.

https://github.com/dask/distributed/blob/a61df1f54a7ae38c26d8d40de4e1e944067b8dea/distributed/worker.py#L479-L486

Want to submit a short PR or issue?

Member Author:

If they want LRU information (which I think will be the uncommon case) then they can go up to the full device_buffer object and navigate through. So the sophisticated results are still available to sophisticated users, but the simple results are simple to interpret for a simple user.

Fine by me, I will do the change.

Agreed. (though I would rename host to memory for most users). Unfortunately we can't add this to the Buffer class (which is a bit more generic) but we could monkey-patch them in this worker code.

That's what I had in mind, more or less the same as what I did here.

Want to submit a short PR or issue?

Sure, let me first finish the changes here.
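
For illustration, the monkey-patch idea discussed above (exposing plain dicts, with host renamed to memory) could look roughly like this sketch; the helper is hypothetical and not part of this PR:

from zict import Buffer

def add_data_aliases(worker):
    # Worker.data is a zict.Buffer(fast=LRU(...), slow=Func(..., File(...))).
    # Expose friendlier names so users don't need to know about .fast / .d.
    if isinstance(worker.data, Buffer):
        worker.data.memory = worker.data.fast.d  # in-memory objects, as a plain dict
        worker.data.disk = worker.data.slow      # spilled-to-disk mapping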


def __setitem__(self, key, value):
if _is_device_object(value):
self.device_buffer[key] = value
else:
self.host_buffer[key] = value

def __getitem__(self, key):
if key in self.host_buffer:
obj = self.host_buffer[key]
del self.host_buffer[key]
self.device_buffer[key] = _deserialize_if_device(obj)

if key in self.device_buffer:
return self.device_buffer[key]
else:
raise KeyError

def __len__(self):
return len(self.device_buffer)

def __iter__(self):
return iter(self.device_buffer)

def __delitem__(self, i):
del self.device_buffer[i]
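
A minimal usage sketch of the class above (assumes CuPy is installed and a GPU is available; the limits and path are illustrative):

import numpy as np
import cupy
from dask_cuda.device_host_file import DeviceHostFile

dhf = DeviceHostFile(
    device_memory_limit=1e9,  # bytes of device memory before spilling to host
    memory_limit=4e9,         # bytes of host memory before spilling to disk
    local_dir="dask-worker-space",
)

dhf["x"] = cupy.arange(10)  # exposes __cuda_array_interface__, stored in the device level
dhf["y"] = np.arange(10)    # plain host object, stored in the host level

assert "x" in dhf.device
assert "y" in dhf.host
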
34 changes: 25 additions & 9 deletions dask_cuda/local_cuda_cluster.py
@@ -4,7 +4,9 @@

from dask.distributed import LocalCluster
from distributed.worker import TOTAL_MEMORY
from distributed.utils import get_ip_interface

from .nanny import CUDANanny
from .utils import get_n_gpus


@@ -35,7 +37,9 @@ def __init__(
threads_per_worker=1,
processes=True,
memory_limit=None,
**kwargs
device_memory_limit=None,
worker_class=CUDANanny,
**kwargs,
):
if n_workers is None:
n_workers = get_n_gpus()
@@ -45,11 +49,14 @@
raise ValueError("Can not specify more processes than GPUs")
if memory_limit is None:
memory_limit = TOTAL_MEMORY / n_workers

LocalCluster.__init__(
self,
n_workers=n_workers,
threads_per_worker=threads_per_worker,
memory_limit=memory_limit,
device_memory_limit=device_memory_limit,
worker_class=worker_class,
**kwargs,
)

@@ -60,21 +67,30 @@ def _start(self, ip=None, n_workers=0):
"""
if self.status == "running":
return
if (ip is None) and (not self.scheduler_port) and (not self.processes):
# Use inproc transport for optimization
scheduler_address = "inproc://"
elif ip is not None and ip.startswith("tls://"):
scheduler_address = "%s:%d" % (ip, self.scheduler_port)

if self.protocol == "inproc://":
address = self.protocol
else:
if ip is None:
ip = "127.0.0.1"
scheduler_address = (ip, self.scheduler_port)
self.scheduler.start(scheduler_address)
if self.interface:
ip = get_ip_interface(self.interface)
else:
ip = "127.0.0.1"

if "://" in ip:
address = ip
else:
address = self.protocol + ip
if self.scheduler_port:
address += ":" + str(self.scheduler_port)

self.scheduler.start(address)

yield [
self._start_worker(
**self.worker_kwargs,
env={"CUDA_VISIBLE_DEVICES": cuda_visible_devices(i)},
name="gpu-" + str(i),
)
for i in range(n_workers)
]
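
A minimal usage sketch of the resulting API, assuming device_memory_limit accepts the same formats as the --device-memory-limit help text above (the size is illustrative):

from dask.distributed import Client
from dask_cuda import LocalCUDACluster

# One CUDAWorker (managed by CUDANanny) per visible GPU; device memory spills
# to host once device_memory_limit is reached, and host memory spills to disk.
cluster = LocalCUDACluster(device_memory_limit="20GB")
client = Client(cluster)
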
14 changes: 14 additions & 0 deletions dask_cuda/nanny.py
@@ -0,0 +1,14 @@
from distributed import Nanny

from .worker import CUDAWorker


class CUDANanny(Nanny):
""" A process to manage CUDAWorker processes

This is a subclass of Nanny, with the only difference
being worker_class=CUDAWorker.
"""

def __init__(self, *args, worker_class=CUDAWorker, **kwargs):
Nanny.__init__(self, *args, worker_class=worker_class, **kwargs)
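
A small sketch of what the subclass buys, mirroring how dask_cuda_worker.py constructs nannies above (the scheduler address is illustrative):

from distributed import Nanny
from dask_cuda.nanny import CUDANanny
from dask_cuda.worker import CUDAWorker

# The two objects below are configured identically: CUDANanny only changes the
# default worker_class, so the nanny launches CUDAWorker processes.
n1 = CUDANanny("tcp://127.0.0.1:8786")
n2 = Nanny("tcp://127.0.0.1:8786", worker_class=CUDAWorker)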