Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Remove old rows from the cache_invalidation_stream_by_instance tabl…
Browse files Browse the repository at this point in the history
…e automatically. (This table is not used when Synapse is configured to use SQLite.) (#15868)

* Add a cache invalidation clean-up task

* Run the cache invalidation stream clean-up on the background worker

* Tune down

* call_later is in millis!

* Newsfile

Signed-off-by: Olivier Wilkinson (reivilibre) <[email protected]>

* fixup! Add a cache invalidation clean-up task

* Update synapse/storage/databases/main/cache.py

Co-authored-by: Eric Eastwood <[email protected]>

* Update synapse/storage/databases/main/cache.py

Co-authored-by: Eric Eastwood <[email protected]>

* MILLISEC -> MS

* Expand on comment

* Move and tweak comment about Postgres

* Use `wrap_as_background_process`

---------

Signed-off-by: Olivier Wilkinson (reivilibre) <[email protected]>
Co-authored-by: Eric Eastwood <[email protected]>
  • Loading branch information
reivilibre and MadLittleMods authored Aug 8, 2023
1 parent 8af3f33 commit f3dc6dc
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 0 deletions.
1 change: 1 addition & 0 deletions changelog.d/15868.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Remove old rows from the `cache_invalidation_stream_by_instance` table automatically (this table is unused in SQLite).
130 changes: 130 additions & 0 deletions synapse/storage/databases/main/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
from typing import TYPE_CHECKING, Any, Collection, Iterable, List, Optional, Tuple

from synapse.api.constants import EventTypes
from synapse.config._base import Config
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.replication.tcp.streams import BackfillStream, CachesStream
from synapse.replication.tcp.streams.events import (
EventsStream,
Expand Down Expand Up @@ -52,6 +54,21 @@
# As above, but for invalidating room caches on room deletion
DELETE_ROOM_CACHE_NAME = "dr_cache_fake"

# How long between cache invalidation table cleanups, once we have caught up
# with the backlog.
REGULAR_CLEANUP_INTERVAL_MS = Config.parse_duration("1h")

# How long between cache invalidation table cleanups, before we have caught
# up with the backlog.
CATCH_UP_CLEANUP_INTERVAL_MS = Config.parse_duration("1m")

# Maximum number of cache invalidation rows to delete at once.
CLEAN_UP_MAX_BATCH_SIZE = 20_000

# Keep cache invalidations for 7 days
# (This is likely to be quite excessive.)
RETENTION_PERIOD_OF_CACHE_INVALIDATIONS_MS = Config.parse_duration("7d")


class CacheInvalidationWorkerStore(SQLBaseStore):
def __init__(
Expand Down Expand Up @@ -98,6 +115,18 @@ def __init__(
else:
self._cache_id_gen = None

# Occasionally clean up the cache invalidations stream table by deleting
# old rows.
# This is only applicable when Postgres is in use; this table is unused
# and not populated at all when SQLite is the active database engine.
if hs.config.worker.run_background_tasks and isinstance(
self.database_engine, PostgresEngine
):
self.hs.get_clock().call_later(
CATCH_UP_CLEANUP_INTERVAL_MS / 1000,
self._clean_up_cache_invalidation_wrapper,
)

async def get_all_updated_caches(
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, tuple]], int, bool]:
Expand Down Expand Up @@ -554,3 +583,104 @@ def get_cache_stream_token_for_writer(self, instance_name: str) -> int:
return self._cache_id_gen.get_current_token_for_writer(instance_name)
else:
return 0

@wrap_as_background_process("clean_up_old_cache_invalidations")
async def _clean_up_cache_invalidation_wrapper(self) -> None:
"""
Clean up cache invalidation stream table entries occasionally.
If we are behind (i.e. there are entries old enough to
be deleted but too many of them to be deleted in one go),
then we run slightly more frequently.
"""
delete_up_to: int = (
self.hs.get_clock().time_msec() - RETENTION_PERIOD_OF_CACHE_INVALIDATIONS_MS
)

in_backlog = await self._clean_up_batch_of_old_cache_invalidations(delete_up_to)

# Vary how long we wait before calling again depending on whether we
# are still sifting through backlog or we have caught up.
if in_backlog:
next_interval = CATCH_UP_CLEANUP_INTERVAL_MS
else:
next_interval = REGULAR_CLEANUP_INTERVAL_MS

self.hs.get_clock().call_later(
next_interval / 1000, self._clean_up_cache_invalidation_wrapper
)

async def _clean_up_batch_of_old_cache_invalidations(
self, delete_up_to_millisec: int
) -> bool:
"""
Remove old rows from the `cache_invalidation_stream_by_instance` table automatically (this table is unused in SQLite).
Up to `CLEAN_UP_BATCH_SIZE` rows will be deleted at once.
Returns true if and only if we were limited by batch size (i.e. we are in backlog:
there are more things to clean up).
"""

def _clean_up_batch_of_old_cache_invalidations_txn(
txn: LoggingTransaction,
) -> bool:
# First get the earliest stream ID
txn.execute(
"""
SELECT stream_id FROM cache_invalidation_stream_by_instance
ORDER BY stream_id ASC
LIMIT 1
"""
)
row = txn.fetchone()
if row is None:
return False
earliest_stream_id: int = row[0]

# Then find the last stream ID of the range we will delete
txn.execute(
"""
SELECT stream_id FROM cache_invalidation_stream_by_instance
WHERE stream_id <= ? AND invalidation_ts <= ?
ORDER BY stream_id DESC
LIMIT 1
""",
(earliest_stream_id + CLEAN_UP_MAX_BATCH_SIZE, delete_up_to_millisec),
)
row = txn.fetchone()
if row is None:
return False
cutoff_stream_id: int = row[0]

# Determine whether we are caught up or still catching up
txn.execute(
"""
SELECT invalidation_ts FROM cache_invalidation_stream_by_instance
WHERE stream_id > ?
ORDER BY stream_id ASC
LIMIT 1
""",
(cutoff_stream_id,),
)
row = txn.fetchone()
if row is None:
in_backlog = False
else:
# We are in backlog if the next row could have been deleted
# if we didn't have such a small batch size
in_backlog = row[0] <= delete_up_to_millisec

txn.execute(
"""
DELETE FROM cache_invalidation_stream_by_instance
WHERE ? <= stream_id AND stream_id <= ?
""",
(earliest_stream_id, cutoff_stream_id),
)

return in_backlog

return await self.db_pool.runInteraction(
"clean_up_old_cache_invalidations",
_clean_up_batch_of_old_cache_invalidations_txn,
)

0 comments on commit f3dc6dc

Please sign in to comment.