diff --git a/docs/source/design.rst b/docs/source/design.rst index cc445569ea..256350d601 100644 --- a/docs/source/design.rst +++ b/docs/source/design.rst @@ -309,7 +309,7 @@ where it motivates the use of "nurseries":: async def parent(): async with trio.open_nursery() as nursery: - nursery.spawn(child) + nursery.start_soon(child) (See :ref:`tasks` for full details.) @@ -326,7 +326,7 @@ In `the blog post `__ I called out a nice feature of curio's spawning API, which is that since spawning is the only way to break causality, and in curio -``spawn`` is async, this means that in curio sync functions are +``spawn`` is async, which means that in curio sync functions are guaranteed to be causal. One limitation though is that this invariant is actually not very predictive: in curio there are lots of async functions that could spawn off children and violate causality, but @@ -338,11 +338,11 @@ one. In trio: * Sync functions can't create nurseries, because nurseries require an ``async with`` -* Any async function can create a nursery and spawn new tasks... but - creating a nursery *allows task spawning without allowing causality - breaking*, because the children have to exit before the function is - allowed to return. So we can preserve causality without having to - give up concurrency! +* Any async function can create a nursery and start new tasks... but + creating a nursery *allows task starting but does not permit + causality breaking*, because the children have to exit before the + function is allowed to return. So we can preserve causality without + having to give up concurrency! * The only way to violate causality (which is an important feature, just one that needs to be handled carefully) is to explicitly create @@ -417,9 +417,9 @@ Specific style guidelines and the ``nowait`` version raises :exc:`trio.WouldBlock` if it would block. * The word ``monitor`` is used for APIs that involve an - :class:`UnboundedQueue` receiving some kind of events. (Examples: - nursery ``.monitor`` attribute, some of the low-level I/O functions in - :mod:`trio.hazmat`.) + :class:`trio.hazmat.UnboundedQueue` receiving some kind of events. + (Examples: nursery ``.monitor`` attribute, some of the low-level I/O + functions in :mod:`trio.hazmat`.) * ...we should, but currently don't, have a solid convention to distinguish between functions that take an async callable and those @@ -466,7 +466,7 @@ There are three notable sub-modules that are largely independent of the rest of trio, and could (possibly should?) be extracted into their own independent packages: -* ``_result.py``: Defines :class:`Result`. +* ``_result.py``: Defines :class:`~trio.hazmat.Result`. * ``_multierror.py``: Implements :class:`MultiError` and associated infrastructure. @@ -478,8 +478,9 @@ The most important submodule, where everything is integrated, is ``_run.py``. (This is also by far the largest submodule; it'd be nice to factor bits of it out with possible, but it's tricky because the core functionality genuinely is pretty intertwined.) Notably, this is -where cancel scopes, nurseries, and :class:`Task` are defined; it's -also where the scheduler state and :func:`trio.run` live. +where cancel scopes, nurseries, and :class:`~trio.hazmat.Task` are +defined; it's also where the scheduler state and :func:`trio.run` +live. The one thing that *isn't* in ``_run.py`` is I/O handling. This is delegated to an ``IOManager`` class, of which there are currently diff --git a/docs/source/reference-core.rst b/docs/source/reference-core.rst index a39681ceed..687b2d37c5 100644 --- a/docs/source/reference-core.rst +++ b/docs/source/reference-core.rst @@ -640,17 +640,17 @@ walk *and* chew gum, this is the section for you. Nurseries and spawning ~~~~~~~~~~~~~~~~~~~~~~ -Most libraries for concurrent programming let you spawn new child +Most libraries for concurrent programming let you start new child tasks (or threads, or whatever) willy-nilly, whenever and where-ever -you feel like it. Trio is a bit different: you can't spawn a child +you feel like it. Trio is a bit different: you can't start a child task unless you're prepared to be a responsible parent. The way you demonstrate your responsibility is by creating a nursery:: async with trio.open_nursery() as nursery: ... -And once you have a reference to a nursery object, you can spawn -children into that nursery:: +And once you have a reference to a nursery object, you can start +children in that nursery:: async def child(): ... @@ -658,8 +658,8 @@ children into that nursery:: async def parent(): async with trio.open_nursery() as nursery: # Make two concurrent calls to child() - nursery.spawn(child) - nursery.spawn(child) + nursery.start_soon(child) + nursery.start_soon(child) This means that tasks form a tree: when you call :func:`run`, then this creates an initial task, and all your other tasks will be @@ -693,24 +693,6 @@ of this is that :func:`run` can't finish until all tasks have finished. -Getting results from child tasks -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -The ``spawn`` method returns a :class:`Task` object that can be used -for various things – and in particular, for retrieving the task's -return value. Example:: - - async def child_fn(x): - return 2 * x - - async with trio.open_nursery() as nursery: - child_task = nursery.spawn(child_fn, 3) - # We've left the nursery, so we know child_task has completed - assert child_task.result.unwrap() == 6 - -See :attr:`Task.result` and :class:`Result` for more details. - - Child tasks and cancellation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -720,17 +702,17 @@ expires:: with move_on_after(TIMEOUT): async with trio.open_nursery() as nursery: - nursery.spawn(child1) - nursery.spawn(child2) + nursery.start_soon(child1) + nursery.start_soon(child2) Note that what matters here is the scopes that were active when :func:`open_nursery` was called, *not* the scopes active when -``spawn`` is called. So for example, the timeout block below does +``start_soon`` is called. So for example, the timeout block below does nothing at all:: async with trio.open_nursery() as nursery: with move_on_after(TIMEOUT): # don't do this! - nursery.spawn(child) + nursery.start_soon(child) Errors in multiple child tasks @@ -750,8 +732,8 @@ limitation. Consider code like:: async def parent(): async with trio.open_nursery() as nursery: - nursery.spawn(broken1) - nursery.spawn(broken2) + nursery.start_soon(broken1) + nursery.start_soon(broken2) ``broken1`` raises ``KeyError``. ``broken2`` raises ``IndexError``. Obviously ``parent`` should raise some error, but @@ -773,22 +755,22 @@ How to be a good parent task Supervising child tasks is a full time job. If you want your program to do two things at once, then don't expect the parent task to do one -while a child task does another – instead, spawn two children and let +while a child task does another – instead, start two children and let the parent focus on managing them. So, don't do this:: # bad idea! async with trio.open_nursery() as nursery: - nursery.spawn(walk) + nursery.start_soon(walk) await chew_gum() Instead, do this:: # good idea! async with trio.open_nursery() as nursery: - nursery.spawn(walk) - nursery.spawn(chew_gum) + nursery.start_soon(walk) + nursery.start_soon(chew_gum) # now parent task blocks in the nursery cleanup code The difference between these is that in the first example, if ``walk`` @@ -802,7 +784,7 @@ Spawning tasks without becoming a parent Sometimes it doesn't make sense for the task that spawns a child to take on responsibility for watching it. For example, a server task may -want to spawn a new task for each connection, but it can't listen for +want to start a new task for each connection, but it can't listen for connections and supervise children at the same time. The solution here is simple once you see it: there's no requirement @@ -812,26 +794,26 @@ code like this:: async def new_connection_listener(handler, nursery): while True: conn = await get_new_connection() - nursery.spawn(handler, conn) + nursery.start_soon(handler, conn) async def server(handler): async with trio.open_nursery() as nursery: - nursery.spawn(new_connection_listener, handler, nursery) + nursery.start_soon(new_connection_listener, handler, nursery) Now ``new_connection_listener`` can focus on handling new connections, while its parent focuses on supervising both it and all the individual connection handlers. And remember that cancel scopes are inherited from the nursery, -**not** from the task that calls ``spawn``. So in this example, the -timeout does *not* apply to ``child`` (or to anything else):: +**not** from the task that calls ``start_soon``. So in this example, +the timeout does *not* apply to ``child`` (or to anything else):: async with do_spawn(nursery): with move_on_after(TIMEOUT): # don't do this, it has no effect - nursery.spawn(child) + nursery.start_soon(child) async with trio.open_nursery() as nursery: - nursery.spawn(do_spawn, nursery) + nursery.start_soon(do_spawn, nursery) Custom supervisors @@ -841,37 +823,47 @@ The default cleanup logic is often sufficient for simple cases, but what if you want a more sophisticated supervisor? For example, maybe you have `Erlang envy `__ and want features like automatic restart of crashed tasks. Trio itself -doesn't provide such a feature, but the nursery interface is designed -to give you all the tools you need to build such a thing, while -enforcing basic hygiene (e.g., it's not possible to build a supervisor -that exits and leaves orphaned tasks behind). And then hopefully -you'll wrap your fancy supervisor up in a library and put it on PyPI, -because building custom supervisors is a challenging task that most -people don't want to deal with! - -For simple custom supervisors, it's often possible to lean on the -default nursery logic to take care of annoying details. For example, -here's a function that takes a list of functions, runs them all -concurrently, and returns the result from the one that finishes -first:: +doesn't provide these kinds of features, but you can build them on +top; Trio's goal is to enforce basic hygiene and then get out of your +way. (Specifically: Trio won't let you build a supervisor that exits +and leaves orphaned tasks behind, and if you have an unhandled +exception due to bugs or laziness then Trio will make sure they +propagate.) And then you can wrap your fancy supervisor up in a +library and put it on PyPI, because supervisors are tricky and there's +no reason everyone should have to write their own. + +For example, here's a function that takes a list of functions, runs +them all concurrently, and returns the result from the one that +finishes first:: + + # XX this example can be simplified a little after #136 is fixed in 0.3.0 async def race(*async_fns): if not async_fns: raise ValueError("must pass at least one argument") + + async def racecar(results, async_fn, cancel_scope): + result = await async_fn() + results.append(result) + cancel_scope.cancel() + async with trio.open_nursery() as nursery: + results = [] + cancel_scope = nursery.cancel_scope for async_fn in async_fns: - nursery.spawn(async_fn) - task_batch = await nursery.monitor.get_batch() - nursery.cancel_scope.cancel() - finished_task = task_batch[0] - return nursery.reap_and_unwrap(finished_task) + nursery.start_soon(racecar, results, async_fn, cancel_scope) + + return results[0] -This works by waiting until at least one task has finished, then -cancelling all remaining tasks and returning the result from the first -task. This implicitly invokes the default logic to take care of all -the other tasks, so it blocks to wait for the cancellation to finish, -and if any of them raise errors in the process it will propagate -those. +This works by starting a set of racecar tasks which each try to run +their function, report back, and then cancel all the rest. Eventually +one suceeds, all the tasks are cancelled and exit, and then our +nursery exits and we return the winning value. And if one or more of +them raises an unhandled exception then Trio's normal handling kicks +in: it cancels the others and then propagates the exception. If you +wanted different behavior, you could do that by adding a ``try`` block +to the ``racecar`` function to catch exceptions and handle them +however you like. Task-related API details @@ -887,14 +879,23 @@ Nursery objects provide the following interface: .. interface:: The nursery interface - .. method:: spawn(async_fn, *args, name=None) + .. method:: start_soon(async_fn, *args, name=None) + + Creates a new child task inside this nursery, and sets it up to + run ``await async_fn(*args)``. - Runs ``await async_fn(*args)`` in a new child task inside this nursery. + This and :meth:`start` are the two fundamental methods for + creating concurrent tasks in trio. - This is *the* method for creating concurrent tasks in trio. + Note that this is a synchronous function: it sets up the new + task, but then returns immediately, *before* it has a chance to + run. It won't actually run until some later point when you + execute a checkpoint and the scheduler decides to run it. If you + need to wait for the task to initialize itself before + continuing, see :meth:`start`. It's possible to pass a nursery object into another task, which - allows that task to spawn new tasks into the first task's + allows that task to start new child tasks in the first task's nursery. The child task inherits its parent nursery's cancel scopes. @@ -906,20 +907,15 @@ Nursery objects provide the following interface: :param name: The name for this task. Only used for debugging/introspection (e.g. ``repr(task_obj)``). If this isn't a string, - :meth:`spawn` will try to make it one. A common use - case is if you're wrapping a function before - spawning a new task, you might pass the original - function as the ``name=`` to make debugging easier. - :return: the newly spawned task - :rtype trio.Task: + :meth:`start_soon` will try to make it one. A + common use case is if you're wrapping a function + before spawning a new task, you might pass the + original function as the ``name=`` to make + debugging easier. :raises RuntimeError: If this nursery is no longer open (i.e. its ``async with`` block has exited). - .. method:: start_soon(async_fn, *args, name=None) - - Like :meth:`spawn`, but doesn't return the newly created task. - .. method:: start(async_fn, *args, name=None) :async: @@ -970,102 +966,24 @@ Nursery objects provide the following interface: other things, e.g. if you want to explicitly cancel all children in response to some external event. - The remaining attributes and methods are mainly used for - implementing new types of task supervisor: - - .. attribute:: monitor - - A :class:`~trio.UnboundedQueue` which receives each child - :class:`~trio.Task` object when it exits. - - It also receives a ``None`` value after each call to - :meth:`start`. + The last two attributes are mainly to enable introspection of the + task tree, for example in debuggers. - .. attribute:: children + .. attribute:: parent_task - A :class:`frozenset` containing all the child - :class:`~trio.Task` objects which are still running. + The :class:`~trio.hazmat.Task` that opened this nursery. - .. attribute:: zombies + .. attribute:: child_tasks A :class:`frozenset` containing all the child - :class:`~trio.Task` objects which have exited, but not yet been - reaped. - - .. method:: reap(task) - - Removes the given task from the :attr:`zombies` set. - - Calling this method indicates to the nursery that you have taken - care of any cleanup needed. In particular, if the task exited - with an exception and you don't call this method, then - ``__aexit__`` will eventually re-raise that exception. If you do - call this method, then ``__aexit__`` won't do anything. + :class:`~trio.hazmat.Task` objects which are still running. - Once you call this method, then as far as trio is concerned the - :class:`~trio.Task` object no longer exists. You can hold onto a - reference to it as long as you like, but trio no longer has any - record of it. - - :raises ValueError: If the given ``task`` is not in :attr:`zombies`. - - .. method:: reap_and_unwrap(task) - - A convenience shorthand for:: - - nursery.reap(task) - return task.result.unwrap() .. attribute:: STATUS_IGNORED See :meth:`~The nursery interface.start`. -Task object API -+++++++++++++++ - -.. autofunction:: current_task() - -.. class:: Task() - - A :class:`Task` object represents a concurrent "thread" of - execution. - - .. attribute:: result - - If this :class:`Task` is still running, then its :attr:`result` - attribute is ``None``. (This can be used to check whether a task - has finished running.) - - Otherwise, this is a :class:`Result` object holding the value - returned or the exception raised by the async function that was - spawned to create this task. - - The next three methods allow another task to monitor the result of - this task, even if it isn't supervising it: - - .. automethod:: wait - - .. automethod:: add_monitor - - .. automethod:: discard_monitor - - The remaining members are mostly useful for introspection and - debugging: - - .. attribute:: name - - String containing this :class:`Task`\'s name. Usually (but not - always) the name of the function this :class:`Task` is running. - - .. attribute:: coro - - This task's coroutine object. Example usage: extracting a stack - trace. - - .. autoattribute:: parent_task - - Working with :exc:`MultiError`\s ++++++++++++++++++++++++++++++++ @@ -1134,25 +1052,6 @@ you return a new exception object, then the new object's exception. -Result objects -++++++++++++++ - -.. autoclass:: Result - :members: - -.. autoclass:: Value - :members: - -.. autoclass:: Error - :members: - -.. note:: - - Since :class:`Result` objects are simple immutable data structures - that don't otherwise interact with the trio machinery, it's safe to - create and access :class:`Result` objects from any thread you like. - - Task-local storage ------------------ @@ -1306,8 +1205,8 @@ you'll see that the two tasks politely take turns:: async def main(): async with trio.open_nursery() as nursery: lock = trio.Lock() - nursery.spawn(loopy_child, 1, lock) - nursery.spawn(loopy_child, 2, lock) + nursery.start_soon(loopy_child, 1, lock) + nursery.start_soon(loopy_child, 2, lock) trio.run(main) @@ -1319,13 +1218,15 @@ Broadcasting an event with :class:`Event` :members: -Passing messages with :class:`Queue` and :class:`UnboundedQueue` -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +.. _queue: -Trio provides two types of queues suitable for different -purposes. Where they differ is in their strategies for handling flow -control. Here's a toy example to demonstrate the problem. Suppose we -have a queue with two producers and one consumer:: +Passing messages with :class:`Queue` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +You can use :class:`Queue` objects to safely pass objects between +tasks. Trio :class:`Queue` objects always have a bounded size. Here's +a toy example to demonstrate why this is important. Suppose we have a +queue with two producers and one consumer:: async def producer(queue): while True: @@ -1336,16 +1237,16 @@ have a queue with two producers and one consumer:: print(await queue.get()) async def main(): - # Trio's actual queue classes have countermeasures to prevent - # this example from working, so imagine we have some sort of - # platonic ideal of a queue here + # This example won't work with Trio's actual Queue class, so + # imagine we have some sort of platonic ideal of an unbounded + # queue here: queue = trio.HypotheticalQueue() async with trio.open_nursery() as nursery: # Two producers - nursery.spawn(producer, queue) - nursery.spawn(producer, queue) + nursery.start_soon(producer, queue) + nursery.start_soon(producer, queue) # One consumer - nursery.spawn(consumer, queue) + nursery.start_soon(consumer, queue) trio.run(main) @@ -1356,41 +1257,14 @@ we add two items to the queue but only remove one, then over time the queue size grows arbitrarily large, our latency is terrible, we run out of memory, it's just generally bad news all around. -There are two potential strategies for avoiding this problem. - -The preferred solution is to apply *backpressure*. If our queue starts -getting too big, then we can make the producers slow down by having -``put`` block until ``get`` has had a chance to remove an item. This -is the strategy used by :class:`trio.Queue`. - -The other possibility is for the queue consumer to get greedy: each -time it runs, it could eagerly consume all of the pending items before -allowing another task to run. (In some other systems, this would -happen automatically because their queue's ``get`` method doesn't -invoke the scheduler unless it has to block. But :ref:`in trio, get is -always a checkpoint `.) This would work, but it's a -bit risky: basically instead of applying backpressure to specifically -the producer tasks, we're applying it to *all* the tasks in our -system. The danger here is that if enough items have built up in the -queue, then "stopping the world" to process them all may cause -unacceptable latency spikes in unrelated tasks. Nonetheless, this is -still the right choice in situations where it's impossible to apply -backpressure more precisely. For example, when monitoring exiting -tasks, blocking tasks from reporting their death doesn't really -accomplish anything – the tasks are taking up memory either way, -etc. (In this particular case it `might be possible to do better -`__, but in general the -principle holds.) So this is the strategy implemented by -:class:`trio.UnboundedQueue`. - -tl;dr: use :class:`Queue` if you can. +By placing an upper bound on our queue's size, we avoid this problem. +If the queue gets too big, then it applies *backpressure*: ``put`` +blocks and forces the producers to slow down and wait until the +consumer calls ``get``. .. autoclass:: Queue :members: -.. autoclass:: UnboundedQueue - :members: - Lower-level synchronization primitives ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -1666,8 +1540,8 @@ Debugging and instrumentation ----------------------------- Trio tries hard to provide useful hooks for debugging and -instrumentation. Some are documented above (:attr:`Task.name`, -:meth:`Queue.statistics`, etc.). Here are some more: +instrumentation. Some are documented above (the nursery introspection +attributes, :meth:`Queue.statistics`, etc.). Here are some more: Global statistics diff --git a/docs/source/reference-core/tasklocal-example.py b/docs/source/reference-core/tasklocal-example.py index 369861b3ab..897c68a0f4 100644 --- a/docs/source/reference-core/tasklocal-example.py +++ b/docs/source/reference-core/tasklocal-example.py @@ -19,8 +19,8 @@ async def handle_request(tag): log("Request handler started") await trio.sleep(random.random()) async with trio.open_nursery() as nursery: - nursery.spawn(concurrent_helper, "a") - nursery.spawn(concurrent_helper, "b") + nursery.start_soon(concurrent_helper, "a") + nursery.start_soon(concurrent_helper, "b") await trio.sleep(random.random()) log("Request received finished") @@ -34,6 +34,6 @@ async def concurrent_helper(job): async def main(): async with trio.open_nursery() as nursery: for i in range(3): - nursery.spawn(handle_request, i) + nursery.start_soon(handle_request, i) trio.run(main) diff --git a/docs/source/reference-hazmat.rst b/docs/source/reference-hazmat.rst index 9742e62f01..150ed83559 100644 --- a/docs/source/reference-hazmat.rst +++ b/docs/source/reference-hazmat.rst @@ -1,36 +1,52 @@ -========================================= - Low-level operations in ``trio.hazmat`` -========================================= +======================================================= + Introspecting and extending Trio with ``trio.hazmat`` +======================================================= .. module:: trio.hazmat .. warning:: - ⚠️ DANGER DANGER DANGER ⚠️ - You probably don't want to use this module. -The :mod:`trio.hazmat` API is public and stable (or at least, `as -stable as anything in trio is! -`__), but it has `nasty -big pointy teeth +:mod:`trio.hazmat` contains APIs useful for introspecting and +extending Trio. If you're writing ordinary, everyday code, then you +can ignore this module completely. But sometimes it's what you need. +Here are some examples of situations where you should reach for +:mod:`trio.hazmat`: + +* You want to implement a new :ref:`synchronization primitive + ` that Trio doesn't (yet) provide, like a + reader-writer lock. +* You want to extract low-level metrics to monitor the health of your + application. +* You're wrapping some low-level operating system interfaces that Trio + doesn't (yet) expose, like watching a filesystem directory for + changes. +* You want to implement an interface for calling between Trio and + another event loop within the same process. +* You're writing a debugger and want to visualize Trio's task tree. +* You need to interoperate with a C library whose API exposes raw file + descriptors. + +Using :mod:`trio.hazmat` is perfectly safe, *if* you take proper +precautions. In fact, you're already using it – it's how most of the +functionality described in previous chapters is implemented. The APIs +described here have strictly defined and carefully documented +semantics. But some of them have `nasty big pointy teeth `__. Mistakes may not be handled gracefully; rules and conventions that are followed -strictly in the rest of trio do not always apply. Read and tread -carefully. - -But if you find yourself needing to, for example, implement new -synchronization primitives or expose new low-level I/O functionality, -then you're in the right place. +strictly in the rest of Trio do not always apply. Read and tread +carefully: using this module makes it your responsibility to handle +the nasty cases and expose a friendly Trio-style API to your users. Low-level I/O primitives ======================== Different environments expose different low-level APIs for performing -async I/O. :mod:`trio.hazmat` attempts to expose these APIs in a -relatively direct way, so as to allow maximum power and flexibility -for higher level code. However, this means that the exact API provided -may vary depending on what system trio is running on. +async I/O. :mod:`trio.hazmat` exposes these APIs in a relatively +direct way, so as to allow maximum power and flexibility for higher +level code. However, this means that the exact API provided may vary +depending on what system trio is running on. Universally available API @@ -111,8 +127,9 @@ Unix-like systems provide the following functions: Kqueue-specific API ------------------- -TODO: these are currently more of a sketch than anything real. See -`#26 `__. +TODO: these are implemented, but are currently more of a sketch than +anything real. See `#26 +`__. .. function:: current_kqueue() @@ -126,8 +143,9 @@ TODO: these are currently more of a sketch than anything real. See Windows-specific API -------------------- -TODO: these are currently more of a sketch than anything real. See -`#26 `__ and `#52 +TODO: these are implemented, but are currently more of a sketch than +anything real. See `#26 +`__ and `#52 `__. .. function:: register_with_iocp(handle) @@ -141,6 +159,36 @@ TODO: these are currently more of a sketch than anything real. See :with: queue +Unbounded queues +================ + +In the section :ref:`queue`, we showed an example with two producers +and one consumer using the same queue, where the queue size would grow +without bound to produce unbounded latency and memory usage. +:class:`trio.Queue` avoids this by placing an upper bound on how big +the queue can get before ``put`` starts blocking. But what if you're +in a situation where ``put`` can't block? + +There is another option: the queue consumer could get greedy. Each +time it runs, it could eagerly consume all of the pending items before +allowing another task to run. (In some other systems, this would +happen automatically because their queue's ``get`` method doesn't +invoke the scheduler unless it has to block. But :ref:`in trio, get is +always a checkpoint `.) This works, but it's a bit +risky: basically instead of applying backpressure to specifically the +producer tasks, we're applying it to *all* the tasks in our system. +The danger here is that if enough items have built up in the queue, +then "stopping the world" to process them all may cause unacceptable +latency spikes in unrelated tasks. Nonetheless, this is still the +right choice in situations where it's impossible to apply backpressure +more precisely. So this is the strategy implemented by +:class:`UnboundedQueue`. The main time you should use this is when +working with low-level APIs like :func:`monitor_kevent`. + +.. autoclass:: UnboundedQueue + :members: + + Global state: system tasks and run-local storage ================================================ @@ -237,6 +285,43 @@ These transitions are accomplished using two function decorators: .. autofunction:: currently_ki_protected +Result objects +============== + +Trio provides some simple classes for representing the result of a +Python function call, so that it can be passed around. The basic rule +is:: + + result = Result.capture(f, *args) + x = result.unwrap() + +is the same as:: + + x = f(*args) + +even if ``f`` raises an error. And there's also +:meth:`Result.acapture`, which is like ``await f(*args)``. + +There's nothing really dangerous about this system – it's actually +very general and quite handy! But mostly it's used for things like +implementing :func:`trio.run_sync_in_worker_thread`, or for getting +values to pass to :func:`reschedule`, so we put it in +:mod:`trio.hazmat` to avoid cluttering up the main API. + +Since :class:`Result` objects are simple immutable data structures +that don't otherwise interact with the trio machinery, it's safe to +create and access :class:`Result` objects from any thread you like. + +.. autoclass:: Result + :members: + +.. autoclass:: Value + :members: + +.. autoclass:: Error + :members: + + Sleeping and waking =================== @@ -312,9 +397,9 @@ first to make sure that the whole thing is a checkpoint. Low-level blocking ------------------ +.. autofunction:: yield_indefinitely .. autoclass:: Abort .. autofunction:: reschedule -.. autofunction:: yield_indefinitely Here's an example lock class implemented using :func:`yield_indefinitely` directly. This implementation has a number @@ -346,3 +431,35 @@ this does serve to illustrate the basic structure of the if self._blocked_tasks: woken_task = self._blocked_tasks.popleft() trio.hazmat.reschedule(woken_task) + + +Task API +-------- + +.. autofunction:: current_task() + +.. class:: Task() + + A :class:`Task` object represents a concurrent "thread" of + execution. It has no public constructor; Trio internally creates a + :class:`Task` object for each call to ``nursery.start(...)`` or + ``nursery.start_soon(...)``. + + Its public members are mostly useful for introspection and + debugging: + + .. attribute:: name + + String containing this :class:`Task`\'s name. Usually the name + of the function this :class:`Task` is running, but can be + overridden by passing ``name=`` to ``start`` or ``start_soon``. + + .. attribute:: coro + + This task's coroutine object. Example usage: extracting a stack + trace. + + .. autoattribute:: parent_nursery + + .. autoattribute:: child_nurseries + diff --git a/docs/source/reference-io.rst b/docs/source/reference-io.rst index d39f52c6be..845342417d 100644 --- a/docs/source/reference-io.rst +++ b/docs/source/reference-io.rst @@ -79,7 +79,7 @@ create complex transport configurations. Here's some examples: * The :mod:`trio.testing` module provides a set of :ref:`flexible in-memory stream object implementations `, so if - you have a protocol implementation to test then you can can spawn + you have a protocol implementation to test then you can can start two tasks, set up a virtual "socket" connecting them, and then do things like inject random-but-repeatable delays into the connection. diff --git a/docs/source/reference-testing/across-realtime.py b/docs/source/reference-testing/across-realtime.py index a2393198e8..bf9f46514a 100644 --- a/docs/source/reference-testing/across-realtime.py +++ b/docs/source/reference-testing/across-realtime.py @@ -41,8 +41,8 @@ async def task2(): async def main(): async with trio.open_nursery() as nursery: - nursery.spawn(task1) - nursery.spawn(task2) + nursery.start_soon(task1) + nursery.start_soon(task2) def run_example(clock): real_start = time.time() diff --git a/docs/source/tutorial.rst b/docs/source/tutorial.rst index e5fe9d66ea..6d8cf71efa 100644 --- a/docs/source/tutorial.rst +++ b/docs/source/tutorial.rst @@ -377,13 +377,13 @@ Now that we understand ``async with``, let's look at ``parent`` again: There are only 4 lines of code that really do anything here. On line 17, we use :func:`trio.open_nursery` to get a "nursery" object, and -then inside the ``async with`` block we call ``nursery.spawn`` twice, +then inside the ``async with`` block we call ``nursery.start_soon`` twice, on lines 19 and 22. There are actually two ways to call an async function: the first one is the one we already saw, using ``await -async_fn()``; the new one is ``nursery.spawn(async_fn)``: it asks trio +async_fn()``; the new one is ``nursery.start_soon(async_fn)``: it asks trio to start running this async function, *but then returns immediately without waiting for the function to finish*. So after our two calls to -``nursery.spawn``, ``child1`` and ``child2`` are now running in the +``nursery.start_soon``, ``child1`` and ``child2`` are now running in the background. And then at line 25, the commented line, we hit the end of the ``async with`` block, and the nursery's ``__aexit__`` function runs. What this does is force ``parent`` to stop here and wait for all @@ -402,7 +402,7 @@ parenting is a full-time job! Any given piece of code manage a nursery – which means opening it, spawning some children, and then sitting in ``__aexit__`` to supervise them – or it can do actual work, but you shouldn't try to do both at the same time in the same function. If you -find yourself tempted to do some work in the parent, then ``spawn`` +find yourself tempted to do some work in the parent, then ``start_soon`` another child and have it do the work. In trio, children are cheap. Ok! Let's try running it and see what we get: @@ -827,7 +827,7 @@ I'm running on", so ``(127.0.0.1, PORT)`` means that we want to connect to whatever program on the current computer is using ``PORT`` as its contact point. And then once the connection is made, we pass the connected client socket into the two child tasks. (This is also a -good example of how ``nursery.spawn`` lets you pass positional +good example of how ``nursery.start_soon`` lets you pass positional arguments to the spawned function.) Our first task's job is to send data to the server: @@ -869,7 +869,7 @@ thing, and then we'll discuss the pieces: :linenos: Let's start with ``echo_server``. As we'll see below, each time an -echo client connects, our server will spawn a child task running +echo client connects, our server will start a child task running ``echo_server``; there might be lots of these running at once if lots of clients are connected. Its job is to read data from its particular client, and then echo it back. It should be pretty straightforward to @@ -909,7 +909,7 @@ But where do these ``echo_server`` tasks come from? An important part of writing a trio program is deciding how you want to organize your tasks. In the examples we've seen so far, this was simple, because the set of tasks was fixed. Here, we want to wait for clients to connect, -and then spawn a new task for each one. The tricky part is that like +and then start a new task for each one. The tricky part is that like we mentioned above, managing a nursery is a full time job: you don't want the task that has the nursery and is supervising the child tasks to do anything else, like listen for new connections. @@ -923,7 +923,7 @@ and then *passes the nursery object to the child task*: :lineno-match: :pyobject: parent -Now ``echo_listener`` can spawn "siblings" instead of children – even +Now ``echo_listener`` can start "siblings" instead of children – even though the ``echo_listener`` is the one spawning ``echo_server`` tasks, we end up with a task tree that looks like: diff --git a/docs/source/tutorial/echo-client-low-level.py b/docs/source/tutorial/echo-client-low-level.py index fe345512de..b0739304a9 100644 --- a/docs/source/tutorial/echo-client-low-level.py +++ b/docs/source/tutorial/echo-client-low-level.py @@ -35,9 +35,9 @@ async def parent(): await client_sock.connect(("127.0.0.1", PORT)) async with trio.open_nursery() as nursery: print("parent: spawning sender...") - nursery.spawn(sender, client_sock) + nursery.start_soon(sender, client_sock) print("parent: spawning receiver...") - nursery.spawn(receiver, client_sock) + nursery.start_soon(receiver, client_sock) trio.run(parent) diff --git a/docs/source/tutorial/echo-server-low-level.py b/docs/source/tutorial/echo-server-low-level.py index c3aff01f9d..de6f32fa9d 100644 --- a/docs/source/tutorial/echo-server-low-level.py +++ b/docs/source/tutorial/echo-server-low-level.py @@ -42,11 +42,11 @@ async def echo_listener(nursery): server_sock, _ = await listen_sock.accept() print("echo_listener: got new connection, spawning echo_server") ident += 1 - nursery.spawn(echo_server, server_sock, ident) + nursery.start_soon(echo_server, server_sock, ident) async def parent(): async with trio.open_nursery() as nursery: print("parent: spawning echo_listener") - nursery.spawn(echo_listener, nursery) + nursery.start_soon(echo_listener, nursery) trio.run(parent) diff --git a/docs/source/tutorial/tasks-intro.py b/docs/source/tutorial/tasks-intro.py index d317fa7399..a316cb933d 100644 --- a/docs/source/tutorial/tasks-intro.py +++ b/docs/source/tutorial/tasks-intro.py @@ -16,10 +16,10 @@ async def parent(): print("parent: started!") async with trio.open_nursery() as nursery: print("parent: spawning child1...") - nursery.spawn(child1) + nursery.start_soon(child1) print("parent: spawning child2...") - nursery.spawn(child2) + nursery.start_soon(child2) print("parent: waiting for children to finish...") # -- we exit the nursery block here -- diff --git a/docs/source/tutorial/tasks-with-trace.py b/docs/source/tutorial/tasks-with-trace.py index ed3a80a5dc..38a1ffe862 100644 --- a/docs/source/tutorial/tasks-with-trace.py +++ b/docs/source/tutorial/tasks-with-trace.py @@ -16,10 +16,10 @@ async def parent(): print("parent: started!") async with trio.open_nursery() as nursery: print("parent: spawning child1...") - nursery.spawn(child1) + nursery.start_soon(child1) print("parent: spawning child2...") - nursery.spawn(child2) + nursery.start_soon(child2) print("parent: waiting for children to finish...") # -- we exit the nursery block here -- diff --git a/notes-to-self/loopy.py b/notes-to-self/loopy.py index 4b9911590b..fca033a6c0 100644 --- a/notes-to-self/loopy.py +++ b/notes-to-self/loopy.py @@ -10,8 +10,8 @@ async def loopy(): print("KI!") async def main(): - await trio.spawn(loopy) - await trio.spawn(loopy) - await trio.spawn(loopy) + await trio.start_soon(loopy) + await trio.start_soon(loopy) + await trio.start_soon(loopy) trio.run(main) diff --git a/notes-to-self/lots-of-tasks.py b/notes-to-self/lots-of-tasks.py index 0fdb3bf9ce..fca2741de9 100644 --- a/notes-to-self/lots-of-tasks.py +++ b/notes-to-self/lots-of-tasks.py @@ -7,6 +7,6 @@ async def main(): async with trio.open_nursery() as nursery: for _ in range(COUNT): - nursery.spawn(trio.sleep, 1) + nursery.start_soon(trio.sleep, 1) trio.run(main) diff --git a/notes-to-self/schedule-timing.py b/notes-to-self/schedule-timing.py index 601aae5fde..25ca702806 100644 --- a/notes-to-self/schedule-timing.py +++ b/notes-to-self/schedule-timing.py @@ -31,7 +31,7 @@ async def report_loop(): async def main(): async with trio.open_nursery() as nursery: - nursery.spawn(reschedule_loop, 10) - nursery.spawn(report_loop) + nursery.start_soon(reschedule_loop, 10) + nursery.start_soon(report_loop) trio.run(main) diff --git a/notes-to-self/trivial-err.py b/notes-to-self/trivial-err.py index 96c7be800d..ed11ec33e6 100644 --- a/notes-to-self/trivial-err.py +++ b/notes-to-self/trivial-err.py @@ -8,8 +8,8 @@ async def child1(): async def child2(): async with trio.open_nursery() as nursery: - nursery.spawn(grandchild1) - nursery.spawn(grandchild2) + nursery.start_soon(grandchild1) + nursery.start_soon(grandchild2) async def grandchild1(): raise KeyError @@ -19,8 +19,8 @@ async def grandchild2(): async def main(): async with trio.open_nursery() as nursery: - nursery.spawn(child1) - nursery.spawn(child2) - #nursery.spawn(grandchild1) + nursery.start_soon(child1) + nursery.start_soon(child2) + #nursery.start_soon(grandchild1) trio.run(main) diff --git a/trio/__init__.py b/trio/__init__.py index c2c4ef46b8..96cab098ee 100644 --- a/trio/__init__.py +++ b/trio/__init__.py @@ -76,6 +76,32 @@ from . import ssl # Not imported by default: testing +# Stuff that got moved: +_deprecate.enable_attribute_deprecations(__name__) + +__deprecated_attributes__ = { + "Task": + _deprecate.DeprecatedAttribute(hazmat.Task, "0.2.0", issue=136), + "current_task": + _deprecate.DeprecatedAttribute( + hazmat.current_task, "0.2.0", issue=136 + ), + "Result": + _deprecate.DeprecatedAttribute(hazmat.Result, "0.2.0", issue=136), + "Value": + _deprecate.DeprecatedAttribute(hazmat.Value, "0.2.0", issue=136), + "Error": + _deprecate.DeprecatedAttribute(hazmat.Error, "0.2.0", issue=136), + "UnboundedQueue": + _deprecate.DeprecatedAttribute( + hazmat.UnboundedQueue, "0.2.0", issue=136 + ), + "run_in_worker_thread": + _deprecate.DeprecatedAttribute( + run_sync_in_worker_thread, "0.2.0", issue=68 + ), +} + # Having the public path in .__module__ attributes is important for: # - exception names in printed tracebacks # - sphinx :show-inheritance: diff --git a/trio/_abc.py b/trio/_abc.py index 407f47cbee..536c827bc7 100644 --- a/trio/_abc.py +++ b/trio/_abc.py @@ -91,7 +91,7 @@ def task_spawned(self, task): """Called when the given task is created. Args: - task (trio.Task): The new task. + task (trio.hazmat.Task): The new task. """ @@ -102,7 +102,7 @@ def task_scheduled(self, task): runnable tasks ahead of it. Args: - task (trio.Task): The task that became runnable. + task (trio.hazmat.Task): The task that became runnable. """ @@ -110,7 +110,7 @@ def before_task_step(self, task): """Called immediately before we resume running the given task. Args: - task (trio.Task): The task that is about to run. + task (trio.hazmat.Task): The task that is about to run. """ @@ -118,7 +118,7 @@ def after_task_step(self, task): """Called when we return to the main run loop after a task has yielded. Args: - task (trio.Task): The task that just ran. + task (trio.hazmat.Task): The task that just ran. """ @@ -126,7 +126,7 @@ def task_exited(self, task): """Called when the given task exits. Args: - task (trio.Task): The finished task. + task (trio.hazmat.Task): The finished task. """ diff --git a/trio/_core/_local.py b/trio/_core/_local.py index 5e42f5207d..9710bab9a1 100644 --- a/trio/_core/_local.py +++ b/trio/_core/_local.py @@ -62,13 +62,14 @@ class TaskLocal(_LocalBase): Instances of this class have no particular attributes or methods. Instead, they serve as a blank slate to which you can add whatever attributes you like. Modifications made within one task will only be visible to that task - – with one exception: when you ``spawn`` a new task, then any - :class:`TaskLocal` attributes that are visible in the spawning task will - be inherited by the child. This inheritance takes the form of a shallow - copy: further changes in the parent will *not* affect the child, and - changes in the child will not affect the parent. (If you're familiar with - how environment variables are inherited across processes, then - :class:`TaskLocal` inheritance is somewhat similar.) + – with one exception: when you start a new task, then any + :class:`TaskLocal` attributes that are visible in the task that called + ``start`` or ``start_soon`` will be inherited by the child. This + inheritance takes the form of a shallow copy: further changes in the + parent will *not* affect the child, and changes in the child will not + affect the parent. (If you're familiar with how environment variables are + inherited across processes, then :class:`TaskLocal` inheritance is + somewhat similar.) If you're familiar with :class:`threading.local`, then :class:`trio.TaskLocal` is very similar, except adapted to work with tasks diff --git a/trio/_core/_parking_lot.py b/trio/_core/_parking_lot.py index 09f59f0ba9..1d3c00b22c 100644 --- a/trio/_core/_parking_lot.py +++ b/trio/_core/_parking_lot.py @@ -183,7 +183,7 @@ async def main(): lot1 = trio.hazmat.ParkingLot() lot2 = trio.hazmat.ParkingLot() async with trio.open_nursery() as nursery: - nursery.spawn(lot1) + nursery.start_soon(lot1) await trio.testing.wait_all_tasks_blocked() assert len(lot1) == 1 assert len(lot2) == 0 diff --git a/trio/_core/_result.py b/trio/_core/_result.py index 39b6d78d92..83f5db4fa0 100644 --- a/trio/_core/_result.py +++ b/trio/_core/_result.py @@ -1,9 +1,12 @@ import abc import attr +from . import _hazmat + __all__ = ["Result", "Value", "Error"] +@_hazmat @attr.s(slots=True, frozen=True) class Result(metaclass=abc.ABCMeta): """An abstract class representing the result of a Python computation. @@ -81,6 +84,7 @@ async def asend(self, agen): """ +@_hazmat @attr.s(slots=True, frozen=True, repr=False) class Value(Result): """Concrete :class:`Result` subclass representing a regular value. @@ -103,6 +107,7 @@ async def asend(self, agen): return await agen.asend(self.value) +@_hazmat @attr.s(slots=True, frozen=True, repr=False) class Error(Result): """Concrete :class:`Result` subclass representing a raised exception. diff --git a/trio/_core/_run.py b/trio/_core/_run.py index 83d82234dd..c6eabd00a9 100644 --- a/trio/_core/_run.py +++ b/trio/_core/_run.py @@ -344,14 +344,14 @@ async def open_nursery(): class Nursery: - def __init__(self, parent, cancel_scope): + def __init__(self, parent_task, cancel_scope): # the parent task -- only used for introspection, to implement # task.parent_task - self._parent = parent - parent._child_nurseries.append(self) + self._parent_task = parent_task + parent_task._child_nurseries.append(self) # the cancel stack that children inherit - we take a snapshot, so it # won't be affected by any changes in the parent. - self._cancel_stack = list(parent._cancel_stack) + self._cancel_stack = list(parent_task._cancel_stack) # the cancel scope that directly surrounds us; used for cancelling all # children. self.cancel_scope = cancel_scope @@ -359,27 +359,47 @@ def __init__(self, parent, cancel_scope): self._children = set() self._pending_starts = 0 self._zombies = set() - self.monitor = _core.UnboundedQueue() + self._monitor = _core.UnboundedQueue() self._closed = False @property + @deprecated("0.2.0", instead="child_tasks", issue=136) def children(self): return frozenset(self._children) @property + def child_tasks(self): + return frozenset(self._children) + + @property + def parent_task(self): + return self._parent_task + + @property + @deprecated("0.2.0", instead=None, issue=136) def zombies(self): return frozenset(self._zombies) + @property + @deprecated("0.2.0", instead=None, issue=136) + def monitor(self): + return self._monitor + def _child_finished(self, task): self._children.remove(task) self._zombies.add(task) - self.monitor.put_nowait(task) + self._monitor.put_nowait(task) def start_soon(self, async_fn, *args, name=None): GLOBAL_RUN_CONTEXT.runner.spawn_impl(async_fn, args, self, name) # Returns the task, unlike start_soon - #@deprecated("nursery.spawn", version="0.2.0", "nursery.start_soon") + @deprecated( + "0.2.0", + thing="nursery.spawn", + instead="nursery.start_soon", + issue=284 + ) def spawn(self, async_fn, *args, name=None): return GLOBAL_RUN_CONTEXT.runner.spawn_impl(async_fn, args, self, name) @@ -402,17 +422,25 @@ async def start(self, async_fn, *args, name=None): return task_status._value finally: self._pending_starts -= 1 - self.monitor.put_nowait(None) + self._monitor.put_nowait(None) - def reap(self, task): + def _reap(self, task): try: self._zombies.remove(task) except KeyError: raise ValueError("{} is not a zombie in this nursery".format(task)) + @deprecated("0.2.0", instead=None, issue=136) + def reap(self, task): + return self._reap(task) + + def _reap_and_unwrap(self, task): + self._reap(task) + return task._result.unwrap() + + @deprecated("0.2.0", instead=None, issue=136) def reap_and_unwrap(self, task): - self.reap(task) - return task.result.unwrap() + return self._reap_and_unwrap(task) async def _clean_up(self, pending_exc): cancelled_children = False @@ -435,27 +463,27 @@ async def _clean_up(self, pending_exc): # of remaining tasks, so we have to check first before # blocking on the monitor queue. for task in list(self._zombies): - if type(task.result) is Error: - exceptions.append(task.result.error) - self.reap(task) + if type(task._result) is Error: + exceptions.append(task._result.error) + self._reap(task) if exceptions and not cancelled_children: self.cancel_scope.cancel() clean_up_scope.shield = True cancelled_children = True - if self.children or self._pending_starts: + if self._children or self._pending_starts: try: # We ignore the return value here, and will pick up # the actual tasks from the zombies set after looping # around. (E.g. it's possible there are tasks in the # queue that were already reaped.) - await self.monitor.get_batch() + await self._monitor.get_batch() except (Cancelled, KeyboardInterrupt) as exc: exceptions.append(exc) self._closed = True - popped = self._parent._child_nurseries.pop() + popped = self._parent_task._child_nurseries.pop() assert popped is self if exceptions: mexc = MultiError(exceptions) @@ -483,7 +511,7 @@ async def _clean_up(self, pending_exc): raise mexc def __del__(self): - assert not self.children and not self.zombies + assert not self._children and not self._zombies ################################################################ @@ -504,6 +532,7 @@ def _pending_cancel_scope(cancel_stack): return pending_scope +@_hazmat @attr.s(slots=True, cmp=False, hash=False, repr=False) class Task: _parent_nursery = attr.ib() @@ -513,7 +542,7 @@ class Task: # Invariant: # - for unfinished tasks, result is None # - for finished tasks, result is a Result object - result = attr.ib(default=None) + _result = attr.ib(default=None) # Invariant: # - for unscheduled tasks, _next_send is None # - for scheduled tasks, _next_send is a Result object @@ -537,8 +566,14 @@ class Task: def __repr__(self): return ("".format(self.name, id(self))) + @property + @deprecated("0.2.0", instead=None, issue=136) + def result(self): + return self._result + # For debugging and visualization: @property + @deprecated("0.2.0", instead="parent_nursery.parent_task", issue=136) def parent_task(self): """This task's parent task (or None if this is the "init" task). @@ -547,7 +582,27 @@ def parent_task(self): if self._parent_nursery is None: return None else: - return self._parent_nursery._parent + return self._parent_nursery._parent_task + + @property + def parent_nursery(self): + """The nursery this task is inside (or None if this is the "init" + take). + + Example use case: drawing a visualization of the task tree in a + debugger. + + """ + return self._parent_nursery + + @property + def child_nurseries(self): + """The nurseries this task contains. + + This is a list, with outer nurseries before inner nurseries. + + """ + return list(self._child_nurseries) ################ # Monitoring task exit @@ -555,6 +610,7 @@ def parent_task(self): _monitors = attr.ib(default=attr.Factory(set)) + @deprecated("0.2.0", instead=None, issue=136) def add_monitor(self, queue): """Register to be notified when this task exits. @@ -567,6 +623,9 @@ def add_monitor(self, queue): ValueError: if ``queue`` is already registered with this task """ + return self._add_monitor(queue) + + def _add_monitor(self, queue): # Rationale: (a) don't particularly want to create a # callback-in-disguise API by allowing people to stick in some # arbitrary object with a put_nowait method, (b) don't want to have to @@ -577,11 +636,12 @@ def add_monitor(self, queue): raise TypeError("monitor must be an UnboundedQueue object") if queue in self._monitors: raise ValueError("can't add same monitor twice") - if self.result is not None: + if self._result is not None: queue.put_nowait(self) else: self._monitors.add(queue) + @deprecated("0.2.0", instead=None, issue=136) def discard_monitor(self, queue): """Unregister the given queue from being notified about this task exiting. @@ -596,16 +656,17 @@ def discard_monitor(self, queue): self._monitors.discard(queue) + @deprecated("0.2.0", instead=None, issue=136) async def wait(self): """Wait for this task to exit. """ q = _core.UnboundedQueue() - self.add_monitor(q) + self._add_monitor(q) try: await q.get_batch() finally: - self.discard_monitor(q) + self._monitors.discard(q) ################ # Cancellation @@ -765,7 +826,8 @@ def current_clock(self): @_public @_hazmat def reschedule(self, task, next_send=Value(None)): - """Reschedule the given task with the given :class:`~trio.Result`. + """Reschedule the given task with the given + :class:`~trio.hazmat.Result`. See :func:`yield_indefinitely` for the gory details. @@ -775,10 +837,10 @@ def reschedule(self, task, next_send=Value(None)): to calling :func:`reschedule` once.) Args: - task (trio.Task): the task to be rescheduled. Must be blocked in a - call to :func:`yield_indefinitely`. - next_send (trio.Result): the value (or error) to return (or raise) - from :func:`yield_indefinitely`. + task (trio.hazmat.Task): the task to be rescheduled. Must be blocked + in a call to :func:`yield_indefinitely`. + next_send (trio.hazmat.Result): the value (or error) to return (or + raise) from :func:`yield_indefinitely`. """ assert task._runner is self @@ -830,7 +892,7 @@ def _return_value_looks_like_wrong_library(value): try: coro = async_fn(*args) except TypeError: - # Give good error for: nursery.spawn(trio.sleep(1)) + # Give good error for: nursery.start_soon(trio.sleep(1)) if inspect.iscoroutine(async_fn): raise TypeError( "trio was expecting an async function, but instead it got " @@ -838,17 +900,17 @@ def _return_value_looks_like_wrong_library(value): "\n" "Probably you did something like:\n" "\n" - " trio.run({async_fn.__name__}(...)) # incorrect!\n" - " nursery.spawn({async_fn.__name__}(...)) # incorrect!\n" + " trio.run({async_fn.__name__}(...)) # incorrect!\n" + " nursery.start_soon({async_fn.__name__}(...)) # incorrect!\n" "\n" "Instead, you want (notice the parentheses!):\n" "\n" - " trio.run({async_fn.__name__}, ...) # correct!\n" - " nursery.spawn({async_fn.__name__}, ...) # correct!" + " trio.run({async_fn.__name__}, ...) # correct!\n" + " nursery.start_soon({async_fn.__name__}, ...) # correct!" .format(async_fn=async_fn) ) from None - # Give good error for: nursery.spawn(asyncio.sleep(1)) + # Give good error for: nursery.start_soon(asyncio.sleep(1)) if _return_value_looks_like_wrong_library(async_fn): raise TypeError( "trio was expecting an async function, but instead it got " @@ -865,15 +927,15 @@ def _return_value_looks_like_wrong_library(value): # function. So we have to just call it and then check whether the # result is a coroutine object. if not inspect.iscoroutine(coro): - # Give good error for: nursery.spawn(asyncio.sleep, 1) + # Give good error for: nursery.start_soon(asyncio.sleep, 1) if _return_value_looks_like_wrong_library(coro): raise TypeError( - "spawn got unexpected {!r} – are you trying to use a " + "start_soon got unexpected {!r} – are you trying to use a " "library written for asyncio/twisted/tornado or similar? " "That won't work without some sort of compatibility shim." .format(coro) ) - # Give good error for: nursery.spawn(some_sync_fn) + # Give good error for: nursery.start_soon(some_sync_fn) raise TypeError( "trio expected an async function, but {!r} appears to be " "synchronous" @@ -916,7 +978,7 @@ def _return_value_looks_like_wrong_library(value): return task def task_exited(self, task, result): - task.result = result + task._result = result while task._cancel_stack: task._cancel_stack[-1]._remove_task(task) self.tasks.remove(task) @@ -1002,14 +1064,17 @@ async def init(self, async_fn, args): self.spawn_system_task( self.call_soon_task, name="" ) - self.main_task = system_nursery.spawn(async_fn, *args) - async for task_batch in system_nursery.monitor: + + self.main_task = self.spawn_impl( + async_fn, args, self.system_nursery, name=None + ) + async for task_batch in system_nursery._monitor: for task in task_batch: if task is self.main_task: system_nursery.cancel_scope.cancel() - return system_nursery.reap_and_unwrap(task) + return system_nursery._reap_and_unwrap(task) else: - system_nursery.reap_and_unwrap(task) + system_nursery._reap_and_unwrap(task) ################ # Outside Context Problems @@ -1194,7 +1259,7 @@ def _deliver_ki_cb(self): # same time -- so even if KI arrives before main_task is created, we # won't get here until afterwards. assert self.main_task is not None - if self.main_task.result is not None: + if self.main_task._result is not None: # We're already in the process of exiting -- leave ki_pending set # and we'll check it again on our way out of run(). return @@ -1236,7 +1301,7 @@ async def wait_all_tasks_blocked(self, cushion=0.0, tiebreaker=0): Example: Here's an example of one way to test that trio's locks are fair: we - take the lock in the parent, spawn a child, wait for the child to be + take the lock in the parent, start a child, wait for the child to be blocked waiting for the lock (!), and then check that we can't release and immediately re-acquire the lock:: @@ -1248,7 +1313,7 @@ async def test_lock_fairness(): lock = trio.Lock() await lock.acquire() async with trio.open_nursery() as nursery: - child = nursery.spawn(lock_taker, lock) + child = nursery.start_soon(lock_taker, lock) # child hasn't run yet, we have the lock assert lock.locked() assert lock._owner is trio.current_task() @@ -1606,7 +1671,7 @@ def run_impl(runner, async_fn, args): runner.instrument("after_task_step", task) del GLOBAL_RUN_CONTEXT.task - return runner.init_task.result + return runner.init_task._result ################################################################ @@ -1625,6 +1690,7 @@ def started(self, value=None): STATUS_IGNORED = _StatusIgnored() +@_hazmat def current_task(): """Return the :class:`Task` object representing the current task. diff --git a/trio/_core/_traps.py b/trio/_core/_traps.py index d9b1802d3c..ed0f89adcd 100644 --- a/trio/_core/_traps.py +++ b/trio/_core/_traps.py @@ -66,7 +66,7 @@ def yield_indefinitely(abort_func): """Put the current task to sleep, with cancellation support. This is the lowest-level API for blocking in trio. Every time a - :class:`~trio.Task` blocks, it does so by calling this function. + :class:`~trio.hazmat.Task` blocks, it does so by calling this function. This is a tricky interface with no guard rails. If you can use :class:`ParkingLot` or the built-in I/O wait functions instead, then you diff --git a/trio/_core/_unbounded_queue.py b/trio/_core/_unbounded_queue.py index fb6844d8a3..24c39906de 100644 --- a/trio/_core/_unbounded_queue.py +++ b/trio/_core/_unbounded_queue.py @@ -14,6 +14,7 @@ class _UnboundedQueueStats: tasks_waiting = attr.ib() +@_hazmat class UnboundedQueue: """An unbounded queue suitable for certain unusual forms of inter-task communication. @@ -25,8 +26,8 @@ class UnboundedQueue: "batches". If a consumer task processes each batch without yielding, then this helps achieve (but does not guarantee) an effective bound on the queue's memory use, at the cost of potentially increasing system latencies - in general. You should generally prefer to use a :class:`Queue` instead if - you can. + in general. You should generally prefer to use a :class:`trio.Queue` + instead if you can. Currently each batch completely empties the queue, but `this may change in the future `__. @@ -99,10 +100,10 @@ def get_batch_nowait(self): Returns: list: A list of dequeued items, in order. On a successful call this list is always non-empty; if it would be empty we raise - :exc:`WouldBlock` instead. + :exc:`~trio.WouldBlock` instead. Raises: - WouldBlock: if the queue is empty. + ~trio.WouldBlock: if the queue is empty. """ if not self._can_get: diff --git a/trio/_core/tests/test_epoll.py b/trio/_core/tests/test_epoll.py index 4e77e596fa..8b6752a5ae 100644 --- a/trio/_core/tests/test_epoll.py +++ b/trio/_core/tests/test_epoll.py @@ -41,11 +41,11 @@ async def test_epoll_statistics(): fill_socket(a1) fill_socket(a3) async with _core.open_nursery() as nursery: - nursery.spawn(_core.wait_writable, a1) - nursery.spawn(_core.wait_readable, a2) - nursery.spawn(_core.wait_readable, b2) - nursery.spawn(_core.wait_writable, a3) - nursery.spawn(_core.wait_readable, a3) + nursery.start_soon(_core.wait_writable, a1) + nursery.start_soon(_core.wait_readable, a2) + nursery.start_soon(_core.wait_readable, b2) + nursery.start_soon(_core.wait_writable, a3) + nursery.start_soon(_core.wait_readable, a3) await wait_all_tasks_blocked() diff --git a/trio/_core/tests/test_io.py b/trio/_core/tests/test_io.py index cec14eb74e..f17b31de85 100644 --- a/trio/_core/tests/test_io.py +++ b/trio/_core/tests/test_io.py @@ -103,14 +103,13 @@ async def block_on_read(): record.append("cancelled") else: record.append("readable") - return a.recv(10) + assert a.recv(10) == b"x" async with _core.open_nursery() as nursery: - t = nursery.spawn(block_on_read) + nursery.start_soon(block_on_read) await wait_all_tasks_blocked() assert record == [] b.send(b"x") - assert t.result.unwrap() == b"x" fill_socket(a) @@ -129,7 +128,7 @@ async def block_on_write(): record.append("writable") async with _core.open_nursery() as nursery: - t = nursery.spawn(block_on_write) + nursery.start_soon(block_on_write) await wait_all_tasks_blocked() assert record == [] drain_socket(b) @@ -137,7 +136,7 @@ async def block_on_write(): # check cancellation record = [] async with _core.open_nursery() as nursery: - t = nursery.spawn(block_on_read) + nursery.start_soon(block_on_read) await wait_all_tasks_blocked() nursery.cancel_scope.cancel() assert record == ["cancelled"] @@ -145,7 +144,7 @@ async def block_on_write(): fill_socket(a) record = [] async with _core.open_nursery() as nursery: - t = nursery.spawn(block_on_write) + nursery.start_soon(block_on_write) await wait_all_tasks_blocked() nursery.cancel_scope.cancel() assert record == ["cancelled"] @@ -157,7 +156,7 @@ async def test_double_read(socketpair, wait_readable): # You can't have two tasks trying to read from a socket at the same time async with _core.open_nursery() as nursery: - nursery.spawn(wait_readable, a) + nursery.start_soon(wait_readable, a) await wait_all_tasks_blocked() with assert_yields(): with pytest.raises(_core.ResourceBusyError): @@ -172,7 +171,7 @@ async def test_double_write(socketpair, wait_writable): # You can't have two tasks trying to write to a socket at the same time fill_socket(a) async with _core.open_nursery() as nursery: - nursery.spawn(wait_writable, a) + nursery.start_soon(wait_writable, a) await wait_all_tasks_blocked() with assert_yields(): with pytest.raises(_core.ResourceBusyError): @@ -185,18 +184,29 @@ async def test_double_write(socketpair, wait_writable): async def test_socket_simultaneous_read_write( socketpair, wait_readable, wait_writable ): + record = [] + + async def r_task(sock): + await wait_readable(sock) + record.append("r_task") + + async def w_task(sock): + await wait_writable(sock) + record.append("w_task") + a, b = socketpair fill_socket(a) async with _core.open_nursery() as nursery: - r_task = nursery.spawn(wait_readable, a) - w_task = nursery.spawn(wait_writable, a) + nursery.start_soon(r_task, a) + nursery.start_soon(w_task, a) await wait_all_tasks_blocked() - assert r_task.result is None - assert w_task.result is None + assert record == [] b.send(b"x") - await r_task.wait() + await wait_all_tasks_blocked() + assert record == ["r_task"] drain_socket(b) - await w_task.wait() + await wait_all_tasks_blocked() + assert record == ["r_task", "w_task"] @read_socket_test @@ -213,7 +223,9 @@ async def test_socket_actual_streaming( N = 1000000 # 1 megabyte MAX_CHUNK = 65536 - async def sender(sock, seed): + results = {} + + async def sender(sock, seed, key): r = random.Random(seed) sent = 0 while sent < N: @@ -226,9 +238,9 @@ async def sender(sock, seed): sent += this_chunk_size del chunk[:this_chunk_size] sock.shutdown(stdlib_socket.SHUT_WR) - return sent + results[key] = sent - async def receiver(sock): + async def receiver(sock, key): received = 0 while True: print("received", received) @@ -238,13 +250,13 @@ async def receiver(sock): if not this_chunk_size: break received += this_chunk_size - return received + results[key] = received async with _core.open_nursery() as nursery: - send_a = nursery.spawn(sender, a, 0) - send_b = nursery.spawn(sender, b, 1) - recv_a = nursery.spawn(receiver, a) - recv_b = nursery.spawn(receiver, b) + nursery.start_soon(sender, a, 0, "send_a") + nursery.start_soon(sender, b, 1, "send_b") + nursery.start_soon(receiver, a, "recv_a") + nursery.start_soon(receiver, b, "recv_b") - assert send_a.result.unwrap() == recv_b.result.unwrap() - assert send_b.result.unwrap() == recv_a.result.unwrap() + assert results["send_a"] == results["recv_b"] + assert results["send_b"] == results["recv_a"] diff --git a/trio/_core/tests/test_ki.py b/trio/_core/tests/test_ki.py index b8e650f2dc..dd4c732cda 100644 --- a/trio/_core/tests/test_ki.py +++ b/trio/_core/tests/test_ki.py @@ -62,10 +62,10 @@ async def aunprotected(): await aprotected() # make sure that the decorator here overrides the automatic manipulation - # that spawn() does: + # that start_soon() does: async with _core.open_nursery() as nursery: - nursery.spawn(aprotected) - nursery.spawn(aunprotected) + nursery.start_soon(aprotected) + nursery.start_soon(aunprotected) @_core.enable_ki_protection def gen_protected(): @@ -235,9 +235,9 @@ async def raiser(name, record): async def check_unprotected_kill(): async with _core.open_nursery() as nursery: - nursery.spawn(sleeper, "s1", record) - nursery.spawn(sleeper, "s2", record) - nursery.spawn(raiser, "r1", record) + nursery.start_soon(sleeper, "s1", record) + nursery.start_soon(sleeper, "s2", record) + nursery.start_soon(raiser, "r1", record) with pytest.raises(KeyboardInterrupt): _core.run(check_unprotected_kill) @@ -250,9 +250,11 @@ async def check_unprotected_kill(): async def check_protected_kill(): async with _core.open_nursery() as nursery: - nursery.spawn(sleeper, "s1", record) - nursery.spawn(sleeper, "s2", record) - nursery.spawn(_core.enable_ki_protection(raiser), "r1", record) + nursery.start_soon(sleeper, "s1", record) + nursery.start_soon(sleeper, "s2", record) + nursery.start_soon( + _core.enable_ki_protection(raiser), "r1", record + ) # __aexit__ blocks, and then receives the KI with pytest.raises(KeyboardInterrupt): diff --git a/trio/_core/tests/test_local.py b/trio/_core/tests/test_local.py index 38de8443b0..4dba087e92 100644 --- a/trio/_core/tests/test_local.py +++ b/trio/_core/tests/test_local.py @@ -40,7 +40,7 @@ async def child(): assert local.b == 2 async with _core.open_nursery() as nursery: - nursery.spawn(child) + nursery.start_soon(child) async def test_local_isolation(): @@ -73,8 +73,8 @@ async def child2(): rlocal.a = "run child2" async with _core.open_nursery() as nursery: - nursery.spawn(child1) - nursery.spawn(child2) + nursery.start_soon(child1) + nursery.start_soon(child2) assert tlocal.a == "task root" assert rlocal.a == "run child2" @@ -148,13 +148,13 @@ async def test_local_inheritance_from_spawner_not_supervisor(): async def spawner(nursery): t.x = "spawner" - nursery.spawn(child) + nursery.start_soon(child) async def child(): assert t.x == "spawner" async with _core.open_nursery() as nursery: - nursery.spawn(spawner, nursery) + nursery.start_soon(spawner, nursery) async def test_local_defaults(): diff --git a/trio/_core/tests/test_parking_lot.py b/trio/_core/tests/test_parking_lot.py index 2d49d98127..de6de63081 100644 --- a/trio/_core/tests/test_parking_lot.py +++ b/trio/_core/tests/test_parking_lot.py @@ -20,7 +20,7 @@ async def waiter(i, lot): assert len(lot) == 0 assert lot.statistics().tasks_waiting == 0 for i in range(3): - nursery.spawn(waiter, i, lot) + nursery.start_soon(waiter, i, lot) await wait_all_tasks_blocked() assert len(record) == 3 assert bool(lot) @@ -41,7 +41,7 @@ async def waiter(i, lot): async with _core.open_nursery() as nursery: record = [] for i in range(3): - nursery.spawn(waiter, i, lot) + nursery.start_soon(waiter, i, lot) await wait_all_tasks_blocked() assert len(record) == 3 for i in range(3): @@ -66,7 +66,7 @@ async def waiter(i, lot): async with _core.open_nursery() as nursery: record = [] for i in range(3): - nursery.spawn(waiter, i, lot) + nursery.start_soon(waiter, i, lot) await wait_all_tasks_blocked() lot.unpark(count=2) await wait_all_tasks_blocked() @@ -83,7 +83,7 @@ async def waiter(i, lot): async def cancellable_waiter(name, lot, scopes, record): with _core.open_cancel_scope() as scope: - scopes[_core.current_task()] = scope + scopes[name] = scope record.append("sleep {}".format(name)) try: await lot.park() @@ -99,15 +99,15 @@ async def test_parking_lot_cancel(): async with _core.open_nursery() as nursery: lot = ParkingLot() - w1 = nursery.spawn(cancellable_waiter, 1, lot, scopes, record) + nursery.start_soon(cancellable_waiter, 1, lot, scopes, record) await wait_all_tasks_blocked() - w2 = nursery.spawn(cancellable_waiter, 2, lot, scopes, record) + nursery.start_soon(cancellable_waiter, 2, lot, scopes, record) await wait_all_tasks_blocked() - w3 = nursery.spawn(cancellable_waiter, 3, lot, scopes, record) + nursery.start_soon(cancellable_waiter, 3, lot, scopes, record) await wait_all_tasks_blocked() assert len(record) == 3 - scopes[w2].cancel() + scopes[2].cancel() await wait_all_tasks_blocked() assert len(record) == 4 lot.unpark_all() @@ -135,11 +135,11 @@ async def test_parking_lot_repark(): lot1.repark([]) async with _core.open_nursery() as nursery: - w1 = nursery.spawn(cancellable_waiter, 1, lot1, scopes, record) + nursery.start_soon(cancellable_waiter, 1, lot1, scopes, record) await wait_all_tasks_blocked() - w2 = nursery.spawn(cancellable_waiter, 2, lot1, scopes, record) + nursery.start_soon(cancellable_waiter, 2, lot1, scopes, record) await wait_all_tasks_blocked() - w3 = nursery.spawn(cancellable_waiter, 3, lot1, scopes, record) + nursery.start_soon(cancellable_waiter, 3, lot1, scopes, record) await wait_all_tasks_blocked() assert len(record) == 3 @@ -156,7 +156,7 @@ async def test_parking_lot_repark(): assert len(lot1) == 0 assert len(lot2) == 2 - scopes[w2].cancel() + scopes[2].cancel() await wait_all_tasks_blocked() assert len(lot2) == 1 assert record == [ @@ -176,11 +176,11 @@ async def test_parking_lot_repark_with_count(): lot1 = ParkingLot() lot2 = ParkingLot() async with _core.open_nursery() as nursery: - w1 = nursery.spawn(cancellable_waiter, 1, lot1, scopes, record) + nursery.start_soon(cancellable_waiter, 1, lot1, scopes, record) await wait_all_tasks_blocked() - w2 = nursery.spawn(cancellable_waiter, 2, lot1, scopes, record) + nursery.start_soon(cancellable_waiter, 2, lot1, scopes, record) await wait_all_tasks_blocked() - w3 = nursery.spawn(cancellable_waiter, 3, lot1, scopes, record) + nursery.start_soon(cancellable_waiter, 3, lot1, scopes, record) await wait_all_tasks_blocked() assert len(record) == 3 diff --git a/trio/_core/tests/test_run.py b/trio/_core/tests/test_run.py index 32dfc1bd6a..476defb1a8 100644 --- a/trio/_core/tests/test_run.py +++ b/trio/_core/tests/test_run.py @@ -91,7 +91,7 @@ async def main(): # pragma: no cover assert "from inside" in str(excinfo.value) -async def test_basic_spawn_wait(): +async def test_basic_spawn_wait(recwarn): async def child(x): return 2 * x @@ -115,7 +115,8 @@ async def test_nursery_warn_use_async_with(): pass -async def test_child_crash_basic(): +# can remove after 0.2.0 +async def test_child_crash_basic_deprecated(recwarn): exc = ValueError("uh oh") async def erroring(): @@ -127,15 +128,22 @@ async def erroring(): assert task.result.error is exc nursery.reap(task) + +async def test_child_crash_basic(recwarn): + exc = ValueError("uh oh") + + async def erroring(): + raise exc + try: # nursery.__aexit__ propagates exception from child back to parent async with _core.open_nursery() as nursery: - nursery.spawn(erroring) + nursery.start_soon(erroring) except ValueError as e: assert e is exc -async def test_reap_bad_task(): +async def test_reap_bad_task(recwarn): async def child(): pass @@ -159,8 +167,8 @@ async def looper(whoami, record): record = [] async with _core.open_nursery() as nursery: - t1 = nursery.spawn(looper, "a", record) - t2 = nursery.spawn(looper, "b", record) + nursery.start_soon(looper, "a", record) + nursery.start_soon(looper, "b", record) check_sequence_matches( record, @@ -186,8 +194,8 @@ async def crasher(): async def main(): async with _core.open_nursery() as nursery: - nursery.spawn(looper) - nursery.spawn(crasher) + nursery.start_soon(looper) + nursery.start_soon(crasher) with pytest.raises(ValueError) as excinfo: _core.run(main) @@ -196,7 +204,7 @@ async def main(): assert excinfo.value.args == ("argh",) -def test_main_and_task_both_crash(): +def test_main_and_task_both_crash(recwarn): # If main crashes and there's also a task crash, then we get both in a # MultiError async def crasher(): @@ -224,8 +232,8 @@ async def crasher(etype): async def main(): async with _core.open_nursery() as nursery: - nursery.spawn(crasher, KeyError) - nursery.spawn(crasher, ValueError) + nursery.start_soon(crasher, KeyError) + nursery.start_soon(crasher, ValueError) with pytest.raises(_core.MultiError) as excinfo: _core.run(main) @@ -234,7 +242,12 @@ async def main(): async def test_reschedule(): + t1 = None + t2 = None + async def child1(): + nonlocal t1, t2 + t1 = _core.current_task() print("child1 start") x = await sleep_forever() print("child1 woke") @@ -244,7 +257,9 @@ async def child1(): print("child1 exit") async def child2(): + nonlocal t1, t2 print("child2 start") + t2 = _core.current_task() _core.reschedule(t1, _core.Value(0)) print("child2 sleep") with pytest.raises(ValueError): @@ -252,13 +267,13 @@ async def child2(): print("child2 successful exit") async with _core.open_nursery() as nursery: - t1 = nursery.spawn(child1) + nursery.start_soon(child1) # let t1 run and fall asleep await _core.yield_briefly() - t2 = nursery.spawn(child2) + nursery.start_soon(child2) -async def test_task_monitor(): +async def test_task_monitor(recwarn): async def child(): return 1 @@ -301,7 +316,7 @@ async def child(): # loop around and do it again -async def test_bad_monitor_object(): +async def test_bad_monitor_object(recwarn): task = _core.current_task() with pytest.raises(TypeError): @@ -338,12 +353,13 @@ async def test_current_clock(mock_clock): async def test_current_task(): + parent_task = _core.current_task() + async def child(): - return _core.current_task() + assert _core.current_task().parent_nursery.parent_task is parent_task async with _core.open_nursery() as nursery: - child_task = nursery.spawn(child) - assert child_task == child_task.result.unwrap() + nursery.start_soon(child) def test_out_of_context(): @@ -371,7 +387,7 @@ async def child(): assert stats.call_soon_queue_size == 0 async with _core.open_nursery() as nursery: - task = nursery.spawn(child) + nursery.start_soon(child) await wait_all_tasks_blocked() call_soon = _core.current_call_soon_thread_and_signal_safe() call_soon(lambda: None) @@ -503,15 +519,17 @@ def test_instruments_interleave(): tasks = {} async def two_step1(): + tasks["t1"] = _core.current_task() await _core.yield_briefly() async def two_step2(): + tasks["t2"] = _core.current_task() await _core.yield_briefly() async def main(): async with _core.open_nursery() as nursery: - tasks["t1"] = nursery.spawn(two_step1) - tasks["t2"] = nursery.spawn(two_step2) + nursery.start_soon(two_step1) + nursery.start_soon(two_step2) r = TaskRecorder() _core.run(main, instruments=[r]) @@ -686,19 +704,16 @@ async def crasher(): try: async with _core.open_nursery() as nursery: # Two children that get cancelled by the nursery scope - t1 = nursery.spawn(child) - t2 = nursery.spawn(child) + nursery.start_soon(child) # t1 + nursery.start_soon(child) # t2 nursery.cancel_scope.cancel() with _core.open_cancel_scope(shield=True): - # Make sure they receive the inner cancellation - # exception before we cancel the outer scope - await t1.wait() - await t2.wait() + await wait_all_tasks_blocked() # One child that gets cancelled by the outer scope - t3 = nursery.spawn(child) + nursery.start_soon(child) # t3 outer.cancel() # And one that raises a different error - t4 = nursery.spawn(crasher) + nursery.start_soon(crasher) # t4 except _core.MultiError as multi_exc: # This is outside the nursery scope but inside the outer # scope, so the nursery should have absorbed t1 and t2's @@ -731,7 +746,7 @@ async def blocker(): async with _core.open_nursery() as nursery: nursery.cancel_scope.cancel() - nursery.spawn(blocker) + nursery.start_soon(blocker) assert record == ["started"] @@ -787,20 +802,15 @@ async def leaf(ident): async def worker(ident): async with _core.open_nursery() as nursery: - t1 = nursery.spawn(leaf, ident + "-l1") - t2 = nursery.spawn(leaf, ident + "-l2") - with _core.open_cancel_scope(shield=True): - await t1.wait() - await t2.wait() + nursery.start_soon(leaf, ident + "-l1") + nursery.start_soon(leaf, ident + "-l2") async with _core.open_nursery() as nursery: - w1 = nursery.spawn(worker, "w1") - w2 = nursery.spawn(worker, "w2") + nursery.start_soon(worker, "w1") + nursery.start_soon(worker, "w2") nursery.cancel_scope.cancel() - with _core.open_cancel_scope(shield=True): - await w1.wait() - await w2.wait() - assert record == {"w1-l1", "w1-l2", "w2-l1", "w2-l2"} + + assert record == {"w1-l1", "w1-l2", "w2-l1", "w2-l2"} async def test_cancel_shield_abort(): @@ -819,7 +829,7 @@ async def sleeper(): except _core.Cancelled: record.append("cancelled") - task = nursery.spawn(sleeper) + nursery.start_soon(sleeper) await wait_all_tasks_blocked() assert record == ["sleeping"] # now when we unshield, it should abort the sleep. @@ -830,7 +840,8 @@ async def sleeper(): # written, without these last few lines, the test spuriously # passed, even though shield assignment was buggy.) with _core.open_cancel_scope(shield=True): - await task.wait() + await wait_all_tasks_blocked() + assert record == ["sleeping", "cancelled"] async def test_basic_timeout(mock_clock): @@ -929,10 +940,12 @@ async def test_timekeeping(): async def test_failed_abort(): + stubborn_task = [None] stubborn_scope = [None] record = [] async def stubborn_sleeper(): + stubborn_task[0] = _core.current_task() with _core.open_cancel_scope() as scope: stubborn_scope[0] = scope record.append("sleep") @@ -945,7 +958,7 @@ async def stubborn_sleeper(): record.append("cancelled") async with _core.open_nursery() as nursery: - task = nursery.spawn(stubborn_sleeper) + nursery.start_soon(stubborn_sleeper) await wait_all_tasks_blocked() assert record == ["sleep"] stubborn_scope[0].cancel() @@ -953,7 +966,7 @@ async def stubborn_sleeper(): # cancel didn't wake it up assert record == ["sleep"] # wake it up again by hand - _core.reschedule(task, _core.Value(1)) + _core.reschedule(stubborn_task[0], _core.Value(1)) assert record == ["sleep", "woke", "cancelled"] @@ -1003,8 +1016,8 @@ async def system_task(x): record.append(("ki", _core.currently_ki_protected())) await _core.yield_briefly() - task = _core.spawn_system_task(system_task, 1) - await task.wait() + _core.spawn_system_task(system_task, 1) + await wait_all_tasks_blocked() assert record == [("x", 1), ("ki", True)] @@ -1015,8 +1028,8 @@ async def crasher(): raise KeyError async def main(): - task = _core.spawn_system_task(crasher) - await task.wait() + _core.spawn_system_task(crasher) + await sleep_forever() with pytest.raises(_core.TrioInternalError): _core.run(main) @@ -1031,8 +1044,8 @@ async def crasher2(): async def system_task(): async with _core.open_nursery() as nursery: - nursery.spawn(crasher1) - nursery.spawn(crasher2) + nursery.start_soon(crasher1) + nursery.start_soon(crasher2) async def main(): _core.spawn_system_task(system_task) @@ -1063,8 +1076,8 @@ async def cancelme(): async def system_task(): async with _core.open_nursery() as nursery: - nursery.spawn(crasher) - nursery.spawn(cancelme) + nursery.start_soon(crasher) + nursery.start_soon(cancelme) async def main(): _core.spawn_system_task(system_task) @@ -1152,8 +1165,8 @@ async def child2(): record.append("child2 success") async with _core.open_nursery() as nursery: - nursery.spawn(child1) - nursery.spawn(child2) + nursery.start_soon(child1) + nursery.start_soon(child2) assert record == [ "child1 raise", "child1 sleep", "child2 wake", "child2 sleep again", @@ -1168,7 +1181,12 @@ async def child2(): # # https://bugs.python.org/issue29587 async def test_exc_info_after_yield_error(): + child_task = None + async def child(): + nonlocal child_task + child_task = _core.current_task() + try: raise KeyError except Exception: @@ -1178,32 +1196,34 @@ async def child(): pass raise - async with _core.open_nursery() as nursery: - t = nursery.spawn(child) - await wait_all_tasks_blocked() - _core.reschedule(t, _core.Error(ValueError())) - await t.wait() - with pytest.raises(KeyError): - nursery.reap_and_unwrap(t) + with pytest.raises(KeyError): + async with _core.open_nursery() as nursery: + nursery.start_soon(child) + await wait_all_tasks_blocked() + _core.reschedule(child_task, _core.Error(ValueError())) # Similar to previous test -- if the ValueError() gets sent in via 'throw', -# then Python's normal implicit chaining stuff is broken. We have to +# then Python's normal implicit chaining stuff is broken. async def test_exception_chaining_after_yield_error(): + child_task = None + async def child(): + nonlocal child_task + child_task = _core.current_task() + try: raise KeyError except Exception: await sleep_forever() - async with _core.open_nursery() as nursery: - t = nursery.spawn(child) - await wait_all_tasks_blocked() - _core.reschedule(t, _core.Error(ValueError())) - await t.wait() - with pytest.raises(ValueError) as excinfo: - nursery.reap_and_unwrap(t) - assert isinstance(excinfo.value.__context__, KeyError) + with pytest.raises(ValueError) as excinfo: + async with _core.open_nursery() as nursery: + nursery.start_soon(child) + await wait_all_tasks_blocked() + _core.reschedule(child_task, _core.Error(ValueError())) + + assert isinstance(excinfo.value.__context__, KeyError) async def test_call_soon_basic(): @@ -1456,7 +1476,7 @@ def slow_abort(raise_cancel): async with _core.open_nursery() as nursery: # So we have a task blocked on an operation that can't be # aborted immediately - nursery.spawn(slow_aborter) + nursery.start_soon(slow_aborter) await wait_all_tasks_blocked() assert record == ["sleeping"] # And then we cancel it, so the abort callback gets run @@ -1473,21 +1493,22 @@ def slow_abort(raise_cancel): assert record == ["sleeping", "abort-called", "cancelled", "done"] -async def test_parent_task(): +async def test_Task_parent_task_deprecated(recwarn): + tasks = {} + async def child2(): - pass + tasks["child2"] = _core.current_task() async def child1(): + tasks["child1"] = _core.current_task() async with _core.open_nursery() as nursery: - return nursery.spawn(child2) + return nursery.start_soon(child2) async with _core.open_nursery() as nursery: - t1 = nursery.spawn(child1) - await t1.wait() - t2 = t1.result.unwrap() + nursery.start_soon(child1) - assert t1.parent_task is _core.current_task() - assert t2.parent_task is t1 + assert tasks["child1"].parent_task is _core.current_task() + assert tasks["child2"].parent_task is tasks["child1"] t = _core.current_task() # Make sure that chaining parent_task eventually gives None (and not, for @@ -1496,46 +1517,85 @@ async def child1(): t = t.parent_task +async def test_task_tree_introspection(): + tasks = {} + + tasks["parent"] = _core.current_task() + + assert tasks["parent"].child_nurseries == [] + + async with _core.open_nursery() as nursery1: + async with _core.open_nursery() as nursery2: + assert tasks["parent"].child_nurseries == [nursery1, nursery2] + + assert tasks["parent"].child_nurseries == [] + + nurseries = {} + + async def child2(): + tasks["child2"] = _core.current_task() + assert tasks["parent"].child_nurseries == [nurseries["parent"]] + assert nurseries["parent"].child_tasks == frozenset({tasks["child1"]}) + assert tasks["child1"].child_nurseries == [nurseries["child1"]] + assert nurseries["child1"].child_tasks == frozenset({tasks["child2"]}) + assert tasks["child2"].child_nurseries == [] + + async def child1(): + tasks["child1"] = _core.current_task() + async with _core.open_nursery() as nursery: + nurseries["child1"] = nursery + nursery.start_soon(child2) + + async with _core.open_nursery() as nursery: + nurseries["parent"] = nursery + nursery.start_soon(child1) + + # Upward links survive after tasks/nurseries exit + assert nurseries["parent"].parent_task is tasks["parent"] + assert tasks["child1"].parent_nursery is nurseries["parent"] + assert nurseries["child1"].parent_task is tasks["child1"] + assert tasks["child2"].parent_nursery is nurseries["child1"] + + nursery = _core.current_task().parent_nursery + # Make sure that chaining eventually gives a nursery of None (and not, for + # example, an error) + while nursery is not None: + t = nursery.parent_task + nursery = t.parent_nursery + + async def test_nursery_closure(): async def child1(nursery): # We can add new tasks to the nursery even after entering __aexit__, # so long as there are still tasks running - nursery.spawn(child2) + nursery.start_soon(child2) async def child2(): pass async with _core.open_nursery() as nursery: - nursery.spawn(child1, nursery) + nursery.start_soon(child1, nursery) # But once we've left __aexit__, the nursery is closed with pytest.raises(RuntimeError): - nursery.spawn(child2) + nursery.start_soon(child2) async def test_spawn_name(): - async def func1(): - pass + async def func1(expected): + task = _core.current_task() + assert expected in task.name async def func2(): # pragma: no cover pass async with _core.open_nursery() as nursery: - for spawn_fn in [nursery.spawn, _core.spawn_system_task]: - t0 = spawn_fn(func1) - assert "func1" in t0.name - - t1 = spawn_fn(func1, name=func2) - assert "func2" in t1.name - - t2 = spawn_fn(func1, name="func3") - assert "func3" == t2.name - - t3 = spawn_fn(functools.partial(func1)) - assert "func1" in t3.name - - t4 = spawn_fn(func1, name=object()) - assert "object" in t4.name + for spawn_fn in [nursery.start_soon, _core.spawn_system_task]: + spawn_fn(func1, "func1") + spawn_fn(func1, "func2", name=func2) + spawn_fn(func1, "func3", name="func3") + spawn_fn(functools.partial(func1, "func1")) + spawn_fn(func1, "object", name=object()) async def test_current_effective_deadline(mock_clock): @@ -1567,7 +1627,7 @@ def bad_call_run(*args): def bad_call_spawn(*args): async def main(): async with _core.open_nursery() as nursery: - nursery.spawn(*args) + nursery.start_soon(*args) _core.run(main) @@ -1629,7 +1689,7 @@ async def misguided(): assert "asyncio" in str(excinfo.value) -async def test_trivial_yields(): +async def test_trivial_yields(recwarn): with assert_yields(): await _core.yield_briefly() @@ -1681,11 +1741,11 @@ async def sleep_then_start(seconds, *, task_status=_core.STATUS_IGNORED): # to exit. for seconds in [1, 2]: async with _core.open_nursery() as nursery: - assert len(nursery.children) == 0 + assert len(nursery.child_tasks) == 0 t0 = _core.current_time() assert await nursery.start(sleep_then_start, seconds) == seconds assert _core.current_time() - t0 == seconds - assert len(nursery.children) == 1 + assert len(nursery.child_tasks) == 1 assert _core.current_time() - t0 == 2 * seconds # Make sure STATUS_IGNORED works so task function can be called directly @@ -1838,3 +1898,26 @@ async def start_sleep_then_crash(nursery): nursery1.start_soon(start_sleep_then_crash, nursery2) await wait_all_tasks_blocked() assert _core.current_time() - t0 == 7 + + +# can remove after 0.2.0 +async def test_some_deprecated_but_uncovered_methods(recwarn): + async def noop(): + return 33 + + async with _core.open_nursery() as nursery: + assert not nursery.zombies + assert not nursery.children + + nursery.start_soon(noop) + assert len(nursery.children) == 1 + + await wait_all_tasks_blocked() + assert len(nursery.zombies) == 1 + assert not nursery.children + + batch = await nursery.monitor.get_batch() + for task in batch: + assert nursery.reap_and_unwrap(task) == 33 + + assert not nursery.zombies diff --git a/trio/_core/tests/test_unbounded_queue.py b/trio/_core/tests/test_unbounded_queue.py index b74709e8c5..3b57b70f9c 100644 --- a/trio/_core/tests/test_unbounded_queue.py +++ b/trio/_core/tests/test_unbounded_queue.py @@ -49,7 +49,7 @@ async def aiter_consumer(): for consumer in (get_batch_consumer, aiter_consumer): record.clear() async with _core.open_nursery() as nursery: - task = nursery.spawn(consumer) + nursery.start_soon(consumer) await _core.wait_all_tasks_blocked() stats = q.statistics() assert stats.qsize == 0 @@ -72,16 +72,21 @@ async def test_UnboundedQueue_fairness(): q.put_nowait(2) assert q.get_batch_nowait() == [1, 2] + result = None + + async def get_batch(q): + nonlocal result + result = await q.get_batch() + # But if someone else is waiting to read, then they get dibs async with _core.open_nursery() as nursery: - t = nursery.spawn(q.get_batch) + nursery.start_soon(get_batch, q) await _core.wait_all_tasks_blocked() q.put_nowait(3) q.put_nowait(4) with pytest.raises(_core.WouldBlock): q.get_batch_nowait() - await t.wait() - assert t.result.unwrap() == [3, 4] + assert result == [3, 4] # If two tasks are trying to read, they alternate record = [] @@ -91,9 +96,9 @@ async def reader(name): record.append((name, await q.get_batch())) async with _core.open_nursery() as nursery: - nursery.spawn(reader, "a") + nursery.start_soon(reader, "a") await _core.wait_all_tasks_blocked() - nursery.spawn(reader, "b") + nursery.start_soon(reader, "b") await _core.wait_all_tasks_blocked() for i in range(20): @@ -129,17 +134,15 @@ async def getter(q, i): async with _core.open_nursery() as nursery: q = _core.UnboundedQueue() - t1 = nursery.spawn(getter, q, 1) + nursery.start_soon(getter, q, 1) await wait_all_tasks_blocked() - t2 = nursery.spawn(getter, q, 2) + nursery.start_soon(getter, q, 2) await wait_all_tasks_blocked() for i in range(10): q.put_nowait(i) await wait_all_tasks_blocked() - assert t1.result is not None - assert t2.result is None assert record == [(1, list(range(10)))] nursery.cancel_scope.cancel() diff --git a/trio/_core/tests/test_windows.py b/trio/_core/tests/test_windows.py index 77bd27c0ae..13df26705a 100644 --- a/trio/_core/tests/test_windows.py +++ b/trio/_core/tests/test_windows.py @@ -24,7 +24,7 @@ async def post(key): with _core.monitor_completion_key() as (key, queue): async with _core.open_nursery() as nursery: - task = nursery.spawn(post, key) + nursery.start_soon(post, key) i = 0 print("loop") async for batch in queue: # pragma: no branch diff --git a/trio/_deprecate.py b/trio/_deprecate.py index 855822907c..3227c71dea 100644 --- a/trio/_deprecate.py +++ b/trio/_deprecate.py @@ -1,6 +1,10 @@ +import sys from functools import wraps, partial +from types import ModuleType import warnings +import attr + __all__ = ["TrioDeprecationWarning"] @@ -93,3 +97,38 @@ def wrapper(*args, **kwargs): wrapper.__qualname__ = old_qualname wrapper.__name__ = old_qualname.rpartition(".")[-1] return wrapper + + +@attr.s(frozen=True) +class DeprecatedAttribute: + _not_set = object() + + value = attr.ib() + version = attr.ib() + issue = attr.ib() + instead = attr.ib(default=_not_set) + + +class ModuleWithDeprecations(ModuleType): + def __getattr__(self, name): + if name in self.__deprecated_attributes__: + info = self.__deprecated_attributes__[name] + instead = info.instead + if instead is DeprecatedAttribute._not_set: + instead = info.value + thing = "{}.{}".format(self.__name__, name) + warn_deprecated( + thing, info.version, issue=info.issue, instead=instead + ) + return info.value + + raise AttributeError(name) + + +def enable_attribute_deprecations(module_name): + module = sys.modules[module_name] + try: + module.__class__ = ModuleWithDeprecations + except TypeError: + # Need PyPy 5.9+ to support module __class__ assignment + return diff --git a/trio/_highlevel_open_tcp_stream.py b/trio/_highlevel_open_tcp_stream.py index 3dfb17adfd..1036e557f0 100644 --- a/trio/_highlevel_open_tcp_stream.py +++ b/trio/_highlevel_open_tcp_stream.py @@ -272,7 +272,7 @@ async def attempt_connect(nursery, previous_attempt_failed): # Then kick off the next attempt. this_attempt_failed = trio.Event() - nursery.spawn(attempt_connect, nursery, this_attempt_failed) + nursery.start_soon(attempt_connect, nursery, this_attempt_failed) # Then make this invocation's attempt try: @@ -293,7 +293,7 @@ async def attempt_connect(nursery, previous_attempt_failed): # Kick off the chain of connection attempts. async with trio.open_nursery() as nursery: - nursery.spawn(attempt_connect, nursery, None) + nursery.start_soon(attempt_connect, nursery, None) # All connection attempts complete, and no unexpected errors escaped. So # at this point the oserrors and winning_sockets lists are filled in. diff --git a/trio/_signals.py b/trio/_signals.py index bd9fca40b5..817d8b6cbf 100644 --- a/trio/_signals.py +++ b/trio/_signals.py @@ -113,10 +113,7 @@ def catch_signals(signals): The async iterator blocks until at least one signal has arrived, and then yields a :class:`set` containing all of the signals that were received - since the last iteration. (This is generally similar to how - :class:`UnboundedQueue` works, but since Unix semantics are that identical - signals can/should be coalesced, here we use a :class:`set` for storage - instead of a :class:`list`.) + since the last iteration. Note that if you leave the ``with`` block while the iterator has unextracted signals still pending inside it, then they will be diff --git a/trio/_socket.py b/trio/_socket.py index b778bca459..56dca4481a 100644 --- a/trio/_socket.py +++ b/trio/_socket.py @@ -394,7 +394,6 @@ def real_socket_type(type_num): return type_num & _SOCK_TYPE_MASK -@_add_to_all class _SocketType: def __init__(self, sock): if type(sock) is not _stdlib_socket.socket: diff --git a/trio/_sync.py b/trio/_sync.py index 7b1b467f7c..82c02477d6 100644 --- a/trio/_sync.py +++ b/trio/_sync.py @@ -232,8 +232,8 @@ def acquire_on_behalf_of_nowait(self, borrower): blocking. Args: - borrower: A :class:`Task` or arbitrary opaque object used to record - who is borrowing this token. This is used by + borrower: A :class:`trio.hazmat.Task` or arbitrary opaque object + used to record who is borrowing this token. This is used by :func:`run_sync_in_worker_thread` to allow threads to "hold tokens", with the intention in the future of using it to `allow deadlock detection and other useful things @@ -272,8 +272,8 @@ async def acquire_on_behalf_of(self, borrower): necessary. Args: - borrower: A :class:`Task` or arbitrary opaque object used to record - who is borrowing this token; see + borrower: A :class:`trio.hazmat.Task` or arbitrary opaque object + used to record who is borrowing this token; see :meth:`acquire_on_behalf_of_nowait` for details. Raises: @@ -579,8 +579,8 @@ def statistics(self): Currently the following fields are defined: * ``locked``: boolean indicating whether the lock is held. - * ``owner``: the :class:`Task` currently holding the lock, or None if - the lock is not held. + * ``owner``: the :class:`trio.hazmat.Task` currently holding the lock, + or None if the lock is not held. * ``tasks_waiting``: The number of tasks blocked on this lock's :meth:`acquire` method. @@ -808,8 +808,7 @@ class Queue: """A bounded queue suitable for inter-task communication. This class is generally modelled after :class:`queue.Queue`, but with the - major difference that it is always bounded. For an unbounded queue, see - :class:`trio.UnboundedQueue`. + major difference that it is always bounded. A :class:`Queue` object can be used as an asynchronous iterator, that dequeues objects one at a time. I.e., these two loops are equivalent:: diff --git a/trio/_threads.py b/trio/_threads.py index 5799007ac5..26fe21bc67 100644 --- a/trio/_threads.py +++ b/trio/_threads.py @@ -13,7 +13,6 @@ "current_run_in_trio_thread", "run_sync_in_worker_thread", "current_default_worker_thread_limiter", - "run_in_worker_thread", ] @@ -340,8 +339,3 @@ def abort(_): return _core.Abort.FAILED return await _core.yield_indefinitely(abort) - - -run_in_worker_thread = deprecated_alias( - "run_in_worker_thread", run_sync_in_worker_thread, "0.2.0", issue=68 -) diff --git a/trio/socket.py b/trio/socket.py index 93af20e383..3a7dd841a4 100644 --- a/trio/socket.py +++ b/trio/socket.py @@ -6,3 +6,14 @@ # here. from ._socket import * from ._socket import __all__ + +from . import _deprecate +from ._socket import _SocketType +_deprecate.enable_attribute_deprecations(__name__) +__deprecated_attributes__ = { + "SocketType": + _deprecate.DeprecatedAttribute( + _SocketType, "0.2.0", issue=170, instead="is_trio_socket" + ) +} +del _deprecate, _SocketType diff --git a/trio/testing/_check_streams.py b/trio/testing/_check_streams.py index 4f4cec667f..a7effb7c06 100644 --- a/trio/testing/_check_streams.py +++ b/trio/testing/_check_streams.py @@ -82,8 +82,8 @@ async def do_aclose(resource): # Simple sending/receiving async with _core.open_nursery() as nursery: - nursery.spawn(do_send_all, b"x") - nursery.spawn(checked_receive_1, b"x") + nursery.start_soon(do_send_all, b"x") + nursery.start_soon(checked_receive_1, b"x") async def send_empty_then_y(): # Streams should tolerate sending b"" without giving it any @@ -92,19 +92,19 @@ async def send_empty_then_y(): await do_send_all(b"y") async with _core.open_nursery() as nursery: - nursery.spawn(send_empty_then_y) - nursery.spawn(checked_receive_1, b"y") + nursery.start_soon(send_empty_then_y) + nursery.start_soon(checked_receive_1, b"y") ### Checking various argument types # send_all accepts bytearray and memoryview async with _core.open_nursery() as nursery: - nursery.spawn(do_send_all, bytearray(b"1")) - nursery.spawn(checked_receive_1, b"1") + nursery.start_soon(do_send_all, bytearray(b"1")) + nursery.start_soon(checked_receive_1, b"1") async with _core.open_nursery() as nursery: - nursery.spawn(do_send_all, memoryview(b"2")) - nursery.spawn(checked_receive_1, b"2") + nursery.start_soon(do_send_all, memoryview(b"2")) + nursery.start_soon(checked_receive_1, b"2") # max_bytes must be a positive integer with _assert_raises(ValueError): @@ -116,25 +116,25 @@ async def send_empty_then_y(): with _assert_raises(_core.ResourceBusyError): async with _core.open_nursery() as nursery: - nursery.spawn(do_receive_some, 1) - nursery.spawn(do_receive_some, 1) + nursery.start_soon(do_receive_some, 1) + nursery.start_soon(do_receive_some, 1) # Method always has to exist, and an empty stream with a blocked # receive_some should *always* allow send_all. (Technically it's legal # for send_all to wait until receive_some is called to run, though; a # stream doesn't *have* to have any internal buffering. That's why we - # spawn a concurrent receive_some call, then cancel it.) + # start a concurrent receive_some call, then cancel it.) async def simple_check_wait_send_all_might_not_block(scope): with assert_yields(): await s.wait_send_all_might_not_block() scope.cancel() async with _core.open_nursery() as nursery: - nursery.spawn( + nursery.start_soon( simple_check_wait_send_all_might_not_block, nursery.cancel_scope ) - nursery.spawn(do_receive_some, 1) + nursery.start_soon(do_receive_some, 1) # closing the r side leads to BrokenStreamError on the s side # (eventually) @@ -144,8 +144,8 @@ async def expect_broken_stream_on_send(): await do_send_all(b"x" * 100) async with _core.open_nursery() as nursery: - nursery.spawn(expect_broken_stream_on_send) - nursery.spawn(do_aclose, r) + nursery.start_soon(expect_broken_stream_on_send) + nursery.start_soon(do_aclose, r) # once detected, the stream stays broken with _assert_raises(BrokenStreamError): @@ -192,8 +192,8 @@ async def receive_send_then_close(): await do_aclose(r) async with _core.open_nursery() as nursery: - nursery.spawn(send_then_close) - nursery.spawn(receive_send_then_close) + nursery.start_soon(send_then_close) + nursery.start_soon(receive_send_then_close) async with _ForceCloseBoth(await stream_maker()) as (s, r): await aclose_forcefully(r) @@ -252,12 +252,12 @@ async def expect_cancelled(afn, *args): with _core.open_cancel_scope() as scope: scope.cancel() async with _core.open_nursery() as nursery: - nursery.spawn(expect_cancelled, do_send_all, b"x") - nursery.spawn(expect_cancelled, do_receive_some, 1) + nursery.start_soon(expect_cancelled, do_send_all, b"x") + nursery.start_soon(expect_cancelled, do_receive_some, 1) async with _core.open_nursery() as nursery: - nursery.spawn(do_aclose, s) - nursery.spawn(do_aclose, r) + nursery.start_soon(do_aclose, s) + nursery.start_soon(do_aclose, r) # check wait_send_all_might_not_block, if we can if clogged_stream_maker is not None: @@ -279,9 +279,9 @@ async def receiver(): await r.receive_some(16834) async with _core.open_nursery() as nursery: - nursery.spawn(waiter, nursery.cancel_scope) + nursery.start_soon(waiter, nursery.cancel_scope) await _core.wait_all_tasks_blocked() - nursery.spawn(receiver) + nursery.start_soon(receiver) assert record == [ "waiter sleeping", @@ -293,8 +293,8 @@ async def receiver(): # simultaneous wait_send_all_might_not_block fails with _assert_raises(_core.ResourceBusyError): async with _core.open_nursery() as nursery: - nursery.spawn(s.wait_send_all_might_not_block) - nursery.spawn(s.wait_send_all_might_not_block) + nursery.start_soon(s.wait_send_all_might_not_block) + nursery.start_soon(s.wait_send_all_might_not_block) # and simultaneous send_all and wait_send_all_might_not_block (NB # this test might destroy the stream b/c we end up cancelling @@ -302,16 +302,16 @@ async def receiver(): # recreate afterwards) with _assert_raises(_core.ResourceBusyError): async with _core.open_nursery() as nursery: - nursery.spawn(s.wait_send_all_might_not_block) - nursery.spawn(s.send_all, b"123") + nursery.start_soon(s.wait_send_all_might_not_block) + nursery.start_soon(s.send_all, b"123") async with _ForceCloseBoth(await clogged_stream_maker()) as (s, r): # send_all and send_all blocked simultaneously should also raise # (but again this might destroy the stream) with _assert_raises(_core.ResourceBusyError): async with _core.open_nursery() as nursery: - nursery.spawn(s.send_all, b"123") - nursery.spawn(s.send_all, b"123") + nursery.start_soon(s.send_all, b"123") + nursery.start_soon(s.send_all, b"123") # closing the receiver causes wait_send_all_might_not_block to return async with _ForceCloseBoth(await clogged_stream_maker()) as (s, r): @@ -327,8 +327,8 @@ async def receiver(): await aclose_forcefully(r) async with _core.open_nursery() as nursery: - nursery.spawn(sender) - nursery.spawn(receiver) + nursery.start_soon(sender) + nursery.start_soon(receiver) # and again with the call starting after the close async with _ForceCloseBoth(await clogged_stream_maker()) as (s, r): @@ -398,18 +398,18 @@ async def receiver(s, data, seed): assert got == data async with _core.open_nursery() as nursery: - nursery.spawn(sender, s1, test_data, 0) - nursery.spawn(sender, s2, test_data[::-1], 1) - nursery.spawn(receiver, s1, test_data[::-1], 2) - nursery.spawn(receiver, s2, test_data, 3) + nursery.start_soon(sender, s1, test_data, 0) + nursery.start_soon(sender, s2, test_data[::-1], 1) + nursery.start_soon(receiver, s1, test_data[::-1], 2) + nursery.start_soon(receiver, s2, test_data, 3) async def expect_receive_some_empty(): assert await s2.receive_some(10) == b"" await s2.aclose() async with _core.open_nursery() as nursery: - nursery.spawn(expect_receive_some_empty) - nursery.spawn(s1.aclose) + nursery.start_soon(expect_receive_some_empty) + nursery.start_soon(s1.aclose) async def check_half_closeable_stream(stream_maker, clogged_stream_maker): @@ -442,8 +442,8 @@ async def expect_x_then_eof(r): assert await r.receive_some(10) == b"" async with _core.open_nursery() as nursery: - nursery.spawn(send_x_then_eof, s1) - nursery.spawn(expect_x_then_eof, s2) + nursery.start_soon(send_x_then_eof, s1) + nursery.start_soon(expect_x_then_eof, s2) # now sending is disallowed with _assert_raises(ClosedStreamError): @@ -455,25 +455,23 @@ async def expect_x_then_eof(r): # and we can still send stuff back the other way async with _core.open_nursery() as nursery: - nursery.spawn(send_x_then_eof, s2) - nursery.spawn(expect_x_then_eof, s1) + nursery.start_soon(send_x_then_eof, s2) + nursery.start_soon(expect_x_then_eof, s1) if clogged_stream_maker is not None: async with _ForceCloseBoth(await clogged_stream_maker()) as (s1, s2): # send_all and send_eof simultaneously is not ok with _assert_raises(_core.ResourceBusyError): async with _core.open_nursery() as nursery: - t = nursery.spawn(s1.send_all, b"x") + nursery.start_soon(s1.send_all, b"x") await _core.wait_all_tasks_blocked() - assert t.result is None - nursery.spawn(s1.send_eof) + nursery.start_soon(s1.send_eof) async with _ForceCloseBoth(await clogged_stream_maker()) as (s1, s2): # wait_send_all_might_not_block and send_eof simultaneously is not # ok either with _assert_raises(_core.ResourceBusyError): async with _core.open_nursery() as nursery: - t = nursery.spawn(s1.wait_send_all_might_not_block) + nursery.start_soon(s1.wait_send_all_might_not_block) await _core.wait_all_tasks_blocked() - assert t.result is None - nursery.spawn(s1.send_eof) + nursery.start_soon(s1.send_eof) diff --git a/trio/testing/_memory_streams.py b/trio/testing/_memory_streams.py index 8b7e31fe18..fc79ceb0dc 100644 --- a/trio/testing/_memory_streams.py +++ b/trio/testing/_memory_streams.py @@ -400,8 +400,8 @@ async def receiver(): print(data) async with trio.open_nursery() as nursery: - nursery.spawn(sender) - nursery.spawn(receiver) + nursery.start_soon(sender) + nursery.start_soon(receiver) By default, this will print ``b"12345"`` and then immediately exit; with our trickle stream it instead sleeps 1 second, then prints ``b"1"``, then diff --git a/trio/testing/_sequencer.py b/trio/testing/_sequencer.py index 50703f9f22..2a3c1d2ad0 100644 --- a/trio/testing/_sequencer.py +++ b/trio/testing/_sequencer.py @@ -45,9 +45,9 @@ async def worker3(seq): async def main(): seq = trio.testing.Sequencer() async with trio.open_nursery() as nursery: - nursery.spawn(worker1, seq) - nursery.spawn(worker2, seq) - nursery.spawn(worker3, seq) + nursery.start_soon(worker1, seq) + nursery.start_soon(worker2, seq) + nursery.start_soon(worker3, seq) """ diff --git a/trio/tests/module_with_deprecations.py b/trio/tests/module_with_deprecations.py new file mode 100644 index 0000000000..72af21c82a --- /dev/null +++ b/trio/tests/module_with_deprecations.py @@ -0,0 +1,21 @@ +regular = "hi" + +from .. import _deprecate + +_deprecate.enable_attribute_deprecations(__name__) + +__deprecated_attributes__ = { + "dep1": + _deprecate.DeprecatedAttribute( + "value1", + "1.1", + issue=1, + ), + "dep2": + _deprecate.DeprecatedAttribute( + "value2", + "1.2", + issue=1, + instead="instead-string", + ), +} diff --git a/trio/tests/test_deprecate.py b/trio/tests/test_deprecate.py index 4cec262ef0..c9d09dbe8e 100644 --- a/trio/tests/test_deprecate.py +++ b/trio/tests/test_deprecate.py @@ -2,8 +2,13 @@ import inspect import warnings +from types import ModuleType -from .._deprecate import TrioDeprecationWarning, warn_deprecated, deprecated, deprecated_alias +from .._deprecate import ( + TrioDeprecationWarning, warn_deprecated, deprecated, deprecated_alias +) + +from . import module_with_deprecations @pytest.fixture @@ -203,3 +208,44 @@ def test_deprecated_docstring_munging(): .. deprecated:: 2.1 """ + + +mod = ModuleType("mod") + + +class SubMod(ModuleType): + pass + + +try: + mod.__class__ = SubMod +except TypeError: + have_class_assignment = False +else: + have_class_assignment = True + + +@pytest.mark.skipif( + not have_class_assignment, reason="need ModuleType.__class__ assignment" +) +def test_module_with_deprecations(recwarn_always): + assert module_with_deprecations.regular == "hi" + assert len(recwarn_always) == 0 + + filename, lineno = _here() # https://github.com/google/yapf/issues/447 + assert module_with_deprecations.dep1 == "value1" + got = recwarn_always.pop(TrioDeprecationWarning) + assert got.filename == filename + assert got.lineno == lineno + 1 + + assert "module_with_deprecations.dep1" in got.message.args[0] + assert "Trio 1.1" in got.message.args[0] + assert "/issues/1" in got.message.args[0] + assert "value1 instead" in got.message.args[0] + + assert module_with_deprecations.dep2 == "value2" + got = recwarn_always.pop(TrioDeprecationWarning) + assert "instead-string instead" in got.message.args[0] + + with pytest.raises(AttributeError): + module_with_deprecations.asdf diff --git a/trio/tests/test_highlevel_serve_listeners.py b/trio/tests/test_highlevel_serve_listeners.py index 173893f54a..025fa7474e 100644 --- a/trio/tests/test_highlevel_serve_listeners.py +++ b/trio/tests/test_highlevel_serve_listeners.py @@ -128,7 +128,7 @@ async def connection_watcher(*, task_status=trio.STATUS_IGNORED): async with trio.open_nursery() as nursery: task_status.started(nursery) await wait_all_tasks_blocked() - assert len(nursery.children) == 10 + assert len(nursery.child_tasks) == 10 raise Done with pytest.raises(Done): diff --git a/trio/tests/test_highlevel_socket.py b/trio/tests/test_highlevel_socket.py index 26cab46284..2686ca3e81 100644 --- a/trio/tests/test_highlevel_socket.py +++ b/trio/tests/test_highlevel_socket.py @@ -66,8 +66,8 @@ async def waiter(nursery): nursery.cancel_scope.cancel() async with _core.open_nursery() as nursery: - nursery.spawn(sender) - nursery.spawn(waiter, nursery) + nursery.start_soon(sender) + nursery.start_soon(waiter, nursery) async def test_SocketStream_generic(): diff --git a/trio/tests/test_socket.py b/trio/tests/test_socket.py index 2656251b26..8b93356bf6 100644 --- a/trio/tests/test_socket.py +++ b/trio/tests/test_socket.py @@ -6,7 +6,7 @@ from .. import _core from .. import socket as tsocket -from .._socket import _NUMERIC_ONLY, _try_sync +from .._socket import _NUMERIC_ONLY, _try_sync, _SocketType from ..testing import assert_yields, wait_all_tasks_blocked ################################################################ @@ -229,8 +229,8 @@ async def child(sock): a, b = tsocket.socketpair() with a, b: async with _core.open_nursery() as nursery: - nursery.spawn(child, a) - nursery.spawn(child, b) + nursery.start_soon(child, a) + nursery.start_soon(child, b) @pytest.mark.skipif(not hasattr(tsocket, "fromshare"), reason="windows only") @@ -248,12 +248,12 @@ async def test_fromshare(): async def test_socket(): with tsocket.socket() as s: - assert isinstance(s, tsocket._SocketType) + assert isinstance(s, _SocketType) assert tsocket.is_trio_socket(s) assert s.family == tsocket.AF_INET with tsocket.socket(tsocket.AF_INET6, tsocket.SOCK_DGRAM) as s: - assert isinstance(s, tsocket._SocketType) + assert isinstance(s, _SocketType) assert tsocket.is_trio_socket(s) assert s.family == tsocket.AF_INET6 @@ -319,7 +319,8 @@ async def test_SocketType_dup(): with a, b: a2 = a.dup() with a2: - assert isinstance(a2, tsocket._SocketType) + assert isinstance(a2, _SocketType) + assert tsocket.is_trio_socket(a2) assert a2.fileno() != a.fileno() a.close() await a2.send(b"x") @@ -367,9 +368,8 @@ async def test_SocketType_simple_server(address, socket_type): listener.listen(20) addr = listener.getsockname()[:2] async with _core.open_nursery() as nursery: - nursery.spawn(client.connect, addr) - accept_task = nursery.spawn(listener.accept) - server, client_addr = accept_task.result.unwrap() + nursery.start_soon(client.connect, addr) + server, client_addr = await listener.accept() with server: assert client_addr == server.getpeername() == client.getsockname() await server.send(b"x") @@ -498,7 +498,7 @@ async def do_successful_blocking_recv(): assert await ta.recv(10) == b"2" async with _core.open_nursery() as nursery: - nursery.spawn(do_successful_blocking_recv) + nursery.start_soon(do_successful_blocking_recv) await wait_all_tasks_blocked() b.send(b"2") # block then cancelled @@ -508,7 +508,7 @@ async def do_cancelled_blocking_recv(): await ta.recv(10) async with _core.open_nursery() as nursery: - nursery.spawn(do_cancelled_blocking_recv) + nursery.start_soon(do_cancelled_blocking_recv) await wait_all_tasks_blocked() nursery.cancel_scope.cancel() # Okay, here's the trickiest one: we want to exercise the path where @@ -533,8 +533,8 @@ async def t2(): assert await ta.recv(1) == b"a" async with _core.open_nursery() as nursery: - nursery.spawn(t1) - nursery.spawn(t2) + nursery.start_soon(t1) + nursery.start_soon(t2) await wait_all_tasks_blocked() a.send(b"b") b.send(b"a") @@ -736,8 +736,8 @@ async def receiver(): assert nbytes == BIG async with _core.open_nursery() as nursery: - nursery.spawn(sender) - nursery.spawn(receiver) + nursery.start_soon(sender) + nursery.start_soon(receiver) # We know that we received BIG bytes of NULs so far. Make sure that # was all the data in there. diff --git a/trio/tests/test_ssl.py b/trio/tests/test_ssl.py index a087347cad..e28e11873b 100644 --- a/trio/tests/test_ssl.py +++ b/trio/tests/test_ssl.py @@ -120,7 +120,7 @@ async def ssl_echo_server_raw(**kwargs): # causes the thread to exit (possibly with an error), which allows the # nursery context manager to exit too. with a, b: - nursery.spawn( + nursery.start_soon( trio.run_sync_in_worker_thread, partial(ssl_echo_serve_sync, b, **kwargs) ) @@ -290,29 +290,29 @@ async def test_PyOpenSSLEchoStream_gives_resource_busy_errors(): s = PyOpenSSLEchoStream() with pytest.raises(_core.ResourceBusyError) as excinfo: async with _core.open_nursery() as nursery: - nursery.spawn(s.send_all, b"x") - nursery.spawn(s.send_all, b"x") + nursery.start_soon(s.send_all, b"x") + nursery.start_soon(s.send_all, b"x") assert "simultaneous" in str(excinfo.value) s = PyOpenSSLEchoStream() with pytest.raises(_core.ResourceBusyError) as excinfo: async with _core.open_nursery() as nursery: - nursery.spawn(s.send_all, b"x") - nursery.spawn(s.wait_send_all_might_not_block) + nursery.start_soon(s.send_all, b"x") + nursery.start_soon(s.wait_send_all_might_not_block) assert "simultaneous" in str(excinfo.value) s = PyOpenSSLEchoStream() with pytest.raises(_core.ResourceBusyError) as excinfo: async with _core.open_nursery() as nursery: - nursery.spawn(s.wait_send_all_might_not_block) - nursery.spawn(s.wait_send_all_might_not_block) + nursery.start_soon(s.wait_send_all_might_not_block) + nursery.start_soon(s.wait_send_all_might_not_block) assert "simultaneous" in str(excinfo.value) s = PyOpenSSLEchoStream() with pytest.raises(_core.ResourceBusyError) as excinfo: async with _core.open_nursery() as nursery: - nursery.spawn(s.receive_some, 1) - nursery.spawn(s.receive_some, 1) + nursery.start_soon(s.receive_some, 1) + nursery.start_soon(s.receive_some, 1) assert "simultaneous" in str(excinfo.value) @@ -520,12 +520,12 @@ async def receiver(s): async with ssl_echo_server() as s: async with _core.open_nursery() as nursery: - nursery.spawn(sender, s) - nursery.spawn(receiver, s) + nursery.start_soon(sender, s) + nursery.start_soon(receiver, s) # And let's have some doing handshakes too, everyone # simultaneously - nursery.spawn(s.do_handshake) - nursery.spawn(s.do_handshake) + nursery.start_soon(s.do_handshake) + nursery.start_soon(s.do_handshake) await s.aclose() @@ -598,11 +598,11 @@ async def expect(expected): b2 = bytes([(2 * i) % 0xff]) s.transport_stream.renegotiate() async with _core.open_nursery() as nursery: - nursery.spawn(send, b1) - nursery.spawn(expect, b1) + nursery.start_soon(send, b1) + nursery.start_soon(expect, b1) async with _core.open_nursery() as nursery: - nursery.spawn(expect, b2) - nursery.spawn(send, b2) + nursery.start_soon(expect, b2) + nursery.start_soon(send, b2) await clear() for i in range(100): @@ -612,8 +612,8 @@ async def expect(expected): s.transport_stream.renegotiate() await expect(b1) async with _core.open_nursery() as nursery: - nursery.spawn(expect, b2) - nursery.spawn(send, b2) + nursery.start_soon(expect, b2) + nursery.start_soon(send, b2) await clear() # Checking that wait_send_all_might_not_block and receive_some don't @@ -637,8 +637,8 @@ async def sleep_then_wait_writable(): await send(b"x") s.transport_stream.renegotiate() async with _core.open_nursery() as nursery: - nursery.spawn(expect, b"x") - nursery.spawn(sleep_then_wait_writable) + nursery.start_soon(expect, b"x") + nursery.start_soon(sleep_then_wait_writable) await clear() @@ -658,8 +658,8 @@ async def sleeper_with_slow_wait_writable_and_expect(method): await send(b"x") s.transport_stream.renegotiate() async with _core.open_nursery() as nursery: - nursery.spawn(expect, b"x") - nursery.spawn(s.wait_send_all_might_not_block) + nursery.start_soon(expect, b"x") + nursery.start_soon(s.wait_send_all_might_not_block) await clear() @@ -682,29 +682,29 @@ async def do_wait_send_all_might_not_block(): s, _ = ssl_lockstep_stream_pair() with pytest.raises(_core.ResourceBusyError) as excinfo: async with _core.open_nursery() as nursery: - nursery.spawn(do_send_all) - nursery.spawn(do_send_all) + nursery.start_soon(do_send_all) + nursery.start_soon(do_send_all) assert "another task" in str(excinfo.value) s, _ = ssl_lockstep_stream_pair() with pytest.raises(_core.ResourceBusyError) as excinfo: async with _core.open_nursery() as nursery: - nursery.spawn(do_receive_some) - nursery.spawn(do_receive_some) + nursery.start_soon(do_receive_some) + nursery.start_soon(do_receive_some) assert "another task" in str(excinfo.value) s, _ = ssl_lockstep_stream_pair() with pytest.raises(_core.ResourceBusyError) as excinfo: async with _core.open_nursery() as nursery: - nursery.spawn(do_send_all) - nursery.spawn(do_wait_send_all_might_not_block) + nursery.start_soon(do_send_all) + nursery.start_soon(do_wait_send_all_might_not_block) assert "another task" in str(excinfo.value) s, _ = ssl_lockstep_stream_pair() with pytest.raises(_core.ResourceBusyError) as excinfo: async with _core.open_nursery() as nursery: - nursery.spawn(do_wait_send_all_might_not_block) - nursery.spawn(do_wait_send_all_might_not_block) + nursery.start_soon(do_wait_send_all_might_not_block) + nursery.start_soon(do_wait_send_all_might_not_block) assert "another task" in str(excinfo.value) @@ -783,8 +783,8 @@ async def clogged_stream_maker(): # Then the client's receive_some will actually send some data to start # the handshake, and itself get stuck. async with _core.open_nursery() as nursery: - nursery.spawn(client.do_handshake) - nursery.spawn(server.do_handshake) + nursery.start_soon(client.do_handshake) + nursery.start_soon(server.do_handshake) return client, server await check_two_way_stream(stream_maker, clogged_stream_maker) @@ -840,8 +840,8 @@ async def server(): assert server_ssl.transport_stream is None async with _core.open_nursery() as nursery: - nursery.spawn(client) - nursery.spawn(server) + nursery.start_soon(client) + nursery.start_soon(server) async def test_closing_nice_case(): @@ -863,8 +863,8 @@ async def server_closer(): await server_ssl.aclose() async with _core.open_nursery() as nursery: - nursery.spawn(client_closer) - nursery.spawn(server_closer) + nursery.start_soon(client_closer) + nursery.start_soon(server_closer) # closing the SSLStream also closes its transport with pytest.raises(ClosedStreamError): @@ -906,16 +906,16 @@ async def expect_eof_server(): await server_ssl.aclose() async with _core.open_nursery() as nursery: - nursery.spawn(client_ssl.aclose) - nursery.spawn(expect_eof_server) + nursery.start_soon(client_ssl.aclose) + nursery.start_soon(expect_eof_server) async def test_send_all_fails_in_the_middle(): client, server = ssl_memory_stream_pair() async with _core.open_nursery() as nursery: - nursery.spawn(client.do_handshake) - nursery.spawn(server.do_handshake) + nursery.start_soon(client.do_handshake) + nursery.start_soon(server.do_handshake) async def bad_hook(): raise KeyError @@ -963,16 +963,16 @@ async def server(): await server_2.send_all(b"bye") async with _core.open_nursery() as nursery: - nursery.spawn(client) - nursery.spawn(server) + nursery.start_soon(client) + nursery.start_soon(server) async def test_ssl_bad_shutdown(): client, server = ssl_memory_stream_pair() async with _core.open_nursery() as nursery: - nursery.spawn(client.do_handshake) - nursery.spawn(server.do_handshake) + nursery.start_soon(client.do_handshake) + nursery.start_soon(server.do_handshake) await trio.aclose_forcefully(client) # now the server sees a broken stream @@ -991,8 +991,8 @@ async def test_ssl_bad_shutdown_but_its_ok(): ) async with _core.open_nursery() as nursery: - nursery.spawn(client.do_handshake) - nursery.spawn(server.do_handshake) + nursery.start_soon(client.do_handshake) + nursery.start_soon(server.do_handshake) await trio.aclose_forcefully(client) # the server sees that as a clean shutdown @@ -1030,8 +1030,8 @@ async def test_ssl_only_closes_stream_once(): client, server = ssl_memory_stream_pair() async with _core.open_nursery() as nursery: - nursery.spawn(client.do_handshake) - nursery.spawn(server.do_handshake) + nursery.start_soon(client.do_handshake) + nursery.start_soon(server.do_handshake) client_orig_close_hook = client.transport_stream.send_stream.close_hook transport_close_count = 0 @@ -1056,8 +1056,8 @@ async def test_ssl_https_compatibility_disagreement(): ) async with _core.open_nursery() as nursery: - nursery.spawn(client.do_handshake) - nursery.spawn(server.do_handshake) + nursery.start_soon(client.do_handshake) + nursery.start_soon(server.do_handshake) # client is in HTTPS-mode, server is not # so client doing graceful_shutdown causes an error on server @@ -1067,8 +1067,8 @@ async def receive_and_expect_error(): assert isinstance(excinfo.value.__cause__, tssl.SSLEOFError) async with _core.open_nursery() as nursery: - nursery.spawn(client.aclose) - nursery.spawn(receive_and_expect_error) + nursery.start_soon(client.aclose) + nursery.start_soon(receive_and_expect_error) async def test_https_mode_eof_before_handshake(): @@ -1081,8 +1081,8 @@ async def server_expect_clean_eof(): assert await server.receive_some(10) == b"" async with _core.open_nursery() as nursery: - nursery.spawn(client.aclose) - nursery.spawn(server_expect_clean_eof) + nursery.start_soon(client.aclose) + nursery.start_soon(server_expect_clean_eof) async def test_send_error_during_handshake(): @@ -1117,8 +1117,8 @@ async def client_side(cancel_scope): cancel_scope.cancel() async with _core.open_nursery() as nursery: - nursery.spawn(client_side, nursery.cancel_scope) - nursery.spawn(server.do_handshake) + nursery.start_soon(client_side, nursery.cancel_scope) + nursery.start_soon(server.do_handshake) with pytest.raises(BrokenStreamError): with assert_yields(): @@ -1130,8 +1130,8 @@ async def test_getpeercert(): client, server = ssl_memory_stream_pair() async with _core.open_nursery() as nursery: - nursery.spawn(client.do_handshake) - nursery.spawn(server.do_handshake) + nursery.start_soon(client.do_handshake) + nursery.start_soon(server.do_handshake) assert server.getpeercert() is None print(client.getpeercert()) @@ -1167,8 +1167,8 @@ async def setup(**kwargs): # Make sure the connection works async with _core.open_nursery() as nursery: - nursery.spawn(ssl_client.do_handshake) - nursery.spawn(ssl_server.do_handshake) + nursery.start_soon(ssl_client.do_handshake) + nursery.start_soon(ssl_server.do_handshake) # Test SSLListener.aclose await ssl_listener.aclose() diff --git a/trio/tests/test_sync.py b/trio/tests/test_sync.py index ad1b4732cc..accc893604 100644 --- a/trio/tests/test_sync.py +++ b/trio/tests/test_sync.py @@ -28,8 +28,8 @@ async def child(): record.append("woken") async with _core.open_nursery() as nursery: - t1 = nursery.spawn(child) - t2 = nursery.spawn(child) + nursery.start_soon(child) + nursery.start_soon(child) await wait_all_tasks_blocked() assert record == ["sleeping", "sleeping"] assert e.statistics().tasks_waiting == 2 @@ -94,9 +94,8 @@ async def test_CapacityLimiter(): async with _core.open_nursery() as nursery: await c.acquire_on_behalf_of("value 1") await c.acquire_on_behalf_of("value 2") - t = nursery.spawn(c.acquire_on_behalf_of, "value 3") + nursery.start_soon(c.acquire_on_behalf_of, "value 3") await wait_all_tasks_blocked() - assert t.result is None assert c.borrowed_tokens == 2 assert c.statistics().tasks_waiting == 1 c.release_on_behalf_of("value 2") @@ -104,7 +103,6 @@ async def test_CapacityLimiter(): assert c.borrowed_tokens == 2 with pytest.raises(_core.WouldBlock): c.acquire_nowait() - await t.wait() c.release_on_behalf_of("value 3") c.release_on_behalf_of("value 1") @@ -126,7 +124,7 @@ async def test_CapacityLimiter_change_total_tokens(): async with _core.open_nursery() as nursery: for i in range(5): - nursery.spawn(c.acquire_on_behalf_of, i) + nursery.start_soon(c.acquire_on_behalf_of, i) await wait_all_tasks_blocked() assert set(c.statistics().borrowers) == {0, 1} assert c.statistics().tasks_waiting == 3 @@ -174,17 +172,24 @@ async def test_Semaphore(): assert s.value == 1 s.acquire_nowait() + record = [] + + async def do_acquire(s): + record.append("started") + await s.acquire() + record.append("finished") + async with _core.open_nursery() as nursery: - t = nursery.spawn(s.acquire) + nursery.start_soon(do_acquire, s) await wait_all_tasks_blocked() - assert t.result is None + assert record == ["started"] assert s.value == 0 s.release() # Fairness: assert s.value == 0 with pytest.raises(_core.WouldBlock): s.acquire_nowait() - await t.wait() + assert record == ["started", "finished"] async def test_Semaphore_bounded(): @@ -237,13 +242,17 @@ async def test_Lock_and_StrictFIFOLock(lockcls): # Error out if we don't own the lock l.release() + holder_task = None + async def holder(): + nonlocal holder_task + holder_task = _core.current_task() async with l: await sleep_forever() async with _core.open_nursery() as nursery: assert not l.locked() - t = nursery.spawn(holder) + nursery.start_soon(holder) await wait_all_tasks_blocked() assert l.locked() # WouldBlock if someone else holds the lock @@ -256,10 +265,10 @@ async def holder(): statistics = l.statistics() print(statistics) assert statistics.locked - assert statistics.owner is t + assert statistics.owner is holder_task assert statistics.tasks_waiting == 0 - nursery.spawn(holder) + nursery.start_soon(holder) await wait_all_tasks_blocked() statistics = l.statistics() print(statistics) @@ -305,31 +314,31 @@ async def test_Condition(): # Can't notify without holding the lock c.notify_all() - async def waiter(): + finished_waiters = set() + + async def waiter(i): async with c: await c.wait() + finished_waiters.add(i) async with _core.open_nursery() as nursery: - w = [] - for _ in range(3): - w.append(nursery.spawn(waiter)) + for i in range(3): + nursery.start_soon(waiter, i) await wait_all_tasks_blocked() async with c: c.notify() assert c.locked() await wait_all_tasks_blocked() - assert w[0].result is not None - assert w[1].result is w[2].result is None + assert finished_waiters == {0} async with c: c.notify_all() await wait_all_tasks_blocked() - assert w[1].result is not None - assert w[2].result is not None + assert finished_waiters == {0, 1, 2} + finished_waiters = set() async with _core.open_nursery() as nursery: - w = [] - for _ in range(3): - w.append(nursery.spawn(waiter)) + for i in range(3): + nursery.start_soon(waiter, i) await wait_all_tasks_blocked() async with c: c.notify(2) @@ -341,9 +350,7 @@ async def waiter(): assert c.statistics().lock_statistics.tasks_waiting == 1 await wait_all_tasks_blocked() - assert w[0].result is not None - assert w[1].result is not None - assert w[2].result is None + assert finished_waiters == {0, 1} async with c: c.notify_all() @@ -401,20 +408,29 @@ async def test_Queue_join(): with assert_yields(): await q.join() + record = [] + + async def do_join(q): + record.append("started") + await q.join() + record.append("finished") + async with _core.open_nursery() as nursery: await q.put(None) - t1 = nursery.spawn(q.join) - t2 = nursery.spawn(q.join) + nursery.start_soon(do_join, q) + nursery.start_soon(do_join, q) await wait_all_tasks_blocked() - assert t1.result is t2.result is None + assert record == ["started", "started"] q.put_nowait(None) q.get_nowait() q.get_nowait() q.task_done() await wait_all_tasks_blocked() - assert t1.result is t2.result is None + assert record == ["started", "started"] q.task_done() + assert record == ["started", "started", "finished", "finished"] + async def test_Queue_iter(): q = Queue(1) @@ -432,8 +448,8 @@ async def consumer(): assert item == next(expected) async with _core.open_nursery() as nursery: - nursery.spawn(producer) - nursery.spawn(consumer) + nursery.start_soon(producer) + nursery.start_soon(consumer) async def test_Queue_statistics(): @@ -450,9 +466,9 @@ async def test_Queue_statistics(): q.put_nowait(2) q.put_nowait(3) assert q.full() - nursery.spawn(q.put, 4) - nursery.spawn(q.put, 5) - nursery.spawn(q.join) + nursery.start_soon(q.put, 4) + nursery.start_soon(q.put, 5) + nursery.start_soon(q.join) await wait_all_tasks_blocked() statistics = q.statistics() assert statistics.qsize == 3 @@ -464,9 +480,9 @@ async def test_Queue_statistics(): q = Queue(4) async with _core.open_nursery() as nursery: - nursery.spawn(q.get) - nursery.spawn(q.get) - nursery.spawn(q.get) + nursery.start_soon(q.get) + nursery.start_soon(q.get) + nursery.start_soon(q.get) await wait_all_tasks_blocked() statistics = q.statistics() assert statistics.qsize == 0 @@ -490,13 +506,20 @@ async def test_Queue_fairness(): # But if someone else is waiting to get, then they "own" the item we put, # so we can't get it (even though we run first): q = Queue(1) + + result = None + + async def do_get(q): + nonlocal result + result = await q.get() + async with _core.open_nursery() as nursery: - t = nursery.spawn(q.get) + nursery.start_soon(do_get, q) await wait_all_tasks_blocked() q.put_nowait(2) with pytest.raises(_core.WouldBlock): q.get_nowait() - assert t.result.unwrap() == 2 + assert result == 2 # And the analogous situation for put: if we free up a space, we can't # immediately put something in it if someone is already waiting to do that @@ -506,7 +529,7 @@ async def test_Queue_fairness(): q.put_nowait(None) assert q.qsize() == 1 async with _core.open_nursery() as nursery: - t = nursery.spawn(q.put, 2) + nursery.start_soon(q.put, 2) await wait_all_tasks_blocked() assert q.qsize() == 1 assert q.get_nowait() == 1 @@ -602,7 +625,7 @@ async def worker(lock_like): async with _core.open_nursery() as nursery: lock_like = lock_factory() for _ in range(WORKERS): - nursery.spawn(worker, lock_like) + nursery.start_soon(worker, lock_like) assert not in_critical_section assert acquires == LOOPS * WORKERS @@ -624,9 +647,9 @@ async def loopy(name, lock_like): lock_like = lock_factory() async with _core.open_nursery() as nursery: - nursery.spawn(loopy, 1, lock_like) - nursery.spawn(loopy, 2, lock_like) - nursery.spawn(loopy, 3, lock_like) + nursery.start_soon(loopy, 1, lock_like) + nursery.start_soon(loopy, 2, lock_like) + nursery.start_soon(loopy, 3, lock_like) # The first three could be in any order due to scheduling randomness, # but after that they should repeat in the same order for i in range(LOOPS): @@ -637,13 +660,17 @@ async def loopy(name, lock_like): async def test_generic_lock_acquire_nowait_blocks_acquire(lock_factory): lock_like = lock_factory() + record = [] + async def lock_taker(): + record.append("started") async with lock_like: pass + record.append("finished") async with _core.open_nursery() as nursery: lock_like.acquire_nowait() - t = nursery.spawn(lock_taker) + nursery.start_soon(lock_taker) await wait_all_tasks_blocked() - assert t.result is None + assert record == ["started"] lock_like.release() diff --git a/trio/tests/test_testing.py b/trio/tests/test_testing.py index 0cce9c6e17..19228509df 100644 --- a/trio/tests/test_testing.py +++ b/trio/tests/test_testing.py @@ -29,21 +29,23 @@ async def waiting_for_bee_to_leave(): record.append("quiet at last!") async with _core.open_nursery() as nursery: - t1 = nursery.spawn(busy_bee) - t2 = nursery.spawn(waiting_for_bee_to_leave) - t3 = nursery.spawn(waiting_for_bee_to_leave) + nursery.start_soon(busy_bee) + nursery.start_soon(waiting_for_bee_to_leave) + nursery.start_soon(waiting_for_bee_to_leave) # check cancellation + record = [] + async def cancelled_while_waiting(): try: await wait_all_tasks_blocked() except _core.Cancelled: - return "ok" + record.append("ok") async with _core.open_nursery() as nursery: - t4 = nursery.spawn(cancelled_while_waiting) + nursery.start_soon(cancelled_while_waiting) nursery.cancel_scope.cancel() - assert t4.result.unwrap() == "ok" + assert record == ["ok"] async def test_wait_all_tasks_blocked_with_timeouts(mock_clock): @@ -55,7 +57,7 @@ async def timeout_task(): record.append("tt finished") async with _core.open_nursery() as nursery: - t = nursery.spawn(timeout_task) + nursery.start_soon(timeout_task) await wait_all_tasks_blocked() assert record == ["tt start"] mock_clock.jump(10) @@ -86,11 +88,11 @@ async def wait_big_cushion(): record.append("wait_big_cushion end") async with _core.open_nursery() as nursery: - nursery.spawn(blink) - nursery.spawn(wait_no_cushion) - nursery.spawn(wait_small_cushion) - nursery.spawn(wait_small_cushion) - nursery.spawn(wait_big_cushion) + nursery.start_soon(blink) + nursery.start_soon(wait_no_cushion) + nursery.start_soon(wait_small_cushion) + nursery.start_soon(wait_small_cushion) + nursery.start_soon(wait_big_cushion) assert record == [ "blink start", @@ -110,12 +112,12 @@ async def do_wait(cushion, tiebreaker): record.append((cushion, tiebreaker)) async with _core.open_nursery() as nursery: - nursery.spawn(do_wait, 0, 0) - nursery.spawn(do_wait, 0, -1) - nursery.spawn(do_wait, 0, 1) - nursery.spawn(do_wait, 0, -1) - nursery.spawn(do_wait, 0.0001, 10) - nursery.spawn(do_wait, 0.0001, -10) + nursery.start_soon(do_wait, 0, 0) + nursery.start_soon(do_wait, 0, -1) + nursery.start_soon(do_wait, 0, 1) + nursery.start_soon(do_wait, 0, -1) + nursery.start_soon(do_wait, 0.0001, 10) + nursery.start_soon(do_wait, 0.0001, -10) assert record == sorted(record) assert record == [ @@ -193,11 +195,10 @@ async def f2(seq): seq = Sequencer() async with _core.open_nursery() as nursery: - t1 = nursery.spawn(f1, seq) - t2 = nursery.spawn(f2, seq) + nursery.start_soon(f1, seq) + nursery.start_soon(f2, seq) async with seq(5): - await t1.wait() - await t2.wait() + await wait_all_tasks_blocked() assert record == [ ("f2", 0), ("f1", 1), ("f2", 2), ("f1", 3), ("f1", 4) ] @@ -227,8 +228,8 @@ async def child(i): record.append("seq({}) RuntimeError".format(i)) async with _core.open_nursery() as nursery: - t1 = nursery.spawn(child, 1) - t2 = nursery.spawn(child, 2) + nursery.start_soon(child, 1) + nursery.start_soon(child, 2) async with seq(0): pass # pragma: no cover @@ -387,8 +388,8 @@ async def waiter(): record.append("waiter done") async with _core.open_nursery() as nursery: - nursery.spawn(sleeper) - nursery.spawn(waiter) + nursery.start_soon(sleeper) + nursery.start_soon(waiter) assert record == list(range(10)) + ["yawn", "waiter done"] @@ -448,14 +449,14 @@ async def getter(expect): assert await ubq.get() == expect async with _core.open_nursery() as nursery: - nursery.spawn(getter, b"xyz") - nursery.spawn(putter, b"xyz") + nursery.start_soon(getter, b"xyz") + nursery.start_soon(putter, b"xyz") # Two gets at the same time -> ResourceBusyError with pytest.raises(_core.ResourceBusyError): async with _core.open_nursery() as nursery: - nursery.spawn(getter, b"asdf") - nursery.spawn(getter, b"asdf") + nursery.start_soon(getter, b"asdf") + nursery.start_soon(getter, b"asdf") # Closing @@ -479,8 +480,8 @@ async def closer(): ubq2.close() async with _core.open_nursery() as nursery: - nursery.spawn(getter, b"") - nursery.spawn(closer) + nursery.start_soon(getter, b"") + nursery.start_soon(closer) async def test_MemorySendStream(): @@ -508,8 +509,8 @@ async def do_send_all(data): with pytest.raises(_core.ResourceBusyError): async with _core.open_nursery() as nursery: - nursery.spawn(do_send_all, b"xxx") - nursery.spawn(do_send_all, b"xxx") + nursery.start_soon(do_send_all, b"xxx") + nursery.start_soon(do_send_all, b"xxx") with assert_yields(): await mss.aclose() @@ -576,8 +577,8 @@ async def do_receive_some(max_bytes): with pytest.raises(_core.ResourceBusyError): async with _core.open_nursery() as nursery: - nursery.spawn(do_receive_some, 10) - nursery.spawn(do_receive_some, 10) + nursery.start_soon(do_receive_some, 10) + nursery.start_soon(do_receive_some, 10) assert mrs.receive_some_hook is None @@ -684,8 +685,8 @@ async def receiver(expected): assert await r.receive_some(10) == expected async with _core.open_nursery() as nursery: - nursery.spawn(receiver, b"abc") - nursery.spawn(sender) + nursery.start_soon(receiver, b"abc") + nursery.start_soon(sender) # And this fails if we don't pump from close_hook async def aclose_after_all_tasks_blocked(): @@ -693,8 +694,8 @@ async def aclose_after_all_tasks_blocked(): await s.aclose() async with _core.open_nursery() as nursery: - nursery.spawn(receiver, b"") - nursery.spawn(aclose_after_all_tasks_blocked) + nursery.start_soon(receiver, b"") + nursery.start_soon(aclose_after_all_tasks_blocked) s, r = memory_stream_one_way_pair() @@ -703,8 +704,8 @@ async def close_after_all_tasks_blocked(): s.close() async with _core.open_nursery() as nursery: - nursery.spawn(receiver, b"") - nursery.spawn(close_after_all_tasks_blocked) + nursery.start_soon(receiver, b"") + nursery.start_soon(close_after_all_tasks_blocked) s, r = memory_stream_one_way_pair() @@ -723,8 +724,8 @@ async def check_for_cancel(): await r.receive_some(10) async with _core.open_nursery() as nursery: - nursery.spawn(cancel_after_idle, nursery) - nursery.spawn(check_for_cancel) + nursery.start_soon(cancel_after_idle, nursery) + nursery.start_soon(check_for_cancel) s.send_all_hook = old await s.send_all(b"789") @@ -749,8 +750,8 @@ async def receiver(): assert await a.receive_some(10) == b"xyz" async with _core.open_nursery() as nursery: - nursery.spawn(receiver) - nursery.spawn(sender) + nursery.start_soon(receiver) + nursery.start_soon(sender) async def test_memory_streams_with_generic_tests(): diff --git a/trio/tests/test_threads.py b/trio/tests/test_threads.py index aeb8e88c22..93c5ab0e41 100644 --- a/trio/tests/test_threads.py +++ b/trio/tests/test_threads.py @@ -195,15 +195,23 @@ def f(q): register[0] = "finished" async def child(q, cancellable): - return await run_sync_in_worker_thread(f, q, cancellable=cancellable) + record.append("start") + try: + return await run_sync_in_worker_thread( + f, q, cancellable=cancellable + ) + finally: + record.append("exit") + record = [] q = stdlib_queue.Queue() async with _core.open_nursery() as nursery: - task1 = nursery.spawn(child, q, True) + nursery.start_soon(child, q, True) # Give it a chance to get started. (This is important because # run_sync_in_worker_thread does a yield_if_cancelled before blocking # on the thread, and we don't want to trigger this.) await wait_all_tasks_blocked() + assert record == ["start"] # Then cancel it. nursery.cancel_scope.cancel() # The task exited, but the thread didn't: @@ -214,16 +222,17 @@ async def child(q, cancellable): time.sleep(0.01) # This one can't be cancelled + record = [] register[0] = None async with _core.open_nursery() as nursery: - task2 = nursery.spawn(child, q, False) + nursery.start_soon(child, q, False) await wait_all_tasks_blocked() nursery.cancel_scope.cancel() with _core.open_cancel_scope(shield=True): for _ in range(10): await _core.yield_briefly() # It's still running - assert task2.result is None + assert record == ["start"] q.put(None) # Now it exits @@ -251,7 +260,7 @@ async def child(): await run_sync_in_worker_thread(thread_fn, cancellable=True) async with _core.open_nursery() as nursery: - t = nursery.spawn(child) + nursery.start_soon(child) await wait_all_tasks_blocked() nursery.cancel_scope.cancel() @@ -328,7 +337,7 @@ def thread_fn(cancel_scope): state.running -= 1 print("thread_fn exiting") - async def run_thread(): + async def run_thread(event): with _core.open_cancel_scope() as cancel_scope: await run_sync_in_worker_thread( thread_fn, @@ -340,12 +349,14 @@ async def run_thread(): "run_thread finished, cancelled:", cancel_scope.cancelled_caught ) + event.set() async with _core.open_nursery() as nursery: print("spawning") - tasks = [] + events = [] for i in range(COUNT): - tasks.append(nursery.spawn(run_thread)) + events.append(Event()) + nursery.start_soon(run_thread, events[-1]) await wait_all_tasks_blocked() # In the cancel case, we in particular want to make sure that the # cancelled tasks don't release the semaphore. So let's wait until @@ -354,7 +365,7 @@ async def run_thread(): # who's supposed to be waiting is waiting: if cancel: print("waiting for first cancellation to clear") - await tasks[0].wait() + await events[0].wait() await wait_all_tasks_blocked() # Then wait until the first MAX threads are parked in gate.wait(), # and the next MAX threads are parked on the semaphore, to make diff --git a/trio/tests/test_util.py b/trio/tests/test_util.py index 4765ff969f..6847161dba 100644 --- a/trio/tests/test_util.py +++ b/trio/tests/test_util.py @@ -47,8 +47,8 @@ async def wait_with_ul1(): with pytest.raises(_core.ResourceBusyError) as excinfo: async with _core.open_nursery() as nursery: - nursery.spawn(wait_with_ul1) - nursery.spawn(wait_with_ul1) + nursery.start_soon(wait_with_ul1) + nursery.start_soon(wait_with_ul1) assert "ul1" in str(excinfo.value) # mixing sync and async entry