UniversalQueue ala curio #449

Closed
miracle2k opened this issue Feb 21, 2018 · 6 comments

@miracle2k
Contributor

I need to run a consumer in an OS thread and communicate with it through a queue. curio tells me I should use their UniversalQueue, whose purpose is to block correctly both from within curio and from within the thread.

I can't find an equivalent in trio (maybe I missed it?).

@smurfix
Contributor

smurfix commented Feb 21, 2018

@njsmith
Member

njsmith commented Feb 22, 2018

Yeah, the simplest approach is to create a trio.BlockingTrioPortal, which will let you call whatever Trio functions and methods you want from your thread -- in particular, but not restricted to, trio.Queue.put and trio.Queue.get.

Trio doesn't have Curio's feature of magically introspecting the calling context to automatically decide which functions should be wrapped in a call to BlockingTrioPortal.run -- I think it's too magical and fragile to belong in a foundational library like this. (If you look at Curio's code for this, it involves things like doing string comparisons on the names of stack frames.) If you find calling portal.run all the time too tedious, though, you can make a little helper class like:

class BlockingQueueWrapper:
    def __init__(self, queue, portal):
        self.queue = queue
        self.portal = portal

    def put(self, value):
        self.portal.run(self.queue.put, value)

    def get(self):
        # get takes no argument; passing an undefined `value` here would raise NameError
        return self.portal.run(self.queue.get)

    def put_nowait(self, value):
        self.portal.run_sync(self.queue.put_nowait, value)

    def get_nowait(self):
        return self.portal.run_sync(self.queue.get_nowait)

Then you can do something like:

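# Inside an async function running under trio.run:
queue = trio.Queue(10)  # trio.Queue requires a capacity; 10 is an arbitrary choice
portal = trio.BlockingTrioPortal()
blocking_queue = BlockingQueueWrapper(queue, portal)
await trio.run_sync_in_worker_thread(my_func, blocking_queue)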

and my_func can freely use blocking_queue.get, .put, .get_nowait, .put_nowait.

Or maybe it's possible to automatically generate class wrappers like this, so we could have blocking_queue = portal.autowrap(queue)? I dunno, if you come up with something cool then let us know, maybe it should go into trio or into a utility library or something :-). We're still feeling out the best way to expose all this functionality.
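One way the autowrap idea might look is a generic proxy that forwards each method call through the portal, choosing run for async methods and run_sync for everything else. The sketch below is only an illustration under stated assumptions: PortalProxy is a hypothetical name, not part of trio, and FakePortal is a stand-in built on asyncio so the example runs without trio installed. It mimics only the run and run_sync methods used in this thread.

```python
import asyncio
import inspect


class FakePortal:
    """Hypothetical stand-in for a blocking portal, backed by an asyncio loop."""

    def __init__(self, loop):
        self.loop = loop

    def run(self, async_fn, *args):
        # Schedule the coroutine on the loop's thread and block until it finishes.
        future = asyncio.run_coroutine_threadsafe(async_fn(*args), self.loop)
        return future.result()

    def run_sync(self, fn, *args):
        # Run a plain function on the loop's thread by wrapping it in a coroutine.
        async def call():
            return fn(*args)

        return self.run(call)


class PortalProxy:
    """Hypothetical autowrap: every method call goes through the portal."""

    def __init__(self, target, portal):
        self._target = target
        self._portal = portal

    def __getattr__(self, name):
        attr = getattr(self._target, name)
        if not callable(attr):
            # Plain attributes are read directly in the calling thread.
            return attr
        if inspect.iscoroutinefunction(attr):
            runner = self._portal.run
        else:
            runner = self._portal.run_sync

        def blocking_call(*args):
            return runner(attr, *args)

        return blocking_call


def consumer(blocking_queue, results):
    # Runs in an ordinary OS thread and blocks on each get.
    for _ in range(3):
        results.append(blocking_queue.get())


async def main():
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()
    blocking_queue = PortalProxy(queue, FakePortal(loop))
    results = []
    worker = loop.run_in_executor(None, consumer, blocking_queue, results)
    for item in ("a", "b", "c"):
        await queue.put(item)
    await worker
    return results


results = asyncio.run(main())
```

Unlike Curio's approach, nothing here inspects the calling context: the proxy is only meant to be handed to the thread, and code on the event loop side keeps using the original queue directly.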

@njsmith
Member

njsmith commented Feb 22, 2018

Does that help?

@miracle2k
Contributor Author

Yes that helps, thanks. I actually did end up doing the reverse, wrapping the stdlib Queue and calling await trio.run_sync_in_worker_thread(lambda: self.queue.put(item)) for the put, but I suspect that the portal way may be more performant.

@njsmith
Member

njsmith commented Feb 22, 2018

The portal way will certainly use fewer worker threads; I can't say if that will be more performant or not without measuring :-).

The other difference to be aware of is that when you use the queue from trio, the trio.Queue operations integrate with trio's cancellation system, so for example you can wake up a task that's blocked on trio.Queue.get. This can be handy sometimes, and may also make your program better behaved if you hit control-C or have an unexpected error. run_sync_in_worker_thread is useful, but it is one of the few operations in trio that's not cancellable.

@miracle2k
Contributor Author

Thanks. By the way, I found your articles on async extremely insightful, and the way that trio really thinks through the various async control flow issues is just very attractive.


3 participants