Add general timeout mechanism in the export layer #385

Closed
39 changes: 28 additions & 11 deletions opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py
@@ -19,6 +19,10 @@
 from enum import Enum

 from opentelemetry.context import Context
+from opentelemetry.sdk.util import (
+    set_timeout_signal_handler,
+    timeout_in_seconds,
+)
 from opentelemetry.util import time_ns

 from .. import Span, SpanProcessor
@@ -66,18 +70,23 @@ class SimpleExportSpanProcessor(SpanProcessor):
     passes ended spans directly to the configured `SpanExporter`.
     """

-    def __init__(self, span_exporter: SpanExporter):
+    def __init__(
+        self, span_exporter: SpanExporter, timeout: int = 60,
+    ):
         self.span_exporter = span_exporter
+        self.timeout = timeout

     def on_start(self, span: Span) -> None:
         pass

     def on_end(self, span: Span) -> None:
         with Context.use(suppress_instrumentation=True):
             try:
-                self.span_exporter.export((span,))
-            # pylint: disable=broad-except
-            except Exception:
+                with timeout_in_seconds(self.timeout):
+                    self.span_exporter.export((span,))
+            except TimeoutError:
+                logger.exception("Timeout Exception while exporting Span.")
+            except Exception:  # pylint: disable=broad-except
                 logger.exception("Exception while exporting Span.")

     def shutdown(self) -> None:
@@ -97,6 +106,7 @@ def __init__(
         max_queue_size: int = 2048,
         schedule_delay_millis: float = 5000,
         max_export_batch_size: int = 512,
+        timeout: int = 60,
     ):
         if max_queue_size <= 0:
             raise ValueError("max_queue_size must be a positive integer.")
@@ -131,6 +141,9 @@ def __init__(
             None
         ] * self.max_export_batch_size  # type: typing.List[typing.Optional[Span]]
         self.worker_thread.start()
+        self.timeout = timeout
+        # used by general timeout mechanism
+        set_timeout_signal_handler()
Member

It looks like you're setting the handler here because export runs in a worker thread? But since we initialize the span processor right at startup, this is going to change the SIGALRM handler for the entire life of the process, not just while we're waiting for an export to time out.
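
For context on the worker-thread constraint: CPython only allows `signal.signal` to be called from the main thread and raises `ValueError` anywhere else, which would explain why the handler is installed in `__init__` rather than next to the export call. A minimal sketch that demonstrates the restriction (the helper name `try_install_handler` is hypothetical, and `SIGALRM` assumes a Unix platform):

```python
import signal
import threading


def try_install_handler(outcomes):
    """Attempt to install a SIGALRM handler and record what happened."""
    try:
        # SIG_DFL is used so nothing custom would be installed even if
        # the call succeeded.
        signal.signal(signal.SIGALRM, signal.SIG_DFL)
        outcomes.append("installed")
    except ValueError:
        # Raised because this function runs outside the main thread.
        outcomes.append("ValueError")


outcomes = []
worker = threading.Thread(target=try_install_handler, args=(outcomes,))
worker.start()
worker.join()
```

After the thread finishes, `outcomes` holds a single `"ValueError"` entry, so any signal-based timeout has to be armed with a handler that was registered from the main thread.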


     def on_start(self, span: Span) -> None:
         pass
@@ -184,13 +197,17 @@ def export(self) -> None:
             idx += 1
         with Context.use(suppress_instrumentation=True):
             try:
-                # Ignore type b/c the Optional[None]+slicing is too "clever"
-                # for mypy
-                self.span_exporter.export(
-                    self.spans_list[:idx]
-                )  # type: ignore
-            # pylint: disable=broad-except
-            except Exception:
+                with timeout_in_seconds(self.timeout):
+                    # Ignore type b/c the Optional[None]+slicing is too "clever"
+                    # for mypy
+                    self.span_exporter.export(
+                        self.spans_list[:idx]
+                    )  # type: ignore
+            except TimeoutError:
+                logger.exception(
+                    "Timeout Exception while exporting Span batch."
+                )
+            except Exception:  # pylint: disable=broad-except
                 logger.exception("Exception while exporting Span batch.")

         # clean up list
27 changes: 27 additions & 0 deletions opentelemetry-sdk/src/opentelemetry/sdk/util.py
@@ -13,8 +13,10 @@
 # limitations under the License.

 import datetime
+import signal
 import threading
 from collections import OrderedDict, deque
+from contextlib import contextmanager

 try:
     # pylint: disable=ungrouped-imports
@@ -26,6 +28,31 @@
     from collections import Sequence


+def set_timeout_signal_handler():
Contributor

This function defines an inner function and then registers it with signal, but it is being called in an __init__. Keep in mind that you most likely want to do this only once, not every time an object is initialized. Consider getting rid of this function and registering the handler with signal once, when this module is imported.

+    "Signal timeout setter."
+
+    def signal_handler_function(signum, frame):
Contributor

This is a function, there is no need to add _function to its name. The name of the function should describe what the function does, in this case, it raises a TimeoutError.

+        raise TimeoutError
+
+    return signal.signal(signal.SIGALRM, signal_handler_function)
+
+
+@contextmanager
+def timeout_in_seconds(seconds=None):
Contributor

The idea of having a general purpose timeout mechanism is great, but keep in mind that the specification requires export and shutdown to not block indefinitely. This approach (using a @contextmanager to make a context object that is used when export and shutdown are called) puts the responsibility of being specification compliant on the code that does the calling of export and shutdown instead of putting the responsibility on these 2 methods themselves not to block indefinitely.

Please consider making a decorator that is then applied directly to the functions export and shutdown. In this way, they can be called directly and the caller won't have to worry about making them not block indefinitely, something like this:

def timeout(function):
    def inner(*args, timeout=90, **kwargs):
        print(timeout)
        function(*args, **kwargs)
    return inner


@timeout
def export(first, second):
    print(first)
    print(second)


export(1, 2, timeout=9)
9
1
2
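
The example above only prints the `timeout` value. A sketch of a decorator that actually enforces it could look like the following. It is one possible approach, not what this PR implements: it swaps the `SIGALRM` mechanism for a helper thread plus `Thread.join(timeout)`, and the `export` function with its `delay` parameter is a hypothetical stand-in for a real exporter.

```python
import functools
import threading
import time


def timeout(function):
    """Give `function` a `timeout` keyword argument, in seconds."""

    @functools.wraps(function)
    def inner(*args, timeout=90, **kwargs):
        outcome = {}

        def target():
            try:
                outcome["result"] = function(*args, **kwargs)
            except Exception as error:  # pylint: disable=broad-except
                outcome["error"] = error

        # A daemon thread will not keep the interpreter alive if it hangs.
        worker = threading.Thread(target=target, daemon=True)
        worker.start()
        worker.join(timeout)
        if worker.is_alive():
            # The call is abandoned, not cancelled: the thread keeps running.
            raise TimeoutError(
                "{} did not finish within {} s".format(
                    function.__name__, timeout
                )
            )
        if "error" in outcome:
            raise outcome["error"]
        return outcome["result"]

    return inner


@timeout
def export(spans, delay=0.0):
    """Hypothetical exporter: pretend to send `spans` in `delay` seconds."""
    time.sleep(delay)
    return len(spans)


exported = export(("span-1", "span-2"), timeout=1)
try:
    export(("span-3",), delay=1, timeout=0.05)
    timed_out = False
except TimeoutError:
    timed_out = True
```

The trade-off is that a timed-out call is left running in the background, whereas the signal-based approach interrupts it but only works in the main thread.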

Member

Exporters might have a better way to implement timeouts, e.g. in the zipkin exporter we'd pass a timeout arg to requests.post which sets a timeout on the underlying socket.

I think a general purpose timeout in processor is fine, but exporters should implement timeout logic themselves: #346 (comment).
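
To illustrate what a timeout on the underlying socket means, here is a standard-library sketch. It does not use `requests` or the real zipkin exporter: `export_over_socket` is a hypothetical helper, and a local `socket.socketpair()` stands in for the connection to a backend.

```python
import socket


def export_over_socket(connection, payload, timeout_seconds):
    """Send `payload` and wait for a reply; return None if it times out."""
    # Applies to every blocking call on this socket from now on.
    connection.settimeout(timeout_seconds)
    try:
        connection.sendall(payload)
        return connection.recv(1024)
    except socket.timeout:
        return None


# A connected pair of local sockets stands in for exporter and backend.
client, backend = socket.socketpair()
no_reply = export_over_socket(client, b"spans", 0.1)  # backend stays silent
backend.sendall(b"ok")
reply = export_over_socket(client, b"more spans", 0.1)
client.close()
backend.close()
```

Because the blocking call itself gives up, this works from any thread and needs no signal handler, which is the advantage of letting each exporter handle its own timeout.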

Contributor

I think it is better to document the function parameter instead of using the name of the function as documentation for the parameter itself, something like this:

def timeout(time):
    """
    ...
    time: time to wait before timing out, in seconds
    """
    ...

Member

@ocelotl you and @toumorokoshi can duke it out over Hungarian notation. 😄

+    """A general timeout mechanism."""
+
+    if seconds is None:
+        yield
+    else:
+        if threading.current_thread() is threading.main_thread():
+            set_timeout_signal_handler()
+        signal.alarm(seconds)
+        try:
+            yield
+        finally:
+            signal.alarm(0)
Member

For this to be a general-purpose timeout func, shouldn't it reset the old signal handler after cancelling the alarm?
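
A minimal sketch of what restoring the handler could look like, assuming a Unix platform and a call from the main thread. It is not the PR's code: `raise_timeout_error` and `sentinel_handler` are hypothetical names, and `signal.setitimer` is used instead of `signal.alarm` so that fractional seconds work in the demo.

```python
import signal
import time
from contextlib import contextmanager


def raise_timeout_error(signum, frame):
    """SIGALRM handler: abort whatever the main thread is doing."""
    raise TimeoutError("operation timed out")


@contextmanager
def timeout_in_seconds(seconds=None):
    """Raise TimeoutError if the with-body runs longer than `seconds`.

    Must be entered from the main thread; `None` disables the timeout.
    """
    if seconds is None:
        yield
        return
    # signal.signal returns the handler it replaced, so keep it for later.
    previous_handler = signal.signal(signal.SIGALRM, raise_timeout_error)
    # setitimer (unlike signal.alarm) accepts fractional seconds.
    signal.setitimer(signal.ITIMER_REAL, seconds)
    try:
        yield
    finally:
        signal.setitimer(signal.ITIMER_REAL, 0)  # cancel the pending alarm
        signal.signal(signal.SIGALRM, previous_handler)  # restore old handler


def sentinel_handler(signum, frame):
    """Stand-in for a handler the application installed earlier."""


signal.signal(signal.SIGALRM, sentinel_handler)
try:
    with timeout_in_seconds(0.1):
        time.sleep(2)  # interrupted by the alarm
    timed_out = False
except TimeoutError:
    timed_out = True
restored_handler = signal.getsignal(signal.SIGALRM)
```

With this shape the process-wide `SIGALRM` handler is only replaced for the duration of the `with` block, which also addresses the concern about changing it for the whole life of the process.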



 def ns_to_iso_str(nanoseconds):
     """Get an ISO 8601 string from time_ns value."""
     ts = datetime.datetime.utcfromtimestamp(nanoseconds / 1e9)