diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index b883c64f1ba3..caaacecfb781 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -65,7 +65,6 @@ from synapse.state import StateResolutionStore, resolve_events_with_store from synapse.storage.data_stores.main.events_worker import EventRedactBehaviour from synapse.types import UserID, get_domain_from_id -from synapse.util import batch_iter, unwrapFirstError from synapse.util.async_helpers import Linearizer, concurrently_execute from synapse.util.distributor import user_joined_room from synapse.util.retryutils import NotRetryingDestination @@ -238,7 +237,6 @@ async def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False) -> None: return None state = None - auth_chain = [] # Get missing pdus if necessary. if not pdu.internal_metadata.is_outlier(): @@ -342,7 +340,6 @@ async def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False) -> None: # Calculate the state after each of the previous events, and # resolve them to find the correct state at the current event. - auth_chains = set() event_map = {event_id: pdu} try: # Get the state of the events we know about @@ -366,24 +363,14 @@ async def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False) -> None: p, ) - room_version = await self.store.get_room_version(room_id) - with nested_logging_context(p): # note that if any of the missing prevs share missing state or # auth events, the requests to fetch those events are deduped # by the get_pdu_cache in federation_client. - ( - remote_state, - got_auth_chain, - ) = await self._get_state_for_room( + (remote_state, _,) = await self._get_state_for_room( origin, room_id, p, include_event_in_state=True ) - # XXX hrm I'm not convinced that duplicate events will compare - # for equality, so I'm not sure this does what the author - # hoped. - auth_chains.update(got_auth_chain) - remote_state_map = { (x.type, x.state_key): x.event_id for x in remote_state } @@ -392,6 +379,7 @@ async def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False) -> None: for x in remote_state: event_map[x.event_id] = x + room_version = await self.store.get_room_version(room_id) state_map = await resolve_events_with_store( room_id, room_version, @@ -413,7 +401,6 @@ async def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False) -> None: event_map.update(evs) state = [event_map[e] for e in six.itervalues(state_map)] - auth_chain = list(auth_chains) except Exception: logger.warning( "[%s %s] Error attempting to resolve state at missing " @@ -429,9 +416,7 @@ async def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False) -> None: affected=event_id, ) - await self._process_received_pdu( - origin, pdu, state=state, auth_chain=auth_chain - ) + await self._process_received_pdu(origin, pdu, state=state) async def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth): """ @@ -633,6 +618,8 @@ def _get_events_from_store_or_dest(self, destination, room_id, event_ids): room_id (str) event_ids (Iterable[str]) + Persists any events we don't already have as outliers. + If we fail to fetch any of the events, a warning will be logged, and the event will be omitted from the result. Likewise, any events which turn out not to be in the given room. @@ -652,27 +639,15 @@ def _get_events_from_store_or_dest(self, destination, room_id, event_ids): room_id, ) - room_version = yield self.store.get_room_version(room_id) - - # XXX 20 requests at once? really? - for batch in batch_iter(missing_events, 20): - deferreds = [ - run_in_background( - self.federation_client.get_pdu, - destinations=[destination], - event_id=e_id, - room_version=room_version, - ) - for e_id in batch - ] - - res = yield make_deferred_yieldable( - defer.DeferredList(deferreds, consumeErrors=True) - ) + yield self._get_events_and_persist( + destination=destination, room_id=room_id, events=missing_events + ) - for success, result in res: - if success and result: - fetched_events[result.event_id] = result + # we need to make sure we re-load from the database to get the rejected + # state correct. + fetched_events.update( + (yield self.store.get_events(missing_events, allow_rejected=True)) + ) # check for events which were in the wrong room. # @@ -702,7 +677,7 @@ def _get_events_from_store_or_dest(self, destination, room_id, event_ids): return fetched_events @defer.inlineCallbacks - def _process_received_pdu(self, origin, event, state, auth_chain): + def _process_received_pdu(self, origin, event, state): """ Called when we have a new pdu. We need to do auth checks and put it through the StateHandler.