Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Refactor devices changed query to pull less from DB #5559

Merged
merged 7 commits into from
Jun 27, 2019
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/5559.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Optimise devices changed query to not pull unnecessary rows from the database, reducing database load.
14 changes: 7 additions & 7 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,13 @@ def get_user_ids_changed(self, user_id, from_token):

room_ids = yield self.store.get_rooms_for_user(user_id)

# First we check if any devices have changed
changed = yield self.store.get_user_whose_devices_changed(
from_token.device_list_key
# First we check if any devices have changed for users that we share
# rooms with.
users_who_share_room = yield self.store.get_users_who_share_room_with_user(
user_id
)
changed = yield self.store.get_users_whose_devices_changed(
from_token.device_list_key, users_who_share_room
)

# Then work out if any users have since joined
Expand Down Expand Up @@ -188,10 +192,6 @@ def get_user_ids_changed(self, user_id, from_token):
break

if possibly_changed or possibly_left:
users_who_share_room = yield self.store.get_users_who_share_room_with_user(
user_id
)

# Take the intersection of the users whose devices may have changed
# and those that actually still share a room with the user
possibly_joined = possibly_changed & users_who_share_room
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the & here now redundant?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possibly. We do go and add entries in the loop, though they should all be new users

Expand Down
70 changes: 52 additions & 18 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1058,40 +1058,74 @@ def _generate_sync_entry_for_device_list(
newly_left_rooms,
newly_left_users,
):
"""Generate the DeviceLists section of sync

Args:
sync_result_builder (SyncResultBuilder)
newly_joined_rooms (set[str]): Set of rooms user has joined since
previous sync
newly_joined_or_invited_users (set[str]): Set of users that have
joined or been invited to a room since previous sync.
newly_left_rooms (set[str]): Set of rooms user has left since
previous sync
newly_left_users (set[str]): Set of users that have left a room
we're in since previous sync

Returns:
Deferred[DeviceLists]
"""

user_id = sync_result_builder.sync_config.user.to_string()
since_token = sync_result_builder.since_token

# We're going to mutate these fields, so lets copy them rather than
# assume they won't get used later.
newly_joined_or_invited_users = set(newly_joined_or_invited_users)
newly_left_users = set(newly_left_users)

if since_token and since_token.device_list_key:
changed = yield self.store.get_user_whose_devices_changed(
since_token.device_list_key
# We want to figure out what user IDs the client should refetch
# device keys for, and which users we aren't going to track changes
# for anymore.
#
# For the first step we check:
# 1. if any users we share a room with have updated their devices,
# and
# 2. we also check if we've joined any new rooms, or if a user has
# joined a room we're in.
#
# For the second step we just find any users we no longer share a
# room with by looking at all users that have left a room plus users
# that were in a room we've left.

users_who_share_room = yield self.store.get_users_who_share_room_with_user(
user_id
)

# Step 1, check for changes in devices of users we share a room with
users_that_have_changed = yield self.store.get_users_whose_devices_changed(
since_token.device_list_key, users_who_share_room
)

# TODO: Be more clever than this, i.e. remove users who we already
# share a room with?
# Step 2, check for newly joined rooms
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

errm, I think this is a different step 2 to that mentioned at line 1097. Perhaps you want 1a and 1b.

for room_id in newly_joined_rooms:
joined_users = yield self.state.get_current_users_in_room(room_id)
newly_joined_or_invited_users.update(joined_users)

for room_id in newly_left_rooms:
left_users = yield self.state.get_current_users_in_room(room_id)
newly_left_users.update(left_users)

# TODO: Check that these users are actually new, i.e. either they
# weren't in the previous sync *or* they left and rejoined.
changed.update(newly_joined_or_invited_users)
users_that_have_changed.update(newly_joined_or_invited_users)

if not changed and not newly_left_users:
defer.returnValue(DeviceLists(changed=[], left=newly_left_users))
# Now find users that we no longer track
for room_id in newly_left_rooms:
left_users = yield self.state.get_current_users_in_room(room_id)
newly_left_users.update(left_users)

users_who_share_room = yield self.store.get_users_who_share_room_with_user(
user_id
)
# Remove any users that we still share a room with.
newly_left_users -= users_who_share_room

defer.returnValue(
DeviceLists(
changed=users_who_share_room & changed,
richvdh marked this conversation as resolved.
Show resolved Hide resolved
left=set(newly_left_users) - users_who_share_room,
)
DeviceLists(changed=users_that_have_changed, left=newly_left_users)
)
else:
defer.returnValue(DeviceLists(changed=[], left=[]))
Expand Down
52 changes: 40 additions & 12 deletions synapse/storage/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import Cache, SQLBaseStore, db_to_json
from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.util import batch_iter
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -391,22 +392,49 @@ def _get_devices_with_keys_by_user_txn(self, txn, user_id):

return now_stream_id, []

@defer.inlineCallbacks
def get_user_whose_devices_changed(self, from_key):
"""Get set of users whose devices have changed since `from_key`.
def get_users_whose_devices_changed(self, from_key, user_ids):
"""Get set of users whose devices have changed since `from_key` that
are in the given list of user_ids.

Args:
from_key (str): The device lists stream token
user_ids (Iterable[str])
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved

Returns:
Deferred[set[str]]: The set of user_ids whose devices have changed
since `from_key`
"""
from_key = int(from_key)
changed = self._device_list_stream_cache.get_all_entities_changed(from_key)
if changed is not None:
defer.returnValue(set(changed))

sql = """
SELECT DISTINCT user_id FROM device_lists_stream WHERE stream_id > ?
"""
rows = yield self._execute(
"get_user_whose_devices_changed", None, sql, from_key
# Get set of users who *may* have changed. Users not in the returned
# list have definitely not changed.
to_check = list(
self._device_list_stream_cache.get_entities_changed(user_ids, from_key)
)

if not to_check:
return defer.succeed(set())

def _get_users_whose_devices_changed_txn(txn):
changes = set()

sql = """
SELECT DISTINCT user_id FROM device_lists_stream
WHERE stream_id > ?
AND user_id IN (%s)
"""

for chunk in batch_iter(to_check, 100):
txn.execute(
sql % (",".join("?" for _ in chunk),), [from_key] + list(chunk)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

chunk is a tuple, so copying to an intermediate list is redundant:

Suggested change
sql % (",".join("?" for _ in chunk),), [from_key] + list(chunk)
sql % (",".join("?" for _ in chunk),), (from_key,) + chunk

)
changes.update(user_id for user_id, in txn)

return changes

return self.runInteraction(
"get_users_whose_devices_changed", _get_users_whose_devices_changed_txn
)
defer.returnValue(set(row[0] for row in rows))

def get_all_device_list_changes_for_remotes(self, from_key, to_key):
"""Return a list of `(stream_id, user_id, destination)` which is the
Expand Down