Device memory spill support #35

Closed · wants to merge 47 commits

Commits (47)
1aa68a8
Add support for yaml configurations
pentschev Apr 18, 2019
7349974
Add DeviceHostFile class to handle memory-spilling in LRU fashion
pentschev Apr 18, 2019
377368a
Add CUDAWorker class to handle device memory
pentschev Apr 18, 2019
197e653
Use CUDAWorker by default, add --device-memory-limit parameter
pentschev Apr 18, 2019
de5c3e9
Rename configurations dask-cuda -> cuda
pentschev Apr 18, 2019
42c0686
Use CUDAWorker on LocalCUDACluster
pentschev Apr 23, 2019
c92bbc9
Add scheduler_ip argument to CUDAWorker
pentschev Apr 23, 2019
ce4ba37
Add test_device_spill
pentschev Apr 23, 2019
b3cbf2c
Add assert_device_host_file_size in utils_test module
pentschev Apr 23, 2019
855876c
Temporarily build with dask-distributed master
pentschev Apr 23, 2019
07fccfa
Install CuPy for CI builds
pentschev Apr 23, 2019
6d48805
Fix DeviceHostFile transfer between host/device dicts
pentschev Apr 25, 2019
1dd2c16
Add DeviceHostFile test
pentschev Apr 25, 2019
df6cb95
Update numpy and numba requirements
pentschev Apr 25, 2019
5a7ceef
Use Dask master and enable __array_function__ for CI builds
pentschev Apr 25, 2019
8659fb6
Fixes for GPU CI
pentschev Apr 29, 2019
2c24f03
Fix test_spill, extend it to multiple parameters
pentschev Apr 29, 2019
1d06c75
Minor device_host_file and utils_test fixes
pentschev Apr 29, 2019
deff18f
Add CUDANanny, wraps Nanny but defaults to CUDAWorker
pentschev Apr 29, 2019
2d9c150
Fix GPU indices for test_cuda_visible_devices
pentschev Apr 30, 2019
ce5c650
Add single-GPU test for visible devices
pentschev Apr 30, 2019
c5fbb6f
Improve documentation for CUDANanny, DeviceHostFile
pentschev Apr 30, 2019
dcc6a6a
Fix defaults for device_memory_limit
pentschev May 1, 2019
9bc21e7
Fix test_with_subset_of_cuda_visible_devices
pentschev May 1, 2019
9eb2dfd
Pass *args to CUDANanny and CUDAWorker
pentschev May 2, 2019
4324439
Increase required dask, distributed versions
pentschev May 2, 2019
388c677
Use yaml.safe_load
mrocklin May 2, 2019
c942438
Fix CUDA worker not pausing issue, print correct resume message
pentschev May 3, 2019
d968b0f
Support protocol= keyword in LocalCUDACluster (#39)
mrocklin May 3, 2019
1abb1eb
Merge branch 'branch-0.7' into device-memory-spill
mrocklin May 6, 2019
eb70191
Merge branch 'device-memory-spill' of github.com:pentschev/dask-cuda …
mrocklin May 6, 2019
806ac8b
Minor style changes
mrocklin May 6, 2019
f35826e
Change config namespace from cuda to distributed
mrocklin May 6, 2019
f308943
Rename files to remove cuda suffix
mrocklin May 6, 2019
1b9d38b
Fix style
pentschev May 6, 2019
6d5b714
Add device, host, disk LRU aliases to simplify user queries
pentschev May 7, 2019
358f194
Default pausing due to device memory usage to 0.0 (disable)
pentschev May 7, 2019
c81f763
Small doc fix in dask_cuda_worker.py
mrocklin May 7, 2019
f28cbd1
Avoid modifying DeviceHostFile internals in test_device_host_file
pentschev May 7, 2019
6163359
Merge remote-tracking branch 'origin/device-memory-spill' into device…
pentschev May 7, 2019
a3c89fb
Allow choosing worker-class when starting dask-cuda-worker
pentschev May 7, 2019
8f59141
Add back original test_cuda_visible_devices, using Worker class
pentschev May 7, 2019
1e3acce
Aliasing device, host and disk to dict instead of LRU
pentschev May 7, 2019
882bf31
Remove utils_test, use simple naming instead of gen_random_key()
pentschev May 7, 2019
1960b79
Use tmp_path instead of tempfile, fix flake8/black issues
pentschev May 7, 2019
eb86d5d
Add back 'loop' to test_cuda_visible_devices, despite flake8 complaints
pentschev May 7, 2019
8153b27
Add missing pause argument to test_spill
pentschev May 7, 2019
Changes from 10 commits
4 changes: 4 additions & 0 deletions ci/gpu/build.sh
@@ -19,6 +19,10 @@ cd $WORKSPACE
export GIT_DESCRIBE_TAG=`git describe --abbrev=0 --tags`
export GIT_DESCRIBE_NUMBER=`git rev-list ${GIT_DESCRIBE_TAG}..HEAD --count`

# Use dask-distributed master until there's a release with
# https://github.com/dask/distributed/pull/2625
pip install --upgrade git+https://github.com/dask/distributed

################################################################################
# SETUP - Check environment
################################################################################
1 change: 1 addition & 0 deletions dask_cuda/__init__.py
@@ -1 +1,2 @@
from .local_cuda_cluster import LocalCUDACluster
from . import config
13 changes: 13 additions & 0 deletions dask_cuda/config.py
@@ -0,0 +1,13 @@
import yaml
import os

import dask

config = dask.config.config


fn = os.path.join(os.path.dirname(__file__), 'cuda.yaml')
with open(fn) as f:
    dask_cuda_defaults = yaml.load(f)

dask.config.update_defaults(dask_cuda_defaults)
8 changes: 8 additions & 0 deletions dask_cuda/cuda.yaml
@@ -0,0 +1,8 @@
cuda:
Contributor:

We could also keep this in the distributed namespace if we wanted to. It might be nice to have all of the worker config together.

Member Author:

I have no strong opinions/objections. I think we can keep it in the same namespace but need a different file to prevent dask-cuda and dask-distributed from overwriting one another's configurations. Would you like me to change it to distributed namespace?

  worker:
    # Fractions of device memory at which we take action to avoid memory blowup
    # Set any of the lower three values to False to turn off the behavior entirely
    device-memory:
      target: 0.60  # target fraction to stay below
      spill: 0.70   # fraction at which we spill to host
      pause: 0.80   # fraction at which we pause worker threads
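These defaults are plain Dask configuration values, so they can be inspected and overridden from Python. A minimal sketch using the key names from this file (standard dask.config usage, nothing dask-cuda specific assumed):

    import dask

    dask.config.get('cuda.worker.device-memory.spill')        # 0.70 by default
    # e.g. disable pausing on device memory pressure
    dask.config.set({'cuda.worker.device-memory.pause': 0.0})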
252 changes: 252 additions & 0 deletions dask_cuda/cuda_worker.py
@@ -0,0 +1,252 @@
from tornado import gen
from numba import cuda

import dask
from distributed import Worker
from distributed.worker import logger
from distributed.compatibility import unicode
from distributed.utils import format_bytes, ignoring, parse_bytes, PeriodicCallback

from .device_host_file import DeviceHostFile


def get_device_total_memory():
    """ Return total memory of CUDA device from current context """
    return cuda.current_context().get_memory_info()[1]  # (free, total)


def get_device_used_memory():
    """ Return used memory of CUDA device from current context """
    memory_info = cuda.current_context().get_memory_info()  # (free, total)
    return memory_info[1] - memory_info[0]
Contributor:

This looks great. I didn't know that Numba could do this.

For general awareness, there is also this: https://github.com/gpuopenanalytics/pynvml/, though I haven't used it much myself.

Member Author:

Didn't know about that one. I don't have a strong opinion, but since Numba is a package more likely to be already installed and we only check memory for the time being, I think it makes more sense to just use Numba for now.
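For reference, the pynvml alternative mentioned above would look roughly like this; a sketch only, this PR sticks with Numba:

    import pynvml

    pynvml.nvmlInit()
    handle = pynvml.nvmlDeviceGetHandleByIndex(0)  # first visible GPU
    info = pynvml.nvmlDeviceGetMemoryInfo(handle)  # has .total, .free, .used (bytes)
    used = info.used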



def parse_device_memory_limit(memory_limit, ncores):
    """ Parse device memory limit input """
    if memory_limit is None or memory_limit == 0 or memory_limit == 'auto':
        memory_limit = int(get_device_total_memory())
    with ignoring(ValueError, TypeError):
        x = float(memory_limit)
        if isinstance(x, float) and x <= 1:
            return int(x * get_device_total_memory())

    if isinstance(memory_limit, (unicode, str)):
        return parse_bytes(memory_limit)
    else:
        return int(memory_limit)
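For illustration, the accepted inputs map out as follows (a sketch assuming a hypothetical device that reports exactly 16e9 bytes of total memory; the ncores argument is accepted but unused):

    parse_device_memory_limit(None, 1)     # -> 16000000000 (total device memory)
    parse_device_memory_limit('auto', 1)   # -> 16000000000
    parse_device_memory_limit(0.5, 1)      # -> 8000000000 (fraction of total)
    parse_device_memory_limit('4GB', 1)    # -> 4000000000 (via parse_bytes)
    parse_device_memory_limit(2**30, 1)    # -> 1073741824 (plain bytes)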


class CUDAWorker(Worker):
    """ CUDA Worker node in a Dask distributed cluster

    Parameters
    ----------
    device_memory_limit: int, float, string
        Number of bytes of CUDA device memory that this worker should use.
        Set to zero for no limit or 'auto' for 100% of memory use.
        Use strings or numbers like 5GB or 5e9
    device_memory_target_fraction: float
        Fraction of CUDA device memory to try to stay beneath
    device_memory_spill_fraction: float
        Fraction of CUDA device memory at which we start spilling to host
    device_memory_pause_fraction: float
        Fraction of CUDA device memory at which we stop running new tasks

    Note: CUDAWorker is a subclass of distributed.Worker; only parameters
    specific to CUDAWorker are listed here. For a complete list of
    parameters, refer to that class.
    """

    def __init__(self, scheduler_ip=None, **kwargs):
        self.device_memory_limit = kwargs.pop('device_memory_limit', None)

        if 'device_memory_target_fraction' in kwargs:
            self.device_memory_target_fraction = kwargs.pop(
                'device_memory_target_fraction')
        else:
            self.device_memory_target_fraction = dask.config.get(
                'cuda.worker.device-memory.target')
        if 'device_memory_spill_fraction' in kwargs:
            self.device_memory_spill_fraction = kwargs.pop(
                'device_memory_spill_fraction')
        else:
            self.device_memory_spill_fraction = dask.config.get(
                'cuda.worker.device-memory.spill')
        if 'device_memory_pause_fraction' in kwargs:
            self.device_memory_pause_fraction = kwargs.pop(
                'device_memory_pause_fraction')
        else:
            self.device_memory_pause_fraction = dask.config.get(
                'cuda.worker.device-memory.pause')

        super().__init__(scheduler_ip=scheduler_ip, **kwargs)

        self.device_memory_limit = parse_device_memory_limit(
            self.device_memory_limit, self.ncores)

        self.data = DeviceHostFile(device_memory_limit=self.device_memory_limit,
                                   memory_limit=self.memory_limit,
                                   local_dir=self.local_dir)

        self._paused = False
        self._device_paused = False

        if self.device_memory_limit:
            self._device_memory_monitoring = False
            pc = PeriodicCallback(
                self.device_memory_monitor,
                self.memory_monitor_interval * 1000,
                io_loop=self.io_loop
            )
            self.periodic_callbacks["device_memory"] = pc

    def _start(self, addr_on_port=0):
        super()._start(addr_on_port)
        if self.device_memory_limit:
            logger.info(' Device Memory: %26s',
                        format_bytes(self.device_memory_limit))
            logger.info('-' * 49)

    def _check_for_pause(self, fraction, pause_fraction, used_memory, memory_limit,
                         paused, free_func, worker_description):
        if pause_fraction and fraction > pause_fraction:
            # Try to free some memory while in paused state
            if free_func:
                free_func()
            if not self._paused:
                logger.warning("%s is at %d%% memory usage. Pausing worker. "
                               "Process memory: %s -- Worker memory limit: %s",
                               worker_description,
                               int(fraction * 100),
                               format_bytes(used_memory),
                               format_bytes(memory_limit))
            return True
        elif paused:
            logger.warning("Worker is at %d%% memory usage. Resuming worker. "
                           "Process memory: %s -- Worker memory limit: %s",
                           int(fraction * 100),
                           format_bytes(used_memory),
                           format_bytes(memory_limit))
            self.ensure_computing()
        return False

    @gen.coroutine
    def memory_monitor(self):
        """ Track this process's memory usage and act accordingly

        If we rise above (memory_spill_fraction * memory_limit) of
        memory use, start dumping data to disk. The default value for
        memory_spill_fraction is 0.7, defined via configuration
        'distributed.worker.memory.spill'.

        If we rise above (memory_pause_fraction * memory_limit) of
        memory use, stop execution of new tasks. The default value
        for memory_pause_fraction is 0.8, defined via configuration
        'distributed.worker.memory.pause'.
        """
        if self._memory_monitoring:
            return
        self._memory_monitoring = True
        total = 0

        proc = self.monitor.proc
        memory = proc.memory_info().rss
        frac = memory / self.memory_limit

        # Pause worker threads if memory use is above
        # (self.memory_pause_fraction * 100)%
        self._paused = self._check_for_pause(frac, self.memory_pause_fraction, memory,
                                             self.memory_limit, self._paused,
                                             self._throttled_gc.collect, 'Worker')
        self.paused = (self._paused or self._device_paused)

        # Dump data to disk if memory use is above
        # (self.memory_spill_fraction * 100)%
        if self.memory_spill_fraction and frac > self.memory_spill_fraction:
            target = self.memory_limit * self.memory_target_fraction
            count = 0
            need = memory - target
            while memory > target:
                if not self.data.host.fast:
                    logger.warning("Memory use is high but worker has no data "
                                   "to store to disk. Perhaps some other process "
                                   "is leaking memory? Process memory: %s -- "
                                   "Worker memory limit: %s",
                                   format_bytes(proc.memory_info().rss),
                                   format_bytes(self.memory_limit))
                    break
                k, v, weight = self.data.host.fast.evict()
                del k, v
                total += weight
                count += 1
                yield gen.moment
                memory = proc.memory_info().rss
                if total > need and memory > target:
                    # Issue a GC to ensure that the evicted data is actually
                    # freed from memory and taken into account by the monitor
                    # before trying to evict even more data.
                    self._throttled_gc.collect()
                    memory = proc.memory_info().rss
            if count:
                logger.debug("Moved %d pieces of data and %s bytes to disk",
                             count, format_bytes(total))

        self._memory_monitoring = False
        raise gen.Return(total)

    @gen.coroutine
    def device_memory_monitor(self):
        """ Track this worker's device memory usage and act accordingly

        If we rise above (device_memory_spill_fraction * device_memory_limit)
        of device memory use, start dumping data to host. The default value
        for device_memory_spill_fraction is 0.7, defined via configuration
        'cuda.worker.device-memory.spill'.

        If we rise above (device_memory_pause_fraction * device_memory_limit)
        of device memory use, stop execution of new tasks. The default value
        for device_memory_pause_fraction is 0.8, defined via configuration
        'cuda.worker.device-memory.pause'.
        """
        if self._device_memory_monitoring:
            return
        self._device_memory_monitoring = True
        total = 0
        memory = get_device_used_memory()
        frac = memory / self.device_memory_limit

        # Pause worker threads if device memory use is above
        # (self.device_memory_pause_fraction * 100)%
        self._device_paused = self._check_for_pause(frac, self.device_memory_pause_fraction,
                                                    memory, self.device_memory_limit,
                                                    self._device_paused, None,
                                                    "Worker's CUDA device")
        self.paused = (self._paused or self._device_paused)

        # Dump device data to host if device memory use is above
        # (self.device_memory_spill_fraction * 100)%
        if (self.device_memory_spill_fraction
                and frac > self.device_memory_spill_fraction):
            target = self.device_memory_limit * self.device_memory_target_fraction
            count = 0
            while memory > target:
                if not self.data.device.fast:
Member (@VibhuJawa, May 1, 2019):

I have a question: what is the data that is stored in CUDAWorker.data.device.fast?

I was trying to persist a dataframe bigger than the GPU memory, with individual chunks that comfortably fit in device memory, but both workers get paused and throw the following warning:

    distributed.worker - WARNING - CUDA device memory use is high but worker has no data to store to host. Perhaps some other process is leaking memory? Process memory: 13.76 GB -- Worker memory limit: 17.07 GB

Also, is there a way to access the CUDAWorker object from the client/cluster object? I tried client.cluster.workers[0].Worker, but it does not seem to contain the self.data attribute and other attributes.

Member Author:

> I have a question: what is the data that is stored in CUDAWorker.data.device.fast?

CUDAWorker.data.device.fast is a dict that stores dask.array device chunks. Similarly, CUDAWorker.data.device.slow (== CUDAWorker.data.host.fast) is a dict that stores dask.array host chunks or device chunks that have been spilled to host, and CUDAWorker.data.host.slow contains the path to all chunks on disk.

> I was trying to persist a dataframe bigger than the GPU memory, with individual chunks that comfortably fit in device memory, but both workers get paused and throw the following warning:

IMO, this is one of the trickiest parts. We can't guarantee that CUDAWorker has full control over device memory; depending on what you're using to create/load your data, it may have its own memory pool, for example. This is why I had to explicitly disable CuPy's memory pooling to write a reliable test in https://github.com/rapidsai/dask-cuda/pull/35/files#diff-fa83f19df929827ec523fd00a21599b8R38.

> Also, is there a way to access the CUDAWorker object from the client/cluster object? I tried client.cluster.workers[0].Worker, but it does not seem to contain the self.data attribute and other attributes.

I think this isn't possible; why would you like to access that directly? I don't know if this is by design or is just something that was never implemented. @mrocklin, could you clarify this?
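(For illustration, disabling CuPy's memory pool so the monitor sees actual device usage is a one-liner; this is a sketch, not the exact test code linked above:)

    import cupy

    # Use plain device allocations so used memory reflects live data only
    cupy.cuda.set_allocator(None)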

Contributor:

> Also, is there a way to access the CUDAWorker object from the client/cluster object?

> I think this isn't possible

Correct. The workers are in separate processes, so there is no way to access them from Python. You can ask Dask to run functions on them to inspect state if you like with

def f(dask_worker):
    return len(dask_worker.data.device.fast)

client.run(f)

(See the run docstring for more information).

You could also try the start_ipython_workers function with the keyword qtconsole=True, but I wouldn't be surprised if it has fallen out of use with recent Tornado library upgrades.

Contributor:

Note that when @pentschev says "dask.array device chunks" he also means any piece of GPU-allocated data, which could be a dask array device chunk, as he is dealing with, or a cudf dataframe, as you are dealing with.

Member:

Thanks a lot @mrocklin for the function to inspect state. I used your function to debug what's happening. It seems that the data is being evicted from data.device.fast, but the GPU memory is still not being cleared.

    ('tcp://172.17.0.2:44914', {'data.device.fast': 14, 'data.device.slow': 0})
    ('tcp://172.17.0.2:44914', {'data.device.fast': 15, 'data.device.slow': 0})
    ('tcp://172.17.0.2:44914', {'data.device.fast': 3, 'data.device.slow': 14})
    ('tcp://172.17.0.2:44914', {'data.device.fast': 3, 'data.device.slow': 14})
    ('tcp://172.17.0.2:44914', {'data.device.fast': 0, 'data.device.slow': 18})
    ('tcp://172.17.0.2:44914', {'data.device.fast': 0, 'data.device.slow': 18})

Error:

    distributed.worker - WARNING - CUDA device memory use is high but worker has no data to store to host.  Perhaps some other process is leaking memory?  Process memory: 12.08 GB -- Worker memory limit: 17.07 GB

Maybe the del here is not clearing memory; I don't know. https://github.com/rapidsai/dask-cuda/pull/35/files#diff-c87f0866b277f959dc7c5d1e4b0ff015R243

Will add a minimal reproducible example here soon.

Member Author:

I agree that this is a major issue; that's why I'm concerned about it. In particular, I think pausing can't be enabled under these circumstances: if it is, then when Dask spills memory to host but can't actually release it, the worker will get stuck.

I have two proposals (not necessarily mutually exclusive) until we come up with something else:

  1. Allow the user to register a function that will be run at the initialization of the worker, to allow the user to disable any memory managers that will be used throughout the runtime;
  2. Allow the user to register a function that will be called in CUDAWorker.device_memory_monitor, triggering memory managers' release of memory.

We can also disable pausing by default, which I'm inclined to think should be the default to prevent this sort of situation.
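A rough sketch of what proposal 1 could look like using distributed's preload hooks; the module name is hypothetical, the pool being disabled depends on the library in use, and whether dask-cuda-worker exposes --preload the same way dask-worker does is an assumption here:

    # disable_pools.py, e.g. `dask-cuda-worker ... --preload disable_pools.py`
    import cupy

    def dask_setup(worker):
        # Runs once at worker startup, before any tasks execute
        cupy.cuda.set_allocator(None)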

Contributor:

For 1 see dask/distributed#2453

Disable pausing by default seems fine to me. This is just a config value change at this point, yes?

Member Author:

> For 1 see dask/distributed#2453

Yes, that would be something similar, if not exactly that (sorry, I can't understand all the details without diving in a bit deeper).

Any thoughts on item 2 as well?

> Disable pausing by default seems fine to me. This is just a config value change at this point, yes?

Yes. I just don't know whether disabling pause has other consequences. On the host, I guess this is to prevent the host from running out of memory and eventually getting killed, is that right?

Contributor:

> Any thoughts on item 2 as well?

I can see how it would solve the problem. I guess I'm hoping that medium term it's not necessary. My inclination is to wait until we have a real-world problem that needs this before adding it. I won't be surprised if that problem occurs quickly, but I'd still rather put it off and get this in.

Member Author:

No objections from me. That said, I have no further changes to add; from my side, it's ready for more reviews or merging.

logger.warning("CUDA device memory use is high but worker has "
"no data to store to host. Perhaps some other "
"process is leaking memory? Process memory: "
"%s -- Worker memory limit: %s",
format_bytes(get_device_used_memory()),
format_bytes(self.device_memory_limit))
break
k, v, weight = self.data.device.fast.evict()
del k, v
total += weight
count += 1
yield gen.moment
memory = get_device_used_memory()
if count:
logger.debug("Moved %d pieces of data and %s bytes to host memory",
count, format_bytes(total))

self._device_memory_monitoring = False
raise gen.Return(total)
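To put concrete numbers on the monitor above: with the default fractions from cuda.yaml and a hypothetical device_memory_limit of exactly 16e9 bytes, the thresholds work out to roughly:

    target: 0.60 * 16e9 = 9.6e9 bytes   (spilling evicts device data down to this level)
    spill:  0.70 * 16e9 = 11.2e9 bytes  (start moving device chunks to host)
    pause:  0.80 * 16e9 = 12.8e9 bytes  (stop running new tasks until usage drops)

(Later commits in this PR change the device-memory pause default to 0.0, i.e. disabled.)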
15 changes: 14 additions & 1 deletion dask_cuda/dask_cuda_worker.py
@@ -6,7 +6,7 @@
from sys import exit

import click
-from distributed import Nanny, Worker
+from distributed import Nanny
from distributed.config import config
from distributed.utils import get_ip_interface, parse_timedelta
from distributed.worker import _ncores
@@ -23,6 +23,7 @@
    enable_proctitle_on_current,
)

from .cuda_worker import CUDAWorker
from .local_cuda_cluster import cuda_visible_devices
from .utils import get_n_gpus

@@ -98,6 +99,15 @@
"string (like 5GB or 5000M), "
"'auto', or zero for no memory management",
)
@click.option(
"--device-memory-limit",
default="auto",
help="Bytes of memory per CUDA device that the worker can use. "
"This can be an integer (bytes), "
"float (fraction of total system memory), "
"string (like 5GB or 5000M), "
"'auto', or zero for no memory management",
)
Contributor:

I would prefer to avoid this until someone asks for it if possible. I'm somewhat against an API like this because it forces us to enumerate the possible options in code. If we were to do something like this I think that we would probably provide the full namespace of the class and then try to import it.

However there is enough uncertainty here that, for maintenance reasons, I'd prefer that we not promise anything until someone is asking us for this.

Contributor:

For context, the worker_class keyword exists because different groups have their own custom Worker classes that they use. Exposing a keyword like this but making no move to allow them to plug in seems against the spirit of the keyword.

Member Author:

> For things like the memory monitor, this may not be an issue, both because we have disabled it temporarily, and because it may just not be necessary with the kind of data used on GPUs.

That's not true. We solely disabled pausing the worker; to control spilling from the device, we still need to monitor the device memory. And this is why I needed to subclass it.

> I would prefer to avoid this until someone asks for it if possible. I'm somewhat against an API like this because it forces us to enumerate the possible options in code. If we were to do something like this I think that we would probably provide the full namespace of the class and then try to import it.

Unfortunately, this is necessary for us to re-enable the old CUDA_VISIBLE_DEVICES test. CUDAWorker, which was hardcoded before, prevents us from launching a process on a single-GPU machine to mock-test as if it had several GPUs and check the GPU ordering of each process.

Contributor:

> For things like the memory monitor, this may not be an issue, both because we have disabled it temporarily, and because it may just not be necessary with the kind of data used on GPUs.

> That's not true. We solely disabled pausing the worker; to control spilling from the device, we still need to monitor the device memory. And this is why I needed to subclass it.

Why do we need to monitor device memory externally from the use of sizeof? If objects reliably tell us how much device memory they take up, then we may not need to track device memory separately.

Dask workers operated this way for a long time before we finally needed to give in and have a separate periodic callback that tracked system memory. I'm inclined to try the simpler approach first and see if it breaks down or not.

Member Author:

Sure, they do track how much memory they take. However, tracking the device memory lets us decide when it's time to spill memory. Isn't that what memory_target_fraction and memory_spill_fraction (prepend device_ for the cases in this PR) are for?

The block https://github.com/rapidsai/dask-cuda/pull/35/files#diff-a77f0c6f19d8d34d59aede5e31455719R282 controls the spilling, and this is why we needed to subclass Worker.

Contributor:

Yes, something like that diff. You'll also want to add the data= keyword to LocalCUDACluster.

Member Author:

Thanks for the explanation on the memory monitor.

> Yes, something like that diff. You'll also want to add the data= keyword to LocalCUDACluster.

Yes, and I also want to create the object before. :)

But ok, I can probably have it done by tomorrow. There are a few more things that need to be ported to allow it to work (like finding out how much memory the device has in total), and also some test(s), which shouldn't be too difficult now that there's already one that works with the monitoring mechanism and I can base it on that.

Contributor:

> Yes, and I also want to create the object before. :)

It's actually valid to pass just the class. Dask will construct it. I think that this is explained in the Worker docstring. This is better because you're using a Nanny and don't want to pass it through a process boundary.

> like finding out how much memory the device has in total

I recommend that we start with just using the full memory or a config value by default and not mess with any user inputs (which will get messy).

Member Author:

> It's actually valid to pass just the class. Dask will construct it. I think that this is explained in the Worker docstring. This is better because you're using a Nanny and don't want to pass it through a process boundary.

Ok, I'll check that.

> I recommend that we start with just using the full memory or a config value by default and not mess with any user inputs (which will get messy).

We need to identify how much memory is available on the device regardless. I can probably use the same numba code from before.

Contributor:

Right, mostly I want to say let's not add a new device_memory_foo= keyword yet if we can avoid it.

@click.option(
"--reconnect/--no-reconnect",
default=True,
@@ -146,6 +156,7 @@ def main(
    nthreads,
    name,
    memory_limit,
    device_memory_limit,
    pid_file,
    reconnect,
    resources,
@@ -243,6 +254,7 @@ def del_pid_file():
            loop=loop,
            resources=resources,
            memory_limit=memory_limit,
            device_memory_limit=device_memory_limit,
            reconnect=reconnect,
            local_dir=local_directory,
            death_timeout=death_timeout,
@@ -252,6 +264,7 @@
            contact_address=None,
            env={"CUDA_VISIBLE_DEVICES": cuda_visible_devices(i)},
            name=name if nprocs == 1 or not name else name + "-" + str(i),
            worker_class=CUDAWorker,
            **kwargs
        )
        for i in range(nprocs)
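For a rough end-to-end picture, a hedged usage sketch: the device_memory_limit= keyword on LocalCUDACluster is an assumption mirroring the --device-memory-limit CLI flag above, and the inspection helper follows the client.run pattern from the discussion:

    from dask.distributed import Client
    from dask_cuda import LocalCUDACluster

    # Assumption: LocalCUDACluster forwards device_memory_limit to CUDAWorker
    cluster = LocalCUDACluster(device_memory_limit="4GB")
    client = Client(cluster)

    def chunks_per_tier(dask_worker):
        # Device-resident chunks vs. chunks spilled to host
        return {"device.fast": len(dask_worker.data.device.fast),
                "device.slow": len(dask_worker.data.device.slow)}

    client.run(chunks_per_tier)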