From 0a032ea9c5a9022aa0cbb89d84e2228a03e0c92f Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 10 Dec 2019 16:36:48 +0000 Subject: [PATCH 1/3] Move get_state methods into FederationHandler This is a non-functional refactor as a precursor to some other work. --- changelog.d/6503.misc | 1 + synapse/federation/federation_client.py | 97 ++++------------------- synapse/handlers/federation.py | 101 ++++++++++++++++++++++-- 3 files changed, 111 insertions(+), 88 deletions(-) create mode 100644 changelog.d/6503.misc diff --git a/changelog.d/6503.misc b/changelog.d/6503.misc new file mode 100644 index 000000000000..e4e9a5a3d4e5 --- /dev/null +++ b/changelog.d/6503.misc @@ -0,0 +1 @@ +Move get_state methods into FederationHandler. diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 73e1dda6a35c..3a00b998f268 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -17,6 +17,7 @@ import copy import itertools import logging +from typing import List, Tuple from prometheus_client import Counter @@ -37,9 +38,9 @@ ) from synapse.events import builder, room_version_to_event_format from synapse.federation.federation_base import FederationBase, event_from_pdu_json -from synapse.logging.context import make_deferred_yieldable, run_in_background +from synapse.logging.context import make_deferred_yieldable from synapse.logging.utils import log_function -from synapse.util import batch_iter, unwrapFirstError +from synapse.util import unwrapFirstError from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.retryutils import NotRetryingDestination @@ -307,96 +308,28 @@ def get_pdu( return signed_pdu - @defer.inlineCallbacks - @log_function - def get_state_for_room(self, destination, room_id, event_id): - """Requests all of the room state at a given event from a remote homeserver. - - Args: - destination (str): The remote homeserver to query for the state. - room_id (str): The id of the room we're interested in. - event_id (str): The id of the event we want the state at. + async def get_room_state_ids( + self, destination: str, room_id: str, event_id: str + ) -> Tuple[List[str], List[str]]: + """Calls the /state_ids endpoint to fetch the state at a particular point + in the room, and the auth events for the given event Returns: - Deferred[Tuple[List[EventBase], List[EventBase]]]: - A list of events in the state, and a list of events in the auth chain - for the given event. + a tuple of (state event_ids, auth event_ids) """ - result = yield self.transport_layer.get_room_state_ids( + result = await self.transport_layer.get_room_state_ids( destination, room_id, event_id=event_id ) state_event_ids = result["pdu_ids"] auth_event_ids = result.get("auth_chain_ids", []) - desired_events = set(state_event_ids + auth_event_ids) - event_map = yield self.get_events_from_store_or_dest( - destination, room_id, desired_events - ) - - failed_to_fetch = desired_events - event_map.keys() - if failed_to_fetch: - logger.warning( - "Failed to fetch missing state/auth events for %s: %s", - room_id, - failed_to_fetch, - ) - - pdus = [event_map[e_id] for e_id in state_event_ids if e_id in event_map] - auth_chain = [event_map[e_id] for e_id in auth_event_ids if e_id in event_map] - - auth_chain.sort(key=lambda e: e.depth) - - return pdus, auth_chain - - @defer.inlineCallbacks - def get_events_from_store_or_dest(self, destination, room_id, event_ids): - """Fetch events from a remote destination, checking if we already have them. - - Args: - destination (str) - room_id (str) - event_ids (Iterable[str]) - - Returns: - Deferred[dict[str, EventBase]]: A deferred resolving to a map - from event_id to event - """ - fetched_events = yield self.store.get_events(event_ids, allow_rejected=True) - - missing_events = set(event_ids) - fetched_events.keys() - - if not missing_events: - return fetched_events - - logger.debug( - "Fetching unknown state/auth events %s for room %s", - missing_events, - event_ids, - ) - - room_version = yield self.store.get_room_version(room_id) - - # XXX 20 requests at once? really? - for batch in batch_iter(missing_events, 20): - deferreds = [ - run_in_background( - self.get_pdu, - destinations=[destination], - event_id=e_id, - room_version=room_version, - ) - for e_id in batch - ] - - res = yield make_deferred_yieldable( - defer.DeferredList(deferreds, consumeErrors=True) - ) - for success, result in res: - if success and result: - fetched_events[result.event_id] = result + if not isinstance(state_event_ids, list) or not isinstance( + auth_event_ids, list + ): + raise Exception("invalid response from /state_ids") - return fetched_events + return state_event_ids, auth_event_ids @defer.inlineCallbacks @log_function diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index bc26921768c7..c0dcf9abf860 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -64,7 +64,7 @@ from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet from synapse.state import StateResolutionStore, resolve_events_with_store from synapse.types import UserID, get_domain_from_id -from synapse.util import unwrapFirstError +from synapse.util import batch_iter, unwrapFirstError from synapse.util.async_helpers import Linearizer from synapse.util.distributor import user_joined_room from synapse.util.retryutils import NotRetryingDestination @@ -379,11 +379,9 @@ def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False): ( remote_state, got_auth_chain, - ) = yield self.federation_client.get_state_for_room( - origin, room_id, p - ) + ) = yield self._get_state_for_room(origin, room_id, p) - # we want the state *after* p; get_state_for_room returns the + # we want the state *after* p; _get_state_for_room returns the # state *before* p. remote_event = yield self.federation_client.get_pdu( [origin], p, room_version, outlier=True @@ -583,6 +581,97 @@ def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth): else: raise + @defer.inlineCallbacks + @log_function + def _get_state_for_room(self, destination, room_id, event_id): + """Requests all of the room state at a given event from a remote homeserver. + + Args: + destination (str): The remote homeserver to query for the state. + room_id (str): The id of the room we're interested in. + event_id (str): The id of the event we want the state at. + + Returns: + Deferred[Tuple[List[EventBase], List[EventBase]]]: + A list of events in the state, and a list of events in the auth chain + for the given event. + """ + ( + state_event_ids, + auth_event_ids, + ) = yield self.federation_client.get_room_state_ids( + destination, room_id, event_id=event_id + ) + + desired_events = set(state_event_ids + auth_event_ids) + event_map = yield self._get_events_from_store_or_dest( + destination, room_id, desired_events + ) + + failed_to_fetch = desired_events - event_map.keys() + if failed_to_fetch: + logger.warning( + "Failed to fetch missing state/auth events for %s: %s", + room_id, + failed_to_fetch, + ) + + pdus = [event_map[e_id] for e_id in state_event_ids if e_id in event_map] + auth_chain = [event_map[e_id] for e_id in auth_event_ids if e_id in event_map] + + auth_chain.sort(key=lambda e: e.depth) + + return pdus, auth_chain + + @defer.inlineCallbacks + def _get_events_from_store_or_dest(self, destination, room_id, event_ids): + """Fetch events from a remote destination, checking if we already have them. + + Args: + destination (str) + room_id (str) + event_ids (Iterable[str]) + + Returns: + Deferred[dict[str, EventBase]]: A deferred resolving to a map + from event_id to event + """ + fetched_events = yield self.store.get_events(event_ids, allow_rejected=True) + + missing_events = set(event_ids) - fetched_events.keys() + + if not missing_events: + return fetched_events + + logger.debug( + "Fetching unknown state/auth events %s for room %s", + missing_events, + event_ids, + ) + + room_version = yield self.store.get_room_version(room_id) + + # XXX 20 requests at once? really? + for batch in batch_iter(missing_events, 20): + deferreds = [ + run_in_background( + self.federation_client.get_pdu, + destinations=[destination], + event_id=e_id, + room_version=room_version, + ) + for e_id in batch + ] + + res = yield make_deferred_yieldable( + defer.DeferredList(deferreds, consumeErrors=True) + ) + for success, result in res: + if success and result: + fetched_events[result.event_id] = result + + return fetched_events + @defer.inlineCallbacks def _process_received_pdu(self, origin, event, state, auth_chain): """ Called when we have a new pdu. We need to do auth checks and put it @@ -723,7 +812,7 @@ def backfill(self, dest, room_id, limit, extremities): state_events = {} events_to_state = {} for e_id in edges: - state, auth = yield self.federation_client.get_state_for_room( + state, auth = yield self._get_state_for_room( destination=dest, room_id=room_id, event_id=e_id ) auth_events.update({a.event_id: a for a in auth}) From 8dd0159d97031b36ca5eda3eca376b4b2205ad9f Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 10 Dec 2019 15:17:34 +0000 Subject: [PATCH 2/3] un-async `get_room_state_ids` this is called from non-async functions, so we're not ready to async it yet. --- synapse/federation/federation_client.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 3a00b998f268..42afe2543634 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -308,16 +308,15 @@ def get_pdu( return signed_pdu - async def get_room_state_ids( - self, destination: str, room_id: str, event_id: str - ) -> Tuple[List[str], List[str]]: + @defer.inlineCallbacks + def get_room_state_ids(self, destination: str, room_id: str, event_id: str): """Calls the /state_ids endpoint to fetch the state at a particular point in the room, and the auth events for the given event Returns: - a tuple of (state event_ids, auth event_ids) + Tuple[List[str], List[str]]: a tuple of (state event_ids, auth event_ids) """ - result = await self.transport_layer.get_room_state_ids( + result = yield self.transport_layer.get_room_state_ids( destination, room_id, event_id=event_id ) From 1a617e9bef2e45e271932fc1f83c8646c6197903 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 10 Dec 2019 16:39:42 +0000 Subject: [PATCH 3/3] fix lint --- synapse/federation/federation_client.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 42afe2543634..d396e6564f6c 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -17,7 +17,6 @@ import copy import itertools import logging -from typing import List, Tuple from prometheus_client import Counter