diff --git a/src/python/pants/engine/native.py b/src/python/pants/engine/native.py index 43d3331b17b..887b7c79e0e 100644 --- a/src/python/pants/engine/native.py +++ b/src/python/pants/engine/native.py @@ -740,6 +740,7 @@ def tc(constraint): self.context.utf8_buf(build_root), self.context.utf8_buf(work_dir), self.context.utf8_buf(local_store_dir), + self.context.utf8_buf(execution_options.local_execution_process_cache_namespace or ""), self.context.utf8_buf_buf(ignore_patterns), self.to_ids_buf(root_subject_types), # Remote execution config. diff --git a/src/python/pants/engine/scheduler.py b/src/python/pants/engine/scheduler.py index 72d04e661f8..498d2c731f1 100644 --- a/src/python/pants/engine/scheduler.py +++ b/src/python/pants/engine/scheduler.py @@ -88,6 +88,8 @@ def __init__( self._tasks = native.new_tasks() self._register_rules(rule_index) + # TODO: we REALLY need to have a datatype for all of these so that we don't mix up arguments by + # order. self._scheduler = native.new_scheduler( tasks=self._tasks, root_subject_types=self._root_subject_types, diff --git a/src/python/pants/option/global_options.py b/src/python/pants/option/global_options.py index 31160095c0e..e6b82b855b8 100644 --- a/src/python/pants/option/global_options.py +++ b/src/python/pants/option/global_options.py @@ -30,6 +30,7 @@ class GlobMatchErrorBehavior(enum('failure_behavior', ['ignore', 'warn', 'error' class ExecutionOptions(datatype([ + 'local_execution_process_cache_namespace', 'remote_store_server', 'remote_store_thread_count', 'remote_execution_server', @@ -52,6 +53,7 @@ class ExecutionOptions(datatype([ @classmethod def from_bootstrap_options(cls, bootstrap_options): return cls( + local_execution_process_cache_namespace=bootstrap_options.local_execution_process_cache_namespace, remote_store_server=bootstrap_options.remote_store_server, remote_execution_server=bootstrap_options.remote_execution_server, remote_store_thread_count=bootstrap_options.remote_store_thread_count, @@ -68,6 +70,7 @@ def from_bootstrap_options(cls, bootstrap_options): DEFAULT_EXECUTION_OPTIONS = ExecutionOptions( + local_execution_process_cache_namespace=None, remote_store_server=[], remote_store_thread_count=1, remote_execution_server=None, @@ -291,6 +294,11 @@ def register_bootstrap_options(cls, register): # This default is also hard-coded into the engine's rust code in # fs::Store::default_path default=os.path.expanduser('~/.cache/pants/lmdb_store')) + register('--local-execution-process-cache-namespace', advanced=True, + help="The cache namespace for local process execution. " + "Bump this to invalidate every artifact's local execution. " + "This is the hermetic execution equivalent of the legacy cache-key-gen-version " + "flag.") register('--remote-store-server', advanced=True, type=list, default=[], help='host:port of grpc server to use as remote execution file store.') register('--remote-store-thread-count', type=int, advanced=True, diff --git a/src/rust/engine/Cargo.lock b/src/rust/engine/Cargo.lock index 4c13841e6d2..a3c79591aaa 100644 --- a/src/rust/engine/Cargo.lock +++ b/src/rust/engine/Cargo.lock @@ -529,6 +529,7 @@ dependencies = [ "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "mock 0.0.1", "parking_lot 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)", + "prost 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "protobuf 2.0.5 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.80 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.58 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/src/rust/engine/fs/Cargo.toml b/src/rust/engine/fs/Cargo.toml index d94f98d38cd..e5fda7143f6 100644 --- a/src/rust/engine/fs/Cargo.toml +++ b/src/rust/engine/fs/Cargo.toml @@ -25,6 +25,7 @@ lazy_static = "1" lmdb = { git = "https://github.com/pantsbuild/lmdb-rs.git", rev = "06bdfbfc6348f6804127176e561843f214fc17f8" } log = "0.4" parking_lot = "0.6" +prost = "0.4" protobuf = { version = "2.0.4", features = ["with-bytes"] } serverset = { path = "../serverset" } sha2 = "0.8" diff --git a/src/rust/engine/fs/src/store.rs b/src/rust/engine/fs/src/store.rs index 1e1f5a2fbcf..2a383df44e6 100644 --- a/src/rust/engine/fs/src/store.rs +++ b/src/rust/engine/fs/src/store.rs @@ -2,10 +2,11 @@ use crate::{BackoffConfig, FileContent}; use bazel_protos; use boxfuture::{try_future, BoxFuture, Boxable}; -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; use dirs; use futures::{future, Future}; -use hashing::Digest; +use hashing::{Digest, Fingerprint}; +use prost::{self, Message as ProstMessage}; use protobuf::Message; use serde_derive::Serialize; use std::collections::HashMap; @@ -138,6 +139,7 @@ impl Store { /// /// Store a file locally. /// + // TODO: use an enum instead of a bool, and describe in that struct what `initial_lease` means! pub fn store_file_bytes(&self, bytes: Bytes, initial_lease: bool) -> BoxFuture { self .local @@ -232,6 +234,102 @@ impl Store { ) } + pub fn fingerprint_from_bytes_unsafe(bytes: &Bytes) -> Fingerprint { + use digest::{Digest as DigestTrait, FixedOutput}; + let mut hasher = sha2::Sha256::default(); + hasher.input(&bytes); + Fingerprint::from_bytes_unsafe(hasher.fixed_result().as_slice()) + } + + pub fn digest_bytes(bytes: &Bytes) -> Digest { + let fingerprint = Self::fingerprint_from_bytes_unsafe(&bytes); + Digest(fingerprint, bytes.len()) + } + + // TODO: is there an existing way to do this conversion (without explicitly allocating a buf of + // the same length)? + pub fn encode_proto(proto: &P) -> Result { + let mut buf = BytesMut::with_capacity(proto.encoded_len()); + proto.encode(&mut buf)?; + Ok(buf.freeze()) + } + + pub fn encode_action_proto( + action: &bazel_protos::build::bazel::remote::execution::v2::Action, + ) -> Result { + Self::encode_proto(action) + .map_err(|e| format!("Error serializing Action proto {:?}: {:?}", action, e)) + } + + pub fn encode_action_result_proto( + action_result: &bazel_protos::build::bazel::remote::execution::v2::ActionResult, + ) -> Result { + Self::encode_proto(action_result).map_err(|e| { + format!( + "Error serializing ActionResult proto {:?}: {:?}", + action_result, e + ) + }) + } + + // TODO: convert every Result<_, String> into a typedef! + fn action_fingerprint( + action: &bazel_protos::build::bazel::remote::execution::v2::Action, + ) -> Result { + Self::encode_action_proto(&action) + .map(|bytes| super::Store::fingerprint_from_bytes_unsafe(&bytes)) + } + + /// ??? + pub fn record_process_result( + &self, + req: &bazel_protos::build::bazel::remote::execution::v2::Action, + res: &bazel_protos::build::bazel::remote::execution::v2::ActionResult, + ) -> BoxFuture<(), String> { + let converted_key_value_protos = Self::action_fingerprint(&req) + .and_then(|req| Self::encode_action_result_proto(&res).map(|res| (req, res))); + let store = self.clone(); + future::result(converted_key_value_protos) + .and_then(move |(action_fingerprint, result_bytes)| { + store + .local + .record_process_result(action_fingerprint, result_bytes) + }) + .to_boxed() + } + + fn deserialize_action_result( + result_bytes: &Bytes, + ) -> Result { + bazel_protos::build::bazel::remote::execution::v2::ActionResult::decode(result_bytes).map_err( + |e| { + format!( + "LMDB corruption: ActionResult bytes for {:?} were not valid: {:?}", + result_bytes, e + ) + }, + ) + } + + /// ??? + pub fn load_process_result( + &self, + req: &bazel_protos::build::bazel::remote::execution::v2::Action, + ) -> BoxFuture, String> + { + let store = self.clone(); + future::result(Self::action_fingerprint(&req)) + .and_then(move |action_fingerprint| store.local.load_process_result(action_fingerprint)) + .and_then(|maybe_bytes| { + let deserialized_result = match maybe_bytes { + Some(bytes) => Self::deserialize_action_result(&bytes).map(Some), + None => Ok(None), + }; + future::result(deserialized_result) + }) + .to_boxed() + } + /// /// Loads bytes from remote cas if required and possible (i.e. if remote is configured). Takes /// two functions f_local and f_remote. These functions are any validation or transformations you @@ -666,8 +764,7 @@ mod local { use boxfuture::{BoxFuture, Boxable}; use bytes::Bytes; - use digest::{Digest as DigestTrait, FixedOutput}; - use futures::future; + use futures::future::{self, Future}; use hashing::{Digest, Fingerprint}; use lmdb::Error::{KeyExist, NotFound}; use lmdb::{ @@ -675,7 +772,6 @@ mod local { RwTransaction, Transaction, WriteFlags, }; use log::{debug, error}; - use sha2::Sha256; use std; use std::collections::{BinaryHeap, HashMap}; use std::fmt; @@ -693,13 +789,51 @@ mod local { inner: Arc, } + /// + /// A simple wrapper for an lmdb database with an associated environment. + /// + struct DbEnv(Arc, Arc); + + impl DbEnv { + fn new(path: &Path, db_name: &str, flags: DatabaseFlags) -> Result, String> { + super::super::safe_create_dir_all(&path) + .map_err(|err| format!("Error making directory for store at {:?}: {:?}", path, err))?; + let process_executions_env = Environment::new() + .set_max_dbs(1) + .set_map_size(MAX_LOCAL_STORE_SIZE_BYTES) + .open(&path) + .map_err(|err| { + format!( + "Error making process execution Environment for db at {:?}: {:?}", + path, err + ) + })?; + process_executions_env + .create_db(Some(db_name), flags) + .map_err(|e| { + format!( + "Error creating/opening database named {:?} at {:?}: {}", + db_name, path, e + ) + }) + .map(|db| Arc::new(DbEnv(Arc::new(db), Arc::new(process_executions_env)))) + } + + // Arc requires calling a method to extract the constituent fields. + fn get(&self) -> (Arc, Arc) { + (Arc::clone(&self.0), Arc::clone(&self.1)) + } + } + struct InnerStore { pool: Arc, // Store directories separately from files because: // 1. They may have different lifetimes. // 2. It's nice to know whether we should be able to parse something as a proto. + // TODO: why are these `Result`s? file_dbs: Result, String>, directory_dbs: Result, String>, + process_execution_db: Result, String>, } impl ByteStore { @@ -707,11 +841,18 @@ mod local { let root = path.as_ref(); let files_root = root.join("files"); let directories_root = root.join("directories"); + let process_executions_root = root.join("process_executions"); + Ok(ByteStore { inner: Arc::new(InnerStore { pool: pool, file_dbs: ShardedLmdb::new(files_root.clone()).map(Arc::new), directory_dbs: ShardedLmdb::new(directories_root.clone()).map(Arc::new), + process_execution_db: DbEnv::new( + &process_executions_root, + "process_executions_content", + DatabaseFlags::empty(), + ), }), }) } @@ -933,11 +1074,7 @@ mod local { .inner .pool .spawn_fn(move || { - let fingerprint = { - let mut hasher = Sha256::default(); - hasher.input(&bytes); - Fingerprint::from_bytes_unsafe(hasher.fixed_result().as_slice()) - }; + let fingerprint = super::Store::fingerprint_from_bytes_unsafe(&bytes); let digest = Digest(fingerprint, bytes.len()); let (env, content_database, lease_database) = dbs.clone()?.get(&fingerprint); @@ -1008,6 +1145,76 @@ mod local { }) }).to_boxed() } + + pub fn record_process_result( + &self, + action_fingerprint: Fingerprint, + result_bytes: Bytes, + ) -> BoxFuture<(), String> { + let db_env = self.inner.process_execution_db.clone(); + let store = self.clone(); + future::result(db_env) + .and_then(move |db_env| { + let (db, env) = db_env.get(); + store.inner.pool.spawn_fn(move || { + let put_res = env.begin_rw_txn().and_then(|mut txn| { + txn.put( + *db, + &action_fingerprint, + &result_bytes, + // TODO: this was stolen from store_bytes() -- is it still applicable? + WriteFlags::NO_OVERWRITE, + )?; + txn.commit() + }); + match put_res { + Ok(()) => Ok(()), + Err(KeyExist) => Ok(()), + Err(err) => Err(format!( + "Error storing process execution action with fingerprint {:?}: {:?}", + action_fingerprint, err + )), + } + }) + }) + .to_boxed() + .to_boxed() + } + + pub fn load_process_result( + &self, + action_fingerprint: Fingerprint, + ) -> BoxFuture, String> { + let store = self.clone(); + let db_env = store.inner.process_execution_db.clone(); + self + .inner + .pool + .spawn_fn(move || { + let (db, env) = db_env.clone()?.get(); + let ro_txn = env + .begin_ro_txn() + // TODO: is there a reason load_bytes_with() uses {} instead of {:?} here? + .map_err(|err| { + format!( + "Failed to begin read transaction for process result: {}", + err + ) + }); + ro_txn.and_then(|txn| { + let db = db.clone(); + match txn.get(*db, &action_fingerprint) { + Ok(bytes) => Ok(Some(Bytes::from(bytes))), + Err(NotFound) => Ok(None), + Err(err) => Err(format!( + "Error loading result for process execution action with fingerprint {:?}: {:?}", + action_fingerprint, err + )), + } + }) + }) + .to_boxed() + } } // Each LMDB directory can have at most one concurrent writer. @@ -1708,12 +1915,10 @@ mod remote { use bazel_protos; use boxfuture::{BoxFuture, Boxable}; use bytes::{Bytes, BytesMut}; - use digest::{Digest as DigestTrait, FixedOutput}; use futures::{self, future, Future, IntoFuture, Sink, Stream}; use grpcio; - use hashing::{Digest, Fingerprint}; + use hashing::Digest; use serverset::{Retry, Serverset}; - use sha2::Sha256; use std::cmp::min; use std::collections::HashSet; use std::sync::Arc; @@ -1831,9 +2036,7 @@ mod remote { } pub fn store_bytes(&self, bytes: Bytes) -> BoxFuture { - let mut hasher = Sha256::default(); - hasher.input(&bytes); - let fingerprint = Fingerprint::from_bytes_unsafe(hasher.fixed_result().as_slice()); + let fingerprint = super::Store::fingerprint_from_bytes_unsafe(&bytes); let len = bytes.len(); let digest = Digest(fingerprint, len); let resource_name = format!( @@ -2403,14 +2606,12 @@ mod tests { use crate::pool::ResettablePool; use bazel_protos; use bytes::Bytes; - use digest::{Digest as DigestTrait, FixedOutput}; use futures::Future; use futures_timer::TimerHandle; use hashing::{Digest, Fingerprint}; use mock::StubCAS; use protobuf::Message; use serverset::BackoffConfig; - use sha2::Sha256; use std; use std::collections::HashMap; use std::fs::File; @@ -2739,11 +2940,8 @@ mod tests { .write_to_bytes() .expect("Error serializing proto"), ); - let non_canonical_directory_fingerprint = { - let mut hasher = Sha256::default(); - hasher.input(&non_canonical_directory_bytes); - Fingerprint::from_bytes_unsafe(hasher.fixed_result().as_slice()) - }; + let non_canonical_directory_fingerprint = + super::Store::fingerprint_from_bytes_unsafe(&non_canonical_directory_bytes); let directory_digest = Digest( non_canonical_directory_fingerprint, non_canonical_directory_bytes.len(), diff --git a/src/rust/engine/process_execution/src/cached_execution.rs b/src/rust/engine/process_execution/src/cached_execution.rs new file mode 100644 index 00000000000..8c48c733226 --- /dev/null +++ b/src/rust/engine/process_execution/src/cached_execution.rs @@ -0,0 +1,963 @@ +// Copyright 2019 Pants project contributors (see CONTRIBUTORS.md). +// Licensed under the Apache License, Version 2.0 (see LICENSE). + +#![deny(unused_must_use)] +// Enable all clippy lints except for many of the pedantic ones. It's a shame this needs to be copied and pasted across crates, but there doesn't appear to be a way to include inner attributes from a common source. +#![deny( + clippy::all, + clippy::default_trait_access, + clippy::expl_impl_clone_on_copy, + clippy::if_not_else, + clippy::needless_continue, + clippy::single_match_else, + clippy::unseparated_literal_suffix, + clippy::used_underscore_binding +)] +// It is often more clear to show that nothing is being moved. +#![allow(clippy::match_ref_pats)] +// Subjective style. +#![allow( + clippy::len_without_is_empty, + clippy::redundant_field_names, + clippy::too_many_arguments +)] +// Default isn't as big a deal as people seem to think it is. +#![allow( + clippy::new_without_default, + clippy::new_without_default_derive, + clippy::new_ret_no_self +)] +// Arc can be more clear than needing to grok Orderings: +#![allow(clippy::mutex_atomic)] + +use std::collections::HashMap; +use std::path::PathBuf; +use std::sync::Arc; + +use boxfuture::{try_future, BoxFuture, Boxable}; +use bytes::Bytes; +use futures::future::{self, Future}; +use log::debug; + +use fs::{File, PathStat}; +use hashing::{Digest, Fingerprint}; + +use super::{CommandRunner, ExecuteProcessRequest, ExecutionStats, FallibleExecuteProcessResult}; + +// Environment variable which is exclusively used for cache key invalidation. +// This may be not specified in an ExecuteProcessRequest, and may be populated only by the +// CommandRunner. +pub const CACHE_KEY_GEN_VERSION_ENV_VAR_NAME: &str = "PANTS_CACHE_KEY_GEN_VERSION"; + +/// ???/DON'T LET THE `cache_key_gen_version` BECOME A KITCHEN SINK!!! +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct CacheableExecuteProcessRequest { + req: ExecuteProcessRequest, + // TODO: give this a better type than Option (everywhere)! + cache_key_gen_version: Option, +} + +impl CacheableExecuteProcessRequest { + pub fn new(req: ExecuteProcessRequest, cache_key_gen_version: Option) -> Self { + CacheableExecuteProcessRequest { + req, + cache_key_gen_version, + } + } +} + +/// ???/why is this "cacheable"? +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct CacheableExecuteProcessResult { + pub stdout: Bytes, + pub stderr: Bytes, + pub exit_code: i32, + pub output_directory: hashing::Digest, +} + +impl CacheableExecuteProcessResult { + pub fn with_execution_attempts( + &self, + execution_attempts: Vec, + ) -> FallibleExecuteProcessResult { + FallibleExecuteProcessResult { + stdout: self.stdout.clone(), + stderr: self.stderr.clone(), + exit_code: self.exit_code, + output_directory: self.output_directory, + execution_attempts, + } + } +} + +/// ??? +pub enum OutputDirWrapping { + Direct, + TopLevelWrapped, +} + +/// ???/it's called "immediate" because it's a best-effort thing located locally (???) +pub trait ImmediateExecutionCache: Send + Sync { + fn record_process_result( + &self, + req: &ProcessRequest, + res: &ProcessResult, + ) -> BoxFuture<(), String>; + + fn load_process_result(&self, req: &ProcessRequest) -> BoxFuture, String>; +} + +/// ??? +#[derive(Clone)] +pub struct ActionSerializer { + store: fs::Store, +} + +impl ActionSerializer { + pub fn new(store: fs::Store) -> Self { + ActionSerializer { store } + } + + fn extract_digest( + digest: &bazel_protos::build::bazel::remote::execution::v2::Digest, + ) -> Result { + let fingerprint = Fingerprint::from_hex_string(&digest.hash)?; + Ok(Digest(fingerprint, digest.size_bytes as usize)) + } + + pub fn convert_digest( + digest: &Digest, + ) -> bazel_protos::build::bazel::remote::execution::v2::Digest { + let Digest(fingerprint, len) = digest; + bazel_protos::build::bazel::remote::execution::v2::Digest { + hash: fingerprint.to_hex(), + size_bytes: *len as i64, + } + } + + fn make_command( + req: &CacheableExecuteProcessRequest, + ) -> Result { + let CacheableExecuteProcessRequest { + req, + cache_key_gen_version, + } = req; + let arguments = req.argv.clone(); + + if req.env.contains_key(CACHE_KEY_GEN_VERSION_ENV_VAR_NAME) { + return Err(format!( + "Cannot set env var with name {} as that is reserved for internal use by pants", + CACHE_KEY_GEN_VERSION_ENV_VAR_NAME + )); + } + let mut env_var_pairs: Vec<(String, String)> = req.env.clone().into_iter().collect(); + if let Some(cache_key_gen_version) = cache_key_gen_version { + env_var_pairs.push(( + CACHE_KEY_GEN_VERSION_ENV_VAR_NAME.to_string(), + cache_key_gen_version.to_string(), + )); + } + let environment_variables: Vec<_> = env_var_pairs + .into_iter() + .map(|(name, value)| { + bazel_protos::build::bazel::remote::execution::v2::command::EnvironmentVariable { + name, + value, + } + }) + .collect(); + + let mut output_files = req + .output_files + .iter() + .map(|p| { + p.to_str() + .map(|s| s.to_owned()) + .ok_or_else(|| format!("Non-UTF8 output file path: {:?}", p)) + }) + .collect::, String>>()?; + output_files.sort(); + + let mut output_directories = req + .output_directories + .iter() + .map(|p| { + p.to_str() + .map(|s| s.to_owned()) + .ok_or_else(|| format!("Non-UTF8 output directory path: {:?}", p)) + }) + .collect::, String>>()?; + output_directories.sort(); + + // Ideally, the JDK would be brought along as part of the input directory, but we don't currently + // have support for that. The platform with which we're experimenting for remote execution + // supports this property, and will symlink .jdk to a system-installed JDK: + // https://github.com/twitter/scoot/pull/391 + let platform = if req.jdk_home.is_some() { + // This really should be req.jdk_home.map(|_| {...}), but that gives "cannot move out of + // borrowed content". + Some( + bazel_protos::build::bazel::remote::execution::v2::Platform { + properties: vec![ + bazel_protos::build::bazel::remote::execution::v2::platform::Property { + name: "JDK_SYMLINK".to_owned(), + value: ".jdk".to_owned(), + }, + ], + }, + ) + } else { + None + }; + + Ok(bazel_protos::build::bazel::remote::execution::v2::Command { + arguments, + environment_variables, + output_files, + output_directories, + platform, + working_directory: "".to_owned(), + }) + } + + pub fn encode_command_proto( + command: &bazel_protos::build::bazel::remote::execution::v2::Command, + ) -> Result { + fs::Store::encode_proto(command) + .map_err(|e| format!("Error serializing Command proto {:?}: {:?}", command, e)) + } + + /// ???/having the Command is necessary for making an ExecuteRequest proto, which we don't need in + /// this file. + pub fn make_action_with_command( + req: &CacheableExecuteProcessRequest, + ) -> Result< + ( + bazel_protos::build::bazel::remote::execution::v2::Action, + bazel_protos::build::bazel::remote::execution::v2::Command, + ), + String, + > { + let command = Self::make_command(&req)?; + let command_proto_bytes = Self::encode_command_proto(&command)?; + let action = bazel_protos::build::bazel::remote::execution::v2::Action { + command_digest: Some(Self::convert_digest(&fs::Store::digest_bytes( + &command_proto_bytes, + ))), + input_root_digest: Some(Self::convert_digest(&req.req.input_files)), + ..bazel_protos::build::bazel::remote::execution::v2::Action::default() + }; + Ok((action, command)) + } + + fn extract_stdout( + &self, + result: &bazel_protos::build::bazel::remote::execution::v2::ActionResult, + ) -> BoxFuture { + if let Some(ref stdout_digest) = result.stdout_digest { + let stdout_digest_result: Result = stdout_digest.into(); + let stdout_digest = try_future!( + stdout_digest_result.map_err(|err| format!("Error extracting stdout: {}", err)) + ); + self + .store + .load_file_bytes_with(stdout_digest, |v| v) + .map_err(move |error| { + format!( + "Error fetching stdout digest ({:?}): {:?}", + stdout_digest, error + ) + }) + .and_then(move |maybe_value| { + maybe_value.ok_or_else(|| { + format!( + "Couldn't find stdout digest ({:?}), when fetching.", + stdout_digest + ) + }) + }) + .to_boxed() + } else { + let stdout_raw = Bytes::from(result.stdout_raw.clone()); + let stdout_copy = stdout_raw.clone(); + self + .store + .store_file_bytes(stdout_raw, true) + .map_err(move |error| format!("Error storing raw stdout: {:?}", error)) + .map(|_| stdout_copy) + .to_boxed() + } + } + + fn extract_stderr( + &self, + result: &bazel_protos::build::bazel::remote::execution::v2::ActionResult, + ) -> BoxFuture { + if let Some(ref stderr_digest) = result.stderr_digest { + let stderr_digest_result: Result = stderr_digest.into(); + let stderr_digest = try_future!( + stderr_digest_result.map_err(|err| format!("Error extracting stderr: {}", err)) + ); + self + .store + .load_file_bytes_with(stderr_digest, |v| v) + .map_err(move |error| { + format!( + "Error fetching stderr digest ({:?}): {:?}", + stderr_digest, error + ) + }) + .and_then(move |maybe_value| { + maybe_value.ok_or_else(|| { + format!( + "Couldn't find stderr digest ({:?}), when fetching.", + stderr_digest + ) + }) + }) + .to_boxed() + } else { + let stderr_raw = Bytes::from(result.stderr_raw.clone()); + let stderr_copy = stderr_raw.clone(); + self + .store + .store_file_bytes(stderr_raw, true) + .map_err(move |error| format!("Error storing raw stderr: {:?}", error)) + .map(|_| stderr_copy) + .to_boxed() + } + } + + fn extract_output_files_with_single_containing_directory( + result: &bazel_protos::build::bazel::remote::execution::v2::ActionResult, + ) -> Result { + let invalid_containing_dir = || { + format!("Error: invalid output directory for result {:?}. An process execution extracted as an ActionResult from the process execution cache must will always contain a single top-level directory with the path \"\".", + result) + }; + + if result.output_files.is_empty() && result.output_directories.len() == 1 { + // A single directory (with a provided digest), which should be at the path "". + let dir = result.output_directories.last().unwrap(); + if dir.path.is_empty() { + let proto_digest = dir.tree_digest.clone().unwrap(); + Self::extract_digest(&proto_digest) + } else { + Err(invalid_containing_dir()) + } + } else { + Err(invalid_containing_dir()) + } + } + + fn extract_output_files( + &self, + result: &bazel_protos::build::bazel::remote::execution::v2::ActionResult, + ) -> BoxFuture { + // Get Digests of output Directories. + // Then we'll make a Directory for the output files, and merge them. + let output_directories = result.output_directories.clone(); + let mut directory_digests = Vec::with_capacity(output_directories.len() + 1); + for dir in output_directories.into_iter() { + let digest_result: Result = (&dir.tree_digest.unwrap()).into(); + let mut digest = future::done(digest_result).to_boxed(); + for component in dir.path.rsplit('/') { + let component = component.to_owned(); + let store = self.store.clone(); + digest = digest + .and_then(move |digest| { + let mut directory = bazel_protos::remote_execution::Directory::new(); + directory.mut_directories().push({ + let mut node = bazel_protos::remote_execution::DirectoryNode::new(); + node.set_name(component); + node.set_digest((&digest).into()); + node + }); + store.record_directory(&directory, true) + }) + .to_boxed(); + } + directory_digests + .push(digest.map_err(|err| format!("Error saving remote output directory: {}", err))); + } + + // Make a directory for the files + let mut path_map = HashMap::new(); + let output_files = result.output_files.clone(); + let path_stats_result: Result, String> = output_files + .into_iter() + .map(|output_file| { + let output_file_path_buf = PathBuf::from(output_file.path); + let digest = output_file + .digest + .ok_or_else(|| "No digest on remote execution output file".to_string())?; + let digest: Result = (&digest).into(); + path_map.insert(output_file_path_buf.clone(), digest?); + Ok(PathStat::file( + output_file_path_buf.clone(), + File { + path: output_file_path_buf, + is_executable: output_file.is_executable, + }, + )) + }) + .collect(); + + let path_stats = try_future!(path_stats_result); + + #[derive(Clone)] + struct StoreOneOffRemoteDigest { + map_of_paths_to_digests: HashMap, + } + + impl StoreOneOffRemoteDigest { + fn new(map: HashMap) -> StoreOneOffRemoteDigest { + StoreOneOffRemoteDigest { + map_of_paths_to_digests: map, + } + } + } + + impl fs::StoreFileByDigest for StoreOneOffRemoteDigest { + fn store_by_digest(&self, file: File) -> BoxFuture { + match self.map_of_paths_to_digests.get(&file.path) { + Some(digest) => future::ok(*digest), + None => future::err(format!( + "Didn't know digest for path in remote execution response: {:?}", + file.path + )), + } + .to_boxed() + } + } + + let store = self.store.clone(); + fs::Snapshot::digest_from_path_stats( + self.store.clone(), + &StoreOneOffRemoteDigest::new(path_map), + &path_stats, + ) + .map_err(move |error| { + format!( + "Error when storing the output file directory info in the remote CAS: {:?}", + error + ) + }) + .join(future::join_all(directory_digests)) + .and_then(|(files_digest, mut directory_digests)| { + directory_digests.push(files_digest); + fs::Snapshot::merge_directories(store, directory_digests) + .map_err(|err| format!("Error when merging output files and directories: {}", err)) + }) + .to_boxed() + } + + pub fn convert_request_to_action( + req: &CacheableExecuteProcessRequest, + ) -> Result { + let (action, _) = Self::make_action_with_command(req)?; + Ok(action) + } + + pub fn convert_result_to_action_result( + res: &CacheableExecuteProcessResult, + ) -> bazel_protos::build::bazel::remote::execution::v2::ActionResult { + bazel_protos::build::bazel::remote::execution::v2::ActionResult { + output_files: vec![], + output_directories: vec![ + bazel_protos::build::bazel::remote::execution::v2::OutputDirectory { + path: "".to_string(), + tree_digest: Some(Self::convert_digest(&res.output_directory)), + }, + ], + exit_code: res.exit_code, + stdout_raw: res.stdout.to_vec(), + stdout_digest: Some(Self::convert_digest(&fs::Store::digest_bytes(&res.stdout))), + stderr_raw: res.stderr.to_vec(), + stderr_digest: Some(Self::convert_digest(&fs::Store::digest_bytes(&res.stderr))), + execution_metadata: None, + } + } + + pub fn extract_action_result( + &self, + res: &bazel_protos::build::bazel::remote::execution::v2::ActionResult, + wrapping: OutputDirWrapping, + ) -> BoxFuture { + let exit_code = res.exit_code; + let extracted_output_files = match wrapping { + OutputDirWrapping::Direct => self.extract_output_files(&res), + OutputDirWrapping::TopLevelWrapped => future::result( + Self::extract_output_files_with_single_containing_directory(&res), + ) + .to_boxed(), + }; + self + .extract_stdout(&res) + .join(self.extract_stderr(&res)) + .join(extracted_output_files) + .map( + move |((stdout, stderr), output_directory)| CacheableExecuteProcessResult { + stdout, + stderr, + exit_code, + output_directory, + }, + ) + .to_boxed() + } +} + +impl ImmediateExecutionCache + for ActionSerializer +{ + fn record_process_result( + &self, + req: &CacheableExecuteProcessRequest, + res: &CacheableExecuteProcessResult, + ) -> BoxFuture<(), String> { + let action_request = Self::convert_request_to_action(&req); + let action_result = Self::convert_result_to_action_result(&res); + let store = self.store.clone(); + // TODO: I wish there was a shorthand syntax to extract multiple fields from a reference to a + // struct while cloning them. + let stdout = res.stdout.clone(); + let stderr = res.stderr.clone(); + future::result(action_request) + .and_then(move |action_request| { + store + .store_file_bytes(stdout, true) + .join(store.store_file_bytes(stderr, true)) + .and_then(move |(_, _)| { + // NB: We wait until the stdout and stderr digests have been successfully recorded, so + // that we don't later attempt to read digests in the `action_result` which don't exist. + store.record_process_result(&action_request, &action_result) + }) + .to_boxed() + }) + .to_boxed() + } + + fn load_process_result( + &self, + req: &CacheableExecuteProcessRequest, + ) -> BoxFuture, String> { + let store = self.store.clone(); + let cache = self.clone(); + future::result(Self::convert_request_to_action(req)) + .and_then(move |action_proto| store.load_process_result(&action_proto)) + .and_then(move |maybe_action_result| match maybe_action_result { + Some(action_result) => cache + // NB: FallibleExecuteProcessResult always wraps everything in a *single* output + // directory, which is then converted into an OutputDirectory for the ActionResult proto + // at the path "" in convert_result_to_action_result(), so we have to pull the contents + // out of that single dir with the path "". + .extract_action_result(&action_result, OutputDirWrapping::TopLevelWrapped) + .map(Some) + .to_boxed(), + None => future::result(Ok(None)).to_boxed(), + }) + .to_boxed() + } +} + +/// +/// A CommandRunner wrapper that attempts to cache process executions. +/// +#[derive(Clone)] +pub struct CachingCommandRunner { + inner: Arc>, + cache: Arc< + Box>, + >, + cache_key_gen_version: Option, +} + +impl CachingCommandRunner { + pub fn from_store( + inner: Box, + store: fs::Store, + cache_key_gen_version: Option, + ) -> Self { + let action_serializer = ActionSerializer::new(store); + let boxed_cache = Box::new(action_serializer) + as Box< + dyn ImmediateExecutionCache, + >; + Self::new(inner, boxed_cache, cache_key_gen_version) + } + + pub fn new( + inner: Box, + cache: Box< + dyn ImmediateExecutionCache, + >, + cache_key_gen_version: Option, + ) -> Self { + CachingCommandRunner { + inner: Arc::new(inner), + cache: Arc::new(cache), + cache_key_gen_version, + } + } +} + +impl CommandRunner for CachingCommandRunner { + fn run(&self, req: ExecuteProcessRequest) -> BoxFuture { + let cacheable_request = + CacheableExecuteProcessRequest::new(req.clone(), self.cache_key_gen_version.clone()); + let cache = self.cache.clone(); + let inner = self.inner.clone(); + cache + .load_process_result(&cacheable_request) + .and_then(move |cache_fetch| match cache_fetch { + // We have a cache hit! + Some(cached_execution_result) => { + debug!( + "cached execution for request {:?}! {:?}", + req.clone(), + cached_execution_result + ); + future::result(Ok(cached_execution_result)).to_boxed() + } + // We have to actually run the process now. + None => inner + .run(req.clone()) + .and_then(move |res| { + debug!("uncached execution for request {:?}: {:?}", req, res); + let cacheable_process_result = res.into_cacheable(); + cache + .record_process_result(&cacheable_request, &cacheable_process_result) + .map(move |()| { + debug!( + "request {:?} should now be cached as {:?}", + &cacheable_request, &cacheable_process_result, + ); + cacheable_process_result + }) + }) + .to_boxed(), + }) + // NB: We clear metadata about execution attempts when returning a cacheable process execution + // result. + .map(|cacheable_process_result| cacheable_process_result.with_execution_attempts(vec![])) + .to_boxed() + } +} + +#[cfg(test)] +mod tests { + use super::{ + ActionSerializer, CacheableExecuteProcessRequest, CacheableExecuteProcessResult, + CachingCommandRunner, CommandRunner, ExecuteProcessRequest, FallibleExecuteProcessResult, + ImmediateExecutionCache, + }; + use crate::local::testutils::find_bash; + use futures::future::Future; + use hashing::{Digest, Fingerprint}; + use std::collections::{BTreeMap, BTreeSet}; + use std::ops::Deref; + use std::path::Path; + use std::path::PathBuf; + use std::sync::Arc; + use std::time::Duration; + use tempfile::TempDir; + use testutil::data::{TestData, TestDirectory}; + use testutil::owned_string_vec; + + #[test] + fn encode_action() { + let input_directory = TestDirectory::containing_roland(); + let req = ExecuteProcessRequest { + argv: owned_string_vec(&["/bin/echo", "yo"]), + env: vec![("SOME".to_owned(), "value".to_owned())] + .into_iter() + .collect(), + input_files: input_directory.digest(), + // Intentionally poorly sorted: + output_files: vec!["path/to/file", "other/file"] + .into_iter() + .map(PathBuf::from) + .collect(), + output_directories: vec!["directory/name"] + .into_iter() + .map(PathBuf::from) + .collect(), + timeout: Duration::from_millis(1000), + description: "some description".to_owned(), + jdk_home: None, + }; + + let want_action = bazel_protos::build::bazel::remote::execution::v2::Action { + command_digest: Some(ActionSerializer::convert_digest(&Digest( + Fingerprint::from_hex_string( + "cc4ddd3085aaffbe0abce22f53b30edbb59896bb4a4f0d76219e48070cd0afe1", + ) + .unwrap(), + 72, + ))), + input_root_digest: Some(ActionSerializer::convert_digest(&input_directory.digest())), + ..Default::default() + }; + + assert_eq!( + Ok(want_action), + ActionSerializer::convert_request_to_action(&CacheableExecuteProcessRequest::new(req, None)) + ); + } + + #[test] + fn encode_empty_action_result() { + let testdata_empty = TestData::empty(); + + let empty_proto_digest = bazel_protos::build::bazel::remote::execution::v2::Digest { + hash: fs::EMPTY_DIGEST.0.to_hex(), + size_bytes: fs::EMPTY_DIGEST.1 as i64, + }; + + let want_action_result = bazel_protos::build::bazel::remote::execution::v2::ActionResult { + output_files: vec![], + output_directories: vec![ + bazel_protos::build::bazel::remote::execution::v2::OutputDirectory { + path: "".to_string(), + tree_digest: Some(empty_proto_digest.clone()), + }, + ], + exit_code: 0, + stdout_raw: vec![], + stdout_digest: Some(empty_proto_digest.clone()), + stderr_raw: vec![], + stderr_digest: Some(empty_proto_digest.clone()), + execution_metadata: None, + }; + + let empty_result = FallibleExecuteProcessResult { + stdout: testdata_empty.bytes(), + stderr: testdata_empty.bytes(), + exit_code: 0, + output_directory: fs::EMPTY_DIGEST, + execution_attempts: vec![], + }; + + assert_eq!( + want_action_result, + ActionSerializer::convert_result_to_action_result(&empty_result.into_cacheable()) + ); + } + + #[test] + #[cfg(unix)] + fn cached_process_execution_stdout() { + let random_perl = output_only_process_request(owned_string_vec(&[ + "/usr/bin/perl", + "-e", + "print(rand(10))", + ])); + let store_dir = TempDir::new().unwrap(); + let work_dir = TempDir::new().unwrap(); + let (_, base_runner, action_serializer) = cache_in_dir(store_dir.path(), work_dir.path()); + let cacheable_perl = CacheableExecuteProcessRequest { + req: random_perl.clone(), + cache_key_gen_version: None, + }; + assert_eq!( + Ok(None), + action_serializer + .load_process_result(&cacheable_perl) + .wait() + ); + let caching_runner = make_caching_runner(base_runner.clone(), action_serializer.clone(), None); + let process_result = caching_runner.run(random_perl.clone()).wait().unwrap(); + // The process run again without caching is different. + let base_process_result = base_runner.run(random_perl.clone()).wait().unwrap(); + assert!(base_process_result != process_result); + assert_eq!(0, process_result.exit_code); + // A "cacheable" process execution result won't have e.g. the number of attempts that the + // process was tried, for idempotency, but everything else should be the same. + assert_eq!( + process_result.clone().into_cacheable(), + action_serializer + .load_process_result(&cacheable_perl) + .wait() + .unwrap() + .unwrap() + ); + let perl_number = String::from_utf8(process_result.stdout.deref().to_vec()) + .unwrap() + .parse::() + .unwrap(); + // Try again and verify the result is cached (the random number is still the same). + let second_process_result = caching_runner.run(random_perl.clone()).wait().unwrap(); + let second_perl_number = String::from_utf8(second_process_result.stdout.deref().to_vec()) + .unwrap() + .parse::() + .unwrap(); + assert_eq!(perl_number, second_perl_number); + // See that the result is invalidated if a `cache_key_gen_version` is provided. + let new_key = "xx".to_string(); + let new_cacheable_perl = CacheableExecuteProcessRequest { + req: random_perl.clone(), + cache_key_gen_version: Some(new_key.clone()), + }; + assert_eq!( + Ok(None), + action_serializer + .load_process_result(&new_cacheable_perl) + .wait() + ); + let new_caching_runner = make_caching_runner( + base_runner.clone(), + action_serializer.clone(), + Some(new_key), + ); + let new_process_result = new_caching_runner.run(random_perl.clone()).wait().unwrap(); + assert_eq!(0, new_process_result.exit_code); + // The new `cache_key_gen_version` is propagated to the requests made against the + // CachingCommandRunner. + assert_eq!( + new_process_result.clone().into_cacheable(), + action_serializer + .load_process_result(&new_cacheable_perl) + .wait() + .unwrap() + .unwrap() + ); + let new_perl_number = String::from_utf8(new_process_result.stdout.deref().to_vec()) + .unwrap() + .parse::() + .unwrap(); + // The output of the rand(10) call in the perl invocation is different, because the process + // execution wasn't cached. + assert!(new_perl_number != perl_number); + // Make sure that changing the cache key string from non-None to non-None also invalidates the + // process result. + let second_string_key = "yy".to_string(); + let second_cache_string_perl = CacheableExecuteProcessRequest { + req: random_perl.clone(), + cache_key_gen_version: Some(second_string_key.clone()), + }; + assert_eq!( + Ok(None), + action_serializer + .load_process_result(&second_cache_string_perl) + .wait() + ); + let second_string_caching_runner = make_caching_runner( + base_runner.clone(), + action_serializer.clone(), + Some(second_string_key), + ); + let second_string_process_result = second_string_caching_runner + .run(random_perl) + .wait() + .unwrap(); + assert_eq!(0, second_string_process_result.exit_code); + assert_eq!( + second_string_process_result.clone().into_cacheable(), + action_serializer + .load_process_result(&second_cache_string_perl) + .wait() + .unwrap() + .unwrap() + ); + let second_string_perl_number = + String::from_utf8(second_string_process_result.stdout.deref().to_vec()) + .unwrap() + .parse::() + .unwrap(); + // The new result is distinct from all the previously cached invocations. + assert!(second_string_perl_number != perl_number); + assert!(second_string_perl_number != new_perl_number); + } + + #[test] + #[cfg(unix)] + fn cached_process_execution_output_files() { + let make_file = ExecuteProcessRequest { + argv: vec![ + find_bash(), + "-c".to_owned(), + "/usr/bin/perl -e 'print(rand(10))' > wow.txt".to_string(), + ], + env: BTreeMap::new(), + input_files: fs::EMPTY_DIGEST, + output_files: vec!["wow.txt"].into_iter().map(PathBuf::from).collect(), + output_directories: BTreeSet::new(), + timeout: Duration::from_millis(1000), + description: "make a nondeterministic file".to_string(), + jdk_home: None, + }; + let store_dir = TempDir::new().unwrap(); + let work_dir = TempDir::new().unwrap(); + let (_, base_runner, action_serializer) = cache_in_dir(store_dir.path(), work_dir.path()); + let cacheable_make_file = CacheableExecuteProcessRequest { + req: make_file.clone(), + cache_key_gen_version: None, + }; + assert_eq!( + Ok(None), + action_serializer + .load_process_result(&cacheable_make_file) + .wait() + ); + let caching_runner = make_caching_runner(base_runner.clone(), action_serializer.clone(), None); + let base_process_result = base_runner.run(make_file.clone()).wait().unwrap(); + let process_result = caching_runner.run(make_file.clone()).wait().unwrap(); + // The process run again without caching is different. + assert!(base_process_result != process_result); + assert_eq!(0, process_result.exit_code); + assert_eq!( + process_result.clone().into_cacheable(), + action_serializer + .load_process_result(&cacheable_make_file) + .wait() + .unwrap() + .unwrap() + ); + let second_process_result = caching_runner.run(make_file.clone()).wait().unwrap(); + assert_eq!(second_process_result, process_result); + } + + fn output_only_process_request(argv: Vec) -> ExecuteProcessRequest { + ExecuteProcessRequest { + argv, + env: BTreeMap::new(), + input_files: fs::EMPTY_DIGEST, + output_files: BTreeSet::new(), + output_directories: BTreeSet::new(), + timeout: Duration::from_millis(1000), + description: "write some output".to_string(), + jdk_home: None, + } + } + + fn cache_in_dir( + store_dir: &Path, + work_dir: &Path, + ) -> (fs::Store, crate::local::CommandRunner, ActionSerializer) { + let pool = Arc::new(fs::ResettablePool::new("test-pool-".to_owned())); + let store = fs::Store::local_only(store_dir, pool.clone()).unwrap(); + let action_serializer = ActionSerializer::new(store.clone()); + let base_runner = + crate::local::CommandRunner::new(store.clone(), pool, work_dir.to_path_buf(), true); + (store, base_runner, action_serializer) + } + + fn make_caching_runner( + base_runner: crate::local::CommandRunner, + action_serializer: ActionSerializer, + cache_key_gen_version: Option, + ) -> CachingCommandRunner { + CachingCommandRunner::new( + Box::new(base_runner) as Box, + Box::new(action_serializer) + as Box< + dyn ImmediateExecutionCache< + CacheableExecuteProcessRequest, + CacheableExecuteProcessResult, + >, + >, + cache_key_gen_version, + ) + } +} diff --git a/src/rust/engine/process_execution/src/lib.rs b/src/rust/engine/process_execution/src/lib.rs index 5f71fe0f231..47a7af7794b 100644 --- a/src/rust/engine/process_execution/src/lib.rs +++ b/src/rust/engine/process_execution/src/lib.rs @@ -40,8 +40,13 @@ use std::time::Duration; use async_semaphore::AsyncSemaphore; +pub mod cached_execution; pub mod local; pub mod remote; +pub use crate::cached_execution::{ + ActionSerializer, CacheableExecuteProcessRequest, CacheableExecuteProcessResult, + CachingCommandRunner, ImmediateExecutionCache, +}; /// /// A process to be executed. @@ -102,11 +107,14 @@ pub struct FallibleExecuteProcessResult { pub execution_attempts: Vec, } -#[cfg(test)] impl FallibleExecuteProcessResult { - pub fn without_execution_attempts(mut self) -> Self { - self.execution_attempts = vec![]; - self + pub fn into_cacheable(self) -> CacheableExecuteProcessResult { + CacheableExecuteProcessResult { + stdout: self.stdout, + stderr: self.stderr, + exit_code: self.exit_code, + output_directory: self.output_directory, + } } } diff --git a/src/rust/engine/process_execution/src/local.rs b/src/rust/engine/process_execution/src/local.rs index baf052442cd..57bf0cf00ea 100644 --- a/src/rust/engine/process_execution/src/local.rs +++ b/src/rust/engine/process_execution/src/local.rs @@ -21,6 +21,7 @@ use super::{ExecuteProcessRequest, FallibleExecuteProcessResult}; use bytes::{Bytes, BytesMut}; +#[derive(Clone)] pub struct CommandRunner { store: fs::Store, fs_pool: Arc, @@ -292,6 +293,7 @@ impl super::CommandRunner for CommandRunner { }) .map(Arc::new) .and_then(|posix_fs| { + eprintln!("output_file_paths: {:?}", output_file_paths); CommandRunner::construct_output_snapshot( store, posix_fs, @@ -336,13 +338,12 @@ mod tests { use super::super::CommandRunner as CommandRunnerTrait; use super::{ExecuteProcessRequest, FallibleExecuteProcessResult}; + use crate::local::testutils::*; use fs; use futures::Future; use std; use std::collections::{BTreeMap, BTreeSet}; - use std::env; - use std::os::unix::fs::PermissionsExt; - use std::path::{Path, PathBuf}; + use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; use tempfile::TempDir; @@ -882,8 +883,15 @@ mod tests { }; runner.run(req).wait() } +} + +#[cfg(test)] +pub mod testutils { + use std::env; + use std::os::unix::fs::PermissionsExt; + use std::path::{Path, PathBuf}; - fn find_bash() -> String { + pub fn find_bash() -> String { which("bash") .expect("No bash on PATH") .to_str() @@ -891,7 +899,7 @@ mod tests { .to_owned() } - fn which(executable: &str) -> Option { + pub fn which(executable: &str) -> Option { if let Some(paths) = env::var_os("PATH") { for path in env::split_paths(&paths) { let executable_path = path.join(executable); @@ -903,7 +911,7 @@ mod tests { None } - fn is_executable(path: &Path) -> bool { + pub fn is_executable(path: &Path) -> bool { std::fs::metadata(path) .map(|meta| meta.permissions().mode() & 0o100 == 0o100) .unwrap_or(false) diff --git a/src/rust/engine/process_execution/src/remote.rs b/src/rust/engine/process_execution/src/remote.rs index 930c6adb79a..78a9c1cb245 100644 --- a/src/rust/engine/process_execution/src/remote.rs +++ b/src/rust/engine/process_execution/src/remote.rs @@ -1,23 +1,22 @@ -use std::collections::HashMap; -use std::path::PathBuf; use std::time::{Duration, Instant}; use bazel_protos; use boxfuture::{try_future, BoxFuture, Boxable}; -use bytes::Bytes; -use digest::{Digest as DigestTrait, FixedOutput}; -use fs::{self, File, PathStat, Store}; +use fs::{self, Store}; use futures::{future, Future, Stream}; use futures_timer::Delay; use hashing::{Digest, Fingerprint}; use log::{debug, trace, warn}; use parking_lot::Mutex; -use prost::Message; +use prost::{self, Message}; use protobuf::{self, Message as GrpcioMessage, ProtobufEnum}; -use sha2::Sha256; use time; -use super::{ExecuteProcessRequest, ExecutionStats, FallibleExecuteProcessResult}; +use super::{ + ActionSerializer, CacheableExecuteProcessRequest, ExecuteProcessRequest, ExecutionStats, + FallibleExecuteProcessResult, +}; +use crate::cached_execution::OutputDirWrapping; use std; use std::cmp::min; @@ -29,11 +28,6 @@ use tower_grpc::Request; use tower_h2::client; use tower_util::MakeService; -// Environment variable which is exclusively used for cache key invalidation. -// This may be not specified in an ExecuteProcessRequest, and may be populated only by the -// CommandRunner. -const CACHE_KEY_GEN_VERSION_ENV_VAR_NAME: &str = "PANTS_CACHE_KEY_GEN_VERSION"; - #[derive(Debug)] enum OperationOrStatus { Operation(bazel_protos::google::longrunning::Operation), @@ -59,6 +53,7 @@ pub struct CommandRunner { clients: futures::future::Shared>, store: Store, futures_timer_thread: resettable::Resettable, + action_serializer: ActionSerializer, } #[derive(Debug, PartialEq)] @@ -121,6 +116,60 @@ impl CommandRunner { } } +#[derive(Clone, Debug)] +pub struct BazelProcessExecutionRequest { + action: bazel_protos::build::bazel::remote::execution::v2::Action, + command: bazel_protos::build::bazel::remote::execution::v2::Command, + execute_request: bazel_protos::build::bazel::remote::execution::v2::ExecuteRequest, +} + +impl BazelProcessExecutionRequest { + fn new( + action: bazel_protos::build::bazel::remote::execution::v2::Action, + command: bazel_protos::build::bazel::remote::execution::v2::Command, + execute_request: bazel_protos::build::bazel::remote::execution::v2::ExecuteRequest, + ) -> Self { + BazelProcessExecutionRequest { + action, + command, + execute_request, + } + } + + fn convert_execute_request( + req: &CacheableExecuteProcessRequest, + instance_name: &Option, + ) -> Result { + let (action, command) = ActionSerializer::make_action_with_command(&req)?; + let action_proto_bytes = &fs::Store::encode_action_proto(&action)?; + let execute_request = bazel_protos::build::bazel::remote::execution::v2::ExecuteRequest { + action_digest: Some(ActionSerializer::convert_digest(&fs::Store::digest_bytes( + &action_proto_bytes, + ))), + skip_cache_lookup: false, + instance_name: instance_name.clone().unwrap_or_default(), + execution_policy: None, + results_cache_policy: None, + }; + Ok(BazelProcessExecutionRequest::new( + action, + command, + execute_request, + )) + } +} + +#[cfg(test)] +impl PartialEq for BazelProcessExecutionRequest { + fn eq(&self, other: &Self) -> bool { + (&self.action, &self.command, &self.execute_request) + == (&other.action, &other.command, &other.execute_request) + } +} + +#[cfg(test)] +impl Eq for BazelProcessExecutionRequest {} + impl super::CommandRunner for CommandRunner { /// /// Runs a command via a gRPC service implementing the Bazel Remote Execution API @@ -142,176 +191,20 @@ impl super::CommandRunner for CommandRunner { /// TODO: Request jdk_home be created if set. /// fn run(&self, req: ExecuteProcessRequest) -> BoxFuture { - let clients = self.clients.clone(); - - let store = self.store.clone(); - let execute_request_result = - make_execute_request(&req, &self.instance_name, &self.cache_key_gen_version); - - let ExecuteProcessRequest { - description, - timeout, - input_files, - .. - } = req; - - let description2 = description.clone(); - - match execute_request_result { - Ok((action, command, execute_request)) => { - let command_runner = self.clone(); - let command_runner2 = self.clone(); - let execute_request2 = execute_request.clone(); - let futures_timer_thread = self.futures_timer_thread.clone(); - - let store2 = store.clone(); - let mut history = ExecutionHistory::default(); - - self - .store_proto_locally(&command) - .join(self.store_proto_locally(&action)) - .and_then(move |(command_digest, action_digest)| { - store2.ensure_remote_has_recursive(vec![command_digest, action_digest, input_files]) - }) - .and_then(move |summary| { - history.current_attempt += summary; - trace!( - "Executing remotely request: {:?} (command: {:?})", - execute_request, - command - ); - command_runner - .oneshot_execute(execute_request) - .join(future::ok(history)) - }) - .and_then(move |(operation, history)| { - let start_time = Instant::now(); - - future::loop_fn( - (history, operation, 0), - move |(mut history, operation, iter_num)| { - let description = description.clone(); - - let execute_request2 = execute_request2.clone(); - let store = store.clone(); - let clients = clients.clone(); - let command_runner2 = command_runner2.clone(); - let command_runner3 = command_runner2.clone(); - let futures_timer_thread = futures_timer_thread.clone(); - let f = command_runner2.extract_execute_response(operation, &mut history); - f.map(future::Loop::Break).or_else(move |value| { - match value { - ExecutionError::Fatal(err) => future::err(err).to_boxed(), - ExecutionError::MissingDigests(missing_digests) => { - let ExecutionHistory { - mut attempts, - current_attempt, - } = history; - - trace!( - "Server reported missing digests ({:?}); trying to upload: {:?}", - current_attempt, - missing_digests, - ); - - attempts.push(current_attempt); - let history = ExecutionHistory { - attempts, - current_attempt: ExecutionStats::default(), - }; - - let execute_request = execute_request2.clone(); - store - .ensure_remote_has_recursive(missing_digests) - .and_then(move |summary| { - let mut history = history; - history.current_attempt += summary; - command_runner2 - .oneshot_execute(execute_request) - .join(future::ok(history)) - }) - // Reset `iter_num` on `MissingDigests` - .map(|(operation, history)| future::Loop::Continue((history, operation, 0))) - .to_boxed() - } - ExecutionError::NotFinished(operation_name) => { - let operation_name2 = operation_name.clone(); - let operation_request = - bazel_protos::google::longrunning::GetOperationRequest { - name: operation_name.clone(), - }; - - let backoff_period = min( - CommandRunner::BACKOFF_MAX_WAIT_MILLIS, - (1 + iter_num) * CommandRunner::BACKOFF_INCR_WAIT_MILLIS, - ); - - // take the grpc result and cancel the op if too much time has passed. - let elapsed = start_time.elapsed(); - - if elapsed > timeout { - future::err(format!( - "Exceeded time out of {:?} with {:?} for operation {}, {}", - timeout, elapsed, operation_name, description - )) - .to_boxed() - } else { - // maybe the delay here should be the min of remaining time and the backoff period - Delay::new_handle( - Instant::now() + Duration::from_millis(backoff_period), - futures_timer_thread.with(|thread| thread.handle()), - ) - .map_err(move |e| { - format!( - "Future-Delay errored at operation result polling for {}, {}: {}", - operation_name, description, e - ) - }) - .and_then(move |_| { - clients - .map_err(|err| format!("{}", err)) - .and_then(move |clients| { - clients - .operations_client - .lock() - .get_operation(command_runner3.make_request(operation_request)) - .map(|r| r.into_inner()) - .or_else(move |err| { - rpcerror_recover_cancelled(operation_name2, err) - }) - .map_err(towergrpcerror_to_string) - }) - .map(OperationOrStatus::Operation) - .map(move |operation| { - future::Loop::Continue((history, operation, iter_num + 1)) - }) - .to_boxed() - }) - .to_boxed() - } - } - } - }) - }, - ) - }) - .map(move |resp| { - let mut attempts = String::new(); - for (i, attempt) in resp.execution_attempts.iter().enumerate() { - attempts += &format!("\nAttempt {}: {:?}", i, attempt); - } - debug!( - "Finished remote exceution of {} after {} attempts: Stats: {}", - description2, - resp.execution_attempts.len(), - attempts - ); - resp - }) + let cacheable_input_request = + CacheableExecuteProcessRequest::new(req.clone(), self.cache_key_gen_version.clone()); + let converted_request_protos = BazelProcessExecutionRequest::convert_execute_request( + &cacheable_input_request, + &self.instance_name, + ); + let command_runner = self.clone(); + future::result(converted_request_protos) + .and_then(move |action_request| { + command_runner + .execute_request(req, action_request.clone()) .to_boxed() - } - Err(err) => future::err(err).to_boxed(), - } + }) + .to_boxed() } } @@ -385,11 +278,13 @@ impl CommandRunner { .shared(); Ok(CommandRunner { cache_key_gen_version, - instance_name, + // TODO: this may be able to be removed! + instance_name: instance_name.clone(), authorization_header: oauth_bearer_token.map(|t| format!("Bearer {}", t)), clients, - store, + store: store.clone(), futures_timer_thread, + action_serializer: ActionSerializer::new(store.clone()), }) } @@ -403,17 +298,15 @@ impl CommandRunner { request } - fn store_proto_locally( + fn store_proto_locally( &self, proto: &P, ) -> impl Future { let store = self.store.clone(); future::done( - proto - .write_to_bytes() - .map_err(|e| format!("Error serializing proto {:?}", e)), + fs::Store::encode_proto(proto).map_err(|e| format!("Error serializing proto {:?}", e)), ) - .and_then(move |command_bytes| store.store_file_bytes(Bytes::from(command_bytes), true)) + .and_then(move |command_bytes| store.store_file_bytes(command_bytes, true)) .map_err(|e| format!("Error saving proto to local store: {:?}", e)) } @@ -497,17 +390,14 @@ impl CommandRunner { if status.code == bazel_protos::google::rpc::Code::Ok.into() { if let Some(result) = maybe_result { return self - .extract_stdout(&result) - .join(self.extract_stderr(&result)) - .join(self.extract_output_files(&result)) - .and_then(move |((stdout, stderr), output_directory)| { - Ok(FallibleExecuteProcessResult { - stdout: stdout, - stderr: stderr, - exit_code: result.exit_code, - output_directory: output_directory, - execution_attempts: execution_attempts, - }) + .action_serializer + .extract_action_result(&result, OutputDirWrapping::Direct) + .map(|cacheable_result| cacheable_result.with_execution_attempts(execution_attempts)) + .map_err(move |err| { + ExecutionError::Fatal(format!( + "error decoding process result {:?}: {:?}", + result, err + )) }) .to_boxed(); } else { @@ -603,280 +493,175 @@ impl CommandRunner { .to_boxed() } - fn extract_stdout( + fn execute_request( &self, - result: &bazel_protos::build::bazel::remote::execution::v2::ActionResult, - ) -> BoxFuture { - if let Some(ref stdout_digest) = result.stdout_digest { - let stdout_digest_result: Result = stdout_digest.into(); - let stdout_digest = try_future!(stdout_digest_result - .map_err(|err| ExecutionError::Fatal(format!("Error extracting stdout: {}", err)))); - self - .store - .load_file_bytes_with(stdout_digest, |v| v) - .map_err(move |error| { - ExecutionError::Fatal(format!( - "Error fetching stdout digest ({:?}): {:?}", - stdout_digest, error - )) - }) - .and_then(move |maybe_value| { - maybe_value.ok_or_else(|| { - ExecutionError::Fatal(format!( - "Couldn't find stdout digest ({:?}), when fetching.", - stdout_digest - )) - }) - }) - .to_boxed() - } else { - let stdout_raw = Bytes::from(result.stdout_raw.as_slice()); - let stdout_copy = stdout_raw.clone(); - self - .store - .store_file_bytes(stdout_raw, true) - .map_err(move |error| { - ExecutionError::Fatal(format!("Error storing raw stdout: {:?}", error)) - }) - .map(|_| stdout_copy) - .to_boxed() - } - } + input_execution_request: ExecuteProcessRequest, + serializable_proto_request: BazelProcessExecutionRequest, + ) -> BoxFuture { + let ExecuteProcessRequest { + input_files, + description, + timeout, + .. + } = input_execution_request; + let BazelProcessExecutionRequest { + action, + command, + execute_request, + } = serializable_proto_request; - fn extract_stderr( - &self, - result: &bazel_protos::build::bazel::remote::execution::v2::ActionResult, - ) -> BoxFuture { - if let Some(ref stderr_digest) = result.stderr_digest { - let stderr_digest_result: Result = stderr_digest.into(); - let stderr_digest = try_future!(stderr_digest_result - .map_err(|err| ExecutionError::Fatal(format!("Error extracting stderr: {}", err)))); - self - .store - .load_file_bytes_with(stderr_digest, |v| v) - .map_err(move |error| { - ExecutionError::Fatal(format!( - "Error fetching stderr digest ({:?}): {:?}", - stderr_digest, error - )) - }) - .and_then(move |maybe_value| { - maybe_value.ok_or_else(|| { - ExecutionError::Fatal(format!( - "Couldn't find stderr digest ({:?}), when fetching.", - stderr_digest - )) - }) - }) - .to_boxed() - } else { - let stderr_raw = Bytes::from(result.stderr_raw.as_slice()); - let stderr_copy = stderr_raw.clone(); - self - .store - .store_file_bytes(stderr_raw, true) - .map_err(move |error| { - ExecutionError::Fatal(format!("Error storing raw stderr: {:?}", error)) - }) - .map(|_| stderr_copy) - .to_boxed() - } - } + let command_runner = self.clone(); + let command_runner2 = self.clone(); + let execute_request2 = execute_request.clone(); + let description2 = description.clone(); + let futures_timer_thread = self.futures_timer_thread.clone(); - fn extract_output_files( - &self, - result: &bazel_protos::build::bazel::remote::execution::v2::ActionResult, - ) -> BoxFuture { - // Get Digests of output Directories. - // Then we'll make a Directory for the output files, and merge them. - let output_directories = result.output_directories.clone(); - let mut directory_digests = Vec::with_capacity(output_directories.len() + 1); - for dir in output_directories { - let digest_result: Result = (&dir.tree_digest.unwrap()).into(); - let mut digest = future::done(digest_result).to_boxed(); - for component in dir.path.rsplit('/') { - let component = component.to_owned(); - let store = self.store.clone(); - digest = digest - .and_then(move |digest| { - let mut directory = bazel_protos::remote_execution::Directory::new(); - directory.mut_directories().push({ - let mut node = bazel_protos::remote_execution::DirectoryNode::new(); - node.set_name(component); - node.set_digest((&digest).into()); - node - }); - store.record_directory(&directory, true) - }) - .to_boxed(); - } - directory_digests.push(digest.map_err(|err| { - ExecutionError::Fatal(format!("Error saving remote output directory: {}", err)) - })); - } + let store = self.store.clone(); + let store2 = self.store.clone(); + let clients = self.clients.clone(); - // Make a directory for the files - let mut path_map = HashMap::new(); - let output_files = result.output_files.clone(); - let path_stats_result: Result, String> = output_files - .into_iter() - .map(|output_file| { - let output_file_path_buf = PathBuf::from(output_file.path); - let digest = output_file - .digest - .ok_or_else(|| "No digest on remote execution output file".to_string())?; - let digest: Result = (&digest).into(); - path_map.insert(output_file_path_buf.clone(), digest?); - Ok(PathStat::file( - output_file_path_buf.clone(), - File { - path: output_file_path_buf, - is_executable: output_file.is_executable, + let mut history = ExecutionHistory::default(); + + self + .store_proto_locally(&command) + .join(self.store_proto_locally(&action)) + .and_then(move |(command_digest, action_digest)| { + store.ensure_remote_has_recursive(vec![command_digest, action_digest, input_files]) + }) + .and_then(move |summary| { + history.current_attempt += summary; + trace!( + "Executing remotely request: {:?} (command: {:?})", + execute_request, + command + ); + command_runner + .oneshot_execute(execute_request) + .join(future::ok(history)) + }) + .and_then(move |(operation, history)| { + let start_time = Instant::now(); + + future::loop_fn( + (history, operation, 0), + move |(mut history, operation, iter_num)| { + let description = description2.clone(); + + let execute_request2 = execute_request2.clone(); + let store = store2.clone(); + let clients = clients.clone(); + let command_runner2 = command_runner2.clone(); + let command_runner3 = command_runner2.clone(); + let futures_timer_thread = futures_timer_thread.clone(); + let f = command_runner2.extract_execute_response(operation, &mut history); + f.map(future::Loop::Break).or_else(move |value| { + match value { + ExecutionError::Fatal(err) => future::err(err).to_boxed(), + ExecutionError::MissingDigests(missing_digests) => { + let ExecutionHistory { + mut attempts, + current_attempt, + } = history; + + trace!( + "Server reported missing digests ({:?}); trying to upload: {:?}", + current_attempt, + missing_digests, + ); + + attempts.push(current_attempt); + let history = ExecutionHistory { + attempts, + current_attempt: ExecutionStats::default(), + }; + + let execute_request = execute_request2.clone(); + store + .ensure_remote_has_recursive(missing_digests) + .and_then(move |summary| { + let mut history = history; + history.current_attempt += summary; + command_runner2 + .oneshot_execute(execute_request) + .join(future::ok(history)) + }) + // Reset `iter_num` on `MissingDigests` + .map(|(operation, history)| future::Loop::Continue((history, operation, 0))) + .to_boxed() + } + ExecutionError::NotFinished(operation_name) => { + let operation_name2 = operation_name.clone(); + let operation_request = bazel_protos::google::longrunning::GetOperationRequest { + name: operation_name.clone(), + }; + + let backoff_period = min( + CommandRunner::BACKOFF_MAX_WAIT_MILLIS, + (1 + iter_num) * CommandRunner::BACKOFF_INCR_WAIT_MILLIS, + ); + + // take the grpc result and cancel the op if too much time has passed. + let elapsed = start_time.elapsed(); + + if elapsed > timeout { + future::err(format!( + "Exceeded time out of {:?} with {:?} for operation {}, {}", + timeout, elapsed, operation_name, description + )) + .to_boxed() + } else { + // maybe the delay here should be the min of remaining time and the backoff period + Delay::new_handle( + Instant::now() + Duration::from_millis(backoff_period), + futures_timer_thread.with(|thread| thread.handle()), + ) + .map_err(move |e| { + format!( + "Future-Delay errored at operation result polling for {}, {}: {}", + operation_name, description, e + ) + }) + .and_then(move |_| { + clients + .map_err(|err| format!("{}", err)) + .and_then(move |clients| { + clients + .operations_client + .lock() + .get_operation(command_runner3.make_request(operation_request)) + .map(|r| r.into_inner()) + .or_else(move |err| rpcerror_recover_cancelled(operation_name2, err)) + .map_err(towergrpcerror_to_string) + }) + .map(OperationOrStatus::Operation) + .map(move |operation| { + future::Loop::Continue((history, operation, iter_num + 1)) + }) + .to_boxed() + }) + .to_boxed() + } + } + } + }) }, - )) + ) }) - .collect(); - - let path_stats = try_future!(path_stats_result.map_err(ExecutionError::Fatal)); - - #[derive(Clone)] - struct StoreOneOffRemoteDigest { - map_of_paths_to_digests: HashMap, - } - - impl StoreOneOffRemoteDigest { - fn new(map: HashMap) -> StoreOneOffRemoteDigest { - StoreOneOffRemoteDigest { - map_of_paths_to_digests: map, + .map(move |resp| { + let mut attempts = String::new(); + for (i, attempt) in resp.execution_attempts.iter().enumerate() { + attempts += &format!("\nAttempt {}: {:?}", i, attempt); } - } - } - - impl fs::StoreFileByDigest for StoreOneOffRemoteDigest { - fn store_by_digest(&self, file: File) -> BoxFuture { - match self.map_of_paths_to_digests.get(&file.path) { - Some(digest) => future::ok(*digest), - None => future::err(format!( - "Didn't know digest for path in remote execution response: {:?}", - file.path - )), - } - .to_boxed() - } - } - - let store = self.store.clone(); - fs::Snapshot::digest_from_path_stats( - self.store.clone(), - &StoreOneOffRemoteDigest::new(path_map), - &path_stats, - ) - .map_err(move |error| { - ExecutionError::Fatal(format!( - "Error when storing the output file directory info in the remote CAS: {:?}", - error - )) - }) - .join(future::join_all(directory_digests)) - .and_then(|(files_digest, mut directory_digests)| { - directory_digests.push(files_digest); - fs::Snapshot::merge_directories(store, directory_digests).map_err(|err| { - ExecutionError::Fatal(format!( - "Error when merging output files and directories: {}", - err - )) + debug!( + "Finished remote exceution of {} after {} attempts: Stats: {}", + description, + resp.execution_attempts.len(), + attempts + ); + resp }) - }) - .to_boxed() - } -} - -fn make_execute_request( - req: &ExecuteProcessRequest, - instance_name: &Option, - cache_key_gen_version: &Option, -) -> Result< - ( - bazel_protos::remote_execution::Action, - bazel_protos::remote_execution::Command, - bazel_protos::build::bazel::remote::execution::v2::ExecuteRequest, - ), - String, -> { - let mut command = bazel_protos::remote_execution::Command::new(); - command.set_arguments(protobuf::RepeatedField::from_vec(req.argv.clone())); - for (ref name, ref value) in &req.env { - if name.as_str() == CACHE_KEY_GEN_VERSION_ENV_VAR_NAME { - return Err(format!( - "Cannot set env var with name {} as that is reserved for internal use by pants", - CACHE_KEY_GEN_VERSION_ENV_VAR_NAME - )); - } - let mut env = bazel_protos::remote_execution::Command_EnvironmentVariable::new(); - env.set_name(name.to_string()); - env.set_value(value.to_string()); - command.mut_environment_variables().push(env); - } - if let Some(cache_key_gen_version) = cache_key_gen_version { - let mut env = bazel_protos::remote_execution::Command_EnvironmentVariable::new(); - env.set_name(CACHE_KEY_GEN_VERSION_ENV_VAR_NAME.to_string()); - env.set_value(cache_key_gen_version.to_string()); - command.mut_environment_variables().push(env); - } - let mut output_files = req - .output_files - .iter() - .map(|p| { - p.to_str() - .map(|s| s.to_owned()) - .ok_or_else(|| format!("Non-UTF8 output file path: {:?}", p)) - }) - .collect::, String>>()?; - output_files.sort(); - command.set_output_files(protobuf::RepeatedField::from_vec(output_files)); - - let mut output_directories = req - .output_directories - .iter() - .map(|p| { - p.to_str() - .map(|s| s.to_owned()) - .ok_or_else(|| format!("Non-UTF8 output directory path: {:?}", p)) - }) - .collect::, String>>()?; - output_directories.sort(); - command.set_output_directories(protobuf::RepeatedField::from_vec(output_directories)); - - // Ideally, the JDK would be brought along as part of the input directory, but we don't currently - // have support for that. The platform with which we're experimenting for remote execution - // supports this property, and will symlink .jdk to a system-installed JDK: - // https://github.com/twitter/scoot/pull/391 - if req.jdk_home.is_some() { - command.set_platform({ - let mut platform = bazel_protos::remote_execution::Platform::new(); - platform.mut_properties().push({ - let mut property = bazel_protos::remote_execution::Platform_Property::new(); - property.set_name("JDK_SYMLINK".to_owned()); - property.set_value(".jdk".to_owned()); - property - }); - platform - }); + .to_boxed() } - - let mut action = bazel_protos::remote_execution::Action::new(); - action.set_command_digest((&digest(&command)?).into()); - action.set_input_root_digest((&req.input_files).into()); - - let execute_request = bazel_protos::build::bazel::remote::execution::v2::ExecuteRequest { - action_digest: Some((&digest(&action)?).into()), - skip_cache_lookup: false, - instance_name: instance_name.clone().unwrap_or_default(), - execution_policy: None, - results_cache_policy: None, - }; - - Ok((action, command, execute_request)) } fn format_error(error: &bazel_protos::google::rpc::Status) -> String { @@ -926,18 +711,6 @@ fn towergrpcerror_to_string(error: tower_grpc::Error) -> } } -fn digest(message: &dyn GrpcioMessage) -> Result { - let bytes = message.write_to_bytes().map_err(|e| format!("{:?}", e))?; - - let mut hasher = Sha256::default(); - hasher.input(&bytes); - - Ok(Digest( - Fingerprint::from_bytes_unsafe(&hasher.fixed_result()), - bytes.len(), - )) -} - fn timespec_from(timestamp: &Option) -> time::Timespec { if let Some(timestamp) = timestamp { time::Timespec::new(timestamp.seconds, timestamp.nanos) @@ -963,9 +736,10 @@ mod tests { use super::super::CommandRunner as CommandRunnerTrait; use super::{ - CommandRunner, ExecuteProcessRequest, ExecutionError, ExecutionHistory, - FallibleExecuteProcessResult, + ActionSerializer, BazelProcessExecutionRequest, CacheableExecuteProcessRequest, CommandRunner, + ExecuteProcessRequest, ExecutionError, ExecutionHistory, FallibleExecuteProcessResult, }; + use crate::cached_execution::OutputDirWrapping; use mock::execution_server::MockOperation; use std::collections::{BTreeMap, BTreeSet}; use std::iter::{self, FromIterator}; @@ -987,7 +761,7 @@ mod tests { } #[test] - fn make_execute_request() { + fn test_make_execute_request() { let input_directory = TestDirectory::containing_roland(); let req = ExecuteProcessRequest { argv: owned_string_vec(&["/bin/echo", "yo"]), @@ -1009,55 +783,49 @@ mod tests { jdk_home: None, }; - let mut want_command = bazel_protos::remote_execution::Command::new(); - want_command.mut_arguments().push("/bin/echo".to_owned()); - want_command.mut_arguments().push("yo".to_owned()); - want_command.mut_environment_variables().push({ - let mut env = bazel_protos::remote_execution::Command_EnvironmentVariable::new(); - env.set_name("SOME".to_owned()); - env.set_value("value".to_owned()); - env - }); - want_command - .mut_output_files() - .push("other/file".to_owned()); - want_command - .mut_output_files() - .push("path/to/file".to_owned()); - want_command - .mut_output_directories() - .push("directory/name".to_owned()); - - let mut want_action = bazel_protos::remote_execution::Action::new(); - want_action.set_command_digest( - (&Digest( + let want_command = bazel_protos::build::bazel::remote::execution::v2::Command { + arguments: owned_string_vec(&["/bin/echo", "yo"]), + environment_variables: vec![ + bazel_protos::build::bazel::remote::execution::v2::command::EnvironmentVariable { + name: "SOME".to_owned(), + value: "value".to_owned(), + }, + ], + output_files: owned_string_vec(&["other/file", "path/to/file"]), + output_directories: owned_string_vec(&["directory/name"]), + ..Default::default() + }; + + let want_action = bazel_protos::build::bazel::remote::execution::v2::Action { + command_digest: Some(ActionSerializer::convert_digest(&Digest( Fingerprint::from_hex_string( "cc4ddd3085aaffbe0abce22f53b30edbb59896bb4a4f0d76219e48070cd0afe1", ) .unwrap(), 72, - )) - .into(), - ); - want_action.set_input_root_digest((&input_directory.digest()).into()); + ))), + input_root_digest: Some(ActionSerializer::convert_digest(&input_directory.digest())), + ..Default::default() + }; let want_execute_request = bazel_protos::build::bazel::remote::execution::v2::ExecuteRequest { - action_digest: Some( - (&Digest( - Fingerprint::from_hex_string( - "844c929423444f3392e0dcc89ebf1febbfdf3a2e2fcab7567cc474705a5385e4", - ) - .unwrap(), - 140, - )) - .into(), - ), + action_digest: Some(ActionSerializer::convert_digest(&Digest( + Fingerprint::from_hex_string( + "844c929423444f3392e0dcc89ebf1febbfdf3a2e2fcab7567cc474705a5385e4", + ) + .unwrap(), + 140, + ))), ..Default::default() }; assert_eq!( - super::make_execute_request(&req, &None, &None), - Ok((want_action, want_command, want_execute_request)) + make_execute_request(&req, &None, &None), + Ok(BazelProcessExecutionRequest::new( + want_action, + want_command, + want_execute_request + )), ); } @@ -1084,69 +852,50 @@ mod tests { jdk_home: None, }; - let mut want_command = bazel_protos::remote_execution::Command::new(); - want_command.mut_arguments().push("/bin/echo".to_owned()); - want_command.mut_arguments().push("yo".to_owned()); - want_command.mut_environment_variables().push({ - let mut env = bazel_protos::remote_execution::Command_EnvironmentVariable::new(); - env.set_name("SOME".to_owned()); - env.set_value("value".to_owned()); - env - }); - want_command - .mut_output_files() - .push("other/file".to_owned()); - want_command - .mut_output_files() - .push("path/to/file".to_owned()); - want_command - .mut_output_directories() - .push("directory/name".to_owned()); - - let mut want_action = bazel_protos::remote_execution::Action::new(); - want_action.set_command_digest( - (&Digest( + let want_command = bazel_protos::build::bazel::remote::execution::v2::Command { + arguments: owned_string_vec(&["/bin/echo", "yo"]), + environment_variables: vec![ + bazel_protos::build::bazel::remote::execution::v2::command::EnvironmentVariable { + name: "SOME".to_owned(), + value: "value".to_owned(), + }, + ], + output_files: owned_string_vec(&["other/file", "path/to/file"]), + output_directories: owned_string_vec(&["directory/name"]), + ..Default::default() + }; + + let want_action = bazel_protos::build::bazel::remote::execution::v2::Action { + command_digest: Some(ActionSerializer::convert_digest(&Digest( Fingerprint::from_hex_string( "cc4ddd3085aaffbe0abce22f53b30edbb59896bb4a4f0d76219e48070cd0afe1", ) .unwrap(), 72, - )) - .into(), - ); - want_action.set_input_root_digest((&input_directory.digest()).into()); + ))), + input_root_digest: Some(ActionSerializer::convert_digest(&input_directory.digest())), + ..Default::default() + }; - let mut want_execute_request = bazel_protos::remote_execution::ExecuteRequest::new(); - want_execute_request.set_instance_name("dark-tower".to_owned()); - want_execute_request.set_action_digest( - (&Digest( + let want_execute_request = bazel_protos::build::bazel::remote::execution::v2::ExecuteRequest { + action_digest: Some(ActionSerializer::convert_digest(&Digest( Fingerprint::from_hex_string( "844c929423444f3392e0dcc89ebf1febbfdf3a2e2fcab7567cc474705a5385e4", ) .unwrap(), 140, - )) - .into(), - ); - - let want_execute_request = bazel_protos::build::bazel::remote::execution::v2::ExecuteRequest { - action_digest: Some( - (&Digest( - Fingerprint::from_hex_string( - "844c929423444f3392e0dcc89ebf1febbfdf3a2e2fcab7567cc474705a5385e4", - ) - .unwrap(), - 140, - )) - .into(), - ), + ))), instance_name: "dark-tower".to_owned(), ..Default::default() }; assert_eq!( - super::make_execute_request(&req, &Some("dark-tower".to_owned()), &None), - Ok((want_action, want_command, want_execute_request)) + make_execute_request(&req, &Some("dark-tower".to_owned()), &None), + Ok(BazelProcessExecutionRequest::new( + want_action, + want_command, + want_execute_request + )), ); } @@ -1173,61 +922,53 @@ mod tests { jdk_home: None, }; - let mut want_command = bazel_protos::remote_execution::Command::new(); - want_command.mut_arguments().push("/bin/echo".to_owned()); - want_command.mut_arguments().push("yo".to_owned()); - want_command.mut_environment_variables().push({ - let mut env = bazel_protos::remote_execution::Command_EnvironmentVariable::new(); - env.set_name("SOME".to_owned()); - env.set_value("value".to_owned()); - env - }); - want_command.mut_environment_variables().push({ - let mut env = bazel_protos::remote_execution::Command_EnvironmentVariable::new(); - env.set_name(super::CACHE_KEY_GEN_VERSION_ENV_VAR_NAME.to_owned()); - env.set_value("meep".to_owned()); - env - }); - want_command - .mut_output_files() - .push("other/file".to_owned()); - want_command - .mut_output_files() - .push("path/to/file".to_owned()); - want_command - .mut_output_directories() - .push("directory/name".to_owned()); - - let mut want_action = bazel_protos::remote_execution::Action::new(); - want_action.set_command_digest( - (&Digest( + let want_command = bazel_protos::build::bazel::remote::execution::v2::Command { + arguments: owned_string_vec(&["/bin/echo", "yo"]), + environment_variables: vec![ + bazel_protos::build::bazel::remote::execution::v2::command::EnvironmentVariable { + name: "SOME".to_owned(), + value: "value".to_owned(), + }, + bazel_protos::build::bazel::remote::execution::v2::command::EnvironmentVariable { + name: crate::cached_execution::CACHE_KEY_GEN_VERSION_ENV_VAR_NAME.to_owned(), + value: "meep".to_owned(), + }, + ], + output_files: owned_string_vec(&["other/file", "path/to/file"]), + output_directories: owned_string_vec(&["directory/name"]), + ..Default::default() + }; + + let want_action = bazel_protos::build::bazel::remote::execution::v2::Action { + command_digest: Some(ActionSerializer::convert_digest(&Digest( Fingerprint::from_hex_string( "1a95e3482dd235593df73dc12b808ec7d922733a40d97d8233c1a32c8610a56d", ) .unwrap(), 109, - )) - .into(), - ); - want_action.set_input_root_digest((&input_directory.digest()).into()); + ))), + input_root_digest: Some(ActionSerializer::convert_digest(&input_directory.digest())), + ..Default::default() + }; let want_execute_request = bazel_protos::build::bazel::remote::execution::v2::ExecuteRequest { - action_digest: Some( - (&Digest( - Fingerprint::from_hex_string( - "0ee5d4c8ac12513a87c8d949c6883ac533a264d30215126af71a9028c4ab6edf", - ) - .unwrap(), - 140, - )) - .into(), - ), + action_digest: Some(ActionSerializer::convert_digest(&Digest( + Fingerprint::from_hex_string( + "0ee5d4c8ac12513a87c8d949c6883ac533a264d30215126af71a9028c4ab6edf", + ) + .unwrap(), + 140, + ))), ..Default::default() }; assert_eq!( - super::make_execute_request(&req, &None, &Some("meep".to_owned())), - Ok((want_action, want_command, want_execute_request)) + make_execute_request(&req, &None, &Some("meep".to_owned())), + Ok(BazelProcessExecutionRequest::new( + want_action, + want_command, + want_execute_request + )), ); } @@ -1245,46 +986,51 @@ mod tests { jdk_home: Some(PathBuf::from("/tmp")), }; - let mut want_command = bazel_protos::remote_execution::Command::new(); - want_command.mut_arguments().push("/bin/echo".to_owned()); - want_command.mut_arguments().push("yo".to_owned()); - want_command.mut_platform().mut_properties().push({ - let mut property = bazel_protos::remote_execution::Platform_Property::new(); - property.set_name("JDK_SYMLINK".to_owned()); - property.set_value(".jdk".to_owned()); - property - }); + let want_command = bazel_protos::build::bazel::remote::execution::v2::Command { + arguments: owned_string_vec(&["/bin/echo", "yo"]), + platform: Some( + bazel_protos::build::bazel::remote::execution::v2::Platform { + properties: vec![ + bazel_protos::build::bazel::remote::execution::v2::platform::Property { + name: "JDK_SYMLINK".to_owned(), + value: ".jdk".to_owned(), + }, + ], + }, + ), + ..Default::default() + }; - let mut want_action = bazel_protos::remote_execution::Action::new(); - want_action.set_command_digest( - (&Digest( + let want_action = bazel_protos::build::bazel::remote::execution::v2::Action { + command_digest: Some(ActionSerializer::convert_digest(&Digest( Fingerprint::from_hex_string( "f373f421b328ddeedfba63542845c0423d7730f428dd8e916ec6a38243c98448", ) .unwrap(), 38, - )) - .into(), - ); - want_action.set_input_root_digest((&input_directory.digest()).into()); + ))), + input_root_digest: Some(ActionSerializer::convert_digest(&input_directory.digest())), + ..Default::default() + }; let want_execute_request = bazel_protos::build::bazel::remote::execution::v2::ExecuteRequest { - action_digest: Some( - (&Digest( - Fingerprint::from_hex_string( - "b1fb7179ce496995a4e3636544ec000dca1b951f1f6216493f6c7608dc4dd910", - ) - .unwrap(), - 140, - )) - .into(), - ), + action_digest: Some(ActionSerializer::convert_digest(&Digest( + Fingerprint::from_hex_string( + "b1fb7179ce496995a4e3636544ec000dca1b951f1f6216493f6c7608dc4dd910", + ) + .unwrap(), + 140, + ))), ..Default::default() }; assert_eq!( - super::make_execute_request(&req, &None, &None), - Ok((want_action, want_command, want_execute_request)) + make_execute_request(&req, &None, &None), + Ok(BazelProcessExecutionRequest::new( + want_action, + want_command, + want_execute_request + )), ); } @@ -1295,7 +1041,7 @@ mod tests { let mock_server = { mock::execution_server::TestServer::new(mock::execution_server::MockExecution::new( "wrong-command".to_string(), - super::make_execute_request( + make_execute_request( &ExecuteProcessRequest { argv: owned_string_vec(&["/bin/echo", "-n", "bar"]), env: BTreeMap::new(), @@ -1310,7 +1056,7 @@ mod tests { &None, ) .unwrap() - .2, + .execute_request, vec![], )) }; @@ -1331,9 +1077,9 @@ mod tests { mock::execution_server::TestServer::new(mock::execution_server::MockExecution::new( op_name.clone(), - super::make_execute_request(&execute_request, &None, &None) + make_execute_request(&execute_request, &None, &None) .unwrap() - .2, + .execute_request, vec![ make_incomplete_operation(&op_name), make_successful_operation( @@ -1349,19 +1095,18 @@ mod tests { let result = run_command_remote(mock_server.address(), execute_request).unwrap(); assert_eq!( - result.without_execution_attempts(), - FallibleExecuteProcessResult { + result.into_cacheable(), + CacheableExecuteProcessResult { stdout: as_bytes("foo"), stderr: as_bytes(""), exit_code: 0, output_directory: fs::EMPTY_DIGEST, - execution_attempts: vec![], } ); } #[test] - fn extract_response_with_digest_stdout() { + fn extract_action_result_with_digest_stdout() { let op_name = "gimme-foo".to_string(); let testdata = TestData::roland(); let testdata_empty = TestData::empty(); @@ -1378,8 +1123,8 @@ mod tests { .unwrap() ) .unwrap() - .without_execution_attempts(), - FallibleExecuteProcessResult { + .into_cacheable(), + CacheableExecuteProcessResult { stdout: testdata.bytes(), stderr: testdata_empty.bytes(), exit_code: 0, @@ -1390,7 +1135,7 @@ mod tests { } #[test] - fn extract_response_with_digest_stderr() { + fn extract_action_result_with_digest_stderr() { let op_name = "gimme-foo".to_string(); let testdata = TestData::roland(); let testdata_empty = TestData::empty(); @@ -1407,8 +1152,8 @@ mod tests { .unwrap() ) .unwrap() - .without_execution_attempts(), - FallibleExecuteProcessResult { + .into_cacheable(), + CacheableExecuteProcessResult { stdout: testdata_empty.bytes(), stderr: testdata.bytes(), exit_code: 0, @@ -1428,9 +1173,9 @@ mod tests { mock::execution_server::TestServer::new(mock::execution_server::MockExecution::new( op_name.clone(), - super::make_execute_request(&echo_roland_request(), &None, &None) + make_execute_request(&echo_roland_request(), &None, &None) .unwrap() - .2, + .execute_request, vec![make_successful_operation( &op_name.clone(), StdoutType::Raw(test_stdout.string()), @@ -1475,8 +1220,8 @@ mod tests { let result = rt.block_on(cmd_runner.run(echo_roland_request())).unwrap(); rt.shutdown_now().wait().unwrap(); assert_eq!( - result.without_execution_attempts(), - FallibleExecuteProcessResult { + result.into_cacheable(), + CacheableExecuteProcessResult { stdout: test_stdout.bytes(), stderr: test_stderr.bytes(), exit_code: 0, @@ -1517,9 +1262,9 @@ mod tests { mock::execution_server::TestServer::new(mock::execution_server::MockExecution::new( op_name.clone(), - super::make_execute_request(&execute_request, &None, &None) + make_execute_request(&execute_request, &None, &None) .unwrap() - .2, + .execute_request, Vec::from_iter( iter::repeat(make_incomplete_operation(&op_name)) .take(4) @@ -1536,8 +1281,8 @@ mod tests { let result = run_command_remote(mock_server.address(), execute_request).unwrap(); assert_eq!( - result.without_execution_attempts(), - FallibleExecuteProcessResult { + result.into_cacheable(), + CacheableExecuteProcessResult { stdout: as_bytes("foo"), stderr: as_bytes(""), exit_code: 0, @@ -1568,9 +1313,9 @@ mod tests { mock::execution_server::TestServer::new(mock::execution_server::MockExecution::new( op_name.clone(), - super::make_execute_request(&execute_request, &None, &None) + make_execute_request(&execute_request, &None, &None) .unwrap() - .2, + .execute_request, vec![ make_incomplete_operation(&op_name), make_delayed_incomplete_operation(&op_name, delayed_operation_time), @@ -1593,9 +1338,9 @@ mod tests { mock::execution_server::TestServer::new(mock::execution_server::MockExecution::new( op_name.clone(), - super::make_execute_request(&execute_request, &None, &None) + make_execute_request(&execute_request, &None, &None) .unwrap() - .2, + .execute_request, vec![ make_incomplete_operation(&op_name), make_canceled_operation(Some(Duration::from_millis(100))), @@ -1612,8 +1357,8 @@ mod tests { let result = run_command_remote(mock_server.address(), execute_request).unwrap(); assert_eq!( - result.without_execution_attempts(), - FallibleExecuteProcessResult { + result.into_cacheable(), + CacheableExecuteProcessResult { stdout: as_bytes("foo"), stderr: as_bytes(""), exit_code: 0, @@ -1632,9 +1377,9 @@ mod tests { mock::execution_server::TestServer::new(mock::execution_server::MockExecution::new( op_name.clone(), - super::make_execute_request(&execute_request, &None, &None) + make_execute_request(&execute_request, &None, &None) .unwrap() - .2, + .execute_request, vec![ make_incomplete_operation(&op_name), MockOperation::new({ @@ -1666,9 +1411,9 @@ mod tests { mock::execution_server::TestServer::new(mock::execution_server::MockExecution::new( op_name.clone(), - super::make_execute_request(&execute_request, &None, &None) + make_execute_request(&execute_request, &None, &None) .unwrap() - .2, + .execute_request, vec![MockOperation::new( bazel_protos::google::longrunning::Operation { name: op_name.clone(), @@ -1700,9 +1445,9 @@ mod tests { mock::execution_server::TestServer::new(mock::execution_server::MockExecution::new( op_name.clone(), - super::make_execute_request(&execute_request, &None, &None) + make_execute_request(&execute_request, &None, &None) .unwrap() - .2, + .execute_request, vec![ make_incomplete_operation(&op_name), MockOperation::new(bazel_protos::google::longrunning::Operation { @@ -1735,9 +1480,9 @@ mod tests { mock::execution_server::TestServer::new(mock::execution_server::MockExecution::new( op_name.clone(), - super::make_execute_request(&execute_request, &None, &None) + make_execute_request(&execute_request, &None, &None) .unwrap() - .2, + .execute_request, vec![MockOperation::new( bazel_protos::google::longrunning::Operation { name: op_name.clone(), @@ -1763,9 +1508,9 @@ mod tests { mock::execution_server::TestServer::new(mock::execution_server::MockExecution::new( op_name.clone(), - super::make_execute_request(&execute_request, &None, &None) + make_execute_request(&execute_request, &None, &None) .unwrap() - .2, + .execute_request, vec![ make_incomplete_operation(&op_name), MockOperation::new(bazel_protos::google::longrunning::Operation { @@ -1792,9 +1537,9 @@ mod tests { mock::execution_server::TestServer::new(mock::execution_server::MockExecution::new( op_name.clone(), - super::make_execute_request(&cat_roland_request(), &None, &None) + make_execute_request(&cat_roland_request(), &None, &None) .unwrap() - .2, + .execute_request, vec![ make_incomplete_operation(&op_name), make_precondition_failure_operation(vec![missing_preconditionfailure_violation( @@ -1855,8 +1600,8 @@ mod tests { ); rt.shutdown_now().wait().unwrap(); assert_eq!( - result.unwrap().without_execution_attempts(), - FallibleExecuteProcessResult { + result.unwrap().into_cacheable(), + CacheableExecuteProcessResult { stdout: roland.bytes(), stderr: Bytes::from(""), exit_code: 0, @@ -1885,9 +1630,9 @@ mod tests { mock::execution_server::TestServer::new(mock::execution_server::MockExecution::new( op_name.clone(), - super::make_execute_request(&cat_roland_request(), &None, &None) + make_execute_request(&cat_roland_request(), &None, &None) .unwrap() - .2, + .execute_request, vec![ //make_incomplete_operation(&op_name), MockOperation { @@ -1967,9 +1712,9 @@ mod tests { mock::execution_server::TestServer::new(mock::execution_server::MockExecution::new( op_name.clone(), - super::make_execute_request(&cat_roland_request(), &None, &None) + make_execute_request(&cat_roland_request(), &None, &None) .unwrap() - .2, + .execute_request, // We won't get as far as trying to run the operation, so don't expect any requests whose // responses we would need to stub. vec![], @@ -2044,12 +1789,11 @@ mod tests { #[test] fn extract_execute_response_success() { - let want_result = FallibleExecuteProcessResult { + let want_result = CacheableExecuteProcessResult { stdout: as_bytes("roland"), stderr: Bytes::from("simba"), exit_code: 17, output_directory: TestDirectory::nested().digest(), - execution_attempts: vec![], }; let response = bazel_protos::build::bazel::remote::execution::v2::ExecuteResponse { @@ -2085,7 +1829,7 @@ mod tests { assert_eq!( extract_execute_response(operation) .unwrap() - .without_execution_attempts(), + .into_cacheable(), want_result ); } @@ -2212,21 +1956,24 @@ mod tests { #[test] fn digest_command() { - let mut command = bazel_protos::remote_execution::Command::new(); - command.mut_arguments().push("/bin/echo".to_string()); - command.mut_arguments().push("foo".to_string()); + let env1 = bazel_protos::build::bazel::remote::execution::v2::command::EnvironmentVariable { + name: "A".to_string(), + value: "a".to_string(), + }; - let mut env1 = bazel_protos::remote_execution::Command_EnvironmentVariable::new(); - env1.set_name("A".to_string()); - env1.set_value("a".to_string()); - command.mut_environment_variables().push(env1); + let env2 = bazel_protos::build::bazel::remote::execution::v2::command::EnvironmentVariable { + name: "B".to_string(), + value: "b".to_string(), + }; - let mut env2 = bazel_protos::remote_execution::Command_EnvironmentVariable::new(); - env2.set_name("B".to_string()); - env2.set_value("b".to_string()); - command.mut_environment_variables().push(env2); + let command = bazel_protos::build::bazel::remote::execution::v2::Command { + arguments: owned_string_vec(&["/bin/echo", "foo"]), + environment_variables: vec![env1, env2], + ..Default::default() + }; - let digest = super::digest(&command).unwrap(); + let digest = + fs::Store::digest_bytes(&ActionSerializer::encode_command_proto(&command).unwrap()); assert_eq!( &digest.0.to_hex(), @@ -2244,9 +1991,9 @@ mod tests { let op_name = "gimme-foo".to_string(); mock::execution_server::TestServer::new(mock::execution_server::MockExecution::new( op_name.clone(), - super::make_execute_request(&execute_request, &None, &None) + make_execute_request(&execute_request, &None, &None) .unwrap() - .2, + .execute_request, vec![ make_incomplete_operation(&op_name), make_successful_operation( @@ -2282,9 +2029,9 @@ mod tests { let op_name = "gimme-foo".to_string(); mock::execution_server::TestServer::new(mock::execution_server::MockExecution::new( op_name.clone(), - super::make_execute_request(&execute_request, &None, &None) + make_execute_request(&execute_request, &None, &None) .unwrap() - .2, + .execute_request, vec![ make_incomplete_operation(&op_name), make_incomplete_operation(&op_name), @@ -2589,14 +2336,18 @@ mod tests { result } - fn create_command_runner(address: String, cas: &mock::StubCAS) -> CommandRunner { + fn create_command_runner_with_store( + address: String, + cas: &mock::StubCAS, + instance_name: Option, + ) -> (fs::Store, CommandRunner) { let store_dir = TempDir::new().unwrap(); let timer_thread = timer_thread(); let store = fs::Store::with_remote( store_dir, Arc::new(fs::ResettablePool::new("test-pool-".to_owned())), &[cas.address()], - None, + instance_name, &None, None, 1, @@ -2608,8 +2359,15 @@ mod tests { ) .expect("Failed to make store"); - CommandRunner::new(&address, None, None, None, store, timer_thread) - .expect("Failed to make command runner") + let command_runner = + CommandRunner::new(&address, None, None, None, store.clone(), timer_thread) + .expect("Failed to make command runner"); + + (store, command_runner) + } + + fn create_command_runner(address: String, cas: &mock::StubCAS) -> CommandRunner { + create_command_runner_with_store(address, cas, None).1 } fn timer_thread() -> resettable::Resettable { @@ -2642,11 +2400,16 @@ mod tests { .directory(&TestDirectory::containing_roland()) .build(); + let (store, _) = create_command_runner_with_store("127.0.0.1:0".to_owned(), &cas, None); + let action_serializer = ActionSerializer::new(store); + let mut runtime = tokio::runtime::Runtime::new().unwrap(); - let command_runner = create_command_runner("127.0.0.1:0".to_owned(), &cas); - let result = runtime.block_on(command_runner.extract_output_files(result)); + let result = + runtime.block_on(action_serializer.extract_action_result(&result, OutputDirWrapping::Direct)); runtime.shutdown_now().wait().unwrap(); result + .map(|res| res.output_directory) + .map_err(|e| ExecutionError::Fatal(e)) } fn make_any_prost_executeresponse( @@ -2711,4 +2474,27 @@ mod tests { jdk_home: None, } } + + fn serializer_convert_execute_request( + req: ExecuteProcessRequest, + instance_name: Option, + cache_key_gen_version: Option, + ) -> Result { + BazelProcessExecutionRequest::convert_execute_request( + &CacheableExecuteProcessRequest::new(req, cache_key_gen_version), + &instance_name, + ) + } + + fn make_execute_request( + req: &ExecuteProcessRequest, + instance_name: &Option, + cache_key_gen_version: &Option, + ) -> Result { + serializer_convert_execute_request( + req.clone(), + instance_name.clone(), + cache_key_gen_version.clone(), + ) + } } diff --git a/src/rust/engine/src/context.rs b/src/rust/engine/src/context.rs index 1fac707784c..6dc120fae69 100644 --- a/src/rust/engine/src/context.rs +++ b/src/rust/engine/src/context.rs @@ -21,11 +21,19 @@ use boxfuture::{BoxFuture, Boxable}; use fs::{self, safe_create_dir_all_ioerror, PosixFS, ResettablePool, Store}; use graph::{EntryId, Graph, NodeContext}; use log::debug; -use process_execution::{self, BoundedCommandRunner, CommandRunner}; +use process_execution::{self, BoundedCommandRunner, CachingCommandRunner, CommandRunner}; use rand::seq::SliceRandom; use reqwest; use resettable::Resettable; +// TODO: better name!!! +#[derive(Clone)] +struct StoreAndCommandRunnerAndHttpClient { + store: Store, + command_runner: Arc>, + http_client: reqwest::r#async::Client, +} + /// /// The core context shared (via Arc) between the Scheduler and the Context objects of /// all running Nodes. @@ -42,8 +50,7 @@ pub struct Core { pub fs_pool: Arc, pub runtime: Resettable>, pub futures_timer_thread: Resettable, - store_and_command_runner_and_http_client: - Resettable<(Store, BoundedCommandRunner, reqwest::r#async::Client)>, + store_and_command_runner_and_http_client: Resettable, pub vfs: PosixFS, pub build_root: PathBuf, } @@ -57,6 +64,8 @@ impl Core { ignore_patterns: &[String], work_dir: PathBuf, local_store_dir: PathBuf, + // TODO: REALLY give this a better type than Option!!! + local_execution_process_cache_namespace: Option, remote_store_servers: Vec, remote_execution_server: Option, remote_execution_process_cache_namespace: Option, @@ -129,6 +138,7 @@ impl Core { }) .unwrap_or_else(|e| panic!("Could not initialize Store: {:?}", e)); + // NOTE: This is where the process execution mechanism is selected! let underlying_command_runner: Box = match &remote_execution_server { Some(ref address) => Box::new( process_execution::remote::CommandRunner::new( @@ -149,12 +159,24 @@ impl Core { )) as Box, }; - let command_runner = + let bounded_command_runner = BoundedCommandRunner::new(underlying_command_runner, process_execution_parallelism); + let caching_command_runner = CachingCommandRunner::from_store( + Box::new(bounded_command_runner) as Box, + store.clone(), + local_execution_process_cache_namespace.clone(), + ); + + let command_runner = Arc::new(Box::new(caching_command_runner) as Box); + let http_client = reqwest::r#async::Client::new(); - (store, command_runner, http_client) + StoreAndCommandRunnerAndHttpClient { + store, + command_runner, + http_client, + } }); let rule_graph = RuleGraph::new(&tasks, root_subject_types); @@ -211,15 +233,21 @@ impl Core { } pub fn store(&self) -> Store { - self.store_and_command_runner_and_http_client.get().0 + self.store_and_command_runner_and_http_client.get().store } - pub fn command_runner(&self) -> BoundedCommandRunner { - self.store_and_command_runner_and_http_client.get().1 + pub fn command_runner(&self) -> Arc> { + self + .store_and_command_runner_and_http_client + .get() + .command_runner } pub fn http_client(&self) -> reqwest::r#async::Client { - self.store_and_command_runner_and_http_client.get().2 + self + .store_and_command_runner_and_http_client + .get() + .http_client } } diff --git a/src/rust/engine/src/lib.rs b/src/rust/engine/src/lib.rs index a2074ee81a4..1100db19e28 100644 --- a/src/rust/engine/src/lib.rs +++ b/src/rust/engine/src/lib.rs @@ -198,6 +198,8 @@ pub extern "C" fn scheduler_create( build_root_buf: Buffer, work_dir_buf: Buffer, local_store_dir_buf: Buffer, + // TODO: need some testing modifying this variable! + local_execution_process_cache_namespace: Buffer, ignore_patterns_buf: BufferBuffer, root_type_ids: TypeIdBuffer, remote_store_servers_buf: BufferBuffer, @@ -245,6 +247,9 @@ pub extern "C" fn scheduler_create( }; let mut tasks = with_tasks(tasks_ptr, |tasks| tasks.clone()); tasks.intrinsics_set(&types); + let local_execution_process_cache_namespace_string = local_execution_process_cache_namespace + .to_string() + .expect("local_execution_process_cache_namespace was not valid UTF8"); // Allocate on the heap via `Box` and return a raw pointer to the boxed value. let remote_store_servers_vec = remote_store_servers_buf .to_strings() @@ -285,6 +290,11 @@ pub extern "C" fn scheduler_create( &ignore_patterns, PathBuf::from(work_dir_buf.to_os_string()), PathBuf::from(local_store_dir_buf.to_os_string()), + if local_execution_process_cache_namespace_string.is_empty() { + None + } else { + Some(local_execution_process_cache_namespace_string) + }, remote_store_servers_vec, if remote_execution_server_string.is_empty() { None diff --git a/src/rust/engine/src/nodes.rs b/src/rust/engine/src/nodes.rs index 3a9587e741e..fe92ed2e6ad 100644 --- a/src/rust/engine/src/nodes.rs +++ b/src/rust/engine/src/nodes.rs @@ -25,7 +25,7 @@ use fs::{ PathGlobs, PathStat, StoreFileByDigest, StrictGlobMatching, VFS, }; use hashing; -use process_execution::{self, CommandRunner}; +use process_execution; use graph::{Entry, Node, NodeError, NodeTracer, NodeVisualizer};