Skip to content

Commit

Permalink
Support for multiple servers
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelgrinberg committed Oct 8, 2023
1 parent f9ea051 commit 26e728f
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 18 deletions.
31 changes: 23 additions & 8 deletions src/socketio/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ def __init__(self, sio, auth=None, namespace='/admin', read_only=False,

# start thread that emits "server_stats" every 2 seconds
self.stop_stats_event = sio.eio.create_event()
self.stats_task = None
self.stats_task = self.sio.start_background_task(
self._emit_server_stats)

def instrument(self):
self.sio.on('connect', self.admin_connect,
Expand Down Expand Up @@ -119,6 +120,12 @@ def instrument(self):
Socket._websocket_handler = functools.partialmethod(
self.__class__._eio_websocket_handler, self)

# report connected sockets with each ping
if self.mode == 'development':
Socket.__send_ping = Socket._send_ping
Socket._send_ping = functools.partialmethod(
self.__class__._eio_send_ping, self)

def uninstrument(self): # pragma: no cover
if self.mode == 'development':
self.sio.manager.connect = self.sio.manager.__connect
Expand All @@ -134,6 +141,7 @@ def uninstrument(self): # pragma: no cover
from engineio.socket import Socket
Socket.handle_post_request = Socket.__handle_post_request
Socket._websocket_handler = Socket.__websocket_handler
Socket.send_ping = Socket.__send_ping

def admin_connect(self, sid, environ, client_auth):
if self.auth:
Expand Down Expand Up @@ -172,9 +180,6 @@ def config(sid):
namespace=self.admin_namespace)

self.sio.start_background_task(config, sid)
if self.stats_task is None:
self.stats_task = self.sio.start_background_task(
self._emit_server_stats)

def admin_emit(self, _, namespace, room_filter, event, *data):
self.sio.emit(event, data, to=room_filter, namespace=namespace)
Expand Down Expand Up @@ -208,10 +213,6 @@ def _connect(self, eio_sid, namespace):
serialized_socket,
datetime.utcfromtimestamp(t).isoformat() + 'Z',
), namespace=self.admin_namespace)

if serialized_socket['transport'] == 'polling': # pragma: no cover
self.sio.start_background_task(
self._check_for_upgrade, eio_sid, sid, namespace)
return sid

def _disconnect(self, sid, namespace, **kwargs):
Expand Down Expand Up @@ -332,6 +333,20 @@ def _wait(ws):
ws.wait = functools.partial(_wait, ws)
return socket.__websocket_handler(ws)

def _eio_send_ping(socket, self):
eio_sid = socket.sid
t = time.time()
for namespace in self.sio.manager.get_namespaces():
sid = self.sio.manager.sid_from_eio_sid(eio_sid, namespace)
if sid:
serialized_socket = self.serialize_socket(sid, namespace,
eio_sid)
self.sio.emit('socket_connected', (
serialized_socket,
datetime.utcfromtimestamp(t).isoformat() + 'Z',
), namespace=self.admin_namespace)
return socket.__send_ping()

def _emit_server_stats(self):
start_time = time.time()
namespaces = list(self.sio.handlers.keys())
Expand Down
35 changes: 25 additions & 10 deletions src/socketio/async_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ def __init__(self, sio, auth=None, namespace='/admin', read_only=False,

# start thread that emits "server_stats" every 2 seconds
self.stop_stats_event = sio.eio.create_event()
self.stats_task = None
self.stats_task = self.sio.start_background_task(
self._emit_server_stats)

def instrument(self):
self.sio.on('connect', self.admin_connect,
Expand Down Expand Up @@ -88,7 +89,7 @@ def instrument(self):
self.sio.eio.on('disconnect', self._handle_eio_disconnect)

# report polling packets
from engineio.asyncio_socket import AsyncSocket
from engineio.async_socket import AsyncSocket
self.sio.eio.__ok = self.sio.eio._ok
self.sio.eio._ok = self._eio_http_response
AsyncSocket.__handle_post_request = AsyncSocket.handle_post_request
Expand All @@ -100,6 +101,12 @@ def instrument(self):
AsyncSocket._websocket_handler = functools.partialmethod(
self.__class__._eio_websocket_handler, self)

# report connected sockets with each ping
if self.mode == 'development':
AsyncSocket.__send_ping = AsyncSocket._send_ping
AsyncSocket._send_ping = functools.partialmethod(
self.__class__._eio_send_ping, self)

def uninstrument(self): # pragma: no cover
if self.mode == 'development':
self.sio.manager.connect = self.sio.manager.__connect
Expand All @@ -112,9 +119,10 @@ def uninstrument(self): # pragma: no cover
self.sio._handle_event_internal = self.sio.__handle_event_internal
self.sio.eio._ok = self.sio.eio.__ok

from engineio.asyncio_socket import AsyncSocket
from engineio.async_socket import AsyncSocket
AsyncSocket.handle_post_request = AsyncSocket.__handle_post_request
AsyncSocket._websocket_handler = AsyncSocket.__websocket_handler
AsyncSocket.schedule_ping = AsyncSocket.__schedule_ping

async def admin_connect(self, sid, environ, client_auth):
authenticated = True
Expand Down Expand Up @@ -157,9 +165,6 @@ async def config(sid):
namespace=self.admin_namespace)

self.sio.start_background_task(config, sid)
if self.stats_task is None:
self.stats_task = self.sio.start_background_task(
self._emit_server_stats)

async def admin_emit(self, _, namespace, room_filter, event, *data):
await self.sio.emit(event, data, to=room_filter, namespace=namespace)
Expand Down Expand Up @@ -193,10 +198,6 @@ async def _connect(self, eio_sid, namespace):
serialized_socket,
datetime.utcfromtimestamp(t).isoformat() + 'Z',
), namespace=self.admin_namespace)

if serialized_socket['transport'] == 'polling':
self.sio.start_background_task(
self._check_for_upgrade, eio_sid, sid, namespace)
return sid

async def _disconnect(self, sid, namespace, **kwargs):
Expand Down Expand Up @@ -318,6 +319,20 @@ async def _wait(ws):
ws.wait = functools.partial(_wait, ws)
return await socket.__websocket_handler(ws)

async def _eio_send_ping(socket, self):
eio_sid = socket.sid
t = time.time()
for namespace in self.sio.manager.get_namespaces():
sid = self.sio.manager.sid_from_eio_sid(eio_sid, namespace)
if sid:
serialized_socket = self.serialize_socket(sid, namespace,
eio_sid)
await self.sio.emit('socket_connected', (
serialized_socket,
datetime.utcfromtimestamp(t).isoformat() + 'Z',
), namespace=self.admin_namespace)
return await socket.__send_ping()

async def _emit_server_stats(self):
start_time = time.time()
namespaces = list(self.sio.handlers.keys())
Expand Down

0 comments on commit 26e728f

Please sign in to comment.