Skip to content

Commit

Permalink
Keep the message listener alive inside a while True
Browse files Browse the repository at this point in the history
  • Loading branch information
fselmo committed Jan 23, 2024
1 parent 402ab62 commit 51407e6
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 25 deletions.
19 changes: 12 additions & 7 deletions tests/core/providers/test_wsv2_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pytest
from unittest.mock import (
AsyncMock,
Mock,
patch,
)

Expand Down Expand Up @@ -139,14 +140,20 @@ async def test_msg_listener_task_silences_exceptions_by_default_and_error_logs(c
provider._ws = WebsocketMessageStreamMock(
raise_exception=Exception("test exception")
)
await provider._message_listener_task
await asyncio.sleep(0.05)

assert "test exception" in caplog.text
assert (
"Exception caught in listener, error logging and keeping listener background "
"task alive.\n error=test exception"
) in caplog.text

# assert is still running
assert not provider._message_listener_task.cancelled()

# proper cleanup
await provider.disconnect()


@pytest.mark.asyncio
async def test_msg_listener_task_raises_when_raise_listener_task_exceptions_is_true():
Expand All @@ -169,17 +176,12 @@ async def test_msg_listener_task_raises_when_raise_listener_task_exceptions_is_t


@pytest.mark.asyncio
@skip_if_py37
async def test_listen_event_awaits_msg_processing_when_subscription_queue_is_full():
"""
This test is to ensure that the `listen_event` method will wait for the
`process_subscriptions` method to process a message when the subscription queue
is full.
"""
from unittest.mock import (
AsyncMock,
)

with patch(
"web3.providers.websocket.websocket_v2.connect", new=lambda *_1, **_2: _coro()
):
Expand All @@ -204,7 +206,7 @@ async def test_listen_event_awaits_msg_processing_when_subscription_queue_is_ful

# mock listen event
async_w3.provider._listen_event.wait = AsyncMock()
async_w3.provider._listen_event.set = AsyncMock()
async_w3.provider._listen_event.set = Mock()

# mock subscription and add to active subscriptions
sub_id = "0x1"
Expand Down Expand Up @@ -252,3 +254,6 @@ async def test_listen_event_awaits_msg_processing_when_subscription_queue_is_ful

# assert we set the _listen_event after we consume the message
async_w3.provider._listen_event.set.assert_called_once()

# proper cleanup
await async_w3.provider.disconnect()
39 changes: 21 additions & 18 deletions web3/providers/websocket/websocket_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,23 +216,26 @@ async def _ws_message_listener(self) -> None:
"Websocket listener background task started. Storing all messages in "
"appropriate request processor queues / caches to be processed."
)
try:
async for raw_message in self._ws:
# sleep(0) here seems to be the most efficient way to yield control
# back to the event loop to share the loop with other tasks.
await asyncio.sleep(0)
while True:
# the use of sleep(0) seems to be the most efficient way to yield control
# back to the event loop to share the loop with other tasks.
await asyncio.sleep(0)

response = json.loads(raw_message)
subscription = response.get("method") == "eth_subscription"
await self._request_processor.cache_raw_response(
response, subscription=subscription
try:
async for raw_message in self._ws:
await asyncio.sleep(0)

response = json.loads(raw_message)
subscription = response.get("method") == "eth_subscription"
await self._request_processor.cache_raw_response(
response, subscription=subscription
)
except Exception as e:
if self.raise_listener_task_exceptions:
# If ``True``, raise; else, error log & keep task alive
raise e

self.logger.error(
"Exception caught in listener, error logging and keeping listener "
f"background task alive.\n error={e}"
)
except Exception as e:
if self.raise_listener_task_exceptions:
# If ``True``, raise; else, error log & keep task alive
raise e

self.logger.error(
"Exception caught in listener, error logging and keeping listener "
f"background task alive.\n error={e}"
)

0 comments on commit 51407e6

Please sign in to comment.