Skip to content

Commit

Permalink
[dogstatsd] add service checks handling
Browse files Browse the repository at this point in the history
Most dogstatsd clients at this point do not support sending
service checks, but from now, any client that supports them
should be able to send their packets.
See related documentation on the dogstatsd datagrams.

Details:
* added new unit testing for service checks
* several tweaks around message escaping
* service checks are submitted using the requests lib

[[email protected]] Rebased on current master, squashing some commits
and fixing conflicts with previously merged branches
  • Loading branch information
Arthur Wang authored and LeoCavaille committed Jan 28, 2015
1 parent c89f2d2 commit 44bc927
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 3 deletions.
82 changes: 81 additions & 1 deletion aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,9 +394,11 @@ def __init__(self, hostname, interval=1.0, expiry_seconds=300,
histogram_aggregates=None, histogram_percentiles=None,
utf8_decoding=False):
self.events = []
self.service_checks = []
self.total_count = 0
self.count = 0
self.event_count = 0
self.service_check_count = 0
self.hostname = hostname
self.expiry_seconds = expiry_seconds
self.formatter = formatter or api_formatter
Expand Down Expand Up @@ -487,6 +489,9 @@ def parse_metric_packet(self, packet):

return parsed_packets

def _unescape_sc_content(self, string):
return string.replace('\\n', '\n').replace('m\:', 'm:')

def _unescape_event_text(self, string):
return string.replace('\\n', '\n')

Expand Down Expand Up @@ -527,6 +532,47 @@ def parse_event_packet(self, packet):
except (IndexError, ValueError):
raise Exception(u'Unparseable event packet: %s' % packet)

def parse_sc_packet(self, packet):
    """Parse a dogstatsd service check datagram into a dict.

    Datagram syntax: _sc|check_name|status|meta, where meta is an
    optional '|'-separated list of fields: d:<timestamp>, h:<hostname>,
    #<tag1>,<tag2>,... and m:<message> (message must come last).

    Raises Exception for any packet that does not follow the syntax.
    """
    try:
        _, data_and_metadata = packet.split('|', 1)
        # Service check syntax:
        # _sc|check_name|status|meta
        if data_and_metadata.count('|') == 1:
            # Case with no metadata
            check_name, status = data_and_metadata.split('|')
            metadata = ''
        else:
            check_name, status, metadata = data_and_metadata.split('|', 2)

        service_check = {
            'check_name': check_name,
            'status': int(status)
        }

        # The message is introduced by '|m:' unless it is the only meta
        # field, in which case metadata starts directly with 'm:'.
        # rsplit takes the LAST occurrence so escaped 'm\:' sequences
        # earlier in the message are not mistaken for the delimiter.
        message_delimiter = '|m:' if '|m:' in metadata else 'm:'
        if message_delimiter in metadata:
            meta, message = metadata.rsplit(message_delimiter, 1)
            service_check['message'] = self._unescape_sc_content(message)
        else:
            meta = metadata

        # No remaining meta fields: nothing more to parse.
        if not meta:
            return service_check

        meta = unicode(meta)  # Python 2: normalize to unicode before splitting
        for m in meta.split('|'):
            # Dispatch on the first character of each meta field.
            if m[0] == u'd':
                service_check['timestamp'] = float(m[2:])
            elif m[0] == u'h':
                service_check['hostname'] = m[2:]
            elif m[0] == u'#':
                service_check['tags'] = sorted(m[1:].split(u','))

        return service_check

    except (IndexError, ValueError):
        raise Exception(u'Unparseable service check packet: %s' % packet)

