Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Implement unread counter (MSC2625) (#7673)
Browse files Browse the repository at this point in the history
  • Loading branch information
babolivier authored Jun 17, 2020
2 parents e452973 + 6efb2b0 commit 46613aa
Show file tree
Hide file tree
Showing 9 changed files with 187 additions and 51 deletions.
1 change: 1 addition & 0 deletions changelog.d/7673.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add a per-room counter for unread messages in responses to `/sync` requests. Implements [MSC2625](https://github.com/matrix-org/matrix-doc/pull/2625).
3 changes: 3 additions & 0 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1893,6 +1893,9 @@ async def _generate_room_entry(
if notifs is not None:
unread_notifications["notification_count"] = notifs["notify_count"]
unread_notifications["highlight_count"] = notifs["highlight_count"]
unread_notifications["org.matrix.msc2625.unread_count"] = notifs[
"unread_count"
]

sync_result_builder.joined.append(room_sync)

Expand Down
7 changes: 5 additions & 2 deletions synapse/push/bulk_push_rule_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,11 @@ def action_for_event_by_user(self, event, context):
)
if matches:
actions = [x for x in rule["actions"] if x != "dont_notify"]
if actions and "notify" in actions:
# Push rules say we should notify the user of this event
if (
"notify" in actions
or "org.matrix.msc2625.mark_unread" in actions
):
# Push rules say we should act on this event.
actions_by_user[uid] = actions
break

Expand Down
5 changes: 4 additions & 1 deletion synapse/push/push_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ def get_badge_count(store, user_id):
)
# return one badge count per conversation, as count per
# message is so noisy as to be almost useless
badge += 1 if notifs["notify_count"] else 0
# We're populating this badge using the unread_count (instead of the
# notify_count) as this badge is the number of missed messages, not the
# number of missed notifications.
badge += 1 if notifs["unread_count"] else 0
return badge


Expand Down
4 changes: 2 additions & 2 deletions synapse/rest/client/v1/push_rule.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2014-2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -267,7 +267,7 @@ def _check_actions(actions):
raise InvalidRuleException("No actions found")

for a in actions:
if a in ["notify", "dont_notify", "coalesce"]:
if a in ["notify", "dont_notify", "coalesce", "org.matrix.msc2625.mark_unread"]:
pass
elif isinstance(a, dict) and "set_tweak" in a:
pass
Expand Down
131 changes: 104 additions & 27 deletions synapse/storage/data_stores/main/event_push_actions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# -*- coding: utf-8 -*-
# Copyright 2015 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
# Copyright 2015-2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -15,7 +14,9 @@
# limitations under the License.

import logging
from typing import Dict, Tuple

import attr
from canonicaljson import json

from twisted.internet import defer
Expand All @@ -36,6 +37,16 @@
]


@attr.s
class EventPushSummary:
"""Summary of pending event push actions for a given user in a given room."""

unread_count = attr.ib(type=int)
stream_ordering = attr.ib(type=int)
old_user_id = attr.ib(type=str)
notif_count = attr.ib(type=int)


