Skip to content

Commit

Permalink
Merge pull request #324 from njsmith/rename-call_soon
Browse files Browse the repository at this point in the history
Renames and deprecations for #68
  • Loading branch information
njsmith authored Sep 16, 2017
2 parents 585ab4e + e2df899 commit 1075239
Show file tree
Hide file tree
Showing 17 changed files with 530 additions and 358 deletions.
3 changes: 3 additions & 0 deletions docs/source/history.rst
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ will work but complain loudly, and won't work in 0.3.0):
``run_in_worker_thread`` → ``run_sync_in_worker_thread``
``nursery.spawn`` → ``nursery.start_soon``

``current_call_soon_thread_and_signal_safe`` → :class:`trio.hazmat.TrioToken`
``run_in_trio_thread``, ``await_in_trio_thread`` → :class:`trio.BlockingTrioPortal`

deprecated big chunks of nursery and Task API


Expand Down
56 changes: 11 additions & 45 deletions docs/source/reference-core.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1314,9 +1314,8 @@ for working with real, operating-system level,
:mod:`threading`\-module-style threads. First, if you're in Trio but
need to push some blocking I/O into a thread, there's
:func:`run_sync_in_worker_thread`. And if you're in a thread and need
to communicate back with trio, there's the closely related
:func:`current_run_in_trio_thread` and
:func:`current_await_in_trio_thread`.
to communicate back with trio, you can use a
:class:`BlockingTrioPortal`.


Trio's philosophy about managing worker threads
Expand Down Expand Up @@ -1452,40 +1451,8 @@ Putting blocking I/O into worker threads
Getting back into the trio thread from another thread
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. function:: current_run_in_trio_thread
current_await_in_trio_thread

Call these from inside a trio run to get a reference to the current
run's :func:`run_in_trio_thread` or :func:`await_in_trio_thread`:

.. function:: run_in_trio_thread(sync_fn, *args)
:module:
.. function:: await_in_trio_thread(async_fn, *args)
:module:

These functions schedule a call to ``sync_fn(*args)`` or ``await
async_fn(*args)`` to happen in the main trio thread, wait for it to
complete, and then return the result or raise whatever exception it
raised.

These are the *only* non-hazmat functions that interact with the
trio run loop and that can safely be called from a different thread
than the one that called :func:`trio.run`. These two functions
*must* be called from a different thread than the one that called
:func:`trio.run`. (After all, they're blocking functions!)

.. warning::

If the relevant call to :func:`trio.run` finishes while a call
to ``await_in_trio_thread`` is in progress, then the call to
``async_fn`` will be :ref:`cancelled <cancellation>` and the
resulting :exc:`~trio.Cancelled` exception may propagate out of
``await_in_trio_thread`` and into the calling thread. You should
be prepared for this.

:raises RunFinishedError: If the corresponding call to
:func:`trio.run` has already completed.

.. autoclass:: BlockingTrioPortal
:members:

This will probably be clearer with an example. Here we demonstrate how
to spawn a child thread, and then use a :class:`trio.Queue` to send
Expand All @@ -1494,28 +1461,27 @@ messages between the thread and a trio task::
import trio
import threading

def thread_fn(await_in_trio_thread, request_queue, response_queue):
def thread_fn(portal, request_queue, response_queue):
while True:
# Since we're in a thread, we can't call trio.Queue methods
# directly -- so we use await_in_trio_thread to call them.
request = await_in_trio_thread(request_queue.get)
# directly -- so we use our portal to call them.
request = portal.run(request_queue.get)
# We use 'None' as a request to quit
if request is not None:
response = request + 1
await_in_trio_thread(response_queue.put, response)
portal.run(response_queue.put, response)
else:
# acknowledge that we're shutting down, and then do it
await_in_trio_thread(response_queue.put, None)
portal.run(response_queue.put, None)
return

async def main():
# Get a reference to the await_in_trio_thread function
await_in_trio_thread = trio.current_await_in_trio_thread()
portal = trio.BlockingTrioPortal()
request_queue = trio.Queue(1)
response_queue = trio.Queue(1)
thread = threading.Thread(
target=thread_fn,
args=(await_in_trio_thread, request_queue, response_queue))
args=(portal, request_queue, response_queue))
thread.start()

