From 17c43d1f4f5894931b220b46dd7e5f6436e6b8d4 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 9 Dec 2022 12:05:22 +0000 Subject: [PATCH 1/4] Factor out index creation method from `register_background_index_update` Signed-off-by: Sean Quah --- synapse/storage/background_updates.py | 55 ++++++++++++++++++++++----- 1 file changed, 46 insertions(+), 9 deletions(-) diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 2056ecb2c331..a99aea89261f 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -544,6 +544,48 @@ def register_background_index_update( The named index will be dropped upon completion of the new index. """ + async def updater(progress: JsonDict, batch_size: int) -> int: + await self.create_index_in_background( + index_name=index_name, + table=table, + columns=columns, + where_clause=where_clause, + unique=unique, + psql_only=psql_only, + replaces_index=replaces_index, + ) + await self._end_background_update(update_name) + return 1 + + self._background_update_handlers[update_name] = _BackgroundUpdateHandler( + updater, oneshot=True + ) + + async def create_index_in_background( + self, + index_name: str, + table: str, + columns: Iterable[str], + where_clause: Optional[str] = None, + unique: bool = False, + psql_only: bool = False, + replaces_index: Optional[str] = None, + ) -> None: + """Add an index in the background. + + Args: + update_name: update_name to register for + index_name: name of index to add + table: table to add index to + columns: columns/expressions to include in index + where_clause: A WHERE clause to specify a partial unique index. + unique: true to make a UNIQUE index + psql_only: true to only create this index on psql databases (useful + for virtual sqlite tables) + replaces_index: The name of an index that this index replaces. + The named index will be dropped upon completion of the new index. + """ + def create_index_psql(conn: Connection) -> None: conn.rollback() # postgres insists on autocommit for the index @@ -618,16 +660,11 @@ def create_index_sqlite(conn: Connection) -> None: else: runner = create_index_sqlite - async def updater(progress: JsonDict, batch_size: int) -> int: - if runner is not None: - logger.info("Adding index %s to %s", index_name, table) - await self.db_pool.runWithConnection(runner) - await self._end_background_update(update_name) - return 1 + if runner is None: + return - self._background_update_handlers[update_name] = _BackgroundUpdateHandler( - updater, oneshot=True - ) + logger.info("Adding index %s to %s", index_name, table) + await self.db_pool.runWithConnection(runner) async def _end_background_update(self, update_name: str) -> None: """Removes a completed background update task from the queue. From 5e14dcb9d604cbaca1829c1c4372346ad3820530 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 9 Dec 2022 13:28:16 +0000 Subject: [PATCH 2/4] Handle half-created indices in receipts index background update When Synapse is terminated while running the background update to create the `receipts_graph` or `receipts_linearized` indexes, the indexes may be successfully created (or marked as invalid on postgres) while the background update remains unfinished. When Synapse next starts up, the background update will fail because the index already exists, or exists but is invalid on postgres. Use the existing code to create indices in background updates, since it handles these edge cases. Signed-off-by: Sean Quah --- synapse/storage/databases/main/receipts.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index a580e4bddaf2..0a7a72f120dd 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -999,9 +999,12 @@ def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None: _remote_duplicate_receipts_txn, ) - await self._create_receipts_index( - "receipts_linearized_unique_index", - "receipts_linearized", + await self.db_pool.updates.create_index_in_background( + index_name="receipts_linearized_unique_index", + table="receipts_linearized", + columns=["room_id", "receipt_type", "user_id"], + where_clause="thread_id IS NULL", + unique=True, ) await self.db_pool.updates._end_background_update( @@ -1050,9 +1053,12 @@ def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None: _remote_duplicate_receipts_txn, ) - await self._create_receipts_index( - "receipts_graph_unique_index", - "receipts_graph", + await self.db_pool.updates.create_index_in_background( + index_name="receipts_graph_unique_index", + table="receipts_graph", + columns=["room_id", "receipt_type", "user_id"], + where_clause="thread_id IS NULL", + unique=True, ) await self.db_pool.updates._end_background_update( From 55723abf12e53b3eff5c4fcf909e76a64b38eeaa Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 9 Dec 2022 13:54:08 +0000 Subject: [PATCH 3/4] Add newsfile --- changelog.d/14650.bugfix | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 changelog.d/14650.bugfix diff --git a/changelog.d/14650.bugfix b/changelog.d/14650.bugfix new file mode 100644 index 000000000000..5e18641bf711 --- /dev/null +++ b/changelog.d/14650.bugfix @@ -0,0 +1,2 @@ +Fix a bug introduced in Synapse 1.72.0 where the background updates to add non-thread unique indexes on receipts would fail if they were previously interrupted. + From 5cc472940076eb69645da731080be7e0f22d8af7 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 9 Dec 2022 14:05:51 +0000 Subject: [PATCH 4/4] fixup: remove dead code (thanks dmr!) --- synapse/storage/databases/main/receipts.py | 33 ---------------------- 1 file changed, 33 deletions(-) diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 0a7a72f120dd..e06725f69c31 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -924,39 +924,6 @@ def _populate_receipt_event_stream_ordering_txn( return batch_size - async def _create_receipts_index(self, index_name: str, table: str) -> None: - """Adds a unique index on `(room_id, receipt_type, user_id)` to the given - receipts table, for non-thread receipts.""" - - def _create_index(conn: LoggingDatabaseConnection) -> None: - conn.rollback() - - # we have to set autocommit, because postgres refuses to - # CREATE INDEX CONCURRENTLY without it. - if isinstance(self.database_engine, PostgresEngine): - conn.set_session(autocommit=True) - - try: - c = conn.cursor() - - # Now that the duplicates are gone, we can create the index. - concurrently = ( - "CONCURRENTLY" - if isinstance(self.database_engine, PostgresEngine) - else "" - ) - sql = f""" - CREATE UNIQUE INDEX {concurrently} {index_name} - ON {table}(room_id, receipt_type, user_id) - WHERE thread_id IS NULL - """ - c.execute(sql) - finally: - if isinstance(self.database_engine, PostgresEngine): - conn.set_session(autocommit=False) - - await self.db_pool.runWithConnection(_create_index) - async def _background_receipts_linearized_unique_index( self, progress: dict, batch_size: int ) -> int: