Add an `asyncio.TaskGroup.cancel` method (#108951)
I am afraid that I don't understand the problem well enough. Could you explain the need for this and the desired behavior in edge cases better? A bullet like "what about nested taskgroups" makes me wonder -- what about them? I have no idea what structure your app has (and I haven't been following Trio or anyio so referencing that doesn't help me). |
Consider this test program:

```python
import asyncio

async def t1():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(asyncio.sleep(1))
        tg._parent_task.cancel()  # i.e. asyncio.current_task().cancel()
    assert False, "not reached"

try:
    asyncio.run(t1())
    assert False, "not reached"
except asyncio.CancelledError:
    print("Cancelled")
```

The cancellation always bubbles out of the taskgroup. Here's the anyio equivalent:

```python
import anyio

async def t2():
    async with anyio.create_task_group() as tg:
        tg.start_soon(anyio.sleep, 1)
        tg.cancel_scope.cancel()
    print("after anyio cancel")

anyio.run(t2)
```

While asyncio doesn't have cancel scopes, that's an implementation detail. The point is that, when cancelled via its own scope, the anyio task group exits cleanly instead of propagating the cancellation.

As to code structure, let's assume I have a complex program with parts that want to be restarted separately if/when their configuration changes. With a cancel-able taskgroup, this is easy; each subpart can do

```python
async def subsys(config):
    while True:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(wait_changed(tg.cancel, config))
            ...  # now do whatever the config tells us to do
```

where `wait_changed` calls the callback it is given whenever the configuration changes. This is difficult to do without a cancellable task group. |
It feels, at least to me, that "cancel" elicits all the wrong connotations in an asyncio context, because it sounds like it would cause a CancelledError exception to propagate out. It looks like the functionality you are looking for is a way to stop all the tasks running in the task group but then make the task group's `__aexit__()` return normally rather than raise. If you come up with a name other than "cancel" (maybe "stop"? but up to you) and a PR that implements it, I will consider it. Try not to use Trio/anyio as guidance or model, since we should not assume asyncio users are familiar with those. |
Can you please refrain from this kind of negative language? It's not helping your case. |
Sorry. Reworded. |
Will do. |
I had some time today to experiment with this. Here's a patch that seems to do the very basic thing:

```diff
diff --git Lib/asyncio/taskgroups.py Lib/asyncio/taskgroups.py
index f2ee9648c43..968a539f5dd 100644
--- Lib/asyncio/taskgroups.py
+++ Lib/asyncio/taskgroups.py
@@ -150,25 +150,34 @@ async def __aexit__(self, et, exc, tb):
                 # cycles (bad for GC); let's not keep a reference to
                 # a bunch of them.
                 try:
-                    me = BaseExceptionGroup('unhandled errors in a TaskGroup', self._errors)
-                    raise me from None
+                    raise BaseExceptionGroup(
+                        'unhandled errors in a TaskGroup',
+                        self._errors
+                    ) from None
+                except* _TaskGroupStop:
+                    pass
                 finally:
                     self._errors = None
+        # Direct `.stop()` call in `TaskGroup` body, like:
+        #
+        #   async with TaskGroup() as tg:
+        #       if some_condition:
+        #           tg.stop()  # <- here
+        if issubclass(et, _TaskGroupStop):
+            return True
+
+    def stop(self):
+        """Stop all the tasks in the task group."""
+        self._check_state()
+        raise _TaskGroupStop()
+
     def create_task(self, coro, *, name=None, context=None):
         """Create a new task in this group and return it.
         Similar to `asyncio.create_task`.
         """
-        if not self._entered:
-            coro.close()
-            raise RuntimeError(f"TaskGroup {self!r} has not been entered")
-        if self._exiting and not self._tasks:
-            coro.close()
-            raise RuntimeError(f"TaskGroup {self!r} is finished")
-        if self._aborting:
-            coro.close()
-            raise RuntimeError(f"TaskGroup {self!r} is shutting down")
+        self._check_state(coro)
         if context is None:
             task = self._loop.create_task(coro, name=name)
         else:
@@ -184,6 +193,19 @@ def create_task(self, coro, *, name=None, context=None):
         task.add_done_callback(self._on_task_done)
         return task
 
+    def _check_state(self, coro=None):
+        error = None
+        if not self._entered:
+            error = RuntimeError(f"TaskGroup {self!r} has not been entered")
+        if self._exiting and not self._tasks:
+            error = RuntimeError(f"TaskGroup {self!r} is finished")
+        if self._aborting:
+            error = RuntimeError(f"TaskGroup {self!r} is shutting down")
+        if error is not None:
+            if coro is not None:
+                coro.close()
+            raise error
+
     # Since Python 3.8 Tasks propagate all exceptions correctly,
     # except for KeyboardInterrupt and SystemExit which are
     # still considered special.
@@ -250,3 +272,7 @@ def _on_task_done(self, task):
         self._abort()
         self._parent_cancel_requested = True
         self._parent_task.cancel()
+
+
+class _TaskGroupStop(Exception):
+    """Exception to stop all the tasks in a task group."""
```

It allows stopping the `TaskGroup` from inside its own body or from one of its tasks.
I used this code to test the concept:

```python
import asyncio
from asyncio import TaskGroup
from random import random

async def job(i, group=None):
    while True:
        print(f"Task {i}")
        await asyncio.sleep(1)
        if random() < 0.4:
            print(f"Task {i} cancelling others")
            group.stop()
            return

async def main():
    async with TaskGroup() as group:
        group.create_task(job(1, group))
        group.create_task(job(2, group))
        # TODO: try this as well
        # group.stop()

asyncio.run(main())
```

It seems to work for the general case, but I am not confident enough in it to submit a PR. Maybe someone else can improve it :) |
Playing the devil's advocate, but should we care about code that accidentally swallows the stop exception, like:

