Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Use wrap_as_background_process
Browse files Browse the repository at this point in the history
  • Loading branch information
reivilibre committed Aug 7, 2023
1 parent 71fcc0e commit ca52419
Showing 1 changed file with 21 additions and 28 deletions.
49 changes: 21 additions & 28 deletions synapse/storage/databases/main/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from synapse.api.constants import EventTypes
from synapse.config._base import Config
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.replication.tcp.streams import BackfillStream, CachesStream
from synapse.replication.tcp.streams.events import (
EventsStream,
Expand Down Expand Up @@ -584,36 +584,29 @@ def get_cache_stream_token_for_writer(self, instance_name: str) -> int:
else:
return 0

def _clean_up_cache_invalidation_wrapper(self) -> None:
async def _clean_up_cache_invalidation_background() -> None:
"""
Clean up cache invalidation stream table entries occasionally.
If we are behind (i.e. there are entries old enough to
be deleted but too many of them to be deleted in one go),
then we run slightly more frequently.
"""
delete_up_to: int = (
self.hs.get_clock().time_msec()
- RETENTION_PERIOD_OF_CACHE_INVALIDATIONS_MS
)

in_backlog = await self._clean_up_batch_of_old_cache_invalidations(
delete_up_to
)
@wrap_as_background_process("clean_up_old_cache_invalidations")
async def _clean_up_cache_invalidation_wrapper(self) -> None:
"""
Clean up cache invalidation stream table entries occasionally.
If we are behind (i.e. there are entries old enough to
be deleted but too many of them to be deleted in one go),
then we run slightly more frequently.
"""
delete_up_to: int = (
self.hs.get_clock().time_msec() - RETENTION_PERIOD_OF_CACHE_INVALIDATIONS_MS
)

# Vary how long we wait before calling again depending on whether we
# are still sifting through backlog or we have caught up.
if in_backlog:
next_interval = CATCH_UP_CLEANUP_INTERVAL_MS
else:
next_interval = REGULAR_CLEANUP_INTERVAL_MS
in_backlog = await self._clean_up_batch_of_old_cache_invalidations(delete_up_to)

self.hs.get_clock().call_later(
next_interval / 1000, self._clean_up_cache_invalidation_wrapper
)
# Vary how long we wait before calling again depending on whether we
# are still sifting through backlog or we have caught up.
if in_backlog:
next_interval = CATCH_UP_CLEANUP_INTERVAL_MS
else:
next_interval = REGULAR_CLEANUP_INTERVAL_MS

run_as_background_process(
"clean_up_old_cache_invalidations", _clean_up_cache_invalidation_background
self.hs.get_clock().call_later(
next_interval / 1000, self._clean_up_cache_invalidation_wrapper
)

async def _clean_up_batch_of_old_cache_invalidations(
Expand Down

0 comments on commit ca52419

Please sign in to comment.