From c674549e4ff54c1a65f241f35cdb32625cc88220 Mon Sep 17 00:00:00 2001 From: Leighton Chen Date: Wed, 28 Feb 2024 13:53:50 -0800 Subject: [PATCH 01/21] move state --- .../README.md | 2 +- .../exporter/_quickpulse/_constants.py | 6 +++ .../exporter/_quickpulse/_exporter.py | 48 +++++++++---------- .../exporter/_quickpulse/_live_metrics.py | 21 ++++++++ .../exporter/_quickpulse/_state.py | 30 ++++++++++++ .../tests/quickpulse/test_exporter.py | 13 +++-- .../tests/quickpulse/test_live_metrics.py | 9 ++++ 7 files changed, 98 insertions(+), 31 deletions(-) create mode 100644 sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_state.py diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/README.md b/sdk/monitor/azure-monitor-opentelemetry-exporter/README.md index 5be012c891d6..e5f360932f58 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/README.md +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/README.md @@ -1,6 +1,6 @@ # Microsoft OpenTelemetry exporter for Azure Monitor -The exporter for Azure Monitor allows Python applications to export data from the OpenTelemetry SDK to Azure Monitor. The exporter is intended for users who require advanced configuration or has more complicated telemetry needs that require all of distributed tracing, logging and metrics. If you have simpler configuration requirements, we recommend using the [Azure Monitor OpenTelemetry Distro](https://learn.microsoft.com/azure/azure-monitor/app/opentelemetry-enable?tabs=python) instead for a simpler one-line setup. +The exporter for Azure Monitor allows Python applications to export data from the OpenTelemetry SDK to Azure Monitor. The exporter is intended for users who require advanced configuration or have more complicated telemetry needs that require all of distributed tracing, logging and metrics. If you have simpler configuration requirements, we recommend using the [Azure Monitor OpenTelemetry Distro](https://learn.microsoft.com/azure/azure-monitor/app/opentelemetry-enable?tabs=python) instead for a simpler one-line setup. Prior to using this SDK, please read and understand [Data Collection Basics](https://learn.microsoft.com/azure/azure-monitor/app/opentelemetry-overview?tabs=python), especially the section on [telemetry types](https://learn.microsoft.com/azure/azure-monitor/app/opentelemetry-overview?tabs=python#telemetry-types). OpenTelemetry terminology differs from Application Insights terminology so it is important to understand the way the telemetry types map to each other. diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_constants.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_constants.py index b591258f2ee0..a51e88c7d0c4 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_constants.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_constants.py @@ -33,4 +33,10 @@ ] ) +# Quickpulse intervals +_SHORT_PING_INTERVAL_SECONDS = 5 +_POST_INTERVAL_SECONDS = 1 +_LONG_PING_INTERVAL_SECONDS = 60 +_POST_CANCEL_INTERVAL_SECONDS = 20 + # cSpell:disable diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_exporter.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_exporter.py index ffcfb33312d0..1b7f9b024b64 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_exporter.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_exporter.py @@ -1,7 +1,6 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. from datetime import datetime, timezone -from enum import Enum from typing import Any, List, Optional from opentelemetry.context import ( @@ -32,13 +31,23 @@ ) from azure.core.exceptions import HttpResponseError -from azure.monitor.opentelemetry.exporter._quickpulse._constants import _QUICKPULSE_METRIC_NAME_MAPPINGS +from azure.monitor.opentelemetry.exporter._quickpulse._constants import ( + _LONG_PING_INTERVAL_SECONDS, + _POST_CANCEL_INTERVAL_SECONDS, + _POST_INTERVAL_SECONDS, + _QUICKPULSE_METRIC_NAME_MAPPINGS, +) from azure.monitor.opentelemetry.exporter._quickpulse._generated._client import QuickpulseClient from azure.monitor.opentelemetry.exporter._quickpulse._generated.models import ( DocumentIngress, MetricPoint, MonitoringDataPoint, ) +from azure.monitor.opentelemetry.exporter._quickpulse._state import ( + _get_global_quickpulse_state, + _set_global_quickpulse_state, + _QuickpulseState, +) from azure.monitor.opentelemetry.exporter._connection_string_parser import ConnectionStringParser from azure.monitor.opentelemetry.exporter._utils import _ticks_since_dot_net_epoch, PeriodicTask @@ -52,11 +61,6 @@ UpDownCounter: AggregationTemporality.CUMULATIVE, } -_SHORT_PING_INTERVAL_SECONDS = 5 -_POST_INTERVAL_SECONDS = 1 -_LONG_PING_INTERVAL_SECONDS = 60 -_POST_CANCEL_INTERVAL_SECONDS = 20 - class _Response: """Response that encapsulates pipeline response and response headers from @@ -190,16 +194,6 @@ def _ping(self, monitoring_data_point) -> Optional[_Response]: return ping_response -class _QuickpulseState(Enum): - """Current state of quickpulse service. - The numerical value represents the ping/post interval in ms for those states. - """ - - PING_SHORT = _SHORT_PING_INTERVAL_SECONDS - PING_LONG = _LONG_PING_INTERVAL_SECONDS - POST_SHORT = _POST_INTERVAL_SECONDS - - class _QuickpulseMetricReader(MetricReader): def __init__( @@ -208,7 +202,6 @@ def __init__( base_monitoring_data_point: MonitoringDataPoint, ) -> None: self._exporter = exporter - self._quick_pulse_state = _QuickpulseState.PING_SHORT self._base_monitoring_data_point = base_monitoring_data_point self._elapsed_num_seconds = 0 self._worker = PeriodicTask( @@ -226,7 +219,7 @@ def __init__( def _ticker(self) -> None: if self._is_ping_state(): # Send a ping if elapsed number of request meets the threshold - if self._elapsed_num_seconds % int(self._quick_pulse_state.value) == 0: + if self._elapsed_num_seconds % _get_global_quickpulse_state().value == 0: print("pinging...") ping_response = self._exporter._ping( # pylint: disable=protected-access self._base_monitoring_data_point, @@ -236,22 +229,22 @@ def _ticker(self) -> None: if header and header == "true": print("ping succeeded: switching to post") # Switch state to post if subscribed - self._quick_pulse_state = _QuickpulseState.POST_SHORT + _set_global_quickpulse_state(_QuickpulseState.POST_SHORT) self._elapsed_num_seconds = 0 else: # Backoff after _LONG_PING_INTERVAL_SECONDS (60s) of no successful requests - if self._quick_pulse_state is _QuickpulseState.PING_SHORT and \ + if _get_global_quickpulse_state() is _QuickpulseState.PING_SHORT and \ self._elapsed_num_seconds >= _LONG_PING_INTERVAL_SECONDS: print("ping failed for 60s, switching to pinging every 60s") - self._quick_pulse_state = _QuickpulseState.PING_LONG + _set_global_quickpulse_state(_QuickpulseState.PING_LONG) # TODO: Implement redirect else: # Erroneous ping responses instigate backoff logic # Backoff after _LONG_PING_INTERVAL_SECONDS (60s) of no successful requests - if self._quick_pulse_state is _QuickpulseState.PING_SHORT and \ + if _get_global_quickpulse_state() is _QuickpulseState.PING_SHORT and \ self._elapsed_num_seconds >= _LONG_PING_INTERVAL_SECONDS: print("ping failed for 60s, switching to pinging every 60s") - self._quick_pulse_state = _QuickpulseState.PING_LONG + _set_global_quickpulse_state(_QuickpulseState.PING_LONG) else: print("posting...") try: @@ -262,7 +255,7 @@ def _ticker(self) -> None: # And resume pinging if self._elapsed_num_seconds >= _POST_CANCEL_INTERVAL_SECONDS: print("post failed for 20s, switching to pinging") - self._quick_pulse_state = _QuickpulseState.PING_SHORT + _set_global_quickpulse_state(_QuickpulseState.PING_SHORT) self._elapsed_num_seconds = 0 self._elapsed_num_seconds += 1 @@ -290,7 +283,10 @@ def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: self._worker.join() def _is_ping_state(self): - return self._quick_pulse_state in (_QuickpulseState.PING_SHORT, _QuickpulseState.PING_LONG) + return _get_global_quickpulse_state() in ( + _QuickpulseState.PING_SHORT, + _QuickpulseState.PING_LONG + ) def _metric_to_quick_pulse_data_points( # pylint: disable=too-many-nested-blocks metrics_data: OTMetricsData, diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py index 83bb3e073e3d..ba1bcbf13c33 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py @@ -4,9 +4,17 @@ from typing import Any, Optional from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.trace import ReadableSpan from opentelemetry.sdk.trace.id_generator import RandomIdGenerator from opentelemetry.sdk.resources import Resource from azure.monitor.opentelemetry.exporter._generated.models import ContextTagKeys +from azure.monitor.opentelemetry.exporter._quickpulse._state import ( + _QuickpulseState, + _set_global_quickpulse_state, +) +from azure.monitor.opentelemetry.exporter._quickpulse._constants import ( + _REQUEST_DURATION_NAME, +) from azure.monitor.opentelemetry.exporter._quickpulse._exporter import ( _QuickpulseExporter, _QuickpulseMetricReader, @@ -32,6 +40,7 @@ def enable_live_metrics(**kwargs: Any) -> None: class _QuickpulseManager(metaclass=Singleton): def __init__(self, connection_string: Optional[str], resource: Optional[Resource]) -> None: + _set_global_quickpulse_state(_QuickpulseState.PING_SHORT) self._exporter = _QuickpulseExporter(connection_string) part_a_fields = {} if resource: @@ -47,3 +56,15 @@ def __init__(self, connection_string: Optional[str], resource: Optional[Resource ) self._reader = _QuickpulseMetricReader(self._exporter, self._base_monitoring_data_point) self._meter_provider = MeterProvider([self._reader]) + # self._meter = self._meter_provider.get_meter() + + # self._request_duration = self._meter.create_histogram( + # _REQUEST_DURATION_NAME[0], + # "ms", + # "live metrics avg request duration in ms" + # ) + +# def record_span(self, span: ReadableSpan): + + +# def record_span_for_quickpulse() \ No newline at end of file diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_state.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_state.py new file mode 100644 index 000000000000..294633dd5f2d --- /dev/null +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_state.py @@ -0,0 +1,30 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +from enum import Enum + +from azure.monitor.opentelemetry.exporter._quickpulse._constants import ( + _LONG_PING_INTERVAL_SECONDS, + _POST_INTERVAL_SECONDS, + _SHORT_PING_INTERVAL_SECONDS, +) + +class _QuickpulseState(Enum): + """Current state of quickpulse service. + The numerical value represents the ping/post interval in ms for those states. + """ + + PING_SHORT = _SHORT_PING_INTERVAL_SECONDS + PING_LONG = _LONG_PING_INTERVAL_SECONDS + POST_SHORT = _POST_INTERVAL_SECONDS + + +_GLOBAL_QUICKPULSE_STATE = _QuickpulseState.PING_SHORT + + +def _set_global_quickpulse_state(state: _QuickpulseState): + global _GLOBAL_QUICKPULSE_STATE + _GLOBAL_QUICKPULSE_STATE = state + + +def _get_global_quickpulse_state() -> _QuickpulseState: + return _GLOBAL_QUICKPULSE_STATE diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_exporter.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_exporter.py index d053b3da4e59..a594e0517d55 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_exporter.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_exporter.py @@ -27,6 +27,11 @@ _Response, _UnsuccessfulQuickPulsePostError, ) +from azure.monitor.opentelemetry.exporter._quickpulse._state import ( + _get_global_quickpulse_state, + _set_global_quickpulse_state, + _QuickpulseState, +) def throw(exc_type, *args, **kwargs): @@ -39,6 +44,7 @@ def func(*_args, **_kwargs): class TestQuickpulse(unittest.TestCase): @classmethod def setUpClass(cls): + _set_global_quickpulse_state(_QuickpulseState.PING_SHORT) cls._resource = Resource.create( { ResourceAttributes.SERVICE_INSTANCE_ID: "test_instance", @@ -209,10 +215,9 @@ def test_quickpulsereader_init(self, task_mock): self._data_point, ) self.assertEqual(reader._exporter, self._exporter) - self.assertEqual(reader._quick_pulse_state, _QuickpulseState.PING_SHORT) + self.assertEqual(_get_global_quickpulse_state(), _QuickpulseState.PING_SHORT) self.assertEqual(reader._base_monitoring_data_point, self._data_point) self.assertEqual(reader._elapsed_num_seconds, 0) - self.assertEqual(reader._elapsed_num_seconds, 0) self.assertEqual(reader._worker, task_inst_mock) task_mock.assert_called_with( interval=_POST_INTERVAL_SECONDS, @@ -231,7 +236,7 @@ def test_quickpulsereader_ticker_ping_true(self, task_mock, ping_mock): self._exporter, self._data_point, ) - reader._quick_pulse_state = _QuickpulseState.PING_SHORT + _set_global_quickpulse_state(_QuickpulseState.PING_SHORT) reader._elapsed_num_seconds = _QuickpulseState.PING_SHORT.value ping_mock.return_value = _Response( None, @@ -244,7 +249,7 @@ def test_quickpulsereader_ticker_ping_true(self, task_mock, ping_mock): ping_mock.assert_called_once_with( self._data_point, ) - self.assertEqual(reader._quick_pulse_state, _QuickpulseState.POST_SHORT) + self.assertEqual(_get_global_quickpulse_state(), _QuickpulseState.POST_SHORT) self.assertEqual(reader._elapsed_num_seconds, 1) # TODO: Other ticker cases diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_live_metrics.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_live_metrics.py index 0b0f512865bf..f5940f8fcbbd 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_live_metrics.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_live_metrics.py @@ -17,6 +17,11 @@ enable_live_metrics, _QuickpulseManager, ) +from azure.monitor.opentelemetry.exporter._quickpulse._state import ( + _get_global_quickpulse_state, + _set_global_quickpulse_state, + _QuickpulseState, +) from azure.monitor.opentelemetry.exporter._utils import ( _get_sdk_version, _populate_part_a_fields, @@ -36,6 +41,9 @@ def test_enable_live_metrics(self, manager_mock): class TestQuickpulseManager(unittest.TestCase): + @classmethod + def setUpClass(cls): + _set_global_quickpulse_state(_QuickpulseState.PING_SHORT) @mock.patch("opentelemetry.sdk.trace.id_generator.RandomIdGenerator.generate_trace_id") def test_init(self, generator_mock): @@ -51,6 +59,7 @@ def test_init(self, generator_mock): connection_string="InstrumentationKey=4321abcd-5678-4efa-8abc-1234567890ac;LiveEndpoint=https://eastus.livediagnostics.monitor.azure.com/", resource=resource, ) + self.assertEqual(_get_global_quickpulse_state(), _QuickpulseState.PING_SHORT) self.assertTrue(isinstance(qpm._exporter, _QuickpulseExporter)) self.assertEqual( qpm._exporter._live_endpoint, From 6ddd599ccdba4a37798bc9b8d66143d771d7dc52 Mon Sep 17 00:00:00 2001 From: Leighton Chen Date: Mon, 4 Mar 2024 11:31:22 -0800 Subject: [PATCH 02/21] collection --- .../exporter/_quickpulse/_constants.py | 10 ++ .../exporter/_quickpulse/_exporter.py | 63 ++------- .../exporter/_quickpulse/_live_metrics.py | 85 ++++++++++-- .../exporter/_quickpulse/_state.py | 20 ++- .../exporter/_quickpulse/_utils.py | 127 ++++++++++++++++++ .../monitor/opentelemetry/exporter/_utils.py | 2 +- 6 files changed, 237 insertions(+), 70 deletions(-) create mode 100644 sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_constants.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_constants.py index a51e88c7d0c4..94155b73f32c 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_constants.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_constants.py @@ -1,5 +1,7 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. +from enum import Enum + # cSpell:disable # (OpenTelemetry metric name, Quickpulse metric name) @@ -39,4 +41,12 @@ _LONG_PING_INTERVAL_SECONDS = 60 _POST_CANCEL_INTERVAL_SECONDS = 20 +# Live metrics data types +class _DocumentIngressDocumentType(Enum): + Request = "Request" + RemoteDependency = "RemoteDependency" + Exception = "Exception" + Event = "Event" + Trace = "Trace" + # cSpell:disable diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_exporter.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_exporter.py index 1b7f9b024b64..57594451ea89 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_exporter.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_exporter.py @@ -1,7 +1,6 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. -from datetime import datetime, timezone -from typing import Any, List, Optional +from typing import Any, Optional from opentelemetry.context import ( _SUPPRESS_INSTRUMENTATION_KEY, @@ -17,11 +16,7 @@ ObservableUpDownCounter, UpDownCounter, ) -from opentelemetry.sdk.metrics._internal.point import ( - NumberDataPoint, - HistogramDataPoint, - MetricsData, -) +from opentelemetry.sdk.metrics._internal.point import MetricsData from opentelemetry.sdk.metrics.export import ( AggregationTemporality, MetricExporter, @@ -35,19 +30,18 @@ _LONG_PING_INTERVAL_SECONDS, _POST_CANCEL_INTERVAL_SECONDS, _POST_INTERVAL_SECONDS, - _QUICKPULSE_METRIC_NAME_MAPPINGS, ) from azure.monitor.opentelemetry.exporter._quickpulse._generated._client import QuickpulseClient -from azure.monitor.opentelemetry.exporter._quickpulse._generated.models import ( - DocumentIngress, - MetricPoint, - MonitoringDataPoint, -) +from azure.monitor.opentelemetry.exporter._quickpulse._generated.models import MonitoringDataPoint from azure.monitor.opentelemetry.exporter._quickpulse._state import ( _get_global_quickpulse_state, + _is_ping_state, _set_global_quickpulse_state, _QuickpulseState, ) +from azure.monitor.opentelemetry.exporter._quickpulse._utils import ( + _metric_to_quick_pulse_data_points, +) from azure.monitor.opentelemetry.exporter._connection_string_parser import ConnectionStringParser from azure.monitor.opentelemetry.exporter._utils import _ticks_since_dot_net_epoch, PeriodicTask @@ -217,7 +211,7 @@ def __init__( self._worker.start() def _ticker(self) -> None: - if self._is_ping_state(): + if _is_ping_state(): # Send a ping if elapsed number of request meets the threshold if self._elapsed_num_seconds % _get_global_quickpulse_state().value == 0: print("pinging...") @@ -281,44 +275,3 @@ def _receive_metrics( def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: self._worker.cancel() self._worker.join() - - def _is_ping_state(self): - return _get_global_quickpulse_state() in ( - _QuickpulseState.PING_SHORT, - _QuickpulseState.PING_LONG - ) - -def _metric_to_quick_pulse_data_points( # pylint: disable=too-many-nested-blocks - metrics_data: OTMetricsData, - base_monitoring_data_point: MonitoringDataPoint, - documents: Optional[List[DocumentIngress]], -) -> List[MonitoringDataPoint]: - metric_points = [] - for resource_metric in metrics_data.resource_metrics: - for scope_metric in resource_metric.scope_metrics: - for metric in scope_metric.metrics: - for point in metric.data.data_points: - if point is not None: - metric_point = MetricPoint( - name=_QUICKPULSE_METRIC_NAME_MAPPINGS[metric.name.lower()], - weight=1, - ) - if isinstance(point, HistogramDataPoint): - metric_point.value = point.sum - elif isinstance(point, NumberDataPoint): - metric_point.value = point.value - else: - metric_point.value = 0 - metric_points.append(metric_point) - return [ - MonitoringDataPoint( - version=base_monitoring_data_point.version, - instance=base_monitoring_data_point.instance, - role_name=base_monitoring_data_point.role_name, - machine_name=base_monitoring_data_point.machine_name, - stream_id=base_monitoring_data_point.stream_id, - timestamp=datetime.now(tz=timezone.utc), - metrics=metric_points, - documents=documents, - ) - ] diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py index ba1bcbf13c33..0c0d04c9022e 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py @@ -3,23 +3,34 @@ import platform from typing import Any, Optional +from opentelemetry.trace import SpanKind +from opentelemetry.semconv.trace import SpanAttributes from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.trace import ReadableSpan from opentelemetry.sdk.trace.id_generator import RandomIdGenerator from opentelemetry.sdk.resources import Resource from azure.monitor.opentelemetry.exporter._generated.models import ContextTagKeys -from azure.monitor.opentelemetry.exporter._quickpulse._state import ( - _QuickpulseState, - _set_global_quickpulse_state, -) from azure.monitor.opentelemetry.exporter._quickpulse._constants import ( + _DEPENDENCY_DURATION_NAME, + _DEPENDENCY_FAILURE_RATE_NAME, + _DEPENDENCY_RATE_NAME, _REQUEST_DURATION_NAME, + _REQUEST_FAILURE_RATE_NAME, + _REQUEST_RATE_NAME, ) from azure.monitor.opentelemetry.exporter._quickpulse._exporter import ( _QuickpulseExporter, _QuickpulseMetricReader, ) from azure.monitor.opentelemetry.exporter._quickpulse._generated.models import MonitoringDataPoint +from azure.monitor.opentelemetry.exporter._quickpulse._state import ( + _QuickpulseState, + _is_post_state, + _set_global_quickpulse_state, +) +from azure.monitor.opentelemetry.exporter._quickpulse._utils import ( + _get_span_document, +) from azure.monitor.opentelemetry.exporter._utils import ( _get_sdk_version, _populate_part_a_fields, @@ -37,6 +48,12 @@ def enable_live_metrics(**kwargs: Any) -> None: _QuickpulseManager(kwargs.get('connection_string'), kwargs.get('resource')) +def record_span(span: ReadableSpan) -> None: + qpm = _QuickpulseManager._instance + if qpm: + qpm._record_span(span) + + class _QuickpulseManager(metaclass=Singleton): def __init__(self, connection_string: Optional[str], resource: Optional[Resource]) -> None: @@ -56,15 +73,61 @@ def __init__(self, connection_string: Optional[str], resource: Optional[Resource ) self._reader = _QuickpulseMetricReader(self._exporter, self._base_monitoring_data_point) self._meter_provider = MeterProvider([self._reader]) - # self._meter = self._meter_provider.get_meter() + self._meter = self._meter_provider.get_meter("azure_monitor_live_metrics") + + self._request_duration = self._meter.create_histogram( + _REQUEST_DURATION_NAME[0], + "ms", + "live metrics avg request duration in ms" + ) + self._dependency_duration = self._meter.create_histogram( + _DEPENDENCY_DURATION_NAME[0], + "ms", + "live metrics avg dependency duration in ms" + ) + # We use a counter to represent rates per second because collection + # interval is one second so we simply need the number of requests + # within the collection interval + self._request_rate_counter = self._meter.create_counter( + _REQUEST_RATE_NAME[0], + "req/sec", + "live metrics request rate per second" + ) + self._request_failed_rate_counter = self._meter.create_counter( + _REQUEST_FAILURE_RATE_NAME[0], + "req/sec", + "live metrics request failed rate per second" + ) + self._dependency_rate_counter = self._meter.create_counter( + _DEPENDENCY_RATE_NAME[0], + "dep/sec", + "live metrics dependency rate per second" + ) + self._dependency_failure_rate_counter = self._meter.create_counter( + _DEPENDENCY_FAILURE_RATE_NAME[0], + "dep/sec", + "live metrics dependency failure rate per second" + ) + + def _record_span(self, span: ReadableSpan): + if _is_post_state(): + document = _get_span_document(span) + duration_ms = (span.end_time - span.start_time) / 1e9 + status_code = str(span.attributes.get(SpanAttributes.HTTP_STATUS_CODE), "") + success = status_code == "200" - # self._request_duration = self._meter.create_histogram( - # _REQUEST_DURATION_NAME[0], - # "ms", - # "live metrics avg request duration in ms" - # ) + if span.kind in (SpanKind.SERVER, SpanKind.CONSUMER): + if success: + self._request_rate_counter.add(1) + else: + self._request_failed_rate_counter.add(1) + self._request_duration.record(duration_ms) + else: + if success: + self._dependency_rate_counter.add(1) + else: + self._dependency_failure_rate_counter.add(1) -# def record_span(self, span: ReadableSpan): # def record_span_for_quickpulse() \ No newline at end of file diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_state.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_state.py index 294633dd5f2d..9f5ef1ccf07f 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_state.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_state.py @@ -12,14 +12,13 @@ class _QuickpulseState(Enum): """Current state of quickpulse service. The numerical value represents the ping/post interval in ms for those states. """ - + OFFLINE = 0 PING_SHORT = _SHORT_PING_INTERVAL_SECONDS PING_LONG = _LONG_PING_INTERVAL_SECONDS POST_SHORT = _POST_INTERVAL_SECONDS -_GLOBAL_QUICKPULSE_STATE = _QuickpulseState.PING_SHORT - +_GLOBAL_QUICKPULSE_STATE = _QuickpulseState.OFFLINE def _set_global_quickpulse_state(state: _QuickpulseState): global _GLOBAL_QUICKPULSE_STATE @@ -28,3 +27,18 @@ def _set_global_quickpulse_state(state: _QuickpulseState): def _get_global_quickpulse_state() -> _QuickpulseState: return _GLOBAL_QUICKPULSE_STATE + + +def is_quick_pulse_enabled() -> bool: + return _get_global_quickpulse_state() is not _QuickpulseState.OFFLINE + + +def _is_ping_state() -> bool: + return _get_global_quickpulse_state() in ( + _QuickpulseState.PING_SHORT, + _QuickpulseState.PING_LONG + ) + + +def _is_post_state(): + return _get_global_quickpulse_state() is _QuickpulseState.POST_SHORT diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py new file mode 100644 index 000000000000..0347c36496c2 --- /dev/null +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py @@ -0,0 +1,127 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +from datetime import datetime, timedelta, timezone +from typing import List, Optional, Union + +from opentelemetry.trace import SpanKind +from opentelemetry.util.types import Attributes +from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.sdk.metrics._internal.point import ( + NumberDataPoint, + HistogramDataPoint, +) +from opentelemetry.sdk.metrics.export import MetricsData as OTMetricsData +from opentelemetry.sdk.trace import ReadableSpan +from azure.monitor.opentelemetry.exporter._quickpulse._constants import ( + _DocumentIngressDocumentType, + _QUICKPULSE_METRIC_NAME_MAPPINGS, +) +from azure.monitor.opentelemetry.exporter._quickpulse._generated.models import ( + DocumentIngress, + MetricPoint, + MonitoringDataPoint, + RemoteDependency as RemoteDependencyDocument, + Request as RequestDocument, +) +def _metric_to_quick_pulse_data_points( # pylint: disable=too-many-nested-blocks + metrics_data: OTMetricsData, + base_monitoring_data_point: MonitoringDataPoint, + documents: Optional[List[DocumentIngress]], +) -> List[MonitoringDataPoint]: + metric_points = [] + for resource_metric in metrics_data.resource_metrics: + for scope_metric in resource_metric.scope_metrics: + for metric in scope_metric.metrics: + for point in metric.data.data_points: + if point is not None: + metric_point = MetricPoint( + name=_QUICKPULSE_METRIC_NAME_MAPPINGS[metric.name.lower()], + weight=1, + ) + if isinstance(point, HistogramDataPoint): + metric_point.value = point.sum + elif isinstance(point, NumberDataPoint): + metric_point.value = point.value + else: + metric_point.value = 0 + metric_points.append(metric_point) + return [ + MonitoringDataPoint( + version=base_monitoring_data_point.version, + instance=base_monitoring_data_point.instance, + role_name=base_monitoring_data_point.role_name, + machine_name=base_monitoring_data_point.machine_name, + stream_id=base_monitoring_data_point.stream_id, + timestamp=datetime.now(tz=timezone.utc), + metrics=metric_points, + documents=documents, + ) + ] + +def _get_span_document(span: ReadableSpan) -> Union[RemoteDependencyDocument, RequestDocument]: + duration = span.end_time - span.start_time + status_code = span.attributes.get(SpanAttributes.HTTP_STATUS_CODE) + grpc_status_code = span.attributes.get(SpanAttributes.RPC_GRPC_STATUS_CODE) + span_kind = span.kind + url = _get_url(span_kind, span.attributes) + if span_kind in (SpanKind.CLIENT, SpanKind.PRODUCER): + document = RemoteDependencyDocument( + document_type=_DocumentIngressDocumentType.RemoteDependency, + name=span.name, + command_name=url, + result_code=status_code, + duration=_ns_to_iso8601_string(duration), + ) + else: + if status_code: + code = str(status_code) + else: + code = str(grpc_status_code) + document = RequestDocument( + document_type=_DocumentIngressDocumentType.Request, + name=span.name, + url=url, + response_code=code, + duration=_ns_to_iso8601_string(duration), + ) + return document + + +def _get_url(span_kind: SpanKind, attributes: Attributes) -> str: + if not attributes: + return "" + http_method = attributes.get(SpanAttributes.HTTP_METHOD) + if http_method: + http_scheme = attributes.get(SpanAttributes.HTTP_SCHEME) + # Client + if span_kind in (SpanKind.CLIENT, SpanKind.PRODUCER): + http_url = attributes.get(SpanAttributes.HTTP_URL) + if http_url: + return str(http_url) + else: + host = attributes.get(SpanAttributes.NET_PEER_NAME) + port = attributes.get(SpanAttributes.NET_PEER_PORT, "") + ip = attributes.get(SpanAttributes.NET_PEER_IP) + if http_scheme: + if host: + return f"{http_scheme}://{host}:{port}" + else: + return f"{http_scheme}://{ip}:{port}" + else: # Server + host = attributes.get(SpanAttributes.NET_HOST_NAME) + port = attributes.get(SpanAttributes.NET_HOST_PORT) + http_target = attributes.get(SpanAttributes.HTTP_TARGET, "") + if http_scheme and host: + http_host = attributes.get(SpanAttributes.HTTP_HOST) + if http_host: + return f"{http_scheme}://{http_host}:{port}{http_target}" + return "" + + +def _ns_to_iso8601_string(nanoseconds: int) -> str: + seconds, nanoseconds_remainder = divmod(nanoseconds, 1e9) + microseconds = nanoseconds_remainder // 1000 # Convert nanoseconds to microseconds + dt = datetime.utcfromtimestamp(seconds) + dt_microseconds = timedelta(microseconds=microseconds) + dt_with_microseconds = dt + dt_microseconds + return dt_with_microseconds.isoformat() diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_utils.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_utils.py index a1510d736290..9ef51c6202f5 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_utils.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_utils.py @@ -116,7 +116,7 @@ def _getlocale(): } -def ns_to_duration(nanoseconds: int): +def ns_to_duration(nanoseconds: int) -> str: value = (nanoseconds + 500000) // 1000000 # duration in milliseconds value, microseconds = divmod(value, 1000) value, seconds = divmod(value, 60) From e2df1e0832d8dab5b8a570cda9a31c86fbfaec91 Mon Sep 17 00:00:00 2001 From: Leighton Chen Date: Mon, 4 Mar 2024 20:10:42 -0800 Subject: [PATCH 03/21] exceptions --- .../exporter/_quickpulse/_exporter.py | 9 ++-- .../exporter/_quickpulse/_live_metrics.py | 49 +++++++++++++++---- .../_quickpulse/_log_record_processor.py | 19 +++++++ .../exporter/_quickpulse/_span_processor.py | 13 +++++ .../exporter/_quickpulse/_utils.py | 26 ++++++++-- 5 files changed, 97 insertions(+), 19 deletions(-) create mode 100644 sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_log_record_processor.py create mode 100644 sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_span_processor.py diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_exporter.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_exporter.py index 57594451ea89..81d7dc9ae44c 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_exporter.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_exporter.py @@ -46,13 +46,10 @@ from azure.monitor.opentelemetry.exporter._utils import _ticks_since_dot_net_epoch, PeriodicTask -_APPLICATION_INSIGHTS_METRIC_TEMPORALITIES = { +_QUICKPULSE_METRIC_TEMPORALITIES = { + # Use DELTA temporalities because we want to reset the counts every collection interval Counter: AggregationTemporality.DELTA, Histogram: AggregationTemporality.DELTA, - ObservableCounter: AggregationTemporality.DELTA, - ObservableGauge: AggregationTemporality.CUMULATIVE, - ObservableUpDownCounter: AggregationTemporality.CUMULATIVE, - UpDownCounter: AggregationTemporality.CUMULATIVE, } @@ -89,7 +86,7 @@ def __init__(self, connection_string: Optional[str]) -> None: MetricExporter.__init__( self, - preferred_temporality=_APPLICATION_INSIGHTS_METRIC_TEMPORALITIES, # type: ignore + preferred_temporality=_QUICKPULSE_METRIC_TEMPORALITIES, # type: ignore ) def export( diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py index 0c0d04c9022e..d0fd2c2aae06 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py @@ -3,17 +3,20 @@ import platform from typing import Any, Optional -from opentelemetry.trace import SpanKind -from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.sdk._logs import LogData from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import ReadableSpan from opentelemetry.sdk.trace.id_generator import RandomIdGenerator -from opentelemetry.sdk.resources import Resource +from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.trace import SpanKind + from azure.monitor.opentelemetry.exporter._generated.models import ContextTagKeys from azure.monitor.opentelemetry.exporter._quickpulse._constants import ( _DEPENDENCY_DURATION_NAME, _DEPENDENCY_FAILURE_RATE_NAME, _DEPENDENCY_RATE_NAME, + _EXCEPTION_RATE_NAME, _REQUEST_DURATION_NAME, _REQUEST_FAILURE_RATE_NAME, _REQUEST_RATE_NAME, @@ -29,6 +32,7 @@ _set_global_quickpulse_state, ) from azure.monitor.opentelemetry.exporter._quickpulse._utils import ( + _get_log_record_document, _get_span_document, ) from azure.monitor.opentelemetry.exporter._utils import ( @@ -39,7 +43,7 @@ def enable_live_metrics(**kwargs: Any) -> None: - """Azure Monitor base exporter for OpenTelemetry. + """Live metrics entry point. :keyword str connection_string: The connection string used for your Application Insights resource. :keyword Resource resource: The OpenTelemetry Resource used for this Python application. @@ -48,12 +52,20 @@ def enable_live_metrics(**kwargs: Any) -> None: _QuickpulseManager(kwargs.get('connection_string'), kwargs.get('resource')) +# Used by _QuickpulseSpanProcessor to record live metrics on span record def record_span(span: ReadableSpan) -> None: qpm = _QuickpulseManager._instance if qpm: qpm._record_span(span) +# Used by _QuickpulseLogRecordProcessor to record live metrics on log data record +def record_log_record(log_data: LogData) -> None: + qpm = _QuickpulseManager._instance + if qpm: + qpm._record_log_record(log_data) + + class _QuickpulseManager(metaclass=Singleton): def __init__(self, connection_string: Optional[str], resource: Optional[Resource]) -> None: @@ -108,13 +120,21 @@ def __init__(self, connection_string: Optional[str], resource: Optional[Resource "dep/sec", "live metrics dependency failure rate per second" ) + self._exception_rate_counter = self._meter.create_counter( + _EXCEPTION_RATE_NAME[0], + "exc/sec", + "live metrics exception rate per second" + ) - def _record_span(self, span: ReadableSpan): + def _record_span(self, span: ReadableSpan) -> None: + # Only record if in post state if _is_post_state(): + # TODO: Include DocumentIngress in payload document = _get_span_document(span) duration_ms = (span.end_time - span.start_time) / 1e9 - status_code = str(span.attributes.get(SpanAttributes.HTTP_STATUS_CODE), "") - success = status_code == "200" + # status_code = str(span.attributes.get(SpanAttributes.HTTP_STATUS_CODE), "") + # success = status_code == "200" + success = span.status.is_ok if span.kind in (SpanKind.SERVER, SpanKind.CONSUMER): if success: @@ -128,6 +148,15 @@ def _record_span(self, span: ReadableSpan): else: self._dependency_failure_rate_counter.add(1) - - -# def record_span_for_quickpulse() \ No newline at end of file + def _record_log_record(self, log_data: LogData) -> None: + # Only record if in post state + if _is_post_state(): + if log_data.log_record: + log_record = log_data.log_record + if log_record.attributes: + # TODO: Include DocumentIngress in payload + document = _get_log_record_document(log_data) + exc_type = log_record.attributes.get(SpanAttributes.EXCEPTION_TYPE) + exc_message = log_record.attributes.get(SpanAttributes.EXCEPTION_MESSAGE) + if exc_type is not None or exc_message is not None: + self._exception_rate_counter.add(1) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_log_record_processor.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_log_record_processor.py new file mode 100644 index 000000000000..04de86e0c405 --- /dev/null +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_log_record_processor.py @@ -0,0 +1,19 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from opentelemetry.sdk._logs import LogData, LogRecordProcessor + +from azure.monitor.opentelemetry.exporter._quickpulse._live_metrics import record_log_record + + +class _QuickpulseLogRecordProcessor(LogRecordProcessor): + + def emit(self, log_data: LogData) -> None: + record_log_record(log_data) + super().emit(log_data) + + def shutdown(self): + super().shutdown() + + def force_flush(self, timeout_millis: int = 30000): + super().force_flush(timeout_millis=timeout_millis) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_span_processor.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_span_processor.py new file mode 100644 index 000000000000..080ff6af95e1 --- /dev/null +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_span_processor.py @@ -0,0 +1,13 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from opentelemetry.sdk.trace import ReadableSpan, SpanProcessor + +from azure.monitor.opentelemetry.exporter._quickpulse._live_metrics import record_span + + +class _QuickpulseSpanProcessor(SpanProcessor): + + def on_end(self, span: ReadableSpan) -> None: + record_span(span) + return super().on_end(span) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py index 0347c36496c2..68ca4f0eefc3 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py @@ -3,25 +3,29 @@ from datetime import datetime, timedelta, timezone from typing import List, Optional, Union -from opentelemetry.trace import SpanKind -from opentelemetry.util.types import Attributes -from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.sdk._logs import LogData from opentelemetry.sdk.metrics._internal.point import ( NumberDataPoint, HistogramDataPoint, ) from opentelemetry.sdk.metrics.export import MetricsData as OTMetricsData from opentelemetry.sdk.trace import ReadableSpan +from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.trace import SpanKind +from opentelemetry.util.types import Attributes + from azure.monitor.opentelemetry.exporter._quickpulse._constants import ( _DocumentIngressDocumentType, _QUICKPULSE_METRIC_NAME_MAPPINGS, ) from azure.monitor.opentelemetry.exporter._quickpulse._generated.models import ( DocumentIngress, + Exception as ExceptionDocument, MetricPoint, MonitoringDataPoint, RemoteDependency as RemoteDependencyDocument, Request as RequestDocument, + Trace as TraceDocument, ) def _metric_to_quick_pulse_data_points( # pylint: disable=too-many-nested-blocks metrics_data: OTMetricsData, @@ -86,6 +90,22 @@ def _get_span_document(span: ReadableSpan) -> Union[RemoteDependencyDocument, Re ) return document +def _get_log_record_document(log_data: LogData) -> Union[ExceptionDocument, TraceDocument]: + exc_type = log_data.log_record.attributes.get(SpanAttributes.EXCEPTION_TYPE) + exc_message = log_data.log_record.attributes.get(SpanAttributes.EXCEPTION_MESSAGE) + if exc_type is not None or exc_message is not None: + document = ExceptionDocument( + document_type=_DocumentIngressDocumentType.Exception, + exception_type=exc_type, + exception_message=exc_message, + ) + else: + document = TraceDocument( + document_type=_DocumentIngressDocumentType.Trace, + message=log_data.log_record.body, + ) + return document + def _get_url(span_kind: SpanKind, attributes: Attributes) -> str: if not attributes: From 7c031c414d4a0cf973bf0efaef7bf6e150303186 Mon Sep 17 00:00:00 2001 From: Leighton Chen Date: Mon, 4 Mar 2024 20:12:02 -0800 Subject: [PATCH 04/21] Update CHANGELOG.md --- sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md b/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md index 8ca743f46635..8adfea127e0d 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md @@ -4,6 +4,9 @@ ### Features Added +- Add live metrics collection of requests/dependencies/exceptions + ([#34141](https://github.com/Azure/azure-sdk-for-python/pull/34141)) + ### Breaking Changes ### Bugs Fixed From bc4a40ae4de3f77c93dd1d6b6b390b5aaa45a714 Mon Sep 17 00:00:00 2001 From: Leighton Chen Date: Wed, 6 Mar 2024 10:32:31 -0800 Subject: [PATCH 05/21] utils --- .../monitor/opentelemetry/exporter/_quickpulse/_utils.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py index 68ca4f0eefc3..724c35edaf72 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py @@ -43,7 +43,10 @@ def _metric_to_quick_pulse_data_points( # pylint: disable=too-many-nested-block weight=1, ) if isinstance(point, HistogramDataPoint): - metric_point.value = point.sum + if point.count > 0: + metric_point.value = point.sum / point.count + else: + metric_point.value = 0 elif isinstance(point, NumberDataPoint): metric_point.value = point.value else: From d6901b469d195183f1b985ea708a5f08da207abf Mon Sep 17 00:00:00 2001 From: Leighton Chen Date: Wed, 6 Mar 2024 10:32:59 -0800 Subject: [PATCH 06/21] sample --- .../samples/metrics/live.py | 85 +++++++++++++++++++ 1 file changed, 85 insertions(+) create mode 100644 sdk/monitor/azure-monitor-opentelemetry-exporter/samples/metrics/live.py diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/samples/metrics/live.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/samples/metrics/live.py new file mode 100644 index 000000000000..f41eab6768b3 --- /dev/null +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/samples/metrics/live.py @@ -0,0 +1,85 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +# cSpell:disable +import flask +import os +import logging +import time +import requests + +from azure.monitor.opentelemetry.exporter._quickpulse._live_metrics import enable_live_metrics +from azure.monitor.opentelemetry.exporter._quickpulse._log_record_processor import _QuickpulseLogRecordProcessor +from azure.monitor.opentelemetry.exporter._quickpulse._span_processor import _QuickpulseSpanProcessor +from opentelemetry.sdk.resources import Resource +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.instrumentation.flask import FlaskInstrumentor +from opentelemetry.instrumentation.requests import RequestsInstrumentor + +from azure.monitor.opentelemetry.exporter import AzureMonitorTraceExporter + +from opentelemetry._logs import ( + get_logger_provider, + set_logger_provider, +) +from opentelemetry.sdk._logs import ( + LoggerProvider, + LoggingHandler, +) +from opentelemetry.sdk._logs.export import BatchLogRecordProcessor +from azure.monitor.opentelemetry.exporter import AzureMonitorLogExporter + +FlaskInstrumentor().instrument() + +app = flask.Flask(__name__) + +manager = enable_live_metrics( + connection_string=os.environ["APPLICATIONINSIGHTS_CONNECTION_STRING"], + resource=Resource.create() +) + +exporter = AzureMonitorTraceExporter.from_connection_string( + os.environ["APPLICATIONINSIGHTS_CONNECTION_STRING"] +) + +RequestsInstrumentor().instrument() + +tracer_provider = TracerProvider() +trace.set_tracer_provider(tracer_provider) +tracer = trace.get_tracer(__name__) +qp_span_processor = _QuickpulseSpanProcessor() +trace.get_tracer_provider().add_span_processor(qp_span_processor) + +# logger_provider = LoggerProvider() +# set_logger_provider(logger_provider) +# exporter = AzureMonitorLogExporter.from_connection_string( +# os.environ["APPLICATIONINSIGHTS_CONNECTION_STRING"] +# ) +# qp_log_processor = _QuickpulseLogRecordProcessor() +# get_logger_provider().add_log_record_processor(qp_log_processor) + +# # Attach LoggingHandler to namespaced logger +# handler = LoggingHandler() +# logger = logging.getLogger(__name__) +# logger.addHandler(handler) +# logger.setLevel(logging.INFO) + +# while True: +# logger.exception("Hello World!") + +# @app.route("/") +# def test(): +# return "Test flask request" + +while True: + try: + with tracer.start_as_current_span("hello") as span: + time.sleep(0.2) + requests.get("https://httpstat.us/200") + raise Exception("Custom exception message.") + except Exception: + print("Exception raised") + +# if __name__ == "__main__": +# app.run(host="localhost", port=8080, threaded=True) From b1769dc8e08e244c30fe709f551ff8315e4a004b Mon Sep 17 00:00:00 2001 From: Leighton Chen Date: Wed, 6 Mar 2024 14:28:23 -0800 Subject: [PATCH 07/21] sample --- .../exporter/_quickpulse/_exporter.py | 4 - .../exporter/_quickpulse/_live_metrics.py | 3 +- .../samples/metrics/live.py | 85 ------------------- 3 files changed, 1 insertion(+), 91 deletions(-) delete mode 100644 sdk/monitor/azure-monitor-opentelemetry-exporter/samples/metrics/live.py diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_exporter.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_exporter.py index 81d7dc9ae44c..6c5019813400 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_exporter.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_exporter.py @@ -11,10 +11,6 @@ from opentelemetry.sdk.metrics import ( Counter, Histogram, - ObservableCounter, - ObservableGauge, - ObservableUpDownCounter, - UpDownCounter, ) from opentelemetry.sdk.metrics._internal.point import MetricsData from opentelemetry.sdk.metrics.export import ( diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py index d0fd2c2aae06..44072d4df332 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py @@ -132,8 +132,7 @@ def _record_span(self, span: ReadableSpan) -> None: # TODO: Include DocumentIngress in payload document = _get_span_document(span) duration_ms = (span.end_time - span.start_time) / 1e9 - # status_code = str(span.attributes.get(SpanAttributes.HTTP_STATUS_CODE), "") - # success = status_code == "200" + # TODO: Spec out what "success" is success = span.status.is_ok if span.kind in (SpanKind.SERVER, SpanKind.CONSUMER): diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/samples/metrics/live.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/samples/metrics/live.py deleted file mode 100644 index f41eab6768b3..000000000000 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/samples/metrics/live.py +++ /dev/null @@ -1,85 +0,0 @@ -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. -# cSpell:disable -import flask -import os -import logging -import time -import requests - -from azure.monitor.opentelemetry.exporter._quickpulse._live_metrics import enable_live_metrics -from azure.monitor.opentelemetry.exporter._quickpulse._log_record_processor import _QuickpulseLogRecordProcessor -from azure.monitor.opentelemetry.exporter._quickpulse._span_processor import _QuickpulseSpanProcessor -from opentelemetry.sdk.resources import Resource -from opentelemetry import trace -from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import BatchSpanProcessor -from opentelemetry.instrumentation.flask import FlaskInstrumentor -from opentelemetry.instrumentation.requests import RequestsInstrumentor - -from azure.monitor.opentelemetry.exporter import AzureMonitorTraceExporter - -from opentelemetry._logs import ( - get_logger_provider, - set_logger_provider, -) -from opentelemetry.sdk._logs import ( - LoggerProvider, - LoggingHandler, -) -from opentelemetry.sdk._logs.export import BatchLogRecordProcessor -from azure.monitor.opentelemetry.exporter import AzureMonitorLogExporter - -FlaskInstrumentor().instrument() - -app = flask.Flask(__name__) - -manager = enable_live_metrics( - connection_string=os.environ["APPLICATIONINSIGHTS_CONNECTION_STRING"], - resource=Resource.create() -) - -exporter = AzureMonitorTraceExporter.from_connection_string( - os.environ["APPLICATIONINSIGHTS_CONNECTION_STRING"] -) - -RequestsInstrumentor().instrument() - -tracer_provider = TracerProvider() -trace.set_tracer_provider(tracer_provider) -tracer = trace.get_tracer(__name__) -qp_span_processor = _QuickpulseSpanProcessor() -trace.get_tracer_provider().add_span_processor(qp_span_processor) - -# logger_provider = LoggerProvider() -# set_logger_provider(logger_provider) -# exporter = AzureMonitorLogExporter.from_connection_string( -# os.environ["APPLICATIONINSIGHTS_CONNECTION_STRING"] -# ) -# qp_log_processor = _QuickpulseLogRecordProcessor() -# get_logger_provider().add_log_record_processor(qp_log_processor) - -# # Attach LoggingHandler to namespaced logger -# handler = LoggingHandler() -# logger = logging.getLogger(__name__) -# logger.addHandler(handler) -# logger.setLevel(logging.INFO) - -# while True: -# logger.exception("Hello World!") - -# @app.route("/") -# def test(): -# return "Test flask request" - -while True: - try: - with tracer.start_as_current_span("hello") as span: - time.sleep(0.2) - requests.get("https://httpstat.us/200") - raise Exception("Custom exception message.") - except Exception: - print("Exception raised") - -# if __name__ == "__main__": -# app.run(host="localhost", port=8080, threaded=True) From 8c104eb01b21232167a6c9b3082dd25a17580ae4 Mon Sep 17 00:00:00 2001 From: Leighton Chen Date: Wed, 6 Mar 2024 15:25:24 -0800 Subject: [PATCH 08/21] documentingress --- .../exporter/_quickpulse/_exporter.py | 4 ++-- .../exporter/_quickpulse/_live_metrics.py | 5 ++-- .../exporter/_quickpulse/_state.py | 24 ++++++++++++++++++- .../exporter/_quickpulse/_utils.py | 2 +- 4 files changed, 29 insertions(+), 6 deletions(-) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_exporter.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_exporter.py index 6c5019813400..630294218ee6 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_exporter.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_exporter.py @@ -33,6 +33,7 @@ _get_global_quickpulse_state, _is_ping_state, _set_global_quickpulse_state, + _get_and_clear_quickpulse_documents, _QuickpulseState, ) from azure.monitor.opentelemetry.exporter._quickpulse._utils import ( @@ -107,7 +108,7 @@ def export( data_points = _metric_to_quick_pulse_data_points( metrics_data, base_monitoring_data_point=base_monitoring_data_point, - documents=kwargs.get("documents"), + documents=_get_and_clear_quickpulse_documents(), ) token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)) @@ -257,7 +258,6 @@ def _receive_metrics( metrics_data, timeout_millis=timeout_millis, base_monitoring_data_point=self._base_monitoring_data_point, - documents=[], ) if result is MetricExportResult.FAILURE: # There is currently no way to propagate unsuccessful metric post so diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py index 44072d4df332..011c7a96a349 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py @@ -29,6 +29,7 @@ from azure.monitor.opentelemetry.exporter._quickpulse._state import ( _QuickpulseState, _is_post_state, + _append_quickpulse_document, _set_global_quickpulse_state, ) from azure.monitor.opentelemetry.exporter._quickpulse._utils import ( @@ -129,8 +130,8 @@ def __init__(self, connection_string: Optional[str], resource: Optional[Resource def _record_span(self, span: ReadableSpan) -> None: # Only record if in post state if _is_post_state(): - # TODO: Include DocumentIngress in payload document = _get_span_document(span) + _append_quickpulse_document(document) duration_ms = (span.end_time - span.start_time) / 1e9 # TODO: Spec out what "success" is success = span.status.is_ok @@ -153,8 +154,8 @@ def _record_log_record(self, log_data: LogData) -> None: if log_data.log_record: log_record = log_data.log_record if log_record.attributes: - # TODO: Include DocumentIngress in payload document = _get_log_record_document(log_data) + _append_quickpulse_document(document) exc_type = log_record.attributes.get(SpanAttributes.EXCEPTION_TYPE) exc_message = log_record.attributes.get(SpanAttributes.EXCEPTION_MESSAGE) if exc_type is not None or exc_message is not None: diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_state.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_state.py index 9f5ef1ccf07f..f35af2b0bbf9 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_state.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_state.py @@ -1,12 +1,15 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. from enum import Enum +from typing import List from azure.monitor.opentelemetry.exporter._quickpulse._constants import ( _LONG_PING_INTERVAL_SECONDS, _POST_INTERVAL_SECONDS, _SHORT_PING_INTERVAL_SECONDS, ) +from azure.monitor.opentelemetry.exporter._quickpulse._generated.models import DocumentIngress + class _QuickpulseState(Enum): """Current state of quickpulse service. @@ -19,6 +22,7 @@ class _QuickpulseState(Enum): _GLOBAL_QUICKPULSE_STATE = _QuickpulseState.OFFLINE +_QUICKPULSE_DOCUMENTS = [] def _set_global_quickpulse_state(state: _QuickpulseState): global _GLOBAL_QUICKPULSE_STATE @@ -29,7 +33,7 @@ def _get_global_quickpulse_state() -> _QuickpulseState: return _GLOBAL_QUICKPULSE_STATE -def is_quick_pulse_enabled() -> bool: +def is_quickpulse_enabled() -> bool: return _get_global_quickpulse_state() is not _QuickpulseState.OFFLINE @@ -42,3 +46,21 @@ def _is_ping_state() -> bool: def _is_post_state(): return _get_global_quickpulse_state() is _QuickpulseState.POST_SHORT + + +def _append_quickpulse_document(document: DocumentIngress): + global _QUICKPULSE_DOCUMENTS + # Limit risk of memory leak by limiting doc length to something manageable + if len(_QUICKPULSE_DOCUMENTS) > 20: + try: + _QUICKPULSE_DOCUMENTS.pop(0) + except IndexError: + pass + _QUICKPULSE_DOCUMENTS.append(document) + + +def _get_and_clear_quickpulse_documents() -> List[DocumentIngress]: + global _QUICKPULSE_DOCUMENTS + documents = list(_QUICKPULSE_DOCUMENTS) + _QUICKPULSE_DOCUMENTS = [] + return documents diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py index 724c35edaf72..c2cd923f4fb9 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py @@ -71,7 +71,7 @@ def _get_span_document(span: ReadableSpan) -> Union[RemoteDependencyDocument, Re grpc_status_code = span.attributes.get(SpanAttributes.RPC_GRPC_STATUS_CODE) span_kind = span.kind url = _get_url(span_kind, span.attributes) - if span_kind in (SpanKind.CLIENT, SpanKind.PRODUCER): + if span_kind in (SpanKind.CLIENT, SpanKind.PRODUCER, SpanKind.INTERNAL): document = RemoteDependencyDocument( document_type=_DocumentIngressDocumentType.RemoteDependency, name=span.name, From d4ebbfd72d88e62ddfde004baa8489d0b214a832 Mon Sep 17 00:00:00 2001 From: Leighton Chen Date: Fri, 8 Mar 2024 11:47:41 -0800 Subject: [PATCH 09/21] feedback --- .../CHANGELOG.md | 2 +- .../exporter/_quickpulse/_live_metrics.py | 20 +++------- .../_quickpulse/_log_record_processor.py | 10 +++-- .../exporter/_quickpulse/_span_processor.py | 6 ++- .../exporter/_quickpulse/_state.py | 5 ++- .../exporter/_quickpulse/_utils.py | 40 +++++++++++-------- .../tests/quickpulse/test_exporter.py | 1 - 7 files changed, 43 insertions(+), 41 deletions(-) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md b/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md index 8adfea127e0d..d589309891ac 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md @@ -5,7 +5,7 @@ ### Features Added - Add live metrics collection of requests/dependencies/exceptions - ([#34141](https://github.com/Azure/azure-sdk-for-python/pull/34141)) + ([#34673](https://github.com/Azure/azure-sdk-for-python/pull/34673)) ### Breaking Changes diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py index 011c7a96a349..9f4895ef7836 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py @@ -53,20 +53,7 @@ def enable_live_metrics(**kwargs: Any) -> None: _QuickpulseManager(kwargs.get('connection_string'), kwargs.get('resource')) -# Used by _QuickpulseSpanProcessor to record live metrics on span record -def record_span(span: ReadableSpan) -> None: - qpm = _QuickpulseManager._instance - if qpm: - qpm._record_span(span) - - -# Used by _QuickpulseLogRecordProcessor to record live metrics on log data record -def record_log_record(log_data: LogData) -> None: - qpm = _QuickpulseManager._instance - if qpm: - qpm._record_log_record(log_data) - - +# pylint: disable=protected-access,too-many-instance-attributes class _QuickpulseManager(metaclass=Singleton): def __init__(self, connection_string: Optional[str], resource: Optional[Resource]) -> None: @@ -132,7 +119,9 @@ def _record_span(self, span: ReadableSpan) -> None: if _is_post_state(): document = _get_span_document(span) _append_quickpulse_document(document) - duration_ms = (span.end_time - span.start_time) / 1e9 + duration_ms = 0 + if span.end_time and span.start_time: + duration_ms = (span.end_time - span.start_time) / 1e9 # TODO: Spec out what "success" is success = span.status.is_ok @@ -147,6 +136,7 @@ def _record_span(self, span: ReadableSpan) -> None: self._dependency_rate_counter.add(1) else: self._dependency_failure_rate_counter.add(1) + self._dependency_duration.record(duration_ms) def _record_log_record(self, log_data: LogData) -> None: # Only record if in post state diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_log_record_processor.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_log_record_processor.py index 04de86e0c405..ed8d5aefed0a 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_log_record_processor.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_log_record_processor.py @@ -3,17 +3,19 @@ from opentelemetry.sdk._logs import LogData, LogRecordProcessor -from azure.monitor.opentelemetry.exporter._quickpulse._live_metrics import record_log_record +from azure.monitor.opentelemetry.exporter._quickpulse._live_metrics import _QuickpulseManager class _QuickpulseLogRecordProcessor(LogRecordProcessor): def emit(self, log_data: LogData) -> None: - record_log_record(log_data) + qpm = _QuickpulseManager._instance + if qpm: + qpm._record_log_record(log_data) super().emit(log_data) - + def shutdown(self): - super().shutdown() + pass def force_flush(self, timeout_millis: int = 30000): super().force_flush(timeout_millis=timeout_millis) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_span_processor.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_span_processor.py index 080ff6af95e1..fd89f7f84496 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_span_processor.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_span_processor.py @@ -3,11 +3,13 @@ from opentelemetry.sdk.trace import ReadableSpan, SpanProcessor -from azure.monitor.opentelemetry.exporter._quickpulse._live_metrics import record_span +from azure.monitor.opentelemetry.exporter._quickpulse._live_metrics import _QuickpulseManager class _QuickpulseSpanProcessor(SpanProcessor): def on_end(self, span: ReadableSpan) -> None: - record_span(span) + qpm = _QuickpulseManager._instance + if qpm: + qpm._record_span(span) return super().on_end(span) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_state.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_state.py index f35af2b0bbf9..05c78c72ebb5 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_state.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_state.py @@ -22,9 +22,10 @@ class _QuickpulseState(Enum): _GLOBAL_QUICKPULSE_STATE = _QuickpulseState.OFFLINE -_QUICKPULSE_DOCUMENTS = [] +_QUICKPULSE_DOCUMENTS: List[DocumentIngress] = [] def _set_global_quickpulse_state(state: _QuickpulseState): + # pylint: disable=global-statement global _GLOBAL_QUICKPULSE_STATE _GLOBAL_QUICKPULSE_STATE = state @@ -49,6 +50,7 @@ def _is_post_state(): def _append_quickpulse_document(document: DocumentIngress): + # pylint: disable=global-statement,global-variable-not-assigned global _QUICKPULSE_DOCUMENTS # Limit risk of memory leak by limiting doc length to something manageable if len(_QUICKPULSE_DOCUMENTS) > 20: @@ -60,6 +62,7 @@ def _append_quickpulse_document(document: DocumentIngress): def _get_and_clear_quickpulse_documents() -> List[DocumentIngress]: + # pylint: disable=global-statement global _QUICKPULSE_DOCUMENTS documents = list(_QUICKPULSE_DOCUMENTS) _QUICKPULSE_DOCUMENTS = [] diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py index c2cd923f4fb9..12a3a71aa60e 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py @@ -65,10 +65,13 @@ def _metric_to_quick_pulse_data_points( # pylint: disable=too-many-nested-block ) ] +# mypy: disable-error-code="assignment, union-attr" def _get_span_document(span: ReadableSpan) -> Union[RemoteDependencyDocument, RequestDocument]: - duration = span.end_time - span.start_time - status_code = span.attributes.get(SpanAttributes.HTTP_STATUS_CODE) - grpc_status_code = span.attributes.get(SpanAttributes.RPC_GRPC_STATUS_CODE) + duration = 0 + if span.end_time and span.start_time: + duration = span.end_time - span.start_time + status_code = span.attributes.get(SpanAttributes.HTTP_STATUS_CODE, "") + grpc_status_code = span.attributes.get(SpanAttributes.RPC_GRPC_STATUS_CODE, "") span_kind = span.kind url = _get_url(span_kind, span.attributes) if span_kind in (SpanKind.CLIENT, SpanKind.PRODUCER, SpanKind.INTERNAL): @@ -76,7 +79,7 @@ def _get_span_document(span: ReadableSpan) -> Union[RemoteDependencyDocument, Re document_type=_DocumentIngressDocumentType.RemoteDependency, name=span.name, command_name=url, - result_code=status_code, + result_code=str(status_code), duration=_ns_to_iso8601_string(duration), ) else: @@ -93,14 +96,15 @@ def _get_span_document(span: ReadableSpan) -> Union[RemoteDependencyDocument, Re ) return document +# mypy: disable-error-code="assignment" def _get_log_record_document(log_data: LogData) -> Union[ExceptionDocument, TraceDocument]: - exc_type = log_data.log_record.attributes.get(SpanAttributes.EXCEPTION_TYPE) - exc_message = log_data.log_record.attributes.get(SpanAttributes.EXCEPTION_MESSAGE) + exc_type = log_data.log_record.attributes.get(SpanAttributes.EXCEPTION_TYPE, "") + exc_message = log_data.log_record.attributes.get(SpanAttributes.EXCEPTION_MESSAGE, "") if exc_type is not None or exc_message is not None: document = ExceptionDocument( document_type=_DocumentIngressDocumentType.Exception, - exception_type=exc_type, - exception_message=exc_message, + exception_type=str(exc_type), + exception_message=str(exc_message), ) else: document = TraceDocument( @@ -114,6 +118,8 @@ def _get_url(span_kind: SpanKind, attributes: Attributes) -> str: if not attributes: return "" http_method = attributes.get(SpanAttributes.HTTP_METHOD) + if not http_method: + return "" if http_method: http_scheme = attributes.get(SpanAttributes.HTTP_SCHEME) # Client @@ -121,15 +127,15 @@ def _get_url(span_kind: SpanKind, attributes: Attributes) -> str: http_url = attributes.get(SpanAttributes.HTTP_URL) if http_url: return str(http_url) - else: - host = attributes.get(SpanAttributes.NET_PEER_NAME) - port = attributes.get(SpanAttributes.NET_PEER_PORT, "") - ip = attributes.get(SpanAttributes.NET_PEER_IP) - if http_scheme: - if host: - return f"{http_scheme}://{host}:{port}" - else: - return f"{http_scheme}://{ip}:{port}" + + host = attributes.get(SpanAttributes.NET_PEER_NAME) + port = attributes.get(SpanAttributes.NET_PEER_PORT, "") + ip = attributes.get(SpanAttributes.NET_PEER_IP) + if http_scheme: + if host: + return f"{http_scheme}://{host}:{port}" + else: + return f"{http_scheme}://{ip}:{port}" else: # Server host = attributes.get(SpanAttributes.NET_HOST_NAME) port = attributes.get(SpanAttributes.NET_HOST_PORT) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_exporter.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_exporter.py index a594e0517d55..18f40e87c303 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_exporter.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_exporter.py @@ -272,7 +272,6 @@ def test_quickpulsereader_receive_metrics(self, task_mock, export_mock): self._metrics_data, timeout_millis=20_000, base_monitoring_data_point=self._data_point, - documents=[], ) @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._exporter._QuickpulseExporter.export") From be403e45f3dcee3176a788b3a0f89fff8f48b688 Mon Sep 17 00:00:00 2001 From: Leighton Chen Date: Fri, 8 Mar 2024 11:50:16 -0800 Subject: [PATCH 10/21] Update _utils.py --- .../azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py index 12a3a71aa60e..ff8c076c2837 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py @@ -144,7 +144,6 @@ def _get_url(span_kind: SpanKind, attributes: Attributes) -> str: http_host = attributes.get(SpanAttributes.HTTP_HOST) if http_host: return f"{http_scheme}://{http_host}:{port}{http_target}" - return "" def _ns_to_iso8601_string(nanoseconds: int) -> str: From aeaa0d94679466497abc1db84b73d3c7ca98dcb3 Mon Sep 17 00:00:00 2001 From: Leighton Chen Date: Fri, 8 Mar 2024 12:39:48 -0800 Subject: [PATCH 11/21] record span --- ..._log_record_processor.py => _processor.py} | 10 ++ .../exporter/_quickpulse/_span_processor.py | 15 --- .../tests/quickpulse/test_live_metrics.py | 97 +++++++++++++++++++ .../tests/quickpulse/test_processor.py | 44 +++++++++ 4 files changed, 151 insertions(+), 15 deletions(-) rename sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/{_log_record_processor.py => _processor.py} (68%) delete mode 100644 sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_span_processor.py create mode 100644 sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_processor.py diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_log_record_processor.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_processor.py similarity index 68% rename from sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_log_record_processor.py rename to sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_processor.py index ed8d5aefed0a..5ad17896a7f5 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_log_record_processor.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_processor.py @@ -2,6 +2,7 @@ # Licensed under the MIT License. from opentelemetry.sdk._logs import LogData, LogRecordProcessor +from opentelemetry.sdk.trace import ReadableSpan, SpanProcessor from azure.monitor.opentelemetry.exporter._quickpulse._live_metrics import _QuickpulseManager @@ -19,3 +20,12 @@ def shutdown(self): def force_flush(self, timeout_millis: int = 30000): super().force_flush(timeout_millis=timeout_millis) + + +class _QuickpulseSpanProcessor(SpanProcessor): + + def on_end(self, span: ReadableSpan) -> None: + qpm = _QuickpulseManager._instance + if qpm: + qpm._record_span(span) + return super().on_end(span) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_span_processor.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_span_processor.py deleted file mode 100644 index fd89f7f84496..000000000000 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_span_processor.py +++ /dev/null @@ -1,15 +0,0 @@ -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. - -from opentelemetry.sdk.trace import ReadableSpan, SpanProcessor - -from azure.monitor.opentelemetry.exporter._quickpulse._live_metrics import _QuickpulseManager - - -class _QuickpulseSpanProcessor(SpanProcessor): - - def on_end(self, span: ReadableSpan) -> None: - qpm = _QuickpulseManager._instance - if qpm: - qpm._record_span(span) - return super().on_end(span) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_live_metrics.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_live_metrics.py index f5940f8fcbbd..4c4bd97996ef 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_live_metrics.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_live_metrics.py @@ -7,6 +7,7 @@ from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.resources import Resource, ResourceAttributes +from opentelemetry.trace import SpanKind from azure.monitor.opentelemetry.exporter._generated.models import ContextTagKeys from azure.monitor.opentelemetry.exporter._quickpulse._exporter import ( @@ -45,6 +46,10 @@ class TestQuickpulseManager(unittest.TestCase): def setUpClass(cls): _set_global_quickpulse_state(_QuickpulseState.PING_SHORT) + @classmethod + def tearDownClass(cls): + _set_global_quickpulse_state(_QuickpulseState.OFFLINE) + @mock.patch("opentelemetry.sdk.trace.id_generator.RandomIdGenerator.generate_trace_id") def test_init(self, generator_mock): generator_mock.return_value = "test_trace_id" @@ -127,3 +132,95 @@ def test_singleton(self): qpm2._base_monitoring_data_point.role_name, part_a_fields.get(ContextTagKeys.AI_CLOUD_ROLE, "") ) + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._append_quickpulse_document") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._get_span_document") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._is_post_state") + def test_record_span_server_success(self, post_state_mock, span_doc_mock, append_doc_mock): + post_state_mock.return_value = True + span_doc = mock.Mock() + span_doc_mock.return_value = span_doc + span_mock = mock.Mock() + span_mock.end_time = 10 + span_mock.start_time = 5 + span_mock.status.is_ok = True + span_mock.kind = SpanKind.SERVER + qpm = _QuickpulseManager( + connection_string="InstrumentationKey=4321abcd-5678-4efa-8abc-1234567890ac;LiveEndpoint=https://eastus.livediagnostics.monitor.azure.com/", + resource=Resource.create(), + ) + qpm._request_rate_counter = mock.Mock() + qpm._request_duration = mock.Mock() + qpm._record_span(span_mock) + append_doc_mock.assert_called_once_with(span_doc) + qpm._request_rate_counter.add.assert_called_once_with(1) + qpm._request_duration.record.assert_called_once_with(5 / 1e9) + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._append_quickpulse_document") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._get_span_document") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._is_post_state") + def test_record_span_server_failure(self, post_state_mock, span_doc_mock, append_doc_mock): + post_state_mock.return_value = True + span_doc = mock.Mock() + span_doc_mock.return_value = span_doc + span_mock = mock.Mock() + span_mock.end_time = 10 + span_mock.start_time = 5 + span_mock.status.is_ok = False + span_mock.kind = SpanKind.SERVER + qpm = _QuickpulseManager( + connection_string="InstrumentationKey=4321abcd-5678-4efa-8abc-1234567890ac;LiveEndpoint=https://eastus.livediagnostics.monitor.azure.com/", + resource=Resource.create(), + ) + qpm._request_failed_rate_counter = mock.Mock() + qpm._request_duration = mock.Mock() + qpm._record_span(span_mock) + append_doc_mock.assert_called_once_with(span_doc) + qpm._request_failed_rate_counter.add.assert_called_once_with(1) + qpm._request_duration.record.assert_called_once_with(5 / 1e9) + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._append_quickpulse_document") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._get_span_document") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._is_post_state") + def test_record_span_dep_success(self, post_state_mock, span_doc_mock, append_doc_mock): + post_state_mock.return_value = True + span_doc = mock.Mock() + span_doc_mock.return_value = span_doc + span_mock = mock.Mock() + span_mock.end_time = 10 + span_mock.start_time = 5 + span_mock.status.is_ok = True + span_mock.kind = SpanKind.CLIENT + qpm = _QuickpulseManager( + connection_string="InstrumentationKey=4321abcd-5678-4efa-8abc-1234567890ac;LiveEndpoint=https://eastus.livediagnostics.monitor.azure.com/", + resource=Resource.create(), + ) + qpm._dependency_rate_counter = mock.Mock() + qpm._dependency_duration = mock.Mock() + qpm._record_span(span_mock) + append_doc_mock.assert_called_once_with(span_doc) + qpm._dependency_rate_counter.add.assert_called_once_with(1) + qpm._dependency_duration.record.assert_called_once_with(5 / 1e9) + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._append_quickpulse_document") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._get_span_document") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._is_post_state") + def test_record_span_dep_failure(self, post_state_mock, span_doc_mock, append_doc_mock): + post_state_mock.return_value = True + span_doc = mock.Mock() + span_doc_mock.return_value = span_doc + span_mock = mock.Mock() + span_mock.end_time = 10 + span_mock.start_time = 5 + span_mock.status.is_ok = False + span_mock.kind = SpanKind.CLIENT + qpm = _QuickpulseManager( + connection_string="InstrumentationKey=4321abcd-5678-4efa-8abc-1234567890ac;LiveEndpoint=https://eastus.livediagnostics.monitor.azure.com/", + resource=Resource.create(), + ) + qpm._dependency_failure_rate_counter = mock.Mock() + qpm._dependency_duration = mock.Mock() + qpm._record_span(span_mock) + append_doc_mock.assert_called_once_with(span_doc) + qpm._dependency_failure_rate_counter.add.assert_called_once_with(1) + qpm._dependency_duration.record.assert_called_once_with(5 / 1e9) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_processor.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_processor.py new file mode 100644 index 000000000000..f3948b31e593 --- /dev/null +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_processor.py @@ -0,0 +1,44 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +import unittest +from unittest import mock + +from azure.monitor.opentelemetry.exporter._quickpulse._processor import ( + _QuickpulseLogRecordProcessor, + _QuickpulseSpanProcessor, +) +from azure.monitor.opentelemetry.exporter._quickpulse._live_metrics import _QuickpulseManager + + +class TestQuickpulseLogRecordProcessor(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.qpm = mock.Mock() + _QuickpulseManager._instance = cls.qpm + + @classmethod + def tearDownClass(cls) -> None: + _QuickpulseManager._instance = None + + def test_emit(self): + processor = _QuickpulseLogRecordProcessor() + log_data = mock.Mock() + processor.emit(log_data) + self.qpm._record_log_record.assert_called_once_with(log_data) + + +class TestQuickpulseSpanProcessor(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.qpm = mock.Mock() + _QuickpulseManager._instance = cls.qpm + + @classmethod + def tearDownClass(cls) -> None: + _QuickpulseManager._instance = None + + def test_on_end(self): + processor = _QuickpulseSpanProcessor() + span = mock.Mock() + processor.on_end(span) + self.qpm._record_span.assert_called_once_with(span) From b9799009f929816f4dcfd9fd7a0076be80827081 Mon Sep 17 00:00:00 2001 From: Leighton Chen Date: Fri, 8 Mar 2024 12:45:30 -0800 Subject: [PATCH 12/21] log_record --- .../tests/quickpulse/test_live_metrics.py | 23 +++++++++++++++++++ .../tests/quickpulse/test_utils.py | 0 2 files changed, 23 insertions(+) create mode 100644 sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_utils.py diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_live_metrics.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_live_metrics.py index 4c4bd97996ef..f1699bd6beb7 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_live_metrics.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_live_metrics.py @@ -7,6 +7,7 @@ from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.resources import Resource, ResourceAttributes +from opentelemetry.semconv.trace import SpanAttributes from opentelemetry.trace import SpanKind from azure.monitor.opentelemetry.exporter._generated.models import ContextTagKeys @@ -224,3 +225,25 @@ def test_record_span_dep_failure(self, post_state_mock, span_doc_mock, append_do append_doc_mock.assert_called_once_with(span_doc) qpm._dependency_failure_rate_counter.add.assert_called_once_with(1) qpm._dependency_duration.record.assert_called_once_with(5 / 1e9) + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._append_quickpulse_document") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._get_log_record_document") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._is_post_state") + def test_record_log_exception(self, post_state_mock, log_doc_mock, append_doc_mock): + post_state_mock.return_value = True + log_record_doc = mock.Mock() + log_doc_mock.return_value = log_record_doc + log_data_mock = mock.Mock() + attributes = { + SpanAttributes.EXCEPTION_TYPE: "exc_type", + SpanAttributes.EXCEPTION_MESSAGE: "exc_msg", + } + log_data_mock.log_record.attributes = attributes + qpm = _QuickpulseManager( + connection_string="InstrumentationKey=4321abcd-5678-4efa-8abc-1234567890ac;LiveEndpoint=https://eastus.livediagnostics.monitor.azure.com/", + resource=Resource.create(), + ) + qpm._exception_rate_counter = mock.Mock() + qpm._record_log_record(log_data_mock) + append_doc_mock.assert_called_once_with(log_record_doc) + qpm._exception_rate_counter.add.assert_called_once_with(1) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_utils.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_utils.py new file mode 100644 index 000000000000..e69de29bb2d1 From 81f8aec4328c982d7ba807f71da859b4f0dc9d4f Mon Sep 17 00:00:00 2001 From: Leighton Chen Date: Fri, 8 Mar 2024 13:23:18 -0800 Subject: [PATCH 13/21] convert --- .../tests/quickpulse/test_utils.py | 102 ++++++++++++++++++ 1 file changed, 102 insertions(+) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_utils.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_utils.py index e69de29bb2d1..2eaa7eec120a 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_utils.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_utils.py @@ -0,0 +1,102 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +from datetime import datetime +import unittest +from unittest import mock + +from opentelemetry.sdk.metrics.export import HistogramDataPoint, NumberDataPoint +from azure.monitor.opentelemetry.exporter._quickpulse._constants import _COMMITTED_BYTES_NAME +from azure.monitor.opentelemetry.exporter._quickpulse._generated.models._models import ( + MetricPoint, + MonitoringDataPoint, +) +from azure.monitor.opentelemetry.exporter._quickpulse._utils import ( + _metric_to_quick_pulse_data_points, +) + + +class TestUtils(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.base_mdp = MonitoringDataPoint( + version=1.0, + instance="test_instance", + role_name="test_role_name", + machine_name="test_machine_name", + stream_id="test_stream_id" + ) + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._utils.datetime") + def test_metric_to_qp_data_point_hist(self, datetime_mock): + point = HistogramDataPoint( + {}, + 0, + 0, + 2, + 10, + [1,1,0], + [0,10,20], + 0, + 5, + ) + metric = mock.Mock() + metric.name = _COMMITTED_BYTES_NAME[0] + metric.data.data_points = [point] + scope_metric = mock.Mock() + scope_metric.metrics = [metric] + resource_metric = mock.Mock() + resource_metric.scope_metrics = [scope_metric] + metric_data = mock.Mock() + metric_data.resource_metrics = [resource_metric] + metric_point = MetricPoint( + name=_COMMITTED_BYTES_NAME[1], + weight=1, + value=5 + ) + documents = [mock.Mock()] + date_now = datetime.now() + datetime_mock.now.return_value = date_now + mdp = _metric_to_quick_pulse_data_points(metric_data, self.base_mdp, documents)[0] + self.assertEqual(mdp.version, self.base_mdp.version) + self.assertEqual(mdp.instance, self.base_mdp.instance) + self.assertEqual(mdp.role_name, self.base_mdp.role_name) + self.assertEqual(mdp.machine_name, self.base_mdp.machine_name) + self.assertEqual(mdp.stream_id, self.base_mdp.stream_id) + self.assertEqual(mdp.timestamp, date_now) + self.assertEqual(mdp.metrics, [metric_point]) + self.assertEqual(mdp.documents, documents) + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._utils.datetime") + def test_metric_to_qp_data_point_num(self, datetime_mock): + point = NumberDataPoint( + {}, + 0, + 0, + 7, + ) + metric = mock.Mock() + metric.name = _COMMITTED_BYTES_NAME[0] + metric.data.data_points = [point] + scope_metric = mock.Mock() + scope_metric.metrics = [metric] + resource_metric = mock.Mock() + resource_metric.scope_metrics = [scope_metric] + metric_data = mock.Mock() + metric_data.resource_metrics = [resource_metric] + metric_point = MetricPoint( + name=_COMMITTED_BYTES_NAME[1], + weight=1, + value=7 + ) + documents = [mock.Mock()] + date_now = datetime.now() + datetime_mock.now.return_value = date_now + mdp = _metric_to_quick_pulse_data_points(metric_data, self.base_mdp, documents)[0] + self.assertEqual(mdp.version, self.base_mdp.version) + self.assertEqual(mdp.instance, self.base_mdp.instance) + self.assertEqual(mdp.role_name, self.base_mdp.role_name) + self.assertEqual(mdp.machine_name, self.base_mdp.machine_name) + self.assertEqual(mdp.stream_id, self.base_mdp.stream_id) + self.assertEqual(mdp.timestamp, date_now) + self.assertEqual(mdp.metrics, [metric_point]) + self.assertEqual(mdp.documents, documents) \ No newline at end of file From f55641535763a1f9d88db25ef721253d75347afd Mon Sep 17 00:00:00 2001 From: Leighton Chen Date: Fri, 8 Mar 2024 13:38:41 -0800 Subject: [PATCH 14/21] utils --- .../exporter/_quickpulse/_utils.py | 12 +- .../tests/quickpulse/test_utils.py | 106 +++++++++++++++++- 2 files changed, 110 insertions(+), 8 deletions(-) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py index ff8c076c2837..d266ba5f598a 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py @@ -76,7 +76,7 @@ def _get_span_document(span: ReadableSpan) -> Union[RemoteDependencyDocument, Re url = _get_url(span_kind, span.attributes) if span_kind in (SpanKind.CLIENT, SpanKind.PRODUCER, SpanKind.INTERNAL): document = RemoteDependencyDocument( - document_type=_DocumentIngressDocumentType.RemoteDependency, + document_type=_DocumentIngressDocumentType.RemoteDependency.value, name=span.name, command_name=url, result_code=str(status_code), @@ -88,7 +88,7 @@ def _get_span_document(span: ReadableSpan) -> Union[RemoteDependencyDocument, Re else: code = str(grpc_status_code) document = RequestDocument( - document_type=_DocumentIngressDocumentType.Request, + document_type=_DocumentIngressDocumentType.Request.value, name=span.name, url=url, response_code=code, @@ -98,17 +98,17 @@ def _get_span_document(span: ReadableSpan) -> Union[RemoteDependencyDocument, Re # mypy: disable-error-code="assignment" def _get_log_record_document(log_data: LogData) -> Union[ExceptionDocument, TraceDocument]: - exc_type = log_data.log_record.attributes.get(SpanAttributes.EXCEPTION_TYPE, "") - exc_message = log_data.log_record.attributes.get(SpanAttributes.EXCEPTION_MESSAGE, "") + exc_type = log_data.log_record.attributes.get(SpanAttributes.EXCEPTION_TYPE) + exc_message = log_data.log_record.attributes.get(SpanAttributes.EXCEPTION_MESSAGE) if exc_type is not None or exc_message is not None: document = ExceptionDocument( - document_type=_DocumentIngressDocumentType.Exception, + document_type=_DocumentIngressDocumentType.Exception.value, exception_type=str(exc_type), exception_message=str(exc_message), ) else: document = TraceDocument( - document_type=_DocumentIngressDocumentType.Trace, + document_type=_DocumentIngressDocumentType.Trace.value, message=log_data.log_record.body, ) return document diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_utils.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_utils.py index 2eaa7eec120a..ea03e61f5a7d 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_utils.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_utils.py @@ -5,12 +5,24 @@ from unittest import mock from opentelemetry.sdk.metrics.export import HistogramDataPoint, NumberDataPoint -from azure.monitor.opentelemetry.exporter._quickpulse._constants import _COMMITTED_BYTES_NAME +from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.trace import SpanKind + +from azure.monitor.opentelemetry.exporter._quickpulse._constants import ( + _COMMITTED_BYTES_NAME, + _DocumentIngressDocumentType, +) from azure.monitor.opentelemetry.exporter._quickpulse._generated.models._models import ( + Exception, MetricPoint, MonitoringDataPoint, + RemoteDependency, + Request, + Trace, ) from azure.monitor.opentelemetry.exporter._quickpulse._utils import ( + _get_span_document, + _get_log_record_document, _metric_to_quick_pulse_data_points, ) @@ -99,4 +111,94 @@ def test_metric_to_qp_data_point_num(self, datetime_mock): self.assertEqual(mdp.stream_id, self.base_mdp.stream_id) self.assertEqual(mdp.timestamp, date_now) self.assertEqual(mdp.metrics, [metric_point]) - self.assertEqual(mdp.documents, documents) \ No newline at end of file + self.assertEqual(mdp.documents, documents) + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._utils._ns_to_iso8601_string") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._utils._get_url") + def test_get_span_document_client(self, url_mock, iso_mock): + span_mock = mock.Mock() + span_mock.name = "test_span" + span_mock.end_time = 10 + span_mock.start_time = 4 + span_mock.attributes = { + SpanAttributes.HTTP_STATUS_CODE: "200", + SpanAttributes.RPC_GRPC_STATUS_CODE: "400", + } + span_mock.kind = SpanKind.CLIENT + url_mock.return_value = "test_url" + iso_mock.return_value = "1000" + doc = _get_span_document(span_mock) + self.assertTrue(isinstance(doc, RemoteDependency)) + self.assertEqual(doc.document_type, _DocumentIngressDocumentType.RemoteDependency.value) + self.assertEqual(doc.name, "test_span") + self.assertEqual(doc.command_name, "test_url") + self.assertEqual(doc.result_code, "200") + self.assertEqual(doc.duration, "1000") + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._utils._ns_to_iso8601_string") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._utils._get_url") + def test_get_span_document_server(self, url_mock, iso_mock): + span_mock = mock.Mock() + span_mock.name = "test_span" + span_mock.end_time = 10 + span_mock.start_time = 4 + span_mock.attributes = { + SpanAttributes.HTTP_STATUS_CODE: "200", + SpanAttributes.RPC_GRPC_STATUS_CODE: "400", + } + span_mock.kind = SpanKind.SERVER + url_mock.return_value = "test_url" + iso_mock.return_value = "1000" + doc = _get_span_document(span_mock) + self.assertTrue(isinstance(doc, Request)) + self.assertEqual(doc.document_type, _DocumentIngressDocumentType.Request.value) + self.assertEqual(doc.name, "test_span") + self.assertEqual(doc.url, "test_url") + self.assertEqual(doc.response_code, "200") + self.assertEqual(doc.duration, "1000") + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._utils._ns_to_iso8601_string") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._utils._get_url") + def test_get_span_document_server_grpc_status(self, url_mock, iso_mock): + span_mock = mock.Mock() + span_mock.name = "test_span" + span_mock.end_time = 10 + span_mock.start_time = 4 + span_mock.attributes = { + SpanAttributes.RPC_GRPC_STATUS_CODE: "400", + } + span_mock.kind = SpanKind.SERVER + url_mock.return_value = "test_url" + iso_mock.return_value = "1000" + doc = _get_span_document(span_mock) + self.assertTrue(isinstance(doc, Request)) + self.assertEqual(doc.document_type, _DocumentIngressDocumentType.Request.value) + self.assertEqual(doc.name, "test_span") + self.assertEqual(doc.url, "test_url") + self.assertEqual(doc.response_code, "400") + self.assertEqual(doc.duration, "1000") + + def test_get_log_record_document_server_exc(self): + log_record = mock.Mock() + log_record.attributes = { + SpanAttributes.EXCEPTION_TYPE: "exc_type", + SpanAttributes.EXCEPTION_MESSAGE: "exc_message", + } + log_data = mock.Mock() + log_data.log_record = log_record + doc = _get_log_record_document(log_data) + self.assertTrue(isinstance(doc, Exception)) + self.assertEqual(doc.document_type, _DocumentIngressDocumentType.Exception.value) + self.assertEqual(doc.exception_type, "exc_type") + self.assertEqual(doc.exception_message, "exc_message") + + def test_get_log_record_document_server_exc(self): + log_record = mock.Mock() + log_record.attributes = {} + log_record.body = "body" + log_data = mock.Mock() + log_data.log_record = log_record + doc = _get_log_record_document(log_data) + self.assertTrue(isinstance(doc, Trace)) + self.assertEqual(doc.document_type, _DocumentIngressDocumentType.Trace.value) + self.assertEqual(doc.message, "body") From ff42bc0686cf365e7becd6fcfd788ab7a0ef0756 Mon Sep 17 00:00:00 2001 From: Leighton Chen Date: Fri, 8 Mar 2024 14:50:28 -0800 Subject: [PATCH 15/21] lint lint --- .../opentelemetry/exporter/_quickpulse/_live_metrics.py | 1 + .../monitor/opentelemetry/exporter/_quickpulse/_utils.py | 7 ++++--- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py index 9f4895ef7836..32c67f744053 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py @@ -121,6 +121,7 @@ def _record_span(self, span: ReadableSpan) -> None: _append_quickpulse_document(document) duration_ms = 0 if span.end_time and span.start_time: + # mypy: disable-error-code="assignment" duration_ms = (span.end_time - span.start_time) / 1e9 # TODO: Spec out what "success" is success = span.status.is_ok diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py index d266ba5f598a..07bd433acfc0 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py @@ -65,7 +65,7 @@ def _metric_to_quick_pulse_data_points( # pylint: disable=too-many-nested-block ) ] -# mypy: disable-error-code="assignment, union-attr" +# mypy: disable-error-code="assignment,union-attr" def _get_span_document(span: ReadableSpan) -> Union[RemoteDependencyDocument, RequestDocument]: duration = 0 if span.end_time and span.start_time: @@ -114,12 +114,12 @@ def _get_log_record_document(log_data: LogData) -> Union[ExceptionDocument, Trac return document +# mypy: disable-error-code="assignment" +# pylint: disable=no-else-return def _get_url(span_kind: SpanKind, attributes: Attributes) -> str: if not attributes: return "" http_method = attributes.get(SpanAttributes.HTTP_METHOD) - if not http_method: - return "" if http_method: http_scheme = attributes.get(SpanAttributes.HTTP_SCHEME) # Client @@ -144,6 +144,7 @@ def _get_url(span_kind: SpanKind, attributes: Attributes) -> str: http_host = attributes.get(SpanAttributes.HTTP_HOST) if http_host: return f"{http_scheme}://{http_host}:{port}{http_target}" + return "" def _ns_to_iso8601_string(nanoseconds: int) -> str: From 05557863f726bd1991078be97fdb33f4e7e6dcf2 Mon Sep 17 00:00:00 2001 From: Leighton Chen Date: Mon, 11 Mar 2024 08:31:46 -0700 Subject: [PATCH 16/21] lint --- .../opentelemetry/exporter/_quickpulse/_live_metrics.py | 3 +-- .../opentelemetry/exporter/_quickpulse/_processor.py | 2 ++ .../monitor/opentelemetry/exporter/_quickpulse/_utils.py | 8 ++++---- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py index 32c67f744053..c84ae7b2a906 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py @@ -121,8 +121,7 @@ def _record_span(self, span: ReadableSpan) -> None: _append_quickpulse_document(document) duration_ms = 0 if span.end_time and span.start_time: - # mypy: disable-error-code="assignment" - duration_ms = (span.end_time - span.start_time) / 1e9 + duration_ms = (span.end_time - span.start_time) / 1e9 # type: ignore # TODO: Spec out what "success" is success = span.status.is_ok diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_processor.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_processor.py index 5ad17896a7f5..2e18253976fb 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_processor.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_processor.py @@ -7,6 +7,7 @@ from azure.monitor.opentelemetry.exporter._quickpulse._live_metrics import _QuickpulseManager +# pylint: disable=protected-access class _QuickpulseLogRecordProcessor(LogRecordProcessor): def emit(self, log_data: LogData) -> None: @@ -22,6 +23,7 @@ def force_flush(self, timeout_millis: int = 30000): super().force_flush(timeout_millis=timeout_millis) +# pylint: disable=protected-access class _QuickpulseSpanProcessor(SpanProcessor): def on_end(self, span: ReadableSpan) -> None: diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py index 07bd433acfc0..023e7293d6a7 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py @@ -70,8 +70,8 @@ def _get_span_document(span: ReadableSpan) -> Union[RemoteDependencyDocument, Re duration = 0 if span.end_time and span.start_time: duration = span.end_time - span.start_time - status_code = span.attributes.get(SpanAttributes.HTTP_STATUS_CODE, "") - grpc_status_code = span.attributes.get(SpanAttributes.RPC_GRPC_STATUS_CODE, "") + status_code = span.attributes.get(SpanAttributes.HTTP_STATUS_CODE, "") # type: ignore + grpc_status_code = span.attributes.get(SpanAttributes.RPC_GRPC_STATUS_CODE, "") # type: ignore span_kind = span.kind url = _get_url(span_kind, span.attributes) if span_kind in (SpanKind.CLIENT, SpanKind.PRODUCER, SpanKind.INTERNAL): @@ -98,8 +98,8 @@ def _get_span_document(span: ReadableSpan) -> Union[RemoteDependencyDocument, Re # mypy: disable-error-code="assignment" def _get_log_record_document(log_data: LogData) -> Union[ExceptionDocument, TraceDocument]: - exc_type = log_data.log_record.attributes.get(SpanAttributes.EXCEPTION_TYPE) - exc_message = log_data.log_record.attributes.get(SpanAttributes.EXCEPTION_MESSAGE) + exc_type = log_data.log_record.attributes.get(SpanAttributes.EXCEPTION_TYPE) # type: ignore + exc_message = log_data.log_record.attributes.get(SpanAttributes.EXCEPTION_MESSAGE) # type: ignore if exc_type is not None or exc_message is not None: document = ExceptionDocument( document_type=_DocumentIngressDocumentType.Exception.value, From b1f5fbe0715cdfdd5231d07edd66d645b1a2e145 Mon Sep 17 00:00:00 2001 From: Leighton Chen Date: Mon, 11 Mar 2024 10:13:35 -0700 Subject: [PATCH 17/21] lint --- .../azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py index 023e7293d6a7..204b47d2ed01 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py @@ -127,7 +127,7 @@ def _get_url(span_kind: SpanKind, attributes: Attributes) -> str: http_url = attributes.get(SpanAttributes.HTTP_URL) if http_url: return str(http_url) - + host = attributes.get(SpanAttributes.NET_PEER_NAME) port = attributes.get(SpanAttributes.NET_PEER_PORT, "") ip = attributes.get(SpanAttributes.NET_PEER_IP) From b000b22569ea45da2d7fee0c5004352e1273c7b0 Mon Sep 17 00:00:00 2001 From: Leighton Chen Date: Tue, 12 Mar 2024 12:00:12 -0700 Subject: [PATCH 18/21] cpu --- .../CHANGELOG.md | 2 ++ .../exporter/_quickpulse/_live_metrics.py | 32 ++++++++++++++++++- .../setup.py | 1 + 3 files changed, 34 insertions(+), 1 deletion(-) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md b/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md index d589309891ac..7ea20381a93a 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md @@ -6,6 +6,8 @@ - Add live metrics collection of requests/dependencies/exceptions ([#34673](https://github.com/Azure/azure-sdk-for-python/pull/34673)) +- Add live metrics collection of cpu time/process memory + ([#34673](https://github.com/Azure/azure-sdk-for-python/pull/34673)) ### Breaking Changes diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py index c84ae7b2a906..efe4c46890ad 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py @@ -1,8 +1,10 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. import platform -from typing import Any, Optional +import psutil +from typing import Any, Iterable, Optional +from opentelemetry.metrics import CallbackOptions, Observation from opentelemetry.sdk._logs import LogData from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.resources import Resource @@ -13,10 +15,12 @@ from azure.monitor.opentelemetry.exporter._generated.models import ContextTagKeys from azure.monitor.opentelemetry.exporter._quickpulse._constants import ( + _COMMITTED_BYTES_NAME, _DEPENDENCY_DURATION_NAME, _DEPENDENCY_FAILURE_RATE_NAME, _DEPENDENCY_RATE_NAME, _EXCEPTION_RATE_NAME, + _PROCESSOR_TIME_NAME, _REQUEST_DURATION_NAME, _REQUEST_FAILURE_RATE_NAME, _REQUEST_RATE_NAME, @@ -43,6 +47,8 @@ ) +PROCESS = psutil.Process() + def enable_live_metrics(**kwargs: Any) -> None: """Live metrics entry point. @@ -113,6 +119,14 @@ def __init__(self, connection_string: Optional[str], resource: Optional[Resource "exc/sec", "live metrics exception rate per second" ) + self._process_memory_gauge = self._meter.create_observable_gauge( + _COMMITTED_BYTES_NAME[0], + [_get_process_memory], + ) + self._processor_time_gauge = self._meter.create_observable_gauge( + _PROCESSOR_TIME_NAME[0], + [_get_processor_time], + ) def _record_span(self, span: ReadableSpan) -> None: # Only record if in post state @@ -150,3 +164,19 @@ def _record_log_record(self, log_data: LogData) -> None: exc_message = log_record.attributes.get(SpanAttributes.EXCEPTION_MESSAGE) if exc_type is not None or exc_message is not None: self._exception_rate_counter.add(1) + + +def _get_process_memory(options: CallbackOptions) -> Iterable[Observation]: + # rss is non-swapped physical memory a process has used + yield Observation( + PROCESS.memory_info().rss, + {}, + ) + + +def _get_processor_time(options: CallbackOptions) -> Iterable[Observation]: + # Processor time does not include idle time + yield Observation( + 100 - psutil.cpu_times_percent().idle, + {}, + ) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/setup.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/setup.py index 56f8392cef1c..69f76632e7bd 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/setup.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/setup.py @@ -86,6 +86,7 @@ "msrest>=0.6.10", "opentelemetry-api~=1.21", "opentelemetry-sdk~=1.21", + "psutil>=5.9.8", ], entry_points={ "opentelemetry_traces_exporter": [ From c540a39f3f32edd6f2bd65b3d2f73c381e2aa69d Mon Sep 17 00:00:00 2001 From: Leighton Chen Date: Tue, 12 Mar 2024 12:19:55 -0700 Subject: [PATCH 19/21] tests --- .../tests/quickpulse/test_live_metrics.py | 62 ++++++++++++++++++- 1 file changed, 61 insertions(+), 1 deletion(-) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_live_metrics.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_live_metrics.py index f1699bd6beb7..60e03d03fe69 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_live_metrics.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_live_metrics.py @@ -1,22 +1,42 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. +import collections import platform import unittest from unittest import mock -from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics import ( + Counter, + Histogram, + Meter, + MeterProvider, + ObservableGauge, +) from opentelemetry.sdk.resources import Resource, ResourceAttributes from opentelemetry.semconv.trace import SpanAttributes from opentelemetry.trace import SpanKind from azure.monitor.opentelemetry.exporter._generated.models import ContextTagKeys +from azure.monitor.opentelemetry.exporter._quickpulse._constants import ( + _COMMITTED_BYTES_NAME, + _DEPENDENCY_DURATION_NAME, + _DEPENDENCY_FAILURE_RATE_NAME, + _DEPENDENCY_RATE_NAME, + _EXCEPTION_RATE_NAME, + _PROCESSOR_TIME_NAME, + _REQUEST_DURATION_NAME, + _REQUEST_FAILURE_RATE_NAME, + _REQUEST_RATE_NAME, +) from azure.monitor.opentelemetry.exporter._quickpulse._exporter import ( _QuickpulseExporter, _QuickpulseMetricReader, ) from azure.monitor.opentelemetry.exporter._quickpulse._live_metrics import ( enable_live_metrics, + _get_process_memory, + _get_processor_time, _QuickpulseManager, ) from azure.monitor.opentelemetry.exporter._quickpulse._state import ( @@ -92,6 +112,28 @@ def test_init(self, generator_mock): self.assertEqual(qpm._reader._base_monitoring_data_point, qpm._base_monitoring_data_point) self.assertTrue(isinstance(qpm._meter_provider, MeterProvider)) self.assertEqual(qpm._meter_provider._sdk_config.metric_readers, [qpm._reader]) + self.assertTrue(isinstance(qpm._meter, Meter)) + self.assertEqual(qpm._meter.name, "azure_monitor_live_metrics") + self.assertTrue(isinstance(qpm._request_duration, Histogram)) + self.assertEqual(qpm._request_duration.name, _REQUEST_DURATION_NAME[0]) + self.assertTrue(isinstance(qpm._dependency_duration, Histogram)) + self.assertEqual(qpm._dependency_duration.name, _DEPENDENCY_DURATION_NAME[0]) + self.assertTrue(isinstance(qpm._request_rate_counter, Counter)) + self.assertEqual(qpm._request_rate_counter.name, _REQUEST_RATE_NAME[0]) + self.assertTrue(isinstance(qpm._request_failed_rate_counter, Counter)) + self.assertEqual(qpm._request_failed_rate_counter.name, _REQUEST_FAILURE_RATE_NAME[0]) + self.assertTrue(isinstance(qpm._dependency_rate_counter, Counter)) + self.assertEqual(qpm._dependency_rate_counter.name, _DEPENDENCY_RATE_NAME[0]) + self.assertTrue(isinstance(qpm._dependency_failure_rate_counter, Counter)) + self.assertEqual(qpm._dependency_failure_rate_counter.name, _DEPENDENCY_FAILURE_RATE_NAME[0]) + self.assertTrue(isinstance(qpm._exception_rate_counter, Counter)) + self.assertEqual(qpm._exception_rate_counter.name, _EXCEPTION_RATE_NAME[0]) + self.assertTrue(isinstance(qpm._process_memory_gauge, ObservableGauge)) + self.assertEqual(qpm._process_memory_gauge.name, _COMMITTED_BYTES_NAME[0]) + self.assertEqual(qpm._process_memory_gauge._callbacks, [_get_process_memory]) + self.assertTrue(isinstance(qpm._processor_time_gauge, ObservableGauge)) + self.assertEqual(qpm._processor_time_gauge.name, _PROCESSOR_TIME_NAME[0]) + self.assertEqual(qpm._processor_time_gauge._callbacks, [_get_processor_time]) def test_singleton(self): @@ -247,3 +289,21 @@ def test_record_log_exception(self, post_state_mock, log_doc_mock, append_doc_mo qpm._record_log_record(log_data_mock) append_doc_mock.assert_called_once_with(log_record_doc) qpm._exception_rate_counter.add.assert_called_once_with(1) + + def test_process_memory(self): + with mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics.PROCESS") as process_mock: + memory = collections.namedtuple('memory', 'rss') + pmem = memory(rss=40) + process_mock.memory_info.return_value = pmem + mem = _get_process_memory(None) + obs = next(mem) + self.assertEqual(obs.value, 40) + + @mock.patch("psutil.cpu_times_percent") + def test_processor_time(self, processor_mock): + cpu = collections.namedtuple('cpu', 'idle') + cpu_times = cpu(idle=94.5) + processor_mock.return_value = cpu_times + time = _get_processor_time(None) + obs = next(time) + self.assertEqual(obs.value, 5.5) From 6839c85683cf7cd0e0d9795626e27a4ed118b4c3 Mon Sep 17 00:00:00 2001 From: Leighton Chen Date: Tue, 12 Mar 2024 14:41:08 -0700 Subject: [PATCH 20/21] lint --- .../azure-monitor-opentelemetry-exporter/CHANGELOG.md | 2 +- .../opentelemetry/exporter/_quickpulse/_live_metrics.py | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md b/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md index 7ea20381a93a..56200abdfc9d 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md @@ -7,7 +7,7 @@ - Add live metrics collection of requests/dependencies/exceptions ([#34673](https://github.com/Azure/azure-sdk-for-python/pull/34673)) - Add live metrics collection of cpu time/process memory - ([#34673](https://github.com/Azure/azure-sdk-for-python/pull/34673)) + ([#34735](https://github.com/Azure/azure-sdk-for-python/pull/34735)) ### Breaking Changes diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py index efe4c46890ad..75d1ce3adf2a 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py @@ -1,8 +1,9 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. +from typing import Any, Iterable, Optional + import platform import psutil -from typing import Any, Iterable, Optional from opentelemetry.metrics import CallbackOptions, Observation from opentelemetry.sdk._logs import LogData @@ -166,6 +167,7 @@ def _record_log_record(self, log_data: LogData) -> None: self._exception_rate_counter.add(1) +# pylint: disable=unused-argument def _get_process_memory(options: CallbackOptions) -> Iterable[Observation]: # rss is non-swapped physical memory a process has used yield Observation( @@ -174,6 +176,7 @@ def _get_process_memory(options: CallbackOptions) -> Iterable[Observation]: ) +# pylint: disable=unused-argument def _get_processor_time(options: CallbackOptions) -> Iterable[Observation]: # Processor time does not include idle time yield Observation( From df043a394e747828522944baac070e3cf7a4b3d8 Mon Sep 17 00:00:00 2001 From: Leighton Chen Date: Tue, 12 Mar 2024 17:30:56 -0700 Subject: [PATCH 21/21] spelling --- .../monitor/opentelemetry/exporter/_quickpulse/_constants.py | 2 +- .../opentelemetry/exporter/_quickpulse/_live_metrics.py | 4 ++++ sdk/monitor/azure-monitor-opentelemetry-exporter/setup.py | 3 +++ .../tests/quickpulse/test_live_metrics.py | 4 ++++ shared_requirements.txt | 1 + 5 files changed, 13 insertions(+), 1 deletion(-) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_constants.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_constants.py index 94155b73f32c..b34228ce923d 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_constants.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_constants.py @@ -49,4 +49,4 @@ class _DocumentIngressDocumentType(Enum): Event = "Event" Trace = "Trace" -# cSpell:disable +# cSpell:enable diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py index 75d1ce3adf2a..feb6922f4428 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py @@ -1,5 +1,7 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. +# cSpell:disable + from typing import Any, Iterable, Optional import platform @@ -183,3 +185,5 @@ def _get_processor_time(options: CallbackOptions) -> Iterable[Observation]: 100 - psutil.cpu_times_percent().idle, {}, ) + +# cSpell:enable diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/setup.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/setup.py index 69f76632e7bd..7668667ac21c 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/setup.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/setup.py @@ -6,6 +6,7 @@ # license information. # -------------------------------------------------------------------------- +# cSpell:disable import os import re @@ -103,3 +104,5 @@ ] } ) + +# cSpell:enable diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_live_metrics.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_live_metrics.py index 60e03d03fe69..464d7162ee07 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_live_metrics.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_live_metrics.py @@ -1,6 +1,8 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. +# cSpell:disable + import collections import platform import unittest @@ -307,3 +309,5 @@ def test_processor_time(self, processor_mock): time = _get_processor_time(None) obs = next(time) self.assertEqual(obs.value, 5.5) + +# cSpell:enable diff --git a/shared_requirements.txt b/shared_requirements.txt index 564822a591a2..0cfd9cefeea1 100644 --- a/shared_requirements.txt +++ b/shared_requirements.txt @@ -17,6 +17,7 @@ azureml-telemetry cryptography msrestazure requests +psutil opencensus opencensus-ext-azure opencensus-ext-threading