Skip to content

Commit

Permalink
Add nursery.start and nursery.start_soon
Browse files Browse the repository at this point in the history
start_soon is just a new name for spawn, except it doesn't return the
new task (in preparation for python-triogh-136, where we're going to stop
emphasizing task objects in the main api)

start is a major new feature: it provides a very simple way to start
up a long running task, while blocking until it's finished whatever
initialization it wants to do. At least... it's simple from the user's
point of view. Internally it's quite tricky indeed. The whole _run.py
file probably needs some refactoring and splitting up, but this is one
of those cases where I think it's best to first get the new
functionality working and nailed down, and then we can see what shape
the new abstractions should be.

Fixes python-triogh-284.
  • Loading branch information
njsmith committed Aug 21, 2017
1 parent 1cdb8ea commit 63785eb
Show file tree
Hide file tree
Showing 3 changed files with 327 additions and 20 deletions.
183 changes: 163 additions & 20 deletions trio/_core/_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@
# namespaces.
__all__ = [
"Task", "run", "open_nursery", "open_cancel_scope", "yield_briefly",
"current_task", "current_effective_deadline", "yield_if_cancelled"
"current_task", "current_effective_deadline", "yield_if_cancelled",
"STATUS_IGNORED"
]

GLOBAL_RUN_CONTEXT = threading.local()
Expand Down Expand Up @@ -156,6 +157,15 @@ def _remove_task(self, task):
assert task._cancel_stack[-1] is self
task._cancel_stack.pop()

# Used by the nursery.start trickiness
def _tasks_removed_by_adoption(self, tasks):
with self._might_change_effective_deadline():
self._tasks.difference_update(tasks)

# Used by the nursery.start trickiness
def _tasks_added_by_adoption(self, tasks):
self._tasks.update(tasks)

def _make_exc(self):
exc = Cancelled()
exc._scope = self
Expand Down Expand Up @@ -192,6 +202,99 @@ def open_cancel_scope(*, deadline=inf, shield=False):
################################################################


# This code needs to be read alongside the code from Nursery.start to make
# sense.
@attr.s(slots=True, cmp=False, hash=False, repr=False)
class _TaskStatus:
_old_nursery = attr.ib()
_new_nursery = attr.ib()
_called_started = attr.ib(default=False)
_value = attr.ib(default=None)

def __repr__(self):
return "<Task status object at {:#x}>".format(id(self))

def started(self, value=None):
if self._called_started:
raise RuntimeError(
"called 'started' twice on the same task status"
)
self._called_started = True
self._value = value

# If the old nursery is cancelled, then quietly quit now; the child
# will eventually exit on its own, and we don't want to risk moving
# the children into a different scope while they might have
# propagating Cancelled exceptions that assume they're under the old
# scope.
if _pending_cancel_scope(self._old_nursery._cancel_stack) is not None:
return

# if the new nursery is not accepting new children, then inject an
# error into the old nursery (which will implicitly cancel the child)
if self._new_nursery._closed:

async def raise_new_nursery_closed():
raise RuntimeError("Nursery is closed to new arrivals")

self._old_nursery.start_soon(raise_new_nursery_closed)
return

# otherwise, find all the tasks under the old nursery, and move them
# under the new nursery instead. This means:
# - changing parents of direct children
# - changing cancel stack of all direct+indirect children
# - changing cancel stack of all direct+indirect children's nurseries
# - checking for cancellation in all changed cancel stacks
old_stack = self._old_nursery._cancel_stack
new_stack = self._new_nursery._cancel_stack
# LIFO todo stack for depth-first traversal
todo = list(self._old_nursery._children)
munged_tasks = []
while todo:
task = todo.pop()
# Direct children need to be reparented
if task._parent_nursery is self._old_nursery:
self._old_nursery._children.remove(task)
task._parent_nursery = self._new_nursery
self._new_nursery._children.add(task)
# Everyone needs their cancel scopes fixed up...
assert task._cancel_stack[:len(old_stack)] == old_stack
task._cancel_stack[:len(old_stack)] = new_stack
# ...and their nurseries' cancel scopes fixed up.
for nursery in task._child_nurseries:
assert nursery._cancel_stack[:len(old_stack)] == old_stack
nursery._cancel_stack[:len(old_stack)] = new_stack
# And then add all the nursery's children to our todo list
todo.extend(nursery._children)
# And make a note to check for cancellation later
munged_tasks.append(task)

