Skip to content

Commit

Permalink
[snmp] thread safety fixes for snmp check, improving instance cleanup…
Browse files Browse the repository at this point in the history
… in network checks.

[network] different ports should account for different instances.

[snmp] cmdgen is not threadsafe. We need one copy per instance.

[snmp] create mib builder on the right cmd_generator.

[snmp] don't overwrite cmdgen.

[snmp] cache command generators

[snmp] if a batch fails partially, try to continue, send corresponding SC level.

[snmp] fixing tests - the command generator is no longer an attribute; also, the instance key should
include the port - enforce this at the check level.

[network] best effort when it comes to generating the instance_key.

[snmp] unique instance names should be enforced at the check level. Be prepared for missing keys.

[snmp] use the instance key as name.
  • Loading branch information
truthbk authored and olivielpeau committed Mar 30, 2016
1 parent 23b2d65 commit cf4e903
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 56 deletions.
78 changes: 55 additions & 23 deletions checks.d/snmp.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ def reply_invalid(oid):
class SnmpCheck(NetworkCheck):

SOURCE_TYPE_NAME = 'system'
cmd_generator = None
# pysnmp default values
DEFAULT_RETRIES = 5
DEFAULT_TIMEOUT = 1
Expand All @@ -54,9 +53,11 @@ class SnmpCheck(NetworkCheck):
def __init__(self, name, init_config, agentConfig, instances):
for instance in instances:
if 'name' not in instance:
instance['name'] = instance['ip_address']
instance['name'] = self._get_instance_key(instance)
instance['skip_event'] = True

self.generators = {}

# Set OID batch size
self.oid_batch_size = int(init_config.get("oid_batch_size", DEFAULT_OID_BATCH_SIZE))

Expand All @@ -68,9 +69,6 @@ def __init__(self, name, init_config, agentConfig, instances):
self.ignore_nonincreasing_oid = _is_affirmative(
init_config.get("ignore_nonincreasing_oid", False))

# Create SNMP command generator and aliases
self.create_command_generator(self.mibs_path, self.ignore_nonincreasing_oid)

NetworkCheck.__init__(self, name, init_config, agentConfig, instances)

def _load_conf(self, instance):
Expand All @@ -81,8 +79,32 @@ def _load_conf(self, instance):
retries = int(instance.get('retries', self.DEFAULT_RETRIES))
enforce_constraints = _is_affirmative(instance.get('enforce_mib_constraints', True))

return ip_address, tags, metrics, timeout, retries, enforce_constraints

instance_key = instance['name']
cmd_generator = self.generators.get(instance_key, None)
if not cmd_generator:
cmd_generator = self.create_command_generator(self.mibs_path, self.ignore_nonincreasing_oid)
self.generators[instance_key] = cmd_generator

return cmd_generator, ip_address, tags, metrics, timeout, retries, enforce_constraints

def _get_instance_key(self, instance):
    """
    Build a best-effort unique key for an SNMP instance.

    Preference order: the explicit 'name' field, then host:port,
    then ip_address:port, then a bare host, then a bare ip_address.
    May return a falsy value (e.g. None) when none of those fields
    are set; the caller is expected to handle that.
    """
    key = instance.get('name', None)
    if key:
        return key

    port = instance.get('port', None)
    # 'host' takes precedence over 'ip_address'; first truthy endpoint wins.
    for endpoint in (instance.get('host', None), instance.get('ip_address', None)):
        if endpoint:
            if port:
                return "{host}:{port}".format(host=endpoint, port=port)
            return endpoint

    # Nothing usable found: fall back to the original (falsy) 'name' value.
    return key

