From 444b231b4becd52898c71f33815785fe991423e6 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Thu, 13 Apr 2023 23:47:01 +0100 Subject: [PATCH 1/8] More precise type for LoggingTransaction.execute --- synapse/storage/database.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 226ccc1671ad..8fc13425f332 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -58,7 +58,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.background_updates import BackgroundUpdater from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine -from synapse.storage.types import Connection, Cursor +from synapse.storage.types import Connection, Cursor, _Parameters from synapse.util.async_helpers import delay_cancellation from synapse.util.iterutils import batch_iter @@ -394,8 +394,8 @@ def execute_values( sql, ) - def execute(self, sql: str, *args: Any) -> None: - self._do_execute(self.txn.execute, sql, *args) + def execute(self, sql: str, parameters: _Parameters = ()) -> None: + self._do_execute(self.txn.execute, sql, parameters) def executemany(self, sql: str, *args: Any) -> None: self._do_execute(self.txn.executemany, sql, *args) From 75a70af28fe2132f7b35b1e28a61c43483caf9aa Mon Sep 17 00:00:00 2001 From: David Robertson Date: Thu, 13 Apr 2023 23:57:36 +0100 Subject: [PATCH 2/8] Add an annotation for stream_ordering_month_ago --- .../databases/main/event_federation.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 2ad6fa7d5efe..ac19de183cb6 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -114,6 +114,10 @@ def __init__(self, room_id: str): class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBaseStore): + # TODO: this attribute comes from EventPushActionWorkerStore. Should we inherit from + # that store so that mypy can deduce this for itself? + stream_ordering_month_ago: Optional[int] + def __init__( self, database: DatabasePool, @@ -1182,8 +1186,8 @@ async def have_room_forward_extremities_changed_since( Throws a StoreError if we have since purged the index for stream_orderings from that point. """ - - if stream_ordering <= self.stream_ordering_month_ago: # type: ignore[attr-defined] + assert self.stream_ordering_month_ago is not None + if stream_ordering <= self.stream_ordering_month_ago: raise StoreError(400, f"stream_ordering too old {stream_ordering}") sql = """ @@ -1231,7 +1235,8 @@ async def get_forward_extremities_for_room_at_stream_ordering( # provided the last_change is recent enough, we now clamp the requested # stream_ordering to it. - if last_change > self.stream_ordering_month_ago: # type: ignore[attr-defined] + assert self.stream_ordering_month_ago is not None + if last_change > self.stream_ordering_month_ago: stream_ordering = min(last_change, stream_ordering) return await self._get_forward_extremeties_for_room(room_id, stream_ordering) @@ -1246,8 +1251,8 @@ async def _get_forward_extremeties_for_room( Throws a StoreError if we have since purged the index for stream_orderings from that point. """ - - if stream_ordering <= self.stream_ordering_month_ago: # type: ignore[attr-defined] + assert self.stream_ordering_month_ago is not None + if stream_ordering <= self.stream_ordering_month_ago: raise StoreError(400, "stream_ordering too old %s" % (stream_ordering,)) sql = """ @@ -1707,9 +1712,7 @@ def _delete_old_forward_extrem_cache_txn(txn: LoggingTransaction) -> None: DELETE FROM stream_ordering_to_exterm WHERE stream_ordering < ? """ - txn.execute( - sql, (self.stream_ordering_month_ago,) # type: ignore[attr-defined] - ) + txn.execute(sql, (self.stream_ordering_month_ago,)) await self.db_pool.runInteraction( "_delete_old_forward_extrem_cache", From f0863f735947d157e3a8083ff1449d9c4e69b304 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Fri, 14 Apr 2023 00:05:19 +0100 Subject: [PATCH 3/8] [DO NOT COMMIT] Revert "Add comma missing from #15382. (#15429)" This reverts commit 38272be03710f0675d7f73d15a8a9c4398619b68. This demonstartes that this PR would have spotted the underlying error. --- changelog.d/15429.misc | 1 - synapse/storage/databases/main/event_federation.py | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) delete mode 100644 changelog.d/15429.misc diff --git a/changelog.d/15429.misc b/changelog.d/15429.misc deleted file mode 100644 index c5b054d19e72..000000000000 --- a/changelog.d/15429.misc +++ /dev/null @@ -1 +0,0 @@ -Improve DB performance of clearing out old data from `stream_ordering_to_exterm`. diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index ac19de183cb6..4023dd8c43a8 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1712,7 +1712,7 @@ def _delete_old_forward_extrem_cache_txn(txn: LoggingTransaction) -> None: DELETE FROM stream_ordering_to_exterm WHERE stream_ordering < ? """ - txn.execute(sql, (self.stream_ordering_month_ago,)) + txn.execute(sql, self.stream_ordering_month_ago) await self.db_pool.runInteraction( "_delete_old_forward_extrem_cache", From b1f8cb6e9e13ee3ece792d4b3f76ad20ea96f600 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Fri, 14 Apr 2023 00:13:56 +0100 Subject: [PATCH 4/8] Undo the revert (missing comma is back for good) This reverts commit f0863f735947d157e3a8083ff1449d9c4e69b304. --- changelog.d/15429.misc | 1 + synapse/storage/databases/main/event_federation.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) create mode 100644 changelog.d/15429.misc diff --git a/changelog.d/15429.misc b/changelog.d/15429.misc new file mode 100644 index 000000000000..c5b054d19e72 --- /dev/null +++ b/changelog.d/15429.misc @@ -0,0 +1 @@ +Improve DB performance of clearing out old data from `stream_ordering_to_exterm`. diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 4023dd8c43a8..ac19de183cb6 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1712,7 +1712,7 @@ def _delete_old_forward_extrem_cache_txn(txn: LoggingTransaction) -> None: DELETE FROM stream_ordering_to_exterm WHERE stream_ordering < ? """ - txn.execute(sql, self.stream_ordering_month_ago) + txn.execute(sql, (self.stream_ordering_month_ago,)) await self.db_pool.runInteraction( "_delete_old_forward_extrem_cache", From c83dbb26c05dc1cf1edcb46310a7aa807225e46e Mon Sep 17 00:00:00 2001 From: David Robertson Date: Fri, 14 Apr 2023 00:56:07 +0100 Subject: [PATCH 5/8] Warning comments --- synapse/storage/database.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 8fc13425f332..93cf7aae96ab 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -370,11 +370,18 @@ def execute_batch(self, sql: str, args: Iterable[Iterable[Any]]) -> None: if isinstance(self.database_engine, PostgresEngine): from psycopg2.extras import execute_batch - + # TODO: is it safe for values to be Iterable[Iterable[Any]] here? + # https://www.psycopg.org/docs/extras.html?highlight=execute_batch#psycopg2.extras.execute_batch + # suggests each arg in args should be a sequence or mapping self._do_execute( lambda the_sql: execute_batch(self.txn, the_sql, args), sql ) else: + # TODO: is it safe for values to be Iterable[Iterable[Any]] here? + # https://docs.python.org/3/library/sqlite3.html?highlight=sqlite3#sqlite3.Cursor.executemany + # suggests that the outer collection may be iterable, but + # https://docs.python.org/3/library/sqlite3.html?highlight=sqlite3#how-to-use-placeholders-to-bind-values-in-sql-queries + # suggests that the inner collection should be a sequence or dict. self.executemany(sql, args) def execute_values( @@ -390,6 +397,8 @@ def execute_values( from psycopg2.extras import execute_values return self._do_execute( + # TODO: is it safe for values to be Iterable[Iterable[Any]] here? + # https://www.psycopg.org/docs/extras.html?highlight=execute_batch#psycopg2.extras.execute_values says values should be Sequence[Sequence] lambda the_sql: execute_values(self.txn, the_sql, values, fetch=fetch), sql, ) @@ -398,6 +407,10 @@ def execute(self, sql: str, parameters: _Parameters = ()) -> None: self._do_execute(self.txn.execute, sql, parameters) def executemany(self, sql: str, *args: Any) -> None: + # TODO: we should add a type for *args here. Looking at Cursor.executemany + # and DBAPI2 it ought to be Sequence[_Parameter], but we pass in + # Iterable[Iterable[Any]] in execute_batch and execute_values above, which mypy + # complains about. self._do_execute(self.txn.executemany, sql, *args) def executescript(self, sql: str) -> None: From 56a9f75362a9b30183ead4bd8eda4247b65569aa Mon Sep 17 00:00:00 2001 From: David Robertson Date: Fri, 14 Apr 2023 01:05:55 +0100 Subject: [PATCH 6/8] Changelog --- changelog.d/15432.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/15432.misc diff --git a/changelog.d/15432.misc b/changelog.d/15432.misc new file mode 100644 index 000000000000..93ceaeafc9b9 --- /dev/null +++ b/changelog.d/15432.misc @@ -0,0 +1 @@ +Improve type hints. From 162d8510a18f27d78b8c0d28c5d18eeb6c1e8166 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Fri, 14 Apr 2023 01:09:42 +0100 Subject: [PATCH 7/8] Oh go away isort --- synapse/storage/database.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 93cf7aae96ab..fd03441996d6 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -370,6 +370,7 @@ def execute_batch(self, sql: str, args: Iterable[Iterable[Any]]) -> None: if isinstance(self.database_engine, PostgresEngine): from psycopg2.extras import execute_batch + # TODO: is it safe for values to be Iterable[Iterable[Any]] here? # https://www.psycopg.org/docs/extras.html?highlight=execute_batch#psycopg2.extras.execute_batch # suggests each arg in args should be a sequence or mapping From 2672b7c6517f98870dec4ddb0754316ae3c47831 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Fri, 14 Apr 2023 18:27:46 +0100 Subject: [PATCH 8/8] Rename _Parameters -> SQLQueryParameters didn't want to use "QueryParameters" because that makes me think of `/urls?with=stuff&at=the&end` --- synapse/storage/database.py | 4 ++-- synapse/storage/types.py | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index fd03441996d6..1f5f5eb6f8c7 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -58,7 +58,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.background_updates import BackgroundUpdater from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine -from synapse.storage.types import Connection, Cursor, _Parameters +from synapse.storage.types import Connection, Cursor, SQLQueryParameters from synapse.util.async_helpers import delay_cancellation from synapse.util.iterutils import batch_iter @@ -404,7 +404,7 @@ def execute_values( sql, ) - def execute(self, sql: str, parameters: _Parameters = ()) -> None: + def execute(self, sql: str, parameters: SQLQueryParameters = ()) -> None: self._do_execute(self.txn.execute, sql, parameters) def executemany(self, sql: str, *args: Any) -> None: diff --git a/synapse/storage/types.py b/synapse/storage/types.py index 56a00485393d..34ac80753012 100644 --- a/synapse/storage/types.py +++ b/synapse/storage/types.py @@ -31,14 +31,14 @@ Some very basic protocol definitions for the DB-API2 classes specified in PEP-249 """ -_Parameters = Union[Sequence[Any], Mapping[str, Any]] +SQLQueryParameters = Union[Sequence[Any], Mapping[str, Any]] class Cursor(Protocol): - def execute(self, sql: str, parameters: _Parameters = ...) -> Any: + def execute(self, sql: str, parameters: SQLQueryParameters = ...) -> Any: ... - def executemany(self, sql: str, parameters: Sequence[_Parameters]) -> Any: + def executemany(self, sql: str, parameters: Sequence[SQLQueryParameters]) -> Any: ... def fetchone(self) -> Optional[Tuple]: