diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 4137fd50b..8d4bc19ab 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -122,6 +122,13 @@ from synapse.types import (
 from synapse.util.async_helpers import Linearizer
 from synapse.util.metrics import Measure
 from synapse.util.wheel_timer import WheelTimer
+from collections import defaultdict, deque
+import time
+
+# Maps user ID -> device ID -> the most recent (last_active_ts, presence state)
+# updates seen for that device (at most 10 each). Shared by every caller in the
+# process and maintained by _combine_device_states.
+user_device_to_updates = defaultdict(lambda: defaultdict(lambda: deque(maxlen=10)))
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -1106,7 +1113,7 @@ class PresenceHandler(BasePresenceHandler):
         prev_state = await self.current_state_for_user(user_id)
         new_fields: Dict[str, Any] = {
             "last_active_ts": now,
-            "state": _combine_device_states(devices.values()),
+            "state": _combine_device_states(user_id, devices.values(), now),
         }
 
         await self._update_states([prev_state.copy_and_replace(**new_fields)])
@@ -1395,6 +1402,6 @@ class PresenceHandler(BasePresenceHandler):
             device_state.last_sync_ts = now
 
         # Based on the state of each user's device calculate the new presence state.
-        presence = _combine_device_states(devices.values())
+        presence = _combine_device_states(user_id, devices.values(), now)
 
         new_fields = {"state": presence}
@@ -2021,6 +2028,7 @@ def handle_timeouts(
             syncing_user_devices,
             user_to_devices.get(user_id, {}),
             now,
+            user_id,
         )
         if new_state:
             changes[state.user_id] = new_state
@@ -2034,5 +2042,6 @@ def handle_timeout(
     syncing_device_ids: AbstractSet[Tuple[str, Optional[str]]],
     user_devices: Dict[Optional[str], UserDevicePresenceState],
     now: int,
+    user_id: Optional[str] = None,
 ) -> Optional[UserPresenceState]:
     """Checks the presence of the user to see if any of the timers have elapsed
@@ -2095,7 +2104,11 @@ def handle_timeout(
         # If the presence state of the devices changed, then (maybe) update
         # the user's overall presence state.
         if device_changed:
-            new_presence = _combine_device_states(user_devices.values())
+            # `user_id` is optional so that callers predating it keep working;
+            # the state object always carries the user ID.
+            new_presence = _combine_device_states(
+                user_id or state.user_id, user_devices.values(), now
+            )
             if new_presence != state.state:
                 state = state.copy_and_replace(state=new_presence)
                 changed = True
@@ -2219,34 +2232,97 @@ PRESENCE_BY_PRIORITY = {
     PresenceState.OFFLINE: 1,
 }
 
+
+def _resolve_inconsistencies_for_device(
+    device_updates: "deque[Tuple[int, str]]",
+) -> str:
+    """
+    Resolve the presence state of a single device from its recent updates.
+
+    When the recent updates disagree (e.g. several tabs sharing one device),
+    the state with the highest priority in PRESENCE_BY_PRIORITY wins; when they
+    all agree, that state is returned.
+
+    Args:
+        device_updates: The (timestamp, presence state) tuples recorded for the
+            device, in the order they were recorded.
+
+    Returns:
+        The resolved presence state, or OFFLINE if there are no updates.
+    """
+    if not device_updates:
+        return PresenceState.OFFLINE
+
+    # Taking the maximum covers every case: a single update, updates that all
+    # agree, and conflicting updates.
+    return max(
+        (state for _, state in device_updates),
+        key=PRESENCE_BY_PRIORITY.__getitem__,
+    )
+
 
 def _combine_device_states(
+    user_id: str,
     device_states: Iterable[UserDevicePresenceState],
+    now: Optional[int] = None,
 ) -> str:
     """
-    Find the device to use presence information from.
-
-    Orders devices by priority, then last_active_ts.
+    Determine the user's presence state from each device's recent updates.
+
+    The recent updates of each device are first resolved to a single state (the
+    highest priority one if they disagree); the highest priority state across
+    the given devices is then returned.
 
     Args:
+        user_id: The ID of the user the devices belong to.
         device_states: An iterable of device presence states
+        now: The current time in milliseconds, used to expire old updates.
+            Defaults to the wall clock; pass the time from the clock that
+            produced last_active_ts so both are measured consistently.
 
     Return:
         The combined presence state.
     """
 
-    # Based on (all) the user's devices calculate the new presence state.
-    presence = PresenceState.OFFLINE
-    last_active_ts = -1
-
-    # Find the device to use the presence state of based on the presence priority,
-    # but tie-break with how recently the device has been seen.
-    for device_state in device_states:
-        if (PRESENCE_BY_PRIORITY[device_state.state], device_state.last_active_ts) > (
-            PRESENCE_BY_PRIORITY[presence],
-            last_active_ts,
-        ):
-            presence = device_state.state
-            last_active_ts = device_state.last_active_ts
+    # Sort once up front; this also materialises the iterable, which is walked
+    # more than once below and may be a one-shot iterator.
+    states = sorted(device_states, key=lambda s: s.last_active_ts)
+
+    # No devices means OFFLINE. Forget the user as well so the module-level map
+    # does not keep an entry for every user ever seen.
+    if not states:
+        user_device_to_updates.pop(user_id, None)
+        return PresenceState.OFFLINE
+
+    if now is None:
+        now = int(time.time() * 1000)
+    # Updates older than the sync timeout (plus a little slack) are expired.
+    threshold_ms = SYNC_ONLINE_TIMEOUT + 5
+
+    device_to_updates = user_device_to_updates[user_id]
+
+    # Drop devices that are no longer reported, otherwise their last recorded
+    # state would keep contributing to the combined presence indefinitely.
+    current_device_ids = {state.device_id for state in states}
+    for device_id in list(device_to_updates):
+        if device_id not in current_device_ids:
+            del device_to_updates[device_id]
+
+    presence = PresenceState.OFFLINE
+    for state in states:
+        updates = device_to_updates[state.device_id]
+
+        # Expire old updates in place, so the bounded deque is kept rather than
+        # being replaced by an unbounded list, then record the latest state.
+        fresh = [(ts, st) for ts, st in updates if now - ts <= threshold_ms]
+        updates.clear()
+        updates.extend(fresh)
+        updates.append((state.last_active_ts, state.state))
+
+        # Resolve inconsistencies (e.g. multiple tabs) for this device and keep
+        # the highest priority state seen across devices.
+        resolved = _resolve_inconsistencies_for_device(updates)
+        if PRESENCE_BY_PRIORITY[resolved] > PRESENCE_BY_PRIORITY[presence]:
+            presence = resolved
 
     return presence