
Migration of elastic search and couch db checks to checks.d interface #311

Merged: 21 commits, merged Jan 24, 2013

Changes from 1 commit
Move Elastic Search check to the checks.d interface
Remi Hakim committed Dec 20, 2012
commit 1912d1a028e775e97ee6f3c781f39d0048a082ed
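For context: in the checks.d interface this commit migrates to, a check subclasses AgentCheck, receives one instance dict per entry of the 'instances' list in its conf.d YAML file, and reports through methods such as gauge, rate and event rather than returning metrics. A minimal sketch of that shape (the check name and metric name are hypothetical; the gauge usage mirrors the diff below):

    from checks import AgentCheck

    class MyCheck(AgentCheck):
        def check(self, instance):
            # 'instance' is one entry of the 'instances' list in conf.d/*.yaml
            url = instance.get('url')
            if url is None:
                raise Exception("An url must be specified")
            # tag by url so several configured instances stay distinguishable
            self.gauge('mycheck.sample_metric', 1, tags=['url:%s' % url])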
302 changes: 116 additions & 186 deletions checks/db/elastic.py → checks.d/elastic.py
@@ -6,7 +6,7 @@
 from datetime import datetime
 import time
 
-from checks import Check, gethostname
+from checks import AgentCheck, gethostname
 from util import json, headers
 
 HEALTH_URL = "/_cluster/health?pretty=true"
@@ -23,70 +23,14 @@ def _get_data(agentConfig, url):
 
 class NodeNotFound(Exception): pass
 
-class ElasticSearchClusterStatus(Check):
-    key = "ElasticSearch"
-
-    def __init__(self, logger):
-        Check.__init__(self, logger)
-        self.cluster_status = None
-
-    def check(self, logger, config, data=None):
-        config_url = config.get("elasticsearch", None)
-
-        # Check if we are configured properly
-        if config_url is None:
-            return False
-
-        url = urlparse.urljoin(config_url, HEALTH_URL)
-        self.logger.info("Fetching elasticsearch data from: %s" % url)
-
-        try:
-            if not data:
-                data = _get_data(config, url)
-            if not self.cluster_status:
-                self.cluster_status = data['status']
-                if data['status'] in ["yellow", "red"]:
-                    event = self._create_event(config)
-                    return [event]
-                return []
-            if data['status'] != self.cluster_status:
-                self.cluster_status = data['status']
-                event = self._create_event(config)
-                return [event]
-            return []
-
-        except:
-            self.logger.exception('Unable to get elasticsearch statistics')
-            return False
-
-    def _create_event(self, agentConfig):
-        hostname = gethostname(agentConfig).decode('utf-8')
-        if self.cluster_status == "red" or self.cluster_status=="yellow":
-            alert_type = "error"
-            msg_title = "%s is %s" % (hostname, self.cluster_status)
-        else:
-            # then it should be green
-            alert_type == "info"
-            msg_title = "%s recovered as %s" % (hostname, self.cluster_status)
-
-        msg = "ElasticSearch: %s just reported as %s" % (hostname, self.cluster_status)
-
-        return { 'timestamp': int(time.mktime(datetime.utcnow().timetuple())),
-                 'event_type': 'elasticsearch',
-                 'host': hostname,
-                 'api_key': agentConfig['api_key'],
-                 'msg_text':msg,
-                 'msg_title': msg_title,
-                 "alert_type": alert_type,
-                 "source_type_name": "elasticsearch",
-                 "event_object": hostname
-        }
-
-class ElasticSearch(Check):
+class ElasticSearch(AgentCheck):
+    def __init__(self, name, init_config, agentConfig):
+        AgentCheck.__init__(self, name, init_config, agentConfig)
+
+        # Host status needs to persist across all checks
+        self.cluster_status = {}
 
 
     METRICS = {
@@ -195,58 +139,66 @@ class ElasticSearch(Check):
         "elasticsearch.unassigned_shards": ("gauge", "unassigned_shards"),
     }
 
-    @classmethod
-    def _map_metric(cls, func):
-        """Apply a function to all known metrics.
-        Used to create and sample metrics.
-        """
-        for metric in cls.METRICS:
-            # metric description
-            desc = cls.METRICS[metric]
-            func(metric, *desc)
-
-    def __init__(self, logger):
-        Check.__init__(self, logger)
+    def check(self, instance):
+        config_url = instance.get('url')
+        if config_url is None:
+            raise Exception("An url must be specified")
 
-        def generate_metric(name, xtype, *args):
-            if xtype == "counter":
-                self.counter(name)
-            else:
-                self.gauge(name)
+        tags = ['url:%s' % config_url]
+        self.load_url(config_url, instance, tags=tags, url_suffix=STATS_URL)
 
-        self._map_metric(generate_metric)
+    def check_status(self, data, url):
+        if self.cluster_status.get(url, None) is None:
+            self.cluster_status[url] = data['status']
+            if data['status'] in ["yellow", "red"]:
+                event = self._create_event(self.agentConfig)
+                self.event(event)
+        if data['status'] != self.cluster_status.get(url):
+            self.cluster_status[url] = data['status']
+            event = self._create_event(self.agentConfig)
+            self.event(event)
 
-    def _metric_not_found(self, metric, path):
-        self.logger.warning("Metric not found: %s -> %s", path, metric)
+    def load_url(self, config_url, instance, tags=None, url_suffix=STATS_URL):
+        # Try to fetch data from the stats URL
+        # If only the hostname was passed, accept that and add our stats_url
+        # Else use the full URL as provided
+        if urlparse.urlparse(config_url).path == "":
+            url = urlparse.urljoin(config_url, url_suffix)
+        else:
+            url = config_url
 
-    def _process_metric(self, data, metric, path, xform=None):
-        """data: dictionary containing all the stats
-        metric: datadog metric
-        path: corresponding path in data, flattened, e.g. thread_pool.bulk.queue
-        xfom: a lambda to apply to the numerical value
-        """
-        value = data
-        # Traverse the nested dictionaries
-        for key in path.split('.'):
-            if value is not None:
-                value = value.get(key, None)
-            else:
-                break
-
-        if value is not None:
-            if xform: value = xform(value)
-            self.save_sample(metric, value)
-        else:
-            self._metric_not_found(metric, path)
+        self.log.info("Fetching elasticsearch data from: %s" % url)
+
+        try:
+            data = _get_data(self.agentConfig, url)
+
+            if url_suffix==STATS_URL:
+                self._process_data(self.agentConfig, data, tags=tags)
+                self.load_url(config_url, instance, tags=tags, url_suffix=HEALTH_URL)
+            else:
+                self.check_status(data, config_url)
+                self._process_health_data(data, tags=tags)
+
+        except Exception, e:
+            self.log.exception('Unable to get elasticsearch statistics %s' % str(e))
+
+    def _base_es_url(self, config_url):
+        parsed = urlparse.urlparse(config_url)
+        if parsed.path == "":
+            return config_url
+        return "%s://%s" % (parsed.scheme, parsed.netloc)
 
-    def _process_data(self, agentConfig, data):
+    def _process_data(self, agentConfig, data, tags=None):
         for node in data['nodes']:
             node_data = data['nodes'][node]
 
             def process_metric(metric, xtype, path, xform=None):
                 # closure over node_data
-                self._process_metric(node_data, metric, path, xform)
+                self._process_metric(node_data, metric, path, xform, tags=tags)
 
             if 'hostname' in node_data:
                 # For ES >= 0.19
@@ -271,106 +223,84 @@ def process_metric(metric, xtype, path, xform=None):
             if self._host_matches_node(primary_addr):
                 self._map_metric(process_metric)
 
-    def _process_health_data(self, agentConfig, data):
-        def process_metric(metric, xtype, path, xform=None):
-            # closure over node_data
-            self._process_metric(data, metric, path, xform)
-        self._map_metric(process_metric)
+    def _process_metric(self, data, metric, path, xform=None, tags=None):
+        """data: dictionary containing all the stats
+        metric: datadog metric
+        path: corresponding path in data, flattened, e.g. thread_pool.bulk.queue
+        xform: a lambda to apply to the numerical value
+        """
+        value = data
+        # Traverse the nested dictionaries
+        for key in path.split('.'):
+            if value is not None:
+                value = value.get(key, None)
+            else:
+                break
+
+        if value is not None:
+            if xform: value = xform(value)
+            if self.METRICS[metric][0] == "gauge":
+                self.gauge(metric, value, tags=tags)
+            else:
+                self.rate(metric, value, tags=tags)
+        else:
+            self._metric_not_found(metric, path)
 
-    def _get_primary_addr(self, agentConfig, url, node_name):
-        ''' Returns a list of primary interface addresses as seen by ES.
-        Used in ES < 0.19
-        '''
-        req = urllib2.Request(url, None, headers(agentConfig))
-        request = urllib2.urlopen(req)
-        response = request.read()
-        data = json.loads(response)
-
-        if node_name in data['nodes']:
-            node = data['nodes'][node_name]
-            if 'network' in node\
-            and 'primary_interface' in node['network']\
-            and 'address' in node['network']['primary_interface']:
-                return node['network']['primary_interface']['address']
-
-        raise NodeNotFound()
-
-    def _host_matches_node(self, primary_addrs):
-        ''' For < 0.19, check if the current host matches the IP given
-        in the cluster nodes check `/_cluster/nodes`. Uses `ip addr` on Linux
-        and `ifconfig` on Mac
-        '''
-        if sys.platform == 'darwin':
-            ifaces = subprocess.Popen(['ifconfig'], stdout=subprocess.PIPE)
-        else:
-            ifaces = subprocess.Popen(['ip', 'addr'], stdout=subprocess.PIPE)
-        grepper = subprocess.Popen(['grep', 'inet'], stdin=ifaces.stdout,
-            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-
-        ifaces.stdout.close()
-        out, err = grepper.communicate()
-
-        # Capture the list of interface IPs
-        ips = []
-        for iface in out.split("\n"):
-            iface = iface.strip()
-            if iface:
-                ips.append( iface.split(' ')[1].split('/')[0] )
-
-        # Check the interface addresses against the primary address
-        return primary_addrs in ips
+    def _process_health_data(self, data, tags=None):
+        def process_metric(metric, xtype, path, xform=None):
+            # closure over node_data
+            self._process_metric(data, metric, path, xform, tags=tags)
+        self._map_metric(process_metric)
 
-    def _base_es_url(self, config_url):
-        parsed = urlparse.urlparse(config_url)
-        if parsed.path == "":
-            return config_url
-        return "%s://%s" % (parsed.scheme, parsed.netloc)
+    @classmethod
+    def _map_metric(cls, func):
+        """Apply a function to all known metrics.
+        Used to create and sample metrics.
+        """
+        for metric in cls.METRICS:
+            # metric description
+            desc = cls.METRICS[metric]
+            func(metric, *desc)
 
-    def check(self, config, url_suffix=STATS_URL):
-        """Extract data from stats URL
-        http://www.elasticsearch.org/guide/reference/api/admin-cluster-nodes-stats.html
-        """
-
-        config_url = config.get("elasticsearch", None)
-
-        # Check if we are configured properly
-        if config_url is None:
-            return False
-
-        # Try to fetch data from the stats URL
-        # If only the hostname was passed, accept that and add our stats_url
-        # Else use the full URL as provided
-        if urlparse.urlparse(config_url).path == "":
-            url = urlparse.urljoin(config_url, url_suffix)
-        else:
-            url = config_url
-
-        self.logger.info("Fetching elasticsearch data from: %s" % url)
-
-        try:
-            data = _get_data(config, url)
-
-            if url_suffix==STATS_URL:
-                self._process_data(config, data)
-                self.check(config, HEALTH_URL)
-            else:
-                self._process_health_data(config, data)
-
-            return self.get_metrics()
-        except:
-            self.logger.exception('Unable to get elasticsearch statistics')
-            return False
+    def _metric_not_found(self, metric, path):
+        self.log.warning("Metric not found: %s -> %s", path, metric)
 
+    def _create_event(self, agentConfig):
+        hostname = gethostname(agentConfig).decode('utf-8')
+        if self.cluster_status == "red" or self.cluster_status=="yellow":
+            alert_type = "error"
+            msg_title = "%s is %s" % (hostname, self.cluster_status)
+        else:
+            # then it should be green
+            alert_type = "info"
+            msg_title = "%s recovered as %s" % (hostname, self.cluster_status)
+
+        msg = "ElasticSearch: %s just reported as %s" % (hostname, self.cluster_status)
+
+        return { 'timestamp': int(time.mktime(datetime.utcnow().timetuple())),
+                 'event_type': 'elasticsearch',
+                 'host': hostname,
+                 'api_key': agentConfig['api_key'],
+                 'msg_text':msg,
+                 'msg_title': msg_title,
+                 "alert_type": alert_type,
+                 "source_type_name": "elasticsearch",
+                 "event_object": hostname
+        }
+
+    @staticmethod
+    def parse_agent_config(agentConfig):
+        if not agentConfig.get('elasticsearch'):
+            return False
+
+        return {
+            'instances': [{
+                'url': agentConfig.get('elasticsearch'),
+            }]
+        }
 
-if __name__ == "__main__":
-    import pprint
-    import logging
-    from config import get_version
-    logging.basicConfig()
-    logger = logging.getLogger()
-    c = ElasticSearch(logger)
-    config = {"elasticsearch": "http://localhost:9200", "version": get_version(), "api_key":"apiKey 2"}
-    pprint.pprint(c.check(config))
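The removed __main__ test harness goes away because checks.d checks are loaded and run by the agent itself. The parse_agent_config hook added above keeps legacy setups working by rewriting the old top-level elasticsearch URL into the new per-instance layout; roughly (a sketch of the hook's effect, per the code above):

    ElasticSearch.parse_agent_config({'elasticsearch': 'http://localhost:9200'})
    # returns {'instances': [{'url': 'http://localhost:9200'}]}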

4 changes: 4 additions & 0 deletions conf.d/elastic.yaml.example
@@ -0,0 +1,4 @@
+init_config:
+
+instances:
+    - #url: http://localhost:9200
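
A filled-in version of this example uncomments the url and can list several instances, one per cluster to monitor (the hostnames below are placeholders):

    init_config:

    instances:
        - url: http://localhost:9200
        - url: http://es-2.example.com:9200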