Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Convert the well known resolver to async #8214

Merged
merged 4 commits into from
Sep 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8214.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Convert various parts of the codebase to async/await.
1 change: 1 addition & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ files =
synapse/handlers/saml_handler.py,
synapse/handlers/sync.py,
synapse/handlers/ui_auth,
synapse/http/federation/well_known_resolver.py,
synapse/http/server.py,
synapse/http/site.py,
synapse/logging/,
Expand Down
4 changes: 2 additions & 2 deletions synapse/http/federation/matrix_federation_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ def request(self, method, uri, headers=None, bodyProducer=None):
and not _is_ip_literal(parsed_uri.hostname)
and not parsed_uri.port
):
well_known_result = yield self._well_known_resolver.get_well_known(
parsed_uri.hostname
well_known_result = yield defer.ensureDeferred(
self._well_known_resolver.get_well_known(parsed_uri.hostname)
)
delegated_server = well_known_result.delegated_server

Expand Down
57 changes: 31 additions & 26 deletions synapse/http/federation/well_known_resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
import logging
import random
import time
from typing import Callable, Dict, Optional, Tuple

import attr

from twisted.internet import defer
from twisted.web.client import RedirectAgent, readBody
from twisted.web.http import stringToDatetime
from twisted.web.http_headers import Headers
from twisted.web.iweb import IResponse

from synapse.logging.context import make_deferred_yieldable
from synapse.util import Clock, json_decoder
Expand Down Expand Up @@ -99,15 +101,14 @@ def __init__(
self._well_known_agent = RedirectAgent(agent)
self.user_agent = user_agent

@defer.inlineCallbacks
def get_well_known(self, server_name):
async def get_well_known(self, server_name: bytes) -> WellKnownLookupResult:
"""Attempt to fetch and parse a .well-known file for the given server

Args:
server_name (bytes): name of the server, from the requested url
server_name: name of the server, from the requested url

Returns:
Deferred[WellKnownLookupResult]: The result of the lookup
The result of the lookup
"""
try:
prev_result, expiry, ttl = self._well_known_cache.get_with_expiry(
Expand All @@ -124,7 +125,9 @@ def get_well_known(self, server_name):
# requests for the same server in parallel?
try:
with Measure(self._clock, "get_well_known"):
result, cache_period = yield self._fetch_well_known(server_name)
result, cache_period = await self._fetch_well_known(
server_name
) # type: Tuple[Optional[bytes], float]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How come this is Optional[bytes], whereas _fetch_well_known supposedly returns bytes?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is because result can be set to None below, so without it you get:

synapse/http/federation/well_known_resolver.py:139: error: Incompatible types in assignment (expression has type "None", variable has type "bytes") [assignment]

The type comment doesn't describe the return type of _fetch_well_known; it declares the types of the local variables result and cache_period.


except _FetchWellKnownFailure as e:
if prev_result and e.temporary:
Expand Down Expand Up @@ -153,26 +156,25 @@ def get_well_known(self, server_name):

return WellKnownLookupResult(delegated_server=result)

@defer.inlineCallbacks
def _fetch_well_known(self, server_name):
async def _fetch_well_known(self, server_name: bytes) -> Tuple[bytes, float]:
"""Actually fetch and parse a .well-known, without checking the cache

Args:
server_name (bytes): name of the server, from the requested url
server_name: name of the server, from the requested url

Raises:
_FetchWellKnownFailure if we fail to lookup a result

Returns:
Deferred[Tuple[bytes,int]]: The lookup result and cache period.
The lookup result and cache period.
"""

had_valid_well_known = self._had_valid_well_known_cache.get(server_name, False)

# We do this in two steps to differentiate between possibly transient
# errors (e.g. can't connect to host, 503 response) and more permanent
# errors (such as getting a 404 response).
response, body = yield self._make_well_known_request(
response, body = await self._make_well_known_request(
server_name, retry=had_valid_well_known
)

Expand Down Expand Up @@ -215,20 +217,20 @@ def _fetch_well_known(self, server_name):

return result, cache_period

@defer.inlineCallbacks
def _make_well_known_request(self, server_name, retry):
async def _make_well_known_request(
self, server_name: bytes, retry: bool
) -> Tuple[IResponse, bytes]:
"""Make the well known request.

This will retry the request if requested and it fails (with unable
to connect or receives a 5xx error).

Args:
server_name (bytes)
retry (bool): Whether to retry the request if it fails.
server_name: name of the server, from the requested url
retry: Whether to retry the request if it fails.

Returns:
Deferred[tuple[IResponse, bytes]] Returns the response object and
body. Response may be a non-200 response.
Returns the response object and body. Response may be a non-200 response.
"""
uri = b"https://%s/.well-known/matrix/server" % (server_name,)
uri_str = uri.decode("ascii")
Expand All @@ -243,12 +245,12 @@ def _make_well_known_request(self, server_name, retry):

logger.info("Fetching %s", uri_str)
try:
response = yield make_deferred_yieldable(
response = await make_deferred_yieldable(
self._well_known_agent.request(
b"GET", uri, headers=Headers(headers)
)
)
body = yield make_deferred_yieldable(readBody(response))
body = await make_deferred_yieldable(readBody(response))

if 500 <= response.code < 600:
raise Exception("Non-200 response %s" % (response.code,))
Expand All @@ -265,21 +267,24 @@ def _make_well_known_request(self, server_name, retry):
logger.info("Error fetching %s: %s. Retrying", uri_str, e)

# Sleep briefly in the hopes that they come back up
yield self._clock.sleep(0.5)
await self._clock.sleep(0.5)


def _cache_period_from_headers(headers, time_now=time.time):
def _cache_period_from_headers(
headers: Headers, time_now: Callable[[], float] = time.time
) -> Optional[float]:
cache_controls = _parse_cache_control(headers)

if b"no-store" in cache_controls:
return 0

if b"max-age" in cache_controls:
try:
max_age = int(cache_controls[b"max-age"])
return max_age
except ValueError:
pass
max_age = cache_controls[b"max-age"]
if max_age:
Comment on lines +282 to +283
Copy link
Member Author

@clokep clokep Aug 31, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This extra check was to make mypy happy that we weren't doing int(None). This case was actually not being handled properly anyway since it throws a TypeError, not a ValueError.

try:
return int(max_age)
except ValueError:
pass

expires = headers.getRawHeaders(b"expires")
if expires is not None:
Expand All @@ -295,7 +300,7 @@ def _cache_period_from_headers(headers, time_now=time.time):
return None


def _parse_cache_control(headers):
def _parse_cache_control(headers: Headers) -> Dict[bytes, Optional[bytes]]:
cache_controls = {}
for hdr in headers.getRawHeaders(b"cache-control", []):
for directive in hdr.split(b","):
Expand Down
24 changes: 18 additions & 6 deletions tests/http/federation/test_matrix_federation_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -972,7 +972,9 @@ def test_idna_srv_target(self):
def test_well_known_cache(self):
self.reactor.lookups["testserv"] = "1.2.3.4"

fetch_d = self.well_known_resolver.get_well_known(b"testserv")
fetch_d = defer.ensureDeferred(
self.well_known_resolver.get_well_known(b"testserv")
)

# there should be an attempt to connect on port 443 for the .well-known
clients = self.reactor.tcpClients
Expand All @@ -995,15 +997,19 @@ def test_well_known_cache(self):
well_known_server.loseConnection()

# repeat the request: it should hit the cache
fetch_d = self.well_known_resolver.get_well_known(b"testserv")
fetch_d = defer.ensureDeferred(
self.well_known_resolver.get_well_known(b"testserv")
)
r = self.successResultOf(fetch_d)
self.assertEqual(r.delegated_server, b"target-server")

# expire the cache
self.reactor.pump((1000.0,))

# now it should connect again
fetch_d = self.well_known_resolver.get_well_known(b"testserv")
fetch_d = defer.ensureDeferred(
self.well_known_resolver.get_well_known(b"testserv")
)

self.assertEqual(len(clients), 1)
(host, port, client_factory, _timeout, _bindAddress) = clients.pop(0)
Expand All @@ -1026,7 +1032,9 @@ def test_well_known_cache_with_temp_failure(self):

self.reactor.lookups["testserv"] = "1.2.3.4"

fetch_d = self.well_known_resolver.get_well_known(b"testserv")
fetch_d = defer.ensureDeferred(
self.well_known_resolver.get_well_known(b"testserv")
)

# there should be an attempt to connect on port 443 for the .well-known
clients = self.reactor.tcpClients
Expand All @@ -1052,7 +1060,9 @@ def test_well_known_cache_with_temp_failure(self):
# another lookup.
self.reactor.pump((900.0,))

fetch_d = self.well_known_resolver.get_well_known(b"testserv")
fetch_d = defer.ensureDeferred(
self.well_known_resolver.get_well_known(b"testserv")
)

# The resolver may retry a few times, so fail all requests that come along
attempts = 0
Expand Down Expand Up @@ -1082,7 +1092,9 @@ def test_well_known_cache_with_temp_failure(self):
self.reactor.pump((10000.0,))

# Repeat the request; this time it should fail if the lookup fails.
fetch_d = self.well_known_resolver.get_well_known(b"testserv")
fetch_d = defer.ensureDeferred(
self.well_known_resolver.get_well_known(b"testserv")
)

clients = self.reactor.tcpClients
(host, port, client_factory, _timeout, _bindAddress) = clients.pop(0)
Expand Down