Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add support for MSC3202: sending one-time key counts and fallback key usage states to Application Services. #11617

Merged
merged 26 commits into from
Feb 24, 2022
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
eab4a03
Support opting-in to MSC3202 transactional behaviour using the regist…
reivilibre Dec 1, 2021
2c74755
Add type aliases for one-time key counts and unused fallback keys tha…
reivilibre Dec 1, 2021
d641b98
Feed one-time key counts and unused fallback keys through the transac…
reivilibre Dec 1, 2021
a5d830a
Emit the one-time key counts and fallback keys over federation
reivilibre Dec 1, 2021
873f341
During AS catch-up, send empty OTK counts and fallback keys for now
reivilibre Dec 1, 2021
a1d9c34
Boring piping
reivilibre Dec 10, 2021
344649d
Fix up some tests
reivilibre Dec 10, 2021
3761419
Add feature flag for experimental MSC3202 transaction extensions
reivilibre Dec 10, 2021
e38a650
Pipe through the feature flag
reivilibre Dec 10, 2021
f5b7291
Add some method stubs and add the OTKs and FBKs to the response
reivilibre Dec 13, 2021
a105a2f
Find interesting users for the AS when sending OTKs and FBKs
reivilibre Dec 13, 2021
e257122
Fix up tests that weren't expecting extra call arguments
reivilibre Dec 13, 2021
053f8ed
Count the OTKs in bulk
reivilibre Dec 13, 2021
d3fff94
Get unused fallback key types in bulk and send them out
reivilibre Dec 13, 2021
38495b1
Add a test for sending OTKs and UFBKs to ASes upon receiving PDUs
reivilibre Dec 17, 2021
e0fbd84
Newsfile
reivilibre Dec 20, 2021
5a3a5d9
Antilint
reivilibre Feb 3, 2022
9819173
Update synapse/appservice/api.py
reivilibre Feb 11, 2022
853b29b
Remove misleading word 'Lazily'
reivilibre Feb 11, 2022
c696a03
Fix TODO comment regarding E2E AS reliability (catch-up)
reivilibre Feb 11, 2022
f9ffeb3
Collapse the two functions into one
reivilibre Feb 16, 2022
64febcb
Add comment about why we deliberately add empty lists for zero-unused…
reivilibre Feb 23, 2022
a6f7126
Add similar comment about why we deliberately add empty dicts for no …
reivilibre Feb 23, 2022
35b23d3
Merge branch 'develop' into rei/msc3202_otks_fbks
reivilibre Feb 23, 2022
f2b2b3a
Fix references to just-removed get_datastore()
reivilibre Feb 23, 2022
712ef2d
Merge branch 'develop' into rei/msc3202_otks_fbks
reivilibre Feb 24, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/11617.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add support for MSC3202: sending one-time key counts and fallback key usage states to Application Services.
16 changes: 16 additions & 0 deletions synapse/appservice/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@

logger = logging.getLogger(__name__)

# Type for the `device_one_time_key_counts` field in an appservice transaction
# user ID -> {device ID -> {algorithm -> count}}
TransactionOneTimeKeyCounts = Dict[str, Dict[str, Dict[str, int]]]

# Type for the `device_unused_fallback_keys` field in an appservice transaction
# user ID -> {device ID -> [algorithm]}
TransactionUnusedFallbackKeys = Dict[str, Dict[str, List[str]]]


