Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Federation outbound proxy (#15773)
Browse files Browse the repository at this point in the history
Allow configuring the set of workers to proxy outbound federation traffic through (`outbound_federation_restricted_to`).

This is useful when you have a worker setup with `federation_sender` instances responsible for sending outbound federation requests and want to make sure *all* outbound federation traffic goes through those instances. Before this change, the generic workers would still contact federation themselves for things like profile lookups, backfill, etc. This PR allows you to set more strict access controls/firewall for all workers and only allow the `federation_sender`'s to contact the outside world.

The original code is from @erikjohnston's branches which I've gotten in-shape to merge.
  • Loading branch information
MadLittleMods authored Jul 5, 2023
1 parent 561d06b commit b07b14b
Show file tree
Hide file tree
Showing 29 changed files with 890 additions and 90 deletions.
1 change: 1 addition & 0 deletions changelog.d/15773.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Allow configuring the set of workers to proxy outbound federation traffic through via `outbound_federation_restricted_to`.
31 changes: 24 additions & 7 deletions docs/usage/configuration/config_documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -3930,13 +3930,14 @@ federation_sender_instances:
---
### `instance_map`

When using workers this should be a map from [`worker_name`](#worker_name) to the
HTTP replication listener of the worker, if configured, and to the main process.
Each worker declared under [`stream_writers`](../../workers.md#stream-writers) needs
a HTTP replication listener, and that listener should be included in the `instance_map`.
The main process also needs an entry on the `instance_map`, and it should be listed under
`main` **if even one other worker exists**. Ensure the port matches with what is declared
inside the `listener` block for a `replication` listener.
When using workers this should be a map from [`worker_name`](#worker_name) to the HTTP
replication listener of the worker, if configured, and to the main process. Each worker
declared under [`stream_writers`](../../workers.md#stream-writers) and
[`outbound_federation_restricted_to`](#outbound_federation_restricted_to) needs a HTTP replication listener, and that
listener should be included in the `instance_map`. The main process also needs an entry
on the `instance_map`, and it should be listed under `main` **if even one other worker
exists**. Ensure the port matches with what is declared inside the `listener` block for
a `replication` listener.


Example configuration:
Expand Down Expand Up @@ -3966,6 +3967,22 @@ stream_writers:
typing: worker1
```
---
### `outbound_federation_restricted_to`

When using workers, you can restrict outbound federation traffic to only go through a
specific subset of workers. Any worker specified here must also be in the
[`instance_map`](#instance_map).

```yaml
outbound_federation_restricted_to:
- federation_sender1
- federation_sender2
```

Also see the [worker
documentation](../../workers.md#restrict-outbound-federation-traffic-to-a-specific-set-of-workers)
for more info.
---
### `run_background_tasks_on`

The [worker](../../workers.md#background-tasks) that is used to run
Expand Down
20 changes: 20 additions & 0 deletions docs/workers.md
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,26 @@ the stream writer for the `presence` stream:

^/_matrix/client/(api/v1|r0|v3|unstable)/presence/

#### Restrict outbound federation traffic to a specific set of workers

The `outbound_federation_restricted_to` configuration is useful to make sure outbound
federation traffic only goes through a specified subset of workers. This allows you to
set more strict access controls (like a firewall) for all workers and only allow the
`federation_sender`'s to contact the outside world.
```yaml
instance_map:
main:
host: localhost
port: 8030
federation_sender1:
host: localhost
port: 8034
outbound_federation_restricted_to:
- federation_sender1
```
#### Background tasks
There is also support for moving background tasks to a separate
Expand Down
2 changes: 2 additions & 0 deletions synapse/app/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ def listen_unix(


def listen_http(
hs: "HomeServer",
listener_config: ListenerConfig,
root_resource: Resource,
version_string: str,
Expand All @@ -406,6 +407,7 @@ def listen_http(
version_string,
max_request_body_size=max_request_body_size,
reactor=reactor,
federation_agent=hs.get_federation_http_client().agent,
)

if isinstance(listener_config, TCPListenerConfig):
Expand Down
1 change: 1 addition & 0 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ def _listen_http(self, listener_config: ListenerConfig) -> None:
root_resource = create_resource_tree(resources, OptionsResource())

_base.listen_http(
self,
listener_config,
root_resource,
self.version_string,
Expand Down
1 change: 1 addition & 0 deletions synapse/app/homeserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ def _listener_http(
root_resource = OptionsResource()

ports = listen_http(
self,
listener_config,
create_resource_tree(resources, root_resource),
self.version_string,
Expand Down
40 changes: 39 additions & 1 deletion synapse/config/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import argparse
import logging
from typing import Any, Dict, List, Union
from typing import Any, Dict, List, Optional, Union

import attr
from pydantic import BaseModel, Extra, StrictBool, StrictInt, StrictStr
Expand Down Expand Up @@ -148,6 +148,27 @@ class WriterLocations:
)


@attr.s(auto_attribs=True)
class OutboundFederationRestrictedTo:
"""Whether we limit outbound federation to a certain set of instances.
Attributes:
instances: optional list of instances that can make outbound federation
requests. If None then all instances can make federation requests.
locations: list of instance locations to connect to proxy via.
"""

instances: Optional[List[str]]
locations: List[InstanceLocationConfig] = attr.Factory(list)

def __contains__(self, instance: str) -> bool:
# It feels a bit dirty to return `True` if `instances` is `None`, but it makes
# sense in downstream usage in the sense that if
# `outbound_federation_restricted_to` is not configured, then any instance can
# talk to federation (no restrictions so always return `True`).
return self.instances is None or instance in self.instances


class WorkerConfig(Config):
"""The workers are processes run separately to the main synapse process.
They have their own pid_file and listener configuration. They use the
Expand Down Expand Up @@ -357,6 +378,23 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
new_option_name="update_user_directory_from_worker",
)

outbound_federation_restricted_to = config.get(
"outbound_federation_restricted_to", None
)
self.outbound_federation_restricted_to = OutboundFederationRestrictedTo(
outbound_federation_restricted_to
)
if outbound_federation_restricted_to:
for instance in outbound_federation_restricted_to:
if instance not in self.instance_map:
raise ConfigError(
"Instance %r is configured in 'outbound_federation_restricted_to' but does not appear in `instance_map` config."
% (instance,)
)
self.outbound_federation_restricted_to.locations.append(
self.instance_map[instance]
)

def _should_this_worker_perform_duty(
self,
config: Dict[str, Any],
Expand Down
7 changes: 6 additions & 1 deletion synapse/http/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1037,7 +1037,12 @@ def connectionLost(self, reason: Failure = connectionDone) -> None:
if reason.check(ResponseDone):
self.deferred.callback(self.length)
elif reason.check(PotentialDataLoss):
# stolen from https://github.com/twisted/treq/pull/49/files
# This applies to requests which don't set `Content-Length` or a
# `Transfer-Encoding` in the response because in this case the end of the
# response is indicated by the connection being closed, an event which may
# also be due to a transient network problem or other error. But since this
# behavior is expected of some servers (like YouTube), let's ignore it.
# Stolen from https://github.com/twisted/treq/pull/49/files
# http://twistedmatrix.com/trac/ticket/4840
self.deferred.callback(self.length)
else:
Expand Down
132 changes: 122 additions & 10 deletions synapse/http/matrixfederationclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
from twisted.internet.task import Cooperator
from twisted.web.client import ResponseFailed
from twisted.web.http_headers import Headers
from twisted.web.iweb import IBodyProducer, IResponse
from twisted.web.iweb import IAgent, IBodyProducer, IResponse

import synapse.metrics
import synapse.util.retryutils
Expand All @@ -72,6 +72,7 @@
read_body_with_max_size,
)
from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
from synapse.http.proxyagent import ProxyAgent
from synapse.http.types import QueryParams
from synapse.logging import opentracing
from synapse.logging.context import make_deferred_yieldable, run_in_background
Expand Down Expand Up @@ -393,17 +394,32 @@ def __init__(
if hs.config.server.user_agent_suffix:
user_agent = "%s %s" % (user_agent, hs.config.server.user_agent_suffix)

federation_agent = MatrixFederationAgent(
self.reactor,
tls_client_options_factory,
user_agent.encode("ascii"),
hs.config.server.federation_ip_range_allowlist,
hs.config.server.federation_ip_range_blocklist,
outbound_federation_restricted_to = (
hs.config.worker.outbound_federation_restricted_to
)
if hs.get_instance_name() in outbound_federation_restricted_to:
# Talk to federation directly
federation_agent: IAgent = MatrixFederationAgent(
self.reactor,
tls_client_options_factory,
user_agent.encode("ascii"),
hs.config.server.federation_ip_range_allowlist,
hs.config.server.federation_ip_range_blocklist,
)
else:
# We need to talk to federation via the proxy via one of the configured
# locations
federation_proxies = outbound_federation_restricted_to.locations
federation_agent = ProxyAgent(
self.reactor,
self.reactor,
tls_client_options_factory,
federation_proxies=federation_proxies,
)

# Use a BlocklistingAgentWrapper to prevent circumventing the IP
# blocking via IP literals in server names
self.agent = BlocklistingAgentWrapper(
self.agent: IAgent = BlocklistingAgentWrapper(
federation_agent,
ip_blocklist=hs.config.server.federation_ip_range_blocklist,
)
Expand All @@ -412,7 +428,6 @@ def __init__(
self._store = hs.get_datastores().main
self.version_string_bytes = hs.version_string.encode("ascii")
self.default_timeout_seconds = hs.config.federation.client_timeout_ms / 1000

self.max_long_retry_delay_seconds = (
hs.config.federation.max_long_retry_delay_ms / 1000
)
Expand Down Expand Up @@ -1131,6 +1146,101 @@ async def get_json(
Succeeds when we get a 2xx HTTP response. The
result will be the decoded JSON body.
Raises:
HttpResponseException: If we get an HTTP response code >= 300
(except 429).
NotRetryingDestination: If we are not yet ready to retry this
server.
FederationDeniedError: If this destination is not on our
federation whitelist
RequestSendFailed: If there were problems connecting to the
remote, due to e.g. DNS failures, connection timeouts etc.
"""
json_dict, _ = await self.get_json_with_headers(
destination=destination,
path=path,
args=args,
retry_on_dns_fail=retry_on_dns_fail,
timeout=timeout,
ignore_backoff=ignore_backoff,
try_trailing_slash_on_400=try_trailing_slash_on_400,
parser=parser,
)
return json_dict

@overload
async def get_json_with_headers(
self,
destination: str,
path: str,
args: Optional[QueryParams] = None,
retry_on_dns_fail: bool = True,
timeout: Optional[int] = None,
ignore_backoff: bool = False,
try_trailing_slash_on_400: bool = False,
parser: Literal[None] = None,
) -> Tuple[JsonDict, Dict[bytes, List[bytes]]]:
...

@overload
async def get_json_with_headers(
self,
destination: str,
path: str,
args: Optional[QueryParams] = ...,
retry_on_dns_fail: bool = ...,
timeout: Optional[int] = ...,
ignore_backoff: bool = ...,
try_trailing_slash_on_400: bool = ...,
parser: ByteParser[T] = ...,
) -> Tuple[T, Dict[bytes, List[bytes]]]:
...

async def get_json_with_headers(
self,
destination: str,
path: str,
args: Optional[QueryParams] = None,
retry_on_dns_fail: bool = True,
timeout: Optional[int] = None,
ignore_backoff: bool = False,
try_trailing_slash_on_400: bool = False,
parser: Optional[ByteParser[T]] = None,
) -> Tuple[Union[JsonDict, T], Dict[bytes, List[bytes]]]:
"""GETs some json from the given host homeserver and path
Args:
destination: The remote server to send the HTTP request to.
path: The HTTP path.
args: A dictionary used to create query strings, defaults to
None.
retry_on_dns_fail: true if the request should be retried on DNS failures
timeout: number of milliseconds to wait for the response.
self._default_timeout (60s) by default.
Note that we may make several attempts to send the request; this
timeout applies to the time spent waiting for response headers for
*each* attempt (including connection time) as well as the time spent
reading the response body after a 200 response.
ignore_backoff: true to ignore the historical backoff data
and try the request anyway.
try_trailing_slash_on_400: True if on a 400 M_UNRECOGNIZED
response we should try appending a trailing slash to the end of
the request. Workaround for #3622 in Synapse <= v0.99.3.
parser: The parser to use to decode the response. Defaults to
parsing as JSON.
Returns:
Succeeds when we get a 2xx HTTP response. The result will be a tuple of the
decoded JSON body and a dict of the response headers.
Raises:
HttpResponseException: If we get an HTTP response code >= 300
(except 429).
Expand All @@ -1156,6 +1266,8 @@ async def get_json(
timeout=timeout,
)

headers = dict(response.headers.getAllRawHeaders())

if timeout is not None:
_sec_timeout = timeout / 1000
else:
Expand All @@ -1173,7 +1285,7 @@ async def get_json(
parser=parser,
)

return body
return body, headers

async def delete_json(
self,
Expand Down
Loading

0 comments on commit b07b14b

Please sign in to comment.