Skip to content

Commit

Permalink
Merge pull request #116 from neuro-inc/add_new_trace_context_manager
Browse files Browse the repository at this point in the history
add new_trace and new_sampled_trace context managers
  • Loading branch information
zubenkoivan authored Apr 22, 2021
2 parents 6a64b5e + 8dbd0b4 commit 7a16dde
Show file tree
Hide file tree
Showing 3 changed files with 233 additions and 42 deletions.
12 changes: 10 additions & 2 deletions platform_logging/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@
from typing import Any, Dict, Union

from .trace import (
create_zipkin_tracer,
make_request_logging_trace_config,
make_sentry_trace_config,
new_sampled_trace,
new_trace,
new_trace_cm,
notrace,
setup_sentry,
setup_zipkin,
setup_zipkin_tracer,
trace,
trace_cm,
)
from .version import VERSION

Expand All @@ -19,13 +23,17 @@
__all__ = [
"init_logging",
"HideLessThanFilter",
"create_zipkin_tracer",
"setup_zipkin_tracer",
"make_request_logging_trace_config",
"make_sentry_trace_config",
"notrace",
"setup_sentry",
"setup_zipkin",
"trace",
"trace_cm",
"new_sampled_trace",
"new_trace",
"new_trace_cm",
]


Expand Down
154 changes: 135 additions & 19 deletions platform_logging/trace.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import functools
import logging
from contextlib import asynccontextmanager
Expand All @@ -9,11 +10,14 @@
Awaitable,
Callable,
Iterable,
List,
Optional,
Type,
TypeVar,
cast,
)

import aiohttp
import aiozipkin
import sentry_sdk
from aiohttp import (
Expand All @@ -25,7 +29,10 @@
web,
)
from aiohttp.web import AbstractRoute
from aiozipkin.aiohttp_helpers import set_context_value, zipkin_context
from aiozipkin.span import SpanAbc
from aiozipkin.transport import Transport
from sentry_sdk import Hub
from sentry_sdk.integrations.aiohttp import AioHttpIntegration
from yarl import URL

Expand All @@ -41,7 +48,58 @@


@asynccontextmanager
async def zipkin_tracing_cm(name: str) -> AsyncIterator[Optional[SpanAbc]]:
async def new_zipkin_trace_cm(
name: str, sampled: Optional[bool]
) -> AsyncIterator[Optional[SpanAbc]]:
tracer = CURRENT_TRACER.get(None)
if tracer is None:
# No tracer is set,
# the call is made from unittest most likely.
yield None
return

span = tracer.new_trace(sampled=sampled)

with set_context_value(zipkin_context, span.context):
with span:
span.name(name)
reset_token = CURRENT_SPAN.set(span)
try:
yield span
finally:
CURRENT_SPAN.reset(reset_token)


@asynccontextmanager
async def new_sentry_trace_cm(
name: str, sampled: Optional[bool]
) -> AsyncIterator[sentry_sdk.tracing.Span]:
with Hub(Hub.current) as hub:
with hub.configure_scope() as scope:
scope.clear_breadcrumbs()

with hub.start_transaction(name=name, sampled=sampled) as transaction:
try:
yield transaction
except asyncio.CancelledError:
transaction.set_status("cancelled")
raise
except Exception as exc:
hub.capture_exception(error=exc)
raise


@asynccontextmanager
async def new_trace_cm(
name: str, sampled: Optional[bool] = None
) -> AsyncIterator[None]:
async with new_zipkin_trace_cm(name, sampled):
async with new_sentry_trace_cm(name, sampled):
yield


@asynccontextmanager
async def zipkin_trace_cm(name: str) -> AsyncIterator[Optional[SpanAbc]]:
tracer = CURRENT_TRACER.get(None)
if tracer is None:
# No tracer is set,
Expand All @@ -50,9 +108,10 @@ async def zipkin_tracing_cm(name: str) -> AsyncIterator[Optional[SpanAbc]]:
return
try:
span = CURRENT_SPAN.get()
child = tracer.new_child(span.context)
except LookupError:
child = tracer.new_trace(sampled=False)
yield None
return
child = tracer.new_child(span.context)
reset_token = CURRENT_SPAN.set(child)
try:
with child:
Expand All @@ -63,7 +122,7 @@ async def zipkin_tracing_cm(name: str) -> AsyncIterator[Optional[SpanAbc]]:


