Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core][wmi] gracefully timeout WMI queries #2185

Merged
merged 4 commits into from
Jan 14, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions checks.d/wmi_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,11 @@ def _get_tag_query_tag(self, sampler, wmi_obj, tag_query):
self._format_tag_query(sampler, wmi_obj, tag_query)

# Create a specific sampler
connection = sampler.get_connection()
tag_query_sampler = WMISampler(
self.log,
target_class, [target_property],
filters=filters,
**connection
**sampler.connection
)

tag_query_sampler.sample()
Expand Down
125 changes: 86 additions & 39 deletions checks/libs/wmi/sampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
"""

# stdlib
from collections import defaultdict
from contextlib import contextmanager
from copy import deepcopy
from itertools import izip
import pywintypes
Expand All @@ -28,6 +30,7 @@

# project
from checks.libs.wmi.counter_type import get_calculator, get_raw, UndefinedCalculator
from utils.timeout import timeout, TimeoutException


class CaseInsensitiveDict(dict):
Expand All @@ -48,11 +51,12 @@ class WMISampler(object):
"""
WMI Sampler.
"""
# Shared resources
_wmi_locators = {}
_wmi_connections = {}
_wmi_connections = defaultdict(set)

def __init__(self, logger, class_name, property_names, filters="", host="localhost",
namespace="root\\cimv2", username="", password=""):
namespace="root\\cimv2", username="", password="", timeout_duration=10):
self.logger = logger

# Connection information
Expand All @@ -63,10 +67,11 @@ def __init__(self, logger, class_name, property_names, filters="", host="localho

self.is_raw_perf_class = "_PERFRAWDATA_" in class_name.upper()

# WMI class, properties, filters and counter types
# Include required properties for making calculations with raw
# performance counters:
# https://msdn.microsoft.com/en-us/library/aa394299(v=vs.85).aspx
# Sampler settings
# WMI class, properties, filters and counter types
# Include required properties for making calculations with raw
# performance counters:
# https://msdn.microsoft.com/en-us/library/aa394299(v=vs.85).aspx
if self.is_raw_perf_class:
property_names.extend([
"Timestamp_Sys100NS",
Expand All @@ -84,14 +89,20 @@ def __init__(self, logger, class_name, property_names, filters="", host="localho
self.filters = filters
self._formatted_filters = None
self.property_counter_types = None
self._timeout_duration = timeout_duration
self._query = timeout(timeout_duration)(self._query)

# Samples
self.current_sample = None
self.previous_sample = None

def get_connection(self):
# Sampling state
self._sampling = False

@property
def connection(self):
"""
A Getter to retrieve the sampler connection information.
A property to retrieve the sampler connection information.
"""
return {
'host': self.host,
Expand All @@ -100,6 +111,17 @@ def get_connection(self):
'password': self.password,
}

@property
def connection_key(self):
"""
Return an index key used to cache the sampler connection.
"""
return "{host}:{namespace}:{username}".format(
host=self.host,
namespace=self.namespace,
username=self.username
)

@property
def formatted_filters(self):
"""
Expand All @@ -114,25 +136,43 @@ def sample(self):
"""
Compute new samples.
"""
if self.is_raw_perf_class and not self.previous_sample:
self.logger.debug(u"Querying for initial sample for raw performance counter.")
self.current_sample = self._query()
self.previous_sample = self.current_sample
self._sampling = True

self.current_sample = self._query()
try:
if self.is_raw_perf_class and not self.previous_sample:
self.logger.debug(u"Querying for initial sample for raw performance counter.")
self.current_sample = self._query()

self.logger.debug(u"Sample: {0}".format(self.current_sample))
self.previous_sample = self.current_sample
self.current_sample = self._query()
except TimeoutException:
self.logger.warning(
u"Query timeout after {timeout}s".format(
timeout=self._timeout_duration
)
)
else:
self._sampling = False
self.logger.debug(u"Sample: {0}".format(self.current_sample))

def __len__(self):
"""
Return the number of WMI Objects in the current sample.
"""
# No data is returned while sampling
if self._sampling:
return 0

return len(self.current_sample)

def __iter__(self):
"""
Iterate on the current sample's WMI Objects and format the property values.
"""
# No data is returned while sampling
if self._sampling:
return

