Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Clean up duplicate receipts when creating unique indexes
Browse files Browse the repository at this point in the history
Before creating the `receipts_graph_unique_index` and
`receipts_linearized_unique_index` unique indexes, we have to clean up
any duplicate receipts that may have crept in due to
#14406.

Signed-off-by: Sean Quah <[email protected]>
  • Loading branch information
Sean Quah committed Nov 15, 2022
1 parent b1d191d commit 50f26cd
Show file tree
Hide file tree
Showing 2 changed files with 259 additions and 18 deletions.
108 changes: 90 additions & 18 deletions synapse/storage/databases/main/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,24 +113,6 @@ def __init__(
prefilled_cache=receipts_stream_prefill,
)

self.db_pool.updates.register_background_index_update(
"receipts_linearized_unique_index",
index_name="receipts_linearized_unique_index",
table="receipts_linearized",
columns=["room_id", "receipt_type", "user_id"],
where_clause="thread_id IS NULL",
unique=True,
)

self.db_pool.updates.register_background_index_update(
"receipts_graph_unique_index",
index_name="receipts_graph_unique_index",
table="receipts_graph",
columns=["room_id", "receipt_type", "user_id"],
where_clause="thread_id IS NULL",
unique=True,
)

def get_max_receipt_stream_id(self) -> int:
"""Get the current max stream ID for receipts stream"""
return self._receipts_id_gen.get_current_token()
Expand Down Expand Up @@ -864,6 +846,8 @@ def _insert_graph_receipt_txn(

class ReceiptsBackgroundUpdateStore(SQLBaseStore):
POPULATE_RECEIPT_EVENT_STREAM_ORDERING = "populate_event_stream_ordering"
RECEIPTS_LINEARIZED_UNIQUE_INDEX_UPDATE_NAME = "receipts_linearized_unique_index"
RECEIPTS_GRAPH_UNIQUE_INDEX_UPDATE_NAME = "receipts_graph_unique_index"

def __init__(
self,
Expand All @@ -877,6 +861,14 @@ def __init__(
self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING,
self._populate_receipt_event_stream_ordering,
)
self.db_pool.updates.register_background_update_handler(
self.RECEIPTS_LINEARIZED_UNIQUE_INDEX_UPDATE_NAME,
self._background_receipts_linearized_unique_index,
)
self.db_pool.updates.register_background_update_handler(
self.RECEIPTS_GRAPH_UNIQUE_INDEX_UPDATE_NAME,
self._background_receipts_graph_unique_index,
)

async def _populate_receipt_event_stream_ordering(
self, progress: JsonDict, batch_size: int
Expand Down Expand Up @@ -932,6 +924,86 @@ def _populate_receipt_event_stream_ordering_txn(

return batch_size

async def _background_receipts_unique_index(
self, update_name: str, index_name: str, table: str
) -> int:
"""Adds a unique index on `(room_id, receipt_type, user_id)` to the given
receipts table, for non-thread receipts.
"""
def _receipts_unique_index_txn(txn: LoggingTransaction) -> None:
# Identify any duplicate receipts arising from
# https://github.com/matrix-org/synapse/issues/14406.
# We expect the following query to use the per-thread receipt index and take
# less than a minute.
sql = f"""
SELECT room_id, receipt_type, user_id FROM {table}
WHERE thread_id IS NULL
GROUP BY room_id, receipt_type, user_id
HAVING COUNT(*) > 1
"""
txn.execute(sql)
duplicate_keys = cast(List[Tuple[str, str, str]], list(txn))

# Then remove all duplicate receipts.
# We could be clever and try to keep the latest receipt out of every set of
# duplicates, but it's far simpler to remove them all.
for room_id, receipt_type, user_id in duplicate_keys:
sql = f"""
DELETE FROM {table}
WHERE
room_id = ? AND
receipt_type = ? AND
user_id = ? AND
thread_id IS NULL
"""
txn.execute(sql, (room_id, receipt_type, user_id))

# Now that the duplicates are gone, we can create the index.
concurrently = (
"CONCURRENTLY"
if isinstance(self.database_engine, PostgresEngine)
else ""
)
sql = f"""
CREATE UNIQUE INDEX {concurrently} {index_name}
ON {table}(room_id, receipt_type, user_id)
WHERE thread_id IS NULL
"""
txn.execute(sql)

await self.db_pool.runInteraction(
update_name,
_receipts_unique_index_txn,
)

await self.db_pool.updates._end_background_update(update_name)

return 1

async def _background_receipts_linearized_unique_index(
self, progress: dict, batch_size: int
) -> int:
"""Adds a unique index on `(room_id, receipt_type, user_id)` to
`receipts_linearized`, for non-thread receipts.
"""
return await self._background_receipts_unique_index(
self.RECEIPTS_LINEARIZED_UNIQUE_INDEX_UPDATE_NAME,
"receipts_linearized_unique_index",
"receipts_linearized",
)

async def _background_receipts_graph_unique_index(
self, progress: dict, batch_size: int
) -> int:
"""Adds a unique index on `(room_id, receipt_type, user_id)` to
`receipts_graph`, for non-thread receipts.
"""
return await self._background_receipts_unique_index(
self.RECEIPTS_GRAPH_UNIQUE_INDEX_UPDATE_NAME,
"receipts_graph_unique_index",
"receipts_graph",
)


class ReceiptsStore(ReceiptsWorkerStore, ReceiptsBackgroundUpdateStore):
pass
169 changes: 169 additions & 0 deletions tests/storage/databases/main/test_receipts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
# Copyright 2022 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Any, Dict

from twisted.test.proto_helpers import MemoryReactor

from synapse.rest import admin
from synapse.rest.client import login, room
from synapse.server import HomeServer
from synapse.storage.database import LoggingTransaction
from synapse.util import Clock

from tests.unittest import HomeserverTestCase


class ReceiptsBackgroundUpdateStoreTestCase(HomeserverTestCase):

servlets = [
admin.register_servlets,
room.register_servlets,
login.register_servlets,
]

def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer):
self.store = hs.get_datastores().main
self.user_id = self.register_user("foo", "pass")
self.token = self.login("foo", "pass")
self.room_id = self.helper.create_room_as(self.user_id, tok=self.token)
self.other_room_id = self.helper.create_room_as(self.user_id, tok=self.token)

def _test_background_receipts_unique_index(
self,
update_name: str,
index_name: str,
table: str,
values: Dict[str, Any],
):
"""Test that the background update to uniqueify non-thread receipts in
the given receipts table works properly.
"""
# First, undo the background update.
def drop_receipts_unique_index(txn: LoggingTransaction) -> None:
txn.execute(f"DROP INDEX IF EXISTS {index_name}")

self.get_success(
self.store.db_pool.runInteraction(
"drop_receipts_unique_index",
drop_receipts_unique_index,
)
)

# Add duplicate receipts for `room_id`.
for _ in range(2):
self.get_success(
self.store.db_pool.simple_insert(
table,
{
"room_id": self.room_id,
"receipt_type": "m.read",
"user_id": self.user_id,
"thread_id": None,
"data": "{}",
**values,
},
)
)

# Add a unique receipt for `other_room_id`.
self.get_success(
self.store.db_pool.simple_insert(
table,
{
"room_id": self.other_room_id,
"receipt_type": "m.read",
"user_id": self.user_id,
"thread_id": None,
"data": "{}",
**values,
},
)
)

# Insert and run the background update.
self.get_success(
self.store.db_pool.simple_insert(
"background_updates",
{
"update_name": update_name,
"progress_json": "{}",
},
)
)

self.store.db_pool.updates._all_done = False

self.wait_for_background_updates()

# Check that the background task deleted the duplicate receipts.
res = self.get_success(
self.store.db_pool.simple_select_onecol(
table=table,
keyvalues={
"room_id": self.room_id,
"receipt_type": "m.read",
"user_id": self.user_id,
# `simple_select_onecol` does not support NULL filters,
# so skip the filter on `thread_id`.
},
retcol="room_id",
desc="get_receipt",
)
)
self.assertEqual(0, len(res))

# Check that the background task did not delete the unique receipts.
res = self.get_success(
self.store.db_pool.simple_select_onecol(
table=table,
keyvalues={
"room_id": self.other_room_id,
"receipt_type": "m.read",
"user_id": self.user_id,
# `simple_select_onecol` does not support NULL filters,
# so skip the filter on `thread_id`.
},
retcol="room_id",
desc="get_receipt",
)
)
self.assertEqual(1, len(res))

def test_background_receipts_linearized_unique_index(self):
"""Test that the background update to uniqueify non-thread receipts in
`receipts_linearized` works properly.
"""
self._test_background_receipts_unique_index(
"receipts_linearized_unique_index",
"receipts_linearized_unique_index",
"receipts_linearized",
{
"stream_id": 5,
"event_id": "$some_event",
},
)

def test_background_receipts_graph_unique_index(self):
"""Test that the background update to uniqueify non-thread receipts in
`receipts_graph` works properly.
"""
self._test_background_receipts_unique_index(
"receipts_graph_unique_index",
"receipts_graph_unique_index",
"receipts_graph",
{
"event_ids": '["$some_event"]',
},
)

0 comments on commit 50f26cd

Please sign in to comment.