Skip to content

Commit

Permalink
Performance updates to SimpleCache, session cache, cache middlewares
Browse files Browse the repository at this point in the history
- Don't lock caches until we have to. If reading from the cache yields a value and a stale value is not an issue, return it. Lock the cache only when writing to it, or when reading in cases where a stale value matters (e.g. a stale session in the async session cache).
  • Loading branch information
fselmo committed Nov 17, 2022
1 parent ff3e023 commit 81807c7
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 66 deletions.
28 changes: 14 additions & 14 deletions tests/core/utilities/test_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
generate_cache_key,
)
from web3._utils.request import (
cache_and_return_async_session,
async_cache_and_return_session,
cache_and_return_session,
)
from web3.utils.caching import (
Expand Down Expand Up @@ -159,21 +159,21 @@ def test_precached_session(mocker):

def test_cache_session_class():
cache = SimpleCache(2)
evicted_items = cache.cache("1", "Hello1")
_, evicted_items = cache.cache("1", "Hello1")
assert cache.get_cache_entry("1") == "Hello1"
assert evicted_items is None

evicted_items = cache.cache("2", "Hello2")
_, evicted_items = cache.cache("2", "Hello2")
assert cache.get_cache_entry("2") == "Hello2"
assert evicted_items is None

# Changing what is stored at a given cache key should not cause
# anything to be evicted
evicted_items = cache.cache("1", "HelloChanged")
_, evicted_items = cache.cache("1", "HelloChanged")
assert cache.get_cache_entry("1") == "HelloChanged"
assert evicted_items is None

evicted_items = cache.cache("3", "Hello3")
_, evicted_items = cache.cache("3", "Hello3")
assert "2" in cache
assert "3" in cache

Expand Down Expand Up @@ -297,15 +297,15 @@ async def test_async_make_post_request(mocker):
async def test_async_precached_session():
# Add a session
session = ClientSession()
await request.cache_and_return_async_session(TEST_URI, session)
await request.async_cache_and_return_session(TEST_URI, session)
assert len(request._async_session_cache) == 1

# Make sure the session isn't duplicated
await request.cache_and_return_async_session(TEST_URI, session)
await request.async_cache_and_return_session(TEST_URI, session)
assert len(request._async_session_cache) == 1

# Make sure a request with a different URI adds another cached session
await request.cache_and_return_async_session(URI(f"{TEST_URI}/test"), session)
await request.async_cache_and_return_session(URI(f"{TEST_URI}/test"), session)
assert len(request._async_session_cache) == 2

# -- teardown -- #
Expand All @@ -326,7 +326,7 @@ async def test_async_cache_does_not_close_session_before_a_call_when_multithread
request.DEFAULT_TIMEOUT = _timeout_for_testing

async def cache_uri_and_return_session(uri):
_session = await cache_and_return_async_session(uri)
_session = await async_cache_and_return_session(uri)

# simulate a call taking 0.01s to return a response
await asyncio.sleep(0.01)
Expand Down Expand Up @@ -371,7 +371,7 @@ async def test_async_unique_cache_keys_created_per_thread_with_same_uri():
def target_function(endpoint_uri):
event_loop = asyncio.new_event_loop()
unique_session = event_loop.run_until_complete(
cache_and_return_async_session(endpoint_uri)
async_cache_and_return_session(endpoint_uri)
)
test_sessions.append(unique_session)

Expand Down Expand Up @@ -406,7 +406,7 @@ async def test_async_use_new_session_if_loop_closed_for_cached_session():
session1 = ClientSession(raise_for_status=True)
session1._loop = loop1

await cache_and_return_async_session(TEST_URI, session=session1)
await async_cache_and_return_session(TEST_URI, session=session1)

# assert session1 was cached
cache_key = generate_cache_key(f"{threading.get_ident()}:{TEST_URI}")
Expand All @@ -420,7 +420,7 @@ async def test_async_use_new_session_if_loop_closed_for_cached_session():

# assert we create a new session when trying to retrieve the session at the
# cache key for TEST_URI
session2 = await cache_and_return_async_session(TEST_URI)
session2 = await async_cache_and_return_session(TEST_URI)
assert not session2._loop.is_closed()
assert session2 != session1

Expand All @@ -442,7 +442,7 @@ async def test_async_use_new_session_if_session_closed_for_cached_session():
# create a session, close it, and cache it at the cache key for TEST_URI
session1 = ClientSession(raise_for_status=True)
await session1.close()
await cache_and_return_async_session(TEST_URI, session=session1)
await async_cache_and_return_session(TEST_URI, session=session1)

# assert session1 was cached
cache_key = generate_cache_key(f"{threading.get_ident()}:{TEST_URI}")
Expand All @@ -452,7 +452,7 @@ async def test_async_use_new_session_if_session_closed_for_cached_session():
assert cached_session == session1

# assert we create a new session when trying to retrieve closed session from cache
session2 = await cache_and_return_async_session(TEST_URI)
session2 = await async_cache_and_return_session(TEST_URI)
assert not session2.closed
assert session2 != session1

Expand Down
4 changes: 2 additions & 2 deletions web3/_utils/module_testing/module_testing_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
Literal,
)
from web3._utils.request import (
cache_and_return_async_session,
async_cache_and_return_session,
cache_and_return_session,
)
from web3.types import (
Expand Down Expand Up @@ -153,7 +153,7 @@ async def _mock_specific_request(
return AsyncMockedResponse()

# else, make a normal request (no mocking)
session = await cache_and_return_async_session(url_from_args)
session = await async_cache_and_return_session(url_from_args)
return await session.request(
method=http_method.upper(), url=url_from_args, **kwargs
)
Expand Down
41 changes: 23 additions & 18 deletions web3/_utils/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,20 @@ def cache_and_return_session(
# cache key should have a unique thread identifier
cache_key = generate_cache_key(f"{threading.get_ident()}:{endpoint_uri}")

evicted_items = None
with _session_cache_lock:
if cache_key not in _session_cache:
if session is None:
session = requests.Session()
cached_session = _session_cache.get_cache_entry(cache_key)
if cached_session is not None:
# If read from cache yields a session, no need to lock; return the session.
# Sync is a bit simpler in this way since a `requests.Session` doesn't really
# "close" in the same way that an async `ClientSession` does. When "closed", it
# still uses http / https adapters successfully if a request is made.
return cached_session

evicted_items = _session_cache.cache(cache_key, session)
logger.debug(f"Session cached: {endpoint_uri}, {session}")
if session is None:
session = requests.Session()

cached_session = _session_cache.get_cache_entry(cache_key)
with _session_cache_lock:
cached_session, evicted_items = _session_cache.cache(cache_key, session)
logger.debug(f"Session cached: {endpoint_uri}, {cached_session}")

if evicted_items is not None:
evicted_sessions = evicted_items.values()
Expand Down Expand Up @@ -126,7 +130,7 @@ def _close_evicted_sessions(evicted_sessions: List[requests.Session]) -> None:
_async_session_pool = ThreadPoolExecutor(max_workers=1)


async def cache_and_return_async_session(
async def async_cache_and_return_session(
endpoint_uri: URI,
session: Optional[ClientSession] = None,
) -> ClientSession:
Expand All @@ -139,8 +143,10 @@ async def cache_and_return_async_session(
if session is None:
session = ClientSession(raise_for_status=True)

evicted_items = _async_session_cache.cache(cache_key, session)
logger.debug(f"Async session cached: {endpoint_uri}, {session}")
cached_session, evicted_items = _async_session_cache.cache(
cache_key, session
)
logger.debug(f"Async session cached: {endpoint_uri}, {cached_session}")

else:
# get the cached session
Expand Down Expand Up @@ -171,11 +177,10 @@ async def cache_and_return_async_session(

# replace stale session with a new session at the cache key
_session = ClientSession(raise_for_status=True)
evicted_items = _async_session_cache.cache(cache_key, _session)
logger.debug(f"Async session cached: {endpoint_uri}, {_session}")

# get the cached session
cached_session = _async_session_cache.get_cache_entry(cache_key)
cached_session, evicted_items = _async_session_cache.cache(
cache_key, _session
)
logger.debug(f"Async session cached: {endpoint_uri}, {cached_session}")

if evicted_items is not None:
# At this point the evicted sessions are already popped out of the cache and
Expand Down Expand Up @@ -206,7 +211,7 @@ async def async_get_response_from_get_request(
endpoint_uri: URI, *args: Any, **kwargs: Any
) -> ClientResponse:
kwargs.setdefault("timeout", ClientTimeout(DEFAULT_TIMEOUT))
session = await cache_and_return_async_session(endpoint_uri)
session = await async_cache_and_return_session(endpoint_uri)
response = await session.get(endpoint_uri, *args, **kwargs)
return response

Expand All @@ -223,7 +228,7 @@ async def async_get_response_from_post_request(
endpoint_uri: URI, *args: Any, **kwargs: Any
) -> ClientResponse:
kwargs.setdefault("timeout", ClientTimeout(DEFAULT_TIMEOUT))
session = await cache_and_return_async_session(endpoint_uri)
session = await async_cache_and_return_session(endpoint_uri)
response = await session.post(endpoint_uri, *args, **kwargs)
return response

Expand Down
19 changes: 10 additions & 9 deletions web3/middleware/async_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,18 @@ async def async_simple_cache_middleware(

async def middleware(method: RPCEndpoint, params: Any) -> RPCResponse:
if method in rpc_whitelist:
async with async_lock(_async_request_thread_pool, lock):
cache_key = generate_cache_key(
f"{threading.get_ident()}:{(method, params)}"
)
if cache.__contains__(cache_key):
return cache.get_cache_entry(cache_key)
cache_key = generate_cache_key(
f"{threading.get_ident()}:{(method, params)}"
)
cached_request = cache.get_cache_entry(cache_key)
if cached_request is not None:
return cached_request

response = await make_request(method, params)
if should_cache_fn(method, params, response):
response = await make_request(method, params)
if should_cache_fn(method, params, response):
async with async_lock(_async_request_thread_pool, lock):
cache.cache(cache_key, response)
return response
return response
else:
return await make_request(method, params)

Expand Down
37 changes: 17 additions & 20 deletions web3/middleware/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,27 +93,24 @@ def simple_cache_middleware(
lock = threading.Lock()

def middleware(method: RPCEndpoint, params: Any) -> RPCResponse:
lock_acquired = (
lock.acquire(blocking=False) if method in rpc_whitelist else False
)

try:
if lock_acquired and method in rpc_whitelist:
cache_key = generate_cache_key(
f"{threading.get_ident()}:{(method, params)}"
)
if cache.__contains__(cache_key):
return cache.get_cache_entry(cache_key)

response = make_request(method, params)
if should_cache_fn(method, params, response):
if method in rpc_whitelist:
cache_key = generate_cache_key(
f"{threading.get_ident()}:{(method, params)}"
)
cached_request = cache.get_cache_entry(cache_key)
if cached_request is not None:
return cached_request

response = make_request(method, params)
if should_cache_fn(method, params, response):
lock.acquire(blocking=False)
try:
cache.cache(cache_key, response)
return response
else:
return make_request(method, params)
finally:
if lock_acquired:
lock.release()
finally:
lock.release()
return response
else:
return make_request(method, params)

return middleware

Expand Down
2 changes: 1 addition & 1 deletion web3/providers/async_rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
construct_user_agent,
)
from web3._utils.request import (
async_cache_and_return_session as _cache_and_return_async_session,
async_make_post_request,
cache_and_return_async_session as _cache_and_return_async_session,
get_default_http_endpoint,
)
from web3.types import (
Expand Down
10 changes: 8 additions & 2 deletions web3/utils/caching.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
Any,
Dict,
Optional,
Tuple,
)


Expand All @@ -13,7 +14,7 @@ def __init__(self, size: int = 100):
self._size = size
self._data: OrderedDict[str, Any] = OrderedDict()

def cache(self, key: str, value: Any) -> Dict[str, Any]:
def cache(self, key: str, value: Any) -> Tuple[Any, Dict[str, Any]]:
evicted_items = None
# If the key is already in the OrderedDict just update it
# and don't evict any values. Ideally, we could still check to see
Expand All @@ -26,7 +27,12 @@ def cache(self, key: str, value: Any) -> Dict[str, Any]:
k, v = self._data.popitem(last=False)
evicted_items[k] = v
self._data[key] = value
return evicted_items

# Return the cached value along with the evicted items at the same time. If we
# only returned the evicted items and then tried to read the cached entry back
# out of the cache, the entry may already have been evicted by the time we
# look it up.
return value, evicted_items

def get_cache_entry(self, key: str) -> Optional[Any]:
return self._data[key] if key in self._data else None
Expand Down

0 comments on commit 81807c7

Please sign in to comment.