Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

sanity-checking for events used in state res #6531

Merged
merged 3 commits into from
Dec 13, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/6531.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve sanity-checking when receiving events over federation.
1 change: 1 addition & 0 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,7 @@ async def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False) -> None:
event_map[x.event_id] = x

state_map = await resolve_events_with_store(
room_id,
room_version,
state_maps,
event_map,
Expand Down
32 changes: 21 additions & 11 deletions synapse/state/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import logging
from collections import namedtuple
from typing import Iterable, Optional
from typing import Dict, Iterable, List, Optional, Tuple

from six import iteritems, itervalues

Expand Down Expand Up @@ -417,6 +417,7 @@ def resolve_events(self, room_version, state_sets, event):

with Measure(self.clock, "state._resolve_events"):
new_state = yield resolve_events_with_store(
event.room_id,
room_version,
state_set_ids,
event_map=state_map,
Expand Down Expand Up @@ -462,7 +463,7 @@ def resolve_state_groups(
not be called for a single state group

Args:
room_id (str): room we are resolving for (used for logging)
room_id (str): room we are resolving for (used for logging and sanity checks)
room_version (str): version of the room
state_groups_ids (dict[int, dict[(str, str), str]]):
map from state group id to the state in that state group
Expand Down Expand Up @@ -518,6 +519,7 @@ def resolve_state_groups(
logger.info("Resolving conflicted state for %r", room_id)
with Measure(self.clock, "state._resolve_events"):
new_state = yield resolve_events_with_store(
room_id,
room_version,
list(itervalues(state_groups_ids)),
event_map=event_map,
Expand Down Expand Up @@ -589,36 +591,44 @@ def _make_state_cache_entry(new_state, state_groups_ids):
)


def resolve_events_with_store(room_version, state_sets, event_map, state_res_store):
def resolve_events_with_store(
room_id: str,
room_version: str,
state_sets: List[Dict[Tuple[str, str], str]],
event_map: Optional[Dict[str, EventBase]],
state_res_store: "StateResolutionStore",
):
"""
Args:
room_version(str): Version of the room
room_id: the room we are working in

room_version: Version of the room

state_sets(list): List of dicts of (type, state_key) -> event_id,
state_sets: List of dicts of (type, state_key) -> event_id,
which are the different state groups to resolve.

event_map(dict[str,FrozenEvent]|None):
event_map:
a dict from event_id to event, for any events that we happen to
have in flight (eg, those currently being persisted). This will be
used as a starting point fof finding the state we need; any missing
events will be requested via state_map_factory.

If None, all events will be fetched via state_map_factory.
If None, all events will be fetched via state_res_store.

state_res_store (StateResolutionStore)
state_res_store: a place to fetch events from

Returns
Returns:
Deferred[dict[(str, str), str]]:
a map from (type, state_key) to event_id.
"""
v = KNOWN_ROOM_VERSIONS[room_version]
if v.state_res == StateResolutionVersions.V1:
return v1.resolve_events_with_store(
state_sets, event_map, state_res_store.get_events
room_id, state_sets, event_map, state_res_store.get_events
)
else:
return v2.resolve_events_with_store(
room_version, state_sets, event_map, state_res_store
room_id, room_version, state_sets, event_map, state_res_store
)


Expand Down
34 changes: 29 additions & 5 deletions synapse/state/v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import hashlib
import logging
from typing import Callable, Dict, List, Optional, Tuple

from six import iteritems, iterkeys, itervalues

Expand All @@ -24,6 +25,7 @@
from synapse.api.constants import EventTypes
from synapse.api.errors import AuthError
from synapse.api.room_versions import RoomVersions
from synapse.events import EventBase

logger = logging.getLogger(__name__)

Expand All @@ -32,25 +34,32 @@


@defer.inlineCallbacks
def resolve_events_with_store(state_sets, event_map, state_map_factory):
def resolve_events_with_store(
room_id: str,
state_sets: List[Dict[Tuple[str, str], str]],
event_map: Optional[Dict[str, EventBase]],
state_map_factory: Callable,
):
"""
Args:
state_sets(list): List of dicts of (type, state_key) -> event_id,
room_id: the room we are working in

state_sets: List of dicts of (type, state_key) -> event_id,
which are the different state groups to resolve.

event_map(dict[str,FrozenEvent]|None):
event_map:
a dict from event_id to event, for any events that we happen to
have in flight (eg, those currently being persisted). This will be
used as a starting point fof finding the state we need; any missing
events will be requested via state_map_factory.

If None, all events will be fetched via state_map_factory.

state_map_factory(func): will be called
state_map_factory: will be called
with a list of event_ids that are needed, and should return with
a Deferred of dict of event_id to event.

Returns
Returns:
Deferred[dict[(str, str), str]]:
a map from (type, state_key) to event_id.
"""
Expand All @@ -76,6 +85,14 @@ def resolve_events_with_store(state_sets, event_map, state_map_factory):
if event_map is not None:
state_map.update(event_map)

# everything in the state map should be in the right room
for event in state_map.values():
if event.room_id != room_id:
raise Exception(
"Attempting to state-resolve for room %s with event %s which is in %s"
% (room_id, event.event_id, event.room_id,)
)

# get the ids of the auth events which allow us to authenticate the
# conflicted state, picking only from the unconflicting state.
#
Expand All @@ -95,6 +112,13 @@ def resolve_events_with_store(state_sets, event_map, state_map_factory):
)

state_map_new = yield state_map_factory(new_needed_events)
for event in state_map_new.values():
if event.room_id != room_id:
raise Exception(
"Attempting to state-resolve for room %s with event %s which is in %s"
% (room_id, event.event_id, event.room_id,)
)

state_map.update(state_map_new)

return _resolve_with_state(
Expand Down
Loading