Device memory spill support (LRU-based only) #51
Merged
mrocklin merged 16 commits into rapidsai:branch-0.8 from pentschev:device-memory-spill-lru on May 15, 2019
Changes from all commits
Commits (16)
All 16 commits authored by pentschev:

9e37bd2  Add DeviceHostFile class to handle memory-spilling in LRU fashion
eae1bf8  Add DeviceHostFile tests
8138305  Pass DeviceHostFile to Worker via data argument
49733d7  Add CuPy and enable __array_function__ in CI build
7ebed48  Update version requirements of dask, distributed and numpy
5f47afa  Add get_device_total_memory utility function
c570360  Pass pre-constructed DeviceHostFile to Worker
a37d7f6  Add memory spilling test
18ced44  Add numba as a requirement
ac56871  Add --device-memory-limit parameter to dask-cuda-worker
f7acb76  Create work directory before DeviceHostFile
c9b283f  Add DeviceHostFile support for LocalCUDACluster
6428b54  Add LocalCUDACluster device spilling test
be3710d  Add `fast` to DeviceHostFile for Worker compatibility
45d9a26  Fix some setup.py formatting
93adc53  Fix LocalCUDACluster device_memory_limit parsing and local dir creation
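Among these, ac56871 adds a --device-memory-limit flag to the dask-cuda-worker CLI and c9b283f wires the same spilling support into LocalCUDACluster. A hedged sketch of how the Python side might be used (the byte value and cluster setup are illustrative; exact accepted value formats are not shown in this excerpt):

from dask.distributed import Client
from dask_cuda import LocalCUDACluster

# device_memory_limit: bytes of GPU memory per worker before spilling to host
cluster = LocalCUDACluster(device_memory_limit=4_000_000_000)
client = Client(cluster)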
@@ -0,0 +1,107 @@

from zict import Buffer, File, Func
from zict.common import ZictBase
from distributed.protocol import deserialize_bytes, serialize_bytes
from distributed.worker import weight

from functools import partial
import os


def _is_device_object(obj):
    """
    Check if obj is a device object, by checking if it has a
    __cuda_array_interface__ attribute
    """
    return hasattr(obj, "__cuda_array_interface__")


def _serialize_if_device(obj):
    """ Serialize an object if it's a device object """
    if _is_device_object(obj):
        return serialize_bytes(obj, on_error="raise")
    else:
        return obj


def _deserialize_if_device(obj):
    """ Deserialize an object if it's an instance of bytes """
    if isinstance(obj, bytes):
        return deserialize_bytes(obj)
    else:
        return obj


class DeviceHostFile(ZictBase):
    """ Manages serialization/deserialization of objects.

    Three LRU cache levels are controlled, for device, host and disk.
    Each level takes care of serializing objects once its limit has been
    reached, passing them on to the subsequent level. Similarly, each
    cache may deserialize an object, storing it back in the appropriate
    cache depending on the type of the deserialized object.

    Parameters
    ----------
    device_memory_limit: int
        Number of bytes of CUDA device memory for the device LRU cache;
        spills to the host cache once filled.
    memory_limit: int
        Number of bytes of host memory for the host LRU cache; spills
        to disk once filled.
    local_dir: path
        Path where to store serialized objects on disk.
    """

    def __init__(
        self, device_memory_limit=None, memory_limit=None, local_dir="dask-worker-space"
    ):
        path = os.path.join(local_dir, "storage")

        # Host level: a plain dict in host memory, backed by serialized
        # bytes on disk once memory_limit is exceeded
        self.host_func = dict()
        self.disk_func = Func(
            partial(serialize_bytes, on_error="raise"), deserialize_bytes, File(path)
        )
        self.host_buffer = Buffer(
            self.host_func, self.disk_func, memory_limit, weight=weight
        )

        # Device level: a plain dict of device objects, spilling
        # (serialized) into the host buffer once device_memory_limit
        # is exceeded
        self.device_func = dict()
        self.device_host_func = Func(
            _serialize_if_device, _deserialize_if_device, self.host_buffer
        )
        self.device_buffer = Buffer(
            self.device_func, self.device_host_func, device_memory_limit, weight=weight
        )

        self.device = self.device_buffer.fast.d
        self.host = self.host_buffer.fast.d
        self.disk = self.host_buffer.slow.d

        # For Worker compatibility only, where `fast` is host memory buffer
        self.fast = self.host_buffer.fast

    def __setitem__(self, key, value):
        if _is_device_object(value):
            self.device_buffer[key] = value
        else:
            self.host_buffer[key] = value

    def __getitem__(self, key):
        # A spilled key is deserialized and promoted back to the device buffer
        if key in self.host_buffer:
            obj = self.host_buffer[key]
            del self.host_buffer[key]
            self.device_buffer[key] = _deserialize_if_device(obj)

        if key in self.device_buffer:
            return self.device_buffer[key]
        else:
            raise KeyError

    def __len__(self):
        return len(self.device_buffer)

    def __iter__(self):
        return iter(self.device_buffer)

    def __delitem__(self, i):
        del self.device_buffer[i]
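A hedged usage sketch of the class above (assuming the module lands at dask_cuda.device_host_file, and that CuPy and NumPy are installed; the tiny limits are chosen only to make spilling easy to trigger, and actual item sizes are computed by distributed's weight function):

import cupy
import numpy
from dask_cuda.device_host_file import DeviceHostFile  # module path assumed

dhf = DeviceHostFile(
    device_memory_limit=1024, memory_limit=1024, local_dir="dask-worker-space"
)

# A device object (it exposes __cuda_array_interface__) goes to the device buffer
dhf["x"] = cupy.arange(100, dtype="u1")

# A host object goes straight to the host buffer
dhf["y"] = numpy.arange(100, dtype="u1")

# Pushing the device buffer past device_memory_limit spills the least
# recently used device entries, serialized, into the host buffer
dhf["z"] = cupy.arange(1000, dtype="u1")

# Accessing a spilled key deserializes it and moves it back to the device buffer
x = dhf["x"]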
Conversations
I recommend the following instead:

Otherwise we create the data object immediately here, and then need to pass it down to the worker through a process boundary.

This is implemented here, but it looks like it's not documented anywhere (my mistake):
https://github.com/dask/distributed/blob/8e449d392e91eff0a3454ee98ef362de8f78cc4f/distributed/worker.py#L500-L501

Also, if the user specifies device_memory_limit=0 then we might want something simpler. I can imagine wanting to turn off this behavior if things get complex. We probably also want the same treatment in LocalCUDACluster, though as a first pass we can also include this ourselves.
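A minimal sketch of this recommendation, assuming distributed's Worker accepts its data= argument as a (constructor, kwargs) tuple as in the lines linked above; the module path, scheduler address, and limits are illustrative:

from distributed import Worker
from dask_cuda.device_host_file import DeviceHostFile  # module path assumed

# Passing the class plus kwargs (rather than a constructed instance) lets the
# worker build the DeviceHostFile on its own side of the process boundary
w = Worker(
    "tcp://127.0.0.1:8786",  # illustrative scheduler address
    data=(
        DeviceHostFile,
        {
            "device_memory_limit": 2_000_000_000,
            "memory_limit": 8_000_000_000,
            "local_dir": "dask-worker-space",
        },
    ),
)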
I was in the process of doing that and only saw your comment now. Indeed, this is a better solution, but either way the downside is that I had to create the work directory in dask-cuda-worker, which I'm not particularly happy with, so if you have a suggestion on how to avoid that, it would be great! I'm already working on LocalCUDACluster; I'm still not done and will push those changes soon.
Hrm, yes, I can see how that would be a concern. I don't have a good solution currently, unfortunately. I'll think about it though.