From c08728cb642c3d89bd66bbf5f4b79e4d18775d51 Mon Sep 17 00:00:00 2001 From: Palash Nigam Date: Wed, 22 Jun 2022 22:12:46 +0530 Subject: [PATCH] Add Summary metric type --- Cargo.toml | 1 + src/encoding/text.rs | 2 + src/metrics.rs | 3 +- src/metrics/counter.rs | 2 +- src/metrics/summary.rs | 117 +++++++++++++++++++++++++++++++++++++++++ 5 files changed, 123 insertions(+), 2 deletions(-) create mode 100644 src/metrics/summary.rs diff --git a/Cargo.toml b/Cargo.toml index 6808a572..080713b6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ dtoa = "1.0" itoa = "1.0" owning_ref = "0.4" prometheus-client-derive-text-encode = { version = "0.3.0", path = "derive-text-encode" } +quantiles = "0.7.1" [dev-dependencies] async-std = { version = "1", features = ["attributes"] } diff --git a/src/encoding/text.rs b/src/encoding/text.rs index 742032d6..e033e76a 100644 --- a/src/encoding/text.rs +++ b/src/encoding/text.rs @@ -184,6 +184,7 @@ impl Encode for MetricType { MetricType::Histogram => "histogram", MetricType::Info => "info", MetricType::Unknown => "unknown", + MetricType::Summary => "summary", }; writer.write_all(t.as_bytes())?; @@ -603,6 +604,7 @@ where } } + #[cfg(test)] mod tests { use super::*; diff --git a/src/metrics.rs b/src/metrics.rs index 5a21654c..d64d181a 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -5,6 +5,7 @@ pub mod exemplar; pub mod family; pub mod gauge; pub mod histogram; +pub mod summary; pub mod info; /// A metric that is aware of its Open Metrics metric type. @@ -19,9 +20,9 @@ pub enum MetricType { Histogram, Info, Unknown, + Summary, // Not (yet) supported metric types. // // GaugeHistogram, // StateSet, - // Summary } diff --git a/src/metrics/counter.rs b/src/metrics/counter.rs index c8ad9461..450f36e1 100644 --- a/src/metrics/counter.rs +++ b/src/metrics/counter.rs @@ -185,7 +185,7 @@ mod tests { // Map infinite, subnormal and NaN to 0.0. .map(|f| if f.is_normal() { f } else { 0.0 }) .collect(); - let sum = fs.iter().sum(); + let sum: f64 = fs.iter().sum(); let counter = Counter::::default(); for f in fs { counter.inc_by(f); diff --git a/src/metrics/summary.rs b/src/metrics/summary.rs new file mode 100644 index 00000000..2507dc83 --- /dev/null +++ b/src/metrics/summary.rs @@ -0,0 +1,117 @@ +//! Module implementing an Open Metrics histogram. +//! +//! See [`Summary`] for details. + +use super::{MetricType, TypedMetric}; +//use owning_ref::OwningRef; +//use std::iter::{self, once}; +use std::sync::{Arc, Mutex}; + +use quantiles::ckms::CKMS; + +/// Open Metrics [`Summary`] to measure distributions of discrete events. +pub struct Summary { + target_quantile: Vec, + target_error: f64, + max_age_buckets: u64, + max_age_seconds: u64, + inner: Arc>, +} + +impl Clone for Summary { + fn clone(&self) -> Self { + Summary { + target_quantile: self.target_quantile.clone(), + target_error: self.target_error, + max_age_buckets: self.max_age_buckets, + max_age_seconds: self.max_age_seconds, + inner: self.inner.clone(), + } + } +} + +pub(crate) struct InnerSummary { + sum: f64, + count: u64, + quantile_streams: Vec>, + // head_stream is like a cursor which carries the index + // of the stream in the quantile_streams that we want to query + head_stream: u64, +} + +impl Summary { + pub fn new(max_age_buckets: u64, max_age_seconds: u64, target_quantile: Vec, target_error: f64) -> Self { + let mut streams: Vec> = Vec::new(); + for _ in 0..max_age_buckets { + streams.push(CKMS::new(target_error)); + } + + Summary{ + max_age_buckets, + max_age_seconds, + target_quantile, + target_error, + inner: Arc::new(Mutex::new(InnerSummary { + sum: Default::default(), + count: Default::default(), + quantile_streams: streams, + head_stream: 0, + })) + } + } + + pub fn observe(&mut self, v: f64) { + let mut inner = self.inner.lock().unwrap(); + inner.sum += v; + inner.count += 1; + + // insert quantiles into all streams/buckets. + for stream in inner.quantile_streams.iter_mut() { + stream.insert(v); + } + } + + pub fn get(&self) -> (f64, u64, Vec<(f64, f64)>) { + let inner = self.inner.lock().unwrap(); + let sum = inner.sum; + let count = inner.count; + let head = inner.head_stream; + let mut quantile_values: Vec<(f64, f64)> = Vec::new(); + + // TODO: add stream rotation + for q in self.target_quantile.iter() { + match inner.quantile_streams[head as usize].query(*q) { + Some((_, v)) => quantile_values.push((*q, v)), + None => continue, // TODO fix this + }; + } + (sum, count, quantile_values) + } +} + +// TODO: should this type impl Default like Counter? + +impl TypedMetric for Summary { + const TYPE: MetricType = MetricType::Summary; +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn basic() { + let mut summary = Summary::new(5, 10, vec![0.5, 0.9, 0.99], 0.01); + summary.observe(5.0); + summary.observe(15.0); + summary.observe(25.0); + + let (s, c, q) = summary.get(); + assert_eq!(45.0, s); + assert_eq!(3, c); + + for elem in q.iter() { + println!("Vec<{}, {}>", elem.0, elem.1); + } + } +} \ No newline at end of file