# prints "1"
Expand Down
9 changes: 6 additions & 3 deletions docs/source/reference-hazmat.rst
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,13 @@ Global state: system tasks and run-local storage
.. autofunction:: spawn_system_task


Entering trio from external threads or signal handlers
======================================================
Trio tokens
===========

.. autofunction:: current_call_soon_thread_and_signal_safe
.. autoclass:: TrioToken()
:members:

.. autofunction:: current_trio_token


Safer KeyboardInterrupt handling
Expand Down
1 change: 1 addition & 0 deletions trio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
# Having the public path in .__module__ attributes is important for:
# - exception names in printed tracebacks
# - sphinx :show-inheritance:
# - deprecation warnings
# - pickle
# - probably other stuff
from ._util import fixup_module_metadata
Expand Down
3 changes: 3 additions & 0 deletions trio/_core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ def _public(fn):
from ._run import *
__all__ += _run.__all__

from ._entry_queue import *
__all__ += _entry_queue.__all__

from ._parking_lot import *
__all__ += _parking_lot.__all__

Expand Down
196 changes: 196 additions & 0 deletions trio/_core/_entry_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
from collections import deque
import threading

import attr

from .. import _core
from ._wakeup_socketpair import WakeupSocketpair

__all__ = ["TrioToken"]


@attr.s
class EntryQueue:
# This used to use a queue.Queue. but that was broken, because Queues are
# implemented in Python, and not reentrant -- so it was thread-safe, but
# not signal-safe. deque is implemented in C, so each operation is atomic
# WRT threads (and this is guaranteed in the docs), AND each operation is
# atomic WRT signal delivery (signal handlers can run on either side, but
# not *during* a deque operation). dict makes similar guarantees - and on
# CPython 3.6 and PyPy, it's even ordered!
queue = attr.ib(default=attr.Factory(deque))
idempotent_queue = attr.ib(default=attr.Factory(dict))

wakeup = attr.ib(default=attr.Factory(WakeupSocketpair))
done = attr.ib(default=False)
# Must be a reentrant lock, because it's acquired from signal handlers.
# RLock is signal-safe as of cpython 3.2. NB that this does mean that the
# lock is effectively *disabled* when we enter from signal context. The
# way we use the lock this is OK though, because when
# run_sync_soon is called from a signal it's atomic WRT the
# main thread -- it just might happen at some inconvenient place. But if
# you look at the one place where the main thread holds the lock, it's
# just to make 1 assignment, so that's atomic WRT a signal anyway.
lock = attr.ib(default=attr.Factory(threading.RLock))

async def task(self):
assert _core.currently_ki_protected()
# RLock has two implementations: a signal-safe version in _thread, and
# and signal-UNsafe version in threading. We need the signal safe
# version. Python 3.2 and later should always use this anyway, but,
# since the symptoms if this goes wrong are just "weird rare
# deadlocks", then let's make a little check.
# See:
# https://bugs.python.org/issue13697#msg237140
assert self.lock.__class__.__module__ == "_thread"

def run_cb(job):
# We run this with KI protection enabled; it's the callback's
# job to disable it if it wants it disabled. Exceptions are
# treated like system task exceptions (i.e., converted into
# TrioInternalError and cause everything to shut down).
sync_fn, args = job
try:
sync_fn(*args)
except BaseException as exc:

async def kill_everything(exc):
raise exc

_core.spawn_system_task(kill_everything, exc)
return True

# This has to be carefully written to be safe in the face of new items
# being queued while we iterate, and to do a bounded amount of work on
# each pass:
def run_all_bounded():
for _ in range(len(self.queue)):
run_cb(self.queue.popleft())
for job in list(self.idempotent_queue):
del self.idempotent_queue[job]
run_cb(job)

