Skip to content

Commit

Permalink
Merge pull request #5139 from jakirkham/add_serializable
Browse files Browse the repository at this point in the history
Add ``Serializable`` ABC for Python
  • Loading branch information
jakirkham authored May 20, 2020
2 parents 855648c + 3491152 commit d9629b1
Show file tree
Hide file tree
Showing 11 changed files with 107 additions and 83 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
- PR #4881 Support row_number in rolling_window
- PR #5068 Add Java bindings for arctan2
- PR #5132 Support out-of-band buffers in Python pickling
- PR #5139 Add ``Serializable`` ABC for Python
- PR #5149 Add Java bindings for PMOD
- PR #5153 Add Java bindings for extract
- PR #5196 Add Java bindings for NULL_EQUALS, NULL_MAX and NULL_MIN
Expand Down
48 changes: 10 additions & 38 deletions python/cudf/cudf/comm/serialize.py
Original file line number Diff line number Diff line change
@@ -1,59 +1,31 @@
import pickle

import cudf
import cudf.core.groupby.groupby

# all (de-)serializations are attached to cudf Objects:
# Series/DataFrame/Index/Column/Buffer/etc
serializable_classes = (
cudf.CategoricalDtype,
cudf.DataFrame,
cudf.Index,
cudf.MultiIndex,
cudf.Series,
cudf.core.groupby.groupby.GroupBy,
cudf.core.groupby.groupby._Grouping,
cudf.core.column.column.ColumnBase,
cudf.core.buffer.Buffer,
)
import cudf # noqa: F401
from cudf.core.abc import Serializable

try:
from distributed.protocol import dask_deserialize, dask_serialize
from distributed.protocol.cuda import cuda_deserialize, cuda_serialize
from distributed.utils import log_errors

@cuda_serialize.register(serializable_classes)
@cuda_serialize.register(Serializable)
def cuda_serialize_cudf_object(x):
with log_errors():
header, frames = x.serialize()
assert all((type(f) is cudf.core.buffer.Buffer) for f in frames)
header["lengths"] = [f.nbytes for f in frames]
return header, frames
return x.device_serialize()

# all (de-)serializations are attached to cudf Objects:
# Series/DataFrame/Index/Column/Buffer/etc
@dask_serialize.register(serializable_classes)
@dask_serialize.register(Serializable)
def dask_serialize_cudf_object(x):
header, frames = cuda_serialize_cudf_object(x)
with log_errors():
frames = [f.to_host_array().data for f in frames]
return header, frames
return x.host_serialize()

@cuda_deserialize.register(serializable_classes)
@dask_deserialize.register(serializable_classes)
@cuda_deserialize.register(Serializable)
@dask_deserialize.register(Serializable)
def deserialize_cudf_object(header, frames):
with log_errors():
if header["serializer"] == "cuda":
for f in frames:
# some frames are empty -- meta/empty partitions/etc
if len(f) > 0:
assert hasattr(f, "__cuda_array_interface__")
return Serializable.device_deserialize(header, frames)
elif header["serializer"] == "dask":
frames = [memoryview(f) for f in frames]

cudf_typ = pickle.loads(header["type-serialized"])
cudf_obj = cudf_typ.deserialize(header, frames)
return cudf_obj
return Serializable.host_deserialize(header, frames)


except ImportError:
Expand Down
62 changes: 62 additions & 0 deletions python/cudf/cudf/core/abc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# Copyright (c) 2020, NVIDIA CORPORATION.

import abc
import pickle
from abc import abstractmethod

import numpy

import rmm

import cudf


class Serializable(abc.ABC):
@abstractmethod
def serialize(self):
pass

@classmethod
@abstractmethod
def deserialize(cls, header, frames):
pass

def device_serialize(self):
header, frames = self.serialize()
assert all((type(f) is cudf.core.buffer.Buffer) for f in frames)
header["type-serialized"] = pickle.dumps(type(self))
header["lengths"] = [f.nbytes for f in frames]
return header, frames

