From cd7f781c022dd1d1ec3c6695a0fd6ab3ce864fd5 Mon Sep 17 00:00:00 2001 From: Miguel Grinberg Date: Thu, 16 Mar 2023 16:22:23 +0000 Subject: [PATCH] Made aio_pika client manager more robust and efficient (Fixes #1142) --- src/socketio/asyncio_aiopika_manager.py | 98 +++++++++++++++---------- 1 file changed, 59 insertions(+), 39 deletions(-) diff --git a/src/socketio/asyncio_aiopika_manager.py b/src/socketio/asyncio_aiopika_manager.py index eff3f8c8..c6d71379 100644 --- a/src/socketio/asyncio_aiopika_manager.py +++ b/src/socketio/asyncio_aiopika_manager.py @@ -44,9 +44,10 @@ def __init__(self, url='amqp://guest:guest@localhost:5672//', '(Run "pip install aio_pika" in your ' 'virtualenv).') self.url = url - self.listener_connection = None - self.listener_channel = None - self.listener_queue = None + self._lock = asyncio.Lock() + self.publisher_connection = None + self.publisher_channel = None + self.publisher_exchange = None super().__init__(channel=channel, write_only=write_only, logger=logger) async def _connection(self): @@ -66,41 +67,60 @@ async def _queue(self, channel, exchange): return queue async def _publish(self, data): - connection = await self._connection() - channel = await self._channel(connection) - exchange = await self._exchange(channel) - await exchange.publish( - aio_pika.Message(body=pickle.dumps(data), - delivery_mode=aio_pika.DeliveryMode.PERSISTENT), - routing_key='*' - ) - - async def _listen(self): - retry_sleep = 1 - while True: - try: - if self.listener_connection is None: - self.listener_connection = await self._connection() - self.listener_channel = await self._channel( - self.listener_connection + if self.publisher_connection is None: + async with self._lock: + if self.publisher_connection is None: + self.publisher_connection = await self._connection() + self.publisher_channel = await self._channel( + self.publisher_connection ) - await self.listener_channel.set_qos(prefetch_count=1) - exchange = await self._exchange(self.listener_channel) - self.listener_queue = await self._queue( - self.listener_channel, exchange + self.publisher_exchange = await self._exchange( + self.publisher_channel ) - retry_sleep = 1 - - async with self.listener_queue.iterator() as queue_iter: - async for message in queue_iter: - async with message.process(): - yield pickle.loads(message.body) - except Exception: - self._get_logger().error('Cannot receive from rabbitmq... ' - 'retrying in ' - '{} secs'.format(retry_sleep)) - self.listener_connection = None - await asyncio.sleep(retry_sleep) - retry_sleep *= 2 - if retry_sleep > 60: - retry_sleep = 60 + retry = True + while True: + try: + await self.publisher_exchange.publish( + aio_pika.Message( + body=pickle.dumps(data), + delivery_mode=aio_pika.DeliveryMode.PERSISTENT + ), routing_key='*', + ) + return + except aio_pika.AMQPException: + if retry: + self._get_logger().error('Cannot publish to rabbitmq... ' + 'retrying') + retry = False + else: + self._get_logger().error( + 'Cannot publish to rabbitmq... giving up') + break + except aio_pika.exceptions.ChannelInvalidStateError: + # aio_pika raises this exception when the task is cancelled + raise asyncio.CancelledError() + + async def _listen(self): + async with (await self._connection()) as connection: + channel = await self._channel(connection) + await channel.set_qos(prefetch_count=1) + exchange = await self._exchange(channel) + queue = await self._queue(channel, exchange) + + retry_sleep = 1 + while True: + try: + async with queue.iterator() as queue_iter: + async for message in queue_iter: + async with message.process(): + yield pickle.loads(message.body) + retry_sleep = 1 + except aio_pika.AMQPException: + self._get_logger().error( + 'Cannot receive from rabbitmq... ' + 'retrying in {} secs'.format(retry_sleep)) + await asyncio.sleep(retry_sleep) + retry_sleep = min(retry_sleep * 2, 60) + except aio_pika.exceptions.ChannelInvalidStateError: + # aio_pika raises this exception when the task is cancelled + raise asyncio.CancelledError()