
ENH: accommodate dask_loads for default serializers of dask #1942

Conversation

@ZhiyLiu commented Jul 30, 2020

PR Checklist

  • No API changes were made (or the changes have been approved)
  • No major design changes were made (or the changes have been approved)
  • Added test (or behavior not changed)
  • Updated API documentation (or API not changed)
  • Added license to new files (if any)
  • Added Python wrapping to new files (if any) as described in ITK Software Guide Section 9.5
  • Added ITK examples for all new major features (if any)

Refer to the ITK Software Guide for
further development details if necessary.

Dask's loads method may raise TypeError: No dispatch for <class 'itkPyBufferPython.NDArrayITKBase'> because no deserialization method is registered for NDArrayITKBase. This commit addresses the issue by registering a deserialization method for NDArrayITKBase with dask.

Correspondingly, the test case verifies the pickling and unpickling of a NDArrayITKBase instance.

@thewtex (Member) commented Jul 31, 2020

@ZhiyLiu thanks!

The ITK.Linux.Python CI test is failing -- do the tests pass for you locally from a fresh build?

@thewtex (Member) commented Jul 31, 2020

@mrocklin @jakirkham how does this look to you?

@ZhiyLiu (Author) commented Aug 1, 2020

@ZhiyLiu thanks!

The ITK.Linux.Python CI test is failing -- do the tests pass for you locally from a fresh build?

Yes. It passes on my local machine.

Looking at the log, there is an error regarding the compilation of C++ code:

error: ISO C++ forbids declaration of 'ITK_DISALLOW_COPY_AND_ASSIGN' with no type [-fpermissive]
ITK_DISALLOW_COPY_AND_ASSIGN(STLContainerAdaptor);

Not sure whether this is relevant to the Python code.

@mrocklin (Contributor) left a comment

I'm not sure I understand the changes here. Is there a particular issue or traceback somewhere that explains what was breaking before? It might be simpler for ITK to implement standard pickle serialization/deserialization (I thought that it did this already).

Comment on lines 20 to 29
try:
    import numpy as np
    from distributed.protocol import dask_deserialize
    from typing import Dict, List

    @dask_deserialize.register(NDArrayITKBase)
    def deserialize(header: Dict, frames: List[bytes]) -> NDArrayITKBase:
        loads = dask_deserialize.dispatch(np.ndarray)
        return NDArrayITKBase(loads(header, frames))
except ImportError:
    pass
@mrocklin (Contributor):

It is typical to handle this as follows:

Suggested change
try:
    import numpy as np
    from distributed.protocol import dask_deserialize
    from typing import Dict, List

    @dask_deserialize.register(NDArrayITKBase)
    def deserialize(header: Dict, frames: List[bytes]) -> NDArrayITKBase:
        loads = dask_deserialize.dispatch(np.ndarray)
        return NDArrayITKBase(loads(header, frames))
except ImportError:
    pass

try:
    import numpy as np
    from distributed.protocol import dask_deserialize
    from typing import Dict, List
except ImportError:
    pass
else:
    @dask_deserialize.register(NDArrayITKBase)
    def deserialize(header: Dict, frames: List[bytes]) -> NDArrayITKBase:
        loads = dask_deserialize.dispatch(np.ndarray)
        return NDArrayITKBase(loads(header, frames))
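As a stdlib-only illustration of why the try/except/else shape is preferable (json and encode here are hypothetical stand-ins, not ITK or Dask code): only the imports are guarded, so an ImportError raised inside the registration body itself would not be silently swallowed.

```python
# Minimal sketch of the try/except/else pattern: guard only the imports,
# run the registration body in `else` once the imports succeed.
try:
    import json  # stands in for the optional dependency
except ImportError:
    registered = False
else:
    registered = True

    def encode(obj):
        # Only defined when the optional dependency is importable.
        return json.dumps(obj)

assert registered
assert encode({"a": 1}) == '{"a": 1}'
```

With the original try-around-everything form, a bug in the registration body that happened to raise ImportError would be masked by the `except ImportError: pass` clause.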

@mrocklin (Contributor):

I'm not sure that registering a deserialization function would have much of an effect without also registering a serialization function.
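To make this point concrete, here is a stdlib-only toy registry (hypothetical names, not Dask's actual dispatch machinery): round trips need a matched dumps/loads pair per type, and an unregistered type fails on the serialize side before deserialization is ever consulted.

```python
# Toy type-dispatch registry mimicking the *shape* of dask_serialize /
# dask_deserialize pairing; this is not Dask's real API.
serializers = {}
deserializers = {}

def register(cls, dumps, loads):
    serializers[cls] = dumps
    deserializers[cls] = loads

def serialize(obj):
    try:
        dumps = serializers[type(obj)]
    except KeyError:
        raise TypeError(f"No dispatch for {type(obj)}")
    return type(obj), dumps(obj)

def deserialize(cls, frames):
    try:
        loads = deserializers[cls]
    except KeyError:
        raise TypeError(f"No dispatch for {cls}")
    return loads(frames)

# Round trips work only because *both* directions are registered.
register(str, lambda s: s.encode(), lambda b: b.decode())
cls, frames = serialize("voxels")
assert deserialize(cls, frames) == "voxels"

# An unregistered type already fails when serializing, so a
# deserializer alone would never be reached.
try:
    serialize(3.14)
except TypeError as e:
    assert "No dispatch" in str(e)
```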

Comment on lines +55 to +58
pickled = pickle.dumps(ndarray_itk_base)
reloaded = pickle.loads(pickled)
equal = (reloaded == ndarray_itk_base).all()
assert equal, 'Different results before and after pickle'
@mrocklin (Contributor):

This test doesn't engage the code that you've written above. Pickle is the standard Python serialization technique; Dask's serialization is different. If you wanted to test it, you would try the following:

from distributed.protocol import serialize, deserialize

header, frames = serialize(ndarray_itk_base)
obj = deserialize(header, frames)
assert ...

@jhlegarreta (Member):

Looking at the log, there is an error regarding the compilation of C++ code:

error: ISO C++ forbids declaration of 'ITK_DISALLOW_COPY_AND_ASSIGN' with no type [-fpermissive]
ITK_DISALLOW_COPY_AND_ASSIGN(STLContainerAdaptor);

Not sure whether this is relevant to the Python code.

That should have been addressed in db4a897. The Python wrapping and tests succeeded in the related CI builds, so @ZhiyLiu you may want to rebase on master when addressing the comments of the rest of the folks.

@ZhiyLiu (Author) commented Aug 4, 2020

@thewtex @mrocklin @jhlegarreta @jakirkham

Thanks for your review and nice comments! This commit aims to solve an issue shown in this notebook.

Briefly, the final cell in the notebook complains:

TypeError: No dispatch for <class 'itk.itkPyBufferPython.NDArrayITKBase'>

if no deserialization method is registered for NDArrayITKBase.

Honestly, this kind of dask usage is not very common. In the notebook I scatter the chunked data, whereas a typical use case would scatter a numpy array.

However, there will be no problem if the submitted function

def denoise(block):
    ...
    return x  # x = np.asarray(x)

returns a numpy array instead of an NDArrayITKBase. This suggests that NDArrayITKBase objects cannot be used like numpy arrays in dask.

There are a couple of solutions to this issue, some of which are commented out in the notebook, e.g.,

client = Client(n_workers=8, serializers=['pickle'])

This commit implements one of these solutions, which needs no special care from the user.

Hope this makes sense.

P.S. The test data involved can be obtained from OME-TIFF sample data.

@jakirkham (Member):

Sorry for the slow reply. Some thoughts on options for approaching efficient serialization with Dask.

If one communicates objects that Dask already knows how to work with (like NumPy arrays), then one gets optimized serialization for free. This may involve a little (or maybe no) code in ITK to make sure NumPy arrays are handled reasonably well and handed to Dask when functions return. The benefit here seems useful even outside the Dask context.

If one adds pickle protocol 5 support to objects (possibly extending it to older Python versions with pickle5, or leveraging NumPy to do this indirectly ( numpy/numpy#12091 )), then one can leverage Dask's own support for pickle protocol 5 ( dask/distributed#3784 ) ( dask/distributed#3849 ), which should yield a performance boost similar to Dask's own custom serialization (as the underlying principles are the same). This would merely require updating how pickling works in ITK. It will also work with anything else that supports pickle protocol 5.
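A minimal stdlib-only sketch of the pickle protocol 5 idea (RawBlock is a hypothetical stand-in for an array-like type, not ITK code): the object exposes its payload as a pickle.PickleBuffer, so a consumer that passes buffer_callback can move the buffer out-of-band, which is how Dask's pickle-5 path avoids copies of large frames.

```python
import pickle

class RawBlock:
    """Hypothetical stand-in for an array-like type holding a large buffer."""

    def __init__(self, data):
        # Copies for simplicity; a real implementation would wrap the
        # incoming buffer zero-copy (e.g. via numpy.frombuffer).
        self.data = bytearray(data)

    def __reduce_ex__(self, protocol):
        if protocol >= 5:
            # Expose the payload as an out-of-band buffer; consumers that
            # pass buffer_callback then receive it without an extra copy.
            return (RawBlock, (pickle.PickleBuffer(self.data),))
        return (RawBlock, (bytes(self.data),))

    def __eq__(self, other):
        return isinstance(other, RawBlock) and self.data == other.data

block = RawBlock(b"voxels" * 4)

# In-band round trip: the buffer is embedded in the pickle stream.
assert pickle.loads(pickle.dumps(block, protocol=5)) == block

# Out-of-band round trip: the payload travels as a separate frame.
frames = []
header = pickle.dumps(block, protocol=5, buffer_callback=frames.append)
assert pickle.loads(header, buffers=frames) == block
```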

Alternatively, one can use Dask's custom serialization by following this example on how to extend it to custom types. The usual strategy here is to put these registration bits in their own module. This softens the Distributed dependency and provides a single point for registering these functions. The latter is useful, as one needs to import this module in distributed for it to work. This is doable with a bit of work in ITK and Distributed, but not too difficult.

Any of these options seems fine. Possibly it's worth doing multiple or even all of them (this is what we did in RAPIDS rapidsai/cudf#5139 ). Just trying to show what's possible and what it would entail to do 🙂

@ZhiyLiu force-pushed the ENH_NDArrayITKBase_Deserialize_4_dask branch from 33ffdd6 to 0a30c60 on August 5, 2020
@thewtex (Member) commented Aug 6, 2020

@ZhiyLiu looking good! @mrocklin @jakirkham thank you for the detailed reviews and pointers!

From what I can see, with a squash this is ready to merge.

  • The try, except, else syntax suggested by @mrocklin is used.
  • dask_loads, dask_dumps are now exercised in the tests.
  • The missing serialize function has been added.
  • @jakirkham did an awesome job (thanks!) of pointing to additional ways to register serialization functions and optimizations for the pickling. For the NDArrayITKBase type, we are leveraging NumPy's serialization, but we should write serialization functions for other itk.DataObject's and use the more advanced methods to make the serialization functions available (assuming that this will suffice for most use cases). This is tracked in this issue: Improve Dask serializers, registration #1948

If I am missing something, please let me know; otherwise, I will merge this on Friday.

@thewtex (Member) commented Aug 10, 2020

Merged in 27fe268

@thewtex closed this Aug 10, 2020