From f986e68973d3dc476e2067fc9b809d30d431eb0d Mon Sep 17 00:00:00 2001 From: "Nathan (Blaise) Bruer" Date: Thu, 25 Apr 2024 14:20:53 -0500 Subject: [PATCH] Add metrics output to SizePartitioningStore SizePartitioningStore now properly publishes metrics about the underlying store and config. towards: #650 --- .../src/size_partitioning_store.rs | 37 +++++++++++++++---- 1 file changed, 30 insertions(+), 7 deletions(-) diff --git a/nativelink-store/src/size_partitioning_store.rs b/nativelink-store/src/size_partitioning_store.rs index f426ea5763..e66bcbdbb7 100644 --- a/nativelink-store/src/size_partitioning_store.rs +++ b/nativelink-store/src/size_partitioning_store.rs @@ -20,11 +20,12 @@ use nativelink_error::{Error, ResultExt}; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; +use nativelink_util::metrics_utils::{Collector, CollectorState, MetricsComponent, Registry}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; use tokio::join; pub struct SizePartitioningStore { - size: i64, + partition_size: i64, lower_store: Arc, upper_store: Arc, } @@ -36,7 +37,7 @@ impl SizePartitioningStore { upper_store: Arc, ) -> Self { SizePartitioningStore { - size: config.size as i64, + partition_size: config.size as i64, lower_store, upper_store, } @@ -53,7 +54,7 @@ impl Store for SizePartitioningStore { let (lower_digests, upper_digests): (Vec<_>, Vec<_>) = digests .iter() .cloned() - .partition(|digest| digest.size_bytes < self.size); + .partition(|digest| digest.size_bytes < self.partition_size); let (lower_results, upper_results) = join!( Pin::new(self.lower_store.as_ref()).has_many(&lower_digests), Pin::new(self.upper_store.as_ref()).has_many(&upper_digests), @@ -88,7 +89,7 @@ impl Store for SizePartitioningStore { reader: DropCloserReadHalf, size_info: UploadSizeInfo, ) -> Result<(), Error> { - if digest.size_bytes < self.size { + if digest.size_bytes < self.partition_size { return Pin::new(self.lower_store.as_ref()) .update(digest, reader, size_info) .await; @@ -105,7 +106,7 @@ impl Store for SizePartitioningStore { offset: usize, length: Option, ) -> Result<(), Error> { - if digest.size_bytes < self.size { + if digest.size_bytes < self.partition_size { return Pin::new(self.lower_store.as_ref()) .get_part_ref(digest, writer, offset, length) .await; @@ -119,7 +120,7 @@ impl Store for SizePartitioningStore { let Some(digest) = digest else { return self; }; - if digest.size_bytes < self.size { + if digest.size_bytes < self.partition_size { return self.lower_store.inner_store(Some(digest)); } self.upper_store.inner_store(Some(digest)) @@ -129,7 +130,7 @@ impl Store for SizePartitioningStore { let Some(digest) = digest else { return self; }; - if digest.size_bytes < self.size { + if digest.size_bytes < self.partition_size { return self.lower_store.clone().inner_store_arc(Some(digest)); } self.upper_store.clone().inner_store_arc(Some(digest)) @@ -142,6 +143,28 @@ impl Store for SizePartitioningStore { fn as_any_arc(self: Arc) -> Arc { self } + + fn register_metrics(self: Arc, registry: &mut Registry) { + let lower_store_registry = registry.sub_registry_with_prefix("lower_store"); + self.lower_store + .clone() + .register_metrics(lower_store_registry); + let upper_store_registry = registry.sub_registry_with_prefix("upper_store"); + self.upper_store + .clone() + .register_metrics(upper_store_registry); + registry.register_collector(Box::new(Collector::new(&self))); + } +} + +impl MetricsComponent for SizePartitioningStore { + fn gather_metrics(&self, c: &mut CollectorState) { + c.publish( + "partition_size", + &self.partition_size, + "Size to partition our data", + ); + } } default_health_status_indicator!(SizePartitioningStore);