diff --git a/changelog.d/12886.misc b/changelog.d/12886.misc
new file mode 100644
index 000000000000..3dd08f74bada
--- /dev/null
+++ b/changelog.d/12886.misc
@@ -0,0 +1 @@
+Refactor `have_seen_events` to reduce memory consumed when processing federation traffic.
diff --git a/changelog.d/12889.bugfix b/changelog.d/12889.bugfix
new file mode 100644
index 000000000000..582b2f0642d3
--- /dev/null
+++ b/changelog.d/12889.bugfix
@@ -0,0 +1 @@
+Fix a bug introduced in Synapse 1.59.0 which caused room deletion to fail with a foreign key violation.
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 75321f6af5d7..cfa438bc3814 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1356,14 +1356,23 @@ async def have_seen_events(
         Returns:
             The set of events we have already seen.
         """
-        res = await self._have_seen_events_dict(
-            (room_id, event_id) for event_id in event_ids
-        )
-        return {eid for ((_rid, eid), have_event) in res.items() if have_event}
+
+        # @cachedList chomps lots of memory if you call it with a big list, so
+        # we break it down. However, each batch requires its own index scan, so we make
+        # the batches as big as possible.
+
+        results: Set[str] = set()
+        for chunk in batch_iter(event_ids, 500):
+            r = await self._have_seen_events_dict(
+                [(room_id, event_id) for event_id in chunk]
+            )
+            results.update(eid for ((_rid, eid), have_event) in r.items() if have_event)
+
+        return results
 
     @cachedList(cached_method_name="have_seen_event", list_name="keys")
     async def _have_seen_events_dict(
-        self, keys: Iterable[Tuple[str, str]]
+        self, keys: Collection[Tuple[str, str]]
     ) -> Dict[Tuple[str, str], bool]:
         """Helper for have_seen_events
 
@@ -1375,11 +1384,12 @@ async def _have_seen_events_dict(
         cache_results = {
             (rid, eid) for (rid, eid) in keys if self._get_event_cache.contains((eid,))
         }
-        results = {x: True for x in cache_results}
+        results = dict.fromkeys(cache_results, True)
+        remaining = [k for k in keys if k not in cache_results]
+        if not remaining:
+            return results
 
-        def have_seen_events_txn(
-            txn: LoggingTransaction, chunk: Tuple[Tuple[str, str], ...]
-        ) -> None:
+        def have_seen_events_txn(txn: LoggingTransaction) -> None:
             # we deliberately do *not* query the database for room_id, to make the
             # query an index-only lookup on `events_event_id_key`.
             #
@@ -1387,21 +1397,17 @@ def have_seen_events_txn(
 
             sql = "SELECT event_id FROM events AS e WHERE "
             clause, args = make_in_list_sql_clause(
-                txn.database_engine, "e.event_id", [eid for (_rid, eid) in chunk]
+                txn.database_engine, "e.event_id", [eid for (_rid, eid) in remaining]
             )
             txn.execute(sql + clause, args)
             found_events = {eid for eid, in txn}
 
-            # ... and then we can update the results for each row in the batch
-            results.update({(rid, eid): (eid in found_events) for (rid, eid) in chunk})
-
-        # each batch requires its own index scan, so we make the batches as big as
-        # possible.
-        for chunk in batch_iter((k for k in keys if k not in cache_results), 500):
-            await self.db_pool.runInteraction(
-                "have_seen_events", have_seen_events_txn, chunk
+            # ... and then we can update the results for each key
+            results.update(
+                {(rid, eid): (eid in found_events) for (rid, eid) in remaining}
             )
 
+        await self.db_pool.runInteraction("have_seen_events", have_seen_events_txn)
         return results
 
     @cached(max_entries=100000, tree=True)
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index f84d768dc3f6..9969c2a21023 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -322,12 +322,7 @@ async def purge_room(self, room_id: str) -> List[int]:
         )
 
     def _purge_room_txn(self, txn: LoggingTransaction, room_id: str) -> List[int]:
-        # We *immediately* delete the room from the rooms table. This ensures
-        # that we don't race when persisting events (as that transaction checks
-        # that the room exists).
-        txn.execute("DELETE FROM rooms WHERE room_id = ?", (room_id,))
-
-        # Next, we fetch all the state groups that should be deleted, before
+        # First, fetch all the state groups that should be deleted, before
         # we delete that information.
         txn.execute(
             """
@@ -387,7 +382,7 @@ def _purge_room_txn(self, txn: LoggingTransaction, room_id: str) -> List[int]:
             (room_id,),
         )
 
-        # and finally, the tables with an index on room_id (or no useful index)
+        # next, the tables with an index on room_id (or no useful index)
         for table in (
             "current_state_events",
             "destination_rooms",
@@ -395,8 +390,13 @@ def _purge_room_txn(self, txn: LoggingTransaction, room_id: str) -> List[int]:
             "event_forward_extremities",
             "event_push_actions",
             "event_search",
+            "partial_state_events",
             "events",
+            "federation_inbound_events_staging",
             "group_rooms",
+            "local_current_membership",
+            "partial_state_rooms_servers",
+            "partial_state_rooms",
             "receipts_graph",
             "receipts_linearized",
             "room_aliases",
@@ -416,8 +416,9 @@ def _purge_room_txn(self, txn: LoggingTransaction, room_id: str) -> List[int]:
             "group_summary_rooms",
             "room_account_data",
             "room_tags",
-            "local_current_membership",
-            "federation_inbound_events_staging",
+            # "rooms" happens last, to keep the foreign keys in the other tables
+            # happy
+            "rooms",
         ):
             logger.info("[purge] removing %s from %s", room_id, table)
             txn.execute("DELETE FROM %s WHERE room_id=?" % (table,), (room_id,))
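
---

A note on the `events_worker.py` change: the point of the new structure is that the `@cachedList`-backed helper only ever sees one 500-key batch at a time, so the intermediate dict it builds stays bounded instead of growing with the full input. Below is a standalone sketch of that pattern, written for this note rather than taken from Synapse: `bulk_lookup` and `KNOWN` are invented stand-ins for `_have_seen_events_dict` and the events table, while `batch_iter` mirrors `synapse.util.iterutils.batch_iter`.

```python
# Sketch of the batching pattern: chunk the input before each bulk lookup so
# only one batch's worth of intermediate results is in memory at a time.
from itertools import islice
from typing import Dict, Iterable, Iterator, List, Set, Tuple


def batch_iter(iterable: Iterable[str], size: int) -> Iterator[Tuple[str, ...]]:
    """Yield tuples of at most `size` items from `iterable`."""
    sourceiter = iter(iterable)
    # iter(callable, sentinel) keeps calling until the empty tuple comes back
    return iter(lambda: tuple(islice(sourceiter, size)), ())


KNOWN = {"$a", "$b"}  # pretend these event IDs exist in the events table


def bulk_lookup(keys: List[Tuple[str, str]]) -> Dict[Tuple[str, str], bool]:
    # hypothetical stand-in for the @cachedList helper: it builds one dict per
    # call, so capping the batch at 500 keys caps the size of that dict
    return {(rid, eid): eid in KNOWN for (rid, eid) in keys}


def have_seen(room_id: str, event_ids: Iterable[str]) -> Set[str]:
    seen: Set[str] = set()
    for chunk in batch_iter(event_ids, 500):
        res = bulk_lookup([(room_id, eid) for eid in chunk])
        seen.update(eid for (_rid, eid), ok in res.items() if ok)
    return seen


print(have_seen("!room:example.org", ["$a", "$b", "$c"]))  # {'$a', '$b'}
```

The 500-key batch size is the trade-off the new comment in the hunk describes: each batch costs its own index scan, so batches are made as big as memory allows.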
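And on the `purge_events.py` change: once another table holds a foreign key into `rooms` (the `partial_state_rooms` tables added to the purge list here are the likely culprits, given the changelog), deleting the `rooms` row first violates that constraint, so the fix purges `rooms` last. A toy demonstration of the failure mode, using an invented SQLite schema rather than Synapse's:

```python
# Demonstrates why the parent table must be purged last under a foreign key.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")
conn.execute("CREATE TABLE rooms (room_id TEXT PRIMARY KEY)")
conn.execute(
    "CREATE TABLE partial_state_rooms (room_id TEXT REFERENCES rooms(room_id))"
)
conn.execute("INSERT INTO rooms VALUES ('!r:example.org')")
conn.execute("INSERT INTO partial_state_rooms VALUES ('!r:example.org')")

# Deleting the parent row first, as the removed code did, breaks the constraint:
try:
    conn.execute("DELETE FROM rooms WHERE room_id = ?", ("!r:example.org",))
except sqlite3.IntegrityError as e:
    print("foreign key violation:", e)  # FOREIGN KEY constraint failed

# Child tables first, "rooms" last, succeeds:
for table in ("partial_state_rooms", "rooms"):
    conn.execute("DELETE FROM %s WHERE room_id=?" % (table,), ("!r:example.org",))
```

Note the trade-off: the removed comment justified deleting `rooms` immediately to avoid racing with event persistence, and that protection is given up here; per the changelog, the foreign key violation introduced in Synapse 1.59.0 was the more pressing failure.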