[elasticsearch] Get information about pending_tasks #1507

Merged 2 commits on May 15, 2015

53 changes: 43 additions & 10 deletions checks.d/elastic.py
@@ -1,5 +1,5 @@
# stdlib
from collections import namedtuple
from collections import namedtuple, defaultdict
import socket
import subprocess
import time
@@ -11,12 +11,14 @@
# project
from checks import AgentCheck
from config import _is_affirmative
from util import headers, Platform
from utils.platform import Platform
from util import headers


class NodeNotFound(Exception):
pass


ESInstanceConfig = namedtuple(
'ESInstanceConfig', [
'is_external',
@@ -162,6 +164,12 @@ class ESCheck(AgentCheck):
"elasticsearch.cluster_status": ("gauge", "status", lambda v: {"red": 0, "yellow": 1, "green": 2}.get(v, -1)),
}

CLUSTER_PENDING_TASKS = {
"elasticsearch.pending_tasks_total": ("gauge", "pending_task_total"),
"elasticsearch.pending_tasks_priority_high": ("gauge", "pending_tasks_priority_high"),
"elasticsearch.pending_tasks_priority_urgent": ("gauge", "pending_tasks_priority_urgent")
}

SOURCE_TYPE_NAME = 'elasticsearch'

def __init__(self, name, init_config, agentConfig, instances=None):
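
For context, the three gauges above come from the cluster-level pending-tasks queue. A minimal sketch of the payload shape they are derived from (field names follow the _cluster/pending_tasks endpoint wired up below; the task entries, values, and lowercase priority strings are illustrative assumptions, not taken from this PR):

# Illustrative response body from GET /_cluster/pending_tasks (values made up).
sample_pending_tasks = {
    "tasks": [
        {"insert_order": 101, "priority": "urgent", "source": "create-index [foo_9], cause [api]"},
        {"insert_order": 102, "priority": "high", "source": "shard-started ([foo_9][0])"},
        {"insert_order": 103, "priority": "high", "source": "shard-started ([foo_9][1])"},
    ]
}
# From such a payload the check would report:
#   elasticsearch.pending_tasks_total            -> 3
#   elasticsearch.pending_tasks_priority_high    -> 2
#   elasticsearch.pending_tasks_priority_urgent  -> 1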
@@ -214,19 +222,24 @@ def check(self, instance):
# (URLs and metrics) accordingly
version = self._get_es_version(config)

health_url, nodes_url, stats_url, stats_metrics = self._define_params(
version, config.is_external)
health_url, nodes_url, stats_url, pending_tasks_url, stats_metrics\
= self._define_params(version, config.is_external)

# Load stats data.
stats_url = urlparse.urljoin(config.url, stats_url)
stats_data = self._get_data(stats_url, config)
self._process_stats_data(stats_data, stats_metrics, config)
self._process_stats_data(nodes_url, stats_data, stats_metrics, config)

# Load the health data.
health_url = urlparse.urljoin(config.url, health_url)
health_data = self._get_data(health_url, config)
self._process_health_data(health_data, config)

# Load the pending_tasks data.
pending_tasks_url = urlparse.urljoin(config.url, pending_tasks_url)
pending_tasks_data = self._get_data(pending_tasks_url, config)
self._process_pending_tasks_data(pending_tasks_data, config)
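
Note that _define_params (below) returns None for pending_tasks_url on clusters older than 0.90.10, so a caller targeting such a cluster might want to skip this fetch explicitly. A minimal sketch of such a guard (hypothetical illustration, not code from this PR):

# Hypothetical guard, not part of this PR: only fetch pending tasks when the
# version-dependent URL is available (ES >= 0.90.10 in _define_params below).
if pending_tasks_url is not None:
    pending_tasks_url = urlparse.urljoin(config.url, pending_tasks_url)
    pending_tasks_data = self._get_data(pending_tasks_url, config)
    self._process_pending_tasks_data(pending_tasks_data, config)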

# If we're here we did not have any ES conn issues
self.service_check(
self.SERVICE_CHECK_CONNECT_NAME,
@@ -259,6 +272,7 @@ def _define_params(self, version, is_external):
# ES versions 0.90.10 and above
health_url = "/_cluster/health?pretty=true"
nodes_url = "/_nodes?network=true"
pending_tasks_url = "/_cluster/pending_tasks?pretty=true"

# For "external" clusters, we want to collect from all nodes.
if is_external:
@@ -270,6 +284,7 @@
else:
health_url = "/_cluster/health?pretty=true"
nodes_url = "/_cluster/nodes?network=true"
pending_tasks_url = None
if is_external:
stats_url = "/_cluster/nodes/stats?all=true"
else:
@@ -289,7 +304,7 @@

stats_metrics.update(additional_metrics)

return health_url, nodes_url, stats_url, stats_metrics
return health_url, nodes_url, stats_url, pending_tasks_url, stats_metrics

def _get_data(self, url, config, send_sc=True):
""" Hit a given URL and return the parsed json
@@ -320,7 +335,24 @@ def _get_data(self, url, config, send_sc=True):

return resp.json()

def _process_stats_data(self, data, stats_metrics, config):
def _process_pending_tasks_data(self, data, config):
p_tasks = defaultdict(int)

for task in data.get('tasks', []):
p_tasks[task.get('priority')] += 1

node_data = {
'pending_task_total': sum(p_tasks.values()),
'pending_tasks_priority_high': p_tasks['high'],
'pending_tasks_priority_urgent': p_tasks['urgent'],
}

for metric in self.CLUSTER_PENDING_TASKS:
# metric description
desc = self.CLUSTER_PENDING_TASKS[metric]
self._process_metric(node_data, metric, *desc, tags=config.tags)
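
To make the aggregation above easy to try standalone, here is an equivalent self-contained sketch (same logic as _process_pending_tasks_data, run against the illustrative payload shown earlier; not part of the PR):

# Standalone sketch mirroring _process_pending_tasks_data's aggregation.
from collections import defaultdict

def summarize_pending_tasks(data):
    # Count queued cluster tasks by priority, then derive the three gauges.
    p_tasks = defaultdict(int)
    for task in data.get('tasks', []):
        p_tasks[task.get('priority')] += 1
    return {
        'pending_task_total': sum(p_tasks.values()),
        'pending_tasks_priority_high': p_tasks['high'],
        'pending_tasks_priority_urgent': p_tasks['urgent'],
    }

# summarize_pending_tasks(sample_pending_tasks)
# -> {'pending_task_total': 3,
#     'pending_tasks_priority_high': 2,
#     'pending_tasks_priority_urgent': 1}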

def _process_stats_data(self, nodes_url, data, stats_metrics, config):
is_external = config.is_external
for node_name in data['nodes']:
node_data = data['nodes'][node_name]
@@ -329,7 +361,7 @@ def _process_stats_data(self, data, stats_metrics, config):
'hostname', node_data.get('host', None))
should_process = (
is_external or self.should_process_node(
node_name, node_hostname, config))
nodes_url, node_name, node_hostname, config))

# Override the metric hostname if we're hitting an external cluster
metric_hostname = node_hostname if is_external else None
@@ -341,7 +373,7 @@ def _process_stats_data(self, data, stats_metrics, config):
node_data, metric, *desc, tags=config.tags,
hostname=metric_hostname)

def should_process_node(self, node_name, node_hostname, config):
def should_process_node(self, nodes_url, node_name, node_hostname, config):
""" The node stats API will return stats for every node so we
want to filter out nodes that we don't care about.
"""
@@ -355,11 +387,12 @@ def should_process_node(self, node_name, node_hostname, config):
if node_hostname.decode('utf-8') in hostnames:
return True
else:
# FIXME 6.x : deprecate this code, it's EOL'd
# ES < 0.19
# Fetch interface address from ifconfig or ip addr and check
# against the primary IP from ES
try:
nodes_url = urlparse.urljoin(config.url, self.NODES_URL)
nodes_url = urlparse.urljoin(config.url, nodes_url)
primary_addr = self._get_primary_addr(
nodes_url, node_name, config)
except NodeNotFound:
3 changes: 1 addition & 2 deletions tests/checks/integration/test_elastic.py
@@ -221,8 +221,7 @@ def test_check(self):
count=1)


status = AgentCheck.OK if os.environ.get("TRAVIS")\
else AgentCheck.CRITICAL
status = AgentCheck.OK
# Travis doesn't have any shards in the cluster and considers this green
self.assertServiceCheck('elasticsearch.cluster_health',
status=status, tags=good_sc_tags,
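
The integration test above only adjusts the cluster_health service check expectation. A sketch of how the new pending-tasks gauges could additionally be asserted, assuming the assertMetric helper from the same test harness (hypothetical, not part of this PR):

# Hypothetical assertions for test_check, not part of this PR.
self.assertMetric('elasticsearch.pending_tasks_total', count=1)
self.assertMetric('elasticsearch.pending_tasks_priority_high', count=1)
self.assertMetric('elasticsearch.pending_tasks_priority_urgent', count=1)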