Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Factor out backgroung updates #6178

Merged
merged 14 commits into from
Oct 9, 2019
Merged
1 change: 1 addition & 0 deletions changelog.d/6178.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Make the `synapse_port_db` script create the right indexes on a new PostgreSQL database.
babolivier marked this conversation as resolved.
Show resolved Hide resolved
207 changes: 106 additions & 101 deletions synapse/storage/client_ips.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,9 @@
LAST_SEEN_GRANULARITY = 120 * 1000


class ClientIpStore(background_updates.BackgroundUpdateStore):
class ClientIpBackgroundUpdateStore(background_updates.BackgroundUpdateStore):
def __init__(self, db_conn, hs):

self.client_ip_last_seen = Cache(
name="client_ip_last_seen", keylen=4, max_entries=50000 * CACHE_SIZE_FACTOR
)

super(ClientIpStore, self).__init__(db_conn, hs)

self.user_ips_max_age = hs.config.user_ips_max_age
super(ClientIpBackgroundUpdateStore, self).__init__(db_conn, hs)

self.register_background_index_update(
"user_ips_device_index",
Expand Down Expand Up @@ -92,19 +85,6 @@ def __init__(self, db_conn, hs):
"devices_last_seen", self._devices_last_seen_update
)

# (user_id, access_token, ip,) -> (user_agent, device_id, last_seen)
self._batch_row_update = {}

self._client_ip_looper = self._clock.looping_call(
self._update_client_ips_batch, 5 * 1000
)
self.hs.get_reactor().addSystemEventTrigger(
"before", "shutdown", self._update_client_ips_batch
)

if self.user_ips_max_age:
self._clock.looping_call(self._prune_old_user_ips, 5 * 1000)

@defer.inlineCallbacks
def _remove_user_ip_nonunique(self, progress, batch_size):
def f(conn):
Expand Down Expand Up @@ -303,6 +283,110 @@ def remove(txn):

return batch_size

@defer.inlineCallbacks
def _devices_last_seen_update(self, progress, batch_size):
"""Background update to insert last seen info into devices table
"""

last_user_id = progress.get("last_user_id", "")
last_device_id = progress.get("last_device_id", "")

def _devices_last_seen_update_txn(txn):
# This consists of two queries:
#
# 1. The sub-query searches for the next N devices and joins
# against user_ips to find the max last_seen associated with
# that device.
# 2. The outer query then joins again against user_ips on
# user/device/last_seen. This *should* hopefully only
# return one row, but if it does return more than one then
# we'll just end up updating the same device row multiple
# times, which is fine.

if self.database_engine.supports_tuple_comparison:
where_clause = "(user_id, device_id) > (?, ?)"
where_args = [last_user_id, last_device_id]
else:
# We explicitly do a `user_id >= ? AND (...)` here to ensure
# that an index is used, as doing `user_id > ? OR (user_id = ? AND ...)`
# makes it hard for query optimiser to tell that it can use the
# index on user_id
where_clause = "user_id >= ? AND (user_id > ? OR device_id > ?)"
where_args = [last_user_id, last_user_id, last_device_id]

sql = """
SELECT
last_seen, ip, user_agent, user_id, device_id
FROM (
SELECT
user_id, device_id, MAX(u.last_seen) AS last_seen
FROM devices
INNER JOIN user_ips AS u USING (user_id, device_id)
WHERE %(where_clause)s
GROUP BY user_id, device_id
ORDER BY user_id ASC, device_id ASC
LIMIT ?
) c
INNER JOIN user_ips AS u USING (user_id, device_id, last_seen)
""" % {
"where_clause": where_clause
}
txn.execute(sql, where_args + [batch_size])

rows = txn.fetchall()
if not rows:
return 0

sql = """
UPDATE devices
SET last_seen = ?, ip = ?, user_agent = ?
WHERE user_id = ? AND device_id = ?
"""
txn.execute_batch(sql, rows)

_, _, _, user_id, device_id = rows[-1]
self._background_update_progress_txn(
txn,
"devices_last_seen",
{"last_user_id": user_id, "last_device_id": device_id},
)

return len(rows)

updated = yield self.runInteraction(
"_devices_last_seen_update", _devices_last_seen_update_txn
)

if not updated:
yield self._end_background_update("devices_last_seen")

return updated


class ClientIpStore(ClientIpBackgroundUpdateStore):
def __init__(self, db_conn, hs):

self.client_ip_last_seen = Cache(
name="client_ip_last_seen", keylen=4, max_entries=50000 * CACHE_SIZE_FACTOR
)

