Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zipkin exporter v2 api support for protobuf format #1318

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
a756db0
Zipkin exporter v2 api support for protobuf format
robwknox Nov 2, 2020
0197866
adding env var OTEL_EXPORTER_ZIPKIN_TRANSPORT_FORMAT and tidying up T…
robwknox Nov 2, 2020
131cbea
lint corrections
robwknox Nov 2, 2020
bd7eef7
moving auto-gen'd files to separate dir in order to add exclusion ent…
robwknox Nov 2, 2020
a8a2407
flake8 config update to exclude new auto-gen files
robwknox Nov 3, 2020
f9c4ff8
pylint hints to help with auto-gen protobuf objects
robwknox Nov 3, 2020
d74518d
Merge branch 'master' into 1175_zipkin_exporter_v2_protobuf_support
robwknox Nov 3, 2020
8ce0dc9
documentation update
robwknox Nov 3, 2020
b6d7e6b
switching exporter to use OTEL Configuration class instead of direct …
robwknox Nov 3, 2020
17bf5f7
refactor of SPAN_KIND map structure and loosening of allowed protobuf…
robwknox Nov 3, 2020
980b028
pylint disable directive
robwknox Nov 3, 2020
c6ce2a4
docstring format correction
robwknox Nov 3, 2020
8cb1c60
Merge branch 'master' into 1175_zipkin_exporter_v2_protobuf_support
robwknox Nov 3, 2020
1bcc834
Merge branch 'master' into 1175_zipkin_exporter_v2_protobuf_support
robwknox Nov 6, 2020
bb2fec4
Merge branch 'master' into 1175_zipkin_exporter_v2_protobuf_support
robwknox Nov 9, 2020
4c50f2c
Merge branch 'master' into 1175_zipkin_exporter_v2_protobuf_support
robwknox Nov 13, 2020
d440819
changelog
robwknox Nov 16, 2020
9b0f8a3
Merge branch 'master' into 1175_zipkin_exporter_v2_protobuf_support
robwknox Nov 17, 2020
b1c95f6
Merge branch 'master' into 1175_zipkin_exporter_v2_protobuf_support
lzchen Nov 18, 2020
d825b64
Merge branch 'master' into 1175_zipkin_exporter_v2_protobuf_support
lzchen Nov 19, 2020
05ef05b
Merge branch 'master' into 1175_zipkin_exporter_v2_protobuf_support
robwknox Nov 19, 2020
5fa2da0
Merge branch 'master' into 1175_zipkin_exporter_v2_protobuf_support
lzchen Nov 23, 2020
78d6163
Merge branch 'master' into 1175_zipkin_exporter_v2_protobuf_support
lzchen Nov 23, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions exporter/opentelemetry-exporter-zipkin/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ package_dir=
=src
packages=find_namespace:
install_requires =
protobuf == 3.12.2
robwknox marked this conversation as resolved.
Show resolved Hide resolved
requests ~= 2.7
opentelemetry-api == 0.15.dev0
opentelemetry-sdk == 0.15.dev0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,25 +64,37 @@
import json
import logging
import os
from typing import Optional, Sequence
from typing import Optional, Sequence, Union
from urllib.parse import urlparse

import requests

from opentelemetry.exporter.zipkin import zipkin_pb2
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
from opentelemetry.trace import Span, SpanContext, SpanKind

TRANSPORT_FORMAT_JSON = "application/json"
TRANSPORT_FORMAT_PROTOBUF = "application/x-protobuf"
robwknox marked this conversation as resolved.
Show resolved Hide resolved

DEFAULT_RETRY = False
DEFAULT_URL = "http://localhost:9411/api/v2/spans"
DEFAULT_MAX_TAG_VALUE_LENGTH = 128
ZIPKIN_HEADERS = {"Content-Type": "application/json"}

