
Add an asyncio.TaskGroup.cancel method #108951

Open
smurfix opened this issue Sep 5, 2023 · 66 comments
Labels
stdlib (Python modules in the Lib dir) · topic-asyncio · type-feature (A feature request or enhancement)

Comments

@smurfix

smurfix commented Sep 5, 2023

Feature or enhancement

Proposal:

Trio/anyio have a way to cancel all tasks within a taskgroup, via taskgroup.cancel_scope.cancel().

Asyncio doesn't have cancel scopes. The current way to do this is to cancel the task that the taskgroup is a child of, but there are problems with this approach:

  • the task is a private member of the taskgroup, so the task is inaccessible through it
  • what about nested taskgroups?
  • also passing the task around is additional overhead and risks cancelling it when the taskgroup is no longer active

To fix this, I propose to add a taskgroup.cancel method which behaves like task.cancel if the taskgroup is still active (i.e. not aborting).

If this change is likely to be accepted I'm OK with writing a PR.

Has this already been discussed elsewhere?

I have already discussed this feature proposal on Discourse

Links to previous discussion of this feature:

https://discuss.python.org/t/how-to-cancel-all-tasks-created-within-same-taskgroup/30215

Linked PRs

@smurfix smurfix added the type-feature A feature request or enhancement label Sep 5, 2023
@github-project-automation github-project-automation bot moved this to Todo in asyncio Sep 5, 2023
@gvanrossum
Member

I am afraid that I don't understand the problem well enough. Could you explain the need for this and the desired behavior in edge cases better? A bullet like "what about nested taskgroups" makes me wonder -- what about them? I have no idea what structure your app has (and I haven't been following Trio or anyio so referencing that doesn't help me).

@smurfix
Author

smurfix commented Sep 6, 2023

Consider this test program.

import asyncio

async def t1():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(asyncio.sleep(1))
        tg._parent_task.cancel() # same as asyncio.current_task().cancel()
    assert False, "not reached"

try:
    asyncio.run(t1())
    assert False, "not reached"
except asyncio.CancelledError:
    print("Cancelled")

The cancellation always bubbles out of the taskgroup.

Here's the anyio equivalent:

import anyio

async def t2():
    async with anyio.create_task_group() as tg:
        tg.start_soon(anyio.sleep,1)
        tg.cancel_scope.cancel()
    print("after anyio cancel")

anyio.run(t2)

While asyncio doesn't have cancel scopes, that's an implementation detail. The point is that, when cancelled via its own .cancel method, the taskgroup swallows the exception (after cleaning up its subtasks of course) and simply leaves the scope.

As to code structure, let's assume I have a complex program with parts that want to be restarted separately if/when their configuration changes. With a cancel-able taskgroup this is easy; each subpart can do

async def subsys(config):
    while True:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(wait_changed(tg.cancel, config))
            ... # now do whatever the config tells us to do

where wait_changed runs the callback (cancelling the taskgroup) as soon as the configuration starts changing, but waits for the change to stabilize before it itself exits.

This is difficult to do without TaskGroup.cancel. I'd have to raise some exception from within wait_changed instead of using a callback, and catch that outside of the taskgroup. Then I'd need to add a separate way to do the stabilization part, which probably means creating an object for the wait_changed part instead of simply starting a subtask. So the two lines I need for config-change handling blow up into ten lines of boilerplate sprinkled through my program, which is kind of suboptimal.

@gvanrossum
Member

It feels, at least to me, that "cancel" elicits all the wrong connotations, in an asyncio context, because it sounds like it would cause a CancelledError exception to propagate out.

It looks like the functionality you are looking for is a way to stop all the tasks running in the task group but then make the task group's async with block exit "peacefully" (not raising).

If you come up with a name other than "cancel" (maybe "stop"? but up to you) and a PR that implements it, I will consider it. Try not to use Trio/anyio as guidance or model, since we should not assume asyncio users are familiar with those.

@gvanrossum
Member

Ugh.

Can you please refrain from this kind of negative language? It's not helping your case.

@smurfix
Author

smurfix commented Sep 6, 2023

Sorry. Reworded.

@smurfix
Author

smurfix commented Sep 6, 2023

If you come up with a name other than "cancel" (maybe "stop"? but up to you) and a PR that implements it, I will consider it.

Will do.

@Eclips4 Eclips4 changed the title Add a taskgroup.cancel method Add a taskgroup.stop method May 25, 2024
@Eclips4 Eclips4 self-assigned this May 25, 2024
@sobolevn
Member

sobolevn commented Sep 5, 2024

I had some time today to experiment with this. Here's a patch that seems to do the very basic thing:

diff --git Lib/asyncio/taskgroups.py Lib/asyncio/taskgroups.py
index f2ee9648c43..968a539f5dd 100644
--- Lib/asyncio/taskgroups.py
+++ Lib/asyncio/taskgroups.py
@@ -150,25 +150,34 @@ async def __aexit__(self, et, exc, tb):
             # cycles (bad for GC); let's not keep a reference to
             # a bunch of them.
             try:
-                me = BaseExceptionGroup('unhandled errors in a TaskGroup', self._errors)
-                raise me from None
+                raise BaseExceptionGroup(
+                    'unhandled errors in a TaskGroup',
+                    self._errors
+                ) from None
+            except* _TaskGroupStop:
+                pass
             finally:
                 self._errors = None
 
+        # Direct `.stop()` call in `TaskGroup` body, like:
+        #
+        # async with TaskGroup() as tg:
+        #     if some_condition:
+        #         tg.stop()  <- here
+        if issubclass(et, _TaskGroupStop):
+            return True
+
+    def stop(self):
+        """Stop all the tasks in the task group."""
+        self._check_state()
+        raise _TaskGroupStop()
+
     def create_task(self, coro, *, name=None, context=None):
         """Create a new task in this group and return it.
 
         Similar to `asyncio.create_task`.
         """
-        if not self._entered:
-            coro.close()
-            raise RuntimeError(f"TaskGroup {self!r} has not been entered")
-        if self._exiting and not self._tasks:
-            coro.close()
-            raise RuntimeError(f"TaskGroup {self!r} is finished")
-        if self._aborting:
-            coro.close()
-            raise RuntimeError(f"TaskGroup {self!r} is shutting down")
+        self._check_state(coro)
         if context is None:
             task = self._loop.create_task(coro, name=name)
         else:
@@ -184,6 +193,19 @@ def create_task(self, coro, *, name=None, context=None):
             task.add_done_callback(self._on_task_done)
         return task
 
+    def _check_state(self, coro=None):
+        error = None
+        if not self._entered:
+            error = RuntimeError(f"TaskGroup {self!r} has not been entered")
+        if self._exiting and not self._tasks:
+            error = RuntimeError(f"TaskGroup {self!r} is finished")
+        if self._aborting:
+            error = RuntimeError(f"TaskGroup {self!r} is shutting down")
+        if error is not None:
+            if coro is not None:
+                coro.close()
+            raise error
+
     # Since Python 3.8 Tasks propagate all exceptions correctly,
     # except for KeyboardInterrupt and SystemExit which are
     # still considered special.
@@ -250,3 +272,7 @@ def _on_task_done(self, task):
             self._abort()
             self._parent_cancel_requested = True
             self._parent_task.cancel()
+
+
+class _TaskGroupStop(Exception):
+    """Exception to stop all the tasks in a task group."""

It allows stopping the TaskGroup gracefully:

  • from tasks
  • from the TaskGroup's body

I used this code to test the concept:

import asyncio
from asyncio import TaskGroup

from random import random

async def job(i, group=None):
    while True:
        print(f"Task {i}")
        await asyncio.sleep(1)
        if random() < 0.4:
            print(f"Task {i} cancelling others")
            group.stop()
            return

async def main():
    async with TaskGroup() as group:
        group.create_task(job(1, group))
        group.create_task(job(2, group))
        # TODO: try this as well
        # group.stop()

asyncio.run(main())

It seems to work for the general case, but I am not confident enough in it to submit a PR. Maybe someone else can improve it :)

@picnixz picnixz added the stdlib Python modules in the Lib dir label Sep 5, 2024
@picnixz
Member

picnixz commented Sep 5, 2024

Playing devil's advocate, but should we care about a try/except that could trap this exception?

async with TaskGroup() as group:
    group.create_task(job(1, group))
    try:
        group.stop()
    except:
        pass

Alternatively, can we use an event plus a waiter task to cancel the tasks in the group? Or would that solution be too tricky to implement?

@sobolevn
Member

sobolevn commented Sep 5, 2024

I don't know :)
That's why I didn't commit to the implementation.

@gvanrossum
Member

I forget if I suggested this before -- I presume you could do something like add a new task to the task group that self-cancels, and then do something like except* CancelledError:? Assuming that stopping a task group is relatively rare, that would be most efficient, since it costs nothing if you don't need to stop a group. It would only be a few lines to turn that into a helper function in an app that does need it.

@sobolevn
Member

sobolevn commented Sep 6, 2024

Following @gvanrossum's idea, this can be done on unchanged TaskGroup:

import asyncio
from asyncio import TaskGroup
from contextlib import asynccontextmanager
from random import random

class Stop(Exception):
    """Stop the task group."""

@asynccontextmanager
async def stoppable_task_group():
    try:
        async with TaskGroup() as group:
            yield group
    except* Stop as ex:
        print(f'{ex} was stopped')

async def stop(group):
    async def factory():
        raise Stop(group)
    await group.create_task(factory())

async def job(i, group=None):
    while True:
        print(f"Task {i}")
        await asyncio.sleep(1)
        if random() < 0.4:
            print(f"Task {i} cancelling others")
            await stop(group)
            return

async def main():
    async with stoppable_task_group() as group:
        group.create_task(job(1, group))
        group.create_task(job(2, group))

asyncio.run(main())

I changed CancelledError to be a custom Stop, because CancelledError has a bunch of special handling already, which we don't seem to require with this specific case.

@picnixz
Member

picnixz commented Sep 6, 2024

Personally,

  • I would add this example to the docs.
  • I wouldn't add it to the stdlib because it would create an additional exception class. I think this solution has its place in a util.py file rather than as a dedicated functionality.
  • If you still want to add it to the stdlib, I think we can create a stop-task factory instead and not expose the exception itself (or maybe we should, but in this case a better name should be found [bikeshedding yay!]). Though I need to think of whether I can still play the devil's advocate in this case or not...

@gvanrossum
Member

Could one of you elaborate why we shouldn't just use CancelledError?

@sobolevn
Member

sobolevn commented Sep 9, 2024

@gvanrossum I tried the exact same example code but with asyncio.CancelledError in both the raise and the except*, and it sometimes didn't work correctly for me. When I run this example there are cases like:

Task 1 cancelling others
Task 2
Task 2
Task 2 cancelling others

Notice: Task 2 executes once after Task 1 cancels the others, and after that it also cancels the other tasks.

I can also get cases like:

Task 1
Task 2
Task 1
Task 2 cancelling others
Task 1
Task 1
Task 1
Task 1
Task 1
(continues to run)

I am not sure exactly why, but it does not happen with custom exceptions.

This might be a bug on its own.

@gvanrossum
Member

gvanrossum commented Sep 10, 2024

Oh, it's because of this comment (part of a docstring) in taskgroups.py:

    Any exceptions other than `asyncio.CancelledError` raised within
    a task will cancel all remaining tasks and wait for them to exit.

There are numerous special cases for CancelledError that I don't feel like understanding right now, but I'm sure some of them are related to this documented behavior. It's not a bug.

Before we go ahead with documenting the pattern for stopping a task group, we should probably reason through, or try via experiments, how things should work when there are several "nested" task groups. ("Nested" in quotes because there's no concept of nesting task groups, but we certainly can create a task group in a task associated with another task group, and then we can consider the latter group the "parent" of the former.)

  • What happens when the parent task group gets stopped this way? Presumably the task that created the inner task group gets cancelled while in the with statement that manages the inner task group, and that's going to cancel all the tasks belonging to the inner group. I think.
  • And what happens if the inner task group gets stopped this way? Since it catches the Stop exception it won't bubble out an exception, unless some of its tasks raise an exception upon being cancelled (or had already raised some exception but other tasks in the group were still processing a cancellation). The exception bubbling out will be an ExceptionGroup; I think that will be wrapped by the outer task group in another ExceptionGroup (but I haven't tried to check this, so I may be wrong -- possibly the ExceptionGroups are combined).

@sobolevn
Member

Trio / AnyIO have clear answers to these complex questions thanks to CancelScopes.

I feel like this abstraction lets one clearly see what we want to bubble up or to suppress, and where.

Basic example of how we can try to think about different task groups:

async def nested_group(cancel_scope):
    async with TaskGroup(cancel_scope=cancel_scope) as tg2:
         ...

async def main():
    async with TaskGroup() as tg:
        # cancelling of `tg2` will cancel `tg`, since they are in the same scope:
        tg.create_task(nested_group(tg.cancel_scope))  
        # cancelling of `tg2` here will not bubble up, since they are independent:
        tg.create_task(nested_group(None))  

However, even with scopes there are lots of cases that can break the flow: https://anyio.readthedocs.io/en/stable/cancellation.html#avoiding-cancel-scope-stack-corruption

@gvanrossum
Member

Sorry, I have never managed to understand cancel scopes in Trio. And the models are just so different that the Trio abstractions just can't help asyncio.

gvanrossum pushed a commit that referenced this issue Sep 11, 2024

We don't want to add another API, since the recipe is straightforward and rarely needed.

The advantage is that we could backport this to the earliest Python version that has taskgroups (3.11, alas in security mode already, so we'll just do 3.12 and 3.13).
miss-islington pushed a commit to miss-islington/cpython that referenced this issue Sep 11, 2024
…onGH-123837)

We don't want to add another API, since the recipe is straightforward and rarely needed.

The advantage is that we could backport this to the earliest Python version that has taskgroups (3.11, alas in security mode already, so we'll just do 3.12 and 3.13).
(cherry picked from commit ef05801)

Co-authored-by: Bénédikt Tran <[email protected]>
@belm0
Contributor

belm0 commented Nov 24, 2024

you can't use tg.create_task(...) immediately after entering the task group, because it's already shutting down

That's sad-- would it be reasonable to stop the task group at enter rather than the first await? Maybe that isn't possible... then relax the RuntimeError on aborting in this case? The error seems questionable in general.

PR #127214 is in progress

@smurfix
Author

smurfix commented Nov 25, 2024

You can't stop the taskgroup "at enter" because an exception raised in __aenter__ isn't passed through the corresponding __aexit__, so we have no way to catch it.

While it may or may not be possible to fix that (notionally, some special exception raised by __aenter__ could mean "behave as if the exception contained in me was raised immediately after entering the block normally", though I have no idea how that would work in conjunction with ExitStack), that's out of scope for this PR, IMHO.

@arthur-tacca

arthur-tacca commented Nov 26, 2024

That's sad-- would it be reasonable to stop the task group at enter rather than the first await? Maybe that isn't possible... then relax the RuntimeError on aborting in this case? The error seems questionable in general.

smurfix addressed the first part of this. So what about suppressing the error in this case, or indeed always (if you find it questionable)?

Actually, this error is very useful. It provides the guarantee that a task group will never silently discard a request (though sadly that is only true if using eager tasks – see #116048). Then you can write code that consumes and reliably closes some resource, like a database connection:

async def uses_connection(conn, work):
    try:
        await do_some_work(conn, work)
    finally:
        conn.close()

async def do_multiple():
    async with asyncio.TaskGroup() as tg:
        for i in range(my_count):
            conn = await create_connection()
            try:
                tg.create_task(uses_connection(conn, i))
            except RuntimeError:
                conn.close()
                raise

The above code should safely close the connection, regardless of what happens with cancellation and exceptions (again, so long as you're using eager tasks).

@graingert
Contributor

graingert commented Nov 26, 2024

stop() reminds me too much of asyncio.EventLoop.stop; it also implies that code after the TaskGroup won't run. How about TaskGroup.move_on()?

@MarkofChina

Why not use:

async with asyncio.timeout(None) as tm:
    async with asyncio.TaskGroup() as tg:
        …

and then call tm.reschedule(0)? It feels more graceful.

@graingert
Contributor

graingert commented Nov 27, 2024

Why not use:

async with asyncio.timeout(None) as tm:
    async with asyncio.TaskGroup() as tg:
        …

and then call tm.reschedule(0)? It feels more graceful.

This doesn't cancel the tasks soon enough: it waits for an event loop cycle, it raises a CancelledError that then needs to propagate out of the TaskGroup body, and it raises a TimeoutError that needs careful management to suppress correctly.

timeout could be fixed to cancel immediately when the deadline is exceeded (but you'd still need to wait for the cancellation to be raised and propagated), and to grow an exc=None kwarg to suppress the TimeoutError.

@python python deleted a comment from graingert-coef Nov 27, 2024
@smurfix
Author

smurfix commented Nov 28, 2024

I have to admit that I'm strongly +1 on .cancel given that trio+anyio use that name. Trio is already an outlier in that its taskgroups are called "nurseries". We don't need even more different names for what's supposed to be the same thing.

IIRC Guido has objected to that because .cancel is supposed to raise a CancelledError. Well, it does do that. The fact that the exception will not propagate beyond the taskgroup it's been raised in is a feature; if you do want it to do that, you're free to expose the task's cancel method instead of the taskgroup's.

@belm0
Contributor

belm0 commented Nov 30, 2024

from @graingert @arthur-tacca

It provides the guarantee that a task group will never silently discard a request

I don't agree this is necessary. If acknowledgement that a request was received is needed, it can be done at a higher level rather than relying on the task group API.

Then you can write code that consumes and reliably closes some resource, like a database connection:

Maybe this example was contrived, but I didn't understand why the scope that created the connection isn't responsible for closing it (either via try/finally or a context manager).

@belm0
Contributor

belm0 commented Nov 30, 2024

As stated, calling create_task() on a task group that is cancelling raises a RuntimeError. I'll try making a case against this behavior.

Raising a generic RuntimeError usually implies you're doing something wrong-- like a logical error that cannot be resolved at runtime. It says "don't catch me explicitly" -- you'd have to parse the error string to do so, after all. While unlucky, I don't think it's wrong to call create_task() on a task group in cancelling state.

I view the following as an API inconsistency. Why is it OK (i.e. not a RuntimeError) to call create_task() on a task group that is essentially dead another way, namely within an expired timeout:

async with timeout(0):
    async with asyncio.TaskGroup() as tg:
        tg.create_task(asyncio.sleep(1))  # not a RuntimeError
        # __aexit__ produces the expected TimeoutError

@smurfix
Author

smurfix commented Nov 30, 2024

Why is it OK (i.e. not a RuntimeError) to call create_task() on a task group that is essentially dead another way

IMHO the contract should be that either the create_task fails, ideally with some non-generic RuntimeError subclass (TaskgroupClosedError?) which you can catch, or the task starts to run (but may be cancelled the first time it awaits something, assuming that the async call in question actually yields).

Anything else risks data loss.

Trio does the latter. For both compatibility and improved error handling (you only need to catch the problem at one point, inside the task, where you need to handle it anyway) I recommend following its example.

This example prints "Yes" before propagating the RuntimeError:

import trio

async def a(n):
    await trio.sleep(0.1)
    raise RuntimeError("This error is expected")

async def b(n):
    try:
        await trio.sleep(0.2)
    finally:
        n.start_soon(c)

async def c():
    print("Yes")
    await trio.sleep(0)
    raise RuntimeError("We don't go here")

async def main():
    async with trio.open_nursery() as n:
        n.start_soon(a,n)
        n.start_soon(b,n)
        pass

trio.run(main)

@smurfix
Author

smurfix commented Nov 30, 2024

Trio does the latter

as long as the taskgroup's __aexit__ method has not returned, of course. After that it raises RuntimeError, which IMHO is reasonable at that point.

@belm0
Contributor

belm0 commented Nov 30, 2024

A problem with create_task() raising an error in the cancelled-before-enter case is that it makes the first create_task() call of any group special, and that call might not be written literally in the group's body. It could be called indirectly, by a helper function that spawns into a given TaskGroup instance. So then we'll need paranoia from any code potentially making such a call, as well as competing approaches: require the exception to be suppressed locally, vs. require something higher in the call stack to be prepared for the exception.

I understand that when TaskGroup was first authored, this was an edge case that may not have been given much thought (especially prior to a feature of being able to cancel task groups), and the conservative thing to do was raise a runtime error. But raising an error (and especially as RuntimeError) is not clearly helpful in practice, and I hope this decision can be revisited.

@arthur-tacca

arthur-tacca commented Dec 1, 2024

@belm0 You quoted me but attributed it to graingert.

For a fairly practical example of where a task group produces a resource and some tasks consume them, see this example using asyncio.start_server() in the issue I linked earlier. I think you'd be hard pressed to find a normal asyncio user who would notice the possible gap there.

By the way, the first call to tg.create_task() is not necessarily special (not that I agree with that characterisation anyway). If that call happens after an await (as happens in the example I just linked to) then the pending cancellation will be raised at that await and won't make it to tg.create_task().

Maybe this example was contrived, but I didn't understand why the scope that created the connection isn't responsible for closing it (either via try/finally or a context manager).

@belm0 Sorry I didn't actually answer your question initially.

On the contrary, it would be contrived if the connection closure didn't happen until the whole task group shuts down. If the connection receives a graceful shutdown request, or causes some fatal (for the connection) error, then it needs to be closed, and only the task representing the connection's lifetime would know to do that. The task group is for the whole length of time you can accept connections, which is usually much longer, potentially the whole lifetime of the server; if you wait for that to finish then you'll leak connections over time.

Side note 1: Yes, you could also close connections when the task group shuts, as a backup for if the connection's task misses it. In fact I mentioned that in that issue (in the comment before). But you have to do some extra admin for that, and for no other reason than the missing run-till-await guarantee. As I said in that bug report: it looks suspiciously like a workaround for a bug because, IMO, it is.

Side note 2: This is really off topic, but a more careful application may well have a different scope for connection creation and connection cleanup. That way, when the overall server shuts down, it can immediately stop accepting new connections (cancel the listener task group immediately) while still giving existing connections some time to shut down gracefully (so, maybe surprisingly, the connection task group is outside the listener task group).

@asvetlov asvetlov changed the title Add a taskgroup.stop method Add a taskgroup.cancel method Dec 27, 2024
@picnixz picnixz changed the title Add a taskgroup.cancel method Add an asyncio.TaskGroup.cancel method Dec 27, 2024
@rrevans

rrevans commented Dec 27, 2024

Many of the use cases mentioned above can be implemented without TaskGroup.cancel; examples below.

Also maybe I'm misunderstanding, but it seems all cases could be implemented by wrapping the group in a Task and cancelling that?

Race-to-the-finish with two calls (e.g. happy eyeballs use case)

async with asyncio.TaskGroup() as g:
  t1 = g.create_task(...)
  t2 = g.create_task(...)
  t1.add_done_callback(lambda _: t2.cancel())
  t2.add_done_callback(lambda _: t1.cancel())

Or await asyncio.wait([t1, t2], return_when=asyncio.FIRST_COMPLETED) then cancel the still-pending tasks.

Explicit stop signal sent to a task group

stop = asyncio.Event()
async with asyncio.TaskGroup() as g:
  # create some tasks
  await stop.wait()
  raise asyncio.CancelledError()

Call stop.set() elsewhere to stop the TaskGroup.

Tracking tasks from asyncio.start_server for eager shutdown

async def handle_connection(r, w):
  ...
async with asyncio.TaskGroup() as g:
  def connect_cb(r, w):
    g.create_task(handle_connection(r, w))
  s = await asyncio.start_server(connect_cb, ...)
  g.create_task(s.serve_forever())

Run this as a coroutine or task, and cancel propagates automatically to server and all active connections.

@smurfix
Author

smurfix commented Dec 30, 2024

Also maybe I'm misunderstanding, but it seems all cases could be implemented by wrapping the group in a Task and cancelling that?

Well. Possibly. However …

then cancel the still-pending tasks

… unraveling task spaghetti is exactly what taskgroups are supposed to shield the programmer from.

Also, when you cancel the task you preclude it from returning a result. "Return a value by assigning it to some variable in an outer scope" (or to an object's attribute) is an anti-pattern I'd like to be able to avoid.

@rrevans

rrevans commented Dec 31, 2024

Maybe this is really about avoiding separate functions / objects for the state? I agree there's merit to keeping that wiring together in one scope to avoid clumsy decomposition into separate tasks.

So then the main goal becomes creating a cancellable scope within a function.

I think it's helpful to consider the semantics such a cancellable scope should have separately from the possibility of integrating it into the existing TaskGroup. Such a scope could then enclose a TaskGroup if they are kept separate.

For exposition, suppose the parts of TaskGroup and timeout that deal with cancelling the inner suite of the async with statement could be factored out to a separate (synchronous) CancelContext() context manager type.

with CancelContext() as ctx:
  SUITE

CancelContext.cancel() cancels the inner suite. Execution continues after the with statement that entered the cancelled scope, unless the suite raises an exception (for clarity, CancelledError is suppressed). Callers can detect a normal vs. cancelled exit with try/else, or CancelContext can expose a CancelContext.cancelled property.

As an example of using CancelContext, the config reload case then becomes:

async def subsys(config):
  while True:
    with CancelContext() as ctx:
      async with TaskGroup() as tg:
        tg.create_task(wait_changed(ctx.cancel, config))
        ... # now do whatever the config tells us to do

or factored out as a context manager for reuse:

@contextlib.asynccontextmanager
async def monitor(config):
  with CancelContext() as ctx:
    async with asyncio.TaskGroup() as tg:
      tg.create_task(wait_changed(ctx.cancel, config))
      yield

async def subsys(config):
  while True:
    async with monitor(config):
      ... # now do whatever the config tells us to do

Note that CancelContext is almost equivalent to running the statements of the inner suite in a separate task, with cancel() implemented by cancelling the task and suppressing the CancelledError. With CancelContext:

with CancelContext() as ctx:
  something.add_callback(ctx.cancel)
  async with asyncio.TaskGroup() as tg:
    ... do some stuff

With a separate task:

async def do_stuff(...):
  async with asyncio.TaskGroup() as tg:
    ... do some stuff

async with asyncio.TaskGroup() as tg:
  t = tg.create_task(do_stuff(...))
  something.add_callback(t.cancel)

With this new type, TaskGroup.cancel() as imagined so far is semantically identical to the sample above that wraps TaskGroup in a CancelContext and calls CancelContext.cancel().

This also means uses of TaskGroup.cancel() should be able to be transformed to a separate task with almost the same semantics, but the goal should be a cancellable scope to avoid the required boilerplate.

Some other possible variations of CancelContext:

  • CancelContext.cancel() could instead raise CancelledError
  • CancelContext.throw(exc) could raise a user-provided exception once the inner suite is cancelled

Coming back around to TaskGroup, its main responsibilities are:

  • guarantee all tasks are finished upon context exit
  • propagate all exceptions raised other than tasks raising CancelledError
  • cancel all tasks and the suite if and only if an exception is propagated

These are already complex, so adding more behavior should be done very carefully and ideally match those responsibilities somewhat closely. So what about CancelContext and its variations?

  • CancelContext.cancel() silently exiting the suite is surprising and does not match any exit behavior of TaskGroup.

    • TaskGroup never cancels tasks or the suite for a normal exit
    • TaskGroup never exits the suite normally when cancelling tasks
    • CancelContext.cancel() acts neither like task.cancel() nor the suite raising CancelledError
    • Also in general silently exiting would be subtle and error-prone.
  • CancelContext.cancel() raising CancelledError is consistent with non-normal exit always raising an exception for TaskGroup, but existing TaskGroup does not raise CancelledError except when the enclosing scope is cancelled.

    • Raising CancelledError would be a tricky pitfall: code that unconditionally swallows it cannot tell a self-cancel from an external cancel
  • CancelContext.throw(exc) throwing a user-defined exception is quite close to the existing semantics

    • It is almost the same as TaskGroup.create_task() with a coroutine that unconditionally raises the exception. But then passing CancelledError would be suppressed, so probably this should be treated as-if the suite raised it.
    • The raised exception would probably be added unconditionally to the resulting ExceptionGroup

Meanwhile CancelContext semantics seem useful enough for implementing context managers that need to cancel an inner suite, and it can be implemented without access to TaskGroup internals.

As an example, consider the config reload sample as a context manager. config.watch() would await the config to be ready initially and cancel the inner suite when reloaded.

class Config:
  @contextlib.asynccontextmanager
  async def watch(self):
    await self.ready()
    with CancelContext() as c:
      self.add_callback(c.cancel)
      yield self

async def subsys(config):
  while True:
    async with config.watch():
      ... # now do whatever the config tells us to do

Parting observations:

  • CancelContext could be a model for a low-level API exposed by Task itself to properly scope nested cancel operations and clean up the subtle Task.cancel(), Task.cancelling(), Task.uncancel() dance once and for all.
  • CancelContext.throw(exc) also seems maybe useful for addressing the problems that motivate Asyncio: Send arbitrary exceptions to Tasks #102847.

Okay this got long so I'll try to summarize the main points:

  • the main value here seems to be to define a cancellable scope rather than a stoppable TaskGroup
  • it seems useful to reason about cancellable scope semantics separately from TaskGroup details
  • it seems like a separate question whether TaskGroup and CancelContext should be the same
  • silently exiting the suite when cancelled seems quite error prone and subtle
  • CancelContext.throw(exc) almost fits TaskGroup except its special-casing of CancelledError

@graingert
Copy link
Contributor

graingert commented Dec 31, 2024

I think we should keep discussions of what sort of name/arguments .cancel() should have here, but move whether the feature is useful or not back to discuss: https://discuss.python.org/t/how-to-cancel-all-tasks-created-within-same-taskgroup/30215

@smurfix
Copy link
Author

smurfix commented Dec 31, 2024

Okay this got long so I'll try to summarize the main points:

… or you could compare with Trio's (and anyio's) CancelScope which already do this.

Personally I don't particularly care whether these objects are a separate class, or simply a Taskgroup that happens not to have subtasks.

The problem is that if you create a separate cancellable scope, you run into a fundamental disparity between cancellation as it currently works in asyncio (it's a singular exception, for the most part) and in Trio/anyio (it's a state of the scope: any current or subsequent async call might raise or re-raise the exception, which is why Trio introduced a shielding CancelScope that blocks this, essential in finally: blocks that need to make async calls).

anyio's CancelScopes basically work like Trio's; anyio goes to some lengths to paper over this disparity. ("Task still running? Well, cancel it again.") I'd expect some minor(?) compatibility problems if asyncio were to switch its paradigm.

@rrevans
Copy link

rrevans commented Jan 1, 2025

we should keep discussions of what sort of name/arguments .cancel() should have here

In my opinion:

TaskGroup.cancel() should take no arguments and cause TaskGroup.__aexit__ to raise CancelledError after cancelling the suite and its tasks.

TaskGroup.cancel() should have no effect at all if the TaskGroup has already exited or if the suite or any task raises an exception other than CancelledError (in which case an ExceptionGroup is raised instead).

When catching the CancelledError, callers must (as always) check current_task().cancelling() before swallowing the exception if the parent task is itself subject to cancellation.

This behavior is the same as arranging for the suite to raise CancelledError upon request except that the suite can execute other statements.

@arthur-tacca
Copy link

arthur-tacca commented Jan 1, 2025

@rrevans Perhaps this is just a language thing, but to me the tone of your most recent comment (with no "in my view" or "I would do it like this" etc.) suggests you're trying to summarise consensus. But what you've written is definitely not that. As far as I can tell, you're the only person to suggest that the CancelledError from TaskGroup.cancel() should be allowed to propagate out of the context manager. That would defeat the point of the feature!

About your previous comment: As @smurfix said, you've rediscovered Trio's cancellation scopes, which are non-async context managers. I think it's a pity that asyncio didn't copy their design in task groups (and for timeouts) but that ship has sailed and there's no point going back over that now. The same goes for edge-based vs level-based cancellation (which Guido has explicitly said will never be in asyncio).

One thing occurred to me reading the bit of your comment about injecting a different cancellation exception: I think the problem you're actually trying to solve is detecting why a cancellation occurred (was it because this task group / cancellation scope was manually cancelled?). Trio actually has a way to do this that would be really easy to copy here: the cancel_called and cancel_caught attributes. You could use it like this:

with trio.CancelScope() as cs:
    ...
    if something():
        cs.cancel()
    ...
if cs.cancel_caught:
    ...

(It feels more natural to use cancel_called, and in this snippet it probably wouldn't make any difference, but the docs say it's usually better to use cancel_caught instead. They can differ if an outside cancellation happens at roughly the same time, which causes the cancellation to propagate rather than be caught, or if you cancel the scope after it's already closed.)

It's thus possible for the application developer to wrap up this functionality in a context manager, which can re-raise a different exception if desired. Actually, that's exactly how trio.fail_after works. These facilities are also available on nurseries (Trio task groups) because they include a cancel scope as an attribute.

As a side note, you can already do something similar in asyncio too, using asyncio.timeout:

try:
    async with asyncio.timeout(None) as to:
        ...
        if something():
            to.reschedule(asyncio.get_running_loop().time() - 1)
        ...
except TimeoutError:
    ...

A disadvantage of this approach is that there's no way to tell if the TimeoutError came from this scope or some inner one. Another disadvantage is that it's clearly a really indirect and confusing usage of the API.

I'll cross-post the attribute idea on the Discuss topic too.

@rrevans
Copy link

rrevans commented Jan 2, 2025

propagate out of the context manager

TaskGroup raises an exception in all cases where it exits before successfully completing the suite and its tasks. I think .cancel() should also raise an exception because it has the same effect.

Why not .stop() and exit silently? Because asyncio and python at large prefer exceptions to communicate unusual conditions. I think most of the time callers will need to respond to cancel-exit as an unexpected outcome, so explicitly raising an exception is clear, natural, and consistent.

Why CancelledError? Because it matches Task.cancel(). Both would effectively raise CancelledError from the body of the thing being cancelled. This is the coroutine for Task and the with-statement for TaskGroup.
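For reference, the Task.cancel() behavior being matched here can be demonstrated with current asyncio: the exception is raised inside the coroutine body, and the awaiting caller sees CancelledError as well.

```python
import asyncio

async def body():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        # CancelledError is delivered inside the coroutine body first.
        raise

async def demo():
    t = asyncio.create_task(body())
    await asyncio.sleep(0)  # let body() reach its await
    t.cancel()
    try:
        await t
    except asyncio.CancelledError:
        return "caller saw CancelledError"

print(asyncio.run(demo()))  # → "caller saw CancelledError"
```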

Callers might inadvertently swallow the exception if the parent is being cancelled, but I believe the risk of bad side-effects from that programming error is less severe than the risks of proceeding inadvertently past a cancelled TaskGroup.

would defeat the point of the feature

Maybe. My take is that the semantics of TaskGroup.cancel() should closely follow the behavior of Task.cancel() rather than trying to emulate Trio cancellation scopes.

@gvanrossum
Copy link
Member

I may be mistaken, but it appears this discussion is going around in circles. Why exactly was the title changed from stop to cancel? It seems we're not even on the same page about whether the desired API should exit the async with clause without error (if all tasks were cancelled without errors) or not.

Maybe it's time for the key participants to start collaborating on a PEP? If you can get a draft PEP ready and rough consensus around it I am willing to sponsor it. I am proposing this as an alternative to continuing the debate without making progress for another 6 months.

One suggestion for a draft PEP: make minimal use of references to Trio. We can't expect the ultimate deciders (the SC; and before them myself or whichever core dev ends up sponsoring) to understand much of the ins and outs of Trio, so motivations of the form "this is how Trio does it" are not helpful for the deciders (especially since this appears largely about cancellation, and that's an area where asyncio and Trio differ fundamentally).

You're also all welcome to keep arguing like you have been, but there has to be some kind of consensus. Currently the discussion is hard to follow unless you have read the entire issue, which is unreasonable at this point.

@belm0
Copy link
Contributor

belm0 commented Jan 4, 2025

Why exactly was the title changed from stop to cancel?

Yury balked at stop() in the PR, so I changed it back to cancel().

Though I suggested starting a PEP when asking for the issue to be reopened, I wasn't imagining this level of controversy 😓

@gvanrossum
Copy link
Member

Why exactly was the title changed from stop to cancel?

Yury balked at stop() in the PR, so I changed it back.

But he was just asking why — not telling you to change it. You could maybe have answered “because this design doesn’t raise CancelledError out of the asynchronous with block”?

@kumaraditya303
Copy link
Contributor

One suggestion for a draft PEP: make minimal use of references to Trio. We can't expect the ultimate deciders (the SC; and before them myself or whichever core dev ends up sponsoring) to understand much of the ins and outs of Trio, so motivations of the form "this is how Trio does it" are not helpful for the deciders (especially since this appears largely about cancellation, and that's an area where asyncio and Trio differ fundamentally).

I agree with Guido. Fwiw this is the reason I don't participate much in these discussions: to me it feels more about making asyncio behave like trio or anyio etc., and I don't understand much of those projects. Both projects are too fundamentally different in many regards for that to happen, so IMO it would be better to propose concrete semantics in terms of how things happen in asyncio rather than in any other such projects.

@gvanrossum
Copy link
Member

Though I suggested starting a PEP when asking for the issue to be reopened, I wasn't imagining this level of controversy 😓

It seems there are at least two dimensions of controversy, and it also seems we're going back and forth on at least one?

  • Whether it should raise CancelledError or exit cleanly after all tasks are cancelled cleanly (and from the outcome of this I would derive a preference for the name -- stop() if it exits cleanly, cancel() if it raises CancelledError -- though I see in the PR that you prefer cancel() regardless; nevertheless we should decide whether it raises CancelledError or not first, IMO).

  • Whether the use case is important enough to add such an API at all. Connected to this is whether it can be done without too much fuss and with the correct semantics using the existing APIs; or by making an existing internal API public, which would argue that it's not really adding new functionality. But it appears we have conclusively proven that we can't just promote an existing API, and the only correct way using existing APIs seems to be by adding a dummy task that raises a unique exception, and catching that, which is hardly clean. You also appear to have shown (by looking at the popularity of the corresponding Trio API) that the use case is important -- although I'd like to see at least a few of those Trio uses for myself to understand how it's used in Trio and whether we can really draw conclusions for asyncio from that.

Maybe you can try to address these bullets separately and drive consensus that way (instead of by drafting a PEP)? You'd have to go to battle a bit with the faction (or one person?) who want the async with block to raise CancelledError. Does that desire come largely out of the desire to name the API cancel()? Or is there a practical reason, like making the coding of various realistic-sounding examples more convenient? Arguments from a pure desire of consistency are sometimes red herrings.

Finally: If I'm the only person here who believes that if the async with block can exit cleanly, then the API should be called stop(), I'll give up on that.

@smurfix
Copy link
Author

smurfix commented Jan 4, 2025

I'd like to see at least a few of those Trio uses for myself

The canonical example is Happy Eyeballs-style concurrency, i.e. start a few tasks that all try to produce a result, the first one that succeeds wins, all the others get cancelled, you return the result of the winning task.

This is easy enough with a taskgroup that doesn't raise an exception when getting cancelled. If you don't have that, you need a wrapper to catch the exception. That wrapper needs to distinguish "the taskgroup has been (self-)cancelled" from "the cancellation was triggered externally and thus must propagate". That's boilerplate which people can and will get wrong. (Consider what happens when, due to scheduling quirks, these coincide …)

Another example: a network connection. When you decide to disconnect, you self-cancel the whole connection handling taskgroup (sometimes these are quite complex: send task, receive task, background ping task …) and you're done. Why would you propagate that cancellation? Worse, if the network connection was to retrieve a result from the other side, again you need that wrapper.

I'd ask the other way 'round: what's the use case for propagating that cancellation? Because frankly I don't see one, especially given that you can get that behavior simply by cancelling the current task instead.
