Skip to content

Commit

Permalink
Add metrics output to SizePartitioningStore
Browse files Browse the repository at this point in the history
SizePartitioningStore now properly publishes metrics about the
underlying store and config.

towards: #650
  • Loading branch information
allada committed Apr 26, 2024
1 parent 8ee7ab3 commit 6fa70d2
Showing 1 changed file with 30 additions and 7 deletions.
37 changes: 30 additions & 7 deletions nativelink-store/src/size_partitioning_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ use nativelink_error::{Error, ResultExt};
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
use nativelink_util::common::DigestInfo;
use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator};
use nativelink_util::metrics_utils::{Collector, CollectorState, MetricsComponent, Registry};
use nativelink_util::store_trait::{Store, UploadSizeInfo};
use tokio::join;

pub struct SizePartitioningStore {
size: i64,
partition_size: i64,
lower_store: Arc<dyn Store>,
upper_store: Arc<dyn Store>,
}
Expand All @@ -36,7 +37,7 @@ impl SizePartitioningStore {
upper_store: Arc<dyn Store>,
) -> Self {
SizePartitioningStore {
size: config.size as i64,
partition_size: config.size as i64,
lower_store,
upper_store,
}
Expand All @@ -53,7 +54,7 @@ impl Store for SizePartitioningStore {
let (lower_digests, upper_digests): (Vec<_>, Vec<_>) = digests
.iter()
.cloned()
.partition(|digest| digest.size_bytes < self.size);
.partition(|digest| digest.size_bytes < self.partition_size);
let (lower_results, upper_results) = join!(
Pin::new(self.lower_store.as_ref()).has_many(&lower_digests),
Pin::new(self.upper_store.as_ref()).has_many(&upper_digests),
Expand Down Expand Up @@ -88,7 +89,7 @@ impl Store for SizePartitioningStore {
reader: DropCloserReadHalf,
size_info: UploadSizeInfo,
) -> Result<(), Error> {
if digest.size_bytes < self.size {
if digest.size_bytes < self.partition_size {
return Pin::new(self.lower_store.as_ref())
.update(digest, reader, size_info)
.await;
Expand All @@ -105,7 +106,7 @@ impl Store for SizePartitioningStore {
offset: usize,
length: Option<usize>,
) -> Result<(), Error> {
if digest.size_bytes < self.size {
if digest.size_bytes < self.partition_size {
return Pin::new(self.lower_store.as_ref())
.get_part_ref(digest, writer, offset, length)
.await;
Expand All @@ -119,7 +120,7 @@ impl Store for SizePartitioningStore {
let Some(digest) = digest else {
return self;
};
if digest.size_bytes < self.size {
if digest.size_bytes < self.partition_size {
return self.lower_store.inner_store(Some(digest));
}
self.upper_store.inner_store(Some(digest))
Expand All @@ -129,7 +130,7 @@ impl Store for SizePartitioningStore {
let Some(digest) = digest else {
return self;
};
if digest.size_bytes < self.size {
if digest.size_bytes < self.partition_size {
return self.lower_store.clone().inner_store_arc(Some(digest));
}
self.upper_store.clone().inner_store_arc(Some(digest))
Expand All @@ -142,6 +143,28 @@ impl Store for SizePartitioningStore {
fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
self
}

fn register_metrics(self: Arc<Self>, registry: &mut Registry) {
let lower_store_registry = registry.sub_registry_with_prefix("lower_store");
self.lower_store
.clone()
.register_metrics(lower_store_registry);
let upper_store_registry = registry.sub_registry_with_prefix("upper_store");
self.upper_store
.clone()
.register_metrics(upper_store_registry);
registry.register_collector(Box::new(Collector::new(&self)));
}
}

impl MetricsComponent for SizePartitioningStore {
fn gather_metrics(&self, c: &mut CollectorState) {
c.publish(
"partition_size",
&self.partition_size,
"Size to partition our data",
);
}
}

default_health_status_indicator!(SizePartitioningStore);

0 comments on commit 6fa70d2

Please sign in to comment.