From 5763385605c6213bde62f634e59dbbe38eb8129a Mon Sep 17 00:00:00 2001 From: "Nathan (Blaise) Bruer" Date: Mon, 3 Jun 2024 22:09:58 -0500 Subject: [PATCH] Refactor Store API to use StoreKey This enables us to use arbitrary strings when using the Store API. closes #934 --- Cargo.lock | 2 + .../src/cache_lookup_scheduler.rs | 4 +- nativelink-service/src/ac_server.rs | 12 +- nativelink-service/src/bytestream_server.rs | 6 +- nativelink-service/src/cas_server.rs | 4 +- nativelink-service/src/execution_server.rs | 5 +- nativelink-service/tests/ac_server_test.rs | 4 +- .../tests/bytestream_server_test.rs | 3 +- nativelink-service/tests/cas_server_test.rs | 7 +- nativelink-store/Cargo.toml | 1 + nativelink-store/src/ac_utils.rs | 22 +- nativelink-store/src/cas_utils.rs | 8 +- .../src/completeness_checking_store.rs | 43 +-- nativelink-store/src/compression_store.rs | 19 +- nativelink-store/src/dedup_store.rs | 30 +- nativelink-store/src/existence_cache_store.rs | 31 +- nativelink-store/src/fast_slow_store.rs | 64 ++--- nativelink-store/src/filesystem_store.rs | 28 +- nativelink-store/src/grpc_store.rs | 33 ++- nativelink-store/src/memory_store.rs | 43 ++- nativelink-store/src/noop_store.rs | 11 +- nativelink-store/src/redis_store.rs | 38 ++- nativelink-store/src/ref_store.rs | 26 +- nativelink-store/src/s3_store.rs | 41 ++- nativelink-store/src/shard_store.rs | 154 +++++----- .../src/size_partitioning_store.rs | 55 +++- nativelink-store/src/verify_store.rs | 24 +- .../tests/completeness_checking_store_test.rs | 42 ++- nativelink-store/tests/dedup_store_test.rs | 6 +- .../tests/fast_slow_store_test.rs | 14 +- .../tests/filesystem_store_test.rs | 4 +- nativelink-store/tests/memory_store_test.rs | 4 +- nativelink-store/tests/ref_store_test.rs | 3 +- nativelink-store/tests/s3_store_test.rs | 4 +- nativelink-store/tests/shard_store_test.rs | 10 +- nativelink-util/Cargo.toml | 1 + nativelink-util/src/evicting_map.rs | 42 ++- nativelink-util/src/store_trait.rs | 265 
+++++++++++++----- .../src/running_actions_manager.rs | 18 +- .../tests/running_actions_manager_test.rs | 20 +- 40 files changed, 699 insertions(+), 452 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 406313ffca..16b2948eb8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1900,6 +1900,7 @@ dependencies = [ "nativelink-util", "once_cell", "parking_lot", + "pin-project-lite", "pretty_assertions", "prost", "rand", @@ -1928,6 +1929,7 @@ dependencies = [ "bytes", "console-subscriber", "futures", + "hashbrown 0.14.5", "hex", "hyper", "lru", diff --git a/nativelink-scheduler/src/cache_lookup_scheduler.rs b/nativelink-scheduler/src/cache_lookup_scheduler.rs index c5856f55c2..91f081cb08 100644 --- a/nativelink-scheduler/src/cache_lookup_scheduler.rs +++ b/nativelink-scheduler/src/cache_lookup_scheduler.rs @@ -64,7 +64,7 @@ async fn get_action_from_store( digest_function: DigestHasherFunc, ) -> Option { // If we are a GrpcStore we shortcut here, as this is a special store. - if let Some(grpc_store) = ac_store.downcast_ref::(Some(action_digest)) { + if let Some(grpc_store) = ac_store.downcast_ref::(Some(action_digest.into())) { let action_result_request = GetActionResultRequest { instance_name, action_digest: Some(action_digest.into()), @@ -79,7 +79,7 @@ async fn get_action_from_store( .map(|response| response.into_inner()) .ok() } else { - get_and_decode_digest::(ac_store, &action_digest) + get_and_decode_digest::(ac_store, action_digest.into()) .await .ok() } diff --git a/nativelink-service/src/ac_server.rs b/nativelink-service/src/ac_server.rs index 74897f5144..bb85eb7ee3 100644 --- a/nativelink-service/src/ac_server.rs +++ b/nativelink-service/src/ac_server.rs @@ -95,12 +95,15 @@ impl AcServer { .try_into()?; // If we are a GrpcStore we shortcut here, as this is a special store. 
- if let Some(grpc_store) = store_info.store.downcast_ref::(Some(digest)) { + if let Some(grpc_store) = store_info + .store + .downcast_ref::(Some(digest.into())) + { return grpc_store.get_action_result(Request::new(request)).await; } Ok(Response::new( - get_and_decode_digest::(&store_info.store, &digest).await?, + get_and_decode_digest::(&store_info.store, digest.into()).await?, )) } @@ -128,7 +131,10 @@ impl AcServer { .try_into()?; // If we are a GrpcStore we shortcut here, as this is a special store. - if let Some(grpc_store) = store_info.store.downcast_ref::(Some(digest)) { + if let Some(grpc_store) = store_info + .store + .downcast_ref::(Some(digest.into())) + { return grpc_store.update_action_result(Request::new(request)).await; } diff --git a/nativelink-service/src/bytestream_server.rs b/nativelink-service/src/bytestream_server.rs index 4de21704d1..043b0238b9 100644 --- a/nativelink-service/src/bytestream_server.rs +++ b/nativelink-service/src/bytestream_server.rs @@ -520,7 +520,7 @@ impl ByteStreamServer { let digest = DigestInfo::try_new(resource_info.hash.as_ref(), resource_info.expected_size)?; // If we are a GrpcStore we shortcut here, as this is a special store. - if let Some(grpc_store) = store_clone.downcast_ref::(Some(digest)) { + if let Some(grpc_store) = store_clone.downcast_ref::(Some(digest.into())) { return grpc_store .query_write_status(Request::new(query_request.clone())) .await; @@ -582,7 +582,7 @@ impl ByteStream for ByteStreamServer { let digest = DigestInfo::try_new(resource_info.hash.as_ref(), resource_info.expected_size)?; // If we are a GrpcStore we shortcut here, as this is a special store. 
- if let Some(grpc_store) = store.downcast_ref::(Some(digest)) { + if let Some(grpc_store) = store.downcast_ref::(Some(digest.into())) { let stream = grpc_store.read(Request::new(read_request)).await?; return Ok(Response::new(Box::pin(stream))); } @@ -638,7 +638,7 @@ impl ByteStream for ByteStreamServer { .err_tip(|| "Invalid digest input in ByteStream::write")?; // If we are a GrpcStore we shortcut here, as this is a special store. - if let Some(grpc_store) = store.downcast_ref::(Some(digest)) { + if let Some(grpc_store) = store.downcast_ref::(Some(digest.into())) { return grpc_store.write(stream).await.map_err(|e| e.into()); } diff --git a/nativelink-service/src/cas_server.rs b/nativelink-service/src/cas_server.rs index 26d2431bbb..d138cc39cf 100644 --- a/nativelink-service/src/cas_server.rs +++ b/nativelink-service/src/cas_server.rs @@ -76,7 +76,7 @@ impl CasServer { let mut requested_blobs = Vec::with_capacity(request.blob_digests.len()); for digest in request.blob_digests.iter() { - requested_blobs.push(DigestInfo::try_from(digest.clone())?); + requested_blobs.push(DigestInfo::try_from(digest.clone())?.into()); } let sizes = store .has_many(&requested_blobs) @@ -255,7 +255,7 @@ impl CasServer { while !deque.is_empty() { let digest: DigestInfo = deque.pop_front().err_tip(|| "In VecDeque::pop_front")?; - let directory = get_and_decode_digest::(&store, &digest) + let directory = get_and_decode_digest::(&store, digest.into()) .await .err_tip(|| "Converting digest to Directory")?; if digest == page_token_digest { diff --git a/nativelink-service/src/execution_server.rs b/nativelink-service/src/execution_server.rs index cc5f23481b..ebc9cbb19c 100644 --- a/nativelink-service/src/execution_server.rs +++ b/nativelink-service/src/execution_server.rs @@ -95,7 +95,7 @@ impl InstanceInfo { // Goma puts the properties in the Command. 
if platform_properties.is_empty() { let command = - get_and_decode_digest::(&self.cas_store, &command_digest).await?; + get_and_decode_digest::(&self.cas_store, command_digest.into()).await?; if let Some(platform) = &command.platform { for property in &platform.properties { let platform_property = self @@ -209,7 +209,8 @@ impl ExecutionServer { .execution_policy .map_or(DEFAULT_EXECUTION_PRIORITY, |p| p.priority); - let action = get_and_decode_digest::(&instance_info.cas_store, &digest).await?; + let action = + get_and_decode_digest::(&instance_info.cas_store, digest.into()).await?; let action_info = instance_info .build_action_info( instance_name, diff --git a/nativelink-service/tests/ac_server_test.rs b/nativelink-service/tests/ac_server_test.rs index 29c719e1c5..0c3961fdbd 100644 --- a/nativelink-service/tests/ac_server_test.rs +++ b/nativelink-service/tests/ac_server_test.rs @@ -129,7 +129,7 @@ mod get_action_result { assert_eq!(err.code(), Code::NotFound); assert_eq!( err.message(), - "Hash 0123456789abcdef000000000000000000000000000000000123456789abcdef not found" + "Key Digest(DigestInfo { size_bytes: 0, hash: \"0123456789abcdef000000000000000000000000000000000123456789abcdef\" }) not found" ); Ok(()) } @@ -174,7 +174,7 @@ mod get_action_result { assert_eq!(err.code(), Code::NotFound); assert_eq!( err.message(), - "Hash 0123456789abcdef000000000000000000000000000000000123456789abcdef not found" + "Key Digest(DigestInfo { size_bytes: 146, hash: \"0123456789abcdef000000000000000000000000000000000123456789abcdef\" }) not found" ); Ok(()) } diff --git a/nativelink-service/tests/bytestream_server_test.rs b/nativelink-service/tests/bytestream_server_test.rs index efae5cdee1..3fa69eed0e 100644 --- a/nativelink-service/tests/bytestream_server_test.rs +++ b/nativelink-service/tests/bytestream_server_test.rs @@ -954,8 +954,7 @@ pub mod read_tests { let result = result_fut.await.err_tip(|| "Expected result to be ready")?; let expected_err_str = concat!( - "status: 
NotFound, message: \"Hash 0123456789abcdef000000000000000000000000000000000123456789abcdef ", - "not found\", details: [], metadata: MetadataMap { headers: {} }", + "status: NotFound, message: \"Key Digest(DigestInfo { size_bytes: 55, hash: \\\"0123456789abcdef000000000000000000000000000000000123456789abcdef\\\" }) not found\", details: [], metadata: MetadataMap { headers: {} }", ); assert_eq!( Error::from(result.unwrap_err()), diff --git a/nativelink-service/tests/cas_server_test.rs b/nativelink-service/tests/cas_server_test.rs index 439ed90627..1a9b7f4cbb 100644 --- a/nativelink-service/tests/cas_server_test.rs +++ b/nativelink-service/tests/cas_server_test.rs @@ -26,7 +26,7 @@ use nativelink_store::ac_utils::serialize_and_upload_message; use nativelink_store::default_store_factory::store_factory; use nativelink_store::store_manager::StoreManager; use nativelink_util::common::DigestInfo; -use nativelink_util::store_trait::StoreLike; +use nativelink_util::store_trait::{StoreKey, StoreLike}; use prometheus_client::registry::Registry; use tonic::Request; @@ -309,7 +309,10 @@ mod batch_read_blobs { data: vec![].into(), status: Some(GrpcStatus { code: Code::NotFound as i32, - message: format!("Hash {} not found", digest3.hash), + message: format!( + "Key {:?} not found", + StoreKey::from(DigestInfo::try_from(digest3)?) 
+ ), details: vec![], }), compressor: compressor::Value::Identity.into(), diff --git a/nativelink-store/Cargo.toml b/nativelink-store/Cargo.toml index 7bd41effe0..e00c2d6f5e 100644 --- a/nativelink-store/Cargo.toml +++ b/nativelink-store/Cargo.toml @@ -16,6 +16,7 @@ aws-config = "1.4.0" aws-sdk-s3 = { version = "1.28.0" } aws-smithy-runtime = { version = "1.5.0" } bincode = "1.3.3" +pin-project-lite = "0.2.14" blake3 = "1.5.1" byteorder = "1.5.0" bytes = "1.6.0" diff --git a/nativelink-store/src/ac_utils.rs b/nativelink-store/src/ac_utils.rs index 2f8049a231..e4c468b905 100644 --- a/nativelink-store/src/ac_utils.rs +++ b/nativelink-store/src/ac_utils.rs @@ -24,7 +24,7 @@ use futures::TryFutureExt; use nativelink_error::{Code, Error, ResultExt}; use nativelink_util::common::DigestInfo; use nativelink_util::digest_hasher::DigestHasher; -use nativelink_util::store_trait::StoreLike; +use nativelink_util::store_trait::{StoreKey, StoreLike}; use prost::Message; // NOTE(blaise.bruer) From some local testing it looks like action cache items are rarely greater than @@ -37,22 +37,27 @@ pub const ESTIMATED_DIGEST_SIZE: usize = 2048; const MAX_ACTION_MSG_SIZE: usize = 10 << 20; // 10mb. /// Attempts to fetch the digest contents from a store into the associated proto. -pub async fn get_and_decode_digest<'a, T: Message + Default>( +pub async fn get_and_decode_digest( store: &impl StoreLike, - digest: &DigestInfo, + key: StoreKey<'_>, ) -> Result { - get_size_and_decode_digest(store, digest) + get_size_and_decode_digest(store, key) .map_ok(|(v, _)| v) .await } /// Attempts to fetch the digest contents from a store into the associated proto. 
-pub async fn get_size_and_decode_digest<'a, T: Message + Default>( +pub async fn get_size_and_decode_digest( store: &impl StoreLike, - digest: &DigestInfo, + key: impl Into>, ) -> Result<(T, usize), Error> { + // Note: For unknown reasons we appear to be hitting: + // https://github.com/rust-lang/rust/issues/92096 + // or a smiliar issue if we try to use the non-store driver function, so we + // are using the store driver function here. let mut store_data_resp = store - .get_part_unchunked(*digest, 0, Some(MAX_ACTION_MSG_SIZE)) + .as_store_driver_pin() + .get_part_unchunked(key.into(), 0, Some(MAX_ACTION_MSG_SIZE)) .await; if let Err(err) = &mut store_data_resp { if err.code == Code::NotFound { @@ -98,7 +103,8 @@ pub async fn serialize_and_upload_message<'a, T: Message>( let digest = message_to_digest(message, &mut buffer, hasher) .err_tip(|| "In serialize_and_upload_message")?; cas_store - .update_oneshot(digest, buffer.freeze()) + .as_store_driver_pin() + .update_oneshot(digest.into(), buffer.freeze()) .await .err_tip(|| "In serialize_and_upload_message")?; Ok(digest) diff --git a/nativelink-store/src/cas_utils.rs b/nativelink-store/src/cas_utils.rs index 6ddf673302..91e85f5ca7 100644 --- a/nativelink-store/src/cas_utils.rs +++ b/nativelink-store/src/cas_utils.rs @@ -13,6 +13,7 @@ // limitations under the License. use nativelink_util::common::DigestInfo; +use nativelink_util::store_trait::StoreKey; pub const ZERO_BYTE_DIGESTS: [DigestInfo; 2] = [ // Sha256 hash of zero bytes. 
@@ -36,6 +37,9 @@ pub const ZERO_BYTE_DIGESTS: [DigestInfo; 2] = [ ]; #[inline] -pub fn is_zero_digest(digest: &DigestInfo) -> bool { - digest.size_bytes == 0 && ZERO_BYTE_DIGESTS.contains(digest) +pub fn is_zero_digest<'a>(digest: impl Into>) -> bool { + match digest.into() { + StoreKey::Digest(digest) => digest.size_bytes == 0 && ZERO_BYTE_DIGESTS.contains(&digest), + _ => false, + } } diff --git a/nativelink-store/src/completeness_checking_store.rs b/nativelink-store/src/completeness_checking_store.rs index 12de40079e..277f8bd256 100644 --- a/nativelink-store/src/completeness_checking_store.rs +++ b/nativelink-store/src/completeness_checking_store.rs @@ -29,7 +29,7 @@ use nativelink_util::health_utils::{default_health_status_indicator, HealthStatu use nativelink_util::metrics_utils::{ Collector, CollectorState, CounterWithTime, MetricsComponent, Registry, }; -use nativelink_util::store_trait::{Store, StoreDriver, StoreLike, UploadSizeInfo}; +use nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo}; use parking_lot::Mutex; use tokio::sync::Notify; use tracing::{event, Level}; @@ -40,7 +40,7 @@ use crate::ac_utils::{get_and_decode_digest, get_size_and_decode_digest}; /// output directories that need to be checked. fn get_digests_and_output_dirs( action_result: ProtoActionResult, -) -> Result<(Vec, Vec), Error> { +) -> Result<(Vec>, Vec), Error> { // TODO(allada) When `try_collect()` is stable we can use it instead. 
let mut digest_iter = action_result .output_files @@ -51,7 +51,7 @@ fn get_digests_and_output_dirs( let mut digest_infos = Vec::with_capacity(digest_iter.size_hint().1.unwrap_or(0)); digest_iter .try_for_each(|maybe_digest| { - digest_infos.push(maybe_digest?); + digest_infos.push(maybe_digest?.into()); Result::<_, Error>::Ok(()) }) .err_tip(|| "Some digests could not be converted to DigestInfos")?; @@ -61,10 +61,10 @@ fn get_digests_and_output_dirs( /// Given a list of output directories recursively get all digests /// that need to be checked and pass them into `handle_digest_infos_fn` /// as they are found. -async fn check_output_directories( +async fn check_output_directories<'a>( cas_store: &Store, output_directories: Vec, - handle_digest_infos_fn: &impl Fn(Vec), + handle_digest_infos_fn: &impl Fn(Vec>), ) -> Result<(), Error> { let mut futures = FuturesUnordered::new(); @@ -75,7 +75,7 @@ async fn check_output_directories( let tree_digest = maybe_tree_digest .err_tip(|| "Could not decode tree digest CompletenessCheckingStore::has")?; futures.push(async move { - let tree = get_and_decode_digest::(cas_store, &tree_digest).await?; + let tree = get_and_decode_digest::(cas_store, tree_digest.into()).await?; // TODO(allada) When `try_collect()` is stable we can use it instead. // https://github.com/rust-lang/rust/issues/94047 let mut digest_iter = tree.children.into_iter().chain(tree.root).flat_map(|dir| { @@ -87,7 +87,7 @@ async fn check_output_directories( let mut digest_infos = Vec::with_capacity(digest_iter.size_hint().1.unwrap_or(0)); digest_iter .try_for_each(|maybe_digest| { - digest_infos.push(maybe_digest?); + digest_infos.push(maybe_digest?.into()); Result::<_, Error>::Ok(()) }) .err_tip(|| "Expected digest to exist and be convertable")?; @@ -129,14 +129,14 @@ impl CompletenessCheckingStore { /// are polled concurrently. 
async fn inner_has_with_results( &self, - action_result_digests: &[DigestInfo], + action_result_digests: &[StoreKey<'_>], results: &mut [Option], ) -> Result<(), Error> { // Holds shared state between the different futures. // This is how get around lifetime issues. struct State<'a> { results: &'a mut [Option], - digests_to_check: Vec, + digests_to_check: Vec>, digests_to_check_idxs: Vec, notify: Arc, done: bool, @@ -161,9 +161,11 @@ impl CompletenessCheckingStore { .map(|(i, digest)| { async move { // Note: We don't err_tip here because often have NotFound here which is ok. - let (action_result, size) = - get_size_and_decode_digest::(&self.ac_store, digest) - .await?; + let (action_result, size) = get_size_and_decode_digest::( + &self.ac_store, + digest.borrow(), + ) + .await?; let (mut digest_infos, output_directories) = get_digests_and_output_dirs(action_result)?; @@ -337,31 +339,30 @@ impl CompletenessCheckingStore { impl StoreDriver for CompletenessCheckingStore { async fn has_with_results( self: Pin<&Self>, - action_result_digests: &[DigestInfo], + keys: &[StoreKey<'_>], results: &mut [Option], ) -> Result<(), Error> { - self.inner_has_with_results(action_result_digests, results) - .await + self.inner_has_with_results(keys, results).await } async fn update( self: Pin<&Self>, - digest: DigestInfo, + key: StoreKey<'_>, reader: DropCloserReadHalf, size_info: UploadSizeInfo, ) -> Result<(), Error> { - self.ac_store.update(digest, reader, size_info).await + self.ac_store.update(key, reader, size_info).await } async fn get_part( self: Pin<&Self>, - digest: DigestInfo, + key: StoreKey<'_>, writer: &mut DropCloserWriteHalf, offset: usize, length: Option, ) -> Result<(), Error> { let results = &mut [None]; - self.inner_has_with_results(&[digest], results) + self.inner_has_with_results(&[key.borrow()], results) .await .err_tip(|| "when calling CompletenessCheckingStore::get_part")?; if results[0].is_none() { @@ -370,10 +371,10 @@ impl StoreDriver for 
CompletenessCheckingStore { "Digest found, but not all parts were found in CompletenessCheckingStore::get_part" )); } - self.ac_store.get_part(digest, writer, offset, length).await + self.ac_store.get_part(key, writer, offset, length).await } - fn inner_store(&self, _digest: Option) -> &dyn StoreDriver { + fn inner_store(&self, _digest: Option) -> &dyn StoreDriver { self } diff --git a/nativelink-store/src/compression_store.rs b/nativelink-store/src/compression_store.rs index fc204df4d0..a405e2feb3 100644 --- a/nativelink-store/src/compression_store.rs +++ b/nativelink-store/src/compression_store.rs @@ -27,11 +27,10 @@ use nativelink_error::{error_if, make_err, Code, Error, ResultExt}; use nativelink_util::buf_channel::{ make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf, }; -use nativelink_util::common::DigestInfo; use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use nativelink_util::metrics_utils::Registry; use nativelink_util::spawn; -use nativelink_util::store_trait::{Store, StoreDriver, StoreLike, UploadSizeInfo}; +use nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo}; use serde::{Deserialize, Serialize}; use crate::cas_utils::is_zero_digest; @@ -244,7 +243,7 @@ impl CompressionStore { impl StoreDriver for CompressionStore { async fn has_with_results( self: Pin<&Self>, - digests: &[DigestInfo], + digests: &[StoreKey<'_>], results: &mut [Option], ) -> Result<(), Error> { self.inner_store.has_with_results(digests, results).await @@ -252,7 +251,7 @@ impl StoreDriver for CompressionStore { async fn update( self: Pin<&Self>, - digest: DigestInfo, + key: StoreKey<'_>, mut reader: DropCloserReadHalf, upload_size: UploadSizeInfo, ) -> Result<(), Error> { @@ -261,10 +260,11 @@ impl StoreDriver for CompressionStore { let (mut tx, rx) = make_buf_channel_pair(); let inner_store = self.inner_store.clone(); + let key = key.into_owned(); let update_fut = 
spawn!("compression_store_update_spawn", async move { inner_store .update( - digest, + key, rx, UploadSizeInfo::MaxSize(output_state.max_output_size), ) @@ -388,12 +388,12 @@ impl StoreDriver for CompressionStore { async fn get_part( self: Pin<&Self>, - digest: DigestInfo, + key: StoreKey<'_>, writer: &mut DropCloserWriteHalf, offset: usize, length: Option, ) -> Result<(), Error> { - if is_zero_digest(&digest) { + if is_zero_digest(key.borrow()) { writer .send_eof() .err_tip(|| "Failed to send zero EOF in filesystem store get_part")?; @@ -404,9 +404,10 @@ impl StoreDriver for CompressionStore { let (tx, mut rx) = make_buf_channel_pair(); let inner_store = self.inner_store.clone(); + let key = key.into_owned(); let get_part_fut = spawn!("compression_store_get_part_spawn", async move { inner_store - .get_part(digest, tx, 0, None) + .get_part(key, tx, 0, None) .await .err_tip(|| "Inner store get in compression store failed") }) @@ -611,7 +612,7 @@ impl StoreDriver for CompressionStore { Ok(()) } - fn inner_store(&self, _digest: Option) -> &dyn StoreDriver { + fn inner_store(&self, _digest: Option>) -> &dyn StoreDriver { self } diff --git a/nativelink-store/src/dedup_store.rs b/nativelink-store/src/dedup_store.rs index 1e996792fc..3bf5544092 100644 --- a/nativelink-store/src/dedup_store.rs +++ b/nativelink-store/src/dedup_store.rs @@ -25,7 +25,7 @@ use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; use nativelink_util::fastcdc::FastCDC; use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; -use nativelink_util::store_trait::{Store, StoreDriver, StoreLike, UploadSizeInfo}; +use nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo}; use serde::{Deserialize, Serialize}; use tokio_util::codec::FramedRead; use tokio_util::io::StreamReader; @@ -86,13 +86,13 @@ impl DedupStore { } } - async fn has(self: Pin<&Self>, digest: DigestInfo) -> 
Result, Error> { + async fn has(self: Pin<&Self>, key: StoreKey<'_>) -> Result, Error> { // First we need to load the index that contains where the individual parts actually // can be fetched from. let index_entries = { let maybe_data = self .index_store - .get_part_unchunked(digest, 0, None) + .get_part_unchunked(key.borrow(), 0, None) .await .err_tip(|| "Failed to read index store in dedup store"); let data = match maybe_data { @@ -109,7 +109,7 @@ impl DedupStore { Err(err) => { event!( Level::WARN, - ?digest, + ?key, ?err, "Failed to deserialize index in dedup store", ); @@ -120,10 +120,12 @@ impl DedupStore { } }; - let digests: Vec = index_entries + let digests: Vec<_> = index_entries .entries .into_iter() - .map(|index_entry| DigestInfo::new(index_entry.packed_hash, index_entry.size_bytes)) + .map(|index_entry| { + DigestInfo::new(index_entry.packed_hash, index_entry.size_bytes).into() + }) .collect(); let mut sum = 0; for size in self.content_store.has_many(&digests).await? { @@ -142,14 +144,14 @@ impl DedupStore { impl StoreDriver for DedupStore { async fn has_with_results( self: Pin<&Self>, - digests: &[DigestInfo], + digests: &[StoreKey<'_>], results: &mut [Option], ) -> Result<(), Error> { digests .iter() .zip(results.iter_mut()) - .map(|(digest, result)| async move { - match self.has(*digest).await { + .map(|(key, result)| async move { + match self.has(key.borrow()).await { Ok(maybe_size) => { *result = maybe_size; Ok(()) @@ -165,7 +167,7 @@ impl StoreDriver for DedupStore { async fn update( self: Pin<&Self>, - digest: DigestInfo, + key: StoreKey<'_>, reader: DropCloserReadHalf, _size_info: UploadSizeInfo, ) -> Result<(), Error> { @@ -210,7 +212,7 @@ impl StoreDriver for DedupStore { })?; self.index_store - .update_oneshot(digest, serialized_index.into()) + .update_oneshot(key, serialized_index.into()) .await .err_tip(|| "Failed to insert our index entry to index_store in dedup_store")?; @@ -219,7 +221,7 @@ impl StoreDriver for DedupStore { async fn 
get_part( self: Pin<&Self>, - digest: DigestInfo, + key: StoreKey<'_>, writer: &mut DropCloserWriteHalf, offset: usize, length: Option, @@ -236,7 +238,7 @@ impl StoreDriver for DedupStore { let index_entries = { let data = self .index_store - .get_part_unchunked(digest, 0, None) + .get_part_unchunked(key, 0, None) .await .err_tip(|| "Failed to read index store in dedup store")?; @@ -333,7 +335,7 @@ impl StoreDriver for DedupStore { Ok(()) } - fn inner_store(&self, _digest: Option) -> &dyn StoreDriver { + fn inner_store(&self, _digest: Option>) -> &dyn StoreDriver { self } diff --git a/nativelink-store/src/existence_cache_store.rs b/nativelink-store/src/existence_cache_store.rs index 77b87bb66b..6855addc07 100644 --- a/nativelink-store/src/existence_cache_store.rs +++ b/nativelink-store/src/existence_cache_store.rs @@ -24,7 +24,7 @@ use nativelink_util::common::DigestInfo; use nativelink_util::evicting_map::{EvictingMap, LenEntry}; use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use nativelink_util::metrics_utils::{CollectorState, MetricsComponent, Registry}; -use nativelink_util::store_trait::{Store, StoreDriver, StoreLike, UploadSizeInfo}; +use nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo}; #[derive(Clone, Debug)] struct ExistanceItem(usize); @@ -40,7 +40,6 @@ impl LenEntry for ExistanceItem { false } } - pub struct ExistenceCacheStore { inner_store: Store, existence_cache: EvictingMap, @@ -71,10 +70,10 @@ impl ExistenceCacheStore { ) -> Result<(), Error> { self.existence_cache.sizes_for_keys(digests, results).await; - let not_cached_digests: Vec = digests + let not_cached_digests: Vec<_> = digests .iter() .zip(results.iter()) - .filter_map(|(digest, result)| result.map_or_else(|| Some(*digest), |_| None)) + .filter_map(|(digest, result)| result.map_or_else(|| Some(digest.into()), |_| None)) .collect(); // Hot path optimization when all digests are cached. 
@@ -96,7 +95,9 @@ impl ExistenceCacheStore { let inserts = not_cached_digests .iter() .zip(inner_results.iter()) - .filter_map(|(digest, result)| result.map(|size| (*digest, ExistanceItem(size)))) + .filter_map(|(digest, result)| { + result.map(|size| (digest.borrow().into_digest(), ExistanceItem(size))) + }) .collect::>(); let _ = self.existence_cache.insert_many(inserts).await; } @@ -128,18 +129,27 @@ impl ExistenceCacheStore { impl StoreDriver for ExistenceCacheStore { async fn has_with_results( self: Pin<&Self>, - digests: &[DigestInfo], + digests: &[StoreKey<'_>], results: &mut [Option], ) -> Result<(), Error> { - self.inner_has_with_results(digests, results).await + // TODO(allada) This is a bit of a hack to get around the lifetime issues with the + // existence_cache. We need to convert the digests to owned values to be able to + // insert them into the cache. In theory it should be able to elide this conversion + // but it seems to be a bit tricky to get right. + let digests: Vec<_> = digests + .iter() + .map(|key| key.borrow().into_digest()) + .collect(); + self.inner_has_with_results(&digests, results).await } async fn update( self: Pin<&Self>, - digest: DigestInfo, + key: StoreKey<'_>, mut reader: DropCloserReadHalf, size_info: UploadSizeInfo, ) -> Result<(), Error> { + let digest = key.into_digest(); let mut exists = [None]; self.inner_has_with_results(&[digest], &mut exists) .await @@ -167,11 +177,12 @@ impl StoreDriver for ExistenceCacheStore { async fn get_part( self: Pin<&Self>, - digest: DigestInfo, + key: StoreKey<'_>, writer: &mut DropCloserWriteHalf, offset: usize, length: Option, ) -> Result<(), Error> { + let digest = key.into_digest(); let result = self .inner_store .get_part(digest, writer, offset, length) @@ -187,7 +198,7 @@ impl StoreDriver for ExistenceCacheStore { result } - fn inner_store(&self, _digest: Option) -> &dyn StoreDriver { + fn inner_store(&self, _digest: Option>) -> &dyn StoreDriver { self } diff --git 
a/nativelink-store/src/fast_slow_store.rs b/nativelink-store/src/fast_slow_store.rs index a57bbd45e3..cc1bcaa4e5 100644 --- a/nativelink-store/src/fast_slow_store.rs +++ b/nativelink-store/src/fast_slow_store.rs @@ -25,12 +25,12 @@ use nativelink_error::{make_err, Code, Error, ResultExt}; use nativelink_util::buf_channel::{ make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf, }; -use nativelink_util::common::DigestInfo; use nativelink_util::fs; use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use nativelink_util::metrics_utils::{CollectorState, MetricsComponent, Registry}; use nativelink_util::store_trait::{ - slow_update_store_with_file, Store, StoreDriver, StoreLike, StoreOptimizations, UploadSizeInfo, + slow_update_store_with_file, Store, StoreDriver, StoreKey, StoreLike, StoreOptimizations, + UploadSizeInfo, }; // TODO(blaise.bruer) This store needs to be evaluated for more efficient memory usage, @@ -77,10 +77,10 @@ impl FastSlowStore { /// cost function. Since the data itself is shared and not copied it should be fairly /// low cost to just discard the data, but does cost a few mutex locks while /// streaming. 
- pub async fn populate_fast_store(&self, digest: DigestInfo) -> Result<(), Error> { + pub async fn populate_fast_store(&self, key: StoreKey<'_>) -> Result<(), Error> { let maybe_size_info = self .fast_store - .has(digest) + .has(key.borrow()) .await .err_tip(|| "While querying in populate_fast_store")?; if maybe_size_info.is_some() { @@ -94,7 +94,7 @@ impl FastSlowStore { while !rx.recv().await?.is_empty() {} Ok(()) }; - let (drain_res, get_res) = join!(drain_fut, StoreDriver::get(Pin::new(self), digest, tx)); + let (drain_res, get_res) = join!(drain_fut, StoreDriver::get(Pin::new(self), key, tx)); get_res.err_tip(|| "Failed to populate()").merge(drain_res) } @@ -126,37 +126,37 @@ impl FastSlowStore { impl StoreDriver for FastSlowStore { async fn has_with_results( self: Pin<&Self>, - digests: &[DigestInfo], + key: &[StoreKey<'_>], results: &mut [Option], ) -> Result<(), Error> { // If our slow store is a noop store, it'll always return a 404, // so only check the fast store in such case. - let slow_store = self.slow_store.inner_store(None); + let slow_store = self.slow_store.inner_store::>(None); if slow_store.optimized_for(StoreOptimizations::NoopDownloads) { - return self.fast_store.has_with_results(digests, results).await; + return self.fast_store.has_with_results(key, results).await; } // Only check the slow store because if it's not there, then something // down stream might be unable to get it. This should not affect // workers as they only use get() and a CAS can use an // ExistenceCacheStore to avoid the bottleneck. - self.slow_store.has_with_results(digests, results).await + self.slow_store.has_with_results(key, results).await } async fn update( self: Pin<&Self>, - digest: DigestInfo, + key: StoreKey<'_>, mut reader: DropCloserReadHalf, size_info: UploadSizeInfo, ) -> Result<(), Error> { // If either one of our stores is a noop store, bypass the multiplexing // and just use the store that is not a noop store. 
- let slow_store = self.slow_store.inner_store(Some(digest)); + let slow_store = self.slow_store.inner_store(Some(key.borrow())); if slow_store.optimized_for(StoreOptimizations::NoopUpdates) { - return self.fast_store.update(digest, reader, size_info).await; + return self.fast_store.update(key, reader, size_info).await; } - let fast_store = self.fast_store.inner_store(Some(digest)); + let fast_store = self.fast_store.inner_store(Some(key.borrow())); if fast_store.optimized_for(StoreOptimizations::NoopUpdates) { - return self.slow_store.update(digest, reader, size_info).await; + return self.slow_store.update(key, reader, size_info).await; } let (mut fast_tx, fast_rx) = make_buf_channel_pair(); @@ -199,8 +199,8 @@ impl StoreDriver for FastSlowStore { } }; - let fast_store_fut = self.fast_store.update(digest, fast_rx, size_info); - let slow_store_fut = self.slow_store.update(digest, slow_rx, size_info); + let fast_store_fut = self.fast_store.update(key.borrow(), fast_rx, size_info); + let slow_store_fut = self.slow_store.update(key.borrow(), slow_rx, size_info); let (data_stream_res, fast_res, slow_res) = join!(data_stream_fut, fast_store_fut, slow_store_fut); @@ -218,7 +218,7 @@ impl StoreDriver for FastSlowStore { /// dramatically increasing performance for large files. 
async fn update_with_whole_file( self: Pin<&Self>, - digest: DigestInfo, + key: StoreKey<'_>, mut file: fs::ResumeableFileSlot, upload_size: UploadSizeInfo, ) -> Result, Error> { @@ -232,7 +232,7 @@ impl StoreDriver for FastSlowStore { { slow_update_store_with_file( self.slow_store.as_store_driver_pin(), - digest, + key.borrow(), &mut file, upload_size, ) @@ -241,7 +241,7 @@ impl StoreDriver for FastSlowStore { } return self .fast_store - .update_with_whole_file(digest, file, upload_size) + .update_with_whole_file(key, file, upload_size) .await; } @@ -255,7 +255,7 @@ impl StoreDriver for FastSlowStore { { slow_update_store_with_file( self.fast_store.as_store_driver_pin(), - digest, + key.borrow(), &mut file, upload_size, ) @@ -264,11 +264,11 @@ impl StoreDriver for FastSlowStore { } return self .slow_store - .update_with_whole_file(digest, file, upload_size) + .update_with_whole_file(key, file, upload_size) .await; } - slow_update_store_with_file(self, digest, &mut file, upload_size) + slow_update_store_with_file(self, key, &mut file, upload_size) .await .err_tip(|| "In FastSlowStore::update_with_whole_file")?; Ok(Some(file)) @@ -276,19 +276,19 @@ impl StoreDriver for FastSlowStore { async fn get_part( self: Pin<&Self>, - digest: DigestInfo, + key: StoreKey<'_>, writer: &mut DropCloserWriteHalf, offset: usize, length: Option, ) -> Result<(), Error> { // TODO(blaise.bruer) Investigate if we should maybe ignore errors here instead of // forwarding the up. 
- if self.fast_store.has(digest).await?.is_some() { + if self.fast_store.has(key.borrow()).await?.is_some() { self.metrics .fast_store_hit_count .fetch_add(1, Ordering::Acquire); self.fast_store - .get_part(digest, writer.borrow_mut(), offset, length) + .get_part(key, writer.borrow_mut(), offset, length) .await?; self.metrics .fast_store_downloaded_bytes @@ -298,14 +298,14 @@ impl StoreDriver for FastSlowStore { let sz = self .slow_store - .has(digest) + .has(key.borrow()) .await .err_tip(|| "Failed to run has() on slow store")? .ok_or_else(|| { make_err!( Code::NotFound, "Object {} not found in either fast or slow store", - digest.hash_str() + key.as_str() ) })?; self.metrics @@ -351,10 +351,10 @@ impl StoreDriver for FastSlowStore { } }; - let slow_store_fut = self.slow_store.get(digest, slow_tx); - let fast_store_fut = self - .fast_store - .update(digest, fast_rx, UploadSizeInfo::ExactSize(sz)); + let slow_store_fut = self.slow_store.get(key.borrow(), slow_tx); + let fast_store_fut = + self.fast_store + .update(key.borrow(), fast_rx, UploadSizeInfo::ExactSize(sz)); let (data_stream_res, slow_res, fast_res) = join!(data_stream_fut, slow_store_fut, fast_store_fut); @@ -372,7 +372,7 @@ impl StoreDriver for FastSlowStore { } } - fn inner_store(&self, _digest: Option) -> &dyn StoreDriver { + fn inner_store(&self, _key: Option>) -> &dyn StoreDriver { self } diff --git a/nativelink-store/src/filesystem_store.rs b/nativelink-store/src/filesystem_store.rs index 17d62e9cf3..ca7991f14f 100644 --- a/nativelink-store/src/filesystem_store.rs +++ b/nativelink-store/src/filesystem_store.rs @@ -34,7 +34,7 @@ use nativelink_util::common::{fs, DigestInfo}; use nativelink_util::evicting_map::{EvictingMap, LenEntry}; use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator}; use nativelink_util::metrics_utils::{Collector, CollectorState, MetricsComponent, Registry}; -use nativelink_util::store_trait::{StoreDriver, StoreOptimizations, 
UploadSizeInfo}; +use nativelink_util::store_trait::{StoreDriver, StoreKey, StoreOptimizations, UploadSizeInfo}; use nativelink_util::{background_spawn, spawn_blocking}; use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom}; use tokio::time::{sleep, timeout, Sleep}; @@ -724,22 +724,23 @@ impl FilesystemStore { impl StoreDriver for FilesystemStore { async fn has_with_results( self: Pin<&Self>, - digests: &[DigestInfo], + keys: &[StoreKey<'_>], results: &mut [Option], ) -> Result<(), Error> { - self.evicting_map.sizes_for_keys(digests, results).await; + let keys: Vec<_> = keys.iter().map(|v| v.borrow().into_digest()).collect(); + self.evicting_map.sizes_for_keys(&keys, results).await; // We need to do a special pass to ensure our zero files exist. // If our results failed and the result was a zero file, we need to // create the file by spec. - for (digest, result) in digests.iter().zip(results.iter_mut()) { + for (digest, result) in keys.iter().zip(results.iter_mut()) { if result.is_some() || !is_zero_digest(digest) { continue; } let (mut tx, rx) = make_buf_channel_pair(); let send_eof_result = tx.send_eof(); - self.update(*digest, rx, UploadSizeInfo::ExactSize(0)) + self.update(digest.into(), rx, UploadSizeInfo::ExactSize(0)) .await - .err_tip(|| format!("Failed to create zero file for digest {digest:?}")) + .err_tip(|| format!("Failed to create zero file for key {digest:?}")) .merge( send_eof_result .err_tip(|| "Failed to send zero file EOF in filesystem store has"), @@ -752,10 +753,11 @@ impl StoreDriver for FilesystemStore { async fn update( self: Pin<&Self>, - digest: DigestInfo, + key: StoreKey<'_>, reader: DropCloserReadHalf, _upload_size: UploadSizeInfo, ) -> Result<(), Error> { + let digest = key.into_digest(); let mut temp_digest = digest; make_temp_digest(&mut temp_digest); @@ -780,10 +782,11 @@ impl StoreDriver for FilesystemStore { async fn update_with_whole_file( self: Pin<&Self>, - digest: DigestInfo, + key: StoreKey<'_>, mut file: 
fs::ResumeableFileSlot, upload_size: UploadSizeInfo, ) -> Result, Error> { + let digest = key.into_digest(); let path = file.get_path().as_os_str().to_os_string(); let file_size = match upload_size { UploadSizeInfo::ExactSize(size) => size as u64, @@ -820,13 +823,14 @@ impl StoreDriver for FilesystemStore { async fn get_part( self: Pin<&Self>, - digest: DigestInfo, + key: StoreKey<'_>, writer: &mut DropCloserWriteHalf, offset: usize, length: Option, ) -> Result<(), Error> { - if is_zero_digest(&digest) { - self.has(digest) + let digest = key.into_digest(); + if is_zero_digest(digest) { + self.has(digest.into()) .await .err_tip(|| "Failed to check if zero digest exists in filesystem store")?; writer @@ -893,7 +897,7 @@ impl StoreDriver for FilesystemStore { Ok(()) } - fn inner_store(&self, _digest: Option) -> &dyn StoreDriver { + fn inner_store(&self, _digest: Option>) -> &dyn StoreDriver { self } diff --git a/nativelink-store/src/grpc_store.rs b/nativelink-store/src/grpc_store.rs index 570996ab54..cf5076951a 100644 --- a/nativelink-store/src/grpc_store.rs +++ b/nativelink-store/src/grpc_store.rs @@ -45,7 +45,7 @@ use nativelink_util::proto_stream_utils::{ }; use nativelink_util::resource_info::ResourceInfo; use nativelink_util::retry::{Retrier, RetryResult}; -use nativelink_util::store_trait::{StoreDriver, UploadSizeInfo}; +use nativelink_util::store_trait::{StoreDriver, StoreKey, UploadSizeInfo}; use nativelink_util::{default_health_status_indicator, tls_utils}; use parking_lot::Mutex; use prost::Message; @@ -506,18 +506,18 @@ impl StoreDriver for GrpcStore { // is incorrect. 
async fn has_with_results( self: Pin<&Self>, - digests: &[DigestInfo], + keys: &[StoreKey<'_>], results: &mut [Option], ) -> Result<(), Error> { if matches!(self.store_type, nativelink_config::stores::StoreType::ac) { - digests - .iter() + keys.iter() .zip(results.iter_mut()) - .map(|(digest, result)| async move { + .map(|(key, result)| async move { // The length of an AC is incorrect, so we don't figure out the // length, instead the biggest possible result is returned in the // hope that we detect incorrect usage. - self.get_action_result_from_digest(*digest).await?; + self.get_action_result_from_digest(key.borrow().into_digest()) + .await?; *result = Some(usize::MAX); Ok::<_, Error>(()) }) @@ -531,7 +531,10 @@ impl StoreDriver for GrpcStore { let missing_blobs_response = self .find_missing_blobs(Request::new(FindMissingBlobsRequest { instance_name: self.instance_name.clone(), - blob_digests: digests.iter().map(|digest| digest.into()).collect(), + blob_digests: keys + .iter() + .map(|k| k.borrow().into_digest().into()) + .collect(), digest_function: ActiveOriginContext::get_value(&ACTIVE_HASHER_FUNC) .err_tip(|| "In GrpcStore::has_with_results")? 
.map_or_else(default_digest_hasher_func, |v| *v) @@ -552,8 +555,12 @@ impl StoreDriver for GrpcStore { missing_digests.push(DigestInfo::try_from(missing_digest)?); } missing_digests.sort_unstable(); - for (digest, result) in digests.iter().zip(results.iter_mut()) { - match missing_digests.binary_search(digest) { + for (digest, result) in keys + .iter() + .map(|v| v.borrow().into_digest()) + .zip(results.iter_mut()) + { + match missing_digests.binary_search(&digest) { Ok(_) => *result = None, Err(_) => *result = Some(usize::try_from(digest.size_bytes)?), } @@ -564,10 +571,11 @@ impl StoreDriver for GrpcStore { async fn update( self: Pin<&Self>, - digest: DigestInfo, + key: StoreKey<'_>, reader: DropCloserReadHalf, _size_info: UploadSizeInfo, ) -> Result<(), Error> { + let digest = key.into_digest(); if matches!(self.store_type, nativelink_config::stores::StoreType::ac) { return self.update_action_result_from_bytes(digest, reader).await; } @@ -642,11 +650,12 @@ impl StoreDriver for GrpcStore { async fn get_part( self: Pin<&Self>, - digest: DigestInfo, + key: StoreKey<'_>, writer: &mut DropCloserWriteHalf, offset: usize, length: Option, ) -> Result<(), Error> { + let digest = key.into_digest(); if matches!(self.store_type, nativelink_config::stores::StoreType::ac) { return self .get_action_result_as_part(digest, writer, offset, length) @@ -735,7 +744,7 @@ impl StoreDriver for GrpcStore { .await } - fn inner_store(&self, _digest: Option) -> &dyn StoreDriver { + fn inner_store(&self, _digest: Option>) -> &dyn StoreDriver { self } diff --git a/nativelink-store/src/memory_store.rs b/nativelink-store/src/memory_store.rs index 5af4845cf9..be00894388 100644 --- a/nativelink-store/src/memory_store.rs +++ b/nativelink-store/src/memory_store.rs @@ -21,11 +21,10 @@ use async_trait::async_trait; use bytes::{Bytes, BytesMut}; use nativelink_error::{Code, Error, ResultExt}; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; -use 
nativelink_util::common::DigestInfo; use nativelink_util::evicting_map::{EvictingMap, LenEntry}; use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use nativelink_util::metrics_utils::{Collector, CollectorState, MetricsComponent, Registry}; -use nativelink_util::store_trait::{StoreDriver, UploadSizeInfo}; +use nativelink_util::store_trait::{StoreDriver, StoreKey, UploadSizeInfo}; use crate::cas_utils::is_zero_digest; @@ -51,7 +50,7 @@ impl LenEntry for BytesWrapper { } pub struct MemoryStore { - evicting_map: EvictingMap, + evicting_map: EvictingMap, BytesWrapper, SystemTime>, } impl MemoryStore { @@ -69,8 +68,8 @@ impl MemoryStore { self.evicting_map.len_for_test().await } - pub async fn remove_entry(&self, digest: &DigestInfo) -> bool { - self.evicting_map.remove(digest).await + pub async fn remove_entry(&self, key: StoreKey<'_>) -> bool { + self.evicting_map.remove(&key.into_owned()).await } } @@ -78,16 +77,19 @@ impl MemoryStore { impl StoreDriver for MemoryStore { async fn has_with_results( self: Pin<&Self>, - digests: &[DigestInfo], + keys: &[StoreKey<'_>], results: &mut [Option], ) -> Result<(), Error> { - self.evicting_map.sizes_for_keys(digests, results).await; + // TODO(allada): This is a dirty hack to get around the lifetime issues with the + // evicting map. + let digests: Vec<_> = keys.iter().map(|key| key.borrow().into_owned()).collect(); + let evicting_map = Pin::new(&self.evicting_map); + evicting_map.sizes_for_keys(digests, results).await; // We need to do a special pass to ensure our zero digest exist. 
- digests - .iter() + keys.iter() .zip(results.iter_mut()) - .for_each(|(digest, result)| { - if is_zero_digest(digest) { + .for_each(|(key, result)| { + if is_zero_digest(key.borrow()) { *result = Some(0); } }); @@ -96,7 +98,7 @@ impl StoreDriver for MemoryStore { async fn update( self: Pin<&Self>, - digest: DigestInfo, + key: StoreKey<'_>, mut reader: DropCloserReadHalf, _size_info: UploadSizeInfo, ) -> Result<(), Error> { @@ -113,19 +115,19 @@ impl StoreDriver for MemoryStore { }; self.evicting_map - .insert(digest, BytesWrapper(final_buffer)) + .insert(key.into_owned(), BytesWrapper(final_buffer)) .await; Ok(()) } async fn get_part( self: Pin<&Self>, - digest: DigestInfo, + key: StoreKey<'_>, writer: &mut DropCloserWriteHalf, offset: usize, length: Option, ) -> Result<(), Error> { - if is_zero_digest(&digest) { + if is_zero_digest(key.borrow()) { writer .send_eof() .err_tip(|| "Failed to send zero EOF in filesystem store get_part")?; @@ -134,14 +136,9 @@ impl StoreDriver for MemoryStore { let value = self .evicting_map - .get(&digest) + .get(&key.borrow().into_owned()) .await - .err_tip_with_code(|_| { - ( - Code::NotFound, - format!("Hash {} not found", digest.hash_str()), - ) - })?; + .err_tip_with_code(|_| (Code::NotFound, format!("Key {key:?} not found")))?; let default_len = value.len() - offset; let length = length.unwrap_or(default_len).min(default_len); if length > 0 { @@ -156,7 +153,7 @@ impl StoreDriver for MemoryStore { Ok(()) } - fn inner_store(&self, _digest: Option) -> &dyn StoreDriver { + fn inner_store(&self, _digest: Option>) -> &dyn StoreDriver { self } diff --git a/nativelink-store/src/noop_store.rs b/nativelink-store/src/noop_store.rs index 6902116a4f..cac5bed019 100644 --- a/nativelink-store/src/noop_store.rs +++ b/nativelink-store/src/noop_store.rs @@ -18,9 +18,8 @@ use std::sync::Arc; use async_trait::async_trait; use nativelink_error::{make_err, Code, Error, ResultExt}; use nativelink_util::buf_channel::{DropCloserReadHalf, 
DropCloserWriteHalf}; -use nativelink_util::common::DigestInfo; use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; -use nativelink_util::store_trait::{StoreDriver, StoreOptimizations, UploadSizeInfo}; +use nativelink_util::store_trait::{StoreDriver, StoreKey, StoreOptimizations, UploadSizeInfo}; #[derive(Default)] pub struct NoopStore; @@ -35,7 +34,7 @@ impl NoopStore { impl StoreDriver for NoopStore { async fn has_with_results( self: Pin<&Self>, - _digests: &[DigestInfo], + _keys: &[StoreKey<'_>], results: &mut [Option], ) -> Result<(), Error> { results.iter_mut().for_each(|r| *r = None); @@ -44,7 +43,7 @@ impl StoreDriver for NoopStore { async fn update( self: Pin<&Self>, - _digest: DigestInfo, + _key: StoreKey<'_>, mut reader: DropCloserReadHalf, _size_info: UploadSizeInfo, ) -> Result<(), Error> { @@ -61,7 +60,7 @@ impl StoreDriver for NoopStore { async fn get_part( self: Pin<&Self>, - _digest: DigestInfo, + _key: StoreKey<'_>, _writer: &mut DropCloserWriteHalf, _offset: usize, _length: Option, @@ -69,7 +68,7 @@ impl StoreDriver for NoopStore { Err(make_err!(Code::NotFound, "Not found in noop store")) } - fn inner_store(&self, _digest: Option) -> &dyn StoreDriver { + fn inner_store(&self, _key: Option) -> &dyn StoreDriver { self } diff --git a/nativelink-store/src/redis_store.rs b/nativelink-store/src/redis_store.rs index 382f42ed80..9814f91784 100644 --- a/nativelink-store/src/redis_store.rs +++ b/nativelink-store/src/redis_store.rs @@ -24,10 +24,9 @@ use futures::future::{BoxFuture, FutureExt, Shared}; use nativelink_error::{error_if, make_err, Code, Error, ResultExt}; use nativelink_util::background_spawn; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; -use nativelink_util::common::DigestInfo; use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator}; use nativelink_util::metrics_utils::{Collector, CollectorState, MetricsComponent, Registry}; -use 
nativelink_util::store_trait::{StoreDriver, UploadSizeInfo}; +use nativelink_util::store_trait::{StoreDriver, StoreKey, UploadSizeInfo}; use redis::aio::{ConnectionLike, ConnectionManager}; use redis::AsyncCommands; @@ -35,10 +34,6 @@ use crate::cas_utils::is_zero_digest; const READ_CHUNK_SIZE: isize = 64 * 1024; -fn digest_to_key(digest: &DigestInfo) -> String { - format!("{}-{}", digest.hash_str(), digest.size_bytes) -} - /// Holds a connection result or a future that resolves to a connection. /// This is a utility to allow us to start a connection but not block on it. pub enum LazyConnection { @@ -115,10 +110,10 @@ impl RedisStore { impl StoreDriver for RedisStore { async fn has_with_results( self: Pin<&Self>, - digests: &[DigestInfo], + keys: &[StoreKey<'_>], results: &mut [Option], ) -> Result<(), Error> { - if digests.len() == 1 && is_zero_digest(&digests[0]) { + if keys.len() == 1 && is_zero_digest(keys[0].borrow()) { results[0] = Some(0); return Ok(()); } @@ -128,12 +123,12 @@ impl StoreDriver for pipe.atomic(); let mut zero_digest_indexes = Vec::new(); - digests.iter().enumerate().for_each(|(index, digest)| { - if is_zero_digest(digest) { + keys.iter().enumerate().for_each(|(index, key)| { + if is_zero_digest(key.borrow()) { zero_digest_indexes.push(index); } - pipe.strlen(digest_to_key(digest)); + pipe.strlen(key.as_str().as_ref()); }); let digest_sizes = pipe @@ -163,7 +158,7 @@ impl StoreDriver for async fn update( self: Pin<&Self>, - digest: DigestInfo, + key: StoreKey<'_>, mut reader: DropCloserReadHalf, _upload_size: UploadSizeInfo, ) -> Result<(), Error> { @@ -183,11 +178,11 @@ impl StoreDriver for .err_tip(|| "Failed to reach chunk in update in redis store")?; if chunk.is_empty() { - if is_zero_digest(&digest) { + if is_zero_digest(key.borrow()) { return Ok(()); } if force_recv { - conn.append(digest_to_key(&digest), &chunk[..]) + conn.append(key.as_str().as_ref(), &chunk[..]) .await .map_err(from_redis_err) .err_tip(|| "In RedisStore::update() 
single chunk")?; @@ -214,7 +209,7 @@ impl StoreDriver for pipe.cmd("RENAME") .arg(temp_key.get_or_init(make_temp_name)) - .arg(digest_to_key(&digest)); + .arg(key.as_str().as_ref()); pipe.query_async(&mut conn) .await .map_err(from_redis_err) @@ -225,14 +220,14 @@ impl StoreDriver for async fn get_part( self: Pin<&Self>, - digest: DigestInfo, + key: StoreKey<'_>, writer: &mut DropCloserWriteHalf, offset: usize, length: Option, ) -> Result<(), Error> { // To follow RBE spec we need to consider any digest's with // zero size to be existing. - if is_zero_digest(&digest) { + if is_zero_digest(key.borrow()) { writer .send_eof() .err_tip(|| "Failed to send zero EOF in redis store get_part")?; @@ -242,15 +237,14 @@ impl StoreDriver for let mut conn = self.get_conn().await?; if length == Some(0) { let exists = conn - .exists::<_, bool>(digest_to_key(&digest)) + .exists::<_, bool>(key.as_str().as_ref()) .await .map_err(from_redis_err) .err_tip(|| "In RedisStore::get_part::zero_exists")?; if !exists { return Err(make_err!( Code::NotFound, - "Data not found in Redis store for digest: {}", - digest_to_key(&digest) + "Data not found in Redis store for digest: {key:?}" )); } writer @@ -270,7 +264,7 @@ impl StoreDriver for let current_end = std::cmp::min(current_start.saturating_add(READ_CHUNK_SIZE), end_position) - 1; let chunk = conn - .getrange::<_, Bytes>(digest_to_key(&digest), current_start, current_end) + .getrange::<_, Bytes>(key.as_str().as_ref(), current_start, current_end) .await .map_err(from_redis_err) .err_tip(|| "In RedisStore::get_part::getrange")?; @@ -308,7 +302,7 @@ impl StoreDriver for Ok(()) } - fn inner_store(&self, _digest: Option) -> &dyn StoreDriver { + fn inner_store(&self, _digest: Option) -> &dyn StoreDriver { self } diff --git a/nativelink-store/src/ref_store.rs b/nativelink-store/src/ref_store.rs index d8bcb81253..577900856c 100644 --- a/nativelink-store/src/ref_store.rs +++ b/nativelink-store/src/ref_store.rs @@ -19,9 +19,8 @@ use std::sync::{Arc, 
Mutex, Weak}; use async_trait::async_trait; use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt}; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; -use nativelink_util::common::DigestInfo; use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; -use nativelink_util::store_trait::{Store, StoreDriver, StoreLike, UploadSizeInfo}; +use nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo}; use tracing::{event, Level}; use crate::store_manager::StoreManager; @@ -102,43 +101,38 @@ impl RefStore { impl StoreDriver for RefStore { async fn has_with_results( self: Pin<&Self>, - digests: &[DigestInfo], + keys: &[StoreKey<'_>], results: &mut [Option], ) -> Result<(), Error> { - self.get_store()?.has_with_results(digests, results).await + self.get_store()?.has_with_results(keys, results).await } async fn update( self: Pin<&Self>, - digest: DigestInfo, + key: StoreKey<'_>, reader: DropCloserReadHalf, size_info: UploadSizeInfo, ) -> Result<(), Error> { - self.get_store()?.update(digest, reader, size_info).await + self.get_store()?.update(key, reader, size_info).await } async fn get_part( self: Pin<&Self>, - digest: DigestInfo, + key: StoreKey<'_>, writer: &mut DropCloserWriteHalf, offset: usize, length: Option, ) -> Result<(), Error> { self.get_store()? 
- .get_part(digest, writer, offset, length) + .get_part(key, writer, offset, length) .await } - fn inner_store(&self, digest: Option) -> &'_ dyn StoreDriver { + fn inner_store(&self, key: Option) -> &'_ dyn StoreDriver { match self.get_store() { - Ok(store) => store.inner_store(digest), + Ok(store) => store.inner_store(key), Err(err) => { - event!( - Level::ERROR, - ?digest, - ?err, - "Failed to get store for digest", - ); + event!(Level::ERROR, ?key, ?err, "Failed to get store for key",); self } } diff --git a/nativelink-store/src/s3_store.rs b/nativelink-store/src/s3_store.rs index 766abe6e13..6a72ac53ae 100644 --- a/nativelink-store/src/s3_store.rs +++ b/nativelink-store/src/s3_store.rs @@ -44,11 +44,10 @@ use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt}; use nativelink_util::buf_channel::{ make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf, }; -use nativelink_util::common::DigestInfo; use nativelink_util::fs; use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use nativelink_util::retry::{Retrier, RetryResult}; -use nativelink_util::store_trait::{StoreDriver, UploadSizeInfo}; +use nativelink_util::store_trait::{StoreDriver, StoreKey, UploadSizeInfo}; use rand::rngs::OsRng; use rand::Rng; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; @@ -301,23 +300,18 @@ impl S3Store { }) } - fn make_s3_path(&self, digest: &DigestInfo) -> String { - format!( - "{}{}-{}", - self.key_prefix, - digest.hash_str(), - digest.size_bytes - ) + fn make_s3_path(&self, key: StoreKey<'_>) -> String { + format!("{}{}", self.key_prefix, key.as_str(),) } - async fn has(self: Pin<&Self>, digest: &DigestInfo) -> Result, Error> { + async fn has(self: Pin<&Self>, digest: &StoreKey<'_>) -> Result, Error> { self.retrier .retry(unfold((), move |state| async move { let result = self .s3_client .head_object() .bucket(&self.bucket) - .key(&self.make_s3_path(digest)) + .key(&self.make_s3_path(digest.borrow())) .send() 
.await; @@ -357,19 +351,18 @@ impl S3Store { impl StoreDriver for S3Store { async fn has_with_results( self: Pin<&Self>, - digests: &[DigestInfo], + keys: &[StoreKey<'_>], results: &mut [Option], ) -> Result<(), Error> { - digests - .iter() + keys.iter() .zip(results.iter_mut()) - .map(|(digest, result)| async move { - // We need to do a special pass to ensure our zero digest exist. - if is_zero_digest(digest) { + .map(|(key, result)| async move { + // We need to do a special pass to ensure our zero key exist. + if is_zero_digest(key.borrow()) { *result = Some(0); return Ok::<_, Error>(()); } - *result = self.has(digest).await?; + *result = self.has(key).await?; Ok::<_, Error>(()) }) .collect::>() @@ -380,11 +373,11 @@ impl StoreDriver for S3Store { async fn update( self: Pin<&Self>, - digest: DigestInfo, + digest: StoreKey<'_>, mut reader: DropCloserReadHalf, upload_size: UploadSizeInfo, ) -> Result<(), Error> { - let s3_path = &self.make_s3_path(&digest); + let s3_path = &self.make_s3_path(digest.borrow()); let max_size = match upload_size { UploadSizeInfo::ExactSize(sz) | UploadSizeInfo::MaxSize(sz) => sz, @@ -641,19 +634,19 @@ impl StoreDriver for S3Store { async fn get_part( self: Pin<&Self>, - digest: DigestInfo, + key: StoreKey<'_>, writer: &mut DropCloserWriteHalf, offset: usize, length: Option, ) -> Result<(), Error> { - if is_zero_digest(&digest) { + if is_zero_digest(key.borrow()) { writer .send_eof() .err_tip(|| "Failed to send zero EOF in filesystem store get_part")?; return Ok(()); } - let s3_path = &self.make_s3_path(&digest); + let s3_path = &self.make_s3_path(key); let end_read_byte = length .map_or(Some(None), |length| Some(offset.checked_add(length))) .err_tip(|| "Integer overflow protection triggered")?; @@ -738,7 +731,7 @@ impl StoreDriver for S3Store { .await } - fn inner_store(&self, _digest: Option) -> &'_ dyn StoreDriver { + fn inner_store(&self, _digest: Option>) -> &'_ dyn StoreDriver { self } diff --git 
a/nativelink-store/src/shard_store.rs b/nativelink-store/src/shard_store.rs index bc03e5a938..2be3fef858 100644 --- a/nativelink-store/src/shard_store.rs +++ b/nativelink-store/src/shard_store.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::hash::{DefaultHasher, Hasher}; use std::ops::BitXor; use std::pin::Pin; use std::sync::Arc; @@ -20,14 +21,13 @@ use async_trait::async_trait; use futures::stream::{FuturesUnordered, TryStreamExt}; use nativelink_error::{error_if, Error, ResultExt}; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; -use nativelink_util::common::DigestInfo; use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use nativelink_util::metrics_utils::Registry; -use nativelink_util::store_trait::{Store, StoreDriver, StoreLike, UploadSizeInfo}; +use nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo}; pub struct ShardStore { // The weights will always be in ascending order a specific store is choosen based on the - // the hash of the digest hash that is nearest-binary searched using the u32 as the index. + // the hash of the key hash that is nearest-binary searched using the u32 as the index. weights_and_stores: Vec<(u32, Store)>, } @@ -67,48 +67,57 @@ impl ShardStore { }) } - fn get_store_index(&self, digest: &DigestInfo) -> usize { - // Quote from std primitive array documentation: - // Array’s try_from(slice) implementations (and the corresponding slice.try_into() - // array implementations) succeed if the input slice length is the same as the result - // array length. They optimize especially well when the optimizer can easily determine - // the slice length, e.g. <[u8; 4]>::try_from(&slice[4..8]).unwrap(). Array implements - // TryFrom returning. 
- let size_bytes = digest.size_bytes.to_le_bytes(); - let key: u32 = 0 - .bitxor(u32::from_le_bytes( - digest.packed_hash[0..4].try_into().unwrap(), - )) - .bitxor(u32::from_le_bytes( - digest.packed_hash[4..8].try_into().unwrap(), - )) - .bitxor(u32::from_le_bytes( - digest.packed_hash[8..12].try_into().unwrap(), - )) - .bitxor(u32::from_le_bytes( - digest.packed_hash[12..16].try_into().unwrap(), - )) - .bitxor(u32::from_le_bytes( - digest.packed_hash[16..20].try_into().unwrap(), - )) - .bitxor(u32::from_le_bytes( - digest.packed_hash[20..24].try_into().unwrap(), - )) - .bitxor(u32::from_le_bytes( - digest.packed_hash[24..28].try_into().unwrap(), - )) - .bitxor(u32::from_le_bytes( - digest.packed_hash[28..32].try_into().unwrap(), - )) - .bitxor(u32::from_le_bytes(size_bytes[0..4].try_into().unwrap())) - .bitxor(u32::from_le_bytes(size_bytes[4..8].try_into().unwrap())); + fn get_store_index(&self, store_key: &StoreKey) -> usize { + let key = match store_key { + StoreKey::Digest(digest) => { + // Quote from std primitive array documentation: + // Array’s try_from(slice) implementations (and the corresponding slice.try_into() + // array implementations) succeed if the input slice length is the same as the result + // array length. They optimize especially well when the optimizer can easily determine + // the slice length, e.g. <[u8; 4]>::try_from(&slice[4..8]).unwrap(). Array implements + // TryFrom returning. 
+ let size_bytes = digest.size_bytes.to_le_bytes(); + 0.bitxor(u32::from_le_bytes( + digest.packed_hash[0..4].try_into().unwrap(), + )) + .bitxor(u32::from_le_bytes( + digest.packed_hash[4..8].try_into().unwrap(), + )) + .bitxor(u32::from_le_bytes( + digest.packed_hash[8..12].try_into().unwrap(), + )) + .bitxor(u32::from_le_bytes( + digest.packed_hash[12..16].try_into().unwrap(), + )) + .bitxor(u32::from_le_bytes( + digest.packed_hash[16..20].try_into().unwrap(), + )) + .bitxor(u32::from_le_bytes( + digest.packed_hash[20..24].try_into().unwrap(), + )) + .bitxor(u32::from_le_bytes( + digest.packed_hash[24..28].try_into().unwrap(), + )) + .bitxor(u32::from_le_bytes( + digest.packed_hash[28..32].try_into().unwrap(), + )) + .bitxor(u32::from_le_bytes(size_bytes[0..4].try_into().unwrap())) + .bitxor(u32::from_le_bytes(size_bytes[4..8].try_into().unwrap())) + } + StoreKey::Str(s) => { + let mut hasher = DefaultHasher::new(); + hasher.write(s.as_bytes()); + let key_u64 = hasher.finish(); + (key_u64 >> 32) as u32 // We only need the top 32 bits. + } + }; self.weights_and_stores .binary_search_by_key(&key, |(weight, _)| *weight) .unwrap_or_else(|index| index) } - fn get_store(&self, digest: &DigestInfo) -> &Store { - let index = self.get_store_index(digest); + fn get_store(&self, key: &StoreKey) -> &Store { + let index = self.get_store_index(key); &self.weights_and_stores[index].1 } } @@ -117,54 +126,53 @@ impl ShardStore { impl StoreDriver for ShardStore { async fn has_with_results( self: Pin<&Self>, - digests: &[DigestInfo], + keys: &[StoreKey<'_>], results: &mut [Option], ) -> Result<(), Error> { - if digests.len() == 1 { - // Hot path: It is very common to lookup only one digest. - let store_idx = self.get_store_index(&digests[0]); + if keys.len() == 1 { + // Hot path: It is very common to lookup only one key. 
+ let store_idx = self.get_store_index(&keys[0]); let store = &self.weights_and_stores[store_idx].1; return store - .has_with_results(digests, results) + .has_with_results(keys, results) .await .err_tip(|| "In ShardStore::has_with_results() for store {store_idx}}"); } - type DigestIdxVec = Vec; - type DigestVec = Vec; - let mut digests_for_store: Vec<(DigestIdxVec, DigestVec)> = self + type KeyIdxVec = Vec; + type KeyVec<'a> = Vec>; + let mut keys_for_store: Vec<(KeyIdxVec, KeyVec)> = self .weights_and_stores .iter() .map(|_| (Vec::new(), Vec::new())) .collect(); - // Bucket each digest into the store that it belongs to. - digests - .iter() + // Bucket each key into the store that it belongs to. + keys.iter() .enumerate() - .map(|(digest_idx, digest)| (digest, digest_idx, self.get_store_index(digest))) - .for_each(|(digest, digest_idx, store_idx)| { - digests_for_store[store_idx].0.push(digest_idx); - digests_for_store[store_idx].1.push(*digest); + .map(|(key_idx, key)| (key, key_idx, self.get_store_index(key))) + .for_each(|(key, key_idx, store_idx)| { + keys_for_store[store_idx].0.push(key_idx); + keys_for_store[store_idx].1.push(key.borrow()); }); // Build all our futures for each store. 
- let mut future_stream: FuturesUnordered<_> = digests_for_store + let mut future_stream: FuturesUnordered<_> = keys_for_store .into_iter() .enumerate() - .map(|(store_idx, (digest_idxs, digests))| async move { + .map(|(store_idx, (key_idxs, keys))| async move { let store = &self.weights_and_stores[store_idx].1; - let mut inner_results = vec![None; digests.len()]; + let mut inner_results = vec![None; keys.len()]; store - .has_with_results(&digests, &mut inner_results) + .has_with_results(&keys, &mut inner_results) .await .err_tip(|| "In ShardStore::has_with_results() for store {store_idx}")?; - Result::<_, Error>::Ok((digest_idxs, inner_results)) + Result::<_, Error>::Ok((key_idxs, inner_results)) }) .collect(); // Wait for all the stores to finish and populate our output results. - while let Some((digest_idxs, inner_results)) = future_stream.try_next().await? { - for (digest_idx, inner_result) in digest_idxs.into_iter().zip(inner_results) { - results[digest_idx] = inner_result; + while let Some((key_idxs, inner_results)) = future_stream.try_next().await? 
{ + for (key_idx, inner_result) in key_idxs.into_iter().zip(inner_results) { + results[key_idx] = inner_result; } } Ok(()) @@ -172,37 +180,37 @@ impl StoreDriver for ShardStore { async fn update( self: Pin<&Self>, - digest: DigestInfo, + key: StoreKey<'_>, reader: DropCloserReadHalf, size_info: UploadSizeInfo, ) -> Result<(), Error> { - let store = self.get_store(&digest); + let store = self.get_store(&key); store - .update(digest, reader, size_info) + .update(key, reader, size_info) .await .err_tip(|| "In ShardStore::update()") } async fn get_part( self: Pin<&Self>, - digest: DigestInfo, + key: StoreKey<'_>, writer: &mut DropCloserWriteHalf, offset: usize, length: Option, ) -> Result<(), Error> { - let store = self.get_store(&digest); + let store = self.get_store(&key); store - .get_part(digest, writer, offset, length) + .get_part(key, writer, offset, length) .await .err_tip(|| "In ShardStore::get_part()") } - fn inner_store(&self, digest: Option) -> &'_ dyn StoreDriver { - let Some(digest) = digest else { + fn inner_store(&self, key: Option>) -> &'_ dyn StoreDriver { + let Some(key) = key else { return self; }; - let index = self.get_store_index(&digest); - self.weights_and_stores[index].1.inner_store(Some(digest)) + let index = self.get_store_index(&key); + self.weights_and_stores[index].1.inner_store(Some(key)) } fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) { diff --git a/nativelink-store/src/size_partitioning_store.rs b/nativelink-store/src/size_partitioning_store.rs index 1057d3e2c4..08e179e03f 100644 --- a/nativelink-store/src/size_partitioning_store.rs +++ b/nativelink-store/src/size_partitioning_store.rs @@ -16,12 +16,11 @@ use std::pin::Pin; use std::sync::Arc; use async_trait::async_trait; -use nativelink_error::{Error, ResultExt}; +use nativelink_error::{make_input_err, Error, ResultExt}; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; -use nativelink_util::common::DigestInfo; use 
nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use nativelink_util::metrics_utils::{Collector, CollectorState, MetricsComponent, Registry}; -use nativelink_util::store_trait::{Store, StoreDriver, StoreLike, UploadSizeInfo}; +use nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo}; use tokio::join; pub struct SizePartitioningStore { @@ -48,13 +47,23 @@ impl SizePartitioningStore { impl StoreDriver for SizePartitioningStore { async fn has_with_results( self: Pin<&Self>, - digests: &[DigestInfo], + keys: &[StoreKey<'_>], results: &mut [Option], ) -> Result<(), Error> { - let (lower_digests, upper_digests): (Vec<_>, Vec<_>) = digests - .iter() - .cloned() - .partition(|digest| digest.size_bytes < self.partition_size); + let mut all_digests = true; + let (lower_digests, upper_digests): (Vec<_>, Vec<_>) = + keys.iter().map(|v| v.borrow()).partition(|k| { + let StoreKey::Digest(digest) = k else { + all_digests = false; + return false; + }; + digest.size_bytes < self.partition_size + }); + if !all_digests { + return Err(make_input_err!( + "SizePartitioningStore only supports Digest keys" + )); + } let (lower_results, upper_results) = join!( self.lower_store.has_many(&lower_digests), self.upper_store.has_many(&upper_digests), @@ -68,7 +77,7 @@ impl StoreDriver for SizePartitioningStore { }; let mut upper_digests = upper_digests.into_iter().peekable(); let mut upper_results = upper_results?.into_iter(); - for (digest, result) in digests.iter().zip(results.iter_mut()) { + for (digest, result) in keys.iter().zip(results.iter_mut()) { if Some(digest) == upper_digests.peek() { upper_digests.next(); *result = upper_results @@ -85,10 +94,18 @@ impl StoreDriver for SizePartitioningStore { async fn update( self: Pin<&Self>, - digest: DigestInfo, + key: StoreKey<'_>, reader: DropCloserReadHalf, size_info: UploadSizeInfo, ) -> Result<(), Error> { + let digest = match key { + StoreKey::Digest(digest) => 
digest, + _ => { + return Err(make_input_err!( + "SizePartitioningStore only supports Digest keys" + )) + } + }; if digest.size_bytes < self.partition_size { return self.lower_store.update(digest, reader, size_info).await; } @@ -97,11 +114,19 @@ impl StoreDriver for SizePartitioningStore { async fn get_part( self: Pin<&Self>, - digest: DigestInfo, + key: StoreKey<'_>, writer: &mut DropCloserWriteHalf, offset: usize, length: Option, ) -> Result<(), Error> { + let digest = match key { + StoreKey::Digest(digest) => digest, + _ => { + return Err(make_input_err!( + "SizePartitioningStore only supports Digest keys" + )) + } + }; if digest.size_bytes < self.partition_size { return self .lower_store @@ -113,10 +138,14 @@ impl StoreDriver for SizePartitioningStore { .await } - fn inner_store(&self, digest: Option) -> &'_ dyn StoreDriver { - let Some(digest) = digest else { + fn inner_store(&self, key: Option>) -> &'_ dyn StoreDriver { + let Some(key) = key else { return self; }; + let digest = match key { + StoreKey::Digest(digest) => digest, + _ => return self, + }; if digest.size_bytes < self.partition_size { return self.lower_store.inner_store(Some(digest)); } diff --git a/nativelink-store/src/verify_store.rs b/nativelink-store/src/verify_store.rs index 8a36b949fd..61d1b0e89f 100644 --- a/nativelink-store/src/verify_store.rs +++ b/nativelink-store/src/verify_store.rs @@ -20,7 +20,6 @@ use nativelink_error::{make_input_err, Error, ResultExt}; use nativelink_util::buf_channel::{ make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf, }; -use nativelink_util::common::DigestInfo; use nativelink_util::digest_hasher::{ default_digest_hasher_func, DigestHasher, ACTIVE_HASHER_FUNC, }; @@ -29,7 +28,7 @@ use nativelink_util::metrics_utils::{ Collector, CollectorState, CounterWithTime, MetricsComponent, Registry, }; use nativelink_util::origin_context::ActiveOriginContext; -use nativelink_util::store_trait::{Store, StoreDriver, StoreLike, UploadSizeInfo}; +use 
nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo}; pub struct VerifyStore { inner_store: Store, @@ -114,7 +113,7 @@ impl VerifyStore { impl StoreDriver for VerifyStore { async fn has_with_results( self: Pin<&Self>, - digests: &[DigestInfo], + digests: &[StoreKey<'_>], results: &mut [Option], ) -> Result<(), Error> { self.inner_store.has_with_results(digests, results).await @@ -122,10 +121,19 @@ impl StoreDriver for VerifyStore { async fn update( self: Pin<&Self>, - digest: DigestInfo, + key: StoreKey<'_>, reader: DropCloserReadHalf, size_info: UploadSizeInfo, ) -> Result<(), Error> { + let digest = match key { + StoreKey::Digest(digest) => digest, + _ => { + return Err(make_input_err!( + "Only digests are supported in VerifyStore. Got {:?}", + key + )); + } + }; let digest_size = usize::try_from(digest.size_bytes) .err_tip(|| "Digest size_bytes was not convertible to usize")?; if let UploadSizeInfo::ExactSize(expected_size) = size_info { @@ -163,17 +171,15 @@ impl StoreDriver for VerifyStore { async fn get_part( self: Pin<&Self>, - digest: DigestInfo, + key: StoreKey<'_>, writer: &mut DropCloserWriteHalf, offset: usize, length: Option, ) -> Result<(), Error> { - self.inner_store - .get_part(digest, writer, offset, length) - .await + self.inner_store.get_part(key, writer, offset, length).await } - fn inner_store(&self, _digest: Option) -> &'_ dyn StoreDriver { + fn inner_store(&self, _digest: Option) -> &'_ dyn StoreDriver { self } diff --git a/nativelink-store/tests/completeness_checking_store_test.rs b/nativelink-store/tests/completeness_checking_store_test.rs index d059bc430b..515a239bd5 100644 --- a/nativelink-store/tests/completeness_checking_store_test.rs +++ b/nativelink-store/tests/completeness_checking_store_test.rs @@ -124,7 +124,10 @@ mod completeness_checking_store_tests { let (ac_store, _cas_store, action_result_digest) = setup().await?; - let res = ac_store.has_many(&[action_result_digest]).await.unwrap(); + let 
res = ac_store + .has_many(&[action_result_digest.into()]) + .await + .unwrap(); assert!( res[0].is_some(), "Results should be some with all items in CAS." @@ -136,9 +139,12 @@ mod completeness_checking_store_tests { let (ac_store, cas_store, action_result_digest) = setup().await?; - cas_store.remove_entry(&ROOT_FILE).await; + cas_store.remove_entry(ROOT_FILE.into()).await; - let res = ac_store.has_many(&[action_result_digest]).await.unwrap(); + let res = ac_store + .has_many(&[action_result_digest.into()]) + .await + .unwrap(); assert!( res[0].is_none(), "Results should be none with missing root file." @@ -150,8 +156,11 @@ mod completeness_checking_store_tests { let (ac_store, cas_store, action_result_digest) = setup().await?; - cas_store.remove_entry(&CHILD_FILE).await; - let res = ac_store.has_many(&[action_result_digest]).await.unwrap(); + cas_store.remove_entry(CHILD_FILE.into()).await; + let res = ac_store + .has_many(&[action_result_digest.into()]) + .await + .unwrap(); assert!( res[0].is_none(), "Results should be none with missing root file." @@ -163,8 +172,11 @@ mod completeness_checking_store_tests { let (ac_store, cas_store, action_result_digest) = setup().await?; - cas_store.remove_entry(&OUTPUT_FILE).await; - let res = ac_store.has_many(&[action_result_digest]).await.unwrap(); + cas_store.remove_entry(OUTPUT_FILE.into()).await; + let res = ac_store + .has_many(&[action_result_digest.into()]) + .await + .unwrap(); assert!( res[0].is_none(), "Results should be none with missing root file." @@ -176,8 +188,11 @@ mod completeness_checking_store_tests { let (ac_store, cas_store, action_result_digest) = setup().await?; - cas_store.remove_entry(&STDOUT).await; - let res = ac_store.has_many(&[action_result_digest]).await.unwrap(); + cas_store.remove_entry(STDOUT.into()).await; + let res = ac_store + .has_many(&[action_result_digest.into()]) + .await + .unwrap(); assert!( res[0].is_none(), "Results should be none with missing root file." 
@@ -189,8 +204,11 @@ mod completeness_checking_store_tests { let (ac_store, cas_store, action_result_digest) = setup().await?; - cas_store.remove_entry(&STDERR).await; - let res = ac_store.has_many(&[action_result_digest]).await.unwrap(); + cas_store.remove_entry(STDERR.into()).await; + let res = ac_store + .has_many(&[action_result_digest.into()]) + .await + .unwrap(); assert!( res[0].is_none(), "Results should be none with missing root file." @@ -221,7 +239,7 @@ mod completeness_checking_store_tests { let (ac_store, cas_store, action_result_digest) = setup().await?; - cas_store.remove_entry(&OUTPUT_FILE).await; + cas_store.remove_entry(OUTPUT_FILE.into()).await; assert!( ac_store diff --git a/nativelink-store/tests/dedup_store_test.rs b/nativelink-store/tests/dedup_store_test.rs index e1c9bb683b..04d82a5975 100644 --- a/nativelink-store/tests/dedup_store_test.rs +++ b/nativelink-store/tests/dedup_store_test.rs @@ -111,7 +111,11 @@ mod dedup_store_tests { const LAST_CHUNK_SIZE: usize = 25779; let did_delete = content_store - .remove_entry(&DigestInfo::try_new(LAST_CHUNK_HASH, LAST_CHUNK_SIZE).unwrap()) + .remove_entry( + DigestInfo::try_new(LAST_CHUNK_HASH, LAST_CHUNK_SIZE) + .unwrap() + .into(), + ) .await; assert_eq!(did_delete, true, "Expected item to exist in store"); diff --git a/nativelink-store/tests/fast_slow_store_test.rs b/nativelink-store/tests/fast_slow_store_test.rs index efb51563df..84339cc4ee 100644 --- a/nativelink-store/tests/fast_slow_store_test.rs +++ b/nativelink-store/tests/fast_slow_store_test.rs @@ -26,7 +26,7 @@ use nativelink_store::noop_store::NoopStore; use nativelink_util::buf_channel::make_buf_channel_pair; use nativelink_util::common::DigestInfo; use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; -use nativelink_util::store_trait::{Store, StoreDriver, StoreLike}; +use nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike}; use rand::rngs::SmallRng; use rand::{Rng, 
SeedableRng}; @@ -248,12 +248,12 @@ mod fast_slow_store_tests { impl StoreDriver for DropCheckStore { async fn has_with_results( self: Pin<&Self>, - digests: &[DigestInfo], + digests: &[StoreKey<'_>], results: &mut [Option], ) -> Result<(), Error> { if let Some(has_digest) = self.digest { for (digest, result) in digests.iter().zip(results.iter_mut()) { - if digest.hash_str() == has_digest.hash_str() { + if *digest == has_digest.into() { *result = Some(has_digest.size_bytes as usize); } } @@ -263,7 +263,7 @@ mod fast_slow_store_tests { async fn update( self: Pin<&Self>, - _digest: DigestInfo, + _digest: StoreKey<'_>, mut reader: nativelink_util::buf_channel::DropCloserReadHalf, _size_info: nativelink_util::store_trait::UploadSizeInfo, ) -> Result<(), Error> { @@ -284,20 +284,20 @@ mod fast_slow_store_tests { async fn get_part( self: Pin<&Self>, - digest: DigestInfo, + key: StoreKey<'_>, writer: &mut nativelink_util::buf_channel::DropCloserWriteHalf, offset: usize, length: Option, ) -> Result<(), Error> { // Gets called in the slow store and we provide the data that's // sent to the upstream and the fast store. 
- let bytes = length.unwrap_or(digest.size_bytes as usize) - offset; + let bytes = length.unwrap_or(key.into_digest().size_bytes as usize) - offset; let data = vec![0_u8; bytes]; writer.send(Bytes::copy_from_slice(&data)).await?; writer.send_eof() } - fn inner_store(&self, _digest: Option) -> &'_ dyn StoreDriver { + fn inner_store(&self, _digest: Option>) -> &'_ dyn StoreDriver { self } diff --git a/nativelink-store/tests/filesystem_store_test.rs b/nativelink-store/tests/filesystem_store_test.rs index 3fac7d5f6e..a95764a52f 100644 --- a/nativelink-store/tests/filesystem_store_test.rs +++ b/nativelink-store/tests/filesystem_store_test.rs @@ -1139,10 +1139,10 @@ mod filesystem_store_tests { .await?, ); - let digests = vec![digest]; + let keys = vec![digest.into()]; let mut results = vec![None]; let _ = store - .has_with_results(&digests, &mut results) + .has_with_results(&keys, &mut results) .await .err_tip(|| "Failed to get_part"); assert_eq!(results, vec!(Some(0))); diff --git a/nativelink-store/tests/memory_store_test.rs b/nativelink-store/tests/memory_store_test.rs index 74cd449552..adadebea53 100644 --- a/nativelink-store/tests/memory_store_test.rs +++ b/nativelink-store/tests/memory_store_test.rs @@ -279,7 +279,7 @@ mod memory_store_tests { packed_hash: Sha256::new().finalize().into(), size_bytes: 0, }; - let digests = vec![digest]; + let keys = vec![digest.into()]; let mut results = vec![None]; let store_owned = MemoryStore::new(&nativelink_config::stores::MemoryStore::default()); @@ -287,7 +287,7 @@ mod memory_store_tests { let _ = store .as_ref() - .has_with_results(&digests, &mut results) + .has_with_results(&keys, &mut results) .await .err_tip(|| "Failed to get_part"); assert_eq!(results, vec!(Some(0))); diff --git a/nativelink-store/tests/ref_store_test.rs b/nativelink-store/tests/ref_store_test.rs index 7cdc8a40f9..652b8ced6b 100644 --- a/nativelink-store/tests/ref_store_test.rs +++ b/nativelink-store/tests/ref_store_test.rs @@ -164,7 +164,8 @@ mod 
ref_store_tests { // Ensure the result of inner_store() points to exact same memory store. assert_eq!( - ref_store_outer.inner_store(None) as *const dyn StoreDriver as *const (), + ref_store_outer.inner_store(Option::::None) as *const dyn StoreDriver + as *const (), memory_store.into_inner().as_ref() as *const dyn StoreDriver as *const (), "Expected inner store to be memory store" ); diff --git a/nativelink-store/tests/s3_store_test.rs b/nativelink-store/tests/s3_store_test.rs index 3eaed98896..bdf787607a 100644 --- a/nativelink-store/tests/s3_store_test.rs +++ b/nativelink-store/tests/s3_store_test.rs @@ -637,7 +637,7 @@ mod s3_store_tests { packed_hash: Sha256::new().finalize().into(), size_bytes: 0, }; - let digests = vec![digest]; + let keys = vec![digest.into()]; let mut results = vec![None]; let mock_client = StaticReplayClient::new(vec![]); @@ -657,7 +657,7 @@ mod s3_store_tests { )?; let _ = store - .has_with_results(&digests, &mut results) + .has_with_results(&keys, &mut results) .await .err_tip(|| "Failed to get_part"); assert_eq!(results, vec!(Some(0))); diff --git a/nativelink-store/tests/shard_store_test.rs b/nativelink-store/tests/shard_store_test.rs index 52f3a37b6c..079f9cd9e6 100644 --- a/nativelink-store/tests/shard_store_test.rs +++ b/nativelink-store/tests/shard_store_test.rs @@ -127,7 +127,7 @@ mod shard_store_tests { assert_eq!( shard_store - .has_many(&[missing_digest1, missing_digest2]) + .has_many(&[missing_digest1.into(), missing_digest2.into()]) .await, Ok(vec![None, None]) ); @@ -146,7 +146,9 @@ mod shard_store_tests { .await?; assert_eq!( - shard_store.has_many(&[digest1, missing_digest]).await, + shard_store + .has_many(&[digest1.into(), missing_digest.into()]) + .await, Ok(vec![Some(MEGABYTE_SZ), None]) ); Ok(()) @@ -168,7 +170,9 @@ mod shard_store_tests { .await?; assert_eq!( - shard_store.has_many(&[digest1, digest2]).await, + shard_store + .has_many(&[digest1.into(), digest2.into()]) + .await, Ok(vec![Some(original_data1.len()), 
Some(original_data2.len())]) ); Ok(()) diff --git a/nativelink-util/Cargo.toml b/nativelink-util/Cargo.toml index fc24438aef..393fe01519 100644 --- a/nativelink-util/Cargo.toml +++ b/nativelink-util/Cargo.toml @@ -18,6 +18,7 @@ hex = "0.4.3" hyper = "0.14.28" lru = "0.12.3" parking_lot = "0.12.2" +hashbrown = "0.14.5" pin-project-lite = "0.2.14" prometheus-client = "0.21.2" prost = "0.12.4" diff --git a/nativelink-util/src/evicting_map.rs b/nativelink-util/src/evicting_map.rs index 4d85b9bdf9..4d4fa6a24f 100644 --- a/nativelink-util/src/evicting_map.rs +++ b/nativelink-util/src/evicting_map.rs @@ -17,6 +17,7 @@ use std::cmp::Eq; use std::fmt::Debug; use std::hash::Hash; use std::ops::DerefMut; +use std::pin::Pin; use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; @@ -250,9 +251,14 @@ where } /// Return the size of a `key`, if not found `None` is returned. - pub async fn size_for_key(&self, key: &impl Borrow) -> Option { + pub async fn size_for_key(&self, key: &Q) -> Option + where + Self: Unpin, + K: Borrow, + Q: Hash + Eq + Debug, + { let mut results = [None]; - self.sizes_for_keys([key.borrow()], &mut results[..]).await; + Pin::new(self).sizes_for_keys([key], &mut results[..]).await; results[0] } @@ -260,10 +266,12 @@ where /// to be provided for storing the resulting key sizes. Each index value in /// `keys` maps directly to the size value for the key in `results`. /// If no key is found in the internal map, `None` is filled in its place. 
- pub async fn sizes_for_keys(&self, keys: It, results: &mut [Option]) + pub async fn sizes_for_keys<'a, It, Q, R>(&'a self, keys: It, results: &mut [Option]) where - It: IntoIterator, - RefKey: Borrow, + It: IntoIterator + 'a, + K: Borrow, + R: Borrow + 'a, + Q: Hash + Eq + Debug + 'a, { let mut state = self.state.lock().await; @@ -296,7 +304,11 @@ where } } - pub async fn get(&self, key: &impl Borrow) -> Option { + pub async fn get(&self, key: &Q) -> Option + where + K: Borrow, + Q: Hash + Eq + Debug, + { let mut state = self.state.lock().await; self.evict_items(state.deref_mut()).await; @@ -365,12 +377,20 @@ where replaced_items } - pub async fn remove(&self, key: &impl Borrow) -> bool { + pub async fn remove(&self, key: &Q) -> bool + where + K: Borrow, + Q: Hash + Eq + Debug, + { let mut state = self.state.lock().await; self.inner_remove(&mut state, key).await } - async fn inner_remove(&self, mut state: &mut State, key: &impl Borrow) -> bool { + async fn inner_remove(&self, mut state: &mut State, key: &Q) -> bool + where + K: Borrow, + Q: Hash + Eq + Debug, + { self.evict_items(state.deref_mut()).await; if let Some(entry) = state.lru.pop(key.borrow()) { state.remove(&entry, false).await; @@ -381,7 +401,11 @@ where /// Same as remove(), but allows for a conditional to be applied to the entry before removal /// in an atomic fashion. 
- pub async fn remove_if bool>(&self, key: &impl Borrow, cond: F) -> bool { + pub async fn remove_if bool>(&self, key: &Q, cond: F) -> bool + where + K: Borrow, + Q: Hash + Eq + Debug, + { let mut state = self.state.lock().await; if let Some(entry) = state.lru.get(key.borrow()) { if !cond(&entry.data) { diff --git a/nativelink-util/src/store_trait.rs b/nativelink-util/src/store_trait.rs index 6347333eb0..174892e6c0 100644 --- a/nativelink-util/src/store_trait.rs +++ b/nativelink-util/src/store_trait.rs @@ -33,7 +33,7 @@ use tokio::time::timeout; use crate::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf}; use crate::common::DigestInfo; -use crate::digest_hasher::{default_digest_hasher_func, DigestHasher}; +use crate::digest_hasher::{default_digest_hasher_func, DigestHasher, DigestHasherFunc}; use crate::fs::{self, idle_file_descriptor_timeout}; use crate::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator}; use crate::metrics_utils::Registry; @@ -77,7 +77,7 @@ pub enum UploadSizeInfo { // optimizations that may be present in the inner store. pub async fn slow_update_store_with_file( store: Pin<&S>, - digest: DigestInfo, + digest: impl Into>, file: &mut fs::ResumeableFileSlot, upload_size: UploadSizeInfo, ) -> Result<(), Error> { @@ -90,7 +90,7 @@ pub async fn slow_update_store_with_file( let (tx, rx) = make_buf_channel_pair(); let mut update_fut = store - .update(digest, rx, upload_size) + .update(digest.into(), rx, upload_size) .map(|r| r.err_tip(|| "Could not upload data to store in upload_file_to_store")); let read_result = { let read_data_fut = async { @@ -144,6 +144,108 @@ pub enum StoreOptimizations { NoopDownloads, } +#[derive(Debug, Eq)] +pub enum StoreKey<'a> { + /// A string key. + Str(Cow<'a, str>), + + /// A key that is a digest. 
+ Digest(DigestInfo), +} + +impl<'a> StoreKey<'a> { + pub fn borrow(&'a self) -> StoreKey<'a> { + match self { + StoreKey::Str(Cow::Owned(s)) => StoreKey::Str(Cow::Borrowed(s)), + StoreKey::Str(Cow::Borrowed(s)) => StoreKey::Str(Cow::Borrowed(s)), + StoreKey::Digest(d) => StoreKey::Digest(*d), + } + } + + pub fn into_owned(self) -> StoreKey<'static> { + match self { + StoreKey::Str(Cow::Owned(s)) => StoreKey::Str(Cow::Owned(s)), + StoreKey::Str(Cow::Borrowed(s)) => StoreKey::Str(Cow::Owned(s.to_owned())), + StoreKey::Digest(d) => StoreKey::Digest(d), + } + } + + pub fn into_digest(self) -> DigestInfo { + match self { + StoreKey::Digest(digest) => digest, + StoreKey::Str(s) => { + let mut hasher = DigestHasherFunc::Blake3.hasher(); + hasher.update(s.as_bytes()); + hasher.finalize_digest() + } + } + } + + pub fn as_str(&'a self) -> Cow<'a, str> { + match self { + StoreKey::Str(Cow::Owned(s)) => Cow::Borrowed(s), + StoreKey::Str(Cow::Borrowed(s)) => Cow::Borrowed(s), + StoreKey::Digest(d) => Cow::Owned(format!("{}-{}", d.hash_str(), d.size_bytes)), + } + } +} + +impl PartialEq for StoreKey<'_> { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (StoreKey::Str(a), StoreKey::Str(b)) => a == b, + (StoreKey::Digest(a), StoreKey::Digest(b)) => a == b, + _ => false, + } + } +} + +impl Hash for StoreKey<'_> { + fn hash(&self, state: &mut H) { + /// Salts the hash with the enum value that represents + /// the type of the key. 
+ #[repr(u8)] + enum HashId { + Str = 0, + Digest = 1, + } + match self { + StoreKey::Str(s) => { + (HashId::Str as u8).hash(state); + s.hash(state) + } + StoreKey::Digest(d) => { + (HashId::Digest as u8).hash(state); + d.hash(state) + } + } + } +} + +impl<'a> From<&'a str> for StoreKey<'a> { + fn from(s: &'a str) -> Self { + StoreKey::Str(Cow::Borrowed(s)) + } +} + +impl From for StoreKey<'static> { + fn from(s: String) -> Self { + StoreKey::Str(Cow::Owned(s)) + } +} + +impl<'a> From for StoreKey<'a> { + fn from(d: DigestInfo) -> Self { + StoreKey::Digest(d) + } +} + +impl<'a> From<&DigestInfo> for StoreKey<'a> { + fn from(d: &DigestInfo) -> Self { + StoreKey::Digest(*d) + } +} + #[derive(Clone)] #[repr(transparent)] pub struct Store { @@ -170,13 +272,16 @@ impl Store { /// (if applicable) and check if it implements some special traits that allow optimizations. /// Note: If the store performs complex operations on the data, it should return itself. #[inline] - pub fn inner_store(&self, digest: Option) -> &dyn StoreDriver { - self.inner.inner_store(digest) + pub fn inner_store<'a, K: Into>>(&self, digest: Option) -> &dyn StoreDriver { + self.inner.inner_store(digest.map(|v| v.into())) } /// Tries to cast the underlying store to the given type. #[inline] - pub fn downcast_ref(&self, maybe_digest: Option) -> Option<&U> { + pub fn downcast_ref( + &self, + maybe_digest: Option>, + ) -> Option<&U> { self.inner.inner_store(maybe_digest).as_any().downcast_ref() } @@ -218,7 +323,7 @@ where } } -pub trait StoreLike: Send + Sync + Sized { +pub trait StoreLike: Send + Sync + Sized + Unpin + 'static { /// Returns the immediate inner store driver. fn as_store_driver(&self) -> &'_ dyn StoreDriver; @@ -235,8 +340,11 @@ pub trait StoreLike: Send + Sync + Sized { /// the store, or Some(size) if it does. /// Note: On an AC store the size will be incorrect and should not be used! 
#[inline] - fn has(&self, digest: DigestInfo) -> impl Future, Error>> + '_ { - self.as_store_driver_pin().has(digest) + fn has<'a>( + &'a self, + digest: impl Into>, + ) -> impl Future, Error>> + 'a { + self.as_store_driver_pin().has(digest.into()) } /// Look up a list of digests in the store and return a result for each in @@ -244,35 +352,35 @@ pub trait StoreLike: Send + Sync + Sized { /// exist in the store, or Some(size) if it does. /// Note: On an AC store the size will be incorrect and should not be used! #[inline] - fn has_many<'b>( - &'b self, - digests: &'b [DigestInfo], - ) -> impl Future>, Error>> + Send + 'b { + fn has_many<'a>( + &'a self, + digests: &'a [StoreKey<'a>], + ) -> impl Future>, Error>> + Send + 'a { self.as_store_driver_pin().has_many(digests) } /// The implementation of the above has and has_many functions. See their /// documentation for details. #[inline] - fn has_with_results<'b>( - &'b self, - digests: &'b [DigestInfo], - results: &'b mut [Option], - ) -> impl Future> + Send + 'b { + fn has_with_results<'a>( + &'a self, + digests: &'a [StoreKey<'a>], + results: &'a mut [Option], + ) -> impl Future> + Send + 'a { self.as_store_driver_pin() .has_with_results(digests, results) } /// Sends the data to the store. #[inline] - fn update( - &self, - digest: DigestInfo, + fn update<'a>( + &'a self, + digest: impl Into>, reader: DropCloserReadHalf, upload_size: UploadSizeInfo, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + Send + 'a { self.as_store_driver_pin() - .update(digest, reader, upload_size) + .update(digest.into(), reader, upload_size) } /// Any optimizations the store might want to expose to the callers. @@ -286,66 +394,78 @@ pub trait StoreLike: Send + Sync + Sized { /// This is useful if the underlying store can optimize the upload process /// when it knows the data is coming from a file. 
#[inline] - fn update_with_whole_file( - &self, - digest: DigestInfo, + fn update_with_whole_file<'a>( + &'a self, + digest: impl Into>, file: fs::ResumeableFileSlot, upload_size: UploadSizeInfo, - ) -> impl Future, Error>> + Send + '_ { + ) -> impl Future, Error>> + Send + 'a { self.as_store_driver_pin() - .update_with_whole_file(digest, file, upload_size) + .update_with_whole_file(digest.into(), file, upload_size) } /// Utility to send all the data to the store when you have all the bytes. #[inline] - fn update_oneshot( - &self, - digest: DigestInfo, + fn update_oneshot<'a>( + &'a self, + digest: impl Into>, data: Bytes, - ) -> impl Future> + Send + '_ { - self.as_store_driver_pin().update_oneshot(digest, data) + ) -> impl Future> + Send + 'a { + self.as_store_driver_pin() + .update_oneshot(digest.into(), data) } /// Retreives part of the data from the store and writes it to the given writer. #[inline] - fn get_part<'b>( - &'b self, - digest: DigestInfo, - mut writer: impl BorrowMut + Send + 'b, + fn get_part<'a>( + &'a self, + digest: impl Into>, + mut writer: impl BorrowMut + Send + 'a, offset: usize, length: Option, - ) -> impl Future> + Send + 'b { + ) -> impl Future> + Send + 'a { + let key = digest.into(); // Note: We need to capture `writer` just in case the caller // expects the drop() method to be called on it when the future // is done due to the complex interaction between the DropCloserWriteHalf // and the DropCloserReadHalf during drop(). async move { self.as_store_driver_pin() - .get_part(digest, writer.borrow_mut(), offset, length) + .get_part(key, writer.borrow_mut(), offset, length) .await } } /// Utility that works the same as `.get_part()`, but writes all the data. 
#[inline] - fn get( - &self, - digest: DigestInfo, + fn get<'a>( + &'a self, + key: impl Into>, writer: DropCloserWriteHalf, - ) -> impl Future> + Send + '_ { - self.as_store_driver_pin().get(digest, writer) + ) -> impl Future> + Send + 'a { + self.as_store_driver_pin().get(key.into(), writer) + } + + fn get_part_unchunked_test<'a>( + &'a self, + key: StoreKey<'a>, + offset: usize, + length: Option, + ) -> impl Future> + Send + Unpin + 'a { + self.as_store_driver_pin() + .get_part_unchunked(key, offset, length) } /// Utility that will return all the bytes at once instead of in a streaming manner. #[inline] - fn get_part_unchunked( - &self, - digest: DigestInfo, + fn get_part_unchunked<'a>( + &'a self, + key: impl Into> + 'a, offset: usize, length: Option, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + Send + Unpin + 'a { self.as_store_driver_pin() - .get_part_unchunked(digest, offset, length) + .get_part_unchunked(key.into(), offset, length) } /// Default implementation of the health check. Some stores may want to override this @@ -363,9 +483,9 @@ pub trait StoreLike: Send + Sync + Sized { pub trait StoreDriver: Sync + Send + Unpin + HealthStatusIndicator + 'static { /// See: `StoreLike::has()` for details. 
#[inline] - async fn has(self: Pin<&Self>, digest: DigestInfo) -> Result, Error> { + async fn has(self: Pin<&Self>, key: StoreKey<'_>) -> Result, Error> { let mut result = [None]; - self.has_with_results(&[digest], &mut result).await?; + self.has_with_results(&[key], &mut result).await?; Ok(result[0]) } @@ -373,7 +493,7 @@ pub trait StoreDriver: Sync + Send + Unpin + HealthStatusIndicator + 'static { #[inline] async fn has_many( self: Pin<&Self>, - digests: &[DigestInfo], + digests: &[StoreKey<'_>], ) -> Result>, Error> { let mut results = vec![None; digests.len()]; self.has_with_results(digests, &mut results).await?; @@ -383,14 +503,14 @@ pub trait StoreDriver: Sync + Send + Unpin + HealthStatusIndicator + 'static { /// See: `StoreLike::has_with_results()` for details. async fn has_with_results( self: Pin<&Self>, - digests: &[DigestInfo], + digests: &[StoreKey<'_>], results: &mut [Option], ) -> Result<(), Error>; /// See: `StoreLike::update()` for details. async fn update( self: Pin<&Self>, - digest: DigestInfo, + key: StoreKey<'_>, reader: DropCloserReadHalf, upload_size: UploadSizeInfo, ) -> Result<(), Error>; @@ -403,30 +523,26 @@ pub trait StoreDriver: Sync + Send + Unpin + HealthStatusIndicator + 'static { /// See: `StoreLike::update_with_whole_file()` for details. 
async fn update_with_whole_file( self: Pin<&Self>, - digest: DigestInfo, + key: StoreKey<'_>, mut file: fs::ResumeableFileSlot, upload_size: UploadSizeInfo, ) -> Result, Error> { - let inner_store = self.inner_store(Some(digest)); + let inner_store = self.inner_store(Some(key.borrow())); if inner_store.optimized_for(StoreOptimizations::FileUpdates) { error_if!( addr_eq(inner_store, self.deref()), "Store::inner_store() returned self when optimization present" ); return Pin::new(inner_store) - .update_with_whole_file(digest, file, upload_size) + .update_with_whole_file(key, file, upload_size) .await; } - slow_update_store_with_file(self, digest, &mut file, upload_size).await?; + slow_update_store_with_file(self, key, &mut file, upload_size).await?; Ok(Some(file)) } /// See: `StoreLike::update_oneshot()` for details. - async fn update_oneshot( - self: Pin<&Self>, - digest: DigestInfo, - data: Bytes, - ) -> Result<(), Error> { + async fn update_oneshot(self: Pin<&Self>, key: StoreKey<'_>, data: Bytes) -> Result<(), Error> { // TODO(blaise.bruer) This is extremely inefficient, since we have exactly // what we need here. Maybe we could instead make a version of the stream // that can take objects already fully in memory instead? @@ -446,7 +562,7 @@ pub trait StoreDriver: Sync + Send + Unpin + HealthStatusIndicator + 'static { }; try_join!( send_fut, - self.update(digest, rx, UploadSizeInfo::ExactSize(data_len)) + self.update(key, rx, UploadSizeInfo::ExactSize(data_len)) )?; Ok(()) } @@ -454,7 +570,7 @@ pub trait StoreDriver: Sync + Send + Unpin + HealthStatusIndicator + 'static { /// See: `StoreLike::get_part()` for details. 
async fn get_part( self: Pin<&Self>, - digest: DigestInfo, + key: StoreKey<'_>, writer: &mut DropCloserWriteHalf, offset: usize, length: Option, @@ -464,16 +580,16 @@ pub trait StoreDriver: Sync + Send + Unpin + HealthStatusIndicator + 'static { #[inline] async fn get( self: Pin<&Self>, - digest: DigestInfo, + key: StoreKey<'_>, mut writer: DropCloserWriteHalf, ) -> Result<(), Error> { - self.get_part(digest, &mut writer, 0, None).await + self.get_part(key, &mut writer, 0, None).await } /// See: `StoreLike::get_part_unchunked()` for details. async fn get_part_unchunked( self: Pin<&Self>, - digest: DigestInfo, + key: StoreKey<'_>, offset: usize, length: Option, ) -> Result { @@ -486,7 +602,7 @@ pub trait StoreDriver: Sync + Send + Unpin + HealthStatusIndicator + 'static { rx.consume(length), // We use a closure here to ensure that the `tx` is dropped when the // future is done. - async move { self.get_part(digest, &mut tx, offset, length).await }, + async move { self.get_part(key, &mut tx, offset, length).await }, ); get_part_res .err_tip(|| "Failed to get_part in get_part_unchunked") @@ -514,18 +630,21 @@ pub trait StoreDriver: Sync + Send + Unpin + HealthStatusIndicator + 'static { let mut digest_hasher = default_digest_hasher_func().hasher(); digest_hasher.update(&digest_data); let digest_data_len = digest_data.len(); - let digest_info = digest_hasher.finalize_digest(); + let digest_info = StoreKey::from(digest_hasher.finalize_digest()); let digest_bytes = bytes::Bytes::copy_from_slice(&digest_data); - if let Err(e) = self.update_oneshot(digest_info, digest_bytes.clone()).await { + if let Err(e) = self + .update_oneshot(digest_info.borrow(), digest_bytes.clone()) + .await + { return HealthStatus::new_failed( self.get_ref(), format!("Store.update_oneshot() failed: {e}").into(), ); } - match self.has(digest_info).await { + match self.has(digest_info.borrow()).await { Ok(Some(s)) => { if s != digest_data_len { return HealthStatus::new_failed( @@ -572,7 +691,7 @@ pub 
trait StoreDriver: Sync + Send + Unpin + HealthStatusIndicator + 'static { } /// See: `StoreLike::inner_store()` for details. - fn inner_store(&self, _digest: Option) -> &dyn StoreDriver; + fn inner_store(&self, _digest: Option>) -> &dyn StoreDriver; /// Returns an Any variation of whatever Self is. fn as_any(&self) -> &(dyn std::any::Any + Sync + Send + 'static); diff --git a/nativelink-worker/src/running_actions_manager.rs b/nativelink-worker/src/running_actions_manager.rs index 10fc69bac2..48fad10da5 100644 --- a/nativelink-worker/src/running_actions_manager.rs +++ b/nativelink-worker/src/running_actions_manager.rs @@ -127,7 +127,7 @@ pub fn download_to_directory<'a>( current_directory: &'a str, ) -> BoxFuture<'a, Result<(), Error>> { async move { - let directory = get_and_decode_digest::(cas_store, digest) + let directory = get_and_decode_digest::(cas_store, digest.into()) .await .err_tip(|| "Converting digest to Directory")?; let mut futures = FuturesUnordered::new(); @@ -149,7 +149,7 @@ pub fn download_to_directory<'a>( } futures.push( cas_store - .populate_fast_store(digest) + .populate_fast_store(digest.into()) .and_then(move |_| async move { let file_entry = filesystem_store .get_file_entry_for_digest(&digest) @@ -283,8 +283,9 @@ async fn upload_file( .err_tip(|| "Could not rewind file")?; cas_store + .as_store_driver_pin() .update_with_whole_file( - digest, + digest.into(), resumeable_file, UploadSizeInfo::ExactSize(digest.size_bytes as usize), ) @@ -667,7 +668,7 @@ impl RunningActionImpl { let command_fut = self.metrics().get_proto_command_from_store.wrap(async { get_and_decode_digest::( self.running_actions_manager.cas_store.as_ref(), - &self.action_info.command_digest, + self.action_info.command_digest.into(), ) .await .err_tip(|| "Converting command_digest to Command") @@ -1492,7 +1493,7 @@ impl UploadActionResults { return Ok(()); }; // If we are a GrpcStore we shortcut here, as this is a special store. 
- if let Some(grpc_store) = ac_store.downcast_ref::(Some(action_digest)) { + if let Some(grpc_store) = ac_store.downcast_ref::(Some(action_digest.into())) { let update_action_request = UpdateActionResultRequest { // This is populated by `update_action_result`. instance_name: String::new(), @@ -1723,9 +1724,10 @@ impl RunningActionsManagerImpl { .err_tip(|| "Expected action_digest to exist on StartExecute")? .try_into()?; let load_start_timestamp = (self.callbacks.now_fn)(); - let action = get_and_decode_digest::(self.cas_store.as_ref(), &action_digest) - .await - .err_tip(|| "During start_action")?; + let action = + get_and_decode_digest::(self.cas_store.as_ref(), action_digest.into()) + .await + .err_tip(|| "During start_action")?; let action_info = ActionInfo::try_from_action_and_execute_request_with_salt( execute_request, action, diff --git a/nativelink-worker/tests/running_actions_manager_test.rs b/nativelink-worker/tests/running_actions_manager_test.rs index 714c0ffdbd..b6b266ae61 100644 --- a/nativelink-worker/tests/running_actions_manager_test.rs +++ b/nativelink-worker/tests/running_actions_manager_test.rs @@ -1092,7 +1092,7 @@ mod running_actions_manager_tests { }; let tree = get_and_decode_digest::( slow_store.as_ref(), - &action_result.output_folders[0].tree_digest, + action_result.output_folders[0].tree_digest.into(), ) .await?; let root_directory = Directory { @@ -1944,7 +1944,8 @@ exit 1 .await?; let retrieved_result = - get_and_decode_digest::(ac_store.as_ref(), &action_digest).await?; + get_and_decode_digest::(ac_store.as_ref(), action_digest.into()) + .await?; let proto_result: ProtoActionResult = action_result.into(); assert_eq!(proto_result, retrieved_result); @@ -2017,7 +2018,8 @@ exit 1 .await?; let retrieved_result = - get_and_decode_digest::(ac_store.as_ref(), &action_digest).await?; + get_and_decode_digest::(ac_store.as_ref(), action_digest.into()) + .await?; let proto_result: ProtoActionResult = action_result.into(); 
assert_eq!(proto_result, retrieved_result); @@ -2106,7 +2108,7 @@ exit 1 }; let retrieved_result = get_and_decode_digest::( cas_store.as_ref(), - &historical_digest, + historical_digest.into(), ) .await?; @@ -2214,7 +2216,7 @@ exit 1 let retrieved_result = get_and_decode_digest::( cas_store.as_ref(), - &historical_digest, + historical_digest.into(), ) .await?; @@ -2275,9 +2277,11 @@ exit 1 DigestInfo::try_new(action_result_hash, action_result_size.parse::()?)? }; - let retrieved_result = - get_and_decode_digest::(ac_store.as_ref(), &action_result_digest) - .await?; + let retrieved_result = get_and_decode_digest::( + ac_store.as_ref(), + action_result_digest.into(), + ) + .await?; let proto_result: ProtoActionResult = action_result.into(); assert_eq!(proto_result, retrieved_result);