From 0489ba7eb49eec5a1ea04d4a3809d19eaa1f4e46 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 29 Aug 2024 13:14:55 -0500 Subject: [PATCH 01/12] Don't pre-calculate `stream_ordering` for `sliding_sync_joined_rooms` --- synapse/storage/databases/main/events.py | 129 +++++++++-------------- 1 file changed, 50 insertions(+), 79 deletions(-) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 60c92e58041..e524a0451cf 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -170,17 +170,6 @@ class SlidingSyncMembershipInfo: @attr.s(slots=True, auto_attribs=True) class SlidingSyncTableChanges: room_id: str - # `stream_ordering` of the most recent event being persisted in the room. This doesn't - # need to be perfect, we just need *some* answer that points to a real event in the - # room in case we are the first ones inserting into the `sliding_sync_joined_rooms` - # table because of the `NON NULL` constraint on `event_stream_ordering`. In reality, - # `_update_sliding_sync_tables_with_new_persisted_events_txn()` is run after - # `_update_current_state_txn()` whenever a new event is persisted to update it to the - # correct latest value. - # - # This should be *some* value that points to a real event in the room if we are - # still joined to the room and some state is changing (`to_insert` or `to_delete`). - joined_room_best_effort_most_recent_stream_ordering: Optional[int] # If the row doesn't exist in the `sliding_sync_joined_rooms` table, we need to # fully-insert it which means we also need to include a `bump_stamp` value to use # for the row. 
This should only be populated when we're trying to fully-insert a @@ -410,7 +399,6 @@ async def _calculate_sliding_sync_table_changes( if not to_insert and not to_delete: return SlidingSyncTableChanges( room_id=room_id, - joined_room_best_effort_most_recent_stream_ordering=None, joined_room_bump_stamp_to_fully_insert=None, joined_room_updates={}, membership_snapshot_shared_insert_values={}, @@ -568,7 +556,6 @@ async def _calculate_sliding_sync_table_changes( # `_update_sliding_sync_tables_with_new_persisted_events_txn()`) # joined_room_updates: SlidingSyncStateInsertValues = {} - best_effort_most_recent_stream_ordering: Optional[int] = None bump_stamp_to_fully_insert: Optional[int] = None if not delta_state.no_longer_in_room: current_state_ids_map = {} @@ -657,52 +644,9 @@ async def _calculate_sliding_sync_table_changes( elif state_key == (EventTypes.Name, ""): joined_room_updates["room_name"] = None - # Figure out `best_effort_most_recent_stream_ordering`. This doesn't need to - # be perfect, we just need *some* answer that points to a real event in the - # room in case we are the first ones inserting into the - # `sliding_sync_joined_rooms` table because of the `NON NULL` constraint on - # `event_stream_ordering`. In reality, - # `_update_sliding_sync_tables_with_new_persisted_events_txn()` is run after - # `_update_current_state_txn()` whenever a new event is persisted to update - # it to the correct latest value. - # - if len(events_and_contexts) > 0: - # Since the list is sorted ascending by `stream_ordering`, the last event - # should have the highest `stream_ordering`. - best_effort_most_recent_stream_ordering = events_and_contexts[-1][ - 0 - ].internal_metadata.stream_ordering - else: - # If there are no `events_and_contexts`, we assume it's one of two scenarios: - # 1. If there are new state `to_insert` but no `events_and_contexts`, - # then it's a state reset. - # 2. 
Otherwise, it's some partial-state room re-syncing the current state and - # going through un-partial process. - # - # Either way, we assume no new events are being persisted and we can - # find the latest already in the database. Since this is a best-effort - # value, we don't need to be perfect although I think we're pretty close - # here. - most_recent_event_pos_results = ( - await self.store.get_last_event_pos_in_room( - room_id, event_types=None - ) - ) - assert most_recent_event_pos_results, ( - f"We should not be seeing `None` here because we are still in the room ({room_id}) and " - + "it should at-least have a join membership event that's keeping us here." - ) - best_effort_most_recent_stream_ordering = most_recent_event_pos_results[ - 1 - ].stream - - # We should have found a value if we are still in the room - assert best_effort_most_recent_stream_ordering is not None - return SlidingSyncTableChanges( room_id=room_id, # For `sliding_sync_joined_rooms` - joined_room_best_effort_most_recent_stream_ordering=best_effort_most_recent_stream_ordering, joined_room_bump_stamp_to_fully_insert=bump_stamp_to_fully_insert, joined_room_updates=joined_room_updates, # For `sliding_sync_membership_snapshots` @@ -1773,31 +1717,52 @@ def _update_current_state_txn( # # We only need to update when one of the relevant state values has changed if sliding_sync_table_changes.joined_room_updates: - # This should be *some* value that points to a real event in the room if - # we are still joined to the room. 
- assert ( - sliding_sync_table_changes.joined_room_best_effort_most_recent_stream_ordering - is not None + sliding_sync_updates_keys = ( + sliding_sync_table_changes.joined_room_updates.keys() + ) + sliding_sync_updates_values = ( + sliding_sync_table_changes.joined_room_updates.values() ) - self.db_pool.simple_upsert_txn( - txn, - table="sliding_sync_joined_rooms", - keyvalues={"room_id": room_id}, - values=sliding_sync_table_changes.joined_room_updates, - insertion_values={ - # The reason we're only *inserting* (not *updating*) - # `event_stream_ordering` here is because the column has a `NON - # NULL` constraint and we need *some* answer. And if the row - # already exists, it already has the correct value and it's - # better to just rely on - # `_update_sliding_sync_tables_with_new_persisted_events_txn()` - # to do the right thing (same for `bump_stamp`). - "event_stream_ordering": sliding_sync_table_changes.joined_room_best_effort_most_recent_stream_ordering, - # If we're trying to fully-insert a row, we need to provide a - # value for `bump_stamp` if it exists for the room. - "bump_stamp": sliding_sync_table_changes.joined_room_bump_stamp_to_fully_insert, - }, + args: List[Any] = [ + room_id, + room_id, + sliding_sync_table_changes.joined_room_bump_stamp_to_fully_insert, + ] + args.extend(iter(sliding_sync_updates_values)) + + # We need some value for `stream_ordering` for an event that exists to + # satisfy the `FOREIGN KEY` constraint to the `events` table. We don't + # pre-calculate this value because when we + # `_calculate_sliding_sync_table_changes()`, we could be working with + # events that were previously persisted as `outlier` with one + # `stream_ordering` but are now persisted again and de-outliered with a + # different `stream_ordering`. 
Since we end up keeping the old + # `stream_ordering` value when the event was first persisted (because + # `update_outliers_txn()` is run before we actually + # `_store_event_txn()`), the new `stream_ordering` value is unused. + # + # We don't update `event_stream_ordering` `ON CONFLICT` because it's + # simpler and we can just rely on + # `_update_sliding_sync_tables_with_new_persisted_events_txn()` to do + # the right thing (same for `bump_stamp`). The only reason we're + # inserting `event_stream_ordering` here is because the column has a + # `NON NULL` constraint and we need some answer. + txn.execute( + f""" + INSERT INTO sliding_sync_joined_rooms + (room_id, event_stream_ordering, bump_stamp, {", ".join(sliding_sync_updates_keys)}) + VALUES ( + ?, + (SELECT stream_ordering FROM events WHERE room_id = ? ORDER BY stream_ordering DESC LIMIT 1), + ?, + {", ".join("?" for _ in sliding_sync_updates_values)} + ) + ON CONFLICT (room_id) + DO UPDATE SET + {", ".join(f"{key} = EXCLUDED.{key}" for key in sliding_sync_updates_keys)} + """, + args, ) # We now update `local_current_membership`. We do this regardless @@ -2166,6 +2131,12 @@ def _update_sliding_sync_tables_with_new_persisted_events_txn( max_stream_ordering is not None ), "Expected to have a stream_ordering if we have events" + logger.info( + "asdf _update_sliding_sync_tables_with_new_persisted_events_txn %s %s %s", + room_id, + max_stream_ordering, + max_bump_stamp, + ) # Handle updating the `sliding_sync_joined_rooms` table. 
# txn.execute( From a17ae2cf572e72c67137a2a138a323082746c201 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 29 Aug 2024 13:44:43 -0500 Subject: [PATCH 02/12] Don't pre-calculate `stream_ordering` for `sliding_sync_membership_snapshots` --- synapse/storage/databases/main/events.py | 98 ++++++++++++++---------- 1 file changed, 56 insertions(+), 42 deletions(-) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index e524a0451cf..bff5d09ebbb 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1731,16 +1731,16 @@ def _update_current_state_txn( ] args.extend(iter(sliding_sync_updates_values)) - # We need some value for `stream_ordering` for an event that exists to - # satisfy the `FOREIGN KEY` constraint to the `events` table. We don't - # pre-calculate this value because when we - # `_calculate_sliding_sync_table_changes()`, we could be working with - # events that were previously persisted as `outlier` with one - # `stream_ordering` but are now persisted again and de-outliered with a - # different `stream_ordering`. Since we end up keeping the old - # `stream_ordering` value when the event was first persisted (because - # `update_outliers_txn()` is run before we actually - # `_store_event_txn()`), the new `stream_ordering` value is unused. + # We use a sub-query for `stream_ordering` because it's unreliable to + # pre-calculate from `events_and_contexts` at the time when + # `_calculate_sliding_sync_table_changes()` is ran. We could be working + # with events that were previously persisted as an `outlier` with one + # `stream_ordering` but are now being persisted again and de-outliered + # and assigned a different `stream_ordering`. 
Since we call + # `_calculate_sliding_sync_table_changes()` before + # `_update_outliers_txn()` which fixes this discrepancy, we're working + # with an unreliable `stream_ordering` value that will possibly be + # unused and not make it into the `events` table. # # We don't update `event_stream_ordering` `ON CONFLICT` because it's # simpler and we can just rely on @@ -1819,38 +1819,58 @@ def _update_current_state_txn( if sliding_sync_table_changes.to_insert_membership_snapshots: # Update the `sliding_sync_membership_snapshots` table # - # We need to insert/update regardless of whether we have `sliding_sync_snapshot_keys` - # because there are other fields in the `ON CONFLICT` upsert to run (see - # inherit case above for more context when this happens). - self.db_pool.simple_upsert_many_txn( - txn=txn, - table="sliding_sync_membership_snapshots", - key_names=("room_id", "user_id"), - key_values=[ - (room_id, membership_info.user_id) - for membership_info in sliding_sync_table_changes.to_insert_membership_snapshots - ], - value_names=[ - "sender", - "membership_event_id", - "membership", - "event_stream_ordering", - "event_instance_name", - ] - + list( - sliding_sync_table_changes.membership_snapshot_shared_insert_values.keys() - ), - value_values=[ + sliding_sync_snapshot_keys = ( + sliding_sync_table_changes.membership_snapshot_shared_insert_values.keys() + ) + sliding_sync_snapshot_values = ( + sliding_sync_table_changes.membership_snapshot_shared_insert_values.values() + ) + # We need to insert/update regardless of whether we have + # `sliding_sync_snapshot_keys` because there are other fields in the `ON + # CONFLICT` upsert to run (see inherit case (explained in + # `_calculate_sliding_sync_table_changes()`) for more context when this + # happens). + # + # We use a sub-query for `stream_ordering` because it's unreliable to + # pre-calculate from `events_and_contexts` at the time when + # `_calculate_sliding_sync_table_changes()` is ran. 
We could be working + # with events that were previously persisted as an `outlier` with one + # `stream_ordering` but are now being persisted again and de-outliered + # and assigned a different `stream_ordering`. Since we call + # `_calculate_sliding_sync_table_changes()` before + # `_update_outliers_txn()` which fixes this discrepancy, we're working + # with an unreliable `stream_ordering` value that will possibly be + # unused and not make it into the `events` table. + txn.execute_batch( + f""" + INSERT INTO sliding_sync_membership_snapshots + (room_id, user_id, sender, membership_event_id, membership, event_stream_ordering, event_instance_name + {("," + ", ".join(sliding_sync_snapshot_keys)) if sliding_sync_snapshot_keys else ""}) + VALUES ( + ?, ?, ?, ?, ?, + (SELECT stream_ordering FROM events WHERE event_id = ?), + ? + {("," + ", ".join("?" for _ in sliding_sync_snapshot_values)) if sliding_sync_snapshot_values else ""} + ) + ON CONFLICT (room_id, user_id) + DO UPDATE SET + sender = EXCLUDED.sender, + membership_event_id = EXCLUDED.membership_event_id, + membership = EXCLUDED.membership, + event_stream_ordering = EXCLUDED.event_stream_ordering + {("," + ", ".join(f"{key} = EXCLUDED.{key}" for key in sliding_sync_snapshot_keys)) if sliding_sync_snapshot_keys else ""} + """, + [ [ + room_id, + membership_info.user_id, membership_info.sender, membership_info.membership_event_id, membership_info.membership, - membership_info.membership_event_stream_ordering, + membership_info.membership_event_id, membership_info.membership_event_instance_name, ] - + list( - sliding_sync_table_changes.membership_snapshot_shared_insert_values.values() - ) + + list(sliding_sync_snapshot_values) for membership_info in sliding_sync_table_changes.to_insert_membership_snapshots ], ) @@ -2131,12 +2151,6 @@ def _update_sliding_sync_tables_with_new_persisted_events_txn( max_stream_ordering is not None ), "Expected to have a stream_ordering if we have events" - logger.info( - "asdf 
_update_sliding_sync_tables_with_new_persisted_events_txn %s %s %s", - room_id, - max_stream_ordering, - max_bump_stamp, - ) # Handle updating the `sliding_sync_joined_rooms` table. # txn.execute( From 8180229cb3cfb3206eca68fd5f3ec19439971ae9 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 29 Aug 2024 13:56:24 -0500 Subject: [PATCH 03/12] Also do not trust instance_name --- synapse/storage/databases/main/events.py | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index bff5d09ebbb..ec3a3eda2a9 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -457,7 +457,18 @@ async def _calculate_sliding_sync_table_changes( membership_event_id, user_id, ) in membership_event_id_to_user_id_map.items(): - # We should only be seeing events with `stream_ordering`/`instance_name` assigned by this point + # We should only be seeing events with `stream_ordering`/`instance_name` + # assigned by this point. + # + # XXX: Because we're sourcing the event from `events_and_contexts`, we + # can't rely on `stream_ordering`/`instance_name` being correct. We + # could be working with events that were previously persisted as an + # `outlier` with one `stream_ordering` but are now being persisted again + # and de-outliered and assigned a different `stream_ordering`. Since we + # call `_calculate_sliding_sync_table_changes()` before + # `_update_outliers_txn()` which fixes this discrepancy, we're working + # with an unreliable `stream_ordering` value that will possibly be + # unused and not make it into the `events` table. membership_event_stream_ordering = membership_event_map[ membership_event_id ].internal_metadata.stream_ordering @@ -1831,7 +1842,7 @@ def _update_current_state_txn( # `_calculate_sliding_sync_table_changes()`) for more context when this # happens). 
# - # We use a sub-query for `stream_ordering` because it's unreliable to + # XXX: We use a sub-query for `stream_ordering` because it's unreliable to # pre-calculate from `events_and_contexts` at the time when # `_calculate_sliding_sync_table_changes()` is ran. We could be working # with events that were previously persisted as an `outlier` with one @@ -1849,7 +1860,7 @@ def _update_current_state_txn( VALUES ( ?, ?, ?, ?, ?, (SELECT stream_ordering FROM events WHERE event_id = ?), - ? + (SELECT instance_name FROM events WHERE event_id = ?) {("," + ", ".join("?" for _ in sliding_sync_snapshot_values)) if sliding_sync_snapshot_values else ""} ) ON CONFLICT (room_id, user_id) @@ -1867,8 +1878,12 @@ def _update_current_state_txn( membership_info.sender, membership_info.membership_event_id, membership_info.membership, + # XXX: We do not use `membership_info.membership_event_stream_ordering` here + # because it is an unreliable value. See XXX note above. + membership_info.membership_event_id, + # XXX: We do not use `membership_info.membership_event_instance_name` here + # because it is an unreliable value. See XXX note above. 
membership_info.membership_event_id, - membership_info.membership_event_instance_name, ] + list(sliding_sync_snapshot_values) for membership_info in sliding_sync_table_changes.to_insert_membership_snapshots From 420efb4f6570b9f204a9337d5fb0025b5b98c1b9 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 29 Aug 2024 13:57:43 -0500 Subject: [PATCH 04/12] Try clarify which stream_ordering is used --- synapse/storage/databases/main/events.py | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index ec3a3eda2a9..c6d2dd34e03 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -466,7 +466,8 @@ async def _calculate_sliding_sync_table_changes( # `outlier` with one `stream_ordering` but are now being persisted again # and de-outliered and assigned a different `stream_ordering`. Since we # call `_calculate_sliding_sync_table_changes()` before - # `_update_outliers_txn()` which fixes this discrepancy, we're working + # `_update_outliers_txn()` which fixes this discrepancy (always use the + # `stream_ordering` from the first time it was persisted), we're working # with an unreliable `stream_ordering` value that will possibly be # unused and not make it into the `events` table. membership_event_stream_ordering = membership_event_map[ @@ -1749,7 +1750,8 @@ def _update_current_state_txn( # `stream_ordering` but are now being persisted again and de-outliered # and assigned a different `stream_ordering`. Since we call # `_calculate_sliding_sync_table_changes()` before - # `_update_outliers_txn()` which fixes this discrepancy, we're working + # `_update_outliers_txn()` which fixes this discrepancy (always use the + # `stream_ordering` from the first time it was persisted), we're working # with an unreliable `stream_ordering` value that will possibly be # unused and not make it into the `events` table. 
# @@ -1844,14 +1846,15 @@ def _update_current_state_txn( # # XXX: We use a sub-query for `stream_ordering` because it's unreliable to # pre-calculate from `events_and_contexts` at the time when - # `_calculate_sliding_sync_table_changes()` is ran. We could be working - # with events that were previously persisted as an `outlier` with one - # `stream_ordering` but are now being persisted again and de-outliered - # and assigned a different `stream_ordering`. Since we call - # `_calculate_sliding_sync_table_changes()` before - # `_update_outliers_txn()` which fixes this discrepancy, we're working - # with an unreliable `stream_ordering` value that will possibly be - # unused and not make it into the `events` table. + # `_calculate_sliding_sync_table_changes()` is ran. We could be working with + # events that were previously persisted as an `outlier` with one + # `stream_ordering` but are now being persisted again and de-outliered and + # assigned a different `stream_ordering`. Since we call + # `_calculate_sliding_sync_table_changes()` before `_update_outliers_txn()` + # which fixes this discrepancy (always use the `stream_ordering` from the + # first time it was persisted), we're working with an unreliable + # `stream_ordering` value that will possibly be unused and not make it into + # the `events` table. 
txn.execute_batch( f""" INSERT INTO sliding_sync_membership_snapshots From df4e1877a4e5d69bdf8b93aaf8ff07029042d551 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 29 Aug 2024 14:06:03 -0500 Subject: [PATCH 05/12] Separate data structures so someone doesn't accidentally use bad data --- synapse/storage/databases/main/events.py | 47 +++++++++---------- .../databases/main/events_bg_updates.py | 6 +-- 2 files changed, 26 insertions(+), 27 deletions(-) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index c6d2dd34e03..2be219c7abc 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -163,6 +163,15 @@ class SlidingSyncMembershipInfo: sender: str membership_event_id: str membership: str + + +@attr.s(slots=True, auto_attribs=True) +class SlidingSyncMembershipInfoWithEventPos(SlidingSyncMembershipInfo): + """ + SlidingSyncMembershipInfo + `stream_ordering`/`instance_name` of the membership + event + """ + membership_event_stream_ordering: int membership_event_instance_name: str @@ -390,6 +399,9 @@ async def _calculate_sliding_sync_table_changes( `stream_ordering`). delta_state: Deltas that are going to be used to update the `current_state_events` table. Changes to the current state of the room. + + Returns: + SlidingSyncTableChanges """ to_insert = delta_state.to_insert to_delete = delta_state.to_delete @@ -457,36 +469,23 @@ async def _calculate_sliding_sync_table_changes( membership_event_id, user_id, ) in membership_event_id_to_user_id_map.items(): - # We should only be seeing events with `stream_ordering`/`instance_name` - # assigned by this point. - # - # XXX: Because we're sourcing the event from `events_and_contexts`, we - # can't rely on `stream_ordering`/`instance_name` being correct. 
We - # could be working with events that were previously persisted as an - # `outlier` with one `stream_ordering` but are now being persisted again - # and de-outliered and assigned a different `stream_ordering`. Since we - # call `_calculate_sliding_sync_table_changes()` before - # `_update_outliers_txn()` which fixes this discrepancy (always use the - # `stream_ordering` from the first time it was persisted), we're working - # with an unreliable `stream_ordering` value that will possibly be - # unused and not make it into the `events` table. - membership_event_stream_ordering = membership_event_map[ - membership_event_id - ].internal_metadata.stream_ordering - assert membership_event_stream_ordering is not None - membership_event_instance_name = membership_event_map[ - membership_event_id - ].internal_metadata.instance_name - assert membership_event_instance_name is not None - membership_infos_to_insert_membership_snapshots.append( + # XXX: We don't use `SlidingSyncMembershipInfoWithEventPos` here + # because we're sourcing the event from `events_and_contexts`, we + # can't rely on `stream_ordering`/`instance_name` being correct. We + # could be working with events that were previously persisted as an + # `outlier` with one `stream_ordering` but are now being persisted + # again and de-outliered and assigned a different `stream_ordering`. + # Since we call `_calculate_sliding_sync_table_changes()` before + # `_update_outliers_txn()` which fixes this discrepancy (always use + # the `stream_ordering` from the first time it was persisted), we're + # working with an unreliable `stream_ordering` value that will + # possibly be unused and not make it into the `events` table. 
SlidingSyncMembershipInfo( user_id=user_id, sender=membership_event_map[membership_event_id].sender, membership_event_id=membership_event_id, membership=membership_event_map[membership_event_id].membership, - membership_event_stream_ordering=membership_event_stream_ordering, - membership_event_instance_name=membership_event_instance_name, ) ) diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index 3160e12bb30..8e2ee0bf824 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -37,7 +37,7 @@ from synapse.storage.databases.main.events import ( SLIDING_SYNC_RELEVANT_STATE_SET, PersistEventsStore, - SlidingSyncMembershipInfo, + SlidingSyncMembershipInfoWithEventPos, SlidingSyncMembershipSnapshotSharedInsertValues, SlidingSyncStateInsertValues, ) @@ -1993,7 +1993,7 @@ def _find_previous_membership_txn( to_insert_membership_snapshots: Dict[ Tuple[str, str], SlidingSyncMembershipSnapshotSharedInsertValues ] = {} - to_insert_membership_infos: Dict[Tuple[str, str], SlidingSyncMembershipInfo] = ( + to_insert_membership_infos: Dict[Tuple[str, str], SlidingSyncMembershipInfoWithEventPos] = ( {} ) for ( @@ -2184,7 +2184,7 @@ def _find_previous_membership_txn( to_insert_membership_snapshots[(room_id, user_id)] = ( sliding_sync_membership_snapshots_insert_map ) - to_insert_membership_infos[(room_id, user_id)] = SlidingSyncMembershipInfo( + to_insert_membership_infos[(room_id, user_id)] = SlidingSyncMembershipInfoWithEventPos( user_id=user_id, sender=sender, membership_event_id=membership_event_id, From 40fb3ccaf80aff75792eba5adfca2368db94000e Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 29 Aug 2024 14:14:16 -0500 Subject: [PATCH 06/12] Fix lints --- .../databases/main/events_bg_updates.py | 26 ++++++++++--------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/synapse/storage/databases/main/events_bg_updates.py 
b/synapse/storage/databases/main/events_bg_updates.py index 8e2ee0bf824..573dce3d154 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -1993,9 +1993,9 @@ def _find_previous_membership_txn( to_insert_membership_snapshots: Dict[ Tuple[str, str], SlidingSyncMembershipSnapshotSharedInsertValues ] = {} - to_insert_membership_infos: Dict[Tuple[str, str], SlidingSyncMembershipInfoWithEventPos] = ( - {} - ) + to_insert_membership_infos: Dict[ + Tuple[str, str], SlidingSyncMembershipInfoWithEventPos + ] = {} for ( room_id, room_id_from_rooms_table, @@ -2184,15 +2184,17 @@ def _find_previous_membership_txn( to_insert_membership_snapshots[(room_id, user_id)] = ( sliding_sync_membership_snapshots_insert_map ) - to_insert_membership_infos[(room_id, user_id)] = SlidingSyncMembershipInfoWithEventPos( - user_id=user_id, - sender=sender, - membership_event_id=membership_event_id, - membership=membership, - membership_event_stream_ordering=membership_event_stream_ordering, - # If instance_name is null we default to "master" - membership_event_instance_name=membership_event_instance_name - or "master", + to_insert_membership_infos[(room_id, user_id)] = ( + SlidingSyncMembershipInfoWithEventPos( + user_id=user_id, + sender=sender, + membership_event_id=membership_event_id, + membership=membership, + membership_event_stream_ordering=membership_event_stream_ordering, + # If instance_name is null we default to "master" + membership_event_instance_name=membership_event_instance_name + or "master", + ) ) def _fill_table_txn(txn: LoggingTransaction) -> None: From 7ee85c520e1d4f736cd52892a5dcf061f733be3a Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 29 Aug 2024 14:19:55 -0500 Subject: [PATCH 07/12] Add changelog --- changelog.d/17635.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/17635.misc diff --git a/changelog.d/17635.misc b/changelog.d/17635.misc new file mode 100644 index 
00000000000..756918e2b21 --- /dev/null +++ b/changelog.d/17635.misc @@ -0,0 +1 @@ +Pre-populate room data used in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint for quick filtering/sorting. From 4f341f140df4b0abf9c2914b5ac44bc9eb67e713 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 29 Aug 2024 14:20:50 -0500 Subject: [PATCH 08/12] Update comments --- synapse/storage/databases/main/events.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 2be219c7abc..6903c5f8ff5 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -475,8 +475,9 @@ async def _calculate_sliding_sync_table_changes( # can't rely on `stream_ordering`/`instance_name` being correct. We # could be working with events that were previously persisted as an # `outlier` with one `stream_ordering` but are now being persisted - # again and de-outliered and assigned a different `stream_ordering`. - # Since we call `_calculate_sliding_sync_table_changes()` before + # again and de-outliered and assigned a different `stream_ordering` + # that won't end up being used. Since we call + # `_calculate_sliding_sync_table_changes()` before # `_update_outliers_txn()` which fixes this discrepancy (always use # the `stream_ordering` from the first time it was persisted), we're # working with an unreliable `stream_ordering` value that will @@ -1848,12 +1849,12 @@ def _update_current_state_txn( # `_calculate_sliding_sync_table_changes()` is ran. We could be working with # events that were previously persisted as an `outlier` with one # `stream_ordering` but are now being persisted again and de-outliered and - # assigned a different `stream_ordering`. 
Since we call - # `_calculate_sliding_sync_table_changes()` before `_update_outliers_txn()` - # which fixes this discrepancy (always use the `stream_ordering` from the - # first time it was persisted), we're working with an unreliable - # `stream_ordering` value that will possibly be unused and not make it into - # the `events` table. + # assigned a different `stream_ordering` that won't end up being used. Since + # we call `_calculate_sliding_sync_table_changes()` before + # `_update_outliers_txn()` which fixes this discrepancy (always use the + # `stream_ordering` from the first time it was persisted), we're working + # with an unreliable `stream_ordering` value that will possibly be unused + # and not make it into the `events` table. txn.execute_batch( f""" INSERT INTO sliding_sync_membership_snapshots From b7a8dcc22431d5a24423f68c8a02a521d0ad9adf Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 29 Aug 2024 14:28:51 -0500 Subject: [PATCH 09/12] Add xxx to match other spots --- synapse/storage/databases/main/events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 6903c5f8ff5..c118d9f7a40 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1743,7 +1743,7 @@ def _update_current_state_txn( ] args.extend(iter(sliding_sync_updates_values)) - # We use a sub-query for `stream_ordering` because it's unreliable to + # XXX: We use a sub-query for `stream_ordering` because it's unreliable to # pre-calculate from `events_and_contexts` at the time when # `_calculate_sliding_sync_table_changes()` is ran. 
We could be working # with events that were previously persisted as an `outlier` with one From e61ebfbd49a4c8f00fb4fd5e5bd6b344809e577a Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 29 Aug 2024 17:26:27 -0500 Subject: [PATCH 10/12] Add test --- synapse/api/constants.py | 2 + tests/storage/test_sliding_sync_tables.py | 120 ++++++++++++++++++++++ 2 files changed, 122 insertions(+) diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 8e3b404aed3..8db302b3d8b 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -230,6 +230,8 @@ class EventContentFields: ROOM_NAME: Final = "name" + MEMBERSHIP: Final = "membership" + # Used in m.room.guest_access events. GUEST_ACCESS: Final = "guest_access" diff --git a/tests/storage/test_sliding_sync_tables.py b/tests/storage/test_sliding_sync_tables.py index d0bbc1c8038..233caf81d0c 100644 --- a/tests/storage/test_sliding_sync_tables.py +++ b/tests/storage/test_sliding_sync_tables.py @@ -38,6 +38,7 @@ _resolve_stale_data_in_sliding_sync_joined_rooms_table, _resolve_stale_data_in_sliding_sync_membership_snapshots_table, ) +from synapse.types import create_requester from synapse.util import Clock from tests.test_utils.event_injection import create_event @@ -925,6 +926,125 @@ def test_joined_room_is_bumped(self) -> None: user2_snapshot, ) + @parameterized.expand( + [ + ("insert", True), + ("upsert", False), + ] + ) + def test_joined_room_outlier_and_deoutlier( + self, description: str, should_insert: bool + ) -> None: + """ + This is a regression test. + + This is to simulate the case where an event is first persisted as an outlier + (like a remote invite) and then later persisted again to de-outlier it. The + first time, the `outlier` is persisted with one `stream_ordering` but when + persisted again and de-outliered, it is assigned a different `stream_ordering` + that won't end up being used. 
Since we call + `_calculate_sliding_sync_table_changes()` before `_update_outliers_txn()` which + fixes this discrepancy (always use the `stream_ordering` from the first time it + was persisted), make sure we're not using an unreliable `stream_ordering` value + that will cause `FOREIGN KEY constraint failed` in the + `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` tables. + """ + user1_id = self.register_user("user1", "pass") + _user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_version = RoomVersions.V10 + room_id = self.helper.create_room_as( + user2_id, tok=user2_tok, room_version=room_version.identifier + ) + + if should_insert: + # Clear these out so we always insert + self.get_success( + self.store.db_pool.simple_delete( + table="sliding_sync_joined_rooms", + keyvalues={"room_id": room_id}, + desc="TODO", + ) + ) + self.get_success( + self.store.db_pool.simple_delete( + table="sliding_sync_membership_snapshots", + keyvalues={"room_id": room_id}, + desc="TODO", + ) + ) + + # Create a membership event (which triggers an insert into + # `sliding_sync_membership_snapshots`) + membership_event_dict = { + "type": EventTypes.Member, + "state_key": user1_id, + "sender": user1_id, + "room_id": room_id, + "content": {EventContentFields.MEMBERSHIP: Membership.JOIN}, + } + # Create a relevant state event (which triggers an insert into + # `sliding_sync_joined_rooms`) + state_event_dict = { + "type": EventTypes.Name, + "state_key": "", + "sender": user2_id, + "room_id": room_id, + "content": {EventContentFields.ROOM_NAME: "my super room"}, + } + event_dicts_to_persist = [ + membership_event_dict, + state_event_dict, + ] + + for event_dict in event_dicts_to_persist: + events_to_persist = [] + + # Create the event as an outlier + ( + event, + unpersisted_context, + ) = self.get_success( + self.hs.get_event_creation_handler().create_event(
requester=create_requester(user1_id), + event_dict=event_dict, + outlier=True, + ) + ) + # FIXME: Should we use an `EventContext.for_outlier(...)` here? + # Doesn't seem to matter for this test. + context = self.get_success(unpersisted_context.persist(event)) + events_to_persist.append((event, context)) + + # Create the event again but as a non-outlier. This will de-outlier the event + # when we persist it. + ( + event, + unpersisted_context, + ) = self.get_success( + self.hs.get_event_creation_handler().create_event( + requester=create_requester(user1_id), + event_dict=event_dict, + outlier=False, + ) + ) + context = self.get_success(unpersisted_context.persist(event)) + events_to_persist.append((event, context)) + + persist_controller = self.hs.get_storage_controllers().persistence + assert persist_controller is not None + for event, context in events_to_persist: + self.get_success( + persist_controller.persist_event( + event, + context, + ) + ) + + # We're just testing that it does not explode + def test_joined_room_meta_state_reset(self) -> None: """ Test that a state reset on the room name is reflected in the From 93e60f4c4e261e74916cb6cbd010f934b45f3189 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 29 Aug 2024 17:26:43 -0500 Subject: [PATCH 11/12] Fix logic bug in finding missing events (separate) --- synapse/storage/databases/main/events.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index c118d9f7a40..f3dbe5bba7c 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -631,9 +631,7 @@ async def _calculate_sliding_sync_table_changes( # Otherwise, we need to find a couple events that we were reset to.
if missing_event_ids: - remaining_events = await self.store.get_events( - current_state_ids_map.values() - ) + remaining_events = await self.store.get_events(missing_event_ids) # There shouldn't be any missing events assert ( remaining_events.keys() == missing_event_ids From 5b6dc37954a4198adcd54ae45154c412ef43bd05 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 29 Aug 2024 17:44:23 -0500 Subject: [PATCH 12/12] Add comment why parameterize --- tests/storage/test_sliding_sync_tables.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/storage/test_sliding_sync_tables.py b/tests/storage/test_sliding_sync_tables.py index 233caf81d0c..621f46fff82 100644 --- a/tests/storage/test_sliding_sync_tables.py +++ b/tests/storage/test_sliding_sync_tables.py @@ -927,6 +927,9 @@ def test_joined_room_is_bumped(self) -> None: ) @parameterized.expand( + # Test both an insert and an upsert into the + # `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` to exercise + # more possibilities of things going wrong. [ ("insert", True), ("upsert", False),