Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow checks to store tagged metrics #79

Merged
merged 4 commits into from
Jun 19, 2012
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 34 additions & 28 deletions checks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,17 @@ def filter(self, record):
class Check(object):
"""
(Abstract) class for all checks with the ability to:
* store 1 (and only 1) sample for gauges per metric/tag combination
* compute rates for counters
* only log error messages once (instead of each time they occur)

"""
def __init__(self, logger):
# where to store samples, indexed by metric_name
# metric_name: [(ts, value), (ts, value)]
# metric_name: {("sorted", "tags"): [(ts, value), (ts, value)],
# tuple(tags) are stored as a key since lists are not hashable
# None: [(ts, value), (ts, value)]}
# untagged values are indexed by None
self._sample_store = {}
self._counters = {} # metric_name: bool
self.logger = logger
Expand Down Expand Up @@ -92,7 +97,7 @@ def counter(self, metric):
ACHTUNG: Resets previous values associated with this metric.
"""
self._counters[metric] = True
self._sample_store[metric] = []
self._sample_store[metric] = {}

def is_counter(self, metric):
"Is this metric a counter?"
Expand All @@ -103,7 +108,7 @@ def gauge(self, metric):
Treats the metric as a gauge, i.e. keep the data as is
ACHTUNG: Resets previous values associated with this metric.
"""
self._sample_store[metric] = []
self._sample_store[metric] = {}

def is_metric(self, metric):
return metric in self._sample_store
Expand All @@ -116,8 +121,9 @@ def get_metric_names(self):
"Get all metric names"
return self._sample_store.keys()

def save_sample(self, metric, value, timestamp=None):
"""Save a simple sample, evict old values if needed"""
def save_sample(self, metric, value, timestamp=None, tags=None):
"""Save a simple sample, evict old values if needed
"""
if timestamp is None:
timestamp = time.time()
if metric not in self._sample_store:
Expand All @@ -127,21 +133,27 @@ def save_sample(self, metric, value, timestamp=None):
except ValueError, ve:
raise NaN(ve)

# sort tags
if tags is not None:
tags.sort()
tags = tuple(tags)

# Data eviction rules
if self.is_gauge(metric):
self._sample_store[metric] = [(timestamp, value)]
self._sample_store[metric][tags] = ((timestamp, value), )
elif self.is_counter(metric):
if len(self._sample_store[metric]) == 0:
self._sample_store[metric] = [(timestamp, value)]
if self._sample_store[metric].get(tags) is None:
self._sample_store[metric][tags] = [(timestamp, value)]
else:
self._sample_store[metric] = self._sample_store[metric][-1:] + [(timestamp, value)]
self._sample_store[metric][tags] = self._sample_store[metric][tags][-1:] + [(timestamp, value)]
else:
raise CheckException("%s must be either gauge or counter, skipping sample at %s" % (metric, time.ctime(timestamp)))

if self.is_gauge(metric):
assert len(self._sample_store[metric]) in (0, 1), self._sample_store[metric]
# store[metric][tags] = (ts, val) - only 1 value allowd
assert len(self._sample_store[metric][tags]) == 1, self._sample_store[metric]
elif self.is_counter(metric):
assert len(self._sample_store[metric]) in (0, 1, 2), self._sample_store[metric]
assert len(self._sample_store[metric][tags]) in (1, 2), self._sample_store[metric]

@classmethod
def _rate(cls, sample1, sample2):
Expand All @@ -163,28 +175,28 @@ def _rate(cls, sample1, sample2):
except Exception, e:
raise NaN(e)

def get_sample_with_timestamp(self, metric):
def get_sample_with_timestamp(self, metric, tags=None):
"Get (timestamp-epoch-style, value)"
# Never seen this metric
if metric not in self._sample_store:
raise UnknownValue()

# Not enough value to compute rate
elif self.is_counter(metric) and len(self._sample_store[metric]) < 2:
elif self.is_counter(metric) and len(self._sample_store[metric][tags]) < 2:
raise UnknownValue()

elif self.is_counter(metric) and len(self._sample_store[metric]) >= 2:
return self._rate(self._sample_store[metric][-2], self._sample_store[metric][-1])
elif self.is_counter(metric) and len(self._sample_store[metric][tags]) >= 2:
return self._rate(self._sample_store[metric][tags][-2], self._sample_store[metric][tags][-1])

elif self.is_gauge(metric) and len(self._sample_store[metric]) >= 1:
return self._sample_store[metric][-1]
elif self.is_gauge(metric) and len(self._sample_store[metric][tags]) >= 1:
return self._sample_store[metric][tags][-1]

else:
raise UnknownValue()

def get_sample(self, metric):
def get_sample(self, metric, tags=None):
"Return the last value for that metric"
x = self.get_sample_with_timestamp(metric)
x = self.get_sample_with_timestamp(metric, tags)
assert type(x) == types.TupleType and len(x) == 2, x
return x[1]

Expand All @@ -209,21 +221,15 @@ def get_samples(self):
pass
return values

def get_metadata(self):
"""Return a dictionary of key-value pairs with metadata
How these metadata are interpreted and processed is not defined here
"""
return {}

def get_metrics(self):
"""This is the new format to send metrics backs
"""
metrics = []
for m in self._sample_store:
try:
ts, val = self.get_sample_with_timestamp(m)
# FIXME alq - no metadata yet
metrics.append((m, int(ts), val, {}))
for t in self._sample_store[m]:
ts, val = self.get_sample_with_timestamp(m, t)
metrics.append((m, int(ts), val, {"tags": list(t)}))
except:
pass
return metrics
Expand Down
19 changes: 19 additions & 0 deletions tests/test_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,25 @@ def test_counter(self):
self.c.save_sample("test-counter", -2.0, 3.0)
self.assertRaises(UnknownValue, self.c.get_sample_with_timestamp, "test-counter")

def test_tags(self):
# Test metric tagging
now = int(time.time())
# Tag metrics
self.c.save_sample("test-counter", 1.0, 1.0, tags = ["tag1", "tag2"])
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New syntax to tag metrics.

self.c.save_sample("test-counter", 2.0, 2.0, tags = ["tag1", "tag2"])
# Only 1 point recording for this combination of tags, won't be sent
self.c.save_sample("test-counter", 3.0, 3.0, tags = ["tag1", "tag3"])
self.c.save_sample("test-metric", 3.0, now, tags = ["tag3", "tag4"])
# This is a different combination of tags
self.c.save_sample("test-metric", 3.0, now, tags = ["tag5", "tag3"])
results = self.c.get_metrics()
results.sort()
self.assertEquals(results,
[("test-counter", 2.0, 1.0, {"tags": ["tag1", "tag2"]}),
("test-metric", now, 3.0, {"tags": ["tag3", "tag4"]}),
("test-metric", now, 3.0, {"tags": ["tag3", "tag5"]}),
])

def test_samples(self):
self.assertEquals(self.c.get_samples(), {})
self.c.save_sample("test-metric", 1.0, 0.0) # value, ts
Expand Down