-
Notifications
You must be signed in to change notification settings - Fork 248
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Sliding sync: Add connection tracking to the account_data
extension
#17695
Changes from all commits
119b752
20be70d
1c1eaf7
aae9e91
68c5cd8
48ab85f
65dc3aa
7d5484e
2b2e37e
7359b51
e995a98
83c7040
3fa6960
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Fix bug where room account data would not correctly be sent down sliding sync for old rooms. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,7 +19,6 @@ | |
AbstractSet, | ||
ChainMap, | ||
Dict, | ||
List, | ||
Mapping, | ||
MutableMapping, | ||
Optional, | ||
|
@@ -119,6 +118,8 @@ async def get_extensions_response( | |
if sync_config.extensions.account_data is not None: | ||
account_data_response = await self.get_account_data_extension_response( | ||
sync_config=sync_config, | ||
previous_connection_state=previous_connection_state, | ||
new_connection_state=new_connection_state, | ||
actual_lists=actual_lists, | ||
actual_room_ids=actual_room_ids, | ||
account_data_request=sync_config.extensions.account_data, | ||
|
@@ -361,6 +362,8 @@ async def get_e2ee_extension_response( | |
async def get_account_data_extension_response( | ||
self, | ||
sync_config: SlidingSyncConfig, | ||
previous_connection_state: "PerConnectionState", | ||
new_connection_state: "MutablePerConnectionState", | ||
actual_lists: Mapping[str, SlidingSyncResult.SlidingWindowList], | ||
actual_room_ids: Set[str], | ||
account_data_request: SlidingSyncConfig.Extensions.AccountDataExtension, | ||
|
@@ -425,25 +428,51 @@ async def get_account_data_extension_response( | |
|
||
# Fetch room account data | ||
# | ||
# List of -> Mapping from room_id to mapping of `type` to `content` of room | ||
# account data events. | ||
# | ||
# This is a list so we can avoid making copies of immutable data and instead | ||
# just provide multiple maps that need to be combined. Normally, we could | ||
# reach for `ChainMap` in this scenario, but this is a nested map and accessing | ||
# the ChainMap by room_id won't combine the two maps for that room (we would | ||
# need a new `NestedChainMap` type class). | ||
account_data_by_room_maps: List[Mapping[str, Mapping[str, JsonMapping]]] = [] | ||
account_data_by_room_map: MutableMapping[str, Mapping[str, JsonMapping]] = {} | ||
relevant_room_ids = self.find_relevant_room_ids_for_extension( | ||
requested_lists=account_data_request.lists, | ||
requested_room_ids=account_data_request.rooms, | ||
actual_lists=actual_lists, | ||
actual_room_ids=actual_room_ids, | ||
) | ||
if len(relevant_room_ids) > 0: | ||
# We need to handle the different cases depending on if we have sent | ||
# down account data previously or not, so we split the relevant | ||
# rooms up into different collections based on status. | ||
live_rooms = set() | ||
previously_rooms: Dict[str, int] = {} | ||
initial_rooms = set() | ||
|
||
for room_id in relevant_room_ids: | ||
if not from_token: | ||
initial_rooms.add(room_id) | ||
continue | ||
|
||
room_status = previous_connection_state.account_data.have_sent_room( | ||
room_id | ||
) | ||
if room_status.status == HaveSentRoomFlag.LIVE: | ||
live_rooms.add(room_id) | ||
elif room_status.status == HaveSentRoomFlag.PREVIOUSLY: | ||
assert room_status.last_token is not None | ||
previously_rooms[room_id] = room_status.last_token | ||
elif room_status.status == HaveSentRoomFlag.NEVER: | ||
initial_rooms.add(room_id) | ||
else: | ||
assert_never(room_status.status) | ||
|
||
# We fetch all room account data since the from_token. This is so | ||
# that we can record which rooms have updates that haven't been sent | ||
# down. | ||
# | ||
# Mapping from room_id to mapping of `type` to `content` of room account | ||
# data events. | ||
all_updates_since_the_from_token: Mapping[ | ||
str, Mapping[str, JsonMapping] | ||
] = {} | ||
if from_token is not None: | ||
# TODO: This should take into account the `from_token` and `to_token` | ||
account_data_by_room_map = ( | ||
all_updates_since_the_from_token = ( | ||
await self.store.get_updated_room_account_data_for_user( | ||
user_id, from_token.stream_token.account_data_key | ||
) | ||
|
@@ -456,58 +485,108 @@ async def get_account_data_extension_response( | |
user_id, from_token.stream_token.account_data_key | ||
) | ||
for room_id, tags in tags_by_room.items(): | ||
account_data_by_room_map.setdefault(room_id, {})[ | ||
all_updates_since_the_from_token.setdefault(room_id, {})[ | ||
AccountDataTypes.TAG | ||
] = {"tags": tags} | ||
|
||
account_data_by_room_maps.append(account_data_by_room_map) | ||
else: | ||
# TODO: This should take into account the `to_token` | ||
immutable_account_data_by_room_map = ( | ||
await self.store.get_room_account_data_for_user(user_id) | ||
) | ||
account_data_by_room_maps.append(immutable_account_data_by_room_map) | ||
# For live rooms we just get the updates from `all_updates_since_the_from_token` | ||
if live_rooms: | ||
for room_id in all_updates_since_the_from_token.keys() & live_rooms: | ||
account_data_by_room_map[room_id] = ( | ||
all_updates_since_the_from_token[room_id] | ||
) | ||
|
||
# Add room tags | ||
# | ||
# TODO: This should take into account the `to_token` | ||
tags_by_room = await self.store.get_tags_for_user(user_id) | ||
account_data_by_room_maps.append( | ||
{ | ||
room_id: {AccountDataTypes.TAG: {"tags": tags}} | ||
for room_id, tags in tags_by_room.items() | ||
} | ||
# For previously and initial rooms we query each room individually. | ||
if previously_rooms or initial_rooms: | ||
|
||
async def handle_previously(room_id: str) -> None: | ||
# Either get updates or all account data in the room | ||
# depending on if the room state is PREVIOUSLY or NEVER. | ||
previous_token = previously_rooms.get(room_id) | ||
if previous_token is not None: | ||
room_account_data = await ( | ||
self.store.get_updated_room_account_data_for_user_for_room( | ||
user_id=user_id, | ||
room_id=room_id, | ||
from_stream_id=previous_token, | ||
to_stream_id=to_token.account_data_key, | ||
) | ||
) | ||
|
||
# Add room tags | ||
changed = await self.store.has_tags_changed_for_room( | ||
user_id=user_id, | ||
room_id=room_id, | ||
from_stream_id=previous_token, | ||
to_stream_id=to_token.account_data_key, | ||
) | ||
if changed: | ||
# XXX: Ideally, this should take into account the `to_token` | ||
# and return the set of tags at that time but we don't track | ||
# changes to tags so we just have to return all tags for the | ||
# room. | ||
immutable_tag_map = await self.store.get_tags_for_room( | ||
user_id, room_id | ||
) | ||
room_account_data[AccountDataTypes.TAG] = { | ||
"tags": immutable_tag_map | ||
} | ||
|
||
# Only add an entry if there were any updates. | ||
if room_account_data: | ||
account_data_by_room_map[room_id] = room_account_data | ||
else: | ||
# TODO: This should take into account the `to_token` | ||
immutable_room_account_data = ( | ||
await self.store.get_account_data_for_room(user_id, room_id) | ||
) | ||
|
||
# Add room tags | ||
# | ||
# XXX: Ideally, this should take into account the `to_token` | ||
# and return the set of tags at that time but we don't track | ||
# changes to tags so we just have to return all tags for the | ||
# room. | ||
immutable_tag_map = await self.store.get_tags_for_room( | ||
user_id, room_id | ||
) | ||
|
||
account_data_by_room_map[room_id] = ChainMap( | ||
{AccountDataTypes.TAG: {"tags": immutable_tag_map}} | ||
if immutable_tag_map | ||
else {}, | ||
Comment on lines
+555
to
+557
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Do we want to always return tags in an initial sync? The problem is we can't tell no tag account data from an empty set of tags. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We probably don't? Though I don't think it really matters. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think I got it right. Only show the empty tag set if we previously told the client about tags. Check if the tests match your expectations. |
||
# Cast is safe because `ChainMap` only mutates the top-most map, | ||
# see https://github.com/python/typeshed/issues/8430 | ||
cast( | ||
MutableMapping[str, JsonMapping], | ||
immutable_room_account_data, | ||
), | ||
) | ||
|
||
# We handle these rooms concurrently to speed it up. | ||
await concurrently_execute( | ||
handle_previously, | ||
previously_rooms.keys() | initial_rooms, | ||
limit=20, | ||
) | ||
|
||
# Filter down to the relevant rooms ... and combine the maps | ||
relevant_account_data_by_room_map: MutableMapping[ | ||
str, Mapping[str, JsonMapping] | ||
] = {} | ||
for room_id in relevant_room_ids: | ||
# We want to avoid adding empty maps for relevant rooms that have no room | ||
# account data so do a quick check to see if it's in any of the maps. | ||
is_room_in_maps = False | ||
for room_map in account_data_by_room_maps: | ||
if room_id in room_map: | ||
is_room_in_maps = True | ||
break | ||
# Now record which rooms are now up to date, and which rooms have | ||
# pending updates to send. | ||
new_connection_state.account_data.record_sent_rooms(relevant_room_ids) | ||
missing_updates = ( | ||
all_updates_since_the_from_token.keys() - relevant_room_ids | ||
) | ||
if missing_updates: | ||
# If we have missing updates then we must have had a from_token. | ||
assert from_token is not None | ||
|
||
# If we found the room in any of the maps, combine the maps for that room | ||
if is_room_in_maps: | ||
relevant_account_data_by_room_map[room_id] = ChainMap( | ||
{}, | ||
*( | ||
# Cast is safe because `ChainMap` only mutates the top-most map, | ||
# see https://github.com/python/typeshed/issues/8430 | ||
cast(MutableMapping[str, JsonMapping], room_map[room_id]) | ||
for room_map in account_data_by_room_maps | ||
if room_map.get(room_id) | ||
), | ||
new_connection_state.account_data.record_unsent_rooms( | ||
missing_updates, from_token.stream_token.account_data_key | ||
) | ||
|
||
return SlidingSyncResult.Extensions.AccountDataExtension( | ||
global_account_data_map=global_account_data_map, | ||
account_data_by_room_map=relevant_account_data_by_room_map, | ||
account_data_by_room_map=account_data_by_room_map, | ||
) | ||
|
||
@trace | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cache for `get_tags_for_room(...)` is being added in #17730.