From 51407e68b8b2c02025caf70d64f8b47a02326992 Mon Sep 17 00:00:00 2001 From: fselmo Date: Fri, 12 Jan 2024 16:50:08 -0300 Subject: [PATCH] Keep the message listener alive inside a while True --- tests/core/providers/test_wsv2_provider.py | 19 +++++++---- web3/providers/websocket/websocket_v2.py | 39 ++++++++++++---------- 2 files changed, 33 insertions(+), 25 deletions(-) diff --git a/tests/core/providers/test_wsv2_provider.py b/tests/core/providers/test_wsv2_provider.py index 7689da6f1f..85172129c1 100644 --- a/tests/core/providers/test_wsv2_provider.py +++ b/tests/core/providers/test_wsv2_provider.py @@ -3,6 +3,7 @@ import pytest from unittest.mock import ( AsyncMock, + Mock, patch, ) @@ -139,7 +140,7 @@ async def test_msg_listener_task_silences_exceptions_by_default_and_error_logs(c provider._ws = WebsocketMessageStreamMock( raise_exception=Exception("test exception") ) - await provider._message_listener_task + await asyncio.sleep(0.05) assert "test exception" in caplog.text assert ( @@ -147,6 +148,12 @@ async def test_msg_listener_task_silences_exceptions_by_default_and_error_logs(c "task alive.\n error=test exception" ) in caplog.text + # assert is still running + assert not provider._message_listener_task.cancelled() + + # proper cleanup + await provider.disconnect() + @pytest.mark.asyncio async def test_msg_listener_task_raises_when_raise_listener_task_exceptions_is_true(): @@ -169,17 +176,12 @@ async def test_msg_listener_task_raises_when_raise_listener_task_exceptions_is_t @pytest.mark.asyncio -@skip_if_py37 async def test_listen_event_awaits_msg_processing_when_subscription_queue_is_full(): """ This test is to ensure that the `listen_event` method will wait for the `process_subscriptions` method to process a message when the subscription queue is full. """ - from unittest.mock import ( - AsyncMock, - ) - with patch( "web3.providers.websocket.websocket_v2.connect", new=lambda *_1, **_2: _coro() ): @@ -204,7 +206,7 @@ async def test_listen_event_awaits_msg_processing_when_subscription_queue_is_ful # mock listen event async_w3.provider._listen_event.wait = AsyncMock() - async_w3.provider._listen_event.set = AsyncMock() + async_w3.provider._listen_event.set = Mock() # mock subscription and add to active subscriptions sub_id = "0x1" @@ -252,3 +254,6 @@ async def test_listen_event_awaits_msg_processing_when_subscription_queue_is_ful # assert we set the _listen_event after we consume the message async_w3.provider._listen_event.set.assert_called_once() + + # proper cleanup + await async_w3.provider.disconnect() diff --git a/web3/providers/websocket/websocket_v2.py b/web3/providers/websocket/websocket_v2.py index 056ff3b2c2..40d0fec8ff 100644 --- a/web3/providers/websocket/websocket_v2.py +++ b/web3/providers/websocket/websocket_v2.py @@ -216,23 +216,26 @@ async def _ws_message_listener(self) -> None: "Websocket listener background task started. Storing all messages in " "appropriate request processor queues / caches to be processed." ) - try: - async for raw_message in self._ws: - # sleep(0) here seems to be the most efficient way to yield control - # back to the event loop to share the loop with other tasks. - await asyncio.sleep(0) + while True: + # the use of sleep(0) seems to be the most efficient way to yield control + # back to the event loop to share the loop with other tasks. + await asyncio.sleep(0) - response = json.loads(raw_message) - subscription = response.get("method") == "eth_subscription" - await self._request_processor.cache_raw_response( - response, subscription=subscription + try: + async for raw_message in self._ws: + await asyncio.sleep(0) + + response = json.loads(raw_message) + subscription = response.get("method") == "eth_subscription" + await self._request_processor.cache_raw_response( + response, subscription=subscription + ) + except Exception as e: + if self.raise_listener_task_exceptions: + # If ``True``, raise; else, error log & keep task alive + raise e + + self.logger.error( + "Exception caught in listener, error logging and keeping listener " + f"background task alive.\n error={e}" ) - except Exception as e: - if self.raise_listener_task_exceptions: - # If ``True``, raise; else, error log & keep task alive - raise e - - self.logger.error( - "Exception caught in listener, error logging and keeping listener " - f"background task alive.\n error={e}" - )