Skip to content

Commit

Permalink
Switched from ObservableGauge to sync Gauge metric export (#46510)
Browse files Browse the repository at this point in the history
Co-authored-by: Marco Küttelwesch <[email protected]>
  • Loading branch information
AutomationDev85 and Marco Küttelwesch authored Feb 7, 2025
1 parent ae46b9e commit 764bf20
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 74 deletions.
67 changes: 23 additions & 44 deletions airflow/metrics/otel_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,10 @@
import logging
import random
import warnings
from collections.abc import Iterable
from functools import partial
from typing import TYPE_CHECKING, Callable, Union

from opentelemetry import metrics
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter
from opentelemetry.metrics import Observation
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics._internal.export import ConsoleMetricExporter, PeriodicExportingMetricReader
from opentelemetry.sdk.resources import HOST_NAME, SERVICE_NAME, Resource
Expand Down Expand Up @@ -159,7 +156,7 @@ def __init__(self, otel_logger: SafeOtelLogger, name: str | None, tags: Attribut

def stop(self, send: bool = True) -> None:
super().stop(send)
if self.name and send:
if self.name and send and self.duration:
self.otel_logger.metrics_map.set_gauge_value(
full_name(prefix=self.otel_logger.prefix, name=self.name), self.duration, False, self.tags
)
Expand Down Expand Up @@ -290,6 +287,25 @@ def timer(
return _OtelTimer(self, stat, tags)


class InternalGauge:
"""Stores sync gauge instrument and current value to support delta feature."""

def __init__(self, meter, name: str, tags: Attributes):
self.attributes = tags
otel_safe_name = _get_otel_safe_name(name)
self.gauge = meter.create_gauge(name=otel_safe_name)
log.debug("Created %s as type: %s", otel_safe_name, _type_as_str(self.gauge))
self.value = DEFAULT_GAUGE_VALUE
self.gauge.set(self.value, attributes=self.attributes)

def set_value(self, new_value: int | float, delta: bool):
"""Delta feature to increase old value with new value and metric export."""
if delta:
new_value += self.value
self.value = new_value
self.gauge.set(new_value, attributes=self.attributes)


class MetricsMap:
"""Stores Otel Instruments."""

Expand Down Expand Up @@ -335,7 +351,7 @@ def del_counter(self, name: str, attributes: Attributes = None) -> None:
if key in self.map.keys():
del self.map[key]

def set_gauge_value(self, name: str, value: float | None, delta: bool, tags: Attributes):
def set_gauge_value(self, name: str, value: int | float, delta: bool, tags: Attributes):
"""
Override the last reading for a Gauge with a new value.
Expand All @@ -346,48 +362,11 @@ def set_gauge_value(self, name: str, value: float | None, delta: bool, tags: Att
:returns: None
"""
key: str = _generate_key_name(name, tags)
new_value = value or DEFAULT_GAUGE_VALUE
old_value = self.poke_gauge(name, tags)
if delta:
new_value += old_value
# If delta is true, add the new value to the last reading otherwise overwrite it.
self.map[key] = Observation(new_value, tags)

def _create_gauge(self, name: str, attributes: Attributes = None):
"""
Create a new Observable Gauge with the provided name and the default value.

:param name: The name of the gauge to fetch or create.
:param attributes: Gauge attributes, used to generate a unique key to store the gauge.
"""
otel_safe_name = _get_otel_safe_name(name)
key = _generate_key_name(name, attributes)

gauge = self.meter.create_observable_gauge(
name=otel_safe_name,
callbacks=[partial(self.read_gauge, _generate_key_name(name, attributes))],
)
self.map[key] = Observation(DEFAULT_GAUGE_VALUE, attributes)

return gauge

def read_gauge(self, key: str, *args) -> Iterable[Observation]:
"""Return the Observation for the provided key; callback for the Observable Gauges."""
yield self.map[key]

def poke_gauge(self, name: str, attributes: Attributes = None) -> GaugeValues:
"""
Return the value of the gauge; creates a new one with the default value if it did not exist.
:param name: The name of the gauge to fetch or create.
:param attributes: Gauge attributes, used to generate a unique key to store the gauge.
:returns: The integer or float value last recorded for the provided Gauge name.
"""
key = _generate_key_name(name, attributes)
if key not in self.map:
self._create_gauge(name, attributes)
self.map[key] = InternalGauge(meter=self.meter, name=name, tags=tags)

return self.map[key].value
self.map[key].set_value(value, delta)


def get_otel_logger(cls) -> SafeOtelLogger:
Expand Down
41 changes: 11 additions & 30 deletions tests/core/test_otel_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import logging
import time
from unittest import mock
from unittest.mock import ANY

import pytest
from opentelemetry.metrics import MeterProvider
Expand Down Expand Up @@ -181,9 +180,7 @@ def test_decr_with_rate_limit_works(self, mock_random, name):
def test_gauge_new_metric(self, name):
self.stats.gauge(name, value=1)

self.meter.get_meter().create_observable_gauge.assert_called_once_with(
name=full_name(name), callbacks=ANY
)
self.meter.get_meter().create_gauge.assert_called_once_with(name=full_name(name))
assert self.map[full_name(name)].value == 1

def test_gauge_new_metric_with_tags(self, name):
Expand All @@ -192,27 +189,21 @@ def test_gauge_new_metric_with_tags(self, name):

self.stats.gauge(name, value=1, tags=tags)

self.meter.get_meter().create_observable_gauge.assert_called_once_with(
name=full_name(name), callbacks=ANY
)
self.meter.get_meter().create_gauge.assert_called_once_with(name=full_name(name))
self.map[key].attributes == tags

def test_gauge_existing_metric(self, name):
self.stats.gauge(name, value=1)
self.stats.gauge(name, value=2)

self.meter.get_meter().create_observable_gauge.assert_called_once_with(
name=full_name(name), callbacks=ANY
)
self.meter.get_meter().create_gauge.assert_called_once_with(name=full_name(name))
assert self.map[full_name(name)].value == 2

def test_gauge_existing_metric_with_delta(self, name):
self.stats.gauge(name, value=1)
self.stats.gauge(name, value=2, delta=True)

self.meter.get_meter().create_observable_gauge.assert_called_once_with(
name=full_name(name), callbacks=ANY
)
self.meter.get_meter().create_gauge.assert_called_once_with(name=full_name(name))
assert self.map[full_name(name)].value == 3

@mock.patch("random.random", side_effect=[0.1, 0.9])
Expand All @@ -239,9 +230,7 @@ def test_timing_new_metric(self, name):

self.stats.timing(name, dt=datetime.timedelta(seconds=123))

self.meter.get_meter().create_observable_gauge.assert_called_once_with(
name=full_name(name), callbacks=ANY
)
self.meter.get_meter().create_gauge.assert_called_once_with(name=full_name(name))
expected_value = 123000.0
assert self.map[full_name(name)].value == expected_value

Expand All @@ -251,18 +240,14 @@ def test_timing_new_metric_with_tags(self, name):

self.stats.timing(name, dt=1, tags=tags)

self.meter.get_meter().create_observable_gauge.assert_called_once_with(
name=full_name(name), callbacks=ANY
)
self.meter.get_meter().create_gauge.assert_called_once_with(name=full_name(name))
self.map[key].attributes == tags

def test_timing_existing_metric(self, name):
self.stats.timing(name, dt=1)
self.stats.timing(name, dt=2)

self.meter.get_meter().create_observable_gauge.assert_called_once_with(
name=full_name(name), callbacks=ANY
)
self.meter.get_meter().create_gauge.assert_called_once_with(name=full_name(name))
assert self.map[full_name(name)].value == 2

# For the four test_timer_foo tests below:
Expand All @@ -278,9 +263,7 @@ def test_timer_with_name_returns_float_and_stores_value(self, mock_time, name):
expected_duration = 3140.0
assert timer.duration == expected_duration
assert mock_time.call_count == 2
self.meter.get_meter().create_observable_gauge.assert_called_once_with(
name=full_name(name), callbacks=ANY
)
self.meter.get_meter().create_gauge.assert_called_once_with(name=full_name(name))

@mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14])
def test_timer_no_name_returns_float_but_does_not_store_value(self, mock_time, name):
Expand All @@ -291,7 +274,7 @@ def test_timer_no_name_returns_float_but_does_not_store_value(self, mock_time, n
expected_duration = 3140.0
assert timer.duration == expected_duration
assert mock_time.call_count == 2
self.meter.get_meter().create_observable_gauge.assert_not_called()
self.meter.get_meter().create_gauge.assert_not_called()

@mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14])
def test_timer_start_and_stop_manually_send_false(self, mock_time, name):
Expand All @@ -304,7 +287,7 @@ def test_timer_start_and_stop_manually_send_false(self, mock_time, name):
expected_value = 3140.0
assert timer.duration == expected_value
assert mock_time.call_count == 2
self.meter.get_meter().create_observable_gauge.assert_not_called()
self.meter.get_meter().create_gauge.assert_not_called()

@mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14])
def test_timer_start_and_stop_manually_send_true(self, mock_time, name):
Expand All @@ -317,6 +300,4 @@ def test_timer_start_and_stop_manually_send_true(self, mock_time, name):
expected_value = 3140.0
assert timer.duration == expected_value
assert mock_time.call_count == 2
self.meter.get_meter().create_observable_gauge.assert_called_once_with(
name=full_name(name), callbacks=ANY
)
self.meter.get_meter().create_gauge.assert_called_once_with(name=full_name(name))

0 comments on commit 764bf20

Please sign in to comment.