From d40d04bff81f162f192f6ce4b63e913d53040907 Mon Sep 17 00:00:00 2001
From: Amber Brown
Date: Wed, 5 Jun 2019 20:11:56 +1000
Subject: [PATCH 01/12] initial work to support reporting non-incrementing
 buckets

---
 synapse/metrics/__init__.py | 31 ++++++++++++++++++++++++++++++-
 synapse/storage/events.py   | 32 +++++++++++++++++++++++++++++++-
 2 files changed, 61 insertions(+), 2 deletions(-)

diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index ef48984fdd27..e4f46e5ec1db 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -25,7 +25,7 @@
 import attr
 from prometheus_client import Counter, Gauge, Histogram
-from prometheus_client.core import REGISTRY, GaugeMetricFamily
+from prometheus_client.core import REGISTRY, GaugeMetricFamily, HistogramMetricFamily
 
 from twisted.internet import reactor
 
@@ -193,6 +193,35 @@ def _register_with_collector(self):
         all_gauges[self.name] = self
 
 
+@attr.s(hash=True)
+class BucketCollector(object):
+
+    name = attr.ib()
+    data_collector = attr.ib()
+
+    def collect(self):
+
+        # Fetch the data -- this must be synchronous!
+        data = self.data_collector()
+
+        res = []
+        for i in sorted(data.keys()):
+            res.append([i, data[i]])
+
+        res.append(["+Inf", 0])
+
+        metric = HistogramMetricFamily(self.name, "", buckets=res, sum_value=sum(data.values()))
+        yield metric
+
+    def __attrs_post_init__(self):
+        if self.name in all_gauges.keys():
+            logger.warning("%s already registered, reregistering" % (self.name,))
+            REGISTRY.unregister(all_gauges.pop(self.name))
+
+        REGISTRY.register(self)
+        all_gauges[self.name] = self
+
+
 #
 # Detailed CPU metrics
 #
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index f9162be9b90a..4e0dee4a9fe0 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -17,7 +17,7 @@
 import itertools
 import logging
 
-from collections import OrderedDict, deque, namedtuple
+from collections import OrderedDict, deque, namedtuple, defaultdict
 from functools import wraps
 
 from six import iteritems, text_type
@@ -33,6 +33,7 @@
 from synapse.api.errors import SynapseError
 from synapse.events import EventBase  # noqa: F401
 from synapse.events.snapshot import EventContext  # noqa: F401
+from synapse.metrics import BucketCollector
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.state import StateResolutionStore
 from synapse.storage.background_updates import BackgroundUpdateStore
@@ -227,6 +228,35 @@ def __init__(self, db_conn, hs):
         self._event_persist_queue = _EventPeristenceQueue()
         self._state_resolution_handler = hs.get_state_resolution_handler()
 
+        # Collect metrics on the number of forward extremities that exist.
+        self._current_forward_extremities_amount = {}
+
+        BucketCollector(
+            "synapse_forward_extremities",
+            lambda: self._current_forward_extremities_amount
+        )
+
+        # Read the extrems every 60 sec
+        hs.get_clock().looping_call(self._read_forward_extremities, 60000)
+
+    @defer.inlineCallbacks
+    def _read_forward_extremities(self):
+
+        def fetch(txn):
+            txn.execute(
+                "select room_id, count(*) c from event_forward_extremities group by room_id"
+            )
+            return txn.fetchall()
+
+        res = self.runInteraction("read_forward_extremities", fetch)
+
+        d = defaultdict(default=0)
+
+        for i in res:
+            d[res[1]] += 1
+
+        self._current_forward_extremities_amount = d
+
     @defer.inlineCallbacks
     def persist_events(self, events_and_contexts, backfilled=False):
         """

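The core mechanism this first patch introduces is a Prometheus collector whose histogram is rebuilt from a point-in-time snapshot on every scrape, instead of being incremented over time like a normal Histogram. A minimal standalone sketch of that technique (illustrative names, not part of the patch; note that exposition-format bucket values are cumulative, something the later patches in this series converge towards):

    from prometheus_client import generate_latest
    from prometheus_client.core import REGISTRY, HistogramMetricFamily


    class SnapshotHistogram(object):
        """Rebuilds a histogram from a {value: count} snapshot on each scrape."""

        def __init__(self, name, data_collector):
            self.name = name
            # A synchronous callable returning the current mapping
            self.data_collector = data_collector

        def collect(self):
            data = self.data_collector()
            running = 0
            buckets = []
            for bound in sorted(data):
                running += data[bound]
                # Prometheus bucket values are cumulative counts
                buckets.append([str(bound), running])
            buckets.append(["+Inf", running])
            yield HistogramMetricFamily(
                self.name,
                "",
                buckets=buckets,
                sum_value=sum(bound * count for bound, count in data.items()),
            )


    counts = {1: 10, 2: 4}  # e.g. 10 rooms with 1 extremity, 4 rooms with 2
    REGISTRY.register(SnapshotHistogram("demo_forward_extremities", lambda: counts))
    print(generate_latest(REGISTRY).decode("ascii"))
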
Brown" Date: Fri, 7 Jun 2019 19:26:41 +1000 Subject: [PATCH 02/12] run black --- scripts/generate_signing_key.py | 2 +- synapse/metrics/__init__.py | 51 ++++++++++++++++++++------------- synapse/storage/events.py | 25 ++++++---------- 3 files changed, 40 insertions(+), 38 deletions(-) diff --git a/scripts/generate_signing_key.py b/scripts/generate_signing_key.py index ba3ba9739574..36e9140b5017 100755 --- a/scripts/generate_signing_key.py +++ b/scripts/generate_signing_key.py @@ -16,7 +16,7 @@ import argparse import sys -from signedjson.key import write_signing_keys, generate_signing_key +from signedjson.key import generate_signing_key, write_signing_keys from synapse.util.stringutils import random_string diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index e4f46e5ec1db..f02ccdb3edc0 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -40,7 +40,6 @@ class RegistryProxy(object): - @staticmethod def collect(): for metric in REGISTRY.collect(): @@ -63,10 +62,7 @@ def collect(self): try: calls = self.caller() except Exception: - logger.exception( - "Exception running callback for LaterGauge(%s)", - self.name, - ) + logger.exception("Exception running callback for LaterGauge(%s)", self.name) yield g return @@ -116,9 +112,7 @@ def __init__(self, name, desc, labels, sub_metrics): # Create a class which have the sub_metrics values as attributes, which # default to 0 on initialization. Used to pass to registered callbacks. self._metrics_class = attr.make_class( - "_MetricsEntry", - attrs={x: attr.ib(0) for x in sub_metrics}, - slots=True, + "_MetricsEntry", attrs={x: attr.ib(0) for x in sub_metrics}, slots=True ) # Counts number of in flight blocks for a given set of label values @@ -157,7 +151,9 @@ def collect(self): Note: may be called by a separate thread. 
""" - in_flight = GaugeMetricFamily(self.name + "_total", self.desc, labels=self.labels) + in_flight = GaugeMetricFamily( + self.name + "_total", self.desc, labels=self.labels + ) metrics_by_key = {} @@ -179,7 +175,9 @@ def collect(self): yield in_flight for name in self.sub_metrics: - gauge = GaugeMetricFamily("_".join([self.name, name]), "", labels=self.labels) + gauge = GaugeMetricFamily( + "_".join([self.name, name]), "", labels=self.labels + ) for key, metrics in six.iteritems(metrics_by_key): gauge.add_metric(key, getattr(metrics, name)) yield gauge @@ -210,7 +208,9 @@ def collect(self): res.append(["+Inf", 0]) - metric = HistogramMetricFamily(self.name, "", buckets=res, sum_value=sum(data.values())) + metric = HistogramMetricFamily( + self.name, "", buckets=res, sum_value=sum(data.values()) + ) yield metric def __attrs_post_init__(self): @@ -226,8 +226,8 @@ def __attrs_post_init__(self): # Detailed CPU metrics # -class CPUMetrics(object): +class CPUMetrics(object): def __init__(self): ticks_per_sec = 100 try: @@ -266,13 +266,28 @@ def collect(self): "python_gc_time", "Time taken to GC (sec)", ["gen"], - buckets=[0.0025, 0.005, 0.01, 0.025, 0.05, 0.10, 0.25, 0.50, 1.00, 2.50, - 5.00, 7.50, 15.00, 30.00, 45.00, 60.00], + buckets=[ + 0.0025, + 0.005, + 0.01, + 0.025, + 0.05, + 0.10, + 0.25, + 0.50, + 1.00, + 2.50, + 5.00, + 7.50, + 15.00, + 30.00, + 45.00, + 60.00, + ], ) class GCCounts(object): - def collect(self): cm = GaugeMetricFamily("python_gc_counts", "GC object counts", labels=["gen"]) for n, m in enumerate(gc.get_count()): @@ -308,9 +323,7 @@ def collect(self): events_processed_counter = Counter("synapse_federation_client_events_processed", "") event_processing_loop_counter = Counter( - "synapse_event_processing_loop_count", - "Event processing loop iterations", - ["name"], + "synapse_event_processing_loop_count", "Event processing loop iterations", ["name"] ) event_processing_loop_room_count = Counter( @@ -340,7 +353,6 @@ def collect(self): class ReactorLastSeenMetric(object): - def collect(self): cm = GaugeMetricFamily( "python_twisted_reactor_last_seen", @@ -354,7 +366,6 @@ def collect(self): def runUntilCurrentTimer(func): - @functools.wraps(func) def f(*args, **kwargs): now = reactor.seconds() diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 4e0dee4a9fe0..9537cdb6157b 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -17,7 +17,7 @@ import itertools import logging -from collections import OrderedDict, deque, namedtuple, defaultdict +from collections import OrderedDict, defaultdict, deque, namedtuple from functools import wraps from six import iteritems, text_type @@ -221,7 +221,6 @@ class EventsStore( EventsWorkerStore, BackgroundUpdateStore, ): - def __init__(self, db_conn, hs): super(EventsStore, self).__init__(db_conn, hs) @@ -233,7 +232,7 @@ def __init__(self, db_conn, hs): BucketCollector( "synapse_forward_extremities", - lambda: self._current_forward_extremities_amount + lambda: self._current_forward_extremities_amount, ) # Read the extrems every 60 sec @@ -241,10 +240,10 @@ def __init__(self, db_conn, hs): @defer.inlineCallbacks def _read_forward_extremities(self): - def fetch(txn): txn.execute( - "select room_id, count(*) c from event_forward_extremities group by room_id" + "select room_id, count(*) c from event_forward_extremities " + "group by room_id" ) return txn.fetchall() @@ -255,7 +254,7 @@ def fetch(txn): for i in res: d[res[1]] += 1 - self._current_forward_extremities_amount = d + 
+        self._current_forward_extremities_amount = dict(d)
 
     @defer.inlineCallbacks
     def persist_events(self, events_and_contexts, backfilled=False):
         """
@@ -598,17 +597,11 @@ def _get_events_which_are_prevs_txn(txn, batch):
             )
 
             txn.execute(sql, batch)
-            results.extend(
-                r[0]
-                for r in txn
-                if not json.loads(r[1]).get("soft_failed")
-            )
+            results.extend(r[0] for r in txn if not json.loads(r[1]).get("soft_failed"))
 
         for chunk in batch_iter(event_ids, 100):
             yield self.runInteraction(
-                "_get_events_which_are_prevs",
-                _get_events_which_are_prevs_txn,
-                chunk,
+                "_get_events_which_are_prevs", _get_events_which_are_prevs_txn, chunk
             )
 
         defer.returnValue(results)
@@ -670,9 +663,7 @@ def _get_prevs_before_rejected_txn(txn, batch):
 
         for chunk in batch_iter(event_ids, 100):
             yield self.runInteraction(
-                "_get_prevs_before_rejected",
-                _get_prevs_before_rejected_txn,
-                chunk,
+                "_get_prevs_before_rejected", _get_prevs_before_rejected_txn, chunk
             )
 
         defer.returnValue(existing_prevs)

From 4425436e6250f9afe73681ab122930abadbabff3 Mon Sep 17 00:00:00 2001
From: "Amber H. Brown"
Date: Fri, 7 Jun 2019 19:26:47 +1000
Subject: [PATCH 03/12] docstring

---
 changelog.d/5384.feature | 1 +
 1 file changed, 1 insertion(+)
 create mode 100644 changelog.d/5384.feature

diff --git a/changelog.d/5384.feature b/changelog.d/5384.feature
new file mode 100644
index 000000000000..9497f521c832
--- /dev/null
+++ b/changelog.d/5384.feature
@@ -0,0 +1 @@
+Statistics on forward extremities per room are now exposed via Prometheus.

From 859edee2a5103e70e87f8ec5781e62e96124a9e5 Mon Sep 17 00:00:00 2001
From: "Amber H. Brown"
Date: Fri, 7 Jun 2019 20:10:24 +1000
Subject: [PATCH 04/12] move the create and send into the base unittest

---
 synapse/storage/events.py             |   2 +-
 tests/storage/test_cleanup_extrems.py | 124 +++++++++++---------------
 tests/unittest.py                     |  57 ++++++++++++
 3 files changed, 108 insertions(+), 75 deletions(-)

diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 9537cdb6157b..827dde19fc2c 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -247,7 +247,7 @@ def fetch(txn):
             )
             return txn.fetchall()
 
-        res = self.runInteraction("read_forward_extremities", fetch)
+        res = yield self.runInteraction("read_forward_extremities", fetch)
 
         d = defaultdict(default=0)
 
diff --git a/tests/storage/test_cleanup_extrems.py b/tests/storage/test_cleanup_extrems.py
index 6aa8b8b3c679..ebbdef977adf 100644
--- a/tests/storage/test_cleanup_extrems.py
+++ b/tests/storage/test_cleanup_extrems.py
@@ -15,7 +15,6 @@
 
 import os.path
 
-from synapse.api.constants import EventTypes
 from synapse.storage import prepare_database
 from synapse.types import Requester, UserID
 
@@ -23,8 +22,10 @@
 
 
 class CleanupExtremBackgroundUpdateStoreTestCase(HomeserverTestCase):
-    """Test the background update to clean forward extremities table.
     """
+    Test the background update to clean forward extremities table.
+ """ + def make_homeserver(self, reactor, clock): # Hack until we understand why test_forked_graph_cleanup fails with v4 config = self.default_config() @@ -33,7 +34,6 @@ def make_homeserver(self, reactor, clock): def prepare(self, reactor, clock, homeserver): self.store = homeserver.get_datastore() - self.event_creator = homeserver.get_event_creation_handler() self.room_creator = homeserver.get_room_creation_handler() # Create a test user and room @@ -42,56 +42,6 @@ def prepare(self, reactor, clock, homeserver): info = self.get_success(self.room_creator.create_room(self.requester, {})) self.room_id = info["room_id"] - def create_and_send_event(self, soft_failed=False, prev_event_ids=None): - """Create and send an event. - - Args: - soft_failed (bool): Whether to create a soft failed event or not - prev_event_ids (list[str]|None): Explicitly set the prev events, - or if None just use the default - - Returns: - str: The new event's ID. - """ - prev_events_and_hashes = None - if prev_event_ids: - prev_events_and_hashes = [[p, {}, 0] for p in prev_event_ids] - - event, context = self.get_success( - self.event_creator.create_event( - self.requester, - { - "type": EventTypes.Message, - "room_id": self.room_id, - "sender": self.user.to_string(), - "content": {"body": "", "msgtype": "m.text"}, - }, - prev_events_and_hashes=prev_events_and_hashes, - ) - ) - - if soft_failed: - event.internal_metadata.soft_failed = True - - self.get_success( - self.event_creator.send_nonmember_event(self.requester, event, context) - ) - - return event.event_id - - def add_extremity(self, event_id): - """Add the given event as an extremity to the room. - """ - self.get_success( - self.store._simple_insert( - table="event_forward_extremities", - values={"room_id": self.room_id, "event_id": event_id}, - desc="test_add_extremity", - ) - ) - - self.store.get_latest_event_ids_in_room.invalidate((self.room_id,)) - def run_background_update(self): """Re run the background update to clean up the extremities. 
""" @@ -131,10 +81,16 @@ def test_soft_failed_extremities_handled_correctly(self): """ # Create the room graph - event_id_1 = self.create_and_send_event() - event_id_2 = self.create_and_send_event(True, [event_id_1]) - event_id_3 = self.create_and_send_event(True, [event_id_2]) - event_id_4 = self.create_and_send_event(False, [event_id_3]) + event_id_1 = self.create_and_send_event(self.room_id, self.user) + event_id_2 = self.create_and_send_event( + self.room_id, self.user, True, [event_id_1] + ) + event_id_3 = self.create_and_send_event( + self.room_id, self.user, True, [event_id_2] + ) + event_id_4 = self.create_and_send_event( + self.room_id, self.user, False, [event_id_3] + ) # Check the latest events are as expected latest_event_ids = self.get_success( @@ -154,12 +110,16 @@ def test_basic_cleanup(self): Where SF* are soft failed, and with extremities of A and B """ # Create the room graph - event_id_a = self.create_and_send_event() - event_id_sf1 = self.create_and_send_event(True, [event_id_a]) - event_id_b = self.create_and_send_event(False, [event_id_sf1]) + event_id_a = self.create_and_send_event(self.room_id, self.user) + event_id_sf1 = self.create_and_send_event( + self.room_id, self.user, True, [event_id_a] + ) + event_id_b = self.create_and_send_event( + self.room_id, self.user, False, [event_id_sf1] + ) # Add the new extremity and check the latest events are as expected - self.add_extremity(event_id_a) + self.add_extremity(self.room_id, event_id_a) latest_event_ids = self.get_success( self.store.get_latest_event_ids_in_room(self.room_id) @@ -185,13 +145,19 @@ def test_chain_of_fail_cleanup(self): Where SF* are soft failed, and with extremities of A and B """ # Create the room graph - event_id_a = self.create_and_send_event() - event_id_sf1 = self.create_and_send_event(True, [event_id_a]) - event_id_sf2 = self.create_and_send_event(True, [event_id_sf1]) - event_id_b = self.create_and_send_event(False, [event_id_sf2]) + event_id_a = self.create_and_send_event(self.room_id, self.user) + event_id_sf1 = self.create_and_send_event( + self.room_id, self.user, True, [event_id_a] + ) + event_id_sf2 = self.create_and_send_event( + self.room_id, self.user, True, [event_id_sf1] + ) + event_id_b = self.create_and_send_event( + self.room_id, self.user, False, [event_id_sf2] + ) # Add the new extremity and check the latest events are as expected - self.add_extremity(event_id_a) + self.add_extremity(self.room_id, event_id_a) latest_event_ids = self.get_success( self.store.get_latest_event_ids_in_room(self.room_id) @@ -227,16 +193,26 @@ def test_forked_graph_cleanup(self): """ # Create the room graph - event_id_a = self.create_and_send_event() - event_id_b = self.create_and_send_event() - event_id_sf1 = self.create_and_send_event(True, [event_id_a]) - event_id_sf2 = self.create_and_send_event(True, [event_id_a, event_id_b]) - event_id_sf3 = self.create_and_send_event(True, [event_id_sf1]) - self.create_and_send_event(True, [event_id_sf2, event_id_sf3]) # SF4 - event_id_c = self.create_and_send_event(False, [event_id_sf3]) + event_id_a = self.create_and_send_event(self.room_id, self.user) + event_id_b = self.create_and_send_event(self.room_id, self.user) + event_id_sf1 = self.create_and_send_event( + self.room_id, self.user, True, [event_id_a] + ) + event_id_sf2 = self.create_and_send_event( + self.room_id, self.user, True, [event_id_a, event_id_b] + ) + event_id_sf3 = self.create_and_send_event( + self.room_id, self.user, True, [event_id_sf1] + ) + self.create_and_send_event( + self.room_id, 
+            self.room_id, self.user, True, [event_id_sf2, event_id_sf3]
+        )  # SF4
+        event_id_c = self.create_and_send_event(
+            self.room_id, self.user, False, [event_id_sf3]
+        )
 
         # Add the new extremity and check the latest events are as expected
-        self.add_extremity(event_id_a)
+        self.add_extremity(self.room_id, event_id_a)
 
         latest_event_ids = self.get_success(
             self.store.get_latest_event_ids_in_room(self.room_id)
         )
diff --git a/tests/unittest.py b/tests/unittest.py
index 26204470b167..49a0cb1a23dc 100644
--- a/tests/unittest.py
+++ b/tests/unittest.py
@@ -27,6 +27,7 @@
 from twisted.internet.defer import Deferred
 from twisted.trial import unittest
 
+from synapse.api.constants import EventTypes
 from synapse.config.homeserver import HomeServerConfig
 from synapse.http.server import JsonResource
 from synapse.http.site import SynapseRequest
@@ -441,3 +442,59 @@ def login(self, username, password, device_id=None):
         access_token = channel.json_body["access_token"]
 
         return access_token
+
+    def create_and_send_event(
+        self, room_id, user, soft_failed=False, prev_event_ids=None
+    ):
+        """
+        Create and send an event.
+
+        Args:
+            soft_failed (bool): Whether to create a soft failed event or not
+            prev_event_ids (list[str]|None): Explicitly set the prev events,
+                or if None just use the default
+
+        Returns:
+            str: The new event's ID.
+        """
+        event_creator = self.hs.get_event_creation_handler()
+
+        prev_events_and_hashes = None
+        if prev_event_ids:
+            prev_events_and_hashes = [[p, {}, 0] for p in prev_event_ids]
+
+        event, context = self.get_success(
+            event_creator.create_event(
+                self.requester,
+                {
+                    "type": EventTypes.Message,
+                    "room_id": room_id,
+                    "sender": user.to_string(),
+                    "content": {"body": "", "msgtype": "m.text"},
+                },
+                prev_events_and_hashes=prev_events_and_hashes,
+            )
+        )
+
+        if soft_failed:
+            event.internal_metadata.soft_failed = True
+
+        self.get_success(
+            event_creator.send_nonmember_event(self.requester, event, context)
+        )
+
+        return event.event_id
+
+    def add_extremity(self, room_id, event_id):
+        """
+        Add the given event as an extremity to the room.
+        """
+        self.get_success(
+            self.hs.get_datastore()._simple_insert(
+                table="event_forward_extremities",
+                values={"room_id": room_id, "event_id": event_id},
+                desc="test_add_extremity",
+            )
+        )
+
+        self.hs.get_datastore().get_latest_event_ids_in_room.invalidate((self.room_id,))

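Besides relocating the test helpers, the one-line storage change in this patch (adding `yield` before `runInteraction`) fixes a bug from patch 01: `runInteraction` returns a Deferred, so inside an `inlineCallbacks` generator it must be yielded, otherwise the code iterates over the Deferred object rather than the query rows. A standalone illustration of the difference (plain Twisted, illustrative names):

    from twisted.internet import defer


    def run_interaction():
        # Stand-in for Synapse's runInteraction: it always returns a Deferred
        return defer.succeed([("!room_a:test", 2), ("!room_b:test", 5)])


    @defer.inlineCallbacks
    def read_rows():
        broken = run_interaction()      # a Deferred, not the rows
        rows = yield run_interaction()  # the actual list of rows
        assert isinstance(broken, defer.Deferred)
        assert rows == [("!room_a:test", 2), ("!room_b:test", 5)]
        defer.returnValue(rows)


    read_rows()  # fires synchronously here, since the Deferred is pre-resolved
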
Brown" Date: Fri, 7 Jun 2019 21:25:58 +1000 Subject: [PATCH 05/12] get working --- synapse/metrics/__init__.py | 2 +- synapse/storage/events.py | 10 ++-------- tests/storage/test_cleanup_extrems.py | 6 ------ tests/unittest.py | 12 +++++++----- 4 files changed, 10 insertions(+), 20 deletions(-) diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index f02ccdb3edc0..3433d72b3b91 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -209,7 +209,7 @@ def collect(self): res.append(["+Inf", 0]) metric = HistogramMetricFamily( - self.name, "", buckets=res, sum_value=sum(data.values()) + self.name, "", buckets=res, sum_value=sum([x * y for x, y in data.items()]) ) yield metric diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 827dde19fc2c..79fe1ca663f2 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -17,7 +17,7 @@ import itertools import logging -from collections import OrderedDict, defaultdict, deque, namedtuple +from collections import Counter as c_counter, OrderedDict, deque, namedtuple from functools import wraps from six import iteritems, text_type @@ -248,13 +248,7 @@ def fetch(txn): return txn.fetchall() res = yield self.runInteraction("read_forward_extremities", fetch) - - d = defaultdict(default=0) - - for i in res: - d[res[1]] += 1 - - self._current_forward_extremities_amount = dict(d) + self._current_forward_extremities_amount = c_counter(list(x[1] for x in res)) @defer.inlineCallbacks def persist_events(self, events_and_contexts, backfilled=False): diff --git a/tests/storage/test_cleanup_extrems.py b/tests/storage/test_cleanup_extrems.py index ebbdef977adf..f4c81ef77dda 100644 --- a/tests/storage/test_cleanup_extrems.py +++ b/tests/storage/test_cleanup_extrems.py @@ -26,12 +26,6 @@ class CleanupExtremBackgroundUpdateStoreTestCase(HomeserverTestCase): Test the background update to clean forward extremities table. """ - def make_homeserver(self, reactor, clock): - # Hack until we understand why test_forked_graph_cleanup fails with v4 - config = self.default_config() - config['default_room_version'] = '1' - return self.setup_test_homeserver(config=config) - def prepare(self, reactor, clock, homeserver): self.store = homeserver.get_datastore() self.room_creator = homeserver.get_room_creation_handler() diff --git a/tests/unittest.py b/tests/unittest.py index 49a0cb1a23dc..3e81dd7ba7bd 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -32,7 +32,7 @@ from synapse.http.server import JsonResource from synapse.http.site import SynapseRequest from synapse.server import HomeServer -from synapse.types import UserID, create_requester +from synapse.types import Requester, UserID, create_requester from synapse.util.logcontext import LoggingContext from tests.server import get_clock, make_request, render, setup_test_homeserver @@ -458,6 +458,8 @@ def create_and_send_event( str: The new event's ID. 
""" event_creator = self.hs.get_event_creation_handler() + secrets = self.hs.get_secrets() + requester = Requester(user, None, False, None, None) prev_events_and_hashes = None if prev_event_ids: @@ -465,12 +467,12 @@ def create_and_send_event( event, context = self.get_success( event_creator.create_event( - self.requester, + requester, { "type": EventTypes.Message, "room_id": room_id, "sender": user.to_string(), - "content": {"body": "", "msgtype": "m.text"}, + "content": {"body": secrets.token_hex(), "msgtype": "m.text"}, }, prev_events_and_hashes=prev_events_and_hashes, ) @@ -480,7 +482,7 @@ def create_and_send_event( event.internal_metadata.soft_failed = True self.get_success( - event_creator.send_nonmember_event(self.requester, event, context) + event_creator.send_nonmember_event(requester, event, context) ) return event.event_id @@ -497,4 +499,4 @@ def add_extremity(self, room_id, event_id): ) ) - self.hs.get_datastore().get_latest_event_ids_in_room.invalidate((self.room_id,)) + self.hs.get_datastore().get_latest_event_ids_in_room.invalidate((room_id,)) From 910ef2019a623c5429c46bd16207a987086d0b65 Mon Sep 17 00:00:00 2001 From: "Amber H. Brown" Date: Sat, 8 Jun 2019 01:40:41 +1000 Subject: [PATCH 06/12] finish test --- synapse/metrics/__init__.py | 2 +- tests/storage/test_event_metrics.py | 77 +++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+), 1 deletion(-) create mode 100644 tests/storage/test_event_metrics.py diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 3433d72b3b91..04fc4d1a61ba 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -206,7 +206,7 @@ def collect(self): for i in sorted(data.keys()): res.append([i, data[i]]) - res.append(["+Inf", 0]) + res.append(["+Inf", sum(data.values())]) metric = HistogramMetricFamily( self.name, "", buckets=res, sum_value=sum([x * y for x, y in data.items()]) diff --git a/tests/storage/test_event_metrics.py b/tests/storage/test_event_metrics.py new file mode 100644 index 000000000000..3e1c8185fe85 --- /dev/null +++ b/tests/storage/test_event_metrics.py @@ -0,0 +1,77 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.metrics import REGISTRY +from synapse.types import Requester, UserID + +from tests.unittest import HomeserverTestCase + + +class ExtremStatisticsTestCase(HomeserverTestCase): + def test_exposed_to_prometheus(self): + """ + Forward extremity counts are exposed via Prometheus. 
+ """ + room_creator = self.hs.get_room_creation_handler() + + user = UserID("alice", "test") + requester = Requester(user, None, False, None, None) + + # Real events, forward extremities + events = [(3, 2), (6, 2), (4, 5)] + + for event_count, extrems in events: + info = self.get_success(room_creator.create_room(requester, {})) + room_id = info["room_id"] + + last_event = None + + # Make a real event chain + for i in range(event_count): + ev = self.create_and_send_event(room_id, user, False, last_event) + last_event = [ev] + + # Sprinkle in some extremities + for i in range(extrems): + ev = self.create_and_send_event(room_id, user, False, last_event) + + # Let it run for a while, then pull out the statistics from the + # Prometheus client registry + self.pump(1) + items = list( + filter( + lambda x: x.name == "synapse_forward_extremities", + list(REGISTRY.collect()), + ) + ) + + # Check the values are what we want + buckets = {} + _count = 0 + _sum = 0 + + for i in items[0].samples: + if i[0].endswith("_bucket"): + buckets[i[1]['le']] = i[2] + elif i[0].endswith("_count"): + _count = i[2] + elif i[0].endswith("_sum"): + _sum = i[2] + + # 3 buckets, 2 with 2 extrems, 1 with 5 extrems, and +Inf which is all + self.assertEqual(buckets, {2: 2, 5: 1, "+Inf": 3}) + # 3 rooms, with 9 total events + self.assertEqual(_count, 3) + self.assertEqual(_sum, 9) From 52c5a5f7fbcc54f374f410c025e0cfbb9e85dc83 Mon Sep 17 00:00:00 2001 From: "Amber H. Brown" Date: Tue, 11 Jun 2019 18:26:21 +1000 Subject: [PATCH 07/12] fix for bucketing --- synapse/metrics/__init__.py | 19 +++++++++++++++++-- synapse/storage/events.py | 5 +++-- tests/storage/test_event_metrics.py | 21 ++++++++++++++++++++- 3 files changed, 40 insertions(+), 5 deletions(-) diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 04fc4d1a61ba..5fc5813ef7c8 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -196,15 +196,24 @@ class BucketCollector(object): name = attr.ib() data_collector = attr.ib() + buckets = attr.ib() def collect(self): # Fetch the data -- this must be synchronous! 
From 52c5a5f7fbcc54f374f410c025e0cfbb9e85dc83 Mon Sep 17 00:00:00 2001
From: "Amber H. Brown"
Date: Tue, 11 Jun 2019 18:26:21 +1000
Subject: [PATCH 07/12] fix for bucketing

---
 synapse/metrics/__init__.py         | 19 +++++++++++++++++--
 synapse/storage/events.py           |  5 +++--
 tests/storage/test_event_metrics.py | 21 ++++++++++++++++++++-
 3 files changed, 40 insertions(+), 5 deletions(-)

diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index 04fc4d1a61ba..5fc5813ef7c8 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -196,15 +196,24 @@ class BucketCollector(object):
 
     name = attr.ib()
     data_collector = attr.ib()
+    buckets = attr.ib()
 
     def collect(self):
 
         # Fetch the data -- this must be synchronous!
         data = self.data_collector()
 
+        buckets = {}
+
         res = []
-        for i in sorted(data.keys()):
-            res.append([i, data[i]])
+        for x in data.keys():
+            for i, bound in enumerate(self.buckets[:-1]):
+                if x <= bound:
+                    buckets[bound] = buckets.get(bound, 0) + data[x]
+                    break
+
+        for i in self.buckets[:-1]:
+            res.append([i, buckets.get(i, 0)])
 
         res.append(["+Inf", sum(data.values())])
@@ -214,6 +223,12 @@ def collect(self):
         yield metric
 
     def __attrs_post_init__(self):
+        self.buckets = [float(x) for x in self.buckets]
+        if self.buckets != sorted(self.buckets):
+            raise ValueError("Buckets not sorted")
+
+        self.buckets = tuple(self.buckets)
+
         if self.name in all_gauges.keys():
             logger.warning("%s already registered, reregistering" % (self.name,))
             REGISTRY.unregister(all_gauges.pop(self.name))
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 79fe1ca663f2..30b16ae17a67 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -233,10 +233,11 @@ def __init__(self, db_conn, hs):
         BucketCollector(
             "synapse_forward_extremities",
             lambda: self._current_forward_extremities_amount,
+            buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"]
         )
 
-        # Read the extrems every 60 sec
-        hs.get_clock().looping_call(self._read_forward_extremities, 60000)
+        # Read the extrems every 60 minutes
+        hs.get_clock().looping_call(self._read_forward_extremities, 60 * 60 * 1000)
diff --git a/tests/storage/test_event_metrics.py b/tests/storage/test_event_metrics.py
index 3e1c8185fe85..99383b3df95d 100644
--- a/tests/storage/test_event_metrics.py
+++ b/tests/storage/test_event_metrics.py
@@ -49,7 +49,9 @@ def test_exposed_to_prometheus(self):
 
         # Let it run for a while, then pull out the statistics from the
         # Prometheus client registry
+        self.reactor.advance(60 * 60 * 1000)
         self.pump(1)
+
         items = list(
             filter(
                 lambda x: x.name == "synapse_forward_extremities",
@@ -71,7 +73,24 @@ def test_exposed_to_prometheus(self):
                 _sum = i[2]
 
         # 3 buckets, 2 with 2 extrems, 1 with 5 extrems, and +Inf which is all
-        self.assertEqual(buckets, {2: 2, 5: 1, "+Inf": 3})
+        self.assertEqual(
+            buckets,
+            {
+                1.0: 0,
+                2.0: 2,
+                3.0: 0,
+                5.0: 1,
+                7.0: 0,
+                10.0: 0,
+                15.0: 0,
+                20.0: 0,
+                50.0: 0,
+                100.0: 0,
+                200.0: 0,
+                500.0: 0,
+                "+Inf": 3,
+            },
+        )
         # 3 rooms, with 9 total events
         self.assertEqual(_count, 3)
         self.assertEqual(_sum, 9)

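The fix here is that keys from the data snapshot are now remapped into a fixed, pre-declared set of bucket bounds, so the exported histogram always has the same shape regardless of which extremity counts happen to exist at read time. The remapping loop in isolation (plain Python, illustrative data):

    # Fixed bounds, a prefix of those configured for synapse_forward_extremities
    bounds = [1.0, 2.0, 3.0, 5.0, 7.0, 10.0]

    # Point-in-time snapshot: {extremity_count: number_of_rooms}
    data = {2: 2, 6: 1}

    buckets = {}
    for x in data.keys():
        for bound in bounds:
            # Each key lands in the first bucket whose upper bound fits it
            if x <= bound:
                buckets[bound] = buckets.get(bound, 0) + data[x]
                break

    # 6 extremities falls into the 7.0 bucket, as the later test asserts
    assert buckets == {2.0: 2, 7.0: 1}
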
Brown" Date: Tue, 11 Jun 2019 18:27:28 +1000 Subject: [PATCH 08/12] fix for bucketing --- tests/storage/test_event_metrics.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/storage/test_event_metrics.py b/tests/storage/test_event_metrics.py index 99383b3df95d..965cfc4a86ae 100644 --- a/tests/storage/test_event_metrics.py +++ b/tests/storage/test_event_metrics.py @@ -30,7 +30,7 @@ def test_exposed_to_prometheus(self): requester = Requester(user, None, False, None, None) # Real events, forward extremities - events = [(3, 2), (6, 2), (4, 5)] + events = [(3, 2), (6, 2), (4, 6)] for event_count, extrems in events: info = self.get_success(room_creator.create_room(requester, {})) @@ -72,15 +72,15 @@ def test_exposed_to_prometheus(self): elif i[0].endswith("_sum"): _sum = i[2] - # 3 buckets, 2 with 2 extrems, 1 with 5 extrems, and +Inf which is all + # 3 buckets, 2 with 2 extrems, 1 with 6 extrems (bucketed as 7), and +Inf which is all self.assertEqual( buckets, { 1.0: 0, 2.0: 2, 3.0: 0, - 5.0: 1, - 7.0: 0, + 5.0: 0, + 7.0: 1, 10.0: 0, 15.0: 0, 20.0: 0, @@ -91,6 +91,6 @@ def test_exposed_to_prometheus(self): "+Inf": 3, }, ) - # 3 rooms, with 9 total events + # 3 rooms, with 10 total events self.assertEqual(_count, 3) - self.assertEqual(_sum, 9) + self.assertEqual(_sum, 10) From 0d73102a0ca9e0b68f98f705efd00547e1d4fd4e Mon Sep 17 00:00:00 2001 From: "Amber H. Brown" Date: Tue, 11 Jun 2019 18:28:50 +1000 Subject: [PATCH 09/12] clean up sql --- synapse/storage/events.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 30b16ae17a67..1578403f7976 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -243,13 +243,15 @@ def __init__(self, db_conn, hs): def _read_forward_extremities(self): def fetch(txn): txn.execute( - "select room_id, count(*) c from event_forward_extremities " - "group by room_id" + """ + select count(*) c from event_forward_extremities + group by room_id + """ ) return txn.fetchall() res = yield self.runInteraction("read_forward_extremities", fetch) - self._current_forward_extremities_amount = c_counter(list(x[1] for x in res)) + self._current_forward_extremities_amount = c_counter(list(x[0] for x in res)) @defer.inlineCallbacks def persist_events(self, events_and_contexts, backfilled=False): From 93e0ce9880c8d88da5ab94314ee1695b85da11e6 Mon Sep 17 00:00:00 2001 From: "Amber H. Brown" Date: Tue, 11 Jun 2019 18:29:56 +1000 Subject: [PATCH 10/12] pep8 --- tests/storage/test_event_metrics.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/storage/test_event_metrics.py b/tests/storage/test_event_metrics.py index 965cfc4a86ae..20a068f1fcc6 100644 --- a/tests/storage/test_event_metrics.py +++ b/tests/storage/test_event_metrics.py @@ -72,7 +72,8 @@ def test_exposed_to_prometheus(self): elif i[0].endswith("_sum"): _sum = i[2] - # 3 buckets, 2 with 2 extrems, 1 with 6 extrems (bucketed as 7), and +Inf which is all + # 3 buckets, 2 with 2 extrems, 1 with 6 extrems (bucketed as 7), and + # +Inf which is all self.assertEqual( buckets, { From 2b11471219da143323b7f7768a08e7a7d4b8fd00 Mon Sep 17 00:00:00 2001 From: "Amber H. 
Brown" Date: Thu, 13 Jun 2019 20:19:49 +1000 Subject: [PATCH 11/12] docstring and cleanup --- synapse/metrics/__init__.py | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 5fc5813ef7c8..a9d6b0248a54 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -193,6 +193,20 @@ def _register_with_collector(self): @attr.s(hash=True) class BucketCollector(object): + """ + Like a Histogram, but allows buckets to be point-in-time instead of + incrementally added to. + + Args: + name (str): Base name of metric to be exported to Prometheus. + data_collector (callable -> dict): A synchronous callable that + returns a dict mapping bucket to number of items in the + bucket. If these buckets are not the same as the buckets + given to this class, they will be remapped into them. + buckets (list[float]): List of floats/ints of the buckets to + give to Prometheus. +Inf is ignored, if given. + + """ name = attr.ib() data_collector = attr.ib() @@ -207,23 +221,26 @@ def collect(self): res = [] for x in data.keys(): - for i, bound in enumerate(self.buckets[:-1]): + for i, bound in enumerate(self.buckets): if x <= bound: buckets[bound] = buckets.get(bound, 0) + data[x] break - for i in self.buckets[:-1]: + for i in self.buckets: res.append([i, buckets.get(i, 0)]) res.append(["+Inf", sum(data.values())]) metric = HistogramMetricFamily( - self.name, "", buckets=res, sum_value=sum([x * y for x, y in data.items()]) + self.name, + "", + buckets=res + [float("+Inf")], + sum_value=sum([x * y for x, y in data.items()]), ) yield metric def __attrs_post_init__(self): - self.buckets = [float(x) for x in self.buckets] + self.buckets = [float(x) for x in self.buckets if x != "+Inf"] if self.buckets != sorted(self.buckets): raise ValueError("Buckets not sorted") From 1c9b54ad04ffa7a0981a3d65e6b98cd3f4ccc3ec Mon Sep 17 00:00:00 2001 From: "Amber H. Brown" Date: Thu, 13 Jun 2019 21:51:37 +1000 Subject: [PATCH 12/12] fix --- synapse/metrics/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index a9d6b0248a54..539c35352868 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -234,7 +234,7 @@ def collect(self): metric = HistogramMetricFamily( self.name, "", - buckets=res + [float("+Inf")], + buckets=res, sum_value=sum([x * y for x, y in data.items()]), ) yield metric