Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add basic read/write lock #15782

Merged
merged 9 commits into from
Jul 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/15782.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add read/write style cross-worker locks.
Copy link
Contributor

@MadLittleMods MadLittleMods Jun 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this looks good 👍

I haven't fully mind-melded into the code around the constraints to see something wrong (I did look at everything though) but the tests look robust enough to catch any errant behavior. If you want me to fully understand this, feel free to poke again specifically.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gave the constraints a closer look. I think they make sense.

Kept trying to think if we could do it with one table but couldn't figure anything out that gets us that same read xor write behavior.

9 changes: 8 additions & 1 deletion synapse/_scripts/synapse_port_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,11 @@
"ui_auth_sessions",
"ui_auth_sessions_credentials",
"ui_auth_sessions_ips",
# Ignore the worker locks table, as a) there shouldn't be any acquired locks
# after porting, and b) the circular foreign key constraints make it hard to
# port.
"worker_read_write_locks_mode",
"worker_read_write_locks",
}


Expand Down Expand Up @@ -803,7 +808,9 @@ def alter_table(txn: LoggingTransaction) -> None:
)
# Map from table name to args passed to `handle_table`, i.e. a tuple
# of: `postgres_size`, `table_size`, `forward_chunk`, `backward_chunk`.
tables_to_port_info_map = {r[0]: r[1:] for r in setup_res}
tables_to_port_info_map = {
r[0]: r[1:] for r in setup_res if r[0] not in IGNORED_TABLES
}

# Step 5. Do the copying.
#
Expand Down
224 changes: 168 additions & 56 deletions synapse/storage/databases/main/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
LoggingDatabaseConnection,
LoggingTransaction,
)
from synapse.storage.engines import PostgresEngine
from synapse.util import Clock
from synapse.util.stringutils import random_string

