From 3bad02fbfeb7019b14c3d41a52651659051768b2 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Thu, 12 Jan 2023 18:35:00 +0100 Subject: [PATCH 01/11] Non lazy loading sync not blocking during fast join Signed-off-by: Mathieu Velten --- changelog.d/14831.misc | 1 + synapse/handlers/sync.py | 63 +++++++++++++++++++-- synapse/storage/databases/main/relations.py | 1 + synapse/storage/databases/main/room.py | 32 ++++++++++- synapse/streams/events.py | 3 + synapse/types/__init__.py | 9 ++- tests/rest/admin/test_room.py | 4 +- tests/rest/client/test_rooms.py | 10 ++-- 8 files changed, 108 insertions(+), 15 deletions(-) create mode 100644 changelog.d/14831.misc diff --git a/changelog.d/14831.misc b/changelog.d/14831.misc new file mode 100644 index 000000000000..72d6463f254e --- /dev/null +++ b/changelog.d/14831.misc @@ -0,0 +1 @@ +Non lazy loading sync not blocking during fast join. diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 78d488f2b1cb..9cf1f29de139 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1817,11 +1817,34 @@ async def _generate_sync_entry_for_rooms( ) sync_result_builder.now_token = now_token + # Retrieve rooms that got un partial stated in the meantime, only useful in case + # of a non lazy-loading-members sync. + un_partial_stated_rooms = set() + if not sync_result_builder.sync_config.filter_collection.lazy_load_members(): + un_partial_state_rooms_since = 0 + if sync_result_builder.since_token is not None: + un_partial_state_rooms_since = int( + sync_result_builder.since_token.un_partial_state_rooms_key + ) + + un_partial_state_rooms_now = int( + sync_result_builder.now_token.un_partial_state_rooms_key + ) + if un_partial_state_rooms_since != un_partial_state_rooms_now: + un_partial_stated_rooms = ( + await self.store.get_un_partial_stated_rooms_between( + un_partial_state_rooms_since, + un_partial_state_rooms_now, + ) + ) + # 2. We check up front if anything has changed, if it hasn't then there is # no point in going further. if not sync_result_builder.full_state: if since_token and not ephemeral_by_room and not account_data_by_room: - have_changed = await self._have_rooms_changed(sync_result_builder) + have_changed = await self._have_rooms_changed( + sync_result_builder, un_partial_stated_rooms + ) log_kv({"rooms_have_changed": have_changed}) if not have_changed: tags_by_room = await self.store.get_updated_tags( @@ -1835,7 +1858,7 @@ async def _generate_sync_entry_for_rooms( ignored_users = await self.store.ignored_users(user_id) if since_token: room_changes = await self._get_rooms_changed( - sync_result_builder, ignored_users + sync_result_builder, ignored_users, un_partial_stated_rooms ) tags_by_room = await self.store.get_updated_tags( user_id, since_token.account_data_key @@ -1888,7 +1911,9 @@ async def handle_room_entries(room_entry: "RoomSyncResultBuilder") -> None: ) async def _have_rooms_changed( - self, sync_result_builder: "SyncResultBuilder" + self, + sync_result_builder: "SyncResultBuilder", + un_partial_stated_rooms: Set[str], ) -> bool: """Returns whether there may be any new events that should be sent down the sync. Returns True if there are. @@ -1905,6 +1930,11 @@ async def _have_rooms_changed( stream_id = since_token.room_key.stream for room_id in sync_result_builder.joined_room_ids: + # If a room has been un partial stated in the meantime, + # let's consider it has changes and deal with it accordingly + # in _get_rooms_changed. + if room_id in un_partial_stated_rooms: + return True if self.store.has_room_changed_since(room_id, stream_id): return True return False @@ -1913,6 +1943,7 @@ async def _get_rooms_changed( self, sync_result_builder: "SyncResultBuilder", ignored_users: FrozenSet[str], + un_partial_stated_rooms: Set[str], ) -> _RoomChanges: """Determine the changes in rooms to report to the user. @@ -2116,7 +2147,24 @@ async def _get_rooms_changed( room_entry = room_to_events.get(room_id, None) newly_joined = room_id in newly_joined_rooms - if room_entry: + + # In case of a non lazy-loading-members sync we want to include + # rooms that got un partial stated in the meantime, and we need + # to include the full state of them. + if ( + not sync_config.filter_collection.lazy_load_members() + and room_id in un_partial_stated_rooms + ): + entry = RoomSyncResultBuilder( + room_id=room_id, + rtype="joined", + events=None, + newly_joined=True, + full_state=True, + since_token=None, + upto_token=now_token, + ) + elif room_entry: events, start_key = room_entry prev_batch_token = now_token.copy_and_replace( @@ -2186,6 +2234,13 @@ async def _get_all_rooms( knocked = [] for event in room_list: + # Do not include rooms that we don't have the full state yet + # in case of non lazy-loading-members sync. + if ( + not sync_config.filter_collection.lazy_load_members() + ) and await self.store.is_partial_state_room(event.room_id): + continue + if event.room_version_id not in KNOWN_ROOM_VERSIONS: continue diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index aea96e9d2478..95787c2cfd8c 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -292,6 +292,7 @@ def _get_recent_references_for_event_txn( to_device_key=0, device_list_key=0, groups_key=0, + un_partial_state_rooms_key=0, ) return events[:limit], next_token diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 78906a5e1d9e..c614eda0760e 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -26,6 +26,7 @@ Mapping, Optional, Sequence, + Set, Tuple, Union, cast, @@ -1285,10 +1286,39 @@ def get_un_partial_stated_rooms_token(self) -> int: # explanation.) return self._un_partial_stated_rooms_stream_id_gen.get_current_token() + async def get_un_partial_stated_rooms_between( + self, last_id: int, current_id: int + ) -> Set[str]: + """Get all rooms that got un partial stated between `last_id` exclusive and + `current_id` inclusive. + + Returns: + The list of rooms. + """ + + if last_id == current_id: + return set() + + def _get_un_partial_stated_rooms_between_txn( + txn: LoggingTransaction, + ) -> Set[str]: + sql = """ + SELECT DISTINCT room_id FROM un_partial_stated_room_stream + WHERE ? < stream_id AND stream_id <= ? + """ + txn.execute(sql, (last_id, current_id)) + + return {r[0] for r in txn} + + return await self.db_pool.runInteraction( + "get_un_partial_stated_rooms_between", + _get_un_partial_stated_rooms_between_txn, + ) + async def get_un_partial_stated_rooms_from_stream( self, instance_name: str, last_id: int, current_id: int, limit: int ) -> Tuple[List[Tuple[int, Tuple[str]]], int, bool]: - """Get updates for caches replication stream. + """Get updates for un partial stated rooms replication stream. Args: instance_name: The writer we want to fetch updates from. Unused diff --git a/synapse/streams/events.py b/synapse/streams/events.py index 619eb7f601de..7e7bd160b59c 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -58,6 +58,7 @@ def get_current_token(self) -> StreamToken: push_rules_key = self.store.get_max_push_rules_stream_id() to_device_key = self.store.get_to_device_stream_token() device_list_key = self.store.get_device_stream_token() + un_partial_state_rooms_key = self.store.get_un_partial_stated_rooms_token() token = StreamToken( room_key=self.sources.room.get_current_key(), @@ -70,6 +71,7 @@ def get_current_token(self) -> StreamToken: device_list_key=device_list_key, # Groups key is unused. groups_key=0, + un_partial_state_rooms_key=un_partial_state_rooms_key, ) return token @@ -107,5 +109,6 @@ async def get_current_token_for_pagination(self, room_id: str) -> StreamToken: to_device_key=0, device_list_key=0, groups_key=0, + un_partial_state_rooms_key=0, ) return token diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py index 0c725eb9677d..d378c39ec2eb 100644 --- a/synapse/types/__init__.py +++ b/synapse/types/__init__.py @@ -646,12 +646,13 @@ class StreamToken: 7. `to_device_key`: `274711` 8. `device_list_key`: `265584` 9. `groups_key`: `1` (note that this key is now unused) + 10. `un_partial_state_rooms_key`: `379` You can see how many of these keys correspond to the various fields in a "/sync" response: ```json { - "next_batch": "s12_4_0_1_1_1_1_4_1", + "next_batch": "s12_4_0_1_1_1_1_4_1_1", "presence": { "events": [] }, @@ -663,7 +664,7 @@ class StreamToken: "!QrZlfIDQLNLdZHqTnt:hs1": { "timeline": { "events": [], - "prev_batch": "s10_4_0_1_1_1_1_4_1", + "prev_batch": "s10_4_0_1_1_1_1_4_1_1", "limited": false }, "state": { @@ -699,6 +700,7 @@ class StreamToken: device_list_key: int # Note that the groups key is no longer used and may have bogus values. groups_key: int + un_partial_state_rooms_key: int _SEPARATOR = "_" START: ClassVar["StreamToken"] @@ -737,6 +739,7 @@ async def to_string(self, store: "DataStore") -> str: # serialized so that there will not be confusion in the future # if additional tokens are added. str(self.groups_key), + str(self.un_partial_state_rooms_key), ] ) @@ -769,7 +772,7 @@ def copy_and_replace(self, key: str, new_value: Any) -> "StreamToken": return attr.evolve(self, **{key: new_value}) -StreamToken.START = StreamToken(RoomStreamToken(None, 0), 0, 0, 0, 0, 0, 0, 0, 0) +StreamToken.START = StreamToken(RoomStreamToken(None, 0), 0, 0, 0, 0, 0, 0, 0, 0, 0) @attr.s(slots=True, frozen=True, auto_attribs=True) diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py index e0f5d54abab0..453a6e979c02 100644 --- a/tests/rest/admin/test_room.py +++ b/tests/rest/admin/test_room.py @@ -1831,7 +1831,7 @@ def test_timestamp_to_event(self) -> None: def test_topo_token_is_accepted(self) -> None: """Test Topo Token is accepted.""" - token = "t1-0_0_0_0_0_0_0_0_0" + token = "t1-0_0_0_0_0_0_0_0_0_0" channel = self.make_request( "GET", "/_synapse/admin/v1/rooms/%s/messages?from=%s" % (self.room_id, token), @@ -1845,7 +1845,7 @@ def test_topo_token_is_accepted(self) -> None: def test_stream_token_is_accepted_for_fwd_pagianation(self) -> None: """Test that stream token is accepted for forward pagination.""" - token = "s0_0_0_0_0_0_0_0_0" + token = "s0_0_0_0_0_0_0_0_0_0" channel = self.make_request( "GET", "/_synapse/admin/v1/rooms/%s/messages?from=%s" % (self.room_id, token), diff --git a/tests/rest/client/test_rooms.py b/tests/rest/client/test_rooms.py index b4daace55617..9222cab19801 100644 --- a/tests/rest/client/test_rooms.py +++ b/tests/rest/client/test_rooms.py @@ -1987,7 +1987,7 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.room_id = self.helper.create_room_as(self.user_id) def test_topo_token_is_accepted(self) -> None: - token = "t1-0_0_0_0_0_0_0_0_0" + token = "t1-0_0_0_0_0_0_0_0_0_0" channel = self.make_request( "GET", "/rooms/%s/messages?access_token=x&from=%s" % (self.room_id, token) ) @@ -1998,7 +1998,7 @@ def test_topo_token_is_accepted(self) -> None: self.assertTrue("end" in channel.json_body) def test_stream_token_is_accepted_for_fwd_pagianation(self) -> None: - token = "s0_0_0_0_0_0_0_0_0" + token = "s0_0_0_0_0_0_0_0_0_0" channel = self.make_request( "GET", "/rooms/%s/messages?access_token=x&from=%s" % (self.room_id, token) ) @@ -2728,7 +2728,7 @@ def test_messages_filter_labels(self) -> None: """Test that we can filter by a label on a /messages request.""" self._send_labelled_messages_in_room() - token = "s0_0_0_0_0_0_0_0_0" + token = "s0_0_0_0_0_0_0_0_0_0" channel = self.make_request( "GET", "/rooms/%s/messages?access_token=%s&from=%s&filter=%s" @@ -2745,7 +2745,7 @@ def test_messages_filter_not_labels(self) -> None: """Test that we can filter by the absence of a label on a /messages request.""" self._send_labelled_messages_in_room() - token = "s0_0_0_0_0_0_0_0_0" + token = "s0_0_0_0_0_0_0_0_0_0" channel = self.make_request( "GET", "/rooms/%s/messages?access_token=%s&from=%s&filter=%s" @@ -2768,7 +2768,7 @@ def test_messages_filter_labels_not_labels(self) -> None: """ self._send_labelled_messages_in_room() - token = "s0_0_0_0_0_0_0_0_0" + token = "s0_0_0_0_0_0_0_0_0_0" channel = self.make_request( "GET", "/rooms/%s/messages?access_token=%s&from=%s&filter=%s" From 661e25bfa4667595d19d5d208c01840d2aaa5012 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Tue, 17 Jan 2023 15:28:45 +0100 Subject: [PATCH 02/11] Address comments --- synapse/handlers/sync.py | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 9cf1f29de139..921d9d3e6e0f 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1613,9 +1613,9 @@ async def _generate_sync_entry_for_to_device( now_token = sync_result_builder.now_token since_stream_id = 0 if sync_result_builder.since_token is not None: - since_stream_id = int(sync_result_builder.since_token.to_device_key) + since_stream_id = sync_result_builder.since_token.to_device_key - if device_id is not None and since_stream_id != int(now_token.to_device_key): + if device_id is not None and since_stream_id != now_token.to_device_key: messages, stream_id = await self.store.get_messages_for_device( user_id, device_id, since_stream_id, now_token.to_device_key ) @@ -1684,7 +1684,7 @@ async def _generate_sync_entry_for_account_data( ) push_rules_changed = await self.store.have_push_rules_changed_for_user( - user_id, int(since_token.push_rules_key) + user_id, since_token.push_rules_key ) if push_rules_changed: @@ -1823,20 +1823,16 @@ async def _generate_sync_entry_for_rooms( if not sync_result_builder.sync_config.filter_collection.lazy_load_members(): un_partial_state_rooms_since = 0 if sync_result_builder.since_token is not None: - un_partial_state_rooms_since = int( + un_partial_state_rooms_since = ( sync_result_builder.since_token.un_partial_state_rooms_key ) - un_partial_state_rooms_now = int( - sync_result_builder.now_token.un_partial_state_rooms_key - ) - if un_partial_state_rooms_since != un_partial_state_rooms_now: - un_partial_stated_rooms = ( - await self.store.get_un_partial_stated_rooms_between( - un_partial_state_rooms_since, - un_partial_state_rooms_now, - ) + un_partial_stated_rooms = ( + await self.store.get_un_partial_stated_rooms_between( + un_partial_state_rooms_since, + sync_result_builder.now_token.un_partial_state_rooms_key, ) + ) # 2. We check up front if anything has changed, if it hasn't then there is # no point in going further. From b01ec646006d11fc27f39ad8c30dca7410ca2bca Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Tue, 17 Jan 2023 15:49:29 +0100 Subject: [PATCH 03/11] Apply suggestions from code review Co-authored-by: David Robertson --- changelog.d/14831.misc | 2 +- synapse/handlers/sync.py | 13 +++++++------ synapse/storage/databases/main/room.py | 2 +- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/changelog.d/14831.misc b/changelog.d/14831.misc index 72d6463f254e..55aa666b1c9f 100644 --- a/changelog.d/14831.misc +++ b/changelog.d/14831.misc @@ -1 +1 @@ -Non lazy loading sync not blocking during fast join. +Faster joins: non lazy-loading syncs will return immediately after a faster join, by omitting partial state rooms until we acquire their full satte. diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 921d9d3e6e0f..b4c594ee0a1b 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1926,9 +1926,9 @@ async def _have_rooms_changed( stream_id = since_token.room_key.stream for room_id in sync_result_builder.joined_room_ids: - # If a room has been un partial stated in the meantime, - # let's consider it has changes and deal with it accordingly - # in _get_rooms_changed. + # If a room has been un partial stated during the sync period, + # assume it has seen some kind of change. We'll process that + # change later, in _get_rooms_changed. if room_id in un_partial_stated_rooms: return True if self.store.has_room_changed_since(room_id, stream_id): @@ -2144,9 +2144,10 @@ async def _get_rooms_changed( newly_joined = room_id in newly_joined_rooms - # In case of a non lazy-loading-members sync we want to include - # rooms that got un partial stated in the meantime, and we need - # to include the full state of them. + # Partially joined rooms are omitted from non lazy-loading-members + # syncs until the resync completes and that room is fully stated. + # When that happens, we need to include their full state in + # the next non-lazy-loading sync. if ( not sync_config.filter_collection.lazy_load_members() and room_id in un_partial_stated_rooms diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index c614eda0760e..2b200099bb4d 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -1293,7 +1293,7 @@ async def get_un_partial_stated_rooms_between( `current_id` inclusive. Returns: - The list of rooms. + The list of room ids. """ if last_id == current_id: From 4441f5b991be9708f1d23eae090d29eae26e28af Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Tue, 17 Jan 2023 15:51:11 +0100 Subject: [PATCH 04/11] Move changelog to feature --- changelog.d/{14831.misc => 14831.feature} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename changelog.d/{14831.misc => 14831.feature} (85%) diff --git a/changelog.d/14831.misc b/changelog.d/14831.feature similarity index 85% rename from changelog.d/14831.misc rename to changelog.d/14831.feature index 55aa666b1c9f..06b2a96854d9 100644 --- a/changelog.d/14831.misc +++ b/changelog.d/14831.feature @@ -1 +1 @@ -Faster joins: non lazy-loading syncs will return immediately after a faster join, by omitting partial state rooms until we acquire their full satte. +Faster joins: non lazy-loading syncs will return immediately after a faster join, by omitting partial state rooms until we acquire their full state. From 33d9642c0e2cd3430c8c328a015cd36f20ac722a Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Tue, 17 Jan 2023 15:54:36 +0100 Subject: [PATCH 05/11] Use AbstractSet --- synapse/handlers/sync.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index b4c594ee0a1b..b4c4c23499a5 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1909,7 +1909,7 @@ async def handle_room_entries(room_entry: "RoomSyncResultBuilder") -> None: async def _have_rooms_changed( self, sync_result_builder: "SyncResultBuilder", - un_partial_stated_rooms: Set[str], + un_partial_stated_rooms: AbstractSet[str], ) -> bool: """Returns whether there may be any new events that should be sent down the sync. Returns True if there are. @@ -1939,7 +1939,7 @@ async def _get_rooms_changed( self, sync_result_builder: "SyncResultBuilder", ignored_users: FrozenSet[str], - un_partial_stated_rooms: Set[str], + un_partial_stated_rooms: AbstractSet[str], ) -> _RoomChanges: """Determine the changes in rooms to report to the user. From 6023bee27f2f062645178e73d854dda2021289c0 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Tue, 17 Jan 2023 15:56:03 +0100 Subject: [PATCH 06/11] typo un_partial_state vs un_partial_stated --- synapse/handlers/sync.py | 10 +++++----- synapse/storage/databases/main/relations.py | 2 +- synapse/streams/events.py | 6 +++--- synapse/types/__init__.py | 6 +++--- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index b4c4c23499a5..97db0388bd75 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1821,16 +1821,16 @@ async def _generate_sync_entry_for_rooms( # of a non lazy-loading-members sync. un_partial_stated_rooms = set() if not sync_result_builder.sync_config.filter_collection.lazy_load_members(): - un_partial_state_rooms_since = 0 + un_partial_stated_rooms_since = 0 if sync_result_builder.since_token is not None: - un_partial_state_rooms_since = ( - sync_result_builder.since_token.un_partial_state_rooms_key + un_partial_stated_rooms_since = ( + sync_result_builder.since_token.un_partial_stated_rooms_key ) un_partial_stated_rooms = ( await self.store.get_un_partial_stated_rooms_between( - un_partial_state_rooms_since, - sync_result_builder.now_token.un_partial_state_rooms_key, + un_partial_stated_rooms_since, + sync_result_builder.now_token.un_partial_stated_rooms_key, ) ) diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index 95787c2cfd8c..84f844b79e7f 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -292,7 +292,7 @@ def _get_recent_references_for_event_txn( to_device_key=0, device_list_key=0, groups_key=0, - un_partial_state_rooms_key=0, + un_partial_stated_rooms_key=0, ) return events[:limit], next_token diff --git a/synapse/streams/events.py b/synapse/streams/events.py index 7e7bd160b59c..584ad1004b96 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -58,7 +58,7 @@ def get_current_token(self) -> StreamToken: push_rules_key = self.store.get_max_push_rules_stream_id() to_device_key = self.store.get_to_device_stream_token() device_list_key = self.store.get_device_stream_token() - un_partial_state_rooms_key = self.store.get_un_partial_stated_rooms_token() + un_partial_stated_rooms_key = self.store.get_un_partial_stated_rooms_token() token = StreamToken( room_key=self.sources.room.get_current_key(), @@ -71,7 +71,7 @@ def get_current_token(self) -> StreamToken: device_list_key=device_list_key, # Groups key is unused. groups_key=0, - un_partial_state_rooms_key=un_partial_state_rooms_key, + un_partial_stated_rooms_key=un_partial_stated_rooms_key, ) return token @@ -109,6 +109,6 @@ async def get_current_token_for_pagination(self, room_id: str) -> StreamToken: to_device_key=0, device_list_key=0, groups_key=0, - un_partial_state_rooms_key=0, + un_partial_stated_rooms_key=0, ) return token diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py index d378c39ec2eb..a45d7cf2e6fb 100644 --- a/synapse/types/__init__.py +++ b/synapse/types/__init__.py @@ -646,7 +646,7 @@ class StreamToken: 7. `to_device_key`: `274711` 8. `device_list_key`: `265584` 9. `groups_key`: `1` (note that this key is now unused) - 10. `un_partial_state_rooms_key`: `379` + 10. `un_partial_stated_rooms_key`: `379` You can see how many of these keys correspond to the various fields in a "/sync" response: @@ -700,7 +700,7 @@ class StreamToken: device_list_key: int # Note that the groups key is no longer used and may have bogus values. groups_key: int - un_partial_state_rooms_key: int + un_partial_stated_rooms_key: int _SEPARATOR = "_" START: ClassVar["StreamToken"] @@ -739,7 +739,7 @@ async def to_string(self, store: "DataStore") -> str: # serialized so that there will not be confusion in the future # if additional tokens are added. str(self.groups_key), - str(self.un_partial_state_rooms_key), + str(self.un_partial_stated_rooms_key), ] ) From 1cca9eb32b7eef35b6c016af0ed67fd4ed72318e Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Tue, 17 Jan 2023 15:57:07 +0100 Subject: [PATCH 07/11] Address comment --- synapse/types/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py index a45d7cf2e6fb..dfea825d98f0 100644 --- a/synapse/types/__init__.py +++ b/synapse/types/__init__.py @@ -634,7 +634,7 @@ class StreamToken: """A collection of keys joined together by underscores in the following order and which represent the position in their respective streams. - ex. `s2633508_17_338_6732159_1082514_541479_274711_265584_1` + ex. `s2633508_17_338_6732159_1082514_541479_274711_265584_1_379` 1. `room_key`: `s2633508` which is a `RoomStreamToken` - `RoomStreamToken`'s can also look like `t426-2633508` or `m56~2.58~3.59` - See the docstring for `RoomStreamToken` for more details. From fbfafca0abc85d7d023c0a771837b8fe90424e4f Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Tue, 17 Jan 2023 16:09:06 +0100 Subject: [PATCH 08/11] oups whitespace --- synapse/handlers/sync.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 97db0388bd75..5d6e6a4c079c 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -2145,8 +2145,8 @@ async def _get_rooms_changed( newly_joined = room_id in newly_joined_rooms # Partially joined rooms are omitted from non lazy-loading-members - # syncs until the resync completes and that room is fully stated. - # When that happens, we need to include their full state in + # syncs until the resync completes and that room is fully stated. + # When that happens, we need to include their full state in # the next non-lazy-loading sync. if ( not sync_config.filter_collection.lazy_load_members() From 6472178a41fff06c8f0e494abb645bf6e427d98d Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Wed, 18 Jan 2023 12:38:48 +0100 Subject: [PATCH 09/11] Optimize get_un_partial_stated_rooms_between --- synapse/handlers/sync.py | 1 + synapse/storage/databases/main/room.py | 11 ++++++++--- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 5d6e6a4c079c..44a818d6f051 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1831,6 +1831,7 @@ async def _generate_sync_entry_for_rooms( await self.store.get_un_partial_stated_rooms_between( un_partial_stated_rooms_since, sync_result_builder.now_token.un_partial_stated_rooms_key, + sync_result_builder.joined_room_ids, ) ) diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 2b200099bb4d..7a16d2762711 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -1287,7 +1287,7 @@ def get_un_partial_stated_rooms_token(self) -> int: return self._un_partial_stated_rooms_stream_id_gen.get_current_token() async def get_un_partial_stated_rooms_between( - self, last_id: int, current_id: int + self, last_id: int, current_id: int, room_ids: Collection[str] ) -> Set[str]: """Get all rooms that got un partial stated between `last_id` exclusive and `current_id` inclusive. @@ -1304,9 +1304,14 @@ def _get_un_partial_stated_rooms_between_txn( ) -> Set[str]: sql = """ SELECT DISTINCT room_id FROM un_partial_stated_room_stream - WHERE ? < stream_id AND stream_id <= ? + WHERE ? < stream_id AND stream_id <= ? AND """ - txn.execute(sql, (last_id, current_id)) + + clause, args = make_in_list_sql_clause( + self.database_engine, "room_id", room_ids + ) + + txn.execute(sql + clause, [last_id, current_id] + list(args)) return {r[0] for r in txn} From 7b60abea53e80522ab132e94cd78ffec76682690 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Wed, 18 Jan 2023 12:45:49 +0100 Subject: [PATCH 10/11] Do not call get_un_partial_stated_rooms_between on initial sync --- synapse/handlers/sync.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 44a818d6f051..0f19e2c7cd87 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1819,8 +1819,12 @@ async def _generate_sync_entry_for_rooms( # Retrieve rooms that got un partial stated in the meantime, only useful in case # of a non lazy-loading-members sync. + # We also skip calculating that in case of initial sync since we don't need it. un_partial_stated_rooms = set() - if not sync_result_builder.sync_config.filter_collection.lazy_load_members(): + if ( + since_token + and not sync_result_builder.sync_config.filter_collection.lazy_load_members() + ): un_partial_stated_rooms_since = 0 if sync_result_builder.since_token is not None: un_partial_stated_rooms_since = ( From 46062d8019d862b3a3794e5010677b6f2676fc83 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Wed, 18 Jan 2023 23:54:24 +0100 Subject: [PATCH 11/11] Notify user stream listeners to wake up long polling syncs --- synapse/handlers/federation.py | 11 ++++++----- synapse/notifier.py | 26 ++++++++++++++++++++++++++ synapse/storage/databases/main/room.py | 10 +++++----- synapse/types/__init__.py | 1 + 4 files changed, 38 insertions(+), 10 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index eca75f1108d1..3fcd5f3db47e 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1726,15 +1726,16 @@ async def _sync_partial_state_room( await self._device_handler.handle_room_un_partial_stated(room_id) logger.info("Clearing partial-state flag for %s", room_id) - success = await self.store.clear_partial_state_room(room_id) - if success: + new_stream_id = await self.store.clear_partial_state_room(room_id) + if new_stream_id is not None: logger.info("State resync complete for %s", room_id) self._storage_controllers.state.notify_room_un_partial_stated( room_id ) - # Poke the notifier so that other workers see the write to - # the un-partial-stated rooms stream. - self._notifier.notify_replication() + + await self._notifier.on_un_partial_stated_room( + room_id, new_stream_id + ) # TODO(faster_joins) update room stats and user directory? # https://github.com/matrix-org/synapse/issues/12814 diff --git a/synapse/notifier.py b/synapse/notifier.py index 26b97cf766c3..258e60367ef9 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -315,6 +315,32 @@ async def on_new_room_events( event_entries.append((entry, event.event_id)) await self.notify_new_room_events(event_entries, max_room_stream_token) + async def on_un_partial_stated_room( + self, + room_id: str, + new_token: int, + ) -> None: + """Used by the resync background processes to wake up all listeners + of this room that it just got un-partial-stated. + + It will also notify replication listeners of the change in stream. + """ + + # Wake up all related user stream notifiers + user_streams = self.room_to_user_streams.get(room_id, set()) + time_now_ms = self.clock.time_msec() + for user_stream in user_streams: + try: + user_stream.notify( + StreamKeyType.UN_PARTIAL_STATED_ROOMS, new_token, time_now_ms + ) + except Exception: + logger.exception("Failed to notify listener") + + # Poke the replication so that other workers also see the write to + # the un-partial-stated rooms stream. + self.notify_replication() + async def notify_new_room_events( self, event_entries: List[Tuple[_PendingRoomEventEntry, str]], diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 7a16d2762711..09ba372f7a7f 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -2330,16 +2330,16 @@ async def unblock_room(self, room_id: str) -> None: (room_id,), ) - async def clear_partial_state_room(self, room_id: str) -> bool: + async def clear_partial_state_room(self, room_id: str) -> Optional[int]: """Clears the partial state flag for a room. Args: room_id: The room whose partial state flag is to be cleared. Returns: - `True` if the partial state flag has been cleared successfully. + The corresponding stream id for the un-partial-stated rooms stream. - `False` if the partial state flag could not be cleared because the room + `None` if the partial state flag could not be cleared because the room still contains events with partial state. """ try: @@ -2350,7 +2350,7 @@ async def clear_partial_state_room(self, room_id: str) -> bool: room_id, un_partial_state_room_stream_id, ) - return True + return un_partial_state_room_stream_id except self.db_pool.engine.module.IntegrityError as e: # Assume that any `IntegrityError`s are due to partial state events. logger.info( @@ -2358,7 +2358,7 @@ async def clear_partial_state_room(self, room_id: str) -> bool: room_id, e, ) - return False + return None def _clear_partial_state_room_txn( self, diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py index dfea825d98f0..1ae1e9e526b7 100644 --- a/synapse/types/__init__.py +++ b/synapse/types/__init__.py @@ -627,6 +627,7 @@ class StreamKeyType: PUSH_RULES: Final = "push_rules_key" TO_DEVICE: Final = "to_device_key" DEVICE_LIST: Final = "device_list_key" + UN_PARTIAL_STATED_ROOMS = "un_partial_stated_rooms_key" @attr.s(slots=True, frozen=True, auto_attribs=True)