Skip to content

Commit

Permalink
Handle old rows with null event_stream_ordering column
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston committed Aug 28, 2024
1 parent bb905cd commit 6f9932d
Showing 1 changed file with 99 additions and 33 deletions.
132 changes: 99 additions & 33 deletions synapse/storage/databases/main/events_bg_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -1828,38 +1828,84 @@ async def _sliding_sync_membership_snapshots_bg_update(
"""
Background update to populate the `sliding_sync_membership_snapshots` table.
"""
last_event_stream_ordering = progress.get(
"last_event_stream_ordering", -(1 << 31)
)
# We do this in two phases: a) the initial phase where we go through all
# room memberships, and then b) a second phase where we look at new
# memberships (this is to handle the case where we downgrade and then
# upgrade again).
#
# We have to do this as two phases (rather than just the second phase
# where we iterate on event_stream_ordering), as the
# `event_stream_ordering` column may have null values for old rows.
# Therefore we first do the set of historic rooms and *then* look at any
# new rows (which will have a non-null `event_stream_ordering`).
initial_phase = progress.get("initial_phase")
if initial_phase is None:
# If this is the first run, store the current max stream position.
# We know we will go through all memberships less than the current
# max in the initial phase.
progress = {
"initial_phase": True,
"last_event_stream_ordering": self.get_room_max_stream_ordering(),
}
await self.db_pool.updates._background_update_progress(
_BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE,
progress,
)
initial_phase = True

last_room_id = progress.get("last_room_id", "")
last_event_stream_ordering = progress["last_event_stream_ordering"]

def _find_memberships_to_update_txn(
txn: LoggingTransaction,
) -> List[Tuple[str, str, str, str, str, int, bool]]:
# Fetch the set of event IDs that we want to update
#
# It's important to sort by `event_stream_ordering` *ascending* (oldest to
newest) so that if we see that this background update is in progress and want
# to start the catch-up process, we can safely assume that it will
# eventually get to the rooms we want to catch-up on anyway (see
# `_resolve_stale_data_in_sliding_sync_tables()`).
txn.execute(
"""
SELECT
c.room_id,
c.user_id,
e.sender,
c.event_id,
c.membership,
c.event_stream_ordering,
e.outlier
FROM local_current_membership as c
INNER JOIN events AS e USING (event_id)
WHERE event_stream_ordering > ?
ORDER BY event_stream_ordering ASC
LIMIT ?
""",
(last_event_stream_ordering, batch_size),
)

if initial_phase:
txn.execute(
"""
SELECT
c.room_id,
c.user_id,
e.sender,
c.event_id,
c.membership,
e.stream_ordering,
e.outlier
FROM local_current_membership as c
INNER JOIN events AS e USING (event_id)
WHERE c.room_id > ?
ORDER BY c.room_id ASC
LIMIT ?
""",
(last_room_id, batch_size),
)
elif last_event_stream_ordering is not None:
# It's important to sort by `event_stream_ordering` *ascending* (oldest to
# newest) so that if we see that this background update is in progress and want
# to start the catch-up process, we can safely assume that it will
# eventually get to the rooms we want to catch-up on anyway (see
# `_resolve_stale_data_in_sliding_sync_tables()`).
txn.execute(
"""
SELECT
c.room_id,
c.user_id,
e.sender,
c.event_id,
c.membership,
c.event_stream_ordering,
e.outlier
FROM local_current_membership as c
INNER JOIN events AS e USING (event_id)
WHERE event_stream_ordering > ?
ORDER BY event_stream_ordering ASC
LIMIT ?
""",
(last_event_stream_ordering, batch_size),
)
else:
raise Exception("last_event_stream_ordering should not be None")

memberships_to_update_rows = cast(
List[Tuple[str, str, str, str, str, int, bool]], txn.fetchall()
Expand All @@ -1873,10 +1919,22 @@ def _find_memberships_to_update_txn(
)

if not memberships_to_update_rows:
await self.db_pool.updates._end_background_update(
_BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE
)
return 0
if initial_phase:
# Move onto the next phase.
await self.db_pool.updates._background_update_progress(
_BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE,
{
"initial_phase": False,
"last_event_stream_ordering": last_event_stream_ordering,
},
)
return 0
else:
# We've finished both phases, we're done.
await self.db_pool.updates._end_background_update(
_BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE
)
return 0

def _find_previous_membership_txn(
txn: LoggingTransaction, room_id: str, user_id: str, stream_ordering: int
Expand Down Expand Up @@ -2144,17 +2202,24 @@ def _fill_table_txn(txn: LoggingTransaction) -> None:

# Update the progress
(
_room_id,
room_id,
_user_id,
_sender,
_membership_event_id,
_membership,
membership_event_stream_ordering,
_is_outlier,
) = memberships_to_update_rows[-1]

progress = {
"initial_phase": initial_phase,
"last_room_id": room_id,
"last_event_stream_ordering": membership_event_stream_ordering,
}

await self.db_pool.updates._background_update_progress(
_BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE,
{"last_event_stream_ordering": membership_event_stream_ordering},
progress,
)

return len(memberships_to_update_rows)
Expand Down Expand Up @@ -2383,6 +2448,7 @@ def _resolve_stale_data_in_sliding_sync_membership_snapshots_table(
#
progress_json: JsonDict = {}
if max_stream_ordering_sliding_sync_membership_snapshots_table is not None:
progress_json["initial_phase"] = False
progress_json["last_event_stream_ordering"] = (
max_stream_ordering_sliding_sync_membership_snapshots_table
)
Expand Down

0 comments on commit 6f9932d

Please sign in to comment.