diff --git a/changelog.d/15868.feature b/changelog.d/15868.feature new file mode 100644 index 000000000000..a866bf5774dc --- /dev/null +++ b/changelog.d/15868.feature @@ -0,0 +1 @@ +Remove old rows from the `cache_invalidation_stream_by_instance` table automatically (this table is unused in SQLite). diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index c940f864d173..2fbd389c7168 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -18,6 +18,8 @@ from typing import TYPE_CHECKING, Any, Collection, Iterable, List, Optional, Tuple from synapse.api.constants import EventTypes +from synapse.config._base import Config +from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.replication.tcp.streams import BackfillStream, CachesStream from synapse.replication.tcp.streams.events import ( EventsStream, @@ -52,6 +54,21 @@ # As above, but for invalidating room caches on room deletion DELETE_ROOM_CACHE_NAME = "dr_cache_fake" +# How long between cache invalidation table cleanups, once we have caught up +# with the backlog. +REGULAR_CLEANUP_INTERVAL_MS = Config.parse_duration("1h") + +# How long between cache invalidation table cleanups, before we have caught +# up with the backlog. +CATCH_UP_CLEANUP_INTERVAL_MS = Config.parse_duration("1m") + +# Maximum number of cache invalidation rows to delete at once. +CLEAN_UP_MAX_BATCH_SIZE = 20_000 + +# Keep cache invalidations for 7 days +# (This is likely to be quite excessive.) +RETENTION_PERIOD_OF_CACHE_INVALIDATIONS_MS = Config.parse_duration("7d") + class CacheInvalidationWorkerStore(SQLBaseStore): def __init__( @@ -98,6 +115,18 @@ def __init__( else: self._cache_id_gen = None + # Occasionally clean up the cache invalidations stream table by deleting + # old rows. + # This is only applicable when Postgres is in use; this table is unused + # and not populated at all when SQLite is the active database engine. + if hs.config.worker.run_background_tasks and isinstance( + self.database_engine, PostgresEngine + ): + self.hs.get_clock().call_later( + CATCH_UP_CLEANUP_INTERVAL_MS / 1000, + self._clean_up_cache_invalidation_wrapper, + ) + async def get_all_updated_caches( self, instance_name: str, last_id: int, current_id: int, limit: int ) -> Tuple[List[Tuple[int, tuple]], int, bool]: @@ -554,3 +583,104 @@ def get_cache_stream_token_for_writer(self, instance_name: str) -> int: return self._cache_id_gen.get_current_token_for_writer(instance_name) else: return 0 + + @wrap_as_background_process("clean_up_old_cache_invalidations") + async def _clean_up_cache_invalidation_wrapper(self) -> None: + """ + Clean up cache invalidation stream table entries occasionally. + If we are behind (i.e. there are entries old enough to + be deleted but too many of them to be deleted in one go), + then we run slightly more frequently. + """ + delete_up_to: int = ( + self.hs.get_clock().time_msec() - RETENTION_PERIOD_OF_CACHE_INVALIDATIONS_MS + ) + + in_backlog = await self._clean_up_batch_of_old_cache_invalidations(delete_up_to) + + # Vary how long we wait before calling again depending on whether we + # are still sifting through backlog or we have caught up. + if in_backlog: + next_interval = CATCH_UP_CLEANUP_INTERVAL_MS + else: + next_interval = REGULAR_CLEANUP_INTERVAL_MS + + self.hs.get_clock().call_later( + next_interval / 1000, self._clean_up_cache_invalidation_wrapper + ) + + async def _clean_up_batch_of_old_cache_invalidations( + self, delete_up_to_millisec: int + ) -> bool: + """ + Remove old rows from the `cache_invalidation_stream_by_instance` table automatically (this table is unused in SQLite). + + Up to `CLEAN_UP_BATCH_SIZE` rows will be deleted at once. + + Returns true if and only if we were limited by batch size (i.e. we are in backlog: + there are more things to clean up). + """ + + def _clean_up_batch_of_old_cache_invalidations_txn( + txn: LoggingTransaction, + ) -> bool: + # First get the earliest stream ID + txn.execute( + """ + SELECT stream_id FROM cache_invalidation_stream_by_instance + ORDER BY stream_id ASC + LIMIT 1 + """ + ) + row = txn.fetchone() + if row is None: + return False + earliest_stream_id: int = row[0] + + # Then find the last stream ID of the range we will delete + txn.execute( + """ + SELECT stream_id FROM cache_invalidation_stream_by_instance + WHERE stream_id <= ? AND invalidation_ts <= ? + ORDER BY stream_id DESC + LIMIT 1 + """, + (earliest_stream_id + CLEAN_UP_MAX_BATCH_SIZE, delete_up_to_millisec), + ) + row = txn.fetchone() + if row is None: + return False + cutoff_stream_id: int = row[0] + + # Determine whether we are caught up or still catching up + txn.execute( + """ + SELECT invalidation_ts FROM cache_invalidation_stream_by_instance + WHERE stream_id > ? + ORDER BY stream_id ASC + LIMIT 1 + """, + (cutoff_stream_id,), + ) + row = txn.fetchone() + if row is None: + in_backlog = False + else: + # We are in backlog if the next row could have been deleted + # if we didn't have such a small batch size + in_backlog = row[0] <= delete_up_to_millisec + + txn.execute( + """ + DELETE FROM cache_invalidation_stream_by_instance + WHERE ? <= stream_id AND stream_id <= ? + """, + (earliest_stream_id, cutoff_stream_id), + ) + + return in_backlog + + return await self.db_pool.runInteraction( + "clean_up_old_cache_invalidations", + _clean_up_batch_of_old_cache_invalidations_txn, + )