Skip to content

Commit

Permalink
Aj/honor metric timestamps (#904)
Browse files Browse the repository at this point in the history
* feat: set timestamp when metric is parsed

* feat: Ten second rolling buckets

* debugging

* remove debug line

* fix: remove time from aggr

* feat: Clean up parse code, refactor floored buckets to metrics

* feat: comment docs

* feat: Cleanup impl

* fix: call method
  • Loading branch information
astuyve authored Feb 28, 2025
1 parent eae5885 commit 89754a9
Show file tree
Hide file tree
Showing 3 changed files with 166 additions and 64 deletions.
101 changes: 55 additions & 46 deletions dogstatsd/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use crate::constants;
use crate::datadog::{self, Metric as MetricToShip, Series};
use crate::errors;
use crate::metric::{self, Metric, MetricValue, SortedTags};
use std::time;

use datadog_protos::metrics::{Dogsketch, Sketch, SketchPayload};
use ddsketch_agent::DDSketch;
Expand Down Expand Up @@ -91,13 +90,14 @@ impl Aggregator {
/// Function will return overflow error if more than
/// `min(constants::MAX_CONTEXTS, CONTEXTS)` is exceeded.
pub fn insert(&mut self, metric: Metric) -> Result<(), errors::Insert> {
let id = metric::id(metric.name, &metric.tags);
let id = metric::id(metric.name, &metric.tags, metric.timestamp);
let len = self.map.len();

match self
.map
.entry(id, |m| m.id == id, |m| metric::id(m.name, &m.tags))
{
match self.map.entry(
id,
|m| m.id == id,
|m| metric::id(m.name, &m.tags, m.timestamp),
) {
hash_table::Entry::Vacant(entry) => {
if len >= self.max_context {
return Err(errors::Insert::Overflow);
Expand All @@ -117,19 +117,13 @@ impl Aggregator {

#[must_use]
pub fn distributions_to_protobuf(&self) -> SketchPayload {
let now = time::UNIX_EPOCH
.elapsed()
.expect("unable to poll clock, unrecoverable")
.as_secs()
.try_into()
.unwrap_or_default();
let mut sketch_payload = SketchPayload::new();

sketch_payload.sketches = self
.map
.iter()
.filter_map(|entry| match entry.value {
MetricValue::Distribution(_) => build_sketch(now, entry, self.tags.clone()),
MetricValue::Distribution(_) => build_sketch(entry, self.tags.clone()),
_ => None,
})
.collect();
Expand All @@ -138,12 +132,6 @@ impl Aggregator {

#[must_use]
pub fn consume_distributions(&mut self) -> Vec<SketchPayload> {
let now = time::UNIX_EPOCH
.elapsed()
.expect("unable to poll clock, unrecoverable")
.as_secs()
.try_into()
.unwrap_or_default();
let mut batched_payloads = Vec::new();
let mut sketch_payload = SketchPayload::new();
let mut this_batch_size = 0u64;
Expand All @@ -155,7 +143,7 @@ impl Aggregator {
}
false
})
.filter_map(|entry| build_sketch(now, &entry, self.tags.clone()))
.filter_map(|entry| build_sketch(&entry, self.tags.clone()))
{
let next_chunk_size = sketch.compute_size();

Expand Down Expand Up @@ -248,18 +236,23 @@ impl Aggregator {
batched_payloads
}

pub fn get_entry_by_id(&self, name: Ustr, tags: &Option<SortedTags>) -> Option<&Metric> {
let id = metric::id(name, tags);
pub fn get_entry_by_id(
&self,
name: Ustr,
tags: &Option<SortedTags>,
timestamp: i64,
) -> Option<&Metric> {
let id = metric::id(name, tags, timestamp);
self.map.find(id, |m| m.id == id)
}
}

fn build_sketch(now: i64, entry: &Metric, mut base_tag_vec: SortedTags) -> Option<Sketch> {
fn build_sketch(entry: &Metric, mut base_tag_vec: SortedTags) -> Option<Sketch> {
let sketch = entry.value.get_sketch()?;
let mut dogsketch = Dogsketch::default();
sketch.merge_to_dogsketch(&mut dogsketch);
// TODO(Astuyve) allow users to specify timestamp
dogsketch.set_ts(now);
dogsketch.set_ts(entry.timestamp);
let mut sketch = Sketch::default();
sketch.set_dogsketches(vec![dogsketch]);
let name = entry.name.to_string();
Expand All @@ -286,10 +279,7 @@ fn build_metric(entry: &Metric, mut base_tag_vec: SortedTags) -> Option<MetricTo
let point = datadog::Point {
value: entry.value.get_value()?,
// TODO(astuyve) allow user to specify timestamp
timestamp: time::UNIX_EPOCH
.elapsed()
.expect("unable to poll clock, unrecoverable")
.as_secs(),
timestamp: entry.timestamp as u64,
};

if let Some(tags) = entry.tags.clone() {
Expand Down Expand Up @@ -328,21 +318,29 @@ pub mod tests {
metric_id: &str,
value: f64,
tags: &str,
timestamp: i64,
) {
let aggregator = aggregator_mutex.lock().unwrap();
if let Some(e) =
aggregator.get_entry_by_id(metric_id.into(), &Some(SortedTags::parse(tags).unwrap()))
{
if let Some(e) = aggregator.get_entry_by_id(
metric_id.into(),
&Some(SortedTags::parse(tags).unwrap()),
timestamp,
) {
let metric = e.value.get_value().unwrap();
assert!((metric - value).abs() < PRECISION);
} else {
panic!("{}", format!("{metric_id} not found"));
}
}

pub fn assert_sketch(aggregator_mutex: &Mutex<Aggregator>, metric_id: &str, value: f64) {
pub fn assert_sketch(
aggregator_mutex: &Mutex<Aggregator>,
metric_id: &str,
value: f64,
timestamp: i64,
) {
let aggregator = aggregator_mutex.lock().unwrap();
if let Some(e) = aggregator.get_entry_by_id(metric_id.into(), &None) {
if let Some(e) = aggregator.get_entry_by_id(metric_id.into(), &None, timestamp) {
let metric = e.value.get_sketch().unwrap();
assert!((metric.max().unwrap() - value).abs() < PRECISION);
assert!((metric.min().unwrap() - value).abs() < PRECISION);
Expand Down Expand Up @@ -387,14 +385,20 @@ pub mod tests {
#[cfg_attr(miri, ignore)]
fn overflow() {
let mut aggregator = Aggregator::new(EMPTY_TAGS, 2).unwrap();

let mut now = std::time::UNIX_EPOCH
.elapsed()
.expect("unable to poll clock, unrecoverable")
.as_secs()
.try_into()
.unwrap_or_default();
now = (now / 10) * 10;
let metric1 = parse("test:1|c|#k:v").expect("metric parse failed");
let metric2 = parse("foo:1|c|#k:v").expect("metric parse failed");
let metric3 = parse("bar:1|c|#k:v").expect("metric parse failed");

let id1 = metric::id(metric1.name, &metric1.tags);
let id2 = metric::id(metric2.name, &metric2.tags);
let id3 = metric::id(metric3.name, &metric3.tags);
let id1 = metric::id(metric1.name, &metric1.tags, now);
let id2 = metric::id(metric2.name, &metric2.tags, now);
let id3 = metric::id(metric3.name, &metric3.tags, now);

assert_ne!(id1, id2);
assert_ne!(id1, id3);
Expand All @@ -417,25 +421,30 @@ pub mod tests {
#[cfg_attr(miri, ignore)]
fn clear() {
let mut aggregator = Aggregator::new(EMPTY_TAGS, 2).unwrap();

let metric1 = parse("test:3|c|#k1:v1").expect("metric parse failed");
let metric2 = parse("foo:5|c|#k2:v2").expect("metric parse failed");
let mut now = 1656581409;
now = (now / 10) * 10;
let metric1 = parse("test:3|c|#k1:v1|T1656581409").expect("metric parse failed");
let metric2 = parse("foo:5|c|#k2:v2|T1656581409").expect("metric parse failed");

assert!(aggregator.insert(metric1).is_ok());
assert!(aggregator.insert(metric2).is_ok());

assert_eq!(aggregator.map.len(), 2);
if let Some(v) =
aggregator.get_entry_by_id("foo".into(), &Some(SortedTags::parse("k2:v2").unwrap()))
{
if let Some(v) = aggregator.get_entry_by_id(
"foo".into(),
&Some(SortedTags::parse("k2:v2").unwrap()),
now,
) {
assert_eq!(v.value.get_value().unwrap(), 5f64);
} else {
panic!("failed to get value by id");
}

if let Some(v) =
aggregator.get_entry_by_id("test".into(), &Some(SortedTags::parse("k1:v1").unwrap()))
{
if let Some(v) = aggregator.get_entry_by_id(
"test".into(),
&Some(SortedTags::parse("k1:v1").unwrap()),
now,
) {
assert_eq!(v.value.get_value().unwrap(), 3f64);
} else {
panic!("failed to get value by id");
Expand Down
38 changes: 29 additions & 9 deletions dogstatsd/src/dogstatsd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,9 @@ mod tests {
#[cfg_attr(miri, ignore)]
async fn test_dogstatsd_multi_distribution() {
let locked_aggregator = setup_dogstatsd(
"single_machine_performance.rouster.api.series_v2.payload_size_bytes:269942|d
single_machine_performance.rouster.metrics_min_timestamp_latency:1426.90870216|d
single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d
"single_machine_performance.rouster.api.series_v2.payload_size_bytes:269942|d|T1656581409
single_machine_performance.rouster.metrics_min_timestamp_latency:1426.90870216|d|T1656581409
single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d|T1656581409
",
)
.await;
Expand All @@ -154,24 +154,38 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d
&locked_aggregator,
"single_machine_performance.rouster.api.series_v2.payload_size_bytes",
269_942_f64,
1656581400,
);
assert_sketch(
&locked_aggregator,
"single_machine_performance.rouster.metrics_min_timestamp_latency",
1_426.908_702_16,
1656581400,
);
assert_sketch(
&locked_aggregator,
"single_machine_performance.rouster.metrics_max_timestamp_latency",
1_376.908_702_16,
1656581400,
);
}

#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn test_dogstatsd_multi_metric() {
let mut now = std::time::UNIX_EPOCH
.elapsed()
.expect("unable to poll clock, unrecoverable")
.as_secs()
.try_into()
.unwrap_or_default();
now = (now / 10) * 10;
let locked_aggregator = setup_dogstatsd(
"metric3:3|c|#tag3:val3,tag4:val4\nmetric1:1|c\nmetric2:2|c|#tag2:val2\n",
format!(
"metric3:3|c|#tag3:val3,tag4:val4\nmetric1:1|c\nmetric2:2|c|#tag2:val2|T{:}\n",
now
)
.as_str(),
)
.await;
let aggregator = locked_aggregator.lock().expect("lock poisoned");
Expand All @@ -182,23 +196,29 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d
assert_eq!(aggregator.distributions_to_protobuf().sketches.len(), 0);
drop(aggregator);

assert_value(&locked_aggregator, "metric1", 1.0, "");
assert_value(&locked_aggregator, "metric2", 2.0, "tag2:val2");
assert_value(&locked_aggregator, "metric3", 3.0, "tag3:val3,tag4:val4");
assert_value(&locked_aggregator, "metric1", 1.0, "", now);
assert_value(&locked_aggregator, "metric2", 2.0, "tag2:val2", now);
assert_value(
&locked_aggregator,
"metric3",
3.0,
"tag3:val3,tag4:val4",
now,
);
}

#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn test_dogstatsd_single_metric() {
let locked_aggregator = setup_dogstatsd("metric123:99123|c").await;
let locked_aggregator = setup_dogstatsd("metric123:99123|c|T1656581409").await;
let aggregator = locked_aggregator.lock().expect("lock poisoned");
let parsed_metrics = aggregator.to_series();

assert_eq!(parsed_metrics.len(), 1);
assert_eq!(aggregator.distributions_to_protobuf().sketches.len(), 0);
drop(aggregator);

assert_value(&locked_aggregator, "metric123", 99_123.0, "");
assert_value(&locked_aggregator, "metric123", 99_123.0, "", 1656581400);
}

#[tokio::test]
Expand Down
Loading

0 comments on commit 89754a9

Please sign in to comment.