From 8a6a114dec6fc9cdc48306b75ea12cfabd90b7cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Thu, 12 Oct 2023 13:35:06 +0000 Subject: [PATCH 1/4] Allow tracking/reporting and closing of "lost" connections. ConnectionPool keeps a WeakSet of in_use connections, allowing lost ones to be collected. Collection produces a warning and closes the underlying transport. --- redis/asyncio/client.py | 1 + redis/asyncio/connection.py | 21 ++++++++++++++++++++- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index e4d2e776bc..acc89941f2 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -546,6 +546,7 @@ def __del__( _grl().call_exception_handler(context) except RuntimeError: pass + self.connection._close() async def aclose(self, close_connection_pool: Optional[bool] = None) -> None: """ diff --git a/redis/asyncio/connection.py b/redis/asyncio/connection.py index 1ef9960ff3..7b0443454b 100644 --- a/redis/asyncio/connection.py +++ b/redis/asyncio/connection.py @@ -5,6 +5,7 @@ import socket import ssl import sys +import warnings import weakref from abc import abstractmethod from itertools import chain @@ -204,6 +205,24 @@ def __init__( raise ConnectionError("protocol must be either 2 or 3") self.protocol = protocol + def __del__(self, _warnings: Any = warnings): + # For some reason, the individual streams don't get properly garbage + # collected and therefore produce no resource warnings. We add one + # here, in the same style as those from the stdlib. + if getattr(self, "_writer", None): + _warnings.warn( + f"unclosed Connection {self!r}", ResourceWarning, source=self + ) + self._close() + + def _close(self): + """ + Internal method to silently close the connection without waiting + """ + if self._writer: + self._writer.close() + self._writer = self._reader = None + def __repr__(self): repr_args = ",".join((f"{k}={v}" for k, v in self.repr_pieces())) return f"{self.__class__.__name__}<{repr_args}>" @@ -1017,7 +1036,7 @@ def __repr__(self): def reset(self): self._available_connections = [] - self._in_use_connections = set() + self._in_use_connections = weakref.WeakSet() def can_get_connection(self) -> bool: """Return True if a connection can be retrieved from the pool.""" From 57876e7ccb3cdfffdf668692896dc310cebc3420 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Fri, 13 Oct 2023 14:41:44 +0000 Subject: [PATCH 2/4] Add tests for the __del__ handlers of async Redis and Connection objects --- tests/test_asyncio/test_connection.py | 47 +++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/tests/test_asyncio/test_connection.py b/tests/test_asyncio/test_connection.py index 28e6b0d9c3..42f4c79110 100644 --- a/tests/test_asyncio/test_connection.py +++ b/tests/test_asyncio/test_connection.py @@ -436,3 +436,50 @@ async def mock_disconnect(_): assert called == 0 await pool.disconnect() + + +async def test_client_garbage_collection(request): + """ + Test that a Redis client will call _close() on any + connection that it holds at time of destruction + """ + + url: str = request.config.getoption("--redis-url") + pool = ConnectionPool.from_url(url) + + # create a client with a connection from the pool + client = Redis(connection_pool=pool, single_connection_client=True) + await client.initialize() + with mock.patch.object(client, "connection") as a: + # we cannot, in unittests, or from asyncio, reliably trigger garbage collection + # so we must just invoke the handler + client.__del__() + assert a._close.called + + await client.aclose() + await pool.aclose() + + +async def test_connection_garbage_collection(request): + """ + Test that a Connection object will call close() on the + stream that it holds. + """ + + url: str = request.config.getoption("--redis-url") + pool = ConnectionPool.from_url(url) + + # create a client with a connection from the pool + client = Redis(connection_pool=pool, single_connection_client=True) + await client.initialize() + conn = client.connection + + with mock.patch.object(conn, "_reader"): + with mock.patch.object(conn, "_writer") as a: + # we cannot, in unittests, or from asyncio, reliably trigger garbage collection + # so we must just invoke the handler + conn.__del__() + assert a.close.called + + await client.aclose() + await pool.aclose() From e86bdce3cb018c467e1de79a6c1b129921ce41e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Fri, 13 Oct 2023 14:42:15 +0000 Subject: [PATCH 3/4] capture expected warnings in the test --- tests/test_asyncio/test_connection.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/tests/test_asyncio/test_connection.py b/tests/test_asyncio/test_connection.py index 42f4c79110..85834181cc 100644 --- a/tests/test_asyncio/test_connection.py +++ b/tests/test_asyncio/test_connection.py @@ -316,7 +316,8 @@ async def mock_aclose(self): url: str = request.config.getoption("--redis-url") r1 = await Redis.from_url(url) with patch.object(r1, "aclose", mock_aclose): - await r1.close() + with pytest.deprecated_call(): + await r1.close() assert calls == 1 with pytest.deprecated_call(): @@ -453,8 +454,9 @@ async def test_client_garbage_collection(request): with mock.patch.object(client, "connection") as a: # we cannot, in unittests, or from asyncio, reliably trigger garbage collection # so we must just invoke the handler - client.__del__() - assert a._close.called + with pytest.warns(ResourceWarning): + client.__del__() + assert a._close.called await client.aclose() await pool.aclose() @@ -478,8 +480,9 @@ async def test_connection_garbage_collection(request): with mock.patch.object(conn, "_writer") as a: # we cannot, in unittests, or from asyncio, reliably trigger garbage collection # so we must just invoke the handler - conn.__del__() - assert a.close.called + with pytest.warns(ResourceWarning): + conn.__del__() + assert a.close.called await client.aclose() await pool.aclose() From c497789161fc913ff7d3e2cd4fbfc07bb68d9706 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Fri, 13 Oct 2023 15:57:12 +0000 Subject: [PATCH 4/4] lint --- tests/test_asyncio/test_connection.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_asyncio/test_connection.py b/tests/test_asyncio/test_connection.py index 85834181cc..474a906091 100644 --- a/tests/test_asyncio/test_connection.py +++ b/tests/test_asyncio/test_connection.py @@ -478,8 +478,8 @@ async def test_connection_garbage_collection(request): with mock.patch.object(conn, "_reader"): with mock.patch.object(conn, "_writer") as a: - # we cannot, in unittests, or from asyncio, reliably trigger garbage collection - # so we must just invoke the handler + # we cannot, in unittests, or from asyncio, reliably trigger + # garbage collection so we must just invoke the handler with pytest.warns(ResourceWarning): conn.__del__() assert a.close.called