-
-
Notifications
You must be signed in to change notification settings - Fork 595
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fixed the problem of AsyncAioPikaManager repeatedly creating a connec… #692
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -46,30 +46,32 @@ def __init__(self, url='amqp://guest:guest@localhost:5672//', | |
self.url = url | ||
self.listener_connection = None | ||
self.listener_channel = None | ||
self.listener_exchange = None | ||
self.listener_queue = None | ||
super().__init__(channel=channel, write_only=write_only, logger=logger) | ||
|
||
async def _connection(self): | ||
return await aio_pika.connect_robust(self.url) | ||
async def _initialize(self): | ||
""" | ||
initialization aio_pika connection, channel, exchange and queue | ||
|
||
async def _channel(self, connection): | ||
return await connection.channel() | ||
""" | ||
self.listener_connection = await aio_pika.connect_robust(url=self.url) | ||
self.listener_channel = await self.listener_connection.channel() | ||
self.listener_exchange = await self.listener_channel.declare_exchange(self.channel, | ||
aio_pika.ExchangeType.FANOUT) | ||
|
||
async def _exchange(self, channel): | ||
return await channel.declare_exchange(self.channel, | ||
aio_pika.ExchangeType.FANOUT) | ||
if not self.write_only: | ||
await self.listener_channel.set_qos(prefetch_count=1) | ||
self.listener_queue = await self.listener_channel.declare_queue(durable=False) | ||
await self.listener_queue.bind(self.listener_exchange) | ||
AsyncPubSubManager.initialize(self) | ||
|
||
async def _queue(self, channel, exchange): | ||
queue = await channel.declare_queue(durable=False, | ||
arguments={'x-expires': 300000}) | ||
await queue.bind(exchange) | ||
return queue | ||
def initialize(self): | ||
loop = asyncio.get_event_loop() | ||
loop.create_task(self._initialize()) | ||
|
||
async def _publish(self, data): | ||
connection = await self._connection() | ||
channel = await self._channel(connection) | ||
exchange = await self._exchange(channel) | ||
await exchange.publish( | ||
await self.listener_exchange.publish( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This will fail if the connection to the queue breaks and needs to be re-established. The Redis manage, which is the one that the vast majority of people use, has connection retries, maybe we want that here as well? |
||
aio_pika.Message(body=pickle.dumps(data), | ||
delivery_mode=aio_pika.DeliveryMode.PERSISTENT), | ||
routing_key='*' | ||
|
@@ -80,16 +82,7 @@ async def _listen(self): | |
while True: | ||
try: | ||
if self.listener_connection is None: | ||
self.listener_connection = await self._connection() | ||
self.listener_channel = await self._channel( | ||
self.listener_connection | ||
) | ||
await self.listener_channel.set_qos(prefetch_count=1) | ||
exchange = await self._exchange(self.listener_channel) | ||
self.listener_queue = await self._queue( | ||
self.listener_channel, exchange | ||
) | ||
retry_sleep = 1 | ||
await self._initialize() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is problematic, because you have one more _initialize() call above. Both will set the same variables. |
||
|
||
async with self.listener_queue.iterator() as queue_iter: | ||
async for message in queue_iter: | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a really strange way to initialize. It is also bound to race conditions, since you also call _initialize() in the _listen() method. I mean, I understand why you are doing it, but it isn't a robust solution, I think.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you suggest? I want to do this well.