Device memory spill support #35
@@ -1 +1,2 @@
from .local_cuda_cluster import LocalCUDACluster
from . import config
@@ -0,0 +1,13 @@
import yaml
import os

import dask

config = dask.config.config


fn = os.path.join(os.path.dirname(__file__), 'cuda.yaml')
with open(fn) as f:
    dask_cuda_defaults = yaml.load(f)

dask.config.update_defaults(dask_cuda_defaults)
@@ -0,0 +1,8 @@
cuda:
  worker:
    # Fractions of device memory at which we take action to avoid memory blowup
    # Set any of the lower three values to False to turn off the behavior entirely
    device-memory:
      target: 0.60  # target fraction to stay below
      spill: 0.70   # fraction at which we spill to host
      pause: 0.80   # fraction at which we pause worker threads
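These defaults can be overridden like any other Dask configuration value. As a minimal sketch (only the cuda.worker.device-memory keys introduced above are assumed), a user could adjust the thresholds for a single session:

import dask

# Temporarily raise the spill threshold and turn pausing off entirely
# (sketch; the keys mirror the cuda.yaml defaults above).
with dask.config.set({'cuda.worker.device-memory.spill': 0.75,
                      'cuda.worker.device-memory.pause': False}):
    ...  # start CUDA workers here

Setting any of the three values to False disables the corresponding behavior, matching the comment in the defaults file.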
@@ -0,0 +1,14 @@
from distributed import Nanny

from .cuda_worker import CUDAWorker


class CUDANanny(Nanny):
    """ A process to manage CUDAWorker processes

    This is a subclass of Nanny, with the only difference
    being worker_class=CUDAWorker.
    """
    def __init__(self, *args, **kwargs):
        Nanny.__init__(self, *args,
                       worker_class=CUDAWorker, **kwargs)
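As a usage sketch (the scheduler address is hypothetical and the dask_cuda.cuda_nanny module path is assumed from this diff), a CUDANanny is constructed exactly like a plain Nanny; the only difference is the worker process it supervises:

from dask_cuda.cuda_nanny import CUDANanny  # module path assumed

# Accepts the same arguments as distributed.Nanny; once started on an event
# loop it spawns a CUDAWorker subprocess instead of a regular Worker.
nanny = CUDANanny('tcp://127.0.0.1:8786', ncores=1)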
@@ -0,0 +1,271 @@
from tornado import gen
from numba import cuda

import dask
from distributed import Worker
from distributed.worker import logger
from distributed.compatibility import unicode
from distributed.utils import format_bytes, ignoring, parse_bytes, PeriodicCallback

from .device_host_file import DeviceHostFile


def get_device_total_memory():
    """ Return total memory of CUDA device from current context """
    return cuda.current_context().get_memory_info()[1]  # (free, total)


def get_device_used_memory():
    """ Return used memory of CUDA device from current context """
    memory_info = cuda.current_context().get_memory_info()  # (free, total)
    return memory_info[1] - memory_info[0]
Review comment: This looks great. I didn't know that Numba could do this. Also, for general awareness, there is also https://github.com/gpuopenanalytics/pynvml/, though I haven't used it much myself.

Reply: Didn't know about that one. I don't have a strong opinion, but since Numba is a package more likely to be already installed, and we only check memory for the time being, I think it makes more sense to just use Numba for now.
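For comparison only, the same numbers could be read through pynvml; this is a sketch assuming the pynvml package and device index 0, not something this PR uses:

import pynvml

pynvml.nvmlInit()
handle = pynvml.nvmlDeviceGetHandleByIndex(0)  # first GPU; adjust as needed
info = pynvml.nvmlDeviceGetMemoryInfo(handle)
total_bytes = info.total  # corresponds to get_device_total_memory()
used_bytes = info.used    # corresponds to get_device_used_memory()
pynvml.nvmlShutdown()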

def parse_device_memory_limit(memory_limit, ncores):
    """ Parse device memory limit input """
    if memory_limit is None or memory_limit == 0 or memory_limit == 'auto':
        memory_limit = int(get_device_total_memory())
    with ignoring(ValueError, TypeError):
        x = float(memory_limit)
        if isinstance(x, float) and x <= 1:
            return int(x * get_device_total_memory())

    if isinstance(memory_limit, (unicode, str)):
        return parse_bytes(memory_limit)
    else:
        return int(memory_limit)
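To illustrate the inputs parse_device_memory_limit accepts (a sketch; the fraction and 'auto' cases depend on the GPU in the current context):

# Fractions of total device memory, plain byte counts, and human-readable
# strings are all accepted; ncores is currently unused by the function.
parse_device_memory_limit(0.5, ncores=1)     # half of total device memory
parse_device_memory_limit(5e9, ncores=1)     # 5000000000
parse_device_memory_limit('5GB', ncores=1)   # parse_bytes('5GB') == 5000000000
parse_device_memory_limit('auto', ncores=1)  # total device memory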
class CUDAWorker(Worker):
    """ CUDA Worker node in a Dask distributed cluster

    Parameters
    ----------
    device_memory_limit: int, float, string
        Number of bytes of CUDA device memory that this worker should use.
        Set to zero for no limit or 'auto' to use 100% of device memory.
        Use strings or numbers like 5GB or 5e9.
    device_memory_target_fraction: float
        Fraction of CUDA device memory to try to stay beneath
    device_memory_spill_fraction: float
        Fraction of CUDA device memory at which we start spilling to host
    device_memory_pause_fraction: float
        Fraction of CUDA device memory at which we stop running new tasks

    Note: CUDAWorker is a subclass of distributed.Worker; only parameters
    specific to CUDAWorker are listed here. For a complete list of
    parameters, refer to the distributed.Worker documentation.
    """

    def __init__(self, *args, **kwargs):
        self.device_memory_limit = kwargs.pop('device_memory_limit',
                                              get_device_total_memory())

        if 'device_memory_target_fraction' in kwargs:
            self.device_memory_target_fraction = kwargs.pop(
                'device_memory_target_fraction')
        else:
            self.device_memory_target_fraction = dask.config.get(
                'cuda.worker.device-memory.target')
        if 'device_memory_spill_fraction' in kwargs:
            self.device_memory_spill_fraction = kwargs.pop(
                'device_memory_spill_fraction')
        else:
            self.device_memory_spill_fraction = dask.config.get(
                'cuda.worker.device-memory.spill')
        if 'device_memory_pause_fraction' in kwargs:
            self.device_memory_pause_fraction = kwargs.pop(
                'device_memory_pause_fraction')
        else:
            self.device_memory_pause_fraction = dask.config.get(
                'cuda.worker.device-memory.pause')

        super().__init__(*args, **kwargs)

        self.device_memory_limit = parse_device_memory_limit(
            self.device_memory_limit, self.ncores)

        self.data = DeviceHostFile(device_memory_limit=self.device_memory_limit,
                                   memory_limit=self.memory_limit,
                                   local_dir=self.local_dir)

        self._paused = False
        self._device_paused = False

        if self.device_memory_limit:
            self._device_memory_monitoring = False
            pc = PeriodicCallback(
                self.device_memory_monitor,
                self.memory_monitor_interval * 1000,
                io_loop=self.io_loop
            )
            self.periodic_callbacks["device_memory"] = pc

    def _start(self, addr_on_port=0):
        super()._start(addr_on_port)
        if self.device_memory_limit:
            logger.info(' Device Memory: %26s',
                        format_bytes(self.device_memory_limit))
            logger.info('-' * 49)
    def _check_for_pause(self, fraction, pause_fraction, used_memory, memory_limit,
                         paused, free_func, worker_description):
        if pause_fraction and fraction > pause_fraction:
            # Try to free some memory while in paused state
            if free_func:
                free_func()
            # Log only on the transition into the paused state
            if not paused:
                logger.warning("%s is at %d%% memory usage. Pausing worker. "
                               "Process memory: %s -- Worker memory limit: %s",
                               worker_description,
                               int(fraction * 100),
                               format_bytes(used_memory),
                               format_bytes(memory_limit))
            return True
        return False

    def _resume_message(self, fraction, used_memory, memory_limit,
                        worker_description):
        logger.warning("%s is at %d%% memory usage. Resuming worker. "
                       "Process memory: %s -- Worker memory limit: %s",
                       worker_description,
                       int(fraction * 100),
                       format_bytes(used_memory),
                       format_bytes(memory_limit))

    def _resume_worker(self):
        if self.paused and not (self._paused or self._device_paused):
            self.paused = False
            self.ensure_computing()
    @gen.coroutine
    def memory_monitor(self):
        """ Track this process's memory usage and act accordingly

        If we rise above (memory_spill_fraction * memory_limit) of
        memory use, start dumping data to disk. The default value for
        memory_spill_fraction is 0.7, defined via the configuration key
        'distributed.worker.memory.spill'.

        If we rise above (memory_pause_fraction * memory_limit) of
        memory use, stop execution of new tasks. The default value
        for memory_pause_fraction is 0.8, defined via the configuration key
        'distributed.worker.memory.pause'.
        """
        if self._memory_monitoring:
            return
        self._memory_monitoring = True
        total = 0

        proc = self.monitor.proc
        memory = proc.memory_info().rss
        frac = memory / self.memory_limit

        # Pause worker threads if process memory use is above
        # (self.memory_pause_fraction * 100)%
        old_pause_state = self._paused
        worker_description = 'Worker'
        # Pass the bound collect method so GC can run while paused
        self._paused = self._check_for_pause(frac, self.memory_pause_fraction, memory,
                                             self.memory_limit, self._paused,
                                             self._throttled_gc.collect,
                                             worker_description)
        if old_pause_state and not self._paused:
            self._resume_message(frac, memory, self.memory_limit,
                                 worker_description)
            self._resume_worker()

        # Dump data to disk if memory use is above
        # (self.memory_spill_fraction * 100)%
        if self.memory_spill_fraction and frac > self.memory_spill_fraction:
            target = self.memory_limit * self.memory_target_fraction
            count = 0
            need = memory - target
            while memory > target:
                if not self.data.host.fast:
                    logger.warning("Memory use is high but worker has no data "
                                   "to store to disk. Perhaps some other process "
                                   "is leaking memory? Process memory: %s -- "
                                   "Worker memory limit: %s",
                                   format_bytes(proc.memory_info().rss),
                                   format_bytes(self.memory_limit))
                    break
                k, v, weight = self.data.host.fast.evict()
                del k, v
                total += weight
                count += 1
                yield gen.moment
                memory = proc.memory_info().rss
                if total > need and memory > target:
                    # Issue a GC to ensure that the evicted data is actually
                    # freed from memory and taken into account by the monitor
                    # before trying to evict even more data.
                    self._throttled_gc.collect()
                    memory = proc.memory_info().rss
            if count:
                logger.debug("Moved %d pieces of data and %s bytes to disk",
                             count, format_bytes(total))

        self._memory_monitoring = False
        raise gen.Return(total)
    @gen.coroutine
    def device_memory_monitor(self):
        """ Track this worker's CUDA device memory usage and act accordingly

        If we rise above (device_memory_spill_fraction * device_memory_limit)
        of device memory use, start dumping data to host. The default value
        for device_memory_spill_fraction is 0.7, defined via the configuration
        key 'cuda.worker.device-memory.spill'.

        If we rise above (device_memory_pause_fraction * device_memory_limit)
        of device memory use, stop execution of new tasks. The default value
        for device_memory_pause_fraction is 0.8, defined via the configuration
        key 'cuda.worker.device-memory.pause'.
        """
        if self._device_memory_monitoring:
            return
        self._device_memory_monitoring = True
        total = 0
        memory = get_device_used_memory()
        frac = memory / self.device_memory_limit

        # Pause worker threads if device memory use is above
        # (self.device_memory_pause_fraction * 100)%
        old_pause_state = self._device_paused
        worker_description = "Worker's CUDA device"
        self._device_paused = self._check_for_pause(
            frac, self.device_memory_pause_fraction, memory,
            self.device_memory_limit, self._device_paused, None,
            worker_description)
        if old_pause_state and not self._device_paused:
            self._resume_message(frac, memory, self.device_memory_limit,
                                 worker_description)
            self._resume_worker()

        # Dump device data to host if device memory use is above
        # (self.device_memory_spill_fraction * 100)%
        if (self.device_memory_spill_fraction
                and frac > self.device_memory_spill_fraction):
            target = self.device_memory_limit * self.device_memory_target_fraction
            count = 0
            while memory > target:
                if not self.data.device.fast:
Review comment: I have a question regarding this line: what is the data that is stored in data.device.fast? I was trying to persist a dataframe bigger than the GPU memory, with individual chunks that comfortably fit in device memory, but both workers get paused and throw the following warning:

distributed.worker - WARNING - CUDA device memory use is high but worker has no data to store to host. Perhaps some other process is leaking memory? Process memory: 13.76 GB -- Worker memory limit: 17.07 GB

Also, is there a way to access data.device.fast directly?

Reply: IMO, this is one of the trickiest parts. We can't guarantee that ...

I think this isn't possible; why would you like to access that directly? I don't know if this is by design or is just something that was never implemented, @mrocklin could you clarify this?

Reply: Correct. The workers are in separate processes, so there is no way to access them from Python. You can ask Dask to run functions on them to inspect state if you like:

def f(dask_worker):
    return len(dask_worker.data.device.fast)

client.run(f)

(See the run docstring for more information.) You could also try the ...

Reply: Note that when @pentschev says "dask.array device chunks" he also means any piece of GPU-allocated data, which could be a dask array device chunk as he's dealing with, or a cudf dataframe as you're dealing with.

Reply: Thanks a lot @mrocklin for the function to inspect state. I used it to debug what's happening. It seems that the data is being evicted from data.device.fast to data.device.slow:

('tcp://172.17.0.2:44914', {'data.device.fast': 14, 'data.device.slow': 0})
('tcp://172.17.0.2:44914', {'data.device.fast': 15, 'data.device.slow': 0})
('tcp://172.17.0.2:44914', {'data.device.fast': 3, 'data.device.slow': 14})
('tcp://172.17.0.2:44914', {'data.device.fast': 3, 'data.device.slow': 14})
('tcp://172.17.0.2:44914', {'data.device.fast': 0, 'data.device.slow': 18})
('tcp://172.17.0.2:44914', {'data.device.fast': 0, 'data.device.slow': 18})

Error: ...

Maybe the del here is not clearing memory, I don't know: https://github.com/rapidsai/dask-cuda/pull/35/files#diff-c87f0866b277f959dc7c5d1e4b0ff015R243. Will add a small minimal reproducible example here soon.

Reply: I agree that this is a major issue, that's why I'm concerned with it. In particular, I think pausing is something that can't be enabled under these circumstances: if it is, then when Dask spills memory to host but can't really release it, the worker will get stuck. I have two proposals (not necessarily mutually exclusive) until we come up with something else: ...

We can also disable pausing by default, which I'm inclined to think should be the default to prevent this sort of situation.

Reply: For 1 see dask/distributed#2453. Disabling pausing by default seems fine to me. This is just a config value change at this point, yes?

Reply: Yes, that would be something similar, if not exactly that (sorry, I can't understand all the details without diving in a bit deeper). Any thoughts on item 2 as well?

Yes. I just don't know whether disabling pause has other consequences. On the host, I guess it is there to prevent the host from running out of memory and eventually getting killed, is that right?

Reply: I can see how it would solve the problem. I guess I'm hoping that medium term it's not necessary. My inclination is to wait until we have a real-world problem that needs this before adding it. I won't be surprised if that problem occurs quickly, but I'd still rather put it off and get this in.

Reply: No objections from me. That said, I have no further changes to add from my side; it's ready for more reviews or merging.
                    logger.warning("CUDA device memory use is high but worker has "
                                   "no data to store to host. Perhaps some other "
                                   "process is leaking memory? Process memory: "
                                   "%s -- Worker memory limit: %s",
                                   format_bytes(get_device_used_memory()),
                                   format_bytes(self.device_memory_limit))
                    break
                k, v, weight = self.data.device.fast.evict()
                del k, v
                total += weight
                count += 1
                yield gen.moment
                memory = get_device_used_memory()
            if count:
                logger.debug("Moved %d pieces of data and %s bytes to host memory",
                             count, format_bytes(total))

        self._device_memory_monitoring = False
        raise gen.Return(total)
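Following up on the pausing discussion in the thread above: disabling pausing by default would amount to changing the pause entry in cuda.yaml to False, or doing the equivalent per session. A minimal sketch using the configuration keys added in this PR:

import dask

# Keep device-memory spilling active but never pause worker threads
# (sketch of the config-value change discussed above).
dask.config.set({'cuda.worker.device-memory.pause': False})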
Review comment: We could also keep this in the distributed namespace if we wanted to. It might be nice to have all of the worker config together.

Reply: I have no strong opinions/objections. I think we can keep it in the same namespace, but we would need a different file to prevent dask-cuda and dask-distributed from overwriting one another's configurations. Would you like me to change it to the distributed namespace?