Maxime/rabbitmq custom tags #506

Merged · 3 commits · Jul 3, 2017
Changes from 2 commits
1 change: 1 addition & 0 deletions rabbitmq/CHANGELOG.md
@@ -6,6 +6,7 @@
### Changes

* [FEATURE] Add a metric about the number of connections to rabbitmq.
* [FEATURE] Add custom tags to metrics, events and service checks.

1.1.0 / 06-05-2017
==================
45 changes: 23 additions & 22 deletions rabbitmq/check.py
@@ -115,9 +115,9 @@ def _get_config(self, instance):
base_url += '/'
username = instance.get('rabbitmq_user', 'guest')
password = instance.get('rabbitmq_pass', 'guest')
custom_tags = instance.get('tags', [])
parsed_url = urlparse.urlparse(base_url)
ssl_verify = _is_affirmative(instance.get('ssl_verify', True))
skip_proxy = _is_affirmative(instance.get('no_proxy', False))
if not ssl_verify and parsed_url.scheme == 'https':
self.log.warning('Skipping SSL cert validation for %s based on configuration.' % (base_url))

@@ -147,7 +147,7 @@ def _get_config(self, instance):

auth = (username, password)

return base_url, max_detailed, specified, auth, ssl_verify, skip_proxy
return base_url, max_detailed, specified, auth, ssl_verify, custom_tags

def _get_vhosts(self, instance, base_url, auth=None, ssl_verify=True):
vhosts = instance.get('vhosts')
@@ -162,28 +162,28 @@ def _get_vhosts(self, instance, base_url, auth=None, ssl_verify=True):
return vhosts

def check(self, instance):
base_url, max_detailed, specified, auth, ssl_verify, skip_proxy = self._get_config(instance)
base_url, max_detailed, specified, auth, ssl_verify, custom_tags = self._get_config(instance)
try:
# Generate metrics from the status API.
self.get_stats(instance, base_url, QUEUE_TYPE, max_detailed[QUEUE_TYPE], specified[QUEUE_TYPE],
auth=auth, ssl_verify=ssl_verify, skip_proxy=skip_proxy)
self.get_stats(instance, base_url, NODE_TYPE, max_detailed[NODE_TYPE], specified[NODE_TYPE],
auth=auth, ssl_verify=ssl_verify, skip_proxy=skip_proxy)
self.get_stats(instance, base_url, QUEUE_TYPE, max_detailed[QUEUE_TYPE], specified[QUEUE_TYPE], custom_tags,
auth=auth, ssl_verify=ssl_verify)
self.get_stats(instance, base_url, NODE_TYPE, max_detailed[NODE_TYPE], specified[NODE_TYPE], custom_tags,
auth=auth, ssl_verify=ssl_verify)

vhosts = self._get_vhosts(instance, base_url, auth=auth, ssl_verify=ssl_verify)
self.get_connections_stat(instance, base_url, CONNECTION_TYPE, vhosts,
auth=auth, ssl_verify=ssl_verify, skip_proxy=skip_proxy)
self.get_connections_stat(instance, base_url, CONNECTION_TYPE, vhosts, custom_tags,
auth=auth, ssl_verify=ssl_verify)

# Generate a service check from the aliveness API. In the case of an invalid response
# code or unparseable JSON this check will send no data.
self._check_aliveness(instance, base_url, vhosts, auth=auth, ssl_verify=ssl_verify, skip_proxy=skip_proxy)
self._check_aliveness(instance, base_url, vhosts, custom_tags, auth=auth, ssl_verify=ssl_verify)

# Generate a service check for the service status.
self.service_check('rabbitmq.status', AgentCheck.OK)
self.service_check('rabbitmq.status', AgentCheck.OK, custom_tags)

except RabbitMQException as e:
msg = "Error executing check: {}".format(e)
self.service_check('rabbitmq.status', AgentCheck.CRITICAL, message=msg)
self.service_check('rabbitmq.status', AgentCheck.CRITICAL, custom_tags, message=msg)
self.log.error(msg)

def _get_data(self, url, auth=None, ssl_verify=True, proxies={}):
@@ -196,7 +196,7 @@ def _get_data(self, url, auth=None, ssl_verify=True, proxies={}):
except ValueError as e:
raise RabbitMQException('Cannot parse JSON response from API url: {} {}'.format(url, str(e)))

def get_stats(self, instance, base_url, object_type, max_detailed, filters, auth=None, ssl_verify=True, skip_proxy=False):
def get_stats(self, instance, base_url, object_type, max_detailed, filters, custom_tags, auth=None, ssl_verify=True):
"""
instance: the check instance
base_url: the url of the rabbitmq management api (e.g. http://localhost:15672/api)
@@ -274,7 +274,7 @@ def get_stats(self, instance, base_url, object_type, max_detailed, filters, auth
# if no filters are specified, check everything according to the limits
if len(data) > ALERT_THRESHOLD * max_detailed:
# Post a message on the dogweb stream to warn
self.alert(base_url, max_detailed, len(data), object_type)
self.alert(base_url, max_detailed, len(data), object_type, custom_tags)

if len(data) > max_detailed:
# Display a warning in the info page
@@ -283,16 +283,17 @@ def get_stats(self, instance, base_url, object_type, max_detailed, filters, auth

for data_line in data[:max_detailed]:
# We truncate the list of nodes/queues if it's above the limit
self._get_metrics(data_line, object_type)
self._get_metrics(data_line, object_type, custom_tags)

def _get_metrics(self, data, object_type):
def _get_metrics(self, data, object_type, custom_tags):
tags = []
tag_list = TAGS_MAP[object_type]
for t in tag_list:
tag = data.get(t)
if tag:
# FIXME 6.x: remove this suffix or unify (sc doesn't have it)
tags.append('%s_%s:%s' % (TAG_PREFIX, tag_list[t], tag))
tags.extend(custom_tags)

for attribute, metric_name, operation in ATTRIBUTES[object_type]:
# Walk down through the data path, e.g. foo/bar => d['foo']['bar']
@@ -310,7 +311,7 @@ def _get_metrics(self, data, object_type):
self.log.debug("Caught ValueError for %s %s = %s with tags: %s" % (
METRIC_SUFFIX[object_type], attribute, value, tags))

def get_connections_stat(self, instance, base_url, object_type, vhosts, auth=None, ssl_verify=True, skip_proxy=False):
def get_connections_stat(self, instance, base_url, object_type, vhosts, custom_tags, auth=None, ssl_verify=True):
"""
Collect metrics on currently open connection per vhost.
"""
@@ -324,9 +325,9 @@ def get_connections_stat(self, instance, base_url, object_type, vhosts, auth=Non
stats[conn['vhost']] += 1

for vhost, nb_conn in stats.iteritems():
self.gauge('rabbitmq.connections', nb_conn, tags=['%s_vhost:%s' % (TAG_PREFIX, vhost)])
self.gauge('rabbitmq.connections', nb_conn, tags=['%s_vhost:%s' % (TAG_PREFIX, vhost)] + custom_tags)

def alert(self, base_url, max_detailed, size, object_type):
def alert(self, base_url, max_detailed, size, object_type, custom_tags):
key = "%s%s" % (base_url, object_type)
if key in self.already_alerted:
# We have already posted an event
@@ -347,21 +348,21 @@ def alert(self, base_url, max_detailed, size, object_type):
"alert_type": 'warning',
"source_type_name": SOURCE_TYPE_NAME,
"host": self.hostname,
"tags": ["base_url:%s" % base_url, "host:%s" % self.hostname],
"tags": ["base_url:%s" % base_url, "host:%s" % self.hostname] + custom_tags,
"event_object": "rabbitmq.limit.%s" % object_type,
}

self.event(event)

def _check_aliveness(self, instance, base_url, vhosts, auth=None, ssl_verify=True, skip_proxy=False):
def _check_aliveness(self, instance, base_url, vhosts, custom_tags, auth=None, ssl_verify=True):
"""
Check the aliveness API against all or a subset of vhosts. The API
will return {"status": "ok"} and a 200 response code in the case
that the check passes.
"""

for vhost in vhosts:
tags = ['vhost:%s' % vhost]
tags = ['vhost:%s' % vhost] + custom_tags
# We need to urlencode the vhost because it can be '/'.
path = u'aliveness-test/%s' % (urllib.quote_plus(vhost))
aliveness_url = urlparse.urljoin(base_url, path)
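
The check.py changes all follow one pattern: the instance-level tags list is read once in _get_config, threaded through check() into each collection method, and appended to the per-object tags that method already builds before the metric, event or service check is submitted. A minimal, self-contained sketch of that merge step (the build_tags helper and the sample payload are illustrative, not code from this PR):

    TAG_PREFIX = 'rabbitmq'
    TAGS_MAP = {'queue': {'name': 'queue', 'vhost': 'vhost'}}

    def build_tags(data, object_type, custom_tags):
        # Per-object tags derived from the management API payload, as in _get_metrics ...
        tags = []
        for source_key, tag_suffix in TAGS_MAP[object_type].items():
            value = data.get(source_key)
            if value:
                tags.append('%s_%s:%s' % (TAG_PREFIX, tag_suffix, value))
        # ... then the instance-level custom tags are appended on top.
        tags.extend(custom_tags)
        return tags

    print(build_tags({'name': 'test1', 'vhost': '/'}, 'queue', ['tag1:1', 'tag2']))
    # e.g. ['rabbitmq_queue:test1', 'rabbitmq_vhost:/', 'tag1:1', 'tag2']
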
5 changes: 5 additions & 0 deletions rabbitmq/conf.yaml.example
@@ -64,3 +64,8 @@ instances:
# vhosts:
# - vhost1
# - vhost2

# Optional tags to be applied to every emitted metric, service check and event.
# tags:
# - "key:value"
# - "instance:production"
13 changes: 7 additions & 6 deletions rabbitmq/test_rabbitmq.py
@@ -24,6 +24,7 @@
'rabbitmq_user': 'guest',
'rabbitmq_pass': 'guest',
'queues': ['test1'],
'tags': ["tag1:1", "tag2"],
}
]
}
@@ -110,7 +111,7 @@ def test_check(self):
self.assertMetricTagPrefix(mname, 'rabbitmq_node', count=1)

self.assertMetric('rabbitmq.node.partitions', value=0, count=1)
self.assertMetric('rabbitmq.connections', tags=['rabbitmq_vhost:/'], value=0, count=1)
self.assertMetric('rabbitmq.connections', tags=['rabbitmq_vhost:/', "tag1:1", "tag2"], value=0, count=1)

# Queue attributes, should be only one queue fetched
# TODO: create a 'fake consumer' and get missing metrics
@@ -119,8 +120,8 @@ def test_check(self):
self.assertMetricTag('rabbitmq.queue.%s' %
mname, 'rabbitmq_queue:test1', count=1)

self.assertServiceCheckOK('rabbitmq.aliveness', tags=['vhost:/'])
self.assertServiceCheckOK('rabbitmq.status')
self.assertServiceCheckOK('rabbitmq.aliveness', tags=['vhost:/', "tag1:1", "tag2"])
self.assertServiceCheckOK('rabbitmq.status', tags=["tag1:1", "tag2"])

self.coverage_report()

@@ -167,7 +168,7 @@ def test_family_tagging(self):
def test_connections(self):
# no connections and no 'vhosts' list in the conf don't produce 'connections' metric
self.run_check(CONFIG)
self.assertMetric('rabbitmq.connections', tags=['rabbitmq_vhost:/'], value=0, count=1)
self.assertMetric('rabbitmq.connections', tags=['rabbitmq_vhost:/', "tag1:1", "tag2"], value=0, count=1)

# no connections with a 'vhosts' list in the conf produce one metrics per vhost
self.run_check(CONFIG_TEST_VHOSTS, force_reload=True)
@@ -180,7 +181,7 @@ def test_connections(self):
connection2 = pika.BlockingConnection()

self.run_check(CONFIG, force_reload=True)
self.assertMetric('rabbitmq.connections', tags=['rabbitmq_vhost:/'], value=2, count=1)
self.assertMetric('rabbitmq.connections', tags=['rabbitmq_vhost:/', "tag1:1", "tag2"], value=2, count=1)
self.assertMetric('rabbitmq.connections', count=1)

self.run_check(CONFIG_DEFAULT_VHOSTS, force_reload=True)
@@ -238,7 +239,7 @@ def test__check_aliveness(self):

# only one vhost should be OK
self.check._get_data.side_effect = [{"status": "ok"}, {}]
self.check._check_aliveness(instances['instances'][0], '', vhosts=['foo', 'bar'])
self.check._check_aliveness(instances['instances'][0], '', vhosts=['foo', 'bar'], custom_tags=[])
sc = self.check.get_service_checks()

self.assertEqual(len(sc), 2)