-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Prevent multiple device list updates from breaking a batch send #5156
Changes from 5 commits
93b1a2d
0ee2a8b
56cf3fb
a843676
80b6e1a
c988c1e
7770494
0cb7a60
2e5e32e
7684259
d9078b6
a674d8c
84db73d
cf77343
fcda607
69c0c1b
06fa759
5c7bb2c
c674c95
3dbb5f0
322e1a3
da6a2ad
b536cdd
2231131
0de7b17
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Prevent federation device list updates breaking when processing multiple updates at once. |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -97,20 +97,69 @@ def get_devices_by_remote(self, destination, from_stream_id): | |||||
def _get_devices_by_remote_txn( | ||||||
self, txn, destination, from_stream_id, now_stream_id | ||||||
): | ||||||
# We retrieve n+1 devices from the list of outbound pokes were n is our | ||||||
# maximum. We then check if the very last device has the same stream_id as the | ||||||
# second-to-last device. If so, then we ignore all devices with that stream_id | ||||||
# and only send the devices with a lower stream_id. | ||||||
# | ||||||
# If when culling the list we end up with no devices afterwards, we consider the | ||||||
# device update to be too large, and simply skip the stream_id - the rationale | ||||||
# being that such a large device list update is likely an error. | ||||||
# | ||||||
# Note: The code below assumes this value is at least 1 | ||||||
maximum_devices = 100 | ||||||
sql = """ | ||||||
SELECT user_id, device_id, max(stream_id) FROM device_lists_outbound_pokes | ||||||
SELECT user_id, device_id, stream_id FROM device_lists_outbound_pokes | ||||||
WHERE destination = ? AND ? < stream_id AND stream_id <= ? AND sent = ? | ||||||
GROUP BY user_id, device_id | ||||||
LIMIT 20 | ||||||
""" | ||||||
ORDER BY stream_id | ||||||
LIMIT %d | ||||||
""" % (maximum_devices + 1) | ||||||
txn.execute(sql, (destination, from_stream_id, now_stream_id, False)) | ||||||
|
||||||
# maps (user_id, device_id) -> stream_id | ||||||
query_map = {(r[0], r[1]): r[2] for r in txn} | ||||||
if not query_map: | ||||||
duplicate_updates = [r for r in txn] | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
|
||||||
# Return if there are no updates to send out | ||||||
if len(duplicate_updates) == 0: | ||||||
return (now_stream_id, []) | ||||||
|
||||||
if len(query_map) >= 20: | ||||||
# Perform the equivalent of a GROUP BY | ||||||
# Iterate through the updates list and copy any non-duplicate | ||||||
# (user_id, device_id) entries | ||||||
updates = [duplicate_updates[0]] | ||||||
for i in range(1, len(duplicate_updates)): | ||||||
update = duplicate_updates[i] | ||||||
prev_update = duplicate_updates[i - 1] | ||||||
|
||||||
if (update[0], update[1]) == (prev_update[0], prev_update[1]): | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not quite following what's going on here, but I think you're assuming that duplicates can only occur on adjacent rows, which is not the case. I suggest you just build a dict which maps from |
||||||
# This is a duplicate, don't copy it over | ||||||
# However if its stream_id is higher, copy that to the new list | ||||||
if update[3] > prev_update[3]: | ||||||
updates[-1][3] = update[3] | ||||||
continue | ||||||
|
||||||
# Not a duplicate, copy over | ||||||
updates.append(update) | ||||||
|
||||||
# Check if the last and second-to-last row's stream_id's are the same | ||||||
richvdh marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
offending_stream_id = None | ||||||
if ( | ||||||
len(updates) > maximum_devices and | ||||||
updates[-1][2] == updates[-2][2] | ||||||
anoadragon453 marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this condition is redundant. We may as well set the |
||||||
): | ||||||
offending_stream_id = updates[-1][2] | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This name is a bit opaque, and actually I think you can just overwrite
Suggested change
|
||||||
|
||||||
# maps (user_id, device_id) -> stream_id | ||||||
# as long as their stream_id does not match that of the last row | ||||||
query_map = { | ||||||
(r[0], r[1]): r[2] for r in updates | ||||||
if r[2] is not offending_stream_id | ||||||
} | ||||||
|
||||||
# If we ended up not being left over with any device updates to send | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this could be phrased more clearly.
|
||||||
# out, then skip this stream_id | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this comment could do with an explanation as to what it means if we ended up in this situation. |
||||||
if len(query_map) == 0: | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||||||
return (now_stream_id + 1, []) | ||||||
elif len(query_map) >= maximum_devices: | ||||||
now_stream_id = max(stream_id for stream_id in itervalues(query_map)) | ||||||
|
||||||
devices = self._get_e2e_device_keys_txn( | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should probably be passed in as a
limit
param, so that it can be derived from https://github.com/matrix-org/synapse/blob/develop/synapse/federation/sender/per_destination_queue.py#L37.