@asynccontextmanager
async def sentry_tracing_cm(
async def sentry_trace_cm(
name: str,
) -> AsyncIterator[Optional[sentry_sdk.tracing.Span]]:
parent_span = sentry_sdk.Hub.current.scope.span
Expand All @@ -73,26 +132,62 @@ async def sentry_tracing_cm(
yield None
else:
with parent_span.start_child(op="call", description=name) as child:
yield child
try:
yield child
except asyncio.CancelledError:
child.set_status("cancelled")
raise
except Exception as exc:
hub = child.hub or Hub.current
hub.capture_exception(error=exc)
raise


@asynccontextmanager
async def tracing_cm(name: str) -> AsyncIterator[None]:
async with zipkin_tracing_cm(name):
async with sentry_tracing_cm(name):
async def trace_cm(name: str) -> AsyncIterator[None]:
async with zipkin_trace_cm(name):
async with sentry_trace_cm(name):
yield


def trace(func: T) -> T:
@functools.wraps(func)
async def tracer(*args: Any, **kwargs: Any) -> Any:
name = func.__qualname__
async with tracing_cm(name):
async with trace_cm(name):
return await func(*args, **kwargs)

return cast(T, tracer)


def new_trace(func: T) -> T:
async def _tracer(*args: Any, **kwargs: Any) -> Any:
name = func.__qualname__
async with new_trace_cm(name):
return await func(*args, **kwargs)

@functools.wraps(func)
async def tracer(*args: Any, **kwargs: Any) -> Any:
# Create a task to wrap method call to avoid scope data leakage between calls.
return await asyncio.create_task(_tracer(*args, **kwargs))

return cast(T, tracer)


def new_sampled_trace(func: T) -> T:
async def _tracer(*args: Any, **kwargs: Any) -> Any:
name = func.__qualname__
async with new_trace_cm(name, sampled=True):
return await func(*args, **kwargs)

@functools.wraps(func)
async def tracer(*args: Any, **kwargs: Any) -> Any:
# Create a task to wrap method call to avoid scope data leakage between calls.
return await asyncio.create_task(_tracer(*args, **kwargs))

return cast(T, tracer)


def notrace(func: T) -> T:
@functools.wraps(func)
async def tracer(*args: Any, **kwargs: Any) -> Any:
Expand All @@ -109,29 +204,39 @@ async def tracer(*args: Any, **kwargs: Any) -> Any:
async def store_zipkin_span_middleware(
request: web.Request, handler: Handler
) -> web.StreamResponse: # pragma: no cover
tracer = aiozipkin.get_tracer(request.app)
span = aiozipkin.request_span(request)
CURRENT_TRACER.set(tracer)
CURRENT_SPAN.set(span)
return await handler(request)


async def create_zipkin_tracer(
app_name: str, host: str, port: int, zipkin_url: URL, sample_rate: float
) -> aiozipkin.Tracer:
def setup_zipkin_tracer(
app_name: str,
host: str,
port: int,
zipkin_url: URL,
sample_rate: float = 0.01,
send_interval: float = 5,
ignored_exceptions: Optional[List[Type[Exception]]] = None,
) -> None:
endpoint = aiozipkin.create_endpoint(app_name, ipv4=host, port=port)

return await aiozipkin.create(
str(zipkin_url / "api/v2/spans"), endpoint, sample_rate=sample_rate
)
sampler = aiozipkin.Sampler(sample_rate=sample_rate)
transport = Transport(str(zipkin_url / "api/v2/spans"), send_interval=send_interval)
tracer = aiozipkin.Tracer(transport, sampler, endpoint, ignored_exceptions)
CURRENT_TRACER.set(tracer)


def setup_zipkin(
app: web.Application,
tracer: aiozipkin.Tracer,
*,
skip_routes: Optional[Iterable[AbstractRoute]] = None,
) -> None: # pragma: no cover
tracer = CURRENT_TRACER.get(None)

if tracer is None:
raise Exception(
"Tracer was not initialized. Please setup tracer before calling this method"
)

aiozipkin.setup(app, tracer, skip_routes=skip_routes)
app.middlewares.append(store_zipkin_span_middleware)

Expand All @@ -148,6 +253,17 @@ def setup_sentry(
sentry_sdk.set_tag("cluster", cluster_name)


def make_zipkin_trace_config() -> aiohttp.TraceConfig: # pragma: no cover
tracer = CURRENT_TRACER.get(None)

if tracer is None:
raise Exception(
"Tracer was not initialized. Please setup tracer before calling this method"
)

return aiozipkin.make_trace_config(tracer)


def make_sentry_trace_config() -> TraceConfig:
"""
Creates aiohttp.TraceConfig with enabled Sentry distributive tracing
Expand Down
Loading

0 comments on commit 7a16dde

Please sign in to comment.