Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bust _membership_stream_cache cache when current state changes #17732

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17732.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix membership caches not updating in state reset scenarios.
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
4 changes: 3 additions & 1 deletion synapse/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ def process_replication_position( # noqa: B027 (no-op by design)
"""

def _invalidate_state_caches(
self, room_id: str, members_changed: Collection[str]
self,
room_id: str,
members_changed: Collection[str],
) -> None:
"""Invalidates caches that are based on the current state, but does
not stream invalidations down replication.
Expand Down
36 changes: 36 additions & 0 deletions synapse/storage/databases/main/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,11 @@ def process_replication_rows(
room_id = row.keys[0]
members_changed = set(row.keys[1:])
self._invalidate_state_caches(room_id, members_changed)
self._curr_state_delta_stream_cache.entity_has_changed( # type: ignore[attr-defined]
room_id, token
)
for user_id in members_changed:
self._membership_stream_cache.entity_has_changed(user_id, token) # type: ignore[attr-defined]
Copy link
Contributor Author

@MadLittleMods MadLittleMods Sep 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kinda weird to just stick this here (same with the others in process_replication_rows). Better way to organize this?

MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
Comment on lines +222 to +226
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wherever we are busting _curr_state_delta_stream_cache, we should also be busting _membership_stream_cache (at-least in the general area, expand the hidden diff to find if not visible)

We've forgotten to bust _curr_state_delta_stream_cache in various places which is why it's added and sometimes not.

elif row.cache_func == PURGE_HISTORY_CACHE_NAME:
if row.keys is None:
raise Exception(
Expand All @@ -236,6 +241,35 @@ def process_replication_rows(
room_id = row.keys[0]
self._invalidate_caches_for_room_events(room_id)
self._invalidate_caches_for_room(room_id)
Comment on lines 242 to 243
Copy link
Contributor Author

@MadLittleMods MadLittleMods Dec 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Separate from this PR and in line with the fact that these code paths need to be cleaned up, there is an obvious duplication here because we call self._invalidate_caches_for_room_events(room_id) inside self._invalidate_caches_for_room(room_id) as well.

self._curr_state_delta_stream_cache.entity_has_changed( # type: ignore[attr-defined]
room_id, token
)
# Note: This code is commented out to improve cache performance.
# While uncommenting would provide complete correctness, our
# automatic forgotten room purge logic (see
# `forgotten_room_retention_period`) means this would frequently
# clear the entire cache (effectively) and probably have a noticable
# impact on the cache hit ratio.
#
# Not updating the cache here is safe because:
#
# 1. `_membership_stream_cache` is only used to indicate the
# *absence* of changes, i.e. "nothing has changed between tokens
# X and Y and so return early and don't query the database".
# 2. `_membership_stream_cache` is used when we query data from
# `current_state_delta_stream` and `room_memberships` but since
# nothing new is written to the database for those tables when
# purging/deleting a room (only deleting rows), there is nothing
# changed to care about.
#
# At worst, the cache might indicate a change at token X, at which
# point, we will query the database and discover nothing is there.
#
# Ideally, we would make it so that we could clear the cache on a
# more granular level but that's a bit complex and fiddly to do with
# room membership.
#
# self._membership_stream_cache.all_entities_changed(token) # type: ignore[attr-defined]
else:
self._attempt_to_invalidate_cache(row.cache_func, row.keys)

Expand Down Expand Up @@ -275,6 +309,7 @@ def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None:
self._attempt_to_invalidate_cache(
"get_sliding_sync_rooms_for_user", None
)
self._membership_stream_cache.entity_has_changed(data.state_key, token) # type: ignore[attr-defined]
elif data.type == EventTypes.RoomEncryption:
self._attempt_to_invalidate_cache(
"get_room_encryption", (data.room_id,)
Expand All @@ -291,6 +326,7 @@ def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None:
# Similar to the above, but the entire caches are invalidated. This is
# unfortunate for the membership caches, but should recover quickly.
self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token) # type: ignore[attr-defined]
self._membership_stream_cache.all_entities_changed(token) # type: ignore[attr-defined]
self._attempt_to_invalidate_cache("get_rooms_for_user", None)
self._attempt_to_invalidate_cache("get_room_type", (data.room_id,))
self._attempt_to_invalidate_cache("get_room_encryption", (data.room_id,))
Expand Down
15 changes: 14 additions & 1 deletion synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1605,7 +1605,13 @@ def _update_current_state_txn(
room_id
delta_state: Deltas that are going to be used to update the
`current_state_events` table. Changes to the current state of the room.
stream_id: TODO
stream_id: This is expected to be the minimum `stream_ordering` for the
batch of events that we are persisting; which means we do not end up in a
situation where workers see events before the `current_state_delta` updates.
FIXME: However, this function also gets called with next upcoming
`stream_ordering` when we re-sync the state of a partial stated room (see
`update_current_state(...)`) which may be "correct" but it would be good to
nail down what exactly is the expected value here.
Comment on lines +1608 to +1614
Copy link
Contributor Author

@MadLittleMods MadLittleMods Oct 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previous conversation: #17512 (comment)

I decided to define it in some way given we're using it for cache busting below and was curious if it is actually correct. Still not confident whether it's perfect for cache busting but might be good enough.

sliding_sync_table_changes: Changes to the
`sliding_sync_membership_snapshots` and `sliding_sync_joined_rooms` tables
derived from the given `delta_state` (see
Expand Down Expand Up @@ -1908,6 +1914,13 @@ def _update_current_state_txn(
stream_id,
)

for user_id in members_to_cache_bust:
txn.call_after(
self.store._membership_stream_cache.entity_has_changed,
user_id,
stream_id,
)
Comment on lines +1917 to +1922
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This matches what we do for _curr_state_delta_stream_cache just above this

Comment on lines +1917 to +1922
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the actual call that busts the the membership cache for the tests. I assume that is because this is what busts in monolith mode vs the other calls I've added are more for workers over replication


# Invalidate the various caches
self.store._invalidate_state_caches_and_stream(
txn, room_id, members_to_cache_bust
Expand Down
9 changes: 9 additions & 0 deletions synapse/util/caches/stream_change_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,15 @@ def entity_has_changed(self, entity: EntityType, stream_pos: int) -> None:
self._entity_to_key[entity] = stream_pos
self._evict()

def all_entities_changed(self, stream_pos: int) -> None:
"""
Mark all entities as changed. This is useful when the cache is invalidated and
there may be some potential change for all of the entities.
"""
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
self._cache.clear()
self._entity_to_key.clear()
self._earliest_known_stream_pos = stream_pos

def _evict(self) -> None:
"""
Ensure the cache has not exceeded the maximum size.
Expand Down
18 changes: 0 additions & 18 deletions tests/rest/client/sliding_sync/test_sliding_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1169,12 +1169,6 @@ def test_state_reset_room_comes_down_incremental_sync(self) -> None:
self.persistence.persist_event(join_rule_event, join_rule_context)
)

# FIXME: We're manually busting the cache since
# https://github.com/element-hq/synapse/issues/17368 is not solved yet
self.store._membership_stream_cache.entity_has_changed(
user1_id, join_rule_event_pos.stream
)

# Ensure that the state reset worked and only user2 is in the room now
users_in_room = self.get_success(self.store.get_users_in_room(room_id1))
self.assertIncludes(set(users_in_room), {user2_id}, exact=True)
Expand Down Expand Up @@ -1322,12 +1316,6 @@ def test_state_reset_previously_room_comes_down_incremental_sync_with_filters(
self.persistence.persist_event(join_rule_event, join_rule_context)
)

# FIXME: We're manually busting the cache since
# https://github.com/element-hq/synapse/issues/17368 is not solved yet
self.store._membership_stream_cache.entity_has_changed(
user1_id, join_rule_event_pos.stream
)

# Ensure that the state reset worked and only user2 is in the room now
users_in_room = self.get_success(self.store.get_users_in_room(space_room_id))
self.assertIncludes(set(users_in_room), {user2_id}, exact=True)
Expand Down Expand Up @@ -1506,12 +1494,6 @@ def test_state_reset_never_room_incremental_sync_with_filters(
self.persistence.persist_event(join_rule_event, join_rule_context)
)

# FIXME: We're manually busting the cache since
# https://github.com/element-hq/synapse/issues/17368 is not solved yet
self.store._membership_stream_cache.entity_has_changed(
user1_id, join_rule_event_pos.stream
)

# Ensure that the state reset worked and only user2 is in the room now
users_in_room = self.get_success(self.store.get_users_in_room(space_room_id))
self.assertIncludes(set(users_in_room), {user2_id}, exact=True)
Expand Down
6 changes: 0 additions & 6 deletions tests/storage/test_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -1209,12 +1209,6 @@ def test_state_reset2(self) -> None:
self.persistence.persist_event(join_rule_event, join_rule_context)
)

# FIXME: We're manually busting the cache since
# https://github.com/element-hq/synapse/issues/17368 is not solved yet
self.store._membership_stream_cache.entity_has_changed(
user1_id, join_rule_event_pos.stream
)

after_reset_token = self.event_sources.get_current_token()

membership_changes = self.get_success(
Expand Down
25 changes: 25 additions & 0 deletions tests/util/test_stream_change_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,3 +255,28 @@ def test_max_pos(self) -> None:

# Unknown entities will return None
self.assertEqual(cache.get_max_pos_of_last_change("[email protected]"), None)

def test_all_entities_changed(self) -> None:
"""
`StreamChangeCache.all_entities_changed(...)` will mark all entites as changed.
"""
cache = StreamChangeCache("#test", 1, max_size=10)

cache.entity_has_changed("[email protected]", 2)
cache.entity_has_changed("[email protected]", 3)
cache.entity_has_changed("[email protected]", 4)

cache.all_entities_changed(5)

# Everything should be marked as changed before the stream position where the
# change occurred.
self.assertTrue(cache.has_entity_changed("[email protected]", 4))
self.assertTrue(cache.has_entity_changed("[email protected]", 4))
self.assertTrue(cache.has_entity_changed("[email protected]", 4))

# Nothing should be marked as changed at/after the stream position where the
# change occurred. In other words, nothing has changed since the stream position
# 5.
self.assertFalse(cache.has_entity_changed("[email protected]", 5))
self.assertFalse(cache.has_entity_changed("[email protected]", 5))
self.assertFalse(cache.has_entity_changed("[email protected]", 5))
Loading