This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Prevent multiple device list updates from breaking a batch send #5156

Merged
merged 25 commits into from
Jun 6, 2019
Changes from 5 commits
1 change: 1 addition & 0 deletions changelog.d/5156.bugfix
@@ -0,0 +1 @@
Prevent federation device list updates breaking when processing multiple updates at once.
65 changes: 57 additions & 8 deletions synapse/storage/devices.py
@@ -97,20 +97,69 @@ def get_devices_by_remote(self, destination, from_stream_id):
def _get_devices_by_remote_txn(
self, txn, destination, from_stream_id, now_stream_id
):
# We retrieve n+1 devices from the list of outbound pokes, where n is our
# maximum. We then check if the very last device has the same stream_id as the
# second-to-last device. If so, then we ignore all devices with that stream_id
# and only send the devices with a lower stream_id.
#
# If when culling the list we end up with no devices afterwards, we consider the
# device update to be too large, and simply skip the stream_id - the rationale
# being that such a large device list update is likely an error.
#
# Note: The code below assumes this value is at least 1
maximum_devices = 100
Member

this should probably be passed in as a limit param, so that it can be derived from https://github.com/matrix-org/synapse/blob/develop/synapse/federation/sender/per_destination_queue.py#L37.
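
A minimal sketch of what passing the limit in as a parameter could look like, using an in-memory-friendly `sqlite3` connection rather than Synapse's transaction object. The helper name `fetch_outbound_pokes` and the `limit` argument are hypothetical and are not taken from the PR; only the table and column names come from the query in the diff.

```python
import sqlite3


def fetch_outbound_pokes(conn, destination, from_stream_id, now_stream_id, limit):
    """Hypothetical helper: fetch up to limit + 1 unsent pokes, oldest first.

    The extra row lets the caller detect that the limit was exceeded.
    """
    sql = """
        SELECT user_id, device_id, stream_id FROM device_lists_outbound_pokes
        WHERE destination = ? AND ? < stream_id AND stream_id <= ? AND sent = ?
        ORDER BY stream_id
        LIMIT ?
    """
    cur = conn.execute(
        sql, (destination, from_stream_id, now_stream_id, False, limit + 1)
    )
    return list(cur)
```

Binding the limit as a query parameter, instead of interpolating it with `%d`, keeps the SQL string constant; whether that fits Synapse's database layer is an assumption here.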

sql = """
SELECT user_id, device_id, max(stream_id) FROM device_lists_outbound_pokes
SELECT user_id, device_id, stream_id FROM device_lists_outbound_pokes
WHERE destination = ? AND ? < stream_id AND stream_id <= ? AND sent = ?
GROUP BY user_id, device_id
LIMIT 20
"""
ORDER BY stream_id
LIMIT %d
""" % (maximum_devices + 1)
txn.execute(sql, (destination, from_stream_id, now_stream_id, False))

# maps (user_id, device_id) -> stream_id
query_map = {(r[0], r[1]): r[2] for r in txn}
if not query_map:
duplicate_updates = [r for r in txn]
Member

Suggested change
duplicate_updates = [r for r in txn]
duplicate_updates = list(txn)


# Return if there are no updates to send out
if len(duplicate_updates) == 0:
return (now_stream_id, [])

if len(query_map) >= 20:
# Perform the equivalent of a GROUP BY
# Iterate through the updates list and copy any non-duplicate
# (user_id, device_id) entries
updates = [duplicate_updates[0]]
for i in range(1, len(duplicate_updates)):
update = duplicate_updates[i]
prev_update = duplicate_updates[i - 1]

if (update[0], update[1]) == (prev_update[0], prev_update[1]):
Member

I'm not quite following what's going on here, but I think you're assuming that duplicates can only occur on adjacent rows, which is not the case.

I suggest you just build a dict which maps from (user_id, device_id) to stream_id, and then you can iterate over the results and check the dict for each row.
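
The dict-based approach suggested above might look roughly like the following. This is a sketch, not the code that was merged; `dedupe_updates` is a hypothetical name, and rows are assumed to be `(user_id, device_id, stream_id)` tuples as in the query.

```python
def dedupe_updates(rows):
    """Map each (user_id, device_id) to the highest stream_id seen for it.

    Unlike an adjacent-row comparison, this handles duplicates that are
    separated by other rows.
    """
    latest = {}
    for user_id, device_id, stream_id in rows:
        key = (user_id, device_id)
        # Keep the larger of the stored stream_id and the current one.
        latest[key] = max(latest.get(key, stream_id), stream_id)
    return latest
```

For example, with rows for `D1`, `D2`, then `D1` again, the two `D1` rows are not adjacent, yet they still collapse into a single entry carrying the later stream_id.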

# This is a duplicate, don't copy it over
# However if its stream_id is higher, copy that to the new list
if update[2] > prev_update[2]:
updates[-1] = update
continue

# Not a duplicate, copy over
updates.append(update)

# Check if the last and second-to-last row's stream_id's are the same
richvdh marked this conversation as resolved.
Member

Suggested change
# Check if the last and second-to-last row's stream_id's are the same
# Check if the last and second-to-last rows' stream_ids are the same

offending_stream_id = None
if (
len(updates) > maximum_devices and
updates[-1][2] == updates[-2][2]
anoadragon453 marked this conversation as resolved.
Member

I think this condition is redundant. We may as well set the stream_id_cutoff to the stream_id of the last row whenever we exceed the limit. Note that this also makes the len(updates) > 1 condition redundant.

):
offending_stream_id = updates[-1][2]
Member

This name is a bit opaque, and actually I think you can just overwrite now_stream_id here and use it below.

Suggested change
offending_stream_id = updates[-1][2]
now_stream_id = updates[-1][2]


# maps (user_id, device_id) -> stream_id
# as long as their stream_id does not match that of the last row
query_map = {
(r[0], r[1]): r[2] for r in updates
if r[2] != offending_stream_id
}

# If we ended up not being left over with any device updates to send
Member

I think this could be phrased more clearly.

If we didn't find any updates with a stream_id lower than the cutoff, it means that there are more than limit updates, all of which have the same stream_id.

That should only happen if a client is spamming the server with new devices, in which case E2E isn't going to work well anyway. We'll just skip that stream_id and return an empty list, and continue with the next stream_id next time.
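
The cutoff behaviour described in these comments can be sketched as below. The function name `cull_to_limit` and the exact stream positions it returns are assumptions made for illustration; the PR itself may settle on different return values. Rows are assumed to be `(user_id, device_id, stream_id)` tuples sorted by stream_id, with at most `limit + 1` entries.

```python
def cull_to_limit(rows, now_stream_id, limit):
    """Return (stream_id to resume after, rows to send).

    Hypothetical sketch: if more than `limit` rows came back, every row
    sharing the last row's stream_id is held back for the next batch.
    """
    if len(rows) <= limit:
        # Everything fits, so the caller can advance to now_stream_id.
        return now_stream_id, rows

    cutoff = rows[-1][2]
    kept = [r for r in rows if r[2] < cutoff]
    if not kept:
        # More than `limit` updates share a single stream_id: skip it and
        # send nothing, so the next call continues after the cutoff.
        return cutoff, []

    # Resume just before the cutoff so its rows are fetched next time.
    return cutoff - 1, kept
```

Holding back the whole stream_id group avoids sending only part of the updates that share a stream position, which is the failure this PR sets out to prevent.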

# out, then skip this stream_id
Member

this comment could do with an explanation as to what it means if we ended up in this situation.

if len(query_map) == 0:
Member

if not query_map

return (now_stream_id + 1, [])
elif len(query_map) >= maximum_devices:
now_stream_id = max(stream_id for stream_id in itervalues(query_map))

devices = self._get_e2e_device_keys_txn(