@classmethod
def device_deserialize(cls, header, frames):
for f in frames:
# some frames are empty -- meta/empty partitions/etc
if len(f) > 0:
assert hasattr(f, "__cuda_array_interface__")

typ = pickle.loads(header["type-serialized"])
obj = typ.deserialize(header, frames)

return obj

def host_serialize(self):
header, frames = self.device_serialize()
frames = [f.to_host_array().view("u1").data for f in frames]
return header, frames

@classmethod
def host_deserialize(cls, header, frames):
frames = [
rmm.DeviceBuffer.to_device(memoryview(f).cast("B")) for f in frames
]
obj = cls.device_deserialize(header, frames)
return obj

def __reduce_ex__(self, protocol):
header, frames = self.host_serialize()
if protocol >= 5:
frames = [pickle.PickleBuffer(f) for f in frames]
else:
frames = [numpy.asarray(f) for f in frames]
return self.host_deserialize, (header, frames)
10 changes: 3 additions & 7 deletions python/cudf/cudf/core/buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
import rmm
from rmm import DeviceBuffer, _DevicePointer

from cudf.core.abc import Serializable

class Buffer:

class Buffer(Serializable):
def __init__(self, data=None, size=None, owner=None):
"""
A Buffer represents a device memory allocation.
Expand Down Expand Up @@ -58,12 +60,6 @@ def __init__(self, data=None, size=None, owner=None):
raise TypeError("data must be Buffer, array-like or integer")
self._init_from_array_like(np.asarray(data), owner)

def __reduce_ex__(self, protocol):
data = self.to_host_array()
if protocol >= 5:
data = pickle.PickleBuffer(data)
return self.__class__, (data,)

def __len__(self):
return self.size

Expand Down
17 changes: 2 additions & 15 deletions python/cudf/cudf/core/column/column.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from cudf._lib.scalar import as_scalar
from cudf._lib.stream_compaction import unique_count as cpp_unique_count
from cudf._lib.transform import bools_to_mask
from cudf.core.abc import Serializable
from cudf.core.buffer import Buffer
from cudf.core.dtypes import CategoricalDtype
from cudf.utils import cudautils, ioutils, utils
Expand All @@ -38,7 +39,7 @@
from cudf.utils.utils import buffers_from_pyarrow, mask_dtype


class ColumnBase(Column):
class ColumnBase(Column, Serializable):
def __init__(
self,
data,
Expand Down Expand Up @@ -67,20 +68,6 @@ def __init__(
children=children,
)

def __reduce__(self):
return (
build_column,
(
self.data,
self.dtype,
self.mask,
self.size,
0,
self.null_count,
self.children,
),
)

def as_frame(self):
from cudf.core.frame import Frame

Expand Down
6 changes: 2 additions & 4 deletions python/cudf/cudf/core/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from cudf._lib.null_mask import MaskState, create_null_mask
from cudf._lib.nvtx import annotate
from cudf.core import column
from cudf.core.abc import Serializable
from cudf.core.column import as_column, column_empty
from cudf.core.column_accessor import ColumnAccessor
from cudf.core.frame import Frame
Expand Down Expand Up @@ -87,7 +88,7 @@ def _reverse_op(fn):
}[fn]


class DataFrame(Frame):
class DataFrame(Frame, Serializable):
"""
A GPU Dataframe object.
Expand Down Expand Up @@ -1684,9 +1685,6 @@ def __deepcopy__(self, memo={}):
memo = {}
return self.copy(deep=True)

def __reduce__(self):
return (DataFrame, (self._data, self.index))

@annotate("INSERT", color="green", domain="cudf_python")
def insert(self, loc, name, value):
""" Add a column to DataFrame at the index specified by loc.
Expand Down
5 changes: 3 additions & 2 deletions python/cudf/cudf/core/groupby/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@
import cudf
import cudf._lib.groupby as libgroupby
from cudf._lib.nvtx import annotate
from cudf.core.abc import Serializable


class GroupBy(object):
class GroupBy(Serializable):
"""
Group a DataFrame or Series by a set of columns.
Expand Down Expand Up @@ -513,7 +514,7 @@ def __init__(self, key=None, level=None):
self.level = level


