Pickling error using multiprocessing in map_blocks #3472
I think if you move fn and fn_in_process to a module, you'll avoid the issue

# foo.py
import multiprocessing

def fn_in_process(chunk):
    with multiprocessing.Pool(1) as pool:
        return pool.apply(fn, (chunk,))

def fn(chunk):
    return chunk

And then in your main script / notebook

>>> import foo
>>> result = data.map_blocks(foo.fn_in_process, dtype=data.dtype).compute()
>>> result

Pickle tends to struggle with dynamically generated functions.

>>> result = data.map_blocks(fn_in_process, dtype=data.dtype)
>>> list(result.dask.values())[0][0].dsk
Out[10]:
{'fn_in_process-d70faa90b1ff8e5c7fad8cf365bfd988': (<function __main__.fn_in_process(chunk)>,
  '_0')}
|
Right, this is more of a problem with multiprocessing.Pool than with anything else. One way to get around this is to use the Dask multiprocessing scheduler and dask.delayed instead of multiprocessing.Pool:

value = dask.delayed(fn)(chunk)
value = value.compute(scheduler="processes")
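A fuller sketch of that suggestion (an assumption, not code from the thread: it reuses the fn / fn_in_process names and a stand-in array from the examples above). Dask's "processes" scheduler serializes tasks with cloudpickle, which is more forgiving about functions than the plain pickle used by multiprocessing.Pool.

import dask
import dask.array as da
import numpy as np

def fn(chunk):
    return chunk

def fn_in_process(chunk):
    # Run fn in a separate process via Dask's multiprocessing scheduler,
    # instead of hand-rolling a multiprocessing.Pool inside the task.
    return dask.delayed(fn)(chunk).compute(scheduler="processes")

data = da.zeros((8, 16, 16), dtype=np.uint16)  # stand-in for the real image
result = data.map_blocks(fn_in_process, dtype=data.dtype).compute()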
|
@mrocklin I may not fully grok what you are saying -- I'm not sure the scheduler type affects my base problem, which is that I'm trying to run my work within a subprocess on the task. The reason is that TF (and even other GPU workloads, I fear) does not give up GPU memory. So even if I restrict a machine to one worker per GPU, a subsequent task that needs the GPU will fail if a TF task ran before it in the same process and took up all the memory. The memory allocation is tied to a process, so by running the workload in a subprocess, the GPU memory is freed when the task completes. This thread gets hijacked a bit, but has relevant details.

@TomAugspurger Well, pickle me tink. By condensing my repro example to a single file, I actually changed the bug. So, now back to the original. Tagging @eric-czech, as this seems to be tickled by padding.
|
Understood. The error that I'm seeing on the screen is from Multiprocessing not being able to serialize a function (presumably the |
[Deleted a previous comment that was here, as part of the issue might be stemming from odd shapes of my data and psf. Working on repro-ing a problem I'm seeing with deadlock in my project with this solution. ] |
I would echo the suggestions from @mrocklin and @TomAugspurger to define your functions in some module and then load that module in the tasks instead (and use the "processes" scheduler). For example if you made a module like

# my_decon_module.py
from flowdec import restoration as fd_restoration
from flowdec import data as fd_data
import numpy as np

# Make sure this isn't defined every time in the decon method! You may know this already
# but each new "algo" instance corresponds to a new TF graph w/ lots of overhead
# so they should be created as infrequently as possible
algo = fd_restoration.RichardsonLucyDeconvolver(n_dims=3, pad_mode='2357', device='/cpu:0').initialize()

def deconvolve_chunk(chunk):
    psf = np.zeros((16,16,16))
    acq = fd_data.Acquisition(data=chunk, kernel=psf)
    res = algo.run(acq, niter=1)
    return chunk  # return res instead?

and then called it in another script/notebook like:

import distributed
import dask.array as da
import numpy as np

# You have to use processes=True to ever use multiple GPUs (or at least I could never get
# TF to play nicely with multiple GPUs in a single process w/ the threading scheduler)
client = distributed.Client(n_workers=1, threads_per_worker=1, processes=True)

data = da.zeros((8,16,16), dtype=np.uint16)

def process_chunk(chunk):
    import my_decon_module
    return my_decon_module.deconvolve_chunk(chunk)

data_deconv_dct = data.map_blocks(process_chunk, dtype=data.dtype)
result = data_deconv_dct.compute()

it should eschew the serialization issues from passing closures out of the top level. It did so for me at least, and you could also do that with your own multiprocessing Pool but I don't see why you'd need to with |
@eric-czech The problem I'm trying to solve is having TF give back the memory, so a subsequent dask task can use the GPU with another framework (say cupy). I think the code you have will claim the memory for TF for all time for that dask worker. Perhaps I'm not following, though.

Operating under the assumption that I need to do the work in subprocesses (the subprocess will give back the GPU memory it claims), I needed processes=False, or to use --no-nanny in a cluster, in order to avoid the "daemonic processes can't create children" error. The workers/threads specification was just to force a single worker with a single thread on my workstation, which has a single GPU (and that is how I set up my Kubernetes pods -- one dask worker on a machine with one GPU). I'm happy to figure out multi-GPU at some later time. |
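A minimal sketch of the single-worker setup described in the previous comment (the exact calls are an assumption, not the author's actual configuration):

import distributed

# One worker, one thread, no separate worker processes: with processes=False the
# worker runs in-process (threads only), so there is no daemonic worker process
# and tasks are free to start their own multiprocessing.Pool.
client = distributed.Client(n_workers=1, threads_per_worker=1, processes=False)

# On a cluster, the equivalent is to start workers without the nanny, e.g.:
#   dask-worker <scheduler-address> --no-nanny --nthreads 1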
[Updated] @mrocklin Do I need to watch out for any data transfer (not deadlock) issues? I'm seeing them in my project code with this fix, but it's hard to isolate. I get a whole bunch of tasks/deconvolutions done, and then everything just halts. I've even simplified it to remove any computation or GPU usages -- just map_overlaps with returning a no-op computation. A lot of time is being spent in dask.local calling queue.not_empty.wait. |
This snippet demonstrates the overhead of the dask-within-dask scheduling. It takes 10-15 seconds to do a no-op operation on an O(100MB) chunk of data. For my larger datasets, this was going for many, many minutes (I killed it rather than wait).
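The snippet itself isn't preserved in this copy of the thread; the following is a hypothetical reconstruction of the kind of no-op benchmark described (shapes, chunking, and names are illustrative guesses):

import time
import multiprocessing

import dask.array as da
import numpy as np

def noop(chunk):
    return chunk

def noop_in_process(chunk):
    # Dispatch the no-op through a child process, mimicking the
    # subprocess-per-task workaround used for the real deconvolution.
    with multiprocessing.Pool(1) as pool:
        return pool.apply(noop, (chunk,))

if __name__ == "__main__":
    # ~128 MB per chunk of uint16 data.
    data = da.zeros((512, 512, 512), chunks=(256, 512, 512), dtype=np.uint16)

    start = time.time()
    data.map_overlap(noop_in_process, depth=8, dtype=data.dtype).compute()
    print(f"elapsed: {time.time() - start:.1f} s")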
|
Ah I see, well if you have to rely on a subprocess dying to free the memory before a subsequent task runs then I don't see how you could possibly avoid the TF initialization and graph construction for every single chunk or image too (I'm following on your example now). Would you want that even if you can work out the dispatching/scheduling? You could potentially move the hack to the hardware level and on a machine with two GPUs dispatch decon tasks to one GPU + worker + process combo with the other being free for non-TF tasks. That would at least avoid the performance problems. I don't know how much you've looked at the core parts of the flowdec code yet, beyond the stuff related to TF 2.x compatibility, but I think it would actually be fairly easy to roll a simplified port in CuPy if you've got the time and can't find any other deconvolution packages that integrate well. |
Thanks for the idea on the dual-GPU worker, @eric-czech. As far as CuPy, I'm guessing it may do something similar with memory, and so may not solve the problem -- and I'm not sure if it would be faster/slower in performance or setup. At this point, I'm essentially using your suggestion, but at a higher level. I have a different pool of dask workers for these deconv tasks that use TF, and am using dask resources to decide where to put the work. |
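One way the resource-based routing mentioned above can look (a sketch under assumed names; worker resources are a documented distributed feature, but the exact setup here is a guess):

# Start the TF/deconvolution workers with an advertised resource, e.g.:
#   dask-worker <scheduler-address> --no-nanny --resources "GPU=1"
# Workers started without it never receive these tasks.
from dask.distributed import Client
import numpy as np
import my_decon_module  # hypothetical: the module sketched earlier in the thread

client = Client("<scheduler-address>")

# Stand-ins for the real image blocks.
chunks = [np.zeros((8, 16, 16), dtype=np.uint16) for _ in range(4)]

# Only workers advertising a "GPU" resource will run these tasks.
futures = [
    client.submit(my_decon_module.deconvolve_chunk, c, resources={"GPU": 1})
    for c in chunks
]
results = client.gather(futures)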
Closing this because it's not a dask error, but rather a consequence of a particular use of code within multiprocessing on dask workers. It's not a general problem, and I've found a sufficient workaround. |
In my brief experience some of the other deep learning libraries like PyTorch or CuPy/Chainer are more interoperable. Users who use PyTorch or CuPy with Dask tend to be much happier. Those libraries assume much less about the environment in which they run, while Tensorflow seems to expect a fairly controlled environment.
|
Tensorflow holds onto GPU memory, so one trick is to drop into a new process -- the memory is released when the process ends. I use a multiprocessing pool, with processes=False or --no-nanny to avoid #2142. The error in my original code, which uses flowdec, was:

As I pared back to a minimal example, the pickling error changed. Code and new error are below. Note that creating the array with np.zeros and converting it to dask.Array triggers the error, but using da.zeros does not. I've found this error with both 2.9.1 and 2.10.1.

I would love help understanding what may be happening.
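The repro code and traceback aren't preserved in this copy of the issue; the following is a hypothetical sketch of the kind of minimal example described (shapes and chunking are illustrative, and the fn / fn_in_process names follow the comments above):

import multiprocessing

import dask.array as da
import numpy as np

def fn(chunk):
    return chunk

def fn_in_process(chunk):
    # Run the work in a child process so that (in the real code) TF's GPU
    # memory is released when the process exits.
    with multiprocessing.Pool(1) as pool:
        return pool.apply(fn, (chunk,))

if __name__ == "__main__":
    # Converting a NumPy array was reported to trigger the pickling error...
    data = da.from_array(np.zeros((8, 16, 16), dtype=np.uint16), chunks=(4, 16, 16))
    # ...while building the array directly with da.zeros reportedly did not:
    # data = da.zeros((8, 16, 16), chunks=(4, 16, 16), dtype=np.uint16)

    result = data.map_blocks(fn_in_process, dtype=data.dtype).compute()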