Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Include thread information when sending receipts over federation. #14466

Merged
merged 13 commits into from
Nov 28, 2022
1 change: 1 addition & 0 deletions changelog.d/14466.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug introduced in Synapse 1.70.0 where a receipt's thread ID was not sent over federation.
183 changes: 120 additions & 63 deletions synapse/federation/sender/per_destination_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from synapse.logging.opentracing import SynapseTags, set_tag
from synapse.metrics import sent_transactions_counter
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import ReadReceipt
from synapse.types import JsonDict, ReadReceipt
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
from synapse.visibility import filter_events_for_server

Expand Down Expand Up @@ -136,8 +136,11 @@ def __init__(
# destination
self._pending_presence: Dict[str, UserPresenceState] = {}

# room_id -> receipt_type -> user_id -> receipt_dict
self._pending_rrs: Dict[str, Dict[str, Dict[str, dict]]] = {}
# List of room_id -> receipt_type -> user_id -> receipt_dict,
#
# Each receipt can only have a single receipt per
# (room ID, receipt type, user ID, thread ID) tuple.
self._pending_receipt_edus: List[Dict[str, Dict[str, Dict[str, dict]]]] = []
self._rrs_pending_flush = False

# stream_id of last successfully sent to-device message.
Expand Down Expand Up @@ -202,17 +205,53 @@ def queue_read_receipt(self, receipt: ReadReceipt) -> None:
Args:
receipt: receipt to be queued
"""
self._pending_rrs.setdefault(receipt.room_id, {}).setdefault(
receipt.receipt_type, {}
)[receipt.user_id] = {"event_ids": receipt.event_ids, "data": receipt.data}
serialized_receipt: JsonDict = {
"event_ids": receipt.event_ids,
"data": receipt.data,
}
if receipt.thread_id is not None:
serialized_receipt["data"]["thread_id"] = receipt.thread_id

# Find which EDU to add this receipt to. There's three situations depending
# on the (room ID, receipt type, user, thread ID) tuple:
#
# 1. If it fully matches, clobber the information.
# 2. If it is missing, add the information.
# 3. If the subset tuple of (room ID, receipt type, user) matches, check
# the next EDU (or add a new EDU).
for edu in self._pending_receipt_edus:
receipt_content = edu.setdefault(receipt.room_id, {}).setdefault(
receipt.receipt_type, {}
)
# If this room ID, receipt type, user ID is not in this EDU, OR if
# the full tuple matches, use the current EDU.
if (
receipt.user_id not in receipt_content
or receipt_content[receipt.user_id].get("thread_id")
== receipt.thread_id
):
receipt_content[receipt.user_id] = serialized_receipt
break

# If no matching EDU was found, create a new one.
else:
self._pending_receipt_edus.append(
{
receipt.room_id: {
receipt.receipt_type: {receipt.user_id: serialized_receipt}
}
}
)

def flush_read_receipts_for_room(self, room_id: str) -> None:
# if we don't have any read-receipts for this room, it may be that we've already
# sent them out, so we don't need to flush.
if room_id not in self._pending_rrs:
return
self._rrs_pending_flush = True
self.attempt_new_transaction()
# If there are any pending receipts for this room then force-flush them
# in a new transaction.
for edu in self._pending_receipt_edus:
if room_id in edu:
self._rrs_pending_flush = True
self.attempt_new_transaction()
clokep marked this conversation as resolved.
Show resolved Hide resolved
# No use in checking remaining EDUs if the room was found.
break

def send_keyed_edu(self, edu: Edu, key: Hashable) -> None:
self._pending_edus_keyed[(edu.edu_type, key)] = edu
Expand Down Expand Up @@ -351,7 +390,7 @@ async def _transaction_transmission_loop(self) -> None:
self._pending_edus = []
self._pending_edus_keyed = {}
self._pending_presence = {}
self._pending_rrs = {}
self._pending_receipt_edus = []

self._start_catching_up()
except FederationDeniedError as e:
Expand Down Expand Up @@ -542,22 +581,27 @@ async def _catch_up_transmission_loop(self) -> None:
self._destination, last_successful_stream_ordering
)

def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]:
if not self._pending_rrs:
def _get_receipt_edus(self, force_flush: bool, limit: int) -> Iterable[Edu]:
if not self._pending_receipt_edus:
return
if not force_flush and not self._rrs_pending_flush:
# not yet time for this lot
return

edu = Edu(
origin=self._server_name,
destination=self._destination,
edu_type=EduTypes.RECEIPT,
content=self._pending_rrs,
)
self._pending_rrs = {}
self._rrs_pending_flush = False
yield edu
# Send at most limit EDUs for receipts.
for content in self._pending_receipt_edus[:limit]:
yield Edu(
origin=self._server_name,
destination=self._destination,
edu_type=EduTypes.RECEIPT,
content=content,
)
self._pending_receipt_edus = self._pending_receipt_edus[limit:]

# If there are still pending read-receipts, don't reset the pending flush
# flag.
if not self._pending_receipt_edus:
self._rrs_pending_flush = False

def _pop_pending_edus(self, limit: int) -> List[Edu]:
pending_edus = self._pending_edus
Expand Down Expand Up @@ -644,68 +688,79 @@ class _TransactionQueueManager:
async def __aenter__(self) -> Tuple[List[EventBase], List[Edu]]:
# First we calculate the EDUs we want to send, if any.

# We start by fetching device related EDUs, i.e device updates and to
# device messages. We have to keep 2 free slots for presence and rr_edus.
device_edu_limit = MAX_EDUS_PER_TRANSACTION - 2
# There's a maximum number of EDUs that can be sent with a transaction,
# generally device updates and to-device messages get priority, but we
# want to ensure that there's room for some other EDUs as well.
#
# This is done by:
#
# * Add a presence EDU, if one exists.
# * Add up-to a small limit of read receipt EDUs.
# * Add to-device EDUs, but leave some space for device list updates.
# * Add device list updates EDUs.
# * If there's any remaining room, add other EDUs.
pending_edus = []

# Add presence EDU.
if self.queue._pending_presence:
pending_edus.append(
Edu(
origin=self.queue._server_name,
destination=self.queue._destination,
edu_type=EduTypes.PRESENCE,
content={
"push": [
format_user_presence_state(
presence, self.queue._clock.time_msec()
)
for presence in self.queue._pending_presence.values()
]
},
)
)
self.queue._pending_presence = {}

# We prioritize to-device messages so that existing encryption channels
# Add read receipt EDUs.
pending_edus.extend(self.queue._get_receipt_edus(force_flush=False, limit=5))
edu_limit = MAX_EDUS_PER_TRANSACTION - len(pending_edus)

# Next, prioritize to-device messages so that existing encryption channels
# work. We also keep a few slots spare (by reducing the limit) so that
# we can still trickle out some device list updates.
(
to_device_edus,
device_stream_id,
) = await self.queue._get_to_device_message_edus(device_edu_limit - 10)
) = await self.queue._get_to_device_message_edus(edu_limit - 10)

if to_device_edus:
self._device_stream_id = device_stream_id
else:
self.queue._last_device_stream_id = device_stream_id

device_edu_limit -= len(to_device_edus)
pending_edus.extend(to_device_edus)
edu_limit -= len(to_device_edus)

# Add device list update EDUs.
device_update_edus, dev_list_id = await self.queue._get_device_update_edus(
device_edu_limit
edu_limit
)

if device_update_edus:
self._device_list_id = dev_list_id
else:
self.queue._last_device_list_stream_id = dev_list_id

pending_edus = device_update_edus + to_device_edus

# Now add the read receipt EDU.
pending_edus.extend(self.queue._get_rr_edus(force_flush=False))

# And presence EDU.
if self.queue._pending_presence:
pending_edus.append(
Edu(
origin=self.queue._server_name,
destination=self.queue._destination,
edu_type=EduTypes.PRESENCE,
content={
"push": [
format_user_presence_state(
presence, self.queue._clock.time_msec()
)
for presence in self.queue._pending_presence.values()
]
},
)
)
self.queue._pending_presence = {}
pending_edus.extend(device_update_edus)
edu_limit -= len(device_update_edus)

# Finally add any other types of EDUs if there is room.
pending_edus.extend(
self.queue._pop_pending_edus(MAX_EDUS_PER_TRANSACTION - len(pending_edus))
)
while (
len(pending_edus) < MAX_EDUS_PER_TRANSACTION
and self.queue._pending_edus_keyed
):
other_edus = self.queue._pop_pending_edus(edu_limit)
pending_edus.extend(other_edus)
edu_limit -= len(other_edus)
while edu_limit > 0 and self.queue._pending_edus_keyed:
_, val = self.queue._pending_edus_keyed.popitem()
pending_edus.append(val)
edu_limit -= 1

# Now we look for any PDUs to send, by getting up to 50 PDUs from the
# queue
Expand All @@ -716,8 +771,10 @@ async def __aenter__(self) -> Tuple[List[EventBase], List[Edu]]:

# if we've decided to send a transaction anyway, and we have room, we
# may as well send any pending RRs
if len(pending_edus) < MAX_EDUS_PER_TRANSACTION:
pending_edus.extend(self.queue._get_rr_edus(force_flush=True))
if edu_limit:
pending_edus.extend(
self.queue._get_receipt_edus(force_flush=True, limit=edu_limit)
)

if self._pdus:
self._last_stream_ordering = self._pdus[
Expand Down
1 change: 0 additions & 1 deletion synapse/handlers/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ async def _received_remote_receipt(self, origin: str, content: JsonDict) -> None
continue

# Check if these receipts apply to a thread.
thread_id = None
data = user_values.get("data", {})
thread_id = data.get("thread_id")
# If the thread ID is invalid, consider it missing.
Expand Down
77 changes: 77 additions & 0 deletions tests/federation/test_federation_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,83 @@ def test_send_receipts(self):
],
)

@override_config({"send_federation": True})
def test_send_receipts_thread(self):
mock_send_transaction = (
self.hs.get_federation_transport_client().send_transaction
)
mock_send_transaction.return_value = make_awaitable({})

# Create receipts for:
#
# * The same room / user on multiple threads.
# * A different user in the same room.
sender = self.hs.get_federation_sender()
for user, thread in (
("alice", None),
("alice", "thread"),
("bob", None),
("bob", "diff-thread"),
):
receipt = ReadReceipt(
"room_id",
"m.read",
user,
["event_id"],
thread_id=thread,
data={"ts": 1234},
)
self.successResultOf(
defer.ensureDeferred(sender.send_read_receipt(receipt))
)

self.pump()

# expect a call to send_transaction with two EDUs to separate threads.
mock_send_transaction.assert_called_once()
json_cb = mock_send_transaction.call_args[0][1]
data = json_cb()
# Note that the ordering of the EDUs doesn't matter.
self.assertCountEqual(
data["edus"],
[
{
"edu_type": EduTypes.RECEIPT,
"content": {
"room_id": {
"m.read": {
"alice": {
"event_ids": ["event_id"],
"data": {"ts": 1234, "thread_id": "thread"},
},
"bob": {
"event_ids": ["event_id"],
"data": {"ts": 1234, "thread_id": "diff-thread"},
},
}
}
},
},
{
"edu_type": EduTypes.RECEIPT,
"content": {
"room_id": {
"m.read": {
"alice": {
"event_ids": ["event_id"],
"data": {"ts": 1234},
},
"bob": {
"event_ids": ["event_id"],
"data": {"ts": 1234},
},
}
}
},
},
],
)

@override_config({"send_federation": True})
def test_send_receipts_with_backoff(self):
"""Send two receipts in quick succession; the second should be flushed, but
Expand Down