Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Port replication http server endpoints to async/await #6274

Merged
merged 3 commits into from
Oct 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/6274.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Port replication http server endpoints to async/await.
6 changes: 3 additions & 3 deletions synapse/replication/http/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,14 @@ def _serialize_payload(**kwargs):
return {}

@abc.abstractmethod
def _handle_request(self, request, **kwargs):
async def _handle_request(self, request, **kwargs):
"""Handle incoming request.

This is called with the request object and PATH_ARGS.

Returns:
Deferred[dict]: A JSON serialisable dict to be used as response
body of request.
tuple[int, dict]: HTTP status code and a JSON serialisable dict
to be used as response body of request.
"""
pass

Expand Down
24 changes: 9 additions & 15 deletions synapse/replication/http/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,7 @@ def _serialize_payload(store, event_and_contexts, backfilled):

return payload

@defer.inlineCallbacks
def _handle_request(self, request):
async def _handle_request(self, request):
with Measure(self.clock, "repl_fed_send_events_parse"):
content = parse_json_object_from_request(request)

Expand All @@ -101,15 +100,13 @@ def _handle_request(self, request):
EventType = event_type_from_format_version(format_ver)
event = EventType(event_dict, internal_metadata, rejected_reason)

context = yield EventContext.deserialize(
self.store, event_payload["context"]
)
context = EventContext.deserialize(self.store, event_payload["context"])

event_and_contexts.append((event, context))

logger.info("Got %d events from federation", len(event_and_contexts))

yield self.federation_handler.persist_events_and_notify(
await self.federation_handler.persist_events_and_notify(
event_and_contexts, backfilled
)

Expand Down Expand Up @@ -144,8 +141,7 @@ def __init__(self, hs):
def _serialize_payload(edu_type, origin, content):
return {"origin": origin, "content": content}

@defer.inlineCallbacks
def _handle_request(self, request, edu_type):
async def _handle_request(self, request, edu_type):
with Measure(self.clock, "repl_fed_send_edu_parse"):
content = parse_json_object_from_request(request)

Expand All @@ -154,7 +150,7 @@ def _handle_request(self, request, edu_type):

logger.info("Got %r edu from %s", edu_type, origin)

result = yield self.registry.on_edu(edu_type, origin, edu_content)
result = await self.registry.on_edu(edu_type, origin, edu_content)

return 200, result

Expand Down Expand Up @@ -193,16 +189,15 @@ def _serialize_payload(query_type, args):
"""
return {"args": args}

@defer.inlineCallbacks
def _handle_request(self, request, query_type):
async def _handle_request(self, request, query_type):
with Measure(self.clock, "repl_fed_query_parse"):
content = parse_json_object_from_request(request)

args = content["args"]

logger.info("Got %r query", query_type)

result = yield self.registry.on_query(query_type, args)
result = await self.registry.on_query(query_type, args)

return 200, result

Expand Down Expand Up @@ -234,9 +229,8 @@ def _serialize_payload(room_id, args):
"""
return {}

@defer.inlineCallbacks
def _handle_request(self, request, room_id):
yield self.store.clean_room_for_join(room_id)
async def _handle_request(self, request, room_id):
await self.store.clean_room_for_join(room_id)

return 200, {}

Expand Down
7 changes: 2 additions & 5 deletions synapse/replication/http/login.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@

import logging

from twisted.internet import defer

from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint

Expand Down Expand Up @@ -52,15 +50,14 @@ def _serialize_payload(user_id, device_id, initial_display_name, is_guest):
"is_guest": is_guest,
}

@defer.inlineCallbacks
def _handle_request(self, request, user_id):
async def _handle_request(self, request, user_id):
content = parse_json_object_from_request(request)

device_id = content["device_id"]
initial_display_name = content["initial_display_name"]
is_guest = content["is_guest"]

device_id, access_token = yield self.registration_handler.register_device(
device_id, access_token = await self.registration_handler.register_device(
user_id, device_id, initial_display_name, is_guest
)

Expand Down
14 changes: 5 additions & 9 deletions synapse/replication/http/membership.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@

import logging

from twisted.internet import defer

from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint
from synapse.types import Requester, UserID
Expand Down Expand Up @@ -65,8 +63,7 @@ def _serialize_payload(requester, room_id, user_id, remote_room_hosts, content):
"content": content,
}

