Fixed the problem of AsyncAioPikaManager repeatedly creating a connec… #692

Closed · wants to merge 1 commit
45 changes: 19 additions & 26 deletions socketio/asyncio_aiopika_manager.py
@@ -46,30 +46,32 @@ def __init__(self, url='amqp://guest:guest@localhost:5672//',
         self.url = url
         self.listener_connection = None
         self.listener_channel = None
+        self.listener_exchange = None
         self.listener_queue = None
         super().__init__(channel=channel, write_only=write_only, logger=logger)

-    async def _connection(self):
-        return await aio_pika.connect_robust(self.url)
-
-    async def _channel(self, connection):
-        return await connection.channel()
-
-    async def _exchange(self, channel):
-        return await channel.declare_exchange(self.channel,
-                                              aio_pika.ExchangeType.FANOUT)
-
-    async def _queue(self, channel, exchange):
-        queue = await channel.declare_queue(durable=False,
-                                            arguments={'x-expires': 300000})
-        await queue.bind(exchange)
-        return queue
+    async def _initialize(self):
+        """
+        initialization aio_pika connection, channel, exchange and queue
+        """
+        self.listener_connection = await aio_pika.connect_robust(url=self.url)
+        self.listener_channel = await self.listener_connection.channel()
+        self.listener_exchange = await self.listener_channel.declare_exchange(
+            self.channel, aio_pika.ExchangeType.FANOUT)
+
+        if not self.write_only:
+            await self.listener_channel.set_qos(prefetch_count=1)
+            self.listener_queue = await self.listener_channel.declare_queue(durable=False)
+            await self.listener_queue.bind(self.listener_exchange)
+        AsyncPubSubManager.initialize(self)
+
+    def initialize(self):
+        loop = asyncio.get_event_loop()
+        loop.create_task(self._initialize())

@miguelgrinberg (Owner) commented on May 26, 2021:
This is a really strange way to initialize. It is also bound to race conditions, since you also call _initialize() in the _listen() method. I mean, I understand why you are doing it, but it isn't a robust solution, I think.


The PR author replied:
What do you suggest? I want to do this well.
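
One possible answer, offered here only as an editor's sketch and not taken from this thread or from the project's eventual fix: make _initialize() idempotent and serialize it behind an asyncio.Lock, so that initialize(), _publish() and _listen() can all await it without racing or rebuilding a live connection. The _init_lock attribute is invented for this illustration; everything else mirrors the attributes and imports already in asyncio_aiopika_manager.py.

import asyncio
import aio_pika
from socketio.asyncio_pubsub_manager import AsyncPubSubManager


class AsyncAioPikaManager(AsyncPubSubManager):
    def __init__(self, url='amqp://guest:guest@localhost:5672//',
                 channel='socketio', write_only=False, logger=None):
        self.url = url
        self.listener_connection = None
        self.listener_channel = None
        self.listener_exchange = None
        self.listener_queue = None
        self._init_lock = asyncio.Lock()  # hypothetical: serializes _initialize()
        super().__init__(channel=channel, write_only=write_only, logger=logger)

    async def _initialize(self):
        async with self._init_lock:
            if self.listener_connection is not None:
                return  # already connected: repeated calls become no-ops
            self.listener_connection = await aio_pika.connect_robust(url=self.url)
            self.listener_channel = await self.listener_connection.channel()
            self.listener_exchange = await self.listener_channel.declare_exchange(
                self.channel, aio_pika.ExchangeType.FANOUT)
            if not self.write_only:
                await self.listener_channel.set_qos(prefetch_count=1)
                self.listener_queue = await self.listener_channel.declare_queue(
                    durable=False)
                await self.listener_queue.bind(self.listener_exchange)

With an idempotent _initialize(), the eager initialize() override that spawns a background task becomes optional: the first _publish() or _listen() call can set everything up lazily, which also removes the double call flagged further down.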


     async def _publish(self, data):
-        connection = await self._connection()
-        channel = await self._channel(connection)
-        exchange = await self._exchange(channel)
-        await exchange.publish(
+        await self.listener_exchange.publish(

@miguelgrinberg (Owner) commented:
This will fail if the connection to the queue breaks and needs to be re-established. The Redis manager, which is the one that the vast majority of people use, has connection retries; maybe we want that here as well?

             aio_pika.Message(body=pickle.dumps(data),
                              delivery_mode=aio_pika.DeliveryMode.PERSISTENT),
             routing_key='*'
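
For the retry idea mentioned above, a rough sketch (editor's illustration, not part of the PR): re-run the idempotent _initialize() from the earlier sketch and retry the publish once, roughly the way the Redis manager recovers from a dropped connection. It assumes the module-level aio_pika and pickle imports already in the file; the broad except clause is a placeholder for the specific aio_pika connection errors real code would catch.

    async def _publish(self, data):
        retry = True
        while True:
            try:
                if self.listener_connection is None:
                    await self._initialize()  # rebuild connection, channel, exchange
                return await self.listener_exchange.publish(
                    aio_pika.Message(body=pickle.dumps(data),
                                     delivery_mode=aio_pika.DeliveryMode.PERSISTENT),
                    routing_key='*')
            except Exception:
                if retry:
                    self._get_logger().error('Cannot publish to rabbitmq... retrying')
                    self.listener_connection = None  # drop the broken connection
                    retry = False
                else:
                    self._get_logger().error('Cannot publish to rabbitmq... giving up')
                    break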
@@ -80,16 +82,7 @@ async def _listen(self):
         while True:
             try:
                 if self.listener_connection is None:
-                    self.listener_connection = await self._connection()
-                    self.listener_channel = await self._channel(
-                        self.listener_connection
-                    )
-                    await self.listener_channel.set_qos(prefetch_count=1)
-                    exchange = await self._exchange(self.listener_channel)
-                    self.listener_queue = await self._queue(
-                        self.listener_channel, exchange
-                    )
-                    retry_sleep = 1
+                    await self._initialize()

@miguelgrinberg (Owner) commented:
This is problematic, because you have one more _initialize() call above. Both will set the same variables.


                 async with self.listener_queue.iterator() as queue_iter:
                     async for message in queue_iter:
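
On the double-initialization concern, one way to square it with the retry behavior the PR removes (again an editor's sketch, not the project's code): keep _listen() as the single place that decides when to reconnect, call the idempotent _initialize() from it, and restore the retry_sleep backoff. The broad except clause and the synchronous message.process() context manager (as in the aio_pika 6.x line) are assumptions; the broken objects are simply dropped here, where a fuller version would close them.

    async def _listen(self):
        retry_sleep = 1
        while True:
            try:
                if self.listener_connection is None:
                    await self._initialize()
                    retry_sleep = 1  # reset backoff after a successful connect
                async with self.listener_queue.iterator() as queue_iter:
                    async for message in queue_iter:
                        with message.process():
                            yield pickle.loads(message.body)
            except Exception:
                self._get_logger().error(
                    'Connection lost, retrying in %d secs', retry_sleep)
                self.listener_connection = None  # force re-initialization next pass
                await asyncio.sleep(retry_sleep)
                retry_sleep = min(retry_sleep * 2, 60)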