terminating a simulation with a task which was not awaited has a bad stacktrace #62
Hello @maurerle, that's really interesting. I think it really is a rare edge case, but definitely not pretty. It is caused by the suspendable feature of scheduler tasks. You can use Scheduler(suspendable=False), or even Agent(suspendable_tasks=False), to deactivate it if you do not need your tasks to be suspendable. Would this be sufficient?
Good find, thanks.

```python
from mango import RoleAgent, create_container, Role
import asyncio


class Caller(Role):
    def setup(self):
        super().setup()
        # schedule a message to ourselves 5 time units in the future
        self.context.schedule_timestamp_task(
            coroutine=self.send_hello_world(self.context.addr, self.context.aid),
            timestamp=self.context.current_timestamp + 5,
        )

    async def send_hello_world(self, receiver_addr, receiver_id):
        await self.send_acl_message(
            receiver_addr=receiver_addr, receiver_id=receiver_id, content="Hello World"
        )


async def main():
    mr = Caller()
    container = await create_container(addr=("0.0.0.0", 9099), connection_type="tcp")
    ag = RoleAgent(container, suspendable_tasks=False)
    ag.add_role(mr)
    # main() returns here while the scheduled task is still pending


if __name__ == "__main__":
    try:
        asyncio.run(main())
    # exception handler does not help
    except asyncio.CancelledError:
        pass
```
Does this only occur when using RoleAgents?
What happens: the cancellation occurs because the asyncio loop has to be closed, so all open tasks are cancelled. In your example, the task is never awaited; right after you create the task, your coroutine main() finishes, so the task is never executed. From what I understand, this is intended in the example, right? (If not, you might want to await shutdown() of the agents and containers and/or task_complete().) And you want to catch the exception in case it happens for other reasons, and the example is simplified here. The whole problem here is asyncio's implementation:

```python
# excerpt from CPython's asyncio/runners.py
def _cancel_all_tasks(loop):
    to_cancel = tasks.all_tasks(loop)
    if not to_cancel:
        return

    for task in to_cancel:
        task.cancel()

    loop.run_until_complete(tasks.gather(*to_cancel, return_exceptions=True))

    for task in to_cancel:
        if task.cancelled():
            continue
        if task.exception() is not None:
            loop.call_exception_handler({
                'message': 'unhandled exception during asyncio.run() shutdown',
                'exception': task.exception(),
                'task': task,
            })
```

What I understand from this is that the exception is never raised to be caught from outside, but is handled purely within asyncio using call_exception_handler. So it should not be possible to catch the exception when using asyncio.run(). By the way, it also happens with plain Agents:

```python
from mango import Agent, create_container
import asyncio


class Caller(Agent):
    def __init__(
        self,
        container,
        suggested_aid: str = None,
        suspendable_tasks=True,
        observable_tasks=True,
    ):
        super().__init__(container, suggested_aid, suspendable_tasks, observable_tasks)
        # schedule a message to ourselves 5 time units in the future
        self.schedule_timestamp_task(
            coroutine=self.send_hello_world(self.addr, self.aid),
            timestamp=self.current_timestamp + 5,
        )

    async def send_hello_world(self, receiver_addr, receiver_id):
        await self.send_acl_message(
            receiver_addr=receiver_addr, receiver_id=receiver_id, content="Hello World"
        )


async def main():
    container = await create_container(addr=("0.0.0.0", 9099), connection_type="tcp")
    Caller(container, suspendable_tasks=False)


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except asyncio.exceptions.CancelledError:
        print("Catch it")
```

Maybe you can provide more information on the specific case where the error occurs in your simulation?
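The shutdown path quoted above can be observed without mango. The following sketch uses only the standard library; `quiet_job`, `fragile_job` and `record_handler` are made-up names for illustration. Both tasks are still pending when `main()` returns: the first is simply cancelled, the second raises a different exception while being cancelled. Neither reaches the `try`/`except` around `asyncio.run()`; the second one is handed to the loop's exception handler instead, which would explain why wrapping `asyncio.run()` does not help.

```python
import asyncio

reported = []  # what asyncio passes to the loop's exception handler
outcome = []   # what the caller of asyncio.run() observes


def record_handler(loop, context):
    # replaces the default handler, which would log the traceback
    reported.append((context["message"], type(context.get("exception")).__name__))


async def quiet_job():
    await asyncio.sleep(3600)  # still pending when main() returns


async def fragile_job():
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        # cleanup code that fails while the task is cancelled at shutdown
        raise RuntimeError("cleanup failed")


async def main():
    asyncio.get_running_loop().set_exception_handler(record_handler)
    asyncio.create_task(quiet_job())
    asyncio.create_task(fragile_job())
    await asyncio.sleep(0)  # let both tasks start, then return without awaiting them


try:
    asyncio.run(main())
    outcome.append("returned normally")
except (asyncio.CancelledError, RuntimeError):
    outcome.append("caught")  # not reached

print(outcome)   # ['returned normally']
print(reported)  # [('unhandled exception during asyncio.run() shutdown', 'RuntimeError')]
```

The plainly cancelled task is skipped silently by `task.cancelled()`; only tasks that end with a non-cancellation exception are reported.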
Hi @rcschrg, thanks for analyzing this. Background: I have a market which schedules hourly openings, and it runs into this error if the next opening is scheduled after the end time of the simulation. I now have a minimal example which fails in a weird way, even though:

```python
from mango import Agent, create_container
from mango.util.clock import ExternalClock
import asyncio
from datetime import datetime, timedelta
import calendar
from tqdm import tqdm


class Caller(Agent):
    def __init__(
        self,
        container,
        schedule_time: datetime,
        suggested_aid: str = None,
        suspendable_tasks=True,
        observable_tasks=True,
    ):
        super().__init__(container, suggested_aid, suspendable_tasks, observable_tasks)
        self.schedule_timestamp_task(
            coroutine=self.send_hello_world(self.addr, self.aid),
            timestamp=schedule_time.timestamp(),
        )

    async def send_hello_world(self, receiver_addr, receiver_id):
        await self.send_acl_message(
            receiver_addr=receiver_addr,
            receiver_id=receiver_id,
            content=f"Hello World {self.current_timestamp}",
        )

    def handle_message(self, content, meta):
        print("got", content)


class World:
    def __init__(self, start: datetime, end: datetime):
        self.clock = ExternalClock(0)
        self.start = start
        self.end = end
        self.loop = asyncio.get_event_loop()
        asyncio.set_event_loop(self.loop)

    async def setup(self):
        self.container = await create_container(
            connection_type="tcp",
            addr=("localhost", 9099),
            clock=self.clock,
        )
        # the caller wants to send a message after the end of the simulation
        self.caller = Caller(
            self.container,
            suspendable_tasks=True,
            schedule_time=self.end + timedelta(days=14),
        )

    async def async_run(self, start_ts, end_ts):
        pbar = tqdm(total=end_ts - start_ts)
        # allow registration before first opening
        self.clock.set_time(start_ts - 1)
        while self.clock.time < end_ts:
            await asyncio.sleep(0)
            self.clock.set_time(self.clock.time + 600)
            # normal agent stuff happening on schedules
            pbar.update(600)
            pbar.set_description(
                repr(datetime.utcfromtimestamp(self.clock.time)),
                refresh=False,
            )
            await asyncio.sleep(0.01)
        pbar.close()
        # # the above loop is only for better understanding of what happens.
        # # same can be achieved with the below line, which schedules in the future
        # self.caller.schedule_timestamp_task(
        #     coroutine=self.caller.send_hello_world(self.caller.addr, self.caller.aid),
        #     timestamp=self.caller.current_timestamp + 100,
        # )
        await asyncio.sleep(0.1)
        await self.container.shutdown()

    def run(self):
        start_ts = calendar.timegm(self.start.utctimetuple())
        end_ts = calendar.timegm(self.end.utctimetuple())
        try:
            return self.loop.run_until_complete(
                self.async_run(start_ts=start_ts, end_ts=end_ts)
            )
        except KeyboardInterrupt:
            pass


def test_func():
    start = datetime(2019, 1, 1)
    end = datetime(2019, 1, 1, 1)
    w = World(start, end)
    try:
        w.loop.run_until_complete(w.setup())
        w.run()
    except asyncio.exceptions.CancelledError:
        print("Catch it")


if __name__ == "__main__":
    test_func()
    # # direct execution works fine:
    # start = datetime(2019, 1, 1)
    # end = datetime(2019, 1, 1, 1)
    # w = World(start, end)
    # try:
    #     w.loop.run_until_complete(w.setup())
    #     w.run()
    # except asyncio.exceptions.CancelledError:
    #     print("Catch it")
```

results in:
It is very weird that the function
When running with
But this is still not good, and it can produce a lot of lines if many tasks are aborted.
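When many scheduled tasks are still pending at the end of a run, one way to keep the output short might be to cancel and await them yourself before the coroutine given to the loop returns, so that asyncio's shutdown code finds nothing left to cancel. A minimal standard-library sketch; `market_opening` and `cancel_and_wait` are hypothetical helpers, not part of mango:

```python
import asyncio


async def market_opening(name, log):
    # stand-in for an opening scheduled after the simulation end
    try:
        await asyncio.sleep(3600)
        log.append(f"{name} opened")
    except asyncio.CancelledError:
        log.append(f"{name} cancelled")
        raise  # re-raise so the task is marked as cancelled


async def cancel_and_wait(tasks):
    """Cancel the given tasks and wait until all of them have finished."""
    for task in tasks:
        task.cancel()
    results = await asyncio.gather(*tasks, return_exceptions=True)
    # gather() returns a CancelledError instance for every cancelled task
    return sum(isinstance(r, asyncio.CancelledError) for r in results)


async def main():
    log = []
    tasks = [asyncio.create_task(market_opening(f"market{i}", log)) for i in range(3)]
    await asyncio.sleep(0)  # let every task reach its sleep
    cancelled = await cancel_and_wait(tasks)
    print(f"{cancelled} pending task(s) cancelled before shutdown")
    return cancelled, log


cancelled, log = asyncio.run(main())
```

This replaces one traceback per aborted task with a single summary line; whether it fits mango's scheduler depends on how its tasks are tracked.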
Or an even smaller example, derived from yours:

```python
import asyncio
import warnings

from mango import Agent, create_container
from mango.util.clock import ExternalClock


class Caller(Agent):
    def __init__(
        self,
        container,
        suggested_aid: str = None,
        suspendable_tasks=True,
        observable_tasks=True,
    ):
        super().__init__(container, suggested_aid, suspendable_tasks, observable_tasks)
        self.schedule_timestamp_task(
            coroutine=self.send_hello_world(self.addr, self.aid),
            timestamp=self.current_timestamp + 5,
        )

    async def send_hello_world(self, receiver_addr, receiver_id):
        await self.send_acl_message(
            receiver_addr=receiver_addr, receiver_id=receiver_id, content="Hello World"
        )


async def main():
    # the external clock stays at 0, so the task scheduled at t=5 never becomes due
    container = await create_container(
        addr=("0.0.0.0", 9099), connection_type="tcp", clock=ExternalClock(0)
    )
    Caller(container, suspendable_tasks=False)
    await asyncio.sleep(0)
    await container.shutdown()


def sync_main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())


if __name__ == "__main__":
    warnings.filterwarnings("ignore")
    # warnings.filterwarnings("ignore", "coroutine.*?was never awaited.*")
    sync_main()
```

To clarify: I am calling the script directly from a terminal.
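To illustrate why the scheduled coroutine never starts here, the following sketch uses a simplified, hypothetical clock (`ManualClock` is not mango's `ExternalClock`, whose internals may differ) whose waiters are only released by `set_time()`. A task scheduled beyond the last time the clock is set stays pending, so it is still around when the loop shuts down:

```python
import asyncio


class ManualClock:
    """Hypothetical, simplified externally driven clock (not mango's ExternalClock)."""

    def __init__(self, start=0):
        self.time = start
        self._waiters = []  # (timestamp, future) pairs that are not yet due

    def set_time(self, t):
        # advance the clock and release every waiter whose timestamp is reached
        self.time = t
        remaining = []
        for ts, fut in self._waiters:
            if ts <= t:
                if not fut.done():
                    fut.set_result(None)
            else:
                remaining.append((ts, fut))
        self._waiters = remaining

    def sleep_until(self, ts):
        fut = asyncio.get_running_loop().create_future()
        if ts <= self.time:
            fut.set_result(None)
        else:
            self._waiters.append((ts, fut))
        return fut


async def scheduled(clock, ts, fired):
    await clock.sleep_until(ts)
    fired.append(ts)


async def simulate(end_ts, schedule_ts):
    clock = ManualClock(0)
    fired = []
    task = asyncio.create_task(scheduled(clock, schedule_ts, fired))
    while clock.time < end_ts:
        clock.set_time(clock.time + 600)
        await asyncio.sleep(0)  # give released tasks a chance to run
    return fired, task.done()


inside = asyncio.run(simulate(end_ts=3600, schedule_ts=1200))
beyond = asyncio.run(simulate(end_ts=3600, schedule_ts=3600 + 14 * 86400))
print(inside)  # ([1200], True)
print(beyond)  # ([], False)
```

In the second run the task is still waiting when `simulate()` returns, and `asyncio.run()` cancels it during shutdown.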
I see. I think I understand the core problem that might cause this in real scenarios. OK, I see three options:

```python
import asyncio
import warnings

with warnings.catch_warnings():
    warnings.filterwarnings("ignore", "coroutine.*?was never awaited.*")
    asyncio.run(...)
```

My opinion:
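One detail that may be relevant to the filter approach: the "coroutine ... was never awaited" RuntimeWarning is emitted when the unawaited coroutine object is destroyed, not when it is created. A warnings filter therefore only applies if it is still active at that moment, which could be one reason why the warning can still appear. A small standard-library sketch, where `send_hello_world` is a plain stand-in function:

```python
import gc
import warnings


async def send_hello_world():
    return "Hello World"


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    coro = send_hello_world()  # coroutine object created, never awaited
    print(len(caught))         # 0: nothing has been reported yet
    del coro                   # destroying the object triggers the warning
    gc.collect()

messages = [str(w.message) for w in caught]
print(messages)
```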
Thanks, I updated my minimal example above so that it also uses ExternalClock, which does not advance, and it still gives:
even with warnings ignored. Yes, I thought about option 2 too, but wasn't too sure whether it would be a valid solution.
This should be fixed when you replace loop.run_until_complete() with asyncio.run(); at least it works for me. Regarding "coroutine 'Caller.send_hello_world' was never awaited", I think I'd prefer option 3 at this point, just because of the API break option 2 would cause. I'm putting it on my schedule for next week.
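Independently of which option is chosen, one generic way to avoid the "was never awaited" warning is for whatever wraps a scheduled coroutine to `close()` it when the wrapper is cancelled before the coroutine was ever started. A minimal standard-library sketch; `run_after` is a hypothetical helper, not mango's scheduler, and not necessarily what option 3 refers to:

```python
import asyncio
import gc
import inspect
import warnings


async def send_hello_world():
    print("Hello World")


async def run_after(delay, coro):
    """Hypothetical helper: await `coro` after `delay` seconds.

    If this wrapper is cancelled first (e.g. by asyncio.run() at shutdown),
    the inner coroutine is closed explicitly instead of being left unawaited.
    """
    try:
        await asyncio.sleep(delay)
    except asyncio.CancelledError:
        coro.close()  # never started, so closing it just marks it as finished
        raise
    return await coro


pending = send_hello_world()


async def main():
    asyncio.create_task(run_after(3600, pending))
    await asyncio.sleep(0)  # let the wrapper start sleeping, then return


asyncio.run(main())
state = inspect.getcoroutinestate(pending)
print(state)  # CORO_CLOSED

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    del pending  # a closed coroutine is destroyed without a RuntimeWarning
    gc.collect()
unawaited = any("never awaited" in str(w.message) for w in caught)
print(unawaited)  # False
```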
If you have this example with pytest:
and run it with

```shell
pytest tests/unit_tests/util/scheduling_test.py
```

it responds with a lot of warnings:
I have a similar issue when stopping a mango simulation. Maybe we can improve the error handling here. I tried catching this in mango, but did not succeed. Maybe you can have a look here?