Skip to content

Commit

Permalink
Add distribution context manager and decorator
Browse files Browse the repository at this point in the history
A decorator or context manager that will measure the distribution of a
function's/context's run time using custom metric DISTRIBUTION.

E.g. of usage follows:

    @statsd.distributed('user.query.time', sample_rate=0.5)
    def get_user(user_id):
        # Do what you need to ...
        pass

    # Is equivalent to ...
    with statsd.distributed('user.query.time', sample_rate=0.5):
        # Do what you need to ...
        pass

    # Is equivalent to ...
    start = time.time()
    try:
        get_user(user_id)
    finally:
        statsd.distribution('user.query.time', time.time() - start)

Closes #563
  • Loading branch information
dnlserrano committed Jun 27, 2020
1 parent 86f397c commit 3e0d3c7
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 2 deletions.
33 changes: 32 additions & 1 deletion datadog/dogstatsd/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
from threading import Lock

# datadog
from datadog.dogstatsd.context import TimedContextManagerDecorator
from datadog.dogstatsd.context import (
TimedContextManagerDecorator,
DistributedContextManagerDecorator,
)
from datadog.dogstatsd.route import get_default_route
from datadog.util.compat import text
from datadog.util.format import normalize_tags
Expand Down Expand Up @@ -349,6 +352,34 @@ def get_user(user_id):
"""
return TimedContextManagerDecorator(self, metric, tags, sample_rate, use_ms)

def distributed(self, metric=None, tags=None, sample_rate=None, use_ms=None):
"""
A decorator or context manager that will measure the distribution of a
function's/context's run time using custom metric distribution.
Optionally specify a list of tags or a sample rate. If the metric is not
defined as a decorator, the module name and function name will be used.
The metric is required as a context manager.
::
@statsd.distributed('user.query.time', sample_rate=0.5)
def get_user(user_id):
# Do what you need to ...
pass
# Is equivalent to ...
with statsd.distributed('user.query.time', sample_rate=0.5):
# Do what you need to ...
pass
# Is equivalent to ...
start = time.time()
try:
get_user(user_id)
finally:
statsd.distribution('user.query.time', time.time() - start)
"""
return DistributedContextManagerDecorator(self, metric, tags, sample_rate, use_ms)

def set(self, metric, value, tags=None, sample_rate=None):
"""
Sample a set value.
Expand Down
18 changes: 17 additions & 1 deletion datadog/dogstatsd/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# stdlib
from functools import wraps
from time import time
import sys

# datadog
from datadog.dogstatsd.context_async import _get_wrapped_co
Expand All @@ -17,6 +18,7 @@ class TimedContextManagerDecorator(object):
"""
def __init__(self, statsd, metric=None, tags=None, sample_rate=1, use_ms=None):
self.statsd = statsd
self.timing_func = statsd.timing
self.metric = metric
self.tags = tags
self.sample_rate = sample_rate
Expand Down Expand Up @@ -60,11 +62,25 @@ def _send(self, start):
elapsed = time() - start
use_ms = self.use_ms if self.use_ms is not None else self.statsd.use_ms
elapsed = int(round(1000 * elapsed)) if use_ms else elapsed
self.statsd.timing(self.metric, elapsed, self.tags, self.sample_rate)
self.timing_func(self.metric, elapsed, self.tags, self.sample_rate)
self.elapsed = elapsed

def start(self):
self.__enter__()

def stop(self):
self.__exit__(None, None, None)


class DistributedContextManagerDecorator(TimedContextManagerDecorator):
"""
A context manager and a decorator which will report the elapsed time in
the context OR in a function call using the custom distribution metric.
"""
def __init__(self, statsd, metric=None, tags=None, sample_rate=1, use_ms=None):
if sys.version_info >= (3, 0):
super().__init__(statsd, metric, tags, sample_rate, use_ms)
else:
super(DistributedContextManagerDecorator, self).__init__(statsd, metric, tags, sample_rate, use_ms)

self.timing_func = statsd.distribution
44 changes: 44 additions & 0 deletions tests/unit/dogstatsd/test_statsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,50 @@ def test_socket_overflown(self):
c = [call("Socket send would block: Socket error, dropping the packet")]
mock_log.warning.assert_has_calls(c * 2)

def test_distributed(self):
"""
Measure the distribution of a function's run time using distribution custom metric.
"""
# In seconds
@self.statsd.distributed('distributed.test')
def func(a, b, c=1, d=1):
"""docstring"""
time.sleep(0.5)
return (a, b, c, d)

assert_equal('func', func.__name__)
assert_equal('docstring', func.__doc__)

result = func(1, 2, d=3)
# Assert it handles args and kwargs correctly.
assert_equal(result, (1, 2, 1, 3))

packet = self.recv(2).split("\n")[0] # ignore telemetry packet
name_value, type_ = packet.split('|')
name, value = name_value.split(':')

assert_equal('d', type_)
assert_equal('distributed.test', name)
self.assert_almost_equal(0.5, float(value), 0.1)

# Repeat, force timer value in milliseconds
@self.statsd.distributed('distributed.test', use_ms=True)
def func(a, b, c=1, d=1):
"""docstring"""
time.sleep(0.5)
return (a, b, c, d)

func(1, 2, d=3)

packet = self.recv(2).split("\n")[0] # ignore telemetry packet
name_value, type_ = packet.split('|')
name, value = name_value.split(':')

assert_equal('d', type_)
assert_equal('distributed.test', name)
self.assert_almost_equal(500, float(value), 100)


def test_timed(self):
"""
Measure the distribution of a function's run time.
Expand Down

0 comments on commit 3e0d3c7

Please sign in to comment.