super(ClientIpStore, self).__init__(db_conn, hs)

self.user_ips_max_age = hs.config.user_ips_max_age

# (user_id, access_token, ip,) -> (user_agent, device_id, last_seen)
self._batch_row_update = {}

self._client_ip_looper = self._clock.looping_call(
self._update_client_ips_batch, 5 * 1000
)
self.hs.get_reactor().addSystemEventTrigger(
"before", "shutdown", self._update_client_ips_batch
)

if self.user_ips_max_age:
self._clock.looping_call(self._prune_old_user_ips, 5 * 1000)

@defer.inlineCallbacks
def insert_client_ip(
self, user_id, access_token, ip, user_agent, device_id, now=None
Expand Down Expand Up @@ -454,85 +538,6 @@ def get_user_ip_and_agents(self, user):
for (access_token, ip), (user_agent, last_seen) in iteritems(results)
)

@defer.inlineCallbacks
def _devices_last_seen_update(self, progress, batch_size):
"""Background update to insert last seen info into devices table
"""

last_user_id = progress.get("last_user_id", "")
last_device_id = progress.get("last_device_id", "")

def _devices_last_seen_update_txn(txn):
# This consists of two queries:
#
# 1. The sub-query searches for the next N devices and joins
# against user_ips to find the max last_seen associated with
# that device.
# 2. The outer query then joins again against user_ips on
# user/device/last_seen. This *should* hopefully only
# return one row, but if it does return more than one then
# we'll just end up updating the same device row multiple
# times, which is fine.

if self.database_engine.supports_tuple_comparison:
where_clause = "(user_id, device_id) > (?, ?)"
where_args = [last_user_id, last_device_id]
else:
# We explicitly do a `user_id >= ? AND (...)` here to ensure
# that an index is used, as doing `user_id > ? OR (user_id = ? AND ...)`
# makes it hard for query optimiser to tell that it can use the
# index on user_id
where_clause = "user_id >= ? AND (user_id > ? OR device_id > ?)"
where_args = [last_user_id, last_user_id, last_device_id]

sql = """
SELECT
last_seen, ip, user_agent, user_id, device_id
FROM (
SELECT
user_id, device_id, MAX(u.last_seen) AS last_seen
FROM devices
INNER JOIN user_ips AS u USING (user_id, device_id)
WHERE %(where_clause)s
GROUP BY user_id, device_id
ORDER BY user_id ASC, device_id ASC
LIMIT ?
) c
INNER JOIN user_ips AS u USING (user_id, device_id, last_seen)
""" % {
"where_clause": where_clause
}
txn.execute(sql, where_args + [batch_size])

rows = txn.fetchall()
if not rows:
return 0

sql = """
UPDATE devices
SET last_seen = ?, ip = ?, user_agent = ?
WHERE user_id = ? AND device_id = ?
"""
txn.execute_batch(sql, rows)

_, _, _, user_id, device_id = rows[-1]
self._background_update_progress_txn(
txn,
"devices_last_seen",
{"last_user_id": user_id, "last_device_id": device_id},
)

return len(rows)

updated = yield self.runInteraction(
"_devices_last_seen_update", _devices_last_seen_update_txn
)

if not updated:
yield self._end_background_update("devices_last_seen")

return updated