SPAN_KIND_MAP = {
SpanKind.INTERNAL: None,
SpanKind.SERVER: "SERVER",
SpanKind.CLIENT: "CLIENT",
SpanKind.PRODUCER: "PRODUCER",
SpanKind.CONSUMER: "CONSUMER",
TRANSPORT_FORMAT_JSON: {
SpanKind.INTERNAL: None,
SpanKind.SERVER: "SERVER",
SpanKind.CLIENT: "CLIENT",
SpanKind.PRODUCER: "PRODUCER",
SpanKind.CONSUMER: "CONSUMER",
},
TRANSPORT_FORMAT_PROTOBUF: {
SpanKind.INTERNAL: zipkin_pb2.Span.Kind.SPAN_KIND_UNSPECIFIED,
SpanKind.SERVER: zipkin_pb2.Span.Kind.SERVER,
SpanKind.CLIENT: zipkin_pb2.Span.Kind.CLIENT,
SpanKind.PRODUCER: zipkin_pb2.Span.Kind.PRODUCER,
SpanKind.CONSUMER: zipkin_pb2.Span.Kind.CONSUMER,
}
}

SUCCESS_STATUS_CODES = (200, 202)
Expand All @@ -100,6 +112,7 @@ class ZipkinSpanExporter(SpanExporter):
ipv4: Primary IPv4 address associated with this connection.
ipv6: Primary IPv6 address associated with this connection.
retry: Set to True to configure the exporter to retry on failure.
transport_format: transport interchange format to use
"""

def __init__(
Expand All @@ -110,6 +123,7 @@ def __init__(
ipv6: Optional[str] = None,
retry: Optional[str] = DEFAULT_RETRY,
max_tag_value_length: Optional[int] = DEFAULT_MAX_TAG_VALUE_LENGTH,
transport_format: Union[TRANSPORT_FORMAT_JSON, TRANSPORT_FORMAT_PROTOBUF, None] = TRANSPORT_FORMAT_JSON
):
self.service_name = service_name
if url is None:
Expand All @@ -125,11 +139,13 @@ def __init__(
self.ipv6 = ipv6
self.retry = retry
self.max_tag_value_length = max_tag_value_length
self.transport_format = transport_format
robwknox marked this conversation as resolved.
Show resolved Hide resolved

def export(self, spans: Sequence[Span]) -> SpanExportResult:
zipkin_spans = self._translate_to_zipkin(spans)
result = requests.post(
url=self.url, data=json.dumps(zipkin_spans), headers=ZIPKIN_HEADERS
url=self.url,
data=self._translate_to_transport_format(spans),
headers={"Content-Type": self.transport_format}
)

if result.status_code not in SUCCESS_STATUS_CODES:
Expand All @@ -147,8 +163,11 @@ def export(self, spans: Sequence[Span]) -> SpanExportResult:
def shutdown(self) -> None:
pass

def _translate_to_zipkin(self, spans: Sequence[Span]):
def _translate_to_transport_format(self, spans: Sequence[Span]):
return self._translate_to_json(spans) \
if self.transport_format == TRANSPORT_FORMAT_JSON else self._translate_to_protobuf(spans)

def _translate_to_json(self, spans: Sequence[Span]):
local_endpoint = {"serviceName": self.service_name, "port": self.port}

if self.ipv4 is not None:
Expand All @@ -165,8 +184,8 @@ def _translate_to_zipkin(self, spans: Sequence[Span]):

# Timestamp in zipkin spans is int of microseconds.
# see: https://zipkin.io/pages/instrumenting.html
start_timestamp_mus = _nsec_to_usec_round(span.start_time)
duration_mus = _nsec_to_usec_round(span.end_time - span.start_time)
start_timestamp_mus = nsec_to_usec_round(span.start_time)
duration_mus = nsec_to_usec_round(span.end_time - span.start_time)

zipkin_span = {
# Ensure left-zero-padding of traceId, spanId, parentId
Expand All @@ -176,7 +195,7 @@ def _translate_to_zipkin(self, spans: Sequence[Span]):
"timestamp": start_timestamp_mus,
"duration": duration_mus,
"localEndpoint": local_endpoint,
"kind": SPAN_KIND_MAP[span.kind],
"kind": SPAN_KIND_MAP[TRANSPORT_FORMAT_JSON][span.kind],
robwknox marked this conversation as resolved.
Show resolved Hide resolved
"tags": self._extract_tags_from_span(span),
"annotations": self._extract_annotations_from_events(
span.events
Expand Down Expand Up @@ -211,7 +230,81 @@ def _translate_to_zipkin(self, spans: Sequence[Span]):
zipkin_span["parentId"] = format(span.parent.span_id, "016x")

zipkin_spans.append(zipkin_span)
return zipkin_spans

return json.dumps(zipkin_spans)
robwknox marked this conversation as resolved.
Show resolved Hide resolved

def _translate_to_protobuf(self, spans: Sequence[Span]):

local_endpoint = zipkin_pb2.Endpoint(service_name=self.service_name, port=self.port)

if self.ipv4 is not None:
local_endpoint.ipv4 = self.ipv4

if self.ipv6 is not None:
local_endpoint.ipv6 = self.ipv6

pbuf_spans = zipkin_pb2.ListOfSpans()

for span in spans:
context = span.get_span_context()
trace_id = context.trace_id.to_bytes(length=16, byteorder="big", signed=False)
span_id = self.format_pbuf_span_id(context.span_id)

# Timestamp in zipkin spans is int of microseconds.
# see: https://zipkin.io/pages/instrumenting.html
start_timestamp_mus = nsec_to_usec_round(span.start_time)
duration_mus = nsec_to_usec_round(span.end_time - span.start_time)

pbuf_span = zipkin_pb2.Span(
trace_id=trace_id,
id=span_id,
name=span.name,
timestamp=start_timestamp_mus,
duration=duration_mus,
local_endpoint=local_endpoint,
kind=SPAN_KIND_MAP[TRANSPORT_FORMAT_PROTOBUF][span.kind],
tags=self._extract_tags_from_span(span)
)

annotations = self._extract_annotations_from_events(span.events)

if annotations is not None:
for annotation in annotations:
pbuf_span.annotations.append(
zipkin_pb2.Annotation(
timestamp=annotation["timestamp"],
value=annotation["value"],
)
)

if span.instrumentation_info is not None:
pbuf_span.tags.update({
"otel.instrumentation_library.name": span.instrumentation_info.name,
"otel.instrumentation_library.version": span.instrumentation_info.version
})

if span.status is not None:
pbuf_span.tags.update({"otel.status_code": str(span.status.status_code.value)})
if span.status.description is not None:
pbuf_span.tags.update({"otel.status_description": span.status.description})

if context.trace_flags.sampled:
pbuf_span.debug = True

if isinstance(span.parent, Span):
pbuf_span.parent_id = self.format_pbuf_span_id(
span.parent.get_span_context().span_id
)
elif isinstance(span.parent, SpanContext):
pbuf_span.parent_id = self.format_pbuf_span_id(span.parent.span_id)

pbuf_spans.spans.append(pbuf_span)

return pbuf_spans.SerializeToString()

@staticmethod
def format_pbuf_span_id(span_id: int):
return span_id.to_bytes(length=8, byteorder="big", signed=False)

def _extract_tags_from_dict(self, tags_dict):
tags = {}
Expand Down Expand Up @@ -251,13 +344,13 @@ def _extract_annotations_from_events(self, events):

annotations.append(
{
"timestamp": _nsec_to_usec_round(event.timestamp),
"timestamp": nsec_to_usec_round(event.timestamp),
"value": json.dumps({event.name: attrs}),
}
)
return annotations


def _nsec_to_usec_round(nsec):
def nsec_to_usec_round(nsec):
"""Round nanoseconds to microseconds"""
return (nsec + 500) // 10 ** 3
Loading