diff --git a/src/rust/engine/Cargo.lock b/src/rust/engine/Cargo.lock
index 11e3988b606..9863e8abfc1 100644
--- a/src/rust/engine/Cargo.lock
+++ b/src/rust/engine/Cargo.lock
@@ -1738,6 +1738,7 @@ dependencies = [
  "mock 0.0.1",
  "protobuf 2.0.6 (git+https://github.com/pantsbuild/rust-protobuf?rev=171611c33ec92f07e1b7107327f6d0139a7afebf)",
  "sha2 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "sharded_lmdb 0.0.1",
  "store 0.1.0",
  "task_executor 0.0.1",
  "tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
diff --git a/src/rust/engine/process_execution/Cargo.toml b/src/rust/engine/process_execution/Cargo.toml
index 7e5d988225e..e846e76d764 100644
--- a/src/rust/engine/process_execution/Cargo.toml
+++ b/src/rust/engine/process_execution/Cargo.toml
@@ -19,6 +19,7 @@ hashing = { path = "../hashing" }
 log = "0.4"
 protobuf = { version = "2.0.6", features = ["with-bytes"] }
 sha2 = "0.8"
+sharded_lmdb = { path = "../sharded_lmdb" }
 store = { path = "../fs/store" }
 task_executor = { path = "../task_executor" }
 tempfile = "3"
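Before the new module itself, a minimal, self-contained sketch of the pattern `cache.rs` implements below: wrap an underlying runner, key on a digest of the request, and either return a hit or fall through to execute-and-store. All names and types here are invented for illustration; the real runner keys on the remote-execution Action digest and persists serialized `ExecuteResponse` protos in LMDB.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::Mutex;

trait Runner {
  fn run(&self, req: &str) -> Result<String, String>;
}

struct CachingRunner<R: Runner> {
  underlying: R,
  cache: Mutex<HashMap<u64, String>>, // fingerprint -> serialized result
}

impl<R: Runner> CachingRunner<R> {
  // Stand-in for digesting the remote-execution Action proto.
  fn digest(req: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    req.hash(&mut hasher);
    hasher.finish()
  }
}

impl<R: Runner> Runner for CachingRunner<R> {
  fn run(&self, req: &str) -> Result<String, String> {
    let key = Self::digest(req);
    if let Some(hit) = self.cache.lock().unwrap().get(&key) {
      return Ok(hit.clone()); // cache hit: skip execution entirely
    }
    let result = self.underlying.run(req)?; // miss: execute,
    self.cache.lock().unwrap().insert(key, result.clone()); // then store
    Ok(result)
  }
}
```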
diff --git a/src/rust/engine/process_execution/src/cache.rs b/src/rust/engine/process_execution/src/cache.rs
new file mode 100644
index 00000000000..e45127eb42f
--- /dev/null
+++ b/src/rust/engine/process_execution/src/cache.rs
@@ -0,0 +1,217 @@
+use crate::{ExecuteProcessRequest, ExecuteProcessRequestMetadata, FallibleExecuteProcessResult};
+use boxfuture::{try_future, BoxFuture, Boxable};
+use bytes::Bytes;
+use futures::Future;
+use hashing::{Digest, Fingerprint};
+use log::{debug, warn};
+use protobuf::Message;
+use sharded_lmdb::ShardedLmdb;
+use std::sync::Arc;
+use store::Store;
+use workunit_store::WorkUnitStore;
+
+#[derive(Clone)]
+struct CommandRunner {
+  underlying: Arc<dyn crate::CommandRunner>,
+  process_execution_store: ShardedLmdb,
+  file_store: Store,
+  metadata: ExecuteProcessRequestMetadata,
+}
+
+impl crate::CommandRunner for CommandRunner {
+  // TODO: Maybe record WorkUnits for local cache checks.
+  fn run(
+    &self,
+    req: ExecuteProcessRequest,
+    workunit_store: WorkUnitStore,
+  ) -> BoxFuture<FallibleExecuteProcessResult, String> {
+    let digest = try_future!(self.digest(&req));
+    let key = digest.0;
+
+    let command_runner = self.clone();
+    self
+      .lookup(key)
+      .then(move |maybe_result| {
+        match maybe_result {
+          Ok(Some(result)) => return futures::future::ok(result).to_boxed(),
+          Err(err) => {
+            warn!("Error loading process execution result from local cache: {} - continuing to execute", err);
+            // Falling through to re-execute.
+          },
+          Ok(None) => {
+            // Falling through to execute.
+          },
+        }
+        command_runner
+          .underlying
+          .run(req, workunit_store)
+          .and_then(move |result| {
+            command_runner.store(key, &result).then(|store_result| {
+              if let Err(err) = store_result {
+                debug!("Error storing process execution result to local cache: {} - ignoring and continuing", err);
+              }
+              Ok(result)
+            })
+          })
+          .to_boxed()
+      })
+      .to_boxed()
+  }
+}
+
+impl CommandRunner {
+  fn digest(&self, req: &ExecuteProcessRequest) -> Result<Digest, String> {
+    let (_action, _command, execute_request) =
+      crate::remote::make_execute_request(req, self.metadata.clone())?;
+    execute_request.get_action_digest().into()
+  }
+
+  fn lookup(
+    &self,
+    fingerprint: Fingerprint,
+  ) -> impl Future<Item = Option<FallibleExecuteProcessResult>, Error = String> {
+    let file_store = self.file_store.clone();
+    self
+      .process_execution_store
+      .load_bytes_with(fingerprint, |bytes| {
+        let mut execute_response = bazel_protos::remote_execution::ExecuteResponse::new();
+        execute_response
+          .merge_from_bytes(&bytes)
+          .map_err(|e| format!("Invalid ExecuteResponse: {:?}", e))?;
+        Ok(execute_response)
+      })
+      .and_then(move |maybe_execute_response| {
+        if let Some(execute_response) = maybe_execute_response {
+          crate::remote::populate_fallible_execution_result(file_store, execute_response, vec![])
+            .map(Some)
+            .to_boxed()
+        } else {
+          futures::future::ok(None).to_boxed()
+        }
+      })
+  }
+
+  fn store(
+    &self,
+    fingerprint: Fingerprint,
+    result: &FallibleExecuteProcessResult,
+  ) -> impl Future<Item = (), Error = String> {
+    let mut execute_response = bazel_protos::remote_execution::ExecuteResponse::new();
+    execute_response.set_cached_result(true);
+    let action_result = execute_response.mut_result();
+    action_result.set_exit_code(result.exit_code);
+    action_result.mut_output_directories().push({
+      let mut directory = bazel_protos::remote_execution::OutputDirectory::new();
+      directory.set_path(String::new());
+      directory.set_tree_digest((&result.output_directory).into());
+      directory
+    });
+    let process_execution_store = self.process_execution_store.clone();
+    // TODO: Should probably have a configurable lease time which is larger than default.
+    // (This isn't super urgent because we don't ever actually GC this store. So also...)
+    // TODO: GC the local process execution cache.
+    self
+      .file_store
+      .store_file_bytes(result.stdout.clone(), true)
+      .join(
+        self
+          .file_store
+          .store_file_bytes(result.stderr.clone(), true),
+      )
+      .and_then(move |(stdout_digest, stderr_digest)| {
+        let action_result = execute_response.mut_result();
+        action_result.set_stdout_digest((&stdout_digest).into());
+        action_result.set_stderr_digest((&stderr_digest).into());
+        execute_response
+          .write_to_bytes()
+          .map(Bytes::from)
+          .map_err(|err| format!("Error serializing execute process result to cache: {}", err))
+      })
+      .and_then(move |bytes| process_execution_store.store_bytes(fingerprint, bytes, false))
+  }
+}
+
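+// Round-trips a request through the caching runner: the first run executes the
+// script and populates the cache; the script is then deleted, so a second,
+// identical run can only reproduce the result via a cache hit.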
+#[cfg(test)]
+mod test {
+  use crate::ExecuteProcessRequest;
+  use crate::{CommandRunner as CommandRunnerTrait, ExecuteProcessRequestMetadata};
+  use hashing::EMPTY_DIGEST;
+  use sharded_lmdb::ShardedLmdb;
+  use std::collections::{BTreeMap, BTreeSet};
+  use std::io::Write;
+  use std::path::PathBuf;
+  use std::sync::Arc;
+  use std::time::Duration;
+  use store::Store;
+  use tempfile::TempDir;
+  use testutil::data::TestData;
+  use workunit_store::WorkUnitStore;
+
+  #[test]
+  fn roundtrip() {
+    let runtime = task_executor::Executor::new();
+    let work_dir = TempDir::new().unwrap();
+    let store_dir = TempDir::new().unwrap();
+    let store = Store::local_only(runtime.clone(), store_dir.path()).unwrap();
+    let local = crate::local::CommandRunner::new(
+      store.clone(),
+      runtime.clone(),
+      work_dir.path().to_owned(),
+      true,
+    );
+
+    let script_dir = TempDir::new().unwrap();
+    let script_path = script_dir.path().join("script");
+    std::fs::File::create(&script_path)
+      .and_then(|mut file| {
+        writeln!(
+          file,
+          "echo -n {} > roland && echo Hello && echo >&2 World",
+          TestData::roland().string(),
+        )
+      })
+      .unwrap();
+
+    let request = ExecuteProcessRequest {
+      argv: vec![
+        testutil::path::find_bash(),
+        format!("{}", script_path.display()),
+      ],
+      env: BTreeMap::new(),
+      input_files: EMPTY_DIGEST,
+      output_files: vec![PathBuf::from("roland")].into_iter().collect(),
+      output_directories: BTreeSet::new(),
+      timeout: Duration::from_millis(1000),
+      description: "bash".to_string(),
+      jdk_home: None,
+    };
+
+    let local_result = runtime.block_on(local.run(request.clone(), WorkUnitStore::new()));
+
+    let cache_dir = TempDir::new().unwrap();
+    let caching = crate::cache::CommandRunner {
+      underlying: Arc::new(local),
+      file_store: store.clone(),
+      process_execution_store: ShardedLmdb::new(
+        cache_dir.path().to_owned(),
+        50 * 1024 * 1024,
+        runtime.clone(),
+      )
+      .unwrap(),
+      metadata: ExecuteProcessRequestMetadata {
+        instance_name: None,
+        cache_key_gen_version: None,
+        platform_properties: BTreeMap::new(),
+      },
+    };
+
+    let uncached_result = runtime.block_on(caching.run(request.clone(), WorkUnitStore::new()));
+
+    assert_eq!(local_result, uncached_result);
+
+    std::fs::remove_file(&script_path).unwrap();
+    let cached_result = runtime.block_on(caching.run(request, WorkUnitStore::new()));
+
+    assert_eq!(uncached_result, cached_result);
+  }
+}
diff --git a/src/rust/engine/process_execution/src/lib.rs b/src/rust/engine/process_execution/src/lib.rs
index ed13290b12f..fe5a34aa240 100644
--- a/src/rust/engine/process_execution/src/lib.rs
+++ b/src/rust/engine/process_execution/src/lib.rs
@@ -41,6 +41,7 @@ use workunit_store::WorkUnitStore;
 
 use async_semaphore::AsyncSemaphore;
 
+pub mod cache;
 pub mod local;
 pub mod remote;
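Since `cache::CommandRunner` and its fields are crate-private in this diff, wiring it up currently looks like the roundtrip test above. A hypothetical sketch, with `local_runner`, `cache_dir`, `executor`, `metadata`, and `request` as placeholder bindings (an out-of-crate caller would additionally need a public constructor, which this change does not add):

```rust
// Hypothetical wiring of the caching runner around a local one; mirrors the test.
let caching = crate::cache::CommandRunner {
  underlying: Arc::new(local_runner),
  file_store: store.clone(),
  process_execution_store: ShardedLmdb::new(
    cache_dir,        // directory for the LMDB shards
    50 * 1024 * 1024, // max store size in bytes
    executor.clone(),
  )
  .unwrap(),
  metadata, // same metadata the remote runner uses, so keys match remote Action digests
};
let result = executor.block_on(caching.run(request, WorkUnitStore::new()));
```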
diff --git a/src/rust/engine/process_execution/src/remote.rs b/src/rust/engine/process_execution/src/remote.rs
index 0515b71697b..75b6f27e59e 100644
--- a/src/rust/engine/process_execution/src/remote.rs
+++ b/src/rust/engine/process_execution/src/remote.rs
@@ -475,20 +475,13 @@ impl CommandRunner {
         let status = execute_response.take_status();
         if grpcio::RpcStatusCode::from(status.get_code()) == grpcio::RpcStatusCode::Ok {
-          return self
-            .extract_stdout(&execute_response)
-            .join(self.extract_stderr(&execute_response))
-            .join(self.extract_output_files(&execute_response))
-            .and_then(move |((stdout, stderr), output_directory)| {
-              Ok(FallibleExecuteProcessResult {
-                stdout: stdout,
-                stderr: stderr,
-                exit_code: execute_response.get_result().get_exit_code(),
-                output_directory: output_directory,
-                execution_attempts: execution_attempts,
-              })
-            })
-            .to_boxed();
+          return populate_fallible_execution_result(
+            self.store.clone(),
+            execute_response,
+            execution_attempts,
+          )
+          .map_err(ExecutionError::Fatal)
+          .to_boxed();
         }
         status
       }
@@ -576,202 +569,6 @@ impl CommandRunner {
     }
     .to_boxed()
   }
-
-  fn extract_stdout(
-    &self,
-    execute_response: &bazel_protos::remote_execution::ExecuteResponse,
-  ) -> BoxFuture<Bytes, ExecutionError> {
-    if execute_response.get_result().has_stdout_digest() {
-      let stdout_digest_result: Result<Digest, String> =
-        execute_response.get_result().get_stdout_digest().into();
-      let stdout_digest = try_future!(stdout_digest_result
-        .map_err(|err| ExecutionError::Fatal(format!("Error extracting stdout: {}", err))));
-      self
-        .store
-        .load_file_bytes_with(stdout_digest, |v| v)
-        .map_err(move |error| {
-          ExecutionError::Fatal(format!(
-            "Error fetching stdout digest ({:?}): {:?}",
-            stdout_digest, error
-          ))
-        })
-        .and_then(move |maybe_value| {
-          maybe_value.ok_or_else(|| {
-            ExecutionError::Fatal(format!(
-              "Couldn't find stdout digest ({:?}), when fetching.",
-              stdout_digest
-            ))
-          })
-        })
-        .map(|(bytes, _metadata)| bytes)
-        .to_boxed()
-    } else {
-      let stdout_raw = Bytes::from(execute_response.get_result().get_stdout_raw());
-      let stdout_copy = stdout_raw.clone();
-      self
-        .store
-        .store_file_bytes(stdout_raw, true)
-        .map_err(move |error| {
-          ExecutionError::Fatal(format!("Error storing raw stdout: {:?}", error))
-        })
-        .map(|_| stdout_copy)
-        .to_boxed()
-    }
-  }
-
-  fn extract_stderr(
-    &self,
-    execute_response: &bazel_protos::remote_execution::ExecuteResponse,
-  ) -> BoxFuture<Bytes, ExecutionError> {
-    if execute_response.get_result().has_stderr_digest() {
-      let stderr_digest_result: Result<Digest, String> =
-        execute_response.get_result().get_stderr_digest().into();
-      let stderr_digest = try_future!(stderr_digest_result
-        .map_err(|err| ExecutionError::Fatal(format!("Error extracting stderr: {}", err))));
-      self
-        .store
-        .load_file_bytes_with(stderr_digest, |v| v)
-        .map_err(move |error| {
-          ExecutionError::Fatal(format!(
-            "Error fetching stderr digest ({:?}): {:?}",
-            stderr_digest, error
-          ))
-        })
-        .and_then(move |maybe_value| {
-          maybe_value.ok_or_else(|| {
-            ExecutionError::Fatal(format!(
-              "Couldn't find stderr digest ({:?}), when fetching.",
-              stderr_digest
-            ))
-          })
-        })
-        .map(|(bytes, _metadata)| bytes)
-        .to_boxed()
-    } else {
-      let stderr_raw = Bytes::from(execute_response.get_result().get_stderr_raw());
-      let stderr_copy = stderr_raw.clone();
-      self
-        .store
-        .store_file_bytes(stderr_raw, true)
-        .map_err(move |error| {
-          ExecutionError::Fatal(format!("Error storing raw stderr: {:?}", error))
-        })
-        .map(|_| stderr_copy)
-        .to_boxed()
-    }
-  }
-
-  fn extract_output_files(
-    &self,
-    execute_response: &bazel_protos::remote_execution::ExecuteResponse,
-  ) -> BoxFuture<Digest, ExecutionError> {
-    // Get Digests of output Directories.
-    // Then we'll make a Directory for the output files, and merge them.
-    let mut directory_digests =
-      Vec::with_capacity(execute_response.get_result().get_output_directories().len() + 1);
-    // TODO: Maybe take rather than clone
-    let output_directories = execute_response
-      .get_result()
-      .get_output_directories()
-      .to_owned();
-    for dir in output_directories {
-      let digest_result: Result<Digest, String> = dir.get_tree_digest().into();
-      let mut digest = future::done(digest_result).to_boxed();
-      if !dir.get_path().is_empty() {
-        for component in dir.get_path().rsplit('/') {
-          let component = component.to_owned();
-          let store = self.store.clone();
-          digest = digest
-            .and_then(move |digest| {
-              let mut directory = bazel_protos::remote_execution::Directory::new();
-              directory.mut_directories().push({
-                let mut node = bazel_protos::remote_execution::DirectoryNode::new();
-                node.set_name(component);
-                node.set_digest((&digest).into());
-                node
-              });
-              store.record_directory(&directory, true)
-            })
-            .to_boxed();
-        }
-      }
-      directory_digests.push(digest.map_err(|err| {
-        ExecutionError::Fatal(format!("Error saving remote output directory: {}", err))
-      }));
-    }
-
-    // Make a directory for the files
-    let mut path_map = HashMap::new();
-    let path_stats_result: Result<Vec<PathStat>, String> = execute_response
-      .get_result()
-      .get_output_files()
-      .iter()
-      .map(|output_file| {
-        let output_file_path_buf = PathBuf::from(output_file.get_path());
-        let digest: Result<Digest, String> = output_file.get_digest().into();
-        path_map.insert(output_file_path_buf.clone(), digest?);
-        Ok(PathStat::file(
-          output_file_path_buf.clone(),
-          File {
-            path: output_file_path_buf,
-            is_executable: output_file.get_is_executable(),
-          },
-        ))
-      })
-      .collect();
-
-    let path_stats = try_future!(path_stats_result.map_err(ExecutionError::Fatal));
-
-    #[derive(Clone)]
-    struct StoreOneOffRemoteDigest {
-      map_of_paths_to_digests: HashMap<PathBuf, Digest>,
-    }
-
-    impl StoreOneOffRemoteDigest {
-      fn new(map: HashMap<PathBuf, Digest>) -> StoreOneOffRemoteDigest {
-        StoreOneOffRemoteDigest {
-          map_of_paths_to_digests: map,
-        }
-      }
-    }
-
-    impl StoreFileByDigest<String> for StoreOneOffRemoteDigest {
-      fn store_by_digest(&self, file: File) -> BoxFuture<Digest, String> {
-        match self.map_of_paths_to_digests.get(&file.path) {
-          Some(digest) => future::ok(*digest),
-          None => future::err(format!(
-            "Didn't know digest for path in remote execution response: {:?}",
-            file.path
-          )),
-        }
-        .to_boxed()
-      }
-    }
-
-    let store = self.store.clone();
-    Snapshot::digest_from_path_stats(
-      self.store.clone(),
-      &StoreOneOffRemoteDigest::new(path_map),
-      &path_stats,
-    )
-    .map_err(move |error| {
-      ExecutionError::Fatal(format!(
-        "Error when storing the output file directory info in the remote CAS: {:?}",
-        error
-      ))
-    })
-    .join(future::join_all(directory_digests))
-    .and_then(|(files_digest, mut directory_digests)| {
-      directory_digests.push(files_digest);
-      Snapshot::merge_directories(store, directory_digests).map_err(|err| {
-        ExecutionError::Fatal(format!(
-          "Error when merging output files and directories: {}",
-          err
-        ))
-      })
-    })
-    .to_boxed()
-  }
 }
 
 fn maybe_add_workunit(
@@ -796,7 +593,7 @@ fn maybe_add_workunit(
   }
 }
 
-fn make_execute_request(
+pub fn make_execute_request(
   req: &ExecuteProcessRequest,
   metadata: ExecuteProcessRequestMetadata,
 ) -> Result<
@@ -890,6 +687,208 @@
   Ok((action, command, execute_request))
 }
 
+pub fn populate_fallible_execution_result(
+  store: Store,
+  execute_response: bazel_protos::remote_execution::ExecuteResponse,
+  execution_attempts: Vec<ExecutionStats>,
+) -> impl Future<Item = FallibleExecuteProcessResult, Error = String> {
+  extract_stdout(&store, &execute_response)
+    .join(extract_stderr(&store, &execute_response))
+    .join(extract_output_files(store, &execute_response))
+    .and_then(move |((stdout, stderr), output_directory)| {
+      Ok(FallibleExecuteProcessResult {
+        stdout: stdout,
+        stderr: stderr,
+        exit_code: execute_response.get_result().get_exit_code(),
+        output_directory: output_directory,
+        execution_attempts: execution_attempts,
+      })
+    })
+}
+
+fn extract_stdout(
+  store: &Store,
+  execute_response: &bazel_protos::remote_execution::ExecuteResponse,
+) -> BoxFuture<Bytes, String> {
+  if execute_response.get_result().has_stdout_digest() {
+    let stdout_digest_result: Result<Digest, String> =
+      execute_response.get_result().get_stdout_digest().into();
+    let stdout_digest =
+      try_future!(stdout_digest_result.map_err(|err| format!("Error extracting stdout: {}", err)));
+    store
+      .load_file_bytes_with(stdout_digest, |v| v)
+      .map_err(move |error| {
+        format!(
+          "Error fetching stdout digest ({:?}): {:?}",
+          stdout_digest, error
+        )
+      })
+      .and_then(move |maybe_value| {
+        maybe_value.ok_or_else(|| {
+          format!(
+            "Couldn't find stdout digest ({:?}), when fetching.",
+            stdout_digest
+          )
+        })
+      })
+      .map(|(bytes, _metadata)| bytes)
+      .to_boxed()
+  } else {
+    let stdout_raw = Bytes::from(execute_response.get_result().get_stdout_raw());
+    let stdout_copy = stdout_raw.clone();
+    store
+      .store_file_bytes(stdout_raw, true)
+      .map_err(move |error| format!("Error storing raw stdout: {:?}", error))
+      .map(|_| stdout_copy)
+      .to_boxed()
+  }
+}
+
+fn extract_stderr(
+  store: &Store,
+  execute_response: &bazel_protos::remote_execution::ExecuteResponse,
+) -> BoxFuture<Bytes, String> {
+  if execute_response.get_result().has_stderr_digest() {
+    let stderr_digest_result: Result<Digest, String> =
+      execute_response.get_result().get_stderr_digest().into();
+    let stderr_digest =
+      try_future!(stderr_digest_result.map_err(|err| format!("Error extracting stderr: {}", err)));
+    store
+      .load_file_bytes_with(stderr_digest, |v| v)
+      .map_err(move |error| {
+        format!(
+          "Error fetching stderr digest ({:?}): {:?}",
+          stderr_digest, error
+        )
+      })
+      .and_then(move |maybe_value| {
+        maybe_value.ok_or_else(|| {
+          format!(
+            "Couldn't find stderr digest ({:?}), when fetching.",
+            stderr_digest
+          )
+        })
+      })
+      .map(|(bytes, _metadata)| bytes)
+      .to_boxed()
+  } else {
+    let stderr_raw = Bytes::from(execute_response.get_result().get_stderr_raw());
+    let stderr_copy = stderr_raw.clone();
+    store
+      .store_file_bytes(stderr_raw, true)
+      .map_err(move |error| format!("Error storing raw stderr: {:?}", error))
+      .map(|_| stderr_copy)
+      .to_boxed()
+  }
+}
+
+fn extract_output_files(
+  store: Store,
+  execute_response: &bazel_protos::remote_execution::ExecuteResponse,
+) -> BoxFuture<Digest, String> {
+  // Get Digests of output Directories.
+  // Then we'll make a Directory for the output files, and merge them.
+  let mut directory_digests =
+    Vec::with_capacity(execute_response.get_result().get_output_directories().len() + 1);
+  // TODO: Maybe take rather than clone
+  let output_directories = execute_response
+    .get_result()
+    .get_output_directories()
+    .to_owned();
+  for dir in output_directories {
+    let digest_result: Result<Digest, String> = dir.get_tree_digest().into();
+    let mut digest = future::done(digest_result).to_boxed();
+    if !dir.get_path().is_empty() {
+      for component in dir.get_path().rsplit('/') {
+        let component = component.to_owned();
+        let store = store.clone();
+        digest = digest
+          .and_then(move |digest| {
+            let mut directory = bazel_protos::remote_execution::Directory::new();
+            directory.mut_directories().push({
+              let mut node = bazel_protos::remote_execution::DirectoryNode::new();
+              node.set_name(component);
+              node.set_digest((&digest).into());
+              node
+            });
+            store.record_directory(&directory, true)
+          })
+          .to_boxed();
+      }
+    }
+    directory_digests
+      .push(digest.map_err(|err| format!("Error saving remote output directory: {}", err)));
+  }
+
+  // Make a directory for the files
+  let mut path_map = HashMap::new();
+  let path_stats_result: Result<Vec<PathStat>, String> = execute_response
+    .get_result()
+    .get_output_files()
+    .iter()
+    .map(|output_file| {
+      let output_file_path_buf = PathBuf::from(output_file.get_path());
+      let digest: Result<Digest, String> = output_file.get_digest().into();
+      path_map.insert(output_file_path_buf.clone(), digest?);
+      Ok(PathStat::file(
+        output_file_path_buf.clone(),
+        File {
+          path: output_file_path_buf,
+          is_executable: output_file.get_is_executable(),
+        },
+      ))
+    })
+    .collect();
+
+  let path_stats = try_future!(path_stats_result);
+
+  #[derive(Clone)]
+  struct StoreOneOffRemoteDigest {
+    map_of_paths_to_digests: HashMap<PathBuf, Digest>,
+  }
+
+  impl StoreOneOffRemoteDigest {
+    fn new(map: HashMap<PathBuf, Digest>) -> StoreOneOffRemoteDigest {
+      StoreOneOffRemoteDigest {
+        map_of_paths_to_digests: map,
+      }
+    }
+  }
+
+  impl StoreFileByDigest<String> for StoreOneOffRemoteDigest {
+    fn store_by_digest(&self, file: File) -> BoxFuture<Digest, String> {
+      match self.map_of_paths_to_digests.get(&file.path) {
+        Some(digest) => future::ok(*digest),
+        None => future::err(format!(
+          "Didn't know digest for path in remote execution response: {:?}",
+          file.path
+        )),
+      }
+      .to_boxed()
+    }
+  }
+
+  let store = store.clone();
+  Snapshot::digest_from_path_stats(
+    store.clone(),
+    &StoreOneOffRemoteDigest::new(path_map),
+    &path_stats,
+  )
+  .map_err(move |error| {
+    format!(
+      "Error when storing the output file directory info in the remote CAS: {:?}",
+      error
+    )
+  })
+  .join(future::join_all(directory_digests))
+  .and_then(|(files_digest, mut directory_digests)| {
+    directory_digests.push(files_digest);
+    Snapshot::merge_directories(store, directory_digests)
+      .map_err(|err| format!("Error when merging output files and directories: {}", err))
+  })
+  .to_boxed()
+}
+
 fn format_error(error: &bazel_protos::status::Status) -> String {
   let error_code_enum = bazel_protos::code::Code::from_i32(error.get_code());
   let error_code = match error_code_enum {
@@ -2853,7 +2852,7 @@ mod tests {
 
   fn extract_output_files_from_response(
     execute_response: &bazel_protos::remote_execution::ExecuteResponse,
-  ) -> Result<Digest, ExecutionError> {
+  ) -> Result<Digest, String> {
     let cas = mock::StubCAS::builder()
       .file(&TestData::roland())
      .directory(&TestDirectory::containing_roland())
@@ -2861,7 +2860,10 @@
     let command_runner = create_command_runner("".to_owned(), &cas);
 
     let mut runtime = tokio::runtime::Runtime::new().unwrap();
-    runtime.block_on(command_runner.extract_output_files(&execute_response))
+    runtime.block_on(super::extract_output_files(
+      command_runner.store.clone(),
+      &execute_response,
+    ))
   }
 
   fn make_any_proto(message: &dyn Message) -> protobuf::well_known_types::Any {
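After this refactor, the ExecuteResponse-to-FallibleExecuteProcessResult conversion has exactly two call sites, both visible in the hunks above. Side by side, with the surrounding bindings elided (the calls themselves are copied verbatim from the diff):

```rust
// remote.rs: a successful response straight from the execution service.
return populate_fallible_execution_result(
  self.store.clone(),
  execute_response,
  execution_attempts,
)
.map_err(ExecutionError::Fatal) // the remote path keeps its richer error type
.to_boxed();

// cache.rs: a cached response deserialized from the local LMDB store; no
// execution attempts to report, hence vec![].
crate::remote::populate_fallible_execution_result(file_store, execute_response, vec![])
  .map(Some)
  .to_boxed()
```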