Skip to content

Commit

Permalink
Refactor Store API to use StoreKey
Browse files Browse the repository at this point in the history
This enables us to use arbitrary strings when using the Store API.

closes TraceMachina#934
  • Loading branch information
allada committed Jun 4, 2024
1 parent 6d6630e commit 4dc8464
Show file tree
Hide file tree
Showing 40 changed files with 699 additions and 452 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions nativelink-scheduler/src/cache_lookup_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ async fn get_action_from_store(
digest_function: DigestHasherFunc,
) -> Option<ProtoActionResult> {
// If we are a GrpcStore we shortcut here, as this is a special store.
if let Some(grpc_store) = ac_store.downcast_ref::<GrpcStore>(Some(action_digest)) {
if let Some(grpc_store) = ac_store.downcast_ref::<GrpcStore>(Some(action_digest.into())) {
let action_result_request = GetActionResultRequest {
instance_name,
action_digest: Some(action_digest.into()),
Expand All @@ -79,7 +79,7 @@ async fn get_action_from_store(
.map(|response| response.into_inner())
.ok()
} else {
get_and_decode_digest::<ProtoActionResult>(ac_store, &action_digest)
get_and_decode_digest::<ProtoActionResult>(ac_store, action_digest.into())
.await
.ok()
}
Expand Down
12 changes: 9 additions & 3 deletions nativelink-service/src/ac_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,15 @@ impl AcServer {
.try_into()?;

// If we are a GrpcStore we shortcut here, as this is a special store.
if let Some(grpc_store) = store_info.store.downcast_ref::<GrpcStore>(Some(digest)) {
if let Some(grpc_store) = store_info
.store
.downcast_ref::<GrpcStore>(Some(digest.into()))
{
return grpc_store.get_action_result(Request::new(request)).await;
}

Ok(Response::new(
get_and_decode_digest::<ActionResult>(&store_info.store, &digest).await?,
get_and_decode_digest::<ActionResult>(&store_info.store, digest.into()).await?,
))
}

Expand Down Expand Up @@ -128,7 +131,10 @@ impl AcServer {
.try_into()?;

// If we are a GrpcStore we shortcut here, as this is a special store.
if let Some(grpc_store) = store_info.store.downcast_ref::<GrpcStore>(Some(digest)) {
if let Some(grpc_store) = store_info
.store
.downcast_ref::<GrpcStore>(Some(digest.into()))
{
return grpc_store.update_action_result(Request::new(request)).await;
}

Expand Down
6 changes: 3 additions & 3 deletions nativelink-service/src/bytestream_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,7 @@ impl ByteStreamServer {
let digest = DigestInfo::try_new(resource_info.hash.as_ref(), resource_info.expected_size)?;

// If we are a GrpcStore we shortcut here, as this is a special store.
if let Some(grpc_store) = store_clone.downcast_ref::<GrpcStore>(Some(digest)) {
if let Some(grpc_store) = store_clone.downcast_ref::<GrpcStore>(Some(digest.into())) {
return grpc_store
.query_write_status(Request::new(query_request.clone()))
.await;
Expand Down Expand Up @@ -582,7 +582,7 @@ impl ByteStream for ByteStreamServer {
let digest = DigestInfo::try_new(resource_info.hash.as_ref(), resource_info.expected_size)?;

// If we are a GrpcStore we shortcut here, as this is a special store.
if let Some(grpc_store) = store.downcast_ref::<GrpcStore>(Some(digest)) {
if let Some(grpc_store) = store.downcast_ref::<GrpcStore>(Some(digest.into())) {
let stream = grpc_store.read(Request::new(read_request)).await?;
return Ok(Response::new(Box::pin(stream)));
}
Expand Down Expand Up @@ -638,7 +638,7 @@ impl ByteStream for ByteStreamServer {
.err_tip(|| "Invalid digest input in ByteStream::write")?;

// If we are a GrpcStore we shortcut here, as this is a special store.
if let Some(grpc_store) = store.downcast_ref::<GrpcStore>(Some(digest)) {
if let Some(grpc_store) = store.downcast_ref::<GrpcStore>(Some(digest.into())) {
return grpc_store.write(stream).await.map_err(|e| e.into());
}

Expand Down
4 changes: 2 additions & 2 deletions nativelink-service/src/cas_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl CasServer {

let mut requested_blobs = Vec::with_capacity(request.blob_digests.len());
for digest in request.blob_digests.iter() {
requested_blobs.push(DigestInfo::try_from(digest.clone())?);
requested_blobs.push(DigestInfo::try_from(digest.clone())?.into());
}
let sizes = store
.has_many(&requested_blobs)
Expand Down Expand Up @@ -255,7 +255,7 @@ impl CasServer {

while !deque.is_empty() {
let digest: DigestInfo = deque.pop_front().err_tip(|| "In VecDeque::pop_front")?;
let directory = get_and_decode_digest::<Directory>(&store, &digest)
let directory = get_and_decode_digest::<Directory>(&store, digest.into())
.await
.err_tip(|| "Converting digest to Directory")?;
if digest == page_token_digest {
Expand Down
5 changes: 3 additions & 2 deletions nativelink-service/src/execution_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl InstanceInfo {
// Goma puts the properties in the Command.
if platform_properties.is_empty() {
let command =
get_and_decode_digest::<Command>(&self.cas_store, &command_digest).await?;
get_and_decode_digest::<Command>(&self.cas_store, command_digest.into()).await?;
if let Some(platform) = &command.platform {
for property in &platform.properties {
let platform_property = self
Expand Down Expand Up @@ -209,7 +209,8 @@ impl ExecutionServer {
.execution_policy
.map_or(DEFAULT_EXECUTION_PRIORITY, |p| p.priority);

let action = get_and_decode_digest::<Action>(&instance_info.cas_store, &digest).await?;
let action =
get_and_decode_digest::<Action>(&instance_info.cas_store, digest.into()).await?;
let action_info = instance_info
.build_action_info(
instance_name,
Expand Down
4 changes: 2 additions & 2 deletions nativelink-service/tests/ac_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ mod get_action_result {
assert_eq!(err.code(), Code::NotFound);
assert_eq!(
err.message(),
"Hash 0123456789abcdef000000000000000000000000000000000123456789abcdef not found"
"Key Digest(DigestInfo { size_bytes: 0, hash: \"0123456789abcdef000000000000000000000000000000000123456789abcdef\" }) not found"
);
Ok(())
}
Expand Down Expand Up @@ -174,7 +174,7 @@ mod get_action_result {
assert_eq!(err.code(), Code::NotFound);
assert_eq!(
err.message(),
"Hash 0123456789abcdef000000000000000000000000000000000123456789abcdef not found"
"Key Digest(DigestInfo { size_bytes: 146, hash: \"0123456789abcdef000000000000000000000000000000000123456789abcdef\" }) not found"
);
Ok(())
}
Expand Down
3 changes: 1 addition & 2 deletions nativelink-service/tests/bytestream_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -954,8 +954,7 @@ pub mod read_tests {

let result = result_fut.await.err_tip(|| "Expected result to be ready")?;
let expected_err_str = concat!(
"status: NotFound, message: \"Hash 0123456789abcdef000000000000000000000000000000000123456789abcdef ",
"not found\", details: [], metadata: MetadataMap { headers: {} }",
"status: NotFound, message: \"Key Digest(DigestInfo { size_bytes: 55, hash: \\\"0123456789abcdef000000000000000000000000000000000123456789abcdef\\\" }) not found\", details: [], metadata: MetadataMap { headers: {} }",
);
assert_eq!(
Error::from(result.unwrap_err()),
Expand Down
7 changes: 5 additions & 2 deletions nativelink-service/tests/cas_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use nativelink_store::ac_utils::serialize_and_upload_message;
use nativelink_store::default_store_factory::store_factory;
use nativelink_store::store_manager::StoreManager;
use nativelink_util::common::DigestInfo;
use nativelink_util::store_trait::StoreLike;
use nativelink_util::store_trait::{StoreKey, StoreLike};
use prometheus_client::registry::Registry;
use tonic::Request;

Expand Down Expand Up @@ -309,7 +309,10 @@ mod batch_read_blobs {
data: vec![].into(),
status: Some(GrpcStatus {
code: Code::NotFound as i32,
message: format!("Hash {} not found", digest3.hash),
message: format!(
"Key {:?} not found",
StoreKey::from(DigestInfo::try_from(digest3)?)
),
details: vec![],
}),
compressor: compressor::Value::Identity.into(),
Expand Down
1 change: 1 addition & 0 deletions nativelink-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ aws-config = "1.4.0"
aws-sdk-s3 = { version = "1.28.0" }
aws-smithy-runtime = { version = "1.5.0" }
bincode = "1.3.3"
pin-project-lite = "0.2.14"
blake3 = "1.5.1"
byteorder = "1.5.0"
bytes = "1.6.0"
Expand Down
22 changes: 14 additions & 8 deletions nativelink-store/src/ac_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use futures::TryFutureExt;
use nativelink_error::{Code, Error, ResultExt};
use nativelink_util::common::DigestInfo;
use nativelink_util::digest_hasher::DigestHasher;
use nativelink_util::store_trait::StoreLike;
use nativelink_util::store_trait::{StoreKey, StoreLike};
use prost::Message;

// NOTE(blaise.bruer) From some local testing it looks like action cache items are rarely greater than
Expand All @@ -37,22 +37,27 @@ pub const ESTIMATED_DIGEST_SIZE: usize = 2048;
const MAX_ACTION_MSG_SIZE: usize = 10 << 20; // 10mb.

/// Attempts to fetch the digest contents from a store into the associated proto.
pub async fn get_and_decode_digest<'a, T: Message + Default>(
pub async fn get_and_decode_digest<T: Message + Default + 'static>(
store: &impl StoreLike,
digest: &DigestInfo,
key: StoreKey<'_>,
) -> Result<T, Error> {
get_size_and_decode_digest(store, digest)
get_size_and_decode_digest(store, key)
.map_ok(|(v, _)| v)
.await
}

/// Attempts to fetch the digest contents from a store into the associated proto.
pub async fn get_size_and_decode_digest<'a, T: Message + Default>(
pub async fn get_size_and_decode_digest<T: Message + Default + 'static>(
store: &impl StoreLike,
digest: &DigestInfo,
key: impl Into<StoreKey<'_>>,
) -> Result<(T, usize), Error> {
// Note: For unknown reasons we appear to be hitting:
// https://github.com/rust-lang/rust/issues/92096
// or a smiliar issue if we try to use the non-store driver function, so we
// are using the store driver function here.
let mut store_data_resp = store
.get_part_unchunked(*digest, 0, Some(MAX_ACTION_MSG_SIZE))
.as_store_driver_pin()
.get_part_unchunked(key.into(), 0, Some(MAX_ACTION_MSG_SIZE))
.await;
if let Err(err) = &mut store_data_resp {
if err.code == Code::NotFound {
Expand Down Expand Up @@ -98,7 +103,8 @@ pub async fn serialize_and_upload_message<'a, T: Message>(
let digest = message_to_digest(message, &mut buffer, hasher)
.err_tip(|| "In serialize_and_upload_message")?;
cas_store
.update_oneshot(digest, buffer.freeze())
.as_store_driver_pin()
.update_oneshot(digest.into(), buffer.freeze())
.await
.err_tip(|| "In serialize_and_upload_message")?;
Ok(digest)
Expand Down
8 changes: 6 additions & 2 deletions nativelink-store/src/cas_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use nativelink_util::common::DigestInfo;
use nativelink_util::store_trait::StoreKey;

pub const ZERO_BYTE_DIGESTS: [DigestInfo; 2] = [
// Sha256 hash of zero bytes.
Expand All @@ -36,6 +37,9 @@ pub const ZERO_BYTE_DIGESTS: [DigestInfo; 2] = [
];

#[inline]
pub fn is_zero_digest(digest: &DigestInfo) -> bool {
digest.size_bytes == 0 && ZERO_BYTE_DIGESTS.contains(digest)
pub fn is_zero_digest<'a>(digest: impl Into<StoreKey<'a>>) -> bool {
match digest.into() {
StoreKey::Digest(digest) => digest.size_bytes == 0 && ZERO_BYTE_DIGESTS.contains(&digest),
_ => false,
}
}
43 changes: 22 additions & 21 deletions nativelink-store/src/completeness_checking_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use nativelink_util::health_utils::{default_health_status_indicator, HealthStatu
use nativelink_util::metrics_utils::{
Collector, CollectorState, CounterWithTime, MetricsComponent, Registry,
};
use nativelink_util::store_trait::{Store, StoreDriver, StoreLike, UploadSizeInfo};
use nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo};
use parking_lot::Mutex;
use tokio::sync::Notify;
use tracing::{event, Level};
Expand All @@ -40,7 +40,7 @@ use crate::ac_utils::{get_and_decode_digest, get_size_and_decode_digest};
/// output directories that need to be checked.
fn get_digests_and_output_dirs(
action_result: ProtoActionResult,
) -> Result<(Vec<DigestInfo>, Vec<ProtoOutputDirectory>), Error> {
) -> Result<(Vec<StoreKey<'static>>, Vec<ProtoOutputDirectory>), Error> {
// TODO(allada) When `try_collect()` is stable we can use it instead.
let mut digest_iter = action_result
.output_files
Expand All @@ -51,7 +51,7 @@ fn get_digests_and_output_dirs(
let mut digest_infos = Vec::with_capacity(digest_iter.size_hint().1.unwrap_or(0));
digest_iter
.try_for_each(|maybe_digest| {
digest_infos.push(maybe_digest?);
digest_infos.push(maybe_digest?.into());
Result::<_, Error>::Ok(())
})
.err_tip(|| "Some digests could not be converted to DigestInfos")?;
Expand All @@ -61,10 +61,10 @@ fn get_digests_and_output_dirs(
/// Given a list of output directories recursively get all digests
/// that need to be checked and pass them into `handle_digest_infos_fn`
/// as they are found.
async fn check_output_directories(
async fn check_output_directories<'a>(
cas_store: &Store,
output_directories: Vec<ProtoOutputDirectory>,
handle_digest_infos_fn: &impl Fn(Vec<DigestInfo>),
handle_digest_infos_fn: &impl Fn(Vec<StoreKey<'a>>),
) -> Result<(), Error> {
let mut futures = FuturesUnordered::new();

Expand All @@ -75,7 +75,7 @@ async fn check_output_directories(
let tree_digest = maybe_tree_digest
.err_tip(|| "Could not decode tree digest CompletenessCheckingStore::has")?;
futures.push(async move {
let tree = get_and_decode_digest::<ProtoTree>(cas_store, &tree_digest).await?;
let tree = get_and_decode_digest::<ProtoTree>(cas_store, tree_digest.into()).await?;
// TODO(allada) When `try_collect()` is stable we can use it instead.
// https://github.com/rust-lang/rust/issues/94047
let mut digest_iter = tree.children.into_iter().chain(tree.root).flat_map(|dir| {
Expand All @@ -87,7 +87,7 @@ async fn check_output_directories(
let mut digest_infos = Vec::with_capacity(digest_iter.size_hint().1.unwrap_or(0));
digest_iter
.try_for_each(|maybe_digest| {
digest_infos.push(maybe_digest?);
digest_infos.push(maybe_digest?.into());
Result::<_, Error>::Ok(())
})
.err_tip(|| "Expected digest to exist and be convertable")?;
Expand Down Expand Up @@ -129,14 +129,14 @@ impl CompletenessCheckingStore {
/// are polled concurrently.
async fn inner_has_with_results(
&self,
action_result_digests: &[DigestInfo],
action_result_digests: &[StoreKey<'_>],
results: &mut [Option<usize>],
) -> Result<(), Error> {
// Holds shared state between the different futures.
// This is how get around lifetime issues.
struct State<'a> {
results: &'a mut [Option<usize>],
digests_to_check: Vec<DigestInfo>,
digests_to_check: Vec<StoreKey<'a>>,
digests_to_check_idxs: Vec<usize>,
notify: Arc<Notify>,
done: bool,
Expand All @@ -161,9 +161,11 @@ impl CompletenessCheckingStore {
.map(|(i, digest)| {
async move {
// Note: We don't err_tip here because often have NotFound here which is ok.
let (action_result, size) =
get_size_and_decode_digest::<ProtoActionResult>(&self.ac_store, digest)
.await?;
let (action_result, size) = get_size_and_decode_digest::<ProtoActionResult>(
&self.ac_store,
digest.borrow(),
)
.await?;

let (mut digest_infos, output_directories) =
get_digests_and_output_dirs(action_result)?;
Expand Down Expand Up @@ -337,31 +339,30 @@ impl CompletenessCheckingStore {
impl StoreDriver for CompletenessCheckingStore {
async fn has_with_results(
self: Pin<&Self>,
action_result_digests: &[DigestInfo],
keys: &[StoreKey<'_>],
results: &mut [Option<usize>],
) -> Result<(), Error> {
self.inner_has_with_results(action_result_digests, results)
.await
self.inner_has_with_results(keys, results).await
}

async fn update(
self: Pin<&Self>,
digest: DigestInfo,
key: StoreKey<'_>,
reader: DropCloserReadHalf,
size_info: UploadSizeInfo,
) -> Result<(), Error> {
self.ac_store.update(digest, reader, size_info).await
self.ac_store.update(key, reader, size_info).await
}

async fn get_part(
self: Pin<&Self>,
digest: DigestInfo,
key: StoreKey<'_>,
writer: &mut DropCloserWriteHalf,
offset: usize,
length: Option<usize>,
) -> Result<(), Error> {
let results = &mut [None];
self.inner_has_with_results(&[digest], results)
self.inner_has_with_results(&[key.borrow()], results)
.await
.err_tip(|| "when calling CompletenessCheckingStore::get_part")?;
if results[0].is_none() {
Expand All @@ -370,10 +371,10 @@ impl StoreDriver for CompletenessCheckingStore {
"Digest found, but not all parts were found in CompletenessCheckingStore::get_part"
));
}
self.ac_store.get_part(digest, writer, offset, length).await
self.ac_store.get_part(key, writer, offset, length).await
}

fn inner_store(&self, _digest: Option<DigestInfo>) -> &dyn StoreDriver {
fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
self
}

Expand Down
Loading

0 comments on commit 4dc8464

Please sign in to comment.