Expand Down Expand Up @@ -68,12 +69,20 @@ def __init__(
self._reactor = hs.get_reactor()
self._instance_name = hs.get_instance_id()

# A map from `(lock_name, lock_key)` to the token of any locks that we
# think we currently hold.
self._live_tokens: WeakValueDictionary[
# A map from `(lock_name, lock_key)` to lock that we think we
# currently hold.
self._live_lock_tokens: WeakValueDictionary[
Tuple[str, str], Lock
] = WeakValueDictionary()

# A map from `(lock_name, lock_key, token)` to read/write lock that we
# think we currently hold. For a given lock_name/lock_key, there can be
# multiple read locks at a time but only one write lock (no mixing read
# and write locks at the same time).
self._live_read_write_lock_tokens: WeakValueDictionary[
Tuple[str, str, str], Lock
] = WeakValueDictionary()

# When we shut down we want to remove the locks. Technically this can
# lead to a race, as we may drop the lock while we are still processing.
# However, a) it should be a small window, b) the lock is best effort
Expand All @@ -91,11 +100,13 @@ async def _on_shutdown(self) -> None:
"""Called when the server is shutting down"""
logger.info("Dropping held locks due to shutdown")

# We need to take a copy of the tokens dict as dropping the locks will
# cause the dictionary to change.
locks = dict(self._live_tokens)
# We need to take a copy of the locks as dropping the locks will cause
# the dictionary to change.
locks = list(self._live_lock_tokens.values()) + list(
self._live_read_write_lock_tokens.values()
)

for lock in locks.values():
for lock in locks:
await lock.release()

logger.info("Dropped locks due to shutdown")
Expand All @@ -122,7 +133,7 @@ async def _try_acquire_lock(
"""

# Check if this process has taken out a lock and if it's still valid.
lock = self._live_tokens.get((lock_name, lock_key))
lock = self._live_lock_tokens.get((lock_name, lock_key))
if lock and await lock.is_still_valid():
return None

Expand Down Expand Up @@ -176,61 +187,111 @@ def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool:
self._reactor,
self._clock,
self,
read_write=False,
lock_name=lock_name,
lock_key=lock_key,
token=token,
)

self._live_tokens[(lock_name, lock_key)] = lock
self._live_lock_tokens[(lock_name, lock_key)] = lock

return lock

async def _is_lock_still_valid(
self, lock_name: str, lock_key: str, token: str
) -> bool:
"""Checks whether this instance still holds the lock."""
last_renewed_ts = await self.db_pool.simple_select_one_onecol(
table="worker_locks",
keyvalues={
"lock_name": lock_name,
"lock_key": lock_key,
"token": token,
},
retcol="last_renewed_ts",
allow_none=True,
desc="is_lock_still_valid",
)
return (
last_renewed_ts is not None
and self._clock.time_msec() - _LOCK_TIMEOUT_MS < last_renewed_ts
)
async def try_acquire_read_write_lock(
self,
lock_name: str,
lock_key: str,
write: bool,
) -> Optional["Lock"]:
"""Try to acquire a lock for the given name/key. Will return an async
context manager if the lock is successfully acquired, which *must* be
used (otherwise the lock will leak).
"""

async def _renew_lock(self, lock_name: str, lock_key: str, token: str) -> None:
"""Attempt to renew the lock if we still hold it."""
await self.db_pool.simple_update(
table="worker_locks",
keyvalues={
"lock_name": lock_name,
"lock_key": lock_key,
"token": token,
},
updatevalues={"last_renewed_ts": self._clock.time_msec()},
desc="renew_lock",
)
now = self._clock.time_msec()
token = random_string(6)

async def _drop_lock(self, lock_name: str, lock_key: str, token: str) -> None:
"""Attempt to drop the lock, if we still hold it"""
await self.db_pool.simple_delete(
table="worker_locks",
keyvalues={
"lock_name": lock_name,
"lock_key": lock_key,
"token": token,
},
desc="drop_lock",
def _try_acquire_read_write_lock_txn(txn: LoggingTransaction) -> None:
# We attempt to acquire the lock by inserting into
# `worker_read_write_locks` and seeing if that fails any
# constraints. If it doesn't then we have acquired the lock,
# otherwise we haven't.
#
# Before that though we clear the table of any stale locks.

delete_sql = """
DELETE FROM worker_read_write_locks
WHERE last_renewed_ts < ? AND lock_name = ? AND lock_key = ?;
"""

insert_sql = """
INSERT INTO worker_read_write_locks (lock_name, lock_key, write_lock, instance_name, token, last_renewed_ts)
VALUES (?, ?, ?, ?, ?, ?)
"""

if isinstance(self.database_engine, PostgresEngine):
# For Postgres we can send these queries at the same time.
txn.execute(
delete_sql + ";" + insert_sql,
(
# DELETE args
now - _LOCK_TIMEOUT_MS,
lock_name,
lock_key,
# UPSERT args
lock_name,
lock_key,
write,
self._instance_name,
token,
now,
),
)
else:
# For SQLite these need to be two queries.
txn.execute(
delete_sql,
(
now - _LOCK_TIMEOUT_MS,
lock_name,
lock_key,
),
)
txn.execute(
insert_sql,
(
lock_name,
lock_key,
write,
self._instance_name,
token,
now,
),
)

return

try:
await self.db_pool.runInteraction(
"try_acquire_read_write_lock",
_try_acquire_read_write_lock_txn,
)
except self.database_engine.module.IntegrityError:
return None

lock = Lock(
self._reactor,
self._clock,
self,
read_write=True,
lock_name=lock_name,
lock_key=lock_key,
token=token,
)

self._live_tokens.pop((lock_name, lock_key), None)
self._live_read_write_lock_tokens[(lock_name, lock_key, token)] = lock

return lock


class Lock:
Expand Down Expand Up @@ -259,20 +320,31 @@ def __init__(
reactor: IReactorCore,
clock: Clock,
store: LockStore,
read_write: bool,
lock_name: str,
lock_key: str,
token: str,
) -> None:
self._reactor = reactor
self._clock = clock
self._store = store
self._read_write = read_write
self._lock_name = lock_name
self._lock_key = lock_key

self._token = token

self._table = "worker_read_write_locks" if read_write else "worker_locks"

self._looping_call = clock.looping_call(
self._renew, _RENEWAL_INTERVAL_MS, store, lock_name, lock_key, token
self._renew,
_RENEWAL_INTERVAL_MS,
store,
clock,
read_write,
lock_name,
lock_key,
token,
)

self._dropped = False
Expand All @@ -281,6 +353,8 @@ def __init__(
@wrap_as_background_process("Lock._renew")
async def _renew(
store: LockStore,
clock: Clock,
read_write: bool,
lock_name: str,
lock_key: str,
token: str,
Expand All @@ -291,12 +365,34 @@ async def _renew(
don't end up with a reference to `self` in the reactor, which would stop
this from being cleaned up if we dropped the context manager.
"""
await store._renew_lock(lock_name, lock_key, token)
table = "worker_read_write_locks" if read_write else "worker_locks"
await store.db_pool.simple_update(
table=table,
keyvalues={
"lock_name": lock_name,
"lock_key": lock_key,
"token": token,
},
updatevalues={"last_renewed_ts": clock.time_msec()},
desc="renew_lock",
)

async def is_still_valid(self) -> bool:
"""Check if the lock is still held by us"""
return await self._store._is_lock_still_valid(
self._lock_name, self._lock_key, self._token
last_renewed_ts = await self._store.db_pool.simple_select_one_onecol(
table=self._table,
keyvalues={
"lock_name": self._lock_name,
"lock_key": self._lock_key,
"token": self._token,
},
retcol="last_renewed_ts",
allow_none=True,
desc="is_lock_still_valid",
)
return (
last_renewed_ts is not None
and self._clock.time_msec() - _LOCK_TIMEOUT_MS < last_renewed_ts
)

async def __aenter__(self) -> None:
Expand Down Expand Up @@ -325,7 +421,23 @@ async def release(self) -> None:
if self._looping_call.running:
self._looping_call.stop()

await self._store._drop_lock(self._lock_name, self._lock_key, self._token)
await self._store.db_pool.simple_delete(
table=self._table,
keyvalues={
"lock_name": self._lock_name,
"lock_key": self._lock_key,
"token": self._token,
},
desc="drop_lock",
)

if self._read_write:
self._store._live_read_write_lock_tokens.pop(
(self._lock_name, self._lock_key, self._token), None
)
else:
self._store._live_lock_tokens.pop((self._lock_name, self._lock_key), None)

self._dropped = True

def __del__(self) -> None:
Expand Down
Loading