From f530e1791c842bedc5f38629e58fff9b5c7484ce Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 9 Sep 2020 10:42:26 +0100 Subject: [PATCH 1/8] Use max_room_stream_id more Using the event persisted stream ID sort of doesn't make much sense, as it may be *after* the current token. --- synapse/notifier.py | 32 ++++++++++++++++++++------------ 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index 71f2370874b3..25d6bd976ff7 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -280,33 +280,41 @@ def _notify_pending_new_room_events(self, max_room_stream_id: int): (room_stream_id, event, extra_users) ) else: - self._on_new_room_event(event, room_stream_id, extra_users) + self._on_new_room_event(event, max_room_stream_id, extra_users) + + self._on_updated_room_token(max_room_stream_id) def _on_new_room_event( self, event: EventBase, - room_stream_id: int, + max_room_stream_id: int, extra_users: Collection[Union[str, UserID]] = [], ): """Notify any user streams that are interested in this room event""" - # poke any interested application service. - run_as_background_process( - "notify_app_services", self._notify_app_services, room_stream_id - ) - - if self.federation_sender: - self.federation_sender.notify_new_events(room_stream_id) if event.type == EventTypes.Member and event.membership == Membership.JOIN: self._user_joined_room(event.state_key, event.room_id) self.on_new_event( - "room_key", room_stream_id, users=extra_users, rooms=[event.room_id] + "room_key", max_room_stream_id, users=extra_users, rooms=[event.room_id] ) - async def _notify_app_services(self, room_stream_id: int): + def _on_updated_room_token(self, max_room_stream_id: int): + """Poke services that might care that the room position has been + updated. + """ + + # poke any interested application service. + run_as_background_process( + "notify_app_services", self._notify_app_services, max_room_stream_id + ) + + if self.federation_sender: + self.federation_sender.notify_new_events(max_room_stream_id) + + async def _notify_app_services(self, max_room_stream_id: int): try: - await self.appservice_handler.notify_interested_services(room_stream_id) + await self.appservice_handler.notify_interested_services(max_room_stream_id) except Exception: logger.exception("Error notifying application services of event") From 00cbe427ad09e1053e029b9bef50a24dd48834b0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 9 Sep 2020 16:39:38 +0100 Subject: [PATCH 2/8] Move pusher pool notification to notifier --- synapse/handlers/federation.py | 3 --- synapse/handlers/message.py | 4 ---- synapse/notifier.py | 11 +++++++++++ synapse/replication/tcp/client.py | 4 ---- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index be9b0701a062..c195eba83001 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -128,7 +128,6 @@ def __init__(self, hs): self.keyring = hs.get_keyring() self.action_generator = hs.get_action_generator() self.is_mine_id = hs.is_mine_id - self.pusher_pool = hs.get_pusherpool() self.spam_checker = hs.get_spam_checker() self.event_creation_handler = hs.get_event_creation_handler() self._message_handler = hs.get_message_handler() @@ -2939,8 +2938,6 @@ async def _notify_persisted_event( event, event_stream_id, max_stream_id, extra_users=extra_users ) - await self.pusher_pool.on_new_notifications(max_stream_id) - async def _clean_room_for_join(self, room_id: str) -> None: """Called to clean up any data in DB for a given room, ready for the server to join the room. diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index d1556659e3e5..276de8f8d090 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -387,8 +387,6 @@ def __init__(self, hs: "HomeServer"): # This is only used to get at ratelimit function, and maybe_kick_guest_users self.base_handler = BaseHandler(hs) - self.pusher_pool = hs.get_pusherpool() - # We arbitrarily limit concurrent event creation for a room to 5. # This is to stop us from diverging history *too* much. self.limiter = Linearizer(max_count=5, name="room_event_creation_limit") @@ -1145,8 +1143,6 @@ def is_inviter_member_event(e): # If there's an expiry timestamp on the event, schedule its expiry. self._message_handler.maybe_schedule_expiry(event) - await self.pusher_pool.on_new_notifications(max_stream_id) - def _notify(): try: self.notifier.on_new_room_event( diff --git a/synapse/notifier.py b/synapse/notifier.py index 25d6bd976ff7..98179acac55e 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -198,6 +198,7 @@ def __init__(self, hs: "synapse.server.HomeServer"): self.clock = hs.get_clock() self.appservice_handler = hs.get_application_service_handler() + self._pusher_pool = hs.get_pusherpool() self.federation_sender = None if hs.should_send_federation(): @@ -309,6 +310,10 @@ def _on_updated_room_token(self, max_room_stream_id: int): "notify_app_services", self._notify_app_services, max_room_stream_id ) + run_as_background_process( + "_notify_pusher_pool", self._notify_pusher_pool, max_room_stream_id + ) + if self.federation_sender: self.federation_sender.notify_new_events(max_room_stream_id) @@ -318,6 +323,12 @@ async def _notify_app_services(self, max_room_stream_id: int): except Exception: logger.exception("Error notifying application services of event") + async def _notify_pusher_pool(self, max_room_stream_id: int): + try: + await self._pusher_pool.on_new_notifications(max_room_stream_id) + except Exception: + logger.exception("Error pusher pool of event") + def on_new_event( self, stream_key: str, diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index ccd3147dfdaf..a608a90583b6 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -98,7 +98,6 @@ class ReplicationDataHandler: def __init__(self, hs: "HomeServer"): self.store = hs.get_datastore() - self.pusher_pool = hs.get_pusherpool() self.notifier = hs.get_notifier() self._reactor = hs.get_reactor() self._clock = hs.get_clock() @@ -154,9 +153,6 @@ async def on_rdata( max_token = self.store.get_room_max_stream_ordering() self.notifier.on_new_room_event(event, token, max_token, extra_users) - max_token = self.store.get_room_max_stream_ordering() - await self.pusher_pool.on_new_notifications(max_token) - # Notify any waiting deferreds. The list is ordered by position so we # just iterate through the list until we reach a position that is # greater than the received row position. From d283fcdc1a88641cf158d526f8bf944429c4b006 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 9 Sep 2020 16:41:18 +0100 Subject: [PATCH 3/8] Add type to pusherpool --- synapse/push/pusherpool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index fa8473bf8d00..cc839ffce43d 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -184,7 +184,7 @@ async def remove_pushers_by_access_token(self, user_id, access_tokens): ) await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"]) - async def on_new_notifications(self, max_stream_id): + async def on_new_notifications(self, max_stream_id: int): if not self.pushers: # nothing to do here. return From 9b723168e099843208d2e4769790e91c5f7ff2ce Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 9 Sep 2020 17:24:41 +0100 Subject: [PATCH 4/8] Refactor _on_new_room_event to reduce duplicate calls There is no point in repeatedly calling `on_new_event` with the same `max_stream_token`. --- synapse/notifier.py | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index 98179acac55e..bd2e959eec3a 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -275,30 +275,30 @@ def _notify_pending_new_room_events(self, max_room_stream_id: int): """ pending = self.pending_new_room_events self.pending_new_room_events = [] + + users = set() # type: Set[Union[str, UserID]] + rooms = set() # type: Set[str] + for room_stream_id, event, extra_users in pending: if room_stream_id > max_room_stream_id: self.pending_new_room_events.append( (room_stream_id, event, extra_users) ) else: - self._on_new_room_event(event, max_room_stream_id, extra_users) - - self._on_updated_room_token(max_room_stream_id) - - def _on_new_room_event( - self, - event: EventBase, - max_room_stream_id: int, - extra_users: Collection[Union[str, UserID]] = [], - ): - """Notify any user streams that are interested in this room event""" - - if event.type == EventTypes.Member and event.membership == Membership.JOIN: - self._user_joined_room(event.state_key, event.room_id) - - self.on_new_event( - "room_key", max_room_stream_id, users=extra_users, rooms=[event.room_id] - ) + if ( + event.type == EventTypes.Member + and event.membership == Membership.JOIN + ): + self._user_joined_room(event.state_key, event.room_id) + + users.update(extra_users) + rooms.add(event.room_id) + + if users or rooms: + self.on_new_event( + "room_key", max_room_stream_id, users=extra_users, rooms=[event.room_id] + ) + self._on_updated_room_token(max_room_stream_id) def _on_updated_room_token(self, max_room_stream_id: int): """Poke services that might care that the room position has been From 05741e336f3f6322792450ac6007483ce74245e7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 9 Sep 2020 17:28:37 +0100 Subject: [PATCH 5/8] Newsfile --- changelog.d/8288.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/8288.misc diff --git a/changelog.d/8288.misc b/changelog.d/8288.misc new file mode 100644 index 000000000000..c08a53a5ee27 --- /dev/null +++ b/changelog.d/8288.misc @@ -0,0 +1 @@ +Refactor notifier code to correctly use the max event stream position. From 996eec4a706caf320613386e2e48c2ecf254eff7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 10 Sep 2020 10:42:32 +0100 Subject: [PATCH 6/8] Update synapse/notifier.py Co-authored-by: Patrick Cloke --- synapse/notifier.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index bd2e959eec3a..c4e0497009ac 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -307,7 +307,7 @@ def _on_updated_room_token(self, max_room_stream_id: int): # poke any interested application service. run_as_background_process( - "notify_app_services", self._notify_app_services, max_room_stream_id + "_notify_app_services", self._notify_app_services, max_room_stream_id ) run_as_background_process( From 8080c65607be42d839e69d9b717ab713892ca34e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 10 Sep 2020 10:50:05 +0100 Subject: [PATCH 7/8] Fix up types --- synapse/notifier.py | 9 ++++----- synapse/replication/tcp/client.py | 5 +++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index c4e0497009ac..49540e4a4def 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -25,7 +25,6 @@ Set, Tuple, TypeVar, - Union, ) from prometheus_client import Counter @@ -187,7 +186,7 @@ def __init__(self, hs: "synapse.server.HomeServer"): self.store = hs.get_datastore() self.pending_new_room_events = ( [] - ) # type: List[Tuple[int, EventBase, Collection[Union[str, UserID]]]] + ) # type: List[Tuple[int, EventBase, Collection[UserID]]] # Called when there are new things to stream over replication self.replication_callbacks = [] # type: List[Callable[[], None]] @@ -248,7 +247,7 @@ def on_new_room_event( event: EventBase, room_stream_id: int, max_room_stream_id: int, - extra_users: Collection[Union[str, UserID]] = [], + extra_users: Collection[UserID] = [], ): """ Used by handlers to inform the notifier something has happened in the room, room event wise. @@ -276,7 +275,7 @@ def _notify_pending_new_room_events(self, max_room_stream_id: int): pending = self.pending_new_room_events self.pending_new_room_events = [] - users = set() # type: Set[Union[str, UserID]] + users = set() # type: Set[UserID] rooms = set() # type: Set[str] for room_stream_id, event, extra_users in pending: @@ -333,7 +332,7 @@ def on_new_event( self, stream_key: str, new_token: int, - users: Collection[Union[str, UserID]] = [], + users: Collection[UserID] = [], rooms: Collection[str] = [], ): """ Used to inform listeners that something has happened event wise. diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index a608a90583b6..e82b9e386f2a 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -29,6 +29,7 @@ EventsStreamEventRow, EventsStreamRow, ) +from synapse.types import UserID from synapse.util.async_helpers import timeout_deferred from synapse.util.metrics import Measure @@ -147,9 +148,9 @@ async def on_rdata( if event.rejected_reason: continue - extra_users = () # type: Tuple[str, ...] + extra_users = () # type: Tuple[UserID, ...] if event.type == EventTypes.Member: - extra_users = (event.state_key,) + extra_users = (UserID.from_string(event.state_key),) max_token = self.store.get_room_max_stream_ordering() self.notifier.on_new_room_event(event, token, max_token, extra_users) From f99e3d2ab772e0aecd1bb2100551b08d72cd3a1a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 10 Sep 2020 10:50:36 +0100 Subject: [PATCH 8/8] Actually use the batched users and rooms vars --- synapse/notifier.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index 49540e4a4def..16f19c938eef 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -294,9 +294,7 @@ def _notify_pending_new_room_events(self, max_room_stream_id: int): rooms.add(event.room_id) if users or rooms: - self.on_new_event( - "room_key", max_room_stream_id, users=extra_users, rooms=[event.room_id] - ) + self.on_new_event("room_key", max_room_stream_id, users=users, rooms=rooms) self._on_updated_room_token(max_room_stream_id) def _on_updated_room_token(self, max_room_stream_id: int):