Remove get rooms for user with stream ordering #13991
Changes from 5 commits
7fe92f2
c75836b
2ce65b5
ebe268c
516dd2c
ef2950a
a82b8d7
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Optimise queries used to get a user's rooms during sync. Contributed by Nick @ Beeper (@fizzadar). |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1314,36 +1314,89 @@ async def generate_sync_result( | |
At the end, we transfer data from the `sync_result_builder` to a new `SyncResult` | ||
instance to signify that the sync calculation is complete. | ||
""" | ||
|
||
user_id = sync_config.user.to_string() | ||
app_service = self.store.get_app_service_by_user_id(user_id) | ||
if app_service: | ||
# We no longer support AS users using /sync directly. | ||
# See https://github.com/matrix-org/matrix-doc/issues/1144 | ||
raise NotImplementedError() | ||
|
||
# Note: we get the user's room list *before* we get the current token; this | ||
# avoids checking back in history if rooms are joined after the token is fetched. | ||
mutable_joined_room_ids = set(await self.store.get_rooms_for_user(user_id)) | ||
|
||
# NB: The now_token gets changed by some of the generate_sync_* methods, | ||
# this is due to some of the underlying streams not supporting the ability | ||
# to query up to a given point. | ||
# Always use the `now_token` in `SyncResultBuilder` | ||
now_token = self.event_sources.get_current_token() | ||
log_kv({"now_token": now_token}) | ||
|
||
# Since we fetched the user's room list before the token, there's a small window | ||
# during which membership events may have been persisted, so we fetch these now | ||
# and modify the joined room list for any changes between the get_rooms_for_user | ||
# call and the get_current_token call. | ||
membership_change_events = [] | ||
if since_token: | ||
membership_change_events = await self.store.get_membership_changes_for_user( | ||
user_id, since_token.room_key, now_token.room_key, self.rooms_to_exclude | ||
) | ||
|
||
mem_last_change_by_room_id: Dict[str, EventBase] = {} | ||
for event in membership_change_events: | ||
mem_last_change_by_room_id[event.room_id] = event | ||
|
||
# For the latest membership event in each room found, add/remove the room ID | ||
# from the joined room list accordingly. In this case we only care if the | ||
# latest change is JOIN. | ||
|
||
for room_id, event in mem_last_change_by_room_id.items(): | ||
logger.info( | ||
"User membership change between getting rooms and current token: %s %s %s", | ||
user_id, | ||
event.membership, | ||
room_id, | ||
) | ||
# User joined a room - we have to then check the room state to ensure we | ||
# respect any bans if there's a race between the join and ban events. | ||
if event.membership == Membership.JOIN: | ||
Copied from old draft PR
I think we need to be careful that we handle a join -> join transition? e.g. a display name update? I think we could look at the current_state_deltas table to get a more accurate sense of how the state has changed?
erikjohnston 3 days ago
Actually, no we don't need to worry about join -> join transitions. I was thinking of a different un-applicable scenario. current_state_deltas probably still the right table?
Fizzadar 2 days ago
Was just trying to figure this out but I don't think current_state_deltas has all the information we need - specifically the membership field in event content means we can't determine whether an event was a join or invite. Pulling in the events would solve this, but at that point it's essentially the same as the current get_membership_changes_for_user call I think, is there a reason this isn't suitable?
You'd want to join against Though it occurs to me that we only keep
Is there a way to ask clients to re-init sync currently? Assuming this is possible I think all the membership change lookups in sync could be optimised out to use the I think this fits into a subsequent PR though, definitely out of scope here! As it stands I believe this change will behave as expected; the sync handler already uses the membership change events to calculate room changes.
I think the best that we can do currently is soft log the user out, which isn't the most ideal. Will double check if the sliding sync proposal has a mechanism
Yeah, sliding sync supports it |
||
assert event.internal_metadata.stream_ordering | ||
extrems = await self.store.get_forward_extremities_for_room_at_stream_ordering( | ||
room_id, event.internal_metadata.stream_ordering | ||
) | ||
user_ids_in_room = await self.state.get_current_user_ids_in_room( | ||
room_id, extrems | ||
) | ||
if user_id in user_ids_in_room: | ||
This is an expensive check. Can we instead check if the given user is now in the room? I think that might be good enough?
Yeah I think that works - ef2950a. |
||
mutable_joined_room_ids.add(room_id) | ||
# The user left the room, or left and was re-invited but not joined yet | ||
else: | ||
mutable_joined_room_ids.discard(room_id) | ||
|
||
# Now we have our list of joined room IDs, exclude as configured and freeze | ||
joined_room_ids = frozenset( | ||
( | ||
room_id | ||
for room_id in mutable_joined_room_ids | ||
if room_id not in self.rooms_to_exclude | ||
) | ||
) | ||
|
||
logger.debug( | ||
"Calculating sync response for %r between %s and %s", | ||
sync_config.user, | ||
since_token, | ||
now_token, | ||
) | ||
|
||
user_id = sync_config.user.to_string() | ||
app_service = self.store.get_app_service_by_user_id(user_id) | ||
if app_service: | ||
# We no longer support AS users using /sync directly. | ||
# See https://github.com/matrix-org/matrix-doc/issues/1144 | ||
raise NotImplementedError() | ||
else: | ||
joined_room_ids = await self.get_rooms_for_user_at( | ||
user_id, now_token.room_key | ||
) | ||
sync_result_builder = SyncResultBuilder( | ||
sync_config, | ||
full_state, | ||
since_token=since_token, | ||
now_token=now_token, | ||
joined_room_ids=joined_room_ids, | ||
membership_change_events=membership_change_events, | ||
) | ||
|
||
logger.debug("Fetching account data") | ||
|
@@ -1824,19 +1877,12 @@ async def _have_rooms_changed( | |
|
||
Does not modify the `sync_result_builder`. | ||
""" | ||
user_id = sync_result_builder.sync_config.user.to_string() | ||
since_token = sync_result_builder.since_token | ||
now_token = sync_result_builder.now_token | ||
membership_change_events = sync_result_builder.membership_change_events | ||
|
||
assert since_token | ||
|
||
# Get a list of membership change events that have happened to the user | ||
# requesting the sync. | ||
membership_changes = await self.store.get_membership_changes_for_user( | ||
user_id, since_token.room_key, now_token.room_key | ||
) | ||
|
||
if membership_changes: | ||
if membership_change_events: | ||
return True | ||
|
||
stream_id = since_token.room_key.stream | ||
|
@@ -1875,16 +1921,10 @@ async def _get_rooms_changed( | |
since_token = sync_result_builder.since_token | ||
now_token = sync_result_builder.now_token | ||
sync_config = sync_result_builder.sync_config | ||
membership_change_events = sync_result_builder.membership_change_events | ||
|
||
assert since_token | ||
|
||
# TODO: we've already called this function and ran this query in | ||
# _have_rooms_changed. We could keep the results in memory to avoid a | ||
# second query, at the cost of more complicated source code. | ||
membership_change_events = await self.store.get_membership_changes_for_user( | ||
user_id, since_token.room_key, now_token.room_key, self.rooms_to_exclude | ||
) | ||
|
||
mem_change_events_by_room_id: Dict[str, List[EventBase]] = {} | ||
for event in membership_change_events: | ||
mem_change_events_by_room_id.setdefault(event.room_id, []).append(event) | ||
|
@@ -2385,60 +2425,6 @@ async def _generate_room_entry( | |
else: | ||
raise Exception("Unrecognized rtype: %r", room_builder.rtype) | ||
|
||
async def get_rooms_for_user_at( | ||
self, | ||
user_id: str, | ||
room_key: RoomStreamToken, | ||
) -> FrozenSet[str]: | ||
"""Get set of joined rooms for a user at the given stream ordering. | ||
|
||
The stream ordering *must* be recent, otherwise this may throw an | ||
exception if older than a month. (This function is called with the | ||
current token, which should be perfectly fine). | ||
|
||
Args: | ||
user_id | ||
stream_ordering | ||
|
||
ReturnValue: | ||
Set of room_ids the user is in at given stream_ordering. | ||
""" | ||
joined_rooms = await self.store.get_rooms_for_user_with_stream_ordering(user_id) | ||
|
||
joined_room_ids = set() | ||
|
||
# We need to check that the stream ordering of the join for each room | ||
# is before the stream_ordering asked for. This might not be the case | ||
# if the user joins a room between us getting the current token and | ||
# calling `get_rooms_for_user_with_stream_ordering`. | ||
# If the membership's stream ordering is after the given stream | ||
# ordering, we need to go and work out if the user was in the room | ||
# before. | ||
# We also need to check whether the room should be excluded from sync | ||
# responses as per the homeserver config. | ||
for joined_room in joined_rooms: | ||
if joined_room.room_id in self.rooms_to_exclude: | ||
continue | ||
|
||
if not joined_room.event_pos.persisted_after(room_key): | ||
joined_room_ids.add(joined_room.room_id) | ||
continue | ||
|
||
logger.info("User joined room after current token: %s", joined_room.room_id) | ||
|
||
extrems = ( | ||
await self.store.get_forward_extremities_for_room_at_stream_ordering( | ||
joined_room.room_id, joined_room.event_pos.stream | ||
) | ||
) | ||
user_ids_in_room = await self.state.get_current_user_ids_in_room( | ||
joined_room.room_id, extrems | ||
) | ||
if user_id in user_ids_in_room: | ||
joined_room_ids.add(joined_room.room_id) | ||
|
||
return frozenset(joined_room_ids) | ||
|
||
|
||
def _action_has_highlight(actions: List[JsonDict]) -> bool: | ||
for action in actions: | ||
|
@@ -2535,6 +2521,7 @@ class SyncResultBuilder: | |
since_token: Optional[StreamToken] | ||
now_token: StreamToken | ||
joined_room_ids: FrozenSet[str] | ||
membership_change_events: List[EventBase] | ||
|
||
presence: List[UserPresenceState] = attr.Factory(list) | ||
account_data: List[JsonDict] = attr.Factory(list) | ||
|
Copied from old draft PR
erikjohnston 3 days ago
Isn't since_token significantly older than we need? Can we take the current token before we fetch get_rooms_for_user and then only get the changes between the two?
Member Author
Fizzadar 2 days ago
Yes much older - if we switch to using deltas as above this would be far more efficient.
In the current state of this PR though it makes sense to call get_membership_changes_for_user between the since_token & now because we later fetch this anyway (twice), so passing it around as part of the SyncResultBuilder removes those duplicate lookups.
Ah, right. But doesn't that mean we'll be handling a bunch of spurious changes below? Specifically won't we hit the log line for every membership change?
Oh nice spot yeah that'll potentially check tonnes of events 🤦 Have pushed up a82b8d7 which limits it to the specific window between get rooms + current token.
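For anyone following along, here is a minimal standalone sketch of the windowing idea discussed in this thread. It is not the code from a82b8d7 and does not use Synapse's store or event types: `MembershipChange`, `apply_window_changes`, the integer tokens and the room IDs are all hypothetical stand-ins. It assumes one token is taken before the room list is fetched and one after, and that only the latest membership change per room inside that window needs to be applied to the snapshot.

```python
from dataclasses import dataclass
from typing import Dict, FrozenSet, Iterable, List, Set

JOIN = "join"


@dataclass(frozen=True)
class MembershipChange:
    """Hypothetical stand-in for a membership event (not Synapse's EventBase)."""

    room_id: str
    membership: str
    stream_ordering: int


def apply_window_changes(
    joined_before: Iterable[str],
    changes: List[MembershipChange],
    token_before: int,
    token_now: int,
    rooms_to_exclude: FrozenSet[str] = frozenset(),
) -> FrozenSet[str]:
    """Reconcile a room snapshot with changes in (token_before, token_now]."""
    joined: Set[str] = set(joined_before)

    # Keep only the latest change per room that falls inside the window.
    latest: Dict[str, MembershipChange] = {}
    for change in sorted(changes, key=lambda c: c.stream_ordering):
        if token_before < change.stream_ordering <= token_now:
            latest[change.room_id] = change

    # A latest change of JOIN adds the room; anything else removes it.
    for room_id, change in latest.items():
        if change.membership == JOIN:
            joined.add(room_id)
        else:
            joined.discard(room_id)

    # Apply the configured exclusions and freeze the result.
    return frozenset(r for r in joined if r not in rooms_to_exclude)


# Example: snapshot taken somewhere between token 10 and token 14.
changes = [
    MembershipChange("!a:example.org", "join", 5),  # outside the window
    MembershipChange("!b:example.org", "join", 11),
    MembershipChange("!c:example.org", "leave", 12),
    MembershipChange("!b:example.org", "leave", 13),
    MembershipChange("!d:example.org", "join", 14),
]
result = apply_window_changes(
    {"!a:example.org", "!c:example.org"}, changes, token_before=10, token_now=14
)
```

Because each room's latest change is applied idempotently, it should not matter whether the snapshot already reflected some of the changes in the window. The sketch also leaves out the ban/join race check discussed earlier in the review.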