Skip to content

Commit

Permalink
Add prometheus stats for VerifyStore
Browse files Browse the repository at this point in the history
  • Loading branch information
allada committed Jul 19, 2023
1 parent 732ae83 commit 5f5b2c4
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 55 deletions.
1 change: 1 addition & 0 deletions cas/store/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ rust_library(
"//util:buf_channel",
"//util:common",
"//util:error",
"//util:prometheus_utils",
"@crate_index//:hex",
"@crate_index//:sha2",
"@crate_index//:tokio",
Expand Down
150 changes: 97 additions & 53 deletions cas/store/verify_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,26 @@

use std::convert::TryFrom;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

use async_trait::async_trait;
use sha2::{Digest, Sha256};

use buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf};
use common::DigestInfo;
use error::{error_if, Error, ResultExt};
use error::{make_input_err, Error, ResultExt};
use prometheus_utils::{Collector, CollectorState, MetricsComponent, Registry};
use traits::{StoreTrait, UploadSizeInfo};

/// Store wrapper that sits in front of `inner_store` and tracks how many
/// uploads were rejected by its size and hash checks.
pub struct VerifyStore {
// The wrapped store; uploads are forwarded to it via `pin_inner().update(..)`.
inner_store: Arc<dyn StoreTrait>,
// When true, an `UploadSizeInfo::ExactSize` that differs from the digest's
// size is rejected in `update`.
verify_size: bool,
// Copied from `config.verify_hash`; published as the "verify_hash" metric.
// NOTE(review): presumably gates creation of the SHA-256 hasher in `update` — confirm.
verify_hash: bool,

// Metrics.
// Incremented (relaxed ordering) each time a size mismatch is detected.
size_verification_failures: AtomicU64,
// Incremented (relaxed ordering) each time a hash mismatch is detected.
hash_verification_failures: AtomicU64,
}

impl VerifyStore {
Expand All @@ -36,63 +42,70 @@ impl VerifyStore {
inner_store,
verify_size: config.verify_size,
verify_hash: config.verify_hash,
size_verification_failures: AtomicU64::new(0),
hash_verification_failures: AtomicU64::new(0),
}
}

/// Borrows the wrapped store out of its `Arc` and returns it pinned, so the
/// pinned `StoreTrait` methods can be called on it.
fn pin_inner(&self) -> Pin<&dyn StoreTrait> {
    let inner: &dyn StoreTrait = self.inner_store.as_ref();
    Pin::new(inner)
}
}

