Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Just-in-time deserialization #353

Merged
merged 59 commits into from
Nov 23, 2020
Merged
Show file tree
Hide file tree
Changes from 46 commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
3c8117f
Initial implementing of ObjectProxy
madsbk Aug 6, 2020
12e1f6c
Added basic tests of ObjectProxy
madsbk Aug 6, 2020
0303db4
cleanup
madsbk Aug 6, 2020
411bce8
Implemented some more proxy attributes
madsbk Aug 6, 2020
7c528d0
Added spilling of proxy object optional
madsbk Aug 6, 2020
93ddf87
Merge branch 'branch-0.15' of github.com:rapidsai/dask-cuda into jit_…
madsbk Aug 7, 2020
de36de9
Re-added dask_serialize for DeviceSerialized
madsbk Aug 7, 2020
b4eb344
Added support of __array__
madsbk Aug 7, 2020
521d3ca
Added __sizeof__
madsbk Aug 7, 2020
33b2bd6
Added some spill_proxy tests in test_device_host_file.py
madsbk Aug 7, 2020
cc1df47
Checking len() instead of .size()
madsbk Aug 7, 2020
f6c11f1
Added dispatch support of hash_object_dispatch and group_split_dispatch
madsbk Aug 7, 2020
349a393
Added "*args, **kwargs" to dispatch of ObjectProxy
madsbk Aug 7, 2020
68ff7ed
Added dispatch of make_scalar
madsbk Aug 7, 2020
d47f6fd
Added dispatch of concat_dispatch
madsbk Aug 7, 2020
e7e2822
Merge branch 'branch-0.15' of github.com:rapidsai/dask-cuda into jit_…
madsbk Aug 14, 2020
b8e1feb
Merge branch 'branch-0.16' of github.com:rapidsai/dask-cuda into jit_…
madsbk Sep 11, 2020
d93f8c5
meta.yaml: added pandas dependency
madsbk Sep 14, 2020
3756b0a
meta.yaml: depend on dask (not only dask-core)
madsbk Sep 14, 2020
40058cd
Added jit-unspill worker option
madsbk Sep 14, 2020
37b2ef8
meta.yaml: removed pandas
madsbk Sep 17, 2020
fdb9c69
Merge branch 'branch-0.16' of github.com:rapidsai/dask-cuda into jit_…
madsbk Sep 18, 2020
a82ee6d
Merge branch 'branch-0.16' of github.com:rapidsai/dask-cuda into jit_…
madsbk Oct 1, 2020
16879fc
Using explicit args for the dispatch functions
madsbk Oct 1, 2020
ac782b0
fixed typo
madsbk Oct 2, 2020
37ffe2b
Merge branch 'branch-0.17' into jit_deserialization
madsbk Oct 6, 2020
51d6aa5
ObjectProxy._obj_pxy_serialize(): takes serializers
madsbk Oct 6, 2020
f626372
serializers replaces is_serialized
madsbk Oct 6, 2020
b35bed8
Supporting cuda serializers
madsbk Oct 6, 2020
7860fa7
Added a lot of operators
madsbk Oct 7, 2020
45152a0
fixed typos
madsbk Oct 7, 2020
1a43713
Support and test of a proxy object of a proxy object
madsbk Oct 7, 2020
0e80d3a
test_spilling_local_cuda_cluster(): added some extra checks
madsbk Oct 8, 2020
0f27030
Added _obj_pxy_is_cuda_object()
madsbk Oct 9, 2020
cb2bafb
asproxy(): added subclass argument
madsbk Oct 9, 2020
de7a49c
fixed type in test_spilling_local_cuda_cluster check
madsbk Oct 9, 2020
95b31c4
Added test of communicating proxy objects
madsbk Oct 9, 2020
5d7ee69
Making ObjectProxy threadsafe
madsbk Oct 9, 2020
4c32ca6
renamed ObjectProxy => ProxyObject
madsbk Oct 9, 2020
f02835e
Never re-serialize proxy objects
madsbk Oct 12, 2020
bd145fe
Merge branch 'branch-0.17' of github.com:rapidsai/dask-cuda into jit_…
madsbk Nov 10, 2020
76638ae
Test: setting device_memory_limit="1B" to force serialization
madsbk Nov 10, 2020
56f0546
test: added an explicit client shutdown
madsbk Nov 11, 2020
a86ca44
Added some str/repr tests
madsbk Nov 11, 2020
eafef51
clean up
madsbk Nov 11, 2020
71bd470
Merge branch 'branch-0.17' of github.com:rapidsai/dask-cuda into jit_…
madsbk Nov 12, 2020
deb2f58
added some more checks in test_proxy_object_of_numpy
madsbk Nov 12, 2020
3a22755
ProxyObject: added docs
madsbk Nov 12, 2020
9a76b8e
added unproxy()
madsbk Nov 13, 2020
b1f9331
Added docs
madsbk Nov 13, 2020
bd8e745
Added ValueError when serializers isn't specified
madsbk Nov 13, 2020
f8ea867
typo
madsbk Nov 13, 2020
7a73f35
Style and spelling fixes
madsbk Nov 13, 2020
dec1d7e
Merge branch 'branch-0.17' of github.com:rapidsai/dask-cuda into jit_…
madsbk Nov 19, 2020
e0fb226
docs
madsbk Nov 20, 2020
b565bc5
ProxyObject.__sizeof__(): use dask.sizeof()
madsbk Nov 20, 2020
c774ed2
Merge branch 'jit_deserialization' of github.com:madsbk/dask-cuda int…
madsbk Nov 20, 2020
dbcf68d
Serializer: convert to tuples before comparing
madsbk Nov 20, 2020
81fa615
typos
madsbk Nov 20, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conda/recipes/dask-cuda/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ requirements:
- setuptools
run:
- python x.x
- dask-core >=2.4.0
- dask >=2.4.0
- distributed >=2.18.0
- pynvml >=8.0.3
- numpy >=1.16.0
Expand Down
7 changes: 7 additions & 0 deletions dask_cuda/cli/dask_cuda_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,11 @@
"InfiniBand only and will still cause unpredictable errors if not _ALL_ "
"interfaces are connected and properly configured.",
)
@click.option(
"--enable-jit-unspill/--disable-jit-unspill",
default=None, # If not specified, use Dask config
help="Enable just-in-time unspilling",
)
def main(
scheduler,
host,
Expand Down Expand Up @@ -218,6 +223,7 @@ def main(
enable_nvlink,
enable_rdmacm,
net_devices,
enable_jit_unspill,
**kwargs,
):
if tls_ca_file and tls_cert and tls_key:
Expand Down Expand Up @@ -252,6 +258,7 @@ def main(
enable_nvlink,
enable_rdmacm,
net_devices,
enable_jit_unspill,
**kwargs,
)

Expand Down
7 changes: 7 additions & 0 deletions dask_cuda/cuda_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ def __init__(
enable_nvlink=False,
enable_rdmacm=False,
net_devices=None,
jit_unspill=None,
**kwargs,
):
# Required by RAPIDS libraries (e.g., cuDF) to ensure no context
Expand Down Expand Up @@ -177,6 +178,11 @@ def del_pid_file():
cuda_device_index=0,
)

