From 71a36f1ab40997d467eec6ce420be0da190b3b2c Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Thu, 17 Sep 2020 16:19:21 +0100
Subject: [PATCH 1/6] Add ResponseCache to incoming transactions path

---
 synapse/federation/federation_server.py | 27 +++++++++++++++++++++----
 1 file changed, 23 insertions(+), 4 deletions(-)

diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 218df884b02a..9836f50d2ac4 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -101,6 +101,11 @@ def __init__(self, hs):
         self._server_linearizer = Linearizer("fed_server")
         self._transaction_linearizer = Linearizer("fed_txn_handler")
 
+        # We cache results for transaction with the same ID
+        self._transaciton_resp_cache = ResponseCache(
+            hs, "fed_txn_handler", timeout_ms=30000
+        )
+
         self.transaction_actions = TransactionActions(self.store)
 
         self.registry = hs.get_federation_registry()
@@ -135,14 +140,28 @@ async def on_incoming_transaction(
         request_time = self._clock.time_msec()
 
         transaction = Transaction(**transaction_data)
+        transaction_id = transaction.transaction_id  # type: ignore
 
-        if not transaction.transaction_id:  # type: ignore
+        if not transaction_id:
             raise Exception("Transaction missing transaction_id")
 
-        logger.debug("[%s] Got transaction", transaction.transaction_id)  # type: ignore
+        logger.debug("[%s] Got transaction", transaction_id)
+
+        # We wrap in a ResponseCache so that we de-duplicate retried
+        # transactions.
+        return await self._transaciton_resp_cache.wrap(
+            (origin, transaction_id),
+            self._on_incoming_transaction_inner,
+            origin,
+            transaction,
+            request_time,
+        )
 
-        # use a linearizer to ensure that we don't process the same transaction
-        # multiple times in parallel.
+    async def _on_incoming_transaction_inner(
+        self, origin: str, transaction: Transaction, request_time: int
+    ) -> Tuple[int, Dict[str, Any]]:
+        # Use a linearizer to ensure that process transactions from a remote in
+        # order.
         with (
             await self._transaction_linearizer.queue(
                 (origin, transaction.transaction_id)  # type: ignore
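
The de-duplication above relies on Synapse's `ResponseCache`. The sketch below is only an illustration of the general pattern, not the real implementation: the class name is invented, and unlike the real cache (which holds results for `timeout_ms`, 30 seconds here) it only de-duplicates requests while the first one is still in flight.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Tuple


class ResponseCacheSketch:
    """Toy stand-in: calls that share a key also share a single result."""

    def __init__(self) -> None:
        self._in_flight: Dict[Tuple[str, str], asyncio.Task] = {}

    async def wrap(
        self,
        key: Tuple[str, str],
        callback: Callable[..., Awaitable[Any]],
        *args: Any,
    ) -> Any:
        task = self._in_flight.get(key)
        if task is None:
            # First request for this (origin, transaction_id): run the callback.
            task = asyncio.ensure_future(callback(*args))
            self._in_flight[key] = task
            task.add_done_callback(lambda _: self._in_flight.pop(key, None))
        # A retried transaction with the same key waits for, and reuses, the
        # result of the request that is already being processed.
        return await asyncio.shield(task)
```

Keying on `(origin, transaction_id)` means a remote server that retries a transaction (for example after a timeout) gets the original response back rather than having the transaction processed a second time.
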
From cc8a26d6034e17b63c7cf858873eac88bd686c48 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Thu, 17 Sep 2020 17:21:01 +0100
Subject: [PATCH 2/6] Move ratelimit of /send/ after queues

Otherwise we fill up the ratelimiter with blocked requests, which isn't
terribly helpful.
---
 synapse/federation/federation_server.py | 12 +++++++++---
 synapse/federation/transport/server.py  | 13 ++++++++-----
 synapse/server.py                       |  5 +++++
 3 files changed, 22 insertions(+), 8 deletions(-)

diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 9836f50d2ac4..041a224d3e35 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -97,6 +97,7 @@ def __init__(self, hs):
         self.state = hs.get_state_handler()
 
         self.device_handler = hs.get_device_handler()
+        self._federation_ratelimiter = hs.get_federation_ratelimiter()
 
         self._server_linearizer = Linearizer("fed_server")
         self._transaction_linearizer = Linearizer("fed_txn_handler")
@@ -167,9 +168,14 @@ async def _on_incoming_transaction_inner(
                 (origin, transaction.transaction_id)  # type: ignore
             )
         ):
-            result = await self._handle_incoming_transaction(
-                origin, transaction, request_time
-            )
+            # We rate limit here *after* we've queued up the incoming requests,
+            # so that we don't fill up the ratelimiter with blocked requests.
+            with self._federation_ratelimiter.ratelimit(origin) as d:
+                await d
+
+                result = await self._handle_incoming_transaction(
+                    origin, transaction, request_time
+                )
 
         return result
 
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 9325e0f857d1..cc7e9a973ba2 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -45,7 +45,6 @@
 )
 from synapse.server import HomeServer
 from synapse.types import ThirdPartyInstanceID, get_domain_from_id
-from synapse.util.ratelimitutils import FederationRateLimiter
 from synapse.util.versionstring import get_version_string
 
 logger = logging.getLogger(__name__)
@@ -72,9 +71,7 @@ def __init__(self, hs, servlet_groups=None):
         super(TransportLayerServer, self).__init__(hs, canonical_json=False)
 
         self.authenticator = Authenticator(hs)
-        self.ratelimiter = FederationRateLimiter(
-            self.clock, config=hs.config.rc_federation
-        )
+        self.ratelimiter = hs.get_federation_ratelimiter()
 
         self.register_servlets()
 
@@ -272,6 +269,8 @@ class BaseFederationServlet:
 
     PREFIX = FEDERATION_V1_PREFIX  # Allows specifying the API version
 
+    RATELIMIT = True  # Whether to rate limit requests or not
+
    def __init__(self, handler, authenticator, ratelimiter, server_name):
         self.handler = handler
         self.authenticator = authenticator
@@ -335,7 +334,7 @@ async def new_func(request, *args, **kwargs):
                 )
 
             with scope:
-                if origin:
+                if origin and self.RATELIMIT:
                     with ratelimiter.ratelimit(origin) as d:
                         await d
                         if request._disconnected:
@@ -372,6 +371,10 @@ def register(self, server):
 class FederationSendServlet(BaseFederationServlet):
     PATH = "/send/(?P<transaction_id>[^/]*)/?"
 
+    # We ratelimit manually in the handler as we queue up the requests and we
+    # don't want to fill up the ratelimiter with blocked requests.
+    RATELIMIT = False
+
     def __init__(self, handler, server_name, **kwargs):
         super(FederationSendServlet, self).__init__(
             handler, server_name=server_name, **kwargs
diff --git a/synapse/server.py b/synapse/server.py
index 9055b97ac317..5e3752c3334f 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -114,6 +114,7 @@
 from synapse.types import DomainSpecificString
 from synapse.util import Clock
 from synapse.util.distributor import Distributor
+from synapse.util.ratelimitutils import FederationRateLimiter
 from synapse.util.stringutils import random_string
 
 logger = logging.getLogger(__name__)
@@ -642,6 +643,10 @@ def get_replication_data_handler(self) -> ReplicationDataHandler:
 
     def get_replication_streams(self) -> Dict[str, Stream]:
         return {stream.NAME: stream(self) for stream in STREAMS_MAP.values()}
 
+    @cache_in_self
+    def get_federation_ratelimiter(self) -> FederationRateLimiter:
+        return FederationRateLimiter(self.clock, config=self.config.rc_federation)
+
     async def remove_pusher(self, app_id: str, push_key: str, user_id: str):
         return await self.get_pusherpool().remove_pusher(app_id, push_key, user_id)

From d34d7d76768ab98db4bec49ad46ca1114dbecbbf Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Thu, 17 Sep 2020 17:23:11 +0100
Subject: [PATCH 3/6] Newsfile

---
 changelog.d/8342.bugfix | 1 +
 1 file changed, 1 insertion(+)
 create mode 100644 changelog.d/8342.bugfix

diff --git a/changelog.d/8342.bugfix b/changelog.d/8342.bugfix
new file mode 100644
index 000000000000..786057facb44
--- /dev/null
+++ b/changelog.d/8342.bugfix
@@ -0,0 +1 @@
+Fix ratelimiting of federation `/send` requests.
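
Patch 2's transport-layer change works as a class-level opt-out: `BaseFederationServlet` rate limits every authenticated request by default, and `FederationSendServlet` sets `RATELIMIT = False` because `/send/` now applies the limiter itself after queuing. The sketch below shows only the shape of that pattern; the class names and the async context manager are invented for illustration and are not Synapse's real servlet machinery.

```python
from contextlib import asynccontextmanager
from typing import AsyncIterator


class FakeLimiter:
    """Stand-in for the federation ratelimiter, for illustration only."""

    @asynccontextmanager
    async def ratelimit(self, origin: str) -> AsyncIterator[None]:
        # A real limiter would make the caller wait here once `origin`
        # already has too many requests in flight.
        yield


class BaseServletSketch:
    RATELIMIT = True  # rate limit by default; subclasses may opt out

    def __init__(self, limiter: FakeLimiter) -> None:
        self._limiter = limiter

    async def handle(self, origin: str, body: str) -> str:
        if origin and self.RATELIMIT:
            async with self._limiter.ratelimit(origin):
                return await self.on_request(origin, body)
        return await self.on_request(origin, body)

    async def on_request(self, origin: str, body: str) -> str:
        raise NotImplementedError


class SendServletSketch(BaseServletSketch):
    # /send/ queues transactions and applies the limiter later itself, so it
    # opts out here; otherwise requests waiting in its queue would fill up
    # the limiter.
    RATELIMIT = False

    async def on_request(self, origin: str, body: str) -> str:
        return f"queued transaction from {origin}"
```

Keeping the flag on the servlet class keeps the decision next to the endpoint it affects, rather than special-casing `/send/` inside the shared request wrapper.
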
From a98f8cb708355faaf24cfc03decfdb8c2ccb5599 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Thu, 17 Sep 2020 18:30:55 +0100
Subject: [PATCH 4/6] Spelling

Co-authored-by: Patrick Cloke
---
 synapse/federation/federation_server.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 041a224d3e35..5e93af543687 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -103,7 +103,7 @@ def __init__(self, hs):
         self._transaction_linearizer = Linearizer("fed_txn_handler")
 
         # We cache results for transaction with the same ID
-        self._transaciton_resp_cache = ResponseCache(
+        self._transaction_resp_cache = ResponseCache(
             hs, "fed_txn_handler", timeout_ms=30000
         )
 
@@ -150,7 +150,7 @@ async def on_incoming_transaction(
 
         # We wrap in a ResponseCache so that we de-duplicate retried
         # transactions.
-        return await self._transaciton_resp_cache.wrap(
+        return await self._transaction_resp_cache.wrap(
             (origin, transaction_id),
             self._on_incoming_transaction_inner,
             origin,
@@ -161,8 +161,8 @@ async def on_incoming_transaction(
     async def _on_incoming_transaction_inner(
         self, origin: str, transaction: Transaction, request_time: int
     ) -> Tuple[int, Dict[str, Any]]:
-        # Use a linearizer to ensure that process transactions from a remote in
-        # order.
+        # Use a linearizer to ensure that transactions from a remote are
+        # processed in order.
         with (
             await self._transaction_linearizer.queue(
                 (origin, transaction.transaction_id)  # type: ignore

From 7323f7cc6a0029e59da90bba48d7a21f151b0639 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Thu, 17 Sep 2020 18:34:56 +0100
Subject: [PATCH 5/6] Only queue based on origin, not transaction ID

---
 synapse/federation/federation_server.py | 6 +-----
 1 file changed, 1 insertion(+), 5 deletions(-)

diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 5e93af543687..bed6233efba9 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -163,11 +163,7 @@ async def _on_incoming_transaction_inner(
     ) -> Tuple[int, Dict[str, Any]]:
         # Use a linearizer to ensure that transactions from a remote are
         # processed in order.
-        with (
-            await self._transaction_linearizer.queue(
-                (origin, transaction.transaction_id)  # type: ignore
-            )
-        ):
+        with await self._transaction_linearizer.queue(origin):
             # We rate limit here *after* we've queued up the incoming requests,
             # so that we don't fill up the ratelimiter with blocked requests.
             with self._federation_ratelimiter.ratelimit(origin) as d:
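
Patch 5 changes the linearizer key from `(origin, transaction_id)` to just `origin`. Keyed per origin, all transactions from a given remote server are handled one at a time in arrival order; the old key only prevented the same transaction from being processed twice in parallel, which the ResponseCache now covers anyway. The sketch below is a rough per-key linearizer with invented names; Synapse's real `Linearizer` additionally supports limited concurrency and cleans up idle keys.

```python
import asyncio
from collections import defaultdict
from typing import DefaultDict


class LinearizerSketch:
    """Toy per-key lock: work queued under the same key runs one item at a
    time, in arrival order; different keys do not block each other."""

    def __init__(self) -> None:
        self._locks: DefaultDict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

    def queue(self, key: str) -> asyncio.Lock:
        return self._locks[key]


async def handle_transaction(
    linearizer: LinearizerSketch, origin: str, transaction_id: str
) -> None:
    # Keying on `origin` alone serialises every transaction from that server,
    # which is what gives in-order processing of its transaction stream.
    async with linearizer.queue(origin):
        print(f"processing {transaction_id} from {origin}")
```
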
From 81f2b25f00f087bdbd49b3421784743cc637e52e Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Thu, 17 Sep 2020 18:36:36 +0100
Subject: [PATCH 6/6] Comment

---
 synapse/federation/federation_server.py | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index bed6233efba9..ff00f0b3022e 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -166,6 +166,13 @@ async def _on_incoming_transaction_inner(
         with await self._transaction_linearizer.queue(origin):
             # We rate limit here *after* we've queued up the incoming requests,
             # so that we don't fill up the ratelimiter with blocked requests.
+            #
+            # This is important as the ratelimiter allows N concurrent requests
+            # at a time, and only starts ratelimiting if there are more requests
+            # than that being processed at a time. If we queued up requests in
+            # the linearizer/response cache *after* the ratelimiting then those
+            # queued up requests would count as part of the allowed limit of N
+            # concurrent requests.
             with self._federation_ratelimiter.ratelimit(origin) as d:
                 await d
 
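
The comment added above is about concurrency accounting: the federation ratelimiter lets an origin have N requests actually being processed before it starts limiting, so requests that are merely parked in the linearizer or the response cache must not occupy those N slots. Below is a toy illustration of why the nesting order matters, using a plain lock and semaphore as stand-ins for the real queue and limiter (names invented).

```python
import asyncio


async def handle(queue_lock: asyncio.Lock, limiter: asyncio.Semaphore, origin: str) -> None:
    # Order used by the patch: queue first, rate limit second. A request
    # waiting on queue_lock holds none of the limiter's slots, so the limiter
    # only counts requests that are genuinely being processed.
    async with queue_lock:
        async with limiter:
            await asyncio.sleep(0.01)  # stand-in for processing the transaction
    # If the nesting were reversed, every queued request would sit inside the
    # limiter while waiting for the lock, eating into the allowance of N
    # concurrent requests and limiting traffic that should have been allowed.


async def main() -> None:
    queue_lock = asyncio.Lock()      # one transaction per origin at a time
    limiter = asyncio.Semaphore(3)   # "N concurrent requests" from the comment
    await asyncio.gather(*(handle(queue_lock, limiter, "remote.example") for _ in range(10)))


asyncio.run(main())
```
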