From e3701a61e44f6e82921da51f772c3ebec014402d Mon Sep 17 00:00:00 2001 From: Vladimir Kamarzin Date: Wed, 18 Aug 2021 16:40:14 +0500 Subject: [PATCH] Fix asyncio.CancelledError handling in Sender._fail_all() (#711) * Fix asyncio.CancelledError handling in Sender._fail_all() Closes: #710 * Fix long line * Refactor task cancel handling Co-authored-by: Denis Otkidach Co-authored-by: Denis Otkidach --- CHANGES/710.bugfix | 1 + aiokafka/producer/sender.py | 10 +++++++--- 2 files changed, 8 insertions(+), 3 deletions(-) create mode 100644 CHANGES/710.bugfix diff --git a/CHANGES/710.bugfix b/CHANGES/710.bugfix new file mode 100644 index 00000000..4065cc5b --- /dev/null +++ b/CHANGES/710.bugfix @@ -0,0 +1 @@ +Fix asyncio.CancelledError handling in Sender._fail_all() diff --git a/aiokafka/producer/sender.py b/aiokafka/producer/sender.py index 4e36ca4b..f3355b41 100644 --- a/aiokafka/producer/sender.py +++ b/aiokafka/producer/sender.py @@ -59,10 +59,14 @@ def _fail_all(self, task): """ Called when sender fails. Will fail all pending batches, as they will never be delivered as well as fail transaction """ - if task.exception() is not None: - self._message_accumulator.fail_all(task.exception()) + if task.cancelled(): + return + task_exception = task.exception() + + if task_exception is not None: + self._message_accumulator.fail_all(task_exception) if self._txn_manager is not None: - self._txn_manager.fatal_error(task.exception()) + self._txn_manager.fatal_error(task_exception) @property def sender_task(self):