class _Grouping(object):
class _Grouping(Serializable):
def __init__(self, obj, by=None, level=None):
self._obj = obj
self._key_columns = []
Expand Down
14 changes: 2 additions & 12 deletions python/cudf/cudf/core/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from __future__ import division, print_function

import functools
import pickle

import cupy
Expand All @@ -11,6 +10,7 @@

import cudf
from cudf._lib.nvtx import annotate
from cudf.core.abc import Serializable
from cudf.core.column import (
CategoricalColumn,
ColumnBase,
Expand Down Expand Up @@ -56,7 +56,7 @@ def _to_frame(this_index, index=True, name=None):
)


class Index(Frame):
class Index(Frame, Serializable):
"""The root interface for all Series indexes.
"""

Expand Down Expand Up @@ -657,9 +657,6 @@ def __getitem__(self, index):
def __eq__(self, other):
return super(type(self), self).__eq__(other)

def __reduce__(self):
return (RangeIndex, (self._start, self._stop, self.name))

def equals(self, other):
if self is other:
return True
Expand Down Expand Up @@ -840,13 +837,6 @@ def copy(self, deep=True):
def __sizeof__(self):
return self._values.__sizeof__()

def __reduce__(self):
_maker = functools.partial(
self.__class__, self._values, name=self.name
)

return _maker, ()

def __len__(self):
return len(self._values)

Expand Down
3 changes: 2 additions & 1 deletion python/cudf/cudf/core/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import cudf
import cudf._lib as libcudf
from cudf._lib.nvtx import annotate
from cudf.core.abc import Serializable
from cudf.core.column import (
ColumnBase,
DatetimeColumn,
Expand All @@ -35,7 +36,7 @@
)


class Series(Frame):
class Series(Frame, Serializable):
"""
Data and null-masks.
Expand Down
10 changes: 10 additions & 0 deletions python/cudf/cudf/tests/test_pickling.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ def check_serialization(df):
sortvaldf = df.sort_values("vals")
assert isinstance(sortvaldf.index, GenericIndex)
assert_frame_picklable(sortvaldf)
# out-of-band
if pickle.HIGHEST_PROTOCOL >= 5:
buffers = []
serialbytes = pickle.dumps(
df, protocol=5, buffer_callback=buffers.append
)
for b in buffers:
assert isinstance(b, pickle.PickleBuffer)
loaded = pickle.loads(serialbytes, buffers=buffers)
pd.util.testing.assert_frame_equal(loaded.to_pandas(), df.to_pandas())


def assert_frame_picklable(df):
Expand Down
14 changes: 10 additions & 4 deletions python/cudf/cudf/tests/test_serialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,26 @@
# pd.util.testing.makeMultiIndex, # Indices not serialized on device
],
)
def test_serialize(df):
@pytest.mark.parametrize("to_host", [True, False])
def test_serialize(df, to_host):
""" This should hopefully replace all functions below """
a = df()
if "cudf" not in type(a).__module__:
a = cudf.from_pandas(a)
header, frames = a.serialize()
if to_host:
header, frames = a.host_serialize()
else:
header, frames = a.device_serialize()
msgpack.dumps(header) # ensure that header is msgpack serializable
ndevice = 0
for frame in frames:
if not isinstance(frame, (bytes, memoryview)):
if hasattr(frame, "__cuda_array_interface__"):
ndevice += 1
# Indices etc. will not be DeviceNDArray
# but data should be...
if hasattr(df, "_cols"):
if to_host:
assert ndevice == 0
elif hasattr(df, "_cols"):
assert ndevice >= len(df._data)
else:
assert ndevice > 0
Expand Down

0 comments on commit d9629b1

Please sign in to comment.