This repository has been archived by the owner on Apr 26, 2024. It is now read-only.
-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Add type hints to synapse/storage/databases/main
#11984
Merged
Merged
Changes from 3 commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
6c8bcf2
Add type hints to `synapse/storage/databases/main`
dklimpel 43846dc
newsfile
dklimpel e3c90b7
try to fix errors
dklimpel 0c8d4af
rework `self._instance_name`
dklimpel bf4384c
rework `current_state_for_users`
dklimpel 46be186
Apply suggestions from code review
dklimpel 10ef203
Apply suggestions from code review
dklimpel File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Add missing type hints to storage classes. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,15 +12,23 @@ | |
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
from typing import TYPE_CHECKING, Dict, Iterable, List, Tuple | ||
from typing import TYPE_CHECKING, Dict, Iterable, List, Tuple, cast | ||
|
||
from synapse.api.presence import PresenceState, UserPresenceState | ||
from synapse.replication.tcp.streams import PresenceStream | ||
from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause | ||
from synapse.storage.database import DatabasePool, LoggingDatabaseConnection | ||
from synapse.storage.database import ( | ||
DatabasePool, | ||
LoggingDatabaseConnection, | ||
LoggingTransaction, | ||
) | ||
from synapse.storage.engines import PostgresEngine | ||
from synapse.storage.types import Connection | ||
from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator | ||
from synapse.storage.util.id_generators import ( | ||
AbstractStreamIdGenerator, | ||
MultiWriterIdGenerator, | ||
StreamIdGenerator, | ||
) | ||
from synapse.util.caches.descriptors import cached, cachedList | ||
from synapse.util.caches.stream_change_cache import StreamChangeCache | ||
from synapse.util.iterutils import batch_iter | ||
|
@@ -35,7 +43,7 @@ def __init__( | |
database: DatabasePool, | ||
db_conn: LoggingDatabaseConnection, | ||
hs: "HomeServer", | ||
): | ||
) -> None: | ||
super().__init__(database, db_conn, hs) | ||
|
||
# Used by `PresenceStore._get_active_presence()` | ||
|
@@ -54,19 +62,21 @@ def __init__( | |
database: DatabasePool, | ||
db_conn: LoggingDatabaseConnection, | ||
hs: "HomeServer", | ||
): | ||
) -> None: | ||
super().__init__(database, db_conn, hs) | ||
|
||
self._can_persist_presence = ( | ||
hs.get_instance_name() in hs.config.worker.writers.presence | ||
) | ||
|
||
self._presence_id_gen: AbstractStreamIdGenerator | ||
|
||
if isinstance(database.engine, PostgresEngine): | ||
self._presence_id_gen = MultiWriterIdGenerator( | ||
db_conn=db_conn, | ||
db=database, | ||
stream_name="presence_stream", | ||
instance_name=self._instance_name, | ||
instance_name=self._instance_name, # type: ignore[attr-defined] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd be sorely tempted to do a |
||
tables=[("presence_stream", "instance_name", "stream_id")], | ||
sequence_name="presence_stream_sequence", | ||
writers=hs.config.worker.writers.presence, | ||
|
@@ -109,7 +119,9 @@ async def update_presence(self, presence_states) -> Tuple[int, int]: | |
|
||
return stream_orderings[-1], self._presence_id_gen.get_current_token() | ||
|
||
def _update_presence_txn(self, txn, stream_orderings, presence_states): | ||
def _update_presence_txn( | ||
self, txn: LoggingTransaction, stream_orderings, presence_states | ||
) -> None: | ||
for stream_id, state in zip(stream_orderings, presence_states): | ||
txn.call_after( | ||
self.presence_stream_cache.entity_has_changed, state.user_id, stream_id | ||
|
@@ -150,7 +162,7 @@ def _update_presence_txn(self, txn, stream_orderings, presence_states): | |
state.last_user_sync_ts, | ||
state.status_msg, | ||
state.currently_active, | ||
self._instance_name, | ||
self._instance_name, # type: ignore[attr-defined] | ||
) | ||
for stream_id, state in zip(stream_orderings, presence_states) | ||
], | ||
|
@@ -183,19 +195,23 @@ async def get_all_presence_updates( | |
if last_id == current_id: | ||
return [], current_id, False | ||
|
||
def get_all_presence_updates_txn(txn): | ||
def get_all_presence_updates_txn( | ||
txn: LoggingTransaction, | ||
) -> Tuple[List[Tuple[int, list]], int, bool]: | ||
sql = """ | ||
SELECT stream_id, user_id, state, last_active_ts, | ||
last_federation_update_ts, last_user_sync_ts, | ||
status_msg, | ||
currently_active | ||
status_msg, currently_active | ||
FROM presence_stream | ||
WHERE ? < stream_id AND stream_id <= ? | ||
ORDER BY stream_id ASC | ||
LIMIT ? | ||
""" | ||
txn.execute(sql, (last_id, current_id, limit)) | ||
updates = [(row[0], row[1:]) for row in txn] | ||
updates = cast( | ||
List[Tuple[int, list]], | ||
[(row[0], row[1:]) for row in txn], | ||
) | ||
|
||
upper_bound = current_id | ||
limited = False | ||
|
@@ -210,15 +226,17 @@ def get_all_presence_updates_txn(txn): | |
) | ||
|
||
@cached() | ||
def _get_presence_for_user(self, user_id): | ||
def _get_presence_for_user(self, user_id: str) -> None: | ||
raise NotImplementedError() | ||
|
||
@cachedList( | ||
cached_method_name="_get_presence_for_user", | ||
list_name="user_ids", | ||
num_args=1, | ||
) | ||
async def get_presence_for_users(self, user_ids): | ||
async def get_presence_for_users( | ||
self, user_ids: Iterable[str] | ||
) -> Dict[str, UserPresenceState]: | ||
rows = await self.db_pool.simple_select_many_batch( | ||
table="presence_stream", | ||
column="user_id", | ||
|
@@ -257,7 +275,9 @@ async def should_user_receive_full_presence_with_token( | |
True if the user should have full presence sent to them, False otherwise. | ||
""" | ||
|
||
def _should_user_receive_full_presence_with_token_txn(txn): | ||
def _should_user_receive_full_presence_with_token_txn( | ||
txn: LoggingTransaction, | ||
) -> bool: | ||
sql = """ | ||
SELECT 1 FROM users_to_send_full_presence_to | ||
WHERE user_id = ? | ||
|
@@ -271,7 +291,7 @@ def _should_user_receive_full_presence_with_token_txn(txn): | |
_should_user_receive_full_presence_with_token_txn, | ||
) | ||
|
||
async def add_users_to_send_full_presence_to(self, user_ids: Iterable[str]): | ||
async def add_users_to_send_full_presence_to(self, user_ids: Iterable[str]) -> None: | ||
"""Adds to the list of users who should receive a full snapshot of presence | ||
upon their next sync. | ||
|
||
|
@@ -353,10 +373,10 @@ async def get_presence_for_all_users( | |
|
||
return users_to_state | ||
|
||
def get_current_presence_token(self): | ||
def get_current_presence_token(self) -> int: | ||
return self._presence_id_gen.get_current_token() | ||
|
||
def _get_active_presence(self, db_conn: Connection): | ||
def _get_active_presence(self, db_conn: Connection) -> List[UserPresenceState]: | ||
"""Fetch non-offline presence from the database so that we can register | ||
the appropriate time outs. | ||
""" | ||
|
@@ -379,12 +399,12 @@ def _get_active_presence(self, db_conn: Connection): | |
|
||
return [UserPresenceState(**row) for row in rows] | ||
|
||
def take_presence_startup_info(self): | ||
def take_presence_startup_info(self) -> List[UserPresenceState]: | ||
active_on_startup = self._presence_on_startup | ||
self._presence_on_startup = None | ||
self._presence_on_startup = [] | ||
return active_on_startup | ||
|
||
def process_replication_rows(self, stream_name, instance_name, token, rows): | ||
def process_replication_rows(self, stream_name, instance_name, token, rows) -> None: | ||
if stream_name == PresenceStream.NAME: | ||
self._presence_id_gen.advance(instance_name, token) | ||
for row in rows: | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not surprised, given the code as it stands is convoluted. TBH I'd be tempted to rewrite it as something like:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hopefully it's better now. The database query for a single user is not implemented
_get_presence_for_user
. My thoughts were that callingget_presence_for_users
for every single user becomes a lot of overhead.The loops are getting smaller now and perhaps increase the performance.