From e4c46c1a4ea935dc4d2a5b0cf85909ed8b593792 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 25 Feb 2022 16:04:13 +0000 Subject: [PATCH 1/3] Fix 'Unhandled error in Deferred' Fixes a CRITICAL "Unhandled error in Deferred" log message which happened when a function wrapped with `@cachedList` failed --- changelog.d/12089.bugfix | 1 + synapse/util/caches/descriptors.py | 43 +++++++++++++++--------------- 2 files changed, 23 insertions(+), 21 deletions(-) create mode 100644 changelog.d/12089.bugfix diff --git a/changelog.d/12089.bugfix b/changelog.d/12089.bugfix new file mode 100644 index 000000000000..27172c482803 --- /dev/null +++ b/changelog.d/12089.bugfix @@ -0,0 +1 @@ +Fix occasional 'Unhandled error in Deferred' error message. diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index df4fb156c2d0..bc03e6639711 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -444,39 +444,40 @@ def arg_to_cache_key(arg: Hashable) -> Hashable: deferred: "defer.Deferred[Any]" = defer.Deferred() deferreds_map[arg] = deferred key = arg_to_cache_key(arg) - cache.set(key, deferred, callback=invalidate_callback) + cached_defers.append( + cache.set(key, deferred, callback=invalidate_callback) + ) def complete_all(res: Dict[Hashable, Any]) -> None: - # the wrapped function has completed. It returns a - # a dict. We can now resolve the observable deferreds in - # the cache and update our own result map. + # the wrapped function has completed. It returns a dict. + # We can now update our own result map, and then resolve the + # observable deferreds in the cache. for e in missing: val = res.get(e, None) - deferreds_map[e].callback(val) + # make sure we update the results map before running the + # deferreds, because as soon as we run the last deferred, the + # gatherResults() below will complete and return the result + # dict to our caller. results[e] = val + deferreds_map[e].callback(val) - def errback(f: Failure) -> Failure: - # the wrapped function has failed. Invalidate any cache - # entries we're supposed to be populating, and fail - # their deferreds. - for e in missing: - key = arg_to_cache_key(e) - cache.invalidate(key) - deferreds_map[e].errback(f) - - # return the failure, to propagate to our caller. - return f + def errback_all(f: Failure) -> None: + # the wrapped function has failed. Propagate the failure into + # the cache, which will invalidate the entry, and cause the + # relevant cached_deferreds to fail, which will propagate the + # failure to our caller. + for d1 in deferreds_map.values(): + d1.errback(f) args_to_call = dict(arg_dict) # copy the missing set before sending it to the callee, to guard against # modification. args_to_call[self.list_name] = tuple(missing) - cached_defers.append( - defer.maybeDeferred( - preserve_fn(self.orig), **args_to_call - ).addCallbacks(complete_all, errback) - ) + # dispatch the call, and attach the two handlers + defer.maybeDeferred( + preserve_fn(self.orig), **args_to_call + ).addCallbacks(complete_all, errback_all) if cached_defers: d = defer.gatherResults(cached_defers, consumeErrors=True).addCallbacks( From 880aa9962e7f20c6fc0466e26e83f416c6b89bde Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 25 Feb 2022 16:54:47 +0000 Subject: [PATCH 2/3] Minor optimisation to cachedListDescriptor we can avoid re-using `missing`, which saves looking up entries in `deferreds_map`, and means we don't need to copy it. --- synapse/util/caches/descriptors.py | 14 ++++++-------- tests/util/caches/test_descriptors.py | 10 +++++----- 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index bc03e6639711..ba2410998a96 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -346,10 +346,10 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase): """Wraps an existing cache to support bulk fetching of keys. Given an iterable of keys it looks in the cache to find any hits, then passes - the tuple of missing keys to the wrapped function. + the set of missing keys to the wrapped function. - Once wrapped, the function returns a Deferred which resolves to the list - of results. + Once wrapped, the function returns a Deferred which resolves to a Dict mapping from + input key to output value. """ def __init__( @@ -452,14 +452,14 @@ def complete_all(res: Dict[Hashable, Any]) -> None: # the wrapped function has completed. It returns a dict. # We can now update our own result map, and then resolve the # observable deferreds in the cache. - for e in missing: + for e, d1 in deferreds_map.items(): val = res.get(e, None) # make sure we update the results map before running the # deferreds, because as soon as we run the last deferred, the # gatherResults() below will complete and return the result # dict to our caller. results[e] = val - deferreds_map[e].callback(val) + d1.callback(val) def errback_all(f: Failure) -> None: # the wrapped function has failed. Propagate the failure into @@ -470,9 +470,7 @@ def errback_all(f: Failure) -> None: d1.errback(f) args_to_call = dict(arg_dict) - # copy the missing set before sending it to the callee, to guard against - # modification. - args_to_call[self.list_name] = tuple(missing) + args_to_call[self.list_name] = missing # dispatch the call, and attach the two handlers defer.maybeDeferred( diff --git a/tests/util/caches/test_descriptors.py b/tests/util/caches/test_descriptors.py index b92d3f0c1b51..19741ffcdaf1 100644 --- a/tests/util/caches/test_descriptors.py +++ b/tests/util/caches/test_descriptors.py @@ -673,14 +673,14 @@ async def list_fn(self, args1, arg2): self.assertEqual(current_context(), SENTINEL_CONTEXT) r = yield d1 self.assertEqual(current_context(), c1) - obj.mock.assert_called_once_with((10, 20), 2) + obj.mock.assert_called_once_with({10, 20}, 2) self.assertEqual(r, {10: "fish", 20: "chips"}) obj.mock.reset_mock() # a call with different params should call the mock again obj.mock.return_value = {30: "peas"} r = yield obj.list_fn([20, 30], 2) - obj.mock.assert_called_once_with((30,), 2) + obj.mock.assert_called_once_with({30}, 2) self.assertEqual(r, {20: "chips", 30: "peas"}) obj.mock.reset_mock() @@ -701,7 +701,7 @@ async def list_fn(self, args1, arg2): obj.mock.return_value = {40: "gravy"} iterable = (x for x in [10, 40, 40]) r = yield obj.list_fn(iterable, 2) - obj.mock.assert_called_once_with((40,), 2) + obj.mock.assert_called_once_with({40}, 2) self.assertEqual(r, {10: "fish", 40: "gravy"}) def test_concurrent_lookups(self): @@ -729,7 +729,7 @@ def list_fn(self, args1) -> "Deferred[dict]": d3 = obj.list_fn([10]) # the mock should have been called exactly once - obj.mock.assert_called_once_with((10,)) + obj.mock.assert_called_once_with({10}) obj.mock.reset_mock() # ... and none of the calls should yet be complete @@ -771,7 +771,7 @@ async def list_fn(self, args1, arg2): # cache miss obj.mock.return_value = {10: "fish", 20: "chips"} r1 = yield obj.list_fn([10, 20], 2, on_invalidate=invalidate0) - obj.mock.assert_called_once_with((10, 20), 2) + obj.mock.assert_called_once_with({10, 20}, 2) self.assertEqual(r1, {10: "fish", 20: "chips"}) obj.mock.reset_mock() From fd103e60f82f10093b177c06f2af20e4c444d644 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 25 Feb 2022 17:00:23 +0000 Subject: [PATCH 3/3] Improve type annotation on CachedListDescriptor --- synapse/util/caches/descriptors.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index ba2410998a96..1cdead02f14b 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -18,6 +18,7 @@ import logging from typing import ( Any, + Awaitable, Callable, Dict, Generic, @@ -354,7 +355,7 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase): def __init__( self, - orig: Callable[..., Any], + orig: Callable[..., Awaitable[Dict]], cached_method_name: str, list_name: str, num_args: Optional[int] = None, @@ -385,13 +386,13 @@ def __init__( def __get__( self, obj: Optional[Any], objtype: Optional[Type] = None - ) -> Callable[..., Any]: + ) -> Callable[..., "defer.Deferred[Dict[Hashable, Any]]"]: cached_method = getattr(obj, self.cached_method_name) cache: DeferredCache[CacheKey, Any] = cached_method.cache num_args = cached_method.num_args @functools.wraps(self.orig) - def wrapped(*args: Any, **kwargs: Any) -> Any: + def wrapped(*args: Any, **kwargs: Any) -> "defer.Deferred[Dict]": # If we're passed a cache_context then we'll want to call its # invalidate() whenever we are invalidated invalidate_callback = kwargs.pop("on_invalidate", None)