Skip to content

Commit

Permalink
Add functionality for worker to download and create working dir
Browse files Browse the repository at this point in the history
Adds function download_to_directory() which is used to download
a digest encoded as a Directory proto and build a local directory
with those files present.
  • Loading branch information
allada committed Apr 23, 2022
1 parent d7d71a1 commit 5e7f9ef
Show file tree
Hide file tree
Showing 8 changed files with 544 additions and 22 deletions.
4 changes: 3 additions & 1 deletion cas/cas_main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
local_worker_cfg.cas_store
)
})?;
tokio::spawn(new_local_worker(Arc::new(local_worker_cfg), cas_store.clone()).run())
let local_worker = new_local_worker(Arc::new(local_worker_cfg), cas_store.clone())
.err_tip(|| "Could not make LocalWorker")?;
tokio::spawn(local_worker.run())
}
};
futures.push(Box::pin(spawn_fut.map_ok_or_else(|e| Err(e.into()), |v| v)));
Expand Down
6 changes: 3 additions & 3 deletions cas/store/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ rust_library(
":traits",
],
proc_macro_deps = ["//third_party:async_trait"],
visibility = ["//cas:__pkg__"]
visibility = ["//cas:__pkg__", "//cas:__subpackages__"]
)

rust_library(
Expand Down Expand Up @@ -137,7 +137,7 @@ rust_library(
":traits",
],
proc_macro_deps = ["//third_party:async_trait"],
visibility = ["//cas:__pkg__"]
visibility = ["//cas:__pkg__", "//cas:__subpackages__"]
)

rust_library(
Expand Down Expand Up @@ -226,7 +226,7 @@ rust_library(
":traits",
],
proc_macro_deps = ["//third_party:async_trait"],
visibility = ["//cas:__pkg__"]
visibility = ["//cas:__pkg__", "//cas:__subpackages__"]
)

rust_test(
Expand Down
29 changes: 29 additions & 0 deletions cas/store/fast_slow_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,35 @@ impl FastSlowStore {
Self { fast_store, slow_store }
}

pub fn fast_slow<'a>(&'a self) -> &'a Arc<dyn StoreTrait> {
&self.fast_store
}

/// Ensure our fast store is populated. This should be kept as a low
/// cost function. Since the data itself is shared and not copied it should be fairly
/// low cost to just discard the data, but does cost a few mutex locks while
/// streaming.
pub async fn populate_fast_store(self: Pin<&Self>, digest: DigestInfo) -> Result<(), Error> {
let maybe_size_info = self
.pin_fast_store()
.has(digest.clone())
.await
.err_tip(|| "While querying in populate_fast_store")?;
if let Some(_) = maybe_size_info {
return Ok(());
}
// TODO(blaise.bruer) This is extremely inefficient, since we are just trying
// to send the stream to /dev/null. Maybe we could instead make a version of
// the stream that can send to the drain more efficiently?
let (tx, mut rx) = make_buf_channel_pair();
let drain_fut = async move {
while !rx.recv().await?.is_empty() {}
Ok(())
};
let (drain_res, get_res) = join!(drain_fut, self.get(digest, tx));
get_res.err_tip(|| "Failed to populate()").merge(drain_res)
}