def _serialize_action(actions, is_highlight):
"""Custom serializer for actions. This allows us to "compress" common actions.
Expand Down Expand Up @@ -122,32 +133,50 @@ def _get_unread_counts_by_receipt_txn(

def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, stream_ordering):

# First get number of notifications.
# We don't need to put a notif=1 clause as all rows always have
# notif=1
# First get number of actions, grouped on whether the action notifies.
sql = (
"SELECT count(*)"
"SELECT count(*), notif"
" FROM event_push_actions ea"
" WHERE"
" user_id = ?"
" AND room_id = ?"
" AND stream_ordering > ?"
" GROUP BY notif"
)

txn.execute(sql, (user_id, room_id, stream_ordering))
row = txn.fetchone()
notify_count = row[0] if row else 0
rows = txn.fetchall()

# We should get a maximum number of two rows: one for notif = 0, which is the
# number of actions that contribute to the unread_count but not to the
# notify_count, and one for notif = 1, which is the number of actions that
# contribute to both counters. If one or both rows don't appear, then the
# value for the matching counter should be 0.
unread_count = 0
notify_count = 0
for row in rows:
# We always increment unread_count because actions that notify also
# contribute to it.
unread_count += row[0]
if row[1] == 1:
notify_count = row[0]
elif row[1] != 0:
logger.warning(
"Unexpected value %d for column 'notif' in table"
" 'event_push_actions'",
row[1],
)

txn.execute(
"""
SELECT notif_count FROM event_push_summary
SELECT notif_count, unread_count FROM event_push_summary
WHERE room_id = ? AND user_id = ? AND stream_ordering > ?
""",
(room_id, user_id, stream_ordering),
)
rows = txn.fetchall()
if rows:
notify_count += rows[0][0]
unread_count += rows[0][1]

# Now get the number of highlights
sql = (
Expand All @@ -164,7 +193,11 @@ def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, stream_ordering):
row = txn.fetchone()
highlight_count = row[0] if row else 0

return {"notify_count": notify_count, "highlight_count": highlight_count}
return {
"unread_count": unread_count,
"notify_count": notify_count,
"highlight_count": highlight_count,
}

@defer.inlineCallbacks
def get_push_action_users_in_range(self, min_stream_ordering, max_stream_ordering):
Expand Down Expand Up @@ -222,6 +255,7 @@ def get_after_receipt(txn):
" AND ep.user_id = ?"
" AND ep.stream_ordering > ?"
" AND ep.stream_ordering <= ?"
" AND ep.notif = 1"
" ORDER BY ep.stream_ordering ASC LIMIT ?"
)
args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
Expand Down Expand Up @@ -250,6 +284,7 @@ def get_no_receipt(txn):
" AND ep.user_id = ?"
" AND ep.stream_ordering > ?"
" AND ep.stream_ordering <= ?"
" AND ep.notif = 1"
" ORDER BY ep.stream_ordering ASC LIMIT ?"
)
args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
Expand Down Expand Up @@ -322,6 +357,7 @@ def get_after_receipt(txn):
" AND ep.user_id = ?"
" AND ep.stream_ordering > ?"
" AND ep.stream_ordering <= ?"
" AND ep.notif = 1"
" ORDER BY ep.stream_ordering DESC LIMIT ?"
)
args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
Expand Down Expand Up @@ -350,6 +386,7 @@ def get_no_receipt(txn):
" AND ep.user_id = ?"
" AND ep.stream_ordering > ?"
" AND ep.stream_ordering <= ?"
" AND ep.notif = 1"
" ORDER BY ep.stream_ordering DESC LIMIT ?"
)
args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
Expand Down Expand Up @@ -399,7 +436,7 @@ def get_if_maybe_push_in_range_for_user(self, user_id, min_stream_ordering):
def _get_if_maybe_push_in_range_for_user_txn(txn):
sql = """
SELECT 1 FROM event_push_actions
WHERE user_id = ? AND stream_ordering > ?
WHERE user_id = ? AND stream_ordering > ? AND notif = 1
LIMIT 1
"""

Expand Down Expand Up @@ -428,14 +465,15 @@ def add_push_actions_to_staging(self, event_id, user_id_actions):
return

# This is a helper function for generating the necessary tuple that
# can be used to inert into the `event_push_actions_staging` table.
# can be used to insert into the `event_push_actions_staging` table.
def _gen_entry(user_id, actions):
is_highlight = 1 if _action_has_highlight(actions) else 0
notif = 0 if "org.matrix.msc2625.mark_unread" in actions else 1
return (
event_id, # event_id column
user_id, # user_id column
_serialize_action(actions, is_highlight), # actions column
1, # notif column
notif, # notif column
is_highlight, # highlight column
)

Expand Down Expand Up @@ -817,24 +855,51 @@ def _rotate_notifs_before_txn(self, txn, rotate_to_stream_ordering):
# Calculate the new counts that should be upserted into event_push_summary
sql = """
SELECT user_id, room_id,
coalesce(old.notif_count, 0) + upd.notif_count,
coalesce(old.%s, 0) + upd.cnt,
upd.stream_ordering,
old.user_id
FROM (
SELECT user_id, room_id, count(*) as notif_count,
SELECT user_id, room_id, count(*) as cnt,
max(stream_ordering) as stream_ordering
FROM event_push_actions
WHERE ? <= stream_ordering AND stream_ordering < ?
AND highlight = 0
%s
GROUP BY user_id, room_id
) AS upd
LEFT JOIN event_push_summary AS old USING (user_id, room_id)
"""

txn.execute(sql, (old_rotate_stream_ordering, rotate_to_stream_ordering))
rows = txn.fetchall()
# First get the count of unread messages.
txn.execute(
sql % ("unread_count", ""),
(old_rotate_stream_ordering, rotate_to_stream_ordering),
)

logger.info("Rotating notifications, handling %d rows", len(rows))
# We need to merge both lists into a single object because we might not have the
# same amount of rows in each of them. In this case we use a dict indexed on the
# user ID and room ID to make it easier to populate.
summaries = {} # type: Dict[Tuple[str, str], EventPushSummary]
for row in txn:
summaries[(row[0], row[1])] = EventPushSummary(
unread_count=row[2],
stream_ordering=row[3],
old_user_id=row[4],
notif_count=0,
)

# Then get the count of notifications.
txn.execute(
sql % ("notif_count", "AND notif = 1"),
(old_rotate_stream_ordering, rotate_to_stream_ordering),
)

# notif_rows is populated based on a subset of the query used to populate
# unread_rows, so we can be sure that there will be no KeyError here.
for row in txn:
summaries[(row[0], row[1])].notif_count = row[2]

logger.info("Rotating notifications, handling %d rows", len(summaries))

# If the `old.user_id` above is NULL then we know there isn't already an
# entry in the table, so we simply insert it. Otherwise we update the
Expand All @@ -844,22 +909,34 @@ def _rotate_notifs_before_txn(self, txn, rotate_to_stream_ordering):
table="event_push_summary",
values=[
{
"user_id": row[0],
"room_id": row[1],
"notif_count": row[2],
"stream_ordering": row[3],
"user_id": user_id,
"room_id": room_id,
"notif_count": summary.notif_count,
"unread_count": summary.unread_count,
"stream_ordering": summary.stream_ordering,
}
for row in rows
if row[4] is None
for ((user_id, room_id), summary) in summaries.items()
if summary.old_user_id is None
],
)

txn.executemany(
"""
UPDATE event_push_summary SET notif_count = ?, stream_ordering = ?
UPDATE event_push_summary
SET notif_count = ?, unread_count = ?, stream_ordering = ?
WHERE user_id = ? AND room_id = ?
""",
((row[2], row[3], row[0], row[1]) for row in rows if row[4] is not None),
(
(
summary.notif_count,
summary.unread_count,
summary.stream_ordering,
user_id,
room_id,
)
for ((user_id, room_id), summary) in summaries.items()
if summary.old_user_id is not None
),
)

txn.execute(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/* Copyright 2020 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- Store the number of unread messages, i.e. messages that triggered either a notify
-- action or a mark_unread one.
ALTER TABLE event_push_summary ADD COLUMN unread_count BIGINT NOT NULL DEFAULT 0;

-- Pre-populate the new column with the count of pending notifications.
-- We expect event_push_summary to be relatively small, so we can do this update
-- synchronously without impacting Synapse's startup time too much.
UPDATE event_push_summary SET unread_count = notif_count;
19 changes: 16 additions & 3 deletions tests/replication/slave/storage/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ def test_push_actions_for_user(self):
self.check(
"get_unread_event_push_actions_by_room_for_user",
[ROOM_ID, USER_ID_2, event1.event_id],
{"highlight_count": 0, "notify_count": 0},
{"highlight_count": 0, "notify_count": 0, "unread_count": 0},
)

self.persist(
Expand All @@ -173,7 +173,7 @@ def test_push_actions_for_user(self):
self.check(
"get_unread_event_push_actions_by_room_for_user",
[ROOM_ID, USER_ID_2, event1.event_id],
{"highlight_count": 0, "notify_count": 1},
{"highlight_count": 0, "notify_count": 1, "unread_count": 1},
)

self.persist(
Expand All @@ -188,7 +188,20 @@ def test_push_actions_for_user(self):
self.check(
"get_unread_event_push_actions_by_room_for_user",
[ROOM_ID, USER_ID_2, event1.event_id],
{"highlight_count": 1, "notify_count": 2},
{"highlight_count": 1, "notify_count": 2, "unread_count": 2},
)

self.persist(
type="m.room.message",
msgtype="m.text",
body="world",
push_actions=[(USER_ID_2, ["org.matrix.msc2625.mark_unread"])],
)
self.replicate()
self.check(
"get_unread_event_push_actions_by_room_for_user",
[ROOM_ID, USER_ID_2, event1.event_id],
{"highlight_count": 1, "notify_count": 2, "unread_count": 3},
)

def test_get_rooms_for_user_with_stream_ordering(self):
Expand Down
Loading

0 comments on commit 46613aa

Please sign in to comment.