if self.is_raw_perf_class:
# Format required
for previous_wmi_object, current_wmi_object in \
Expand Down Expand Up @@ -212,17 +252,19 @@ def _format_property_values(self, previous, current):

return formatted_wmi_object

def _get_connection(self):
@contextmanager
def get_connection(self):
"""
Create and cache WMI connections.
Return an existing, available WMI connection or create a new one.

Release, i.e. mark as available at exit.
"""
connection_key = "{host}:{namespace}:{username}".format(
host=self.host,
namespace=self.namespace,
username=self.username
)
connection = None

# Fetch an existing connection or create a new one
available_connections = self._wmi_connections[self.connection_key]

if connection_key in self._wmi_connections:
if available_connections:
self.logger.debug(
u"Using cached connection "
u"(host={host}, namespace={namespace}, username={username}).".format(
Expand All @@ -231,24 +273,27 @@ def _get_connection(self):
username=self.username
)
)
return self._wmi_connections[connection_key]

self.logger.debug(
u"Connecting to WMI server "
u"(host={host}, namespace={namespace}, username={username}).".format(
host=self.host,
namespace=self.namespace,
username=self.username
connection = available_connections.pop()
else:
self.logger.debug(
u"Connecting to WMI server "
u"(host={host}, namespace={namespace}, username={username}).".format(
host=self.host,
namespace=self.namespace,
username=self.username
)
)
locator = Dispatch("WbemScripting.SWbemLocator")
connection = locator.ConnectServer(
self.host, self.namespace,
self.username, self.password
)
)

locator = Dispatch("WbemScripting.SWbemLocator")
self._wmi_locators[connection_key] = locator

connection = locator.ConnectServer(self.host, self.namespace, self.username, self.password)
self._wmi_connections[connection_key] = connection
# Yield the connection
yield connection

return connection
# Release it
self._wmi_connections[self.connection_key].add(connection)

@staticmethod
def _format_filter(filters):
Expand Down Expand Up @@ -282,7 +327,7 @@ def _query(self):
"""
Query WMI using WMI Query Language (WQL) & parse the results.

Returns: List of WMI objects
Returns: List of WMI objects or `TimeoutException`.
"""
formated_property_names = ",".join(self.property_names)
wql = "Select {property_names} from {class_name}{filters}".format(
Expand All @@ -307,7 +352,9 @@ def _query(self):
self.property_counter_types = CaseInsensitiveDict()
query_flags |= flag_use_amended_qualifiers

raw_results = self._get_connection().ExecQuery(wql, "WQL", query_flags)
with self.get_connection() as connection:
raw_results = connection.ExecQuery(wql, "WQL", query_flags)

results = self._parse_results(raw_results, includes_qualifiers=includes_qualifiers)

except pywintypes.com_error:
Expand Down
30 changes: 14 additions & 16 deletions ci/common.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def self.check_url(url)
Timeout.timeout(0.5) do
begin
r = HTTParty.get(url)
return (200...300).include? r.code
return (200...300).cover? r.code
rescue Errno::ECONNREFUSED, Errno::EHOSTUNREACH
return false
end
Expand Down Expand Up @@ -93,11 +93,8 @@ def self.for(smth, max_timeout = DEFAULT_TIMEOUT)
n += 1
sleep 0.25
end
if status
puts 'Found!'
else
fail "Still not up after #{max_timeout}s"
end
fail "Still not up after #{max_timeout}s" unless status
puts 'Found!'
status
end
end
Expand Down Expand Up @@ -180,17 +177,18 @@ def self.for(smth, max_timeout = DEFAULT_TIMEOUT)
task :run_tests, :flavor do |t, attr|
flavors = attr[:flavor]
filter = ENV['NOSE_FILTER'] || '1'
if flavors.include?('default') || flavors.include?('checks_mock')
nose = "(not requires) and #{filter}"
else
nose = "(requires in ['#{flavors.join("','")}']) and #{filter}"
end
if flavors.include?('default') || flavors.include?('core_integration')
tests_directory = 'tests/core'
else
tests_directory = 'tests/checks'
end

nose = if flavors.include?('default') || flavors.include?('checks_mock')
"(not requires) and #{filter}"
else
"(requires in ['#{flavors.join("','")}']) and #{filter}"
end

tests_directory = if flavors.include?('default') || flavors.include?('core_integration')
'tests/core'
else
'tests/checks'
end
# Rake on Windows doesn't support setting the var at the beginning of the
# command
path = ''
Expand Down
2 changes: 1 addition & 1 deletion ci/etcd.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def etcd_rootdir
# Downloads:
# https://github.com/coreos/etcd/releases/download/v#{etcd_version}/etcd-v#{etcd_version}-darwin-amd64.zip
# https://github.com/coreos/etcd/releases/download/v#{etcd_version}/etcd-v#{etcd_version}-linux-amd64.tar.gz
if `uname -s`.strip.downcase == 'darwin'
if 'darwin'.casecmp(`uname -s`.strip)
sh %(curl -s -L -o $VOLATILE_DIR/etcd.zip\
https://s3.amazonaws.com/dd-agent-tarball-mirror/etcd-v#{etcd_version}-darwin-amd64.zip)
sh %(mkdir -p #{etcd_rootdir})
Expand Down
10 changes: 5 additions & 5 deletions ci/haproxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ def haproxy_rootdir
sh %(tar zxf $VOLATILE_DIR/haproxy-#{haproxy_version}.tar.gz\
-C $VOLATILE_DIR/haproxy --strip-components=1)

if `uname`.strip == 'Darwin'
target = 'mac'
else
target = 'linux2628'
end
target = if `uname`.strip == 'Darwin'
'mac'
else
'linux2628'
end
sh %(cd $VOLATILE_DIR/haproxy\
&& make -j $CONCURRENCY TARGET=#{target} USE_PCRE=1 USE_OPENSSL=1 USE_ZLIB=1)
sh %(cp $VOLATILE_DIR/haproxy/haproxy #{haproxy_rootdir})
Expand Down
10 changes: 5 additions & 5 deletions ci/mongo.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ def mongo_rootdir
unless Dir.exist? File.expand_path(mongo_rootdir)
# Downloads
# https://fastdl.mongodb.org/linux/mongodb-#{target}-x86_64-#{mongo_version}.tgz
if `uname`.strip == 'Darwin'
target = 'osx'
else
target = 'linux'
end
target = if `uname`.strip == 'Darwin'
'osx'
else
'linux'
end
sh %(curl -s -L\
-o $VOLATILE_DIR/mongodb-#{target}-x86_64-#{mongo_version}.tgz\
https://s3.amazonaws.com/dd-agent-tarball-mirror/mongodb-#{target}-x86_64-#{mongo_version}.tgz)
Expand Down
14 changes: 7 additions & 7 deletions ci/resources/cache.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@
class Cache
MSGS = {
config_missing: 'Worker S3 config missing: %s'
}
}.freeze

VALIDATE = {
bucket: 'bucket name',
access_key_id: 'access key id',
secret_access_key: 'secret access key'
}
}.freeze

CURL_FORMAT = <<-EOF
CURL_FORMAT = "<<-EOF
time_namelookup: %{time_namelookup} s
time_connect: %{time_connect} s
time_appconnect: %{time_appconnect} s
Expand All @@ -50,7 +50,7 @@ class Cache
url_effective: %{url_effective}
----------
time_total: %{time_total} s
EOF
EOF".freeze

KeyPair = Struct.new(:id, :secret)

Expand All @@ -60,9 +60,9 @@ def hostname
end
end

CASHER_URL = 'https://raw.githubusercontent.com/DataDog/casher/%s/bin/casher'
USE_RUBY = '1.9.3'
BIN_PATH = '$DD_CASHER_DIR/bin/casher'
CASHER_URL = 'https://raw.githubusercontent.com/DataDog/casher/%s/bin/casher'.freeze
USE_RUBY = '1.9.3'.freeze
BIN_PATH = '$DD_CASHER_DIR/bin/casher'.freeze

attr_reader :data, :slug, :start, :msgs
attr_accessor :directories
Expand Down
4 changes: 2 additions & 2 deletions tests/checks/mock/test_wmi_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ def test_wmi_connection(self):
wmi_sampler = self.check.wmi_samplers["myhost:some/namespace:Win32_OperatingSystem"]

# Connection was established with the right parameters
self.assertWMIConnWith(wmi_sampler, "myhost")
self.assertWMIConnWith(wmi_sampler, "some/namespace")
self.assertWMIConn(wmi_sampler, "myhost")
self.assertWMIConn(wmi_sampler, "some/namespace")

def test_wmi_sampler_initialization(self):
"""
Expand Down
Loading