Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Allow client event serialization to be async #5183

Merged
merged 3 commits into from
May 15, 2019
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Allow client event serialization to be async
erikjohnston committed May 14, 2019

Verified

This commit was signed with the committer’s verified signature.
tomusdrw Tomek Drwięga
commit b54b03f9e1abc1964fe5f00115a165a2b8e10df5
44 changes: 44 additions & 0 deletions synapse/events/utils.py
Original file line number Diff line number Diff line change
@@ -19,7 +19,10 @@

from frozendict import frozendict

from twisted.internet import defer

from synapse.api.constants import EventTypes
from synapse.util.async_helpers import yieldable_gather_results

from . import EventBase

@@ -311,3 +314,44 @@ def serialize_event(e, time_now_ms, as_client_event=True,
d = only_fields(d, only_event_fields)

return d


class EventClientSerializer(object):
"""Serializes events that are to be sent to clients.
This is used for bundling extra information with any events to be sent to
clients.
"""

def __init__(self, hs):
pass

def serialize_event(self, event, time_now, **kwargs):
"""Serializes a single event.
Args:
event (EventBase)
time_now (int): The current time in milliseconds
**kwargs: Arguments to pass to `serialize_event`
Returns:
Deferred[dict]: The serialized event
"""
event = serialize_event(event, time_now, **kwargs)
return defer.succeed(event)

def serialize_events(self, events, time_now, **kwargs):
"""Serializes multiple events.
Args:
event (iter[EventBase])
time_now (int): The current time in milliseconds
**kwargs: Arguments to pass to `serialize_event`
Returns:
Deferred[list[dict]]: The list of serialized events
"""
return yieldable_gather_results(
self.serialize_event, events,
time_now=time_now, **kwargs
)
8 changes: 4 additions & 4 deletions synapse/handlers/events.py
Original file line number Diff line number Diff line change
@@ -21,7 +21,6 @@
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, SynapseError
from synapse.events import EventBase
from synapse.events.utils import serialize_event
from synapse.types import UserID
from synapse.util.logutils import log_function
from synapse.visibility import filter_events_for_client
@@ -50,6 +49,7 @@ def __init__(self, hs):
self.notifier = hs.get_notifier()
self.state = hs.get_state_handler()
self._server_notices_sender = hs.get_server_notices_sender()
self._event_serializer = hs.get_event_client_serializer()

@defer.inlineCallbacks
@log_function
@@ -120,9 +120,9 @@ def get_stream(self, auth_user_id, pagin_config, timeout=0,

time_now = self.clock.time_msec()

chunks = [
serialize_event(e, time_now, as_client_event) for e in events
]
chunks = yield self._event_serializer.serialize_events(
events, time_now, as_client_event=as_client_event,
)

chunk = {
"chunk": chunks,
44 changes: 27 additions & 17 deletions synapse/handlers/initial_sync.py
Original file line number Diff line number Diff line change
@@ -19,7 +19,6 @@

from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, Codes, SynapseError
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
from synapse.handlers.presence import format_user_presence_state
from synapse.streams.config import PaginationConfig
@@ -43,6 +42,7 @@ def __init__(self, hs):
self.clock = hs.get_clock()
self.validator = EventValidator()
self.snapshot_cache = SnapshotCache()
self._event_serializer = hs.get_event_client_serializer()

def snapshot_all_rooms(self, user_id=None, pagin_config=None,
as_client_event=True, include_archived=False):
@@ -138,7 +138,9 @@ def handle_room(event):
d["inviter"] = event.sender

invite_event = yield self.store.get_event(event.event_id)
d["invite"] = serialize_event(invite_event, time_now, as_client_event)
d["invite"] = yield self._event_serializer.serialize_event(
invite_event, time_now, as_client_event,
)

rooms_ret.append(d)

@@ -185,18 +187,21 @@ def handle_room(event):
time_now = self.clock.time_msec()

d["messages"] = {
"chunk": [
serialize_event(m, time_now, as_client_event)
for m in messages
],
"chunk": (
yield self._event_serializer.serialize_events(
messages, time_now=time_now,
as_client_event=as_client_event,
)
),
"start": start_token.to_string(),
"end": end_token.to_string(),
}

d["state"] = [
serialize_event(c, time_now, as_client_event)
for c in current_state.values()
]
d["state"] = yield self._event_serializer.serialize_events(
current_state.values(),
time_now=time_now,
as_client_event=as_client_event
)

account_data_events = []
tags = tags_by_room.get(event.room_id)
@@ -337,11 +342,15 @@ def _room_initial_sync_parted(self, user_id, room_id, pagin_config,
"membership": membership,
"room_id": room_id,
"messages": {
"chunk": [serialize_event(m, time_now) for m in messages],
"chunk": (yield self._event_serializer.serialize_events(
messages, time_now,
)),
"start": start_token.to_string(),
"end": end_token.to_string(),
},
"state": [serialize_event(s, time_now) for s in room_state.values()],
"state": (yield self._event_serializer.serialize_events(
room_state.values(), time_now,
)),
"presence": [],
"receipts": [],
})
@@ -355,10 +364,9 @@ def _room_initial_sync_joined(self, user_id, room_id, pagin_config,

# TODO: These concurrently
time_now = self.clock.time_msec()
state = [
serialize_event(x, time_now)
for x in current_state.values()
]
state = yield self._event_serializer.serialize_events(
current_state.values(), time_now,
)

now_token = yield self.hs.get_event_sources().get_current_token()

@@ -425,7 +433,9 @@ def get_receipts():
ret = {
"room_id": room_id,
"messages": {
"chunk": [serialize_event(m, time_now) for m in messages],
"chunk": (yield self._event_serializer.serialize_events(
messages, time_now,
)),
"start": start_token.to_string(),
"end": end_token.to_string(),
},
7 changes: 4 additions & 3 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
@@ -32,7 +32,6 @@
)
from synapse.api.room_versions import RoomVersions
from synapse.api.urls import ConsentURIBuilder
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
from synapse.storage.state import StateFilter
@@ -57,6 +56,7 @@ def __init__(self, hs):
self.clock = hs.get_clock()
self.state = hs.get_state_handler()
self.store = hs.get_datastore()
self._event_serializer = hs.get_event_client_serializer()

@defer.inlineCallbacks
def get_room_data(self, user_id=None, room_id=None,
@@ -164,9 +164,10 @@ def get_state_events(
room_state = room_state[membership_event_id]

now = self.clock.time_msec()
defer.returnValue(
[serialize_event(c, now) for c in room_state.values()]
events = yield self._event_serializer.serialize_events(
room_state.values(), now,
)
defer.returnValue(events)

@defer.inlineCallbacks
def get_joined_members(self, requester, room_id):
22 changes: 13 additions & 9 deletions synapse/handlers/pagination.py
Original file line number Diff line number Diff line change
@@ -20,7 +20,6 @@

from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import SynapseError
from synapse.events.utils import serialize_event
from synapse.storage.state import StateFilter
from synapse.types import RoomStreamToken
from synapse.util.async_helpers import ReadWriteLock
@@ -78,6 +77,7 @@ def __init__(self, hs):
self._purges_in_progress_by_room = set()
# map from purge id to PurgeStatus
self._purges_by_id = {}
self._event_serializer = hs.get_event_client_serializer()

def start_purge_history(self, room_id, token,
delete_local_events=False):
@@ -278,18 +278,22 @@ def get_messages(self, requester, room_id=None, pagin_config=None,
time_now = self.clock.time_msec()

chunk = {
"chunk": [
serialize_event(e, time_now, as_client_event)
for e in events
],
"chunk": (
yield self._event_serializer.serialize_events(
events, time_now,
as_client_event=as_client_event,
)
),
"start": pagin_config.from_token.to_string(),
"end": next_token.to_string(),
}

if state:
chunk["state"] = [
serialize_event(e, time_now, as_client_event)
for e in state
]
chunk["state"] = (
yield self._event_serializer.serialize_events(
state, time_now,
as_client_event=as_client_event,
)
)

defer.returnValue(chunk)
42 changes: 23 additions & 19 deletions synapse/handlers/search.py
Original file line number Diff line number Diff line change
@@ -23,7 +23,6 @@
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import SynapseError
from synapse.api.filtering import Filter
from synapse.events.utils import serialize_event
from synapse.storage.state import StateFilter
from synapse.visibility import filter_events_for_client

@@ -36,6 +35,7 @@ class SearchHandler(BaseHandler):

def __init__(self, hs):
super(SearchHandler, self).__init__(hs)
self._event_serializer = hs.get_event_client_serializer()

@defer.inlineCallbacks
def get_old_rooms_from_upgraded_room(self, room_id):
@@ -401,14 +401,16 @@ def search(self, user, content, batch=None):
time_now = self.clock.time_msec()

for context in contexts.values():
context["events_before"] = [
serialize_event(e, time_now)
for e in context["events_before"]
]
context["events_after"] = [
serialize_event(e, time_now)
for e in context["events_after"]
]
context["events_before"] = (
yield self._event_serializer.serialize_events(
context["events_before"], time_now,
)
)
context["events_after"] = (
yield self._event_serializer.serialize_events(
context["events_after"], time_now,
)
)

state_results = {}
if include_state:
@@ -422,14 +424,13 @@ def search(self, user, content, batch=None):
# We're now about to serialize the events. We should not make any
# blocking calls after this. Otherwise the 'age' will be wrong

results = [
{
results = []
for e in allowed_events:
results.append({
"rank": rank_map[e.event_id],
"result": serialize_event(e, time_now),
"result": (yield self._event_serializer.serialize_event(e, time_now)),
"context": contexts.get(e.event_id, {}),
}
for e in allowed_events
]
})

rooms_cat_res = {
"results": results,
@@ -438,10 +439,13 @@ def search(self, user, content, batch=None):
}

if state_results:
rooms_cat_res["state"] = {
room_id: [serialize_event(e, time_now) for e in state]
for room_id, state in state_results.items()
}
s = {}
for room_id, state in state_results.items():
s[room_id] = yield self._event_serializer.serialize_events(
state, time_now,
)

rooms_cat_res["state"] = s

if room_groups and "room_id" in group_keys:
rooms_cat_res.setdefault("groups", {})["room_id"] = room_groups
5 changes: 3 additions & 2 deletions synapse/rest/client/v1/events.py
Original file line number Diff line number Diff line change
@@ -19,7 +19,6 @@
from twisted.internet import defer

from synapse.api.errors import SynapseError
from synapse.events.utils import serialize_event
from synapse.streams.config import PaginationConfig

from .base import ClientV1RestServlet, client_path_patterns
@@ -84,6 +83,7 @@ def __init__(self, hs):
super(EventRestServlet, self).__init__(hs)
self.clock = hs.get_clock()
self.event_handler = hs.get_event_handler()
self._event_serializer = hs.get_event_client_serializer()

@defer.inlineCallbacks
def on_GET(self, request, event_id):
@@ -92,7 +92,8 @@ def on_GET(self, request, event_id):

time_now = self.clock.time_msec()
if event:
defer.returnValue((200, serialize_event(event, time_now)))
event = yield self._event_serializer.serialize_event(event, time_now)
defer.returnValue((200, event))
else:
defer.returnValue((404, "Event not found."))

29 changes: 17 additions & 12 deletions synapse/rest/client/v1/room.py
Original file line number Diff line number Diff line change
@@ -26,7 +26,7 @@
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, Codes, SynapseError
from synapse.api.filtering import Filter
from synapse.events.utils import format_event_for_client_v2, serialize_event
from synapse.events.utils import format_event_for_client_v2
from synapse.http.servlet import (
assert_params_in_dict,
parse_integer,
@@ -537,6 +537,7 @@ def __init__(self, hs):
super(RoomEventServlet, self).__init__(hs)
self.clock = hs.get_clock()
self.event_handler = hs.get_event_handler()
self._event_serializer = hs.get_event_client_serializer()

@defer.inlineCallbacks
def on_GET(self, request, room_id, event_id):
@@ -545,7 +546,8 @@ def on_GET(self, request, room_id, event_id):

time_now = self.clock.time_msec()
if event:
defer.returnValue((200, serialize_event(event, time_now)))
event = yield self._event_serializer.serialize_event(event, time_now)
defer.returnValue((200, event))
else:
defer.returnValue((404, "Event not found."))

@@ -559,6 +561,7 @@ def __init__(self, hs):
super(RoomEventContextServlet, self).__init__(hs)
self.clock = hs.get_clock()
self.room_context_handler = hs.get_room_context_handler()
self._event_serializer = hs.get_event_client_serializer()

@defer.inlineCallbacks
def on_GET(self, request, room_id, event_id):
@@ -588,16 +591,18 @@ def on_GET(self, request, room_id, event_id):
)

time_now = self.clock.time_msec()
results["events_before"] = [
serialize_event(event, time_now) for event in results["events_before"]
]
results["event"] = serialize_event(results["event"], time_now)
results["events_after"] = [
serialize_event(event, time_now) for event in results["events_after"]
]
results["state"] = [
serialize_event(event, time_now) for event in results["state"]
]
results["events_before"] = yield self._event_serializer.serialize_events(
results["events_before"], time_now,
)
results["event"] = yield self._event_serializer.serialize_event(
results["event"], time_now,
)
results["events_after"] = yield self._event_serializer.serialize_events(
results["events_after"], time_now,
)
results["state"] = yield self._event_serializer.serialize_events(
results["state"], time_now,
)

defer.returnValue((200, results))

10 changes: 4 additions & 6 deletions synapse/rest/client/v2_alpha/notifications.py
Original file line number Diff line number Diff line change
@@ -17,10 +17,7 @@

from twisted.internet import defer

from synapse.events.utils import (
format_event_for_client_v2_without_room_id,
serialize_event,
)
from synapse.events.utils import format_event_for_client_v2_without_room_id
from synapse.http.servlet import RestServlet, parse_integer, parse_string

from ._base import client_v2_patterns
@@ -36,6 +33,7 @@ def __init__(self, hs):
self.store = hs.get_datastore()
self.auth = hs.get_auth()
self.clock = hs.get_clock()
self._event_serializer = hs.get_event_client_serializer()

@defer.inlineCallbacks
def on_GET(self, request):
@@ -69,11 +67,11 @@ def on_GET(self, request):
"profile_tag": pa["profile_tag"],
"actions": pa["actions"],
"ts": pa["received_ts"],
"event": serialize_event(
"event": (yield self._event_serializer.serialize_event(
notif_events[pa["event_id"]],
self.clock.time_msec(),
event_format=format_event_for_client_v2_without_room_id,
),
)),
}

if pa["room_id"] not in receipts_by_room:
47 changes: 24 additions & 23 deletions synapse/rest/client/v2_alpha/sync.py
Original file line number Diff line number Diff line change
@@ -26,7 +26,6 @@
from synapse.events.utils import (
format_event_for_client_v2_without_room_id,
format_event_raw,
serialize_event,
)
from synapse.handlers.presence import format_user_presence_state
from synapse.handlers.sync import SyncConfig
@@ -86,6 +85,7 @@ def __init__(self, hs):
self.filtering = hs.get_filtering()
self.presence_handler = hs.get_presence_handler()
self._server_notices_sender = hs.get_server_notices_sender()
self._event_serializer = hs.get_event_client_serializer()

@defer.inlineCallbacks
def on_GET(self, request):
@@ -168,33 +168,33 @@ def on_GET(self, request):
)

time_now = self.clock.time_msec()
response_content = self.encode_response(
response_content = yield self.encode_response(
time_now, sync_result, requester.access_token_id, filter
)

defer.returnValue((200, response_content))

@staticmethod
def encode_response(time_now, sync_result, access_token_id, filter):
@defer.inlineCallbacks
def encode_response(self, time_now, sync_result, access_token_id, filter):
if filter.event_format == 'client':
event_formatter = format_event_for_client_v2_without_room_id
elif filter.event_format == 'federation':
event_formatter = format_event_raw
else:
raise Exception("Unknown event format %s" % (filter.event_format, ))

joined = SyncRestServlet.encode_joined(
joined = yield self.encode_joined(
sync_result.joined, time_now, access_token_id,
filter.event_fields,
event_formatter,
)

invited = SyncRestServlet.encode_invited(
invited = yield self.encode_invited(
sync_result.invited, time_now, access_token_id,
event_formatter,
)

archived = SyncRestServlet.encode_archived(
archived = yield self.encode_archived(
sync_result.archived, time_now, access_token_id,
filter.event_fields,
event_formatter,
@@ -239,8 +239,8 @@ def encode_presence(events, time_now):
]
}

@staticmethod
def encode_joined(rooms, time_now, token_id, event_fields, event_formatter):
@defer.inlineCallbacks
def encode_joined(self, rooms, time_now, token_id, event_fields, event_formatter):
"""
Encode the joined rooms in a sync result
@@ -261,15 +261,15 @@ def encode_joined(rooms, time_now, token_id, event_fields, event_formatter):
"""
joined = {}
for room in rooms:
joined[room.room_id] = SyncRestServlet.encode_room(
joined[room.room_id] = yield self.encode_room(
room, time_now, token_id, joined=True, only_fields=event_fields,
event_formatter=event_formatter,
)

return joined

@staticmethod
def encode_invited(rooms, time_now, token_id, event_formatter):
@defer.inlineCallbacks
def encode_invited(self, rooms, time_now, token_id, event_formatter):
"""
Encode the invited rooms in a sync result
@@ -289,7 +289,7 @@ def encode_invited(rooms, time_now, token_id, event_formatter):
"""
invited = {}
for room in rooms:
invite = serialize_event(
invite = yield self._event_serializer.serialize_event(
room.invite, time_now, token_id=token_id,
event_format=event_formatter,
is_invite=True,
@@ -304,8 +304,8 @@ def encode_invited(rooms, time_now, token_id, event_formatter):

return invited

@staticmethod
def encode_archived(rooms, time_now, token_id, event_fields, event_formatter):
@defer.inlineCallbacks
def encode_archived(self, rooms, time_now, token_id, event_fields, event_formatter):
"""
Encode the archived rooms in a sync result
@@ -326,17 +326,17 @@ def encode_archived(rooms, time_now, token_id, event_fields, event_formatter):
"""
joined = {}
for room in rooms:
joined[room.room_id] = SyncRestServlet.encode_room(
joined[room.room_id] = yield self.encode_room(
room, time_now, token_id, joined=False,
only_fields=event_fields,
event_formatter=event_formatter,
)

return joined

@staticmethod
@defer.inlineCallbacks
def encode_room(
room, time_now, token_id, joined,
self, room, time_now, token_id, joined,
only_fields, event_formatter,
):
"""
@@ -355,9 +355,10 @@ def encode_room(
Returns:
dict[str, object]: the room, encoded in our response format
"""
def serialize(event):
return serialize_event(
event, time_now, token_id=token_id,
def serialize(events):
return self._event_serializer.serialize_events(
events, time_now=time_now,
token_id=token_id,
event_format=event_formatter,
only_event_fields=only_fields,
)
@@ -376,8 +377,8 @@ def serialize(event):
event.event_id, room.room_id, event.room_id,
)

serialized_state = [serialize(e) for e in state_events]
serialized_timeline = [serialize(e) for e in timeline_events]
serialized_state = yield serialize(state_events)
serialized_timeline = yield serialize(timeline_events)

account_data = room.account_data

5 changes: 5 additions & 0 deletions synapse/server.py
Original file line number Diff line number Diff line change
@@ -35,6 +35,7 @@
from synapse.crypto.keyring import Keyring
from synapse.events.builder import EventBuilderFactory
from synapse.events.spamcheck import SpamChecker
from synapse.events.utils import EventClientSerializer
from synapse.federation.federation_client import FederationClient
from synapse.federation.federation_server import (
FederationHandlerRegistry,
@@ -185,6 +186,7 @@ def build_DEPENDENCY(self)
'sendmail',
'registration_handler',
'account_validity_handler',
'event_client_serializer',
]

REQUIRED_ON_MASTER_STARTUP = [
@@ -511,6 +513,9 @@ def build_registration_handler(self):
def build_account_validity_handler(self):
return AccountValidityHandler(self)

def build_event_client_serializer(self):
return EventClientSerializer(self)

def remove_pusher(self, app_id, push_key, user_id):
return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)

19 changes: 19 additions & 0 deletions synapse/util/async_helpers.py
Original file line number Diff line number Diff line change
@@ -156,6 +156,25 @@ def _concurrently_execute_inner():
], consumeErrors=True)).addErrback(unwrapFirstError)


def yieldable_gather_results(func, iter, *args, **kwargs):
"""Executes the function with each argument concurrently.
Args:
func (func): Function to execute that returns a Deferred
iter (iter): An iterable that yields items that get passed as the first
argument to the function
*args: Arguments to be passed to each call to func
Returns
Deferred: Resolved when all functions have been invoked, or errors if
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
one of the function calls fails.
"""
return logcontext.make_deferred_yieldable(defer.gatherResults([
run_in_background(func, item, *args, **kwargs)
for item in iter
], consumeErrors=True)).addErrback(unwrapFirstError)


class Linearizer(object):
"""Limits concurrent access to resources based on a key. Useful to ensure
only a few things happen at a time on a given resource.