Add DigestHasher as interface to hashing functions
Adds DigestHasher and a hashing implementation for Sha256. This will
be used to abstract the different hashing algorithms when computing
digests.

towards: #191
allada committed Nov 19, 2023
1 parent 9543964 commit 55c2e4b
Showing 12 changed files with 306 additions and 77 deletions.
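The new digest_hasher module itself (under //util per the BUILD deps below) is collapsed in this view, but its API can be inferred from the call sites in the diff: a DigestHasherFunc names an algorithm, converts into a DigestHasher via Into, and the hasher exposes update() plus finalize_digest(), which pairs the hash with the payload size as a DigestInfo. The following is a hypothetical sketch of what such a module could look like; the enum layout and the use of finalize_reset are assumptions, not the committed source.

// Hypothetical sketch of digest_hasher, inferred from call sites in this diff.
use sha2::{Digest, Sha256};

use common::DigestInfo;

/// Names which hashing algorithm to use (only Sha256 exists so far).
pub enum DigestHasherFunc {
    Sha256,
}

/// Wraps a concrete hasher behind one interface so call sites need not
/// know which algorithm is in use.
pub enum DigestHasher {
    Sha256(Sha256),
}

impl From<DigestHasherFunc> for DigestHasher {
    fn from(func: DigestHasherFunc) -> Self {
        match func {
            DigestHasherFunc::Sha256 => DigestHasher::Sha256(Sha256::new()),
        }
    }
}

impl DigestHasher {
    /// Feed more data into the hash state.
    pub fn update(&mut self, input: &[u8]) {
        match self {
            DigestHasher::Sha256(hasher) => hasher.update(input),
        }
    }

    /// Finish hashing and pair the hash with the total payload size.
    /// finalize_reset is used because only `&mut self` is available here.
    pub fn finalize_digest(&mut self, size: i64) -> DigestInfo {
        match self {
            DigestHasher::Sha256(hasher) => DigestInfo::new(hasher.finalize_reset().into(), size),
        }
    }
}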
2 changes: 2 additions & 0 deletions Cargo.toml
@@ -36,6 +36,7 @@ members = [
     "gencargo/dedup_store_test",
     "gencargo/default_scheduler_factory",
     "gencargo/default_store_factory",
+    "gencargo/digest_hasher",
     "gencargo/error",
     "gencargo/evicting_map",
     "gencargo/evicting_map_test",
@@ -184,6 +185,7 @@ dedup_store = { path = "gencargo/dedup_store" }
 dedup_store_test = { path = "gencargo/dedup_store_test" }
 default_scheduler_factory = { path = "gencargo/default_scheduler_factory" }
 default_store_factory = { path = "gencargo/default_store_factory" }
+digest_hasher = { path = "gencargo/digest_hasher" }
 error = { path = "gencargo/error" }
 evicting_map = { path = "gencargo/evicting_map" }
 evicting_map_test = { path = "gencargo/evicting_map_test" }
2 changes: 1 addition & 1 deletion cas/store/BUILD
@@ -66,12 +66,12 @@ rust_library(
     deps = [
         ":store",
         "//util:buf_channel",
+        "//util:digest_hasher",
         "//util:common",
         "//util:error",
         "@crate_index//:bytes",
         "@crate_index//:futures",
         "@crate_index//:prost",
-        "@crate_index//:sha2",
         "@crate_index//:tokio",
     ],
 )
42 changes: 25 additions & 17 deletions cas/store/ac_utils.rs
@@ -18,11 +18,11 @@
 use std::pin::Pin;
 use bytes::{Bytes, BytesMut};
 use futures::{future::join, Future, FutureExt};
 use prost::Message;
-use sha2::{Digest, Sha256};
 use tokio::io::{AsyncRead, AsyncReadExt};
 
 use buf_channel::{make_buf_channel_pair, DropCloserWriteHalf};
 use common::{fs, DigestInfo};
+use digest_hasher::DigestHasher;
 use error::{Code, Error, ResultExt};
 use store::{Store, UploadSizeInfo};

@@ -59,37 +59,48 @@ pub async fn get_and_decode_digest<T: Message + Default>(
     T::decode(store_data).err_tip_with_code(|e| (Code::NotFound, format!("Stored value appears to be corrupt: {}", e)))
 }
 
-pub fn message_to_digest(message: &impl Message, buf: &mut BytesMut) -> Result<DigestInfo, Error> {
-    let mut hasher = Sha256::new();
-    message.encode(buf).err_tip(|| "Could not encode directory proto")?;
-    hasher.update(&buf[..]);
-    Ok(DigestInfo::new(hasher.finalize().into(), i64::try_from(buf.len())?))
+/// Computes the digest of a message.
+pub fn message_to_digest<'a>(
+    message: &impl Message,
+    mut buf: &mut BytesMut,
+    hasher: impl Into<&'a mut DigestHasher>,
+) -> Result<DigestInfo, Error> {
+    let hasher = hasher.into();
+    message
+        .encode(&mut buf)
+        .err_tip(|| "Could not encode directory proto")?;
+    hasher.update(buf);
+    Ok(hasher.finalize_digest(i64::try_from(buf.len())?))
 }
 
 /// Takes a proto message and will serialize it and upload it to the provided store.
 pub async fn serialize_and_upload_message<'a, T: Message>(
     message: &'a T,
     cas_store: Pin<&'a dyn Store>,
+    hasher: impl Into<&mut DigestHasher>,
 ) -> Result<DigestInfo, Error> {
     let mut buffer = BytesMut::with_capacity(message.encoded_len());
-    let digest = message_to_digest(message, &mut buffer).err_tip(|| "In serialize_and_upload_message")?;
+    let digest = message_to_digest(message, &mut buffer, hasher).err_tip(|| "In serialize_and_upload_message")?;
     upload_buf_to_store(cas_store, digest, buffer.freeze())
         .await
         .err_tip(|| "In serialize_and_upload_message")?;
     Ok(digest)
 }
 
-pub async fn compute_buf_digest(buf: &[u8]) -> Result<DigestInfo, Error> {
-    let mut hasher = Sha256::new();
+/// Computes a digest of a given buffer.
+pub async fn compute_buf_digest(buf: &[u8], hasher: impl Into<&mut DigestHasher>) -> Result<DigestInfo, Error> {
+    let hasher = hasher.into();
     hasher.update(buf);
-    Ok(DigestInfo::new(hasher.finalize().into(), i64::try_from(buf.len())?))
+    Ok(hasher.finalize_digest(i64::try_from(buf.len())?))
 }
 
 /// Given a bytestream computes the digest for the data.
 /// Note: This will happen in a new spawn since computing digests can be thread intensive.
-pub async fn compute_digest<R: AsyncRead + Unpin + Send>(mut reader: R) -> Result<(DigestInfo, R), Error> {
+pub async fn compute_digest<R: AsyncRead + Unpin + Send>(
+    mut reader: R,
+    hasher: impl Into<&mut DigestHasher>,
+) -> Result<(DigestInfo, R), Error> {
     let mut chunk = BytesMut::with_capacity(DEFAULT_READ_BUFF_SIZE);
-    let mut hasher = Sha256::new();
+    let hasher = hasher.into();
     let mut digest_size = 0;
     loop {
         reader
@@ -104,10 +115,7 @@ pub async fn compute_digest<R: AsyncRead + Unpin + Send>(mut reader: R) -> Resul
         chunk.clear();
     }
 
-    Ok((
-        DigestInfo::new(hasher.finalize().into(), i64::try_from(digest_size)?),
-        reader,
-    ))
+    Ok((hasher.finalize_digest(i64::try_from(digest_size)?), reader))
 }
 
 fn inner_upload_file_to_store<'a, Fut: Future<Output = Result<(), Error>> + 'a>(
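Because each helper now takes `hasher: impl Into<&mut DigestHasher>`, a caller can either build a hasher inline from a DigestHasherFunc or reuse a `&mut DigestHasher` it already holds (the blanket `impl<T> From<T> for T` makes the plain mutable reference satisfy the bound). A hedged usage sketch, assuming the hypothetical module sketched above; the `PartialEq` comparison on DigestInfo is also an assumption:

// Hypothetical caller of the updated ac_utils helpers; not part of this commit.
use digest_hasher::{DigestHasher, DigestHasherFunc};
use error::Error;

async fn example() -> Result<(), Error> {
    // Build the hasher inline: algorithm selector -> DigestHasher -> &mut.
    let digest_a = compute_buf_digest(b"hello world", &mut DigestHasherFunc::Sha256.into()).await?;

    // Or hold the hasher explicitly and pass the mutable reference directly.
    let mut hasher: DigestHasher = DigestHasherFunc::Sha256.into();
    let digest_b = compute_buf_digest(b"hello world", &mut hasher).await?;

    assert_eq!(digest_a, digest_b); // assumes DigestInfo: PartialEq
    Ok(())
}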
4 changes: 3 additions & 1 deletion cas/worker/BUILD
@@ -38,6 +38,7 @@ rust_library(
         "//config",
         "//proto",
         "//util:common",
+        "//util:digest_hasher",
         "//util:error",
         "//util:metrics_utils",
         "@crate_index//:bytes",
@@ -51,8 +52,8 @@
         "@crate_index//:relative-path",
         "@crate_index//:scopeguard",
         "@crate_index//:serde",
-        "@crate_index//:tokio",
         "@crate_index//:tokio-stream",
+        "@crate_index//:tokio",
         "@crate_index//:tonic",
         "@crate_index//:uuid",
     ],
@@ -145,6 +146,7 @@ rust_test(
         "//config",
         "//proto",
         "//util:common",
+        "//util:digest_hasher",
         "//util:error",
         "@crate_index//:futures",
         "@crate_index//:once_cell",
31 changes: 22 additions & 9 deletions cas/worker/running_actions_manager.rs
@@ -57,6 +57,7 @@ use action_messages::{
 use async_trait::async_trait;
 use common::{fs, log, DigestInfo, JoinHandleDropGuard};
 use config::cas_server::{EnvironmentSource, UploadActionResultConfig, UploadCacheResultsStrategy};
+use digest_hasher::DigestHasherFunc;
 use error::{make_err, make_input_err, Code, Error, ResultExt};
 use fast_slow_store::FastSlowStore;
 use filesystem_store::{FileEntry, FilesystemStore};
@@ -239,7 +240,9 @@ async fn upload_file(
                 .as_reader()
                 .await
                 .err_tip(|| "Could not get reader from file slot in RunningActionsManager::upload_file()")?;
-            let digest = compute_digest(file_handle).await?.0;
+            let digest = compute_digest(file_handle, &mut DigestHasherFunc::Sha256.into())
+                .await?
+                .0;
             Ok::<_, Error>((digest, resumeable_file))
         }))
         .await
@@ -362,9 +365,10 @@ fn upload_directory<'a, P: AsRef<Path> + Debug + Send + Sync + Clone + 'a>(
             })?
             .to_string();
 
-        let digest = serialize_and_upload_message(&dir, cas_store)
-            .await
-            .err_tip(|| format!("for {full_path:?}"))?;
+        let digest =
+            serialize_and_upload_message(&dir, cas_store, &mut DigestHasherFunc::Sha256.into())
+                .await
+                .err_tip(|| format!("for {full_path:?}"))?;
 
         Result::<(DirectoryNode, VecDeque<Directory>), Error>::Ok((
             DirectoryNode {
@@ -967,9 +971,13 @@ impl RunningActionImpl {
                     root: Some(root_dir),
                     children: children.into(),
                 };
-                let tree_digest = serialize_and_upload_message(&tree, cas_store)
-                    .await
-                    .err_tip(|| format!("While processing {entry}"))?;
+                let tree_digest = serialize_and_upload_message(
+                    &tree,
+                    cas_store,
+                    &mut DigestHasherFunc::Sha256.into(),
+                )
+                .await
+                .err_tip(|| format!("While processing {entry}"))?;
                 Ok(DirectoryInfo {
                     path: entry,
                     tree_digest,
@@ -1028,15 +1036,19 @@
 
         let stdout_digest_fut = self.metrics().upload_stdout.wrap(async {
             let data = execution_result.stdout;
-            let digest = compute_buf_digest(&data).await.err_tip(|| "Computing stdout digest")?;
+            let digest = compute_buf_digest(&data, &mut DigestHasherFunc::Sha256.into())
+                .await
+                .err_tip(|| "Computing stdout digest")?;
             upload_buf_to_store(cas_store, digest, data)
                 .await
                 .err_tip(|| "Uploading stdout")?;
             Result::<DigestInfo, Error>::Ok(digest)
         });
         let stderr_digest_fut = self.metrics().upload_stderr.wrap(async {
             let data = execution_result.stderr;
-            let digest = compute_buf_digest(&data).await.err_tip(|| "Computing stderr digest")?;
+            let digest = compute_buf_digest(&data, &mut DigestHasherFunc::Sha256.into())
+                .await
+                .err_tip(|| "Computing stderr digest")?;
             upload_buf_to_store(cas_store, digest, data)
                 .await
                 .err_tip(|| "Uploading stderr")?;
@@ -1348,6 +1360,7 @@ impl UploadActionResults {
                     execute_response: Some(execute_response.clone()),
                 },
                 Pin::new(self.historical_store.as_ref()),
+                &mut DigestHasherFunc::Sha256.into(),
             )
             .await
             .err_tip(|| format!("Caching HistoricalExecuteResponse for digest: {action_digest:?}"))?;
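Note: every call site in this commit still hard-codes DigestHasherFunc::Sha256. Reading the commit message ("towards: #191"), the `impl Into<&mut DigestHasher>` indirection appears to be the seam through which alternative hashing algorithms will later be threaded; that forward-looking reading is an inference, not something shown in this diff.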