From 9e5ddf7332b8038de6b1c7108f51f314c4a49bb7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 7 May 2021 15:41:51 +0100 Subject: [PATCH 1/3] Run cache_joined_hosts_for_event in background --- changelog.d/9951.feature | 1 + synapse/handlers/message.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) create mode 100644 changelog.d/9951.feature diff --git a/changelog.d/9951.feature b/changelog.d/9951.feature new file mode 100644 index 000000000000..96a0e7f09fbc --- /dev/null +++ b/changelog.d/9951.feature @@ -0,0 +1 @@ +Improve performance of sending events for worker-based deployments using Redis. diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 5afb7fc261b5..2a577649b309 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -981,7 +981,7 @@ async def handle_new_client_event( await self.action_generator.handle_push_actions_for_event(event, context) - await self.cache_joined_hosts_for_event(event, context) + run_in_background(self.cache_joined_hosts_for_event, event, context) try: # If we're a worker we need to hit out to the master. From b5e2691504f6d6aaa501af042c41bed7ea6fc3b3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 May 2021 15:25:10 +0100 Subject: [PATCH 2/3] Wait for the 'cache_joined_hosts_for_event' to return --- synapse/handlers/message.py | 43 +++++++++++++++++++++++++++++++++---- 1 file changed, 39 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 2a577649b309..60eb599e24f1 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -19,6 +19,7 @@ from canonicaljson import encode_canonical_json +from twisted.internet import defer from twisted.internet.interfaces import IDelayedCall from synapse import event_auth @@ -43,14 +44,14 @@ from synapse.events.builder import EventBuilder from synapse.events.snapshot import EventContext from synapse.events.validator import EventValidator -from synapse.logging.context import run_in_background +from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.http.send_event import ReplicationSendEventRestServlet from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.storage.state import StateFilter from synapse.types import Requester, RoomAlias, StreamToken, UserID, create_requester from synapse.util import json_decoder, json_encoder -from synapse.util.async_helpers import Linearizer +from synapse.util.async_helpers import Linearizer, unwrapFirstError from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.metrics import measure_func from synapse.visibility import filter_events_for_client @@ -979,9 +980,43 @@ async def handle_new_client_event( logger.exception("Failed to encode content: %r", event.content) raise - await self.action_generator.handle_push_actions_for_event(event, context) + # We now persist the event (and update the cache in parallel, since we + # don't want to block on it). + result = await make_deferred_yieldable( + defer.gatherResults( + [ + run_in_background( + self._persist_event, + requester=requester, + event=event, + context=context, + ratelimit=ratelimit, + extra_users=extra_users, + ), + run_in_background( + self.cache_joined_hosts_for_event, event, context + ), + ], + consumeErrors=True, + ) + ).addErrback(unwrapFirstError) - run_in_background(self.cache_joined_hosts_for_event, event, context) + return result[0] + + async def _persist_event( + self, + requester: Requester, + event: EventBase, + context: EventContext, + ratelimit: bool = True, + extra_users: Optional[List[UserID]] = None, + ) -> EventBase: + """Actually persists the event. Should only be called by + `handle_new_client_event`, and see its docstring for documentation of + the arguments. + """ + + await self.action_generator.handle_push_actions_for_event(event, context) try: # If we're a worker we need to hit out to the master. From 11ae6d5b1e784c53d39788ff316c8de5b3a73757 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 12 May 2021 12:22:32 +0100 Subject: [PATCH 3/3] Ensure that cache_joined_hosts_for_event failing doesn't fail request --- synapse/handlers/message.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 60eb599e24f1..9f365eb5ad5a 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -50,7 +50,7 @@ from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.storage.state import StateFilter from synapse.types import Requester, RoomAlias, StreamToken, UserID, create_requester -from synapse.util import json_decoder, json_encoder +from synapse.util import json_decoder, json_encoder, log_failure from synapse.util.async_helpers import Linearizer, unwrapFirstError from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.metrics import measure_func @@ -995,7 +995,7 @@ async def handle_new_client_event( ), run_in_background( self.cache_joined_hosts_for_event, event, context - ), + ).addErrback(log_failure, "cache_joined_hosts_for_event failed"), ], consumeErrors=True, )