Skip to content

Commit

Permalink
Merge pull request #144 from axiomiety/executor-double-acquire-fix
Browse files Browse the repository at this point in the history
_base.py: fix double acquire by TrioExecuto, issue #143
  • Loading branch information
oremanj authored Apr 18, 2024
2 parents 410b9be + 9d6103f commit ffbd11c
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 8 deletions.
3 changes: 3 additions & 0 deletions newsfragments/143.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
``TrioExecutor.submit``, called from :meth:`asyncio.loop.run_in_executor`, no longer acquires a token from its `~trio.CapacityLimiter` before calling `~trio.to_thread.run_sync` (which already does its own ``acquire()``).

The previous behaviour led to a double-acquire, leading each worker thread to require two tokens to run instead of one. Tasks could get stuck having acquired the first token but unable to acquire the second as part of `~trio.to_thread.run_sync`, leading to a deadlock.
15 changes: 15 additions & 0 deletions tests/test_trio_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,3 +176,18 @@ async def unshield_later():
]
assert trio.current_time() == 1.5 + (shield * 0.5)
assert scope.cancelled_caught == (not shield)


@pytest.mark.trio
async def test_executor_limiter_deadlock():
def noop():
pass

# capacity of 1 to catch a double-acquire
limiter = trio.CapacityLimiter(1)
executor = trio_asyncio.TrioExecutor(limiter=limiter)
async with trio_asyncio.open_loop() as loop:
with trio.move_on_after(1) as scope:
await trio_asyncio.aio_as_trio(loop.run_in_executor)(executor, noop)

assert not scope.cancelled_caught
10 changes: 2 additions & 8 deletions trio_asyncio/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,8 @@ def __init__(self, limiter=None, thread_name_prefix=None, max_workers=None):
async def submit(self, func, *args):
if not self._running: # pragma: no cover
raise RuntimeError("Executor is down")
lim = self._limiter
if lim is not None:
await lim.acquire()
try:
return await trio.to_thread.run_sync(func, *args, limiter=self._limiter)
finally:
if lim is not None:
lim.release()
# there is no need to call self._limiter.acquire() here, run_sync does it
return await trio.to_thread.run_sync(func, *args, limiter=self._limiter)

def shutdown(self, wait=None):
self._running = False
Expand Down

0 comments on commit ffbd11c

Please sign in to comment.