diff --git a/changelog.d/13918.misc b/changelog.d/13918.misc
new file mode 100644
index 000000000000..b03f6f42e5fd
--- /dev/null
+++ b/changelog.d/13918.misc
@@ -0,0 +1 @@
+Use new receipts column to optimise receipt and push action SQL queries. Contributed by Nick @ Beeper (@fizzadar).
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index 39556481ffc9..aa65e030b2c2 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -867,9 +867,8 @@ def _get_receipts_by_room_txn(
         )
 
         sql = f"""
-            SELECT room_id, thread_id, MAX(stream_ordering)
+            SELECT room_id, thread_id, MAX(event_stream_ordering)
             FROM receipts_linearized
-            INNER JOIN events USING (room_id, event_id)
             WHERE {receipt_types_clause}
             AND user_id = ?
             GROUP BY room_id, thread_id
@@ -1348,11 +1347,10 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool:
         )
 
         sql = """
-            SELECT r.stream_id, r.room_id, r.user_id, r.thread_id, e.stream_ordering
-            FROM receipts_linearized AS r
-            INNER JOIN events AS e USING (event_id)
-            WHERE ? < r.stream_id AND r.stream_id <= ? AND user_id LIKE ?
-            ORDER BY r.stream_id ASC
+            SELECT stream_id, room_id, user_id, thread_id, event_stream_ordering
+            FROM receipts_linearized
+            WHERE ? < stream_id AND stream_id <= ? AND user_id LIKE ?
+            ORDER BY stream_id ASC
             LIMIT ?
         """
 
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 56e8eb16a896..34cf79fa9261 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -38,6 +38,7 @@
     LoggingDatabaseConnection,
     LoggingTransaction,
 )
+from synapse.storage.databases.main.stream import StreamWorkerStore
 from synapse.storage.engines import PostgresEngine
 from synapse.storage.engines._base import IsolationLevel
 from synapse.storage.util.id_generators import (
@@ -61,7 +62,7 @@
 logger = logging.getLogger(__name__)
 
 
-class ReceiptsWorkerStore(SQLBaseStore):
+class ReceiptsWorkerStore(StreamWorkerStore, SQLBaseStore):
     def __init__(
         self,
         database: DatabasePool,
@@ -172,14 +173,13 @@ def get_last_unthreaded_receipt_for_user_txn(
         )
 
         sql = f"""
-            SELECT event_id, stream_ordering
+            SELECT event_id, event_stream_ordering
             FROM receipts_linearized
-            INNER JOIN events USING (room_id, event_id)
             WHERE {clause}
             AND user_id = ?
             AND room_id = ?
             AND thread_id IS NULL
-            ORDER BY stream_ordering DESC
+            ORDER BY event_stream_ordering DESC
             LIMIT 1
         """
 
@@ -701,6 +701,16 @@ def _insert_linearized_receipt_txn(
             allow_none=True,
         )
 
+        # Local user receipts must have an event_stream_ordering so that push action
+        # summarisation works correctly. This should always be the case because any
+        # local user should only receive events from this server. This exception
+        # protects against bad actors sending dodgy receipts.
+        if res is None and self.hs.is_mine_id(user_id):
+            raise ValueError(
+                "Local users cannot send receipts for unknown events, "
+                f"roomID={room_id}, eventID={event_id}",
+            )
+
         stream_ordering = int(res["stream_ordering"]) if res else None
         rx_ts = res["received_ts"] if res else 0
 
@@ -708,16 +718,15 @@ def _insert_linearized_receipt_txn(
         # have to compare orderings of existing receipts
         if stream_ordering is not None:
             if thread_id is None:
-                thread_clause = "r.thread_id IS NULL"
+                thread_clause = "thread_id IS NULL"
                 thread_args: Tuple[str, ...] = ()
             else:
-                thread_clause = "r.thread_id = ?"
+                thread_clause = "thread_id = ?"
                 thread_args = (thread_id,)
 
             sql = f"""
-                SELECT stream_ordering, event_id FROM events
-                INNER JOIN receipts_linearized AS r USING (event_id, room_id)
-                WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ? AND {thread_clause}
+                SELECT event_stream_ordering, event_id FROM receipts_linearized
+                WHERE room_id = ? AND receipt_type = ? AND user_id = ? AND {thread_clause}
             """
             txn.execute(
                 sql,
@@ -730,7 +739,8 @@ def _insert_linearized_receipt_txn(
             )
 
             for so, eid in txn:
-                if int(so) >= stream_ordering:
+                # Guard against old receipts with no `event_stream_ordering`
+                if so and int(so) >= stream_ordering:
                     logger.debug(
                         "Ignoring new receipt for %s in favour of existing "
                         "one for later event %s",
diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py
index 78646cb5dccc..c67883a51d25 100644
--- a/tests/handlers/test_appservice.py
+++ b/tests/handlers/test_appservice.py
@@ -613,7 +613,7 @@ def test_sending_read_receipt_batches_to_application_services(self) -> None:
                 ],
                 ApplicationService.NS_ROOMS: [
                     {
-                        "regex": "!fakeroom_.*",
+                        "regex": ".*",
                         "exclusive": True,
                     }
                 ],
@@ -622,16 +622,22 @@
 
         # Now, pretend that we receive a large burst of read receipts (300 total) that
         # all come in at once.
-        for i in range(300):
+        for _ in range(300):
+            room_id = self.helper.create_room_as(
+                self.local_user, tok=self.local_user_token
+            )
+            resp = self.helper.send(room_id, tok=self.local_user_token)
+            event_id = resp["event_id"]
+
             self.get_success(
                 # Insert a fake read receipt into the database
                 self.hs.get_datastores().main.insert_receipt(
                     # We have to use unique room ID + user ID combinations here, as the db query
                     # is an upsert.
-                    room_id=f"!fakeroom_{i}:test",
+                    room_id=room_id,
                     receipt_type="m.read",
                     user_id=self.local_user,
-                    event_ids=[f"$eventid_{i}"],
+                    event_ids=[event_id],
                     thread_id=None,
                     data={},
                 )
diff --git a/tests/rest/client/test_receipts.py b/tests/rest/client/test_receipts.py
index ec638c89b722..f8f931449b87 100644
--- a/tests/rest/client/test_receipts.py
+++ b/tests/rest/client/test_receipts.py
@@ -30,6 +30,7 @@ class ReceiptsTestCase(unittest.HomeserverTestCase):
     servlets = [
         login.register_servlets,
         receipts.register_servlets,
+        room.register_servlets,
         synapse.rest.admin.register_servlets,
         room.register_servlets,
         sync.register_servlets,
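Reviewer note: the whole optimisation above rests on one denormalisation, so a standalone sketch of the idea may help. `receipts_linearized` now carries an `event_stream_ordering` column mirroring `events.stream_ordering`, which is what lets every receipt query in this diff drop its `INNER JOIN events`. The sketch below reproduces the before/after query shapes against simplified in-memory tables; the table and column names come from the diff, while the schemas, sample rows, and the sqlite3 harness are illustrative assumptions, not Synapse's real schema.

```python
import sqlite3

# Simplified stand-ins for the real tables: event_stream_ordering on
# receipts_linearized mirrors events.stream_ordering (assumed shapes).
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE events (
        event_id TEXT PRIMARY KEY,
        room_id TEXT NOT NULL,
        stream_ordering INTEGER NOT NULL
    );
    CREATE TABLE receipts_linearized (
        room_id TEXT NOT NULL,
        receipt_type TEXT NOT NULL,
        user_id TEXT NOT NULL,
        event_id TEXT NOT NULL,
        thread_id TEXT,
        -- Nullable: rows written before the column existed may hold NULL,
        -- which is why the diff guards with `if so and int(so) >= ...`.
        event_stream_ordering INTEGER
    );
    """
)
conn.execute("INSERT INTO events VALUES ('$ev1', '!room:test', 10)")
conn.execute("INSERT INTO events VALUES ('$ev2', '!room:test', 20)")
conn.execute(
    "INSERT INTO receipts_linearized "
    "VALUES ('!room:test', 'm.read', '@user:test', '$ev2', NULL, 20)"
)

# Before: the receipt's stream ordering had to come from a join on events.
before = conn.execute(
    """
    SELECT room_id, thread_id, MAX(stream_ordering)
    FROM receipts_linearized
    INNER JOIN events USING (room_id, event_id)
    WHERE user_id = ?
    GROUP BY room_id, thread_id
    """,
    ("@user:test",),
).fetchall()

# After: the denormalised column answers the same question from one table.
after = conn.execute(
    """
    SELECT room_id, thread_id, MAX(event_stream_ordering)
    FROM receipts_linearized
    WHERE user_id = ?
    GROUP BY room_id, thread_id
    """,
    ("@user:test",),
).fetchall()

assert before == after == [("!room:test", None, 20)]
```

The trade-off is the usual one for denormalisation: writes must keep the copied ordering consistent (hence the new `ValueError` rejecting local receipts for unknown events), in exchange for join-free reads on the hot receipt and push-action paths.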