Sliding sync: various fixups to the background update #17652
New changelog file:

```diff
@@ -0,0 +1 @@
+Pre-populate room data used in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint for quick filtering/sorting.
```
```diff
@@ -44,6 +44,7 @@
 from synapse.storage.databases.main.events_worker import DatabaseCorruptionError
 from synapse.storage.databases.main.state_deltas import StateDeltasStore
 from synapse.storage.databases.main.stream import StreamWorkerStore
+from synapse.storage.engines import PostgresEngine
 from synapse.storage.types import Cursor
 from synapse.types import JsonDict, RoomStreamToken, StateMap, StrCollection
 from synapse.types.handlers import SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES
```
```diff
@@ -1865,9 +1866,29 @@ async def _sliding_sync_membership_snapshots_bg_update(
        def _find_memberships_to_update_txn(
            txn: LoggingTransaction,
        ) -> List[
-           Tuple[str, Optional[str], str, str, str, str, int, Optional[str], bool]
+           Tuple[
+               str,
+               Optional[str],
+               Optional[str],
+               str,
+               str,
+               str,
+               str,
+               int,
+               Optional[str],
+               bool,
+           ]
        ]:
            # Fetch the set of event IDs that we want to update
+           #
+           # We skip over rows which we've already handled, i.e. have a
+           # matching row in `sliding_sync_membership_snapshots` with the same
+           # room, user and event ID.
+           #
+           # We also ignore rooms that the user has left themselves (i.e. not
+           # kicked). This is to avoid having to port lots of old rooms that we
+           # will never send down sliding sync (as we exclude such rooms from
+           # initial syncs).

            if initial_phase:
                # There are some old out-of-band memberships (before
```
```diff
@@ -1880,6 +1901,7 @@ def _find_memberships_to_update_txn(
                        SELECT
                            c.room_id,
                            r.room_id,
+                           r.room_version,
                            c.user_id,
                            e.sender,
                            c.event_id,
```
```diff
@@ -1888,13 +1910,16 @@ def _find_memberships_to_update_txn(
                            e.instance_name,
                            e.outlier
                        FROM local_current_membership AS c
+                       LEFT JOIN sliding_sync_membership_snapshots AS m USING (room_id, user_id)
                        INNER JOIN events AS e USING (event_id)
                        LEFT JOIN rooms AS r ON (c.room_id = r.room_id)
                        WHERE (c.room_id, c.user_id) > (?, ?)
+                           AND (m.user_id IS NULL OR c.event_id != m.membership_event_id)
+                           AND (c.membership != ? OR c.user_id != e.sender)
                        ORDER BY c.room_id ASC, c.user_id ASC
                        LIMIT ?
                        """,
-                       (last_room_id, last_user_id, batch_size),
+                       (last_room_id, last_user_id, Membership.LEAVE, batch_size),
```
Review discussion on excluding leave memberships:

I'd prefer not to exclude leave memberships. It's just extra things to think about. We want the table to be complete in the end, so that means we will need to add another background update to fill in the

I'm a bit in two minds about this. The problem is that this is a surprisingly huge amount of data that will just never be read. On the other hand, it feels inconsistent to not port them over but to keep future old rows. I wonder if the right thing to do here maybe is to skip these rows, but add a background job that clears out old left rooms from the table? Which is possible since we can respond with

If we want to accept the forever background update to clean up leaves and keep the size of the database table down, that can work 👍 I assume we want some grace period for left rooms (a day)? That way people don't immediately get a

Is it really worth this complexity though? We're not storing every membership ever, just the latest membership of a given user for a given room.

Are we sure that we're not going to add a

Yeah, probably something like a week or something, maybe longer. It's kinda sucky to keep around things forever. It's fine when it's not causing issues, but it'll take a really long time for the background updates to run for data that we currently don't use. That's a fair question, I guess it would be good to know if any clients actually use that option. I guess a potential half-way house is for us to not port over the metadata for left rooms for now? Which also seems wrong but will be a lot quicker and have a chance to actually complete.

Some discussion in an internal room (light discussion with people on both sides).

If I remove this patch and open a separate PR, are you happy with the rest? We probably want the actual bug fixes to go into the RC.

Sounds good 🙂 ("Ignore leave events for bg updates" moved to another PR)

Done

c.f. #17699
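The grace-period cleanup discussed above could be sketched roughly as follows. This is a hedged illustration only: the table name, the `left_at_ms` timestamp column, and the `cleanup_left_rooms` helper are all made up for the example and are not Synapse's real schema or background-update machinery; it only demonstrates the "delete self-leaves older than the grace period" idea against SQLite.

```python
import sqlite3
import time

GRACE_PERIOD_MS = 7 * 24 * 60 * 60 * 1000  # e.g. a week, as discussed above

def cleanup_left_rooms(conn: sqlite3.Connection, now_ms: int) -> int:
    """Delete snapshot rows for rooms the user left *themselves* more than
    GRACE_PERIOD_MS ago. Column names here are illustrative, not Synapse's."""
    cur = conn.execute(
        """
        DELETE FROM membership_snapshots
        WHERE membership = 'leave'
          AND user_id = sender          -- left themselves, i.e. not kicked
          AND left_at_ms < ?
        """,
        (now_ms - GRACE_PERIOD_MS,),
    )
    conn.commit()
    return cur.rowcount

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE membership_snapshots "
    "(room_id TEXT, user_id TEXT, sender TEXT, membership TEXT, left_at_ms INTEGER)"
)
now = int(time.time() * 1000)
rows = [
    ("!old:hs", "@a:hs", "@a:hs", "leave", now - 2 * GRACE_PERIOD_MS),    # stale self-leave: deleted
    ("!new:hs", "@a:hs", "@a:hs", "leave", now - 1000),                   # within grace period: kept
    ("!kick:hs", "@a:hs", "@mod:hs", "leave", now - 2 * GRACE_PERIOD_MS), # kick (sender differs): kept
    ("!join:hs", "@a:hs", "@a:hs", "join", 0),                            # still joined: kept
]
conn.executemany("INSERT INTO membership_snapshots VALUES (?, ?, ?, ?, ?)", rows)
deleted = cleanup_left_rooms(conn, now)
print(deleted)  # 1
```

The `user_id = sender` predicate mirrors the `c.user_id != e.sender` condition in the diff: a leave sent by someone else is a kick and is retained.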
erikjohnston marked this conversation as resolved.
```diff
                    )
                elif last_event_stream_ordering is not None:
                    # It's important to sort by `event_stream_ordering` *ascending* (oldest to

@@ -1910,7 +1935,8 @@ def _find_memberships_to_update_txn(
                        SELECT
                            c.room_id,
-                           c.room_id,
+                           r.room_id,
+                           r.room_version,
                            c.user_id,
                            e.sender,
                            c.event_id,
```
```diff
@@ -1919,20 +1945,33 @@ def _find_memberships_to_update_txn(
                            e.instance_name,
                            e.outlier
                        FROM local_current_membership AS c
+                       LEFT JOIN sliding_sync_membership_snapshots AS m USING (room_id, user_id)
                        INNER JOIN events AS e USING (event_id)
-                       WHERE event_stream_ordering > ?
-                       ORDER BY event_stream_ordering ASC
+                       LEFT JOIN rooms AS r ON (c.room_id = r.room_id)
+                       WHERE c.event_stream_ordering > ?
+                           AND (m.user_id IS NULL OR c.event_id != m.membership_event_id)
+                           AND (c.membership != ? OR c.user_id != e.sender)
+                       ORDER BY c.event_stream_ordering ASC
                        LIMIT ?
                        """,
-                       (last_event_stream_ordering, batch_size),
+                       (last_event_stream_ordering, Membership.LEAVE, batch_size),
                    )
                else:
                    raise Exception("last_event_stream_ordering should not be None")

            memberships_to_update_rows = cast(
                List[
                    Tuple[
-                       str, Optional[str], str, str, str, str, int, Optional[str], bool
+                       str,
+                       Optional[str],
+                       Optional[str],
+                       str,
+                       str,
+                       str,
+                       str,
+                       int,
+                       Optional[str],
+                       bool,
                    ]
                ],
                txn.fetchall(),
```
```diff
@@ -1965,7 +2004,7 @@ def _find_memberships_to_update_txn(

        def _find_previous_membership_txn(
            txn: LoggingTransaction, room_id: str, user_id: str, event_id: str
-       ) -> Tuple[str, str]:
+       ) -> Optional[Tuple[str, str]]:
            # Find the previous invite/knock event before the leave event
            #
            # Here are some notes on how we landed on this query:

@@ -2011,8 +2050,13 @@ def _find_previous_membership_txn(
            )
            row = txn.fetchone()

            # We should see a corresponding previous invite/knock event
-           assert row is not None
+           if row is None:
+               # Generally we should have an invite or knock event for leaves
+               # that are outliers, however this may not always be the case
+               # (e.g. a local user got kicked but the kick event got pulled in
+               # as an outlier).
+               return None

            event_id, membership = row

            return event_id, membership
```
```diff
@@ -2027,6 +2071,7 @@ def _find_previous_membership_txn(
            for (
                room_id,
                room_id_from_rooms_table,
+               room_version_id,
                user_id,
                sender,
                membership_event_id,

@@ -2045,6 +2090,14 @@ def _find_previous_membership_txn(
                    Membership.BAN,
                )

+               if (
+                   room_version_id is not None
+                   and room_version_id not in KNOWN_ROOM_VERSIONS
+               ):
+                   # Ignore rooms with unknown room versions (these were
+                   # experimental rooms, that we no longer support).
+                   continue
+
                # There are some old out-of-band memberships (before
                # https://github.com/matrix-org/synapse/issues/6983) where we don't have the
                # corresponding room stored in the `rooms` table`. We have a `FOREIGN KEY`
```
```diff
@@ -2132,14 +2185,17 @@ def _find_previous_membership_txn(
                    # in the events table though. We'll just say that we don't
                    # know the state for these rooms and continue on with our
                    # day.
-                   sliding_sync_membership_snapshots_insert_map["has_known_state"] = (
-                       False
-                   )
+                   sliding_sync_membership_snapshots_insert_map = {
+                       "has_known_state": False,
+                       "room_type": None,
+                       "room_name": None,
+                       "is_encrypted": False,
+                   }
                elif membership in (Membership.INVITE, Membership.KNOCK) or (
                    membership in (Membership.LEAVE, Membership.BAN) and is_outlier
                ):
-                   invite_or_knock_event_id = membership_event_id
-                   invite_or_knock_membership = membership
+                   invite_or_knock_event_id = None
+                   invite_or_knock_membership = None

                    # If the event is an `out_of_band_membership` (special case of
                    # `outlier`), we never had historical state so we have to pull from
```
```diff
@@ -2148,35 +2204,55 @@ def _find_previous_membership_txn(
                    # membership (i.e. the room shouldn't disappear if your using the
                    # `is_encrypted` filter and you leave).
                    if membership in (Membership.LEAVE, Membership.BAN) and is_outlier:
-                       (
-                           invite_or_knock_event_id,
-                           invite_or_knock_membership,
-                       ) = await self.db_pool.runInteraction(
+                       previous_membership = await self.db_pool.runInteraction(
                            "sliding_sync_membership_snapshots_bg_update._find_previous_membership",
                            _find_previous_membership_txn,
                            room_id,
                            user_id,
                            membership_event_id,
                        )
+                       if previous_membership is not None:
+                           (
+                               invite_or_knock_event_id,
+                               invite_or_knock_membership,
+                           ) = previous_membership
+                   else:
+                       invite_or_knock_event_id = membership_event_id
+                       invite_or_knock_membership = membership

-                   # Pull from the stripped state on the invite/knock event
-                   invite_or_knock_event = await self.get_event(invite_or_knock_event_id)
-
-                   raw_stripped_state_events = None
-                   if invite_or_knock_membership == Membership.INVITE:
-                       invite_room_state = invite_or_knock_event.unsigned.get(
-                           "invite_room_state"
-                       )
-                       raw_stripped_state_events = invite_room_state
-                   elif invite_or_knock_membership == Membership.KNOCK:
-                       knock_room_state = invite_or_knock_event.unsigned.get(
-                           "knock_room_state"
-                       )
-                       raw_stripped_state_events = knock_room_state
-
-                   sliding_sync_membership_snapshots_insert_map = PersistEventsStore._get_sliding_sync_insert_values_from_stripped_state(
-                       raw_stripped_state_events
-                   )
+                   if (
+                       invite_or_knock_event_id is not None
+                       and invite_or_knock_membership is not None
+                   ):
+                       # Pull from the stripped state on the invite/knock event
+                       invite_or_knock_event = await self.get_event(
+                           invite_or_knock_event_id
+                       )
+
+                       raw_stripped_state_events = None
+                       if invite_or_knock_membership == Membership.INVITE:
+                           invite_room_state = invite_or_knock_event.unsigned.get(
+                               "invite_room_state"
+                           )
+                           raw_stripped_state_events = invite_room_state
+                       elif invite_or_knock_membership == Membership.KNOCK:
+                           knock_room_state = invite_or_knock_event.unsigned.get(
+                               "knock_room_state"
+                           )
+                           raw_stripped_state_events = knock_room_state
+
+                       sliding_sync_membership_snapshots_insert_map = PersistEventsStore._get_sliding_sync_insert_values_from_stripped_state(
+                           raw_stripped_state_events
+                       )
+                   else:
+                       # We couldn't find any state for the membership, so we just have to
+                       # leave it as empty.
+                       sliding_sync_membership_snapshots_insert_map = {
+                           "has_known_state": False,
+                           "room_type": None,
+                           "room_name": None,
+                           "is_encrypted": False,
+                       }

                    # We should have some insert values for each room, even if no
                    # stripped state is on the event because we still want to record
```
```diff
@@ -2295,19 +2371,42 @@ def _fill_table_txn(txn: LoggingTransaction) -> None:
                )
                # We need to find the `forgotten` value during the transaction because
                # we can't risk inserting stale data.
-               txn.execute(
-                   """
-                   UPDATE sliding_sync_membership_snapshots
-                   SET
-                       forgotten = (SELECT forgotten FROM room_memberships WHERE event_id = ?)
-                   WHERE room_id = ? and user_id = ?
-                   """,
-                   (
-                       membership_event_id,
-                       room_id,
-                       user_id,
-                   ),
-               )
+               if isinstance(txn.database_engine, PostgresEngine):
+                   txn.execute(
+                       """
+                       UPDATE sliding_sync_membership_snapshots
+                       SET
+                           forgotten = m.forgotten
+                       FROM room_memberships AS m
+                       WHERE sliding_sync_membership_snapshots.room_id = ?
+                           AND sliding_sync_membership_snapshots.user_id = ?
+                           AND membership_event_id = ?
+                           AND membership_event_id = m.event_id
+                           AND m.event_id IS NOT NULL
+                       """,
+                       (
+                           room_id,
+                           user_id,
+                           membership_event_id,
+                       ),
+                   )
+               else:
+                   # SQLite doesn't support UPDATE FROM before 3.33.0, so we do
+                   # this via sub-selects.
+                   txn.execute(
+                       """
+                       UPDATE sliding_sync_membership_snapshots
+                       SET
+                           forgotten = (SELECT forgotten FROM room_memberships WHERE event_id = ?)
+                       WHERE room_id = ? and user_id = ? AND membership_event_id = ?
+                       """,
+                       (
+                           membership_event_id,
+                           room_id,
+                           user_id,
+                           membership_event_id,
+                       ),
+                   )

        await self.db_pool.runInteraction(
            "sliding_sync_membership_snapshots_bg_update", _fill_table_txn
```
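The two branches above should be behaviourally equivalent; `UPDATE ... FROM` needs Postgres or SQLite ≥ 3.33.0, while the correlated sub-select works everywhere. The sketch below exercises both forms against a toy schema (table and column names are simplified stand-ins, not Synapse's real tables), picking a branch by runtime SQLite version the way the diff picks by database engine.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE snapshots (room_id TEXT, user_id TEXT, membership_event_id TEXT, forgotten INTEGER);
    CREATE TABLE room_memberships (event_id TEXT, forgotten INTEGER);
    INSERT INTO snapshots VALUES ('!r:hs', '@u:hs', '$ev1', 0);
    INSERT INTO room_memberships VALUES ('$ev1', 1);
    """
)

room_id, user_id, membership_event_id = "!r:hs", "@u:hs", "$ev1"

if sqlite3.sqlite_version_info >= (3, 33, 0):
    # UPDATE ... FROM, mirroring the Postgres branch of the diff.
    conn.execute(
        """
        UPDATE snapshots
        SET forgotten = m.forgotten
        FROM room_memberships AS m
        WHERE snapshots.room_id = ? AND snapshots.user_id = ?
          AND membership_event_id = ? AND membership_event_id = m.event_id
        """,
        (room_id, user_id, membership_event_id),
    )
else:
    # Portable fallback via a correlated sub-select, matching the SQLite branch.
    conn.execute(
        """
        UPDATE snapshots
        SET forgotten = (SELECT forgotten FROM room_memberships WHERE event_id = ?)
        WHERE room_id = ? AND user_id = ? AND membership_event_id = ?
        """,
        (membership_event_id, room_id, user_id, membership_event_id),
    )

print(conn.execute("SELECT forgotten FROM snapshots").fetchone()[0])  # 1
```

Note the sub-select form must repeat `membership_event_id` both in the sub-select and the outer `WHERE`, which is why the parameter tuple has four entries in the SQLite branch but three in the Postgres one.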
```diff
@@ -2317,6 +2416,7 @@ def _fill_table_txn(txn: LoggingTransaction) -> None:
            (
                room_id,
                _room_id_from_rooms_table,
+               _room_version_id,
                user_id,
                _sender,
                _membership_event_id,
```
Review discussion on the null-byte check:

Why do we care to check this? As far as I can tell, it's valid JSON. Is this a Matrix spec thing? Perhaps it's because we have to mix with a system that is sensitive to null-terminated strings (C strings)? Doing some quick searching, it seems like Postgres does not allow null bytes in `TEXT` fields but it is allowed in SQLite. The only other place we do this is in `synapse/storage/databases/main/stats.py` (lines 268 to 288 in 391c4f8).

Why aren't we checking other content values like we do in the stats code? Why aren't we checking this in the `events` code where we also insert these state values into the database? Do we disallow this with some validation somewhere to prevent this with new data? We should explain the reason why we're doing this. I assume it's the Postgres reason.

D'oh! Sorry, yeah, Postgres `TEXT` fields can't have `\0` bytes in them (nyargghghgh why). Will update with a comment.

^

Oh yeah, I forgot that `room_type` is also a thing. The rest I think are fine, e.g. the tombstone room ID should be a valid room ID (but we may as well check that too).
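The asymmetry being discussed is easy to demonstrate: SQLite stores a NUL byte in a TEXT column without complaint, while Postgres rejects `\u0000` in `TEXT` values. A hedged sketch of the kind of guard under discussion — the `sanitize_for_postgres` helper name is made up for illustration and is not Synapse's actual code:

```python
import sqlite3
from typing import Optional

def sanitize_for_postgres(value: Optional[str]) -> Optional[str]:
    """Return None for values containing NUL bytes: valid in JSON and in
    SQLite TEXT columns, but rejected by Postgres TEXT columns."""
    if value is not None and "\u0000" in value:
        return None
    return value

# SQLite side of the asymmetry: the NUL byte round-trips untouched.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (room_name TEXT)")
conn.execute("INSERT INTO t VALUES (?)", ("evil\u0000name",))
stored = conn.execute("SELECT room_name FROM t").fetchone()[0]
print("\u0000" in stored)  # True

print(sanitize_for_postgres("My Room"))         # My Room
print(sanitize_for_postgres("evil\u0000name"))  # None
```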