Skip to content

Commit

Permalink
Add metrics output to SizePartitioningStore
Browse files Browse the repository at this point in the history
SizePartitioningStore now properly publishes metrics about the
underlying store and config.

towards: #650
  • Loading branch information
allada committed Apr 25, 2024
1 parent fb0edae commit eb7733e
Showing 1 changed file with 31 additions and 7 deletions.
38 changes: 31 additions & 7 deletions nativelink-store/src/size_partitioning_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ use nativelink_error::{Error, ResultExt};
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
use nativelink_util::common::DigestInfo;
use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator};
use nativelink_util::metrics_utils::{Collector, CollectorState, MetricsComponent, Registry};
use nativelink_util::store_trait::{Store, UploadSizeInfo};
use tokio::join;

pub struct SizePartitioningStore {
size: i64,
partition_size: i64,
lower_store: Arc<dyn Store>,
upper_store: Arc<dyn Store>,
}
Expand All @@ -36,7 +37,7 @@ impl SizePartitioningStore {
upper_store: Arc<dyn Store>,
) -> Self {
SizePartitioningStore {
size: config.size as i64,
partition_size: config.size as i64,
lower_store,
upper_store,
}
Expand All @@ -53,7 +54,7 @@ impl Store for SizePartitioningStore {
let (lower_digests, upper_digests): (Vec<_>, Vec<_>) = digests
.iter()
.cloned()
.partition(|digest| digest.size_bytes < self.size);
.partition(|digest| digest.size_bytes < self.partition_size);
let (lower_results, upper_results) = join!(
Pin::new(self.lower_store.as_ref()).has_many(&lower_digests),
Pin::new(self.upper_store.as_ref()).has_many(&upper_digests),
Expand Down Expand Up @@ -88,11 +89,12 @@ impl Store for SizePartitioningStore {
reader: DropCloserReadHalf,
size_info: UploadSizeInfo,
) -> Result<(), Error> {
if digest.size_bytes < self.size {
if digest.size_bytes < self.partition_size {
return Pin::new(self.lower_store.as_ref())
.update(digest, reader, size_info)
.await;
}

Pin::new(self.upper_store.as_ref())
.update(digest, reader, size_info)
.await
Expand All @@ -105,7 +107,7 @@ impl Store for SizePartitioningStore {
offset: usize,
length: Option<usize>,
) -> Result<(), Error> {
if digest.size_bytes < self.size {
if digest.size_bytes < self.partition_size {
return Pin::new(self.lower_store.as_ref())
.get_part_ref(digest, writer, offset, length)
.await;
Expand All @@ -119,7 +121,7 @@ impl Store for SizePartitioningStore {
let Some(digest) = digest else {
return self;
};
if digest.size_bytes < self.size {
if digest.size_bytes < self.partition_size {
return self.lower_store.inner_store(Some(digest));
}
self.upper_store.inner_store(Some(digest))
Expand All @@ -129,7 +131,7 @@ impl Store for SizePartitioningStore {
let Some(digest) = digest else {
return self;
};
if digest.size_bytes < self.size {
if digest.size_bytes < self.partition_size {
return self.lower_store.clone().inner_store_arc(Some(digest));
}
self.upper_store.clone().inner_store_arc(Some(digest))
Expand All @@ -142,6 +144,28 @@ impl Store for SizePartitioningStore {
fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
self
}

fn register_metrics(self: Arc<Self>, registry: &mut Registry) {
let lower_store_registry = registry.sub_registry_with_prefix("lower_store");
self.lower_store
.clone()
.register_metrics(lower_store_registry);
let upper_store_registry = registry.sub_registry_with_prefix("upper_store");
self.upper_store
.clone()
.register_metrics(upper_store_registry);
registry.register_collector(Box::new(Collector::new(&self)));
}
}

impl MetricsComponent for SizePartitioningStore {
fn gather_metrics(&self, c: &mut CollectorState) {
c.publish(
"partition_size",
&self.partition_size,
"Size to partition our data",
);
}
}

default_health_status_indicator!(SizePartitioningStore);

0 comments on commit eb7733e

Please sign in to comment.