Skip to content

Commit

Permalink
Migrate async Redis client manager to aioredis 2.x (Fixes #771)
Browse files Browse the repository at this point in the history
  • Loading branch information
sam-mosleh authored and miguelgrinberg committed Oct 17, 2021
1 parent 4f5bf1e commit f245191
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 157 deletions.
2 changes: 1 addition & 1 deletion src/socketio/asyncio_aiopika_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ async def _listen(self):
async with self.listener_queue.iterator() as queue_iter:
async for message in queue_iter:
with message.process():
return pickle.loads(message.body)
yield pickle.loads(message.body)
except Exception:
self._get_logger().error('Cannot receive from rabbitmq... '
'retrying in '
Expand Down
55 changes: 27 additions & 28 deletions src/socketio/asyncio_pubsub_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,35 +148,34 @@ async def _handle_close_room(self, message):
async def _thread(self):
while True:
try:
message = await self._listen()
async for message in self._listen(): # pragma: no branch
data = None
if isinstance(message, dict):
data = message
else:
if isinstance(message, bytes): # pragma: no cover
try:
data = pickle.loads(message)
except:
pass
if data is None:
try:
data = json.loads(message)
except:
pass
if data and 'method' in data:
self._get_logger().info('pubsub message: {}'.format(
data['method']))
if data['method'] == 'emit':
await self._handle_emit(data)
elif data['method'] == 'callback':
await self._handle_callback(data)
elif data['method'] == 'disconnect':
await self._handle_disconnect(data)
elif data['method'] == 'close_room':
await self._handle_close_room(data)
except asyncio.CancelledError: # pragma: no cover
break
except:
except: # pragma: no cover
import traceback
traceback.print_exc()
break
data = None
if isinstance(message, dict):
data = message
else:
if isinstance(message, bytes): # pragma: no cover
try:
data = pickle.loads(message)
except:
pass
if data is None:
try:
data = json.loads(message)
except:
pass
if data and 'method' in data:
self._get_logger().info('pubsub message: {}'.format(
data['method']))
if data['method'] == 'emit':
await self._handle_emit(data)
elif data['method'] == 'callback':
await self._handle_callback(data)
elif data['method'] == 'disconnect':
await self._handle_disconnect(data)
elif data['method'] == 'close_room':
await self._handle_close_room(data)
90 changes: 42 additions & 48 deletions src/socketio/asyncio_redis_manager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import asyncio
import pickle
from urllib.parse import urlparse

try:
import aioredis
Expand All @@ -10,34 +9,18 @@
from .asyncio_pubsub_manager import AsyncPubSubManager


def _parse_redis_url(url):
p = urlparse(url)
if p.scheme not in {'redis', 'rediss'}:
raise ValueError('Invalid redis url')
ssl = p.scheme == 'rediss'
host = p.hostname or 'localhost'
port = p.port or 6379
password = p.password
if p.path:
db = int(p.path[1:])
else:
db = 0
return host, port, password, db, ssl


class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover
"""Redis based client manager for asyncio servers.
This class implements a Redis backend for event sharing across multiple
processes. Only kept here as one more example of how to build a custom
backend, since the kombu backend is perfectly adequate to support a Redis
message queue.
processes.
To use a Redis backend, initialize the :class:`Server` instance as
To use a Redis backend, initialize the :class:`AsyncServer` instance as
follows::
server = socketio.Server(client_manager=socketio.AsyncRedisManager(
'redis://hostname:port/0'))
url = 'redis://hostname:port/0'
server = socketio.AsyncServer(
client_manager=socketio.AsyncRedisManager(url))
:param url: The connection URL for the Redis server. For a default Redis
store running on the same host, use ``redis://``. To use an
Expand All @@ -47,62 +30,73 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover
:param write_only: If set to ``True``, only initialize to emit events. The
default of ``False`` initializes the class for emitting
and receiving.
:param redis_options: additional keyword arguments to be passed to
``aioredis.from_url()``.
"""
name = 'aioredis'

def __init__(self, url='redis://localhost:6379/0', channel='socketio',
write_only=False, logger=None):
write_only=False, logger=None, redis_options=None):
if aioredis is None:
raise RuntimeError('Redis package is not installed '
'(Run "pip install aioredis" in your '
'virtualenv).')
(
self.host, self.port, self.password, self.db, self.ssl
) = _parse_redis_url(url)
self.pub = None
self.sub = None
if not hasattr(aioredis.Redis, 'from_url'):
raise RuntimeError('Version 2 of aioredis package is required.')
self.redis_url = url
self.redis_options = redis_options or {}
self._redis_connect()
super().__init__(channel=channel, write_only=write_only, logger=logger)

def _redis_connect(self):
self.redis = aioredis.Redis.from_url(self.redis_url,
**self.redis_options)
self.pubsub = self.redis.pubsub()

async def _publish(self, data):
retry = True
while True:
try:
if self.pub is None:
self.pub = await aioredis.create_redis(
(self.host, self.port), db=self.db,
password=self.password, ssl=self.ssl
)
return await self.pub.publish(self.channel,
pickle.dumps(data))
except (aioredis.RedisError, OSError):
if not retry:
self._redis_connect()
return await self.redis.publish(
self.channel, pickle.dumps(data))
except aioredis.exceptions.RedisError:
if retry:
self._get_logger().error('Cannot publish to redis... '
'retrying')
self.pub = None
retry = False
else:
self._get_logger().error('Cannot publish to redis... '
'giving up')
break

async def _listen(self):
async def _redis_listen_with_retries(self):
retry_sleep = 1
connect = False
while True:
try:
if self.sub is None:
self.sub = await aioredis.create_redis(
(self.host, self.port), db=self.db,
password=self.password, ssl=self.ssl
)
self.ch = (await self.sub.subscribe(self.channel))[0]
retry_sleep = 1
return await self.ch.get()
except (aioredis.RedisError, OSError):
if connect:
self._redis_connect()
await self.pubsub.subscribe(self.channel)
retry_sleep = 1
async for message in self.pubsub.listen():
yield message
except aioredis.exceptions.RedisError:
self._get_logger().error('Cannot receive from redis... '
'retrying in '
'{} secs'.format(retry_sleep))
self.sub = None
connect = True
await asyncio.sleep(retry_sleep)
retry_sleep *= 2
if retry_sleep > 60:
retry_sleep = 60

async def _listen(self):
channel = self.channel.encode('utf-8')
await self.pubsub.subscribe(self.channel)
async for message in self._redis_listen_with_retries():
if message['channel'] == channel and \
message['type'] == 'message' and 'data' in message:
yield message['data']
await self.pubsub.unsubscribe(self.channel)
3 changes: 2 additions & 1 deletion src/socketio/redis_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ class RedisManager(PubSubManager): # pragma: no cover
server = socketio.Server(client_manager=socketio.RedisManager(url))
:param url: The connection URL for the Redis server. For a default Redis
store running on the same host, use ``redis://``.
store running on the same host, use ``redis://``. To use an
SSL connection, use ``rediss://``.
:param channel: The channel name on which the server sends and receives
notifications. Must be the same in all the servers.
:param write_only: If set to ``True``, only initialize to emit events. The
Expand Down
10 changes: 4 additions & 6 deletions tests/asyncio/test_asyncio_pubsub_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ def test_background_thread(self):
self.pm._handle_disconnect = AsyncMock()
self.pm._handle_close_room = AsyncMock()

def messages():
async def messages():
import pickle

yield {'method': 'emit', 'value': 'foo'}
Expand All @@ -428,12 +428,10 @@ def messages():
yield pickle.dumps({'method': 'close_room', 'value': 'baz'})
yield 'bad json'
yield b'bad pickled'
raise asyncio.CancelledError() # force the thread to exit

self.pm._listen = AsyncMock(side_effect=list(messages()))
try:
_run(self.pm._thread())
except StopIteration:
pass
self.pm._listen = messages
_run(self.pm._thread())

self.pm._handle_emit.mock.assert_called_once_with(
{'method': 'emit', 'value': 'foo'}
Expand Down
73 changes: 0 additions & 73 deletions tests/asyncio/test_asyncio_redis_manager.py

This file was deleted.

0 comments on commit f245191

Please sign in to comment.