Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[process] prettify & cache AccessDenied failures #1595

Merged
merged 2 commits into from
May 19, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
335 changes: 194 additions & 141 deletions checks.d/process.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
# 3rd party
# stdlib
from collections import defaultdict
import time

# 3p
import psutil

# project
Expand All @@ -7,170 +11,213 @@
from utils.platform import Platform


DEFAULT_AD_CACHE_DURATION = 120
DEFAULT_PID_CACHE_DURATION = 120


ATTR_TO_METRIC = {
'thr': 'threads',
'cpu': 'cpu.pct',
'rss': 'mem.rss',
'vms': 'mem.vms',
'real': 'mem.real',
'open_fd': 'open_file_descriptors',
'r_count': 'ioread_count', # FIXME: namespace me correctly (6.x), io.r_count
'w_count': 'iowrite_count', # FIXME: namespace me correctly (6.x) io.r_bytes
'r_bytes': 'ioread_bytes', # FIXME: namespace me correctly (6.x) io.w_count
'w_bytes': 'iowrite_bytes', # FIXME: namespace me correctly (6.x) io.w_bytes
'ctx_swtch_vol': 'voluntary_ctx_switches', # FIXME: namespace me correctly (6.x), ctx_swt.voluntary
'ctx_swtch_invol': 'involuntary_ctx_switches', # FIXME: namespace me correctly (6.x), ctx_swt.involuntary
}


class ProcessCheck(AgentCheck):
def __init__(self, name, init_config, agentConfig, instances=None):
AgentCheck.__init__(self, name, init_config, agentConfig, instances)

# ad stands for access denied
# We cache the PIDs getting this error and don't iterate on them
# more often than `access_denied_cache_duration`
# This cache is for all PIDs so it's global
self.last_ad_cache_ts = 0
self.ad_cache = set()
self.access_denied_cache_duration = int(
init_config.get(
'access_denied_cache_duration',
DEFAULT_AD_CACHE_DURATION
)
)

SOURCE_TYPE_NAME = 'system'

PROCESS_GAUGE = (
'system.processes.threads',
'system.processes.cpu.pct',
'system.processes.mem.rss',
'system.processes.mem.vms',
'system.processes.mem.real',
'system.processes.open_file_descriptors',
'system.processes.ioread_count',
'system.processes.iowrite_count',
'system.processes.ioread_bytes',
'system.processes.iowrite_bytes',
'system.processes.voluntary_ctx_switches',
'system.processes.involuntary_ctx_switches',
)

def find_pids(self, search_string, exact_match, ignore_denied_access):
# By default cache the PID list for a while
# Sometimes it's not wanted b/c it can mess with no-data monitoring
# This cache is indexed per instance
self.last_pid_cache_ts = {}
self.pid_cache = {}
self.pid_cache_duration = int(
init_config.get(
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't that be an instance parameter instead ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in most cases, this shouldn't be changed so I don't think it's worth the trouble for an instance parameter.

'pid_cache_duration',
DEFAULT_PID_CACHE_DURATION
)
)

def should_refresh_ad_cache(self):
now = time.time()
return now - self.last_ad_cache_ts > self.access_denied_cache_duration

def should_refresh_pid_cache(self, name):
now = time.time()
return now - self.last_pid_cache_ts.get(name, 0) > self.pid_cache_duration

def find_pids(self, name, search_string, exact_match, ignore_ad=True,
refresh_ad_cache=True):
"""
Create a set of pids of selected processes.
Search for search_string
"""
found_process_list = []
if not self.should_refresh_pid_cache(name):
return self.pid_cache[name]

ad_error_logger = self.log.debug
if not ignore_ad:
ad_error_logger = self.log.error

refresh_ad_cache = self.should_refresh_ad_cache()

matching_pids = set()

for proc in psutil.process_iter():
# Skip access denied processes
if not refresh_ad_cache and proc.pid in self.ad_cache:
continue

