Skip to content

Commit

Permalink
Extend PyFunc API to allow adding custom metrics to exporter (#574)
Browse files Browse the repository at this point in the history
* #567 extend PyFunc with custom metrics

* #567 add an example of PyFunc with custom metrics

* #567 add docs for PyFunc metrics

* #567 move MetricsRegistry to a separate module
  • Loading branch information
tomskikh authored Nov 24, 2023
1 parent 9faf45c commit ddd3dda
Show file tree
Hide file tree
Showing 15 changed files with 416 additions and 83 deletions.
1 change: 1 addition & 0 deletions docs/source/reference/api/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ API Reference
param_storage
utils
libs
metrics
client
12 changes: 12 additions & 0 deletions docs/source/reference/api/metrics.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
Metrics
=======

.. currentmodule:: savant.metrics

.. autosummary::
:toctree: generated
:nosignatures:
:template: autosummary/class.rst

Counter
Gauge
12 changes: 12 additions & 0 deletions gst_plugins/python/pyfunc.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
gst_post_stream_failed_error,
gst_post_stream_failed_warning,
)
from savant.metrics.base import BaseMetricsExporter
from savant.utils.logging import LoggerMixin

# RGBA format is required to access the frame (pyds.get_nvds_buf_surface)
Expand Down Expand Up @@ -75,6 +76,12 @@ class GstPluginPyFunc(LoggerMixin, GstBase.BaseTransform):
'VideoPipeline object from savant-rs.',
GObject.ParamFlags.READWRITE,
),
'metrics-exporter': (
object,
'Metrics exporter.',
'Metrics exporter.',
GObject.ParamFlags.READWRITE,
),
'stream-pool-size': (
int,
'Max stream pool size',
Expand Down Expand Up @@ -103,6 +110,7 @@ def __init__(self):
self.class_name: Optional[str] = None
self.kwargs: Optional[str] = None
self.video_pipeline: Optional[VideoPipeline] = None
self.metrics_exporter: Optional[BaseMetricsExporter] = None
self.dev_mode: bool = False
self.max_stream_pool_size: int = 1
# pyfunc object
Expand All @@ -121,6 +129,8 @@ def do_get_property(self, prop: GObject.GParamSpec) -> Any:
return self.kwargs
if prop.name == 'pipeline':
return self.video_pipeline
if prop.name == 'metrics-exporter':
return self.metrics_exporter
if prop.name == 'stream-pool-size':
return self.max_stream_pool_size
if prop.name == 'dev-mode':
Expand All @@ -141,6 +151,8 @@ def do_set_property(self, prop: GObject.GParamSpec, value: Any):
self.kwargs = value
elif prop.name == 'pipeline':
self.video_pipeline = value
elif prop.name == 'metrics-exporter':
self.metrics_exporter = value
elif prop.name == 'stream-pool-size':
self.max_stream_pool_size = value
elif prop.name == 'dev-mode':
Expand Down
2 changes: 1 addition & 1 deletion requirements/savant-rs.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
savant-rs==0.1.83
savant-rs==0.1.84
3 changes: 3 additions & 0 deletions samples/pass_through_processing/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ docker compose -f samples/pass_through_processing/docker-compose.l4t.yml up
# for pre-configured Grafana dashboard visit
# http://127.0.0.1:3000/d/a4c1f484-75c9-4375-a04d-ab5d50578239/module-performance-metrics?orgId=1&refresh=5s

# for the tracker metrics visit
# http://127.0.0.1:8000/metrics

# Ctrl+C to stop running the compose bundle
```

Expand Down
2 changes: 2 additions & 0 deletions samples/pass_through_processing/docker-compose.x86.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ services:
module-tracker:
image: ghcr.io/insight-platform/savant-deepstream:latest
restart: unless-stopped
ports:
- "8000:8000"
volumes:
- zmq_sockets:/tmp/zmq-sockets
- ../../models/peoplenet_detector:/models
Expand Down
6 changes: 6 additions & 0 deletions samples/pass_through_processing/module.yml
Original file line number Diff line number Diff line change
Expand Up @@ -83,5 +83,11 @@ pipeline:
tracker-width: 960 # 640 # must be a multiple of 32
tracker-height: 544 # 384
display-tracking-id: 0
# pyfunc metrics example
- element: pyfunc
# specify the pyfunc's python module
module: samples.pass_through_processing.py_func_metrics_example
# specify the pyfunc's python class from the module
class_name: PyFuncMetricsExample

# sink definition is skipped, zeromq sink is used by default to connect with sink adapters
65 changes: 65 additions & 0 deletions samples/pass_through_processing/py_func_metrics_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
"""Example of how to use metrics in PyFunc."""
from savant.deepstream.meta.frame import NvDsFrameMeta
from savant.deepstream.pyfunc import NvDsPyFuncPlugin
from savant.gstreamer import Gst
from savant.metrics import Counter, Gauge


class PyFuncMetricsExample(NvDsPyFuncPlugin):
"""Example of how to use metrics in PyFunc.
Metrics values example:
.. code-block:: text
# HELP frames_per_source_total Number of processed frames per source
# TYPE frames_per_source_total counter
frames_per_source_total{module_stage="tracker",source_id="city-traffic"} 748.0 1700803467794
# HELP total_queue_length The total queue length for the pipeline
# TYPE total_queue_length gauge
total_queue_length{module_stage="tracker",source_id="city-traffic"} 36.0 1700803467794
Note: the "module_stage" label is configured in docker-compose file and added to all metrics.
"""

# Called when the new source is added
def on_source_add(self, source_id: str):
# Check if the metric is not registered yet
if 'frames_per_source' not in self.metrics:
# Register the counter metric
self.metrics['frames_per_source'] = Counter(
name='frames_per_source',
description='Number of processed frames per source',
# Labels are optional, by default there are no labels
labelnames=('source_id',),
)
self.logger.info('Registered metric: %s', 'frames_per_source')
if 'total_queue_length' not in self.metrics:
# Register the gauge metric
self.metrics['total_queue_length'] = Gauge(
name='total_queue_length',
description='The total queue length for the pipeline',
# There are no labels for this metric
)
self.logger.info('Registered metric: %s', 'total_queue_length')

def process_frame(self, buffer: Gst.Buffer, frame_meta: NvDsFrameMeta):
# Count the frame for this source
self.metrics['frames_per_source'].inc(
# 1, # Default increment value
# Labels should be a tuple and must match the labelnames
labels=(frame_meta.source_id,),
)
try:
last_runtime_metric = self.get_runtime_metrics(1)[0]
queue_length = sum(
stage.queue_length for stage in last_runtime_metric.stage_stats
)
except IndexError:
queue_length = 0

# Set the total queue length for this source
self.metrics['total_queue_length'].set(
queue_length, # The new gauge value
# There are no labels for this metric
)
2 changes: 2 additions & 0 deletions savant/deepstream/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ def add_element(
if isinstance(element, PyFuncElement):
gst_element.set_property('pipeline', self._video_pipeline)
gst_element.set_property('stream-pool-size', self._batch_size)
if self._metrics_exporter is not None:
gst_element.set_property('metrics-exporter', self._metrics_exporter)
# TODO: add stage names to element config?
if isinstance(element_idx, int):
stage = self._element_stages[element_idx]
Expand Down
26 changes: 26 additions & 0 deletions savant/deepstream/pyfunc.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
nvds_frame_meta_iterator,
)
from savant.gstreamer import Gst # noqa: F401
from savant.metrics.base import BaseMetricsExporter
from savant.metrics.registry import MetricsRegistry
from savant.utils.source_info import SourceInfoRegistry


Expand All @@ -35,6 +37,8 @@ def __init__(self, **kwargs):
super().__init__(**kwargs)
self._sources = SourceInfoRegistry()
self._video_pipeline: Optional[VideoPipeline] = None
self._metrics_exporter: Optional[BaseMetricsExporter] = None
self._metrics_registry: Optional[MetricsRegistry] = None
self._last_nvevent_seqnum: Dict[int, Dict[int, int]] = {
event_type: {}
for event_type in [
Expand All @@ -49,6 +53,8 @@ def __init__(self, **kwargs):
def on_start(self) -> bool:
"""Do on plugin start."""
self._video_pipeline = self.gst_element.get_property('pipeline')
self._metrics_exporter = self.gst_element.get_property('metrics-exporter')
self._metrics_registry = MetricsRegistry(self._metrics_exporter)
# the prop is set to pipeline batch size during init
self._stream_pool_size = self.gst_element.get_property('stream-pool-size')
return True
Expand Down Expand Up @@ -183,3 +189,23 @@ def get_runtime_metrics(self, n: int):
"""Get last runtime metrics."""

return self._video_pipeline.get_stat_records(n)

@property
def metrics(self) -> MetricsRegistry:
"""Get metrics registry.
Usage example:
.. code-block:: python
from savant.metrics import Counter
self.metrics['frames_per_source'] = Counter(
name='frames_per_source',
description='Number of processed frames per source',
labelnames=('source_id',),
)
...
self.metrics['frames_per_source'].inc(labels=('camera-1',))
"""

return self._metrics_registry
8 changes: 8 additions & 0 deletions savant/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from savant.config.schema import MetricsParameters
from savant.metrics.base import BaseMetricsExporter
from savant.metrics.metric import Counter, Gauge
from savant.metrics.prometheus import PrometheusMetricsExporter


Expand All @@ -20,3 +21,10 @@ def build_metrics_exporter(
return PrometheusMetricsExporter(pipeline, params.provider_params)

raise ValueError(f'Unknown metrics provider: {params.provider}')


__all__ = [
'Counter',
'Gauge',
'build_metrics_exporter',
]
7 changes: 7 additions & 0 deletions savant/metrics/base.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from abc import ABC, abstractmethod

from savant.metrics.metric import Metric


class BaseMetricsExporter(ABC):
"""Base class for metrics exporters."""
Expand All @@ -11,3 +13,8 @@ def start(self):
@abstractmethod
def stop(self):
"""Stop metrics exporter."""

@abstractmethod
def register_metric(self, metric: Metric):
"""Register metric."""
pass
134 changes: 134 additions & 0 deletions savant/metrics/metric.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import time
from typing import Dict, Optional, Tuple


class Metric:
"""Base class for metrics.
:param name: Metric name.
:param description: Metric description.
:param labelnames: Metric label names.
"""

def __init__(
self,
name: str,
description: str = '',
labelnames: Tuple[str, ...] = (),
):
self._name = name
self._description = description or name
self._labelnames = labelnames
self._values: Dict[Tuple[str, ...], Tuple[float, float]] = {}

@property
def name(self) -> str:
"""Metric name."""
return self._name

@property
def description(self) -> str:
"""Metric description."""
return self._description

@property
def labelnames(self) -> Tuple[str, ...]:
"""Metric label names."""
return self._labelnames

@property
def values(self) -> Dict[Tuple[str, ...], Tuple[float, float]]:
"""Metric values.
:return: Dictionary: labels -> (value, timestamp).
"""
return self._values


class Counter(Metric):
"""Counter metric.
Usage example:
.. code-block:: python
counter = Counter(
name='frames_per_source',
description='Number of processed frames per source',
labelnames=('source_id',),
)
counter.inc(labels=('camera-1',))
"""

def inc(
self,
amount=1,
labels: Tuple[str, ...] = (),
timestamp: Optional[float] = None,
):
"""Increment counter by amount.
:param amount: Increment amount.
:param labels: Labels values.
:param timestamp: Metric timestamp.
"""

assert len(labels) == len(self._labelnames), 'Labels must match label names'
assert amount > 0, 'Counter increment amount must be positive'
last_value = self._values.get(labels, (0, 0))[0]
if timestamp is None:
timestamp = time.time()
self._values[labels] = last_value + amount, timestamp

def set(
self,
value,
labels: Tuple[str, ...] = (),
timestamp: Optional[float] = None,
):
"""Set counter to specific value.
:param value: Counter value. Must be non-decreasing.
:param labels: Labels values.
:param timestamp: Metric timestamp.
"""

assert len(labels) == len(self._labelnames), 'Labels must match label names'
last_value = self._values.get(labels, (0, 0))[0]
assert value >= last_value, 'Counter value must be non-decreasing'
if timestamp is None:
timestamp = time.time()
self._values[labels] = value, timestamp


class Gauge(Metric):
"""Gauge metric.
Usage example:
.. code-block:: python
gauge = Gauge(
name='total_queue_length',
description='The total queue length for the pipeline',
)
gauge.set(123)
"""

def set(
self,
value,
labels: Tuple[str, ...] = (),
timestamp: Optional[float] = None,
):
"""Set gauge to specific value.
:param value: Gauge value.
:param labels: Labels values.
:param timestamp: Metric timestamp.
"""

assert len(labels) == len(self._labelnames), 'Labels must match label names'
if timestamp is None:
timestamp = time.time()
self._values[labels] = value, timestamp
Loading

0 comments on commit ddd3dda

Please sign in to comment.