Add LogProcessors implementation #1916
Conversation
Co-authored-by: alrex <[email protected]>
…lemetry-python into logging-otlp-handler
""" | ||
|
||
def __init__(self): | ||
# use a tuple to avoid race conditions when adding a new log and |
Is the tuple what avoids race conditions, or is it the lock in add_log_processor? A tuple is an ordered data structure — is the order meaningful for log processors? If not, I suggest this be a set instead.
It's both the tuple and the lock. We are trying to be consistent with span processors, so the order is important here.
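To make the pattern concrete, here is a condensed sketch of the copy-on-write approach being discussed (names follow the diff context above; this is not the full SDK class):

```python
import threading


class SynchronousMultiLogProcessor:
    """Condensed sketch of the copy-on-write pattern discussed above."""

    def __init__(self):
        # Immutable snapshot: readers never observe a half-updated
        # collection, and registration order is preserved.
        self._log_processors = ()
        self._lock = threading.Lock()

    def add_log_processor(self, log_processor):
        # The lock serializes concurrent writers; replacing the whole tuple
        # is atomic from the point of view of readers.
        with self._lock:
            self._log_processors = self._log_processors + (log_processor,)

    def emit(self, log_data):
        # No lock here: iterate over whatever snapshot existed when emit()
        # was called and forward the record to each processor in order.
        for lp in self._log_processors:
            lp.emit(log_data)
```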
            self._log_processors = self._log_processors + (log_processor,)

    def emit(self, log_data: LogData) -> None:
        for lp in self._log_processors:
Is a lock also necessary here?
No, we wouldn't want to hold the lock until all processors complete emit. This just iterates over the log processors available at the time of emitting and returns.

    def shutdown(self) -> None:
        """Shutdown the log processors one by one"""
        for lp in self._log_processors:
And here?
Same as above
            False otherwise.
        """
        deadline_ns = _time_ns() + timeout_millis * 1000000
        for lp in self._log_processors:
And here?
Same
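For reference, a sketch of how the deadline in the diff above spreads the budget across processors — each processor only gets the time remaining before the overall deadline. `_time_ns` is assumed to behave like `time.time_ns`, and the snippet is written as a free function so it stands alone:

```python
from time import time_ns as _time_ns  # stand-in for the helper used in the snippet above


def force_flush(log_processors, timeout_millis: int = 30000) -> bool:
    """Sketch: give each processor only the time left before the shared deadline."""
    deadline_ns = _time_ns() + timeout_millis * 1000000
    for lp in log_processors:
        current_ts = _time_ns()
        if current_ts >= deadline_ns:
            return False  # out of time before even reaching this processor
        # Hand the processor the remaining budget, converted back to milliseconds.
        if not lp.force_flush((deadline_ns - current_ts) // 1000000):
            return False
    return True
```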
@@ -112,6 +115,133 @@ def force_flush(self, timeout_millis: int = 30000):
        """


class SynchronousMultiLogProcessor(LogProcessor):
    """Implementation of class:`LogProcessor` that forwards all received
    events to a list of log processors sequentially.
Should this class be named SequentialMultiLogProcessor?
I think the current name is more accurate, as we are synchronously calling one log processor after the other.
            return False

        for future in done_futures:
            if not future.result():
Why is this necessary?
If any of the processors fails to flush within the timeout, this should return False.
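In other words, the concurrent flush can fail in two ways: a future may not finish in time, or it may finish but report a failed flush. A rough sketch of that check (the executor, e.g. a `ThreadPoolExecutor`, is assumed to be passed in or created elsewhere):

```python
import concurrent.futures


def force_flush_concurrent(executor, log_processors, timeout_millis: int = 30000) -> bool:
    """Sketch: flush all processors in parallel; fail if any one of them fails."""
    futures = [
        executor.submit(lp.force_flush, timeout_millis) for lp in log_processors
    ]
    done_futures, not_done_futures = concurrent.futures.wait(
        futures, timeout=timeout_millis / 1e3
    )
    if not_done_futures:
        # At least one processor did not finish within the timeout.
        return False
    for future in done_futures:
        # A processor can finish in time and still report a failed flush,
        # which is why the result itself has to be checked as well.
        if not future.result():
            return False
    return True
```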
@@ -51,3 +58,215 @@ def shutdown(self):

        Called when the SDK is shut down.
        """


class SimpleLogProcessor(LogProcessor):
Is Simple descriptive for this kind of processor?
This is from the spec.
Are you referring to the tracing spec? Are we trying to be consistent with tracing?
No, I am referring to the logs spec (an OTEP at this point): https://github.com/open-telemetry/oteps/blob/main/text/logs/0150-logging-library-sdk.md#logprocessor.
"Built-in implementations: SimpleLogProcessor, BatchLogProcessor."
And yes, the Logs SDK tries to be consistent with tracing as much as possible:
"The specification defines SDK elements that to some extent mirror the OpenTelemetry Trace SDK. This ensures uniformity and consistency of the OpenTelemetry specification and of the implementations across traces and logs. For additional clarity the definitions in this document refer to the Trace analogs where appropriate."
opentelemetry-sdk/src/opentelemetry/sdk/logs/export/__init__.py (outdated; thread resolved)
        and instrumentation info.
        """
        log_data = LogData(record, self._instrumentation_info)
        self._multi_log_processor.emit(log_data)

    def flush(self):
Why does the LogEmitter have a flush method? Shouldn't the Provider be responsible for the processors?
Both the provider and the emitter can flush. This exists to complete the Appender/Handler-like interfaces/ABCs. In our case, OTLPHandler has to implement two methods (emit and flush) after subclassing logging.Handler. The Handler will have access to the Emitter (analogous to the Tracer in instrumentation). So if .flush of the handler is called (by a high-level call to logging.shutdown() or in any other manner), we make use of the LogEmitter flush method to flush the logs. Does this explanation make it clear why there is a flush on the Emitter?
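As an illustration of that delegation, here is a minimal sketch of a `logging.Handler` subclass forwarding to a LogEmitter; the record translation and the way the emitter is obtained are elided/assumed:

```python
import logging


class OTLPHandler(logging.Handler):
    """Sketch of the Handler -> LogEmitter delegation described above."""

    def __init__(self, log_emitter, level=logging.NOTSET):
        super().__init__(level=level)
        self._log_emitter = log_emitter  # obtained from the LogEmitterProvider

    def emit(self, record: logging.LogRecord) -> None:
        # The real handler first translates the stdlib record into the SDK's
        # log data model; that field mapping is elided in this sketch.
        self._log_emitter.emit(record)

    def flush(self) -> None:
        # Reached e.g. via logging.shutdown(); delegates to LogEmitter.flush(),
        # which in turn flushes the configured log processors.
        self._log_emitter.flush()
```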
The use case makes sense. However, I'm wondering what would happen if multiple LogEmitters are created via the Provider but all share the same multi-log processor. If flush is called for a specific LogEmitter, we would flush the entire queue in the multi-log processor, which might contain log records from different sources, i.e. different LogEmitters, correct? I don't think we have this problem in tracing because only the TracerProvider is allowed to flush. Thoughts? Unless I am missing something.
Right, but how could one avoid that and only flush the logs from one lib, short of creating a new pipeline for each source (which doesn't sound like a good idea)? On the other hand, a provider flush does the same thing: it just flushes everything out. Do you see any potential problems? This is probably worth sharing with the logs SIG to see what they have to say about it.
Yeah, we should probably bring it up. I'll be out this week, so if you could bring it up during the SIG and relay what is discussed, that would be great.
For now, I don't think this should block the PR; maybe just put a TODO next to flush?
        self.num_log_records = 0


class BatchLogProcessor(LogProcessor):
I don't think I quite understand the logic flow. How do the handlers (that pick up logging messages from the Python logger) communicate with the Processor? Are handlers added to the root log handler?
"Are handlers added to the root log handler?"
Correct. The end user will have to attach the OTLPHandler to the root logger.
"How do the handlers (that pick up logging messages from the Python logger) communicate with the Processor?"
Every log statement from the logging lib will be propagated to the root logger. Users can attach any number of Handlers to a logger instance; they may already be using out-of-the-box handlers from the stdlib. OTLPHandler is one such handler, which sends the log data, in the OTLP data model format, to the configured destination (could be a network destination, a file, etc.).
The logic flow is Logger -> Handler -> LogEmitter -> LogProcessor -> LogExporter. Think of the Handler as Instrumentation and the LogEmitter as the Tracer from the tracing SDK (kind of inaccurate, but it helps in visualizing things). Does it make sense now?
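If it helps to visualize, here is a rough wiring sketch of that flow. The class names come from this PR series, but the exact import paths and constructor signatures are assumptions and changed in later commits (e.g. the later move of `logs` to `_logs`):

```python
import logging

# Import paths and signatures below are assumptions for this point in the PR series.
from opentelemetry.sdk.logs import (
    LogEmitterProvider,
    OTLPHandler,
    set_log_emitter_provider,
)
from opentelemetry.sdk.logs.export import BatchLogProcessor, ConsoleLogExporter

# LogEmitter -> LogProcessor -> LogExporter half of the pipeline.
provider = LogEmitterProvider()
set_log_emitter_provider(provider)
provider.add_log_processor(BatchLogProcessor(ConsoleLogExporter()))

# Logger -> Handler -> LogEmitter half: attach the handler to the root logger
# so every propagated record enters the OpenTelemetry pipeline.
handler = OTLPHandler(log_emitter=provider.get_log_emitter(__name__))
logging.getLogger().addHandler(handler)

logging.getLogger(__name__).error("Hyderabad, we have a major problem.")
```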
Ahh I see, each handler SHOULD have a reference to a log emitter. We would expect users to possibly create their own Handlers, correct? Maybe it would be good to have another abstraction layer on top of logging.Handler specific to OpenTelemetry log handlers (similar to BaseInstrumentor). Thoughts?
"We would expect users to possibly create their own Handlers, correct?"
They don't need another handler for the same logging lib, but they can create Handler-like hooks for other third-party logging libs (as recommended by the same module; for example, structlog has a concept called Processors, equivalent to logging.Handler).
"Maybe it would be good to have another abstraction layer on top of logging.Handler specific to OpenTelemetry log handlers (similar to BaseInstrumentor)."
Yeah, I thought about it but couldn't come up with anything meaningful. Do you have a rough idea in your head of how you would like it to be?
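For illustration, a hedged sketch of such a hook for structlog, where a processor is just a callable receiving the event dict; the emitter and the translation into the OTLP data model are assumed/elided:

```python
import structlog


class OpenTelemetryProcessor:
    """Sketch of a structlog processor that forwards events to a LogEmitter."""

    def __init__(self, log_emitter):
        self._log_emitter = log_emitter

    def __call__(self, logger, method_name, event_dict):
        # A structlog processor is just a callable. Translating event_dict
        # into the OTLP log data model is elided here; the sketch only shows
        # where the hand-off to the emitter would happen.
        self._log_emitter.emit(event_dict)
        return event_dict


# structlog.configure(processors=[OpenTelemetryProcessor(log_emitter),
#                                 structlog.processors.JSONRenderer()])
```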
Nothing big for now, but similarly to instrumentations, BaseInstrumentor was needed eventually as we found more commonalities and functions between instrumentations. If a custom Handler is created for OpenTelemetry, maybe we need it to have an emitter?
Also probably not blocking the PR, just something to note if we ever get confusion about how a handler should look.
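Purely as a hypothetical illustration (not part of this PR), such an abstraction could be little more than a base handler that guarantees an emitter is present:

```python
import abc
import logging


class BaseOTelLogHandler(logging.Handler, abc.ABC):
    """Hypothetical base class: every concrete handler must hold a LogEmitter."""

    def __init__(self, log_emitter, level=logging.NOTSET):
        super().__init__(level=level)
        self.log_emitter = log_emitter

    @abc.abstractmethod
    def emit(self, record: logging.LogRecord) -> None:
        """Translate the record and forward it via self.log_emitter."""

    def flush(self) -> None:
        # Shared behaviour: flushing always goes through the emitter.
        self.log_emitter.flush()
```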
Co-authored-by: Leighton Chen <[email protected]>
…emetry-python into logs-processor-impl
This reverts commit 0367af1.
The failing tests are not related to this PR. The fix has been merged in open-telemetry/opentelemetry-python-contrib#560, but if I update the SHA there will be more failures because the logs branch is behind main. I think this can get in.
@lonewolf3739 What do you suggest?
The first one looks good to me. We can update https://github.com/lonewolf3739/opentelemetry-python/blob/logs-processor-impl/.github/workflows/test.yml to not run the contrib-build tests, what do you think?
@lonewolf3739
* Add initial overall structure and classes for logs sdk (#1894)
* Add global LogEmitterProvider and convenience function get_log_emitter (#1901)
* Add OTLPHandler for standard library logging module (#1903)
* Add LogProcessors implementation (#1916)
* Fix typos in test_handler.py (#1953)
* Add support for OTLP Log exporter (#1943)
* Add support for user defined attributes in OTLPHandler (#1952)
* use timeout in force_flush (#2118)
  * use timeout in force_flush
  * fix lint
  * Update opentelemetry-sdk/src/opentelemetry/sdk/logs/export/__init__.py
  Co-authored-by: Srikanth Chekuri <[email protected]>
  * fix lint
  Co-authored-by: Srikanth Chekuri <[email protected]>
* add a ConsoleExporter for logging (#2099)
  Co-authored-by: Srikanth Chekuri <[email protected]>
* Update SDK docs and Add example with OTEL collector logging (debug) exporter (#2050)
* Fix exception in severity number transformation (#2208)
  * Fix exception with warning message transformation
  * Fix lint
  * Fix lint
  * fstring
* Demonstrate how to set the Resource for LogEmitterProvider (#2209)
  Added a Resource to the logs example to make it more complete. Previously it was using the built-in Resource. Now it adds the service.name and service.instance.id attributes. The resulting emitted log records look like this:
  ```
  Resource labels:
    -> telemetry.sdk.language: STRING(python)
    -> telemetry.sdk.name: STRING(opentelemetry)
    -> telemetry.sdk.version: STRING(1.5.0)
    -> service.name: STRING(shoppingcart)
    -> service.instance.id: STRING(instance-12)
  InstrumentationLibraryLogs #0
  InstrumentationLibrary __main__ 0.1
  LogRecord #0
  Timestamp: 2021-10-14 18:33:43.425820928 +0000 UTC
  Severity: ERROR
  ShortName:
  Body: Hyderabad, we have a major problem.
  Trace ID: ce1577e4a703f42d569e72593ad71888
  Span ID: f8908ac4258ceff6
  Flags: 1
  ```
  * Fix linting
* Use batch processor in example (#2225)
* move logs to _logs (#2240)
  * move logs to _logs
  * fix lint
* move log_exporter to _log_exporter as it's still experimental (#2252)

Co-authored-by: Srikanth Chekuri <[email protected]>
Co-authored-by: Adrian Garcia Badaracco <[email protected]>
Co-authored-by: Leighton Chen <[email protected]>
Co-authored-by: Tigran Najaryan <[email protected]>
Co-authored-by: Owais Lone <[email protected]>
Description
Part 4 of #1890. This PR adds the LogProcessors implementation.