found = False
for string in search_string:
if exact_match:
try:
try:
# FIXME 6.x: All has been deprecated from the doc, should be removed
if string == 'All':
found = True
if exact_match:
if proc.name() == string:
found = True
except psutil.NoSuchProcess:
self.log.warning('Process disappeared while scanning')
except psutil.AccessDenied, e:
self.log.error('Access denied to %s process' % string)
self.log.error('Error: %s' % e)
if not ignore_denied_access:
raise
else:
cmdline = proc.cmdline()
if string in ' '.join(cmdline):
found = True
except psutil.NoSuchProcess:
self.log.warning('Process disappeared while scanning')
except psutil.AccessDenied, e:
ad_error_logger('Access denied to process with PID %s', proc.pid)
ad_error_logger('Error: %s', e)
if refresh_ad_cache:
self.ad_cache.add(proc.pid)
if not ignore_ad:
raise
else:
if not found:
try:
cmdline = proc.cmdline()
if string in ' '.join(cmdline):
found = True
except psutil.NoSuchProcess:
self.warning('Process disappeared while scanning')
except psutil.AccessDenied, e:
self.log.error('Access denied to %s process' % string)
self.log.error('Error: %s' % e)
if not ignore_denied_access:
raise

if found or string == 'All':
found_process_list.append(proc.pid)

return set(found_process_list)

def get_process_metrics(self, pids, cpu_check_interval, ignore_denied_access=True):

# initialize process metrics
# process metrics available for all versions of psutil
rss = 0
vms = 0
cpu = 0
thr = 0
voluntary_ctx_switches = 0
involuntary_ctx_switches = 0

# process metrics available for psutil versions 0.6.0 and later
if Platform.is_win32() or Platform.is_solaris():
real = None
else:
real = 0
if refresh_ad_cache:
self.ad_cache.discard(proc.pid)
if found:
matching_pids.add(proc.pid)
break

if Platform.is_unix():
open_file_descriptors = 0
self.pid_cache[name] = matching_pids
self.last_pid_cache_ts[name] = time.time()
return matching_pids

def psutil_wrapper(self, process, method, accessors, *args, **kwargs):
"""
A psutil wrapper that is calling
* psutil.method(*args, **kwargs) and returns the result
OR
* psutil.method(*args, **kwargs).accessor[i] for each accessors given in
a list, the result being indexed in a dictionary by the accessor name
"""

if accessors is None:
result = None
else:
open_file_descriptors = None
result = {}

# Ban certain method that we know fail
if method == 'memory_info_ex'\
and (Platform.is_win32() or Platform.is_solaris()):
return result
elif method == 'num_fds' and not Platform.is_unix():
return result

try:
res = getattr(process, method)(*args, **kwargs)
if accessors is None:
result = res
else:
for acc in accessors:
try:
result[acc] = getattr(res, acc)
except AttributeError:
self.log.debug("psutil.%s().%s attribute does not exist", method, acc)
except (NotImplementedError, AttributeError):
self.log.debug("psutil method %s not implemented", method)
except psutil.AccessDenied:
self.log.debug("psutil was denied acccess for method %s", method)
except psutil.NoSuchProcess:
self.warning("Process {0} disappeared while scanning".format(process.pid))

return result


# process I/O counters (agent might not have permission to access)
read_count = 0
write_count = 0
read_bytes = 0
write_bytes = 0
def get_process_state(self, name, pids, cpu_check_interval):
st = defaultdict(list)

got_denied = False
for pid in pids:
st['pids'].append(pid)

for pid in set(pids):
try:
p = psutil.Process(pid)
try:
if real is not None:
mem = p.memory_info_ex()
real += mem.rss - mem.shared
else:
mem = p.memory_info()

if Platform.is_unix():
ctx_switches = p.num_ctx_switches()
voluntary_ctx_switches += ctx_switches.voluntary
involuntary_ctx_switches += ctx_switches.involuntary

rss += mem.rss
vms += mem.vms
thr += p.num_threads()
cpu += p.cpu_percent(cpu_check_interval)

if open_file_descriptors is not None:
open_file_descriptors += p.num_fds()

except NotImplementedError:
# Handle old Kernels which don't provide this info.
voluntary_ctx_switches = None
involuntary_ctx_switches = None
except AttributeError:
self.log.debug("process attribute not supported on this platform")
except psutil.AccessDenied:
got_denied = True

# user agent might not have permission to call io_counters()
# user agent might have access to io counters for some processes and not others
if read_count is not None:
try:
io_counters = p.io_counters()
read_count += io_counters.read_count
write_count += io_counters.write_count
read_bytes += io_counters.read_bytes
write_bytes += io_counters.write_bytes
except AttributeError:
self.log.debug("process attribute not supported on this platform")
except psutil.AccessDenied:
log_func = self.log.debug if ignore_denied_access else self.log.info
log_func('dd-agent user does not have access \
to I/O counters for process %d: %s' % (pid, p.name()))
read_count = None
write_count = None
read_bytes = None
write_bytes = None

