From 520b762e513dbac0d1a58c4172b31bd10cdfdaed Mon Sep 17 00:00:00 2001
From: "Nathan (Blaise) Bruer" <github.blaise@allada.com>
Date: Fri, 26 Apr 2024 10:33:38 -0500
Subject: [PATCH] Add metrics to CompletenessCheckingStore (#882)

CompletenessCheckingStore now publishes its own metrics and forwards
metrics registration to the underlying CAS and AC stores.
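
The store registers itself as a metrics collector and registers its
inner stores under their own sub-registries. A rough sketch of the
resulting layout (the outer prefix depends on where this store is
mounted in the registry tree):

    <prefix>/cas_store/...               metrics of the inner CAS store
    <prefix>/ac_store/...                metrics of the inner AC store
    <prefix>/incomplete_entries_counter  action results missing outputs
    <prefix>/complete_entries_counter    action results fully present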

towards: #650
---
 .../src/completeness_checking_store.rs        | 452 +++++++++---------
 1 file changed, 239 insertions(+), 213 deletions(-)

diff --git a/nativelink-store/src/completeness_checking_store.rs b/nativelink-store/src/completeness_checking_store.rs
index ff7ff9f77..d0656ef2c 100644
--- a/nativelink-store/src/completeness_checking_store.rs
+++ b/nativelink-store/src/completeness_checking_store.rs
@@ -26,6 +26,9 @@ use nativelink_proto::build::bazel::remote::execution::v2::{
 use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
 use nativelink_util::common::DigestInfo;
 use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator};
+use nativelink_util::metrics_utils::{
+    Collector, CollectorState, CounterWithTime, MetricsComponent, Registry,
+};
 use nativelink_util::store_trait::{Store, UploadSizeInfo};
 use parking_lot::Mutex;
 use tokio::sync::Notify;
@@ -33,20 +36,6 @@ use tracing::{event, Level};
 
 use crate::ac_utils::{get_and_decode_digest, get_size_and_decode_digest};
 
-pub struct CompletenessCheckingStore {
-    cas_store: Arc<dyn Store>,
-    ac_store: Arc<dyn Store>,
-}
-
-impl CompletenessCheckingStore {
-    pub fn new(ac_store: Arc<dyn Store>, cas_store: Arc<dyn Store>) -> Self {
-        CompletenessCheckingStore {
-            cas_store,
-            ac_store,
-        }
-    }
-}
-
 /// Given a proto action result, return all relevant digests and
 /// output directories that need to be checked.
 fn get_digests_and_output_dirs(
@@ -116,208 +105,229 @@ async fn check_output_directories(
     Ok(())
 }
 
-/// Check that all files and directories in action results
-/// exist in the CAS. Does this by decoding digests and
-/// checking their existence in two separate sets of futures that
-/// are polled concurrently.
-async fn inner_has_with_results(
-    ac_store: Pin<&dyn Store>,
-    cas_store: Pin<&dyn Store>,
-    action_result_digests: &[DigestInfo],
-    results: &mut [Option<usize>],
-) -> Result<(), Error> {
-    // Holds shared state between the different futures.
-    // This is how get around lifetime issues.
-    struct State<'a> {
-        results: &'a mut [Option<usize>],
-        digests_to_check: Vec<DigestInfo>,
-        digests_to_check_idxs: Vec<usize>,
-        notify: Arc<Notify>,
-        done: bool,
+pub struct CompletenessCheckingStore {
+    cas_store: Arc<dyn Store>,
+    ac_store: Arc<dyn Store>,
+
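+    /// Counters published via `gather_metrics()`: how many checked action
+    /// results had all referenced outputs present in the CAS (complete)
+    /// versus at least one output missing (incomplete).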
+    incomplete_entries_counter: CounterWithTime,
+    complete_entries_counter: CounterWithTime,
+}
+
+impl CompletenessCheckingStore {
+    pub fn new(ac_store: Arc<dyn Store>, cas_store: Arc<dyn Store>) -> Self {
+        CompletenessCheckingStore {
+            cas_store,
+            ac_store,
+            incomplete_entries_counter: CounterWithTime::default(),
+            complete_entries_counter: CounterWithTime::default(),
+        }
     }
-    // Note: In theory Mutex is not needed, but lifetimes are
-    // very tricky to get right here. Since we are using parking_lot
-    // and we are guaranteed to never have lock collisions, it should
-    // be nearly as fast as a few atomic operations.
-    let state_mux = &Mutex::new(State {
-        results,
-        digests_to_check: Vec::new(),
-        digests_to_check_idxs: Vec::new(),
-        // Note: Any time `digests_to_check` or `digests_to_check_idxs` is
-        // modified we must notify the subscriber here.
-        notify: Arc::new(Notify::new()),
-        done: false,
-    });
-
-    let mut futures = action_result_digests
-        .iter()
-        .enumerate()
-        .map(|(i, digest)| {
-            async move {
-                // Note: We don't err_tip here because often have NotFound here which is ok.
-                let (action_result, size) =
-                    get_size_and_decode_digest::<ProtoActionResult>(ac_store, digest).await?;
-
-                let (mut digest_infos, output_directories) =
-                    get_digests_and_output_dirs(action_result)?;
 
-                {
-                    let mut state = state_mux.lock();
+    /// Check that all files and directories referenced by the given
+    /// action results exist in the CAS. It does this by decoding the
+    /// digests and checking their existence in two separate sets of
+    /// futures that are polled concurrently.
+    async fn inner_has_with_results(
+        &self,
+        action_result_digests: &[DigestInfo],
+        results: &mut [Option<usize>],
+    ) -> Result<(), Error> {
+        let ac_store = Pin::new(self.ac_store.as_ref());
+        let cas_store = Pin::new(self.cas_store.as_ref());
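+        // Pin the inner stores once up front: the store APIs called below
+        // take pinned references, and this method now borrows from `&self`.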
+        // Holds shared state between the different futures.
+        // This is how we get around lifetime issues.
+        struct State<'a> {
+            results: &'a mut [Option<usize>],
+            digests_to_check: Vec<DigestInfo>,
+            digests_to_check_idxs: Vec<usize>,
+            notify: Arc<Notify>,
+            done: bool,
+        }
+        // Note: In theory Mutex is not needed, but lifetimes are
+        // very tricky to get right here. Since we are using parking_lot
+        // and we are guaranteed to never have lock collisions, it should
+        // be nearly as fast as a few atomic operations.
+        let state_mux = &Mutex::new(State {
+            results,
+            digests_to_check: Vec::new(),
+            digests_to_check_idxs: Vec::new(),
+            // Note: Any time `digests_to_check` or `digests_to_check_idxs` is
+            // modified, we must notify the subscriber here.
+            notify: Arc::new(Notify::new()),
+            done: false,
+        });
 
-                    // We immediately set the size of the digest here. Later we will unset it if
-                    // we find that the digest has missing outputs.
-                    state.results[i] = Some(size);
-                    let rep_len = digest_infos.len();
-                    if state.digests_to_check.is_empty() {
-                        // Hot path: Most actions only have files and only one digest
-                        // requested to be checked. So we can avoid the heap allocation
-                        // by just swapping out our container's stack if our pending_digests
-                        // is empty.
-                        mem::swap(&mut state.digests_to_check, &mut digest_infos);
-                    } else {
-                        state.digests_to_check.extend(digest_infos);
+        let mut futures = action_result_digests
+            .iter()
+            .enumerate()
+            .map(|(i, digest)| {
+                async move {
+                    // Note: We don't err_tip here because we often get NotFound here, which is ok.
+                    let (action_result, size) =
+                        get_size_and_decode_digest::<ProtoActionResult>(ac_store, digest).await?;
+
+                    let (mut digest_infos, output_directories) =
+                        get_digests_and_output_dirs(action_result)?;
+
+                    {
+                        let mut state = state_mux.lock();
+
+                        // We immediately set the size of the digest here. Later we will unset it if
+                        // we find that the digest has missing outputs.
+                        state.results[i] = Some(size);
+                        let rep_len = digest_infos.len();
+                        if state.digests_to_check.is_empty() {
+                            // Hot path: Most actions only have files, with only one digest
+                            // requested to be checked. So when `digests_to_check` is empty
+                            // we can avoid a heap allocation by simply swapping the two
+                            // vectors instead of copying into the existing one.
+                            mem::swap(&mut state.digests_to_check, &mut digest_infos);
+                        } else {
+                            state.digests_to_check.extend(digest_infos);
+                        }
+                        state
+                            .digests_to_check_idxs
+                            .extend(iter::repeat(i).take(rep_len));
+                        state.notify.notify_one();
                     }
-                    state
-                        .digests_to_check_idxs
-                        .extend(iter::repeat(i).take(rep_len));
-                    state.notify.notify_one();
-                }
 
-                // Hot path: It is very common for no output directories to be defined.
-                // So we can avoid any needless work by early returning.
-                if output_directories.is_empty() {
-                    return Ok(());
-                }
+                    // Hot path: It is very common for no output directories to be defined.
+                    // So we can avoid any needless work by early returning.
+                    if output_directories.is_empty() {
+                        return Ok(());
+                    }
 
-                check_output_directories(cas_store, output_directories, &move |digest_infos| {
-                    let mut state = state_mux.lock();
-                    let rep_len = digest_infos.len();
-                    state.digests_to_check.extend(digest_infos);
-                    state
-                        .digests_to_check_idxs
-                        .extend(iter::repeat(i).take(rep_len));
-                    state.notify.notify_one();
-                })
-                .await?;
+                    check_output_directories(cas_store, output_directories, &move |digest_infos| {
+                        let mut state = state_mux.lock();
+                        let rep_len = digest_infos.len();
+                        state.digests_to_check.extend(digest_infos);
+                        state
+                            .digests_to_check_idxs
+                            .extend(iter::repeat(i).take(rep_len));
+                        state.notify.notify_one();
+                    })
+                    .await?;
 
-                Result::<(), Error>::Ok(())
-            }
-            // Add a tip to the error to help with debugging and the index of the
-            // digest that failed so we know which one to unset.
-            .map_err(move |mut e| {
-                if e.code != Code::NotFound {
-                    e = e.append(
-                        "Error checking existence of digest in CompletenessCheckingStore::has",
-                    );
+                    Result::<(), Error>::Ok(())
                 }
-                (e, i)
+                // Add a tip to the error to help with debugging, and attach the
+                // index of the digest that failed so we know which one to unset.
+                .map_err(move |mut e| {
+                    if e.code != Code::NotFound {
+                        e = e.append(
+                            "Error checking existence of digest in CompletenessCheckingStore::has",
+                        );
+                    }
+                    (e, i)
+                })
             })
-        })
-        .collect::<FuturesUnordered<_>>();
-
-    // This future will wait for the notify to be notified and then
-    // check the CAS store for the digest's existence.
-    // For optimization reasons we only allow one outstanding call to
-    // the underlying `has_with_results()` at a time. This is because
-    // we want to give the ability for stores to batch requests together
-    // whenever possible.
-    // The most common case is only one notify will ever happen.
-    let check_existence_fut = async {
-        let mut has_results = vec![];
-        let notify = state_mux.lock().notify.clone();
-        loop {
-            notify.notified().await;
-            let (digests, indexes) = {
-                let mut state = state_mux.lock();
-                if state.done {
-                    if state.digests_to_check.is_empty() {
-                        break;
+            .collect::<FuturesUnordered<_>>();
+
+        // This future waits to be notified and then checks the CAS store
+        // for the existence of the queued digests.
+        // For optimization reasons we only allow one outstanding call to
+        // the underlying `has_with_results()` at a time, which gives stores
+        // the ability to batch requests together whenever possible.
+        // In the most common case only one notify will ever happen.
+        let check_existence_fut = async {
+            let mut has_results = vec![];
+            let notify = state_mux.lock().notify.clone();
+            loop {
+                notify.notified().await;
+                let (digests, indexes) = {
+                    let mut state = state_mux.lock();
+                    if state.done {
+                        if state.digests_to_check.is_empty() {
+                            break;
+                        }
+                        // Edge case: It is possible for `digests_to_check` to have had
+                        // data added, `notify_one` called, then `done` set to true
+                        // immediately afterwards. We protect ourselves by checking for
+                        // pending digests whenever `done` is set; if any exist, we
+                        // re-notify ourselves to run again and keep processing the data.
+                        notify.notify_one();
+                    }
+                    (
+                        mem::take(&mut state.digests_to_check),
+                        mem::take(&mut state.digests_to_check_idxs),
+                    )
+                };
+                assert_eq!(
+                    digests.len(),
+                    indexes.len(),
+                    "Expected sizes to match in CompletenessCheckingStore::has"
+                );
+
+                // Recycle our results vector to avoid needless allocations.
+                has_results.clear();
+                has_results.resize(digests.len(), None);
+                cas_store
+                    .has_with_results(&digests, &mut has_results[..])
+                    .await
+                    .err_tip(|| {
+                        "Error calling has_with_results() inside CompletenessCheckingStore::has"
+                    })?;
+                let missed_indexes = has_results
+                    .iter()
+                    .zip(indexes)
+                    .filter_map(|(r, index)| r.map_or_else(|| Some(index), |_| None));
+                {
+                    let mut state = state_mux.lock();
+                    for index in missed_indexes {
+                        state.results[index] = None;
                     }
-                    // Edge case: It is possible for our `digest_to_check` to have had
-                    // data added, `notify_one` called, then immediately `done` set to
-                    // true. We protect ourselves by checking if we have digests if done
-                    // is set, and if we do, let ourselves know to run again, but continue
-                    // processing the data.
-                    notify.notify_one();
-                }
-                (
-                    mem::take(&mut state.digests_to_check),
-                    mem::take(&mut state.digests_to_check_idxs),
-                )
-            };
-            assert_eq!(
-                digests.len(),
-                indexes.len(),
-                "Expected sizes to match in CompletenessCheckingStore::has"
-            );
-
-            // Recycle our results vector to avoid needless allocations.
-            has_results.clear();
-            has_results.resize(digests.len(), None);
-            cas_store
-                .has_with_results(&digests, &mut has_results[..])
-                .await
-                .err_tip(|| {
-                    "Error calling has_with_results() inside CompletenessCheckingStore::has"
-                })?;
-            let missed_indexes = has_results
-                .iter()
-                .zip(indexes)
-                .filter_map(|(r, index)| r.map_or_else(|| Some(index), |_| None));
-            {
-                let mut state = state_mux.lock();
-                for index in missed_indexes {
-                    state.results[index] = None;
                 }
             }
+            Result::<(), Error>::Ok(())
         }
-        Result::<(), Error>::Ok(())
-    }
-    .fuse();
-    tokio::pin!(check_existence_fut);
-
-    loop {
-        // Poll both futures at the same time.
-        select! {
-            r = check_existence_fut => {
-                return Err(make_err!(
-                    Code::Internal,
-                    "CompletenessCheckingStore's check_existence_fut ended unexpectedly {r:?}"
-                ));
-            }
-            maybe_result = futures.next() => {
-                match maybe_result {
-                    Some(Ok(())) => {}
-                    Some(Err((err, i))) => {
-                        state_mux.lock().results[i] = None;
-                        // Note: Don't return the errors. We just flag the result as
-                        // missing but show a warning if it's not a NotFound.
-                        if err.code != Code::NotFound {
-                            event!(
-                                Level::WARN,
-                                ?err,
-                                "Error checking existence of digest"
-                            );
+        .fuse();
+        tokio::pin!(check_existence_fut);
+
+        loop {
+            // Poll both futures at the same time.
+            select! {
+                r = check_existence_fut => {
+                    return Err(make_err!(
+                        Code::Internal,
+                        "CompletenessCheckingStore's check_existence_fut ended unexpectedly {r:?}"
+                    ));
+                }
+                maybe_result = futures.next() => {
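+                    // Each resolved future corresponds to one action result
+                    // digest: count it as complete on success and incomplete
+                    // on any error (the result is also flagged missing below).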
+                    match maybe_result {
+                        Some(Ok(())) => self.complete_entries_counter.inc(),
+                        Some(Err((err, i))) => {
+                            self.incomplete_entries_counter.inc();
+                            state_mux.lock().results[i] = None;
+                            // Note: Don't return the errors. We just flag the result as
+                            // missing, but log a warning if the error is not a NotFound.
+                            if err.code != Code::NotFound {
+                                event!(
+                                    Level::WARN,
+                                    ?err,
+                                    "Error checking existence of digest"
+                                );
+                            }
                         }
-                    }
-                    None => {
-                        // We are done, so flag it done and ensure we notify the
-                        // subscriber future.
-                        {
-                            let mut state = state_mux.lock();
-                            state.done = true;
-                            state.notify.notify_one();
+                        None => {
+                            // We are done, so mark the shared state as done and make sure
+                            // we notify the subscriber future.
+                            {
+                                let mut state = state_mux.lock();
+                                state.done = true;
+                                state.notify.notify_one();
+                            }
+                            check_existence_fut
+                                .await
+                                .err_tip(|| "CompletenessCheckingStore's check_existence_fut ended unexpectedly on last await")?;
+                            return Ok(());
                         }
-                        check_existence_fut
-                            .await
-                            .err_tip(|| "CompletenessCheckingStore's check_existence_fut ended unexpectedly on last await")?;
-                        return Ok(());
                     }
                 }
             }
         }
+        // Unreachable.
     }
-    // Unreachable.
 }
 
 #[async_trait]
@@ -327,13 +337,8 @@ impl Store for CompletenessCheckingStore {
         action_result_digests: &[DigestInfo],
         results: &mut [Option<usize>],
     ) -> Result<(), Error> {
-        inner_has_with_results(
-            Pin::new(self.ac_store.as_ref()),
-            Pin::new(self.cas_store.as_ref()),
-            action_result_digests,
-            results,
-        )
-        .await
+        self.inner_has_with_results(action_result_digests, results)
+            .await
     }
 
     async fn update(
@@ -354,23 +359,19 @@ impl Store for CompletenessCheckingStore {
         offset: usize,
         length: Option<usize>,
     ) -> Result<(), Error> {
-        let ac_store = Pin::new(self.ac_store.as_ref());
         let results = &mut [None];
-        inner_has_with_results(
-            ac_store,
-            Pin::new(self.cas_store.as_ref()),
-            &[digest],
-            results,
-        )
-        .await
-        .err_tip(|| "when calling CompletenessCheckingStore::get_part_ref")?;
+        self.inner_has_with_results(&[digest], results)
+            .await
+            .err_tip(|| "when calling CompletenessCheckingStore::get_part_ref")?;
         if results[0].is_none() {
             return Err(make_err!(
                 Code::NotFound,
                 "Digest found, but not all parts were found in CompletenessCheckingStore::get_part_ref"
             ));
         }
-        ac_store.get_part_ref(digest, writer, offset, length).await
+        Pin::new(self.ac_store.as_ref())
+            .get_part_ref(digest, writer, offset, length)
+            .await
     }
 
     fn inner_store(&self, _digest: Option<DigestInfo>) -> &'_ dyn Store {
@@ -388,6 +389,31 @@ impl Store for CompletenessCheckingStore {
     fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
         self
     }
+
+    fn register_metrics(self: Arc<Self>, registry: &mut Registry) {
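+        // Register the wrapped stores under their own sub-registries, then
+        // attach this store as a collector so its own counters are gathered
+        // via `gather_metrics()` below.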
+        self.cas_store
+            .clone()
+            .register_metrics(registry.sub_registry_with_prefix("cas_store"));
+        self.ac_store
+            .clone()
+            .register_metrics(registry.sub_registry_with_prefix("ac_store"));
+        registry.register_collector(Box::new(Collector::new(&self)));
+    }
+}
+
+impl MetricsComponent for CompletenessCheckingStore {
+    fn gather_metrics(&self, c: &mut CollectorState) {
+        c.publish(
+            "incomplete_entries_counter",
+            &self.incomplete_entries_counter,
+            "Incomplete entries hit in CompletenessCheckingStore",
+        );
+        c.publish(
+            "complete_entries_counter",
+            &self.complete_entries_counter,
+            "Complete entries hit in CompletenessCheckingStore",
+        );
+    }
 }
 
 default_health_status_indicator!(CompletenessCheckingStore);