diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py index b70fb010190..0393d249d42 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py @@ -19,6 +19,10 @@ from enum import Enum from opentelemetry.context import Context +from opentelemetry.sdk.util import ( + set_timeout_signal_handler, + timeout_in_seconds, +) from opentelemetry.util import time_ns from .. import Span, SpanProcessor @@ -66,8 +70,11 @@ class SimpleExportSpanProcessor(SpanProcessor): passes ended spans directly to the configured `SpanExporter`. """ - def __init__(self, span_exporter: SpanExporter): + def __init__( + self, span_exporter: SpanExporter, timeout: int = 60, + ): self.span_exporter = span_exporter + self.timeout = timeout def on_start(self, span: Span) -> None: pass @@ -75,9 +82,11 @@ def on_start(self, span: Span) -> None: def on_end(self, span: Span) -> None: with Context.use(suppress_instrumentation=True): try: - self.span_exporter.export((span,)) - # pylint: disable=broad-except - except Exception: + with timeout_in_seconds(self.timeout): + self.span_exporter.export((span,)) + except TimeoutError: + logger.exception("Timeout Exception while exporting Span.") + except Exception: # pylint: disable=broad-except logger.exception("Exception while exporting Span.") def shutdown(self) -> None: @@ -97,6 +106,7 @@ def __init__( max_queue_size: int = 2048, schedule_delay_millis: float = 5000, max_export_batch_size: int = 512, + timeout: int = 60, ): if max_queue_size <= 0: raise ValueError("max_queue_size must be a positive integer.") @@ -131,6 +141,9 @@ def __init__( None ] * self.max_export_batch_size # type: typing.List[typing.Optional[Span]] self.worker_thread.start() + self.timeout = timeout + # used by general timeout mechanism + set_timeout_signal_handler() def on_start(self, span: Span) -> None: pass @@ -184,13 +197,17 @@ def export(self) -> None: idx += 1 with Context.use(suppress_instrumentation=True): try: - # Ignore type b/c the Optional[None]+slicing is too "clever" - # for mypy - self.span_exporter.export( - self.spans_list[:idx] - ) # type: ignore - # pylint: disable=broad-except - except Exception: + with timeout_in_seconds(self.timeout): + # Ignore type b/c the Optional[None]+slicing is too "clever" + # for mypy + self.span_exporter.export( + self.spans_list[:idx] + ) # type: ignore + except TimeoutError: + logger.exception( + "Timeout Exception while exporting Span batch." + ) + except Exception: # pylint: disable=broad-except logger.exception("Exception while exporting Span batch.") # clean up list diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/util.py b/opentelemetry-sdk/src/opentelemetry/sdk/util.py index 2265c29460b..d3e427f1d87 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/util.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/util.py @@ -13,8 +13,10 @@ # limitations under the License. import datetime +import signal import threading from collections import OrderedDict, deque +from contextlib import contextmanager try: # pylint: disable=ungrouped-imports @@ -26,6 +28,31 @@ from collections import Sequence +def set_timeout_signal_handler(): + "Signal timeout setter." + + def signal_handler_function(signum, frame): + raise TimeoutError + + return signal.signal(signal.SIGALRM, signal_handler_function) + + +@contextmanager +def timeout_in_seconds(seconds=None): + """A general timeout mechanism.""" + + if seconds is None: + yield + else: + if threading.current_thread() is threading.main_thread(): + set_timeout_signal_handler() + signal.alarm(seconds) + try: + yield + finally: + signal.alarm(0) + + def ns_to_iso_str(nanoseconds): """Get an ISO 8601 string from time_ns value.""" ts = datetime.datetime.utcfromtimestamp(nanoseconds / 1e9)