diff --git a/socketio/asyncio_manager.py b/socketio/asyncio_manager.py index f4496ec7..55032abc 100644 --- a/socketio/asyncio_manager.py +++ b/socketio/asyncio_manager.py @@ -5,6 +5,9 @@ class AsyncManager(BaseManager): """Manage a client list for an asyncio server.""" + async def can_disconnect(self, sid, namespace): + return self.is_connected(sid, namespace) + async def emit(self, event, data, namespace, room=None, skip_sid=None, callback=None, **kwargs): """Emit a message to a single client, a room, or all the clients diff --git a/socketio/asyncio_pubsub_manager.py b/socketio/asyncio_pubsub_manager.py index 6fdba6d0..8088d418 100644 --- a/socketio/asyncio_pubsub_manager.py +++ b/socketio/asyncio_pubsub_manager.py @@ -69,6 +69,19 @@ async def emit(self, event, data, namespace=None, room=None, skip_sid=None, 'skip_sid': skip_sid, 'callback': callback, 'host_id': self.host_id}) + async def can_disconnect(self, sid, namespace): + await self._publish({'method': 'disconnect', 'sid': sid, + 'namespace': namespace or '/'}) + + async def disconnect(self, sid, namespace=None): + """Disconnect a client.""" + # this is a bit weird, the can_disconnect call on pubsub managers just + # issues a disconnect request to the message queue and returns None, + # indicating that the client cannot disconnect immediately. The + # server(s) listening on the queue will get this request and carry out + # the disconnect appropriately. + await self.can_disconnect(sid, namespace) + async def close_room(self, room, namespace=None): await self._publish({'method': 'close_room', 'room': room, 'namespace': namespace or '/'}) @@ -128,6 +141,11 @@ async def _return_callback(self, host_id, sid, namespace, callback_id, 'sid': sid, 'namespace': namespace, 'id': callback_id, 'args': args}) + async def _handle_disconnect(self, message): + await self.server.disconnect(sid=message.get('sid'), + namespace=message.get('namespace'), + ignore_queue=True) + async def _handle_close_room(self, message): await super().close_room( room=message.get('room'), namespace=message.get('namespace')) @@ -155,9 +173,13 @@ async def _thread(self): except: pass if data and 'method' in data: + self._get_logger().info('pubsub message: {}'.format( + data['method'])) if data['method'] == 'emit': await self._handle_emit(data) elif data['method'] == 'callback': await self._handle_callback(data) + elif data['method'] == 'disconnect': + await self._handle_disconnect(data) elif data['method'] == 'close_room': await self._handle_close_room(data) diff --git a/socketio/asyncio_server.py b/socketio/asyncio_server.py index 9b41a698..1e4a74ae 100644 --- a/socketio/asyncio_server.py +++ b/socketio/asyncio_server.py @@ -297,17 +297,23 @@ async def __aexit__(self, *args): return _session_context_manager(self, sid, namespace) - async def disconnect(self, sid, namespace=None): + async def disconnect(self, sid, namespace=None, ignore_queue=False): """Disconnect a client. :param sid: Session ID of the client. :param namespace: The Socket.IO namespace to disconnect. If this argument is omitted the default namespace is used. + :param ignore_queue: Only used when a message queue is configured. If + set to ``True``, the disconnect is processed + locally, without broadcasting on the queue. It is + recommended to always leave this parameter with + its default value of ``False``. Note: this method is a coroutine. """ namespace = namespace or '/' - if self.manager.is_connected(sid, namespace=namespace): + if (ignore_queue and self.manager.is_connected(sid, namespace)) or \ + await self.manager.can_disconnect(sid, namespace): self.logger.info('Disconnecting %s [%s]', sid, namespace) self.manager.pre_disconnect(sid, namespace=namespace) await self._send_packet(sid, packet.Packet(packet.DISCONNECT, diff --git a/socketio/base_manager.py b/socketio/base_manager.py index 3cccb856..09264623 100644 --- a/socketio/base_manager.py +++ b/socketio/base_manager.py @@ -55,6 +55,9 @@ def is_connected(self, sid, namespace): except KeyError: pass + def can_disconnect(self, sid, namespace): + return self.is_connected(sid, namespace) + def pre_disconnect(self, sid, namespace): """Put the client in the to-be-disconnected list. diff --git a/socketio/pubsub_manager.py b/socketio/pubsub_manager.py index 2905b2c3..985087e4 100644 --- a/socketio/pubsub_manager.py +++ b/socketio/pubsub_manager.py @@ -67,6 +67,19 @@ def emit(self, event, data, namespace=None, room=None, skip_sid=None, 'skip_sid': skip_sid, 'callback': callback, 'host_id': self.host_id}) + def can_disconnect(self, sid, namespace): + self._publish({'method': 'disconnect', 'sid': sid, + 'namespace': namespace or '/'}) + + def disconnect(self, sid, namespace=None): + """Disconnect a client.""" + # this is a bit weird, the can_disconnect call on pubsub managers just + # issues a disconnect request to the message queue and returns None, + # indicating that the client cannot disconnect immediately. The + # server(s) listening on the queue will get this request and carry out + # the disconnect appropriately. + self.can_disconnect(sid, namespace) + def close_room(self, room, namespace=None): self._publish({'method': 'close_room', 'room': room, 'namespace': namespace or '/'}) @@ -125,6 +138,11 @@ def _return_callback(self, host_id, sid, namespace, callback_id, *args): 'sid': sid, 'namespace': namespace, 'id': callback_id, 'args': args}) + def _handle_disconnect(self, message): + self.server.disconnect(sid=message.get('sid'), + namespace=message.get('namespace'), + ignore_queue=True) + def _handle_close_room(self, message): super(PubSubManager, self).close_room( room=message.get('room'), namespace=message.get('namespace')) @@ -146,9 +164,13 @@ def _thread(self): except: pass if data and 'method' in data: + self._get_logger().info('pubsub message: {}'.format( + data['method'])) if data['method'] == 'emit': self._handle_emit(data) elif data['method'] == 'callback': self._handle_callback(data) + elif data['method'] == 'disconnect': + self._handle_disconnect(data) elif data['method'] == 'close_room': self._handle_close_room(data) diff --git a/socketio/server.py b/socketio/server.py index 24445261..4404fddf 100644 --- a/socketio/server.py +++ b/socketio/server.py @@ -492,15 +492,21 @@ def __exit__(self, *args): return _session_context_manager(self, sid, namespace) - def disconnect(self, sid, namespace=None): + def disconnect(self, sid, namespace=None, ignore_queue=False): """Disconnect a client. :param sid: Session ID of the client. :param namespace: The Socket.IO namespace to disconnect. If this argument is omitted the default namespace is used. + :param ignore_queue: Only used when a message queue is configured. If + set to ``True``, the disconnect is processed + locally, without broadcasting on the queue. It is + recommended to always leave this parameter with + its default value of ``False``. """ namespace = namespace or '/' - if self.manager.is_connected(sid, namespace=namespace): + if (ignore_queue and self.manager.is_connected(sid, namespace)) or \ + self.manager.can_disconnect(sid, namespace): self.logger.info('Disconnecting %s [%s]', sid, namespace) self.manager.pre_disconnect(sid, namespace=namespace) self._send_packet(sid, packet.Packet(packet.DISCONNECT, diff --git a/tests/asyncio/test_asyncio_pubsub_manager.py b/tests/asyncio/test_asyncio_pubsub_manager.py index 53e4b6bc..437c21e4 100644 --- a/tests/asyncio/test_asyncio_pubsub_manager.py +++ b/tests/asyncio/test_asyncio_pubsub_manager.py @@ -34,6 +34,7 @@ class TestAsyncPubSubManager(unittest.TestCase): def setUp(self): mock_server = mock.MagicMock() mock_server._emit_internal = AsyncMock() + mock_server.disconnect = AsyncMock() self.pm = asyncio_pubsub_manager.AsyncPubSubManager() self.pm._publish = AsyncMock() self.pm.set_server(mock_server) @@ -115,6 +116,11 @@ def test_emit_with_ignore_queue(self): self.pm.server._emit_internal.mock.assert_called_once_with( '123', 'foo', 'bar', '/', None) + def test_disconnect(self): + _run(self.pm.disconnect('123', '/foo')) + self.pm._publish.mock.assert_called_once_with( + {'method': 'disconnect', 'sid': '123', 'namespace': '/foo'}) + def test_close_room(self): _run(self.pm.close_room('foo')) self.pm._publish.mock.assert_called_once_with( @@ -142,7 +148,7 @@ def test_handle_emit_with_namespace(self): self.pm, 'foo', 'bar', namespace='/baz', room=None, skip_sid=None, callback=None) - def test_handle_emiti_with_room(self): + def test_handle_emit_with_room(self): with mock.patch.object(asyncio_manager.AsyncManager, 'emit', new=AsyncMock()) as super_emit: _run(self.pm._handle_emit({'event': 'foo', 'data': 'bar', @@ -216,6 +222,12 @@ def test_handle_callback_missing_args(self): 'host_id': host_id})) self.assertEqual(trigger.mock.call_count, 0) + def test_handle_disconnect(self): + _run(self.pm._handle_disconnect({'method': 'disconnect', 'sid': '123', + 'namespace': '/foo'})) + self.pm.server.disconnect.mock.assert_called_once_with( + sid='123', namespace='/foo', ignore_queue=True) + def test_handle_close_room(self): with mock.patch.object(asyncio_manager.AsyncManager, 'close_room', new=AsyncMock()) as super_close_room: @@ -236,6 +248,7 @@ def test_handle_close_room_with_namespace(self): def test_background_thread(self): self.pm._handle_emit = AsyncMock() self.pm._handle_callback = AsyncMock() + self.pm._handle_disconnect = AsyncMock() self.pm._handle_close_room = AsyncMock() def messages(): @@ -243,6 +256,7 @@ def messages(): yield {'method': 'emit', 'value': 'foo'} yield {'missing': 'method'} yield '{"method": "callback", "value": "bar"}' + yield {'method': 'disconnect', 'sid': '123', 'namespace': '/foo'} yield {'method': 'bogus'} yield pickle.dumps({'method': 'close_room', 'value': 'baz'}) yield 'bad json' @@ -258,5 +272,7 @@ def messages(): {'method': 'emit', 'value': 'foo'}) self.pm._handle_callback.mock.assert_called_once_with( {'method': 'callback', 'value': 'bar'}) + self.pm._handle_disconnect.mock.assert_called_once_with( + {'method': 'disconnect', 'sid': '123', 'namespace': '/foo'}) self.pm._handle_close_room.mock.assert_called_once_with( {'method': 'close_room', 'value': 'baz'}) diff --git a/tests/asyncio/test_asyncio_server.py b/tests/asyncio/test_asyncio_server.py index 29c702cd..1ed42e80 100644 --- a/tests/asyncio/test_asyncio_server.py +++ b/tests/asyncio/test_asyncio_server.py @@ -42,6 +42,7 @@ def tearDown(self): def _get_mock_manager(self): mgr = mock.MagicMock() + mgr.can_disconnect = AsyncMock() mgr.emit = AsyncMock() mgr.close_room = AsyncMock() mgr.trigger_callback = AsyncMock() diff --git a/tests/common/test_pubsub_manager.py b/tests/common/test_pubsub_manager.py index 295151ab..a8288f60 100644 --- a/tests/common/test_pubsub_manager.py +++ b/tests/common/test_pubsub_manager.py @@ -112,6 +112,11 @@ def test_emit_with_ignore_queue(self): self.pm.server._emit_internal.assert_called_once_with('123', 'foo', 'bar', '/', None) + def test_disconnect(self): + self.pm.disconnect('123', '/foo') + self.pm._publish.assert_called_once_with( + {'method': 'disconnect', 'sid': '123', 'namespace': '/foo'}) + def test_close_room(self): self.pm.close_room('foo') self.pm._publish.assert_called_once_with( @@ -137,7 +142,7 @@ def test_handle_emit_with_namespace(self): room=None, skip_sid=None, callback=None) - def test_handle_emiti_with_room(self): + def test_handle_emit_with_room(self): with mock.patch.object(base_manager.BaseManager, 'emit') as super_emit: self.pm._handle_emit({'event': 'foo', 'data': 'bar', 'room': 'baz'}) @@ -204,6 +209,13 @@ def test_handle_callback_missing_args(self): 'host_id': host_id}) self.assertEqual(trigger.call_count, 0) + def test_handle_disconnect(self): + self.pm._handle_disconnect({'method': 'disconnect', 'sid': '123', + 'namespace': '/foo'}) + self.pm.server.disconnect.assert_called_once_with(sid='123', + namespace='/foo', + ignore_queue=True) + def test_handle_close_room(self): with mock.patch.object(base_manager.BaseManager, 'close_room') \ as super_close_room: @@ -223,6 +235,7 @@ def test_handle_close_room_with_namespace(self): def test_background_thread(self): self.pm._handle_emit = mock.MagicMock() self.pm._handle_callback = mock.MagicMock() + self.pm._handle_disconnect = mock.MagicMock() self.pm._handle_close_room = mock.MagicMock() def messages(): @@ -230,6 +243,7 @@ def messages(): yield {'method': 'emit', 'value': 'foo'} yield {'missing': 'method'} yield '{"method": "callback", "value": "bar"}' + yield {'method': 'disconnect', 'sid': '123', 'namespace': '/foo'} yield {'method': 'bogus'} yield pickle.dumps({'method': 'close_room', 'value': 'baz'}) yield 'bad json' @@ -245,5 +259,7 @@ def messages(): {'method': 'emit', 'value': 'foo'}) self.pm._handle_callback.assert_called_once_with( {'method': 'callback', 'value': 'bar'}) + self.pm._handle_disconnect.assert_called_once_with( + {'method': 'disconnect', 'sid': '123', 'namespace': '/foo'}) self.pm._handle_close_room.assert_called_once_with( {'method': 'close_room', 'value': 'baz'})