def snmp_logger(self, func):
"""
Expand All @@ -103,18 +125,18 @@ def create_command_generator(self, mibs_path, ignore_nonincreasing_oid):
If mibs_path is not None, load the mibs present in the custom mibs
folder. (Need to be in pysnmp format)
'''
self.cmd_generator = cmdgen.CommandGenerator()
self.cmd_generator.ignoreNonIncreasingOid = ignore_nonincreasing_oid
cmd_generator = cmdgen.CommandGenerator()
cmd_generator.ignoreNonIncreasingOid = ignore_nonincreasing_oid

if mibs_path is not None:
mib_builder = self.cmd_generator.snmpEngine.msgAndPduDsp.\
mib_builder = cmd_generator.snmpEngine.msgAndPduDsp.\
mibInstrumController.mibBuilder
mib_sources = mib_builder.getMibSources() + \
(builder.DirMibSource(mibs_path), )
mib_builder.setMibSources(*mib_sources)

# Set aliases for snmpget and snmpgetnext with logging
self.snmpget = self.snmp_logger(self.cmd_generator.getCmd)
self.snmpgetnext = self.snmp_logger(self.cmd_generator.nextCmd)
return cmd_generator


@classmethod
def get_auth_data(cls, instance):
Expand Down Expand Up @@ -176,7 +198,8 @@ def raise_on_error_indication(self, error_indication, instance):
instance["service_check_error"] = message
raise Exception(message)

def check_table(self, instance, oids, lookup_names, timeout, retries, enforce_constraints=False):
def check_table(self, instance, cmd_generator, oids, lookup_names,
timeout, retries, enforce_constraints=False):
'''
Perform a snmpwalk on the domain specified by the oids, on the device
configured in instance.
Expand All @@ -193,6 +216,10 @@ def check_table(self, instance, oids, lookup_names, timeout, retries, enforce_co
# snmpgetnext -v2c -c public localhost:11111 1.36.1.2.1.25.4.2.1.7.222
# iso.3.6.1.2.1.25.4.2.1.7.224 = INTEGER: 2
# SOLUTION: perform a snmget command and fallback with snmpgetnext if not found

# Set aliases for snmpget and snmpgetnext with logging
snmpget = self.snmp_logger(cmd_generator.getCmd)
snmpgetnext = self.snmp_logger(cmd_generator.nextCmd)
transport_target = self.get_transport_target(instance, timeout, retries)
auth_data = self.get_auth_data(instance)

Expand All @@ -201,10 +228,9 @@ def check_table(self, instance, oids, lookup_names, timeout, retries, enforce_co
results = defaultdict(dict)

while first_oid < len(oids):

try:
# Start with snmpget command
error_indication, error_status, error_index, var_binds = self.snmpget(
error_indication, error_status, error_index, var_binds = snmpget(
auth_data,
transport_target,
*(oids[first_oid:first_oid + self.oid_batch_size]),
Expand All @@ -228,7 +254,7 @@ def check_table(self, instance, oids, lookup_names, timeout, retries, enforce_co

if missing_results:
# If we didn't catch the metric using snmpget, try snmpnext
error_indication, error_status, error_index, var_binds_table = self.snmpgetnext(
error_indication, error_status, error_index, var_binds_table = snmpgetnext(
auth_data,
transport_target,
*missing_results,
Expand All @@ -248,15 +274,21 @@ def check_table(self, instance, oids, lookup_names, timeout, retries, enforce_co
complete_results.extend(table_row)

all_binds.extend(complete_results)

except PySnmpError as e:
if "service_check_error" not in instance:
instance["service_check_error"] = "Fail to collect metrics: {0}".format(e)
instance["service_check_error"] = "Fail to collect some metrics: {0}".format(e)
if "service_check_severity" not in instance:
instance["service_check_severity"] = Status.CRITICAL
self.warning("Fail to collect metrics: {0}".format(e))
self.warning("Fail to collect some metrics: {0}".format(e))

# if we fail move onto next batch
first_oid = first_oid + self.oid_batch_size

# if we've collected some variables, it's not that bad.
if "service_check_severity" in instance and len(all_binds):
instance["service_check_severity"] = Status.WARNING

for result_oid, value in all_binds:
if lookup_names:
_, metric, indexes = result_oid.getMibSymbol()
Expand All @@ -274,7 +306,7 @@ def _check(self, instance):
and should be looked up and one for those specified by oids
'''

ip_address, tags, metrics, timeout, retries, enforce_constraints = self._load_conf(instance)
cmd_generator, ip_address, tags, metrics, timeout, retries, enforce_constraints = self._load_conf(instance)

tags += ['snmp_device:{0}'.format(ip_address)]

Expand All @@ -298,18 +330,18 @@ def _check(self, instance):
try:
if table_oids:
self.log.debug("Querying device %s for %s oids", ip_address, len(table_oids))
table_results = self.check_table(instance, table_oids, True, timeout, retries,
table_results = self.check_table(instance, cmd_generator, table_oids, True, timeout, retries,
enforce_constraints=enforce_constraints)
self.report_table_metrics(metrics, table_results, tags)

if raw_oids:
self.log.debug("Querying device %s for %s oids", ip_address, len(raw_oids))
raw_results = self.check_table(instance, raw_oids, False, timeout, retries,
raw_results = self.check_table(instance, cmd_generator, raw_oids, False, timeout, retries,
enforce_constraints=False)
self.report_raw_metrics(metrics, raw_results, tags)
except Exception as e:
if "service_check_error" not in instance:
instance["service_check_error"] = "Fail to collect metrics: {0}".format(e)
instance["service_check_error"] = "Fail to collect metrics for {0} - {1}".format(instance['name'], e)
self.warning(instance["service_check_error"])
return [(self.SC_STATUS, Status.CRITICAL, instance["service_check_error"])]
finally:
Expand Down
40 changes: 25 additions & 15 deletions checks/network_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class NetworkCheck(AgentCheck):
SERVICE_CHECK_PREFIX = 'network_check'

STATUS_TO_SERVICE_CHECK = {
Status.UP : AgentCheck.OK,
Status.UP : AgentCheck.OK,
Status.WARNING : AgentCheck.WARNING,
Status.CRITICAL : AgentCheck.CRITICAL,
Status.DOWN : AgentCheck.CRITICAL,
Expand Down Expand Up @@ -70,10 +70,11 @@ def __init__(self, name, init_config, agentConfig, instances):
# to keep track of statuses
names = []
for inst in instances:
if 'name' not in inst:
name = inst.get('name', None)
if not name:
raise Exception("All instances should have a 'name' parameter,"
" error on instance: {0}".format(inst))
if inst['name'] in names:
if name in names:
raise Exception("Duplicate names for instances with name {0}"
.format(inst['name']))

Expand Down Expand Up @@ -142,7 +143,7 @@ def _process(self, instance):
self.resultsq.put((status, msg, sc_name, instance))

except Exception:
result = (FAILURE, FAILURE, FAILURE, FAILURE)
result = (FAILURE, FAILURE, FAILURE, instance)
self.resultsq.put(result)

def _process_results(self):
Expand All @@ -153,19 +154,23 @@ def _process_results(self):
except Empty:
break

instance_name = instance['name']
if status == FAILURE:
self.nb_failures += 1
if self.nb_failures >= self.pool_size - 1:
self.nb_failures = 0
self.restart_pool()

# clean failed job
self._clean_job(instance_name)
continue

self.report_as_service_check(sc_name, status, instance, msg)

# FIXME: 5.3, this has been deprecated before, get rid of events
# Don't create any event to avoid duplicates with server side
# service_checks
skip_event = _is_affirmative(instance.get('skip_event', False))
instance_name = instance['name']
if not skip_event:
self.warning("Using events for service checks is deprecated in favor of monitors and will be removed in future versions of the Datadog Agent.")
event = None
Expand Down Expand Up @@ -200,16 +205,21 @@ def _process_results(self):
if event is not None:
self.events.append(event)

# The job is finished here, this instance can be re processed
if instance_name in self.jobs_status:
del self.jobs_status[instance_name]

# if an exception happened, log it
if instance_name in self.jobs_results:
ret = self.jobs_results[instance_name].get()
if isinstance(ret, Exception):
self.log.exception("Exception in worker thread: {0}".format(ret))
del self.jobs_results[instance_name]
self._clean_job(instance_name)

def _clean_job(self, instance_name):
    """
    Drop bookkeeping for a finished job so the instance can be
    processed again.

    Removes the instance from `jobs_status`, then removes its async
    result from `jobs_results`, logging the stored result as an
    exception when the worker thread raised one.
    """
    if instance_name in self.jobs_status:
        self.log.debug("Instance: %s cleaned from jobs status." % instance_name)
        del self.jobs_status[instance_name]

    if instance_name in self.jobs_results:
        self.log.debug("Instance: %s cleaned from jobs results." % instance_name)
        # The stored result may be an Exception captured by the worker.
        outcome = self.jobs_results[instance_name].get()
        if isinstance(outcome, Exception):
            self.log.exception("Exception in worker thread: {0}".format(outcome))
        del self.jobs_results[instance_name]


def _check(self, instance):
Expand Down
24 changes: 6 additions & 18 deletions tests/checks/integration/test_snmp.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,38 +172,26 @@ def wait_for_async(self, method, attribute, count):
.format(attribute=attribute, total=len(getattr(self.check, attribute)), expected=count, seconds=i,
attr=getattr(self.check, attribute)))

def wait_for_async_attrib(self, attribute):
"""
Raise after
"""
i = 0
while i < RESULTS_TIMEOUT:
if getattr(self.check, attribute):
return
time.sleep(1)
i += 1
raise Exception("Attribute not created in time.")


def test_command_generator(self):
"""
Command generator's parameters should match init_config
"""
self.run_check(self.MIBS_FOLDER)
self.wait_for_async_attrib('cmd_generator')
cmdgen, _, _, _, _, _, _ = self.check._load_conf(self.SNMP_CONF)

# Test command generator MIB source
mib_folders = self.check.cmd_generator.snmpEngine.msgAndPduDsp\
mib_folders = cmdgen.snmpEngine.msgAndPduDsp\
.mibInstrumController.mibBuilder.getMibSources()
full_path_mib_folders = map(lambda f: f.fullPath(), mib_folders)

self.assertTrue("/etc/mibs" in full_path_mib_folders)
self.assertFalse(self.check.cmd_generator.ignoreNonIncreasingOid)
self.assertFalse(cmdgen.ignoreNonIncreasingOid)

# Test command generator `ignoreNonIncreasingOid` parameter
self.run_check(self.IGNORE_NONINCREASING_OID, force_reload=True)
self.wait_for_async_attrib('cmd_generator')
self.assertTrue(self.check.cmd_generator.ignoreNonIncreasingOid)
cmdgen, _, _, _, _, _, _ = self.check._load_conf(self.SNMP_CONF)
self.assertTrue(cmdgen.ignoreNonIncreasingOid)

def test_type_support(self):
"""
Expand Down Expand Up @@ -320,7 +308,7 @@ def test_invalid_metric(self):
self.run_check(config)

self.warnings = self.wait_for_async('get_warnings', 'warnings', 1)
self.assertWarning("Fail to collect metrics: No symbol IF-MIB::noIdeaWhatIAmDoingHere",
self.assertWarning("Fail to collect some metrics: No symbol IF-MIB::noIdeaWhatIAmDoingHere",
count=1, exact_match=False)

# # Test service check
Expand Down

0 comments on commit cf4e903

Please sign in to comment.