async fn inner_check_update(
mut tx: DropCloserWriteHalf,
mut rx: DropCloserReadHalf,
size_info: UploadSizeInfo,
mut maybe_hasher: Option<([u8; 32], Sha256)>,
) -> Result<(), Error> {
let mut sum_size: u64 = 0;
loop {
let chunk = rx
.recv()
.await
.err_tip(|| "Failed to reach chunk in check_update in verify store")?;
sum_size += chunk.len() as u64;

if chunk.is_empty() {
// Is EOF.
if let UploadSizeInfo::ExactSize(expected_size) = size_info {
error_if!(
sum_size != expected_size as u64,
"Expected size {} but got size {} on insert",
expected_size,
sum_size
);
}
if let Some((original_hash, hasher)) = maybe_hasher {
let hash_result: [u8; 32] = hasher.finalize().into();
error_if!(
original_hash != hash_result,
"Hashes do not match, got: {} but digest hash was {}",
hex::encode(original_hash),
hex::encode(hash_result),
);
async fn inner_check_update(
&self,
mut tx: DropCloserWriteHalf,
mut rx: DropCloserReadHalf,
size_info: UploadSizeInfo,
mut maybe_hasher: Option<([u8; 32], Sha256)>,
) -> Result<(), Error> {
let mut sum_size: u64 = 0;
loop {
let chunk = rx
.recv()
.await
.err_tip(|| "Failed to reach chunk in check_update in verify store")?;
sum_size += chunk.len() as u64;

if chunk.is_empty() {
// Is EOF.
if let UploadSizeInfo::ExactSize(expected_size) = size_info {
if sum_size != expected_size as u64 {
self.size_verification_failures.fetch_add(1, Ordering::Relaxed);
return Err(make_input_err!(
"Expected size {} but got size {} on insert",
expected_size,
sum_size
));
}
}
if let Some((original_hash, hasher)) = maybe_hasher {
let hash_result: [u8; 32] = hasher.finalize().into();
if original_hash != hash_result {
self.hash_verification_failures.fetch_add(1, Ordering::Relaxed);
return Err(make_input_err!(
"Hashes do not match, got: {} but digest hash was {}",
hex::encode(original_hash),
hex::encode(hash_result),
));
}
}
tx.send_eof().await.err_tip(|| "In verify_store::check_update")?;
break;
}
tx.send_eof().await.err_tip(|| "In verify_store::check_update")?;
break;
}

// This allows us to hash while sending data to another thread.
let write_future = tx.send(chunk.clone());
// This allows us to hash while sending data to another thread.
let write_future = tx.send(chunk.clone());

if let Some((_, hasher)) = maybe_hasher.as_mut() {
hasher.update(chunk.as_ref());
}
if let Some((_, hasher)) = maybe_hasher.as_mut() {
hasher.update(chunk.as_ref());
}

write_future
.await
.err_tip(|| "Failed to write chunk to inner store in verify store")?;
write_future
.await
.err_tip(|| "Failed to write chunk to inner store in verify store")?;
}
Ok(())
}
Ok(())
}

#[async_trait]
Expand All @@ -114,12 +127,14 @@ impl StoreTrait for VerifyStore {
let digest_size =
usize::try_from(digest.size_bytes).err_tip(|| "Digest size_bytes was not convertible to usize")?;
if let UploadSizeInfo::ExactSize(expected_size) = size_info {
error_if!(
self.verify_size && expected_size != digest_size,
"Expected size to match. Got {} but digest says {} on update",
expected_size,
digest.size_bytes
);
if self.verify_size && expected_size != digest_size {
self.size_verification_failures.fetch_add(1, Ordering::Relaxed);
return Err(make_input_err!(
"Expected size to match. Got {} but digest says {} on update",
expected_size,
digest.size_bytes
));
}
}

let mut hasher = None;
Expand All @@ -130,7 +145,7 @@ impl StoreTrait for VerifyStore {
let (tx, rx) = make_buf_channel_pair();

let update_fut = self.pin_inner().update(digest, rx, size_info);
let check_fut = inner_check_update(tx, reader, size_info, hasher);
let check_fut = self.inner_check_update(tx, reader, size_info, hasher);

let (update_res, check_res) = tokio::join!(update_fut, check_fut);

Expand All @@ -150,4 +165,33 @@ impl StoreTrait for VerifyStore {
/// Type-erases this store: the owning `Arc<Self>` is moved into a box and
/// handed back as `Box<dyn Any + Send>`.
fn as_any(self: Arc<Self>) -> Box<dyn std::any::Any + Send> {
    let erased: Box<dyn std::any::Any + Send> = Box::new(self);
    erased
}

/// Wraps this store in a `Collector` and registers that collector with the
/// given metrics `registry`.
fn register_metrics(self: Arc<Self>, registry: &mut Registry) {
    let collector = Collector::new(&self);
    registry.register_collector(Box::new(collector));
}
}

/// Metrics reporting for `VerifyStore`: its two configuration flags and its
/// two failure counters.
impl MetricsComponent for VerifyStore {
    /// Publishes, in order, `verify_size`, `verify_hash`,
    /// `size_verification_failures` and `hash_verification_failures` to
    /// `collector`. Counter values are read with relaxed ordering.
    fn gather_metrics(&self, collector: &mut CollectorState) {
        // Configuration flags.
        let size_check_enabled = self.verify_size;
        collector.publish(
            "verify_size",
            size_check_enabled,
            "If the verification store is verifying the size of the data",
        );

        let hash_check_enabled = self.verify_hash;
        collector.publish(
            "verify_hash",
            hash_check_enabled,
            "If the verification store is verifying the hash of the data",
        );

        // Failure counters; each is sampled immediately before it is published.
        let size_failures = self.size_verification_failures.load(Ordering::Relaxed);
        collector.publish(
            "size_verification_failures",
            size_failures,
            "Number of failures the verification store had due to size mismatches",
        );

        let hash_failures = self.hash_verification_failures.load(Ordering::Relaxed);
        collector.publish(
            "hash_verification_failures",
            hash_failures,
            "Number of failures the verification store had due to hash mismatches",
        );
    }
}
4 changes: 2 additions & 2 deletions util/prometheus_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ macro_rules! impl_numerical {
}

// Register all the numerical types to be converted into Numerical.
impl_numerical!(u8, u16, u32, u64, usize, i8, i16, i32, i64, isize);
impl_numerical!(u8, bool, u16, u32, u64, usize, i8, i16, i32, i64, isize);

macro_rules! impl_numerical_metric {
($u:ty,$($t:ty),*) => {
Expand All @@ -222,7 +222,7 @@ macro_rules! impl_numerical_metric {
};
}
// Implement metrics for all the numerical integer types by trying to cast it to i64.
impl_numerical_metric!(i64, u8, u16, u32, u64, usize, i8, i16, i32, i64, isize);
impl_numerical_metric!(i64, bool, u8, u16, u32, u64, usize, i8, i16, i32, i64, isize);

// Implement metrics for all float types by trying to cast it to f64.
impl_numerical_metric!(f64, f64, f32);

0 comments on commit 5f5b2c4

Please sign in to comment.