Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Try to reuse already created connection and channel while emitting from external processes #935

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions src/socketio/asyncio_aiopika_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,28 @@ def __init__(self, url='amqp://guest:guest@localhost:5672//',
self.listener_connection = None
self.listener_channel = None
self.listener_queue = None

self.aiopika_connection = None
self.aiopika_channel = None
self.aiopika_exchange = None

super().__init__(channel=channel, write_only=write_only, logger=logger)

async def _connection(self):
return await aio_pika.connect_robust(self.url)
if (self.aiopika_connection is None or self.aiopika_connection.is_closed is True):
return await aio_pika.connect_robust(self.url)
return self.aiopika_connection
Comment on lines +58 to +60

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should have a lock around it as if there are several calls in close succession, it'll create several new connections. See #1142 for an example of this in use.


async def _channel(self, connection):
return await connection.channel()
if self.aiopika_channel is None:
return await connection.channel()
return self.aiopika_channel

async def _exchange(self, channel):
return await channel.declare_exchange(self.channel,
if self.aiopika_exchange is None:
return await channel.declare_exchange(self.channel,
aio_pika.ExchangeType.FANOUT)
return self.aiopika_exchange

async def _queue(self, channel, exchange):
queue = await channel.declare_queue(durable=False,
Expand All @@ -66,10 +77,10 @@ async def _queue(self, channel, exchange):
return queue

async def _publish(self, data):
connection = await self._connection()
channel = await self._channel(connection)
exchange = await self._exchange(channel)
await exchange.publish(
self.aiopika_connection = await self._connection()
self.aiopika_channel = await self._channel(self.aiopika_connection)
self.aiopika_exchange = await self._exchange(self.aiopika_channel)
await self.aiopika_exchange.publish(
aio_pika.Message(body=pickle.dumps(data),
delivery_mode=aio_pika.DeliveryMode.PERSISTENT),
routing_key='*'
Expand Down