From 41226c8d44c2b8cce865451f879437ddb0ad9f42 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 3 Mar 2021 11:45:53 -0500 Subject: [PATCH 1/7] Add a background task to purge unused chain IDs. --- changelog.d/9542.bugfix | 1 + .../databases/main/events_bg_updates.py | 85 +++++++++++++++++++ .../delta/59/10delete_purged_chain_cover.sql | 17 ++++ 3 files changed, 103 insertions(+) create mode 100644 changelog.d/9542.bugfix create mode 100644 synapse/storage/databases/main/schema/delta/59/10delete_purged_chain_cover.sql diff --git a/changelog.d/9542.bugfix b/changelog.d/9542.bugfix new file mode 100644 index 000000000000..dce0ad0920e2 --- /dev/null +++ b/changelog.d/9542.bugfix @@ -0,0 +1 @@ +Properly purge the event chain cover index when purging history. diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index cb6b1f8a0c17..85dfdac3656e 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -135,6 +135,11 @@ def __init__(self, database: DatabasePool, db_conn, hs): self._chain_cover_index, ) + self.db_pool.updates.register_background_update_handler( + "purged_chain_cover", + self._purged_chain_cover_index, + ) + async def _background_reindex_fields_sender(self, progress, batch_size): target_min_stream_id = progress["target_min_stream_id_inclusive"] max_stream_id = progress["max_stream_id_exclusive"] @@ -932,3 +937,83 @@ def _calculate_chain_cover_txn( processed_count=count, finished_room_map=finished_rooms, ) + + async def _purged_chain_cover_index(self, progress: dict, batch_size: int) -> int: + """ + A background updates that iterates over the chain cover and deletes the + chain cover for events that have been purged. + + This may be due to fully purging a room or via setting a retention policy. + """ + current_event_id = progress.get("current_event_id", "") + + def purged_chain_cover_txn(txn) -> int: + sql = ( + """ + SELECT event_id, chain_id, sequence_number FROM event_auth_chains + WHERE event_id > ? ORDER BY event_id ASC LIMIT ? + """ + ) + txn.execute(sql, (current_event_id, batch_size)) + + rows = txn.fetchall() + if not rows: + return 0 + + # The event IDs and chain IDs / sequence numbers where the event has + # been purged. + unreferenced_event_ids = [] + unreferenced_chain_id_tuples = [] + event_id = "" + for row in rows: + event_id = row[0] + + txn.execute( + """ + SELECT event_id FROM event_json WHERE event_id = ? + """, + (event_id,) + ) + if not txn.fetchone(): + unreferenced_event_ids.append(row[0]) + unreferenced_chain_id_tuples.append(row[1:2]) + + # Delete the unreferenced auth chains from event_auth_chain_links and + # event_auth_chains. + txn.executemany( + """ + DELETE FROM event_auth_chains WHERE event_id = ? + """, + unreferenced_event_ids, + ) + txn.executemany( + """ + DELETE FROM event_auth_chain_links WHERE + (origin_chain_id = ? AND origin_sequence_number = ?) OR + (target_chain_id = ? AND target_sequence_number = ?) + """, + ( + (chain_id, seq_num, chain_id, seq_num) + for (chain_id, seq_num) in unreferenced_chain_id_tuples + ), + ) + + progress = { + "current_event_id": event_id, + } + + self.db_pool.updates._background_update_progress_txn( + txn, "purged_chain_cover", progress + ) + + return len(rows) + + result = await self.db_pool.runInteraction( + "_purged_chain_cover_index", + purged_chain_cover_txn, + ) + + if not result: + await self.db_pool.updates._end_background_update("purged_chain_cover") + + return result diff --git a/synapse/storage/databases/main/schema/delta/59/10delete_purged_chain_cover.sql b/synapse/storage/databases/main/schema/delta/59/10delete_purged_chain_cover.sql new file mode 100644 index 000000000000..87cb1f3cfdeb --- /dev/null +++ b/synapse/storage/databases/main/schema/delta/59/10delete_purged_chain_cover.sql @@ -0,0 +1,17 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (5910, 'purged_chain_cover', '{}'); From 3acd1823de98076890c00ca0ddd6f42ba75f9b73 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 3 Mar 2021 13:48:36 -0500 Subject: [PATCH 2/7] Lint --- synapse/storage/databases/main/events_bg_updates.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index 85dfdac3656e..95b12d4aec7a 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -948,12 +948,10 @@ async def _purged_chain_cover_index(self, progress: dict, batch_size: int) -> in current_event_id = progress.get("current_event_id", "") def purged_chain_cover_txn(txn) -> int: - sql = ( - """ + sql = """ SELECT event_id, chain_id, sequence_number FROM event_auth_chains WHERE event_id > ? ORDER BY event_id ASC LIMIT ? """ - ) txn.execute(sql, (current_event_id, batch_size)) rows = txn.fetchall() @@ -972,7 +970,7 @@ def purged_chain_cover_txn(txn) -> int: """ SELECT event_id FROM event_json WHERE event_id = ? """, - (event_id,) + (event_id,), ) if not txn.fetchone(): unreferenced_event_ids.append(row[0]) From dbaf00a8c483e60ee887054612ad01f22111c6f5 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 8 Mar 2021 09:40:30 -0500 Subject: [PATCH 3/7] Calculate whether the event exists in the initial query. --- .../databases/main/events_bg_updates.py | 24 ++++++++----------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index 95b12d4aec7a..d319cc84e064 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -948,10 +948,14 @@ async def _purged_chain_cover_index(self, progress: dict, batch_size: int) -> in current_event_id = progress.get("current_event_id", "") def purged_chain_cover_txn(txn) -> int: + # The event ID from events will be null if the chain ID / sequence + # number points to a purged event. sql = """ - SELECT event_id, chain_id, sequence_number FROM event_auth_chains + SELECT event_id, chain_id, sequence_number, e.event_id + FROM event_auth_chains + LEFT JOIN events AS e USING (event_id) WHERE event_id > ? ORDER BY event_id ASC LIMIT ? - """ + """ txn.execute(sql, (current_event_id, batch_size)) rows = txn.fetchall() @@ -963,18 +967,10 @@ def purged_chain_cover_txn(txn) -> int: unreferenced_event_ids = [] unreferenced_chain_id_tuples = [] event_id = "" - for row in rows: - event_id = row[0] - - txn.execute( - """ - SELECT event_id FROM event_json WHERE event_id = ? - """, - (event_id,), - ) - if not txn.fetchone(): - unreferenced_event_ids.append(row[0]) - unreferenced_chain_id_tuples.append(row[1:2]) + for event_id, chain_id, sequence_number, events_event_id in rows: + if not events_event_id: + unreferenced_event_ids.append(event_id) + unreferenced_chain_id_tuples.append((chain_id, sequence_number)) # Delete the unreferenced auth chains from event_auth_chain_links and # event_auth_chains. From eacddfdabdf743cccdfb520beb30b4b7f58b87c5 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 8 Mar 2021 10:24:57 -0500 Subject: [PATCH 4/7] Fix ambiguous ordering by. --- synapse/storage/databases/main/events_bg_updates.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index d319cc84e064..b8b6b4993c5e 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -954,7 +954,7 @@ def purged_chain_cover_txn(txn) -> int: SELECT event_id, chain_id, sequence_number, e.event_id FROM event_auth_chains LEFT JOIN events AS e USING (event_id) - WHERE event_id > ? ORDER BY event_id ASC LIMIT ? + WHERE event_id > ? ORDER BY event_auth_chains.event_id ASC LIMIT ? """ txn.execute(sql, (current_event_id, batch_size)) From 35825d2eb9eddd01b5a07241cd0ba343c1b6ccac Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 9 Mar 2021 10:15:11 -0500 Subject: [PATCH 5/7] Handle review comments. --- .../databases/main/events_bg_updates.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index b8b6b4993c5e..1dd9022eb3c1 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -951,7 +951,7 @@ def purged_chain_cover_txn(txn) -> int: # The event ID from events will be null if the chain ID / sequence # number points to a purged event. sql = """ - SELECT event_id, chain_id, sequence_number, e.event_id + SELECT event_id, chain_id, sequence_number, e.event_id IS NOT NULL FROM event_auth_chains LEFT JOIN events AS e USING (event_id) WHERE event_id > ? ORDER BY event_auth_chains.event_id ASC LIMIT ? @@ -967,8 +967,8 @@ def purged_chain_cover_txn(txn) -> int: unreferenced_event_ids = [] unreferenced_chain_id_tuples = [] event_id = "" - for event_id, chain_id, sequence_number, events_event_id in rows: - if not events_event_id: + for event_id, chain_id, sequence_number, has_event in rows: + if not has_event: unreferenced_event_ids.append(event_id) unreferenced_chain_id_tuples.append((chain_id, sequence_number)) @@ -980,16 +980,16 @@ def purged_chain_cover_txn(txn) -> int: """, unreferenced_event_ids, ) + # We should also delete matching target_*, but there is no index on + # target_chain_id. Hopefully any purged events are due to a room + # being fully purged and they will be removed from the origin_* + # searches. txn.executemany( """ DELETE FROM event_auth_chain_links WHERE - (origin_chain_id = ? AND origin_sequence_number = ?) OR - (target_chain_id = ? AND target_sequence_number = ?) + (origin_chain_id = ? AND origin_sequence_number = ?) """, - ( - (chain_id, seq_num, chain_id, seq_num) - for (chain_id, seq_num) in unreferenced_chain_id_tuples - ), + unreferenced_chain_id_tuples, ) progress = { From d8b4fdd61877e7949514cbff9732df2d412f57f2 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 9 Mar 2021 10:17:00 -0500 Subject: [PATCH 6/7] Update newsfragment. --- changelog.d/9542.bugfix | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/9542.bugfix b/changelog.d/9542.bugfix index dce0ad0920e2..51b1876f3b61 100644 --- a/changelog.d/9542.bugfix +++ b/changelog.d/9542.bugfix @@ -1 +1 @@ -Properly purge the event chain cover index when purging history. +Purge chain cover indexes for events that were purged prior to Synapse v1.29.0. From c353c12dcbc4290c133b8884d61fffa68c9649fa Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 9 Mar 2021 10:33:24 -0500 Subject: [PATCH 7/7] Do not purge based on target_chain_id. --- synapse/storage/databases/main/events_bg_updates.py | 2 +- synapse/storage/databases/main/purge_events.py | 8 ++------ 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index 1dd9022eb3c1..73e69d4cb152 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -987,7 +987,7 @@ def purged_chain_cover_txn(txn) -> int: txn.executemany( """ DELETE FROM event_auth_chain_links WHERE - (origin_chain_id = ? AND origin_sequence_number = ?) + origin_chain_id = ? AND origin_sequence_number = ? """, unreferenced_chain_id_tuples, ) diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index 0836e4af4934..41f4fe7f95cd 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -331,13 +331,9 @@ def _purge_room_txn(self, txn, room_id: str) -> List[int]: txn.executemany( """ DELETE FROM event_auth_chain_links WHERE - (origin_chain_id = ? AND origin_sequence_number = ?) OR - (target_chain_id = ? AND target_sequence_number = ?) + origin_chain_id = ? AND origin_sequence_number = ? """, - ( - (chain_id, seq_num, chain_id, seq_num) - for (chain_id, seq_num) in referenced_chain_id_tuples - ), + referenced_chain_id_tuples, ) # Now we delete tables which lack an index on room_id but have one on event_id