diff --git a/changelog.d/7673.feature b/changelog.d/7673.feature new file mode 100644 index 000000000000..ecc3ffd8d5fc --- /dev/null +++ b/changelog.d/7673.feature @@ -0,0 +1 @@ +Add a per-room counter for unread messages in responses to `/sync` requests. Implements [MSC2625](https://github.com/matrix-org/matrix-doc/pull/2625). diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 4c7524493ef6..0b82aa72a67e 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1893,6 +1893,9 @@ async def _generate_room_entry( if notifs is not None: unread_notifications["notification_count"] = notifs["notify_count"] unread_notifications["highlight_count"] = notifs["highlight_count"] + unread_notifications["org.matrix.msc2625.unread_count"] = notifs[ + "unread_count" + ] sync_result_builder.joined.append(room_sync) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 43ffe6faf030..5b00602a56ae 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -189,8 +189,11 @@ def action_for_event_by_user(self, event, context): ) if matches: actions = [x for x in rule["actions"] if x != "dont_notify"] - if actions and "notify" in actions: - # Push rules say we should notify the user of this event + if ( + "notify" in actions + or "org.matrix.msc2625.mark_unread" in actions + ): + # Push rules say we should act on this event. actions_by_user[uid] = actions break diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py index 5dae4648c0f7..9f264ca4a433 100644 --- a/synapse/push/push_tools.py +++ b/synapse/push/push_tools.py @@ -39,7 +39,10 @@ def get_badge_count(store, user_id): ) # return one badge count per conversation, as count per # message is so noisy as to be almost useless - badge += 1 if notifs["notify_count"] else 0 + # We're populating this badge using the unread_count (instead of the + # notify_count) as this badge is the number of missed messages, not the + # number of missed notifications. + badge += 1 if notifs["unread_count"] else 0 return badge diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py index 9fd490813693..f563b3dc3572 100644 --- a/synapse/rest/client/v1/push_rule.py +++ b/synapse/rest/client/v1/push_rule.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014-2016 OpenMarket Ltd +# Copyright 2014-2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -267,7 +267,7 @@ def _check_actions(actions): raise InvalidRuleException("No actions found") for a in actions: - if a in ["notify", "dont_notify", "coalesce"]: + if a in ["notify", "dont_notify", "coalesce", "org.matrix.msc2625.mark_unread"]: pass elif isinstance(a, dict) and "set_tweak" in a: pass diff --git a/synapse/storage/data_stores/main/event_push_actions.py b/synapse/storage/data_stores/main/event_push_actions.py index bc9f4f08eac4..ba1b33a0a9f9 100644 --- a/synapse/storage/data_stores/main/event_push_actions.py +++ b/synapse/storage/data_stores/main/event_push_actions.py @@ -1,6 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2015 OpenMarket Ltd -# Copyright 2018 New Vector Ltd +# Copyright 2015-2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,7 +14,9 @@ # limitations under the License. import logging +from typing import Dict, Tuple +import attr from canonicaljson import json from twisted.internet import defer @@ -36,6 +37,16 @@ ] +@attr.s +class EventPushSummary: + """Summary of pending event push actions for a given user in a given room.""" + + unread_count = attr.ib(type=int) + stream_ordering = attr.ib(type=int) + old_user_id = attr.ib(type=str) + notif_count = attr.ib(type=int) + + def _serialize_action(actions, is_highlight): """Custom serializer for actions. This allows us to "compress" common actions. @@ -122,25 +133,42 @@ def _get_unread_counts_by_receipt_txn( def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, stream_ordering): - # First get number of notifications. - # We don't need to put a notif=1 clause as all rows always have - # notif=1 + # First get number of actions, grouped on whether the action notifies. sql = ( - "SELECT count(*)" + "SELECT count(*), notif" " FROM event_push_actions ea" " WHERE" " user_id = ?" " AND room_id = ?" " AND stream_ordering > ?" + " GROUP BY notif" ) - txn.execute(sql, (user_id, room_id, stream_ordering)) - row = txn.fetchone() - notify_count = row[0] if row else 0 + rows = txn.fetchall() + + # We should get a maximum number of two rows: one for notif = 0, which is the + # number of actions that contribute to the unread_count but not to the + # notify_count, and one for notif = 1, which is the number of actions that + # contribute to both counters. If one or both rows don't appear, then the + # value for the matching counter should be 0. + unread_count = 0 + notify_count = 0 + for row in rows: + # We always increment unread_count because actions that notify also + # contribute to it. + unread_count += row[0] + if row[1] == 1: + notify_count = row[0] + elif row[1] != 0: + logger.warning( + "Unexpected value %d for column 'notif' in table" + " 'event_push_actions'", + row[1], + ) txn.execute( """ - SELECT notif_count FROM event_push_summary + SELECT notif_count, unread_count FROM event_push_summary WHERE room_id = ? AND user_id = ? AND stream_ordering > ? """, (room_id, user_id, stream_ordering), @@ -148,6 +176,7 @@ def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, stream_ordering): rows = txn.fetchall() if rows: notify_count += rows[0][0] + unread_count += rows[0][1] # Now get the number of highlights sql = ( @@ -164,7 +193,11 @@ def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, stream_ordering): row = txn.fetchone() highlight_count = row[0] if row else 0 - return {"notify_count": notify_count, "highlight_count": highlight_count} + return { + "unread_count": unread_count, + "notify_count": notify_count, + "highlight_count": highlight_count, + } @defer.inlineCallbacks def get_push_action_users_in_range(self, min_stream_ordering, max_stream_ordering): @@ -222,6 +255,7 @@ def get_after_receipt(txn): " AND ep.user_id = ?" " AND ep.stream_ordering > ?" " AND ep.stream_ordering <= ?" + " AND ep.notif = 1" " ORDER BY ep.stream_ordering ASC LIMIT ?" ) args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit] @@ -250,6 +284,7 @@ def get_no_receipt(txn): " AND ep.user_id = ?" " AND ep.stream_ordering > ?" " AND ep.stream_ordering <= ?" + " AND ep.notif = 1" " ORDER BY ep.stream_ordering ASC LIMIT ?" ) args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit] @@ -322,6 +357,7 @@ def get_after_receipt(txn): " AND ep.user_id = ?" " AND ep.stream_ordering > ?" " AND ep.stream_ordering <= ?" + " AND ep.notif = 1" " ORDER BY ep.stream_ordering DESC LIMIT ?" ) args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit] @@ -350,6 +386,7 @@ def get_no_receipt(txn): " AND ep.user_id = ?" " AND ep.stream_ordering > ?" " AND ep.stream_ordering <= ?" + " AND ep.notif = 1" " ORDER BY ep.stream_ordering DESC LIMIT ?" ) args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit] @@ -399,7 +436,7 @@ def get_if_maybe_push_in_range_for_user(self, user_id, min_stream_ordering): def _get_if_maybe_push_in_range_for_user_txn(txn): sql = """ SELECT 1 FROM event_push_actions - WHERE user_id = ? AND stream_ordering > ? + WHERE user_id = ? AND stream_ordering > ? AND notif = 1 LIMIT 1 """ @@ -428,14 +465,15 @@ def add_push_actions_to_staging(self, event_id, user_id_actions): return # This is a helper function for generating the necessary tuple that - # can be used to inert into the `event_push_actions_staging` table. + # can be used to insert into the `event_push_actions_staging` table. def _gen_entry(user_id, actions): is_highlight = 1 if _action_has_highlight(actions) else 0 + notif = 0 if "org.matrix.msc2625.mark_unread" in actions else 1 return ( event_id, # event_id column user_id, # user_id column _serialize_action(actions, is_highlight), # actions column - 1, # notif column + notif, # notif column is_highlight, # highlight column ) @@ -817,24 +855,51 @@ def _rotate_notifs_before_txn(self, txn, rotate_to_stream_ordering): # Calculate the new counts that should be upserted into event_push_summary sql = """ SELECT user_id, room_id, - coalesce(old.notif_count, 0) + upd.notif_count, + coalesce(old.%s, 0) + upd.cnt, upd.stream_ordering, old.user_id FROM ( - SELECT user_id, room_id, count(*) as notif_count, + SELECT user_id, room_id, count(*) as cnt, max(stream_ordering) as stream_ordering FROM event_push_actions WHERE ? <= stream_ordering AND stream_ordering < ? AND highlight = 0 + %s GROUP BY user_id, room_id ) AS upd LEFT JOIN event_push_summary AS old USING (user_id, room_id) """ - txn.execute(sql, (old_rotate_stream_ordering, rotate_to_stream_ordering)) - rows = txn.fetchall() + # First get the count of unread messages. + txn.execute( + sql % ("unread_count", ""), + (old_rotate_stream_ordering, rotate_to_stream_ordering), + ) - logger.info("Rotating notifications, handling %d rows", len(rows)) + # We need to merge both lists into a single object because we might not have the + # same amount of rows in each of them. In this case we use a dict indexed on the + # user ID and room ID to make it easier to populate. + summaries = {} # type: Dict[Tuple[str, str], EventPushSummary] + for row in txn: + summaries[(row[0], row[1])] = EventPushSummary( + unread_count=row[2], + stream_ordering=row[3], + old_user_id=row[4], + notif_count=0, + ) + + # Then get the count of notifications. + txn.execute( + sql % ("notif_count", "AND notif = 1"), + (old_rotate_stream_ordering, rotate_to_stream_ordering), + ) + + # notif_rows is populated based on a subset of the query used to populate + # unread_rows, so we can be sure that there will be no KeyError here. + for row in txn: + summaries[(row[0], row[1])].notif_count = row[2] + + logger.info("Rotating notifications, handling %d rows", len(summaries)) # If the `old.user_id` above is NULL then we know there isn't already an # entry in the table, so we simply insert it. Otherwise we update the @@ -844,22 +909,34 @@ def _rotate_notifs_before_txn(self, txn, rotate_to_stream_ordering): table="event_push_summary", values=[ { - "user_id": row[0], - "room_id": row[1], - "notif_count": row[2], - "stream_ordering": row[3], + "user_id": user_id, + "room_id": room_id, + "notif_count": summary.notif_count, + "unread_count": summary.unread_count, + "stream_ordering": summary.stream_ordering, } - for row in rows - if row[4] is None + for ((user_id, room_id), summary) in summaries.items() + if summary.old_user_id is None ], ) txn.executemany( """ - UPDATE event_push_summary SET notif_count = ?, stream_ordering = ? + UPDATE event_push_summary + SET notif_count = ?, unread_count = ?, stream_ordering = ? WHERE user_id = ? AND room_id = ? """, - ((row[2], row[3], row[0], row[1]) for row in rows if row[4] is not None), + ( + ( + summary.notif_count, + summary.unread_count, + summary.stream_ordering, + user_id, + room_id, + ) + for ((user_id, room_id), summary) in summaries.items() + if summary.old_user_id is not None + ), ) txn.execute( diff --git a/synapse/storage/data_stores/main/schema/delta/58/07push_summary_unread_count.sql b/synapse/storage/data_stores/main/schema/delta/58/07push_summary_unread_count.sql new file mode 100644 index 000000000000..f1459ef7f064 --- /dev/null +++ b/synapse/storage/data_stores/main/schema/delta/58/07push_summary_unread_count.sql @@ -0,0 +1,23 @@ +/* Copyright 2020 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Store the number of unread messages, i.e. messages that triggered either a notify +-- action or a mark_unread one. +ALTER TABLE event_push_summary ADD COLUMN unread_count BIGINT NOT NULL DEFAULT 0; + +-- Pre-populate the new column with the count of pending notifications. +-- We expect event_push_summary to be relatively small, so we can do this update +-- synchronously without impacting Synapse's startup time too much. +UPDATE event_push_summary SET unread_count = notif_count; \ No newline at end of file diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py index 1a88c7fb8005..cd8680e8127c 100644 --- a/tests/replication/slave/storage/test_events.py +++ b/tests/replication/slave/storage/test_events.py @@ -160,7 +160,7 @@ def test_push_actions_for_user(self): self.check( "get_unread_event_push_actions_by_room_for_user", [ROOM_ID, USER_ID_2, event1.event_id], - {"highlight_count": 0, "notify_count": 0}, + {"highlight_count": 0, "notify_count": 0, "unread_count": 0}, ) self.persist( @@ -173,7 +173,7 @@ def test_push_actions_for_user(self): self.check( "get_unread_event_push_actions_by_room_for_user", [ROOM_ID, USER_ID_2, event1.event_id], - {"highlight_count": 0, "notify_count": 1}, + {"highlight_count": 0, "notify_count": 1, "unread_count": 1}, ) self.persist( @@ -188,7 +188,20 @@ def test_push_actions_for_user(self): self.check( "get_unread_event_push_actions_by_room_for_user", [ROOM_ID, USER_ID_2, event1.event_id], - {"highlight_count": 1, "notify_count": 2}, + {"highlight_count": 1, "notify_count": 2, "unread_count": 2}, + ) + + self.persist( + type="m.room.message", + msgtype="m.text", + body="world", + push_actions=[(USER_ID_2, ["org.matrix.msc2625.mark_unread"])], + ) + self.replicate() + self.check( + "get_unread_event_push_actions_by_room_for_user", + [ROOM_ID, USER_ID_2, event1.event_id], + {"highlight_count": 1, "notify_count": 2, "unread_count": 3}, ) def test_get_rooms_for_user_with_stream_ordering(self): diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py index b45bc9c1151f..303dc8571c4c 100644 --- a/tests/storage/test_event_push_actions.py +++ b/tests/storage/test_event_push_actions.py @@ -22,6 +22,10 @@ USER_ID = "@user:example.com" +MARK_UNREAD = [ + "org.matrix.msc2625.mark_unread", + {"set_tweak": "highlight", "value": False}, +] PlAIN_NOTIF = ["notify", {"set_tweak": "highlight", "value": False}] HIGHLIGHT = [ "notify", @@ -55,13 +59,17 @@ def test_count_aggregation(self): user_id = "@user1235:example.com" @defer.inlineCallbacks - def _assert_counts(noitf_count, highlight_count): + def _assert_counts(unread_count, notif_count, highlight_count): counts = yield self.store.db.runInteraction( "", self.store._get_unread_counts_by_pos_txn, room_id, user_id, 0 ) self.assertEquals( counts, - {"notify_count": noitf_count, "highlight_count": highlight_count}, + { + "unread_count": unread_count, + "notify_count": notif_count, + "highlight_count": highlight_count, + }, ) @defer.inlineCallbacks @@ -96,23 +104,23 @@ def _mark_read(stream, depth): stream, ) - yield _assert_counts(0, 0) + yield _assert_counts(0, 0, 0) yield _inject_actions(1, PlAIN_NOTIF) - yield _assert_counts(1, 0) + yield _assert_counts(1, 1, 0) yield _rotate(2) - yield _assert_counts(1, 0) + yield _assert_counts(1, 1, 0) yield _inject_actions(3, PlAIN_NOTIF) - yield _assert_counts(2, 0) + yield _assert_counts(2, 2, 0) yield _rotate(4) - yield _assert_counts(2, 0) + yield _assert_counts(2, 2, 0) yield _inject_actions(5, PlAIN_NOTIF) yield _mark_read(3, 3) - yield _assert_counts(1, 0) + yield _assert_counts(1, 1, 0) yield _mark_read(5, 5) - yield _assert_counts(0, 0) + yield _assert_counts(0, 0, 0) yield _inject_actions(6, PlAIN_NOTIF) yield _rotate(7) @@ -121,17 +129,22 @@ def _mark_read(stream, depth): table="event_push_actions", keyvalues={"1": 1}, desc="" ) - yield _assert_counts(1, 0) + yield _assert_counts(1, 1, 0) yield _mark_read(7, 7) - yield _assert_counts(0, 0) + yield _assert_counts(0, 0, 0) - yield _inject_actions(8, HIGHLIGHT) - yield _assert_counts(1, 1) + yield _inject_actions(8, MARK_UNREAD) + yield _assert_counts(1, 0, 0) yield _rotate(9) - yield _assert_counts(1, 1) - yield _rotate(10) - yield _assert_counts(1, 1) + yield _assert_counts(1, 0, 0) + + yield _inject_actions(10, HIGHLIGHT) + yield _assert_counts(2, 1, 1) + yield _rotate(11) + yield _assert_counts(2, 1, 1) + yield _rotate(12) + yield _assert_counts(2, 1, 1) @defer.inlineCallbacks def test_find_first_stream_ordering_after_ts(self):