This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix 'Unhandled error in Deferred' #12089

Merged 3 commits on Mar 1, 2022
1 change: 1 addition & 0 deletions changelog.d/12089.bugfix
@@ -0,0 +1 @@
Fix occasional 'Unhandled error in Deferred' error message.
64 changes: 32 additions & 32 deletions synapse/util/caches/descriptors.py
@@ -18,6 +18,7 @@
import logging
from typing import (
Any,
Awaitable,
Callable,
Dict,
Generic,
@@ -346,15 +347,15 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase):
"""Wraps an existing cache to support bulk fetching of keys.

Given an iterable of keys it looks in the cache to find any hits, then passes
the tuple of missing keys to the wrapped function.
the set of missing keys to the wrapped function.

Once wrapped, the function returns a Deferred which resolves to the list
of results.
Once wrapped, the function returns a Deferred which resolves to a Dict mapping from
input key to output value.
Comment on lines +352 to +353
Member Author:
This is a fix to the documentation: the wrapped function has always returned a Dict, rather than a list.
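For reference, a minimal sketch of how a @cachedList-wrapped method is used, assuming the cached/cachedList decorator factories exported by this module; the store class, method names and values here are hypothetical:

```python
from synapse.util.caches.descriptors import cached, cachedList


class ExampleStore:
    @cached()
    async def get_thing(self, thing_id: str) -> str:
        # single-key lookup (would normally hit the database)
        return f"value-for-{thing_id}"

    @cachedList(cached_method_name="get_thing", list_name="thing_ids")
    async def get_things(self, thing_ids) -> dict:
        # called with the set of keys that were not already cached; must
        # return a dict mapping each input key to its value
        return {thing_id: f"value-for-{thing_id}" for thing_id in thing_ids}


# The wrapped method returns a Deferred which resolves to a dict mapping
# each requested key to its value, not a list:
#
#     result = await store.get_things({"a", "b"})
#     # => {"a": "value-for-a", "b": "value-for-b"}
```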

"""

def __init__(
self,
orig: Callable[..., Any],
orig: Callable[..., Awaitable[Dict]],
cached_method_name: str,
list_name: str,
num_args: Optional[int] = None,
@@ -385,13 +386,13 @@ def __init__(

def __get__(
self, obj: Optional[Any], objtype: Optional[Type] = None
) -> Callable[..., Any]:
) -> Callable[..., "defer.Deferred[Dict[Hashable, Any]]"]:
cached_method = getattr(obj, self.cached_method_name)
cache: DeferredCache[CacheKey, Any] = cached_method.cache
num_args = cached_method.num_args

@functools.wraps(self.orig)
def wrapped(*args: Any, **kwargs: Any) -> Any:
def wrapped(*args: Any, **kwargs: Any) -> "defer.Deferred[Dict]":
# If we're passed a cache_context then we'll want to call its
# invalidate() whenever we are invalidated
invalidate_callback = kwargs.pop("on_invalidate", None)
@@ -444,39 +445,38 @@ def arg_to_cache_key(arg: Hashable) -> Hashable:
deferred: "defer.Deferred[Any]" = defer.Deferred()
deferreds_map[arg] = deferred
key = arg_to_cache_key(arg)
cache.set(key, deferred, callback=invalidate_callback)
Member Author:
This was the source of the problem.

cache.set creates a new observer, which we were then throwing away. So when the error happened, the failure would be propagated into the observer, but there was nothing to handle it.

One option was to change cache.set to avoid creating an observer, but that gets fiddly because it makes the other call to cache.set inconvenient/less efficient.

So instead we change things around and add the resulting observer to the list of deferreds we wait for down at line 479.
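To illustrate the failure mode outside Synapse, a pure-Twisted sketch (not code from this PR): a Deferred left holding a Failure with no errback is logged as 'Unhandled error in Deferred' when it is garbage-collected, whereas keeping the observer and gathering it alongside the other deferreds gives the failure a consumer:

```python
from twisted.internet import defer
from twisted.python.failure import Failure


def dropped_observer() -> None:
    # analogous to the old code: cache.set() returned an observer Deferred
    # which was thrown away...
    observer: defer.Deferred = defer.Deferred()
    # ...so when the underlying call failed, the Failure sat in the observer
    # with nothing to handle it; Twisted logs "Unhandled error in Deferred"
    # when the orphaned Deferred is garbage-collected.
    observer.errback(Failure(RuntimeError("boom")))


def collected_observer() -> defer.Deferred:
    # analogous to the fix: keep the observer and wait on it alongside the
    # other deferreds, so the failure is consumed and reaches the caller.
    observer: defer.Deferred = defer.Deferred()
    d = defer.gatherResults([observer], consumeErrors=True)
    observer.errback(Failure(RuntimeError("boom")))
    return d  # the failure is propagated to whoever waits on d
```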

cached_defers.append(
cache.set(key, deferred, callback=invalidate_callback)
)

def complete_all(res: Dict[Hashable, Any]) -> None:
# the wrapped function has completed. It returns a
# a dict. We can now resolve the observable deferreds in
# the cache and update our own result map.
for e in missing:
# the wrapped function has completed. It returns a dict.
# We can now update our own result map, and then resolve the
# observable deferreds in the cache.
for e, d1 in deferreds_map.items():
val = res.get(e, None)
deferreds_map[e].callback(val)
Member Author:
Per the code comment below, this had to move to after the update to results, since we no longer wait for all the calls to complete_all to complete at line 477.

# make sure we update the results map before running the
# deferreds, because as soon as we run the last deferred, the
# gatherResults() below will complete and return the result
# dict to our caller.
results[e] = val
d1.callback(val)
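
A small demonstration (not Synapse code) of why the results map has to be updated before the per-key deferreds are fired: gatherResults completes synchronously as soon as its last constituent Deferred fires, so anything done after that point is too late for the caller:

```python
from twisted.internet import defer

results = {}
d1: defer.Deferred = defer.Deferred()

gathered = defer.gatherResults([d1], consumeErrors=True)
# this callback runs synchronously, as soon as d1 fires below
gathered.addCallback(lambda _: print("caller sees:", dict(results)))

results["key"] = "value"  # update the result map first...
d1.callback(None)         # ...then fire the deferred; prints {'key': 'value'}
# with the two lines swapped, the caller's callback would see an empty dict
```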

def errback(f: Failure) -> Failure:
# the wrapped function has failed. Invalidate any cache
# entries we're supposed to be populating, and fail
# their deferreds.
for e in missing:
key = arg_to_cache_key(e)
cache.invalidate(key)
Comment on lines -463 to -464
@richvdh (Member Author), Feb 25, 2022:
This cache.invalidate seems to have been redundant (since the cache will invalidate the entry itself when we errback the deferred), so I have removed it.

deferreds_map[e].errback(f)

# return the failure, to propagate to our caller.
return f
def errback_all(f: Failure) -> None:
# the wrapped function has failed. Propagate the failure into
# the cache, which will invalidate the entry, and cause the
# relevant cached_deferreds to fail, which will propagate the
# failure to our caller.
for d1 in deferreds_map.values():
d1.errback(f)
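
A generic sketch (not the DeferredCache implementation) of the behaviour relied on here: a cache that stores a pending Deferred can attach its own errback, so failing the entry evicts it without a separate invalidate() call. TinyPendingCache and its methods are hypothetical:

```python
from twisted.internet import defer
from twisted.python.failure import Failure


class TinyPendingCache:
    """Toy cache of pending Deferreds; evicts an entry when it fails."""

    def __init__(self) -> None:
        self._entries: dict = {}

    def set(self, key, value: defer.Deferred) -> None:
        self._entries[key] = value

        def _evict_on_failure(f: Failure) -> Failure:
            # the cache invalidates the entry itself when the deferred fails...
            self._entries.pop(key, None)
            return f  # ...and passes the failure on to later errbacks

        value.addErrback(_evict_on_failure)


cache = TinyPendingCache()
d: defer.Deferred = defer.Deferred()
cache.set("k", d)
d.addErrback(lambda f: None)  # a consumer handles the failure downstream
d.errback(Failure(RuntimeError("boom")))
assert "k" not in cache._entries  # no explicit invalidate() was needed
```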

args_to_call = dict(arg_dict)
# copy the missing set before sending it to the callee, to guard against
# modification.
args_to_call[self.list_name] = tuple(missing)

cached_defers.append(
defer.maybeDeferred(
preserve_fn(self.orig), **args_to_call
).addCallbacks(complete_all, errback)
)
args_to_call[self.list_name] = missing

# dispatch the call, and attach the two handlers
defer.maybeDeferred(
preserve_fn(self.orig), **args_to_call
).addCallbacks(complete_all, errback_all)

if cached_defers:
d = defer.gatherResults(cached_defers, consumeErrors=True).addCallbacks(
10 changes: 5 additions & 5 deletions tests/util/caches/test_descriptors.py
@@ -673,14 +673,14 @@ async def list_fn(self, args1, arg2):
self.assertEqual(current_context(), SENTINEL_CONTEXT)
r = yield d1
self.assertEqual(current_context(), c1)
obj.mock.assert_called_once_with((10, 20), 2)
obj.mock.assert_called_once_with({10, 20}, 2)
self.assertEqual(r, {10: "fish", 20: "chips"})
obj.mock.reset_mock()

# a call with different params should call the mock again
obj.mock.return_value = {30: "peas"}
r = yield obj.list_fn([20, 30], 2)
obj.mock.assert_called_once_with((30,), 2)
obj.mock.assert_called_once_with({30}, 2)
self.assertEqual(r, {20: "chips", 30: "peas"})
obj.mock.reset_mock()

@@ -701,7 +701,7 @@ async def list_fn(self, args1, arg2):
obj.mock.return_value = {40: "gravy"}
iterable = (x for x in [10, 40, 40])
r = yield obj.list_fn(iterable, 2)
obj.mock.assert_called_once_with((40,), 2)
obj.mock.assert_called_once_with({40}, 2)
self.assertEqual(r, {10: "fish", 40: "gravy"})

def test_concurrent_lookups(self):
@@ -729,7 +729,7 @@ def list_fn(self, args1) -> "Deferred[dict]":
d3 = obj.list_fn([10])

# the mock should have been called exactly once
obj.mock.assert_called_once_with((10,))
obj.mock.assert_called_once_with({10})
obj.mock.reset_mock()

# ... and none of the calls should yet be complete
@@ -771,7 +771,7 @@ async def list_fn(self, args1, arg2):
# cache miss
obj.mock.return_value = {10: "fish", 20: "chips"}
r1 = yield obj.list_fn([10, 20], 2, on_invalidate=invalidate0)
obj.mock.assert_called_once_with((10, 20), 2)
obj.mock.assert_called_once_with({10, 20}, 2)
self.assertEqual(r1, {10: "fish", 20: "chips"})
obj.mock.reset_mock()
