Skip to content

Commit

Permalink
GH-96764: rewrite asyncio.wait_for to use asyncio.timeout (#98518)
Browse files Browse the repository at this point in the history
Changes `asyncio.wait_for` to use `asyncio.timeout` as its underlying implementation.
  • Loading branch information
kumaraditya303 authored Feb 16, 2023
1 parent 226484e commit a5024a2
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 79 deletions.
77 changes: 29 additions & 48 deletions Lib/asyncio/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from . import events
from . import exceptions
from . import futures
from . import timeouts
from .coroutines import _is_coroutine

# Helper to generate new task names
Expand Down Expand Up @@ -437,65 +438,44 @@ async def wait_for(fut, timeout):
If the wait is cancelled, the task is also cancelled.
If the task supresses the cancellation and returns a value instead,
that value is returned.
This function is a coroutine.
"""
loop = events.get_running_loop()
# The special case for timeout <= 0 is for the following case:
#
# async def test_waitfor():
# func_started = False
#
# async def func():
# nonlocal func_started
# func_started = True
#
# try:
# await asyncio.wait_for(func(), 0)
# except asyncio.TimeoutError:
# assert not func_started
# else:
# assert False
#
# asyncio.run(test_waitfor())

if timeout is None:
return await fut

if timeout <= 0:
fut = ensure_future(fut, loop=loop)
if timeout is not None and timeout <= 0:
fut = ensure_future(fut)

if fut.done():
return fut.result()

await _cancel_and_wait(fut, loop=loop)
await _cancel_and_wait(fut)
try:
return fut.result()
except exceptions.CancelledError as exc:
raise exceptions.TimeoutError() from exc

waiter = loop.create_future()
timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
cb = functools.partial(_release_waiter, waiter)

fut = ensure_future(fut, loop=loop)
fut.add_done_callback(cb)

try:
# wait until the future completes or the timeout
try:
await waiter
except exceptions.CancelledError:
if fut.done():
return fut.result()
else:
fut.remove_done_callback(cb)
# We must ensure that the task is not running
# after wait_for() returns.
# See https://bugs.python.org/issue32751
await _cancel_and_wait(fut, loop=loop)
raise

if fut.done():
return fut.result()
else:
fut.remove_done_callback(cb)
# We must ensure that the task is not running
# after wait_for() returns.
# See https://bugs.python.org/issue32751
await _cancel_and_wait(fut, loop=loop)
# In case task cancellation failed with some
# exception, we should re-raise it
# See https://bugs.python.org/issue40607
try:
return fut.result()
except exceptions.CancelledError as exc:
raise exceptions.TimeoutError() from exc
finally:
timeout_handle.cancel()
raise TimeoutError from exc

async with timeouts.timeout(timeout):
return await fut

async def _wait(fs, timeout, return_when, loop):
"""Internal helper for wait().
Expand Down Expand Up @@ -541,9 +521,10 @@ def _on_completion(f):
return done, pending


async def _cancel_and_wait(fut, loop):
async def _cancel_and_wait(fut):
"""Cancel the *fut* future or task and wait until it completes."""

loop = events.get_running_loop()
waiter = loop.create_future()
cb = functools.partial(_release_waiter, waiter)
fut.add_done_callback(cb)
Expand Down
7 changes: 3 additions & 4 deletions Lib/test/test_asyncio/test_futures2.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,9 @@ async def test_recursive_repr_for_pending_tasks(self):
async def func():
return asyncio.all_tasks()

# The repr() call should not raise RecursiveError at first.
# The check for returned string is not very reliable but
# exact comparison for the whole string is even weaker.
self.assertIn('...', repr(await asyncio.wait_for(func(), timeout=10)))
# The repr() call should not raise RecursionError at first.
waiter = await asyncio.wait_for(asyncio.Task(func()),timeout=10)
self.assertIn('...', repr(waiter))


if __name__ == '__main__':
Expand Down
127 changes: 100 additions & 27 deletions Lib/test/test_asyncio/test_waitfor.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,33 +237,6 @@ async def inner():
with self.assertRaises(FooException):
await foo()

async def test_wait_for_self_cancellation(self):
async def inner():
try:
await asyncio.sleep(0.3)
except asyncio.CancelledError:
try:
await asyncio.sleep(0.3)
except asyncio.CancelledError:
await asyncio.sleep(0.3)

return 42

inner_task = asyncio.create_task(inner())

wait = asyncio.wait_for(inner_task, timeout=0.1)

# Test that wait_for itself is properly cancellable
# even when the initial task holds up the initial cancellation.
task = asyncio.create_task(wait)
await asyncio.sleep(0.2)
task.cancel()

with self.assertRaises(asyncio.CancelledError):
await task

self.assertEqual(await inner_task, 42)

async def _test_cancel_wait_for(self, timeout):
loop = asyncio.get_running_loop()

Expand All @@ -289,6 +262,106 @@ async def test_cancel_blocking_wait_for(self):
async def test_cancel_wait_for(self):
await self._test_cancel_wait_for(60.0)

async def test_wait_for_cancel_suppressed(self):
# GH-86296: Supressing CancelledError is discouraged
# but if a task subpresses CancelledError and returns a value,
# `wait_for` should return the value instead of raising CancelledError.
# This is the same behavior as `asyncio.timeout`.

async def return_42():
try:
await asyncio.sleep(10)
except asyncio.CancelledError:
return 42

res = await asyncio.wait_for(return_42(), timeout=0.1)
self.assertEqual(res, 42)


async def test_wait_for_issue86296(self):
# GH-86296: The task should get cancelled and not run to completion.
# inner completes in one cycle of the event loop so it
# completes before the task is cancelled.

async def inner():
return 'done'

inner_task = asyncio.create_task(inner())
reached_end = False

async def wait_for_coro():
await asyncio.wait_for(inner_task, timeout=100)
await asyncio.sleep(1)
nonlocal reached_end
reached_end = True

task = asyncio.create_task(wait_for_coro())
self.assertFalse(task.done())
# Run the task
await asyncio.sleep(0)
task.cancel()
with self.assertRaises(asyncio.CancelledError):
await task
self.assertTrue(inner_task.done())
self.assertEqual(await inner_task, 'done')
self.assertFalse(reached_end)


class WaitForShieldTests(unittest.IsolatedAsyncioTestCase):

async def test_zero_timeout(self):
# `asyncio.shield` creates a new task which wraps the passed in
# awaitable and shields it from cancellation so with timeout=0
# the task returned by `asyncio.shield` aka shielded_task gets
# cancelled immediately and the task wrapped by it is scheduled
# to run.

async def coro():
await asyncio.sleep(0.01)
return 'done'

task = asyncio.create_task(coro())
with self.assertRaises(asyncio.TimeoutError):
shielded_task = asyncio.shield(task)
await asyncio.wait_for(shielded_task, timeout=0)

# Task is running in background
self.assertFalse(task.done())
self.assertFalse(task.cancelled())
self.assertTrue(shielded_task.cancelled())

# Wait for the task to complete
await asyncio.sleep(0.1)
self.assertTrue(task.done())


async def test_none_timeout(self):
# With timeout=None the timeout is disabled so it
# runs till completion.
async def coro():
await asyncio.sleep(0.1)
return 'done'

task = asyncio.create_task(coro())
await asyncio.wait_for(asyncio.shield(task), timeout=None)

self.assertTrue(task.done())
self.assertEqual(await task, "done")

async def test_shielded_timeout(self):
# shield prevents the task from being cancelled.
async def coro():
await asyncio.sleep(0.1)
return 'done'

task = asyncio.create_task(coro())
with self.assertRaises(asyncio.TimeoutError):
await asyncio.wait_for(asyncio.shield(task), timeout=0.01)

self.assertFalse(task.done())
self.assertFalse(task.cancelled())
self.assertEqual(await task, "done")


if __name__ == '__main__':
unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
:func:`asyncio.wait_for` now uses :func:`asyncio.timeout` as its underlying implementation. Patch by Kumar Aditya.

0 comments on commit a5024a2

Please sign in to comment.