From 57ca4d0ab012b0c8553766ba0e8ebf84e2bc3eb2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Benno=20F=C3=BCnfst=C3=BCck?= Date: Fri, 31 Aug 2018 17:57:12 +0200 Subject: [PATCH] fix race condition causing unreliable websocket upgrade Sending a single noop is not enough in all cases, since the client may start a new polling request right away before doing the websocket upgrade tasks. Before this fix the socketio javascript client could in some cases block for up to ping_timeout seconds (not sending or receiving any messages) during upgrade to websockets. This can be observed in the debug browser console logs from engine.io-client: ``` 15:17:42.456 engine.io-client:polling we are currently polling - waiting to pause +1ms browser.js:138 ... nothing for 60 seconds ... 15:18:42.247 The connection to ws://localhost:5000/ was interrupted while the page was loading. websocket.js:117 ``` The fix is to periodically send noops if the queue is empty so that no polling request blocks for a long time during upgrade. --- engineio/asyncio_socket.py | 17 ++++++++++++++++- engineio/socket.py | 17 ++++++++++++++++- tests/test_asyncio_socket.py | 1 + tests/test_socket.py | 1 + 4 files changed, 34 insertions(+), 2 deletions(-) diff --git a/engineio/asyncio_socket.py b/engineio/asyncio_socket.py index acef5135..63333348 100644 --- a/engineio/asyncio_socket.py +++ b/engineio/asyncio_socket.py @@ -119,6 +119,20 @@ async def _upgrade_websocket(self, environ): async def _websocket_handler(self, ws): """Engine.IO handler for websocket transport.""" + # force a polling cycle on the client in short intervals + # this is necessary because some clients may block on a long polling request + # and while they are blocked they cannot continue the upgrade process + # sending a no-op every 100ms ensures that clients wake up and the upgrade is fast + # (the same behaviour is also implemented in the javascript engine.io server implementation) + async def check_loop(): + once = False + while not once or not check_loop.quit: + once = True + if self.queue.empty(): + await self.send(packet.Packet(packet.NOOP)) + await self.server.sleep(0.1) + check_loop.quit = False + if self.connected: # the socket was already connected, so this is an upgrade await self.queue.join() # flush the queue first @@ -133,9 +147,10 @@ async def _websocket_handler(self, ws): await ws.send(packet.Packet( packet.PONG, data=six.text_type('probe')).encode(always_bytes=False)) - await self.send(packet.Packet(packet.NOOP)) + asyncio.ensure_future(check_loop()) pkt = await ws.wait() + check_loop.quit = True # we are done with the upgrade process (either success or fail) decoded_pkt = packet.Packet(encoded_packet=pkt) if decoded_pkt.packet_type != packet.UPGRADE: self.upgraded = False diff --git a/engineio/socket.py b/engineio/socket.py index 65cff1c7..6a72ed94 100644 --- a/engineio/socket.py +++ b/engineio/socket.py @@ -139,6 +139,20 @@ def _websocket_handler(self, ws): if hasattr(ws, attr) and hasattr(getattr(ws, attr), 'settimeout'): getattr(ws, attr).settimeout(self.server.ping_timeout) + # force a polling cycle on the client in short intervals + # this is necessary because some clients may block on a long polling request + # and while they are blocked they cannot continue the upgrade process + # sending a no-op every 100ms ensures that clients wake up and the upgrade is fast + # (the same behaviour is also implemented in the javascript engine.io server implementation) + def check_loop(): + once = False + while not once or not check_loop.quit: + once = True + if self.queue.empty(): + self.send(packet.Packet(packet.NOOP)) + self.server.sleep(0.1) + check_loop.quit = False + if self.connected: # the socket was already connected, so this is an upgrade self.queue.join() # flush the queue first @@ -153,9 +167,10 @@ def _websocket_handler(self, ws): ws.send(packet.Packet( packet.PONG, data=six.text_type('probe')).encode(always_bytes=False)) - self.send(packet.Packet(packet.NOOP)) + self.server.start_background_task(check_loop) pkt = ws.wait() + check_loop.quit = True # we are done with the upgrade process (either success or fail) decoded_pkt = packet.Packet(encoded_packet=pkt) if decoded_pkt.packet_type != packet.UPGRADE: self.upgraded = False diff --git a/tests/test_asyncio_socket.py b/tests/test_asyncio_socket.py index 20a8fc24..b4348b4b 100644 --- a/tests/test_asyncio_socket.py +++ b/tests/test_asyncio_socket.py @@ -59,6 +59,7 @@ def _get_mock_server(self): 'websocket_class': 'wc'} mock_server._async['translate_request'].return_value = 'request' mock_server._async['make_response'].return_value = 'response' + mock_server.sleep = asyncio.sleep mock_server._trigger_event = AsyncMock() return mock_server diff --git a/tests/test_socket.py b/tests/test_socket.py index 17dc44e7..b243fc09 100644 --- a/tests/test_socket.py +++ b/tests/test_socket.py @@ -41,6 +41,7 @@ def bg_task(target, *args, **kwargs): return th mock_server.start_background_task = bg_task + mock_server.sleep = time.sleep return mock_server def _join_bg_tasks(self):