From 755e6458986913ccdd4d1d1d6bab68f648b322d9 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 13 Jan 2023 19:47:30 +0000 Subject: [PATCH 1/8] Rename `_resume_sync_partial_state_room` Signed-off-by: Sean Quah --- synapse/handlers/federation.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index eca75f1108d1..01c510288724 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -176,7 +176,7 @@ def __init__(self, hs: "HomeServer"): # were shut down. if not hs.config.worker.worker_app: run_as_background_process( - "resume_sync_partial_state_room", self._resume_sync_partial_state_room + "resume_sync_partial_state_room", self._resume_partial_state_room_sync ) @trace @@ -1660,7 +1660,7 @@ async def get_room_complexity( # well. return None - async def _resume_sync_partial_state_room(self) -> None: + async def _resume_partial_state_room_sync(self) -> None: """Resumes resyncing of all partial-state rooms after a restart.""" assert not self.config.worker.worker_app From 403dec82eac47d6851795492da783d4ebe7d4311 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 13 Jan 2023 19:48:29 +0000 Subject: [PATCH 2/8] Add check to avoid starting duplicate partial state syncs When we are kicked from a room while syncing partial state, the sync can fail and leave us in a state where the homeserver is not joined to the room, yet has the partial state flag set on the room. When we next join the room (implemented by Mat in a separate PR), we would like to restart the sync. To do so without starting duplicate syncs, we add a set which tracks which partial state syncs are in progress. Signed-off-by: Sean Quah --- synapse/handlers/federation.py | 49 +++++++++++++++++++++++++++++----- 1 file changed, 43 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 01c510288724..a4862272ac45 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -27,6 +27,7 @@ Iterable, List, Optional, + Set, Tuple, Union, ) @@ -171,6 +172,11 @@ def __init__(self, hs: "HomeServer"): self.third_party_event_rules = hs.get_third_party_event_rules() + # Tracks running partial state syncs by room ID. + # Partial state syncs currently only run on the main process, so it's okay to + # track them in-memory for now. + self._active_partial_state_syncs: Set[str] = set() + # if this is the main process, fire off a background process to resume # any partial-state-resync operations which were in flight when we # were shut down. @@ -679,9 +685,7 @@ async def do_invite_join( if ret.partial_state: # Kick off the process of asynchronously fetching the state for this # room. - run_as_background_process( - desc="sync_partial_state_room", - func=self._sync_partial_state_room, + self._start_partial_state_room_sync( initial_destination=origin, other_destinations=ret.servers_in_room, room_id=room_id, @@ -1666,14 +1670,47 @@ async def _resume_partial_state_room_sync(self) -> None: partial_state_rooms = await self.store.get_partial_state_room_resync_info() for room_id, resync_info in partial_state_rooms.items(): - run_as_background_process( - desc="sync_partial_state_room", - func=self._sync_partial_state_room, + self._start_partial_state_room_sync( initial_destination=resync_info.joined_via, other_destinations=resync_info.servers_in_room, room_id=room_id, ) + def _start_partial_state_room_sync( + self, + initial_destination: Optional[str], + other_destinations: Collection[str], + room_id: str, + ) -> None: + """Starts the background process to resync the state of a partial-state room, + if it is not already running. + + Args: + initial_destination: the initial homeserver to pull the state from + other_destinations: other homeservers to try to pull the state from, if + `initial_destination` is unavailable + room_id: room to be resynced + """ + + async def _sync_partial_state_room_wrapper() -> None: + if room_id in self._active_partial_state_syncs: + return + + self._active_partial_state_syncs.add(room_id) + + try: + await self._sync_partial_state_room( + initial_destination=initial_destination, + other_destinations=other_destinations, + room_id=room_id, + ) + finally: + self._active_partial_state_syncs.remove(room_id) + + run_as_background_process( + desc="sync_partial_state_room", func=_sync_partial_state_room_wrapper + ) + async def _sync_partial_state_room( self, initial_destination: Optional[str], From aeb8bbe4dfd68c4ad53e9d3ece0d61279cae772e Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 13 Jan 2023 20:26:19 +0000 Subject: [PATCH 3/8] Ensure we restart partial state syncs when we rejoin a room Signed-off-by: Sean Quah --- synapse/handlers/federation.py | 37 ++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index a4862272ac45..6a35b7539c31 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -176,6 +176,12 @@ def __init__(self, hs: "HomeServer"): # Partial state syncs currently only run on the main process, so it's okay to # track them in-memory for now. self._active_partial_state_syncs: Set[str] = set() + # Tracks partial state syncs we may want to restart. + # A dictionary mapping room IDs to (initial destination, other destinations) + # tuples. + self._partial_state_syncs_to_restart: Dict[ + str, Tuple[Optional[str], Collection[str]] + ] = {} # if this is the main process, fire off a background process to resume # any partial-state-resync operations which were in flight when we @@ -1694,6 +1700,14 @@ def _start_partial_state_room_sync( async def _sync_partial_state_room_wrapper() -> None: if room_id in self._active_partial_state_syncs: + # Mark the partial state sync as possibly needing a restart. + # We want to do this when the partial state sync is about to fail + # because we've been kicked from the room, but we rejoin before the sync + # finishes falling over. + self._partial_state_syncs_to_restart[room_id] = ( + initial_destination, + other_destinations, + ) return self._active_partial_state_syncs.add(room_id) @@ -1705,8 +1719,31 @@ async def _sync_partial_state_room_wrapper() -> None: room_id=room_id, ) finally: + # Check whether the room is still partial stated, while we still claim + # to be the active sync. Usually, the partial state flag will be gone, + # unless we left and rejoined the room, or the sync failed. + is_still_partial_state_room = await self.store.is_partial_state_room( + room_id + ) self._active_partial_state_syncs.remove(room_id) + # Check if we need to restart the sync. + if room_id in self._partial_state_syncs_to_restart: + ( + restart_initial_destination, + restart_other_destinations, + ) = self._partial_state_syncs_to_restart[room_id] + + # Clear the restart flag. + self._partial_state_syncs_to_restart.pop(room_id, None) + + if is_still_partial_state_room: + self._start_partial_state_room_sync( + initial_destination=restart_initial_destination, + other_destinations=restart_other_destinations, + room_id=room_id, + ) + run_as_background_process( desc="sync_partial_state_room", func=_sync_partial_state_room_wrapper ) From 4f36846a8e602f0bb1e3fd30d415521531f7baa8 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 13 Jan 2023 20:29:05 +0000 Subject: [PATCH 4/8] Add newsfile --- changelog.d/14844.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/14844.misc diff --git a/changelog.d/14844.misc b/changelog.d/14844.misc new file mode 100644 index 000000000000..30ce8663045f --- /dev/null +++ b/changelog.d/14844.misc @@ -0,0 +1 @@ +Add check to avoid starting duplicate partial state syncs. From 5e8447c121568de1e2c3d80d57d2ab096f98a70d Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Wed, 18 Jan 2023 18:57:21 +0000 Subject: [PATCH 5/8] fixup: pop and read from dict in same statement --- synapse/handlers/federation.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 6a35b7539c31..8c37eb50cae0 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1732,10 +1732,7 @@ async def _sync_partial_state_room_wrapper() -> None: ( restart_initial_destination, restart_other_destinations, - ) = self._partial_state_syncs_to_restart[room_id] - - # Clear the restart flag. - self._partial_state_syncs_to_restart.pop(room_id, None) + ) = self._partial_state_syncs_to_restart.pop(room_id) if is_still_partial_state_room: self._start_partial_state_room_sync( From ab7020f26e0bb8f3b6d2187e6bc17682d18f8b69 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Thu, 19 Jan 2023 00:52:50 +0000 Subject: [PATCH 6/8] fixup: rename restart flag --- synapse/handlers/federation.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 8c37eb50cae0..8fd1bae56684 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -179,7 +179,7 @@ def __init__(self, hs: "HomeServer"): # Tracks partial state syncs we may want to restart. # A dictionary mapping room IDs to (initial destination, other destinations) # tuples. - self._partial_state_syncs_to_restart: Dict[ + self._partial_state_syncs_maybe_needing_restart: Dict[ str, Tuple[Optional[str], Collection[str]] ] = {} @@ -1704,7 +1704,7 @@ async def _sync_partial_state_room_wrapper() -> None: # We want to do this when the partial state sync is about to fail # because we've been kicked from the room, but we rejoin before the sync # finishes falling over. - self._partial_state_syncs_to_restart[room_id] = ( + self._partial_state_syncs_maybe_needing_restart[room_id] = ( initial_destination, other_destinations, ) @@ -1728,11 +1728,11 @@ async def _sync_partial_state_room_wrapper() -> None: self._active_partial_state_syncs.remove(room_id) # Check if we need to restart the sync. - if room_id in self._partial_state_syncs_to_restart: + if room_id in self._partial_state_syncs_maybe_needing_restart: ( restart_initial_destination, restart_other_destinations, - ) = self._partial_state_syncs_to_restart.pop(room_id) + ) = self._partial_state_syncs_maybe_needing_restart.pop(room_id) if is_still_partial_state_room: self._start_partial_state_room_sync( From 71064de1d4f40c50267e02c97d493223dfac0ea5 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Thu, 19 Jan 2023 00:54:04 +0000 Subject: [PATCH 7/8] fixup: redo comments --- synapse/handlers/federation.py | 37 +++++++++++++++++++++++++--------- 1 file changed, 28 insertions(+), 9 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 8fd1bae56684..e386f77de6d9 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1688,7 +1688,7 @@ def _start_partial_state_room_sync( other_destinations: Collection[str], room_id: str, ) -> None: - """Starts the background process to resync the state of a partial-state room, + """Starts the background process to resync the state of a partial state room, if it is not already running. Args: @@ -1700,10 +1700,27 @@ def _start_partial_state_room_sync( async def _sync_partial_state_room_wrapper() -> None: if room_id in self._active_partial_state_syncs: - # Mark the partial state sync as possibly needing a restart. - # We want to do this when the partial state sync is about to fail - # because we've been kicked from the room, but we rejoin before the sync - # finishes falling over. + # Another local user has joined the room while there is already a + # partial state sync running. This implies that there is a new join + # event to un-partial state. We might find ourselves in one of a few + # scenarios: + # 1. There is an existing partial state sync. The partial state sync + # un-partial states the new join event before completing and all is + # well. + # 2. Before the latest join, the homeserver was no longer in the room + # and there is an existing partial state sync from our previous + # membership of the room. The partial state sync may have: + # a) succeeded, but not yet terminated. The room will not be + # un-partial stated again unless we restart the partial state + # sync. + # b) failed, because we were no longer in the room and remote + # homeservers were refusing our requests, but not yet + # terminated. After the latest join, remote homeservers may + # start answering our requests again, so we should restart the + # partial state sync. + # In the cases where we would want to restart the partial state sync, + # the room would have the partial state flag when the partial state sync + # terminates. self._partial_state_syncs_maybe_needing_restart[room_id] = ( initial_destination, other_destinations, @@ -1719,15 +1736,17 @@ async def _sync_partial_state_room_wrapper() -> None: room_id=room_id, ) finally: - # Check whether the room is still partial stated, while we still claim - # to be the active sync. Usually, the partial state flag will be gone, - # unless we left and rejoined the room, or the sync failed. + # Read the room's partial state flag while we still hold the claim to + # being the active partial state sync (so that another partial state + # sync can't come along and mess with it under us). + # Normally, the partial state flag will be gone. If it isn't, then we + # may find ourselves in scenario 2a or 2b as described in the comment + # above, where we want to restart the partial state sync. is_still_partial_state_room = await self.store.is_partial_state_room( room_id ) self._active_partial_state_syncs.remove(room_id) - # Check if we need to restart the sync. if room_id in self._partial_state_syncs_maybe_needing_restart: ( restart_initial_destination, From 06807749b61dd41aca7f2aab5d8b82959a65af97 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Thu, 19 Jan 2023 01:37:05 +0000 Subject: [PATCH 8/8] Add tests for restart and deduplication of partial state syncs --- tests/handlers/test_federation.py | 112 +++++++++++++++++++++++++++++- 1 file changed, 111 insertions(+), 1 deletion(-) diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py index cedbb9fafcfa..c1558c40c370 100644 --- a/tests/handlers/test_federation.py +++ b/tests/handlers/test_federation.py @@ -12,10 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import cast +from typing import Collection, Optional, cast from unittest import TestCase from unittest.mock import Mock, patch +from twisted.internet.defer import Deferred from twisted.test.proto_helpers import MemoryReactor from synapse.api.constants import EventTypes @@ -679,3 +680,112 @@ def test_failed_partial_join_is_clean(self) -> None: f"Stale partial-stated room flag left over for {room_id} after a" f" failed do_invite_join!", ) + + def test_duplicate_partial_state_room_syncs(self) -> None: + """ + Tests that concurrent partial state syncs are not started for the same room. + """ + is_partial_state = True + end_sync: "Deferred[None]" = Deferred() + + async def is_partial_state_room(room_id: str) -> bool: + return is_partial_state + + async def sync_partial_state_room( + initial_destination: Optional[str], + other_destinations: Collection[str], + room_id: str, + ) -> None: + nonlocal end_sync + try: + await end_sync + finally: + end_sync = Deferred() + + mock_is_partial_state_room = Mock(side_effect=is_partial_state_room) + mock_sync_partial_state_room = Mock(side_effect=sync_partial_state_room) + + fed_handler = self.hs.get_federation_handler() + store = self.hs.get_datastores().main + + with patch.object( + fed_handler, "_sync_partial_state_room", mock_sync_partial_state_room + ), patch.object(store, "is_partial_state_room", mock_is_partial_state_room): + # Start the partial state sync. + fed_handler._start_partial_state_room_sync("hs1", ["hs2"], "room_id") + self.assertEqual(mock_sync_partial_state_room.call_count, 1) + + # Try to start another partial state sync. + # Nothing should happen. + fed_handler._start_partial_state_room_sync("hs3", ["hs2"], "room_id") + self.assertEqual(mock_sync_partial_state_room.call_count, 1) + + # End the partial state sync + is_partial_state = False + end_sync.callback(None) + + # The partial state sync should not be restarted. + self.assertEqual(mock_sync_partial_state_room.call_count, 1) + + # The next attempt to start the partial state sync should work. + is_partial_state = True + fed_handler._start_partial_state_room_sync("hs3", ["hs2"], "room_id") + self.assertEqual(mock_sync_partial_state_room.call_count, 2) + + def test_partial_state_room_sync_restart(self) -> None: + """ + Tests that partial state syncs are restarted when a second partial state sync + was deduplicated and the first partial state sync fails. + """ + is_partial_state = True + end_sync: "Deferred[None]" = Deferred() + + async def is_partial_state_room(room_id: str) -> bool: + return is_partial_state + + async def sync_partial_state_room( + initial_destination: Optional[str], + other_destinations: Collection[str], + room_id: str, + ) -> None: + nonlocal end_sync + try: + await end_sync + finally: + end_sync = Deferred() + + mock_is_partial_state_room = Mock(side_effect=is_partial_state_room) + mock_sync_partial_state_room = Mock(side_effect=sync_partial_state_room) + + fed_handler = self.hs.get_federation_handler() + store = self.hs.get_datastores().main + + with patch.object( + fed_handler, "_sync_partial_state_room", mock_sync_partial_state_room + ), patch.object(store, "is_partial_state_room", mock_is_partial_state_room): + # Start the partial state sync. + fed_handler._start_partial_state_room_sync("hs1", ["hs2"], "room_id") + self.assertEqual(mock_sync_partial_state_room.call_count, 1) + + # Fail the partial state sync. + # The partial state sync should not be restarted. + end_sync.errback(Exception("Failed to request /state_ids")) + self.assertEqual(mock_sync_partial_state_room.call_count, 1) + + # Start the partial state sync again. + fed_handler._start_partial_state_room_sync("hs1", ["hs2"], "room_id") + self.assertEqual(mock_sync_partial_state_room.call_count, 2) + + # Deduplicate another partial state sync. + fed_handler._start_partial_state_room_sync("hs3", ["hs2"], "room_id") + self.assertEqual(mock_sync_partial_state_room.call_count, 2) + + # Fail the partial state sync. + # It should restart with the latest parameters. + end_sync.errback(Exception("Failed to request /state_ids")) + self.assertEqual(mock_sync_partial_state_room.call_count, 3) + mock_sync_partial_state_room.assert_called_with( + initial_destination="hs3", + other_destinations=["hs2"], + room_id="room_id", + )