You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
When doing a clean shutdown of an app using asyncio.run(my_app()), aiofakfa is sometimes failing to handle asyncio.exceptions.CancelledError. Here is the example:
^C2021-01-19 01:04:31,629 - asyncio(_cancel_all_tasks:61) - ERROR: Exception in callback <bound method Sender._fail_all of <aiokafka.producer.sender.Sender object at 0x7f0ec6542b20>>
handle: <Handle Sender._fail_all>
Traceback (most recent call last):
File "/home/vvk/miniconda3/envs/fetcher/lib/python3.8/asyncio/runners.py", line 43, in run
return loop.run_until_complete(main)
File "uvloop/loop.pyx", line 1450, in uvloop.loop.Loop.run_until_complete
File "uvloop/loop.pyx", line 1443, in uvloop.loop.Loop.run_until_complete
File "uvloop/loop.pyx", line 1351, in uvloop.loop.Loop.run_forever
File "uvloop/loop.pyx", line 519, in uvloop.loop.Loop._run
File "uvloop/loop.pyx", line 436, in uvloop.loop.Loop._on_idle
File "uvloop/cbhandles.pyx", line 90, in uvloop.loop.Handle._run
File "uvloop/cbhandles.pyx", line 70, in uvloop.loop.Handle._run
File "./bin/order_book_fetcher.py", line 87, in fetch_ws_messages
await fetcher.log_messages()
File "/home/vvk/devel/hummingbot-team/exchange-data-fetcher/fetcher/ws_fetcher.py", line 123, in log_messages
await producer.send(topic_name, item, timestamp_ms=timestamp)
File "/home/vvk/miniconda3/envs/fetcher/lib/python3.8/site-packages/aiokafka/producer/producer.py", line 439, in send
partition = self._partition(topic, partition, key, value,
File "/home/vvk/miniconda3/envs/fetcher/lib/python3.8/site-packages/aiokafka/producer/producer.py", line 366, in _partition
return self._partitioner(
File "/home/vvk/miniconda3/envs/fetcher/lib/python3.8/site-packages/kafka/partitioner/default.py", line 15, in __call__
@classmethod
KeyboardInterrupt
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "uvloop/cbhandles.pyx", line 70, in uvloop.loop.Handle._run
File "/home/vvk/miniconda3/envs/fetcher/lib/python3.8/site-packages/aiokafka/producer/sender.py", line 62, in _fail_all
if task.exception() is not None:
asyncio.exceptions.CancelledError
2021-01-19 01:04:40,965 - asyncio(__del__:282) - ERROR: Unclosed AIOKafkaProducer
producer: <aiokafka.producer.producer.AIOKafkaProducer object at 0x7f0ec65429d0>
After looking into aiokafka code I found that logic in aiokafka.producer.Sender._fail_all() does not account for raised exceptions here:
def_fail_all(self, task):
""" Called when sender fails. Will fail all pending batches, as they will never be delivered as well as fail transaction """iftask.exception() isnotNone: # <---- hereself._message_accumulator.fail_all(task.exception())
ifself._txn_managerisnotNone:
self._txn_manager.fatal_error(task.exception())
Describe the bug
When doing a clean shutdown of an app using
asyncio.run(my_app())
, aiofakfa is sometimes failing to handleasyncio.exceptions.CancelledError
. Here is the example:After looking into aiokafka code I found that logic in
aiokafka.producer.Sender._fail_all()
does not account for raised exceptions here:According to https://docs.python.org/3.9/library/asyncio-task.html#asyncio.Task.exception
exception()
method raisesCancelledError
orInvalidStateError
.So basically to fix the error,
CancelledError
exception handling should be added into_fail_all()
.The text was updated successfully, but these errors were encountered: