Skip to content

Commit

Permalink
Refactor Store API to use StoreKey (TraceMachina#964)
Browse files Browse the repository at this point in the history
This enables us to use arbitrary strings when using the Store API.

closes TraceMachina#934
  • Loading branch information
allada authored Jun 4, 2024
1 parent 04beafd commit e524bbc
Show file tree
Hide file tree
Showing 37 changed files with 718 additions and 458 deletions.
4 changes: 2 additions & 2 deletions nativelink-scheduler/src/cache_lookup_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ async fn get_action_from_store(
digest_function: DigestHasherFunc,
) -> Option<ProtoActionResult> {
// If we are a GrpcStore we shortcut here, as this is a special store.
if let Some(grpc_store) = ac_store.downcast_ref::<GrpcStore>(Some(action_digest)) {
if let Some(grpc_store) = ac_store.downcast_ref::<GrpcStore>(Some(action_digest.into())) {
let action_result_request = GetActionResultRequest {
instance_name,
action_digest: Some(action_digest.into()),
Expand All @@ -79,7 +79,7 @@ async fn get_action_from_store(
.map(|response| response.into_inner())
.ok()
} else {
get_and_decode_digest::<ProtoActionResult>(ac_store, &action_digest)
get_and_decode_digest::<ProtoActionResult>(ac_store, action_digest.into())
.await
.ok()
}
Expand Down
12 changes: 9 additions & 3 deletions nativelink-service/src/ac_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,15 @@ impl AcServer {
.try_into()?;

// If we are a GrpcStore we shortcut here, as this is a special store.
if let Some(grpc_store) = store_info.store.downcast_ref::<GrpcStore>(Some(digest)) {
if let Some(grpc_store) = store_info
.store
.downcast_ref::<GrpcStore>(Some(digest.into()))
{
return grpc_store.get_action_result(Request::new(request)).await;
}

Ok(Response::new(
get_and_decode_digest::<ActionResult>(&store_info.store, &digest).await?,
get_and_decode_digest::<ActionResult>(&store_info.store, digest.into()).await?,
))
}

Expand Down Expand Up @@ -128,7 +131,10 @@ impl AcServer {
.try_into()?;

// If we are a GrpcStore we shortcut here, as this is a special store.
if let Some(grpc_store) = store_info.store.downcast_ref::<GrpcStore>(Some(digest)) {
if let Some(grpc_store) = store_info
.store
.downcast_ref::<GrpcStore>(Some(digest.into()))
{
return grpc_store.update_action_result(Request::new(request)).await;
}

Expand Down
6 changes: 3 additions & 3 deletions nativelink-service/src/bytestream_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,7 @@ impl ByteStreamServer {
let digest = DigestInfo::try_new(resource_info.hash.as_ref(), resource_info.expected_size)?;

// If we are a GrpcStore we shortcut here, as this is a special store.
if let Some(grpc_store) = store_clone.downcast_ref::<GrpcStore>(Some(digest)) {
if let Some(grpc_store) = store_clone.downcast_ref::<GrpcStore>(Some(digest.into())) {
return grpc_store
.query_write_status(Request::new(query_request.clone()))
.await;
Expand Down Expand Up @@ -582,7 +582,7 @@ impl ByteStream for ByteStreamServer {
let digest = DigestInfo::try_new(resource_info.hash.as_ref(), resource_info.expected_size)?;

// If we are a GrpcStore we shortcut here, as this is a special store.
if let Some(grpc_store) = store.downcast_ref::<GrpcStore>(Some(digest)) {
if let Some(grpc_store) = store.downcast_ref::<GrpcStore>(Some(digest.into())) {
let stream = grpc_store.read(Request::new(read_request)).await?;
return Ok(Response::new(Box::pin(stream)));
}
Expand Down Expand Up @@ -638,7 +638,7 @@ impl ByteStream for ByteStreamServer {
.err_tip(|| "Invalid digest input in ByteStream::write")?;

// If we are a GrpcStore we shortcut here, as this is a special store.
if let Some(grpc_store) = store.downcast_ref::<GrpcStore>(Some(digest)) {
if let Some(grpc_store) = store.downcast_ref::<GrpcStore>(Some(digest.into())) {
return grpc_store.write(stream).await.map_err(|e| e.into());
}

Expand Down
4 changes: 2 additions & 2 deletions nativelink-service/src/cas_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl CasServer {

let mut requested_blobs = Vec::with_capacity(request.blob_digests.len());
for digest in request.blob_digests.iter() {
requested_blobs.push(DigestInfo::try_from(digest.clone())?);
requested_blobs.push(DigestInfo::try_from(digest.clone())?.into());
}
let sizes = store
.has_many(&requested_blobs)
Expand Down Expand Up @@ -255,7 +255,7 @@ impl CasServer {

while !deque.is_empty() {
let digest: DigestInfo = deque.pop_front().err_tip(|| "In VecDeque::pop_front")?;
let directory = get_and_decode_digest::<Directory>(&store, &digest)
let directory = get_and_decode_digest::<Directory>(&store, digest.into())
.await
.err_tip(|| "Converting digest to Directory")?;
if digest == page_token_digest {
Expand Down
5 changes: 3 additions & 2 deletions nativelink-service/src/execution_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl InstanceInfo {
// Goma puts the properties in the Command.
if platform_properties.is_empty() {
let command =
get_and_decode_digest::<Command>(&self.cas_store, &command_digest).await?;
get_and_decode_digest::<Command>(&self.cas_store, command_digest.into()).await?;
if let Some(platform) = &command.platform {
for property in &platform.properties {
let platform_property = self
Expand Down Expand Up @@ -209,7 +209,8 @@ impl ExecutionServer {
.execution_policy
.map_or(DEFAULT_EXECUTION_PRIORITY, |p| p.priority);

let action = get_and_decode_digest::<Action>(&instance_info.cas_store, &digest).await?;
let action =
get_and_decode_digest::<Action>(&instance_info.cas_store, digest.into()).await?;
let action_info = instance_info
.build_action_info(
instance_name,
Expand Down
4 changes: 2 additions & 2 deletions nativelink-service/tests/ac_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ mod get_action_result {
assert_eq!(err.code(), Code::NotFound);
assert_eq!(
err.message(),
"Hash 0123456789abcdef000000000000000000000000000000000123456789abcdef not found"
"Key Digest(DigestInfo { size_bytes: 0, hash: \"0123456789abcdef000000000000000000000000000000000123456789abcdef\" }) not found"
);
Ok(())
}
Expand Down Expand Up @@ -174,7 +174,7 @@ mod get_action_result {
assert_eq!(err.code(), Code::NotFound);
assert_eq!(
err.message(),
"Hash 0123456789abcdef000000000000000000000000000000000123456789abcdef not found"
"Key Digest(DigestInfo { size_bytes: 146, hash: \"0123456789abcdef000000000000000000000000000000000123456789abcdef\" }) not found"
);
Ok(())
}
Expand Down
3 changes: 1 addition & 2 deletions nativelink-service/tests/bytestream_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -954,8 +954,7 @@ pub mod read_tests {

let result = result_fut.await.err_tip(|| "Expected result to be ready")?;
let expected_err_str = concat!(
"status: NotFound, message: \"Hash 0123456789abcdef000000000000000000000000000000000123456789abcdef ",
"not found\", details: [], metadata: MetadataMap { headers: {} }",
"status: NotFound, message: \"Key Digest(DigestInfo { size_bytes: 55, hash: \\\"0123456789abcdef000000000000000000000000000000000123456789abcdef\\\" }) not found\", details: [], metadata: MetadataMap { headers: {} }",
);
assert_eq!(
Error::from(result.unwrap_err()),
Expand Down
7 changes: 5 additions & 2 deletions nativelink-service/tests/cas_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use nativelink_store::ac_utils::serialize_and_upload_message;
use nativelink_store::default_store_factory::store_factory;
use nativelink_store::store_manager::StoreManager;
use nativelink_util::common::DigestInfo;
use nativelink_util::store_trait::StoreLike;
use nativelink_util::store_trait::{StoreKey, StoreLike};
use prometheus_client::registry::Registry;
use tonic::Request;

Expand Down Expand Up @@ -309,7 +309,10 @@ mod batch_read_blobs {
data: vec![].into(),
status: Some(GrpcStatus {
code: Code::NotFound as i32,
message: format!("Hash {} not found", digest3.hash),
message: format!(
"Key {:?} not found",
StoreKey::from(DigestInfo::try_from(digest3)?)
),
details: vec![],
}),
compressor: compressor::Value::Identity.into(),
Expand Down
26 changes: 18 additions & 8 deletions nativelink-store/src/ac_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use futures::TryFutureExt;
use nativelink_error::{Code, Error, ResultExt};
use nativelink_util::common::DigestInfo;
use nativelink_util::digest_hasher::DigestHasher;
use nativelink_util::store_trait::StoreLike;
use nativelink_util::store_trait::{StoreKey, StoreLike};
use prost::Message;

// NOTE(blaise.bruer) From some local testing it looks like action cache items are rarely greater than
Expand All @@ -37,22 +37,27 @@ pub const ESTIMATED_DIGEST_SIZE: usize = 2048;
const MAX_ACTION_MSG_SIZE: usize = 10 << 20; // 10mb.

/// Attempts to fetch the digest contents from a store into the associated proto.
pub async fn get_and_decode_digest<T: Message + Default>(
pub async fn get_and_decode_digest<T: Message + Default + 'static>(
store: &impl StoreLike,
digest: &DigestInfo,
key: StoreKey<'_>,
) -> Result<T, Error> {
get_size_and_decode_digest(store, digest)
get_size_and_decode_digest(store, key)
.map_ok(|(v, _)| v)
.await
}

/// Attempts to fetch the digest contents from a store into the associated proto.
pub async fn get_size_and_decode_digest<'a, T: Message + Default>(
pub async fn get_size_and_decode_digest<T: Message + Default + 'static>(
store: &impl StoreLike,
digest: &DigestInfo,
key: impl Into<StoreKey<'_>>,
) -> Result<(T, usize), Error> {
// Note: For unknown reasons we appear to be hitting:
// https://github.com/rust-lang/rust/issues/92096
// or a similar issue if we try to use the non-store driver function, so we
// are using the store driver function here.
let mut store_data_resp = store
.get_part_unchunked(*digest, 0, Some(MAX_ACTION_MSG_SIZE))
.as_store_driver_pin()
.get_part_unchunked(key.into(), 0, Some(MAX_ACTION_MSG_SIZE))
.await;
if let Err(err) = &mut store_data_resp {
if err.code == Code::NotFound {
Expand Down Expand Up @@ -97,8 +102,13 @@ pub async fn serialize_and_upload_message<'a, T: Message>(
let mut buffer = BytesMut::with_capacity(message.encoded_len());
let digest = message_to_digest(message, &mut buffer, hasher)
.err_tip(|| "In serialize_and_upload_message")?;
// Note: For unknown reasons we appear to be hitting:
// https://github.com/rust-lang/rust/issues/92096
// or a similar issue if we try to use the non-store driver function, so we
// are using the store driver function here.
cas_store
.update_oneshot(digest, buffer.freeze())
.as_store_driver_pin()
.update_oneshot(digest.into(), buffer.freeze())
.await
.err_tip(|| "In serialize_and_upload_message")?;
Ok(digest)
Expand Down
8 changes: 6 additions & 2 deletions nativelink-store/src/cas_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use nativelink_util::common::DigestInfo;
use nativelink_util::store_trait::StoreKey;

pub const ZERO_BYTE_DIGESTS: [DigestInfo; 2] = [
// Sha256 hash of zero bytes.
Expand All @@ -36,6 +37,9 @@ pub const ZERO_BYTE_DIGESTS: [DigestInfo; 2] = [
];

#[inline]
pub fn is_zero_digest(digest: &DigestInfo) -> bool {
digest.size_bytes == 0 && ZERO_BYTE_DIGESTS.contains(digest)
pub fn is_zero_digest<'a>(digest: impl Into<StoreKey<'a>>) -> bool {
match digest.into() {
StoreKey::Digest(digest) => digest.size_bytes == 0 && ZERO_BYTE_DIGESTS.contains(&digest),
_ => false,
}
}
43 changes: 22 additions & 21 deletions nativelink-store/src/completeness_checking_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use nativelink_util::health_utils::{default_health_status_indicator, HealthStatu
use nativelink_util::metrics_utils::{
Collector, CollectorState, CounterWithTime, MetricsComponent, Registry,
};
use nativelink_util::store_trait::{Store, StoreDriver, StoreLike, UploadSizeInfo};
use nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo};
use parking_lot::Mutex;
use tokio::sync::Notify;
use tracing::{event, Level};
Expand All @@ -40,7 +40,7 @@ use crate::ac_utils::{get_and_decode_digest, get_size_and_decode_digest};
/// output directories that need to be checked.
fn get_digests_and_output_dirs(
action_result: ProtoActionResult,
) -> Result<(Vec<DigestInfo>, Vec<ProtoOutputDirectory>), Error> {
) -> Result<(Vec<StoreKey<'static>>, Vec<ProtoOutputDirectory>), Error> {
// TODO(allada) When `try_collect()` is stable we can use it instead.
let mut digest_iter = action_result
.output_files
Expand All @@ -51,7 +51,7 @@ fn get_digests_and_output_dirs(
let mut digest_infos = Vec::with_capacity(digest_iter.size_hint().1.unwrap_or(0));
digest_iter
.try_for_each(|maybe_digest| {
digest_infos.push(maybe_digest?);
digest_infos.push(maybe_digest?.into());
Result::<_, Error>::Ok(())
})
.err_tip(|| "Some digests could not be converted to DigestInfos")?;
Expand All @@ -61,10 +61,10 @@ fn get_digests_and_output_dirs(
/// Given a list of output directories recursively get all digests
/// that need to be checked and pass them into `handle_digest_infos_fn`
/// as they are found.
async fn check_output_directories(
async fn check_output_directories<'a>(
cas_store: &Store,
output_directories: Vec<ProtoOutputDirectory>,
handle_digest_infos_fn: &impl Fn(Vec<DigestInfo>),
handle_digest_infos_fn: &impl Fn(Vec<StoreKey<'a>>),
) -> Result<(), Error> {
let mut futures = FuturesUnordered::new();

Expand All @@ -75,7 +75,7 @@ async fn check_output_directories(
let tree_digest = maybe_tree_digest
.err_tip(|| "Could not decode tree digest CompletenessCheckingStore::has")?;
futures.push(async move {
let tree = get_and_decode_digest::<ProtoTree>(cas_store, &tree_digest).await?;
let tree = get_and_decode_digest::<ProtoTree>(cas_store, tree_digest.into()).await?;
// TODO(allada) When `try_collect()` is stable we can use it instead.
// https://github.com/rust-lang/rust/issues/94047
let mut digest_iter = tree.children.into_iter().chain(tree.root).flat_map(|dir| {
Expand All @@ -87,7 +87,7 @@ async fn check_output_directories(
let mut digest_infos = Vec::with_capacity(digest_iter.size_hint().1.unwrap_or(0));
digest_iter
.try_for_each(|maybe_digest| {
digest_infos.push(maybe_digest?);
digest_infos.push(maybe_digest?.into());
Result::<_, Error>::Ok(())
})
.err_tip(|| "Expected digest to exist and be convertable")?;
Expand Down Expand Up @@ -129,14 +129,14 @@ impl CompletenessCheckingStore {
/// are polled concurrently.
async fn inner_has_with_results(
&self,
action_result_digests: &[DigestInfo],
action_result_digests: &[StoreKey<'_>],
results: &mut [Option<usize>],
) -> Result<(), Error> {
// Holds shared state between the different futures.
// This is how we get around lifetime issues.
struct State<'a> {
results: &'a mut [Option<usize>],
digests_to_check: Vec<DigestInfo>,
digests_to_check: Vec<StoreKey<'a>>,
digests_to_check_idxs: Vec<usize>,
notify: Arc<Notify>,
done: bool,
Expand All @@ -161,9 +161,11 @@ impl CompletenessCheckingStore {
.map(|(i, digest)| {
async move {
// Note: We don't err_tip here because we often get NotFound here, which is ok.
let (action_result, size) =
get_size_and_decode_digest::<ProtoActionResult>(&self.ac_store, digest)
.await?;
let (action_result, size) = get_size_and_decode_digest::<ProtoActionResult>(
&self.ac_store,
digest.borrow(),
)
.await?;

let (mut digest_infos, output_directories) =
get_digests_and_output_dirs(action_result)?;
Expand Down Expand Up @@ -337,31 +339,30 @@ impl CompletenessCheckingStore {
impl StoreDriver for CompletenessCheckingStore {
async fn has_with_results(
self: Pin<&Self>,
action_result_digests: &[DigestInfo],
keys: &[StoreKey<'_>],
results: &mut [Option<usize>],
) -> Result<(), Error> {
self.inner_has_with_results(action_result_digests, results)
.await
self.inner_has_with_results(keys, results).await
}

async fn update(
self: Pin<&Self>,
digest: DigestInfo,
key: StoreKey<'_>,
reader: DropCloserReadHalf,
size_info: UploadSizeInfo,
) -> Result<(), Error> {
self.ac_store.update(digest, reader, size_info).await
self.ac_store.update(key, reader, size_info).await
}

async fn get_part(
self: Pin<&Self>,
digest: DigestInfo,
key: StoreKey<'_>,
writer: &mut DropCloserWriteHalf,
offset: usize,
length: Option<usize>,
) -> Result<(), Error> {
let results = &mut [None];
self.inner_has_with_results(&[digest], results)
self.inner_has_with_results(&[key.borrow()], results)
.await
.err_tip(|| "when calling CompletenessCheckingStore::get_part")?;
if results[0].is_none() {
Expand All @@ -370,10 +371,10 @@ impl StoreDriver for CompletenessCheckingStore {
"Digest found, but not all parts were found in CompletenessCheckingStore::get_part"
));
}
self.ac_store.get_part(digest, writer, offset, length).await
self.ac_store.get_part(key, writer, offset, length).await
}

fn inner_store(&self, _digest: Option<DigestInfo>) -> &dyn StoreDriver {
fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
self
}

Expand Down
Loading

0 comments on commit e524bbc

Please sign in to comment.