From 121bb890f343db4c14e479fea523f3288c9cabc5 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 2 Mar 2023 13:48:05 -0500 Subject: [PATCH 1/2] Combine AbstractStreamIdTracker and AbstractStreamIdGenerator. --- synapse/storage/databases/main/devices.py | 7 ++----- synapse/storage/databases/main/events_worker.py | 5 ++--- synapse/storage/databases/main/push_rule.py | 3 +-- synapse/storage/databases/main/pusher.py | 3 +-- synapse/storage/databases/main/receipts.py | 6 +++--- synapse/storage/util/id_generators.py | 17 +++++------------ 6 files changed, 14 insertions(+), 27 deletions(-) diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 0dd15f16ff7b..5503621ad613 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -52,7 +52,6 @@ from synapse.storage.types import Cursor from synapse.storage.util.id_generators import ( AbstractStreamIdGenerator, - AbstractStreamIdTracker, StreamIdGenerator, ) from synapse.types import JsonDict, StrCollection, get_verify_key_from_cross_signing_key @@ -91,7 +90,7 @@ def __init__( # In the worker store this is an ID tracker which we overwrite in the non-worker # class below that is used on the main process. - self._device_list_id_gen: AbstractStreamIdTracker = StreamIdGenerator( + self._device_list_id_gen = StreamIdGenerator( db_conn, hs.get_replication_notifier(), "device_lists_stream", @@ -712,9 +711,7 @@ async def add_user_signature_change_to_streams( The new stream ID. """ - # TODO: this looks like it's _writing_. Should this be on DeviceStore rather - # than DeviceWorkerStore? - async with self._device_list_id_gen.get_next() as stream_id: # type: ignore[attr-defined] + async with self._device_list_id_gen.get_next() as stream_id: await self.db_pool.runInteraction( "add_user_sig_change_to_streams", self._add_user_signature_change_txn, diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index b7e74981256c..20b7a683622c 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -72,7 +72,6 @@ from synapse.storage.types import Cursor from synapse.storage.util.id_generators import ( AbstractStreamIdGenerator, - AbstractStreamIdTracker, MultiWriterIdGenerator, StreamIdGenerator, ) @@ -187,8 +186,8 @@ def __init__( ): super().__init__(database, db_conn, hs) - self._stream_id_gen: AbstractStreamIdTracker - self._backfill_id_gen: AbstractStreamIdTracker + self._stream_id_gen: AbstractStreamIdGenerator + self._backfill_id_gen: AbstractStreamIdGenerator if isinstance(database.engine, PostgresEngine): # If we're using Postgres than we can use `MultiWriterIdGenerator` # regardless of whether this process writes to the streams or not. diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py index 9b2bbe060ddc..9f862f00c1c5 100644 --- a/synapse/storage/databases/main/push_rule.py +++ b/synapse/storage/databases/main/push_rule.py @@ -46,7 +46,6 @@ from synapse.storage.push_rule import InconsistentRuleException, RuleNotFoundException from synapse.storage.util.id_generators import ( AbstractStreamIdGenerator, - AbstractStreamIdTracker, IdGenerator, StreamIdGenerator, ) @@ -118,7 +117,7 @@ def __init__( # In the worker store this is an ID tracker which we overwrite in the non-worker # class below that is used on the main process. - self._push_rules_stream_id_gen: AbstractStreamIdTracker = StreamIdGenerator( + self._push_rules_stream_id_gen = StreamIdGenerator( db_conn, hs.get_replication_notifier(), "push_rules_stream", diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py index fddbc07afa52..9a24f7a65526 100644 --- a/synapse/storage/databases/main/pusher.py +++ b/synapse/storage/databases/main/pusher.py @@ -36,7 +36,6 @@ ) from synapse.storage.util.id_generators import ( AbstractStreamIdGenerator, - AbstractStreamIdTracker, StreamIdGenerator, ) from synapse.types import JsonDict @@ -60,7 +59,7 @@ def __init__( # In the worker store this is an ID tracker which we overwrite in the non-worker # class below that is used on the main process. - self._pushers_id_gen: AbstractStreamIdTracker = StreamIdGenerator( + self._pushers_id_gen = StreamIdGenerator( db_conn, hs.get_replication_notifier(), "pushers", diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 92a82240ab4c..074942b16785 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -39,7 +39,7 @@ from synapse.storage.engines import PostgresEngine from synapse.storage.engines._base import IsolationLevel from synapse.storage.util.id_generators import ( - AbstractStreamIdTracker, + AbstractStreamIdGenerator, MultiWriterIdGenerator, StreamIdGenerator, ) @@ -65,7 +65,7 @@ def __init__( # In the worker store this is an ID tracker which we overwrite in the non-worker # class below that is used on the main process. - self._receipts_id_gen: AbstractStreamIdTracker + self._receipts_id_gen: AbstractStreamIdGenerator if isinstance(database.engine, PostgresEngine): self._can_write_to_receipts = ( @@ -768,7 +768,7 @@ async def insert_receipt( "insert_receipt_conv", self._graph_to_linear, room_id, event_ids ) - async with self._receipts_id_gen.get_next() as stream_id: # type: ignore[attr-defined] + async with self._receipts_id_gen.get_next() as stream_id: event_ts = await self.db_pool.runInteraction( "insert_linearized_receipt", self._insert_linearized_receipt_txn, diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 334d3d718b4b..d2c874b9a8d7 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -93,8 +93,11 @@ def _load_current_id( return res -class AbstractStreamIdTracker(metaclass=abc.ABCMeta): - """Tracks the "current" stream ID of a stream that may have multiple writers. +class AbstractStreamIdGenerator(metaclass=abc.ABCMeta): + """Generates or tracks stream IDs for a stream that may have multiple writers. + + Each stream ID represents a write transaction, whose completion is tracked + so that the "current" stream ID of the stream can be determined. Stream IDs are monotonically increasing or decreasing integers representing write transactions. The "current" stream ID is the stream ID such that all transactions @@ -130,16 +133,6 @@ def get_current_token_for_writer(self, instance_name: str) -> int: """ raise NotImplementedError() - -class AbstractStreamIdGenerator(AbstractStreamIdTracker): - """Generates stream IDs for a stream that may have multiple writers. - - Each stream ID represents a write transaction, whose completion is tracked - so that the "current" stream ID of the stream can be determined. - - See `AbstractStreamIdTracker` for more details. - """ - @abc.abstractmethod def get_next(self) -> AsyncContextManager[int]: """ From 79cfa863750e417a56c0393168730049f87ac9f6 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 2 Mar 2023 13:49:37 -0500 Subject: [PATCH 2/2] Newsfragment --- changelog.d/15192.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/15192.misc diff --git a/changelog.d/15192.misc b/changelog.d/15192.misc new file mode 100644 index 000000000000..107668687583 --- /dev/null +++ b/changelog.d/15192.misc @@ -0,0 +1 @@ +Combine `AbstractStreamIdTracker` and `AbstractStreamIdGenerator`.