@defer.inlineCallbacks
def _handle_request(self, request, room_id, user_id):
async def _handle_request(self, request, room_id, user_id):
content = parse_json_object_from_request(request)

remote_room_hosts = content["remote_room_hosts"]
Expand All @@ -79,7 +76,7 @@ def _handle_request(self, request, room_id, user_id):

logger.info("remote_join: %s into room: %s", user_id, room_id)

yield self.federation_handler.do_invite_join(
await self.federation_handler.do_invite_join(
remote_room_hosts, room_id, user_id, event_content
)

Expand Down Expand Up @@ -123,8 +120,7 @@ def _serialize_payload(requester, room_id, user_id, remote_room_hosts):
"remote_room_hosts": remote_room_hosts,
}

@defer.inlineCallbacks
def _handle_request(self, request, room_id, user_id):
async def _handle_request(self, request, room_id, user_id):
content = parse_json_object_from_request(request)

remote_room_hosts = content["remote_room_hosts"]
Expand All @@ -137,7 +133,7 @@ def _handle_request(self, request, room_id, user_id):
logger.info("remote_reject_invite: %s out of room: %s", user_id, room_id)

try:
event = yield self.federation_handler.do_remotely_reject_invite(
event = await self.federation_handler.do_remotely_reject_invite(
remote_room_hosts, room_id, user_id
)
ret = event.get_pdu_json()
Expand All @@ -150,7 +146,7 @@ def _handle_request(self, request, room_id, user_id):
#
logger.warn("Failed to reject invite: %s", e)

yield self.store.locally_reject_invite(user_id, room_id)
await self.store.locally_reject_invite(user_id, room_id)
ret = {}

return 200, ret
Expand Down
12 changes: 4 additions & 8 deletions synapse/replication/http/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@

import logging

from twisted.internet import defer

from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint

Expand Down Expand Up @@ -74,11 +72,10 @@ def _serialize_payload(
"address": address,
}

@defer.inlineCallbacks
def _handle_request(self, request, user_id):
async def _handle_request(self, request, user_id):
content = parse_json_object_from_request(request)

yield self.registration_handler.register_with_store(
await self.registration_handler.register_with_store(
user_id=user_id,
password_hash=content["password_hash"],
was_guest=content["was_guest"],
Expand Down Expand Up @@ -117,14 +114,13 @@ def _serialize_payload(user_id, auth_result, access_token):
"""
return {"auth_result": auth_result, "access_token": access_token}

@defer.inlineCallbacks
def _handle_request(self, request, user_id):
async def _handle_request(self, request, user_id):
content = parse_json_object_from_request(request)

auth_result = content["auth_result"]
access_token = content["access_token"]

yield self.registration_handler.post_registration_actions(
await self.registration_handler.post_registration_actions(
user_id=user_id, auth_result=auth_result, access_token=access_token
)

Expand Down
7 changes: 3 additions & 4 deletions synapse/replication/http/send_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ def _serialize_payload(

return payload

@defer.inlineCallbacks
def _handle_request(self, request, event_id):
async def _handle_request(self, request, event_id):
with Measure(self.clock, "repl_send_event_parse"):
content = parse_json_object_from_request(request)

Expand All @@ -101,7 +100,7 @@ def _handle_request(self, request, event_id):
event = EventType(event_dict, internal_metadata, rejected_reason)

requester = Requester.deserialize(self.store, content["requester"])
context = yield EventContext.deserialize(self.store, content["context"])
context = EventContext.deserialize(self.store, content["context"])

ratelimit = content["ratelimit"]
extra_users = [UserID.from_string(u) for u in content["extra_users"]]
Expand All @@ -113,7 +112,7 @@ def _handle_request(self, request, event_id):
"Got event to send with ID: %s into room: %s", event.event_id, event.room_id
)

yield self.event_creation_handler.persist_and_notify_client_event(
await self.event_creation_handler.persist_and_notify_client_event(
requester, event, context, ratelimit=ratelimit, extra_users=extra_users
)

Expand Down