Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
speed up /members and add at= and membership params (#3568)
Browse files Browse the repository at this point in the history
  • Loading branch information
ara4n authored and richvdh committed Aug 15, 2018
1 parent dc56c47 commit 2f78f43
Show file tree
Hide file tree
Showing 8 changed files with 174 additions and 19 deletions.
1 change: 1 addition & 0 deletions changelog.d/3331.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
add support for the include_redundant_members filter param as per MSC1227
1 change: 1 addition & 0 deletions changelog.d/3567.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
make the /context API filter & lazy-load aware as per MSC1227
1 change: 1 addition & 0 deletions changelog.d/3568.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
speed up /members API and add `at` and `membership` params as per MSC1227
88 changes: 76 additions & 12 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,13 @@
from twisted.internet.defer import succeed

from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError
from synapse.api.errors import (
AuthError,
Codes,
ConsentNotGivenError,
NotFoundError,
SynapseError,
)
from synapse.api.urls import ConsentURIBuilder
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events.utils import serialize_event
Expand All @@ -36,6 +42,7 @@
from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.logcontext import run_in_background
from synapse.util.metrics import measure_func
from synapse.visibility import filter_events_for_client

from ._base import BaseHandler

Expand Down Expand Up @@ -82,28 +89,85 @@ def get_room_data(self, user_id=None, room_id=None,
defer.returnValue(data)

@defer.inlineCallbacks
def get_state_events(self, user_id, room_id, is_guest=False):
def get_state_events(
self, user_id, room_id, types=None, filtered_types=None,
at_token=None, is_guest=False,
):
"""Retrieve all state events for a given room. If the user is
joined to the room then return the current state. If the user has
left the room return the state events from when they left.
left the room return the state events from when they left. If an explicit
'at' parameter is passed, return the state events as of that event, if
visible.
Args:
user_id(str): The user requesting state events.
room_id(str): The room ID to get all state events from.
types(list[(str, str|None)]|None): List of (type, state_key) tuples
which are used to filter the state fetched. If `state_key` is None,
all events are returned of the given type.
May be None, which matches any key.
filtered_types(list[str]|None): Only apply filtering via `types` to this
list of event types. Other types of events are returned unfiltered.
If None, `types` filtering is applied to all events.
at_token(StreamToken|None): the stream token of the at which we are requesting
the stats. If the user is not allowed to view the state as of that
stream token, we raise a 403 SynapseError. If None, returns the current
state based on the current_state_events table.
is_guest(bool): whether this user is a guest
Returns:
A list of dicts representing state events. [{}, {}, {}]
Raises:
NotFoundError (404) if the at token does not yield an event
AuthError (403) if the user doesn't have permission to view
members of this room.
"""
membership, membership_event_id = yield self.auth.check_in_room_or_world_readable(
room_id, user_id
)
if at_token:
# FIXME this claims to get the state at a stream position, but
# get_recent_events_for_room operates by topo ordering. This therefore
# does not reliably give you the state at the given stream position.
# (https://github.com/matrix-org/synapse/issues/3305)
last_events, _ = yield self.store.get_recent_events_for_room(
room_id, end_token=at_token.room_key, limit=1,
)

if membership == Membership.JOIN:
room_state = yield self.state.get_current_state(room_id)
elif membership == Membership.LEAVE:
room_state = yield self.store.get_state_for_events(
[membership_event_id], None
if not last_events:
raise NotFoundError("Can't find event for token %s" % (at_token, ))

visible_events = yield filter_events_for_client(
self.store, user_id, last_events,
)

event = last_events[0]
if visible_events:
room_state = yield self.store.get_state_for_events(
[event.event_id], types, filtered_types=filtered_types,
)
room_state = room_state[event.event_id]
else:
raise AuthError(
403,
"User %s not allowed to view events in room %s at token %s" % (
user_id, room_id, at_token,
)
)
else:
membership, membership_event_id = (
yield self.auth.check_in_room_or_world_readable(
room_id, user_id,
)
)
room_state = room_state[membership_event_id]

if membership == Membership.JOIN:
state_ids = yield self.store.get_filtered_current_state_ids(
room_id, types, filtered_types=filtered_types,
)
room_state = yield self.store.get_events(state_ids.values())
elif membership == Membership.LEAVE:
room_state = yield self.store.get_state_for_events(
[membership_event_id], types, filtered_types=filtered_types,
)
room_state = room_state[membership_event_id]

now = self.clock.time_msec()
defer.returnValue(
Expand Down
32 changes: 29 additions & 3 deletions synapse/rest/client/v1/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
parse_string,
)
from synapse.streams.config import PaginationConfig
from synapse.types import RoomAlias, RoomID, ThirdPartyInstanceID, UserID
from synapse.types import RoomAlias, RoomID, StreamToken, ThirdPartyInstanceID, UserID

from .base import ClientV1RestServlet, client_path_patterns

Expand Down Expand Up @@ -384,15 +384,39 @@ def __init__(self, hs):
def on_GET(self, request, room_id):
# TODO support Pagination stream API (limit/tokens)
requester = yield self.auth.get_user_by_req(request)
events = yield self.message_handler.get_state_events(
handler = self.message_handler

# request the state as of a given event, as identified by a stream token,
# for consistency with /messages etc.
# useful for getting the membership in retrospect as of a given /sync
# response.
at_token_string = parse_string(request, "at")
if at_token_string is None:
at_token = None
else:
at_token = StreamToken.from_string(at_token_string)

# let you filter down on particular memberships.
# XXX: this may not be the best shape for this API - we could pass in a filter
# instead, except filters aren't currently aware of memberships.
# See https://github.com/matrix-org/matrix-doc/issues/1337 for more details.
membership = parse_string(request, "membership")
not_membership = parse_string(request, "not_membership")

events = yield handler.get_state_events(
room_id=room_id,
user_id=requester.user.to_string(),
at_token=at_token,
types=[(EventTypes.Member, None)],
)

chunk = []

for event in events:
if event["type"] != EventTypes.Member:
if (
(membership and event['content'].get("membership") != membership) or
(not_membership and event['content'].get("membership") == not_membership)
):
continue
chunk.append(event)

Expand All @@ -401,6 +425,8 @@ def on_GET(self, request, room_id):
}))


# deprecated in favour of /members?membership=join?
# except it does custom AS logic and has a simpler return format
class JoinedRoomMemberListRestServlet(ClientV1RestServlet):
PATTERNS = client_path_patterns("/rooms/(?P<room_id>[^/]*)/joined_members$")

Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1911,7 +1911,7 @@ def _purge_history_txn(
max_depth = max(row[0] for row in rows)

if max_depth <= token.topological:
# We need to ensure we don't delete all the events from the datanase
# We need to ensure we don't delete all the events from the database
# otherwise we wouldn't be able to send any events (due to not
# having any backwards extremeties)
raise SynapseError(
Expand Down
66 changes: 64 additions & 2 deletions synapse/storage/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,69 @@ def _get_current_state_ids_txn(txn):
_get_current_state_ids_txn,
)

# FIXME: how should this be cached?
def get_filtered_current_state_ids(self, room_id, types, filtered_types=None):
"""Get the current state event of a given type for a room based on the
current_state_events table. This may not be as up-to-date as the result
of doing a fresh state resolution as per state_handler.get_current_state
Args:
room_id (str)
types (list[(Str, (Str|None))]): List of (type, state_key) tuples
which are used to filter the state fetched. `state_key` may be
None, which matches any `state_key`
filtered_types (list[Str]|None): List of types to apply the above filter to.
Returns:
deferred: dict of (type, state_key) -> event
"""

include_other_types = False if filtered_types is None else True

def _get_filtered_current_state_ids_txn(txn):
results = {}
sql = """SELECT type, state_key, event_id FROM current_state_events
WHERE room_id = ? %s"""
# Turns out that postgres doesn't like doing a list of OR's and
# is about 1000x slower, so we just issue a query for each specific
# type seperately.
if types:
clause_to_args = [
(
"AND type = ? AND state_key = ?",
(etype, state_key)
) if state_key is not None else (
"AND type = ?",
(etype,)
)
for etype, state_key in types
]

if include_other_types:
unique_types = set(filtered_types)
clause_to_args.append(
(
"AND type <> ? " * len(unique_types),
list(unique_types)
)
)
else:
# If types is None we fetch all the state, and so just use an
# empty where clause with no extra args.
clause_to_args = [("", [])]
for where_clause, where_args in clause_to_args:
args = [room_id]
args.extend(where_args)
txn.execute(sql % (where_clause,), args)
for row in txn:
typ, state_key, event_id = row
key = (intern_string(typ), intern_string(state_key))
results[key] = event_id
return results

return self.runInteraction(
"get_filtered_current_state_ids",
_get_filtered_current_state_ids_txn,
)

@cached(max_entries=10000, iterable=True)
def get_state_group_delta(self, state_group):
"""Given a state group try to return a previous group and a delta between
Expand Down Expand Up @@ -389,8 +452,7 @@ def get_state_for_events(self, event_ids, types, filtered_types=None):
If None, `types` filtering is applied to all events.
Returns:
deferred: A list of dicts corresponding to the event_ids given.
The dicts are mappings from (type, state_key) -> state_events
deferred: A dict of (event_id) -> (type, state_key) -> [state_events]
"""
event_to_groups = yield self._get_state_group_for_events(
event_ids,
Expand Down
2 changes: 1 addition & 1 deletion tests/storage/test_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def test_get_state_for_event(self):
{(e3.type, e3.state_key): e3, (e5.type, e5.state_key): e5}, state
)

# check we can use filter_types to grab a specific room member
# check we can use filtered_types to grab a specific room member
# without filtering out the other event types
state = yield self.store.get_state_for_event(
e5.event_id,
Expand Down

0 comments on commit 2f78f43

Please sign in to comment.