Skip to content

Commit

Permalink
Add hit metrics to FastSlowStore
Browse files Browse the repository at this point in the history
Adds metrics that record which store (fast or slow) was hit in FastSlowStore.

towards: TraceMachina#650
  • Loading branch information
allada committed Apr 26, 2024
1 parent fb0edae commit 7e16389
Showing 1 changed file with 53 additions and 3 deletions.
56 changes: 53 additions & 3 deletions nativelink-store/src/fast_slow_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::cmp::{max, min};
use std::ops::Range;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

use async_trait::async_trait;
Expand All @@ -26,7 +27,7 @@ use nativelink_util::buf_channel::{
use nativelink_util::common::DigestInfo;
use nativelink_util::fs;
use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator};
use nativelink_util::metrics_utils::Registry;
use nativelink_util::metrics_utils::{CollectorState, MetricsComponent, Registry};
use nativelink_util::store_trait::{
slow_update_store_with_file, Store, StoreOptimizations, UploadSizeInfo,
};
Expand All @@ -41,6 +42,8 @@ use nativelink_util::store_trait::{
/// Store that pairs a fast store with a slow store and tracks which of the
/// two ends up serving requests.
pub struct FastSlowStore {
// Store that is checked first when reading a digest.
fast_store: Arc<dyn Store>,
// Store that is consulted when the fast store does not have the digest.
slow_store: Arc<dyn Store>,

// Hit and downloaded-byte counters for both stores.
metrics: FastSlowStoreMetrics,
}

impl FastSlowStore {
Expand All @@ -52,6 +55,7 @@ impl FastSlowStore {
Self {
fast_store,
slow_store,
metrics: FastSlowStoreMetrics::default(),
}
}

Expand Down Expand Up @@ -273,9 +277,16 @@ impl Store for FastSlowStore {
let fast_store = self.pin_fast_store();
let slow_store = self.pin_slow_store();
if fast_store.has(digest).await?.is_some() {
return fast_store
self.metrics
.fast_store_hit_count
.fetch_add(1, Ordering::Acquire);
fast_store
.get_part_ref(digest, writer, offset, length)
.await;
.await?;
self.metrics
.fast_store_downloaded_bytes
.fetch_add(writer.get_bytes_written(), Ordering::Acquire);
return Ok(());
}

let sz = slow_store
Expand All @@ -289,6 +300,9 @@ impl Store for FastSlowStore {
digest.hash_str()
)
})?;
self.metrics
.slow_store_hit_count
.fetch_add(1, Ordering::Acquire);

let send_range = offset..length.map_or(usize::MAX, |length| length + offset);
let mut bytes_received: usize = 0;
Expand All @@ -309,6 +323,9 @@ impl Store for FastSlowStore {
let fast_res = fast_tx.send_eof();
return Ok::<_, Error>((fast_res, writer_pin));
}
self.metrics
.slow_store_downloaded_bytes
.fetch_add(output_buf.len() as u64, Ordering::Acquire);

let writer_fut = if let Some(range) = Self::calculate_range(
&(bytes_received..bytes_received + output_buf.len()),
Expand Down Expand Up @@ -373,4 +390,37 @@ impl Store for FastSlowStore {
}
}

/// Counters describing which backing store served requests in
/// `FastSlowStore`. All counters start at zero via `Default`.
#[derive(Default)]
struct FastSlowStoreMetrics {
// Number of reads for which the fast store reported having the digest.
fast_store_hit_count: AtomicU64,
// Bytes written to the caller's writer by reads served from the fast store.
fast_store_downloaded_bytes: AtomicU64,
// Number of reads that fell through to, and were found in, the slow store.
slow_store_hit_count: AtomicU64,
// Bytes received from the slow store, accumulated chunk by chunk.
slow_store_downloaded_bytes: AtomicU64,
}

impl MetricsComponent for FastSlowStoreMetrics {
    /// Publishes every counter held by this component to the collector `c`:
    /// the fast-store counters first, followed by the slow-store counters.
    fn gather_metrics(&self, c: &mut CollectorState) {
        // One row per metric: (name, counter, help text), in publication order.
        let rows = [
            (
                "fast_store_hit_count",
                &self.fast_store_hit_count,
                "Hit count for the fast store",
            ),
            (
                "fast_store_downloaded_bytes",
                &self.fast_store_downloaded_bytes,
                "Downloaded bytes from the fast store",
            ),
            (
                "slow_store_hit_count",
                &self.slow_store_hit_count,
                "Hit count for the slow store",
            ),
            (
                "slow_store_downloaded_bytes",
                &self.slow_store_downloaded_bytes,
                "Downloaded bytes from the slow store",
            ),
        ];

        for (metric_name, counter, help_text) in rows {
            c.publish(metric_name, counter, help_text);
        }
    }
}

default_health_status_indicator!(FastSlowStore);

0 comments on commit 7e16389

Please sign in to comment.