diff --git a/checks.d/elastic.py b/checks.d/elastic.py
index 6aaba3a30c..00f21552bd 100644
--- a/checks.d/elastic.py
+++ b/checks.d/elastic.py
@@ -1,5 +1,5 @@
 # stdlib
-from collections import namedtuple
+from collections import namedtuple, defaultdict
 import socket
 import subprocess
 import time
@@ -11,12 +11,14 @@
 # project
 from checks import AgentCheck
 from config import _is_affirmative
-from util import headers, Platform
+from utils.platform import Platform
+from util import headers
 
 
 class NodeNotFound(Exception):
     pass
 
+
 ESInstanceConfig = namedtuple(
     'ESInstanceConfig', [
         'is_external',
@@ -162,6 +164,12 @@ class ESCheck(AgentCheck):
         "elasticsearch.cluster_status": ("gauge", "status", lambda v: {"red": 0, "yellow": 1, "green": 2}.get(v, -1)),
     }
 
+    CLUSTER_PENDING_TASKS = {
+        "elasticsearch.pending_tasks_total": ("gauge", "pending_task_total"),
+        "elasticsearch.pending_tasks_priority_high": ("gauge", "pending_tasks_priority_high"),
+        "elasticsearch.pending_tasks_priority_urgent": ("gauge", "pending_tasks_priority_urgent")
+    }
+
     SOURCE_TYPE_NAME = 'elasticsearch'
 
     def __init__(self, name, init_config, agentConfig, instances=None):
@@ -214,19 +222,25 @@ def check(self, instance):
         # (URLs and metrics) accordingly
         version = self._get_es_version(config)
 
-        health_url, nodes_url, stats_url, stats_metrics = self._define_params(
-            version, config.is_external)
+        health_url, nodes_url, stats_url, pending_tasks_url, stats_metrics\
+            = self._define_params(version, config.is_external)
 
         # Load stats data.
         stats_url = urlparse.urljoin(config.url, stats_url)
         stats_data = self._get_data(stats_url, config)
-        self._process_stats_data(stats_data, stats_metrics, config)
+        self._process_stats_data(nodes_url, stats_data, stats_metrics, config)
 
         # Load the health data.
         health_url = urlparse.urljoin(config.url, health_url)
         health_data = self._get_data(health_url, config)
         self._process_health_data(health_data, config)
 
+        # Load the pending_tasks data (None on ES < 0.90.10).
+        if pending_tasks_url is not None:
+            pending_tasks_url = urlparse.urljoin(config.url, pending_tasks_url)
+            pending_tasks_data = self._get_data(pending_tasks_url, config)
+            self._process_pending_tasks_data(pending_tasks_data, config)
+
         # If we're here we did not have any ES conn issues
         self.service_check(
             self.SERVICE_CHECK_CONNECT_NAME,
@@ -259,6 +273,7 @@ def _define_params(self, version, is_external):
             # ES versions 0.90.10 and above
             health_url = "/_cluster/health?pretty=true"
             nodes_url = "/_nodes?network=true"
+            pending_tasks_url = "/_cluster/pending_tasks?pretty=true"
 
             # For "external" clusters, we want to collect from all nodes.
             if is_external:
@@ -270,6 +285,7 @@ def _define_params(self, version, is_external):
         else:
             health_url = "/_cluster/health?pretty=true"
             nodes_url = "/_cluster/nodes?network=true"
+            pending_tasks_url = None
             if is_external:
                 stats_url = "/_cluster/nodes/stats?all=true"
             else:
@@ -289,7 +305,7 @@ def _define_params(self, version, is_external):
 
         stats_metrics.update(additional_metrics)
 
-        return health_url, nodes_url, stats_url, stats_metrics
+        return health_url, nodes_url, stats_url, pending_tasks_url, stats_metrics
 
     def _get_data(self, url, config, send_sc=True):
         """ Hit a given URL and return the parsed json
@@ -320,7 +336,24 @@ def _get_data(self, url, config, send_sc=True):
 
         return resp.json()
 
-    def _process_stats_data(self, data, stats_metrics, config):
+    def _process_pending_tasks_data(self, data, config):
+        p_tasks = defaultdict(int)
+
+        for task in data.get('tasks', []):
+            p_tasks[task.get('priority')] += 1
+
+        node_data = {
+            'pending_task_total': sum(p_tasks.values()),
+            'pending_tasks_priority_high': p_tasks['high'],
+            'pending_tasks_priority_urgent': p_tasks['urgent'],
+        }
+
+        for metric in self.CLUSTER_PENDING_TASKS:
+            # desc is the (metric_type, data_path) tuple for this metric
+            desc = self.CLUSTER_PENDING_TASKS[metric]
+            self._process_metric(node_data, metric, *desc, tags=config.tags)
+
+    def _process_stats_data(self, nodes_url, data, stats_metrics, config):
         is_external = config.is_external
         for node_name in data['nodes']:
             node_data = data['nodes'][node_name]
@@ -329,7 +362,7 @@ def _process_stats_data(self, data, stats_metrics, config):
                 'hostname', node_data.get('host', None))
             should_process = (
                 is_external or self.should_process_node(
-                    node_name, node_hostname, config))
+                    nodes_url, node_name, node_hostname, config))
 
             # Override the metric hostname if we're hitting an external cluster
             metric_hostname = node_hostname if is_external else None
@@ -341,7 +374,7 @@ def _process_stats_data(self, data, stats_metrics, config):
                     node_data, metric, *desc, tags=config.tags,
                     hostname=metric_hostname)
 
-    def should_process_node(self, node_name, node_hostname, config):
+    def should_process_node(self, nodes_url, node_name, node_hostname, config):
        """ The node stats API will return stats for every node so we want
        to filter out nodes that we don't care about.
        """
@@ -355,11 +388,12 @@ def should_process_node(self, node_name, node_hostname, config):
                 if node_hostname.decode('utf-8') in hostnames:
                     return True
         else:
+            # FIXME 6.x: deprecate this code, it's EOL'd
             # ES < 0.19
             # Fetch interface address from ifconfig or ip addr and check
             # against the primary IP from ES
             try:
-                nodes_url = urlparse.urljoin(config.url, self.NODES_URL)
+                nodes_url = urlparse.urljoin(config.url, nodes_url)
                 primary_addr = self._get_primary_addr(
                     nodes_url, node_name, config)
             except NodeNotFound:
diff --git a/tests/checks/integration/test_elastic.py b/tests/checks/integration/test_elastic.py
index bd5b877bde..bb5f27b7ff 100644
--- a/tests/checks/integration/test_elastic.py
+++ b/tests/checks/integration/test_elastic.py
@@ -221,8 +221,7 @@ def test_check(self):
                                 count=1)
 
-        status = AgentCheck.OK if os.environ.get("TRAVIS")\
-            else AgentCheck.CRITICAL
+        status = AgentCheck.OK  # Travis has no shards, so health is green
         self.assertServiceCheck('elasticsearch.cluster_health',
                                 status=status,
                                 tags=good_sc_tags,
                                 count=1)
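
Reviewer note: below is a minimal, standalone sketch of what the new
_process_pending_tasks_data does, run against a hypothetical
/_cluster/pending_tasks payload. The task fields and values are made up
for illustration; the check itself only reads 'priority', and it looks
up the lowercase strings 'high' and 'urgent'. If ES reported priorities
in another case, those two buckets would stay at zero while the total
still counted every task.

# sketch_pending_tasks.py -- mirrors the aggregation in the patch above
from collections import defaultdict

# Hypothetical response from GET /_cluster/pending_tasks
data = {
    'tasks': [
        {'insert_order': 101, 'priority': 'urgent', 'source': 'create-index [foo_9]'},
        {'insert_order': 46, 'priority': 'high', 'source': 'shard-started'},
        {'insert_order': 45, 'priority': 'high', 'source': 'shard-started'},
    ]
}

# Count tasks by priority, exactly as the check does
p_tasks = defaultdict(int)
for task in data.get('tasks', []):
    p_tasks[task.get('priority')] += 1

node_data = {
    'pending_task_total': sum(p_tasks.values()),         # -> 3
    'pending_tasks_priority_high': p_tasks['high'],      # -> 2
    'pending_tasks_priority_urgent': p_tasks['urgent'],  # -> 1
}
print(node_data)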
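
Rationale for the "if pending_tasks_url is not None" guard added in
check(): in Python 2, urlparse.urljoin() returns the base URL unchanged
when its second argument is falsy, so without the guard a pre-0.90.10
cluster (where _define_params returns pending_tasks_url = None) would
silently fetch the root endpoint and report zero pending tasks. A quick
demonstration (localhost URL is illustrative):

import urlparse

base = 'http://localhost:9200'
print(urlparse.urljoin(base, None))  # http://localhost:9200 (base unchanged)
print(urlparse.urljoin(base, '/_cluster/pending_tasks'))
# http://localhost:9200/_cluster/pending_tasks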