class ApplicationServiceState(Enum):
DOWN = "down"
Expand Down Expand Up @@ -72,6 +80,7 @@ def __init__(
rate_limited: bool = True,
ip_range_whitelist: Optional[IPSet] = None,
supports_ephemeral: bool = False,
msc3202_transaction_extensions: bool = False,
):
self.token = token
self.url = (
Expand All @@ -84,6 +93,7 @@ def __init__(
self.id = id
self.ip_range_whitelist = ip_range_whitelist
self.supports_ephemeral = supports_ephemeral
self.msc3202_transaction_extensions = msc3202_transaction_extensions

if "|" in self.id:
raise Exception("application service ID cannot contain '|' character")
Expand Down Expand Up @@ -352,12 +362,16 @@ def __init__(
events: List[EventBase],
ephemeral: List[JsonDict],
to_device_messages: List[JsonDict],
one_time_key_counts: TransactionOneTimeKeyCounts,
unused_fallback_keys: TransactionUnusedFallbackKeys,
):
self.service = service
self.id = id
self.events = events
self.ephemeral = ephemeral
self.to_device_messages = to_device_messages
self.one_time_key_counts = one_time_key_counts
self.unused_fallback_keys = unused_fallback_keys

async def send(self, as_api: "ApplicationServiceApi") -> bool:
"""Sends this transaction using the provided AS API interface.
Expand All @@ -372,6 +386,8 @@ async def send(self, as_api: "ApplicationServiceApi") -> bool:
events=self.events,
ephemeral=self.ephemeral,
to_device_messages=self.to_device_messages,
one_time_key_counts=self.one_time_key_counts,
unused_fallback_keys=self.unused_fallback_keys,
txn_id=self.id,
)

Expand Down
20 changes: 18 additions & 2 deletions synapse/appservice/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@

from synapse.api.constants import EventTypes, Membership, ThirdPartyEntityKind
from synapse.api.errors import CodeMessageException
from synapse.appservice import (
ApplicationService,
TransactionOneTimeKeyCounts,
TransactionUnusedFallbackKeys,
)
from synapse.events import EventBase
from synapse.events.utils import serialize_event
from synapse.http.client import SimpleHttpClient
from synapse.types import JsonDict, ThirdPartyInstanceID
from synapse.util.caches.response_cache import ResponseCache

if TYPE_CHECKING:
from synapse.appservice import ApplicationService
from synapse.server import HomeServer

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -219,6 +223,8 @@ async def push_bulk(
events: List[EventBase],
ephemeral: List[JsonDict],
to_device_messages: List[JsonDict],
one_time_key_counts: TransactionOneTimeKeyCounts,
unused_fallback_keys: TransactionUnusedFallbackKeys,
txn_id: Optional[int] = None,
) -> bool:
"""
Expand Down Expand Up @@ -252,7 +258,7 @@ async def push_bulk(
uri = service.url + ("/transactions/%s" % urllib.parse.quote(str(txn_id)))

# Never send ephemeral events to appservices that do not support it
body: Dict[str, List[JsonDict]] = {"events": serialized_events}
body: JsonDict = {"events": serialized_events}
if service.supports_ephemeral:
body.update(
{
Expand All @@ -262,6 +268,16 @@ async def push_bulk(
}
)

if service.msc3202_transaction_extensions:
if one_time_key_counts:
body[
"org.matrix.msc3202.device_one_time_key_counts"
] = one_time_key_counts
if unused_fallback_keys:
body[
"org.matrix.msc3202.device_unused_fallback_keys"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having tried to play with this just now, I think this is wrong. The MSC states device_unused_fallback_key_types

] = unused_fallback_keys

try:
await self.put_json(
uri=uri,
Expand Down
106 changes: 102 additions & 4 deletions synapse/appservice/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,19 @@
Callable,
Collection,
Dict,
Iterable,
List,
Optional,
Set,
Tuple,
)

from synapse.appservice import ApplicationService, ApplicationServiceState
from synapse.appservice import (
ApplicationService,
ApplicationServiceState,
TransactionOneTimeKeyCounts,
TransactionUnusedFallbackKeys,
)
from synapse.appservice.api import ApplicationServiceApi
from synapse.events import EventBase
from synapse.logging.context import run_in_background
Expand Down Expand Up @@ -96,7 +103,7 @@ def __init__(self, hs: "HomeServer"):
self.as_api = hs.get_application_service_api()

self.txn_ctrl = _TransactionController(self.clock, self.store, self.as_api)
self.queuer = _ServiceQueuer(self.txn_ctrl, self.clock)
self.queuer = _ServiceQueuer(self.txn_ctrl, self.clock, hs)

async def start(self) -> None:
logger.info("Starting appservice scheduler")
Expand Down Expand Up @@ -153,7 +160,9 @@ class _ServiceQueuer:
appservice at a given time.
"""

def __init__(self, txn_ctrl: "_TransactionController", clock: Clock):
def __init__(
self, txn_ctrl: "_TransactionController", clock: Clock, hs: "HomeServer"
):
# dict of {service_id: [events]}
self.queued_events: Dict[str, List[EventBase]] = {}
# dict of {service_id: [events]}
Expand All @@ -165,6 +174,10 @@ def __init__(self, txn_ctrl: "_TransactionController", clock: Clock):
self.requests_in_flight: Set[str] = set()
self.txn_ctrl = txn_ctrl
self.clock = clock
self._msc3202_transaction_extensions_enabled: bool = (
hs.config.experimental.msc3202_transaction_extensions
)
self._store = hs.get_datastore()

def start_background_request(self, service: ApplicationService) -> None:
# start a sender for this appservice if we don't already have one
Expand Down Expand Up @@ -202,15 +215,92 @@ async def _send_request(self, service: ApplicationService) -> None:
if not events and not ephemeral and not to_device_messages_to_send:
return

one_time_key_counts: Optional[TransactionOneTimeKeyCounts] = None
unused_fallback_keys: Optional[TransactionUnusedFallbackKeys] = None

if (
self._msc3202_transaction_extensions_enabled
and service.msc3202_transaction_extensions
):
# Compute the one-time key counts and fallback key usage states
# for the users which are mentioned in this transaction,
# as well as the appservice's sender.
interesting_users = await self._determine_interesting_users_for_msc3202_otk_counts_and_fallback_keys(
service, events, ephemeral, to_device_messages_to_send
)
(
one_time_key_counts,
unused_fallback_keys,
) = await self._compute_msc3202_otk_counts_and_fallback_keys(
interesting_users
)

try:
await self.txn_ctrl.send(
service, events, ephemeral, to_device_messages_to_send
service,
events,
ephemeral,
to_device_messages_to_send,
one_time_key_counts,
unused_fallback_keys,
)
except Exception:
logger.exception("AS request failed")
finally:
self.requests_in_flight.discard(service.id)

async def _determine_interesting_users_for_msc3202_otk_counts_and_fallback_keys(
reivilibre marked this conversation as resolved.
Show resolved Hide resolved
self,
service: ApplicationService,
events: Iterable[EventBase],
ephemerals: Iterable[JsonDict],
to_device_messages: Iterable[JsonDict],
) -> Set[str]:
"""
Given a list of the events, ephemeral messages and to-device messages,
compute a list of application services users that may have interesting
updates to the one-time key counts or fallback key usage.
"""
interesting_users: Set[str] = set()

# The sender is always included
interesting_users.add(service.sender)

# All AS users that would receive the PDUs or EDUs sent to these rooms
# are classed as 'interesting'.
rooms_of_interesting_users: Set[str] = set()
# PDUs
rooms_of_interesting_users.update(event.room_id for event in events)
# EDUs
rooms_of_interesting_users.update(
ephemeral["room_id"] for ephemeral in ephemerals
)

# Look up the AS users in those rooms
for room_id in rooms_of_interesting_users:
interesting_users.update(
await self._store.get_app_service_users_in_room(room_id, service)
)

# Add recipients of to-device messages.
# device_message["user_id"] is the ID of the recipient.
interesting_users.update(
device_message["user_id"] for device_message in to_device_messages
)

return interesting_users

async def _compute_msc3202_otk_counts_and_fallback_keys(
self, users: Set[str]
) -> Tuple[TransactionOneTimeKeyCounts, TransactionUnusedFallbackKeys]:
"""
Given a list of application service users that are interesting,
compute one-time key counts and fallback key usages for the users.
"""
otk_counts = await self._store.count_bulk_e2e_one_time_keys_for_as(users)
unused_fbks = await self._store.get_e2e_bulk_unused_fallback_key_types(users)
return otk_counts, unused_fbks


class _TransactionController:
"""Transaction manager.
Expand Down Expand Up @@ -238,6 +328,8 @@ async def send(
events: List[EventBase],
ephemeral: Optional[List[JsonDict]] = None,
to_device_messages: Optional[List[JsonDict]] = None,
one_time_key_counts: Optional[TransactionOneTimeKeyCounts] = None,
unused_fallback_keys: Optional[TransactionUnusedFallbackKeys] = None,
) -> None:
"""
Create a transaction with the given data and send to the provided
Expand All @@ -248,13 +340,19 @@ async def send(
events: The persistent events to include in the transaction.
ephemeral: The ephemeral events to include in the transaction.
to_device_messages: The to-device messages to include in the transaction.
one_time_key_counts: Counts of remaining one-time keys for relevant
appservice devices in the transaction.
unused_fallback_keys: Lists of unused fallback keys for relevant
appservice devices in the transaction.
"""
try:
txn = await self.store.create_appservice_txn(
service=service,
events=events,
ephemeral=ephemeral or [],
to_device_messages=to_device_messages or [],
one_time_key_counts=one_time_key_counts or {},
unused_fallback_keys=unused_fallback_keys or {},
)
service_is_up = await self._is_service_up(service)
if service_is_up:
Expand Down
13 changes: 12 additions & 1 deletion synapse/config/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,16 @@ def _load_appservice(

supports_ephemeral = as_info.get("de.sorunome.msc2409.push_ephemeral", False)

# Opt-in flag for the MSC3202-specific transactional behaviour.
# When enabled, appservice transactions contain the following information:
# - device One-Time Key counts
# - device unused fallback key usage states
msc3202_transaction_extensions = as_info.get("org.matrix.msc3202", False)
if not isinstance(msc3202_transaction_extensions, bool):
raise ValueError(
"The `org.matrix.msc3202` option should be true or false if specified."
)

return ApplicationService(
token=as_info["as_token"],
hostname=hostname,
Expand All @@ -174,8 +184,9 @@ def _load_appservice(
hs_token=as_info["hs_token"],
sender=user_id,
id=as_info["id"],
supports_ephemeral=supports_ephemeral,
protocols=protocols,
rate_limited=rate_limited,
ip_range_whitelist=ip_range_whitelist,
supports_ephemeral=supports_ephemeral,
msc3202_transaction_extensions=msc3202_transaction_extensions,
)
6 changes: 6 additions & 0 deletions synapse/config/experimental.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,9 @@ def read_config(self, config: JsonDict, **kwargs):
self.msc2409_to_device_messages_enabled: bool = experimental.get(
"msc2409_to_device_messages_enabled", False
)

# Portion of MSC3202 related to transaction extensions:
# sending one-time key counts and fallback key usage to application services.
self.msc3202_transaction_extensions: bool = experimental.get(
"msc3202_transaction_extensions", False
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth having both options?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Happy to merge if you think that's best — I wasn't right sure because they're really quite separate features that could easily have been separate MSCs, but not fussed either way.

Loading