# Tell all the cancel scopes about the change. (There are probably
# some scopes in common between the two stacks, so some scopes will
# get the same tasks removed and then immediately re-added. This is
# fine though.)
for cancel_scope in old_stack:
cancel_scope._tasks_removed_by_adoption(munged_tasks)
for cancel_scope in new_stack:
cancel_scope._tasks_added_by_adoption(munged_tasks)

# That should have removed all the children from the old nursery
assert not self._old_nursery._children

# After all the delicate surgery is done, check for cancellation in
# all the tasks that had their cancel scopes munged. This can trigger
# arbitrary abort() callbacks, so we put it off until our internal
# data structures are all self-consistent again.
for task in munged_tasks:
task._attempt_delivery_of_any_pending_cancel()

# And finally, we cancel the old nursery's scope, so that its
# __aexit__ notices that all the children are gone and it can exit.
# (This is a bit of a hack.)
self._old_nursery.cancel_scope.cancel()


@acontextmanager
@async_generator
@enable_ki_protection
Expand Down Expand Up @@ -223,13 +326,13 @@ async def open_nursery():
# async def __aenter__(self):
# self._scope_manager = open_cancel_scope()
# scope = self._scope_manager.__enter__()
# self._nursery = Nursery(current_task(), scope)
# return self._nursery
# self._parent_nursery = Nursery(current_task(), scope)
# return self._parent_nursery
#
# @enable_ki_protection
# async def __aexit__(self, etype, exc, tb):
# try:
# await self._nursery._clean_up(exc)
# await self._parent_nursery._clean_up(exc)
# except BaseException as new_exc:
# if not self._scope_manager.__exit__(
# type(new_exc), new_exc, new_exc.__traceback__):
Expand All @@ -250,6 +353,7 @@ def __init__(self, parent, cancel_scope):
# the parent task -- only used for introspection, to implement
# task.parent_task
self._parent = parent
parent._child_nurseries.append(self)
# the cancel stack that children inherit - we take a snapshot, so it
# won't be affected by any changes in the parent.
self._cancel_stack = list(parent._cancel_stack)
Expand All @@ -275,9 +379,28 @@ def _child_finished(self, task):
self._zombies.add(task)
self.monitor.put_nowait(task)

def start_soon(self, async_fn, *args, name=None):
GLOBAL_RUN_CONTEXT.runner.spawn_impl(async_fn, args, self, name)

# Returns the task, unlike start_soon
#@deprecated("nursery.spawn", version="0.2.0", "nursery.start_soon")
def spawn(self, async_fn, *args, name=None):
return GLOBAL_RUN_CONTEXT.runner.spawn_impl(async_fn, args, self, name)

async def start(self, async_fn, *args, name=None):
async with open_nursery() as old_nursery:
task_status = _TaskStatus(old_nursery, self)
thunk = functools.partial(async_fn, task_status=task_status)
old_nursery.start_soon(thunk, *args, name=name)
# If we get here, then the child either got reparented or exited
# normally. The complicated logic is all in __TaskStatus.started().
# (Any exceptions propagate directly out of the above.)
if not task_status._called_started:
raise RuntimeError(
"child exited without calling task_status.started()"
)
return task_status._value

def reap(self, task):
try:
self._zombies.remove(task)
Expand Down Expand Up @@ -329,6 +452,8 @@ async def _clean_up(self, pending_exc):
exceptions.append(exc)

