
Remove redundant get_current_events_token #11643

Merged 4 commits on Jan 4, 2022
1 change: 1 addition & 0 deletions changelog.d/11643.misc
@@ -0,0 +1 @@
+Remove redundant `get_current_events_token` method.
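For context on why the method is redundant: as the diffs below show, `get_current_events_token` and `get_room_max_stream_ordering` both just return `self._stream_id_gen.get_current_token()`, so call sites can switch to the latter with no behaviour change. Here is a minimal sketch of that duplication, using a hypothetical `FakeIdGenerator` rather than Synapse's real id generators:

```python
class FakeIdGenerator:
    """Hypothetical stand-in for Synapse's stream id generator."""

    def __init__(self, current_token: int) -> None:
        self._current_token = current_token

    def get_current_token(self) -> int:
        return self._current_token


class StoreBefore:
    """Two accessors for the same value: the redundancy this PR removes."""

    def __init__(self) -> None:
        self._stream_id_gen = FakeIdGenerator(42)

    def get_current_events_token(self) -> int:
        # Identical to get_room_max_stream_ordering() below.
        return self._stream_id_gen.get_current_token()

    def get_room_max_stream_ordering(self) -> int:
        return self._stream_id_gen.get_current_token()


class StoreAfter:
    """Only the surviving accessor; callers switch over to it."""

    def __init__(self) -> None:
        self._stream_id_gen = FakeIdGenerator(42)

    def get_room_max_stream_ordering(self) -> int:
        return self._stream_id_gen.get_current_token()


# The two spellings return the same token, so dropping one is safe.
assert StoreBefore().get_current_events_token() == StoreAfter().get_room_max_stream_ordering()
```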
2 changes: 1 addition & 1 deletion synapse/handlers/federation_event.py
@@ -1838,7 +1838,7 @@ async def persist_events_and_notify(
             The stream ID after which all events have been persisted.
         """
         if not event_and_contexts:
-            return self._store.get_current_events_token()
+            return self._store.get_room_max_stream_ordering()
 
         instance = self._config.worker.events_shard_config.get_instance(room_id)
         if instance != self._instance_name:
2 changes: 1 addition & 1 deletion synapse/handlers/presence.py
@@ -729,7 +729,7 @@ def run_persister() -> Awaitable[None]:
 
         # Presence is best effort and quickly heals itself, so lets just always
         # stream from the current state when we restart.
-        self._event_pos = self.store.get_current_events_token()
+        self._event_pos = self.store.get_room_max_stream_ordering()
         self._event_processing = False
 
     async def _on_shutdown(self) -> None:
9 changes: 0 additions & 9 deletions synapse/replication/slave/storage/events.py
@@ -80,12 +80,3 @@ def __init__(
             min_curr_state_delta_id,
             prefilled_cache=curr_state_delta_prefill,
         )
-
-    # Cached functions can't be accessed through a class instance so we need
-    # to reach inside the __dict__ to extract them.
-
-    def get_room_max_stream_ordering(self):
-        return self._stream_id_gen.get_current_token()
-
-    def get_room_min_stream_ordering(self):
-        return self._backfill_id_gen.get_current_token()
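The nine deleted lines were the slaved store's own copies of the two accessors, needed only while `StreamWorkerStore` declared them as abstract methods. With concrete implementations now living on `StreamWorkerStore` (see the stream.py diff below), the subclass simply inherits them. A simplified sketch of that resolution, with invented class names rather than Synapse's real storage hierarchy:

```python
class StreamWorkerStoreSketch:
    """Stand-in for StreamWorkerStore: now provides the concrete accessors."""

    def __init__(self) -> None:
        # Pretend tokens; the real store reads these from its id generators.
        self._max_token = 42
        self._min_token = -7

    def get_room_max_stream_ordering(self) -> int:
        return self._max_token

    def get_room_min_stream_ordering(self) -> int:
        return self._min_token


class SlavedEventStoreSketch(StreamWorkerStoreSketch):
    """After this PR the slaved store no longer overrides the accessors."""


store = SlavedEventStoreSketch()
# Method lookup falls through to the base-class implementations.
assert store.get_room_max_stream_ordering() == 42
assert store.get_room_min_stream_ordering() == -7
```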
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/__init__.py
@@ -68,7 +68,7 @@
 from .signatures import SignatureStore
 from .state import StateStore
 from .stats import StatsStore
-from .stream import StreamStore
+from .stream import StreamWorkerStore
 from .tags import TagsStore
 from .transactions import TransactionWorkerStore
 from .ui_auth import UIAuthStore
@@ -87,7 +87,7 @@ class DataStore(
     RoomStore,
     RoomBatchStore,
     RegistrationStore,
-    StreamStore,
+    StreamWorkerStore,
     ProfileStore,
     PresenceStore,
     TransactionWorkerStore,
4 changes: 0 additions & 4 deletions synapse/storage/databases/main/events_worker.py
@@ -1383,10 +1383,6 @@ async def get_room_complexity(self, room_id: str) -> Dict[str, float]:
 
         return {"v1": complexity_v1}
 
-    def get_current_events_token(self) -> int:
-        """The current maximum token that events have reached"""
-        return self._stream_id_gen.get_current_token()
-
     async def get_all_new_forward_event_rows(
         self, instance_name: str, last_id: int, current_id: int, limit: int
     ) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:
34 changes: 15 additions & 19 deletions synapse/storage/databases/main/stream.py
@@ -34,7 +34,7 @@
     - topological tokems: "t%d-%d", where the integers map to the topological
       and stream ordering columns respectively.
 """
-import abc
+
 import logging
 from collections import namedtuple
 from typing import TYPE_CHECKING, Collection, Dict, List, Optional, Set, Tuple
@@ -334,12 +334,7 @@ def filter_to_clause(event_filter: Optional[Filter]) -> Tuple[str, List[str]]:
     return " AND ".join(clauses), args
 
 
-class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
-    """This is an abstract base class where subclasses must implement
-    `get_room_max_stream_ordering` and `get_room_min_stream_ordering`
-    which can be called in the initializer.
-    """
-
+class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
     def __init__(
         self,
         database: DatabasePool,
@@ -377,13 +372,22 @@ def __init__(
 
         self._stream_order_on_start = self.get_room_max_stream_ordering()
 
-    @abc.abstractmethod
     def get_room_max_stream_ordering(self) -> int:
-        raise NotImplementedError()
+        """Get the stream_ordering of regular events that we have committed up to
+
+        Returns the maximum stream id such that all stream ids less than or
+        equal to it have been successfully persisted.
+        """
+        return self._stream_id_gen.get_current_token()
 
-    @abc.abstractmethod
     def get_room_min_stream_ordering(self) -> int:
-        raise NotImplementedError()
+        """Get the stream_ordering of backfilled events that we have committed up to
+
+        Backfilled events use *negative* stream orderings, so this returns the
+        minimum negative stream id such that all stream ids greater than or
+        equal to it have been successfully persisted.
+        """
+        return self._backfill_id_gen.get_current_token()
 
     def get_room_max_token(self) -> RoomStreamToken:
         """Get a `RoomStreamToken` that marks the current maximum persisted
@@ -1349,11 +1353,3 @@ async def get_name_from_instance_id(self, instance_id: int) -> str:
             retcol="instance_name",
             desc="get_name_from_instance_id",
         )
-
-
-class StreamStore(StreamWorkerStore):
-    def get_room_max_stream_ordering(self) -> int:
-        return self._stream_id_gen.get_current_token()
-
-    def get_room_min_stream_ordering(self) -> int:
-        return self._backfill_id_gen.get_current_token()
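To make the new docstrings concrete: live events are assigned positive stream orderings by one generator and backfilled events negative orderings by another, so the "max" and "min" tokens bound the fully persisted range from each end. A toy model of that contract (the generator classes here are invented for illustration, not Synapse's real id generators):

```python
class ForwardIdGen:
    """Hands out increasing positive stream orderings for live events."""

    def __init__(self) -> None:
        self._persisted = 0

    def persist_next(self) -> int:
        self._persisted += 1
        return self._persisted

    def get_current_token(self) -> int:
        # Every stream id <= this value has been persisted.
        return self._persisted


class BackfillIdGen:
    """Hands out decreasing negative stream orderings for backfilled events."""

    def __init__(self) -> None:
        self._persisted = 0

    def persist_next(self) -> int:
        self._persisted -= 1
        return self._persisted

    def get_current_token(self) -> int:
        # Every stream id >= this value has been persisted.
        return self._persisted


forward, backfill = ForwardIdGen(), BackfillIdGen()
for _ in range(3):
    forward.persist_next()  # stream orderings 1, 2, 3
backfill.persist_next()     # stream ordering -1

# These mirror get_room_max_stream_ordering / get_room_min_stream_ordering.
assert forward.get_current_token() == 3
assert backfill.get_current_token() == -1
```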