```python
async with TaskGroup() as group:
    group.create_task(job(1, group))
    try:
        group.stop()
    except:
        pass
```

Alternatively, can we use an event + waiter task to cancel the tasks in the group? Or would this solution be too tricky to implement? |
I don't know :) |
I forget if I suggested this before -- I presume you could do something like add a new task to the task group that self-cancels, and then do something like |
Following @gvanrossum's idea, this can be done on unchanged import asyncio
from asyncio import TaskGroup
from contextlib import asynccontextmanager
from random import random
class Stop(Exception):
"""Stop the task group."""
@asynccontextmanager
async def stoppable_task_group():
try:
async with TaskGroup() as group:
yield group
except* Stop as ex:
print(f'{ex} was stopped')
async def stop(group):
async def factory():
raise Stop(group)
await group.create_task(factory())
async def job(i, group=None):
while True:
print(f"Task {i}")
await asyncio.sleep(1)
if random() < 0.4:
print(f"Task {i} cancelling others")
await stop(group)
return
async def main():
async with stoppable_task_group() as group:
group.create_task(job(1, group))
group.create_task(job(2, group))
asyncio.run(main()) I changed |
Personally,
|
Could one of you elaborate why we shouldn't just use CancelledError? |
@gvanrossum I tried the same exact example code but with `asyncio.CancelledError` instead of a custom exception, and it behaves differently.

Notice: I can also get cases like:

I am not sure exactly why, but it does not happen with custom exceptions. This might be a bug on its own. |
Oh, it's because of this comment (part of a docstring) in taskgroups.py:

> Any exceptions other than `asyncio.CancelledError` raised within
> a task will cancel all remaining tasks and wait for them to exit.

There are numerous special cases for CancelledError that I don't feel like understanding right now, but I'm sure some of them are related to this documented behavior. It's not a bug.

Before we go ahead with documenting the pattern for stopping a task group, we should probably reason through, or try via experiments, how things should work when there are several "nested" task groups. ("Nested" in quotes because there's no concept of nesting task groups, but we certainly can create a task group in a task associated with another task group, and then we can consider the latter group the "parent" of the former.)
|
Trio / AnyIO have clear answers to these complex questions thanks to CancelScopes. I feel like this abstraction makes it clear what and where we want to bubble up or to suppress. Basic example of how we can try to think about different task groups:

```python
async def nested_group(cancel_scope):
    async with TaskGroup(cancel_scope=cancel_scope) as tg2:
        ...

async def main():
    async with TaskGroup() as tg:
        # cancelling of `tg2` will cancel `tg`, since they are in the same scope:
        tg.create_task(nested_group(tg.cancel_scope))
        # cancelling of `tg2` here will not bubble up, since they are independent:
        tg.create_task(nested_group(None))
```

However, even with scopes there are lots of cases that can break the flow: https://anyio.readthedocs.io/en/stable/cancellation.html#avoiding-cancel-scope-stack-corruption |
Sorry, I have never managed to understand cancel scopes in Trio. And the models are just so different that the Trio abstractions just can't help asyncio. |
We don't want to add another API, since the recipe is straightforward and rarely needed. The advantage is that we could backport this to the earliest Python version that has taskgroups (3.11, alas in security mode already, so we'll just do 3.12 and 3.13).
…onGH-123837) We don't want to add another API, since the recipe is straightforward and rarely needed. The advantage is that we could backport this to the earliest Python version that has taskgroups (3.11, alas in security mode already, so we'll just do 3.12 and 3.13). (cherry picked from commit ef05801) Co-authored-by: Bénédikt Tran <[email protected]>
That's sad -- would it be reasonable to stop the task group at enter rather than at the first await? Maybe that isn't possible... then relax the RuntimeError on aborting in this case? The error seems questionable in general. PR #127214 is in progress. |
You can't stop the taskgroup "at enter" because an exception raised in `__aenter__` would propagate from the `async with` statement itself, before the body ever runs. While it may or may not be possible to fix that — notionally some special exception raised by `__aenter__` could be treated specially — that is not how `async with` works today. |
smurfix addressed the first part of this.

