Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge branch 'release-v1.45' of github.com:matrix-org/synapse into ma…
Browse files Browse the repository at this point in the history
…trix-org-hotfixes
  • Loading branch information
David Robertson committed Oct 13, 2021
2 parents 2aacb49 + b83e822 commit 27e6e45
Show file tree
Hide file tree
Showing 15 changed files with 923 additions and 94 deletions.
1 change: 1 addition & 0 deletions changelog.d/10825.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add an 'approximate difference' method to `StateFilter`.
1 change: 1 addition & 0 deletions changelog.d/10970.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix inconsistent behavior of `get_last_client_by_ip` when reporting data that has not been stored in the database yet.
1 change: 1 addition & 0 deletions changelog.d/10996.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug introduced in Synapse 1.21.0 that causes opentracing and Prometheus metrics for replication requests to be measured incorrectly.
2 changes: 2 additions & 0 deletions changelog.d/11053.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Fix a bug introduced in Synapse 1.45.0rc1 where the user directory would stop updating if it processed an event from a
user not in the `users` table.
1 change: 1 addition & 0 deletions changelog.d/11061.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug introduced in Synapse v1.44.0 when logging errors during oEmbed processing.
8 changes: 8 additions & 0 deletions synapse/logging/opentracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -807,6 +807,14 @@ def err_back(result):
result.addCallbacks(call_back, err_back)

else:
if inspect.isawaitable(result):
logger.error(
"@trace may not have wrapped %s correctly! "
"The function is not async but returned a %s.",
func.__qualname__,
type(result).__name__,
)

scope.__exit__(None, None, None)

return result
Expand Down
154 changes: 78 additions & 76 deletions synapse/replication/http/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,85 +182,87 @@ def make_client(cls, hs):
)

@trace(opname="outgoing_replication_request")
@outgoing_gauge.track_inprogress()
async def send_request(*, instance_name="master", **kwargs):
if instance_name == local_instance_name:
raise Exception("Trying to send HTTP request to self")
if instance_name == "master":
host = master_host
port = master_port
elif instance_name in instance_map:
host = instance_map[instance_name].host
port = instance_map[instance_name].port
else:
raise Exception(
"Instance %r not in 'instance_map' config" % (instance_name,)
with outgoing_gauge.track_inprogress():
if instance_name == local_instance_name:
raise Exception("Trying to send HTTP request to self")
if instance_name == "master":
host = master_host
port = master_port
elif instance_name in instance_map:
host = instance_map[instance_name].host
port = instance_map[instance_name].port
else:
raise Exception(
"Instance %r not in 'instance_map' config" % (instance_name,)
)

data = await cls._serialize_payload(**kwargs)

url_args = [
urllib.parse.quote(kwargs[name], safe="") for name in cls.PATH_ARGS
]

if cls.CACHE:
txn_id = random_string(10)
url_args.append(txn_id)

if cls.METHOD == "POST":
request_func = client.post_json_get_json
elif cls.METHOD == "PUT":
request_func = client.put_json
elif cls.METHOD == "GET":
request_func = client.get_json
else:
# We have already asserted in the constructor that a
# compatible was picked, but lets be paranoid.
raise Exception(
"Unknown METHOD on %s replication endpoint" % (cls.NAME,)
)

uri = "http://%s:%s/_synapse/replication/%s/%s" % (
host,
port,
cls.NAME,
"/".join(url_args),
)

data = await cls._serialize_payload(**kwargs)

url_args = [
urllib.parse.quote(kwargs[name], safe="") for name in cls.PATH_ARGS
]

if cls.CACHE:
txn_id = random_string(10)
url_args.append(txn_id)

if cls.METHOD == "POST":
request_func = client.post_json_get_json
elif cls.METHOD == "PUT":
request_func = client.put_json
elif cls.METHOD == "GET":
request_func = client.get_json
else:
# We have already asserted in the constructor that a
# compatible was picked, but lets be paranoid.
raise Exception(
"Unknown METHOD on %s replication endpoint" % (cls.NAME,)
)

uri = "http://%s:%s/_synapse/replication/%s/%s" % (
host,
port,
cls.NAME,
"/".join(url_args),
)

try:
# We keep retrying the same request for timeouts. This is so that we
# have a good idea that the request has either succeeded or failed on
# the master, and so whether we should clean up or not.
while True:
headers: Dict[bytes, List[bytes]] = {}
# Add an authorization header, if configured.
if replication_secret:
headers[b"Authorization"] = [b"Bearer " + replication_secret]
opentracing.inject_header_dict(headers, check_destination=False)
try:
result = await request_func(uri, data, headers=headers)
break
except RequestTimedOutError:
if not cls.RETRY_ON_TIMEOUT:
raise

logger.warning("%s request timed out; retrying", cls.NAME)

# If we timed out we probably don't need to worry about backing
# off too much, but lets just wait a little anyway.
await clock.sleep(1)
except HttpResponseException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the main process that we should send to the client. (And
# importantly, not stack traces everywhere)
_outgoing_request_counter.labels(cls.NAME, e.code).inc()
raise e.to_synapse_error()
except Exception as e:
_outgoing_request_counter.labels(cls.NAME, "ERR").inc()
raise SynapseError(502, "Failed to talk to main process") from e

_outgoing_request_counter.labels(cls.NAME, 200).inc()
return result
try:
# We keep retrying the same request for timeouts. This is so that we
# have a good idea that the request has either succeeded or failed
# on the master, and so whether we should clean up or not.
while True:
headers: Dict[bytes, List[bytes]] = {}
# Add an authorization header, if configured.
if replication_secret:
headers[b"Authorization"] = [
b"Bearer " + replication_secret
]
opentracing.inject_header_dict(headers, check_destination=False)
try:
result = await request_func(uri, data, headers=headers)
break
except RequestTimedOutError:
if not cls.RETRY_ON_TIMEOUT:
raise

logger.warning("%s request timed out; retrying", cls.NAME)

# If we timed out we probably don't need to worry about backing
# off too much, but lets just wait a little anyway.
await clock.sleep(1)
except HttpResponseException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the main process that we should send to the client. (And
# importantly, not stack traces everywhere)
_outgoing_request_counter.labels(cls.NAME, e.code).inc()
raise e.to_synapse_error()
except Exception as e:
_outgoing_request_counter.labels(cls.NAME, "ERR").inc()
raise SynapseError(502, "Failed to talk to main process") from e

_outgoing_request_counter.labels(cls.NAME, 200).inc()
return result

return send_request

Expand Down
2 changes: 1 addition & 1 deletion synapse/rest/media/v1/oembed.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ def parse_oembed_response(self, url: str, raw_body: bytes) -> OEmbedResult:

except Exception as e:
# Trap any exception and let the code follow as usual.
logger.warning(f"Error parsing oEmbed metadata from {url}: {e:r}")
logger.warning("Error parsing oEmbed metadata from %s: %r", url, e)
open_graph_response = {}
cache_age = None

Expand Down
13 changes: 9 additions & 4 deletions synapse/storage/databases/main/client_ips.py
Original file line number Diff line number Diff line change
Expand Up @@ -538,15 +538,20 @@ async def get_last_client_ip_by_device(
"""
ret = await super().get_last_client_ip_by_device(user_id, device_id)

# Update what is retrieved from the database with data which is pending insertion.
# Update what is retrieved from the database with data which is pending
# insertion, as if it has already been stored in the database.
for key in self._batch_row_update:
uid, access_token, ip = key
uid, _access_token, ip = key
if uid == user_id:
user_agent, did, last_seen = self._batch_row_update[key]

if did is None:
# These updates don't make it to the `devices` table
continue

if not device_id or did == device_id:
ret[(user_id, device_id)] = {
ret[(user_id, did)] = {
"user_id": user_id,
"access_token": access_token,
"ip": ip,
"user_agent": user_agent,
"device_id": did,
Expand Down
24 changes: 22 additions & 2 deletions synapse/storage/databases/main/user_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
cast,
)

from synapse.api.errors import StoreError

if TYPE_CHECKING:
from synapse.server import HomeServer

Expand Down Expand Up @@ -383,7 +385,19 @@ async def should_include_local_user_in_dir(self, user: str) -> bool:
"""Certain classes of local user are omitted from the user directory.
Is this user one of them?
"""
# App service users aren't usually contactable, so exclude them.
# We're opting to exclude the appservice sender (user defined by the
# `sender_localpart` in the appservice registration) even though
# technically it could be DM-able. In the future, this could potentially
# be configurable per-appservice whether the appservice sender can be
# contacted.
if self.get_app_service_by_user_id(user) is not None:
return False

# We're opting to exclude appservice users (anyone matching the user
# namespace regex in the appservice registration) even though technically
# they could be DM-able. In the future, this could potentially
# be configurable per-appservice whether the appservice users can be
# contacted.
if self.get_if_app_services_interested_in_user(user):
# TODO we might want to make this configurable for each app service
return False
Expand All @@ -393,8 +407,14 @@ async def should_include_local_user_in_dir(self, user: str) -> bool:
return False

# Deactivated users aren't contactable, so should not appear in the user directory.
if await self.get_user_deactivated_status(user):
try:
if await self.get_user_deactivated_status(user):
return False
except StoreError:
# No such user in the users table. No need to do this when calling
# is_support_user---that returns False if the user is missing.
return False

return True

async def is_room_world_readable_or_publicly_joinable(self, room_id: str) -> bool:
Expand Down
Loading

0 comments on commit 27e6e45

Please sign in to comment.