def submit_packets(self, packets):
# We should probably consider that packets are always encoded
# in utf8, but decoding all packets has a perf overhead of 7%
Expand All @@ -544,6 +590,10 @@ def submit_packets(self, packets):
self.event_count += 1
event = self.parse_event_packet(packet)
self.event(**event)
elif packet.startswith('_sc'):
self.service_check_count += 1
service_check = self.parse_sc_packet(packet)
self.service_check(**service_check)
else:
self.count += 1
parsed_packets = self.parse_metric_packet(packet)
Expand Down Expand Up @@ -606,8 +656,27 @@ def event(self, title, text, date_happened=None, alert_type=None, aggregation_ke

self.events.append(event)

def service_check(self, check_name, status, tags=None, timestamp=None,
                  hostname=None, message=None):
    """Buffer one service check run until the next flush.

    Defaults the timestamp to "now" and the hostname to the
    aggregator's own hostname when they are not supplied.
    """
    result = {
        'check': check_name,
        'status': status,
        'timestamp': timestamp or int(time()),
        # Fall back on the aggregator's hostname when none was given.
        'host_name': hostname if hostname is not None else self.hostname,
    }
    if tags is not None:
        result['tags'] = sorted(tags)
    if message is not None:
        result['message'] = message

    self.service_checks.append(result)

def flush(self):
    """ Flush aggregated metrics """
    # Abstract: concrete aggregator subclasses must implement flushing.
    raise NotImplementedError()

def flush_events(self):
Expand All @@ -621,6 +690,17 @@ def flush_events(self):

return events

def flush_service_checks(self):
    """Return the service checks buffered since the last flush.

    Resets the buffer, folds the per-interval counter into the running
    total, and logs how many check runs were received.
    """
    flushed, self.service_checks = self.service_checks, []

    self.total_count += self.service_check_count
    self.service_check_count = 0

    log.debug("Received {0} service check runs since last flush".format(len(flushed)))

    return flushed

def send_packet_count(self, metric_name):
    # Report the number of metric packets received since the last flush
    # as a gauge under the supplied metric name.
    self.submit_metric(metric_name, self.count, 'g')

Expand Down
19 changes: 17 additions & 2 deletions dogstatsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,16 @@ def flush(self):
if event_count:
self.submit_events(events)

service_checks = self.metrics_aggregator.flush_service_checks()
check_count = len(service_checks)
if check_count:
self.submit_service_checks(service_checks)

should_log = self.flush_count <= FLUSH_LOGGING_INITIAL or self.log_count <= FLUSH_LOGGING_COUNT
log_func = log.info
if not should_log:
log_func = log.debug
log_func("Flush #%s: flushed %s metric%s and %s event%s" % (self.flush_count, count, plural(count), event_count, plural(event_count)))
log_func("Flush #%s: flushed %s metric%s, %s event%s, and %s service check run%s" % (self.flush_count, count, plural(count), event_count, plural(event_count), check_count, plural(check_count)))
if self.flush_count == FLUSH_LOGGING_INITIAL:
log.info("First flushes done, %s flushes will be logged every %s flushes." % (FLUSH_LOGGING_COUNT, FLUSH_LOGGING_PERIOD))

Expand Down Expand Up @@ -185,7 +190,7 @@ def submit_events(self, events):
params['api_key'] = self.api_key
url = '%s/intake?%s' % (self.api_host, urlencode(params))

self.submit_http(url, payload, headers)
self.submit_http(url, payload, headers)

def submit_http(self, url, data, headers):
no_proxy = {
Expand Down Expand Up @@ -216,6 +221,16 @@ def submit_http(self, url, data, headers):
pass


def submit_service_checks(self, service_checks):
    """POST the flushed service check runs to the check_run endpoint as JSON."""
    query = {}
    if self.api_key:
        query['api_key'] = self.api_key

    url = '{0}/api/v1/check_run?{1}'.format(self.api_host, urlencode(query))
    body = json.dumps(service_checks)
    self.submit_http(url, body, {'Content-Type': 'application/json'})

class Server(object):
"""
A statsd udp server.
Expand Down
72 changes: 72 additions & 0 deletions tests/test_dogstatsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ def sort_by(m):
return (m['msg_title'], m['msg_text'], ','.join(m.get('tags', None) or []))
return sorted(metrics, key=sort_by)

@staticmethod
def sort_service_checks(service_checks):
def sort_by(m):
return (m['check'], m['status'], m.get('message', None), ','.join(m.get('tags', None) or []))
return sorted(service_checks, key=sort_by)

@staticmethod
def assert_almost_equal(i, j, e=1):
# Floating point math?
Expand Down Expand Up @@ -671,6 +677,72 @@ def test_event_text_utf8(self):
nt.assert_equal(events[0]['msg_text'], 'First line\nSecond line')
nt.assert_equal(events[1]['msg_text'], u'♬ †øU †øU ¥ºu T0µ ♪')

def test_service_check_basic(self):
    """A bare _sc|name|status packet yields a check with that name/status."""
    stats = MetricsAggregator('myhost')
    stats.submit_packets('_sc|check.1|0')
    stats.submit_packets('_sc|check.2|1')
    stats.submit_packets('_sc|check.3|2')

    service_checks = self.sort_service_checks(stats.flush_service_checks())

    assert len(service_checks) == 3
    expected = [('check.1', 0), ('check.2', 1), ('check.3', 2)]
    for sc, (name, status) in zip(service_checks, expected):
        nt.assert_equal(sc['check'], name)
        nt.assert_equal(sc['status'], status)

def test_service_check_message(self):
    """The m: field is parsed and its \\n / m\\: escapes are undone."""
    stats = MetricsAggregator('myhost')
    packets = [
        '_sc|check.1|0|m:testing',
        '_sc|check.2|0|m:First line\\nSecond line',
        u'_sc|check.3|0|m:♬ †øU †øU ¥ºu T0µ ♪',
        '_sc|check.4|0|m:|t:|m\\:|d:',
    ]
    for packet in packets:
        stats.submit_packets(packet)

    service_checks = self.sort_service_checks(stats.flush_service_checks())

    assert len(service_checks) == 4
    expected = [
        ('check.1', 'testing'),
        ('check.2', 'First line\nSecond line'),
        ('check.3', u'♬ †øU †øU ¥ºu T0µ ♪'),
        ('check.4', '|t:|m:|d:'),
    ]
    for sc, (name, message) in zip(service_checks, expected):
        nt.assert_equal(sc['check'], name)
        nt.assert_equal(sc['message'], message)

def test_service_check_tags(self):
    """h:, #tag and m: metadata fields are parsed and tags come back sorted."""
    stats = MetricsAggregator('myhost')
    stats.submit_packets('_sc|check.1|0')
    stats.submit_packets('_sc|check.2|0|#t1')
    stats.submit_packets('_sc|check.3|0|h:i-abcd1234|#t1,t2|m:fakeout#t5')
    stats.submit_packets('_sc|check.4|0|#t1,t2:v2,t3,t4')

    service_checks = self.sort_service_checks(stats.flush_service_checks())

    assert len(service_checks) == 4
    first, second, third, fourth = service_checks

    nt.assert_equal(first['check'], 'check.1')
    # Fixed assertion message: the original concatenation was missing a
    # space ("bedefined") and used a double negative.
    assert first.get('tags') is None, "service_check['tags'] shouldn't be " + \
        "defined when no tags are given in the packet"

    nt.assert_equal(second['check'], 'check.2')
    nt.assert_equal(second['tags'], sorted(['t1']))

    nt.assert_equal(third['check'], 'check.3')
    nt.assert_equal(third['host_name'], 'i-abcd1234')
    # '#t5' inside the message must not be parsed as a tags field.
    nt.assert_equal(third['message'], 'fakeout#t5')
    nt.assert_equal(third['tags'], sorted(['t1', 't2']))

    nt.assert_equal(fourth['check'], 'check.4')
    nt.assert_equal(fourth['tags'], sorted(['t1', 't2:v2', 't3', 't4']))

def test_recent_point_threshold(self):
threshold = 100
# The min is not enabled by default
Expand Down

0 comments on commit 44bc927

Please sign in to comment.