Skip to content

Commit

Permalink
feat: add capability to purge old histogram data (#460)
Browse files — browse the repository at this point in the history
Co-authored-by: Toby Lawrence <[email protected]>
  • Loading branch information
mnpw and tobz authored Mar 16, 2024
1 parent d48fab7 commit 3c71988
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 7 deletions.
1 change: 1 addition & 0 deletions clippy.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
too-many-lines-threshold = 150
25 changes: 24 additions & 1 deletion metrics-exporter-prometheus/src/exporter/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub struct PrometheusBuilder {
buckets: Option<Vec<f64>>,
bucket_overrides: Option<HashMap<Matcher, Vec<f64>>>,
idle_timeout: Option<Duration>,
upkeep_timeout: Duration,
recency_mask: MetricKindMask,
global_labels: Option<IndexMap<String, String>>,
}
Expand All @@ -60,6 +61,8 @@ impl PrometheusBuilder {
#[cfg(not(feature = "http-listener"))]
let exporter_config = ExporterConfig::Unconfigured;

let upkeep_timeout = Duration::from_secs(5);

Self {
exporter_config,
#[cfg(feature = "http-listener")]
Expand All @@ -70,6 +73,7 @@ impl PrometheusBuilder {
buckets: None,
bucket_overrides: None,
idle_timeout: None,
upkeep_timeout,
recency_mask: MetricKindMask::NONE,
global_labels: None,
}
Expand Down Expand Up @@ -303,6 +307,16 @@ impl PrometheusBuilder {
self
}

/// Configures how long the background upkeep task waits between runs.
///
/// Each run performs periodic maintenance, such as draining recorded histogram data, so that
/// the exported data stays up-to-date and memory usage does not grow without bound.
#[must_use]
pub fn upkeep_timeout(self, timeout: Duration) -> Self {
    // Rebind the builder mutably, overwrite the single field, and hand it back for chaining.
    let mut builder = self;
    builder.upkeep_timeout = timeout;
    builder
}

/// Adds a global label to this exporter.
///
/// Global labels are applied to all metrics. Labels defined on the metric key itself have precedence
Expand Down Expand Up @@ -409,11 +423,20 @@ impl PrometheusBuilder {
pub fn build(mut self) -> Result<(PrometheusRecorder, ExporterFuture), BuildError> {
#[cfg(feature = "http-listener")]
let allowed_addresses = self.allowed_addresses.take();

let exporter_config = self.exporter_config.clone();
let upkeep_timeout = self.upkeep_timeout;

let recorder = self.build_recorder();
let handle = recorder.handle();

let recorder_handle = handle.clone();
tokio::spawn(async move {
loop {
tokio::time::sleep(upkeep_timeout).await;
recorder_handle.run_upkeep();
}
});

Ok((
recorder,
match exporter_config {
Expand Down
31 changes: 25 additions & 6 deletions metrics-exporter-prometheus/src/recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ impl Inner {
*entry = value;
}

// Update distributions
self.drain_histograms_to_distributions();
// Remove expired histograms
let histogram_handles = self.registry.get_histogram_handles();
for (key, histogram) in histogram_handles {
let gen = histogram.get_generation();
Expand All @@ -66,7 +69,7 @@ impl Inner {
let (name, labels) = key_to_parts(&key, Some(&self.global_labels));
let mut wg = self.distributions.write().unwrap_or_else(PoisonError::into_inner);
let delete_by_name = if let Some(by_name) = wg.get_mut(&name) {
by_name.remove(&labels);
by_name.swap_remove(&labels);
by_name.is_empty()
} else {
false
Expand All @@ -80,7 +83,18 @@ impl Inner {

continue;
}
}

let distributions =
self.distributions.read().unwrap_or_else(PoisonError::into_inner).clone();

Snapshot { counters, gauges, distributions }
}

/// Drains histogram samples into distribution.
fn drain_histograms_to_distributions(&self) {
let histogram_handles = self.registry.get_histogram_handles();
for (key, histogram) in histogram_handles {
let (name, labels) = key_to_parts(&key, Some(&self.global_labels));

let mut wg = self.distributions.write().unwrap_or_else(PoisonError::into_inner);
Expand All @@ -92,11 +106,6 @@ impl Inner {

histogram.get_inner().clear_with(|samples| entry.record_samples(samples));
}

let distributions =
self.distributions.read().unwrap_or_else(PoisonError::into_inner).clone();

Snapshot { counters, gauges, distributions }
}

fn render(&self) -> String {
Expand Down Expand Up @@ -194,6 +203,10 @@ impl Inner {

output
}

fn run_upkeep(&self) {
self.drain_histograms_to_distributions();
}
}

/// A Prometheus recorder.
Expand Down Expand Up @@ -273,4 +286,10 @@ impl PrometheusHandle {
pub fn render(&self) -> String {
self.inner.render()
}

/// Runs upkeep on the underlying recorder state so that the metrics it holds stay up-to-date
/// and do not grow without bound.
pub fn run_upkeep(&self) {
    let inner = &self.inner;
    inner.run_upkeep();
}
}

0 comments on commit 3c71988

Please sign in to comment.