Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Reduce DB load of /sync when using presence #12885

Merged
merged 6 commits into from
May 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/12885.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Reduce database load of `/sync` when presence is enabled.
75 changes: 48 additions & 27 deletions synapse/storage/databases/main/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Tuple, cast
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple, cast

from synapse.api.presence import PresenceState, UserPresenceState
from synapse.replication.tcp.streams import PresenceStream
Expand All @@ -22,6 +22,7 @@
LoggingDatabaseConnection,
LoggingTransaction,
)
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.engines import PostgresEngine
from synapse.storage.types import Connection
from synapse.storage.util.id_generators import (
Expand Down Expand Up @@ -56,7 +57,7 @@ def __init__(
)


class PresenceStore(PresenceBackgroundUpdateStore):
class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore):
def __init__(
self,
database: DatabasePool,
Expand Down Expand Up @@ -281,20 +282,30 @@ async def should_user_receive_full_presence_with_token(
True if the user should have full presence sent to them, False otherwise.
"""

def _should_user_receive_full_presence_with_token_txn(
txn: LoggingTransaction,
) -> bool:
sql = """
SELECT 1 FROM users_to_send_full_presence_to
WHERE user_id = ?
AND presence_stream_id >= ?
"""
txn.execute(sql, (user_id, from_token))
return bool(txn.fetchone())
token = await self._get_full_presence_stream_token_for_user(user_id)
if token is None:
return False

return await self.db_pool.runInteraction(
"should_user_receive_full_presence_with_token",
_should_user_receive_full_presence_with_token_txn,
return from_token <= token

@cached()
async def _get_full_presence_stream_token_for_user(
self, user_id: str
) -> Optional[int]:
"""Get the presence token corresponding to the last full presence update
for this user.

If the user presents a sync token with a presence stream token at least
as old as the result, then we need to send them a full presence update.

If this user has never needed a full presence update, returns `None`.
"""
return await self.db_pool.simple_select_one_onecol(
table="users_to_send_full_presence_to",
keyvalues={"user_id": user_id},
retcol="presence_stream_id",
allow_none=True,
desc="_get_full_presence_stream_token_for_user",
)

async def add_users_to_send_full_presence_to(self, user_ids: Iterable[str]) -> None:
Expand All @@ -307,18 +318,28 @@ async def add_users_to_send_full_presence_to(self, user_ids: Iterable[str]) -> N
# Add user entries to the table, updating the presence_stream_id column if the user already
# exists in the table.
presence_stream_id = self._presence_id_gen.get_current_token()
await self.db_pool.simple_upsert_many(
table="users_to_send_full_presence_to",
key_names=("user_id",),
key_values=[(user_id,) for user_id in user_ids],
value_names=("presence_stream_id",),
# We save the current presence stream ID token along with the user ID entry so
# that when a user /sync's, even if they syncing multiple times across separate
# devices at different times, each device will receive full presence once - when
# the presence stream ID in their sync token is less than the one in the table
# for their user ID.
value_values=[(presence_stream_id,) for _ in user_ids],
desc="add_users_to_send_full_presence_to",

def _add_users_to_send_full_presence_to(txn: LoggingTransaction) -> None:
self.db_pool.simple_upsert_many_txn(
txn,
table="users_to_send_full_presence_to",
key_names=("user_id",),
key_values=[(user_id,) for user_id in user_ids],
value_names=("presence_stream_id",),
# We save the current presence stream ID token along with the user ID entry so
# that when a user /sync's, even if they syncing multiple times across separate
# devices at different times, each device will receive full presence once - when
# the presence stream ID in their sync token is less than the one in the table
# for their user ID.
value_values=[(presence_stream_id,) for _ in user_ids],
)
for user_id in user_ids:
self._invalidate_cache_and_stream(
txn, self._get_full_presence_stream_token_for_user, (user_id,)
)

return await self.db_pool.runInteraction(
"add_users_to_send_full_presence_to", _add_users_to_send_full_presence_to
)

async def get_presence_for_all_users(
Expand Down