
Commit

Try to reuse already established and created connection and channel while emitting from external processes
theDigitalGuy committed Jun 16, 2022
1 parent 5b91346 commit 4ff70fb
Showing 1 changed file with 18 additions and 7 deletions.
25 changes: 18 additions & 7 deletions src/socketio/asyncio_aiopika_manager.py
@@ -47,17 +47,28 @@ def __init__(self, url='amqp://guest:guest@localhost:5672//',
         self.listener_connection = None
         self.listener_channel = None
         self.listener_queue = None
+
+        self.aiopika_connection = None
+        self.aiopika_channel = None
+        self.aiopika_exchange = None
+
         super().__init__(channel=channel, write_only=write_only, logger=logger)
 
     async def _connection(self):
-        return await aio_pika.connect_robust(self.url)
+        if (self.aiopika_connection is None or self.aiopika_connection.is_closed is False):
+            return await aio_pika.connect_robust(self.url)
+        return self.aiopika_connection
 
     async def _channel(self, connection):
-        return await connection.channel()
+        if self.aiopika_channel is None:
+            return await connection.channel()
+        return self.aiopika_channel
 
     async def _exchange(self, channel):
-        return await channel.declare_exchange(self.channel,
+        if self.aiopika_exchange is None:
+            return await channel.declare_exchange(self.channel,
                                               aio_pika.ExchangeType.FANOUT)
+        return self.aiopika_exchange
 
     async def _queue(self, channel, exchange):
         queue = await channel.declare_queue(durable=False,
@@ -66,10 +77,10 @@ async def _queue(self, channel, exchange):
         return queue
 
     async def _publish(self, data):
-        connection = await self._connection()
-        channel = await self._channel(connection)
-        exchange = await self._exchange(channel)
-        await exchange.publish(
+        self.aiopika_connection = await self._connection()
+        self.aiopika_channel = await self._channel(self.aiopika_connection)
+        self.aiopika_exchange = await self._exchange(self.aiopika_channel)
+        await self.aiopika_exchange.publish(
             aio_pika.Message(body=pickle.dumps(data),
                              delivery_mode=aio_pika.DeliveryMode.PERSISTENT),
             routing_key='*'

1 comment on commit 4ff70fb

theDigitalGuy (Owner, Author) commented on 4ff70fb, Jun 16, 2022


self.external_sio = AsyncAioPikaManager(url='amqp://guest:guest@rabbitmq:5672//', write_only=True)
await self.external_sio.emit('info', data=info_data)

I was having the problem that my script, which emits socketio messages from an external process, started to hang after ~832 messages.

The RabbitMQ log showed:

[warn] <0.352.0> file descriptor limit alarm set.
[warn] <0.352.0> 
[warn] <0.352.0> ********************************************************************
[warn] <0.352.0> *** New connections will not be accepted until this alarm clears ***
[warn] <0.352.0> ********************************************************************

Apparently the maximum number of connections configured on the RabbitMQ server had been reached.

File Descriptors
Total: 51, limit: 927
Sockets: 48, limit: 832

Then I modified asyncio_aiopika_manager.py to reuse an old connection where possible. After that, the script hung after emitting 2047 messages:
[erro] <0.25229.47> operation none caused a connection exception not_allowed: "number of channels opened (2047) has reached the negotiated channel_max (2047)"

Now the script also tries to reuse the channel and the exchange; I am not entirely sure whether the latter is necessary.

This commit contains my first attempt to fix the issue of opening many connections and channels without closing or reusing them. Any suggestions are welcome.
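
For illustration, here is a minimal sketch of the caching behaviour this change is aiming for. The helper class and its names (AmqpPublisherCache, get_exchange) are hypothetical and not part of python-socketio; it only uses aio_pika calls that exist (connect_robust, Connection.channel, Channel.declare_exchange, and the is_closed properties). It reconnects only when the cached connection is missing or actually closed, and drops the dependent objects whenever their parent is replaced:

import aio_pika

class AmqpPublisherCache:
    # Hypothetical helper: keeps one connection, one channel and one exchange alive for publishing.
    def __init__(self, url, exchange_name):
        self.url = url
        self.exchange_name = exchange_name
        self._connection = None
        self._channel = None
        self._exchange = None

    async def get_exchange(self):
        # Reconnect only if there is no cached connection or it has been closed.
        if self._connection is None or self._connection.is_closed:
            self._connection = await aio_pika.connect_robust(self.url)
            self._channel = None      # a new connection invalidates the cached channel
            self._exchange = None
        # Reopen the channel only if it is missing or closed; otherwise reuse it.
        if self._channel is None or self._channel.is_closed:
            self._channel = await self._connection.channel()
            self._exchange = None
        # declare_exchange is idempotent, so caching the exchange mainly saves a round trip.
        if self._exchange is None:
            self._exchange = await self._channel.declare_exchange(
                self.exchange_name, aio_pika.ExchangeType.FANOUT)
        return self._exchange

(As a side note, the check self.aiopika_connection.is_closed is False in the diff above opens a fresh connection while the cached one is still open; reconnecting on is_closed instead seems to be what is intended here.)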

Additionally, it would probably be a good idea to apply the same connection-reuse approach to listener_connection and to the connection used in write_only mode.
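
Under the same assumption (the hypothetical helper sketched above, created once in __init__ as self._amqp_cache), _publish would shrink to a single code path, and the listener setup could draw on the same cached connection instead of tracking listener_connection separately:

    async def _publish(self, data):
        # self._amqp_cache is the hypothetical cache sketched above
        exchange = await self._amqp_cache.get_exchange()
        await exchange.publish(
            aio_pika.Message(body=pickle.dumps(data),
                             delivery_mode=aio_pika.DeliveryMode.PERSISTENT),
            routing_key='*'
        )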
