
Fix up logic for delaying sending read receipts over federation. #17933

Merged: 3 commits, Nov 25, 2024
1 change: 1 addition & 0 deletions changelog.d/17933.bugfix
@@ -0,0 +1 @@
Fix long-standing bug where read receipts could get overly delayed being sent over federation.
165 changes: 88 additions & 77 deletions synapse/federation/sender/__init__.py
@@ -140,7 +140,6 @@
Iterable,
List,
Optional,
Set,
Tuple,
)

@@ -170,7 +169,13 @@
run_as_background_process,
wrap_as_background_process,
)
from synapse.types import JsonDict, ReadReceipt, RoomStreamToken, StrCollection
from synapse.types import (
JsonDict,
ReadReceipt,
RoomStreamToken,
StrCollection,
get_domain_from_id,
)
from synapse.util import Clock
from synapse.util.metrics import Measure
from synapse.util.retryutils import filter_destinations_by_retry_limiter
@@ -297,12 +302,10 @@ class _DestinationWakeupQueue:
# being woken up.
_MAX_TIME_IN_QUEUE = 30.0

# The maximum duration in seconds between waking up consecutive destination
# queues.
_MAX_DELAY = 0.1

sender: "FederationSender" = attr.ib()
clock: Clock = attr.ib()
max_delay_s: int = attr.ib()

queue: "OrderedDict[str, Literal[None]]" = attr.ib(factory=OrderedDict)
processing: bool = attr.ib(default=False)

@@ -332,7 +335,7 @@ async def _handle(self) -> None:
# We also add an upper bound to the delay, to gracefully handle the
# case where the queue only has a few entries in it.
current_sleep_seconds = min(
self._MAX_DELAY, self._MAX_TIME_IN_QUEUE / len(self.queue)
self.max_delay_s, self._MAX_TIME_IN_QUEUE / len(self.queue)
)

while self.queue:
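
For context on the loop above: each iteration sleeps for min(max_delay_s, _MAX_TIME_IN_QUEUE / len(queue)), so the per-destination delay is normally capped by max_delay_s but shrinks further once the queue is long enough that it would otherwise not drain within the 30-second window. A rough illustration of that calculation (a sketch; the 50 transactions-per-room-per-second default is an assumption, not part of this diff):

MAX_TIME_IN_QUEUE = 30.0  # seconds, mirrors _MAX_TIME_IN_QUEUE above


def sleep_between_wakeups(queue_len: int, max_delay_s: float) -> float:
    # Seconds to wait before waking the next queued destination.
    return min(max_delay_s, MAX_TIME_IN_QUEUE / queue_len)


# With federation_rr_transactions_per_room_per_second = 50, max_delay_s = 0.02:
#   sleep_between_wakeups(5, 0.02)    -> 0.02  (the cap wins for short queues)
#   sleep_between_wakeups(3000, 0.02) -> 0.01  (30 / 3000, so the queue still drains in ~30s)
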
@@ -416,19 +419,14 @@ def __init__(self, hs: "HomeServer"):
self._is_processing = False
self._last_poked_id = -1

# map from room_id to a set of PerDestinationQueues which we believe are
# awaiting a call to flush_read_receipts_for_room. The presence of an entry
# here for a given room means that we are rate-limiting RR flushes to that room,
# and that there is a pending call to _flush_rrs_for_room in the system.
self._queues_awaiting_rr_flush_by_room: Dict[str, Set[PerDestinationQueue]] = {}
self._external_cache = hs.get_external_cache()

self._rr_txn_interval_per_room_ms = (
1000.0
/ hs.config.ratelimiting.federation_rr_transactions_per_room_per_second
rr_txn_interval_per_room_s = (
1.0 / hs.config.ratelimiting.federation_rr_transactions_per_room_per_second
)
self._destination_wakeup_queue = _DestinationWakeupQueue(
self, self.clock, max_delay_s=rr_txn_interval_per_room_s
)

self._external_cache = hs.get_external_cache()
self._destination_wakeup_queue = _DestinationWakeupQueue(self, self.clock)

# Regularly wake up destinations that have outstanding PDUs to be caught up
self.clock.looping_call_now(
@@ -745,37 +743,48 @@ async def send_read_receipt(self, receipt: ReadReceipt) -> None:

# Some background on the rate-limiting going on here.
#
# It turns out that if we attempt to send out RRs as soon as we get them from
# a client, then we end up trying to do several hundred Hz of federation
# transactions. (The number of transactions scales as O(N^2) on the size of a
# room, since in a large room we have both more RRs coming in, and more servers
# to send them to.)
# It turns out that if we attempt to send out RRs as soon as we get them
# from a client, then we end up trying to do several hundred Hz of
# federation transactions. (The number of transactions scales as O(N^2)
# on the size of a room, since in a large room we have both more RRs
# coming in, and more servers to send them to.)
#
# This leads to a lot of CPU load, and we end up getting behind. The solution
# currently adopted is as follows:
# This leads to a lot of CPU load, and we end up getting behind. The
# solution currently adopted is to differentiate between receipts and
# destinations we should immediately send to, and those we can trickle
# the receipts to.
#
# The first receipt in a given room is sent out immediately, at time T0. Any
# further receipts are, in theory, batched up for N seconds, where N is calculated
# based on the number of servers in the room to achieve a transaction frequency
# of around 50Hz. So, for example, if there were 100 servers in the room, then
# N would be 100 / 50Hz = 2 seconds.
# The current logic is to send receipts out immediately if:
# - the room is "small", i.e. there's only N servers to send receipts
# to, and so sending out the receipts immediately doesn't cause too
# much load; or
# - the receipt is for an event that happened recently, as users
# notice if receipts are delayed when they know other users are
# currently reading the room; or
# - the receipt is being sent to the server that sent the event, so
# that users see receipts for their own events quickly.
#
# Then, after T+N, we flush out any receipts that have accumulated, and restart
# the timer to flush out more receipts at T+2N, etc. If no receipts accumulate,
# we stop the cycle and go back to the start.
# For destinations that we should delay sending the receipt to, we queue
# the receipts up to be sent in the next transaction, but don't trigger
# a new transaction to be sent. We then add the destination to the
# `DestinationWakeupQueue`, which will slowly iterate over each
# destination and trigger a new transaction to be sent.
#
# However, in practice, it is often possible to flush out receipts earlier: in
# particular, if we are sending a transaction to a given server anyway (for
# example, because we have a PDU or a RR in another room to send), then we may
# as well send out all of the pending RRs for that server. So it may be that
# by the time we get to T+N, we don't actually have any RRs left to send out.
# Nevertheless we continue to buffer up RRs for the room in question until we
# reach the point that no RRs arrive between timer ticks.
# However, in practice, it is often possible to send out delayed
# receipts earlier: in particular, if we are sending a transaction to a
# given server anyway (for example, because we have a PDU or a RR in
# another room to send), then we may as well send out all of the pending
# RRs for that server. So it may be that by the time we get to waking up
# the destination, we don't actually have any RRs left to send out.
#
# For even more background, see https://github.com/matrix-org/synapse/issues/4730.
# For even more background, see
# https://github.com/matrix-org/synapse/issues/4730.

room_id = receipt.room_id

# Local read receipts always have 1 event ID.
event_id = receipt.event_ids[0]

# Work out which remote servers should be poked and poke them.
domains_set = await self._storage_controllers.state.get_current_hosts_in_room_or_partial_state_approximation(
room_id
@@ -797,49 +806,51 @@ async def send_read_receipt(self, receipt: ReadReceipt) -> None:
if not domains:
return

queues_pending_flush = self._queues_awaiting_rr_flush_by_room.get(room_id)
# We now split which domains we want to wake up immediately vs which we
# want to delay waking up.
immediate_domains: StrCollection
delay_domains: StrCollection

# if there is no flush yet scheduled, we will send out these receipts with
# immediate flushes, and schedule the next flush for this room.
if queues_pending_flush is not None:
logger.debug("Queuing receipt for: %r", domains)
if len(domains) < 10:
# For "small" rooms send to all domains immediately
immediate_domains = domains
delay_domains = ()
else:
logger.debug("Sending receipt to: %r", domains)
self._schedule_rr_flush_for_room(room_id, len(domains))
metadata = await self.store.get_metadata_for_event(
receipt.room_id, event_id
)
assert metadata is not None

for domain in domains:
queue = self._get_per_destination_queue(domain)
queue.queue_read_receipt(receipt)
sender_domain = get_domain_from_id(metadata.sender)

# if there is already a RR flush pending for this room, then make sure this
# destination is registered for the flush
if queues_pending_flush is not None:
queues_pending_flush.add(queue)
if self.clock.time_msec() - metadata.received_ts < 60_000:
A reviewer (Contributor) commented:

I know it's a silly case, but maybe

Suggested change:
- if self.clock.time_msec() - metadata.received_ts < 60_000:
+ if 0 <= self.clock.time_msec() - metadata.received_ts < 60_000:

to guard against the case where someone's clock was wrong (in the future) for a while, or is currently in the past, causing them to treat all events as recent?

Or perhaps it should have some tolerance for workers being out of sync, so perhaps -60_000 <= ...?
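
One way to read that suggestion: bound the age check on both sides, so that a skewed or out-of-sync clock cannot make every event look recent (or no event ever look recent). A minimal sketch, assuming a hypothetical helper and a symmetric 60-second tolerance; this is not part of the PR:

RECENCY_WINDOW_MS = 60_000  # hypothetical constant for illustration


def is_recent_event(now_ms: int, received_ts_ms: int) -> bool:
    # Only treat the event as "recent" if it arrived within the last minute,
    # tolerating up to a minute of clock skew between workers.
    age_ms = now_ms - received_ts_ms
    return -RECENCY_WINDOW_MS <= age_ms < RECENCY_WINDOW_MS
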

# We always send receipts for recent messages immediately
immediate_domains = domains
delay_domains = ()
else:
queue.flush_read_receipts_for_room(room_id)

def _schedule_rr_flush_for_room(self, room_id: str, n_domains: int) -> None:
# that is going to cause approximately len(domains) transactions, so now back
# off for that multiplied by RR_TXN_INTERVAL_PER_ROOM
backoff_ms = self._rr_txn_interval_per_room_ms * n_domains

logger.debug("Scheduling RR flush in %s in %d ms", room_id, backoff_ms)
self.clock.call_later(backoff_ms, self._flush_rrs_for_room, room_id)
self._queues_awaiting_rr_flush_by_room[room_id] = set()

def _flush_rrs_for_room(self, room_id: str) -> None:
queues = self._queues_awaiting_rr_flush_by_room.pop(room_id)
logger.debug("Flushing RRs in %s to %s", room_id, queues)

if not queues:
# no more RRs arrived for this room; we are done.
return
# Otherwise, we delay waking up all destinations except for the
# sender's domain.
immediate_domains = []
delay_domains = []
for domain in domains:
if domain == sender_domain:
immediate_domains.append(domain)
else:
delay_domains.append(domain)

for domain in immediate_domains:
# Add to destination queue and wake the destination up
queue = self._get_per_destination_queue(domain)
queue.queue_read_receipt(receipt)
queue.attempt_new_transaction()

# schedule the next flush
self._schedule_rr_flush_for_room(room_id, len(queues))
for domain in delay_domains:
# Add to destination queue...
queue = self._get_per_destination_queue(domain)
queue.queue_read_receipt(receipt)

for queue in queues:
queue.flush_read_receipts_for_room(room_id)
# ... and schedule the destination to be woken up.
self._destination_wakeup_queue.add_to_queue(domain)

async def send_presence_to_destinations(
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
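Condensed, the routing decision implemented by the new send_read_receipt code above is roughly the following; an illustrative sketch (the helper name is made up), not the exact Synapse code:

from typing import Collection, List, Tuple


def split_receipt_destinations(
    domains: Collection[str], sender_domain: str, event_age_ms: int
) -> Tuple[List[str], List[str]]:
    # Return (immediate, delayed) destination lists for a read receipt.
    if len(domains) < 10 or event_age_ms < 60_000:
        # Small rooms and receipts for recent events are sent out immediately.
        return list(domains), []
    # Otherwise only the event sender's server is woken up straight away;
    # everyone else is queued and trickled out via the _DestinationWakeupQueue.
    immediate = [d for d in domains if d == sender_domain]
    delayed = [d for d in domains if d != sender_domain]
    return immediate, delayed
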
25 changes: 3 additions & 22 deletions synapse/federation/sender/per_destination_queue.py
@@ -156,7 +156,6 @@ def __init__(
# Each receipt can only have a single receipt per
# (room ID, receipt type, user ID, thread ID) tuple.
self._pending_receipt_edus: List[Dict[str, Dict[str, Dict[str, dict]]]] = []
self._rrs_pending_flush = False

# stream_id of last successfully sent to-device message.
# NB: may be a long or an int.
@@ -258,15 +257,7 @@ def queue_read_receipt(self, receipt: ReadReceipt) -> None:
}
)

def flush_read_receipts_for_room(self, room_id: str) -> None:
# If there are any pending receipts for this room then force-flush them
# in a new transaction.
for edu in self._pending_receipt_edus:
if room_id in edu:
self._rrs_pending_flush = True
self.attempt_new_transaction()
# No use in checking remaining EDUs if the room was found.
break
self.mark_new_data()

def send_keyed_edu(self, edu: Edu, key: Hashable) -> None:
self._pending_edus_keyed[(edu.edu_type, key)] = edu
@@ -603,12 +594,9 @@ async def _catch_up_transmission_loop(self) -> None:
self._destination, last_successful_stream_ordering
)

def _get_receipt_edus(self, force_flush: bool, limit: int) -> Iterable[Edu]:
def _get_receipt_edus(self, limit: int) -> Iterable[Edu]:
if not self._pending_receipt_edus:
return
if not force_flush and not self._rrs_pending_flush:
# not yet time for this lot
return

# Send at most limit EDUs for receipts.
for content in self._pending_receipt_edus[:limit]:
@@ -747,7 +735,7 @@ async def __aenter__(self) -> Tuple[List[EventBase], List[Edu]]:
)

# Add read receipt EDUs.
pending_edus.extend(self.queue._get_receipt_edus(force_flush=False, limit=5))
pending_edus.extend(self.queue._get_receipt_edus(limit=5))
edu_limit = MAX_EDUS_PER_TRANSACTION - len(pending_edus)

# Next, prioritize to-device messages so that existing encryption channels
@@ -795,13 +783,6 @@ async def __aenter__(self) -> Tuple[List[EventBase], List[Edu]]:
if not self._pdus and not pending_edus:
return [], []

# if we've decided to send a transaction anyway, and we have room, we
# may as well send any pending RRs
if edu_limit:
pending_edus.extend(
self.queue._get_receipt_edus(force_flush=True, limit=edu_limit)
)

if self._pdus:
self._last_stream_ordering = self._pdus[
-1
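Net effect of the per_destination_queue.py changes above: queued receipts are no longer gated behind a per-room "pending flush" flag; whenever a transaction is built for a destination, up to five pending receipt EDUs simply ride along, and anything left over waits for the next transaction or the next wakeup. A simplified sketch of that consumption pattern (assumed structure, not the real PerDestinationQueue internals):

from typing import Dict, Iterable, List

MAX_RECEIPT_EDUS_PER_TXN = 5  # mirrors the limit=5 used in the diff above


def take_receipt_edus(pending_receipt_edus: List[Dict], limit: int) -> Iterable[Dict]:
    # Pop at most `limit` queued receipt EDU payloads for the transaction
    # being built; whatever remains stays queued.
    batch = pending_receipt_edus[:limit]
    del pending_receipt_edus[:limit]
    return batch
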
2 changes: 2 additions & 0 deletions synapse/storage/databases/main/cache.py
@@ -322,6 +322,7 @@ def _invalidate_caches_for_event(
self._attempt_to_invalidate_cache(
"get_unread_event_push_actions_by_room_for_user", (room_id,)
)
self._attempt_to_invalidate_cache("get_metadata_for_event", (room_id, event_id))

self._attempt_to_invalidate_cache("_get_max_event_pos", (room_id,))

@@ -446,6 +447,7 @@ def _invalidate_caches_for_room_events(self, room_id: str) -> None:
self._attempt_to_invalidate_cache("_get_state_group_for_event", None)

self._attempt_to_invalidate_cache("get_event_ordering", None)
self._attempt_to_invalidate_cache("get_metadata_for_event", (room_id,))
self._attempt_to_invalidate_cache("is_partial_state_event", None)
self._attempt_to_invalidate_cache("_get_joined_profile_from_event_id", None)

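The two invalidations added above work at different granularities because get_metadata_for_event is declared with @cached(tree=True) (see the next file): a tree cache can be dropped either for one exact key or for every entry sharing a key prefix. Mirroring the lines added in this diff:

# Invalidate the cached metadata for one specific event:
self._attempt_to_invalidate_cache("get_metadata_for_event", (room_id, event_id))

# Invalidate all cached metadata for a room (e.g. when its events are purged):
self._attempt_to_invalidate_cache("get_metadata_for_event", (room_id,))
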
27 changes: 27 additions & 0 deletions synapse/storage/databases/main/events_worker.py
@@ -193,6 +193,14 @@ class _EventRow:
outlier: bool


@attr.s(slots=True, frozen=True, auto_attribs=True)
class EventMetadata:
"""Event metadata returned by `get_metadata_for_event(..)`"""

sender: str
received_ts: int


class EventRedactBehaviour(Enum):
"""
What to do when retrieving a redacted event from the database.
@@ -2580,3 +2588,22 @@ async def have_finished_sliding_sync_background_jobs(self) -> bool:
_BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE,
)
)

@cached(tree=True)
async def get_metadata_for_event(
self, room_id: str, event_id: str
) -> Optional[EventMetadata]:
row = await self.db_pool.simple_select_one(
table="events",
keyvalues={"room_id": room_id, "event_id": event_id},
retcols=("sender", "received_ts"),
allow_none=True,
desc="get_metadata_for_event",
)
if row is None:
return None

return EventMetadata(
sender=row[0],
received_ts=row[1],
)
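
For reference, the call site added in the federation sender (earlier in this diff) consumes the new helper roughly like this; a condensed excerpt, assuming store, clock and receipt are in scope:

metadata = await store.get_metadata_for_event(receipt.room_id, event_id)
assert metadata is not None  # the receipt refers to an event we have stored

sender_domain = get_domain_from_id(metadata.sender)
is_recent = clock.time_msec() - metadata.received_ts < 60_000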