> So what about suppressing the error in this case, or indeed always (if you find it questionable)?

Actually, this error is very useful. It provides the guarantee that a task group will never silently discard a request (though sadly that is only true if using eager tasks – see #116048). Then you can write code that consumes and reliably closes some resource, like a database connection:

```python
async def uses_connection(conn, work):
    try:
        await do_some_work(conn, work)
    finally:
        conn.close()

async def do_multiple():
    async with asyncio.TaskGroup() as tg:
        for i in range(my_count):
            conn = await create_connection()
            try:
                tg.create_task(uses_connection(conn, i))
            except RuntimeError:
                conn.close()
                raise
```

The above code should safely close the connection, regardless of what happens with cancellation and exceptions (again, so long as you're using eager tasks). |
Why not use:

then: |
This doesn't cancel the tasks soon enough: it waits for an event loop cycle, it raises a CancelledError that then needs to propagate out of the TaskGroup body, and it raises a TimeoutError that needs careful management to suppress correctly. `timeout` could be fixed to cancel immediately when the deadline is exceeded (but you still need to wait for the cancellation to be raised and propagated), and to have an `exc=None` kwarg to suppress the TimeoutError. |
I have to admit that I'm strongly +1 on this. IIRC Guido has objected to that because of the connotations of "cancel". |
I don't agree this is necessary. If acknowledgement that a request was received is needed, it can be done at a higher level rather than relying on the task group API.
Maybe this example was contrived, but I didn't understand why the scope that created the connection isn't responsible for closing it (either via try/finally or a context manager). |
As stated, calling create_task() on a task group that is cancelling yields a RuntimeError. I'll try making a case against this behavior. Raising a generic RuntimeError usually implies you're doing something wrong -- like a logical error that cannot be resolved at runtime. It says "don't catch me explicitly" -- you'd have to parse the error string to do so, after all. While unlucky, I don't think it's wrong to call create_task() on a task group in cancelling state. I view the following as an API inconsistency: why is it OK (i.e. not a RuntimeError) to call create_task() on a task group that is essentially dead another way, namely within an expired timeout?

```python
async with timeout(0):
    async with asyncio.TaskGroup() as tg:
        tg.create_task(asyncio.sleep(1))  # not a RuntimeError
# __aexit__ produces the expected TimeoutError
```
|
IMHO the contract should be that either `create_task()` raises, or the new task actually starts running (and thus gets a chance to handle its own cancellation). Anything else risks data loss. Trio does the latter. For both compatibility and improved error handling (you only need to catch the problem at one point, inside the task, where you need to handle it anyway) I recommend following its example. This example prints "Yes" before propagating the cancellation:
|
as long as the taskgroup's |
A problem with the current behavior: I understand that when TaskGroup was first authored, this was an edge case that may not have been given much thought (especially prior to any feature for cancelling task groups), and the conservative thing to do was to raise a runtime error. But raising an error (and especially a RuntimeError) is not clearly helpful in practice, and I hope this decision can be revisited. |
@belm0 You quoted me but attributed it to graingert. For a fairly practical example of where a task group produces a resource and some tasks consume them, see this example using By the way, the first call to
@belm0 Sorry, I didn't actually answer your question initially. On the contrary, it would be contrived if the connection closure didn't happen until the whole task group shuts down. If the connection sends a graceful shutdown request, or causes some fatal (for the connection) error, then it needs to be closed, and only the task representing the connection's lifetime would know to do that. The task group is for the whole length of time you can accept connections, which is usually much longer, potentially the whole lifetime of the server; if you wait for that to finish then you'll leak connections over time.

Side note 1: Yes, you could also close connections when the task group shuts down, as a backup for if the connection's task misses it. In fact I mentioned that in that issue (in the comment before). But you have to do some extra admin for that, and for no other reason than the missing run-till-await guarantee. As I said in that bug report: it looks suspiciously like a workaround for a bug because, IMO, it is.

Side note 2: This is really off topic, but actually a more careful application may well have a different scope for connection creation and connection cleanup. That way, when the overall server shuts down, it can immediately stop accepting new connections (cancel the listener task group immediately) while still giving existing connections some time to shut down gracefully (so, maybe surprisingly, the connection task group is outside the listener task group). |
(Title changed from `taskgroup.cancel` method to `asyncio.TaskGroup.cancel` method.)
Many of the use cases mentioned above can be implemented without changes to `TaskGroup`. Also maybe I'm misunderstanding, but it seems all cases could be implemented by wrapping the group in a helper.

Race-to-the-finish with two calls (e.g. happy eyeballs use case):

```python
async with asyncio.TaskGroup() as g:
    t1 = g.create_task(...)
    t2 = g.create_task(...)
    t1.add_done_callback(lambda _: t2.cancel())
    t2.add_done_callback(lambda _: t1.cancel())
```

Or, an explicit stop signal sent to a task group:

```python
stop = asyncio.Event()
async with asyncio.TaskGroup() as g:
    # create some tasks
    await stop.wait()
    raise asyncio.CancelledError()
```

Call

Tracking tasks from
|
Well. Possibly. However …
… unraveling thread spaghetti is exactly what taskgroups are supposed to shield the programmer from. Also, when you cancel the task you preclude it from returning a result. "Return a value by assigning it to some variable in an outer scope" (or to an object's attribute) is an anti-pattern I'd like to be able to avoid. |
Maybe this is really about avoiding separate functions / objects for the state? I agree there's merit to keeping that wiring together in one scope to avoid clumsy decomposition into separate tasks. So then the main goal becomes creating a cancellable scope within a function. I think it's helpful to consider the semantics such a cancellable scope should have separately from the possibility of integrating it into the existing `TaskGroup`. For exposition, suppose the cancellable scope is spelled:

```python
with CancelContext() as ctx:
    SUITE
```

As an example of using it with the config-reload scenario from above:

```python
async def subsys(config):
    while True:
        with CancelContext() as ctx:
            async with TaskGroup() as tg:
                tg.create_task(wait_changed(ctx.cancel, config))
                ...  # now do whatever the config tells us to do
```

or factored out as a context manager for reuse:

```python
@contextlib.asynccontextmanager
async def monitor(config):
    with CancelContext() as ctx:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(wait_changed(ctx.cancel, config))
            yield

async def subsys(config):
    while True:
        async with monitor(config):
            ...  # now do whatever the config tells us to do
```

Note that
With this new type, This also means uses of Some other possible variations of
Coming back around to
These are already complex, so adding more should be done very carefully and ideally match the those somewhat closely. So what about
Meanwhile, as an example, consider the config reload sample as a context manager:

```python
class Config:
    @contextlib.asynccontextmanager
    async def watch(self):
        await self.ready()
        with CancelContext() as c:
            self.add_callback(c.cancel)
            yield self

async def subsys(config):
    while True:
        async with config.watch():
            ...  # now do whatever the config tells us to do
```

Parting observations:
Okay this got long so I'll try to summarize the main points:
|
I think we should keep discussions of what sort of name/arguments .cancel() should have here, but move whether the feature is useful or not back to discuss: https://discuss.python.org/t/how-to-cancel-all-tasks-created-within-same-taskgroup/30215 |
… or you could compare with Trio's (and anyio's) `CancelScope`. Personally I don't particularly care whether these objects are a separate class, or simply part of the task group itself. The problem is that if you create a separate cancellable scope, you run into a fundamental disparity between cancellation as it currently works on asyncio (it's a singular exception, for the most part) and Trio/anyio (it's a state of the scope: any current or subsequent async call might raise/re-raise the exception, which is why Trio introduced a `shield` flag).
|
In my opinion:
When catching the This behavior is the same as arranging for the suite to raise |
@rrevans Perhaps this is just a language thing, but to me the tone of your most recent comment (with no "in my view" or "I would do it like this" etc.) suggests you're trying to summarise consensus. But what you've written is definitely not that. As far as I can tell, you're the only person to suggest that behavior.

About your previous comment: As @smurfix said, you've rediscovered Trio's cancellation scopes, which are non-async context managers. I think it's a pity that asyncio didn't copy their design in task groups (and for timeouts) but that ship has sailed and there's no point going back over that now. The same goes for edge-based vs level-based cancellation (which Guido has explicitly said will never be in asyncio).

One thing occurred to me reading the bit of your comment about injecting a different cancellation exception: I think the problem you're actually trying to solve is detecting why a cancellation occurred (was it because this task group / cancellation scope was manually cancelled?). Trio actually has a way to do this that would be really easy to copy here: the `cancelled_caught` attribute.

```python
with trio.CancelScope() as cs:
    ...
    if something():
        cs.cancel()
    ...
if cs.cancelled_caught:
    ...
```

(It feels more natural to use an attribute than a distinct exception type.) It's thus possible for the application developer to wrap up this functionality in a context manager, which can re-raise a different exception if desired. Actually, that's exactly how Trio's `fail_after` timeout helper is built on top of `CancelScope`.

As a side note, you can already do something similar in asyncio today, using `asyncio.timeout`:

```python
try:
    async with asyncio.timeout(None) as to:
        ...
        if something():
            to.reschedule(asyncio.get_running_loop().time() - 1)
        ...
except TimeoutError:
    ...
```

A disadvantage of this approach is that there's no way to tell whether the TimeoutError came from this scope or from somewhere else. I'll cross-post the attribute idea on the Discuss topic too. |
Why not Why Callers might inadvertently swallow the exception if the parent is being cancelled, but I believe the risk of bad side-effects from that programming error is less severe than the risks of proceeding inadvertently past a cancelled
Maybe. My take is that the semantics of |
I may be mistaken, but it appears this discussion is going around in circles. Why exactly was the title changed from stop() back to cancel()? Maybe it's time for the key participants to start collaborating on a PEP? If you can get a draft PEP ready and rough consensus around it, I am willing to sponsor it. I am proposing this as an alternative to continuing the debate without making progress for another 6 months.

One suggestion for a draft PEP: make minimal use of references to Trio. We can't expect the ultimate deciders (the SC; and before them myself or whichever core dev ends up sponsoring) to understand much of the ins and outs of Trio, so motivations of the form "this is how Trio does it" are not helpful for the deciders (especially since this appears largely about cancellation, and that's an area where asyncio and Trio differ fundamentally).

You're also all welcome to keep arguing like you have been, but there has to be some kind of consensus. Currently the discussion is hard to follow unless you have read the entire issue, which is unreasonable at this point. |
Yury balked at stop() in the PR, so I changed it back to cancel(). Though I suggested starting a PEP when asking for the issue to be reopened, I wasn't imagining this level of controversy 😓 |
But he was just asking why — not telling you to change it. You could maybe have answered "because this design doesn't raise CancelledError out of the `async with` block". |
I agree with Guido, fwiw this is the reason I don't participate much in these discussions because to me it feels more about making asyncio behave like trio or anyio etc and I don't understand much of those projects. Both projects are fundamentally different in many regards for that to happen so IMO it would be better to propose concrete semantics in terms of how things happen in asyncio rather than any other such projects. |
It seems there are at least two dimensions of controversy, and it also seems we're going back and forth on at least one?
Maybe you can try to address these bullets separately and drive consensus that way (instead of by drafting a PEP)? You'd have to go to battle a bit with the faction (or one person?) who want the Finally: If I'm the only person here who believes that if the |
The canonical example is Happy Eyeballs-style concurrency, i.e. start a few tasks that all try to produce a result, the first one that succeeds wins, all the others get cancelled, you return the result of the winning task. This is easy enough with a taskgroup that doesn't raise an exception when getting cancelled. If you don't have that, you need a wrapper to catch the exception. That wrapper needs to distinguish "the taskgroup has been (self-)cancelled" from "the cancellation was triggered externally and thus must propagate". That's boilerplate which people can and will get wrong. (Consider what happens when, due to scheduling quirks, these coincide …) Another example: a network connection. When you decide to disconnect, you self-cancel the whole connection handling taskgroup (sometimes these are quite complex: send task, receive task, background ping task …) and you're done. Why would you propagate that cancellation? Worse, if the network connection was to retrieve a result from the other side, again you need that wrapper. I'd ask the other way 'round: what's the usecase of propagating that cancellation? Because frankly I don't see one, esp. given that you can do it that way simply by cancelling the current task instead. |
Feature or enhancement

Proposal:

Trio/anyio have a way to cancel all tasks within a taskgroup, via `taskgroup.cancel_scope.cancel()`. Asyncio doesn't have cancel scopes. The way to currently do this is to cancel the task which the taskgroup is a child of, but there are problems with this approach:

To fix this, I propose to add a `taskgroup.cancel` method which behaves like `task.cancel` if the taskgroup is still active (i.e. not aborting). If this change is likely to be accepted I'm OK with writing a PR.
Has this already been discussed elsewhere?
I have already discussed this feature proposal on Discourse
Links to previous discussion of this feature:
https://discuss.python.org/t/how-to-cancel-all-tasks-created-within-same-taskgroup/30215
Linked PRs

- `asyncio.TaskGroup` #123837