diff --git a/aries_cloudagent/messaging/task_queue.py b/aries_cloudagent/messaging/task_queue.py index f61fef4b66..4a513b182e 100644 --- a/aries_cloudagent/messaging/task_queue.py +++ b/aries_cloudagent/messaging/task_queue.py @@ -9,6 +9,8 @@ def task_exc_info(task: asyncio.Task): """Extract exception info from an asyncio task.""" + if not task or not task.done(): + return try: exc_val = task.exception() except asyncio.CancelledError: @@ -44,9 +46,9 @@ def __init__(self, max_active: int = 0): self.total_done = 0 self.total_failed = 0 self._cancelled = False + self._drain_evt = asyncio.Event() self._drain_task: asyncio.Task = None self._max_active = max_active - self._updated_evt = asyncio.Event() @property def cancelled(self) -> bool: @@ -88,23 +90,27 @@ def __len__(self) -> int: def drain(self) -> asyncio.Task: """Start the process to run queued tasks.""" - if self._drain_task: - self._updated_evt.set() + if self._drain_task and not self._drain_task.done(): + self._drain_evt.set() elif self.pending_tasks: self._drain_task = self.loop.create_task(self._drain_loop()) - self._drain_task.add_done_callback(lambda task: self._drain_done()) + self._drain_task.add_done_callback(lambda task: self._drain_done(task)) return self._drain_task - def _drain_done(self): + def _drain_done(self, task: asyncio.Task): """Handle completion of the drain process.""" - self._drain_task = None + exc_info = task_exc_info(task) + if exc_info: + LOGGER.exception("Error draining task queue:", exc_info=exc_info) + if self._drain_task and self._drain_task.done(): + self._drain_task = None async def _drain_loop(self): """Run pending tasks while there is room in the queue.""" # Note: this method should not call async methods apart from # waiting for the updated event, to avoid yielding to other queue methods while True: - self._updated_evt.clear() + self._drain_evt.clear() while self.pending_tasks and ( not self._max_active or len(self.active_tasks) < self._max_active ): @@ -113,7 +119,7 @@ async def _drain_loop(self): if fut and not fut.done(): fut.set_result(task) if self.pending_tasks: - await self._updated_evt.wait() + await self._drain_evt.wait() else: break diff --git a/aries_cloudagent/transport/outbound/manager.py b/aries_cloudagent/transport/outbound/manager.py index b27775810b..cb8aed6d72 100644 --- a/aries_cloudagent/transport/outbound/manager.py +++ b/aries_cloudagent/transport/outbound/manager.py @@ -11,7 +11,7 @@ from ...classloader import ClassLoader, ModuleLoadError, ClassNotFoundError from ...connections.models.connection_target import ConnectionTarget from ...config.injection_context import InjectionContext -from ...messaging.task_queue import CompletedTask, TaskQueue +from ...messaging.task_queue import CompletedTask, TaskQueue, task_exc_info from ...stats import Collector from ..wire_format import BaseWireFormat @@ -283,16 +283,22 @@ def process_queued(self) -> asyncio.Task: Returns: the current queue processing task or None """ - if self._process_task: + if self._process_task and not self._process_task.done(): self.outbound_event.set() elif self.outbound_new or self.outbound_buffer: self._process_task = self.loop.create_task(self._process_loop()) - self._process_task.add_done_callback(lambda task: self._process_done()) + self._process_task.add_done_callback(lambda task: self._process_done(task)) return self._process_task - def _process_done(self): + def _process_done(self, task: asyncio.Task): """Handle completion of the drain process.""" - self._process_task = None + exc_info = task_exc_info(task) + if exc_info: + LOGGER.exception( + "Exception in outbound queue processing:", exc_info=exc_info + ) + if self._process_task and self._process_task.done(): + self._process_task = None async def _process_loop(self): """Continually kick off encoding and delivery on outbound messages.""" @@ -308,12 +314,13 @@ async def _process_loop(self): if queued.state == QueuedOutboundMessage.STATE_DONE: if queued.error: LOGGER.exception( - "Outbound message could not be delivered", + "Outbound message could not be delivered to %s", + queued.endpoint, exc_info=queued.error, ) - if self.handle_not_delivered: - self.handle_not_delivered(queued.context, queued.message) - continue + if self.handle_not_delivered: + self.handle_not_delivered(queued.context, queued.message) + continue # remove from buffer deliver = False @@ -330,24 +337,28 @@ async def _process_loop(self): upd_buffer.append(queued) - new_messages = self.outbound_new.copy() + new_pending = 0 + new_messages = self.outbound_new self.outbound_new = [] - for queued in new_messages: + for queued in new_messages: if queued.state == QueuedOutboundMessage.STATE_NEW: if queued.message and queued.message.enc_payload: queued.payload = queued.message.enc_payload - queued.state = QueuedOutboundMessage.STATE_DELIVER - self.deliver_queued_message(queued) + queued.state = QueuedOutboundMessage.STATE_PENDING + new_pending += 1 else: queued.state = QueuedOutboundMessage.STATE_ENCODE self.encode_queued_message(queued) + else: + new_pending += 1 upd_buffer.append(queued) self.outbound_buffer = upd_buffer if self.outbound_buffer: - await self.outbound_event.wait() + if not new_pending: + await self.outbound_event.wait() else: break @@ -396,6 +407,10 @@ def finished_deliver(self, queued: QueuedOutboundMessage, completed: CompletedTa """Handle completion of queued message delivery.""" if completed.exc_info: queued.error = completed.exc_info + LOGGER.exception( + "Outbound message could not be delivered", exc_info=queued.error, + ) + if queued.retries: queued.retries -= 1 queued.state = QueuedOutboundMessage.STATE_RETRY