diff --git a/ddtrace/api.py b/ddtrace/api.py index 1db81454b5f..293e4a3d2e0 100644 --- a/ddtrace/api.py +++ b/ddtrace/api.py @@ -12,28 +12,59 @@ TRACE_COUNT_HEADER = 'X-Datadog-Trace-Count' +_VERSIONS = {'v0.4': {'traces': '/v0.4/traces', + 'services': '/v0.4/services', + 'compatibility_mode': False, + 'fallback': 'v0.3'}, + 'v0.3': {'traces': '/v0.3/traces', + 'services': '/v0.3/services', + 'compatibility_mode': False, + 'fallback': 'v0.2'}, + 'v0.2': {'traces': '/v0.2/traces', + 'services': '/v0.2/services', + 'compatibility_mode': True, + 'fallback': None}} + class API(object): """ Send data to the trace agent using the HTTP protocol and JSON format """ - def __init__(self, hostname, port, headers=None, encoder=None): + def __init__(self, hostname, port, headers=None, encoder=None, priority_sampling=False): self.hostname = hostname self.port = port - self._traces = '/v0.3/traces' - self._services = '/v0.3/services' - self._compatibility_mode = False - self._encoder = encoder or get_encoder() - # overwrite the Content-type with the one chosen in the Encoder self._headers = headers or {} + self._version = None + + if priority_sampling: + self._set_version('v0.4', encoder=encoder) + else: + self._set_version('v0.3', encoder=encoder) + self._headers.update({ - 'Content-Type': self._encoder.content_type, 'Datadog-Meta-Lang': 'python', 'Datadog-Meta-Lang-Version': PYTHON_VERSION, 'Datadog-Meta-Lang-Interpreter': PYTHON_INTERPRETER, 'Datadog-Meta-Tracer-Version': ddtrace.__version__, }) + def _set_version(self, version, encoder=None): + if version not in _VERSIONS: + version = 'v0.2' + if version == self._version: + return + self._version = version + self._traces = _VERSIONS[version]['traces'] + self._services = _VERSIONS[version]['services'] + self._fallback = _VERSIONS[version]['fallback'] + self._compatibility_mode = _VERSIONS[version]['compatibility_mode'] + if self._compatibility_mode: + self._encoder = JSONEncoder() + else: + self._encoder = encoder or get_encoder() + # overwrite the Content-type with the one chosen in the Encoder + self._headers.update({'Content-Type': self._encoder.content_type}) + def _downgrade(self): """ Downgrades the used encoder and API level. This method must fallback to a safe @@ -41,11 +72,7 @@ def _downgrade(self): ensures that the compatibility mode is activated so that the downgrade will be executed only once. 
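A minimal sketch (not part of the patch) of how the 'fallback' entries above chain together: repeated downgrades walk v0.4 → v0.3 → v0.2 and then stop, since v0.2 has no fallback. The downgrade_path helper is hypothetical:

_VERSIONS = {'v0.4': {'fallback': 'v0.3'},
             'v0.3': {'fallback': 'v0.2'},
             'v0.2': {'fallback': None}}

def downgrade_path(version):
    # yield every endpoint version tried after successive 404/415 responses
    while version is not None:
        yield version
        version = _VERSIONS[version]['fallback']

assert list(downgrade_path('v0.4')) == ['v0.4', 'v0.3', 'v0.2']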
""" - self._compatibility_mode = True - self._traces = '/v0.2/traces' - self._services = '/v0.2/services' - self._encoder = JSONEncoder() - self._headers.update({'Content-Type': self._encoder.content_type}) + self._set_version(self._fallback) def send_traces(self, traces): if not traces: @@ -55,8 +82,8 @@ def send_traces(self, traces): response = self._put(self._traces, data, len(traces)) # the API endpoint is not available so we should downgrade the connection and re-try the call - if response.status in [404, 415] and self._compatibility_mode is False: - log.debug('calling the endpoint "%s" but received %s; downgrading the API', self._traces, response.status) + if response.status in [404, 415] and self._fallback: + log.debug('calling endpoint "%s" but received %s; downgrading API', self._traces, response.status) self._downgrade() return self.send_traces(traces) @@ -73,8 +100,8 @@ def send_services(self, services): response = self._put(self._services, data) # the API endpoint is not available so we should downgrade the connection and re-try the call - if response.status in [404, 415] and self._compatibility_mode is False: - log.debug('calling the endpoint "%s" but received 404; downgrading the API', self._services) + if response.status in [404, 415] and self._fallback: + log.debug('calling endpoint "%s" but received %s; downgrading API', self._services, response.status) self._downgrade() return self.send_services(services) diff --git a/ddtrace/constants.py b/ddtrace/constants.py index 6f6c3972d6f..ae236278995 100644 --- a/ddtrace/constants.py +++ b/ddtrace/constants.py @@ -1 +1,2 @@ FILTERS_KEY = 'FILTERS' +SAMPLING_PRIORITY_KEY = '_sampling_priority_v1' diff --git a/ddtrace/context.py b/ddtrace/context.py index 16b30995ae7..f6983800186 100644 --- a/ddtrace/context.py +++ b/ddtrace/context.py @@ -20,7 +20,7 @@ class Context(object): This data structure is thread-safe. """ - def __init__(self, trace_id=None, span_id=None, sampled=True): + def __init__(self, trace_id=None, span_id=None, sampled=True, sampling_priority=None): """ Initialize a new thread-safe ``Context``. @@ -28,17 +28,23 @@ def __init__(self, trace_id=None, span_id=None, sampled=True): :param int span_id: span_id of parent span """ self._trace = [] - self._sampled = sampled self._finished_spans = 0 self._current_span = None self._lock = threading.Lock() - self._parent_span_id = span_id + self._parent_trace_id = trace_id + self._parent_span_id = span_id + self._sampled = sampled + self._sampling_priority = sampling_priority + + def get_context_attributes(self): + """ + Return the context propagatable attributes. - def _get_parent_span_ids(self): - """ Returns tuple of base trace_id, span_id for distributed tracing.""" + Useful to propagate context to an external element. + """ with self._lock: - return self._parent_trace_id, self._parent_span_id + return self._parent_trace_id, self._parent_span_id, self._sampling_priority def get_current_span(self): """ @@ -50,13 +56,28 @@ def get_current_span(self): with self._lock: return self._current_span + def _set_current_span(self, span): + """ + Set current span internally. + + Non-safe if not used with a lock. For internal Context usage only. 
+ """ + self._current_span = span + if span: + self._parent_trace_id = span.trace_id + self._parent_span_id = span.span_id + self._sampled = span.sampled + self._sampling_priority = span.get_sampling_priority() + else: + self._parent_span_id = None + def add_span(self, span): """ Add a span to the context trace list, keeping it as the last active span. """ with self._lock: - self._current_span = span - self._sampled = span.sampled + self._set_current_span(span) + self._trace.append(span) span._context = self @@ -67,7 +88,7 @@ def close_span(self, span): """ with self._lock: self._finished_spans += 1 - self._current_span = span._parent + self._set_current_span(span._parent) # notify if the trace is not closed properly; this check is executed only # if the tracer debug_logging is enabled and when the root span is closed @@ -114,9 +135,11 @@ def get(self): sampled = self._sampled # clean the current state self._trace = [] - self._sampled = False self._finished_spans = 0 - self._current_span = None + self._parent_trace_id = None + self._parent_span_id = None + self._sampling_priority = None + self._sampled = True return trace, sampled else: return None, None @@ -145,9 +168,7 @@ def set(self, ctx): def get(self): ctx = getattr(self._locals, 'context', None) if not ctx: - # create a new Context if it's not available; this action - # is done once because the Context has the reset() method - # to reuse the same instance + # create a new Context if it's not available ctx = Context() self._locals.context = ctx diff --git a/ddtrace/contrib/aiohttp/middlewares.py b/ddtrace/contrib/aiohttp/middlewares.py index 080d769297c..32d051ff3b8 100644 --- a/ddtrace/contrib/aiohttp/middlewares.py +++ b/ddtrace/contrib/aiohttp/middlewares.py @@ -3,6 +3,7 @@ from ..asyncio import context_provider from ...ext import AppTypes, http from ...compat import stringify +from ...context import Context CONFIG_KEY = 'datadog_trace' @@ -11,6 +12,7 @@ PARENT_TRACE_HEADER_ID = 'x-datadog-trace-id' PARENT_SPAN_HEADER_ID = 'x-datadog-parent-id' +SAMPLING_PRIORITY_HEADER_ID = 'x-datadog-sampling-priority' @asyncio.coroutine @@ -29,24 +31,30 @@ def attach_context(request): service = app[CONFIG_KEY]['service'] distributed_tracing = app[CONFIG_KEY]['distributed_tracing_enabled'] + context = tracer.get_call_context() + + # Create a new context based on the propagated information + # Do not fill context with distributed sampling if the tracer is disabled + # because the would call the callee to generate references to data which + # has never been sent to agent. 
+ if tracer.enabled and distributed_tracing: + trace_id = int(request.headers.get(PARENT_TRACE_HEADER_ID, 0)) + parent_span_id = int(request.headers.get(PARENT_SPAN_HEADER_ID, 0)) + sampling_priority = request.headers.get(SAMPLING_PRIORITY_HEADER_ID) + # keep sampling priority as None if not propagated, to support older client versions on the parent side + if sampling_priority: + sampling_priority = int(sampling_priority) + + context = Context(trace_id=trace_id, span_id=parent_span_id, sampling_priority=sampling_priority) + # trace the handler - request_span = tracer.trace( + request_span = tracer.start_span( 'aiohttp.request', service=service, span_type=http.TYPE, + child_of=context, ) - if distributed_tracing: - # set parent trace/span IDs if present: - # http://pypi.datadoghq.com/trace/docs/#distributed-tracing - parent_trace_id = request.headers.get(PARENT_TRACE_HEADER_ID) - if parent_trace_id is not None: - request_span.trace_id = int(parent_trace_id) - - parent_span_id = request.headers.get(PARENT_SPAN_HEADER_ID) - if parent_span_id is not None: - request_span.parent_id = int(parent_span_id) - # attach the context and the root span to the request; the Context # may be freely used by the application code request[REQUEST_CONTEXT_KEY] = request_span.context diff --git a/ddtrace/contrib/asyncio/helpers.py b/ddtrace/contrib/asyncio/helpers.py index dde8e8e73f1..5b976923496 100644 --- a/ddtrace/contrib/asyncio/helpers.py +++ b/ddtrace/contrib/asyncio/helpers.py @@ -18,6 +18,9 @@ def set_call_context(task, ctx): """ Updates the ``Context`` for the given Task. Useful when you need to pass the context among different tasks. + + This method is available for backward-compatibility. Use the + ``AsyncioContextProvider`` API to set the current active ``Context``. """ setattr(task, CONTEXT_ATTR, ctx) @@ -74,7 +77,7 @@ def _wrap_executor(fn, args, tracer, ctx): # the AsyncioContextProvider knows that this is a new thread # so it is legit to pass the Context in the thread-local storage; # fn() will be executed outside the asyncio loop as a synchronous code - tracer._context_provider._local.set(ctx) + tracer.context_provider.activate(ctx) return fn(*args) @@ -104,16 +107,11 @@ def _wrapped_create_task(wrapped, instance, args, kwargs): ctx = getattr(current_task, CONTEXT_ATTR, None) span = ctx.get_current_span() if ctx else None - if span: - parent_trace_id, parent_span_id = span.trace_id, span.span_id - elif ctx: - parent_trace_id, parent_span_id = ctx._get_parent_span_ids() - else: - parent_trace_id = parent_span_id = None - - if parent_trace_id and parent_span_id: + if ctx: + parent_trace_id, parent_span_id, sampling_priority = ctx.get_context_attributes() + # current task has a context, so parent a new context to the base context - new_ctx = Context(trace_id=parent_trace_id, span_id=parent_span_id) + new_ctx = Context(trace_id=parent_trace_id, span_id=parent_span_id, sampling_priority=sampling_priority) set_call_context(new_task, new_ctx) return new_task diff --git a/ddtrace/contrib/asyncio/provider.py b/ddtrace/contrib/asyncio/provider.py index 10545d1fbda..d65ff9ad933 100644 --- a/ddtrace/contrib/asyncio/provider.py +++ b/ddtrace/contrib/asyncio/provider.py @@ -14,8 +14,28 @@ class AsyncioContextProvider(DefaultContextProvider): execution. It must be used in asynchronous programming that relies in the built-in ``asyncio`` library. Framework instrumentation that is built on top of the ``asyncio`` library, can use this provider. 
+ + This Context Provider inherits from ``DefaultContextProvider`` because + it uses thread-local storage when the ``Context`` is propagated to + a different thread than the one running the async loop. """ - def __call__(self, loop=None): + def activate(self, context, loop=None): + """Sets the scoped ``Context`` for the current running ``Task``. + """ + try: + loop = loop or asyncio.get_event_loop() + except RuntimeError: + # detect if a loop is available in the current thread; + # this happens when a new thread is created from the one that is running + # the async loop + return self._local.set(context) + + # the current unit of work (if tasks are used) + task = asyncio.Task.current_task(loop=loop) + setattr(task, CONTEXT_ATTR, context) + return context + + def active(self, loop=None): """ Returns the scoped Context for this execution flow. The ``Context`` uses the current task as a carrier so if a single task is used for the entire application, diff --git a/ddtrace/contrib/gevent/greenlet.py b/ddtrace/contrib/gevent/greenlet.py index 39b3ae62c5b..1da91e10e11 100644 --- a/ddtrace/contrib/gevent/greenlet.py +++ b/ddtrace/contrib/gevent/greenlet.py @@ -29,7 +29,11 @@ def __init__(self, *args, **kwargs): # the context is always available made exception of the main greenlet if ctx: # create a new context that inherits the current active span - new_ctx = Context() - new_ctx._sampled = ctx._sampled + # TODO: a better API for Context should expose the whole tuple at once + new_ctx = Context( + trace_id=ctx._parent_trace_id, + span_id=ctx._parent_span_id, + sampled=ctx._sampled, + ) new_ctx._current_span = ctx._current_span setattr(self, CONTEXT_ATTR, new_ctx) diff --git a/ddtrace/contrib/gevent/provider.py b/ddtrace/contrib/gevent/provider.py index a18ae385ade..f9d597e4213 100644 --- a/ddtrace/contrib/gevent/provider.py +++ b/ddtrace/contrib/gevent/provider.py @@ -15,12 +15,19 @@ class GeventContextProvider(BaseContextProvider): in the ``gevent`` library. Framework instrumentation that uses the gevent WSGI server (or gevent in general), can use this provider. """ - def __call__(self): + def activate(self, context): + """Sets the scoped ``Context`` for the current running ``Greenlet``. + """ + current_g = gevent.getcurrent() + if current_g is not None: + setattr(current_g, CONTEXT_ATTR, context) + return context + + def active(self): """ Returns the scoped ``Context`` for this execution flow. The ``Context`` uses the ``Greenlet`` class as a carrier, and everytime a greenlet - is created it receives the "parent" context. The main greenlet - will never have an attached ``Context``. + is created it receives the "parent" context. """ current_g = gevent.getcurrent() ctx = getattr(current_g, CONTEXT_ATTR, None) @@ -29,9 +36,14 @@ def __call__(self): return ctx # the Greenlet doesn't have a Context so it's created and attached - # unless it's the main greenlet; in that case we must be sure - # that no Context is generated + # TODO: the previous implementation avoided adding a Context to the main + # greenlet because it could have side effects when switching back + # and forth between different executions, but that caused issues such + # as: https://github.com/DataDog/dd-trace-py/issues/309 + # and attaching one is required for distributed tracing when providing + # a new arbitrary Context. On the other hand, it's imperative to + # double-check whether there are side effects.
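The activate()/active() pair gives every provider the same two-method surface; a short usage sketch against the default thread-local provider (assuming this patch is applied):

from ddtrace import tracer
from ddtrace.context import Context

ctx = Context(trace_id=100, span_id=101, sampling_priority=1)
tracer.context_provider.activate(ctx)   # make ctx the active context
assert tracer.context_provider.active() is ctx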
+ if current_g: ctx = Context() setattr(current_g, CONTEXT_ATTR, ctx) return ctx diff --git a/ddtrace/contrib/psycopg/connection.py b/ddtrace/contrib/psycopg/connection.py index 0c90c8c8c18..0edfaaf60f9 100644 --- a/ddtrace/contrib/psycopg/connection.py +++ b/ddtrace/contrib/psycopg/connection.py @@ -49,12 +49,11 @@ def execute(self, query, vars=None): if not self._datadog_tracer: return cursor.execute(self, query, vars) - with self._datadog_tracer.trace("postgres.query") as s: + with self._datadog_tracer.trace("postgres.query", service=self._datadog_service) as s: if not s.sampled: return super(TracedCursor, self).execute(query, vars) s.resource = query - s.service = self._datadog_service s.span_type = sql.TYPE s.set_tags(self._datadog_tags) try: diff --git a/ddtrace/contrib/pylons/middleware.py b/ddtrace/contrib/pylons/middleware.py index 55f27734f69..395984998e6 100644 --- a/ddtrace/contrib/pylons/middleware.py +++ b/ddtrace/contrib/pylons/middleware.py @@ -21,8 +21,9 @@ def __init__(self, app, tracer, service="pylons"): ) def __call__(self, environ, start_response): - with self._tracer.trace("pylons.request") as span: - span.service = self._service + with self._tracer.trace("pylons.request", service=self._service) as span: + # Set the service in tracer.trace() as priority sampling requires it to be + # set as early as possible when different services share one single agent. span.span_type = http.TYPE if not span.sampled: diff --git a/ddtrace/contrib/tornado/__init__.py b/ddtrace/contrib/tornado/__init__.py index 90a97d816a8..840d0905556 100644 --- a/ddtrace/contrib/tornado/__init__.py +++ b/ddtrace/contrib/tornado/__init__.py @@ -78,12 +78,12 @@ def notify(self): with require_modules(required_modules) as missing_modules: if not missing_modules: - from .patch import patch, unpatch - from .stack_context import run_with_trace_context, TracerStackContext - # alias for API compatibility + from .stack_context import run_with_trace_context, TracerStackContext context_provider = TracerStackContext.current_context + from .patch import patch, unpatch + __all__ = [ 'patch', 'unpatch', diff --git a/ddtrace/contrib/tornado/application.py b/ddtrace/contrib/tornado/application.py index 3f926f8a497..cf4b9a5dcb5 100644 --- a/ddtrace/contrib/tornado/application.py +++ b/ddtrace/contrib/tornado/application.py @@ -2,9 +2,8 @@ from tornado import template -from . import decorators +from . import decorators, context_provider from .constants import CONFIG_KEY -from .stack_context import TracerStackContext from ...ext import AppTypes @@ -37,7 +36,7 @@ def tracer_config(__init__, app, args, kwargs): # global tracer while here we can have a different instance (even if # this is not usual). tracer.configure( - context_provider=TracerStackContext.current_context, + context_provider=context_provider, wrap_executor=decorators.wrap_executor, enabled=settings.get('enabled', None), hostname=settings.get('agent_hostname', None), diff --git a/ddtrace/contrib/tornado/patch.py b/ddtrace/contrib/tornado/patch.py index aa9d7aacdeb..c607b54d267 100644 --- a/ddtrace/contrib/tornado/patch.py +++ b/ddtrace/contrib/tornado/patch.py @@ -3,8 +3,7 @@ from wrapt import wrap_function_wrapper as _w -from . import handlers, application, decorators, template -from .stack_context import TracerStackContext +from . 
import handlers, application, decorators, template, context_provider from ...util import unwrap as _u @@ -34,7 +33,7 @@ def patch(): # configure the global tracer ddtrace.tracer.configure( - context_provider=TracerStackContext.current_context, + context_provider=context_provider, wrap_executor=decorators.wrap_executor, ) diff --git a/ddtrace/provider.py b/ddtrace/provider.py index e679f0563ef..4ea739e67f2 100644 --- a/ddtrace/provider.py +++ b/ddtrace/provider.py @@ -7,16 +7,20 @@ class BaseContextProvider(object): for a callable class, capable to retrieve the current active ``Context`` instance. Context providers must inherit this class and implement: - * the ``__call__`` method, so that the class is callable + * the ``active`` method, that returns the current active ``Context`` + * the ``activate`` method, that sets the current active ``Context`` """ + def activate(self, context): + raise NotImplementedError + + def active(self): + raise NotImplementedError def __call__(self, *args, **kwargs): + """Method available for backward-compatibility. It proxies the call to + ``self.active()`` and must not do anything more. """ - Makes the class callable so that the ``Tracer`` can invoke the - ``ContextProvider`` to retrieve the current context. - This class must be implemented. - """ - raise NotImplementedError + return self.active() class DefaultContextProvider(BaseContextProvider): @@ -28,9 +32,15 @@ class DefaultContextProvider(BaseContextProvider): def __init__(self): self._local = ThreadLocalContext() - def __call__(self, *args, **kwargs): + def activate(self, context): + """Makes the given ``context`` active, so that the provider calls + the thread-local storage implementation. """ - Returns the global context for this tracer. Returned ``Context`` must be thread-safe - or thread-local. + return self._local.set(context) + + def active(self): + """Returns the current active ``Context`` for this tracer. Returned + ``Context`` must be thread-safe or thread-local for this specific + implementation. """ return self._local.get() diff --git a/ddtrace/sampler.py b/ddtrace/sampler.py index 12ab557981c..02ea9f63eff 100644 --- a/ddtrace/sampler.py +++ b/ddtrace/sampler.py @@ -3,8 +3,11 @@ Any `sampled = False` trace won't be written, and can be ignored by the instrumentation. """ import logging -import array -import threading + +from json import loads +from threading import Lock + +from .compat import iteritems log = logging.getLogger(__name__) @@ -18,8 +21,7 @@ class AllSampler(object): """Sampler sampling all the traces""" def sample(self, span): - span.sampled = True - + return True class RateSampler(object): """Sampler based on a rate @@ -28,7 +30,7 @@ class RateSampler(object): It samples randomly, its main purpose is to reduce the instrumentation footprint. """ - def __init__(self, sample_rate): + def __init__(self, sample_rate=1): if sample_rate <= 0: log.error("sample_rate is negative or null, disable the Sampler") sample_rate = 1 @@ -44,59 +46,67 @@ def set_sample_rate(self, sample_rate): self.sampling_id_threshold = sample_rate * MAX_TRACE_ID def sample(self, span): - span.sampled = ((span.trace_id * KNUTH_FACTOR) % MAX_TRACE_ID) <= self.sampling_id_threshold - span.set_metric(SAMPLE_RATE_METRIC_KEY, self.sample_rate) + sampled = ((span.trace_id * KNUTH_FACTOR) % MAX_TRACE_ID) <= self.sampling_id_threshold -class ThroughputSampler(object): - """ Sampler applying a strict limit over the trace volume. + return sampled - Stop tracing once reached more than `tps` traces per second. 
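The rate check in RateSampler.sample() above is deterministic per trace ID; a standalone sketch, with constant values assumed from the module's existing definitions (they are not shown in this diff):

MAX_TRACE_ID = 2 ** 64              # assumed module constant
KNUTH_FACTOR = 1111111111111111111  # assumed module constant

def keeps(trace_id, sample_rate):
    # the same trace_id always yields the same decision for a given rate
    return ((trace_id * KNUTH_FACTOR) % MAX_TRACE_ID) <= sample_rate * MAX_TRACE_ID

assert keeps(12345, 1.0)  # a rate of 1 keeps every trace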
- Computation is based on a circular buffer over the last - `BUFFER_DURATION` with a `BUFFER_SIZE` size. +class RateByServiceSampler(object): + """Sampler based on a rate, by service - DEPRECATED: Outdated implementation. + Keep (100 * `sample_rate`)% of the traces. + The sample rate is kept independently for each service/env tuple. """ - # Reasonable values - BUCKETS_PER_S = 10 - BUFFER_DURATION = 2 - BUFFER_SIZE = BUCKETS_PER_S * BUFFER_DURATION + def __init__(self, sample_rate=1): + self._lock = Lock() + self._by_service_samplers = {} + self._default_key = self._key(None, None) + self._by_service_samplers[self._default_key] = RateSampler(sample_rate) - def __init__(self, tps): - self.buffer_limit = tps * self.BUFFER_DURATION + def _key(self, service="", env=""): + service = service or "" + env = env or "" + return "service:" + service + ",env:" + env - # Circular buffer counting sampled traces over the last `BUFFER_DURATION` - self.counter = 0 - self.counter_buffer = array.array('L', [0] * self.BUFFER_SIZE) - self._buffer_lock = threading.Lock() - # Last time we sampled a trace, multiplied by `BUCKETS_PER_S` - self.last_track_time = 0 + def _set_sample_rate_by_key(self, sample_rate, key): + with self._lock: + if key in self._by_service_samplers: + self._by_service_samplers[key].set_sample_rate(sample_rate) + else: + self._by_service_samplers[key] = RateSampler(sample_rate) - log.info("initialized ThroughputSampler, sample up to %s traces/s", tps) + def set_sample_rate(self, sample_rate, service="", env=""): + self._set_sample_rate_by_key(sample_rate, self._key(service, env)) def sample(self, span): - now = int(span.start * self.BUCKETS_PER_S) - - with self._buffer_lock: - last_track_time = self.last_track_time - if now > last_track_time: - self.last_track_time = now - self.expire_buckets(last_track_time, now) - - span.sampled = self.counter < self.buffer_limit - - if span.sampled: - self.counter += 1 - self.counter_buffer[self.key_from_time(now)] += 1 - - return span - - def key_from_time(self, t): - return t % self.BUFFER_SIZE - - def expire_buckets(self, start, end): - period = min(self.BUFFER_SIZE, (end - start)) - for i in range(period): - key = self.key_from_time(start + i + 1) - self.counter -= self.counter_buffer[key] - self.counter_buffer[key] = 0 + tags = span.tracer().tags + env = tags['env'] if 'env' in tags else None + key = self._key(span.service, env) + with self._lock: + if key in self._by_service_samplers: + return self._by_service_samplers[key].sample(span) + return self._by_service_samplers[self._default_key].sample(span) + + def set_sample_rates_from_json(self, body): + log.debug("setting sample rates from JSON '%s'" % repr(body)) + try: + if not isinstance(body, str): + body = body.decode('utf-8') + if body.startswith('OK'): + # This typically happens when using a priority-sampling enabled + # library with an outdated agent. It still works, but priority sampling + # will probably send too many traces, so the next step is to upgrade agent. 
+ log.warning("'OK' is not a valid JSON, please make sure trace-agent is up to date") + return + content = loads(body) + except ValueError as err: + log.error("unable to load JSON '%s': %s" % (body, err)) + return + + rate_by_service = content['rate_by_service'] + for key, sample_rate in iteritems(rate_by_service): + self._set_sample_rate_by_key(sample_rate, key) + with self._lock: + for key in list(self._by_service_samplers): + if key not in rate_by_service and key != self._default_key: + del self._by_service_samplers[key] diff --git a/ddtrace/span.py b/ddtrace/span.py index 0139944ea93..5a2b3f67b69 100644 --- a/ddtrace/span.py +++ b/ddtrace/span.py @@ -7,6 +7,7 @@ from .compat import StringIO, stringify, iteritems, numeric_types from .ext import errors +from .constants import SAMPLING_PRIORITY_KEY log = logging.getLogger(__name__) @@ -30,6 +31,7 @@ class Span(object): 'duration', # Sampler attributes 'sampled', + 'priority', # Internal attributes '_tracer', '_context', @@ -90,6 +92,7 @@ def __init__( # sampling self.sampled = True + self.priority = None self._tracer = tracer self._context = context @@ -181,6 +184,31 @@ def set_metrics(self, metrics): def get_metric(self, key): return self.metrics.get(key) + def set_sampling_priority(self, sampling_priority): + """ + Set the sampling priority. + + 0 means that the trace can be dropped, any higher value indicates the + importance of the trace to the backend sampler. + Default is None, the priority mechanism is disabled. + """ + if sampling_priority is None: + self.priority = None + else: + try: + self.priority = int(sampling_priority) + except ValueError: + # if the provided sampling_priority is invalid, ignore it. + pass + + def get_sampling_priority(self): + """ + Return the sampling priority. + + Return an positive integer. Can also be None when not defined. + """ + return self.priority + def to_dict(self): d = { 'trace_id' : self.trace_id, @@ -214,6 +242,12 @@ def to_dict(self): if self.span_type: d['type'] = self.span_type + if self.priority is not None: + if d.get('metrics'): + d['metrics'][SAMPLING_PRIORITY_KEY] = self.priority + else: + d['metrics'] = {SAMPLING_PRIORITY_KEY : self.priority} + return d def set_traceback(self, limit=20): @@ -260,6 +294,7 @@ def pprint(self): ("start", self.start), ("end", "" if not self.duration else self.start + self.duration), ("duration", "%fs" % (self.duration or 0)), + ("priority", self.priority), ("error", self.error), ("tags", "") ] diff --git a/ddtrace/tracer.py b/ddtrace/tracer.py index b2878fcfc3a..6192a4d4c10 100644 --- a/ddtrace/tracer.py +++ b/ddtrace/tracer.py @@ -1,15 +1,15 @@ import functools import logging +from os import getpid from .ext import system from .provider import DefaultContextProvider from .context import Context -from .sampler import AllSampler +from .sampler import AllSampler, RateSampler, RateByServiceSampler, SAMPLE_RATE_METRIC_KEY from .writer import AgentWriter from .span import Span from .constants import FILTERS_KEY from . import compat -from os import getpid log = logging.getLogger(__name__) @@ -34,6 +34,9 @@ def __init__(self): Create a new ``Tracer`` instance. A global tracer is already initialized for common usage, so there is no need to initialize your own ``Tracer``. 
""" + self.sampler = None + self.priority_sampler = None + # Apply the default configuration self.configure( enabled=True, @@ -70,8 +73,15 @@ async def web_handler(request): """ return self._context_provider(*args, **kwargs) - def configure(self, enabled=None, hostname=None, port=None, sampler=None, - context_provider=None, wrap_executor=None, settings=None): + @property + def context_provider(self): + """Returns the current Tracer Context Provider""" + return self._context_provider + + def configure(self, enabled=None, hostname=None, port=None, + sampler=None, priority_sampler=None, + context_provider=None, wrap_executor=None, priority_sampling=None, + settings=None): """ Configure an existing Tracer the easy way. Allow to configure or reconfigure a Tracer instance. @@ -80,13 +90,16 @@ def configure(self, enabled=None, hostname=None, port=None, sampler=None, Otherwise they'll be dropped. :param str hostname: Hostname running the Trace Agent :param int port: Port of the Trace Agent - :param object sampler: A custom Sampler instance + :param object sampler: A custom Sampler instance, locally deciding to totally drop the trace or not. + :param object priority_sampler: A custom Sampler instance, taking the priority sampling decision. :param object context_provider: The ``ContextProvider`` that will be used to retrieve automatically the current call context. This is an advanced option that usually doesn't need to be changed from the default value :param object wrap_executor: callable that is used when a function is decorated with ``Tracer.wrap()``. This is an advanced option that usually doesn't need to be changed from the default value + :param priority_sampling: enable priority sampling, this is required for + complete distributed tracing support. """ if enabled is not None: self.enabled = enabled @@ -95,16 +108,24 @@ def configure(self, enabled=None, hostname=None, port=None, sampler=None, if settings is not None: filters = settings.get(FILTERS_KEY) - if hostname is not None or port is not None or filters is not None: + if sampler is not None: + self.sampler = sampler + + if priority_sampling and priority_sampler is None: + self.priority_sampler = RateByServiceSampler() + + if priority_sampler is not None: + self.priority_sampler = priority_sampler + + if hostname is not None or port is not None or filters is not None or \ + priority_sampler is not None or priority_sampling is not None: self.writer = AgentWriter( hostname or self.DEFAULT_HOSTNAME, port or self.DEFAULT_PORT, - filters=filters + filters=filters, + priority_sampler=self.priority_sampler, ) - if sampler is not None: - self.sampler = sampler - if context_provider is not None: self._context_provider = context_provider @@ -137,8 +158,8 @@ def start_span(self, name, child_of=None, service=None, resource=None, span_type context = tracer.get_call_context() span = tracer.start_span("web.worker", child_of=context) """ - # retrieve if the span is a child_of a Span or a Context if child_of is not None: + # retrieve if the span is a child_of a Span or a of Context child_of_context = isinstance(child_of, Context) context = child_of if child_of_context else child_of.context parent = child_of.get_current_span() if child_of_context else child_of @@ -147,20 +168,37 @@ def start_span(self, name, child_of=None, service=None, resource=None, span_type parent = None if parent: - # this is a child span + trace_id = parent.trace_id + parent_span_id = parent.span_id + sampling_priority = parent.get_sampling_priority() + else: + trace_id, 
parent_span_id, sampling_priority = context.get_context_attributes() + + if trace_id: + # child_of a non-empty context: either a local child span or one from a remote context + + # when not provided, inherit from parent's service + if parent: + service = service or parent.service + span = Span( self, name, - service=(service or parent.service), + trace_id=trace_id, + parent_id=parent_span_id, + service=service, resource=resource, span_type=span_type, - trace_id=parent.trace_id, - parent_id=parent.span_id, ) - span._parent = parent - span.sampled = parent.sampled + span.set_sampling_priority(sampling_priority) + + # extra attributes when there is a local parent + if parent: + span.sampled = parent.sampled + span._parent = parent + else: - # this is a root span + # this is the root span of a new trace span = Span( self, name, @@ -169,24 +207,34 @@ def start_span(self, name, child_of=None, service=None, resource=None, span_type span_type=span_type, ) - span.set_tag(system.PID, getpid()) - - # http://pypi.datadoghq.com/trace/docs/#distributed-tracing - parent_trace_id, parent_span_id = context._get_parent_span_ids() - if parent_trace_id: - span.trace_id = parent_trace_id - - if parent_span_id: - span.parent_id = parent_span_id - - self.sampler.sample(span) + span.sampled = self.sampler.sample(span) + if span.sampled: + # When sampling in the client, keep the sample rate so that we can + # scale up statistics in the next steps of the pipeline. + if isinstance(self.sampler, RateSampler): + span.set_metric(SAMPLE_RATE_METRIC_KEY, self.sampler.sample_rate) + + if self.priority_sampler: + if self.priority_sampler.sample(span): + span.set_sampling_priority(1) + else: + span.set_sampling_priority(0) + else: + if self.priority_sampler: + # If dropped by the local sampler, distributed instrumentation can drop it too. + span.set_sampling_priority(0) # add common tags if self.tags: span.set_tags(self.tags) + if not span._parent: + span.set_tag(system.PID, getpid()) + + # TODO: add protection if the service is missing? # add it to the current context context.add_span(span) + return span def trace(self, name, service=None, resource=None, span_type=None): @@ -232,7 +280,7 @@ def trace(self, name, service=None, resource=None, span_type=None): child_of=context, service=service, resource=resource, - span_type=span_type + span_type=span_type, ) def current_span(self): diff --git a/ddtrace/writer.py b/ddtrace/writer.py index 4cf6384ac18..f7e2676a80f 100644 --- a/ddtrace/writer.py +++ b/ddtrace/writer.py @@ -22,13 +22,15 @@ class AgentWriter(object): - def __init__(self, hostname='localhost', port=8126, filters=None): + def __init__(self, hostname='localhost', port=8126, filters=None, priority_sampler=None): self._pid = None self._traces = None self._services = None self._worker = None self._filters = filters - self.api = api.API(hostname, port) + self._priority_sampler = priority_sampler + priority_sampling = priority_sampler is not None + self.api = api.API(hostname, port, priority_sampling=priority_sampling) def write(self, spans=None, services=None): # if the worker needs to be reset, do it.
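Taken together, the writer changes close a feedback loop: traces go out through the API, and the agent's JSON reply tunes the RateByServiceSampler. A sketch of the flow, where flush is a hypothetical helper mirroring AsyncWorker._target below:

def flush(api, traces, priority_sampler=None):
    response = api.send_traces(traces)
    # the agent answers trace submissions with per-service rates as JSON
    if priority_sampler is not None and hasattr(response, 'read'):
        priority_sampler.set_sample_rates_from_json(response.read())
    return response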
@@ -58,18 +60,21 @@ def _reset_worker(self): self._traces, self._services, filters=self._filters, + priority_sampler=self._priority_sampler, ) class AsyncWorker(object): - def __init__(self, api, trace_queue, service_queue, shutdown_timeout=DEFAULT_TIMEOUT, filters=None): + def __init__(self, api, trace_queue, service_queue, shutdown_timeout=DEFAULT_TIMEOUT, + filters=None, priority_sampler=None): self._trace_queue = trace_queue self._service_queue = service_queue self._lock = threading.Lock() self._thread = None self._shutdown_timeout = shutdown_timeout self._filters = filters + self._priority_sampler = priority_sampler self._last_error_ts = 0 self.api = api self.start() @@ -151,6 +156,11 @@ def _target(self): # no traces and the queue is closed. our work is done return + if hasattr(result_traces, 'read'): + result_traces_body = result_traces.read() + if hasattr(self._priority_sampler, 'set_sample_rates_from_json'): + self._priority_sampler.set_sample_rates_from_json(result_traces_body) + self._log_error_status(result_traces, "traces") result_traces = None self._log_error_status(result_services, "services") diff --git a/tests/contrib/aiohttp/test_middleware.py b/tests/contrib/aiohttp/test_middleware.py index 6663c8a526e..acab7834787 100644 --- a/tests/contrib/aiohttp/test_middleware.py +++ b/tests/contrib/aiohttp/test_middleware.py @@ -4,6 +4,7 @@ from aiohttp.test_utils import unittest_run_loop from ddtrace.contrib.aiohttp.middlewares import trace_app, trace_middleware +from ddtrace.sampler import RateSampler from .utils import TraceTestCase from .app.web import setup_app, noop_middleware @@ -229,6 +230,67 @@ def test_distributed_tracing(self): # with the right trace_id and parent_id eq_(span.trace_id, 100) eq_(span.parent_id, 42) + eq_(span.get_sampling_priority(), None) + + @unittest_run_loop + @asyncio.coroutine + def test_distributed_tracing_with_sampling_true(self): + old_sampler = self.tracer.priority_sampler + self.tracer.priority_sampler = RateSampler(0.1) + + # activate distributed tracing + self.app['datadog_trace']['distributed_tracing_enabled'] = True + tracing_headers = { + 'x-datadog-trace-id': '100', + 'x-datadog-parent-id': '42', + 'x-datadog-sampling-priority': '1', + } + + request = yield from self.client.request('GET', '/', headers=tracing_headers) + eq_(200, request.status) + text = yield from request.text() + eq_("What's tracing?", text) + # the trace is created + traces = self.tracer.writer.pop_traces() + eq_(1, len(traces)) + eq_(1, len(traces[0])) + span = traces[0][0] + # with the right trace_id and parent_id + eq_(100, span.trace_id) + eq_(42, span.parent_id) + eq_(1, span.get_sampling_priority()) + + self.tracer.priority_sampler = old_sampler + + @unittest_run_loop + @asyncio.coroutine + def test_distributed_tracing_with_sampling_false(self): + old_sampler = self.tracer.priority_sampler + self.tracer.priority_sampler = RateSampler(0.9) + + # activate distributed tracing + self.app['datadog_trace']['distributed_tracing_enabled'] = True + tracing_headers = { + 'x-datadog-trace-id': '100', + 'x-datadog-parent-id': '42', + 'x-datadog-sampling-priority': '0', + } + + request = yield from self.client.request('GET', '/', headers=tracing_headers) + eq_(200, request.status) + text = yield from request.text() + eq_("What's tracing?", text) + # the trace is created + traces = self.tracer.writer.pop_traces() + eq_(1, len(traces)) + eq_(1, len(traces[0])) + span = traces[0][0] + # with the right trace_id and parent_id + eq_(100, span.trace_id) + eq_(42, span.parent_id) + 
eq_(0, span.get_sampling_priority()) + + self.tracer.priority_sampler = old_sampler @unittest_run_loop @asyncio.coroutine diff --git a/tests/contrib/asyncio/test_tracer.py b/tests/contrib/asyncio/test_tracer.py index ccce4fef805..8c11f75a417 100644 --- a/tests/contrib/asyncio/test_tracer.py +++ b/tests/contrib/asyncio/test_tracer.py @@ -273,9 +273,9 @@ def f2(): eq_(child_2.parent_id, main_task.span_id) @mark_asyncio - def test_propagation_with_new_context(self): + def test_propagation_with_set_call_context(self): # ensures that if a new Context is attached to the current - # running Task, a previous trace is resumed + # running Task via helpers, a previous trace is resumed task = asyncio.Task.current_task() ctx = Context(trace_id=100, span_id=101) set_call_context(task, ctx) @@ -290,6 +290,24 @@ def test_propagation_with_new_context(self): eq_(span.trace_id, 100) eq_(span.parent_id, 101) + @mark_asyncio + def test_propagation_with_new_context(self): + # ensures that if a new Context is activated, a trace + # with the Context arguments is created + task = asyncio.Task.current_task() + ctx = Context(trace_id=100, span_id=101) + self.tracer.context_provider.activate(ctx) + + with self.tracer.trace('async_task'): + yield from asyncio.sleep(0.01) + + traces = self.tracer.writer.pop_traces() + eq_(len(traces), 1) + eq_(len(traces[0]), 1) + span = traces[0][0] + eq_(span.trace_id, 100) + eq_(span.parent_id, 101) + @mark_asyncio def test_event_loop_unpatch(self): # ensures that the event loop can be unpatched diff --git a/tests/contrib/celery/test_task.py b/tests/contrib/celery/test_task.py index 8ddabd17dd4..698d1065912 100644 --- a/tests/contrib/celery/test_task.py +++ b/tests/contrib/celery/test_task.py @@ -13,6 +13,10 @@ from ...test_tracer import get_dummy_tracer from ...util import assert_list_issuperset +EXPECTED_KEYS = ['service', 'resource', 'meta', 'name', + 'parent_id', 'trace_id', 'span_id', + 'duration', 'error', 'start', +] class CeleryTaskTest(unittest.TestCase): def assert_items_equal(self, a, b): @@ -109,10 +113,7 @@ def test_task_run(self): self.assertEqual(len(spans), 1) span = spans[0] - self.assert_items_equal( - span.to_dict().keys(), - ['service', 'resource', 'meta', 'name', 'parent_id', 'trace_id', 'duration', 'error', 'start', 'span_id'] - ) + self.assert_items_equal(span.to_dict().keys(), EXPECTED_KEYS) self.assertEqual(span.service, 'celery-test') self.assertEqual(span.resource, 'mock.mock.patched_task') self.assertEqual(span.name, 'celery.task.run') @@ -143,10 +144,7 @@ def test_task___call__(self): self.assertEqual(len(spans), 1) span = spans[0] - self.assert_items_equal( - span.to_dict().keys(), - ['service', 'resource', 'meta', 'name', 'parent_id', 'trace_id', 'duration', 'error', 'start', 'span_id'] - ) + self.assert_items_equal(span.to_dict().keys(), EXPECTED_KEYS) self.assertEqual(span.service, 'celery-test') self.assertEqual(span.resource, 'mock.mock.patched_task') self.assertEqual(span.name, 'celery.task.run') @@ -177,10 +175,7 @@ def test_task_apply_async(self): # Assert the first span for calling `apply` span = spans[0] - self.assert_items_equal( - span.to_dict().keys(), - ['service', 'resource', 'meta', 'name', 'parent_id', 'trace_id', 'duration', 'error', 'start', 'span_id'] - ) + self.assert_items_equal(span.to_dict().keys(), EXPECTED_KEYS) self.assertEqual(span.service, 'celery-test') self.assertEqual(span.resource, 'mock.mock.patched_task') self.assertEqual(span.name, 'celery.task.apply') @@ -197,10 +192,7 @@ def test_task_apply_async(self): # Assert the 
celery service span for calling `run` span = spans[1] - self.assert_items_equal( - span.to_dict().keys(), - ['service', 'resource', 'meta', 'name', 'parent_id', 'trace_id', 'duration', 'error', 'start', 'span_id'] - ) + self.assert_items_equal(span.to_dict().keys(), EXPECTED_KEYS) self.assertEqual(span.service, 'celery-test') self.assertEqual(span.resource, 'mock.mock.patched_task') self.assertEqual(span.name, 'celery.task.run') @@ -244,10 +236,7 @@ def test_task_apply(self): self.assertEqual(len(spans), 1) span = spans[0] - self.assert_items_equal( - span.to_dict().keys(), - ['service', 'resource', 'meta', 'name', 'parent_id', 'trace_id', 'duration', 'error', 'start', 'span_id'] - ) + self.assert_items_equal(span.to_dict().keys(), EXPECTED_KEYS) self.assertEqual(span.service, 'celery-test') self.assertEqual(span.resource, 'mock.mock.patched_task') self.assertEqual(span.name, 'celery.task.apply_async') @@ -286,10 +275,7 @@ def test_task_apply_eager(self): self.assertEqual(len(spans), 3) span = spans[0] - self.assert_items_equal( - span.to_dict().keys(), - ['service', 'resource', 'meta', 'name', 'parent_id', 'trace_id', 'duration', 'error', 'start', 'span_id'] - ) + self.assert_items_equal(span.to_dict().keys(), EXPECTED_KEYS) self.assertEqual(span.service, 'celery-test') self.assertEqual(span.resource, 'mock.mock.patched_task') self.assertEqual(span.name, 'celery.task.apply_async') @@ -304,10 +290,7 @@ def test_task_apply_eager(self): assert_list_issuperset(meta.keys(), ['id']) span = spans[1] - self.assert_items_equal( - span.to_dict().keys(), - ['service', 'resource', 'meta', 'name', 'parent_id', 'trace_id', 'duration', 'error', 'start', 'span_id'] - ) + self.assert_items_equal(span.to_dict().keys(), EXPECTED_KEYS) self.assertEqual(span.service, 'celery-test') self.assertEqual(span.resource, 'mock.mock.patched_task') self.assertEqual(span.name, 'celery.task.apply') @@ -324,10 +307,7 @@ def test_task_apply_eager(self): # The last span emitted span = spans[2] - self.assert_items_equal( - span.to_dict().keys(), - ['service', 'resource', 'meta', 'name', 'parent_id', 'trace_id', 'duration', 'error', 'start', 'span_id'] - ) + self.assert_items_equal(span.to_dict().keys(), EXPECTED_KEYS) self.assertEqual(span.service, 'celery-test') self.assertEqual(span.resource, 'mock.mock.patched_task') self.assertEqual(span.name, 'celery.task.run') @@ -371,10 +351,7 @@ def test_task_delay(self): self.assertEqual(len(spans), 1) span = spans[0] - self.assert_items_equal( - span.to_dict().keys(), - ['service', 'resource', 'meta', 'name', 'parent_id', 'trace_id', 'duration', 'error', 'start', 'span_id'] - ) + self.assert_items_equal(span.to_dict().keys(), EXPECTED_KEYS) self.assertEqual(span.service, 'celery-test') self.assertEqual(span.resource, 'mock.mock.patched_task') self.assertEqual(span.name, 'celery.task.apply_async') @@ -413,10 +390,7 @@ def test_task_delay_eager(self): self.assertEqual(len(spans), 3) span = spans[0] - self.assert_items_equal( - span.to_dict().keys(), - ['service', 'resource', 'meta', 'name', 'parent_id', 'trace_id', 'duration', 'error', 'start', 'span_id'] - ) + self.assert_items_equal(span.to_dict().keys(), EXPECTED_KEYS) self.assertEqual(span.service, 'celery-test') self.assertEqual(span.resource, 'mock.mock.patched_task') self.assertEqual(span.name, 'celery.task.apply_async') @@ -431,10 +405,7 @@ def test_task_delay_eager(self): assert_list_issuperset(meta.keys(), ['id']) span = spans[1] - self.assert_items_equal( - span.to_dict().keys(), - ['service', 'resource', 'meta', 'name', 
'parent_id', 'trace_id', 'duration', 'error', 'start', 'span_id'] - ) + self.assert_items_equal(span.to_dict().keys(), EXPECTED_KEYS) self.assertEqual(span.service, 'celery-test') self.assertEqual(span.resource, 'mock.mock.patched_task') self.assertEqual(span.name, 'celery.task.apply') @@ -451,10 +422,7 @@ def test_task_delay_eager(self): # The last span emitted span = spans[2] - self.assert_items_equal( - span.to_dict().keys(), - ['service', 'resource', 'meta', 'name', 'parent_id', 'trace_id', 'duration', 'error', 'start', 'span_id'] - ) + self.assert_items_equal(span.to_dict().keys(), EXPECTED_KEYS) self.assertEqual(span.service, 'celery-test') self.assertEqual(span.resource, 'mock.mock.patched_task') self.assertEqual(span.name, 'celery.task.run') diff --git a/tests/contrib/gevent/test_tracer.py b/tests/contrib/gevent/test_tracer.py index 989b474feb2..b2e9e25ba73 100644 --- a/tests/contrib/gevent/test_tracer.py +++ b/tests/contrib/gevent/test_tracer.py @@ -1,6 +1,7 @@ import gevent import ddtrace +from ddtrace.context import Context from ddtrace.contrib.gevent import patch, unpatch from unittest import TestCase @@ -24,6 +25,8 @@ def setUp(self): patch() def tearDown(self): + # clean the active Context + self.tracer.context_provider.activate(None) # restore the original tracer ddtrace.tracer = self._original_tracer # untrace gevent @@ -35,6 +38,14 @@ def test_main_greenlet(self): ctx = getattr(main_greenlet, '__datadog_context', None) ok_(ctx is None) + def test_main_greenlet_context(self): + # the main greenlet must have a ``Context`` once get_call_context() is called + ctx_tracer = self.tracer.get_call_context() + main_greenlet = gevent.getcurrent() + ctx_greenlet = getattr(main_greenlet, '__datadog_context', None) + ok_(ctx_tracer is ctx_greenlet) + eq_(len(ctx_tracer._trace), 0) + def test_get_call_context(self): # it should return the context attached to the provider def greenlet(): @@ -205,6 +216,25 @@ def greenlet(): eq_(1, len(traces[0])) eq_('greenlet', traces[0][0].name) + def test_propagation_with_new_context(self): + # ensure that an activated Context is used as the parent + # of the trace created in a spawned greenlet + ctx = Context(trace_id=100, span_id=101) + self.tracer.context_provider.activate(ctx) + + def greenlet(): + with self.tracer.trace('greenlet') as span: + gevent.sleep(0.01) + + jobs = [gevent.spawn(greenlet) for x in range(1)] + gevent.joinall(jobs) + + traces = self.tracer.writer.pop_traces() + eq_(1, len(traces)) + eq_(1, len(traces[0])) + eq_(traces[0][0].trace_id, 100) + eq_(traces[0][0].parent_id, 101) + def test_trace_concurrent_spawn_later_calls(self): # create multiple futures so that we expect multiple # traces instead of a single one, even if greenlets diff --git a/tests/test_context.py b/tests/test_context.py index 74fb058d6b4..ea6fdcabff8 100644 --- a/tests/test_context.py +++ b/tests/test_context.py @@ -62,7 +62,7 @@ def test_get_trace(self): eq_(0, len(ctx._trace)) eq_(0, ctx._finished_spans) ok_(ctx._current_span is None) - ok_(ctx._sampled is False) + ok_(ctx._sampled is True) def test_get_trace_empty(self): # it should return None if the Context is not finished diff --git a/tests/test_encoders.py b/tests/test_encoders.py index ea78f14be2e..8c7ff13348a 100644 --- a/tests/test_encoders.py +++ b/tests/test_encoders.py @@ -35,6 +35,9 @@ def test_encode_traces_json(self): eq_(len(items), 2) eq_(len(items[0]), 2) eq_(len(items[1]), 2) + for i in range(2): + for j in range(2): + eq_('client.testing', items[i][j]['name']) def test_encode_traces_msgpack(self): # test encoding for MsgPack
format @@ -58,3 +61,6 @@ def test_encode_traces_msgpack(self): eq_(len(items), 2) eq_(len(items[0]), 2) eq_(len(items[1]), 2) + for i in range(2): + for j in range(2): + eq_(b'client.testing', items[i][j][b'name']) diff --git a/tests/test_integration.py b/tests/test_integration.py index 4cc23db8eea..ee43ef9852a 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -480,3 +480,36 @@ def test_downgrade_api(self): ok_(response) eq_(response.status, 200) ok_(isinstance(api._encoder, JSONEncoder)) + +@skipUnless( + os.environ.get('TEST_DATADOG_INTEGRATION', False), + 'You should have a running trace agent and set TEST_DATADOG_INTEGRATION=1 env variable' ) +class TestRateByService(TestCase): + """ + Check we get feedback from the agent and we're able to process it. + """ + def setUp(self): + """ + Create a tracer without workers, and the API clients used to test the transport with synchronous calls + """ + # create a new API object to test the transport using synchronous calls + self.tracer = get_dummy_tracer() + self.api_json = API('localhost', 8126, encoder=JSONEncoder()) + self.api_msgpack = API('localhost', 8126, encoder=MsgpackEncoder()) + + def test_send_single_trace(self): + # register a single trace with a span and send it to the trace agent + self.tracer.trace('client.testing').finish() + trace = self.tracer.writer.pop() + traces = [trace] + + # test JSON encoder + response = self.api_json.send_traces(traces) + ok_(response) + eq_(response.status, 200) + + # test Msgpack encoder + response = self.api_msgpack.send_traces(traces) + ok_(response) + eq_(response.status, 200) diff --git a/tests/test_sampler.py b/tests/test_sampler.py index cd53536faf8..cb3e99ad2e8 100644 --- a/tests/test_sampler.py +++ b/tests/test_sampler.py @@ -6,7 +6,9 @@ import threading from ddtrace.tracer import Tracer -from ddtrace.sampler import RateSampler, ThroughputSampler, SAMPLE_RATE_METRIC_KEY +from ddtrace.span import Span +from ddtrace.sampler import RateSampler, AllSampler, RateByServiceSampler, SAMPLE_RATE_METRIC_KEY +from ddtrace.compat import iteritems from .test_tracer import DummyWriter from .util import patch_time @@ -20,12 +22,11 @@ def test_sample_rate_deviation(self): tracer = Tracer() tracer.writer = writer - sample_rate = 0.5 tracer.sampler = RateSampler(sample_rate) random.seed(1234) - iterations = int(2e4) + iterations = int(1e4 / sample_rate) for i in range(iterations): span = tracer.trace(i) @@ -34,121 +35,92 @@ def test_sample_rate_deviation(self): samples = writer.pop() # We must have at least 1 sample, check that it has its sample rate properly assigned - assert samples[0].get_metric(SAMPLE_RATE_METRIC_KEY) == 0.5 + assert samples[0].get_metric(SAMPLE_RATE_METRIC_KEY) == sample_rate - # Less than 1% deviation when "enough" iterations (arbitrary, just check if it converges) + # Less than 2% deviation when "enough" iterations (arbitrary, just check if it converges) deviation = abs(len(samples) - (iterations * sample_rate)) / (iterations * sample_rate) - assert deviation < 0.01, "Deviation too high %f with sample_rate %f" % (deviation, sample_rate) + assert deviation < 0.02, "Deviation too high %f with sample_rate %f" % (deviation, sample_rate) - -class ThroughputSamplerTest(unittest.TestCase): - """Test suite for the ThroughputSampler""" - - def test_simple_limit(self): + def test_deterministic_behavior(self): + """ Test that for a given trace ID, the result is always the same """ writer = DummyWriter() - tracer = Tracer() - tracer.writer = writer - - with patch_time() as fake_time: - tps = 5
tracer.sampler = ThroughputSampler(tps) - - for _ in range(10): - s = tracer.trace("whatever") - s.finish() - traces = writer.pop() - - got = len(traces) - expected = 10 - - assert got == expected, \ - "Wrong number of traces sampled, %s instead of %s" % (got, expected) - # Wait enough to reset - fake_time.sleep(tracer.sampler.BUFFER_DURATION + 1) - - for _ in range(100): - s = tracer.trace("whatever") - s.finish() - traces = writer.pop() - - got = len(traces) - expected = tps * tracer.sampler.BUFFER_DURATION - - assert got == expected, \ - "Wrong number of traces sampled, %s instead of %s" % (got, expected) - - def test_long_run(self): - writer = DummyWriter() tracer = Tracer() tracer.writer = writer - # Test a big matrix of combinaisons - # Ensure to have total_time >> BUFFER_DURATION to reduce edge effects - for tps in [10, 23, 15, 31]: - for (traces_per_s, total_time) in [(80, 23), (75, 66), (1000, 77)]: - - with patch_time() as fake_time: - # We do tons of operations in this test, do not let the time slowly shift - fake_time.set_delta(0) - - tracer.sampler = ThroughputSampler(tps) - - for _ in range(total_time): - for _ in range(traces_per_s): - s = tracer.trace("whatever") - s.finish() - fake_time.sleep(1) + tracer.sampler = RateSampler(0.5) - traces = writer.pop() - # The current sampler implementation can introduce an error of up to - # `tps * BUFFER_DURATION` traces at initialization (since the sampler starts empty) - got = len(traces) - expected = tps * total_time - error_delta = tps * tracer.sampler.BUFFER_DURATION + random.seed(1234) - assert abs(got - expected) <= error_delta, \ - "Wrong number of traces sampled, %s instead of %s (error_delta > %s)" % (got, expected, error_delta) + for i in range(10): + span = tracer.trace(i) + span.finish() + samples = writer.pop() + assert len(samples) <= 1, "there should be 0 or 1 spans" + sampled = (1 == len(samples)) + for j in range(10): + other_span = Span(tracer, i, trace_id=span.trace_id) + assert sampled == tracer.sampler.sample(other_span), "sampling should give the same result for a given trace_id" - def test_concurrency(self): - # Test that the sampler works well when used in different threads +class RateByServiceSamplerTest(unittest.TestCase): + def test_sample_rate_deviation(self): writer = DummyWriter() - tracer = Tracer() - tracer.writer = writer - total_time = 3 - concurrency = 100 - end_time = time.time() + total_time + for sample_rate in [0.1, 0.25, 0.5, 1]: + tracer = Tracer() + tracer.configure(sampler=AllSampler(), priority_sampler=RateByServiceSampler(sample_rate)) + tracer.writer = writer - # Let's sample to a multiple of BUFFER_SIZE, so that we can pre-populate the buffer - tps = 15 * ThroughputSampler.BUFFER_SIZE - tracer.sampler = ThroughputSampler(tps) + random.seed(1234) - threads = [] + iterations = int(1e4 / sample_rate) - def run_simulation(tracer, end_time): - while time.time() < end_time: - s = tracer.trace("whatever") - s.finish() - # ~1000 traces per s per thread - time.sleep(0.001) + for i in range(iterations): + span = tracer.trace(i) + span.finish() - for i in range(concurrency): - thread = threading.Thread(target=run_simulation, args=(tracer, end_time)) - threads.append(thread) + samples = writer.pop() + samples_with_high_priority = 0 + for sample in samples: + if sample.get_sampling_priority() > 0: + samples_with_high_priority += 1 - for t in threads: - t.start() + # We must have at least 1 sample, check that it has its sample rate properly assigned + assert 
samples[0].get_metric(SAMPLE_RATE_METRIC_KEY) is None - for t in threads: - t.join() + # Less than 2% deviation when "enough" iterations (arbitrary, just check if it converges) + deviation = abs(samples_with_high_priority - (iterations * sample_rate)) / (iterations * sample_rate) + assert deviation < 0.02, "Deviation too high %f with sample_rate %f" % (deviation, sample_rate) - traces = writer.pop() + def test_set_sample_rates_from_json(self): + cases = { + '{"rate_by_service": {"service:,env:": 1}}': {"service:,env:":1}, + '{"rate_by_service": {"service:,env:": 1, "service:mcnulty,env:dev": 0.33, "service:postgres,env:dev": 0.7}}': {"service:,env:":1, "service:mcnulty,env:dev":0.33, "service:postgres,env:dev":0.7}, + '{"rate_by_service": {"service:,env:": 1, "service:mcnulty,env:dev": 0.25, "service:postgres,env:dev": 0.5, "service:redis,env:prod": 0.75}}': {"service:,env:":1, "service:mcnulty,env:dev": 0.25, "service:postgres,env:dev": 0.5, "service:redis,env:prod": 0.75} + } - got = len(traces) - expected = tps * total_time - error_delta = tps * ThroughputSampler.BUFFER_DURATION + writer = DummyWriter() - assert abs(got - expected) <= error_delta, \ - "Wrong number of traces sampled, %s instead of %s (error_delta > %s)" % (got, expected, error_delta) + tracer = Tracer() + priority_sampler = RateByServiceSampler() + tracer.configure(sampler=AllSampler(), priority_sampler=priority_sampler) + tracer.writer = writer + keys = list(cases) + for k in keys: + case = cases[k] + priority_sampler.set_sample_rates_from_json(k) + rates = {} + for k,v in iteritems(priority_sampler._by_service_samplers): + rates[k] = v.sample_rate + assert case == rates + # It's important to also test in reverse mode for we want to make sure key deletion + # works as well as key insertion (and doing this both ways ensures we trigger both cases) + keys.reverse() + for k in keys: + case = cases[k] + priority_sampler.set_sample_rates_from_json(k) + rates = {} + for k,v in iteritems(priority_sampler._by_service_samplers): + rates[k] = v.sample_rate + assert case == rates diff --git a/tests/test_span.py b/tests/test_span.py index 537bdfea442..8aa61611c75 100644 --- a/tests/test_span.py +++ b/tests/test_span.py @@ -19,6 +19,10 @@ def test_ids(): eq_(s2.span_id, 2) eq_(s2.parent_id, 1) +def test_sampled(): + s = Span(tracer=None, name="span.test") + assert s.sampled + assert s.get_sampling_priority() is None def test_tags(): s = Span(tracer=None, name="test.span") @@ -83,7 +87,7 @@ def test_tags_not_string(): # ensure we can cast as strings class Foo(object): def __repr__(self): - 1/0 + 1 / 0 s = Span(tracer=None, name="test.span") s.set_tag("a", Foo()) @@ -131,7 +135,7 @@ def test_finish_set_span_duration(): def test_traceback_with_error(): s = Span(None, "test.span") try: - 1/0 + 1 / 0 except ZeroDivisionError: s.set_traceback() else: @@ -171,8 +175,43 @@ def test_ctx_mgr(): else: assert 0, "should have failed" +def test_span_priority(): + s = Span(tracer=None, name="test.span", service="s", resource="r") + for i in range(10): + s.set_sampling_priority(i) + eq_(i, s.priority) + eq_(i, s.get_sampling_priority()) + s.set_sampling_priority('this is not a valid integer') + eq_(9, s.priority) + eq_(9, s.get_sampling_priority()) + s.set_sampling_priority(None) + eq_(None, s.priority) + eq_(None, s.get_sampling_priority()) + s.set_sampling_priority(0.0) + eq_(0, s.priority) + eq_(0, s.get_sampling_priority()) + def test_span_to_dict(): - s = Span(tracer=None, name="test.span", service="s", resource="r") + s = Span(tracer=None, 
name="test.span", service="s", resource="r") + s.span_type = "foo" + s.set_tag("a", "1") + s.set_meta("b", "2") + s.finish() + + d = s.to_dict() + assert d + eq_(d["span_id"], s.span_id) + eq_(d["trace_id"], s.trace_id) + eq_(d["parent_id"], s.parent_id) + eq_(d["meta"], {"a": "1", "b": "2"}) + eq_(d["type"], "foo") + eq_(d["error"], 0) + eq_(type(d["error"]), int) + +def test_span_to_dict_sub(): + parent = Span(tracer=None, name="test.span", service="s", resource="r") + s = Span(tracer=None, name="test.span", service="s", resource="r") + s._parent = parent s.span_type = "foo" s.set_tag("a", "1") s.set_meta("b", "2") @@ -189,7 +228,7 @@ def test_span_to_dict(): eq_(type(d["error"]), int) def test_span_boolean_err(): - s = Span(tracer=None, name="foo.bar", service="s", resource="r") + s = Span(tracer=None, name="foo.bar", service="s", resource="r") s.error = True s.finish() @@ -198,7 +237,25 @@ def test_span_boolean_err(): eq_(d["error"], 1) eq_(type(d["error"]), int) - +def test_span_to_dict_priority(): + for i in range(10): + s = Span(tracer=None, name="test.span", service="s", resource="r") + s.span_type = "foo" + s.set_tag("a", "1") + s.set_meta("b", "2") + s.set_sampling_priority(i) + s.finish() + + d = s.to_dict() + assert d + eq_(d["span_id"], s.span_id) + eq_(d["trace_id"], s.trace_id) + eq_(d["parent_id"], s.parent_id) + eq_(d["meta"], {"a": "1", "b": "2"}) + eq_(d["metrics"], {"_sampling_priority_v1": i}) + eq_(d["type"], "foo") + eq_(d["error"], 0) + eq_(type(d["error"]), int) class DummyTracer(object): def __init__(self): diff --git a/tests/test_tracer.py b/tests/test_tracer.py index 61a9ecc3586..91ae69068e3 100644 --- a/tests/test_tracer.py +++ b/tests/test_tracer.py @@ -375,6 +375,36 @@ def test_tracer_current_span(): eq_(span, tracer.current_span()) +def test_default_provider_get(): + # Tracer Context Provider must return a Context object + # even if empty + tracer = get_dummy_tracer() + ctx = tracer.context_provider.active() + ok_(isinstance(ctx, Context)) + eq_(len(ctx._trace), 0) + + +def test_default_provider_set(): + # The Context Provider can set the current active Context; + # this could happen in distributed tracing + tracer = get_dummy_tracer() + ctx = Context(trace_id=42, span_id=100) + tracer.context_provider.activate(ctx) + span = tracer.trace('web.request') + eq_(span.trace_id, 42) + eq_(span.parent_id, 100) + + +def test_default_provider_trace(): + # Context handled by a default provider must be used + # when creating a trace + tracer = get_dummy_tracer() + span = tracer.trace('web.request') + ctx = tracer.context_provider.active() + eq_(len(ctx._trace), 1) + eq_(span._context, ctx) + + def test_start_span(): # it should create a root Span tracer = get_dummy_tracer()