Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Move get_state methods into FederationHandler #6503

Merged
merged 3 commits into from
Dec 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/6503.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Move get_state methods into FederationHandler.
91 changes: 11 additions & 80 deletions synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@
)
from synapse.events import builder, room_version_to_event_format
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.context import make_deferred_yieldable
from synapse.logging.utils import log_function
from synapse.util import batch_iter, unwrapFirstError
from synapse.util import unwrapFirstError
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.retryutils import NotRetryingDestination

Expand Down Expand Up @@ -308,19 +308,12 @@ def get_pdu(
return signed_pdu

@defer.inlineCallbacks
@log_function
def get_state_for_room(self, destination, room_id, event_id):
"""Requests all of the room state at a given event from a remote homeserver.

Args:
destination (str): The remote homeserver to query for the state.
room_id (str): The id of the room we're interested in.
event_id (str): The id of the event we want the state at.
def get_room_state_ids(self, destination: str, room_id: str, event_id: str):
"""Calls the /state_ids endpoint to fetch the state at a particular point
in the room, and the auth events for the given event

Returns:
Deferred[Tuple[List[EventBase], List[EventBase]]]:
A list of events in the state, and a list of events in the auth chain
for the given event.
Tuple[List[str], List[str]]: a tuple of (state event_ids, auth event_ids)
"""
result = yield self.transport_layer.get_room_state_ids(
destination, room_id, event_id=event_id
Expand All @@ -329,74 +322,12 @@ def get_state_for_room(self, destination, room_id, event_id):
state_event_ids = result["pdu_ids"]
auth_event_ids = result.get("auth_chain_ids", [])

desired_events = set(state_event_ids + auth_event_ids)
event_map = yield self.get_events_from_store_or_dest(
destination, room_id, desired_events
)

failed_to_fetch = desired_events - event_map.keys()
if failed_to_fetch:
logger.warning(
"Failed to fetch missing state/auth events for %s: %s",
room_id,
failed_to_fetch,
)

pdus = [event_map[e_id] for e_id in state_event_ids if e_id in event_map]
auth_chain = [event_map[e_id] for e_id in auth_event_ids if e_id in event_map]

auth_chain.sort(key=lambda e: e.depth)

return pdus, auth_chain

@defer.inlineCallbacks
def get_events_from_store_or_dest(self, destination, room_id, event_ids):
"""Fetch events from a remote destination, checking if we already have them.

Args:
destination (str)
room_id (str)
event_ids (Iterable[str])

Returns:
Deferred[dict[str, EventBase]]: A deferred resolving to a map
from event_id to event
"""
fetched_events = yield self.store.get_events(event_ids, allow_rejected=True)

missing_events = set(event_ids) - fetched_events.keys()

if not missing_events:
return fetched_events

logger.debug(
"Fetching unknown state/auth events %s for room %s",
missing_events,
event_ids,
)

room_version = yield self.store.get_room_version(room_id)

# XXX 20 requests at once? really?
for batch in batch_iter(missing_events, 20):
deferreds = [
run_in_background(
self.get_pdu,
destinations=[destination],
event_id=e_id,
room_version=room_version,
)
for e_id in batch
]

res = yield make_deferred_yieldable(
defer.DeferredList(deferreds, consumeErrors=True)
)
for success, result in res:
if success and result:
fetched_events[result.event_id] = result
if not isinstance(state_event_ids, list) or not isinstance(
auth_event_ids, list
):
raise Exception("invalid response from /state_ids")

return fetched_events
return state_event_ids, auth_event_ids

@defer.inlineCallbacks
@log_function
Expand Down
101 changes: 95 additions & 6 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
from synapse.state import StateResolutionStore, resolve_events_with_store
from synapse.types import UserID, get_domain_from_id
from synapse.util import unwrapFirstError
from synapse.util import batch_iter, unwrapFirstError
from synapse.util.async_helpers import Linearizer
from synapse.util.distributor import user_joined_room
from synapse.util.retryutils import NotRetryingDestination
Expand Down Expand Up @@ -379,11 +379,9 @@ def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False):
(
remote_state,
got_auth_chain,
) = yield self.federation_client.get_state_for_room(
origin, room_id, p
)
) = yield self._get_state_for_room(origin, room_id, p)

# we want the state *after* p; get_state_for_room returns the
# we want the state *after* p; _get_state_for_room returns the
# state *before* p.
remote_event = yield self.federation_client.get_pdu(
[origin], p, room_version, outlier=True
Expand Down Expand Up @@ -583,6 +581,97 @@ def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth):
else:
raise

@defer.inlineCallbacks
@log_function
def _get_state_for_room(self, destination, room_id, event_id):
"""Requests all of the room state at a given event from a remote homeserver.

Args:
destination (str): The remote homeserver to query for the state.
room_id (str): The id of the room we're interested in.
event_id (str): The id of the event we want the state at.

Returns:
Deferred[Tuple[List[EventBase], List[EventBase]]]:
A list of events in the state, and a list of events in the auth chain
for the given event.
"""
(
state_event_ids,
auth_event_ids,
) = yield self.federation_client.get_room_state_ids(
destination, room_id, event_id=event_id
)

desired_events = set(state_event_ids + auth_event_ids)
event_map = yield self._get_events_from_store_or_dest(
destination, room_id, desired_events
)

failed_to_fetch = desired_events - event_map.keys()
if failed_to_fetch:
logger.warning(
"Failed to fetch missing state/auth events for %s: %s",
room_id,
failed_to_fetch,
)

pdus = [event_map[e_id] for e_id in state_event_ids if e_id in event_map]
auth_chain = [event_map[e_id] for e_id in auth_event_ids if e_id in event_map]

auth_chain.sort(key=lambda e: e.depth)

return pdus, auth_chain

@defer.inlineCallbacks
def _get_events_from_store_or_dest(self, destination, room_id, event_ids):
"""Fetch events from a remote destination, checking if we already have them.

Args:
destination (str)
room_id (str)
event_ids (Iterable[str])

Returns:
Deferred[dict[str, EventBase]]: A deferred resolving to a map
from event_id to event
"""
fetched_events = yield self.store.get_events(event_ids, allow_rejected=True)

missing_events = set(event_ids) - fetched_events.keys()

if not missing_events:
return fetched_events

logger.debug(
"Fetching unknown state/auth events %s for room %s",
missing_events,
event_ids,
)

room_version = yield self.store.get_room_version(room_id)

# XXX 20 requests at once? really?
for batch in batch_iter(missing_events, 20):
deferreds = [
run_in_background(
self.federation_client.get_pdu,
destinations=[destination],
event_id=e_id,
room_version=room_version,
)
for e_id in batch
]

res = yield make_deferred_yieldable(
defer.DeferredList(deferreds, consumeErrors=True)
)
for success, result in res:
if success and result:
fetched_events[result.event_id] = result

return fetched_events

@defer.inlineCallbacks
def _process_received_pdu(self, origin, event, state, auth_chain):
""" Called when we have a new pdu. We need to do auth checks and put it
Expand Down Expand Up @@ -723,7 +812,7 @@ def backfill(self, dest, room_id, limit, extremities):
state_events = {}
events_to_state = {}
for e_id in edges:
state, auth = yield self.federation_client.get_state_for_room(
state, auth = yield self._get_state_for_room(
destination=dest, room_id=room_id, event_id=e_id
)
auth_events.update({a.event_id: a for a in auth})
Expand Down