diff --git a/aggregator.py b/aggregator.py index 90cb0e14ee..c99f973b43 100644 --- a/aggregator.py +++ b/aggregator.py @@ -394,9 +394,11 @@ def __init__(self, hostname, interval=1.0, expiry_seconds=300, histogram_aggregates=None, histogram_percentiles=None, utf8_decoding=False): self.events = [] + self.service_checks = [] self.total_count = 0 self.count = 0 self.event_count = 0 + self.service_check_count = 0 self.hostname = hostname self.expiry_seconds = expiry_seconds self.formatter = formatter or api_formatter @@ -487,6 +489,9 @@ def parse_metric_packet(self, packet): return parsed_packets + def _unescape_sc_content(self, string): + return string.replace('\\n', '\n').replace('m\:', 'm:') + def _unescape_event_text(self, string): return string.replace('\\n', '\n') @@ -527,6 +532,47 @@ def parse_event_packet(self, packet): except (IndexError, ValueError): raise Exception(u'Unparseable event packet: %s' % packet) + def parse_sc_packet(self, packet): + try: + _, data_and_metadata = packet.split('|', 1) + # Service check syntax: + # _sc|check_name|status|meta + if data_and_metadata.count('|') == 1: + # Case with no metadata + check_name, status = data_and_metadata.split('|') + metadata = '' + else: + check_name, status, metadata = data_and_metadata.split('|', 2) + + service_check = { + 'check_name': check_name, + 'status': int(status) + } + + message_delimiter = '|m:' if '|m:' in metadata else 'm:' + if message_delimiter in metadata: + meta, message = metadata.rsplit(message_delimiter, 1) + service_check['message'] = self._unescape_sc_content(message) + else: + meta = metadata + + if not meta: + return service_check + + meta = unicode(meta) + for m in meta.split('|'): + if m[0] == u'd': + service_check['timestamp'] = float(m[2:]) + elif m[0] == u'h': + service_check['hostname'] = m[2:] + elif m[0] == u'#': + service_check['tags'] = sorted(m[1:].split(u',')) + + return service_check + + except (IndexError, ValueError): + raise Exception(u'Unparseable service check packet: %s' % packet) + def submit_packets(self, packets): # We should probably consider that packets are always encoded # in utf8, but decoding all packets has an perf overhead of 7% @@ -544,6 +590,10 @@ def submit_packets(self, packets): self.event_count += 1 event = self.parse_event_packet(packet) self.event(**event) + elif packet.startswith('_sc'): + self.service_check_count += 1 + service_check = self.parse_sc_packet(packet) + self.service_check(**service_check) else: self.count += 1 parsed_packets = self.parse_metric_packet(packet) @@ -606,8 +656,27 @@ def event(self, title, text, date_happened=None, alert_type=None, aggregation_ke self.events.append(event) + def service_check(self, check_name, status, tags=None, timestamp=None, + hostname=None, message=None): + service_check = { + 'check': check_name, + 'status': status, + 'timestamp': timestamp or int(time()) + } + if tags is not None: + service_check['tags'] = sorted(tags) + + if hostname is not None: + service_check['host_name'] = hostname + else: + service_check['host_name'] = self.hostname + if message is not None: + service_check['message'] = message + + self.service_checks.append(service_check) + def flush(self): - """ Flush aggreaged metrics """ + """ Flush aggregated metrics """ raise NotImplementedError() def flush_events(self): @@ -621,6 +690,17 @@ def flush_events(self): return events + def flush_service_checks(self): + service_checks = self.service_checks + self.service_checks = [] + + self.total_count += self.service_check_count + self.service_check_count = 0 + + log.debug("Received {0} service check runs since last flush".format(len(service_checks))) + + return service_checks + def send_packet_count(self, metric_name): self.submit_metric(metric_name, self.count, 'g') diff --git a/dogstatsd.py b/dogstatsd.py index c02458c6f7..2fe1d4c4c2 100755 --- a/dogstatsd.py +++ b/dogstatsd.py @@ -134,11 +134,16 @@ def flush(self): if event_count: self.submit_events(events) + service_checks = self.metrics_aggregator.flush_service_checks() + check_count = len(service_checks) + if check_count: + self.submit_service_checks(service_checks) + should_log = self.flush_count <= FLUSH_LOGGING_INITIAL or self.log_count <= FLUSH_LOGGING_COUNT log_func = log.info if not should_log: log_func = log.debug - log_func("Flush #%s: flushed %s metric%s and %s event%s" % (self.flush_count, count, plural(count), event_count, plural(event_count))) + log_func("Flush #%s: flushed %s metric%s, %s event%s, and %s service check run%s" % (self.flush_count, count, plural(count), event_count, plural(event_count), check_count, plural(check_count))) if self.flush_count == FLUSH_LOGGING_INITIAL: log.info("First flushes done, %s flushes will be logged every %s flushes." % (FLUSH_LOGGING_COUNT, FLUSH_LOGGING_PERIOD)) @@ -185,7 +190,7 @@ def submit_events(self, events): params['api_key'] = self.api_key url = '%s/intake?%s' % (self.api_host, urlencode(params)) - self.submit_http(url, payload, headers) + self.submit_http(url, payload, headers) def submit_http(self, url, data, headers): no_proxy = { @@ -216,6 +221,16 @@ def submit_http(self, url, data, headers): pass + def submit_service_checks(self, service_checks): + headers = {'Content-Type':'application/json'} + + params = {} + if self.api_key: + params['api_key'] = self.api_key + + url = '{0}/api/v1/check_run?{1}'.format(self.api_host, urlencode(params)) + self.submit_http(url, json.dumps(service_checks), headers) + class Server(object): """ A statsd udp server. diff --git a/tests/test_dogstatsd.py b/tests/test_dogstatsd.py index 0960d68adf..dfcc5f69e4 100644 --- a/tests/test_dogstatsd.py +++ b/tests/test_dogstatsd.py @@ -21,6 +21,12 @@ def sort_by(m): return (m['msg_title'], m['msg_text'], ','.join(m.get('tags', None) or [])) return sorted(metrics, key=sort_by) + @staticmethod + def sort_service_checks(service_checks): + def sort_by(m): + return (m['check'], m['status'], m.get('message', None), ','.join(m.get('tags', None) or [])) + return sorted(service_checks, key=sort_by) + @staticmethod def assert_almost_equal(i, j, e=1): # Floating point math? @@ -671,6 +677,72 @@ def test_event_text_utf8(self): nt.assert_equal(events[0]['msg_text'], 'First line\nSecond line') nt.assert_equal(events[1]['msg_text'], u'♬ †øU †øU ¥ºu T0µ ♪') + def test_service_check_basic(self): + stats = MetricsAggregator('myhost') + stats.submit_packets('_sc|check.1|0') + stats.submit_packets('_sc|check.2|1') + stats.submit_packets('_sc|check.3|2') + + service_checks = self.sort_service_checks(stats.flush_service_checks()) + + assert len(service_checks) == 3 + first, second, third = service_checks + + nt.assert_equal(first['check'], 'check.1') + nt.assert_equal(first['status'], 0) + nt.assert_equal(second['check'], 'check.2') + nt.assert_equal(second['status'], 1) + nt.assert_equal(third['check'], 'check.3') + nt.assert_equal(third['status'], 2) + + def test_service_check_message(self): + stats = MetricsAggregator('myhost') + stats.submit_packets('_sc|check.1|0|m:testing') + stats.submit_packets('_sc|check.2|0|m:First line\\nSecond line') + stats.submit_packets(u'_sc|check.3|0|m:♬ †øU †øU ¥ºu T0µ ♪') + stats.submit_packets('_sc|check.4|0|m:|t:|m\:|d:') + + service_checks = self.sort_service_checks(stats.flush_service_checks()) + + assert len(service_checks) == 4 + first, second, third, fourth = service_checks + + nt.assert_equal(first['check'], 'check.1') + nt.assert_equal(first['message'], 'testing') + nt.assert_equal(second['check'], 'check.2') + nt.assert_equal(second['message'], 'First line\nSecond line') + nt.assert_equal(third['check'], 'check.3') + nt.assert_equal(third['message'], u'♬ †øU †øU ¥ºu T0µ ♪') + nt.assert_equal(fourth['check'], 'check.4') + nt.assert_equal(fourth['message'], '|t:|m:|d:') + + def test_service_check_tags(self): + stats = MetricsAggregator('myhost') + stats.submit_packets('_sc|check.1|0') + stats.submit_packets('_sc|check.2|0|#t1') + stats.submit_packets('_sc|check.3|0|h:i-abcd1234|#t1,t2|m:fakeout#t5') + stats.submit_packets('_sc|check.4|0|#t1,t2:v2,t3,t4') + + service_checks = self.sort_service_checks(stats.flush_service_checks()) + + assert len(service_checks) == 4 + first, second, third, fourth = service_checks + + nt.assert_equal(first['check'], 'check.1') + assert first.get('tags') is None, "service_check['tags'] shouldn't be" + \ + "defined when no tags aren't explicited in the packet" + + nt.assert_equal(second['check'], 'check.2') + nt.assert_equal(second['tags'], sorted(['t1'])) + + nt.assert_equal(third['check'], 'check.3') + nt.assert_equal(third['host_name'], 'i-abcd1234') + nt.assert_equal(third['message'], 'fakeout#t5') + nt.assert_equal(third['tags'], sorted(['t1', 't2'])) + + nt.assert_equal(fourth['check'], 'check.4') + nt.assert_equal(fourth['tags'], sorted(['t1', 't2:v2', 't3', 't4'])) + def test_recent_point_threshold(self): threshold = 100 # The min is not enabled by default