diff --git a/CHANGES/710.bugfix b/CHANGES/710.bugfix new file mode 100644 index 00000000..4065cc5b --- /dev/null +++ b/CHANGES/710.bugfix @@ -0,0 +1 @@ +Fix asyncio.CancelledError handling in Sender._fail_all() diff --git a/aiokafka/producer/sender.py b/aiokafka/producer/sender.py index 4e36ca4b..f3355b41 100644 --- a/aiokafka/producer/sender.py +++ b/aiokafka/producer/sender.py @@ -59,10 +59,14 @@ def _fail_all(self, task): """ Called when sender fails. Will fail all pending batches, as they will never be delivered as well as fail transaction """ - if task.exception() is not None: - self._message_accumulator.fail_all(task.exception()) + if task.cancelled(): + return + task_exception = task.exception() + + if task_exception is not None: + self._message_accumulator.fail_all(task_exception) if self._txn_manager is not None: - self._txn_manager.fatal_error(task.exception()) + self._txn_manager.fatal_error(task_exception) @property def sender_task(self):