This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Prevent multiple device list updates from breaking a batch send #5156

Merged

merged 25 commits into develop from anoa/device_list_update_fixes on Jun 6, 2019

Conversation

anoadragon453 (Member)

Fixes #5153, and works towards fixing #5095.

This PR:

  • Ensures we take device list updates from the DB in order of ascending stream_id
  • Raises the limit of updates from 20 to 100 (presumably this must be kept under the 100-EDU transaction limit)
  • Retrieves one more update from the DB than the limit and checks whether its stream_id matches that of the update before it. If so, an update is running over the bounds of the batch, and we try to send everything except updates with that device ID.
  • If the above results in us not sending any updates, then we simply ignore this update, as it's probably not correct behavior anyway.

cc @richvdh

@anoadragon453 (Member, Author)

Hm, this is currently failing as:

            SELECT user_id, device_id, max(stream_id) FROM device_lists_outbound_pokes
            WHERE destination = ? AND ? < stream_id AND stream_id <= ? AND sent = ?
            GROUP BY user_id, device_id
            ORDER BY stream_id
            LIMIT %d

is not valid SQL, and fails with:

GroupingError: column "device_lists_outbound_pokes.stream_id" must appear in the GROUP BY clause or be used in an aggregate function
LINE 1: ...sent = false GROUP BY user_id, device_id ORDER BY stream_id ...

Searching tells me we need to use an ORDER BY ... OVER to combine both functions, but does this work on both Postgres and SQLite?

. o ( Perhaps this was the reason we didn't have an ORDER BY in the first place )
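(For what it's worth, one portable fix is to order by the aggregate itself rather than the bare column; this is only a sketch, not necessarily what this PR ends up doing, but both Postgres and SQLite accept it without needing a window function:)

            SELECT user_id, device_id, max(stream_id) FROM device_lists_outbound_pokes
            WHERE destination = ? AND ? < stream_id AND stream_id <= ? AND sent = ?
            GROUP BY user_id, device_id
            ORDER BY max(stream_id)
            LIMIT ?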

@codecov

codecov bot commented May 9, 2019

Codecov Report

Merging #5156 into develop will increase coverage by 0.08%.
The diff coverage is 100%.

@@             Coverage Diff             @@
##           develop    #5156      +/-   ##
===========================================
+ Coverage    62.99%   63.08%   +0.08%     
===========================================
  Files          341      341              
  Lines        35607    35624      +17     
  Branches      5827     5828       +1     
===========================================
+ Hits         22432    22474      +42     
+ Misses       11605    11584      -21     
+ Partials      1570     1566       -4

@anoadragon453 anoadragon453 marked this pull request as ready for review May 10, 2019 02:04
@anoadragon453 anoadragon453 requested a review from a team May 10, 2019 02:06
@richvdh (Member) left a comment:

I think you've ended up overcomplicating this; could you have another go?

Also: we try to minimise the work we do in a _txn function, since you're hanging onto a database connection which could otherwise be doing useful work. Suggest returning the raw list and doing the deduping magic in the parent?

(this will probably require you to split _get_devices_by_remote_txn in two, but afaict there is no need for both parts of it to be done on the same transaction, so that should be fine)

# being that such a large device list update is likely an error.
#
# Note: The code below assumes this value is at least 1
maximum_devices = 100

this should probably be passed in as a limit param, so that it can be derived from https://github.com/matrix-org/synapse/blob/develop/synapse/federation/sender/per_destination_queue.py#L37.

             # maps (user_id, device_id) -> stream_id
-            query_map = {(r[0], r[1]): r[2] for r in txn}
-            if not query_map:
+            duplicate_updates = [r for r in txn]

Suggested change:
-    duplicate_updates = [r for r in txn]
+    duplicate_updates = list(txn)

update = duplicate_updates[i]
prev_update = duplicate_updates[i - 1]

if (update[0], update[1]) == (prev_update[0], prev_update[1]):

I'm not quite following what's going on here, but I think you're assuming that duplicates can only occur on adjacent rows, which is not the case.

I suggest you just build a dict which maps from (user_id, device_id) to stream_id, and then you can iterate over the results and check the dict for each row.
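A minimal sketch of that approach (variable names assumed; `updates` is the raw list of (user_id, device_id, stream_id) rows):

    # Map each (user_id, device_id) to the highest stream_id seen for it,
    # so duplicates collapse no matter where they appear in the results.
    query_map = {}
    for user_id, device_id, stream_id in updates:
        key = (user_id, device_id)
        query_map[key] = max(query_map.get(key, 0), stream_id)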

Resolved review thread on synapse/storage/devices.py (outdated)
* develop:
  Revert 085ae34
  Add a DUMMY stage to captcha-only registration flow
  Make Prometheus snippet less confusing on the metrics collection doc (#4288)
  Set syslog identifiers in systemd units (#5023)
  Run Black on the tests again (#5170)
  Add AllowEncodedSlashes to apache (#5068)
  remove instructions for jessie installation (#5164)
  Run `black` on per_destination_queue
  Limit the number of EDUs in transactions to 100 as expected by receiver (#5138)
@anoadragon453 anoadragon453 requested a review from richvdh May 11, 2019 02:43
@richvdh (Member) left a comment:

getting there, but I think it could do with a bit of cleaning up to be clear and elegant. I've made a few suggestions, but please don't feel constrained by them: have a look at the code yourself and ask yourself if there are things that could be simplified.

[Incidentally, some of this stuff is getting a bit gnarly. Some UTs for get_devices_by_remote wouldn't go amiss.]

@@ -351,7 +351,7 @@ def _get_new_device_messages(self, limit):
last_device_list = self._last_device_list_stream_id
# Will return at most 20 entries

this doesn't look right any more

@@ -351,7 +351,7 @@ def _get_new_device_messages(self, limit):
         last_device_list = self._last_device_list_stream_id
         # Will return at most 20 entries
         now_stream_id, results = yield self._store.get_devices_by_remote(
-            self._destination, last_device_list
+            self._destination, last_device_list, limit=limit - 1,

why - 1?

"""Get stream of updates to send to remote servers

Returns:
(int, list[dict]): current stream id and list of updates
"""
if limit < 1:
raise StoreError("Device limit must be at least 1")

RuntimeError is probably more appropriate here. It's not really a failure at the storage layer.


def _get_max_stream_id_for_devices_txn(

It's not really getting the max_stream_id. Indeed it's now just returning the now_stream_id which gets passed in, so we could simplify that and just return results.

What it's really doing is building the device EDUs, including the e2e keys. So it could do with a better name too.

"""
txn.execute(sql, (destination, from_stream_id, now_stream_id, False))
txn.execute(sql, (destination, from_stream_id, now_stream_id, False, limit + 1))

At the moment there's a bit of a funny split where the fancy logic for clipping the list is in the parent, but we've got a +1 and the long comment in this function. I think it would be more intuitive to make _get_devices_by_remote_txn dumb, and move the +1 and the comment to the parent too.
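For illustration, a sketch of the "dumb" version (assuming the caller now passes in limit + 1 itself; not the code as merged):

    def _get_devices_by_remote_txn(
        self, txn, destination, from_stream_id, now_stream_id, limit
    ):
        # Just run the query and hand back the raw rows; the caller decides
        # how to clip and deduplicate them.
        sql = """
            SELECT user_id, device_id, max(stream_id) FROM device_lists_outbound_pokes
            WHERE destination = ? AND ? < stream_id AND stream_id <= ? AND sent = ?
            GROUP BY user_id, device_id
            ORDER BY max(stream_id)
            LIMIT ?
        """
        txn.execute(sql, (destination, from_stream_id, now_stream_id, False, limit))
        return list(txn)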

Resolved review thread on synapse/storage/devices.py (outdated)

# If we ended up not being left over with any device updates to send
# out, then skip this stream_id
if len(query_map) == 0:

if not query_map

query_map[key] = update[2]

# If we ended up not being left over with any device updates to send
# out, then skip this stream_id

this comment could do with an explanation as to what it means if we ended up in this situation.

continue

key = (update[0], update[1])
if key in query_map and query_map[key] >= update[2]:

I'd be inclined to write this:

query_map[key] = max(query_map.get(key, 0), update[2])

# out, then skip this stream_id
if len(query_map) == 0:
defer.returnValue((now_stream_id + 1, []))
elif len(query_map) >= limit:

query_map is deduplicated, so it might have fewer than limit entries even if we hit the limit. As above, I think you can combine the clip of now_stream_id with this.

@anoadragon453 anoadragon453 requested a review from a team May 23, 2019 12:24
@richvdh richvdh self-assigned this May 30, 2019
@@ -72,7 +72,8 @@ def get_devices_by_user(self, user_id):

         defer.returnValue({d["device_id"]: d for d in devices})

-    def get_devices_by_remote(self, destination, from_stream_id):
+    @defer.inlineCallbacks
+    def get_devices_by_remote(self, destination, from_stream_id, limit=100):

the default for limit is redundant, and probably just confusing?


stream_id_cutoff = now_stream_id + 1

# Check if the last and second-to-last row's stream_id's are the same

Suggested change:
-        # Check if the last and second-to-last row's stream_id's are the same
+        # Check if the last and second-to-last rows' stream_ids are the same

        if (
            len(updates) > 1 and
            len(updates) > limit and
            updates[-1][2] == updates[-2][2]

I think this condition is redundant. We may as well set the stream_id_cutoff to the stream_id of the last row whenever we exceed the limit. Note that this also makes the len(updates) > 1 condition redundant.
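That is, roughly (a sketch of the simplified condition, not the merged code):

    if len(updates) > limit:
        stream_id_cutoff = updates[-1][2]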

# thus we're just going to assume it was a client-side error and not
# send them. We return an empty list of updates instead.
if not query_map:
defer.returnValue((now_stream_id + 1, []))

note that the token we return here is used as the from_token next time, which is exclusive, so this should be now_stream_id rather than now_stream_id + 1.

Also: there is no need to skip everything between the stream_id_cutoff and now_stream_id: we should return stream_id_cutoff - 1 instead.
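(A worked example with assumed numbers: if the batch is clipped at stream_id_cutoff = 5 because several updates share stream_id 5, then returning 5 - 1 = 4 as the token means the next call, whose from_token is exclusive, starts again at stream_id 5 and picks up the clipped updates; returning now_stream_id instead would silently skip everything in between.)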

key = (update[0], update[1])
query_map[key] = max(query_map.get(key, 0), update[2])

# If we ended up not being left over with any device updates to send

I think this could be phrased more clearly.

If we didn't find any updates with a stream_id lower than the cutoff, it means that there are more than `limit` updates, all of which have the same stream_id.

That should only happen if a client is spamming the server with new devices, in which case E2E isn't going to work well anyway. We'll just skip that stream_id and return an empty list, and continue with the next stream_id next time.

self._get_device_update_edus_by_remote_txn,
destination,
from_stream_id,
now_stream_id,

do we not want stream_id_cutoff - 1 here too?


actually, _get_device_update_edus_by_remote_txn doesn't seem to use this param. let's kill it!

limit,
)

defer.returnValue((now_stream_id, results))

and here.

maybe we can get rid of stream_id_cutoff altogether, and just use now_stream_id ?

):
"""Return device update information for a given remote destination"""

a bit of docstring on the params and results (here and _get_device_update_edus_by_remote_txn) wouldn't go amiss.

Resolved review thread on tests/storage/test_devices.py (outdated)
Resolved review thread on tests/storage/test_devices.py

def _get_device_update_edus_by_remote_txn(
self, txn, destination, from_stream_id, now_stream_id, query_map, limit

limit is unused as well.


def _get_device_update_edus_by_remote_txn(

does this need to be a txn function? if you make it do runInteraction itself, we can free up the db connection for the second half of the function.
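A sketch of that shape (the helper names _get_device_updates_raw_txn and _build_device_edu are placeholders, not real methods in this PR):

    from twisted.internet import defer

    @defer.inlineCallbacks
    def get_device_update_edus_by_remote(
        self, destination, from_stream_id, query_map
    ):
        # Hold the database connection only for the read itself...
        rows = yield self.runInteraction(
            "get_device_update_edus_by_remote",
            self._get_device_updates_raw_txn,  # placeholder: the plain DB read
            destination,
            from_stream_id,
            query_map,
        )
        # ...then build the EDUs (including the e2e keys) off the connection.
        defer.returnValue([self._build_device_edu(row) for row in rows])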

@richvdh richvdh merged commit 2d1d7b7 into develop Jun 6, 2019
@richvdh richvdh deleted the anoa/device_list_update_fixes branch June 6, 2019 22:54
@richvdh richvdh restored the anoa/device_list_update_fixes branch June 6, 2019 22:54
@anoadragon453 anoadragon453 deleted the anoa/device_list_update_fixes branch June 6, 2019 23:18
neilisfragile added a commit that referenced this pull request Jun 7, 2019
Synapse 1.0.0rc1 (2019-06-07)
=============================

Features
--------

- Synapse now more efficiently collates room statistics. ([\#4338](#4338), [\#5260](#5260), [\#5324](#5324))
- Add experimental support for relations (aka reactions and edits). ([\#5220](#5220))
- Ability to configure default room version. ([\#5223](#5223), [\#5249](#5249))
- Allow configuring a range for the account validity startup job. ([\#5276](#5276))
- CAS login will now hit the r0 API, not the deprecated v1 one. ([\#5286](#5286))
- Validate federation server TLS certificates by default (implements [MSC1711](https://github.com/matrix-org/matrix-doc/blob/master/proposals/1711-x509-for-federation.md)). ([\#5359](#5359))
- Update /_matrix/client/versions to reference support for r0.5.0. ([\#5360](#5360))
- Add a script to generate new signing-key files. ([\#5361](#5361))
- Update upgrade and installation guides ahead of 1.0. ([\#5371](#5371))
- Replace the `perspectives` configuration section with `trusted_key_servers`, and make validating the signatures on responses optional (since TLS will do this job for us). ([\#5374](#5374))
- Add ability to perform password reset via email without trusting the identity server. ([\#5377](#5377))
- Set default room version to v4. ([\#5379](#5379))

Bugfixes
--------

- Fixes the client-server API not sending "m.heroes" to lazy-load /sync requests when a room's name or its canonical alias is empty. Thanks to @dnaf for this work! ([\#5089](#5089))
- Prevent federation device list updates breaking when processing multiple updates at once. ([\#5156](#5156))
- Fix worker registration bug caused by ClientReaderSlavedStore being unable to see get_profileinfo. ([\#5200](#5200))
- Fix race when backfilling in rooms with worker mode. ([\#5221](#5221))
- Fix appservice timestamp massaging. ([\#5233](#5233))
- Ensure that server_keys fetched via a notary server are correctly signed. ([\#5251](#5251))
- Show the correct error when logging out and access token is missing. ([\#5256](#5256))
- Fix error code when there is an invalid parameter on /_matrix/client/r0/publicRooms. ([\#5257](#5257))
- Fix error when downloading thumbnail with missing width/height parameter. ([\#5258](#5258))
- Fix schema update for account validity. ([\#5268](#5268))
- Fix bug where we leaked extremities when we soft failed events, leading to performance degradation. ([\#5274](#5274), [\#5278](#5278), [\#5291](#5291))
- Fix "db txn 'update_presence' from sentinel context" log messages. ([\#5275](#5275))
- Fix dropped logcontexts during high outbound traffic. ([\#5277](#5277))
- Fix a bug where it is not possible to get events in the federation format with the request `GET /_matrix/client/r0/rooms/{roomId}/messages`. ([\#5293](#5293))
- Fix performance problems with the rooms stats background update. ([\#5294](#5294))
- Fix noisy 'no key for server' logs. ([\#5300](#5300))
- Fix bug where a notary server would sometimes forget old keys. ([\#5307](#5307))
- Prevent users from setting huge displaynames and avatar URLs. ([\#5309](#5309))
- Fix handling of failures when processing incoming events where calling `/event_auth` on remote server fails. ([\#5317](#5317))
- Ensure that we have an up-to-date copy of the signing key when validating incoming federation requests. ([\#5321](#5321))
- Fix various problems which made the signing-key notary server time out for some requests. ([\#5333](#5333))
- Fix bug which would make certain operations (such as room joins) block for 20 minutes while attempting to fetch verification keys. ([\#5334](#5334))
- Fix a bug where we could rapidly mark a server as unreachable even though it was only down for a few minutes. ([\#5335](#5335), [\#5340](#5340))
- Fix a bug where account validity renewal emails could only be sent when email notifs were enabled. ([\#5341](#5341))
- Fix failure when fetching batches of events during backfill, etc. ([\#5342](#5342))
- Add a new room version where the timestamps on events are checked against the validity periods on signing keys. ([\#5348](#5348), [\#5354](#5354))
- Fix room stats and presence background updates to correctly handle missing events. ([\#5352](#5352))
- Include left members in room summaries' heroes. ([\#5355](#5355))
- Fix `federation_custom_ca_list` configuration option. ([\#5362](#5362))
- Fix missing logcontext warnings on shutdown. ([\#5369](#5369))

Improved Documentation
----------------------

- Fix docs on resetting the user directory. ([\#5282](#5282))
- Fix notes about ACME in the MSC1711 faq. ([\#5357](#5357))

Internal Changes
----------------

- Synapse will now serve the experimental "room complexity" API endpoint. ([\#5216](#5216))
- The base classes for the v1 and v2_alpha REST APIs have been unified. ([\#5226](#5226), [\#5328](#5328))
- Simplifications and comments in do_auth. ([\#5227](#5227))
- Remove urllib3 pin as requests 2.22.0 has been released supporting urllib3 1.25.2. ([\#5230](#5230))
- Preparatory work for key-validity features. ([\#5232](#5232), [\#5234](#5234), [\#5235](#5235), [\#5236](#5236), [\#5237](#5237), [\#5244](#5244), [\#5250](#5250), [\#5296](#5296), [\#5299](#5299), [\#5343](#5343), [\#5347](#5347), [\#5356](#5356))
- Specify the type of reCAPTCHA key to use. ([\#5283](#5283))
- Improve sample config for monthly active user blocking. ([\#5284](#5284))
- Remove spurious debug from MatrixFederationHttpClient.get_json. ([\#5287](#5287))
- Improve logging for logcontext leaks. ([\#5288](#5288))
- Clarify that the admin change password API logs the user out. ([\#5303](#5303))
- New installs will now use the v54 full schema, rather than the full schema v14 and applying incremental updates to v54. ([\#5320](#5320))
- Improve docstrings on MatrixFederationClient. ([\#5332](#5332))
- Clean up FederationClient.get_events for clarity. ([\#5344](#5344))
- Various improvements to debug logging. ([\#5353](#5353))
- Don't run CI build checks until sample config check has passed. ([\#5370](#5370))
- Automatically retry buildkite builds (max twice) when an agent is lost. ([\#5380](#5380))