Skip to content

Commit

Permalink
Add support for OTLP/HTTP log exporter (#2462)
Browse files Browse the repository at this point in the history
  • Loading branch information
srikanthccv authored Jun 13, 2022
1 parent 6e282d2 commit e073d4d
Show file tree
Hide file tree
Showing 4 changed files with 626 additions and 1 deletion.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased](https://github.com/open-telemetry/opentelemetry-python/compare/v1.12.0rc1-0.31b0...HEAD)

- `opentelemetry-exporter-otlp-proto-http` Add support for OTLP/HTTP log exporter
([#2462](https://github.com/open-telemetry/opentelemetry-python/pull/2462))
- Fix yield of `None`-valued points
([#2745](https://github.com/open-telemetry/opentelemetry-python/pull/2745))
- Add missing `to_json` methods
Expand Down Expand Up @@ -115,7 +117,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
pages that have moved, see
[#2453](https://github.com/open-telemetry/opentelemetry-python/pull/2453), and
[#2498](https://github.com/open-telemetry/opentelemetry-python/pull/2498).
- `opentelemetry-exporter-otlp-grpc` update SDK dependency to ~1.9.
- `opentelemetry-exporter-otlp-proto-grpc` update SDK dependency to ~1.9.
([#2442](https://github.com/open-telemetry/opentelemetry-python/pull/2442))
- bugfix(auto-instrumentation): attach OTLPHandler to root logger
([#2450](https://github.com/open-telemetry/opentelemetry-python/pull/2450))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import gzip
import logging
import zlib
from io import BytesIO
from os import environ
from typing import Dict, Optional, Sequence
from time import sleep

import requests
from backoff import expo

from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_OTLP_CERTIFICATE,
OTEL_EXPORTER_OTLP_COMPRESSION,
OTEL_EXPORTER_OTLP_ENDPOINT,
OTEL_EXPORTER_OTLP_HEADERS,
OTEL_EXPORTER_OTLP_TIMEOUT,
)
from opentelemetry.sdk._logs.export import (
LogExporter,
LogExportResult,
LogData,
)
from opentelemetry.exporter.otlp.proto.http import Compression
from opentelemetry.exporter.otlp.proto.http._log_exporter.encoder import (
_ProtobufEncoder,
)
from opentelemetry.util.re import parse_headers


_logger = logging.getLogger(__name__)


DEFAULT_COMPRESSION = Compression.NoCompression
DEFAULT_ENDPOINT = "http://localhost:4318/"
DEFAULT_LOGS_EXPORT_PATH = "v1/logs"
DEFAULT_TIMEOUT = 10 # in seconds


class OTLPLogExporter(LogExporter):

_MAX_RETRY_TIMEOUT = 64

def __init__(
self,
endpoint: Optional[str] = None,
certificate_file: Optional[str] = None,
headers: Optional[Dict[str, str]] = None,
timeout: Optional[int] = None,
compression: Optional[Compression] = None,
):
self._endpoint = endpoint or _append_logs_path(
environ.get(OTEL_EXPORTER_OTLP_ENDPOINT, DEFAULT_ENDPOINT)
)
self._certificate_file = certificate_file or environ.get(
OTEL_EXPORTER_OTLP_CERTIFICATE, True
)
headers_string = environ.get(OTEL_EXPORTER_OTLP_HEADERS, "")
self._headers = headers or parse_headers(headers_string)
self._timeout = timeout or int(
environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, DEFAULT_TIMEOUT)
)
self._compression = compression or _compression_from_env()
self._session = requests.Session()
self._session.headers.update(self._headers)
self._session.headers.update(
{"Content-Type": _ProtobufEncoder._CONTENT_TYPE}
)
if self._compression is not Compression.NoCompression:
self._session.headers.update(
{"Content-Encoding": self._compression.value}
)
self._shutdown = False

def _export(self, serialized_data: str):
data = serialized_data
if self._compression == Compression.Gzip:
gzip_data = BytesIO()
with gzip.GzipFile(fileobj=gzip_data, mode="w") as gzip_stream:
gzip_stream.write(serialized_data)
data = gzip_data.getvalue()
elif self._compression == Compression.Deflate:
data = zlib.compress(bytes(serialized_data))

return self._session.post(
url=self._endpoint,
data=data,
verify=self._certificate_file,
timeout=self._timeout,
)

@staticmethod
def _retryable(resp: requests.Response) -> bool:
if resp.status_code == 408:
return True
if resp.status_code >= 500 and resp.status_code <= 599:
return True
return False

def export(self, batch: Sequence[LogData]) -> LogExportResult:
# After the call to Shutdown subsequent calls to Export are
# not allowed and should return a Failure result.
if self._shutdown:
_logger.warning("Exporter already shutdown, ignoring batch")
return LogExportResult.FAILURE

serialized_data = _ProtobufEncoder.serialize(batch)

for delay in expo(max_value=self._MAX_RETRY_TIMEOUT):

if delay == self._MAX_RETRY_TIMEOUT:
return LogExportResult.FAILURE

resp = self._export(serialized_data)
# pylint: disable=no-else-return
if resp.status_code in (200, 202):
return LogExportResult.SUCCESS
elif self._retryable(resp):
_logger.warning(
"Transient error %s encountered while exporting logs batch, retrying in %ss.",
resp.reason,
delay,
)
sleep(delay)
continue
else:
_logger.error(
"Failed to export logs batch code: %s, reason: %s",
resp.status_code,
resp.text,
)
return LogExportResult.FAILURE
return LogExportResult.FAILURE

def shutdown(self):
if self._shutdown:
_logger.warning("Exporter already shutdown, ignoring call")
return
self._session.close()
self._shutdown = True


def _compression_from_env() -> Compression:
compression = (
environ.get(OTEL_EXPORTER_OTLP_COMPRESSION, "none").lower().strip()
)
return Compression(compression)


def _append_logs_path(endpoint: str) -> str:
if endpoint.endswith("/"):
return endpoint + DEFAULT_LOGS_EXPORT_PATH
return endpoint + f"/{DEFAULT_LOGS_EXPORT_PATH}"
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Sequence, List

from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import (
ExportLogsServiceRequest,
)
from opentelemetry.proto.logs.v1.logs_pb2 import (
ScopeLogs,
ResourceLogs,
)
from opentelemetry.proto.logs.v1.logs_pb2 import LogRecord as PB2LogRecord
from opentelemetry.exporter.otlp.proto.http.trace_exporter.encoder import (
_encode_instrumentation_scope,
_encode_resource,
_encode_span_id,
_encode_trace_id,
_encode_value,
_encode_attributes,
)


from opentelemetry.sdk._logs.export import LogData


class _ProtobufEncoder:
_CONTENT_TYPE = "application/x-protobuf"

@classmethod
def serialize(cls, batch: Sequence[LogData]) -> str:
return cls.encode(batch).SerializeToString()

@staticmethod
def encode(batch: Sequence[LogData]) -> ExportLogsServiceRequest:
return ExportLogsServiceRequest(
resource_logs=_encode_resource_logs(batch)
)


def _encode_log(log_data: LogData) -> PB2LogRecord:
kwargs = {}
kwargs["time_unix_nano"] = log_data.log_record.timestamp
kwargs["span_id"] = _encode_span_id(log_data.log_record.span_id)
kwargs["trace_id"] = _encode_trace_id(log_data.log_record.trace_id)
kwargs["flags"] = int(log_data.log_record.trace_flags)
kwargs["body"] = _encode_value(log_data.log_record.body)
kwargs["severity_text"] = log_data.log_record.severity_text
kwargs["attributes"] = _encode_attributes(log_data.log_record.attributes)
kwargs["severity_number"] = log_data.log_record.severity_number.value

return PB2LogRecord(**kwargs)


def _encode_resource_logs(batch: Sequence[LogData]) -> List[ResourceLogs]:

sdk_resource_logs = {}

for sdk_log in batch:
sdk_resource = sdk_log.log_record.resource
sdk_instrumentation = sdk_log.instrumentation_scope or None
pb2_log = _encode_log(sdk_log)

if sdk_resource not in sdk_resource_logs.keys():
sdk_resource_logs[sdk_resource] = {sdk_instrumentation: [pb2_log]}
elif sdk_instrumentation not in sdk_resource_logs[sdk_resource].keys():
sdk_resource_logs[sdk_resource][sdk_instrumentation] = [pb2_log]
else:
sdk_resource_logs[sdk_resource][sdk_instrumentation].append(
pb2_log
)

pb2_resource_logs = []

for sdk_resource, sdk_instrumentations in sdk_resource_logs.items():
scope_logs = []
for sdk_instrumentation, pb2_logs in sdk_instrumentations.items():
scope_logs.append(
ScopeLogs(
scope=(_encode_instrumentation_scope(sdk_instrumentation)),
log_records=pb2_logs,
)
)
pb2_resource_logs.append(
ResourceLogs(
resource=_encode_resource(sdk_resource),
scope_logs=scope_logs,
)
)

return pb2_resource_logs
Loading

0 comments on commit e073d4d

Please sign in to comment.