Skip to content

Commit

Permalink
Merge pull request #156 from OFFIS-DAI/scheduling-error-handling-canc…
Browse files Browse the repository at this point in the history
…elling

Scheduling error handling cancelling
  • Loading branch information
rcschrg authored Jan 27, 2025
2 parents 3695e9f + 44aad87 commit a7a7e08
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 8 deletions.
4 changes: 0 additions & 4 deletions mango/agent/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -487,10 +487,6 @@ async def shutdown(self):
await self._check_inbox_task
except asyncio.CancelledError:
pass
try:
await self.scheduler.stop()
except asyncio.CancelledError:
pass
try:
await self.scheduler.shutdown()
except asyncio.CancelledError:
Expand Down
21 changes: 17 additions & 4 deletions mango/util/scheduling.py
Original file line number Diff line number Diff line change
Expand Up @@ -894,13 +894,21 @@ def _remove_generic_task(self, target_list, fut=asyncio.Future):
del target_list[i]
break

async def stop_tasks(self, task_list):
for i in range(len(task_list) - 1, -1, -1):
_, task, _, _ = task_list[i]
task.cancel()
try:
await task
except asyncio.CancelledError:
pass

async def stop(self):
"""
Cancel all not finished scheduled tasks
"""
for _, task, _, _ in self._scheduled_tasks + self._scheduled_process_tasks:
task.cancel()
await task
await self.stop_tasks(self._scheduled_tasks)
await self.stop_tasks(self._scheduled_process_tasks)

async def tasks_complete(self, timeout=1, recursive=False):
"""Finish all pending tasks using a timeout.
Expand Down Expand Up @@ -945,8 +953,13 @@ async def shutdown(self):
# resume all process so they can get shutdown
for _, _, scheduled_process_control, _ in self._scheduled_process_tasks:
scheduled_process_control.kill_process()
if len(self._scheduled_tasks) > 0:
logger.debug(
"There are still scheduled tasks running on shutdown %s",
self._scheduled_tasks,
)
await self.stop()
for task, _, _, _ in self._scheduled_tasks:
task.close()
await self.stop()
if self._process_pool_exec is not None:
self._process_pool_exec.shutdown()
21 changes: 21 additions & 0 deletions tests/unit_tests/core/test_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,24 @@ async def run_this(c):
assert agent2.test_counter == 1

asyncio.run(run_this(c))


async def do_weird_stuff():
fut = asyncio.Future()
await fut


@pytest.mark.asyncio
async def test_agent_with_deadlock_task():
# GIVEN
c = create_tcp_container(addr=("127.0.0.1", 5555))
agent = c.register(MyAgent())

async with activate(c) as c:
t = agent.schedule_instant_task(do_weird_stuff())
t = agent.schedule_instant_task(do_weird_stuff())
t = agent.schedule_instant_task(do_weird_stuff())
t = agent.schedule_instant_task(do_weird_stuff())

# THEN
assert len(agent.scheduler._scheduled_tasks) == 0

0 comments on commit a7a7e08

Please sign in to comment.