try:
while True:
run_all_bounded()
if not self.queue and not self.idempotent_queue:
await self.wakeup.wait_woken()
else:
await _core.checkpoint()
except _core.Cancelled:
# Keep the work done with this lock held as minimal as possible,
# because it doesn't protect us against concurrent signal delivery
# (see the comment above). Notice that this code would still be
# correct if written like:
# self.done = True
# with self.lock:
# pass
# because all we want is to force run_sync_soon
# to either be completely before or completely after the write to
# done. That's why we don't need the lock to protect
# against signal handlers.
with self.lock:
self.done = True
# No more jobs will be submitted, so just clear out any residual
# ones:
run_all_bounded()
assert not self.queue
assert not self.idempotent_queue

def close(self):
self.wakeup.close()

def size(self):
return len(self.queue) + len(self.idempotent_queue)

def spawn(self):
name = "<TrioToken.run_sync_soon task>"
_core.spawn_system_task(self.task, name=name)

def run_sync_soon(self, sync_fn, *args, idempotent=False):
with self.lock:
if self.done:
raise _core.RunFinishedError("run() has exited")
# We have to hold the lock all the way through here, because
# otherwise the main thread might exit *while* we're doing these
# calls, and then our queue item might not be processed, or the
# wakeup call might trigger an OSError b/c the IO manager has
# already been shut down.
if idempotent:
self.idempotent_queue[(sync_fn, args)] = None
else:
self.queue.append((sync_fn, args))
self.wakeup.wakeup_thread_and_signal_safe()


class TrioToken:
"""An opaque object representing a single call to :func:`trio.run`.
It has no public constructor; instead, see :func:`current_trio_token`.
This object has two uses:
1. It lets you re-enter the Trio run loop from external threads or signal
handlers. This is the low-level primitive that
:func:`trio.run_sync_in_worker_thread` uses to receive results from
worker threads, that :func:`trio.catch_signals` uses to receive
notifications about signals, and so forth.
2. Each call to :func:`trio.run` has exactly one associated
:class:`TrioToken` object, so you can use it to identify a particular
call.
"""

def __init__(self, reentry_queue):
self._reentry_queue = reentry_queue

def run_sync_soon(self, sync_fn, *args, idempotent=False):
"""Schedule a call to ``sync_fn(*args)`` to occur in the context of a
trio task.
This is safe to call from the main thread, from other threads, and
from signal handlers. This is the fundamental primitive used to
re-enter the Trio run loop from outside of it.
The call will happen "soon", but there's no guarantee about exactly
when, and no mechanism provided for finding out when it's happened.
If you need this, you'll have to build your own.
The call is effectively run as part of a system task (see
:func:`~trio.hazmat.spawn_system_task`). In particular this means
that:
* :exc:`KeyboardInterrupt` protection is *enabled* by default; if
you want ``sync_fn`` to be interruptible by control-C, then you
need to use :func:`~trio.hazmat.disable_ki_protection`
explicitly.
* If ``sync_fn`` raises an exception, then it's converted into a
:exc:`~trio.TrioInternalError` and *all* tasks are cancelled. You
should be careful that ``sync_fn`` doesn't crash.
All calls with ``idempotent=False`` are processed in strict
first-in first-out order.
If ``idempotent=True``, then ``sync_fn`` and ``args`` must be
hashable, and trio will make a best-effort attempt to discard any
call submission which is equal to an already-pending call. Trio
will make an attempt to process these in first-in first-out order,
but no guarantees. (Currently processing is FIFO on CPython 3.6 and
PyPy, but not CPython 3.5.)
Any ordering guarantees apply separately to ``idempotent=False``
and ``idempotent=True`` calls; there's no rule for how calls in the
different categories are ordered with respect to each other.
:raises trio.RunFinishedError:
if the associated call to :func:`trio.run`
has already exited. (Any call that *doesn't* raise this error
is guaranteed to be fully processed before :func:`trio.run`
exits.)
"""
self._reentry_queue.run_sync_soon(
sync_fn, *args, idempotent=idempotent
)
Loading

0 comments on commit 1075239

Please sign in to comment.