Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

fix race condiftion in calling initialise_reserved_users #4081

Merged
merged 7 commits into from
Oct 25, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changelog.d/4081.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Fix race condition where config defined reserved users were not being added to
the monthly active user list prior to the homeserver reactor firing up
8 changes: 0 additions & 8 deletions synapse/app/homeserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -553,14 +553,6 @@ def start_generate_monthly_active_users():
generate_monthly_active_users,
)

# XXX is this really supposed to be a background process? it looks
# like it needs to complete before some of the other stuff runs.
run_as_background_process(
"initialise_reserved_users",
hs.get_datastore().initialise_reserved_users,
hs.config.mau_limits_reserved_threepids,
)

start_generate_monthly_active_users()
if hs.config.limit_usage_by_mau:
clock.looping_call(start_generate_monthly_active_users, 5 * 60 * 1000)
Expand Down
1 change: 1 addition & 0 deletions synapse/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ def setup(self):
logger.info("Setting up.")
with self.get_db_conn() as conn:
self.datastore = self.DATASTORE_CLASS(conn, self)
conn.commit()
logger.info("Finished setting up.")

def get_reactor(self):
Expand Down
71 changes: 52 additions & 19 deletions synapse/storage/monthly_active_users.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,29 @@ def __init__(self, dbconn, hs):
self._clock = hs.get_clock()
self.hs = hs
self.reserved_users = ()
# Do not add more reserved users than the total allowable number
self._initialise_reserved_users(
dbconn.cursor(),
hs.config.mau_limits_reserved_threepids[:self.hs.config.max_mau_value],
)

@defer.inlineCallbacks
def initialise_reserved_users(self, threepids):
store = self.hs.get_datastore()
def _initialise_reserved_users(self, txn, threepids):
"""Ensures that reserved threepids are accounted for in the MAU table, should
be called on start up.

Args:
txn (cursor):
threepids (list[dict]): List of threepid dicts to reserve
"""
reserved_user_list = []

# Do not add more reserved users than the total allowable number
for tp in threepids[:self.hs.config.max_mau_value]:
user_id = yield store.get_user_id_by_threepid(
for tp in threepids:
user_id = self.get_user_id_by_threepid_txn(
txn,
tp["medium"], tp["address"]
)
if user_id:
yield self.upsert_monthly_active_user(user_id)
self.upsert_monthly_active_user_txn(txn, user_id)
neilisfragile marked this conversation as resolved.
Show resolved Hide resolved
reserved_user_list.append(user_id)
else:
logger.warning(
Expand All @@ -55,8 +65,7 @@ def initialise_reserved_users(self, threepids):

@defer.inlineCallbacks
def reap_monthly_active_users(self):
"""
Cleans out monthly active user table to ensure that no stale
"""Cleans out monthly active user table to ensure that no stale
entries exist.

Returns:
Expand Down Expand Up @@ -165,19 +174,44 @@ def get_registered_reserved_users_count(self):

@defer.inlineCallbacks
def upsert_monthly_active_user(self, user_id):
"""Updates or inserts the user into the monthly active user table, which
is used to track the current MAU usage of the server

Args:
user_id (str): user to add/update
"""
Updates or inserts monthly active user member
Arguments:
user_id (str): user to add/update
Deferred[bool]: True if a new entry was created, False if an
existing one was updated.
is_insert = yield self.runInteraction(
"upsert_monthly_active_user", self.upsert_monthly_active_user_txn,
user_id
)

if is_insert:
self.user_last_seen_monthly_active.invalidate((user_id,))
self.get_monthly_active_count.invalidate(())

def upsert_monthly_active_user_txn(self, txn, user_id):
"""Updates or inserts monthly active user member

Note that, after calling this method, it will generally be necessary
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest an extra blank line here:

Suggested change
Note that, after calling this method, it will generally be necessary
Note that, after calling this method, it will generally be necessary

to invalidate the caches on user_last_seen_monthly_active and
get_monthly_active_count. We can't do that here, because we are running
in a database thread rather than the main thread, and we can't call
txn.call_after because txn may not be a LoggingTransaction.

Args:
txn (cursor):
user_id (str): user to add/update

Returns:
bool: True if a new entry was created, False if an
neilisfragile marked this conversation as resolved.
Show resolved Hide resolved
existing one was updated.
"""
# Am consciously deciding to lock the table on the basis that is ought
# never be a big table and alternative approaches (batching multiple
# upserts into a single txn) introduced a lot of extra complexity.
# See https://github.com/matrix-org/synapse/issues/3854 for more
is_insert = yield self._simple_upsert(
desc="upsert_monthly_active_user",
is_insert = self._simple_upsert_txn(
txn,
table="monthly_active_users",
keyvalues={
"user_id": user_id,
Expand All @@ -186,9 +220,8 @@ def upsert_monthly_active_user(self, user_id):
"timestamp": int(self._clock.time_msec()),
},
)
if is_insert:
self.user_last_seen_monthly_active.invalidate((user_id,))
neilisfragile marked this conversation as resolved.
Show resolved Hide resolved
self.get_monthly_active_count.invalidate(())

return is_insert

@cached(num_args=1)
def user_last_seen_monthly_active(self, user_id):
Expand Down
35 changes: 31 additions & 4 deletions synapse/storage/registration.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,17 +474,44 @@ def user_get_threepids(self, user_id):

@defer.inlineCallbacks
def get_user_id_by_threepid(self, medium, address):
ret = yield self._simple_select_one(
"""Returns user id from threepid

Args:
medium (str): threepid medium e.g. email
address (str): threepid address e.g. [email protected]

Returns:
Deferred[str|None]: user id or None if no user id/threepid mapping exists
"""
user_id = yield self.runInteraction(
"get_user_id_by_threepid", self.get_user_id_by_threepid_txn,
medium, address
)
defer.returnValue(user_id)

def get_user_id_by_threepid_txn(self, txn, medium, address):
neilisfragile marked this conversation as resolved.
Show resolved Hide resolved
"""Returns user id from threepid

Args:
txn (cursor):
medium (str): threepid medium e.g. email
address (str): threepid address e.g. [email protected]

Returns:
str|None: user id or None if no user id/threepid mapping exists
"""
ret = self._simple_select_one_txn(
txn,
"user_threepids",
{
"medium": medium,
"address": address
},
['user_id'], True, 'get_user_id_by_threepid'
['user_id'], True
)
if ret:
defer.returnValue(ret['user_id'])
defer.returnValue(None)
return ret['user_id']
return None

def user_delete_threepid(self, user_id, medium, address):
return self._simple_delete(
Expand Down
10 changes: 8 additions & 2 deletions tests/storage/test_monthly_active_users.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ def test_initialise_reserved_users(self):
now = int(self.hs.get_clock().time_msec())
self.store.user_add_threepid(user1, "email", user1_email, now, now)
self.store.user_add_threepid(user2, "email", user2_email, now, now)
self.store.initialise_reserved_users(threepids)

self.store.runInteraction(
"initialise", self.store._initialise_reserved_users, threepids
)
self.pump()

active_count = self.store.get_monthly_active_count()
Expand Down Expand Up @@ -199,7 +202,10 @@ def test_get_reserved_real_user_account(self):
{'medium': 'email', 'address': user2_email},
]
self.hs.config.mau_limits_reserved_threepids = threepids
self.store.initialise_reserved_users(threepids)
self.store.runInteraction(
"initialise", self.store._initialise_reserved_users, threepids
)

self.pump()
count = self.store.get_registered_reserved_users_count()
self.assertEquals(self.get_success(count), 0)
Expand Down