Skip to content

Commit

Permalink
fix occasional issue delivering the last message in a sequence
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Whitehead <[email protected]>
  • Loading branch information
andrewwhitehead committed Nov 26, 2019
1 parent 5842f7a commit 7877fd5
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 22 deletions.
22 changes: 14 additions & 8 deletions aries_cloudagent/messaging/task_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

def task_exc_info(task: asyncio.Task):
"""Extract exception info from an asyncio task."""
if not task or not task.done():
return
try:
exc_val = task.exception()
except asyncio.CancelledError:
Expand Down Expand Up @@ -44,9 +46,9 @@ def __init__(self, max_active: int = 0):
self.total_done = 0
self.total_failed = 0
self._cancelled = False
self._drain_evt = asyncio.Event()
self._drain_task: asyncio.Task = None
self._max_active = max_active
self._updated_evt = asyncio.Event()

@property
def cancelled(self) -> bool:
Expand Down Expand Up @@ -88,23 +90,27 @@ def __len__(self) -> int:

def drain(self) -> asyncio.Task:
"""Start the process to run queued tasks."""
if self._drain_task:
self._updated_evt.set()
if self._drain_task and not self._drain_task.done():
self._drain_evt.set()
elif self.pending_tasks:
self._drain_task = self.loop.create_task(self._drain_loop())
self._drain_task.add_done_callback(lambda task: self._drain_done())
self._drain_task.add_done_callback(lambda task: self._drain_done(task))
return self._drain_task

def _drain_done(self):
def _drain_done(self, task: asyncio.Task):
"""Handle completion of the drain process."""
self._drain_task = None
exc_info = task_exc_info(task)
if exc_info:
LOGGER.exception("Error draining task queue:", exc_info=exc_info)
if self._drain_task and self._drain_task.done():
self._drain_task = None

async def _drain_loop(self):
"""Run pending tasks while there is room in the queue."""
# Note: this method should not call async methods apart from
# waiting for the updated event, to avoid yielding to other queue methods
while True:
self._updated_evt.clear()
self._drain_evt.clear()
while self.pending_tasks and (
not self._max_active or len(self.active_tasks) < self._max_active
):
Expand All @@ -113,7 +119,7 @@ async def _drain_loop(self):
if fut and not fut.done():
fut.set_result(task)
if self.pending_tasks:
await self._updated_evt.wait()
await self._drain_evt.wait()
else:
break

Expand Down
43 changes: 29 additions & 14 deletions aries_cloudagent/transport/outbound/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from ...classloader import ClassLoader, ModuleLoadError, ClassNotFoundError
from ...connections.models.connection_target import ConnectionTarget
from ...config.injection_context import InjectionContext
from ...messaging.task_queue import CompletedTask, TaskQueue
from ...messaging.task_queue import CompletedTask, TaskQueue, task_exc_info
from ...stats import Collector

from ..wire_format import BaseWireFormat
Expand Down Expand Up @@ -283,16 +283,22 @@ def process_queued(self) -> asyncio.Task:
Returns: the current queue processing task or None
"""
if self._process_task:
if self._process_task and not self._process_task.done():
self.outbound_event.set()
elif self.outbound_new or self.outbound_buffer:
self._process_task = self.loop.create_task(self._process_loop())
self._process_task.add_done_callback(lambda task: self._process_done())
self._process_task.add_done_callback(lambda task: self._process_done(task))
return self._process_task

def _process_done(self):
def _process_done(self, task: asyncio.Task):
"""Handle completion of the drain process."""
self._process_task = None
exc_info = task_exc_info(task)
if exc_info:
LOGGER.exception(
"Exception in outbound queue processing:", exc_info=exc_info
)
if self._process_task and self._process_task.done():
self._process_task = None

async def _process_loop(self):
"""Continually kick off encoding and delivery on outbound messages."""
Expand All @@ -308,12 +314,13 @@ async def _process_loop(self):
if queued.state == QueuedOutboundMessage.STATE_DONE:
if queued.error:
LOGGER.exception(
"Outbound message could not be delivered",
"Outbound message could not be delivered to %s",
queued.endpoint,
exc_info=queued.error,
)
if self.handle_not_delivered:
self.handle_not_delivered(queued.context, queued.message)
continue
if self.handle_not_delivered:
self.handle_not_delivered(queued.context, queued.message)
continue # remove from buffer

deliver = False

Expand All @@ -330,24 +337,28 @@ async def _process_loop(self):

upd_buffer.append(queued)

new_messages = self.outbound_new.copy()
new_pending = 0
new_messages = self.outbound_new
self.outbound_new = []
for queued in new_messages:

for queued in new_messages:
if queued.state == QueuedOutboundMessage.STATE_NEW:
if queued.message and queued.message.enc_payload:
queued.payload = queued.message.enc_payload
queued.state = QueuedOutboundMessage.STATE_DELIVER
self.deliver_queued_message(queued)
queued.state = QueuedOutboundMessage.STATE_PENDING
new_pending += 1
else:
queued.state = QueuedOutboundMessage.STATE_ENCODE
self.encode_queued_message(queued)
else:
new_pending += 1

upd_buffer.append(queued)

self.outbound_buffer = upd_buffer
if self.outbound_buffer:
await self.outbound_event.wait()
if not new_pending:
await self.outbound_event.wait()
else:
break

Expand Down Expand Up @@ -396,6 +407,10 @@ def finished_deliver(self, queued: QueuedOutboundMessage, completed: CompletedTa
"""Handle completion of queued message delivery."""
if completed.exc_info:
queued.error = completed.exc_info
LOGGER.exception(
"Outbound message could not be delivered", exc_info=queued.error,
)

if queued.retries:
queued.retries -= 1
queued.state = QueuedOutboundMessage.STATE_RETRY
Expand Down

0 comments on commit 7877fd5

Please sign in to comment.