[sampling] add distributed tracing capabilities #325

Closed · wants to merge 41 commits

Commits (41)
6bef657
[core] Tracer Context Provider is available via public API; a context…
Sep 1, 2017
c5d9620
[asyncio] honors the Context Provider public API
Sep 1, 2017
0f474e9
[gevent] honors the Context Provider public API
Sep 1, 2017
1a88131
[tornado] use the exposed context_provider alias
Sep 1, 2017
1a43e6f
[distributed sampling] introducing func to decide early to sample or …
ufoot Aug 3, 2017
fcfa9ce
[distributed sampling] added test
ufoot Aug 4, 2017
6d2b167
[distributed sampled] forwarding is_sampled parameter
ufoot Aug 4, 2017
dd829db
[distributed tracing] made sampling and distributed sampling orthogon…
ufoot Aug 4, 2017
16a0371
[distributed tracing] added a test for distributed sampling
ufoot Aug 10, 2017
c055da6
[distributed tracing] fixed tests and using random approach for distr…
ufoot Aug 11, 2017
bf9bee6
[distributed tracing] fixed aiohttp distributed sampling, added test
ufoot Aug 11, 2017
9dc5e9e
[distributed tracing] forwarding distributed_sampled attribute
ufoot Aug 11, 2017
c433275
[distributed tracing] setting distributed_sampled only on root spans
ufoot Aug 11, 2017
02f2c72
[distributed sampling] fixed celery tests
ufoot Aug 11, 2017
72800da
[distributed sampling] transition from is_sampled to sampling_priority
ufoot Aug 14, 2017
8648258
[distributed sampling] udpated tests
ufoot Aug 14, 2017
99e4967
[distributed tracing] (re)fixed celery tests
ufoot Aug 14, 2017
0f76d49
[distributed tracing] fixed encoding tests
ufoot Aug 16, 2017
abaa8d3
[distributed tracing] yet more tests fixes
ufoot Aug 16, 2017
d355cea
[distributed tracing] prepared test to control the JSON content
ufoot Aug 18, 2017
b258765
Drop deprecated ThroughputSampler
LotharSee Aug 22, 2017
00cb9f6
Revamp sampling with propagation by context
LotharSee Aug 23, 2017
52442d5
[distributed sampling] fixing tests errors
ufoot Aug 31, 2017
7f85e15
[distributed sampling] using feedback from agent to set the sampling …
ufoot Sep 1, 2017
5a0b09a
[distributed sampling] fixing Python 3 bytes to str decoding issue
ufoot Sep 4, 2017
3d91855
[distributed sampling] back to 0.3 API as agent is not ready yet
ufoot Sep 5, 2017
d95d117
[distributed sampling] using no priority sampler by default
ufoot Sep 7, 2017
2c9275b
[distributed sampling] added tests for set/get priority on spans
ufoot Sep 7, 2017
b4b97e9
[distributed sampling] added sampler test
ufoot Sep 7, 2017
d6e0961
[distributed sampling] fixed aiohttp and django tests
ufoot Sep 11, 2017
e8f4387
[distributed sampling] aiohttp integration skips distributed meta inf…
ufoot Sep 18, 2017
0286531
[distributed sampling] using 0.4 endpoint when distributed sampling i…
ufoot Sep 21, 2017
97c5984
[distributed tracing] fixed tracer initializer
ufoot Sep 22, 2017
73f42a9
[distributed sampling] fix priority sampling handling in tracer confi…
ufoot Sep 22, 2017
b9b9d87
[distributed sampling] fixing key deletion in rates by sample
ufoot Sep 25, 2017
5b9d9d2
[distributed sampling] fixed sampler test for Python 2/3 compat
ufoot Sep 25, 2017
9f9b269
[distributed sampling] generalized downgrade mecanism
ufoot Sep 27, 2017
a69f9c9
[distributed sampling] setting pylons service earlier for better prio…
ufoot Sep 28, 2017
61ad28c
[distributed tracing] fixing API downgrade
ufoot Sep 28, 2017
48897c0
[distributed sampling] adding hint to upgrade agent when protocol is …
ufoot Sep 29, 2017
7a7804a
[distributed sampling] setting pyscopg service as soon as possible
ufoot Oct 5, 2017
59 changes: 43 additions & 16 deletions ddtrace/api.py
@@ -12,40 +12,67 @@

TRACE_COUNT_HEADER = 'X-Datadog-Trace-Count'

_VERSIONS = {'v0.4': {'traces': '/v0.4/traces',
'services': '/v0.4/services',
'compatibility_mode': False,
'fallback': 'v0.3'},
'v0.3': {'traces': '/v0.3/traces',
'services': '/v0.3/services',
'compatibility_mode': False,
'fallback': 'v0.2'},
'v0.2': {'traces': '/v0.2/traces',
'services': '/v0.2/services',
'compatibility_mode': True,
'fallback': None}}

class API(object):
"""
Send data to the trace agent using the HTTP protocol and JSON format
"""
- def __init__(self, hostname, port, headers=None, encoder=None):
+ def __init__(self, hostname, port, headers=None, encoder=None, priority_sampling=False):
self.hostname = hostname
self.port = port
- self._traces = '/v0.3/traces'
- self._services = '/v0.3/services'
- self._compatibility_mode = False
- self._encoder = encoder or get_encoder()

- # overwrite the Content-type with the one chosen in the Encoder
self._headers = headers or {}
+ self._version = None

+ if priority_sampling:
+ self._set_version('v0.4', encoder=encoder)
+ else:
+ self._set_version('v0.3', encoder=encoder)

self._headers.update({
'Content-Type': self._encoder.content_type,
'Datadog-Meta-Lang': 'python',
'Datadog-Meta-Lang-Version': PYTHON_VERSION,
'Datadog-Meta-Lang-Interpreter': PYTHON_INTERPRETER,
'Datadog-Meta-Tracer-Version': ddtrace.__version__,
})

def _set_version(self, version, encoder=None):
if version not in _VERSIONS:
version = 'v0.2'
if version == self._version:
return
self._version = version
self._traces = _VERSIONS[version]['traces']
self._services = _VERSIONS[version]['services']
self._fallback = _VERSIONS[version]['fallback']
self._compatibility_mode = _VERSIONS[version]['compatibility_mode']
if self._compatibility_mode:
self._encoder = JSONEncoder()
else:
self._encoder = encoder or get_encoder()
# overwrite the Content-type with the one chosen in the Encoder
self._headers.update({'Content-Type': self._encoder.content_type})

def _downgrade(self):
"""
Downgrades the used encoder and API level. This method must fall back to a safe
encoder and API, so that it will succeed despite the user's configuration. This
action ensures that the compatibility mode is activated so that the downgrade
will be executed only once.
"""
- self._compatibility_mode = True
- self._traces = '/v0.2/traces'
- self._services = '/v0.2/services'
- self._encoder = JSONEncoder()
- self._headers.update({'Content-Type': self._encoder.content_type})
+ self._set_version(self._fallback)

def send_traces(self, traces):
if not traces:
@@ -55,8 +82,8 @@ def send_traces(self, traces):
response = self._put(self._traces, data, len(traces))

# the API endpoint is not available so we should downgrade the connection and re-try the call
- if response.status in [404, 415] and self._compatibility_mode is False:
- log.debug('calling the endpoint "%s" but received %s; downgrading the API', self._traces, response.status)
+ if response.status in [404, 415] and self._fallback:
ufoot (author) commented:

    There @palazzem, I'd like your opinion on this. I thought it was OK to cascade down from v0.4 to v0.3 to v0.2, but I'm open to other possibilities. The advantage of doing this is that here I just care about the endpoint, while the question of knowing whether we get JSON as an answer or a plain OK\n string is handled later. This enables low coupling between this chunk of code and the one handling the JSON.

+ log.debug('calling endpoint "%s" but received %s; downgrading API', self._traces, response.status)
self._downgrade()
return self.send_traces(traces)

@@ -73,8 +100,8 @@ def send_services(self, services):
response = self._put(self._services, data)

# the API endpoint is not available so we should downgrade the connection and re-try the call
- if response.status in [404, 415] and self._compatibility_mode is False:
- log.debug('calling the endpoint "%s" but received 404; downgrading the API', self._services)
+ if response.status in [404, 415] and self._fallback:
+ log.debug('calling endpoint "%s" but received %s; downgrading API', self._services, response.status)
self._downgrade()
return self.send_services(services)

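To make the cascade concrete, here is a minimal sketch (not part of the diff; the table shape is copied from above, the driver loop is hypothetical) of how repeated 404/415 answers walk an API instance from v0.4 down to v0.2 and then give up:

```python
# Illustrative only: walk the _VERSIONS fallback chain the way
# _downgrade()/_set_version() would after repeated 404/415 responses.
_VERSIONS = {'v0.4': {'traces': '/v0.4/traces', 'fallback': 'v0.3'},
             'v0.3': {'traces': '/v0.3/traces', 'fallback': 'v0.2'},
             'v0.2': {'traces': '/v0.2/traces', 'fallback': None}}

def pick_endpoint(agent_accepts):
    """Return the first traces endpoint the agent accepts, cascading down."""
    version = 'v0.4'
    while version is not None:
        endpoint = _VERSIONS[version]['traces']
        if agent_accepts(endpoint):  # stand-in for a real PUT + status check
            return endpoint
        version = _VERSIONS[version]['fallback']  # v0.4 -> v0.3 -> v0.2 -> None
    return None

# An old agent that only speaks the v0.2 protocol:
print(pick_endpoint(lambda endpoint: endpoint.startswith('/v0.2')))  # /v0.2/traces
```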
1 change: 1 addition & 0 deletions ddtrace/constants.py
@@ -1 +1,2 @@
FILTERS_KEY = 'FILTERS'
SAMPLING_PRIORITY_KEY = '_sampling_priority_v1'
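For orientation, a hedged sketch of how such a key typically ends up on a trace: the priority is stored as a numeric metric on the root span under this key. The helper below and its `set_metric` wiring are assumptions for illustration, not code from this diff:

```python
from ddtrace.constants import SAMPLING_PRIORITY_KEY

def stamp_sampling_priority(root_span, priority):
    # Hypothetical helper: persist the decision as a numeric metric on the
    # root span so it is encoded and shipped to the agent with the trace.
    if priority is not None:
        root_span.set_metric(SAMPLING_PRIORITY_KEY, priority)
```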
49 changes: 35 additions & 14 deletions ddtrace/context.py
@@ -20,25 +20,31 @@ class Context(object):

This data structure is thread-safe.
"""
- def __init__(self, trace_id=None, span_id=None, sampled=True):
+ def __init__(self, trace_id=None, span_id=None, sampled=True, sampling_priority=None):
"""
Initialize a new thread-safe ``Context``.

:param int trace_id: trace_id of parent span
:param int span_id: span_id of parent span
"""
self._trace = []
- self._sampled = sampled
self._finished_spans = 0
self._current_span = None
self._lock = threading.Lock()
- self._parent_span_id = span_id

self._parent_trace_id = trace_id
+ self._parent_span_id = span_id
+ self._sampled = sampled
+ self._sampling_priority = sampling_priority

- def _get_parent_span_ids(self):
- """ Returns tuple of base trace_id, span_id for distributed tracing."""
- with self._lock:
- return self._parent_trace_id, self._parent_span_id
+ def get_context_attributes(self):
+ """
+ Return the context propagatable attributes.
+
+ Useful to propagate context to an external element.
+ """
+ with self._lock:
+ return self._parent_trace_id, self._parent_span_id, self._sampling_priority

Reviewer comment on get_context_attributes:

    do we need to change the private API? That was set as internal because a possible refactoring could make the Context immutable, so we can get rid of the lock. Anyway, it's something to keep in mind. Let me think of a possible alternative.

Contributor reply:

    I don't especially defend nor like this API. What is sure is that we will need somewhere a public way to access the current tuple (tid, sid, priority) for remote propagation, both for our own integrations and for customers instrumenting their own inter-service propagation. Do you agree? After that, it could live anywhere.
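To make the remote-propagation use case from the thread concrete, here is a hypothetical client-side helper built on `get_context_attributes()`; the header names are the ones the aiohttp integration below reads:

```python
def inject_distributed_headers(ctx, headers):
    # Hypothetical helper: copy the propagatable attributes into outgoing
    # HTTP headers so a downstream service can resume the trace.
    trace_id, span_id, sampling_priority = ctx.get_context_attributes()
    if trace_id and span_id:
        headers['x-datadog-trace-id'] = str(trace_id)
        headers['x-datadog-parent-id'] = str(span_id)
        if sampling_priority is not None:
            headers['x-datadog-sampling-priority'] = str(sampling_priority)
    return headers
```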

def get_current_span(self):
"""
@@ -50,13 +56,28 @@ def get_current_span(self):
with self._lock:
return self._current_span

def _set_current_span(self, span):
"""
Set the current span internally.

Not thread-safe on its own: callers must hold the Context lock.
For internal Context usage only.
"""
self._current_span = span
if span:
self._parent_trace_id = span.trace_id
self._parent_span_id = span.span_id
self._sampled = span.sampled
self._sampling_priority = span.get_sampling_priority()
else:
self._parent_span_id = None

Reviewer comment on _set_current_span:

    Because the class is meant to be thread-safe, whatever API we provide should use the internal lock for any change, even if it's an internal API.

Contributor reply:

    That was just to avoid duplicating code between add_span and close_span, which themselves take the lock. If we think that's too dangerous even for an internal helper, we can inline it in those two functions.

def add_span(self, span):
"""
Add a span to the context trace list, keeping it as the last active span.
"""
with self._lock:
- self._current_span = span
- self._sampled = span.sampled
+ self._set_current_span(span)

self._trace.append(span)
span._context = self

@@ -67,7 +88,7 @@ def close_span(self, span):
"""
with self._lock:
self._finished_spans += 1
- self._current_span = span._parent
+ self._set_current_span(span._parent)

# notify if the trace is not closed properly; this check is executed only
# if the tracer debug_logging is enabled and when the root span is closed
@@ -114,9 +135,11 @@ def get(self):
sampled = self._sampled
# clean the current state
self._trace = []
- self._sampled = False
self._finished_spans = 0
self._current_span = None
+ self._parent_trace_id = None
+ self._parent_span_id = None
+ self._sampling_priority = None
+ self._sampled = True
return trace, sampled
else:
return None, None
@@ -145,9 +168,7 @@ def set(self, ctx):
def get(self):
ctx = getattr(self._locals, 'context', None)
if not ctx:
- # create a new Context if it's not available; this action
- # is done once because the Context has the reset() method
- # to reuse the same instance
+ # create a new Context if it's not available
ctx = Context()
self._locals.context = ctx

32 changes: 20 additions & 12 deletions ddtrace/contrib/aiohttp/middlewares.py
@@ -3,6 +3,7 @@
from ..asyncio import context_provider
from ...ext import AppTypes, http
from ...compat import stringify
from ...context import Context


CONFIG_KEY = 'datadog_trace'
@@ -11,6 +12,7 @@

PARENT_TRACE_HEADER_ID = 'x-datadog-trace-id'
PARENT_SPAN_HEADER_ID = 'x-datadog-parent-id'
SAMPLING_PRIORITY_HEADER_ID = 'x-datadog-sampling-priority'


@asyncio.coroutine
@@ -29,24 +31,30 @@ def attach_context(request):
service = app[CONFIG_KEY]['service']
distributed_tracing = app[CONFIG_KEY]['distributed_tracing_enabled']

context = tracer.get_call_context()

Reviewer comment on tracer.get_call_context():

    Technically this must be None. If it's not, and we have distributed_tracing == True, it means that we're going to create a new Context object using the existing one as a child_of. In the current API, having two different Context objects in one request means having two different traces (and that's wrong).

    Also, creating a Context manually doesn't set it as active. This means that the request_span lives in a Context that is not propagated, so any attempt to use start_span() or trace() will add the new Span to a different (wrong) context.

LotharSee (Contributor) replied on Aug 30, 2017:

    When implementing this, I only looked at ThreadLocalContext (which, when there is no context, creates one and puts it in the current thread local) and assumed a similar behavior (I should have checked!).

    So what is the proper thing to do there?


# Create a new context based on the propagated information.
# Do not fill the context with distributed sampling information if the tracer
# is disabled, because that would cause the callee to generate references to
# data which has never been sent to the agent.
if tracer.enabled and distributed_tracing:
trace_id = int(request.headers.get(PARENT_TRACE_HEADER_ID, 0))
parent_span_id = int(request.headers.get(PARENT_SPAN_HEADER_ID, 0))
sampling_priority = request.headers.get(SAMPLING_PRIORITY_HEADER_ID)
# keep sampling priority as None if not propagated, to support older client versions on the parent side
if sampling_priority:
sampling_priority = int(sampling_priority)

context = Context(trace_id=trace_id, span_id=parent_span_id, sampling_priority=sampling_priority)

# trace the handler
- request_span = tracer.trace(
+ request_span = tracer.start_span(
'aiohttp.request',
service=service,
span_type=http.TYPE,
+ child_of=context,
)

- if distributed_tracing:
- # set parent trace/span IDs if present:
- # http://pypi.datadoghq.com/trace/docs/#distributed-tracing
- parent_trace_id = request.headers.get(PARENT_TRACE_HEADER_ID)
- if parent_trace_id is not None:
- request_span.trace_id = int(parent_trace_id)
-
- parent_span_id = request.headers.get(PARENT_SPAN_HEADER_ID)
- if parent_span_id is not None:
- request_span.parent_id = int(parent_span_id)

# attach the context and the root span to the request; the Context
# may be freely used by the application code
request[REQUEST_CONTEXT_KEY] = request_span.context
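End to end, an upstream service would send the three x-datadog-* headers this middleware parses. A hypothetical client coroutine in the Python 3.4 style used here (the session handling and wiring are assumptions, not part of this diff; note the thread above questions whether the manually built server-side Context is properly activated):

```python
import asyncio
from ddtrace import tracer

@asyncio.coroutine
def call_downstream(session, url):
    # session: an aiohttp.ClientSession created by the caller.
    # Hypothetical upstream side: forward the current trace identifiers so
    # attach_context() above can resume the trace in the downstream service.
    with tracer.trace('client.request'):
        trace_id, parent_id, priority = tracer.get_call_context().get_context_attributes()
        headers = {'x-datadog-trace-id': str(trace_id),
                   'x-datadog-parent-id': str(parent_id)}
        if priority is not None:
            # older clients simply omit this header
            headers['x-datadog-sampling-priority'] = str(priority)
        resp = yield from session.get(url, headers=headers)
        return (yield from resp.text())
```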
18 changes: 8 additions & 10 deletions ddtrace/contrib/asyncio/helpers.py
@@ -18,6 +18,9 @@ def set_call_context(task, ctx):
"""
Updates the ``Context`` for the given Task. Useful when you need to
pass the context among different tasks.

This method is available for backward-compatibility. Use the
``AsyncioContextProvider`` API to set the current active ``Context``.
"""
setattr(task, CONTEXT_ATTR, ctx)

@@ -74,7 +77,7 @@ def _wrap_executor(fn, args, tracer, ctx):
# the AsyncioContextProvider knows that this is a new thread
# so it is legit to pass the Context in the thread-local storage;
# fn() will be executed outside the asyncio loop as synchronous code
- tracer._context_provider._local.set(ctx)
+ tracer.context_provider.activate(ctx)
return fn(*args)


@@ -104,16 +107,11 @@ def _wrapped_create_task(wrapped, instance, args, kwargs):

ctx = getattr(current_task, CONTEXT_ATTR, None)
- span = ctx.get_current_span() if ctx else None
- if span:
- parent_trace_id, parent_span_id = span.trace_id, span.span_id
- elif ctx:
- parent_trace_id, parent_span_id = ctx._get_parent_span_ids()
- else:
- parent_trace_id = parent_span_id = None
-
- if parent_trace_id and parent_span_id:
+ if ctx:
+ parent_trace_id, parent_span_id, sampling_priority = ctx.get_context_attributes()
# current task has a context, so parent a new context to the base context
- new_ctx = Context(trace_id=parent_trace_id, span_id=parent_span_id)
+ new_ctx = Context(trace_id=parent_trace_id, span_id=parent_span_id, sampling_priority=sampling_priority)
set_call_context(new_task, new_ctx)

return new_task
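A sketch of the behavior this wrapper is meant to provide, assuming the create_task patching from this branch is installed (the patching entry point is not shown in this diff):

```python
import asyncio
from ddtrace import tracer

@asyncio.coroutine
def child():
    # Runs in a new Task whose Context was seeded with the parent's
    # (trace_id, span_id, sampling_priority), so this span joins the trace.
    with tracer.trace('child.op'):
        pass

@asyncio.coroutine
def parent():
    with tracer.trace('parent.op'):
        task = asyncio.get_event_loop().create_task(child())  # wrapped above
        yield from task

asyncio.get_event_loop().run_until_complete(parent())
```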
22 changes: 21 additions & 1 deletion ddtrace/contrib/asyncio/provider.py
@@ -14,8 +14,28 @@ class AsyncioContextProvider(DefaultContextProvider):
execution. It must be used in asynchronous programming that relies
on the built-in ``asyncio`` library. Framework instrumentation that
is built on top of the ``asyncio`` library can use this provider.

This Context Provider inherits from ``DefaultContextProvider`` because
it uses thread-local storage when the ``Context`` is propagated to a
different thread than the one running the async loop.
"""
- def __call__(self, loop=None):
+ def activate(self, context, loop=None):
"""Sets the scoped ``Context`` for the currently running ``Task``.
"""
try:
loop = loop or asyncio.get_event_loop()
except RuntimeError:
# no loop is available in the current thread; this happens when a new
# thread is created from the one that is running the async loop
return self._local.set(context)

# the current unit of work (if tasks are used)
task = asyncio.Task.current_task(loop=loop)
setattr(task, CONTEXT_ATTR, context)
return context

def active(self, loop=None):
"""
Returns the scoped Context for this execution flow. The ``Context`` uses
the current task as a carrier so if a single task is used for the entire application,
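The activate() half of the provider API pairs with active(); a minimal sketch of manual activation with made-up identifiers:

```python
from ddtrace import tracer
from ddtrace.context import Context

# Seed the active Context from values received out-of-band (made up here),
# then trace as usual: the new span becomes a child of span 5678.
ctx = Context(trace_id=1234, span_id=5678, sampling_priority=1)
tracer.context_provider.activate(ctx)
with tracer.trace('worker.process'):
    pass
```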
8 changes: 6 additions & 2 deletions ddtrace/contrib/gevent/greenlet.py
@@ -29,7 +29,11 @@ def __init__(self, *args, **kwargs):
# the context is always available, except for the main greenlet
if ctx:
# create a new context that inherits the current active span
- new_ctx = Context()
- new_ctx._sampled = ctx._sampled
+ # TODO: a better API for Context should return the whole tuple at once
+ new_ctx = Context(
+ trace_id=ctx._parent_trace_id,
+ span_id=ctx._parent_span_id,
+ sampled=ctx._sampled,
+ )
new_ctx._current_span = ctx._current_span
setattr(self, CONTEXT_ATTR, new_ctx)
24 changes: 18 additions & 6 deletions ddtrace/contrib/gevent/provider.py
@@ -15,12 +15,19 @@ class GeventContextProvider(BaseContextProvider):
in the ``gevent`` library. Framework instrumentation that uses the
gevent WSGI server (or gevent in general) can use this provider.
"""
- def __call__(self):
+ def activate(self, context):
"""Sets the scoped ``Context`` for the current running ``Greenlet``.
"""
current_g = gevent.getcurrent()
if current_g is not None:
setattr(current_g, CONTEXT_ATTR, context)
return context

def active(self):
"""
Returns the scoped ``Context`` for this execution flow. The ``Context``
uses the ``Greenlet`` class as a carrier, and every time a greenlet
- is created it receives the "parent" context. The main greenlet
- will never have an attached ``Context``.
+ is created it receives the "parent" context.
"""
current_g = gevent.getcurrent()
ctx = getattr(current_g, CONTEXT_ATTR, None)
@@ -29,9 +36,14 @@ def __call__(self):
return ctx

# the Greenlet doesn't have a Context so it's created and attached
- # unless it's the main greenlet; in that case we must be sure
- # that no Context is generated
- if current_g.parent:
+ # TODO: the previous implementation avoided attaching a Context to the
+ # main greenlet because it could have side effects when switching back
+ # and forth between different executions; that restriction results in
+ # issues such as https://github.com/DataDog/dd-trace-py/issues/309, and
+ # attaching one is required for Distributed Tracing when providing a new
+ # arbitrary Context. On the other hand, it's imperative to double-check
+ # for side effects.
+ if current_g:
ctx = Context()
setattr(current_g, CONTEXT_ATTR, ctx)
return ctx
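A sketch of what this enables together with TracedGreenlet from greenlet.py above; the patch() entry point for the gevent integration is assumed here:

```python
import gevent
from ddtrace import tracer
from ddtrace.contrib.gevent import patch

patch()  # assumed entry point: swap gevent.Greenlet for the traced variant

def worker(n):
    # each spawned greenlet received a copy of the parent's Context,
    # so this span nests under 'main.job' in the same trace
    with tracer.trace('worker.task'):
        gevent.sleep(0)

with tracer.trace('main.job'):
    gevent.joinall([gevent.spawn(worker, i) for i in range(3)])
```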