# Skip processes dead in the meantime
except psutil.NoSuchProcess:
self.warning('Process %s disappeared while scanning' % pid)
# reset the PID cache now, something chaned
self.last_pid_cache_ts[name] = 0

if got_denied and not ignore_denied_access:
self.warning('The Datadog Agent was denied access '
'when trying to get the number of file descriptors')
meminfo = self.psutil_wrapper(p, 'memory_info', ['rss', 'vms'])
st['rss'].append(meminfo.get('rss'))
st['vms'].append(meminfo.get('vms'))

# Memory values are in Byte
return (thr, cpu, rss, vms, real, open_file_descriptors,
read_count, write_count, read_bytes, write_bytes,
voluntary_ctx_switches, involuntary_ctx_switches)
# will fail on win32 and solaris
shared_mem = self.psutil_wrapper(p, 'memory_info_ex', ['shared']).get('shared')
if shared_mem is not None and meminfo.get('rss') is not None:
st['real'].append(meminfo['rss'] - shared_mem)
else:
st['real'].append(None)

ctxinfo = self.psutil_wrapper(p, 'num_ctx_switches', ['voluntary', 'involuntary'])
st['ctx_swtch_vol'].append(ctxinfo.get('voluntary'))
st['ctx_swtch_invol'].append(ctxinfo.get('involuntary'))

st['thr'].append(self.psutil_wrapper(p, 'num_threads', None))
st['cpu'].append(self.psutil_wrapper(p, 'cpu_percent', None, cpu_check_interval))

st['open_fd'].append(self.psutil_wrapper(p, 'num_fds', None))

ioinfo = self.psutil_wrapper(p, 'io_counters', ['read_count', 'write_count', 'read_bytes', 'write_bytes'])
st['r_count'].append(ioinfo.get('read_count'))
st['w_count'].append(ioinfo.get('write_count'))
st['r_bytes'].append(ioinfo.get('read_bytes'))
st['w_bytes'].append(ioinfo.get('write_bytes'))

return st

def check(self, instance):
name = instance.get('name', None)
tags = instance.get('tags', [])
exact_match = _is_affirmative(instance.get('exact_match', True))
search_string = instance.get('search_string', None)
ignore_denied_access = _is_affirmative(instance.get('ignore_denied_access', True))
ignore_ad = _is_affirmative(instance.get('ignore_denied_access', True))
cpu_check_interval = instance.get('cpu_check_interval', 0.1)

if not isinstance(search_string, list):
raise KeyError('"search_string" parameter should be a list')

# FIXME 6.x remove me
if "All" in search_string:
self.warning('Deprecated: Having "All" in your search_string will'
'greatly reduce the performance of the check and '
Expand All @@ -186,21 +233,27 @@ def check(self, instance):
self.warning("cpu_check_interval must be a number. Defaulting to 0.1")
cpu_check_interval = 0.1

pids = self.find_pids(search_string,
exact_match,
ignore_denied_access)
tags.extend(['process_name:%s' % name, name])
pids = self.find_pids(
name,
search_string,
exact_match,
ignore_ad=ignore_ad
)

self.log.debug('ProcessCheck: process %s analysed' % name)
proc_state = self.get_process_state(name, pids, cpu_check_interval)

self.gauge('system.processes.number', len(pids), tags=tags)
# FIXME 6.x remove the `name` tag
tags.extend(['process_name:%s' % name, name])

metrics = dict(zip(ProcessCheck.PROCESS_GAUGE, self.get_process_metrics(pids,
cpu_check_interval, ignore_denied_access)))
self.log.debug('ProcessCheck: process %s analysed', name)
self.gauge('system.processes.number', len(pids), tags=tags)

for metric, value in metrics.iteritems():
if value is not None:
self.gauge(metric, value, tags=tags)
for attr, mname in ATTR_TO_METRIC.iteritems():
vals = [x for x in proc_state[attr] if x is not None]
# skip []
if vals:
# FIXME 6.x: change this prefix?
self.gauge('system.processes.%s' % mname, sum(vals), tags=tags)

self._process_service_check(name, len(pids), instance.get('thresholds', None))

Expand Down
Loading