if jit_unspill is None:
self.jit_unspill = dask.config.get("jit-unspill", default=False)
else:
self.jit_unspill = jit_unspill

self.nannies = [
t(
scheduler,
Expand Down Expand Up @@ -216,6 +222,7 @@ def del_pid_file():
),
"memory_limit": memory_limit,
"local_directory": local_directory,
"jit_unspill": self.jit_unspill,
},
),
**kwargs,
Expand Down
41 changes: 37 additions & 4 deletions dask_cuda/device_host_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@
from distributed.utils import nbytes
from distributed.worker import weight

from . import proxy_object
from .is_device_object import is_device_object
from .utils import nvtx_annotate


class DeviceSerialized:
""" Store device object on the host

This stores a device-side object as

1. A msgpack encodable header
2. A list of `bytes`-like objects (like NumPy arrays)
that are in host memory
Expand Down Expand Up @@ -66,6 +65,27 @@ def host_to_device(s: DeviceSerialized) -> object:
return deserialize(s.header, s.frames)


@nvtx_annotate("SPILL_D2H", color="red", domain="dask_cuda")
def pxy_obj_device_to_host(obj: object) -> proxy_object.ProxyObject:
try:
# Never re-serialize proxy objects.
if obj._obj_pxy["serializers"] is None:
return obj
except (KeyError, AttributeError):
pass

# Notice, both the "dask" and the "pickle" serializer will
# spill `obj` to main memory.
return proxy_object.asproxy(obj, serializers=["dask", "pickle"])


@nvtx_annotate("SPILL_H2D", color="green", domain="dask_cuda")
def pxy_obj_host_to_device(s: proxy_object.ProxyObject) -> object:
# Notice, we do _not_ deserialize at this point. The proxy
# object automatically deserialize just-in-time.
return s


class DeviceHostFile(ZictBase):
""" Manages serialization/deserialization of objects.

Expand All @@ -86,10 +106,16 @@ class DeviceHostFile(ZictBase):
implies no spilling to disk.
local_directory: path
Path where to store serialized objects on disk
jit_unspill: bool
If True, enable just-in-time unspilling (see proxy_object.ProxyObject).
"""

def __init__(
self, device_memory_limit=None, memory_limit=None, local_directory=None,
self,
device_memory_limit=None,
memory_limit=None,
local_directory=None,
jit_unspill=False,
):
if local_directory is None:
local_directory = dask.config.get("temporary-directory") or os.getcwd()
Expand All @@ -115,7 +141,14 @@ def __init__(

self.device_keys = set()
self.device_func = dict()
self.device_host_func = Func(device_to_host, host_to_device, self.host_buffer)
if jit_unspill:
self.device_host_func = Func(
pxy_obj_device_to_host, pxy_obj_host_to_device, self.host_buffer
)
else:
self.device_host_func = Func(
device_to_host, host_to_device, self.host_buffer
)
self.device_buffer = Buffer(
self.device_func, self.device_host_func, device_memory_limit, weight=weight
)
Expand Down
9 changes: 9 additions & 0 deletions dask_cuda/local_cuda_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ class LocalCUDACluster(LocalCluster):
but in that case with default (non-managed) memory type.
WARNING: managed memory is currently incompatible with NVLink, trying
to enable both will result in an exception.
jit_unspill: bool
If True, enable just-in-time unspilling (see proxy_object.ProxyObject).

Examples
--------
Expand Down Expand Up @@ -133,6 +135,7 @@ def __init__(
ucx_net_devices=None,
rmm_pool_size=None,
rmm_managed_memory=False,
jit_unspill=None,
**kwargs,
):
# Required by RAPIDS libraries (e.g., cuDF) to ensure no context
Expand Down Expand Up @@ -182,6 +185,11 @@ def __init__(
"Processes are necessary in order to use multiple GPUs with Dask"
)

if jit_unspill is None:
self.jit_unspill = dask.config.get("jit-unspill", default=False)
else:
self.jit_unspill = jit_unspill

if data is None:
data = (
DeviceHostFile,
Expand All @@ -191,6 +199,7 @@ def __init__(
"local_directory": local_directory
or dask.config.get("temporary-directory")
or os.getcwd(),
"jit_unspill": self.jit_unspill,
},
)

Expand Down
Loading