Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix rare bug that broke looping calls #16210

Merged
merged 2 commits into from
Aug 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/16210.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix rare bug that broke looping calls, which could lead to e.g. linearly increasing memory usage. Introduced in v1.90.0.
Copy link
Contributor

@DMRobertson DMRobertson Aug 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To check: the symptom here is that GC never runs successfully?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, but this could hit any looping call, so its not just hitting GC.

36 changes: 22 additions & 14 deletions synapse/storage/databases/main/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from typing import TYPE_CHECKING, Collection, Optional, Set, Tuple, Type
from weakref import WeakValueDictionary

from twisted.internet.interfaces import IReactorCore
from twisted.internet.task import LoopingCall

from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore
Expand All @@ -26,6 +26,7 @@
LoggingDatabaseConnection,
LoggingTransaction,
)
from synapse.types import ISynapseReactor
from synapse.util import Clock
from synapse.util.stringutils import random_string

Expand Down Expand Up @@ -358,7 +359,7 @@ class Lock:

def __init__(
self,
reactor: IReactorCore,
reactor: ISynapseReactor,
clock: Clock,
store: LockStore,
read_write: bool,
Expand All @@ -377,19 +378,25 @@ def __init__(

self._table = "worker_read_write_locks" if read_write else "worker_locks"

self._looping_call = clock.looping_call(
# We might be called from a non-main thread, so we defer setting up the
# looping call.
self._looping_call: Optional[LoopingCall] = None
reactor.callFromThread(self._setup_looping_call)

self._dropped = False

def _setup_looping_call(self) -> None:
self._looping_call = self._clock.looping_call(
self._renew,
_RENEWAL_INTERVAL_MS,
store,
clock,
read_write,
lock_name,
lock_key,
token,
self._store,
self._clock,
self._read_write,
self._lock_name,
self._lock_key,
self._token,
)

self._dropped = False

@staticmethod
@wrap_as_background_process("Lock._renew")
async def _renew(
Expand Down Expand Up @@ -459,7 +466,7 @@ async def release(self) -> None:
if self._dropped:
return

if self._looping_call.running:
if self._looping_call and self._looping_call.running:
self._looping_call.stop()

await self._store.db_pool.simple_delete(
Expand All @@ -486,8 +493,9 @@ def __del__(self) -> None:
# We should not be dropped without the lock being released (unless
# we're shutting down), but if we are then let's at least stop
# renewing the lock.
if self._looping_call.running:
self._looping_call.stop()
if self._looping_call and self._looping_call.running:
# We might be called from a non-main thread.
self._reactor.callFromThread(self._looping_call.stop)

if self._reactor.running:
logger.error(
Expand Down
2 changes: 2 additions & 0 deletions tests/storage/databases/main/test_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ def test_timeout_lock(self) -> None:

# We simulate the process getting stuck by cancelling the looping call
# that keeps the lock active.
assert lock._looping_call
lock._looping_call.stop()

# Wait for the lock to timeout.
Expand Down Expand Up @@ -403,6 +404,7 @@ def test_timeout_lock(self) -> None:

# We simulate the process getting stuck by cancelling the looping call
# that keeps the lock active.
assert lock._looping_call
lock._looping_call.stop()

# Wait for the lock to timeout.
Expand Down