From f91df1f761b1e9e4da184560b0e7d9557129d064 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 Aug 2016 11:31:46 +0100 Subject: [PATCH 1/6] Store if we fail to fetch an event from a destination --- synapse/federation/federation_client.py | 37 ++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index da95c2ad6dc2..baa672c4ac65 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -51,10 +51,34 @@ sent_queries_counter = metrics.register_counter("sent_queries", labels=["type"]) +PDU_RETRY_TIME_MS = 1 * 60 * 1000 + + class FederationClient(FederationBase): def __init__(self, hs): super(FederationClient, self).__init__(hs) + self.pdu_destination_tried = {} + self._clock.looping_call( + self._clear_tried_cache, 60 * 1000, + ) + + def _clear_tried_cache(self): + """Clear pdu_destination_tried cache""" + now = self._clock.time_msec() + + old_dict = self.pdu_destination_tried + self.pdu_destination_tried = {} + + for event_id, destination_dict in old_dict.items(): + destination_dict = { + dest: time + for dest, time in destination_dict.items() + if time + PDU_RETRY_TIME_MS > now + } + if destination_dict: + self.pdu_destination_tried[event_id] = destination_dict + def start_get_pdu_cache(self): self._get_pdu_cache = ExpiringCache( cache_name="get_pdu_cache", @@ -240,8 +264,15 @@ def get_pdu(self, destinations, event_id, outlier=False, timeout=None): if ev: defer.returnValue(ev) + pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {}) + pdu = None for destination in destinations: + now = self._clock.time_msec() + last_attempt = pdu_attempts.get(destination, 0) + if last_attempt + PDU_RETRY_TIME_MS > now: + continue + try: limiter = yield get_retry_limiter( destination, @@ -276,9 +307,11 @@ def get_pdu(self, destinations, event_id, outlier=False, timeout=None): ) continue except CodeMessageException as e: - if 400 <= e.code < 500: + if 400 <= e.code < 500 and e.code != 404: raise + pdu_attempts[destination] = now + logger.info( "Failed to get PDU %s from %s because %s", event_id, destination, e, @@ -288,6 +321,8 @@ def get_pdu(self, destinations, event_id, outlier=False, timeout=None): logger.info(e.message) continue except Exception as e: + pdu_attempts[destination] = now + logger.info( "Failed to get PDU %s from %s because %s", event_id, destination, e, From 11fdfaf03b67810f3d289241f772d3177e3c6b7e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 Aug 2016 11:55:15 +0100 Subject: [PATCH 2/6] Only resign our own events --- synapse/handlers/federation.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 618cb53629bf..55d11122bad5 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1093,16 +1093,17 @@ def get_persisted_pdu(self, origin, event_id, do_auth=True): ) if event: - # FIXME: This is a temporary work around where we occasionally - # return events slightly differently than when they were - # originally signed - event.signatures.update( - compute_event_signature( - event, - self.hs.hostname, - self.hs.config.signing_key[0] + if self.hs.is_mine_id(event.event_id): + # FIXME: This is a temporary work around where we occasionally + # return events slightly differently than when they were + # originally signed + event.signatures.update( + compute_event_signature( + event, + self.hs.hostname, + self.hs.config.signing_key[0] + ) ) - ) if do_auth: in_room = yield self.auth.check_host_in_room( From 7f41bcbeec64da874c10a70f8a0b8e3f9f0626d8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 Aug 2016 13:22:20 +0100 Subject: [PATCH 3/6] Correctly auth /event/ requests --- synapse/handlers/federation.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 55d11122bad5..2f8959db9234 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -249,7 +249,7 @@ def redact_disallowed(event, state): if ev.type != EventTypes.Member: continue try: - domain = UserID.from_string(ev.state_key).domain + domain = get_domain_from_id(ev.state_key) except: continue @@ -1106,13 +1106,14 @@ def get_persisted_pdu(self, origin, event_id, do_auth=True): ) if do_auth: - in_room = yield self.auth.check_host_in_room( - event.room_id, - origin + events = yield self._filter_events_for_server( + origin, event.room_id, [event] ) - if not in_room: + if not events: raise AuthError(403, "Host not in room.") + event = events[0] + defer.returnValue(event) else: defer.returnValue(None) From ea8c4094dbaa9cec30c543a03f451d2555d1d23d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 Aug 2016 13:26:13 +0100 Subject: [PATCH 4/6] Also pull out rejected events --- synapse/federation/federation_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index baa672c4ac65..56115a87d7c9 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -441,7 +441,7 @@ def get_events(self, destinations, room_id, event_ids, return_local=True): events and the second is a list of event ids that we failed to fetch. """ if return_local: - seen_events = yield self.store.get_events(event_ids) + seen_events = yield self.store.get_events(event_ids, allow_rejected=True) signed_events = seen_events.values() else: seen_events = yield self.store.have_events(event_ids) From 739ea29d1ecbdf414db1f5062c8a2aeaa519f4ff Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 Aug 2016 13:32:23 +0100 Subject: [PATCH 5/6] Also check if server is in the room --- synapse/handlers/federation.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 2f8959db9234..ff6bb475b58c 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1106,11 +1106,16 @@ def get_persisted_pdu(self, origin, event_id, do_auth=True): ) if do_auth: + in_room = yield self.auth.check_host_in_room( + event.room_id, + origin + ) + if not in_room: + raise AuthError(403, "Host not in room.") + events = yield self._filter_events_for_server( origin, event.room_id, [event] ) - if not events: - raise AuthError(403, "Host not in room.") event = events[0] From 487bc49bf8dcaadd6abd9cee1ef762f1bf0d35a7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 Aug 2016 13:39:12 +0100 Subject: [PATCH 6/6] Don't stop on 4xx series errors --- synapse/federation/federation_client.py | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 56115a87d7c9..9ba315171300 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -300,23 +300,13 @@ def get_pdu(self, destinations, event_id, outlier=False, timeout=None): break - except SynapseError as e: - logger.info( - "Failed to get PDU %s from %s because %s", - event_id, destination, e, - ) - continue - except CodeMessageException as e: - if 400 <= e.code < 500 and e.code != 404: - raise - pdu_attempts[destination] = now + except SynapseError as e: logger.info( "Failed to get PDU %s from %s because %s", event_id, destination, e, ) - continue except NotRetryingDestination as e: logger.info(e.message) continue