From 3c8117f1f3a9b0349ca5605e5125a3a9d5404563 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Thu, 6 Aug 2020 10:23:06 +0200 Subject: [PATCH 01/49] Initial implementing of ObjectProxy --- dask_cuda/device_host_file.py | 65 ++++++------------- dask_cuda/proxy_object.py | 117 ++++++++++++++++++++++++++++++++++ 2 files changed, 137 insertions(+), 45 deletions(-) create mode 100644 dask_cuda/proxy_object.py diff --git a/dask_cuda/device_host_file.py b/dask_cuda/device_host_file.py index 90cdd869d..549fc20bb 100644 --- a/dask_cuda/device_host_file.py +++ b/dask_cuda/device_host_file.py @@ -5,60 +5,22 @@ from zict.common import ZictBase import dask -from distributed.protocol import ( - dask_deserialize, - dask_serialize, - deserialize, - deserialize_bytes, - serialize, - serialize_bytelist, -) -from distributed.utils import nbytes +from distributed.protocol import deserialize_bytes, serialize_bytelist from distributed.worker import weight +from . import proxy_object from .is_device_object import is_device_object from .utils import nvtx_annotate -class DeviceSerialized: - """ Store device object on the host - - This stores a device-side object as - - 1. A msgpack encodable header - 2. 
A list of `bytes`-like objects (like NumPy arrays) - that are in host memory - """ - - def __init__(self, header, frames): - self.header = header - self.frames = frames - - def __sizeof__(self): - return sum(map(nbytes, self.frames)) - - -@dask_serialize.register(DeviceSerialized) -def device_serialize(obj): - header = {"obj-header": obj.header} - frames = obj.frames - return header, frames - - -@dask_deserialize.register(DeviceSerialized) -def device_deserialize(header, frames): - return DeviceSerialized(header["obj-header"], frames) - - @nvtx_annotate("SPILL_D2H", color="red", domain="dask_cuda") -def device_to_host(obj: object) -> DeviceSerialized: - header, frames = serialize(obj, serializers=["dask", "pickle"]) - return DeviceSerialized(header, frames) +def device_to_host(obj: object) -> proxy_object.ObjectProxy: + return proxy_object.asproxy(obj) @nvtx_annotate("SPILL_H2D", color="green", domain="dask_cuda") -def host_to_device(s: DeviceSerialized) -> object: - return deserialize(s.header, s.frames) +def host_to_device(s: proxy_object.ObjectProxy) -> object: + return s class DeviceHostFile(ZictBase): @@ -129,7 +91,7 @@ def __setitem__(self, key, value): else: self.host_buffer[key] = value - def __getitem__(self, key): + def _1_getitem__(self, key): if key in self.device_keys: return self.device_buffer[key] elif key in self.host_buffer: @@ -137,6 +99,19 @@ def __getitem__(self, key): else: raise KeyError(key) + def __getitem__(self, key): + if key in self.device_keys: + ret = self.device_buffer[key] + elif key in self.host_buffer: + ret = self.host_buffer[key] + else: + raise KeyError(key) + + # if hasattr(ret, "_obj_pxy_deserialize"): + # ret = ret._obj_pxy_deserialize() + # return proxy_object.asproxy(ret, serialize_obj=False) + return ret + def __len__(self): return len(self.device_buffer) diff --git a/dask_cuda/proxy_object.py b/dask_cuda/proxy_object.py new file mode 100644 index 000000000..a96f7788c --- /dev/null +++ b/dask_cuda/proxy_object.py @@ -0,0 
+1,117 @@ +import pickle + +import dask +import distributed.protocol + +# List of attributes that should be copied to the proxy at creation, which makes +# them accessible without deserialization of the proxied object +_FIXED_ATTRS = ["name"] + + +def asproxy(obj, serialize_obj=True, serializers=["dask", "pickle"]): + if hasattr(obj, "_obj_pxy"): + return obj # Already a proxy object + + fixed_attr = {} + for attr in _FIXED_ATTRS: + try: + fixed_attr[attr] = getattr(obj, attr) + except AttributeError: + pass + + orig_obj = obj + if serialize_obj: + obj = distributed.protocol.serialize(obj, serializers=serializers) + + return ObjectProxy( + obj=obj, + is_serialized=serialize_obj, + fixed_attr=fixed_attr, + type_serialized=pickle.dumps(type(orig_obj)), + typename=dask.utils.typename(type(orig_obj)), + serializers=serializers, + ) + + +class ObjectProxy: + __slots__ = [ + "_obj_pxy", # A dict that holds the state of the proxy object + "__obj_pxy_cache", # A dict used for caching attributes + ] + + def __init__( + self, obj, is_serialized, fixed_attr, type_serialized, typename, serializers + ): + self._obj_pxy = { + "obj": obj, + "is_serialized": is_serialized, + "fixed_attr": fixed_attr, + "type_serialized": type_serialized, + "typename": typename, + "serializers": serializers, + } + self.__obj_pxy_cache = {} + + def _obj_pxy_get_meta(self): + return {k: self._obj_pxy[k] for k in self._obj_pxy.keys() if k != "obj"} + + def _obj_pxy_serialize(self): + if not self._obj_pxy["is_serialized"]: + self._obj_pxy["obj"] = distributed.protocol.serialize( + self._obj_pxy["obj"], self._obj_pxy["serializers"] + ) + self._obj_pxy["is_serialized"] = True + return self._obj_pxy["obj"] + + def _obj_pxy_deserialize(self): + if self._obj_pxy["is_serialized"]: + header, frames = self._obj_pxy["obj"] + self._obj_pxy["obj"] = distributed.protocol.deserialize(header, frames) + self._obj_pxy["is_serialized"] = False + return self._obj_pxy["obj"] + + def __getattr__(self, name): + typename = 
self._obj_pxy["typename"] + if name in _FIXED_ATTRS: + try: + return self._obj_pxy["fixed_attr"][name] + except KeyError: + raise AttributeError( + f"type object '{typename}' has no attribute '{name}'" + ) + + return getattr(self._obj_pxy_deserialize(), name) + + def __str__(self): + return str(self._obj_pxy_deserialize()) + + def __repr__(self): + typename = self._obj_pxy["typename"] + ret = f"<{dask.utils.typename(type(self))} at {hex(id(self))} for {typename}" + if self._obj_pxy["is_serialized"]: + ret += " (serialized)>" + else: + ret += f" at {hex(id(self._obj_pxy['obj']))}>" + return ret + + @property + def __class__(self): + try: + return self.__obj_pxy_cache["type_serialized"] + except KeyError: + ret = pickle.loads(self._obj_pxy["type_serialized"]) + self.__obj_pxy_cache["type_serialized"] = ret + return ret + + +@distributed.protocol.dask_serialize.register(ObjectProxy) +def obj_pxy_dask_serialize(obj: ObjectProxy): + header, frames = obj._obj_pxy_serialize() + return {"proxied-header": header, "obj-pxy-meta": obj._obj_pxy_get_meta()}, frames + + +@distributed.protocol.dask_deserialize.register(ObjectProxy) +def obj_pxy_dask_deserialize(header, frames): + return ObjectProxy( + obj=(header["proxied-header"], frames), **header["obj-pxy-meta"], + ) From 12e1f6c7d5b0a8c34b1aebc592d859988dcaf551 Mon Sep 17 00:00:00 2001 From: "Mads R. B. 
Kristensen" Date: Thu, 6 Aug 2020 10:23:33 +0200 Subject: [PATCH 02/49] Added basic tests of ObjectProxy --- dask_cuda/tests/test_proxy.py | 58 +++++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) create mode 100644 dask_cuda/tests/test_proxy.py diff --git a/dask_cuda/tests/test_proxy.py b/dask_cuda/tests/test_proxy.py new file mode 100644 index 000000000..83fbb0493 --- /dev/null +++ b/dask_cuda/tests/test_proxy.py @@ -0,0 +1,58 @@ +import pytest +from pandas.testing import assert_frame_equal + +from distributed import Client +from distributed.protocol.serialize import deserialize, serialize + +import dask_cudf + +import dask_cuda +from dask_cuda import proxy_object + + +@pytest.mark.parametrize("serialize_obj", [True, False]) +def test_proxy_object_of_cudf(serialize_obj): + """Check that a proxied cudf dataframe is behaviors as a regular dataframe""" + cudf = pytest.importorskip("cudf") + df = cudf.DataFrame({"a": range(10)}) + pxy = proxy_object.asproxy(df, serialize_obj=serialize_obj) + assert_frame_equal(df.to_pandas(), pxy.to_pandas()) + + +@pytest.mark.parametrize("serialize_obj", [True, False]) +@pytest.mark.parametrize( + "serializers", [["dask", "pickle"], ["cuda", "dask", "pickle"]] +) +def test_serialize_of_proxied_cudf(serialize_obj, serializers): + """Check that we can serialize a proxied cudf dataframe, which might + be serialized already. 
+ """ + cudf = pytest.importorskip("cudf") + + if "cuda" in serializers: + pytest.skip("cuda serializer support not implemented") + + df = cudf.DataFrame({"a": range(10)}) + pxy = proxy_object.asproxy(df, serialize_obj=serialize_obj) + header, frames = serialize(pxy, serializers=serializers) + pxy = deserialize(header, frames) + assert_frame_equal(df.to_pandas(), pxy.to_pandas()) + + +def test_spilling_local_cuda_cluster(): + """Testing spelling of a proxied cudf dataframe in a local cuda cluster""" + cudf = pytest.importorskip("cudf") + + def task(x): + assert isinstance(x, cudf.DataFrame) + assert x.size == 10 # Trigger deserialization + return x + + # Notice, setting `device_memory_limit=1` to trigger spilling + with dask_cuda.LocalCUDACluster(n_workers=1, device_memory_limit=1) as cluster: + with Client(cluster): + df = cudf.DataFrame({"a": range(10)}) + ddf = dask_cudf.from_cudf(df, npartitions=1) + ddf = ddf.map_partitions(task, meta=df.head()) + got = ddf.compute() + assert_frame_equal(got.to_pandas(), df.to_pandas()) From 0303db4023c1a658ca44c593db3d22f92c03b7bb Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Thu, 6 Aug 2020 03:11:25 -0700 Subject: [PATCH 03/49] cleanup --- dask_cuda/device_host_file.py | 19 ++++--------------- 1 file changed, 4 insertions(+), 15 deletions(-) diff --git a/dask_cuda/device_host_file.py b/dask_cuda/device_host_file.py index 549fc20bb..b78a6eefa 100644 --- a/dask_cuda/device_host_file.py +++ b/dask_cuda/device_host_file.py @@ -15,11 +15,13 @@ @nvtx_annotate("SPILL_D2H", color="red", domain="dask_cuda") def device_to_host(obj: object) -> proxy_object.ObjectProxy: - return proxy_object.asproxy(obj) + return proxy_object.asproxy(obj, serialize_obj=True) @nvtx_annotate("SPILL_H2D", color="green", domain="dask_cuda") def host_to_device(s: proxy_object.ObjectProxy) -> object: + # Notice, we do _not_ deserialize at this point. The proxy + # object automatically deserialize just-in-time. 
return s @@ -91,7 +93,7 @@ def __setitem__(self, key, value): else: self.host_buffer[key] = value - def _1_getitem__(self, key): + def __getitem__(self, key): if key in self.device_keys: return self.device_buffer[key] elif key in self.host_buffer: @@ -99,19 +101,6 @@ def _1_getitem__(self, key): else: raise KeyError(key) - def __getitem__(self, key): - if key in self.device_keys: - ret = self.device_buffer[key] - elif key in self.host_buffer: - ret = self.host_buffer[key] - else: - raise KeyError(key) - - # if hasattr(ret, "_obj_pxy_deserialize"): - # ret = ret._obj_pxy_deserialize() - # return proxy_object.asproxy(ret, serialize_obj=False) - return ret - def __len__(self): return len(self.device_buffer) From 411bce85d09359b73e1427c9a70db39b5e70b028 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Thu, 6 Aug 2020 13:53:32 +0200 Subject: [PATCH 04/49] Implemented some more proxy attributes --- dask_cuda/proxy_object.py | 27 +++++++++++++++++++++++++++ dask_cuda/tests/test_proxy.py | 15 +++++++++++++++ 2 files changed, 42 insertions(+) diff --git a/dask_cuda/proxy_object.py b/dask_cuda/proxy_object.py index a96f7788c..1e3588c2d 100644 --- a/dask_cuda/proxy_object.py +++ b/dask_cuda/proxy_object.py @@ -94,6 +94,33 @@ def __repr__(self): ret += f" at {hex(id(self._obj_pxy['obj']))}>" return ret + def __len__(self): + return len(self._obj_pxy_deserialize()) + + def __contains__(self, value): + return value in self._obj_pxy_deserialize() + + def __getitem__(self, key): + return self._obj_pxy_deserialize()[key] + + def __setitem__(self, key, value): + self._obj_pxy_deserialize()[key] = value + + def __delitem__(self, key): + del self._obj_pxy_deserialize()[key] + + def __getslice__(self, i, j): + return self._obj_pxy_deserialize()[i:j] + + def __setslice__(self, i, j, value): + self._obj_pxy_deserialize()[i:j] = value + + def __delslice__(self, i, j): + del self._obj_pxy_deserialize()[i:j] + + def __iter__(self): + return iter(self._obj_pxy_deserialize()) + 
@property def __class__(self): try: diff --git a/dask_cuda/tests/test_proxy.py b/dask_cuda/tests/test_proxy.py index 83fbb0493..dc235a961 100644 --- a/dask_cuda/tests/test_proxy.py +++ b/dask_cuda/tests/test_proxy.py @@ -10,6 +10,21 @@ from dask_cuda import proxy_object +@pytest.mark.parametrize("serialize_obj", [True, False]) +def test_proxy_object(serialize_obj): + """Check "transparency" of the proxy object""" + + org = list(range(10)) + pxy = proxy_object.asproxy(org, serialize_obj=serialize_obj) + + assert len(org) == len(pxy) + assert org[0] == pxy[0] + assert 1 in pxy + assert -1 not in pxy + + # TODO: check operators (when implemented) + + @pytest.mark.parametrize("serialize_obj", [True, False]) def test_proxy_object_of_cudf(serialize_obj): """Check that a proxied cudf dataframe is behaviors as a regular dataframe""" From 7c528d06cecd9c3d58bc30a5e61702d066859fb7 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Thu, 6 Aug 2020 15:27:47 +0200 Subject: [PATCH 05/49] Added spilling of proxy object optional --- dask_cuda/device_host_file.py | 54 +++++++++++++++++++-- dask_cuda/explicit_comms/dataframe_merge.py | 3 +- dask_cuda/local_cuda_cluster.py | 7 +++ dask_cuda/tests/test_proxy.py | 7 ++- 4 files changed, 63 insertions(+), 8 deletions(-) diff --git a/dask_cuda/device_host_file.py b/dask_cuda/device_host_file.py index b78a6eefa..596b5d1ef 100644 --- a/dask_cuda/device_host_file.py +++ b/dask_cuda/device_host_file.py @@ -5,7 +5,13 @@ from zict.common import ZictBase import dask -from distributed.protocol import deserialize_bytes, serialize_bytelist +from distributed.protocol import ( + deserialize, + deserialize_bytes, + serialize, + serialize_bytelist, +) +from distributed.utils import nbytes from distributed.worker import weight from . import proxy_object @@ -13,13 +19,40 @@ from .utils import nvtx_annotate +class DeviceSerialized: + """ Store device object on the host + This stores a device-side object as + 1. A msgpack encodable header + 2. 
A list of `bytes`-like objects (like NumPy arrays) + that are in host memory + """ + + def __init__(self, header, frames): + self.header = header + self.frames = frames + + def __sizeof__(self): + return sum(map(nbytes, self.frames)) + + +@nvtx_annotate("SPILL_D2H", color="red", domain="dask_cuda") +def device_to_host(obj: object) -> DeviceSerialized: + header, frames = serialize(obj, serializers=["dask", "pickle"]) + return DeviceSerialized(header, frames) + + +@nvtx_annotate("SPILL_H2D", color="green", domain="dask_cuda") +def host_to_device(s: DeviceSerialized) -> object: + return deserialize(s.header, s.frames) + + @nvtx_annotate("SPILL_D2H", color="red", domain="dask_cuda") -def device_to_host(obj: object) -> proxy_object.ObjectProxy: +def pxy_obj_device_to_host(obj: object) -> proxy_object.ObjectProxy: return proxy_object.asproxy(obj, serialize_obj=True) @nvtx_annotate("SPILL_H2D", color="green", domain="dask_cuda") -def host_to_device(s: proxy_object.ObjectProxy) -> object: +def pxy_obj_host_to_device(s: proxy_object.ObjectProxy) -> object: # Notice, we do _not_ deserialize at this point. The proxy # object automatically deserialize just-in-time. 
return s @@ -48,7 +81,11 @@ class DeviceHostFile(ZictBase): """ def __init__( - self, device_memory_limit=None, memory_limit=None, local_directory=None, + self, + device_memory_limit=None, + memory_limit=None, + local_directory=None, + spill_proxy=False, ): if local_directory is None: local_directory = dask.config.get("temporary-directory") or os.getcwd() @@ -74,7 +111,14 @@ def __init__( self.device_keys = set() self.device_func = dict() - self.device_host_func = Func(device_to_host, host_to_device, self.host_buffer) + if spill_proxy: + self.device_host_func = Func( + pxy_obj_device_to_host, pxy_obj_host_to_device, self.host_buffer + ) + else: + self.device_host_func = Func( + device_to_host, host_to_device, self.host_buffer + ) self.device_buffer = Buffer( self.device_func, self.device_host_func, device_memory_limit, weight=weight ) diff --git a/dask_cuda/explicit_comms/dataframe_merge.py b/dask_cuda/explicit_comms/dataframe_merge.py index ad5ad8af5..91e0e3e44 100644 --- a/dask_cuda/explicit_comms/dataframe_merge.py +++ b/dask_cuda/explicit_comms/dataframe_merge.py @@ -63,9 +63,10 @@ def concat(df_list): return None else: typ = str(type(df_list[0])) - if 'cudf' in typ: + if "cudf" in typ: # delay import of cudf to handle CPU only tests import cudf + return cudf.concat(df_list) else: return pandas.concat(df_list) diff --git a/dask_cuda/local_cuda_cluster.py b/dask_cuda/local_cuda_cluster.py index 6843f2579..d750aa31a 100644 --- a/dask_cuda/local_cuda_cluster.py +++ b/dask_cuda/local_cuda_cluster.py @@ -150,6 +150,7 @@ def __init__( ucx_net_devices=None, rmm_pool_size=None, rmm_managed_memory=False, + spill_proxy=None, **kwargs, ): if CUDA_VISIBLE_DEVICES is None: @@ -188,6 +189,11 @@ def __init__( elif isinstance(self.device_memory_limit, str): self.device_memory_limit = parse_bytes(self.device_memory_limit) + if spill_proxy is None: + self.spill_proxy = dask.config.get("spill-proxy", default=False) + else: + self.spill_proxy = spill_proxy + if data is None: data 
= ( DeviceHostFile, @@ -197,6 +203,7 @@ def __init__( "local_directory": local_directory or dask.config.get("temporary-directory") or os.getcwd(), + "spill_proxy": self.spill_proxy, }, ) diff --git a/dask_cuda/tests/test_proxy.py b/dask_cuda/tests/test_proxy.py index dc235a961..e16fe4bce 100644 --- a/dask_cuda/tests/test_proxy.py +++ b/dask_cuda/tests/test_proxy.py @@ -54,7 +54,8 @@ def test_serialize_of_proxied_cudf(serialize_obj, serializers): assert_frame_equal(df.to_pandas(), pxy.to_pandas()) -def test_spilling_local_cuda_cluster(): +@pytest.mark.parametrize("spill_proxy", [True, False]) +def test_spilling_local_cuda_cluster(spill_proxy): """Testing spelling of a proxied cudf dataframe in a local cuda cluster""" cudf = pytest.importorskip("cudf") @@ -64,7 +65,9 @@ def task(x): return x # Notice, setting `device_memory_limit=1` to trigger spilling - with dask_cuda.LocalCUDACluster(n_workers=1, device_memory_limit=1) as cluster: + with dask_cuda.LocalCUDACluster( + n_workers=1, device_memory_limit=1, spill_proxy=spill_proxy + ) as cluster: with Client(cluster): df = cudf.DataFrame({"a": range(10)}) ddf = dask_cudf.from_cudf(df, npartitions=1) From de36de99e688d8b6dea4f3d1a295f6ff15b738a5 Mon Sep 17 00:00:00 2001 From: "Mads R. B. 
Kristensen" Date: Fri, 7 Aug 2020 09:14:02 +0200 Subject: [PATCH 06/49] Re-added dask_serialize for DeviceSerialized --- dask_cuda/device_host_file.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/dask_cuda/device_host_file.py b/dask_cuda/device_host_file.py index 005dac5d1..990584bf6 100644 --- a/dask_cuda/device_host_file.py +++ b/dask_cuda/device_host_file.py @@ -6,6 +6,8 @@ import dask from distributed.protocol import ( + dask_deserialize, + dask_serialize, deserialize, deserialize_bytes, serialize, @@ -40,6 +42,18 @@ def __reduce_ex__(self, protocol): return device_deserialize, (header, frames) +@dask_serialize.register(DeviceSerialized) +def device_serialize(obj): + header = {"obj-header": obj.header} + frames = obj.frames + return header, frames + + +@dask_deserialize.register(DeviceSerialized) +def device_deserialize(header, frames): + return DeviceSerialized(header["obj-header"], frames) + + @nvtx_annotate("SPILL_D2H", color="red", domain="dask_cuda") def device_to_host(obj: object) -> DeviceSerialized: header, frames = serialize(obj, serializers=["dask", "pickle"]) From b4eb34459f067470f414cf33af720a4d89482064 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Fri, 7 Aug 2020 09:40:28 +0200 Subject: [PATCH 07/49] Added support of __array__ --- dask_cuda/proxy_object.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/dask_cuda/proxy_object.py b/dask_cuda/proxy_object.py index 1e3588c2d..fefd39afa 100644 --- a/dask_cuda/proxy_object.py +++ b/dask_cuda/proxy_object.py @@ -121,6 +121,10 @@ def __delslice__(self, i, j): def __iter__(self): return iter(self._obj_pxy_deserialize()) + def __array__(self): + ret = getattr(self._obj_pxy_deserialize(), "__array__")() + return ret + @property def __class__(self): try: From 521d3cabc3862022ceb8651a72f61743953c466e Mon Sep 17 00:00:00 2001 From: "Mads R. B. 
Kristensen" Date: Fri, 7 Aug 2020 10:09:28 +0200 Subject: [PATCH 08/49] Added __sizeof__ --- dask_cuda/proxy_object.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/dask_cuda/proxy_object.py b/dask_cuda/proxy_object.py index fefd39afa..3deb184d9 100644 --- a/dask_cuda/proxy_object.py +++ b/dask_cuda/proxy_object.py @@ -1,7 +1,9 @@ import pickle +import sys import dask import distributed.protocol +import distributed.utils # List of attributes that should be copied to the proxy at creation, which makes # them accessible without deserialization of the proxied object @@ -134,6 +136,13 @@ def __class__(self): self.__obj_pxy_cache["type_serialized"] = ret return ret + def __sizeof__(self): + if self._obj_pxy["is_serialized"]: + frames = self._obj_pxy["obj"][1] + return sum(map(distributed.utils.nbytes, frames)) + else: + return sys.getsizeof(self._obj_pxy_deserialize()) + @distributed.protocol.dask_serialize.register(ObjectProxy) def obj_pxy_dask_serialize(obj: ObjectProxy): From 33b2bd6bb92c8213df8ddc9c4626da162a093b1f Mon Sep 17 00:00:00 2001 From: "Mads R. B. 
Kristensen" Date: Fri, 7 Aug 2020 10:10:05 +0200 Subject: [PATCH 09/49] Added some spill_proxy tests in test_device_host_file.py --- dask_cuda/tests/test_device_host_file.py | 43 +++++++++++++++--------- 1 file changed, 28 insertions(+), 15 deletions(-) diff --git a/dask_cuda/tests/test_device_host_file.py b/dask_cuda/tests/test_device_host_file.py index 5e0fb949c..f8620d490 100644 --- a/dask_cuda/tests/test_device_host_file.py +++ b/dask_cuda/tests/test_device_host_file.py @@ -4,12 +4,11 @@ import numpy as np import pytest -import dask -from dask import array as da +import dask.array from distributed.protocol import ( deserialize, - serialize, deserialize_bytes, + serialize, serialize_bytelist, ) from distributed.protocol.pickle import HIGHEST_PROTOCOL @@ -23,6 +22,12 @@ cupy = pytest.importorskip("cupy") +def assert_eq(x, y): + # Explicitly calling "cupy.asnumpy" to support `ObjectProxy` because + # "cupy" is hardcoded in `dask.array.normalize_to_array()` + return dask.array.assert_eq(cupy.asnumpy(x), cupy.asnumpy(y)) + + def test_device_host_file_config(tmp_path): dhf_disk_path = str(tmp_path / "dask-worker-space" / "storage") with dask.config.set(temporary_directory=str(tmp_path)): @@ -34,13 +39,17 @@ def test_device_host_file_config(tmp_path): @pytest.mark.parametrize("num_host_arrays", [1, 10, 100]) @pytest.mark.parametrize("num_device_arrays", [1, 10, 100]) @pytest.mark.parametrize("array_size_range", [(1, 1000), (100, 100), (1000, 1000)]) +@pytest.mark.parametrize("spill_proxy", [True, False]) def test_device_host_file_short( - tmp_path, num_device_arrays, num_host_arrays, array_size_range + tmp_path, num_device_arrays, num_host_arrays, array_size_range, spill_proxy ): tmpdir = tmp_path / "storage" tmpdir.mkdir() dhf = DeviceHostFile( - device_memory_limit=1024 * 16, memory_limit=1024 * 16, local_directory=tmpdir + device_memory_limit=1024 * 16, + memory_limit=1024 * 16, + local_directory=tmpdir, + spill_proxy=spill_proxy, ) host = [ @@ -64,7 +73,7 @@ def 
test_device_host_file_short( for k, original in full: acquired = dhf[k] - da.assert_eq(original, acquired) + assert_eq(original, acquired) del dhf[k] assert set(dhf.device.keys()) == set() @@ -72,11 +81,15 @@ def test_device_host_file_short( assert set(dhf.disk.keys()) == set() -def test_device_host_file_step_by_step(tmp_path): +@pytest.mark.parametrize("spill_proxy", [True, False]) +def test_device_host_file_step_by_step(tmp_path, spill_proxy): tmpdir = tmp_path / "storage" tmpdir.mkdir() dhf = DeviceHostFile( - device_memory_limit=1024 * 16, memory_limit=1024 * 16, local_directory=tmpdir + device_memory_limit=1024 * 16, + memory_limit=1024 * 16, + local_directory=tmpdir, + spill_proxy=spill_proxy, ) a = np.random.random(1000) @@ -119,17 +132,17 @@ def test_device_host_file_step_by_step(tmp_path): assert set(dhf.host.keys()) == set(["a2", "b2"]) assert set(dhf.disk.keys()) == set(["a1", "b1"]) - da.assert_eq(dhf["a1"], a) + assert_eq(dhf["a1"], a) del dhf["a1"] - da.assert_eq(dhf["a2"], a) + assert_eq(dhf["a2"], a) del dhf["a2"] - da.assert_eq(dhf["b1"], b) + assert_eq(dhf["b1"], b) del dhf["b1"] - da.assert_eq(dhf["b2"], b) + assert_eq(dhf["b2"], b) del dhf["b2"] - da.assert_eq(dhf["b3"], b) + assert_eq(dhf["b3"], b) del dhf["b3"] - da.assert_eq(dhf["b4"], b) + assert_eq(dhf["b4"], b) del dhf["b4"] assert set(dhf.device.keys()) == set() @@ -152,7 +165,7 @@ def test_serialize_cupy_collection(collection, length, value): assert_func = dd.assert_eq else: x = cupy.arange(10) - assert_func = da.assert_eq + assert_func = assert_eq if length == 0: obj = device_to_host(x) From cc1df47dafa2beb2d0c7cc3b2a6f893d6a69c13d Mon Sep 17 00:00:00 2001 From: "Mads R. B. 
Kristensen" Date: Fri, 7 Aug 2020 01:21:05 -0700 Subject: [PATCH 10/49] Checking len() instead of .size() --- dask_cuda/tests/test_proxy.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_cuda/tests/test_proxy.py b/dask_cuda/tests/test_proxy.py index e16fe4bce..cdeca7169 100644 --- a/dask_cuda/tests/test_proxy.py +++ b/dask_cuda/tests/test_proxy.py @@ -61,7 +61,7 @@ def test_spilling_local_cuda_cluster(spill_proxy): def task(x): assert isinstance(x, cudf.DataFrame) - assert x.size == 10 # Trigger deserialization + assert len(x) == 10 # Trigger deserialization return x # Notice, setting `device_memory_limit=1` to trigger spilling From f6c11f160bd84c8cae08865420a5cb46812ce424 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Fri, 7 Aug 2020 03:25:31 -0700 Subject: [PATCH 11/49] Added dispatch support of hash_object_dispatch and group_split_dispatch --- dask_cuda/proxy_object.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/dask_cuda/proxy_object.py b/dask_cuda/proxy_object.py index 3deb184d9..f043b259a 100644 --- a/dask_cuda/proxy_object.py +++ b/dask_cuda/proxy_object.py @@ -2,6 +2,7 @@ import sys import dask +import dask.dataframe.utils import distributed.protocol import distributed.utils @@ -155,3 +156,13 @@ def obj_pxy_dask_deserialize(header, frames): return ObjectProxy( obj=(header["proxied-header"], frames), **header["obj-pxy-meta"], ) + + +@dask.dataframe.utils.hash_object_dispatch.register(ObjectProxy) +def obj_pxy_hash_object(obj: ObjectProxy, *args, **kwargs): + return dask.dataframe.utils.hash_object_dispatch(obj._obj_pxy_deserialize()) + + +@dask.dataframe.utils.group_split_dispatch.register(ObjectProxy) +def obj_pxy_group_split(obj: ObjectProxy, *args, **kwargs): + return dask.dataframe.utils.hash_object_dispatch(obj._obj_pxy_deserialize()) From 349a3939f2a3ba9247f85c663d75308ef58209b7 Mon Sep 17 00:00:00 2001 From: "Mads R. B. 
Kristensen" Date: Fri, 7 Aug 2020 05:36:14 -0700 Subject: [PATCH 12/49] Added "*args, **kwargs" to dispatch of ObjectProxy --- dask_cuda/proxy_object.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/dask_cuda/proxy_object.py b/dask_cuda/proxy_object.py index f043b259a..32491306b 100644 --- a/dask_cuda/proxy_object.py +++ b/dask_cuda/proxy_object.py @@ -160,9 +160,13 @@ def obj_pxy_dask_deserialize(header, frames): @dask.dataframe.utils.hash_object_dispatch.register(ObjectProxy) def obj_pxy_hash_object(obj: ObjectProxy, *args, **kwargs): - return dask.dataframe.utils.hash_object_dispatch(obj._obj_pxy_deserialize()) + return dask.dataframe.utils.hash_object_dispatch( + obj._obj_pxy_deserialize(), *args, **kwargs + ) @dask.dataframe.utils.group_split_dispatch.register(ObjectProxy) def obj_pxy_group_split(obj: ObjectProxy, *args, **kwargs): - return dask.dataframe.utils.hash_object_dispatch(obj._obj_pxy_deserialize()) + return dask.dataframe.utils.hash_object_dispatch( + obj._obj_pxy_deserialize(), *args, **kwargs + ) From 68ff7ed11c1bc20267530c022a41d33db5f83a9b Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Fri, 7 Aug 2020 05:37:19 -0700 Subject: [PATCH 13/49] Added dispatch of make_scalar --- dask_cuda/proxy_object.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/dask_cuda/proxy_object.py b/dask_cuda/proxy_object.py index 32491306b..1eb8cc21e 100644 --- a/dask_cuda/proxy_object.py +++ b/dask_cuda/proxy_object.py @@ -170,3 +170,8 @@ def obj_pxy_group_split(obj: ObjectProxy, *args, **kwargs): return dask.dataframe.utils.hash_object_dispatch( obj._obj_pxy_deserialize(), *args, **kwargs ) + + +@dask.dataframe.utils.make_scalar.register(ObjectProxy) +def obj_pxy_make_scalar(obj: ObjectProxy, *args, **kwargs): + return dask.dataframe.utils.make_scalar(obj._obj_pxy_deserialize(), *args, **kwargs) From d47f6fda5f39c4759cdeb1973af507cf0f93ed10 Mon Sep 17 00:00:00 2001 From: "Mads R. B. 
Kristensen" Date: Fri, 7 Aug 2020 05:38:20 -0700 Subject: [PATCH 14/49] Added dispatch of concat_dispatch --- dask_cuda/proxy_object.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/dask_cuda/proxy_object.py b/dask_cuda/proxy_object.py index 1eb8cc21e..dafd2e1a5 100644 --- a/dask_cuda/proxy_object.py +++ b/dask_cuda/proxy_object.py @@ -3,6 +3,7 @@ import dask import dask.dataframe.utils +import dask.dataframe.methods import distributed.protocol import distributed.utils @@ -175,3 +176,14 @@ def obj_pxy_group_split(obj: ObjectProxy, *args, **kwargs): @dask.dataframe.utils.make_scalar.register(ObjectProxy) def obj_pxy_make_scalar(obj: ObjectProxy, *args, **kwargs): return dask.dataframe.utils.make_scalar(obj._obj_pxy_deserialize(), *args, **kwargs) + + +@dask.dataframe.methods.concat_dispatch.register(ObjectProxy) +def obj_pxy_concat(objs, *args, **kwargs): + # Deserialize concat inputs (in-place) + for i in range(len(objs)): + try: + objs[i] = objs[i]._obj_pxy_deserialize() + except AttributeError: + pass + return dask.dataframe.methods.concat(objs, *args, **kwargs) From d93f8c570c301c66ad5cc9ca0781cd7cfefbf289 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Mon, 14 Sep 2020 08:23:15 +0200 Subject: [PATCH 15/49] meta.yaml: added pandas dependency --- conda/recipes/dask-cuda/meta.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/conda/recipes/dask-cuda/meta.yaml b/conda/recipes/dask-cuda/meta.yaml index ece107eb8..5f0d53754 100644 --- a/conda/recipes/dask-cuda/meta.yaml +++ b/conda/recipes/dask-cuda/meta.yaml @@ -29,6 +29,7 @@ requirements: - pynvml >=8.0.3 - numpy >=1.16.0 - numba >=0.50.0,!=0.51.0 + - pandas >=1.0,<1.2.0dev0 test: imports: From 3756b0a61a23dbce95288664e4e11e2e5e79661b Mon Sep 17 00:00:00 2001 From: "Mads R. B. 
Kristensen" Date: Mon, 14 Sep 2020 08:40:06 +0200 Subject: [PATCH 16/49] meta.yaml: depend on dask (not only dask-core) --- conda/recipes/dask-cuda/meta.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/conda/recipes/dask-cuda/meta.yaml b/conda/recipes/dask-cuda/meta.yaml index 5f0d53754..28985a2bc 100644 --- a/conda/recipes/dask-cuda/meta.yaml +++ b/conda/recipes/dask-cuda/meta.yaml @@ -24,7 +24,7 @@ requirements: - setuptools run: - python x.x - - dask-core >=2.4.0 + - dask >=2.4.0 - distributed >=2.18.0 - pynvml >=8.0.3 - numpy >=1.16.0 From 40058cd5a443e1b0301643c30a22868bd8fea593 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Mon, 14 Sep 2020 11:08:11 -0700 Subject: [PATCH 17/49] Added jit-unspill worker option --- dask_cuda/cli/dask_cuda_worker.py | 7 +++++++ dask_cuda/cuda_worker.py | 7 +++++++ dask_cuda/device_host_file.py | 6 ++++-- dask_cuda/local_cuda_cluster.py | 12 +++++++----- dask_cuda/tests/test_device_host_file.py | 12 ++++++------ dask_cuda/tests/test_proxy.py | 6 +++--- 6 files changed, 34 insertions(+), 16 deletions(-) diff --git a/dask_cuda/cli/dask_cuda_worker.py b/dask_cuda/cli/dask_cuda_worker.py index 92c7ede23..b684dcb91 100755 --- a/dask_cuda/cli/dask_cuda_worker.py +++ b/dask_cuda/cli/dask_cuda_worker.py @@ -190,6 +190,11 @@ "InfiniBand only and will still cause unpredictable errors if not _ALL_ " "interfaces are connected and properly configured.", ) +@click.option( + "--enable-jit-unspill/--disable-jit-unspill", + default=None, # If not specified, use Dask config + help="Enable just-in-time unspilling", +) def main( scheduler, host, @@ -217,6 +222,7 @@ def main( enable_nvlink, enable_rdmacm, net_devices, + enable_jit_unspill, **kwargs, ): if tls_ca_file and tls_cert and tls_key: @@ -251,6 +257,7 @@ def main( enable_nvlink, enable_rdmacm, net_devices, + enable_jit_unspill, **kwargs, ) diff --git a/dask_cuda/cuda_worker.py b/dask_cuda/cuda_worker.py index bc25bae81..dfebaba45 100644 --- 
a/dask_cuda/cuda_worker.py +++ b/dask_cuda/cuda_worker.py @@ -71,6 +71,7 @@ def __init__( enable_nvlink=False, enable_rdmacm=False, net_devices=None, + jit_unspill=None, **kwargs, ): # Required by RAPIDS libraries (e.g., cuDF) to ensure no context @@ -177,6 +178,11 @@ def del_pid_file(): cuda_device_index=0, ) + if jit_unspill is None: + self.jit_unspill = dask.config.get("jit-unspill", default=False) + else: + self.jit_unspill = jit_unspill + self.nannies = [ t( scheduler, @@ -219,6 +225,7 @@ def del_pid_file(): else parse_bytes(device_memory_limit), "memory_limit": memory_limit, "local_directory": local_directory, + "jit_unspill": self.jit_unspill, }, ), **kwargs, diff --git a/dask_cuda/device_host_file.py b/dask_cuda/device_host_file.py index 990584bf6..deba17bec 100644 --- a/dask_cuda/device_host_file.py +++ b/dask_cuda/device_host_file.py @@ -97,6 +97,8 @@ class DeviceHostFile(ZictBase): implies no spilling to disk. local_directory: path Path where to store serialized objects on disk + jit_unspill: bool + If True, enable just-in-time unspilling (see proxy_object.ObjectProxy). """ def __init__( @@ -104,7 +106,7 @@ def __init__( device_memory_limit=None, memory_limit=None, local_directory=None, - spill_proxy=False, + jit_unspill=False, ): if local_directory is None: local_directory = dask.config.get("temporary-directory") or os.getcwd() @@ -130,7 +132,7 @@ def __init__( self.device_keys = set() self.device_func = dict() - if spill_proxy: + if jit_unspill: self.device_host_func = Func( pxy_obj_device_to_host, pxy_obj_host_to_device, self.host_buffer ) diff --git a/dask_cuda/local_cuda_cluster.py b/dask_cuda/local_cuda_cluster.py index ba4e1facd..8acafb28d 100644 --- a/dask_cuda/local_cuda_cluster.py +++ b/dask_cuda/local_cuda_cluster.py @@ -108,6 +108,8 @@ class LocalCUDACluster(LocalCluster): but in that case with default (non-managed) memory type. WARNING: managed memory is currently incompatible with NVLink, trying to enable both will result in an exception. 
+ jit_unspill: bool + If True, enable just-in-time unspilling (see proxy_object.ObjectProxy). Examples -------- @@ -149,7 +151,7 @@ def __init__( ucx_net_devices=None, rmm_pool_size=None, rmm_managed_memory=False, - spill_proxy=None, + jit_unspill=None, **kwargs, ): # Required by RAPIDS libraries (e.g., cuDF) to ensure no context @@ -200,10 +202,10 @@ def __init__( elif isinstance(self.device_memory_limit, str): self.device_memory_limit = parse_bytes(self.device_memory_limit) - if spill_proxy is None: - self.spill_proxy = dask.config.get("spill-proxy", default=False) + if jit_unspill is None: + self.jit_unspill = dask.config.get("jit-unspill", default=False) else: - self.spill_proxy = spill_proxy + self.jit_unspill = jit_unspill if data is None: data = ( @@ -214,7 +216,7 @@ def __init__( "local_directory": local_directory or dask.config.get("temporary-directory") or os.getcwd(), - "spill_proxy": self.spill_proxy, + "jit_unspill": self.jit_unspill, }, ) diff --git a/dask_cuda/tests/test_device_host_file.py b/dask_cuda/tests/test_device_host_file.py index f8620d490..ab43354d0 100644 --- a/dask_cuda/tests/test_device_host_file.py +++ b/dask_cuda/tests/test_device_host_file.py @@ -39,9 +39,9 @@ def test_device_host_file_config(tmp_path): @pytest.mark.parametrize("num_host_arrays", [1, 10, 100]) @pytest.mark.parametrize("num_device_arrays", [1, 10, 100]) @pytest.mark.parametrize("array_size_range", [(1, 1000), (100, 100), (1000, 1000)]) -@pytest.mark.parametrize("spill_proxy", [True, False]) +@pytest.mark.parametrize("jit_unspill", [True, False]) def test_device_host_file_short( - tmp_path, num_device_arrays, num_host_arrays, array_size_range, spill_proxy + tmp_path, num_device_arrays, num_host_arrays, array_size_range, jit_unspill ): tmpdir = tmp_path / "storage" tmpdir.mkdir() @@ -49,7 +49,7 @@ def test_device_host_file_short( device_memory_limit=1024 * 16, memory_limit=1024 * 16, local_directory=tmpdir, - spill_proxy=spill_proxy, + jit_unspill=jit_unspill, ) host = [ 
@@ -81,15 +81,15 @@ def test_device_host_file_short( assert set(dhf.disk.keys()) == set() -@pytest.mark.parametrize("spill_proxy", [True, False]) -def test_device_host_file_step_by_step(tmp_path, spill_proxy): +@pytest.mark.parametrize("jit_unspill", [True, False]) +def test_device_host_file_step_by_step(tmp_path, jit_unspill): tmpdir = tmp_path / "storage" tmpdir.mkdir() dhf = DeviceHostFile( device_memory_limit=1024 * 16, memory_limit=1024 * 16, local_directory=tmpdir, - spill_proxy=spill_proxy, + jit_unspill=jit_unspill, ) a = np.random.random(1000) diff --git a/dask_cuda/tests/test_proxy.py b/dask_cuda/tests/test_proxy.py index cdeca7169..4c23ac3da 100644 --- a/dask_cuda/tests/test_proxy.py +++ b/dask_cuda/tests/test_proxy.py @@ -54,8 +54,8 @@ def test_serialize_of_proxied_cudf(serialize_obj, serializers): assert_frame_equal(df.to_pandas(), pxy.to_pandas()) -@pytest.mark.parametrize("spill_proxy", [True, False]) -def test_spilling_local_cuda_cluster(spill_proxy): +@pytest.mark.parametrize("jit_unspill", [True, False]) +def test_spilling_local_cuda_cluster(jit_unspill): """Testing spelling of a proxied cudf dataframe in a local cuda cluster""" cudf = pytest.importorskip("cudf") @@ -66,7 +66,7 @@ def task(x): # Notice, setting `device_memory_limit=1` to trigger spilling with dask_cuda.LocalCUDACluster( - n_workers=1, device_memory_limit=1, spill_proxy=spill_proxy + n_workers=1, device_memory_limit=1, jit_unspill=jit_unspill ) as cluster: with Client(cluster): df = cudf.DataFrame({"a": range(10)}) From 37b2ef854d3ade5736f47e3f2517ba1b05a720a4 Mon Sep 17 00:00:00 2001 From: "Mads R. B. 
Kristensen" Date: Thu, 17 Sep 2020 18:20:50 +0200 Subject: [PATCH 18/49] meta.yaml: removed pandas Co-authored-by: Peter Andreas Entschev --- conda/recipes/dask-cuda/meta.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/conda/recipes/dask-cuda/meta.yaml b/conda/recipes/dask-cuda/meta.yaml index 28985a2bc..8988de6a9 100644 --- a/conda/recipes/dask-cuda/meta.yaml +++ b/conda/recipes/dask-cuda/meta.yaml @@ -29,7 +29,6 @@ requirements: - pynvml >=8.0.3 - numpy >=1.16.0 - numba >=0.50.0,!=0.51.0 - - pandas >=1.0,<1.2.0dev0 test: imports: From 16879fcff14d22cbd20329f07ece8707016b26ad Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Thu, 1 Oct 2020 01:40:09 -0700 Subject: [PATCH 19/49] Using explicit args for the dispatch functions --- dask_cuda/proxy_object.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/dask_cuda/proxy_object.py b/dask_cuda/proxy_object.py index dafd2e1a5..bc777607c 100644 --- a/dask_cuda/proxy_object.py +++ b/dask_cuda/proxy_object.py @@ -155,27 +155,26 @@ def obj_pxy_dask_serialize(obj: ObjectProxy): @distributed.protocol.dask_deserialize.register(ObjectProxy) def obj_pxy_dask_deserialize(header, frames): return ObjectProxy( - obj=(header["proxied-header"], frames), **header["obj-pxy-meta"], + obj=(header["proxied-header"], frames), + **header["obj-pxy-meta"], ) @dask.dataframe.utils.hash_object_dispatch.register(ObjectProxy) -def obj_pxy_hash_object(obj: ObjectProxy, *args, **kwargs): - return dask.dataframe.utils.hash_object_dispatch( - obj._obj_pxy_deserialize(), *args, **kwargs - ) +def obj_pxy_hash_object(obj: ObjectProxy, index=True): + return dask.dataframe.utils.hash_object_dispatch(obj._obj_pxy_deserialize(), index) @dask.dataframe.utils.group_split_dispatch.register(ObjectProxy) -def obj_pxy_group_split(obj: ObjectProxy, *args, **kwargs): +def obj_pxy_group_split(obj: ObjectProxy, c, k, ignore_index=False): return dask.dataframe.utils.hash_object_dispatch( - obj._obj_pxy_deserialize(), 
*args, **kwargs + obj._obj_pxy_deserialize(), c, k, ignore_index ) @dask.dataframe.utils.make_scalar.register(ObjectProxy) -def obj_pxy_make_scalar(obj: ObjectProxy, *args, **kwargs): - return dask.dataframe.utils.make_scalar(obj._obj_pxy_deserialize(), *args, **kwargs) +def obj_pxy_make_scalar(obj: ObjectProxy): + return dask.dataframe.utils.make_scalar(obj._obj_pxy_deserialize()) @dask.dataframe.methods.concat_dispatch.register(ObjectProxy) From ac782b00481b644ed91730391046cd82dfac028b Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Fri, 2 Oct 2020 04:46:05 -0700 Subject: [PATCH 20/49] fixed typo --- dask_cuda/proxy_object.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_cuda/proxy_object.py b/dask_cuda/proxy_object.py index bc777607c..f35c13e5d 100644 --- a/dask_cuda/proxy_object.py +++ b/dask_cuda/proxy_object.py @@ -167,7 +167,7 @@ def obj_pxy_hash_object(obj: ObjectProxy, index=True): @dask.dataframe.utils.group_split_dispatch.register(ObjectProxy) def obj_pxy_group_split(obj: ObjectProxy, c, k, ignore_index=False): - return dask.dataframe.utils.hash_object_dispatch( + return dask.dataframe.utils.group_split_dispatch( obj._obj_pxy_deserialize(), c, k, ignore_index ) From 51d6aa57219df9f1f2110c113b00e3cbcd38447f Mon Sep 17 00:00:00 2001 From: "Mads R. B. 
Kristensen" Date: Tue, 6 Oct 2020 14:35:23 +0200 Subject: [PATCH 21/49] ObjectProxy._obj_pxy_serialize(): takes serializers --- dask_cuda/proxy_object.py | 18 +++++++++++++++--- dask_cuda/tests/test_proxy.py | 4 +--- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/dask_cuda/proxy_object.py b/dask_cuda/proxy_object.py index f35c13e5d..317116a88 100644 --- a/dask_cuda/proxy_object.py +++ b/dask_cuda/proxy_object.py @@ -59,11 +59,12 @@ def __init__( def _obj_pxy_get_meta(self): return {k: self._obj_pxy[k] for k in self._obj_pxy.keys() if k != "obj"} - def _obj_pxy_serialize(self): + def _obj_pxy_serialize(self, serializers): if not self._obj_pxy["is_serialized"]: self._obj_pxy["obj"] = distributed.protocol.serialize( - self._obj_pxy["obj"], self._obj_pxy["serializers"] + self._obj_pxy["obj"], serializers ) + self._obj_pxy["serializers"] = serializers self._obj_pxy["is_serialized"] = True return self._obj_pxy["obj"] @@ -148,12 +149,23 @@ def __sizeof__(self): @distributed.protocol.dask_serialize.register(ObjectProxy) def obj_pxy_dask_serialize(obj: ObjectProxy): - header, frames = obj._obj_pxy_serialize() + """ + The generic serialization of ObjectProxy used by Dask when communicating + ObjectProxy. As serializers, it uses "dask" or "pickle", which means + that proxied CUDA objects are spilled to main memory before communicated. + """ + header, frames = obj._obj_pxy_serialize(serializers=["dask", "pickle"]) return {"proxied-header": header, "obj-pxy-meta": obj._obj_pxy_get_meta()}, frames @distributed.protocol.dask_deserialize.register(ObjectProxy) def obj_pxy_dask_deserialize(header, frames): + """ + The generic deserialization of ObjectProxy. Notice, it doesn't deserialize + thr proxied object at this time. When accessed, the proxied object are + deserialized using the same serializers that were used when the object was + serialized. 
+ """ return ObjectProxy( obj=(header["proxied-header"], frames), **header["obj-pxy-meta"], diff --git a/dask_cuda/tests/test_proxy.py b/dask_cuda/tests/test_proxy.py index 4c23ac3da..868cd5713 100644 --- a/dask_cuda/tests/test_proxy.py +++ b/dask_cuda/tests/test_proxy.py @@ -35,9 +35,7 @@ def test_proxy_object_of_cudf(serialize_obj): @pytest.mark.parametrize("serialize_obj", [True, False]) -@pytest.mark.parametrize( - "serializers", [["dask", "pickle"], ["cuda", "dask", "pickle"]] -) +@pytest.mark.parametrize("serializers", [["dask"], ["cuda"]]) def test_serialize_of_proxied_cudf(serialize_obj, serializers): """Check that we can serialize a proxied cudf dataframe, which might be serialized already. From f626372b6780975a2c57c47f4d45bd9748e4d530 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Tue, 6 Oct 2020 15:41:14 +0200 Subject: [PATCH 22/49] serializers replaces is_serialized --- dask_cuda/device_host_file.py | 4 +- dask_cuda/proxy_object.py | 85 ++++++++++++++++++++++++++--------- dask_cuda/tests/test_proxy.py | 24 +++++----- 3 files changed, 79 insertions(+), 34 deletions(-) diff --git a/dask_cuda/device_host_file.py b/dask_cuda/device_host_file.py index deba17bec..126e92144 100644 --- a/dask_cuda/device_host_file.py +++ b/dask_cuda/device_host_file.py @@ -67,7 +67,9 @@ def host_to_device(s: DeviceSerialized) -> object: @nvtx_annotate("SPILL_D2H", color="red", domain="dask_cuda") def pxy_obj_device_to_host(obj: object) -> proxy_object.ObjectProxy: - return proxy_object.asproxy(obj, serialize_obj=True) + # Notice, both the "dask" and the "pickle" serializer will + # spill `obj` to main memory. 
+ return proxy_object.asproxy(obj, serializers=["dask", "pickle"]) @nvtx_annotate("SPILL_H2D", color="green", domain="dask_cuda") diff --git a/dask_cuda/proxy_object.py b/dask_cuda/proxy_object.py index 317116a88..eeb593e2e 100644 --- a/dask_cuda/proxy_object.py +++ b/dask_cuda/proxy_object.py @@ -12,7 +12,20 @@ _FIXED_ATTRS = ["name"] -def asproxy(obj, serialize_obj=True, serializers=["dask", "pickle"]): +def asproxy(obj, serializers=None): + """Wrap `obj` in a ObjectProxy object if it isn't already. + + Parameters + ---------- + obj: object + Object to wrap in a ObjectProxy object. + serializers: List[Str], optional + List of serializers to use to serialize `obj`. If None, + no serialization is done. + ret: ObjectProxy + The proxy object proxing `obj` + """ + if hasattr(obj, "_obj_pxy"): return obj # Already a proxy object @@ -23,18 +36,15 @@ def asproxy(obj, serialize_obj=True, serializers=["dask", "pickle"]): except AttributeError: pass - orig_obj = obj - if serialize_obj: - obj = distributed.protocol.serialize(obj, serializers=serializers) - - return ObjectProxy( + ret = ObjectProxy( obj=obj, - is_serialized=serialize_obj, fixed_attr=fixed_attr, - type_serialized=pickle.dumps(type(orig_obj)), - typename=dask.utils.typename(type(orig_obj)), - serializers=serializers, + type_serialized=pickle.dumps(type(obj)), + typename=dask.utils.typename(type(obj)), ) + if serializers is not None: + ret._obj_pxy_serialize(serializers=serializers) + return ret class ObjectProxy: @@ -43,12 +53,9 @@ class ObjectProxy: "__obj_pxy_cache", # A dict used for caching attributes ] - def __init__( - self, obj, is_serialized, fixed_attr, type_serialized, typename, serializers - ): + def __init__(self, obj, fixed_attr, type_serialized, typename, serializers=None): self._obj_pxy = { "obj": obj, - "is_serialized": is_serialized, "fixed_attr": fixed_attr, "type_serialized": type_serialized, "typename": typename, @@ -57,22 +64,58 @@ def __init__( self.__obj_pxy_cache = {} def 
_obj_pxy_get_meta(self): + """Return the metadata of the proxy object. + + Returns + ------- + ret: dict + Dictionary of metadata + """ return {k: self._obj_pxy[k] for k in self._obj_pxy.keys() if k != "obj"} def _obj_pxy_serialize(self, serializers): - if not self._obj_pxy["is_serialized"]: + """Inplace serialization of the proxied object using the `serializers` + + Parameters + ---------- + serializers: List[Str] + List of serializers to use to serialize the proxied object. + + Returns + ------- + header: dict + The header of the serialized frames + frames: List[Bytes] + List of frames that makes up the serialized object + """ + if ( + self._obj_pxy["serializers"] is not None + and self._obj_pxy["serializers"] != serializers + ): + # The proxied object is serialized with other serializers + self._obj_pxy_deserialize() + + if self._obj_pxy["serializers"] is None: self._obj_pxy["obj"] = distributed.protocol.serialize( self._obj_pxy["obj"], serializers ) self._obj_pxy["serializers"] = serializers - self._obj_pxy["is_serialized"] = True + + assert serializers == self._obj_pxy["serializers"] return self._obj_pxy["obj"] def _obj_pxy_deserialize(self): - if self._obj_pxy["is_serialized"]: + """Inplace deserialization of the proxied object + + Returns + ------- + ret : object + The proxied object (deserialized) + """ + if self._obj_pxy["serializers"] is not None: header, frames = self._obj_pxy["obj"] self._obj_pxy["obj"] = distributed.protocol.deserialize(header, frames) - self._obj_pxy["is_serialized"] = False + self._obj_pxy["serializers"] = None return self._obj_pxy["obj"] def __getattr__(self, name): @@ -93,8 +136,8 @@ def __str__(self): def __repr__(self): typename = self._obj_pxy["typename"] ret = f"<{dask.utils.typename(type(self))} at {hex(id(self))} for {typename}" - if self._obj_pxy["is_serialized"]: - ret += " (serialized)>" + if self._obj_pxy["serializers"] is not None: + ret += f" (serialized={repr(self._obj_pxy['serializers'])})>" else: ret += f" at 
{hex(id(self._obj_pxy['obj']))}>" return ret @@ -140,7 +183,7 @@ def __class__(self): return ret def __sizeof__(self): - if self._obj_pxy["is_serialized"]: + if self._obj_pxy["serializers"] is not None: frames = self._obj_pxy["obj"][1] return sum(map(distributed.utils.nbytes, frames)) else: diff --git a/dask_cuda/tests/test_proxy.py b/dask_cuda/tests/test_proxy.py index 868cd5713..28d948541 100644 --- a/dask_cuda/tests/test_proxy.py +++ b/dask_cuda/tests/test_proxy.py @@ -10,12 +10,12 @@ from dask_cuda import proxy_object -@pytest.mark.parametrize("serialize_obj", [True, False]) -def test_proxy_object(serialize_obj): +@pytest.mark.parametrize("serializers", [None, ["dask", "pickle"]]) +def test_proxy_object(serializers): """Check "transparency" of the proxy object""" org = list(range(10)) - pxy = proxy_object.asproxy(org, serialize_obj=serialize_obj) + pxy = proxy_object.asproxy(org, serializers=serializers) assert len(org) == len(pxy) assert org[0] == pxy[0] @@ -25,29 +25,29 @@ def test_proxy_object(serialize_obj): # TODO: check operators (when implemented) -@pytest.mark.parametrize("serialize_obj", [True, False]) -def test_proxy_object_of_cudf(serialize_obj): +@pytest.mark.parametrize("serializers", [None, ["dask"]]) +def test_proxy_object_of_cudf(serializers): """Check that a proxied cudf dataframe is behaviors as a regular dataframe""" cudf = pytest.importorskip("cudf") df = cudf.DataFrame({"a": range(10)}) - pxy = proxy_object.asproxy(df, serialize_obj=serialize_obj) + pxy = proxy_object.asproxy(df, serializers=serializers) assert_frame_equal(df.to_pandas(), pxy.to_pandas()) -@pytest.mark.parametrize("serialize_obj", [True, False]) -@pytest.mark.parametrize("serializers", [["dask"], ["cuda"]]) -def test_serialize_of_proxied_cudf(serialize_obj, serializers): +@pytest.mark.parametrize("proxy_serializers", [None, ["dask"]]) +@pytest.mark.parametrize("dask_serializers", [["dask"], ["cuda"]]) +def test_serialize_of_proxied_cudf(proxy_serializers, dask_serializers): 
"""Check that we can serialize a proxied cudf dataframe, which might be serialized already. """ cudf = pytest.importorskip("cudf") - if "cuda" in serializers: + if "cuda" in dask_serializers: pytest.skip("cuda serializer support not implemented") df = cudf.DataFrame({"a": range(10)}) - pxy = proxy_object.asproxy(df, serialize_obj=serialize_obj) - header, frames = serialize(pxy, serializers=serializers) + pxy = proxy_object.asproxy(df, serializers=proxy_serializers) + header, frames = serialize(pxy, serializers=dask_serializers) pxy = deserialize(header, frames) assert_frame_equal(df.to_pandas(), pxy.to_pandas()) From b35bed8d46d770f8267ffe2a8e7eb03093d3871d Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Tue, 6 Oct 2020 16:04:47 +0200 Subject: [PATCH 23/49] Supporting cuda serializers --- dask_cuda/proxy_object.py | 16 ++++++++++++++++ dask_cuda/tests/test_proxy.py | 5 +---- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/dask_cuda/proxy_object.py b/dask_cuda/proxy_object.py index eeb593e2e..4774b41aa 100644 --- a/dask_cuda/proxy_object.py +++ b/dask_cuda/proxy_object.py @@ -201,7 +201,23 @@ def obj_pxy_dask_serialize(obj: ObjectProxy): return {"proxied-header": header, "obj-pxy-meta": obj._obj_pxy_get_meta()}, frames +@distributed.protocol.cuda.cuda_serialize.register(ObjectProxy) +def obj_pxy_cuda_serialize(obj: ObjectProxy): + """ + The CUDA serialization of ObjectProxy used by Dask when communicating + ObjectProxy. As serializers, it uses "cuda", "dask" or "pickle", which means + that proxied CUDA objects are _not_ spilled to main memory if communicating + using UCX or another CUDA friendly communicantion library. 
+ """ + if obj._obj_pxy["serializers"] is not None: # Already serialized + header, frames = obj._obj_pxy["obj"] + else: + header, frames = obj._obj_pxy_serialize(serializers=["cuda", "dask", "pickle"]) + return {"proxied-header": header, "obj-pxy-meta": obj._obj_pxy_get_meta()}, frames + + @distributed.protocol.dask_deserialize.register(ObjectProxy) +@distributed.protocol.cuda.cuda_deserialize.register(ObjectProxy) def obj_pxy_dask_deserialize(header, frames): """ The generic deserialization of ObjectProxy. Notice, it doesn't deserialize diff --git a/dask_cuda/tests/test_proxy.py b/dask_cuda/tests/test_proxy.py index 28d948541..3b63b2e87 100644 --- a/dask_cuda/tests/test_proxy.py +++ b/dask_cuda/tests/test_proxy.py @@ -34,7 +34,7 @@ def test_proxy_object_of_cudf(serializers): assert_frame_equal(df.to_pandas(), pxy.to_pandas()) -@pytest.mark.parametrize("proxy_serializers", [None, ["dask"]]) +@pytest.mark.parametrize("proxy_serializers", [None, ["dask"], ["cuda"]]) @pytest.mark.parametrize("dask_serializers", [["dask"], ["cuda"]]) def test_serialize_of_proxied_cudf(proxy_serializers, dask_serializers): """Check that we can serialize a proxied cudf dataframe, which might @@ -42,9 +42,6 @@ def test_serialize_of_proxied_cudf(proxy_serializers, dask_serializers): """ cudf = pytest.importorskip("cudf") - if "cuda" in dask_serializers: - pytest.skip("cuda serializer support not implemented") - df = cudf.DataFrame({"a": range(10)}) pxy = proxy_object.asproxy(df, serializers=proxy_serializers) header, frames = serialize(pxy, serializers=dask_serializers) From 7860fa72a7db4fdc34c769fe6eacb18db7c3d9e3 Mon Sep 17 00:00:00 2001 From: "Mads R. B. 
Kristensen" Date: Wed, 7 Oct 2020 14:05:37 +0200 Subject: [PATCH 24/49] Added a lot of operators --- dask_cuda/proxy_object.py | 204 +++++++++++++++++++++++++++++++--- dask_cuda/tests/test_proxy.py | 94 +++++++++++++++- 2 files changed, 281 insertions(+), 17 deletions(-) diff --git a/dask_cuda/proxy_object.py b/dask_cuda/proxy_object.py index 4774b41aa..8a468f025 100644 --- a/dask_cuda/proxy_object.py +++ b/dask_cuda/proxy_object.py @@ -1,9 +1,10 @@ +import operator import pickle import sys import dask -import dask.dataframe.utils import dask.dataframe.methods +import dask.dataframe.utils import distributed.protocol import distributed.utils @@ -142,6 +143,22 @@ def __repr__(self): ret += f" at {hex(id(self._obj_pxy['obj']))}>" return ret + @property + def __class__(self): + try: + return self.__obj_pxy_cache["type_serialized"] + except KeyError: + ret = pickle.loads(self._obj_pxy["type_serialized"]) + self.__obj_pxy_cache["type_serialized"] = ret + return ret + + def __sizeof__(self): + if self._obj_pxy["serializers"] is not None: + frames = self._obj_pxy["obj"][1] + return sum(map(distributed.utils.nbytes, frames)) + else: + return sys.getsizeof(self._obj_pxy_deserialize()) + def __len__(self): return len(self._obj_pxy_deserialize()) @@ -173,21 +190,178 @@ def __array__(self): ret = getattr(self._obj_pxy_deserialize(), "__array__")() return ret - @property - def __class__(self): - try: - return self.__obj_pxy_cache["type_serialized"] - except KeyError: - ret = pickle.loads(self._obj_pxy["type_serialized"]) - self.__obj_pxy_cache["type_serialized"] = ret - return ret + def __add__(self, other): + return self._obj_pxy_deserialize() + other - def __sizeof__(self): - if self._obj_pxy["serializers"] is not None: - frames = self._obj_pxy["obj"][1] - return sum(map(distributed.utils.nbytes, frames)) - else: - return sys.getsizeof(self._obj_pxy_deserialize()) + def __sub__(self, other): + return self._obj_pxy_deserialize() - other + + def __mul__(self, other): + return 
self._obj_pxy_deserialize() * other + + def __div__(self, other): + return operator.div(self._obj_pxy_deserialize(), other) + + def __truediv__(self, other): + return operator.truediv(self._obj_pxy_deserialize(), other) + + def __floordiv__(self, other): + return self._obj_pxy_deserialize() // other + + def __mod__(self, other): + return self._obj_pxy_deserialize() % other + + def __divmod__(self, other): + return divmod(self._obj_pxy_deserialize(), other) + + def __pow__(self, other, *args): + return pow(self._obj_pxy_deserialize(), other, *args) + + def __lshift__(self, other): + return self._obj_pxy_deserialize() << other + + def __rshift__(self, other): + return self._obj_pxy_deserialize() >> other + + def __and__(self, other): + return self._obj_pxy_deserialize() & other + + def __xor__(self, other): + return self._obj_pxy_deserialize() ^ other + + def __or__(self, other): + return self._obj_pxy_deserialize() | other + + def __radd__(self, other): + return other + self._obj_pxy_deserialize() + + def __rsub__(self, other): + return other - self._obj_pxy_deserialize() + + def __rmul__(self, other): + return other * self._obj_pxy_deserialize() + + def __rdiv__(self, other): + return operator.div(other, self._obj_pxy_deserialize()) + + def __rtruediv__(self, other): + return operator.truediv(other, self._obj_pxy_deserialize()) + + def __rfloordiv__(self, other): + return other // self._obj_pxy_deserialize() + + def __rmod__(self, other): + return other % self._obj_pxy_deserialize() + + def __rdivmod__(self, other): + return divmod(other, self._obj_pxy_deserialize()) + + def __rpow__(self, other, *args): + return pow(other, self._obj_pxy_deserialize(), *args) + + def __rlshift__(self, other): + return other << self._obj_pxy_deserialize() + + def __rrshift__(self, other): + return other >> self._obj_pxy_deserialize() + + def __rand__(self, other): + return other & self._obj_pxy_deserialize() + + def __rxor__(self, other): + return other ^ self._obj_pxy_deserialize() 
+ + def __ror__(self, other): + return other | self._obj_pxy_deserialize() + + def __iadd__(self, other): + proxied = self._obj_pxy_deserialize() + proxied += other + return self + + def __isub__(self, other): + proxied = self._obj_pxy_deserialize() + proxied -= other + return self + + def __imul__(self, other): + proxied = self._obj_pxy_deserialize() + proxied *= other + return self + + def __idiv__(self, other): + proxied = self._obj_pxy_deserialize() + self._obj_pxy["obj"] = operator.idiv(proxied, other) + return self + + def __itruediv__(self, other): + proxied = self._obj_pxy_deserialize() + self._obj_pxy["obj"] = operator.itruediv(proxied, other) + return self + + def __ifloordiv__(self, other): + proxied = self._obj_pxy_deserialize() + proxied //= other + return self + + def __imod__(self, other): + proxied = self._obj_pxy_deserialize() + proxied %= other + return self + + def __ipow__(self, other): + proxied = self._obj_pxy_deserialize() + proxied **= other + return self + + def __ilshift__(self, other): + proxied = self._obj_pxy_deserialize() + proxied <<= other + return self + + def __irshift__(self, other): + proxied = self._obj_pxy_deserialize() + proxied >>= other + return self + + def __iand__(self, other): + proxied = self._obj_pxy_deserialize() + proxied &= other + return self + + def __ixor__(self, other): + proxied = self._obj_pxy_deserialize() + proxied ^= other + return self + + def __ior__(self, other): + proxied = self._obj_pxy_deserialize() + proxied |= other + return self + + def __neg__(self): + return -self._obj_pxy_deserialize() + + def __pos__(self): + return +self._obj_pxy_deserialize() + + def __abs__(self): + return abs(self._obj_pxy_deserialize()) + + def __invert__(self): + return ~self._obj_pxy_deserialize() + + def __int__(self): + return int(self._obj_pxy_deserialize()) + + def __float__(self): + return float(self._obj_pxy_deserialize()) + + def __complex__(self): + return complex(self._obj_pxy_deserialize()) + + def 
__index__(self): + return operator.index(self._obj_pxy_deserialize()) @distributed.protocol.dask_serialize.register(ObjectProxy) diff --git a/dask_cuda/tests/test_proxy.py b/dask_cuda/tests/test_proxy.py index 3b63b2e87..661d0095c 100644 --- a/dask_cuda/tests/test_proxy.py +++ b/dask_cuda/tests/test_proxy.py @@ -1,3 +1,5 @@ +import operator + import pytest from pandas.testing import assert_frame_equal @@ -22,12 +24,100 @@ def test_proxy_object(serializers): assert 1 in pxy assert -1 not in pxy - # TODO: check operators (when implemented) + +@pytest.mark.parametrize("serializers", [None, ["dask", "pickle"]]) +def test_proxy_object_of_numpy(serializers): + """Check that a proxied numpy array behaves as a regular dataframe""" + + np = pytest.importorskip("numpy") + + # Make sure that equality works, which we use to test the other operators + org = np.arange(10) + 1 + pxy = proxy_object.asproxy(org.copy(), serializers=serializers) + assert all(org == pxy) + assert all(org + 1 != pxy) + + # Check unary scalar operators + for op in [int, float, complex, operator.index, oct, hex]: + org = np.int64(42) + pxy = proxy_object.asproxy(org.copy(), serializers=serializers) + expect = op(org) + got = op(pxy) + assert type(expect) == type(got) + assert expect == got + + # Check unary operators + for op_str in ["neg", "pos", "abs", "inv"]: + op = getattr(operator, op_str) + org = np.arange(10) + 1 + pxy = proxy_object.asproxy(org.copy(), serializers=serializers) + expect = op(org) + got = op(pxy) + assert type(expect) == type(got) + assert all(expect == got) + + # Check binary operators that takes a scalar as second argument + for op_str in ["rshift", "lshift", "pow"]: + op = getattr(operator, op_str) + org = np.arange(10) + 1 + pxy = proxy_object.asproxy(org.copy(), serializers=serializers) + expect = op(org, 2) + got = op(pxy, 2) + assert type(expect) == type(got) + assert all(expect == got) + + # Check binary operators + for op_str in [ + "add", + "eq", + "floordiv", + "ge", + 
"gt", + "le", + "lshift", + "lt", + "mod", + "mul", + "ne", + "or_", + "sub", + "truediv", + "xor", + "iadd", + "ifloordiv", + "ilshift", + "imod", + "imul", + "isub", + "ixor", + ]: + op = getattr(operator, op_str) + org = np.arange(10) + 1 + pxy = proxy_object.asproxy(org.copy(), serializers=serializers) + expect = op(org.copy(), org) + got = op(org.copy(), pxy) + assert isinstance(got, type(expect)) + assert all(expect == got) + + expect = op(org.copy(), org) + got = op(pxy, org) + assert isinstance(got, type(expect)) + assert all(expect == got) + + # Check unary truth operators + for op_str in ["not_", "truth"]: + op = getattr(operator, op_str) + org = np.arange(1) + 1 + pxy = proxy_object.asproxy(org.copy(), serializers=serializers) + expect = op(org) + got = op(pxy) + assert type(expect) == type(got) + assert expect == got @pytest.mark.parametrize("serializers", [None, ["dask"]]) def test_proxy_object_of_cudf(serializers): - """Check that a proxied cudf dataframe is behaviors as a regular dataframe""" + """Check that a proxied cudf dataframe behaves as a regular dataframe""" cudf = pytest.importorskip("cudf") df = cudf.DataFrame({"a": range(10)}) pxy = proxy_object.asproxy(df, serializers=serializers) From 45152a004175ce5845124fc3c2a534c87f02072f Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Wed, 7 Oct 2020 14:30:01 +0200 Subject: [PATCH 25/49] fixed typos --- dask_cuda/proxy_object.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/dask_cuda/proxy_object.py b/dask_cuda/proxy_object.py index 8a468f025..df9a4307d 100644 --- a/dask_cuda/proxy_object.py +++ b/dask_cuda/proxy_object.py @@ -378,10 +378,10 @@ def obj_pxy_dask_serialize(obj: ObjectProxy): @distributed.protocol.cuda.cuda_serialize.register(ObjectProxy) def obj_pxy_cuda_serialize(obj: ObjectProxy): """ - The CUDA serialization of ObjectProxy used by Dask when communicating - ObjectProxy. 
As serializers, it uses "cuda", "dask" or "pickle", which means - that proxied CUDA objects are _not_ spilled to main memory if communicating - using UCX or another CUDA friendly communicantion library. + The CUDA serialization of ObjectProxy used by Dask when communicating using UCX + or another CUDA friendly communicantion library. As serializers, it uses "cuda", + "dask" or "pickle", which means that proxied CUDA objects are _not_ spilled to + main memory. """ if obj._obj_pxy["serializers"] is not None: # Already serialized header, frames = obj._obj_pxy["obj"] @@ -395,7 +395,7 @@ def obj_pxy_cuda_serialize(obj: ObjectProxy): def obj_pxy_dask_deserialize(header, frames): """ The generic deserialization of ObjectProxy. Notice, it doesn't deserialize - thr proxied object at this time. When accessed, the proxied object are + the proxied object at this time. When accessed, the proxied object are deserialized using the same serializers that were used when the object was serialized. """ From 1a4371354803fd22e8e64782fe3d5e9a07efc9eb Mon Sep 17 00:00:00 2001 From: "Mads R. B. 
Kristensen" Date: Wed, 7 Oct 2020 16:14:18 +0200 Subject: [PATCH 26/49] Support and test of a proxy object of a proxy object --- dask_cuda/proxy_object.py | 33 +++++++++++++++++---------------- dask_cuda/tests/test_proxy.py | 16 ++++++++++++++++ 2 files changed, 33 insertions(+), 16 deletions(-) diff --git a/dask_cuda/proxy_object.py b/dask_cuda/proxy_object.py index df9a4307d..891d0e659 100644 --- a/dask_cuda/proxy_object.py +++ b/dask_cuda/proxy_object.py @@ -27,22 +27,22 @@ def asproxy(obj, serializers=None): The proxy object proxing `obj` """ - if hasattr(obj, "_obj_pxy"): - return obj # Already a proxy object - - fixed_attr = {} - for attr in _FIXED_ATTRS: - try: - fixed_attr[attr] = getattr(obj, attr) - except AttributeError: - pass - - ret = ObjectProxy( - obj=obj, - fixed_attr=fixed_attr, - type_serialized=pickle.dumps(type(obj)), - typename=dask.utils.typename(type(obj)), - ) + if hasattr(obj, "_obj_pxy"): # Already a proxy object + ret = obj + else: + fixed_attr = {} + for attr in _FIXED_ATTRS: + try: + fixed_attr[attr] = getattr(obj, attr) + except AttributeError: + pass + + ret = ObjectProxy( + obj=obj, + fixed_attr=fixed_attr, + type_serialized=pickle.dumps(type(obj)), + typename=dask.utils.typename(type(obj)), + ) if serializers is not None: ret._obj_pxy_serialize(serializers=serializers) return ret @@ -89,6 +89,7 @@ def _obj_pxy_serialize(self, serializers): frames: List[Bytes] List of frames that makes up the serialized object """ + assert serializers is not None if ( self._obj_pxy["serializers"] is not None and self._obj_pxy["serializers"] != serializers diff --git a/dask_cuda/tests/test_proxy.py b/dask_cuda/tests/test_proxy.py index 661d0095c..2f2c69492 100644 --- a/dask_cuda/tests/test_proxy.py +++ b/dask_cuda/tests/test_proxy.py @@ -25,6 +25,22 @@ def test_proxy_object(serializers): assert -1 not in pxy +@pytest.mark.parametrize("serializers_first", [None, ["dask", "pickle"]]) +@pytest.mark.parametrize("serializers_second", [None, ["dask", 
"pickle"]]) +def test_double_proxy_object(serializers_first, serializers_second): + """Check asproxy() when creating a proxy object of a proxy object""" + org = list(range(10)) + pxy1 = proxy_object.asproxy(org, serializers=serializers_first) + assert pxy1._obj_pxy["serializers"] == serializers_first + pxy2 = proxy_object.asproxy(pxy1, serializers=serializers_second) + if serializers_second is None: + # Check that `serializers=None` doesn't change the initial serializers + assert pxy2._obj_pxy["serializers"] == serializers_first + else: + assert pxy2._obj_pxy["serializers"] == serializers_second + assert pxy1 is pxy2 + + @pytest.mark.parametrize("serializers", [None, ["dask", "pickle"]]) def test_proxy_object_of_numpy(serializers): """Check that a proxied numpy array behaves as a regular dataframe""" From 0e80d3a3b6e74c777cbe828e2e61a562204debf2 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Thu, 8 Oct 2020 14:11:36 +0200 Subject: [PATCH 27/49] test_spilling_local_cuda_cluster(): added some extra checks --- dask_cuda/tests/test_proxy.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/dask_cuda/tests/test_proxy.py b/dask_cuda/tests/test_proxy.py index 2f2c69492..609cf1397 100644 --- a/dask_cuda/tests/test_proxy.py +++ b/dask_cuda/tests/test_proxy.py @@ -162,6 +162,12 @@ def test_spilling_local_cuda_cluster(jit_unspill): def task(x): assert isinstance(x, cudf.DataFrame) + if jit_unspill: + # Check that `x` is a proxy object and the proxied DataFrame is serialized + assert type(x) == proxy_object.ObjectProxy + assert x._obj_pxy_get_meta()["serializers"] == "['dask', 'pickle']" + else: + assert type(x) == cudf.DataFrame assert len(x) == 10 # Trigger deserialization return x From 0f270307eebf6517b497e3a1d1c06445f76862a6 Mon Sep 17 00:00:00 2001 From: "Mads R. B. 
Kristensen" Date: Fri, 9 Oct 2020 10:03:10 +0200 Subject: [PATCH 28/49] Added _obj_pxy_is_cuda_object() --- dask_cuda/proxy_object.py | 37 ++++++++++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/dask_cuda/proxy_object.py b/dask_cuda/proxy_object.py index 891d0e659..32bc86cab 100644 --- a/dask_cuda/proxy_object.py +++ b/dask_cuda/proxy_object.py @@ -8,6 +8,8 @@ import distributed.protocol import distributed.utils +from .is_device_object import is_device_object + # List of attributes that should be copied to the proxy at creation, which makes # them accessible without deserialization of the proxied object _FIXED_ATTRS = ["name"] @@ -23,6 +25,9 @@ def asproxy(obj, serializers=None): serializers: List[Str], optional List of serializers to use to serialize `obj`. If None, no serialization is done. + + Returns + ------- ret: ObjectProxy The proxy object proxing `obj` """ @@ -42,6 +47,7 @@ def asproxy(obj, serializers=None): fixed_attr=fixed_attr, type_serialized=pickle.dumps(type(obj)), typename=dask.utils.typename(type(obj)), + is_cuda_object=is_device_object(obj), ) if serializers is not None: ret._obj_pxy_serialize(serializers=serializers) @@ -54,12 +60,21 @@ class ObjectProxy: "__obj_pxy_cache", # A dict used for caching attributes ] - def __init__(self, obj, fixed_attr, type_serialized, typename, serializers=None): + def __init__( + self, + obj, + fixed_attr, + type_serialized, + typename, + is_cuda_object, + serializers=None, + ): self._obj_pxy = { "obj": obj, "fixed_attr": fixed_attr, "type_serialized": type_serialized, "typename": typename, + "is_cuda_object": is_cuda_object, "serializers": serializers, } self.__obj_pxy_cache = {} @@ -120,6 +135,16 @@ def _obj_pxy_deserialize(self): self._obj_pxy["serializers"] = None return self._obj_pxy["obj"] + def _obj_pxy_is_cuda_object(self): + """Inplace deserialization of the proxied object + + Returns + ------- + ret : object + The proxied object (deserialized) + """ + return 
self._obj_pxy["is_cuda_object"] + def __getattr__(self, name): typename = self._obj_pxy["typename"] if name in _FIXED_ATTRS: @@ -365,6 +390,16 @@ def __index__(self): return operator.index(self._obj_pxy_deserialize()) +@is_device_object.register(ObjectProxy) +def obj_pxy_is_device_object(obj: ObjectProxy): + """ + In order to avoid de-serializing the proxied object, we call + `_obj_pxy_is_cuda_object()` instead of the default + `hasattr(o, "__cuda_array_interface__")` check. + """ + return obj._obj_pxy_is_cuda_object() + + @distributed.protocol.dask_serialize.register(ObjectProxy) def obj_pxy_dask_serialize(obj: ObjectProxy): """ From cb2bafbbace55882352dcb09a13407fcf3d9f7bb Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Fri, 9 Oct 2020 10:22:05 +0200 Subject: [PATCH 29/49] asproxy(): added subclass argument --- dask_cuda/proxy_object.py | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/dask_cuda/proxy_object.py b/dask_cuda/proxy_object.py index 32bc86cab..c153a7170 100644 --- a/dask_cuda/proxy_object.py +++ b/dask_cuda/proxy_object.py @@ -15,7 +15,7 @@ _FIXED_ATTRS = ["name"] -def asproxy(obj, serializers=None): +def asproxy(obj, serializers=None, subclass=None): """Wrap `obj` in a ObjectProxy object if it isn't already. Parameters @@ -25,9 +25,9 @@ def asproxy(obj, serializers=None): serializers: List[Str], optional List of serializers to use to serialize `obj`. If None, no serialization is done. - - Returns - ------- + subclass: Class, optional + Specify a subclass of ObjectProxy to create instead of ObjectProxy. + `subclass` must be pickable. 
ret: ObjectProxy The proxy object proxing `obj` """ @@ -42,12 +42,16 @@ def asproxy(obj, serializers=None): except AttributeError: pass - ret = ObjectProxy( + if subclass is None: + subclass = ObjectProxy + ret = subclass( obj=obj, fixed_attr=fixed_attr, type_serialized=pickle.dumps(type(obj)), typename=dask.utils.typename(type(obj)), is_cuda_object=is_device_object(obj), + subclass=pickle.dumps(subclass) if subclass else None, + serializers=None, ) if serializers is not None: ret._obj_pxy_serialize(serializers=serializers) @@ -67,7 +71,8 @@ def __init__( type_serialized, typename, is_cuda_object, - serializers=None, + subclass, + serializers, ): self._obj_pxy = { "obj": obj, @@ -75,6 +80,7 @@ def __init__( "type_serialized": type_serialized, "typename": typename, "is_cuda_object": is_cuda_object, + "subclass": subclass, "serializers": serializers, } self.__obj_pxy_cache = {} @@ -435,7 +441,12 @@ def obj_pxy_dask_deserialize(header, frames): deserialized using the same serializers that were used when the object was serialized. """ - return ObjectProxy( + meta = header["obj-pxy-meta"] + if meta["subclass"] is None: + subclass = ObjectProxy + else: + subclass = pickle.loads(meta["subclass"]) + return subclass( obj=(header["proxied-header"], frames), **header["obj-pxy-meta"], ) From de7a49cd466caab2646137336b8569b58185c3da Mon Sep 17 00:00:00 2001 From: "Mads R. B. 
Kristensen" Date: Fri, 9 Oct 2020 10:56:10 +0200 Subject: [PATCH 30/49] fixed type in test_spilling_local_cuda_cluster check --- dask_cuda/tests/test_proxy.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_cuda/tests/test_proxy.py b/dask_cuda/tests/test_proxy.py index 609cf1397..3c72b9e30 100644 --- a/dask_cuda/tests/test_proxy.py +++ b/dask_cuda/tests/test_proxy.py @@ -165,7 +165,7 @@ def task(x): if jit_unspill: # Check that `x` is a proxy object and the proxied DataFrame is serialized assert type(x) == proxy_object.ObjectProxy - assert x._obj_pxy_get_meta()["serializers"] == "['dask', 'pickle']" + assert x._obj_pxy_get_meta()["serializers"] == ['dask', 'pickle'] else: assert type(x) == cudf.DataFrame assert len(x) == 10 # Trigger deserialization From 95b31c48c480e84b07ada4153f95eb4b4c706f92 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Fri, 9 Oct 2020 12:26:43 +0200 Subject: [PATCH 31/49] Added test of communicating proxy objects --- dask_cuda/tests/test_proxy.py | 59 ++++++++++++++++++++++++++++++++++- 1 file changed, 58 insertions(+), 1 deletion(-) diff --git a/dask_cuda/tests/test_proxy.py b/dask_cuda/tests/test_proxy.py index 3c72b9e30..348886afb 100644 --- a/dask_cuda/tests/test_proxy.py +++ b/dask_cuda/tests/test_proxy.py @@ -165,7 +165,7 @@ def task(x): if jit_unspill: # Check that `x` is a proxy object and the proxied DataFrame is serialized assert type(x) == proxy_object.ObjectProxy - assert x._obj_pxy_get_meta()["serializers"] == ['dask', 'pickle'] + assert x._obj_pxy_get_meta()["serializers"] == ["dask", "pickle"] else: assert type(x) == cudf.DataFrame assert len(x) == 10 # Trigger deserialization @@ -181,3 +181,60 @@ def task(x): ddf = ddf.map_partitions(task, meta=df.head()) got = ddf.compute() assert_frame_equal(got.to_pandas(), df.to_pandas()) + + +class _PxyObjTest(proxy_object.ObjectProxy): + """ + A class that: + - defines `__dask_tokenize__` in order to avoid deserialization when + calling 
`client.scatter()` + - Asserts that no deserialization is performaned when communicating. + """ + + def __dask_tokenize__(self): + return 42 + + def _obj_pxy_deserialize(self): + if self.assert_on_deserializing: + assert self._obj_pxy["serializers"] is None + return super()._obj_pxy_deserialize() + + +@pytest.mark.parametrize("send_serializers", [None, ["dask", "pickle"], ["cuda"]]) +@pytest.mark.parametrize("protocol", ["tcp", "ucx"]) +def test_communicating_proxy_objects(protocol, send_serializers): + """Testing serialization of cuDF dataframe when communicating""" + cudf = pytest.importorskip("cudf") + + def task(x): + # Check that the subclass survives the trip from client to worker + assert isinstance(x, _PxyObjTest) + serializers_used = list(x._obj_pxy_get_meta()["serializers"]) + + # Check that `x` is serialized with the expected serializers + if protocol == "ucx": + if send_serializers is None: + assert serializers_used == ["cuda", "dask", "pickle"] + else: + assert serializers_used == send_serializers + else: + assert serializers_used == ["dask", "pickle"] + + with dask_cuda.LocalCUDACluster( + n_workers=1, protocol=protocol, enable_tcp_over_ucx=protocol == "ucx" + ) as cluster: + with Client(cluster) as client: + df = cudf.DataFrame({"a": range(10)}) + df = proxy_object.asproxy( + df, serializers=send_serializers, subclass=_PxyObjTest + ) + + # Notice, in one case we expect deserialization when communicating. + # Since "tcp" cannot send device memory directly, it will be re-serialized + # using the default dask serializers that spill the data to main memory. + if protocol == "tcp" and send_serializers == ["cuda"]: + df.assert_on_deserializing = False + else: + df.assert_on_deserializing = True + df = client.scatter(df) + client.submit(task, df).result() From 5d7ee69aaec19337f102252c791383fcc84b3b25 Mon Sep 17 00:00:00 2001 From: "Mads R. B. 
Kristensen" Date: Fri, 9 Oct 2020 12:48:02 +0200 Subject: [PATCH 32/49] Making ObjectProxy threadsafe --- dask_cuda/proxy_object.py | 123 +++++++++++++++++++++----------------- 1 file changed, 69 insertions(+), 54 deletions(-) diff --git a/dask_cuda/proxy_object.py b/dask_cuda/proxy_object.py index c153a7170..193dec6ca 100644 --- a/dask_cuda/proxy_object.py +++ b/dask_cuda/proxy_object.py @@ -1,6 +1,7 @@ import operator import pickle import sys +import threading import dask import dask.dataframe.methods @@ -61,6 +62,7 @@ def asproxy(obj, serializers=None, subclass=None): class ObjectProxy: __slots__ = [ "_obj_pxy", # A dict that holds the state of the proxy object + "_obj_pxy_lock", # Threading lock for all obj_pxy access "__obj_pxy_cache", # A dict used for caching attributes ] @@ -83,6 +85,7 @@ def __init__( "subclass": subclass, "serializers": serializers, } + self._obj_pxy_lock = threading.RLock() self.__obj_pxy_cache = {} def _obj_pxy_get_meta(self): @@ -93,7 +96,8 @@ def _obj_pxy_get_meta(self): ret: dict Dictionary of metadata """ - return {k: self._obj_pxy[k] for k in self._obj_pxy.keys() if k != "obj"} + with self._obj_pxy_lock: + return {k: self._obj_pxy[k] for k in self._obj_pxy.keys() if k != "obj"} def _obj_pxy_serialize(self, serializers): """Inplace serialization of the proxied object using the `serializers` @@ -110,22 +114,23 @@ def _obj_pxy_serialize(self, serializers): frames: List[Bytes] List of frames that makes up the serialized object """ - assert serializers is not None - if ( - self._obj_pxy["serializers"] is not None - and self._obj_pxy["serializers"] != serializers - ): - # The proxied object is serialized with other serializers - self._obj_pxy_deserialize() - - if self._obj_pxy["serializers"] is None: - self._obj_pxy["obj"] = distributed.protocol.serialize( - self._obj_pxy["obj"], serializers - ) - self._obj_pxy["serializers"] = serializers + with self._obj_pxy_lock: + assert serializers is not None + if ( + self._obj_pxy["serializers"] 
is not None + and self._obj_pxy["serializers"] != serializers + ): + # The proxied object is serialized with other serializers + self._obj_pxy_deserialize() + + if self._obj_pxy["serializers"] is None: + self._obj_pxy["obj"] = distributed.protocol.serialize( + self._obj_pxy["obj"], serializers + ) + self._obj_pxy["serializers"] = serializers - assert serializers == self._obj_pxy["serializers"] - return self._obj_pxy["obj"] + assert serializers == self._obj_pxy["serializers"] + return self._obj_pxy["obj"] def _obj_pxy_deserialize(self): """Inplace deserialization of the proxied object @@ -135,11 +140,12 @@ def _obj_pxy_deserialize(self): ret : object The proxied object (deserialized) """ - if self._obj_pxy["serializers"] is not None: - header, frames = self._obj_pxy["obj"] - self._obj_pxy["obj"] = distributed.protocol.deserialize(header, frames) - self._obj_pxy["serializers"] = None - return self._obj_pxy["obj"] + with self._obj_pxy_lock: + if self._obj_pxy["serializers"] is not None: + header, frames = self._obj_pxy["obj"] + self._obj_pxy["obj"] = distributed.protocol.deserialize(header, frames) + self._obj_pxy["serializers"] = None + return self._obj_pxy["obj"] def _obj_pxy_is_cuda_object(self): """Inplace deserialization of the proxied object @@ -149,47 +155,54 @@ def _obj_pxy_is_cuda_object(self): ret : object The proxied object (deserialized) """ - return self._obj_pxy["is_cuda_object"] + with self._obj_pxy_lock: + return self._obj_pxy["is_cuda_object"] def __getattr__(self, name): - typename = self._obj_pxy["typename"] - if name in _FIXED_ATTRS: - try: - return self._obj_pxy["fixed_attr"][name] - except KeyError: - raise AttributeError( - f"type object '{typename}' has no attribute '{name}'" - ) - - return getattr(self._obj_pxy_deserialize(), name) + with self._obj_pxy_lock: + typename = self._obj_pxy["typename"] + if name in _FIXED_ATTRS: + try: + return self._obj_pxy["fixed_attr"][name] + except KeyError: + raise AttributeError( + f"type object '{typename}' 
has no attribute '{name}'" + ) + + return getattr(self._obj_pxy_deserialize(), name) def __str__(self): return str(self._obj_pxy_deserialize()) def __repr__(self): - typename = self._obj_pxy["typename"] - ret = f"<{dask.utils.typename(type(self))} at {hex(id(self))} for {typename}" - if self._obj_pxy["serializers"] is not None: - ret += f" (serialized={repr(self._obj_pxy['serializers'])})>" - else: - ret += f" at {hex(id(self._obj_pxy['obj']))}>" - return ret + with self._obj_pxy_lock: + typename = self._obj_pxy["typename"] + ret = ( + f"<{dask.utils.typename(type(self))} at {hex(id(self))} for {typename}" + ) + if self._obj_pxy["serializers"] is not None: + ret += f" (serialized={repr(self._obj_pxy['serializers'])})>" + else: + ret += f" at {hex(id(self._obj_pxy['obj']))}>" + return ret @property def __class__(self): - try: - return self.__obj_pxy_cache["type_serialized"] - except KeyError: - ret = pickle.loads(self._obj_pxy["type_serialized"]) - self.__obj_pxy_cache["type_serialized"] = ret - return ret + with self._obj_pxy_lock: + try: + return self.__obj_pxy_cache["type_serialized"] + except KeyError: + ret = pickle.loads(self._obj_pxy["type_serialized"]) + self.__obj_pxy_cache["type_serialized"] = ret + return ret def __sizeof__(self): - if self._obj_pxy["serializers"] is not None: - frames = self._obj_pxy["obj"][1] - return sum(map(distributed.utils.nbytes, frames)) - else: - return sys.getsizeof(self._obj_pxy_deserialize()) + with self._obj_pxy_lock: + if self._obj_pxy["serializers"] is not None: + frames = self._obj_pxy["obj"][1] + return sum(map(distributed.utils.nbytes, frames)) + else: + return sys.getsizeof(self._obj_pxy_deserialize()) def __len__(self): return len(self._obj_pxy_deserialize()) @@ -322,13 +335,15 @@ def __imul__(self, other): return self def __idiv__(self, other): - proxied = self._obj_pxy_deserialize() - self._obj_pxy["obj"] = operator.idiv(proxied, other) + with self._obj_pxy_lock: + proxied = self._obj_pxy_deserialize() + 
self._obj_pxy["obj"] = operator.idiv(proxied, other) return self def __itruediv__(self, other): - proxied = self._obj_pxy_deserialize() - self._obj_pxy["obj"] = operator.itruediv(proxied, other) + with self._obj_pxy_lock: + proxied = self._obj_pxy_deserialize() + self._obj_pxy["obj"] = operator.itruediv(proxied, other) return self def __ifloordiv__(self, other): From 4c32ca61d16b72bde926602e59c0f0a0d501f05a Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Fri, 9 Oct 2020 13:00:01 +0200 Subject: [PATCH 33/49] renamed ObjectProxy => ProxyObject --- dask_cuda/device_host_file.py | 6 +-- dask_cuda/local_cuda_cluster.py | 2 +- dask_cuda/proxy_object.py | 52 ++++++++++++------------ dask_cuda/tests/test_device_host_file.py | 2 +- dask_cuda/tests/test_proxy.py | 4 +- 5 files changed, 33 insertions(+), 33 deletions(-) diff --git a/dask_cuda/device_host_file.py b/dask_cuda/device_host_file.py index 126e92144..97c43cbf0 100644 --- a/dask_cuda/device_host_file.py +++ b/dask_cuda/device_host_file.py @@ -66,14 +66,14 @@ def host_to_device(s: DeviceSerialized) -> object: @nvtx_annotate("SPILL_D2H", color="red", domain="dask_cuda") -def pxy_obj_device_to_host(obj: object) -> proxy_object.ObjectProxy: +def pxy_obj_device_to_host(obj: object) -> proxy_object.ProxyObject: # Notice, both the "dask" and the "pickle" serializer will # spill `obj` to main memory. return proxy_object.asproxy(obj, serializers=["dask", "pickle"]) @nvtx_annotate("SPILL_H2D", color="green", domain="dask_cuda") -def pxy_obj_host_to_device(s: proxy_object.ObjectProxy) -> object: +def pxy_obj_host_to_device(s: proxy_object.ProxyObject) -> object: # Notice, we do _not_ deserialize at this point. The proxy # object automatically deserialize just-in-time. return s @@ -100,7 +100,7 @@ class DeviceHostFile(ZictBase): local_directory: path Path where to store serialized objects on disk jit_unspill: bool - If True, enable just-in-time unspilling (see proxy_object.ObjectProxy). 
+ If True, enable just-in-time unspilling (see proxy_object.ProxyObject). """ def __init__( diff --git a/dask_cuda/local_cuda_cluster.py b/dask_cuda/local_cuda_cluster.py index 8acafb28d..96b901a04 100644 --- a/dask_cuda/local_cuda_cluster.py +++ b/dask_cuda/local_cuda_cluster.py @@ -109,7 +109,7 @@ class LocalCUDACluster(LocalCluster): WARNING: managed memory is currently incompatible with NVLink, trying to enable both will result in an exception. jit_unspill: bool - If True, enable just-in-time unspilling (see proxy_object.ObjectProxy). + If True, enable just-in-time unspilling (see proxy_object.ProxyObject). Examples -------- diff --git a/dask_cuda/proxy_object.py b/dask_cuda/proxy_object.py index 193dec6ca..1f1e89ce2 100644 --- a/dask_cuda/proxy_object.py +++ b/dask_cuda/proxy_object.py @@ -17,19 +17,19 @@ def asproxy(obj, serializers=None, subclass=None): - """Wrap `obj` in a ObjectProxy object if it isn't already. + """Wrap `obj` in a ProxyObject object if it isn't already. Parameters ---------- obj: object - Object to wrap in a ObjectProxy object. + Object to wrap in a ProxyObject object. serializers: List[Str], optional List of serializers to use to serialize `obj`. If None, no serialization is done. subclass: Class, optional - Specify a subclass of ObjectProxy to create instead of ObjectProxy. + Specify a subclass of ProxyObject to create instead of ProxyObject. `subclass` must be pickable. 
- ret: ObjectProxy + ret: ProxyObject The proxy object proxing `obj` """ @@ -44,7 +44,7 @@ def asproxy(obj, serializers=None, subclass=None): pass if subclass is None: - subclass = ObjectProxy + subclass = ProxyObject ret = subclass( obj=obj, fixed_attr=fixed_attr, @@ -59,7 +59,7 @@ def asproxy(obj, serializers=None, subclass=None): return ret -class ObjectProxy: +class ProxyObject: __slots__ = [ "_obj_pxy", # A dict that holds the state of the proxy object "_obj_pxy_lock", # Threading lock for all obj_pxy access @@ -411,8 +411,8 @@ def __index__(self): return operator.index(self._obj_pxy_deserialize()) -@is_device_object.register(ObjectProxy) -def obj_pxy_is_device_object(obj: ObjectProxy): +@is_device_object.register(ProxyObject) +def obj_pxy_is_device_object(obj: ProxyObject): """ In order to avoid de-serializing the proxied object, we call `_obj_pxy_is_cuda_object()` instead of the default @@ -421,21 +421,21 @@ def obj_pxy_is_device_object(obj: ObjectProxy): return obj._obj_pxy_is_cuda_object() -@distributed.protocol.dask_serialize.register(ObjectProxy) -def obj_pxy_dask_serialize(obj: ObjectProxy): +@distributed.protocol.dask_serialize.register(ProxyObject) +def obj_pxy_dask_serialize(obj: ProxyObject): """ - The generic serialization of ObjectProxy used by Dask when communicating - ObjectProxy. As serializers, it uses "dask" or "pickle", which means + The generic serialization of ProxyObject used by Dask when communicating + ProxyObject. As serializers, it uses "dask" or "pickle", which means that proxied CUDA objects are spilled to main memory before communicated. 
""" header, frames = obj._obj_pxy_serialize(serializers=["dask", "pickle"]) return {"proxied-header": header, "obj-pxy-meta": obj._obj_pxy_get_meta()}, frames -@distributed.protocol.cuda.cuda_serialize.register(ObjectProxy) -def obj_pxy_cuda_serialize(obj: ObjectProxy): +@distributed.protocol.cuda.cuda_serialize.register(ProxyObject) +def obj_pxy_cuda_serialize(obj: ProxyObject): """ - The CUDA serialization of ObjectProxy used by Dask when communicating using UCX + The CUDA serialization of ProxyObject used by Dask when communicating using UCX or another CUDA friendly communicantion library. As serializers, it uses "cuda", "dask" or "pickle", which means that proxied CUDA objects are _not_ spilled to main memory. @@ -447,18 +447,18 @@ def obj_pxy_cuda_serialize(obj: ObjectProxy): return {"proxied-header": header, "obj-pxy-meta": obj._obj_pxy_get_meta()}, frames -@distributed.protocol.dask_deserialize.register(ObjectProxy) -@distributed.protocol.cuda.cuda_deserialize.register(ObjectProxy) +@distributed.protocol.dask_deserialize.register(ProxyObject) +@distributed.protocol.cuda.cuda_deserialize.register(ProxyObject) def obj_pxy_dask_deserialize(header, frames): """ - The generic deserialization of ObjectProxy. Notice, it doesn't deserialize + The generic deserialization of ProxyObject. Notice, it doesn't deserialize the proxied object at this time. When accessed, the proxied object are deserialized using the same serializers that were used when the object was serialized. 
""" meta = header["obj-pxy-meta"] if meta["subclass"] is None: - subclass = ObjectProxy + subclass = ProxyObject else: subclass = pickle.loads(meta["subclass"]) return subclass( @@ -467,24 +467,24 @@ def obj_pxy_dask_deserialize(header, frames): ) -@dask.dataframe.utils.hash_object_dispatch.register(ObjectProxy) -def obj_pxy_hash_object(obj: ObjectProxy, index=True): +@dask.dataframe.utils.hash_object_dispatch.register(ProxyObject) +def obj_pxy_hash_object(obj: ProxyObject, index=True): return dask.dataframe.utils.hash_object_dispatch(obj._obj_pxy_deserialize(), index) -@dask.dataframe.utils.group_split_dispatch.register(ObjectProxy) -def obj_pxy_group_split(obj: ObjectProxy, c, k, ignore_index=False): +@dask.dataframe.utils.group_split_dispatch.register(ProxyObject) +def obj_pxy_group_split(obj: ProxyObject, c, k, ignore_index=False): return dask.dataframe.utils.group_split_dispatch( obj._obj_pxy_deserialize(), c, k, ignore_index ) -@dask.dataframe.utils.make_scalar.register(ObjectProxy) -def obj_pxy_make_scalar(obj: ObjectProxy): +@dask.dataframe.utils.make_scalar.register(ProxyObject) +def obj_pxy_make_scalar(obj: ProxyObject): return dask.dataframe.utils.make_scalar(obj._obj_pxy_deserialize()) -@dask.dataframe.methods.concat_dispatch.register(ObjectProxy) +@dask.dataframe.methods.concat_dispatch.register(ProxyObject) def obj_pxy_concat(objs, *args, **kwargs): # Deserialize concat inputs (in-place) for i in range(len(objs)): diff --git a/dask_cuda/tests/test_device_host_file.py b/dask_cuda/tests/test_device_host_file.py index ab43354d0..2b26320de 100644 --- a/dask_cuda/tests/test_device_host_file.py +++ b/dask_cuda/tests/test_device_host_file.py @@ -23,7 +23,7 @@ def assert_eq(x, y): - # Explicitly calling "cupy.asnumpy" to support `ObjectProxy` because + # Explicitly calling "cupy.asnumpy" to support `ProxyObject` because # "cupy" is hardcoded in `dask.array.normalize_to_array()` return dask.array.assert_eq(cupy.asnumpy(x), cupy.asnumpy(y)) diff --git 
a/dask_cuda/tests/test_proxy.py b/dask_cuda/tests/test_proxy.py index 348886afb..5e308823e 100644 --- a/dask_cuda/tests/test_proxy.py +++ b/dask_cuda/tests/test_proxy.py @@ -164,7 +164,7 @@ def task(x): assert isinstance(x, cudf.DataFrame) if jit_unspill: # Check that `x` is a proxy object and the proxied DataFrame is serialized - assert type(x) == proxy_object.ObjectProxy + assert type(x) == proxy_object.ProxyObject assert x._obj_pxy_get_meta()["serializers"] == ["dask", "pickle"] else: assert type(x) == cudf.DataFrame @@ -183,7 +183,7 @@ def task(x): assert_frame_equal(got.to_pandas(), df.to_pandas()) -class _PxyObjTest(proxy_object.ObjectProxy): +class _PxyObjTest(proxy_object.ProxyObject): """ A class that: - defines `__dask_tokenize__` in order to avoid deserialization when From f02835efe06176446cba374825df922ef0f191ed Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Mon, 12 Oct 2020 04:06:26 -0700 Subject: [PATCH 34/49] Never re-serialize proxy objects --- dask_cuda/device_host_file.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/dask_cuda/device_host_file.py b/dask_cuda/device_host_file.py index 97c43cbf0..ae4a34886 100644 --- a/dask_cuda/device_host_file.py +++ b/dask_cuda/device_host_file.py @@ -67,6 +67,13 @@ def host_to_device(s: DeviceSerialized) -> object: @nvtx_annotate("SPILL_D2H", color="red", domain="dask_cuda") def pxy_obj_device_to_host(obj: object) -> proxy_object.ProxyObject: + try: + # Never re-serialize proxy objects. + if obj._obj_pxy["serializers"] is None: + return obj + except (KeyError, AttributeError): + pass + # Notice, both the "dask" and the "pickle" serializer will # spill `obj` to main memory. return proxy_object.asproxy(obj, serializers=["dask", "pickle"]) From 76638aecacdda63b3dbd069958a957e073becf02 Mon Sep 17 00:00:00 2001 From: "Mads R. B. 
Kristensen" Date: Tue, 10 Nov 2020 15:09:15 +0100 Subject: [PATCH 35/49] Test: setting device_memory_limit="1B" to force serialization --- dask_cuda/tests/test_proxy.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dask_cuda/tests/test_proxy.py b/dask_cuda/tests/test_proxy.py index 5e308823e..8eb4164c7 100644 --- a/dask_cuda/tests/test_proxy.py +++ b/dask_cuda/tests/test_proxy.py @@ -164,16 +164,16 @@ def task(x): assert isinstance(x, cudf.DataFrame) if jit_unspill: # Check that `x` is a proxy object and the proxied DataFrame is serialized - assert type(x) == proxy_object.ProxyObject + assert type(x) is proxy_object.ProxyObject assert x._obj_pxy_get_meta()["serializers"] == ["dask", "pickle"] else: assert type(x) == cudf.DataFrame assert len(x) == 10 # Trigger deserialization return x - # Notice, setting `device_memory_limit=1` to trigger spilling + # Notice, setting `device_memory_limit=1B` to trigger spilling with dask_cuda.LocalCUDACluster( - n_workers=1, device_memory_limit=1, jit_unspill=jit_unspill + n_workers=1, device_memory_limit="1B", jit_unspill=jit_unspill ) as cluster: with Client(cluster): df = cudf.DataFrame({"a": range(10)}) From 56f0546f3c492dc26bb11bf4cbae4deb5ce1bd27 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Wed, 11 Nov 2020 10:20:22 +0100 Subject: [PATCH 36/49] test: added an explicit client shutdown --- dask_cuda/tests/test_proxy.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dask_cuda/tests/test_proxy.py b/dask_cuda/tests/test_proxy.py index 8eb4164c7..c53e84b7c 100644 --- a/dask_cuda/tests/test_proxy.py +++ b/dask_cuda/tests/test_proxy.py @@ -238,3 +238,4 @@ def task(x): df.assert_on_deserializing = True df = client.scatter(df) client.submit(task, df).result() + client.shutdown() # Avoids a UCX shutdown error From a86ca447678418104c765fe453201294de099388 Mon Sep 17 00:00:00 2001 From: "Mads R. B. 
Kristensen" Date: Wed, 11 Nov 2020 14:52:22 +0100 Subject: [PATCH 37/49] Added some str/repr tests --- dask_cuda/tests/test_proxy.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/dask_cuda/tests/test_proxy.py b/dask_cuda/tests/test_proxy.py index c53e84b7c..3b02eb24c 100644 --- a/dask_cuda/tests/test_proxy.py +++ b/dask_cuda/tests/test_proxy.py @@ -23,6 +23,13 @@ def test_proxy_object(serializers): assert org[0] == pxy[0] assert 1 in pxy assert -1 not in pxy + assert str(org) == str(pxy) + assert "dask_cuda.proxy_object.ProxyObject at " in repr(pxy) + assert "list at " in repr(pxy) + + pxy._obj_pxy_serialize(serializers=["dask", "pickle"]) + assert "dask_cuda.proxy_object.ProxyObject at " in repr(pxy) + assert "list (serialized=['dask', 'pickle'])" in repr(pxy) @pytest.mark.parametrize("serializers_first", [None, ["dask", "pickle"]]) From eafef51a511cf001796b4960cc7a528ebed43313 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Wed, 11 Nov 2020 15:09:43 +0100 Subject: [PATCH 38/49] clean up --- dask_cuda/proxy_object.py | 12 ------------ dask_cuda/tests/test_proxy.py | 4 ++++ 2 files changed, 4 insertions(+), 12 deletions(-) diff --git a/dask_cuda/proxy_object.py b/dask_cuda/proxy_object.py index 1f1e89ce2..6558128b5 100644 --- a/dask_cuda/proxy_object.py +++ b/dask_cuda/proxy_object.py @@ -244,9 +244,6 @@ def __sub__(self, other): def __mul__(self, other): return self._obj_pxy_deserialize() * other - def __div__(self, other): - return operator.div(self._obj_pxy_deserialize(), other) - def __truediv__(self, other): return operator.truediv(self._obj_pxy_deserialize(), other) @@ -286,9 +283,6 @@ def __rsub__(self, other): def __rmul__(self, other): return other * self._obj_pxy_deserialize() - def __rdiv__(self, other): - return operator.div(other, self._obj_pxy_deserialize()) - def __rtruediv__(self, other): return operator.truediv(other, self._obj_pxy_deserialize()) @@ -334,12 +328,6 @@ def __imul__(self, other): proxied *= other return self - 
def __idiv__(self, other): - with self._obj_pxy_lock: - proxied = self._obj_pxy_deserialize() - self._obj_pxy["obj"] = operator.idiv(proxied, other) - return self - def __itruediv__(self, other): with self._obj_pxy_lock: proxied = self._obj_pxy_deserialize() diff --git a/dask_cuda/tests/test_proxy.py b/dask_cuda/tests/test_proxy.py index 3b02eb24c..66f117656 100644 --- a/dask_cuda/tests/test_proxy.py +++ b/dask_cuda/tests/test_proxy.py @@ -107,8 +107,12 @@ def test_proxy_object_of_numpy(serializers): "truediv", "xor", "iadd", + "ior", + "iand", "ifloordiv", "ilshift", + "irshift", + "ipow", "imod", "imul", "isub", From deb2f58b1ef21078ee6ab761268e637e8f55f3e6 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Thu, 12 Nov 2020 15:42:56 +0100 Subject: [PATCH 39/49] added some more checks in test_proxy_object_of_numpy --- dask_cuda/proxy_object.py | 3 +-- dask_cuda/tests/test_proxy.py | 21 +++++++++++++++++++++ 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/dask_cuda/proxy_object.py b/dask_cuda/proxy_object.py index 6558128b5..5a7de5caa 100644 --- a/dask_cuda/proxy_object.py +++ b/dask_cuda/proxy_object.py @@ -232,8 +232,7 @@ def __iter__(self): return iter(self._obj_pxy_deserialize()) def __array__(self): - ret = getattr(self._obj_pxy_deserialize(), "__array__")() - return ret + return getattr(self._obj_pxy_deserialize(), "__array__")() def __add__(self, other): return self._obj_pxy_deserialize() + other diff --git a/dask_cuda/tests/test_proxy.py b/dask_cuda/tests/test_proxy.py index 66f117656..92c76ae6e 100644 --- a/dask_cuda/tests/test_proxy.py +++ b/dask_cuda/tests/test_proxy.py @@ -141,6 +141,27 @@ def test_proxy_object_of_numpy(serializers): assert type(expect) == type(got) assert expect == got + # Check reflected methods + for op_str in [ + "__radd__", + "__rsub__", + "__rmul__", + "__rtruediv__", + "__rfloordiv__", + "__rmod__", + "__rpow__", + "__rlshift__", + "__rrshift__", + "__rxor__", + "__ror__", + ]: + org = np.arange(10) + 1 + 
pxy = proxy_object.asproxy(org.copy(), serializers=serializers) + expect = getattr(org, op_str)(org) + got = getattr(org, op_str)(pxy) + assert isinstance(got, type(expect)) + assert all(expect == got) + @pytest.mark.parametrize("serializers", [None, ["dask"]]) def test_proxy_object_of_cudf(serializers): From 3a227559fdad7f2e090c2a7c25f722bb77ca5323 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Thu, 12 Nov 2020 20:01:52 +0100 Subject: [PATCH 40/49] ProxyObject: added docs --- dask_cuda/proxy_object.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/dask_cuda/proxy_object.py b/dask_cuda/proxy_object.py index 5a7de5caa..24b680525 100644 --- a/dask_cuda/proxy_object.py +++ b/dask_cuda/proxy_object.py @@ -60,6 +60,26 @@ def asproxy(obj, serializers=None, subclass=None): class ProxyObject: + """Object wrapper/proxy for serializable objects + + This is used by DeviceHostFile to delay deserialization of returned objects. + An objects proxied by an instance of this class will JIT-deserialized when + accessed. The instance behaves as the proxied object and can be accessed/used + just like the proxied object. + + Notice + ------ + Type checking using instance() works as expected but direct type checking + doesn't: + >>> import numpy as np + >>> from dask_cuda.proxy_object import asproxy + >>> x = np.arange(3) + >>> isinstance(asproxy(x), type(x)) + True + >>> type(asproxy(x)) is type(x) + False + """ + __slots__ = [ "_obj_pxy", # A dict that holds the state of the proxy object "_obj_pxy_lock", # Threading lock for all obj_pxy access From 9a76b8e43278c22fe46b4c6d9cc65711f7755d93 Mon Sep 17 00:00:00 2001 From: "Mads R. B. 
Kristensen" Date: Fri, 13 Nov 2020 09:40:32 +0100 Subject: [PATCH 41/49] added unproxy() --- dask_cuda/proxy_object.py | 25 +++++++++++++++++++++++++ dask_cuda/tests/test_proxy.py | 3 +++ 2 files changed, 28 insertions(+) diff --git a/dask_cuda/proxy_object.py b/dask_cuda/proxy_object.py index 24b680525..bba3e2aa3 100644 --- a/dask_cuda/proxy_object.py +++ b/dask_cuda/proxy_object.py @@ -29,6 +29,9 @@ def asproxy(obj, serializers=None, subclass=None): subclass: Class, optional Specify a subclass of ProxyObject to create instead of ProxyObject. `subclass` must be pickable. + + Returns + ------- ret: ProxyObject The proxy object proxing `obj` """ @@ -59,6 +62,28 @@ def asproxy(obj, serializers=None, subclass=None): return ret +def unproxy(obj): + """Unwrap ProxyObject objects and pass-through anything else. + + Use this function to retrieve the proxied object. + + Parameters + ---------- + obj: object + Any kind of object + + Returns + ------- + ret: object + The proxied object or `obj` itself if it isn't a ProxyObject + """ + try: + obj = obj._obj_pxy_deserialize() + except AttributeError: + pass + return obj + + class ProxyObject: """Object wrapper/proxy for serializable objects diff --git a/dask_cuda/tests/test_proxy.py b/dask_cuda/tests/test_proxy.py index 92c76ae6e..469693df2 100644 --- a/dask_cuda/tests/test_proxy.py +++ b/dask_cuda/tests/test_proxy.py @@ -31,6 +31,9 @@ def test_proxy_object(serializers): assert "dask_cuda.proxy_object.ProxyObject at " in repr(pxy) assert "list (serialized=['dask', 'pickle'])" in repr(pxy) + assert org == proxy_object.unproxy(pxy) + assert org == proxy_object.unproxy(org) + @pytest.mark.parametrize("serializers_first", [None, ["dask", "pickle"]]) @pytest.mark.parametrize("serializers_second", [None, ["dask", "pickle"]]) From b1f933159cb63afc7d6eb57d1e9961069716a7c8 Mon Sep 17 00:00:00 2001 From: "Mads R. B. 
Kristensen" Date: Fri, 13 Nov 2020 10:12:35 +0100 Subject: [PATCH 42/49] Added docs --- dask_cuda/proxy_object.py | 40 ++++++++++++++++++++++++++++++--------- 1 file changed, 31 insertions(+), 9 deletions(-) diff --git a/dask_cuda/proxy_object.py b/dask_cuda/proxy_object.py index bba3e2aa3..ceb4c972b 100644 --- a/dask_cuda/proxy_object.py +++ b/dask_cuda/proxy_object.py @@ -92,17 +92,39 @@ class ProxyObject: accessed. The instance behaves as the proxied object and can be accessed/used just like the proxied object. - Notice - ------ + ProxyObject has some limitations and doesn't mimic the proxied object perfectly. + Thus, if encountering problems remember that is always possible to use unproxy() + to access the proxied object directly or disable `jit_unspill=False` completely. + Type checking using instance() works as expected but direct type checking doesn't: - >>> import numpy as np - >>> from dask_cuda.proxy_object import asproxy - >>> x = np.arange(3) - >>> isinstance(asproxy(x), type(x)) - True - >>> type(asproxy(x)) is type(x) - False + >>> import numpy as np + >>> from dask_cuda.proxy_object import asproxy + >>> x = np.arange(3) + >>> isinstance(asproxy(x), type(x)) + True + >>> type(asproxy(x)) is type(x) + False + + Parameters + ---------- + obj: object + Any kind of object to be proxied. + fixed_attr: Dict + Dictionary of attributes that are accessable without deserialization + the proxied object. + type_serialized: bytes + Pickled type of `obj`. + typename: str + Name of the type of `obj`. + is_cuda_object: boolean + Whether `obj` is a CUDA object or not. + subclass: bytes + Pickled type to use instead of ProxyObject when deserializing. The type + must inherit from ProxyObject. + serializers: List[Str], optional + List of serializers to used to serialize `obj`. If None, `obj` + isn't serialized. """ __slots__ = [ From bd8e745c0c3ea14d01e4045231f7ab867ee9a03e Mon Sep 17 00:00:00 2001 From: "Mads R. B. 
Kristensen" Date: Fri, 13 Nov 2020 10:25:59 +0100 Subject: [PATCH 43/49] Added ValueError when serializers isn't specified --- dask_cuda/proxy_object.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/dask_cuda/proxy_object.py b/dask_cuda/proxy_object.py index ceb4c972b..39e34de3f 100644 --- a/dask_cuda/proxy_object.py +++ b/dask_cuda/proxy_object.py @@ -181,8 +181,10 @@ def _obj_pxy_serialize(self, serializers): frames: List[Bytes] List of frames that makes up the serialized object """ + if not serializers: + raise ValueError("Please specify a list of serializers") + with self._obj_pxy_lock: - assert serializers is not None if ( self._obj_pxy["serializers"] is not None and self._obj_pxy["serializers"] != serializers @@ -196,7 +198,6 @@ def _obj_pxy_serialize(self, serializers): ) self._obj_pxy["serializers"] = serializers - assert serializers == self._obj_pxy["serializers"] return self._obj_pxy["obj"] def _obj_pxy_deserialize(self): From f8ea867991848fcb9f19627eb9ed5e624a90c200 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Fri, 13 Nov 2020 10:56:52 +0100 Subject: [PATCH 44/49] typo --- dask_cuda/proxy_object.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_cuda/proxy_object.py b/dask_cuda/proxy_object.py index 39e34de3f..e4e2f7f01 100644 --- a/dask_cuda/proxy_object.py +++ b/dask_cuda/proxy_object.py @@ -93,7 +93,7 @@ class ProxyObject: just like the proxied object. ProxyObject has some limitations and doesn't mimic the proxied object perfectly. - Thus, if encountering problems remember that is always possible to use unproxy() + Thus, if encountering problems remember that it is always possible to use unproxy() to access the proxied object directly or disable `jit_unspill=False` completely. Type checking using instance() works as expected but direct type checking From 7a73f35197a69fcc0d04a08642bc377870d7d840 Mon Sep 17 00:00:00 2001 From: "Mads R. B. 
Kristensen" Date: Fri, 13 Nov 2020 12:54:40 +0100 Subject: [PATCH 45/49] Style and spelling fixes Co-authored-by: Peter Andreas Entschev --- dask_cuda/proxy_object.py | 39 ++++++++++++++++------------------- dask_cuda/tests/test_proxy.py | 2 +- 2 files changed, 19 insertions(+), 22 deletions(-) diff --git a/dask_cuda/proxy_object.py b/dask_cuda/proxy_object.py index e4e2f7f01..260c6b500 100644 --- a/dask_cuda/proxy_object.py +++ b/dask_cuda/proxy_object.py @@ -23,17 +23,16 @@ def asproxy(obj, serializers=None, subclass=None): ---------- obj: object Object to wrap in a ProxyObject object. - serializers: List[Str], optional + serializers: list(str), optional List of serializers to use to serialize `obj`. If None, no serialization is done. - subclass: Class, optional + subclass: class, optional Specify a subclass of ProxyObject to create instead of ProxyObject. `subclass` must be pickable. Returns ------- - ret: ProxyObject - The proxy object proxing `obj` + The ProxyObject proxying `obj` """ if hasattr(obj, "_obj_pxy"): # Already a proxy object @@ -74,8 +73,7 @@ def unproxy(obj): Returns ------- - ret: object - The proxied object or `obj` itself if it isn't a ProxyObject + The proxied object or `obj` itself if it isn't a ProxyObject """ try: obj = obj._obj_pxy_deserialize() @@ -88,13 +86,14 @@ class ProxyObject: """Object wrapper/proxy for serializable objects This is used by DeviceHostFile to delay deserialization of returned objects. - An objects proxied by an instance of this class will JIT-deserialized when + Objects proxied by an instance of this class will be JIT-deserialized when accessed. The instance behaves as the proxied object and can be accessed/used just like the proxied object. ProxyObject has some limitations and doesn't mimic the proxied object perfectly. Thus, if encountering problems remember that it is always possible to use unproxy() - to access the proxied object directly or disable `jit_unspill=False` completely. 
+ to access the proxied object directly or disable JIT deserialization completely + with `jit_unspill=False`. Type checking using instance() works as expected but direct type checking doesn't: @@ -110,8 +109,8 @@ class ProxyObject: ---------- obj: object Any kind of object to be proxied. - fixed_attr: Dict - Dictionary of attributes that are accessable without deserialization + fixed_attr: dict + Dictionary of attributes that are accessible without deserializing the proxied object. type_serialized: bytes Pickled type of `obj`. @@ -122,8 +121,8 @@ class ProxyObject: subclass: bytes Pickled type to use instead of ProxyObject when deserializing. The type must inherit from ProxyObject. - serializers: List[Str], optional - List of serializers to used to serialize `obj`. If None, `obj` + serializers: list(str), optional + List of serializers to use to serialize `obj`. If None, `obj` isn't serialized. """ @@ -160,8 +159,7 @@ def _obj_pxy_get_meta(self): Returns ------- - ret: dict - Dictionary of metadata + Dictionary of metadata """ with self._obj_pxy_lock: return {k: self._obj_pxy[k] for k in self._obj_pxy.keys() if k != "obj"} @@ -171,15 +169,15 @@ def _obj_pxy_serialize(self, serializers): Parameters ---------- - serializers: List[Str] + serializers: list(str) List of serializers to use to serialize the proxied object. 
Returns ------- header: dict The header of the serialized frames - frames: List[Bytes] - List of frames that makes up the serialized object + frames: list(bytes) + List of frames that make up the serialized object """ if not serializers: raise ValueError("Please specify a list of serializers") @@ -205,7 +203,7 @@ def _obj_pxy_deserialize(self): Returns ------- - ret : object + object The proxied object (deserialized) """ with self._obj_pxy_lock: @@ -220,8 +218,7 @@ def _obj_pxy_is_cuda_object(self): Returns ------- - ret : object - The proxied object (deserialized) + The deserialized proxied object """ with self._obj_pxy_lock: return self._obj_pxy["is_cuda_object"] @@ -491,7 +488,7 @@ def obj_pxy_dask_serialize(obj: ProxyObject): def obj_pxy_cuda_serialize(obj: ProxyObject): """ The CUDA serialization of ProxyObject used by Dask when communicating using UCX - or another CUDA friendly communicantion library. As serializers, it uses "cuda", + or another CUDA friendly communication library. As serializers, it uses "cuda", "dask" or "pickle", which means that proxied CUDA objects are _not_ spilled to main memory. """ diff --git a/dask_cuda/tests/test_proxy.py b/dask_cuda/tests/test_proxy.py index 469693df2..9649a0cfd 100644 --- a/dask_cuda/tests/test_proxy.py +++ b/dask_cuda/tests/test_proxy.py @@ -192,7 +192,7 @@ def test_serialize_of_proxied_cudf(proxy_serializers, dask_serializers): @pytest.mark.parametrize("jit_unspill", [True, False]) def test_spilling_local_cuda_cluster(jit_unspill): - """Testing spelling of a proxied cudf dataframe in a local cuda cluster""" + """Testing spilling of a proxied cudf dataframe in a local cuda cluster""" cudf = pytest.importorskip("cudf") def task(x): From e0fb226f015ddea8d0a1ee5ca6d31e1768fe8729 Mon Sep 17 00:00:00 2001 From: "Mads R. B. 
Kristensen" Date: Fri, 20 Nov 2020 05:11:32 -0800 Subject: [PATCH 46/49] docs --- dask_cuda/proxy_object.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dask_cuda/proxy_object.py b/dask_cuda/proxy_object.py index 1f1e89ce2..cd6ddf694 100644 --- a/dask_cuda/proxy_object.py +++ b/dask_cuda/proxy_object.py @@ -148,12 +148,12 @@ def _obj_pxy_deserialize(self): return self._obj_pxy["obj"] def _obj_pxy_is_cuda_object(self): - """Inplace deserialization of the proxied object + """Return whether the proxied object is a CUDA object or not Returns ------- - ret : object - The proxied object (deserialized) + ret : boolean + Is the proxied object a CUDA object? with self._obj_pxy_lock: return self._obj_pxy["is_cuda_object"] From b565bc551f43cbf4b7d80435f71e209d43d22c34 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Fri, 20 Nov 2020 05:36:50 -0800 Subject: [PATCH 47/49] ProxyObject.__sizeof__(): use dask.sizeof() --- dask_cuda/proxy_object.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dask_cuda/proxy_object.py b/dask_cuda/proxy_object.py index cd6ddf694..f902ce150 100644 --- a/dask_cuda/proxy_object.py +++ b/dask_cuda/proxy_object.py @@ -1,6 +1,5 @@ import operator import pickle -import sys import threading import dask @@ -8,6 +7,7 @@ import dask.dataframe.utils import distributed.protocol import distributed.utils +from dask.sizeof import sizeof from .is_device_object import is_device_object @@ -202,7 +202,7 @@ def __sizeof__(self): frames = self._obj_pxy["obj"][1] return sum(map(distributed.utils.nbytes, frames)) else: - return sys.getsizeof(self._obj_pxy_deserialize()) + return sizeof(self._obj_pxy_deserialize()) def __len__(self): return len(self._obj_pxy_deserialize()) From dbcf68d2419c044ef70c48c6b4eba7b0914efdaf Mon Sep 17 00:00:00 2001 From: "Mads R. B. 
Kristensen" Date: Fri, 20 Nov 2020 08:15:11 -0800 Subject: [PATCH 48/49] Serializer: convert to tuples before comparing --- dask_cuda/proxy_object.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_cuda/proxy_object.py b/dask_cuda/proxy_object.py index 97dc11af8..24e15622f 100644 --- a/dask_cuda/proxy_object.py +++ b/dask_cuda/proxy_object.py @@ -185,7 +185,7 @@ def _obj_pxy_serialize(self, serializers): with self._obj_pxy_lock: if ( self._obj_pxy["serializers"] is not None - and self._obj_pxy["serializers"] != serializers + and tuple(self._obj_pxy["serializers"]) != tuple(serializers) ): # The proxied object is serialized with other serializers self._obj_pxy_deserialize() From 81fa6156b33402f4415790733eb8683791b95803 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Fri, 20 Nov 2020 08:16:48 -0800 Subject: [PATCH 49/49] typos --- dask_cuda/proxy_object.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/dask_cuda/proxy_object.py b/dask_cuda/proxy_object.py index 24e15622f..cbb33d80c 100644 --- a/dask_cuda/proxy_object.py +++ b/dask_cuda/proxy_object.py @@ -86,7 +86,8 @@ class ProxyObject: """Object wrapper/proxy for serializable objects This is used by DeviceHostFile to delay deserialization of returned objects. - Objects proxied by an instance of this class will be JIT-deserialized when + + Objects proxied by an instance of this class will be JIT-deserialized when accessed. The instance behaves as the proxied object and can be accessed/used just like the proxied object. 
@@ -183,10 +184,9 @@ def _obj_pxy_serialize(self, serializers): raise ValueError("Please specify a list of serializers") with self._obj_pxy_lock: - if ( - self._obj_pxy["serializers"] is not None - and tuple(self._obj_pxy["serializers"]) != tuple(serializers) - ): + if self._obj_pxy["serializers"] is not None and tuple( + self._obj_pxy["serializers"] + ) != tuple(serializers): # The proxied object is serialized with other serializers self._obj_pxy_deserialize()