Pickling error using multiprocessing in map_blocks #3472

Closed
chrisroat opened this issue Feb 13, 2020 · 13 comments

@chrisroat
Contributor

TensorFlow holds onto GPU memory, so one trick is to drop into a new process -- the memory is released when the process ends. I use a multiprocessing pool inside the task, together with processes=False (or --no-nanny) on the Dask side, to avoid #2142.

The error in my original code, which uses flowdec, was:

MaybeEncodingError: Error sending result: '<multiprocessing.pool.ExceptionWithTraceback object at 0x7fe8f2d44f50>'. Reason: 'TypeError("can't pickle _thread.RLock objects")'

As I pared back to a minimal example, the pickling error changed. Code and new error are below. Note that creating the array with np.zeros and converting it via da.from_array triggers the error, but using da.zeros does not. I've found this error with both 2.9.1 and 2.10.1.

I would love help understanding what may be happening.

import distributed

import numpy as np
import dask.array as da
import multiprocessing

def fn_in_process(chunk):
    with multiprocessing.Pool(1) as pool:
        return pool.apply(fn, (chunk,))

def fn(chunk):
    return chunk

client = distributed.Client(processes=False)

arr = np.zeros((32,32))
data = da.from_array(arr)
# data = da.zeros((32,32))  # this actually works
result = data.map_blocks(fn_in_process, dtype=data.dtype).compute()
distributed.worker - WARNING -  Compute Failed
Function:  subgraph_callable
args:      (array([[0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       ...,
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.]]))
kwargs:    {}
Exception: PicklingError("Can't pickle <function fn at 0x7f232dc01b90>: it's not the same object as __main__.fn")
@TomAugspurger
Member

I think if you move fn and fn_in_process to a module, you'll avoid the issue.

# foo.py
import multiprocessing

def fn_in_process(chunk):
    with multiprocessing.Pool(1) as pool:
        return pool.apply(fn, (chunk,))

def fn(chunk):
    return chunk

And then in your main script / notebook:

>>> import foo
>>> result = data.map_blocks(foo.fn_in_process, dtype=data.dtype).compute()
>>> result

Pickle tends to struggle with dynamically generated functions.

>>> result = data.map_blocks(fn_in_process, dtype=data.dtype)
>>> list(result.dask.values())[0][0].dsk
Out[10]:
{'fn_in_process-d70faa90b1ff8e5c7fad8cf365bfd988': (<function __main__.fn_in_process(chunk)>,
  '_0')}

@mrocklin
Member

mrocklin commented Feb 13, 2020 via email

@chrisroat
Contributor Author

@mrocklin I may not fully grok what you are saying -- I'm not sure the scheduler type affects my base problem, which is that I'm trying to run my work within a subprocess spawned from the task. The reason is that TF (and other GPU workloads too, I fear) does not give up GPU memory. So even if I restrict a machine to one worker per GPU, a subsequent task that needs the GPU will fail if a TF task ran before it in the same process and took up all the memory. The memory allocation is tied to a process, so by running the workload in a subprocess, the GPU memory is freed when the task completes. This thread gets hijacked a bit, but has relevant details.

@TomAugspurger Well, pickle me tink. By condensing my repro example to a single file, I actually changed the bug. So, now back to the original. Tagging @eric-czech, as this seems to be tickled by padding.

$ cat deconvolve.py 
import multiprocessing
from flowdec import restoration as fd_restoration
from flowdec import data as fd_data
import numpy as np

def deconvolve_chunk_in_process(chunk):
    with multiprocessing.Pool(1) as pool:
        return pool.apply(deconvolve_chunk, (chunk,))

def deconvolve_chunk(chunk):
    psf = np.zeros((16,16,16))
    algo = fd_restoration.RichardsonLucyDeconvolver(n_dims=3, pad_mode='2357', device='/cpu:0').initialize()
    acq = fd_data.Acquisition(data=chunk, kernel=psf)
    res = algo.run(acq, niter=1)
    return chunk

$ cat issue.py 
import dask.array as da
import distributed
import deconvolve
import numpy as np

client = distributed.Client(n_workers=1, threads_per_worker=1, processes=False)
data = da.zeros((8,16,16), dtype=np.uint16)
data_deconv_dct = data.map_blocks(deconvolve.deconvolve_chunk_in_process, dtype=data.dtype)
result = data_deconv_dct.compute()

$ pip install dask distributed numpy flowdec tensorflow  # flowdec needs tensorflow, though not directly imported
$ python issue.py
... snip ...
distributed.worker - WARNING -  Compute Failed
Function:  execute_task
args:      ((subgraph_callable, (functools.partial(<built-in function zeros>, dtype=dtype('uint16')), (8, 16, 16))))
kwargs:    {}
Exception: <MaybeEncodingError: Error sending result: '<multiprocessing.pool.ExceptionWithTraceback object at 0x7fb573036bd0>'. Reason: 'TypeError("can't pickle _thread.RLock objects")'>

Traceback (most recent call last):
  File "issue.py", line 9, in <module>
    result = data_deconv_dct.compute()
  File "/home/roat/multiprocessing_bug/venv/lib/python3.7/site-packages/dask/base.py", line 165, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/home/roat/multiprocessing_bug/venv/lib/python3.7/site-packages/dask/base.py", line 436, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/roat/multiprocessing_bug/venv/lib/python3.7/site-packages/distributed/client.py", line 2587, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/home/roat/multiprocessing_bug/venv/lib/python3.7/site-packages/distributed/client.py", line 1885, in gather
    asynchronous=asynchronous,
  File "/home/roat/multiprocessing_bug/venv/lib/python3.7/site-packages/distributed/client.py", line 767, in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  File "/home/roat/multiprocessing_bug/venv/lib/python3.7/site-packages/distributed/utils.py", line 345, in sync
    raise exc.with_traceback(tb)
  File "/home/roat/multiprocessing_bug/venv/lib/python3.7/site-packages/distributed/utils.py", line 329, in f
    result[0] = yield future
  File "/home/roat/multiprocessing_bug/venv/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/home/roat/multiprocessing_bug/venv/lib/python3.7/site-packages/distributed/client.py", line 1741, in _gather
    raise exception.with_traceback(traceback)
  File "/home/roat/multiprocessing_bug/venv/lib/python3.7/site-packages/dask/optimization.py", line 982, in __call__
    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
  File "/home/roat/multiprocessing_bug/venv/lib/python3.7/site-packages/dask/core.py", line 149, in get
    result = _execute_task(task, cache)
  File "/home/roat/multiprocessing_bug/venv/lib/python3.7/site-packages/dask/core.py", line 119, in _execute_task
    return func(*args2)
  File "/home/roat/multiprocessing_bug/deconvolve.py", line 8, in deconvolve_chunk_in_process
    return pool.apply(deconvolve_chunk, (chunk,))
  File "/usr/lib/python3.7/multiprocessing/pool.py", line 261, in apply
    return self.apply_async(func, args, kwds).get()
  File "/usr/lib/python3.7/multiprocessing/pool.py", line 657, in get
    raise self._value
multiprocessing.pool.MaybeEncodingError: Error sending result: '<multiprocessing.pool.ExceptionWithTraceback object at 0x7fb573036bd0>'. Reason: 'TypeError("can't pickle _thread.RLock objects")'

@mrocklin
Member

The memory allocation is tied to a process, so by running the workload in a subprocess, the GPU memory is freed when the task completes

Understood. The error that I'm seeing on the screen is from Multiprocessing not being able to serialize a function (presumably the fn function). Multiprocessing isn't good for these kinds of interactive sessions. So I'm proposing that you swap out that one part of your workflow for a second and entirely independent use of Dask's multiprocessing scheduler, which gets around the serialization/pickle issues. I'm suggesting that you use Dask within Dask.
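
For illustration, here is a minimal sketch of the Dask-within-Dask idea (run_in_subprocess and gpu_work are placeholder names, not from this thread). Dask's multiprocessing scheduler serializes tasks with cloudpickle, which copes with interactively defined functions that plain pickle rejects.

import dask

def gpu_work(chunk):
    # Placeholder for the GPU-heavy computation; running it in a throwaway
    # process means its memory is released when that process exits.
    return chunk

def run_in_subprocess(chunk):
    # Instead of a multiprocessing.Pool, hand the single task to Dask's
    # "processes" scheduler from inside the outer (distributed) task.
    value = dask.delayed(gpu_work)(chunk)
    return value.compute(scheduler="processes", num_workers=1)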

@chrisroat
Contributor Author

chrisroat commented Feb 13, 2020

[Deleted a previous comment that was here, as part of the issue might stem from the odd shapes of my data and psf. Working on repro-ing a problem I'm seeing with deadlock in my project with this solution.]

@eric-czech

eric-czech commented Feb 13, 2020

I would echo the suggestions from @mrocklin and @TomAugspurger to define your functions in some module and then load that module in the tasks instead (and use the "processes" scheduler). For example, if you made a module like my_decon_module.py as follows:

from flowdec import restoration as fd_restoration
from flowdec import data as fd_data
import numpy as np

# Make sure this isn't defined every time in the decon method!  You may know this already
# but each new "algo" instance corresponds to a new TF graph w/ lots of overhead
# so they should be created as infrequently as possible
algo = fd_restoration.RichardsonLucyDeconvolver(n_dims=3, pad_mode='2357', device='/cpu:0').initialize()

def deconvolve_chunk(chunk):
    psf = np.zeros((16,16,16))
    acq = fd_data.Acquisition(data=chunk, kernel=psf)
    res = algo.run(acq, niter=1)
    return chunk # return res instead?

and then called it in another script/notebook like:

# You have to use processes=True to ever use multiple GPUS (or at least I could never get TF to play nicely with multiple GPUs in a single process w/ the threading scheduler)
client = distributed.Client(n_workers=1, threads_per_worker=1, processes=True)
data = da.zeros((8,16,16), dtype=np.uint16)

def process_chunk(chunk):
    import my_decon_module
    return my_decon_module.deconvolve_chunk(chunk)

data_deconv_dct = data.map_blocks(process_chunk, dtype=data.dtype)
result = data_deconv_dct.compute()

it should eschew the serialization issues that come from passing closures out of the top level. It did so for me at least, and you could also do that with your own multiprocessing Pool, but I don't see why you'd need to with processes=True and n_workers=1, threads_per_worker=1. I think that gets you what you're looking for -- only one chunk being deconvolved at a time on a single process, and you can increase the number of workers to match the number of GPUs (each worker will correspond to a new process).

@chrisroat
Contributor Author

@eric-czech The problem I'm trying to solve is having TF give back the memory, so a subsequent dask task can use the GPU with another framework (say cupy). I think the code you have will claim the memory for TF for all time for that dask worker. Perhaps I'm not following, though.

Operating under the assumption that I need to do the work in subprocesses (the subprocess gives back the GPU memory it claims), I need processes=False, or --no-nanny in a cluster, in order to avoid the "daemonic processes can't create children" error.

The workers/threads specification was just to force a single worker with a single thread on my workstation, which has a single GPU (and that is how I set up my Kubernetes pods -- one dask worker on a machine with one GPU). I'm happy to figure out multi-GPU at some later time.
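
For reference, a minimal sketch of the two options mentioned above for avoiding the daemonic-process error (the scheduler address is a placeholder):

import distributed

# Option 1: run the workers in the main process (no nanny), so tasks can
# spawn their own multiprocessing children.
client = distributed.Client(processes=False)

# Option 2: on a cluster, start each worker without the nanny process:
#   $ dask-worker tcp://scheduler:8786 --no-nanny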

@chrisroat
Contributor Author

chrisroat commented Feb 13, 2020

[Updated]

@mrocklin Do I need to watch out for any data transfer (not deadlock) issues? I'm seeing them in my project code with this fix, but it's hard to isolate. I get a whole bunch of tasks/deconvolutions done, and then everything just halts. I've even simplified it to remove any computation or GPU usage -- just map_overlap calls returning a no-op computation. A lot of time is being spent in dask.local calling queue.not_empty.wait.

@chrisroat
Contributor Author

chrisroat commented Feb 13, 2020

This snippet demonstrates the overhead of the dask-within-dask scheduling. It takes 10-15 seconds to do a no-op operation on an O(100MB) chunk of data. For my larger datasets, this went on for many, many minutes (I killed it before it finished).

import dask
import dask.array as da
import distributed
import numpy as np

def deconvolve_chunk_in_process(chunk):
    # Sub-second computation, though takes 5 seconds if processes=True in the client
    # return chunk
    
    # 13-15 seconds
    value = dask.delayed(deconvolve_chunk)(chunk)
    return value.compute(scheduler='processes', num_workers=1)

def deconvolve_chunk(chunk):
    return chunk

client = distributed.Client(n_workers=1, threads_per_worker=1, processes=False)

data = np.zeros((4, 32, 1024, 1024))
data = da.from_array(data, chunks=(4, 32, 512, 512))

data_deconv = data.map_blocks(deconvolve_chunk_in_process, dtype=data.dtype)

result = dask.compute(data_deconv)

@eric-czech

Ah, I see. Well, if you have to rely on a subprocess dying to free the memory before a subsequent task runs, then I don't see how you could possibly avoid the TF initialization and graph construction for every single chunk or image too (I'm following your example now). Would you want that even if you can work out the dispatching/scheduling?

You could potentially move the hack to the hardware level: on a machine with two GPUs, dispatch decon tasks to one GPU + worker + process combo, with the other left free for non-TF tasks. That would at least avoid the performance problems.

I don't know how much you've looked at the core parts of the flowdec code yet, beyond the stuff related to TF 2.x compatibility, but I think it would actually be fairly easy to roll a simplified port in CuPy if you've got the time and can't find any other deconvolution packages that integrate well.
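
To give a flavor of what such a simplified port might look like, here is a rough FFT-based Richardson-Lucy sketch in CuPy (no padding modes or observers, unlike flowdec; function and variable names are illustrative only):

import cupy as cp

def richardson_lucy(image, psf, niter=10):
    # Plain Richardson-Lucy with circular (FFT) boundary handling.
    img = cp.asarray(image, dtype=cp.float32)
    psf = cp.asarray(psf, dtype=cp.float32)
    psf = psf / psf.sum()  # normalize so total flux is preserved
    otf = cp.fft.rfftn(cp.fft.ifftshift(psf), s=img.shape)
    estimate = cp.full_like(img, float(img.mean()))
    eps = 1e-12  # guard against division by zero
    for _ in range(niter):
        blurred = cp.fft.irfftn(cp.fft.rfftn(estimate) * otf, s=img.shape)
        ratio = img / (blurred + eps)
        # Convolution with the mirrored PSF == multiplication by conj(OTF).
        correction = cp.fft.irfftn(cp.fft.rfftn(ratio) * cp.conj(otf), s=img.shape)
        estimate *= correction
    return cp.asnumpy(estimate)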

@chrisroat
Contributor Author

Thanks for the idea on the dual-GPU worker, @eric-czech. As for CuPy, I'm guessing it may do something similar with memory, and so may not solve the problem -- and I'm not sure whether it would be faster or slower in performance or setup.

At this point, I'm essentially using your suggestion, but at a higher level. I have a different pool of dask workers for these deconv tasks that use TF, and am using dask resources to decide where to put the work.
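
For anyone reading later, a rough sketch of that resources-based routing (the resource name, addresses, and the chunk variable are made up for illustration; deconvolve is the module from earlier in this thread):

import distributed
import deconvolve

# The TF/deconvolution workers are started with a resource tag, e.g.:
#   $ dask-worker tcp://scheduler:8786 --resources "DECONV=1"
client = distributed.Client("tcp://scheduler:8786")

# Only workers advertising the DECONV resource will run these tasks.
future = client.submit(deconvolve.deconvolve_chunk_in_process, chunk,
                       resources={"DECONV": 1})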

@chrisroat
Contributor Author

Closing this because it's not a dask error, but rather a consequence of a particular use of code within multiprocessing on dask workers. It's not a general problem, and I've found a sufficient workaround.

@mrocklin
Member

mrocklin commented Feb 14, 2020 via email