@wrap_as_background_process("prune_old_user_ips")
async def _prune_old_user_ips(self):
"""Removes entries in user IPs older than the configured period.
Expand Down
37 changes: 22 additions & 15 deletions synapse/storage/deviceinbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,11 @@ def delete_messages_for_remote_destination_txn(txn):
)


class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
class DeviceInboxBackgroundUpdateStore(BackgroundUpdateStore):
DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"

def __init__(self, db_conn, hs):
super(DeviceInboxStore, self).__init__(db_conn, hs)
super(DeviceInboxBackgroundUpdateStore, self).__init__(db_conn, hs)

self.register_background_index_update(
"device_inbox_stream_index",
Expand All @@ -225,6 +225,26 @@ def __init__(self, db_conn, hs):
self.DEVICE_INBOX_STREAM_ID, self._background_drop_index_device_inbox
)

@defer.inlineCallbacks
def _background_drop_index_device_inbox(self, progress, batch_size):
def reindex_txn(conn):
txn = conn.cursor()
txn.execute("DROP INDEX IF EXISTS device_inbox_stream_id")
txn.close()

yield self.runWithConnection(reindex_txn)

yield self._end_background_update(self.DEVICE_INBOX_STREAM_ID)

return 1


class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore):
DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"

def __init__(self, db_conn, hs):
super(DeviceInboxStore, self).__init__(db_conn, hs)

# Map of (user_id, device_id) to the last stream_id that has been
# deleted up to. This is so that we can no op deletions.
self._last_device_delete_cache = ExpiringCache(
Expand Down Expand Up @@ -435,16 +455,3 @@ def get_all_new_device_messages_txn(txn):
return self.runInteraction(
"get_all_new_device_messages", get_all_new_device_messages_txn
)

@defer.inlineCallbacks
def _background_drop_index_device_inbox(self, progress, batch_size):
def reindex_txn(conn):
txn = conn.cursor()
txn.execute("DROP INDEX IF EXISTS device_inbox_stream_id")
txn.close()

yield self.runWithConnection(reindex_txn)

yield self._end_background_update(self.DEVICE_INBOX_STREAM_ID)

return 1
49 changes: 27 additions & 22 deletions synapse/storage/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -512,17 +512,9 @@ def get_device_list_last_stream_id_for_remotes(self, user_ids):
return results


class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
class DeviceBackgroundUpdateStore(BackgroundUpdateStore):
def __init__(self, db_conn, hs):
super(DeviceStore, self).__init__(db_conn, hs)

# Map of (user_id, device_id) -> bool. If there is an entry that implies
# the device exists.
self.device_id_exists_cache = Cache(
name="device_id_exists", keylen=2, max_entries=10000
)

self._clock.looping_call(self._prune_old_outbound_device_pokes, 60 * 60 * 1000)
super(DeviceBackgroundUpdateStore, self).__init__(db_conn, hs)

self.register_background_index_update(
"device_lists_stream_idx",
Expand Down Expand Up @@ -555,6 +547,31 @@ def __init__(self, db_conn, hs):
self._drop_device_list_streams_non_unique_indexes,
)

@defer.inlineCallbacks
def _drop_device_list_streams_non_unique_indexes(self, progress, batch_size):
def f(conn):
txn = conn.cursor()
txn.execute("DROP INDEX IF EXISTS device_lists_remote_cache_id")
txn.execute("DROP INDEX IF EXISTS device_lists_remote_extremeties_id")
txn.close()

yield self.runWithConnection(f)
yield self._end_background_update(DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES)
return 1


class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
def __init__(self, db_conn, hs):
super(DeviceStore, self).__init__(db_conn, hs)

# Map of (user_id, device_id) -> bool. If there is an entry that implies
# the device exists.
self.device_id_exists_cache = Cache(
name="device_id_exists", keylen=2, max_entries=10000
)

self._clock.looping_call(self._prune_old_outbound_device_pokes, 60 * 60 * 1000)

@defer.inlineCallbacks
def store_device(self, user_id, device_id, initial_device_display_name):
"""Ensure the given device is known; add it to the store if not
Expand Down Expand Up @@ -910,15 +927,3 @@ def _prune_txn(txn):
"_prune_old_outbound_device_pokes",
_prune_txn,
)

@defer.inlineCallbacks
def _drop_device_list_streams_non_unique_indexes(self, progress, batch_size):
def f(conn):
txn = conn.cursor()
txn.execute("DROP INDEX IF EXISTS device_lists_remote_cache_id")
txn.execute("DROP INDEX IF EXISTS device_lists_remote_extremeties_id")
txn.close()

yield self.runWithConnection(f)
yield self._end_background_update(DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES)
return 1
13 changes: 9 additions & 4 deletions synapse/storage/media_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@
from synapse.storage.background_updates import BackgroundUpdateStore


class MediaRepositoryStore(BackgroundUpdateStore):
"""Persistence for attachments and avatars"""

class MediaRepositoryBackgroundUpdateStore(BackgroundUpdateStore):
def __init__(self, db_conn, hs):
super(MediaRepositoryStore, self).__init__(db_conn, hs)
super(MediaRepositoryBackgroundUpdateStore, self).__init__(db_conn, hs)

self.register_background_index_update(
update_name="local_media_repository_url_idx",
Expand All @@ -29,6 +27,13 @@ def __init__(self, db_conn, hs):
where_clause="url_cache IS NOT NULL",
)


class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
"""Persistence for attachments and avatars"""

def __init__(self, db_conn, hs):
super(MediaRepositoryStore, self).__init__(db_conn, hs)

def get_local_media(self, media_id):
"""Get the metadata for a local piece of media
Returns:
Expand Down
Loading