Parallel generator pool thing #90
@ali1234 I was thinking that this is a really interesting idea for my own purposes as well, since I'm working towards building out a stream-based analytics system for financial data. The idea has interesting implications for processing real-time frame-oriented data (like audio), where you could just keep inter-frame state in a generator's local scope as opposed to managing it with more "global" variables. Also neat if switching between parallel and non-parallel is simple. I want to respond to a bunch of this stuff; I'll report back soon!
I'm using it to process video data. Yes, you can keep some state in the generators, but only insofar as it does not affect the output. For example, the state I set up and tear down is mostly setting up a CUDA context, compiling kernels, and loading pattern data into GPU memory. That can and must be done only once in any given process. Putting it into the generator instead of a separate set-up function makes it far easier to do the set-up only when it is necessary (i.e. it only happens if the pipeline contains that particular generator; the pipeline builder does not need to keep track of it). As soon as you try to parallelize a generator like this, it can't have results that are influenced by previous results, because operations will happen in a different order (from the point of view of each subprocess, some will not happen at all, as they happen in a different process). That said, if the inter-frame state is something like "count the number of frames with property x", then you could add a thing to sum the results from each subprocess fairly easily.
More concretely, if you have a generator like:
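A sketch of that kind of stateful generator (`has_property_x` here is just a stand-in for the real per-frame test):

```python
def count_matching(frames):
    count = 0  # inter-frame state lives in the generator's local scope
    for frame in frames:
        if has_property_x(frame):
            count += 1
        yield count  # running total; the final yield is the per-process total
```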
This generator isn't pure according to my definition, and isn't a case that I need to handle, but if the pool is aware of it, it can still be parallelized by just adding together the final value yielded by each subprocess.
Have you considered switching from the generator paradigm to the synchronous coroutine paradigm? The outer layer would be simpler and would not require as many queues (at the expense of the slightly weird worker-function style). So you could just:

```python
from contextlib import closing

def do_the_work():
    some_context = init()
    result = None
    # I heard somewhere that we should beware of context managers in
    # generators/coroutines, but I don't understand exactly when it's OK / not OK
    with some_context:
        while True:
            task = yield result
            result = task.arg[0] + 1  # replace with actual code to produce the result

coro = do_the_work()
# closing() makes sure the context inside the coroutine is finalized
# by throwing a GeneratorExit exception into it when we're done
with closing(coro):
    next(coro)  # prime it (run the part of the coroutine up to the first yield)
    for task_num, task_obj in iter(work_queue.get, None):
        response = coro.send(task_obj)
        done_queue.put((task_num, response))
```
@parity3 that seems to be pretty much what I am looking for, but I need it to also be multiprocess, to leverage multiple CPUs and GPUs. I don't see any benefit to making the rest of my codebase work this way, as it just adds extra complexity, so I would like it all wrapped in a function that looks like a normal generator from outside. Then it can be applied to any existing (pure) generator without having to refactor it, and the codebase can still work (slowly) even if some particular async library isn't available.
Just quickly addressing the pitfalls section of your current solution:
Responding in order:
Regarding,
The generator composition here is why I think a reactive-style API for building so-called "pipelines" (really, in the functional world these are just plain old compositions) is probably the best approach. There's good motivation (streamz and rxpy) for why such systems exist and how generators/iterators are insufficient for more complex flows. I want to dig into this more later on, but for now just know that support for a declarative-style streaming system is definitely something I'm planning.
Sounds good. Re 3, my streams are basically pure data: ints, strings, tuples, and numpy arrays. Nothing that should be particularly difficult to serialize. I do need it to be fast, though. And re streamz: as I mentioned, I looked at it the other day. It looks like a fine API, but their rationale says "generators quickly become cumbersome" without actually explaining why, and I don't think they are cumbersome at all. The benefit of using generators is that you can use them with a large number of other libraries, e.g. tqdm, by just inserting a tqdm object anywhere in the pipeline:
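Roughly like this (the stage names are made up; any iterator-to-iterator stages work the same way):

```python
from tqdm import tqdm

pipeline = stage1(packets)
pipeline = tqdm(pipeline)  # wrap any iterator to get a progress bar
pipeline = stage2(pipeline)
```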
... and now I've got a progress bar with no other changes.
I just completely rewrote mp.py to use a pipe per worker instead of a single pair of shared queues. This makes the code much simpler and removes pitfalls 1 and 3 entirely. It still can't tell why a worker crashed, but it is much easier to detect that it happened, since the pipe will die and raise an error. That means most of the shared events are no longer needed. It still has the same apply API. The best part is that it made my slowest pipeline 10% faster, and the "no op" demo at the bottom of mp.py runs about 2.5x as fast. So the shared queues must have had quite a lot of overhead, for apparently little benefit and a lot of headaches.
@ali1234 sweet, yes, this looks much easier to understand now. The reactive stuff becomes useful when you want to do forking and merging of streams (obviously not used in your case) as well as time-based operations. I will take a look at implementing a version with …
I think I've only used … New mp.py turned out to be broken on Windows because …
Got a proper fix for the blocking pipes: use threads. This allows everything to continue (slowly) even if the pipe blocks in both directions, as the reader thread will eventually unblock it. It also allows the pool to block on pipe.recv instead of polling, which is faster and also means workers use no CPU while waiting for work. It doesn't quite make up for the overhead of the extra queues, but it is still faster than …
Not sure if this helps anything or not, but I remember seeing this project a while back: https://github.com/KholdStare/generators-to-coroutines It actually provides a decorator which takes any iterator function and AST-transforms it into a coroutine! It could help in certain scenarios, especially if using a heavyweight construct like a thread/queue to get around this issue.
Though not for the same type of work (this is targeting IO-bound work), the …
@ali1234 sorry to have left this for so long, but we do indeed need something like this for …
Please do. I ended up rewriting my code with ZMQ. Not sure if that was before or after we discussed it. It is slightly more portable but still has many of the same issues, mostly involving deadlocks and IPC.
@ali1234 nice. Link to the new code if you don't mind (I'm sure I can also find it if need be). I will definitely keep you in the loop. The new sub-system will hopefully get broken out into a new repo within the next few weeks.
New code is at the same link: https://github.com/ali1234/vhs-teletext/blob/master/teletext/mp.py The API is the same. It just uses ZMQ sockets instead of multiprocessing shared memory objects.
A bump for this issue, after discussion in chat:

```python
import time
from itertools import count
from multiprocessing import Pool
from functools import partial

already_called = False

def work(it, a, b):
    global already_called
    if already_called:
        raise AssertionError("You can only call work() once per process.")
    already_called = True
    time.sleep(1)
    for x in it:
        time.sleep(0.1)
        yield (a*x)**b

def helper(x, args):
    return next(work([x], *args))

def parallel(f, it, *args):
    with Pool(1) as p:
        yield from p.map(partial(helper, args=args), it)

def notparallel(f, it, *args):
    yield from f(it, *args)

if __name__ == '__main__':
    # note: these calls only create the generators; nothing consumes them here
    parallel(work, count(), 3, 2)
    notparallel(work, count(), 3, 2)
```

Further comments: …
Linking to @richardsheridan's …
Best to paste the code from the third and final link, because the original one I shared was half-finished and doesn't work. And those pastebins will expire in a month, I think.
@ali1234 ur wish is me command:

```python
from itertools import count, islice
import time
from multiprocessing import Pool
from functools import partial

already_called = False

def work(it, a, b):
    global already_called
    if already_called:  # simulate not being thread safe
        assert False, "You called work() more than once in the same process."
    already_called = True
    time.sleep(0.1)  # simulate slow set up
    for x in it:
        time.sleep(0.01)  # simulate work per work item
        yield (a*x)**b

def notparallel(procs, f, it, *args):
    # procs is ignored because this is notparallel
    global already_called
    already_called = False
    yield from f(it, *args)

def parallel(procs, f, it, *args):
    # only modify the body of this function
    yield from notparallel(procs, f, it, *args)

if __name__ == '__main__':
    procs = 2

    start = time.time()
    a = list(islice(notparallel(procs, work, count(), 3, 2), 100))
    atime = time.time() - start

    start = time.time()
    b = list(islice(parallel(procs, work, count(), 3, 2), 100))
    btime = time.time() - start

    # check we got the same result
    assert all(x == y for x, y in zip(a, b))
    print(atime, btime)
    # check it was actually faster
    assert btime < atime

# Task: Rewrite parallel() so that it runs faster using parallel
# processing. It must not be slower than notparallel() for any
# pool size 1 < N < (number of available processors).
# You may not modify the interface of (not)parallel().
# Note: parallel() must return results in the same order as
# the original iterable.
# You may not modify work() or count().
# work() performs one-time setup which takes a long time
# and may crash if run more than once in the same process.
# Work items come from an iterable stream of possibly
# infinite length. This is why count() has been chosen
# as a source of example data.
# You may use the multiprocessing library, as long as you have
# never told me not to use it for this task; if you have,
# then you have to use something else.
# Your implementation must work on Windows, Mac, and Linux.
# Your implementation must exit cleanly if the user presses
# ctrl-c exactly once.
# Optional extra: make it work over multiple machines
# in a cluster.

if False:
    # Below: solutions which have previously been proposed
    # but which fail to satisfy the requirements.

    # 1. multiprocessing.Pool.imap:
    def helper(x, args):
        return next(work([x], *args))

    def parallel(procs, f, it, *args):
        with Pool(2) as p:
            yield from p.imap(partial(helper, args=args), it)
    # This fails for at least three reasons: it calls work() more
    # than once per process, which causes worker processes to crash;
    # it runs up to 10x slower than notparallel(); and it can
    # deadlock when pressing ctrl-c.

    # 2. multiprocessing with self-managed subprocesses:
    # https://github.com/ali1234/vhs-teletext/blob/8eff82ba57cc8e7c2798cd7f9730456d61e60356/teletext/mp.py
    # This sometimes deadlocks when the user presses ctrl-c on Windows.
    # This seems to be due to the use of OS pipes.

    # 3. multiprocessing with zmq instead of OS pipes:
    # https://github.com/ali1234/vhs-teletext/blob/446b93b6ebcfe011895e63ff53d40a7539519c4d/teletext/mp.py
    # This sometimes deadlocks on linux when pressing ctrl-c.
    # I have no idea why.
```
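For reference, here is a rough sketch of one shape parallel() could take, assuming f is a "pure generator" as described in the issue below: one long-lived worker process per pool slot runs f exactly once over a lazy stream of items, and the parent reorders the results. It is a sketch, not a vetted implementation, and it makes no attempt at the clean ctrl-c shutdown this thread shows is the hard part.

```python
import heapq
import threading
from multiprocessing import Process, Queue

def _worker(f, args, inq, outq):
    # Child process: call f exactly once, over a lazy stream of items.
    seqs = []
    def items():
        for seq, item in iter(inq.get, None):
            seqs.append(seq)
            yield item
    # A pure generator yields exactly one result per item, in order,
    # so pending sequence numbers pair up with results first-in first-out.
    for result in f(items(), *args):
        outq.put((seqs.pop(0), result))
    outq.put(None)  # this worker has drained its queue

def parallel(procs, f, it, *args):
    # f (and everything in args) must be picklable, since the workers
    # are separate processes (spawned, not forked, on Windows).
    inqs = [Queue(maxsize=4) for _ in range(procs)]  # bounded: limits read-ahead
    outq = Queue()
    workers = [Process(target=_worker, args=(f, args, q, outq), daemon=True)
               for q in inqs]
    for w in workers:
        w.start()

    def feed():
        # Round-robin items to workers. Daemon thread, so abandoning the
        # generator early (e.g. via islice) doesn't keep the parent alive.
        for seq, item in enumerate(it):
            inqs[seq % procs].put((seq, item))
        for q in inqs:
            q.put(None)
    threading.Thread(target=feed, daemon=True).start()

    # Re-emit results in input order using a small heap.
    heap, next_seq, done = [], 0, 0
    while done < procs:
        msg = outq.get()
        if msg is None:
            done += 1
            continue
        heapq.heappush(heap, msg)
        while heap and heap[0][0] == next_seq:
            yield heapq.heappop(heap)[1]
            next_seq += 1
```

The bounded per-worker queues keep the feeder from racing arbitrarily far ahead of a slow consumer, which matters when the input stream is infinite.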
I would like to be able to parallelize generators (across processes). Not async generators - just normal ones.
This concept is a bit difficult to explain, but bear with me.
Consider a function like this:
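(Doubling here stands in for any pure per-item computation:)

```python
def f(x):
    return x * 2
```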
This is not a generator, and it can be trivially parallelized with map semantics, eg:
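(A minimal version with the stdlib pool:)

```python
from multiprocessing import Pool

if __name__ == '__main__':
    with Pool() as p:
        results = p.map(f, range(10))  # each item is independent, so map just works
```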
Now consider a function like this:
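(The generator equivalent of the same per-item computation:)

```python
def g(iterable):
    for x in iterable:
        yield f(x)
```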
We call a generator like this directly on an iterable, eg:
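(Something like:)

```python
for result in g(range(10)):
    print(result)
```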
But in order to parallelize it with map style, we have to do something pretty ugly:
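(A sketch of that workaround: wrap each item in a single-item list and restart g for every item.)

```python
from multiprocessing import Pool

def helper(x):
    return next(g([x]))  # run g on a single-item iterable, take its one result

if __name__ == '__main__':
    with Pool() as p:
        results = p.map(helper, range(10))
```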
This will split the iterable into single item iterables, and call g on each one. This becomes a problem if g() is of the form:
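(With setup() and teardown() bracketing the loop; `do_work` is a hypothetical per-item step:)

```python
def g(iterable):
    setup()               # expensive one-time initialisation
    for x in iterable:
        yield do_work(x)  # per-item work, not necessarily pure
    teardown()
```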
Here setup() and teardown() may take a very long time, or they may crash if run more than once in the same process. Running g() once for every item in the iterable is undesirable, as it may be many times slower than calling it directly on the original iterable, even if it is distributed over a pool of processes.
So I'm looking for a way to parallelize generator functions like g(), whereby g() is run once in each worker process and passed an iterable which asynchronously receives work items from the main process iterable, so it can process items indefinitely without ever having to be restarted.
This is not possible for all generator functions. It is only possible for generators of the form:
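(That is:)

```python
def g(iterable):
    setup()
    for x in iterable:
        yield f(x)   # f is the pure part
    teardown()
```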
where f() is a pure function (but the others do not have to be). I'm only interested in parallelizing generators of this form. (I call generators of this form "pure generators", in reference to the inner loop being pure, which also implies they yield exactly as many results as there are items in the input.)
The function which handles all this should itself not be async, i.e. it should just look like a normal generator from outside:
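(Hypothetical call site; the exact signature is only illustrative:)

```python
for result in generator_map(g, iterable, processes=4):
    print(result)
```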
I have an implementation of this concept at:
https://github.com/ali1234/vhs-teletext/blob/master/teletext/mp.py
The highest-level API is called itermap() for historical reasons, but it works just like what I have described above. The lower-level pools also allow the worker pool to be reused without recreating the workers. There are both parallel and single-process implementations of the pool. In particular, ctrl-c should be correctly handled everywhere, and there are also unit tests (including ctrl-c tests): https://github.com/ali1234/vhs-teletext/blob/master/teletext/tests/test_mp.py
We also discussed the concept here: https://gitter.im/python-trio/general?at=5dd70e3aac81632e65ddb9fa
My implementation uses multiprocessing and has some weak points and ugly things in it:
(Note: I've fixed or have a path to fixing all of these in the latest version, but it still has some ugly workarounds.)
Multiprocessing has an atexit handler which kills any leftover daemon subprocesses. Due to the extensive use of yield, it is possible for SIGINT to happen outside the pool's context, meaning workers aren't shut down cleanly. Instead, the atexit handler runs first and terminates them, and then the pool tries to shut down afterwards. I have to make the workers daemonic, otherwise they can be left running if the main process crashes, due to similar race conditions. This is one source of deadlocks which I think I have finally managed to handle, but it is all quite ugly.
The pool can detect when workers crash, but it doesn't know why (it can't access the exception). It would be really nice if it could re-raise it on the main thread, as this would further hide the fact that this is multiprocess code.
There are problems with mp.Queue: if you put an unpickleable item onto one, it will raise an exception. But that exception happens on a background thread, where you can't catch it. So it has to manually check everything before putting it on the queue. If you don't do that, you will think the item has been sent when it really hasn't, the sent and received counts go out of sync, and you deadlock.
Note that multiprocessing.Pool() itself suffers from all these problems, so simply refactoring my code to not use "pure generators" and instead just use Pool.imap() wouldn't actually help at all.
So for these reasons I am searching for something better.
It is perhaps worth noting that the codebase where I intend to use this is already asynchronous, but I never use the async keyword. Instead, everything is done with generators. It handles streams of packets, and pipelines are composed from generators, eg:
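Roughly like this, with illustrative stage names (the real stages live in the teletext package):

```python
pipeline = read_packets(input_file)
pipeline = deconvolve(pipeline)
pipeline = filter_bad(pipeline)
for packet in pipeline:
    write_packet(packet)
```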
Some of these generators are pure, others are not. I need the generator_map() function to be able to be inserted transparently into a pipeline like the above at the appropriate place, which depends on what the pipeline does.
For example, here's a case where I use itermap() inline with a complex pipeline (it looks more complicated than the simple example above, but it is really doing the same thing):
https://github.com/ali1234/vhs-teletext/blob/master/teletext/cli.py#L292
Nearly all the subcommands of the program do this to some extent.