From d7d29f23208f3e1084afba6d6f801cd5e1f1b12a Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Tue, 7 Mar 2023 21:56:44 +0000 Subject: [PATCH 1/9] Add `forget_rooms_on_leave` config option Signed-off-by: Sean Quah --- docs/usage/configuration/config_documentation.md | 10 ++++++++++ synapse/config/room.py | 4 ++++ 2 files changed, 14 insertions(+) diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md index 015855ee7ef4..44bc92b0415b 100644 --- a/docs/usage/configuration/config_documentation.md +++ b/docs/usage/configuration/config_documentation.md @@ -3685,6 +3685,16 @@ default_power_level_content_override: trusted_private_chat: null public_chat: null ``` +--- +### `forget_rooms_on_leave` + +Set to true to automatically forget rooms for users when they leave them, either +normally or via a kick or ban. Defaults to false. + +Example configuration: +```yaml +forget_rooms_on_leave: false +``` --- ## Opentracing diff --git a/synapse/config/room.py b/synapse/config/room.py index 4a7ac0054086..b6696cd129c3 100644 --- a/synapse/config/room.py +++ b/synapse/config/room.py @@ -75,3 +75,7 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: % preset ) # We validate the actual overrides when we try to apply them. + + # When enabled, users will forget rooms when they leave them, either via a + # leave, kick or ban. 
+ self.forget_on_leave = config.get("forget_rooms_on_leave", False) From 601fe8040091d9b50f5924ca2766239040e040b8 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Tue, 7 Mar 2023 23:05:13 +0000 Subject: [PATCH 2/9] Add table to track position of room forgetter Signed-off-by: Sean Quah --- synapse/storage/databases/main/roommember.py | 26 +++++++++++++++++++ .../main/delta/74/01add_room_forgetter.sql | 24 +++++++++++++++++ 2 files changed, 50 insertions(+) create mode 100644 synapse/storage/schema/main/delta/74/01add_room_forgetter.sql diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 694a5b802c7c..545d5df70a34 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -1368,6 +1368,32 @@ def _is_local_host_in_room_ignoring_users_txn( _is_local_host_in_room_ignoring_users_txn, ) + async def get_room_forgetter_stream_pos(self) -> int: + """Get the stream position of the background process to forget rooms when left + by users. + """ + return await self.db_pool.simple_select_one_onecol( + table="room_forgetter_stream_pos", + keyvalues={}, + retcol="stream_id", + desc="room_forgetter_stream_pos", + ) + + async def update_room_forgetter_stream_pos(self, stream_id: int) -> None: + """Update the stream position of the background process to forget rooms when + left by users. + + Must only be used by the worker running the background process. 
+ """ + assert self.hs.config.worker.run_background_tasks + + await self.db_pool.simple_update_one( + table="room_forgetter_stream_pos", + keyvalues={}, + updatevalues={"stream_id": stream_id}, + desc="room_forgetter_stream_pos", + ) + class RoomMemberBackgroundUpdateStore(SQLBaseStore): def __init__( diff --git a/synapse/storage/schema/main/delta/74/01add_room_forgetter.sql b/synapse/storage/schema/main/delta/74/01add_room_forgetter.sql new file mode 100644 index 000000000000..be4b57d86f7a --- /dev/null +++ b/synapse/storage/schema/main/delta/74/01add_room_forgetter.sql @@ -0,0 +1,24 @@ +/* Copyright 2023 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE room_forgetter_stream_pos ( + Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row. 
+ stream_id BIGINT NOT NULL, + CHECK (Lock='X') +); + +INSERT INTO room_forgetter_stream_pos ( + stream_id +) SELECT COALESCE(MAX(stream_ordering), 0) from events; From b4926d96861333214d72b28d5e074df65fbe48a1 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Wed, 8 Mar 2023 04:03:56 +0000 Subject: [PATCH 3/9] Give workers the ability to forget rooms Signed-off-by: Sean Quah --- synapse/handlers/room_member.py | 38 ++++++++-------- synapse/handlers/room_member_worker.py | 3 -- synapse/storage/databases/main/roommember.py | 48 ++++++++++---------- 3 files changed, 42 insertions(+), 47 deletions(-) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 509c5578895b..92c9bf92d15a 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -278,9 +278,25 @@ async def _user_left_room(self, target: UserID, room_id: str) -> None: """ raise NotImplementedError() - @abc.abstractmethod async def forget(self, user: UserID, room_id: str) -> None: - raise NotImplementedError() + user_id = user.to_string() + + member = await self._storage_controllers.state.get_current_state_event( + room_id=room_id, event_type=EventTypes.Member, state_key=user_id + ) + membership = member.membership if member else None + + if membership is not None and membership not in [ + Membership.LEAVE, + Membership.BAN, + ]: + raise SynapseError(400, "User %s in room %s" % (user_id, room_id)) + + # In normal case this call is only required if `membership` is not `None`. + # But: After the last member had left the room, the background update + # `_background_remove_left_rooms` is deleting rows related to this room from + # the table `current_state_events` and `get_current_state_events` is `None`. 
+ await self.store.forget(user_id, room_id) async def ratelimit_multiple_invites( self, @@ -2018,25 +2034,7 @@ async def _user_left_room(self, target: UserID, room_id: str) -> None: """Implements RoomMemberHandler._user_left_room""" user_left_room(self.distributor, target, room_id) - async def forget(self, user: UserID, room_id: str) -> None: - user_id = user.to_string() - - member = await self._storage_controllers.state.get_current_state_event( - room_id=room_id, event_type=EventTypes.Member, state_key=user_id - ) - membership = member.membership if member else None - if membership is not None and membership not in [ - Membership.LEAVE, - Membership.BAN, - ]: - raise SynapseError(400, "User %s in room %s" % (user_id, room_id)) - - # In normal case this call is only required if `membership` is not `None`. - # But: After the last member had left the room, the background update - # `_background_remove_left_rooms` is deleting rows related to this room from - # the table `current_state_events` and `get_current_state_events` is `None`. 
- await self.store.forget(user_id, room_id) def get_users_which_can_issue_invite(auth_events: StateMap[EventBase]) -> List[str]: diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py index 76e36b8a6d53..e8ff1ad063d6 100644 --- a/synapse/handlers/room_member_worker.py +++ b/synapse/handlers/room_member_worker.py @@ -137,6 +137,3 @@ async def _user_left_room(self, target: UserID, room_id: str) -> None: await self._notify_change_client( user_id=target.to_string(), room_id=room_id, change="left" ) - - async def forget(self, target: UserID, room_id: str) -> None: - raise RuntimeError("Cannot forget rooms on workers.") diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 545d5df70a34..5e9c55e0a082 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -82,7 +82,7 @@ class EventIdMembership: membership: str -class RoomMemberWorkerStore(EventsWorkerStore): +class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): def __init__( self, database: DatabasePool, @@ -1368,6 +1368,29 @@ def _is_local_host_in_room_ignoring_users_txn( _is_local_host_in_room_ignoring_users_txn, ) + async def forget(self, user_id: str, room_id: str) -> None: + """Indicate that user_id wishes to discard history for room_id.""" + + def f(txn: LoggingTransaction) -> None: + sql = ( + "UPDATE" + " room_memberships" + " SET" + " forgotten = 1" + " WHERE" + " user_id = ?" + " AND" + " room_id = ?" + ) + txn.execute(sql, (user_id, room_id)) + + self._invalidate_cache_and_stream(txn, self.did_forget, (user_id, room_id)) + self._invalidate_cache_and_stream( + txn, self.get_forgotten_rooms_for_user, (user_id,) + ) + + await self.db_pool.runInteraction("forget_membership", f) + async def get_room_forgetter_stream_pos(self) -> int: """Get the stream position of the background process to forget rooms when left by users. 
@@ -1569,29 +1592,6 @@ def __init__( ): super().__init__(database, db_conn, hs) - async def forget(self, user_id: str, room_id: str) -> None: - """Indicate that user_id wishes to discard history for room_id.""" - - def f(txn: LoggingTransaction) -> None: - sql = ( - "UPDATE" - " room_memberships" - " SET" - " forgotten = 1" - " WHERE" - " user_id = ?" - " AND" - " room_id = ?" - ) - txn.execute(sql, (user_id, room_id)) - - self._invalidate_cache_and_stream(txn, self.did_forget, (user_id, room_id)) - self._invalidate_cache_and_stream( - txn, self.get_forgotten_rooms_for_user, (user_id,) - ) - - await self.db_pool.runInteraction("forget_membership", f) - def extract_heroes_from_room_summary( details: Mapping[str, MemberSummary], me: str From 923d5235b4ee60e2a55bc12bff5cbfafd618cb84 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Wed, 8 Mar 2023 04:09:23 +0000 Subject: [PATCH 4/9] Implement forgetting of rooms on leave Signed-off-by: Sean Quah --- synapse/handlers/room_member.py | 137 +++++++++++++++++++++++++++++++- synapse/server.py | 11 ++- 2 files changed, 146 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 92c9bf92d15a..ee060f6798c1 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -16,7 +16,7 @@ import logging import random from http import HTTPStatus -from typing import TYPE_CHECKING, Iterable, List, Optional, Set, Tuple +from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Tuple from synapse import types from synapse.api.constants import ( @@ -38,7 +38,10 @@ from synapse.events import EventBase from synapse.events.snapshot import EventContext from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN +from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler from synapse.logging import opentracing +from synapse.metrics import event_processing_positions +from synapse.metrics.background_process_metrics 
import run_as_background_process from synapse.module_api import NOT_SPAM from synapse.types import ( JsonDict, @@ -2035,6 +2038,138 @@ async def _user_left_room(self, target: UserID, room_id: str) -> None: user_left_room(self.distributor, target, room_id) +class RoomForgetterHandler(StateDeltasHandler): + """Forgets rooms when they are left, when enabled in the homeserver config. + + For the purposes of this feature, kicks, bans and "leaves" via state resolution + weirdness are all considered to be leaves. + + Derived from `StatsHandler` and `UserDirectoryHandler`. + """ + + def __init__(self, hs: "HomeServer"): + super().__init__(hs) + + self._hs = hs + self._store = hs.get_datastores().main + self._storage_controllers = hs.get_storage_controllers() + self._clock = hs.get_clock() + self._notifier = hs.get_notifier() + self._room_member_handler = hs.get_room_member_handler() + + # The current position in the current_state_delta stream + self.pos: Optional[int] = None + + # Guard to ensure we only process deltas one at a time + self._is_processing = False + + if hs.config.worker.run_background_tasks: + self._notifier.add_replication_callback(self.notify_new_event) + + # We kick this off to pick up outstanding work from before the last restart. 
+ self._clock.call_later(0, self.notify_new_event) + + def notify_new_event(self) -> None: + """Called when there may be more deltas to process""" + if self._is_processing: + return + + self._is_processing = True + + async def process() -> None: + try: + await self._unsafe_process() + finally: + self._is_processing = False + + run_as_background_process("room_forgetter.notify_new_event", process) + + async def _unsafe_process(self) -> None: + # If self.pos is None then that means we haven't fetched it from the DB yet + if self.pos is None: + self.pos = await self._store.get_room_forgetter_stream_pos() + room_max_stream_ordering = self._store.get_room_max_stream_ordering() + if self.pos > room_max_stream_ordering: + # apparently, we've processed more events than exist in the database! + # this can happen if events are removed with history purge or similar. + logger.warning( + "Event stream ordering appears to have gone backwards (%i -> %i): " + "rewinding room forgetter processor", + self.pos, + room_max_stream_ordering, + ) + self.pos = room_max_stream_ordering + + # Loop round handling deltas until we're up to date + + while True: + # Be sure to read the max stream_ordering *before* checking if there are any outstanding + # deltas, since there is otherwise a chance that we could miss updates which arrive + # after we check the deltas. 
+ room_max_stream_ordering = self._store.get_room_max_stream_ordering() + if self.pos == room_max_stream_ordering: + break + + logger.debug( + "Processing room forgetting %s->%s", self.pos, room_max_stream_ordering + ) + if self._hs.config.room.forget_on_leave: + ( + max_pos, + deltas, + ) = await self._storage_controllers.state.get_current_state_deltas( + self.pos, room_max_stream_ordering + ) + + logger.debug("Handling %d state deltas", len(deltas)) + await self._handle_deltas(deltas) + else: + # Update the processing position, so that if the server admin turns the + # feature on at a later date, we don't decide to forget every room that + # has ever been left in the past. + pass + + self.pos = max_pos + + # Expose current event processing position to prometheus + event_processing_positions.labels("room_forgetter").set(max_pos) + + await self._store.update_room_forgetter_stream_pos(max_pos) + + async def _handle_deltas(self, deltas: List[Dict[str, Any]]) -> None: + """Called with the state deltas to process""" + for delta in deltas: + typ = delta["type"] + state_key = delta["state_key"] + room_id = delta["room_id"] + event_id = delta["event_id"] + prev_event_id = delta["prev_event_id"] + + if typ != EventTypes.Member: + continue + + if not self._hs.is_mine_id(state_key): + continue + + change = await self._get_key_change( + prev_event_id, + event_id, + key_name="membership", + public_value=Membership.JOIN, + ) + is_leave = change is MatchChange.now_false + + if is_leave: + try: + await self._room_member_handler.forget( + UserID.from_string(state_key), room_id + ) + except SynapseError as e: + if e.code == 400: + # The user is back in the room. 
+ pass + else: + raise def get_users_which_can_issue_invite(auth_events: StateMap[EventBase]) -> List[str]: diff --git a/synapse/server.py b/synapse/server.py index df80fc1bebdc..e55b59f674c3 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -92,7 +92,11 @@ ) from synapse.handlers.room_batch import RoomBatchHandler from synapse.handlers.room_list import RoomListHandler -from synapse.handlers.room_member import RoomMemberHandler, RoomMemberMasterHandler +from synapse.handlers.room_member import ( + RoomForgetterHandler, + RoomMemberHandler, + RoomMemberMasterHandler, +) from synapse.handlers.room_member_worker import RoomMemberWorkerHandler from synapse.handlers.room_summary import RoomSummaryHandler from synapse.handlers.search import SearchHandler @@ -209,6 +213,7 @@ class HomeServer(metaclass=abc.ABCMeta): "message", "pagination", "profile", + "room_forgetter", "stats", ] @@ -801,6 +806,10 @@ def get_account_handler(self) -> AccountHandler: def get_push_rules_handler(self) -> PushRulesHandler: return PushRulesHandler(self) + @cache_in_self + def get_room_forgetter_handler(self) -> RoomForgetterHandler: + return RoomForgetterHandler(self) + @cache_in_self def get_outbound_redis_connection(self) -> "ConnectionHandler": """ From dea5247fab2d921dc58fd16d22446aaaa91dfa3f Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Wed, 8 Mar 2023 04:11:11 +0000 Subject: [PATCH 5/9] Add test for forgetting rooms on leave Signed-off-by: Sean Quah --- tests/handlers/test_room_member.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/tests/handlers/test_room_member.py b/tests/handlers/test_room_member.py index 6a38893b688a..a444d822cd4a 100644 --- a/tests/handlers/test_room_member.py +++ b/tests/handlers/test_room_member.py @@ -333,6 +333,17 @@ def test_leave_and_forget(self) -> None: self.get_success(self.store.is_locally_forgotten_room(self.room_id)) ) + @override_config({"forget_rooms_on_leave": True}) + def test_leave_and_auto_forget(self) -> None: + """Tests 
the `forget_rooms_on_leave` config option.""" + self.helper.join(self.room_id, user=self.bob, tok=self.bob_token) + + # alice is not the last room member that leaves and forgets the room + self.helper.leave(self.room_id, user=self.alice, tok=self.alice_token) + self.assertTrue( + self.get_success(self.store.did_forget(self.alice, self.room_id)) + ) + def test_leave_and_forget_last_user(self) -> None: """Tests that forget a room is successfully when the last user has left the room.""" From f3badbbaa9067abfba351a5cc8e08a55c4ff1027 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Wed, 8 Mar 2023 14:14:45 +0000 Subject: [PATCH 6/9] Add newsfile --- changelog.d/15224.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/15224.feature diff --git a/changelog.d/15224.feature b/changelog.d/15224.feature new file mode 100644 index 000000000000..5d8413f8be2b --- /dev/null +++ b/changelog.d/15224.feature @@ -0,0 +1 @@ +Add `forget_rooms_on_leave` config option to automatically forget rooms when users leave them or are removed from them. From 5dfada47a47f0b229563f86bd0478ddf58a5b9fa Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Tue, 2 May 2023 13:55:01 +0100 Subject: [PATCH 7/9] fixup: lift non-forget on leave branch out of loop --- synapse/handlers/room_member.py | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index ee060f6798c1..3731340ac1b9 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -2100,6 +2100,14 @@ async def _unsafe_process(self) -> None: ) self.pos = room_max_stream_ordering + if not self._hs.config.room.forget_on_leave: + # Update the processing position, so that if the server admin turns the + # feature on at a later date, we don't decide to forget every room that + # has ever been left in the past. 
+ self.pos = self._store.get_room_max_stream_ordering() + await self._store.update_room_forgetter_stream_pos(self.pos) + return + # Loop round handling deltas until we're up to date while True: @@ -2113,21 +2121,15 @@ async def _unsafe_process(self) -> None: logger.debug( "Processing room forgetting %s->%s", self.pos, room_max_stream_ordering ) - if self._hs.config.room.forget_on_leave: - ( - max_pos, - deltas, - ) = await self._storage_controllers.state.get_current_state_deltas( - self.pos, room_max_stream_ordering - ) + ( + max_pos, + deltas, + ) = await self._storage_controllers.state.get_current_state_deltas( + self.pos, room_max_stream_ordering + ) - logger.debug("Handling %d state deltas", len(deltas)) - await self._handle_deltas(deltas) - else: - # Update the processing position, so that if the server admin turns the - # feature on at a later date, we don't decide to forget every room that - # has ever been left in the past. - pass + logger.debug("Handling %d state deltas", len(deltas)) + await self._handle_deltas(deltas) self.pos = max_pos From a7b6777b8de09e830340fc3dab7c8a11f63f03f9 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Tue, 2 May 2023 13:55:52 +0100 Subject: [PATCH 8/9] fixup: replace handrolled query with `simple_update_txn` --- synapse/storage/databases/main/roommember.py | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 5e9c55e0a082..0df4ddd6365e 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -1372,17 +1372,12 @@ async def forget(self, user_id: str, room_id: str) -> None: """Indicate that user_id wishes to discard history for room_id.""" def f(txn: LoggingTransaction) -> None: - sql = ( - "UPDATE" - " room_memberships" - " SET" - " forgotten = 1" - " WHERE" - " user_id = ?" - " AND" - " room_id = ?" 
+ self.db_pool.simple_update_txn( + txn, + table="room_memberships", + keyvalues={"user_id": user_id, "room_id": room_id}, + updatevalues={"forgotten": 1}, ) - txn.execute(sql, (user_id, room_id)) self._invalidate_cache_and_stream(txn, self.did_forget, (user_id, room_id)) self._invalidate_cache_and_stream( From 95e81e34a08cbc8521738394717346928303cc9d Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Tue, 2 May 2023 14:06:50 +0100 Subject: [PATCH 9/9] fixup: move schema delta to correct directory --- .../{74/01add_room_forgetter.sql => 76/04_add_room_forgetter.sql} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename synapse/storage/schema/main/delta/{74/01add_room_forgetter.sql => 76/04_add_room_forgetter.sql} (100%) diff --git a/synapse/storage/schema/main/delta/74/01add_room_forgetter.sql b/synapse/storage/schema/main/delta/76/04_add_room_forgetter.sql similarity index 100% rename from synapse/storage/schema/main/delta/74/01add_room_forgetter.sql rename to synapse/storage/schema/main/delta/76/04_add_room_forgetter.sql