self._closed = True
popped = self._parent._child_nurseries.pop()
assert popped is self
if exceptions:
mexc = MultiError(exceptions)
if (pending_exc and mexc.__cause__ is None
Expand Down Expand Up @@ -363,9 +488,22 @@ def __del__(self):
################################################################


def _pending_cancel_scope(cancel_stack):
# Return the outermost exception that is is not outside a shield.
pending_scope = None
for scope in cancel_stack:
# Check shield before _exc, because shield should not block
# processing of *this* scope's exception
if scope.shield:
pending_scope = None
if pending_scope is None and scope.cancel_called:
pending_scope = scope
return pending_scope


@attr.s(slots=True, cmp=False, hash=False, repr=False)
class Task:
_nursery = attr.ib()
_parent_nursery = attr.ib()
coro = attr.ib()
_runner = attr.ib()
name = attr.ib()
Expand All @@ -380,6 +518,9 @@ class Task:
_next_send = attr.ib(default=None)
_abort_func = attr.ib(default=None)

# For introspection and nursery.start()
_child_nurseries = attr.ib(default=attr.Factory(list))

# Task-local values, see _local.py
_locals = attr.ib(default=attr.Factory(dict))

Expand All @@ -399,10 +540,10 @@ def parent_task(self):
Example use case: drawing a visualization of the task tree.
"""
if self._nursery is None:
if self._parent_nursery is None:
return None
else:
return self._nursery._parent
return self._parent_nursery._parent

################
# Monitoring task exit
Expand Down Expand Up @@ -469,16 +610,7 @@ async def wait(self):
_cancel_stack = attr.ib(default=attr.Factory(list), repr=False)

def _pending_cancel_scope(self):
# Return the outermost exception that is is not outside a shield.
pending_scope = None
for scope in self._cancel_stack:
# Check shield before _exc, because shield should not block
# processing of *this* scope's exception
if scope.shield:
pending_scope = None
if pending_scope is None and scope.cancel_called:
pending_scope = scope
return pending_scope
return _pending_cancel_scope(self._cancel_stack)

def _attempt_abort(self, raise_cancel):
# Either the abort succeeds, in which case we will reschedule the
Expand Down Expand Up @@ -757,7 +889,7 @@ def _return_value_looks_like_wrong_library(value):
name = "{}.{}".format(name.__module__, name.__qualname__)
except AttributeError:
name = repr(name)
task = Task(coro=coro, nursery=nursery, runner=self, name=name)
task = Task(coro=coro, parent_nursery=nursery, runner=self, name=name)
self.tasks.add(task)
if nursery is not None:
nursery._children.add(task)
Expand All @@ -784,11 +916,11 @@ def task_exited(self, task, result):
while task._cancel_stack:
task._cancel_stack[-1]._remove_task(task)
self.tasks.remove(task)
if task._nursery is None:
if task._parent_nursery is None:
# the init task should be the last task to exit
assert not self.tasks
else:
task._nursery._child_finished(task)
task._parent_nursery._child_finished(task)
for monitor in task._monitors:
monitor.put_nowait(task)
task._monitors.clear()
Expand Down Expand Up @@ -1443,6 +1575,17 @@ def run_impl(runner, async_fn, args):
################################################################


class _StatusIgnored:
def __repr__(self):
return "STATUS_IGNORED"

def started(self, value=None):
pass


STATUS_IGNORED = _StatusIgnored()


def current_task():
"""Return the :class:`Task` object representing the current task.
Expand Down
5 changes: 5 additions & 0 deletions trio/_core/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ def mock_clock():
return MockClock()


@pytest.fixture
def autojump_clock():
return MockClock(autojump_threshold=0)


# FIXME: split off into a package (or just make part of trio's public
# interface?), with config file to enable? and I guess a mark option too; I
# guess it's useful with the class- and file-level marking machinery (where
Expand Down
Loading

0 comments on commit 63785eb

Please sign in to comment.