Only run the sliding sync background updates on the main database
erikjohnston committed Aug 28, 2024
1 parent 7c9c620 commit bb905cd
Showing 3 changed files with 256 additions and 262 deletions.
253 changes: 253 additions & 0 deletions synapse/storage/databases/main/events_bg_updates.py
@@ -47,6 +47,8 @@
from synapse.types import JsonDict, RoomStreamToken, StateMap, StrCollection
from synapse.types.handlers import SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES
from synapse.types.state import StateFilter
from synapse.util import json_encoder
from synapse.util.iterutils import batch_iter

if TYPE_CHECKING:
    from synapse.server import HomeServer
@@ -325,6 +327,15 @@ def __init__(
            self._sliding_sync_membership_snapshots_bg_update,
        )

        # FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the
        # foreground update for
        # `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by
        # https://github.com/element-hq/synapse/issues/TODO)
        with db_conn.cursor(txn_name="resolve_sliding_sync") as txn:
            _resolve_stale_data_in_sliding_sync_tables(
                txn=txn,
            )

    async def _background_reindex_fields_sender(
        self, progress: JsonDict, batch_size: int
    ) -> int:
@@ -2147,3 +2158,245 @@ def _fill_table_txn(txn: LoggingTransaction) -> None:
        )

        return len(memberships_to_update_rows)


def _resolve_stale_data_in_sliding_sync_tables(
    txn: LoggingTransaction,
) -> None:
    """
    Clears stale/out-of-date entries from the
    `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` tables.

    This accounts for when someone downgrades their Synapse version and then upgrades
    it again. While downgraded, new events can be sent in rooms without the sliding
    sync tables being updated alongside them. For example, a new event should bump
    `event_stream_ordering` in the `sliding_sync_joined_rooms` table, a state change
    (like the room name) should update that room's row, and a membership change
    should update `sliding_sync_membership_snapshots`.

    Clearing out the stale rows ensures that if a row exists in the sliding sync
    tables, we are able to rely on it (accurate data). And if a row doesn't exist, we
    use a fallback to get the same info until the background updates fill in the rows
    or a new event comes in and triggers a full insert.

    FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the
    foreground update for
    `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by
    https://github.com/element-hq/synapse/issues/TODO)
    """

    _resolve_stale_data_in_sliding_sync_joined_rooms_table(txn)
    _resolve_stale_data_in_sliding_sync_membership_snapshots_table(txn)
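
# A minimal sketch of the read-side behaviour described in the docstring above;
# `lookup_joined_room_row` and `fallback_from_current_state` are hypothetical
# helpers used for illustration, not real Synapse functions:
#
#     row = lookup_joined_room_row(txn, room_id)
#     if row is not None:
#         use(row)  # an existing row can be relied upon (accurate data)
#     else:
#         # Fall back to deriving the same info from current state until the
#         # background update fills the row in.
#         use(fallback_from_current_state(txn, room_id))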


def _resolve_stale_data_in_sliding_sync_joined_rooms_table(
    txn: LoggingTransaction,
) -> None:
    """
    Clears stale/out-of-date entries from the `sliding_sync_joined_rooms` table and
    kicks off the background update to catch up with what we missed while Synapse was
    downgraded.

    See the `_resolve_stale_data_in_sliding_sync_tables()` docstring above for more
    context.
    """

    # Find the point when we stopped writing to the `sliding_sync_joined_rooms` table
    txn.execute(
        """
        SELECT event_stream_ordering
        FROM sliding_sync_joined_rooms
        ORDER BY event_stream_ordering DESC
        LIMIT 1
        """,
    )

    # If we have nothing written to the `sliding_sync_joined_rooms` table, there is
    # nothing to clean up
    row = cast(Optional[Tuple[int]], txn.fetchone())
    max_stream_ordering_sliding_sync_joined_rooms_table = None
    depends_on = None
    if row is not None:
        (max_stream_ordering_sliding_sync_joined_rooms_table,) = row

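        # Find the rooms that have had events written since that point; only these
        # rooms can have stale entries in `sliding_sync_joined_rooms`.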
        txn.execute(
            """
            SELECT room_id
            FROM events
            WHERE stream_ordering > ?
            GROUP BY room_id
            ORDER BY MAX(stream_ordering) ASC
            """,
            (max_stream_ordering_sliding_sync_joined_rooms_table,),
        )

        room_rows = txn.fetchall()
        # No new events have been written to the `events` table since the last time we
        # wrote to the `sliding_sync_joined_rooms` table so there is nothing to clean
        # up. This is the expected normal scenario for people who have not downgraded
        # their Synapse version.
        if not room_rows:
            return

        # 1000 is an arbitrary batch size with no testing
        for chunk in batch_iter(room_rows, 1000):
            # Handle updating the `sliding_sync_joined_rooms` table
            #
            # Clear out the stale data
            DatabasePool.simple_delete_many_batch_txn(
                txn,
                table="sliding_sync_joined_rooms",
                keys=("room_id",),
                values=chunk,
            )

            # Update the `sliding_sync_joined_rooms_to_recalculate` table with the
            # rooms that went stale and now need to be recalculated.
            DatabasePool.simple_upsert_many_txn_native_upsert(
                txn,
                table="sliding_sync_joined_rooms_to_recalculate",
                key_names=("room_id",),
                key_values=chunk,
                value_names=(),
                # No value columns, therefore make a blank list so that the following
                # zip() works correctly.
                value_values=[() for x in range(len(chunk))],
            )
    else:
        # Re-run the `sliding_sync_joined_rooms_to_recalculate` prefill if there is
        # nothing in the `sliding_sync_joined_rooms` table
        DatabasePool.simple_upsert_txn_native_upsert(
            txn,
            table="background_updates",
            keyvalues={
                "update_name": _BackgroundUpdates.SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE
            },
            values={},
            # Only insert the row if it doesn't already exist. If it already exists,
            # we're already working on it
            insertion_values={
                "progress_json": "{}",
            },
        )
        depends_on = (
            _BackgroundUpdates.SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE
        )

    # Now kick off the background update to catch up with what we missed while Synapse
    # was downgraded.
    #
    # We may need to catch up on everything if we have nothing written to the
    # `sliding_sync_joined_rooms` table yet. This could happen if someone had zero
    # rooms on their server (so the normal background update completed), downgraded
    # Synapse versions, joined and created some new rooms, and upgraded again.
    DatabasePool.simple_upsert_txn_native_upsert(
        txn,
        table="background_updates",
        keyvalues={
            "update_name": _BackgroundUpdates.SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE
        },
        values={},
        # Only insert the row if it doesn't already exist. If it already exists, we
        # will eventually fill in the rows we're trying to populate.
        insertion_values={
            # Empty progress is expected since it's not used for this background
            # update.
            "progress_json": "{}",
            # Wait for the prefill to finish
            "depends_on": depends_on,
        },
    )
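
# Note the background-update ordering the `depends_on` wiring above produces when
# `sliding_sync_joined_rooms` is empty:
#
#   1. SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE re-populates
#      `sliding_sync_joined_rooms_to_recalculate` from scratch.
#   2. SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE then works through that table and fills
#      in `sliding_sync_joined_rooms`.
#
# When the table is non-empty, only step 2 is scheduled and `depends_on` stays None.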


def _resolve_stale_data_in_sliding_sync_membership_snapshots_table(
    txn: LoggingTransaction,
) -> None:
    """
    Clears stale/out-of-date entries from the `sliding_sync_membership_snapshots`
    table and kicks off the background update to catch up with what we missed while
    Synapse was downgraded.

    See the `_resolve_stale_data_in_sliding_sync_tables()` docstring above for more
    context.
    """

    # Find the point when we stopped writing to the
    # `sliding_sync_membership_snapshots` table
    txn.execute(
        """
        SELECT event_stream_ordering
        FROM sliding_sync_membership_snapshots
        ORDER BY event_stream_ordering DESC
        LIMIT 1
        """,
    )

    # If we have nothing written to the `sliding_sync_membership_snapshots` table,
    # there is nothing to clean up
    row = cast(Optional[Tuple[int]], txn.fetchone())
    max_stream_ordering_sliding_sync_membership_snapshots_table = None
    if row is not None:
        (max_stream_ordering_sliding_sync_membership_snapshots_table,) = row

        # XXX: Since `forgotten` is simply a flag on the `room_memberships` table
        # that is set out-of-band, there is no way to tell whether it was set while
        # Synapse was downgraded. The only thing the user can do is `/forget` again
        # if they run into this.
        #
        # This only picks up changes to memberships.
        txn.execute(
            """
            SELECT user_id, room_id
            FROM local_current_membership
            WHERE event_stream_ordering > ?
            ORDER BY event_stream_ordering ASC
            """,
            (max_stream_ordering_sliding_sync_membership_snapshots_table,),
        )

        membership_rows = txn.fetchall()
        # No new events have been written to the `events` table since the last time
        # we wrote to the `sliding_sync_membership_snapshots` table so there is
        # nothing to clean up. This is the expected normal scenario for people who
        # have not downgraded their Synapse version.
        if not membership_rows:
            return

        # 1000 is an arbitrary batch size with no testing
        for chunk in batch_iter(membership_rows, 1000):
            # Handle updating the `sliding_sync_membership_snapshots` table
            #
            # Clear out the stale data
            DatabasePool.simple_delete_many_batch_txn(
                txn,
                table="sliding_sync_membership_snapshots",
                keys=("user_id", "room_id"),
                values=chunk,
            )
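
    # Note: unlike the joined-rooms variant above, there is no "to recalculate"
    # side-table here; the resume point is instead handed to the background update
    # via `progress_json` below.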

    # Now kick off the background update to catch up with what we missed while
    # Synapse was downgraded.
    #
    # We may need to catch up on everything if we have nothing written to the
    # `sliding_sync_membership_snapshots` table yet. This could happen if someone had
    # zero rooms on their server (so the normal background update completed),
    # downgraded Synapse versions, joined and created some new rooms, and upgraded
    # again.
    progress_json: JsonDict = {}
    if max_stream_ordering_sliding_sync_membership_snapshots_table is not None:
        progress_json["last_event_stream_ordering"] = (
            max_stream_ordering_sliding_sync_membership_snapshots_table
        )

    DatabasePool.simple_upsert_txn_native_upsert(
        txn,
        table="background_updates",
        keyvalues={
            "update_name": _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE
        },
        values={},
        # Only insert the row if it doesn't already exist. If it already exists, we
        # will eventually fill in the rows we're trying to populate.
        insertion_values={
            "progress_json": json_encoder.encode(progress_json),
        },
    )
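
# A rough sketch of how the `last_event_stream_ordering` written above can be
# consumed; `progress` stands for the decoded `progress_json` dict that a
# background update receives (assumed shape, not shown in this diff):
#
#     last_event_stream_ordering = progress.get("last_event_stream_ordering")
#     if last_event_stream_ordering is not None:
#         # Only memberships written after the stale point need revisiting.
#         ...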