Skip to content

Commit

Permalink
Always update AS last_pos, even on no events
Browse files Browse the repository at this point in the history
Fixes matrix-org#1834

Signed-off-by: Willem Mulder <[email protected]>
  • Loading branch information
14mRh4X0r committed Jun 2, 2021
1 parent 3ff6fe2 commit 91a7f7b
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 17 deletions.
1 change: 1 addition & 0 deletions changelog.d/10107.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed a bug that could cause Synapse to stop notifying application services. Contributed by Willem Mulder.
25 changes: 12 additions & 13 deletions synapse/handlers/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,17 +87,15 @@ async def _notify_interested_services(self, max_token: RoomStreamToken):
self.is_processing = True
try:
limit = 100
while True:
upper_bound = -1
while upper_bound < self.current_max:
(
upper_bound,
events,
) = await self.store.get_new_events_for_appservice(
self.current_max, limit
)

if not events:
break

events_by_room = {} # type: Dict[str, List[EventBase]]
for event in events:
events_by_room.setdefault(event.room_id, []).append(event)
Expand Down Expand Up @@ -153,9 +151,6 @@ async def handle_room_events(events):

await self.store.set_appservice_last_pos(upper_bound)

now = self.clock.time_msec()
ts = await self.store.get_received_ts(events[-1].event_id)

synapse.metrics.event_processing_positions.labels(
"appservice_sender"
).set(upper_bound)
Expand All @@ -168,12 +163,16 @@ async def handle_room_events(events):

event_processing_loop_counter.labels("appservice_sender").inc()

synapse.metrics.event_processing_lag.labels(
"appservice_sender"
).set(now - ts)
synapse.metrics.event_processing_last_ts.labels(
"appservice_sender"
).set(ts)
if events:
now = self.clock.time_msec()
ts = await self.store.get_received_ts(events[-1].event_id)

synapse.metrics.event_processing_lag.labels(
"appservice_sender"
).set(now - ts)
synapse.metrics.event_processing_last_ts.labels(
"appservice_sender"
).set(ts)
finally:
self.is_processing = False

Expand Down
6 changes: 2 additions & 4 deletions tests/handlers/test_appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ def test_notify_interested_services(self):
sender="@someone:anywhere", type="m.room.message", room_id="!foo:bar"
)
self.mock_store.get_new_events_for_appservice.side_effect = [
make_awaitable((0, [event])),
make_awaitable((0, [])),
make_awaitable((1, [event])),
]
self.handler.notify_interested_services(RoomStreamToken(None, 0))
self.handler.notify_interested_services(RoomStreamToken(None, 1))

self.mock_scheduler.submit_event_for_as.assert_called_once_with(
interested_service, event
Expand All @@ -77,7 +77,6 @@ def test_query_user_exists_unknown_user(self):
self.mock_as_api.query_user.return_value = make_awaitable(True)
self.mock_store.get_new_events_for_appservice.side_effect = [
make_awaitable((0, [event])),
make_awaitable((0, [])),
]

self.handler.notify_interested_services(RoomStreamToken(None, 0))
Expand All @@ -95,7 +94,6 @@ def test_query_user_exists_known_user(self):
self.mock_as_api.query_user.return_value = make_awaitable(True)
self.mock_store.get_new_events_for_appservice.side_effect = [
make_awaitable((0, [event])),
make_awaitable((0, [])),
]

self.handler.notify_interested_services(RoomStreamToken(None, 0))
Expand Down

0 comments on commit 91a7f7b

Please sign in to comment.