From 44b036779eb959337ce5c1146eb04b0b654291d1 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 30 Aug 2019 14:32:48 +0100 Subject: [PATCH 01/10] Add stats regenerator --- synapse/storage/stats.py | 494 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 491 insertions(+), 3 deletions(-) diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 9d6c3027d5d8..58aa8a7e4998 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -17,6 +17,12 @@ import logging from itertools import chain +from synapse.storage.prepare_database import get_statements + +from synapse.storage.engines import Sqlite3Engine +from twisted.internet import defer + +from synapse.api.constants import Membership, EventTypes from twisted.internet.defer import DeferredLock from synapse.storage import PostgresEngine @@ -49,6 +55,9 @@ TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")} +# these are the tables which contain our actual subjects +TYPE_TO_ORIGIN_TABLE = {"room": "rooms", "user": "users"} + class StatsStore(StateDeltasStore): def __init__(self, db_conn, hs): @@ -61,8 +70,18 @@ def __init__(self, db_conn, hs): self.stats_delta_processing_lock = DeferredLock() - self.register_noop_background_update("populate_stats_createtables") - self.register_noop_background_update("populate_stats_process_rooms") + self.register_background_update_handler( + "populate_stats_prepare", self._populate_stats_prepare + ) + self.register_background_update_handler( + "populate_stats_process_rooms", self._populate_stats_process_rooms + ) + self.register_background_update_handler( + "populate_stats_process_users", self._populate_stats_process_users + ) + # we no longer need to perform clean-up, but we will give ourselves + # the potential to reintroduce it in the future – so documentation + # will still encourage the use of this no-op handler. self.register_noop_background_update("populate_stats_cleanup") def quantise_stats_time(self, ts): @@ -81,6 +100,442 @@ def quantise_stats_time(self, ts): """ return (ts // self.stats_bucket_size) * self.stats_bucket_size + @defer.inlineCallbacks + def _unwedge_incremental_processor(self, forced_promise): + """ + Make a promise about what this stats regeneration will handle, + so that we can allow the incremental processor to start doing things + right away – 'unwedging' it. + + Args: + forced_promise (dict of positions): + If supplied, this is the promise that is made. + Otherwise, a promise is made that reduces the amount of work + that must be performed by the incremental processor. + """ + + if forced_promise is None: + promised_stats_delta_pos = ( + yield self.get_max_stream_id_in_current_state_deltas() + ) + promised_max = self.get_room_max_stream_ordering() + promised_min = self.get_room_min_stream_ordering() + + promised_positions = { + "state_delta_stream_id": promised_stats_delta_pos, + "total_events_min_stream_ordering": promised_min, + "total_events_max_stream_ordering": promised_max, + } + else: + promised_positions = forced_promise + + # this stores it for our reference later + yield self.update_stats_positions( + promised_positions, for_initial_processor=True + ) + + # this unwedges the incremental processor + yield self.update_stats_positions( + promised_positions, for_initial_processor=False + ) + + # with the delta processor unwedged, now let it catch up in case + # anything was missed during the wedge period + self.clock.call_later(0, self.hs.get_stats_handler().notify_new_event) + + @defer.inlineCallbacks + def _populate_stats_prepare(self, progress, batch_size): + """ + This is a background update, which prepares the database for + statistics regeneration. + """ + + if not self.stats_enabled: + yield self._end_background_update("populate_stats_prepare") + return 1 + + def _wedge_incremental_processor(txn): + """ + Wedge the incremental processor (by setting its positions to NULL), + and return its previous positions – atomically. + """ + + with self.stats_delta_processing_lock: + old = self._get_stats_positions_txn(txn, for_initial_processor=False) + self._update_stats_positions_txn(txn, None, for_initial_processor=False) + + return old + + def _make_skeletons(txn, stats_type): + """ + Get all the rooms and users that we want to process, and create + 'skeletons' (incomplete _stats_current rows) for them, if they do + not already have a row. + """ + + if isinstance(self.database_engine, Sqlite3Engine): + sql = """ + INSERT OR IGNORE INTO %(table)s_current + (%(id_col)s, completed_delta_stream_id, %(zero_cols)s) + SELECT %(id_col)s, NULL, %(zeroes)s FROM %(origin_table)s + """ + else: + sql = """ + INSERT INTO %(table)s_current + (%(id_col)s, completed_delta_stream_id, %(zero_cols)s) + SELECT %(id_col)s, NULL, %(zeroes)s FROM %(origin_table)s + ON CONFLICT DO NOTHING + """ + + table, id_col = TYPE_TO_TABLE[stats_type] + origin_table = TYPE_TO_ORIGIN_TABLE[stats_type] + zero_cols = list(chain(ABSOLUTE_STATS_FIELDS[stats_type], PER_SLICE_FIELDS[stats_type])) + + txn.execute(sql % { + "table": table, + "id_col": id_col, + "origin_table": origin_table, + "zero_cols": zero_cols, + "zeroes": ", ".join(["0"] * len(zero_cols)) + }) + + def _delete_dirty_skeletons(txn): + """ + Delete pre-existing rows which are incomplete. + """ + sql = """ + DELETE FROM %s_current + WHERE completed_delta_stream_id IS NULL + """ + + for _k, (table, id_col) in TYPE_TO_TABLE: + txn.execute(sql % (table,)) + + # first wedge the incremental processor and reset our promise + old_positions = yield self.runInteraction( + "populate_stats_wedge", _wedge_incremental_processor + ) + + if None in old_positions.values(): + old_positions = None + + # with the incremental processor wedged, we delete dirty skeleton rows + # since we don't want to double-count them. + yield self.runInteraction( + "populate_stats_delete_dirty_skeletons", _delete_dirty_skeletons + ) + + yield self._unwedge_incremental_processor(old_positions) + + yield self.runInteraction("populate_stats_make_skeletons", _make_skeletons) + self.get_earliest_token_for_stats.invalidate_all() + + yield self._end_background_update("populate_stats_prepare") + return 1 + + @defer.inlineCallbacks + def _populate_stats_process_users(self, progress, batch_size): + """ + This is a background update which regenerates statistics for users. + """ + if not self.stats_enabled: + yield self._end_background_update("populate_stats_process_users") + return 1 + + def _get_next_batch(txn): + # Only fetch 250 users, so we don't fetch too many at once, even + # if those 250 users have less than batch_size state events. + sql = """ + SELECT user_id FROM user_stats_current + WHERE completed_delta_stream_id IS NULL + LIMIT 250 + """ + txn.execute(sql) + users_to_work_on = txn.fetchall() + + if not users_to_work_on: + return None + + # Get how many are left to process, so we can give status on how + # far we are in processing + txn.execute( + "SELECT COUNT(*) FROM room_stats_current" + " WHERE completed_delta_stream_id IS NULL" + ) + progress["remaining"] = txn.fetchone()[0] + + return users_to_work_on + + users_to_work_on = yield self.runInteraction( + "populate_stats_users_get_batch", _get_next_batch + ) + + # No more users -- complete the transaction. + if not users_to_work_on: + yield self._end_background_update("populate_stats_process_users") + return 1 + + logger.info( + "Processing the next %d users of %d remaining", + len(users_to_work_on), + progress["remaining"], + ) + + processed_membership_count = 0 + + promised_positions = yield self.get_stats_positions(for_initial_processor=True) + + if None in promised_positions: + logger.error( + "There is a None in promised_positions;" + " dependency task must not have been run." + " promised_positions: %r", + promised_positions, + ) + yield self._end_background_update("populate_stats_process_users") + return 1 + + for (user_id,) in users_to_work_on: + now = self.hs.get_reactor().seconds() + + def _process_user(txn): + # Get the current token + current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn) + + sql = """ + SELECT + ( + join_rules = 'public' + OR history_visibility = 'world_readable' + ) AS is_public, + COUNT(*) AS count + FROM room_memberships + JOIN room_state USING (room_id) + WHERE + user_id = ? AND membership = 'join' + GROUP BY is_public + """ + txn.execute(sql, (user_id,)) + room_counts_by_publicness = dict(txn.fetchall()) + + self._update_stats_delta_txn( + txn, + now, + "user", + user_id, + {}, + complete_with_stream_id=current_token, + absolute_field_overrides={ + # these are counted absolutely because it is + # more difficult to count them from the promised time, + # because counting them now can use the quick lookup + # tables. + "public_rooms": room_counts_by_publicness.get(True, 0), + "private_rooms": room_counts_by_publicness.get(False, 0), + }, + ) + + # we use this count for rate-limiting + return sum(room_counts_by_publicness.values()) + + processed_membership_count += yield self.runInteraction( + "update_user_stats", _process_user + ) + + # Update the remaining counter. + progress["remaining"] -= 1 + + if processed_membership_count > batch_size: + # Don't process any more users, we've hit our batch size. + return processed_membership_count + + yield self.runInteraction( + "populate_stats", + self._background_update_progress_txn, + "populate_stats_process_users", + progress, + ) + + return processed_membership_count + + @defer.inlineCallbacks + def _populate_stats_process_rooms(self, progress, batch_size): + """ + This is a background update which regenerates statistics for rooms. + """ + if not self.stats_enabled: + yield self._end_background_update("populate_stats_process_rooms") + return 1 + + def _get_next_batch(txn): + # Only fetch 250 rooms, so we don't fetch too many at once, even + # if those 250 rooms have less than batch_size state events. + sql = """ + SELECT room_id FROM room_stats_current + WHERE completed_delta_stream_id IS NULL + LIMIT 250 + """ + txn.execute(sql) + rooms_to_work_on = txn.fetchall() + + if not rooms_to_work_on: + return None + + # Get how many are left to process, so we can give status on how + # far we are in processing + txn.execute( + "SELECT COUNT(*) FROM room_stats_current" + " WHERE completed_delta_stream_id IS NULL" + ) + progress["remaining"] = txn.fetchone()[0] + + return rooms_to_work_on + + rooms_to_work_on = yield self.runInteraction( + "populate_stats_rooms_get_batch", _get_next_batch + ) + + # No more rooms -- complete the transaction. + if not rooms_to_work_on: + yield self._end_background_update("populate_stats_process_rooms") + return 1 + + logger.info( + "Processing the next %d rooms of %d remaining", + len(rooms_to_work_on), + progress["remaining"], + ) + + # Number of state events we've processed by going through each room + processed_event_count = 0 + + promised_positions = yield self.get_stats_positions(for_initial_processor=True) + + if None in promised_positions: + logger.error( + "There is a None in promised_positions;" + " dependency task must not have been run." + " promised_positions: %s", + promised_positions, + ) + yield self._end_background_update("populate_stats_process_rooms") + return 1 + + for (room_id,) in rooms_to_work_on: + current_state_ids = yield self.get_current_state_ids(room_id) + + join_rules_id = current_state_ids.get((EventTypes.JoinRules, "")) + history_visibility_id = current_state_ids.get( + (EventTypes.RoomHistoryVisibility, "") + ) + encryption_id = current_state_ids.get((EventTypes.RoomEncryption, "")) + name_id = current_state_ids.get((EventTypes.Name, "")) + topic_id = current_state_ids.get((EventTypes.Topic, "")) + avatar_id = current_state_ids.get((EventTypes.RoomAvatar, "")) + canonical_alias_id = current_state_ids.get((EventTypes.CanonicalAlias, "")) + + event_ids = [ + join_rules_id, + history_visibility_id, + encryption_id, + name_id, + topic_id, + avatar_id, + canonical_alias_id, + ] + + state_events = yield self.get_events( + [ev for ev in event_ids if ev is not None] + ) + + def _get_or_none(event_id, arg): + event = state_events.get(event_id) + if event: + return event.content.get(arg) + return None + + yield self.update_room_state( + room_id, + { + "join_rules": _get_or_none(join_rules_id, "join_rule"), + "history_visibility": _get_or_none( + history_visibility_id, "history_visibility" + ), + "encryption": _get_or_none(encryption_id, "algorithm"), + "name": _get_or_none(name_id, "name"), + "topic": _get_or_none(topic_id, "topic"), + "avatar": _get_or_none(avatar_id, "url"), + "canonical_alias": _get_or_none(canonical_alias_id, "alias"), + }, + ) + + now = self.clock.time_msec() + + def _fetch_data(txn): + # Get the current token of the room + current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn) + + current_state_events = len(current_state_ids) + + membership_counts = self._get_user_counts_in_room_txn(txn, room_id) + + room_total_event_count, room_total_event_bytes = self._count_events_and_bytes_in_room_txn( + txn, + room_id, + promised_positions["total_events_min_stream_ordering"], + promised_positions["total_events_max_stream_ordering"], + ) + + self._update_stats_delta_txn( + txn, + now, + "room", + room_id, + { + "total_events": room_total_event_count, + "total_event_bytes": room_total_event_bytes + }, + complete_with_stream_id=current_token, + absolute_field_overrides={ + # these are counted absolutely because it is + # more difficult to count them from the promised time, + # because counting them now can use the quick lookup + # tables. + "current_state_events": current_state_events, + "joined_members": membership_counts.get(Membership.JOIN, 0), + "invited_members": membership_counts.get( + Membership.INVITE, 0 + ), + "left_members": membership_counts.get(Membership.LEAVE, 0), + "banned_members": membership_counts.get(Membership.BAN, 0), + }, + ) + + # we use this count for rate-limiting + return room_total_event_count + + room_event_count = yield self.runInteraction( + "update_room_stats", _fetch_data + ) + + # Update the remaining counter. + progress["remaining"] -= 1 + + processed_event_count += room_event_count + + if processed_event_count > batch_size: + # Don't process any more rooms, we've hit our batch size. + return processed_event_count + + yield self.runInteraction( + "populate_stats", + self._background_update_progress_txn, + "populate_stats_process_rooms", + progress, + ) + + return processed_event_count + def get_stats_positions(self, for_initial_processor=False): """ Returns the stats processor positions. @@ -555,7 +1010,7 @@ def update_total_event_and_bytes_count_between_txn(self, txn, low_pos, high_pos) # nothing to do here. return - now = self.hs.clock.time_msec() + now = self.clock.time_msec() # we choose comparators based on the signs low_comparator = "<=" if low_pos < 0 else "<" @@ -587,3 +1042,36 @@ def update_total_event_and_bytes_count_between_txn(self, txn, low_pos, high_pos) room_id, {"total_events": new_events, "total_event_bytes": new_bytes}, ) + + def _count_events_and_bytes_in_room_txn(self, txn, room_id, low_token, high_token): + """ + Count the number of events and event bytes in a room between two tokens, + inclusive. + Args: + txn (cursor): The database + room_id (str): The ID of the room to count events for + low_token (int): the minimum stream ordering to count + high_token (int): the maximum stream ordering to count + + Returns (tuple[int, int]): + First element (int): + the number of events + Second element (int): + the number of bytes in events' event JSON + """ + + if isinstance(self.database_engine, PostgresEngine): + bytes_expression = "OCTET_LENGTH(json)" + else: + bytes_expression = "LENGTH(CAST(json AS BLOB))" + + sql = """ + SELECT COUNT(*) AS num_events, SUM(%s) AS num_bytes + FROM events + JOIN event_json USING (event_id) + WHERE room_id = ? + AND ? <= stream_ordering + AND stream_ordering <= ? + """ % (bytes_expression,) + txn.execute(sql, (room_id, low_token, high_token)) + return txn.fetchone() From 893729a3a6bc0cff6cc803206e413bf746520371 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 30 Aug 2019 14:36:18 +0100 Subject: [PATCH 02/10] Code formatting Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 38 ++++++++++++++++++++------------------ 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 58aa8a7e4998..9386e9e15038 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -17,15 +17,12 @@ import logging from itertools import chain -from synapse.storage.prepare_database import get_statements - -from synapse.storage.engines import Sqlite3Engine from twisted.internet import defer - -from synapse.api.constants import Membership, EventTypes from twisted.internet.defer import DeferredLock +from synapse.api.constants import EventTypes, Membership from synapse.storage import PostgresEngine +from synapse.storage.engines import Sqlite3Engine from synapse.storage.state_deltas import StateDeltasStore from synapse.util.caches.descriptors import cached @@ -189,15 +186,20 @@ def _make_skeletons(txn, stats_type): table, id_col = TYPE_TO_TABLE[stats_type] origin_table = TYPE_TO_ORIGIN_TABLE[stats_type] - zero_cols = list(chain(ABSOLUTE_STATS_FIELDS[stats_type], PER_SLICE_FIELDS[stats_type])) + zero_cols = list( + chain(ABSOLUTE_STATS_FIELDS[stats_type], PER_SLICE_FIELDS[stats_type]) + ) - txn.execute(sql % { - "table": table, - "id_col": id_col, - "origin_table": origin_table, - "zero_cols": zero_cols, - "zeroes": ", ".join(["0"] * len(zero_cols)) - }) + txn.execute( + sql + % { + "table": table, + "id_col": id_col, + "origin_table": origin_table, + "zero_cols": zero_cols, + "zeroes": ", ".join(["0"] * len(zero_cols)), + } + ) def _delete_dirty_skeletons(txn): """ @@ -493,7 +495,7 @@ def _fetch_data(txn): room_id, { "total_events": room_total_event_count, - "total_event_bytes": room_total_event_bytes + "total_event_bytes": room_total_event_bytes, }, complete_with_stream_id=current_token, absolute_field_overrides={ @@ -503,9 +505,7 @@ def _fetch_data(txn): # tables. "current_state_events": current_state_events, "joined_members": membership_counts.get(Membership.JOIN, 0), - "invited_members": membership_counts.get( - Membership.INVITE, 0 - ), + "invited_members": membership_counts.get(Membership.INVITE, 0), "left_members": membership_counts.get(Membership.LEAVE, 0), "banned_members": membership_counts.get(Membership.BAN, 0), }, @@ -1072,6 +1072,8 @@ def _count_events_and_bytes_in_room_txn(self, txn, room_id, low_token, high_toke WHERE room_id = ? AND ? <= stream_ordering AND stream_ordering <= ? - """ % (bytes_expression,) + """ % ( + bytes_expression, + ) txn.execute(sql, (room_id, low_token, high_token)) return txn.fetchone() From 440c60ef8f6840c48c388e5f9c9789743f3d172c Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 30 Aug 2019 15:05:57 +0100 Subject: [PATCH 03/10] Some fixes that have become necessary due to changes in other PRs Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 9386e9e15038..5f8ab9464f21 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -157,9 +157,8 @@ def _wedge_incremental_processor(txn): and return its previous positions – atomically. """ - with self.stats_delta_processing_lock: - old = self._get_stats_positions_txn(txn, for_initial_processor=False) - self._update_stats_positions_txn(txn, None, for_initial_processor=False) + old = self._get_stats_positions_txn(txn, for_initial_processor=False) + self._update_stats_positions_txn(txn, None, for_initial_processor=False) return old @@ -210,13 +209,17 @@ def _delete_dirty_skeletons(txn): WHERE completed_delta_stream_id IS NULL """ - for _k, (table, id_col) in TYPE_TO_TABLE: + for _k, (table, id_col) in TYPE_TO_TABLE.items(): txn.execute(sql % (table,)) # first wedge the incremental processor and reset our promise - old_positions = yield self.runInteraction( - "populate_stats_wedge", _wedge_incremental_processor - ) + yield self.stats_delta_processing_lock.acquire() + try: + old_positions = yield self.runInteraction( + "populate_stats_wedge", _wedge_incremental_processor + ) + finally: + yield self.stats_delta_processing_lock.release() if None in old_positions.values(): old_positions = None @@ -229,7 +232,8 @@ def _delete_dirty_skeletons(txn): yield self._unwedge_incremental_processor(old_positions) - yield self.runInteraction("populate_stats_make_skeletons", _make_skeletons) + yield self.runInteraction("populate_stats_make_skeletons", _make_skeletons, "room") + yield self.runInteraction("populate_stats_make_skeletons", _make_skeletons, "user") self.get_earliest_token_for_stats.invalidate_all() yield self._end_background_update("populate_stats_prepare") From 065042c3253bc8c1176eb5dc8e7e1ed682731325 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 30 Aug 2019 15:08:26 +0100 Subject: [PATCH 04/10] Code formatting and typo pointed out by Erik. Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 5f8ab9464f21..4925f5b62bb5 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -232,8 +232,12 @@ def _delete_dirty_skeletons(txn): yield self._unwedge_incremental_processor(old_positions) - yield self.runInteraction("populate_stats_make_skeletons", _make_skeletons, "room") - yield self.runInteraction("populate_stats_make_skeletons", _make_skeletons, "user") + yield self.runInteraction( + "populate_stats_make_skeletons", _make_skeletons, "room" + ) + yield self.runInteraction( + "populate_stats_make_skeletons", _make_skeletons, "user" + ) self.get_earliest_token_for_stats.invalidate_all() yield self._end_background_update("populate_stats_prepare") @@ -264,10 +268,11 @@ def _get_next_batch(txn): # Get how many are left to process, so we can give status on how # far we are in processing - txn.execute( - "SELECT COUNT(*) FROM room_stats_current" - " WHERE completed_delta_stream_id IS NULL" - ) + sql = """ + SELECT COUNT(*) FROM user_stats_current + WHERE completed_delta_stream_id IS NULL + """ + txn.execute(sql) progress["remaining"] = txn.fetchone()[0] return users_to_work_on @@ -389,10 +394,10 @@ def _get_next_batch(txn): # Get how many are left to process, so we can give status on how # far we are in processing - txn.execute( - "SELECT COUNT(*) FROM room_stats_current" - " WHERE completed_delta_stream_id IS NULL" - ) + sql = """ + SELECT COUNT(*) FROM room_stats_current + WHERE completed_delta_stream_id IS NULL + """ progress["remaining"] = txn.fetchone()[0] return rooms_to_work_on From 0f2e59f4f171ec84ba73f99d2ba69576ce3baa6f Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 30 Aug 2019 15:11:03 +0100 Subject: [PATCH 05/10] Fix that became apparent after unit testing Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 4925f5b62bb5..adb66a1340aa 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -195,7 +195,7 @@ def _make_skeletons(txn, stats_type): "table": table, "id_col": id_col, "origin_table": origin_table, - "zero_cols": zero_cols, + "zero_cols": ", ".join(zero_cols), "zeroes": ", ".join(["0"] * len(zero_cols)), } ) From b379a11dc22a003494f30820c2c1d6f98f801fda Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 30 Aug 2019 15:13:41 +0100 Subject: [PATCH 06/10] `users` table's ID field is actually called `name`. Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index adb66a1340aa..5591079caefa 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -52,8 +52,8 @@ TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")} -# these are the tables which contain our actual subjects -TYPE_TO_ORIGIN_TABLE = {"room": "rooms", "user": "users"} +# these are the tables (& ID columns) which contain our actual subjects +TYPE_TO_ORIGIN_TABLE = {"room": ("rooms", "room_id"), "user": ("users", "name")} class StatsStore(StateDeltasStore): @@ -173,18 +173,18 @@ def _make_skeletons(txn, stats_type): sql = """ INSERT OR IGNORE INTO %(table)s_current (%(id_col)s, completed_delta_stream_id, %(zero_cols)s) - SELECT %(id_col)s, NULL, %(zeroes)s FROM %(origin_table)s + SELECT %(origin_id_col)s, NULL, %(zeroes)s FROM %(origin_table)s """ else: sql = """ INSERT INTO %(table)s_current (%(id_col)s, completed_delta_stream_id, %(zero_cols)s) - SELECT %(id_col)s, NULL, %(zeroes)s FROM %(origin_table)s + SELECT %(origin_id_col)s, NULL, %(zeroes)s FROM %(origin_table)s ON CONFLICT DO NOTHING """ table, id_col = TYPE_TO_TABLE[stats_type] - origin_table = TYPE_TO_ORIGIN_TABLE[stats_type] + origin_table, origin_id_col = TYPE_TO_ORIGIN_TABLE[stats_type] zero_cols = list( chain(ABSOLUTE_STATS_FIELDS[stats_type], PER_SLICE_FIELDS[stats_type]) ) @@ -194,6 +194,7 @@ def _make_skeletons(txn, stats_type): % { "table": table, "id_col": id_col, + "origin_id_col": origin_id_col, "origin_table": origin_table, "zero_cols": ", ".join(zero_cols), "zeroes": ", ".join(["0"] * len(zero_cols)), From 4ecc62b0b6d2608c912028cc72a2ff01b74d4f7d Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 30 Aug 2019 15:19:50 +0100 Subject: [PATCH 07/10] Whoops, took out a line there... Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 5591079caefa..3bcfcaaeaea2 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -399,6 +399,7 @@ def _get_next_batch(txn): SELECT COUNT(*) FROM room_stats_current WHERE completed_delta_stream_id IS NULL """ + txn.execute(sql) progress["remaining"] = txn.fetchone()[0] return rooms_to_work_on From d39c09c28408b28612484ba1984b74830d12319e Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 30 Aug 2019 15:23:55 +0100 Subject: [PATCH 08/10] Ambiguous `room_id` Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 3bcfcaaeaea2..ac08836d933a 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -1080,7 +1080,7 @@ def _count_events_and_bytes_in_room_txn(self, txn, room_id, low_token, high_toke SELECT COUNT(*) AS num_events, SUM(%s) AS num_bytes FROM events JOIN event_json USING (event_id) - WHERE room_id = ? + WHERE events.room_id = ? AND ? <= stream_ordering AND stream_ordering <= ? """ % ( From 50c321d8f3b28d463b3cb98cf3815541d54c82c1 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 30 Aug 2019 15:25:33 +0100 Subject: [PATCH 09/10] Adapt to use renamed `room_state` Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index ac08836d933a..2c7f57c25119 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -322,7 +322,7 @@ def _process_user(txn): ) AS is_public, COUNT(*) AS count FROM room_memberships - JOIN room_state USING (room_id) + JOIN room_stats_state USING (room_id) WHERE user_id = ? AND membership = 'join' GROUP BY is_public From fca3a9c121e3ad463759e6bdb54442712fc819cd Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 30 Aug 2019 17:01:24 +0100 Subject: [PATCH 10/10] Fix to use milliseconds Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 2c7f57c25119..804be45c5f1b 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -308,7 +308,7 @@ def _get_next_batch(txn): return 1 for (user_id,) in users_to_work_on: - now = self.hs.get_reactor().seconds() + now = self.clock.time_msec() def _process_user(txn): # Get the current token