Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Avoid checking the event cache when backfilling events #14164

Merged
merged 6 commits into from
Oct 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/14164.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug introduced in 1.30.0 where purging and rejoining a room without restarting in-between would result in a broken room.
47 changes: 34 additions & 13 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -792,9 +792,42 @@ async def _process_pulled_events(
],
)

# Check if we already have any of these events.
# Note: we currently make a lookup in the database directly here rather than
# checking the event cache, due to:
# https://github.com/matrix-org/synapse/issues/13476
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could add an additional comment about potentially adding a cached check back when "something like #13916 comes along and correctly invalidates the event cache."

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went ahead and added a comment to the referenced bug instead, which should serve the same purpose and tie things together a bit better on GitHub.

existing_events_map = await self._store._get_events_from_db(
[event.event_id for event in events]
)

new_events = []
for event in events:
event_id = event.event_id

# If we've already seen this event ID...
if event_id in existing_events_map:
existing_event = existing_events_map[event_id]

# ...and the event itself was not previously stored as an outlier...
if not existing_event.event.internal_metadata.is_outlier():
# ...then there's no need to persist it. We have it already.
logger.info(
"_process_pulled_event: Ignoring received event %s which we "
"have already seen",
event.event_id,
)
continue

# While we have seen this event before, it was stored as an outlier.
# We'll now persist it as a non-outlier.
logger.info("De-outliering event %s", event_id)

# Continue on with the events that are new to us.
new_events.append(event)

# We want to sort these by depth so we process them and
# tell clients about them in order.
sorted_events = sorted(events, key=lambda x: x.depth)
sorted_events = sorted(new_events, key=lambda x: x.depth)
for ev in sorted_events:
with nested_logging_context(ev.event_id):
await self._process_pulled_event(origin, ev, backfilled=backfilled)
Expand Down Expand Up @@ -846,18 +879,6 @@ async def _process_pulled_event(

event_id = event.event_id

existing = await self._store.get_event(
event_id, allow_none=True, allow_rejected=True
)
if existing:
if not existing.internal_metadata.is_outlier():
logger.info(
"_process_pulled_event: Ignoring received event %s which we have already seen",
event_id,
)
return
logger.info("De-outliering event %s", event_id)

try:
self._sanity_check_event(event)
except SynapseError as err:
Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ async def get_event(
If there is a mismatch, behave as per allow_none.

Returns:
The event, or None if the event was not found.
The event, or None if the event was not found and allow_none is `True`.
"""
if not isinstance(event_id, str):
raise TypeError("Invalid event event_id %r" % (event_id,))
Expand Down
105 changes: 104 additions & 1 deletion tests/handlers/test_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,13 @@
from twisted.test.proto_helpers import MemoryReactor

from synapse.api.constants import EventTypes
from synapse.api.errors import AuthError, Codes, LimitExceededError, SynapseError
from synapse.api.errors import (
AuthError,
Codes,
LimitExceededError,
NotFoundError,
SynapseError,
)
from synapse.api.room_versions import RoomVersions
from synapse.events import EventBase, make_event_from_dict
from synapse.federation.federation_base import event_from_pdu_json
Expand All @@ -28,6 +34,7 @@
from synapse.rest import admin
from synapse.rest.client import login, room
from synapse.server import HomeServer
from synapse.storage.databases.main.events_worker import EventCacheEntry
from synapse.util import Clock
from synapse.util.stringutils import random_string

Expand Down Expand Up @@ -322,6 +329,102 @@ def test_backfill_with_many_backward_extremities(self) -> None:
)
self.get_success(d)

def test_backfill_ignores_known_events(self) -> None:
    """
    Backfilling an event that has already been persisted must be a no-op.

    The same fake remote event is backfilled twice. The first pass is expected
    to persist it (even though a stale entry for it sits in the event cache);
    the second pass is expected to skip persistence entirely.
    """
    # A local user to own the room, and a made-up remote user to send the event.
    local_user = self.register_user("kermit", "test")
    access_token = self.login("kermit", "test")

    remote_server = "otherserver"
    remote_user = "@otheruser:" + remote_server

    # The room that events will be backfilled into.
    room_id = self.helper.create_room_as(room_creator=local_user, tok=access_token)
    room_version = self.get_success(self.store.get_room_version(room_id))

    # The event that the "remote server" will hand back during backfill.
    pdu = {
        "type": EventTypes.Message,
        "content": {"body": "hello world", "msgtype": "m.text"},
        "room_id": room_id,
        "sender": remote_user,
        "depth": 32,
        "prev_events": [],
        "auth_events": [],
        "origin_server_ts": self.clock.time_msec(),
    }
    event = event_from_pdu_json(pdu, room_version)

    # Precondition: the store does not know about this event yet.
    self.get_failure(
        self.store.get_event(event.event_id),
        NotFoundError,
    )

    # Replace FederationClient.backfill so that it pretends a remote server
    # answered with our fake event.
    self.hs.get_federation_client().backfill = Mock(
        return_value=make_awaitable([event])
    )

    # Grab the handler once; the mock installed below must live on the same
    # instance that performs the backfill.
    handler = self.hs.get_federation_event_handler()

    # Wrap the real persist method in a Mock whose side effect is the real
    # method: calls are recorded, but persistence still actually happens.
    persist_spy = Mock(side_effect=handler.persist_events_and_notify)
    handler.persist_events_and_notify = persist_spy

    # Seed the event cache with the event although it is not in the DB. This is
    # an invalid state, but one that can currently arise because the event cache
    # is not properly invalidated.
    # See https://github.com/matrix-org/synapse/issues/13476.
    #
    # Consequently, backfill must not trust the event cache when deciding
    # whether an event is already in the DB.
    # TODO: Remove this bit when the event cache is properly invalidated.
    stale_entry = EventCacheEntry(
        event=event,
        redacted_event=None,
    )
    self.store._get_event_cache.set_local((event.event_id,), stale_entry)

    def run_backfill() -> None:
        # Trigger a backfill request via FederationEventHandler.backfill; the
        # mocked federation client supplies the fake event.
        self.get_success(
            handler.backfill(
                remote_user,
                room_id,
                limit=10,
                extremities=[],
            )
        )

    # First pass: the fake event should be persisted.
    run_backfill()
    persist_spy.assert_called_once()
    persist_spy.reset_mock()

    # Second pass: the homeserver receives the very same event again. Since we
    # already have it, nothing should be persisted this time.
    run_backfill()
    persist_spy.assert_not_called()

@unittest.override_config(
{"rc_invites": {"per_user": {"per_second": 0.5, "burst_count": 3}}}
)
Expand Down