Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

sanity-checking for events used in state res #6531

Merged
merged 3 commits into from
Dec 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/6531.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve sanity-checking when receiving events over federation.
1 change: 1 addition & 0 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,7 @@ async def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False) -> None:
event_map[x.event_id] = x

state_map = await resolve_events_with_store(
room_id,
room_version,
state_maps,
event_map,
Expand Down
32 changes: 21 additions & 11 deletions synapse/state/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import logging
from collections import namedtuple
from typing import Iterable, Optional
from typing import Dict, Iterable, List, Optional, Tuple

from six import iteritems, itervalues

Expand Down Expand Up @@ -417,6 +417,7 @@ def resolve_events(self, room_version, state_sets, event):

with Measure(self.clock, "state._resolve_events"):
new_state = yield resolve_events_with_store(
event.room_id,
room_version,
state_set_ids,
event_map=state_map,
Expand Down Expand Up @@ -462,7 +463,7 @@ def resolve_state_groups(
not be called for a single state group

Args:
room_id (str): room we are resolving for (used for logging)
room_id (str): room we are resolving for (used for logging and sanity checks)
room_version (str): version of the room
state_groups_ids (dict[int, dict[(str, str), str]]):
map from state group id to the state in that state group
Expand Down Expand Up @@ -518,6 +519,7 @@ def resolve_state_groups(
logger.info("Resolving conflicted state for %r", room_id)
with Measure(self.clock, "state._resolve_events"):
new_state = yield resolve_events_with_store(
room_id,
room_version,
list(itervalues(state_groups_ids)),
event_map=event_map,
Expand Down Expand Up @@ -589,36 +591,44 @@ def _make_state_cache_entry(new_state, state_groups_ids):
)


def resolve_events_with_store(room_version, state_sets, event_map, state_res_store):
def resolve_events_with_store(
room_id: str,
room_version: str,
state_sets: List[Dict[Tuple[str, str], str]],
event_map: Optional[Dict[str, EventBase]],
state_res_store: "StateResolutionStore",
):
"""
Args:
room_version(str): Version of the room
room_id: the room we are working in

room_version: Version of the room

state_sets(list): List of dicts of (type, state_key) -> event_id,
state_sets: List of dicts of (type, state_key) -> event_id,
which are the different state groups to resolve.

event_map(dict[str,FrozenEvent]|None):
event_map:
a dict from event_id to event, for any events that we happen to
have in flight (eg, those currently being persisted). This will be
used as a starting point fof finding the state we need; any missing
events will be requested via state_map_factory.

If None, all events will be fetched via state_map_factory.
If None, all events will be fetched via state_res_store.

state_res_store (StateResolutionStore)
state_res_store: a place to fetch events from

Returns
Returns:
Deferred[dict[(str, str), str]]:
a map from (type, state_key) to event_id.
"""
v = KNOWN_ROOM_VERSIONS[room_version]
if v.state_res == StateResolutionVersions.V1:
return v1.resolve_events_with_store(
state_sets, event_map, state_res_store.get_events
room_id, state_sets, event_map, state_res_store.get_events
)
else:
return v2.resolve_events_with_store(
room_version, state_sets, event_map, state_res_store
room_id, room_version, state_sets, event_map, state_res_store
)


Expand Down
34 changes: 29 additions & 5 deletions synapse/state/v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import hashlib
import logging
from typing import Callable, Dict, List, Optional, Tuple

from six import iteritems, iterkeys, itervalues

Expand All @@ -24,6 +25,7 @@
from synapse.api.constants import EventTypes
from synapse.api.errors import AuthError
from synapse.api.room_versions import RoomVersions
from synapse.events import EventBase

logger = logging.getLogger(__name__)

Expand All @@ -32,25 +34,32 @@


@defer.inlineCallbacks
def resolve_events_with_store(state_sets, event_map, state_map_factory):
def resolve_events_with_store(
room_id: str,
state_sets: List[Dict[Tuple[str, str], str]],
event_map: Optional[Dict[str, EventBase]],
state_map_factory: Callable,
):
"""
Args:
state_sets(list): List of dicts of (type, state_key) -> event_id,
room_id: the room we are working in

state_sets: List of dicts of (type, state_key) -> event_id,
which are the different state groups to resolve.

event_map(dict[str,FrozenEvent]|None):
event_map:
a dict from event_id to event, for any events that we happen to
have in flight (eg, those currently being persisted). This will be
used as a starting point fof finding the state we need; any missing
events will be requested via state_map_factory.

If None, all events will be fetched via state_map_factory.

state_map_factory(func): will be called
state_map_factory: will be called
with a list of event_ids that are needed, and should return with
a Deferred of dict of event_id to event.

Returns
Returns:
Deferred[dict[(str, str), str]]:
a map from (type, state_key) to event_id.
"""
Expand All @@ -76,6 +85,14 @@ def resolve_events_with_store(state_sets, event_map, state_map_factory):
if event_map is not None:
state_map.update(event_map)

# everything in the state map should be in the right room
for event in state_map.values():
if event.room_id != room_id:
raise Exception(
"Attempting to state-resolve for room %s with event %s which is in %s"
% (room_id, event.event_id, event.room_id,)
)

# get the ids of the auth events which allow us to authenticate the
# conflicted state, picking only from the unconflicting state.
#
Expand All @@ -95,6 +112,13 @@ def resolve_events_with_store(state_sets, event_map, state_map_factory):
)

state_map_new = yield state_map_factory(new_needed_events)
for event in state_map_new.values():
if event.room_id != room_id:
raise Exception(
"Attempting to state-resolve for room %s with event %s which is in %s"
% (room_id, event.event_id, event.room_id,)
)

state_map.update(state_map_new)

return _resolve_with_state(
Expand Down
Loading