diff --git a/changelog.d/11643.misc b/changelog.d/11643.misc new file mode 100644 index 000000000000..1c3b3071f66a --- /dev/null +++ b/changelog.d/11643.misc @@ -0,0 +1 @@ +Remove redundant `get_current_events_token` method. diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 9917613298c6..d08e48da58b4 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -1838,7 +1838,7 @@ async def persist_events_and_notify( The stream ID after which all events have been persisted. """ if not event_and_contexts: - return self._store.get_current_events_token() + return self._store.get_room_max_stream_ordering() instance = self._config.worker.events_shard_config.get_instance(room_id) if instance != self._instance_name: diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 454d06c9733d..c781fefb1bc7 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -729,7 +729,7 @@ def run_persister() -> Awaitable[None]: # Presence is best effort and quickly heals itself, so lets just always # stream from the current state when we restart. - self._event_pos = self.store.get_current_events_token() + self._event_pos = self.store.get_room_max_stream_ordering() self._event_processing = False async def _on_shutdown(self) -> None: diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 50e7379e8301..0f0837269486 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -80,12 +80,3 @@ def __init__( min_curr_state_delta_id, prefilled_cache=curr_state_delta_prefill, ) - - # Cached functions can't be accessed through a class instance so we need - # to reach inside the __dict__ to extract them. - - def get_room_max_stream_ordering(self): - return self._stream_id_gen.get_current_token() - - def get_room_min_stream_ordering(self): - return self._backfill_id_gen.get_current_token() diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index a594223fc6ca..f024761ba7b8 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -68,7 +68,7 @@ from .signatures import SignatureStore from .state import StateStore from .stats import StatsStore -from .stream import StreamStore +from .stream import StreamWorkerStore from .tags import TagsStore from .transactions import TransactionWorkerStore from .ui_auth import UIAuthStore @@ -87,7 +87,7 @@ class DataStore( RoomStore, RoomBatchStore, RegistrationStore, - StreamStore, + StreamWorkerStore, ProfileStore, PresenceStore, TransactionWorkerStore, diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index c7b660ac5a6f..8d4287045a8b 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1383,10 +1383,6 @@ async def get_room_complexity(self, room_id: str) -> Dict[str, float]: return {"v1": complexity_v1} - def get_current_events_token(self) -> int: - """The current maximum token that events have reached""" - return self._stream_id_gen.get_current_token() - async def get_all_new_forward_event_rows( self, instance_name: str, last_id: int, current_id: int, limit: int ) -> List[Tuple[int, str, str, str, str, str, str, str, str]]: diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 9488fd509463..dbf3ab2f934f 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -34,7 +34,7 @@ - topological tokems: "t%d-%d", where the integers map to the topological and stream ordering columns respectively. """ -import abc + import logging from collections import namedtuple from typing import TYPE_CHECKING, Collection, Dict, List, Optional, Set, Tuple @@ -334,12 +334,7 @@ def filter_to_clause(event_filter: Optional[Filter]) -> Tuple[str, List[str]]: return " AND ".join(clauses), args -class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta): - """This is an abstract base class where subclasses must implement - `get_room_max_stream_ordering` and `get_room_min_stream_ordering` - which can be called in the initializer. - """ - +class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): def __init__( self, database: DatabasePool, @@ -377,13 +372,22 @@ def __init__( self._stream_order_on_start = self.get_room_max_stream_ordering() - @abc.abstractmethod def get_room_max_stream_ordering(self) -> int: - raise NotImplementedError() + """Get the stream_ordering of regular events that we have committed up to + + Returns the maximum stream id such that all stream ids less than or + equal to it have been successfully persisted. + """ + return self._stream_id_gen.get_current_token() - @abc.abstractmethod def get_room_min_stream_ordering(self) -> int: - raise NotImplementedError() + """Get the stream_ordering of backfilled events that we have committed up to + + Backfilled events use *negative* stream orderings, so this returns the + minimum negative stream id such that all stream ids greater than or + equal to it have been successfully persisted. + """ + return self._backfill_id_gen.get_current_token() def get_room_max_token(self) -> RoomStreamToken: """Get a `RoomStreamToken` that marks the current maximum persisted @@ -1349,11 +1353,3 @@ async def get_name_from_instance_id(self, instance_id: int) -> str: retcol="instance_name", desc="get_name_from_instance_id", ) - - -class StreamStore(StreamWorkerStore): - def get_room_max_stream_ordering(self) -> int: - return self._stream_id_gen.get_current_token() - - def get_room_min_stream_ordering(self) -> int: - return self._backfill_id_gen.get_current_token()