fn pin_fast_store<'a>(&'a self) -> Pin<&'a dyn StoreTrait> {
Pin::new(self.fast_store.as_ref())
}
Expand Down
27 changes: 26 additions & 1 deletion cas/worker/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ rust_library(
srcs = ["local_worker.rs"],
deps = [
"//cas/store",
"//cas/store:fast_slow_store",
"//config",
"//proto",
"//third_party:futures",
Expand All @@ -27,10 +28,14 @@ rust_library(
srcs = ["running_actions_manager.rs"],
deps = [
"//cas/scheduler:action_messages",
"//cas/store",
"//cas/store:ac_utils",
"//cas/store:fast_slow_store",
"//cas/store:filesystem_store",
"//proto",
"//third_party:fast_async_mutex",
"//third_party:futures",
"//third_party:tokio",
"//third_party:filetime",
"//util:common",
"//util:error",
],
Expand Down Expand Up @@ -110,6 +115,26 @@ rust_library(
proc_macro_deps = ["//third_party:async_trait"],
)

rust_test(
name = "running_actions_manager_test",
srcs = ["tests/running_actions_manager_test.rs"],
deps = [
"//cas/store",
"//cas/store:fast_slow_store",
"//cas/store:filesystem_store",
"//cas/store:memory_store",
"//config",
"//proto",
"//third_party:pretty_assertions",
"//third_party:prost",
"//third_party:rand",
"//third_party:tokio",
"//util:common",
"//util:error",
":running_actions_manager",
],
)

rust_test(
name = "local_worker_test",
srcs = ["tests/local_worker_test.rs"],
Expand Down
16 changes: 12 additions & 4 deletions cas/worker/local_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use tonic::{transport::Channel as TonicChannel, Streaming};
use common::log;
use config::cas_server::LocalWorkerConfig;
use error::{make_err, make_input_err, Code, Error, ResultExt};
use fast_slow_store::FastSlowStore;
use proto::com::github::allada::turbo_cache::remote_execution::{
execute_result, update_for_worker::Update, worker_api_client::WorkerApiClient, ExecuteFinishedResult,
ExecuteResult, KeepAliveRequest, UpdateForWorker,
Expand Down Expand Up @@ -177,12 +178,19 @@ pub struct LocalWorker<T: WorkerApiClientTrait, U: RunningActionsManager> {
sleep_fn: Option<Box<dyn Fn(Duration) -> BoxFuture<'static, ()> + Send + Sync>>,
}

/// Creates a new LocalWorker. The `cas_store` must be an instance of FastSlowStore and will be
/// checked at runtime.
pub fn new_local_worker(
config: Arc<LocalWorkerConfig>,
cas_store: Arc<dyn Store>,
) -> LocalWorker<WorkerApiClientWrapper, RunningActionsManagerImpl> {
let running_actions_manager = Arc::new(RunningActionsManagerImpl::new(cas_store.clone()));
LocalWorker::new_with_connection_factory_and_actions_manager(
) -> Result<LocalWorker<WorkerApiClientWrapper, RunningActionsManagerImpl>, Error> {
let fast_slow_store = cas_store
.as_any()
.downcast_ref::<Arc<FastSlowStore>>()
.err_tip(|| "Expected store for LocalWorker's store to be a FastSlowStore")?
.clone();
let running_actions_manager = Arc::new(RunningActionsManagerImpl::new(fast_slow_store)?).clone();
Ok(LocalWorker::new_with_connection_factory_and_actions_manager(
config.clone(),
running_actions_manager,
Box::new(move || {
Expand All @@ -204,7 +212,7 @@ pub fn new_local_worker(
})
}),
Box::new(move |d| Box::pin(sleep(d))),
)
))
}

impl<T: WorkerApiClientTrait, U: RunningActionsManager> LocalWorker<T, U> {
Expand Down
159 changes: 146 additions & 13 deletions cas/worker/running_actions_manager.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,130 @@
// Copyright 2022 Nathan (Blaise) Bruer. All rights reserved.

use std::collections::HashMap;
use std::fs::Permissions;
use std::os::unix::fs::PermissionsExt;
use std::pin::Pin;
use std::sync::{Arc, Weak};

use fast_async_mutex::mutex::Mutex;
use filetime::{set_file_mtime, FileTime};
use futures::future::{BoxFuture, FutureExt, TryFutureExt};
use futures::stream::{FuturesUnordered, TryStreamExt};
use tokio::fs;
use tokio::task::spawn_blocking;

use ac_utils::get_and_decode_digest;
use action_messages::ActionInfo;
use async_trait::async_trait;
use common::DigestInfo;
use error::{Error, ResultExt};
use proto::build::bazel::remote::execution::v2::Action;
use error::{make_err, Code, Error, ResultExt};
use fast_slow_store::FastSlowStore;
use filesystem_store::FilesystemStore;
use proto::build::bazel::remote::execution::v2::{Action, Directory as ProtoDirectory};
use proto::com::github::allada::turbo_cache::remote_execution::{ExecuteFinishedResult, StartExecute};
use store::Store;

/// Aggressively download the digests of files and make a local folder from it. This function
/// will spawn unbounded number of futures to try and get these downloaded. The store itself
/// should be rate limited if spawning too many requests at once is an issue.
/// We require the `FilesystemStore` to be the `fast` store of `FastSlowStore`. This is for
/// efficiency reasons. We will request the `FastSlowStore` to populate the entry then we will
/// assume the `FilesystemStore` has the file available immediately after and hardlink the file
/// to a new location.
// Sadly we cannot use `async fn` here because the rust compiler cannot determine the auto traits
// of the future. So we need to force this function to return a dynamic future instead.
// see: https://github.com/rust-lang/rust/issues/78649
pub fn download_to_directory<'a>(
cas_store: Pin<&'a FastSlowStore>,
filesystem_store: Pin<&'a FilesystemStore>,
digest: &'a DigestInfo,
current_directory: &'a str,
) -> BoxFuture<'a, Result<(), Error>> {
async move {
let directory = get_and_decode_digest::<ProtoDirectory>(cas_store, digest)
.await
.err_tip(|| "Converting digest to Directory")?;
let mut futures = FuturesUnordered::new();

for file in directory.files {
let digest: DigestInfo = file
.digest
.err_tip(|| "Expected Digest to exist in Directory::file::digest")?
.try_into()
.err_tip(|| "In Directory::file::digest")?;
let src = filesystem_store.get_file_for_digest(&digest);
let dest = format!("{}/{}", current_directory, file.name);
let mut mtime = None;
let mut unix_mode = None;
if let Some(properties) = file.node_properties {
mtime = properties.mtime;
unix_mode = properties.unix_mode;
}
futures.push(
cas_store
.populate_fast_store(digest.clone())
.and_then(move |_| async move {
fs::hard_link(src, &dest)
.await
.map_err(|e| make_err!(Code::Internal, "Could not make hardlink, {:?} : {}", e, dest))?;
if let Some(unix_mode) = unix_mode {
fs::set_permissions(&dest, Permissions::from_mode(unix_mode))
.await
.err_tip(|| format!("Could not set unix mode in download_to_directory {}", dest))?;
}
if let Some(mtime) = mtime {
spawn_blocking(move || {
set_file_mtime(&dest, FileTime::from_unix_time(mtime.seconds, mtime.nanos as u32))
.err_tip(|| format!("Failed to set mtime in download_to_directory {}", dest))
})
.await
.err_tip(|| "Failed to launch spawn_blocking in download_to_directory")??;
}
Ok(())
})
.map_err(move |e| e.append(format!("for digest {:?}", digest)))
.boxed(),
);
}

for directory in directory.directories {
let digest: DigestInfo = directory
.digest
.err_tip(|| "Expected Digest to exist in Directory::directories::digest")?
.try_into()
.err_tip(|| "In Directory::file::digest")?;
let new_directory_path = format!("{}/{}", current_directory, directory.name);
futures.push(
async move {
fs::create_dir(&new_directory_path)
.await
.err_tip(|| format!("Could not create directory {}", new_directory_path))?;
download_to_directory(cas_store, filesystem_store, &digest, &new_directory_path)
.await
.err_tip(|| format!("in download_to_directory : {}", new_directory_path))?;
Ok(())
}
.boxed(),
);
}

for symlink in directory.symlinks {
let dest = format!("{}/{}", current_directory, symlink.name);
futures.push(
async move {
fs::symlink(&symlink.target, &dest)
.await
.err_tip(|| format!("Could not create symlink {} -> {}", symlink.target, dest))?;
Ok(())
}
.boxed(),
);
}

while futures.try_next().await?.is_some() {}
Ok(())
}
.boxed()
}

#[async_trait]
pub trait RunningAction: Sync + Send + Sized + Unpin + 'static {
Expand All @@ -35,11 +146,19 @@ pub trait RunningAction: Sync + Send + Sized + Unpin + 'static {
async fn get_finished_result(self: Arc<Self>) -> Result<ExecuteFinishedResult, Error>;
}

pub struct RunningActionImpl {}
pub struct RunningActionImpl {
_action_info: ActionInfo,
_cas_store: Pin<Arc<FastSlowStore>>,
_filesystem_store: Pin<Arc<FilesystemStore>>,
}

impl RunningActionImpl {
fn new() -> Self {
Self {}
fn new(action_info: ActionInfo, cas_store: Arc<FastSlowStore>, filesystem_store: Arc<FilesystemStore>) -> Self {
Self {
_action_info: action_info,
_cas_store: Pin::new(cas_store),
_filesystem_store: Pin::new(filesystem_store),
}
}
}

Expand Down Expand Up @@ -81,16 +200,26 @@ type ActionId = [u8; 32];
/// Holds state info about what is being executed and the interface for interacting
/// with actions while they are running.
pub struct RunningActionsManagerImpl {
cas_store: Pin<Arc<dyn Store>>,
cas_store: Arc<FastSlowStore>,
filesystem_store: Arc<FilesystemStore>,
running_actions: Mutex<HashMap<ActionId, Weak<RunningActionImpl>>>,
}

impl RunningActionsManagerImpl {
pub fn new(cas_store: Arc<dyn Store>) -> Self {
Self {
cas_store: Pin::new(cas_store),
pub fn new(cas_store: Arc<FastSlowStore>) -> Result<Self, Error> {
// Sadly because of some limitations of how Any works we need to clone more times than optimal.
let filesystem_store = cas_store
.fast_slow()
.clone()
.as_any()
.downcast_ref::<Arc<FilesystemStore>>()
.err_tip(|| "Expected fast slow store for cas_store in RunningActionsManagerImpl")?
.clone();
Ok(Self {
cas_store,
filesystem_store,
running_actions: Mutex::new(HashMap::new()),
}
})
}

async fn create_action_info(&self, start_execute: StartExecute) -> Result<ActionInfo, Error> {
Expand All @@ -102,7 +231,7 @@ impl RunningActionsManagerImpl {
.clone()
.err_tip(|| "Expected action_digest to exist on StartExecute")?
.try_into()?;
let action = get_and_decode_digest::<Action>(self.cas_store.as_ref(), &action_digest)
let action = get_and_decode_digest::<Action>(Pin::new(self.cas_store.as_ref()), &action_digest)
.await
.err_tip(|| "During start_action")?;
Ok(
Expand All @@ -122,7 +251,11 @@ impl RunningActionsManager for RunningActionsManagerImpl {
) -> Result<Arc<RunningActionImpl>, Error> {
let action_info = self.create_action_info(start_execute).await?;
let action_id = action_info.unique_qualifier.get_hash();
let running_action = Arc::new(RunningActionImpl::new());
let running_action = Arc::new(RunningActionImpl::new(
action_info,
self.cas_store.clone(),
self.filesystem_store.clone(),
));
{
let mut running_actions = self.running_actions.lock().await;
running_actions.insert(action_id, Arc::downgrade(&running_action));
Expand Down
Loading

0 comments on commit 5e7f9ef

Please sign in to comment.