Skip to content

Commit

Permalink
Add LogProcessors implementation (open-telemetry#1916)
Browse files Browse the repository at this point in the history
  • Loading branch information
srikanthccv authored and lzchen committed Oct 15, 2021
1 parent 4d96ccf commit e2963d2
Show file tree
Hide file tree
Showing 5 changed files with 878 additions and 12 deletions.
166 changes: 156 additions & 10 deletions opentelemetry-sdk/src/opentelemetry/sdk/logs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@

import abc
import atexit
import concurrent.futures
import logging
import os
from typing import Any, Optional, cast
import threading
from typing import Any, Callable, Optional, Tuple, Union, cast

from opentelemetry.sdk.environment_variables import (
OTEL_PYTHON_LOG_EMITTER_PROVIDER,
Expand All @@ -27,6 +29,7 @@
from opentelemetry.trace import get_current_span
from opentelemetry.trace.span import TraceFlags
from opentelemetry.util._providers import _load_provider
from opentelemetry.util._time import _time_ns
from opentelemetry.util.types import Attributes

_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -112,6 +115,135 @@ def force_flush(self, timeout_millis: int = 30000):
"""


# Temporary fix until https://github.com/PyCQA/pylint/issues/4098 is resolved
# pylint:disable=no-member
class SynchronousMultiLogProcessor(LogProcessor):
"""Implementation of class:`LogProcessor` that forwards all received
events to a list of log processors sequentially.
The underlying log processors are called in sequential order as they were
added.
"""

def __init__(self):
# use a tuple to avoid race conditions when adding a new log and
# iterating through it on "emit".
self._log_processors = () # type: Tuple[LogProcessor, ...]
self._lock = threading.Lock()

def add_log_processor(self, log_processor: LogProcessor) -> None:
"""Adds a Logprocessor to the list of log processors handled by this instance"""
with self._lock:
self._log_processors = self._log_processors + (log_processor,)

def emit(self, log_data: LogData) -> None:
for lp in self._log_processors:
lp.emit(log_data)

def shutdown(self) -> None:
"""Shutdown the log processors one by one"""
for lp in self._log_processors:
lp.shutdown()

def force_flush(self, timeout_millis: int = 30000) -> bool:
"""Force flush the log processors one by one
Args:
timeout_millis: The maximum amount of time to wait for logs to be
exported. If the first n log processors exceeded the timeout
then remaining log processors will not be flushed.
Returns:
True if all the log processors flushes the logs within timeout,
False otherwise.
"""
deadline_ns = _time_ns() + timeout_millis * 1000000
for lp in self._log_processors:
current_ts = _time_ns()
if current_ts >= deadline_ns:
return False

if not lp.force_flush((deadline_ns - current_ts) // 1000000):
return False

return True


class ConcurrentMultiLogProcessor(LogProcessor):
"""Implementation of :class:`LogProcessor` that forwards all received
events to a list of log processors in parallel.
Calls to the underlying log processors are forwarded in parallel by
submitting them to a thread pool executor and waiting until each log
processor finished its work.
Args:
max_workers: The number of threads managed by the thread pool executor
and thus defining how many log processors can work in parallel.
"""

def __init__(self, max_workers: int = 2):
# use a tuple to avoid race conditions when adding a new log and
# iterating through it on "emit".
self._log_processors = () # type: Tuple[LogProcessor, ...]
self._lock = threading.Lock()
self._executor = concurrent.futures.ThreadPoolExecutor(
max_workers=max_workers
)

def add_log_processor(self, log_processor: LogProcessor):
with self._lock:
self._log_processors = self._log_processors + (log_processor,)

def _submit_and_wait(
self,
func: Callable[[LogProcessor], Callable[..., None]],
*args: Any,
**kwargs: Any,
):
futures = []
for lp in self._log_processors:
future = self._executor.submit(func(lp), *args, **kwargs)
futures.append(future)
for future in futures:
future.result()

def emit(self, log_data: LogData):
self._submit_and_wait(lambda lp: lp.emit, log_data)

def shutdown(self):
self._submit_and_wait(lambda lp: lp.shutdown)

def force_flush(self, timeout_millis: int = 30000) -> bool:
"""Force flush the log processors in parallel.
Args:
timeout_millis: The maximum amount of time to wait for logs to be
exported.
Returns:
True if all the log processors flushes the logs within timeout,
False otherwise.
"""
futures = []
for lp in self._log_processors:
future = self._executor.submit(lp.force_flush, timeout_millis)
futures.append(future)

done_futures, not_done_futures = concurrent.futures.wait(
futures, timeout_millis / 1e3
)

if not_done_futures:
return False

for future in done_futures:
if not future.result():
return False

return True


class OTLPHandler(logging.Handler):
"""A handler class which writes logging records, in OTLP format, to
a network destination or file.
Expand Down Expand Up @@ -155,36 +287,49 @@ def flush(self) -> None:


class LogEmitter:
# TODO: Add multi_log_processor
def __init__(
self,
resource: Resource,
multi_log_processor: Union[
SynchronousMultiLogProcessor, ConcurrentMultiLogProcessor
],
instrumentation_info: InstrumentationInfo,
):
self._resource = resource
self._multi_log_processor = multi_log_processor
self._instrumentation_info = instrumentation_info

@property
def resource(self):
return self._resource

def emit(self, record: LogRecord):
# TODO: multi_log_processor.emit
pass
"""Emits the :class:`LogData` by associating :class:`LogRecord`
and instrumentation info.
"""
log_data = LogData(record, self._instrumentation_info)
self._multi_log_processor.emit(log_data)

# TODO: Should this flush everything in pipeline?
# Prior discussion https://github.com/open-telemetry/opentelemetry-python/pull/1916#discussion_r659945290
def flush(self):
# TODO: multi_log_processor.force_flush
pass
"""Ensure all logging output has been flushed."""
self._multi_log_processor.force_flush()


class LogEmitterProvider:
# TODO: Add multi_log_processor
def __init__(
self,
resource: Resource = Resource.create(),
shutdown_on_exit: bool = True,
multi_log_processor: Union[
SynchronousMultiLogProcessor, ConcurrentMultiLogProcessor
] = None,
):
self._resource = resource
self._multi_log_processor = (
multi_log_processor or SynchronousMultiLogProcessor()
)
self._at_exit_handler = None
if shutdown_on_exit:
self._at_exit_handler = atexit.register(self.shutdown)
Expand All @@ -200,6 +345,7 @@ def get_log_emitter(
) -> LogEmitter:
return LogEmitter(
self._resource,
self._multi_log_processor,
InstrumentationInfo(
instrumenting_module_name, instrumenting_module_verison
),
Expand All @@ -210,11 +356,11 @@ def add_log_processor(self, log_processor: LogProcessor):
The log processors are invoked in the same order they are registered.
"""
# TODO: multi_log_processor.add_log_processor.
self._multi_log_processor.add_log_processor(log_processor)

def shutdown(self):
"""Shuts down the log processors."""
# TODO: multi_log_processor.shutdown
self._multi_log_processor.shutdown()
if self._at_exit_handler is not None:
atexit.unregister(self._at_exit_handler)
self._at_exit_handler = None
Expand All @@ -230,7 +376,7 @@ def force_flush(self, timeout_millis: int = 30000) -> bool:
True if all the log processors flushes the logs within timeout,
False otherwise.
"""
# TODO: multi_log_processor.force_flush
return self._multi_log_processor.force_flush(timeout_millis)


_LOG_EMITTER_PROVIDER = None
Expand Down
Loading

0 comments on commit e2963d2

Please sign in to comment.