diff --git a/src/python/pants/option/global_options.py b/src/python/pants/option/global_options.py index 245f92cfc47..fb7414484d2 100644 --- a/src/python/pants/option/global_options.py +++ b/src/python/pants/option/global_options.py @@ -513,6 +513,8 @@ class ExecutionOptions: remote_execution_overall_deadline_secs: int remote_execution_rpc_concurrency: int + remote_execution_append_only_caches_base_path: str | None + @classmethod def from_options( cls, @@ -555,6 +557,7 @@ def from_options( remote_execution_headers=dynamic_remote_options.execution_headers, remote_execution_overall_deadline_secs=bootstrap_options.remote_execution_overall_deadline_secs, remote_execution_rpc_concurrency=dynamic_remote_options.execution_rpc_concurrency, + remote_execution_append_only_caches_base_path=bootstrap_options.remote_execution_append_only_caches_base_path, ) @@ -642,6 +645,7 @@ def from_options(cls, options: OptionValueContainer) -> LocalStoreOptions: }, remote_execution_overall_deadline_secs=60 * 60, # one hour remote_execution_rpc_concurrency=128, + remote_execution_append_only_caches_base_path=None, ) DEFAULT_LOCAL_STORE_OPTIONS = LocalStoreOptions() @@ -1549,6 +1553,18 @@ class BootstrapOptions: default=DEFAULT_EXECUTION_OPTIONS.remote_execution_rpc_concurrency, help="The number of concurrent requests allowed to the remote execution service.", ) + remote_execution_append_only_caches_base_path = StrOption( + default=None, + advanced=True, + help=softwrap( + """ + Sets the base path to use when setting up an append-only cache for a process running remotely. + If this option is not set, then append-only caches will not be used with remote execution. + The option should be set to the absolute path of a writable directory in the remote execution + environment where Pants can create append-only caches for use with remotely executing processes. + """ + ), + ) watch_filesystem = BoolOption( default=True, advanced=True, diff --git a/src/rust/engine/process_execution/src/cache.rs b/src/rust/engine/process_execution/src/cache.rs index 1a2adce6193..e32495871bf 100644 --- a/src/rust/engine/process_execution/src/cache.rs +++ b/src/rust/engine/process_execution/src/cache.rs @@ -82,6 +82,7 @@ impl crate::CommandRunner for CommandRunner { None, self.process_cache_namespace.clone(), &self.file_store, + None, ) .await .into(), diff --git a/src/rust/engine/process_execution/src/lib.rs b/src/rust/engine/process_execution/src/lib.rs index 5543e608cdf..4e67cbda56c 100644 --- a/src/rust/engine/process_execution/src/lib.rs +++ b/src/rust/engine/process_execution/src/lib.rs @@ -962,12 +962,19 @@ pub async fn digest( instance_name: Option, process_cache_namespace: Option, store: &Store, + append_only_caches_base_path: Option<&str>, ) -> Digest { let EntireExecuteRequest { execute_request, .. - } = remote::make_execute_request(process, instance_name, process_cache_namespace, store) - .await - .unwrap(); + } = remote::make_execute_request( + process, + instance_name, + process_cache_namespace, + store, + append_only_caches_base_path, + ) + .await + .unwrap(); execute_request.action_digest.unwrap().try_into().unwrap() } diff --git a/src/rust/engine/process_execution/src/nailgun/nailgun_pool.rs b/src/rust/engine/process_execution/src/nailgun/nailgun_pool.rs index 5f624e9c303..3b3f21680d9 100644 --- a/src/rust/engine/process_execution/src/nailgun/nailgun_pool.rs +++ b/src/rust/engine/process_execution/src/nailgun/nailgun_pool.rs @@ -428,7 +428,7 @@ struct NailgunProcessFingerprint { impl NailgunProcessFingerprint { pub async fn new(name: String, nailgun_req: &Process, store: &Store) -> Result { - let nailgun_req_digest = crate::digest(nailgun_req, None, None, store).await; + let nailgun_req_digest = crate::digest(nailgun_req, None, None, store, None).await; Ok(NailgunProcessFingerprint { name, fingerprint: nailgun_req_digest.hash, diff --git a/src/rust/engine/process_execution/src/named_caches.rs b/src/rust/engine/process_execution/src/named_caches.rs index 4e4677619dc..74cbeceddc9 100644 --- a/src/rust/engine/process_execution/src/named_caches.rs +++ b/src/rust/engine/process_execution/src/named_caches.rs @@ -24,6 +24,10 @@ impl CacheName { )) } } + + pub fn name(&self) -> &str { + &self.0 + } } #[derive(Clone)] diff --git a/src/rust/engine/process_execution/src/remote.rs b/src/rust/engine/process_execution/src/remote.rs index ce6c49459f5..0e902be62cb 100644 --- a/src/rust/engine/process_execution/src/remote.rs +++ b/src/rust/engine/process_execution/src/remote.rs @@ -1,9 +1,9 @@ use std::cmp::Ordering; use std::collections::{BTreeMap, HashMap}; use std::convert::TryInto; -use std::fmt::{self, Debug}; +use std::fmt::{self, Debug, Write}; use std::io::Cursor; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::{Duration, Instant, SystemTime}; @@ -45,8 +45,8 @@ use workunit_store::{ }; use crate::{ - Context, FallibleProcessResultWithPlatform, Platform, Process, ProcessCacheScope, ProcessError, - ProcessExecutionStrategy, ProcessResultMetadata, ProcessResultSource, + CacheName, Context, FallibleProcessResultWithPlatform, Platform, Process, ProcessCacheScope, + ProcessError, ProcessExecutionStrategy, ProcessResultMetadata, ProcessResultSource, }; // Environment variable which is exclusively used for cache key invalidation. @@ -106,6 +106,7 @@ pub enum ExecutionError { pub struct CommandRunner { instance_name: Option, process_cache_namespace: Option, + append_only_caches_base_path: Option, store: Store, executor: Executor, execution_client: Arc>, @@ -167,6 +168,7 @@ impl CommandRunner { execution_address: &str, instance_name: Option, process_cache_namespace: Option, + append_only_caches_base_path: Option, root_ca_certs: Option>, headers: BTreeMap, store: Store, @@ -203,6 +205,7 @@ impl CommandRunner { let command_runner = CommandRunner { instance_name, process_cache_namespace, + append_only_caches_base_path, execution_client, operations_client, store, @@ -818,6 +821,10 @@ impl crate::CommandRunner for CommandRunner { self.instance_name.clone(), self.process_cache_namespace.clone(), &self.store, + self + .append_only_caches_base_path + .as_ref() + .map(|s| s.as_ref()), ) .await?; let build_id = context.build_id.clone(); @@ -920,6 +927,62 @@ fn maybe_add_workunit( } } +fn make_wrapper_for_append_only_caches( + caches: &BTreeMap, + base_path: &str, + working_directory: Option<&str>, +) -> Result { + let mut script = String::new(); + writeln!(&mut script, "#!/bin/sh").map_err(|err| format!("write! failed: {err:?}"))?; + + // Setup the append-only caches. + for (cache_name, path) in caches { + writeln!( + &mut script, + "/bin/mkdir -p '{}/{}'", + base_path, + cache_name.name() + ) + .map_err(|err| format!("write! failed: {err:?}"))?; + if let Some(parent) = path.parent() { + writeln!(&mut script, "/bin/mkdir -p '{}'", parent.to_string_lossy()) + .map_err(|err| format!("write! failed: {err}"))?; + } + writeln!( + &mut script, + "/bin/ln -s '{}/{}' '{}'", + base_path, + cache_name.name(), + path.as_path().to_string_lossy() + ) + .map_err(|err| format!("write! failed: {err}"))?; + } + + // Change into any working directory. + // + // Note: When this wrapper script is in effect, Pants will not set the `working_directory` + // field on the `ExecuteRequest` so that this wrapper script can operate in the input root + // first. + if let Some(path) = working_directory { + writeln!( + &mut script, + concat!( + "cd '{0}'\n", + "if [ \"$?\" != 0 ]; then\n", + " echo \"pants-wrapper: Failed to change working directory to: {0}\" 1>&2\n", + " exit 1\n", + "fi\n", + ), + path + ) + .map_err(|err| format!("write! failed: {err}"))?; + } + + // Finally, execute the process. + writeln!(&mut script, "exec \"$@\"").map_err(|err| format!("write! failed: {err:?}"))?; + Ok(script) +} + /// Return type for `make_execute_request`. Contains all of the generated REAPI protobufs for /// a particular `Process`. #[derive(Clone, Debug, PartialEq)] @@ -934,12 +997,47 @@ pub async fn make_execute_request( req: &Process, instance_name: Option, cache_key_gen_version: Option, - _store: &Store, + store: &Store, + append_only_caches_base_path: Option<&str>, ) -> Result { + const WRAPPER_SCRIPT: &str = "./__pants_wrapper__"; + + // Implement append-only caches by running a wrapper script before the actual program + // to be invoked in the remote environment. + let wrapper_script_digest_opt = match (append_only_caches_base_path, &req.append_only_caches) { + (Some(base_path), caches) if !caches.is_empty() => { + let script = make_wrapper_for_append_only_caches( + caches, + base_path, + req.working_directory.as_ref().and_then(|p| p.to_str()), + )?; + let digest = store + .store_file_bytes(Bytes::from(script), false) + .await + .map_err(|err| format!("Failed to store wrapper script for remote execution: {err}"))?; + let path = RelativePath::new(Path::new(WRAPPER_SCRIPT))?; + let snapshot = store.snapshot_of_one_file(path, digest, true).await?; + let directory_digest = DirectoryDigest::new(snapshot.digest, snapshot.tree); + Some(directory_digest) + } + _ => None, + }; + + let arguments = match &wrapper_script_digest_opt { + Some(_) => { + let mut args = Vec::with_capacity(req.argv.len() + 1); + args.push(WRAPPER_SCRIPT.to_string()); + args.extend(req.argv.iter().cloned()); + args + } + None => req.argv.clone(), + }; + let mut command = remexec::Command { - arguments: req.argv.clone(), + arguments, ..remexec::Command::default() }; + for (name, value) in &req.env { if name == CACHE_KEY_GEN_VERSION_ENV_VAR_NAME || name == CACHE_KEY_TARGET_PLATFORM_ENV_VAR_NAME @@ -964,15 +1062,6 @@ pub async fn make_execute_request( _ => vec![], }; - // TODO: Disabling append-only caches in remoting until server support exists due to - // interaction with how servers match platform properties. - // if !req.append_only_caches.is_empty() { - // platform_properties.extend(NamedCaches::platform_properties( - // &req.append_only_caches, - // &cache_key_gen_version, - // )); - // } - if let Some(cache_key_gen_version) = cache_key_gen_version { command .environment_variables @@ -1037,10 +1126,14 @@ pub async fn make_execute_request( command.output_directories = output_directories; if let Some(working_directory) = &req.working_directory { - command.working_directory = working_directory - .to_str() - .map(str::to_owned) - .unwrap_or_else(|| panic!("Non-UTF8 working directory path: {:?}", working_directory)); + // Do not set `working_directory` if a wrapper script is in use because the wrapper script + // will change to the working directory itself. + if wrapper_script_digest_opt.is_none() { + command.working_directory = working_directory + .to_str() + .map(str::to_owned) + .unwrap_or_else(|| panic!("Non-UTF8 working directory path: {:?}", working_directory)); + } } if req.jdk_home.is_some() { @@ -1091,7 +1184,19 @@ pub async fn make_execute_request( .environment_variables .sort_by(|x, y| x.name.cmp(&y.name)); - let input_root_digest = req.input_digests.complete.clone(); + let input_root_digest: DirectoryDigest = match &wrapper_script_digest_opt { + Some(wrapper_digest) => { + let digests = vec![ + req.input_digests.complete.clone(), + wrapper_digest.to_owned(), + ]; + store + .merge(digests) + .await + .map_err(|err| format!("store error: {err}"))? + } + None => req.input_digests.complete.clone(), + }; let mut action = remexec::Action { command_digest: Some((&digest(&command)?).into()), diff --git a/src/rust/engine/process_execution/src/remote_cache.rs b/src/rust/engine/process_execution/src/remote_cache.rs index 05222b61936..7d694a13a7f 100644 --- a/src/rust/engine/process_execution/src/remote_cache.rs +++ b/src/rust/engine/process_execution/src/remote_cache.rs @@ -51,6 +51,7 @@ pub struct CommandRunner { inner: Arc, instance_name: Option, process_cache_namespace: Option, + append_only_caches_base_path: Option, executor: task_executor::Executor, store: Store, action_cache_client: Arc>, @@ -79,6 +80,7 @@ impl CommandRunner { cache_content_behavior: CacheContentBehavior, concurrency_limit: usize, read_timeout: Duration, + append_only_caches_base_path: Option, ) -> Result { let tls_client_config = if action_cache_address.starts_with("https://") { Some(grpc_util::tls::Config::new_without_mtls(root_ca_certs).try_into()?) @@ -103,6 +105,7 @@ impl CommandRunner { inner, instance_name, process_cache_namespace, + append_only_caches_base_path, executor, store, action_cache_client, @@ -475,6 +478,10 @@ impl crate::CommandRunner for CommandRunner { self.instance_name.clone(), self.process_cache_namespace.clone(), &self.store, + self + .append_only_caches_base_path + .as_ref() + .map(|s| s.as_ref()), ) .await?; let failures_cached = request.cache_scope == ProcessCacheScope::Always; diff --git a/src/rust/engine/process_execution/src/remote_cache_tests.rs b/src/rust/engine/process_execution/src/remote_cache_tests.rs index 70216d2cbc1..9c8cc09e548 100644 --- a/src/rust/engine/process_execution/src/remote_cache_tests.rs +++ b/src/rust/engine/process_execution/src/remote_cache_tests.rs @@ -149,6 +149,7 @@ fn create_cached_runner( cache_content_behavior, 256, CACHE_READ_TIMEOUT, + None, ) .expect("caching command runner"), ) @@ -162,7 +163,7 @@ async fn create_process(store_setup: &StoreSetup) -> (Process, Digest) { ]); let EntireExecuteRequest { action, command, .. - } = make_execute_request(&process, None, None, &store_setup.store) + } = make_execute_request(&process, None, None, &store_setup.store, None) .await .unwrap(); let (_command_digest, action_digest) = @@ -733,6 +734,7 @@ async fn make_action_result_basic() { CacheContentBehavior::Defer, 256, CACHE_READ_TIMEOUT, + None, ) .expect("caching command runner"); diff --git a/src/rust/engine/process_execution/src/remote_tests.rs b/src/rust/engine/process_execution/src/remote_tests.rs index bebd94af962..b1e66699fa6 100644 --- a/src/rust/engine/process_execution/src/remote_tests.rs +++ b/src/rust/engine/process_execution/src/remote_tests.rs @@ -24,10 +24,10 @@ use workunit_store::{RunId, WorkunitStore}; use crate::remote::{CommandRunner, EntireExecuteRequest, ExecutionError, OperationOrStatus}; use crate::{ - CommandRunner as CommandRunnerTrait, Context, FallibleProcessResultWithPlatform, InputDigests, - Platform, Process, ProcessCacheScope, ProcessError, ProcessExecutionStrategy, + CacheName, CommandRunner as CommandRunnerTrait, Context, FallibleProcessResultWithPlatform, + InputDigests, Platform, Process, ProcessCacheScope, ProcessError, ProcessExecutionStrategy, }; -use fs::{RelativePath, EMPTY_DIRECTORY_DIGEST}; +use fs::{DirectoryDigest, RelativePath, EMPTY_DIRECTORY_DIGEST}; use std::any::type_name; use std::io::Cursor; use tonic::{Code, Status}; @@ -151,7 +151,7 @@ async fn make_execute_request() { }; assert_eq!( - crate::remote::make_execute_request(&req, None, None, &store).await, + crate::remote::make_execute_request(&req, None, None, &store, None).await, Ok(EntireExecuteRequest { action: want_action, command: want_command, @@ -253,7 +253,8 @@ async fn make_execute_request_with_instance_name() { }; assert_eq!( - crate::remote::make_execute_request(&req, Some("dark-tower".to_owned()), None, &store).await, + crate::remote::make_execute_request(&req, Some("dark-tower".to_owned()), None, &store, None) + .await, Ok(EntireExecuteRequest { action: want_action, command: want_command, @@ -353,7 +354,7 @@ async fn make_execute_request_with_cache_key_gen_version() { }; assert_eq!( - crate::remote::make_execute_request(&req, None, Some("meep".to_owned()), &store).await, + crate::remote::make_execute_request(&req, None, Some("meep".to_owned()), &store, None).await, Ok(EntireExecuteRequest { action: want_action, command: want_command, @@ -428,7 +429,7 @@ async fn make_execute_request_with_jdk() { }; assert_eq!( - crate::remote::make_execute_request(&req, None, None, &store).await, + crate::remote::make_execute_request(&req, None, None, &store, None).await, Ok(EntireExecuteRequest { action: want_action, command: want_command, @@ -527,7 +528,7 @@ async fn make_execute_request_with_jdk_and_extra_platform_properties() { }; assert_eq!( - crate::remote::make_execute_request(&req, None, None, &store).await, + crate::remote::make_execute_request(&req, None, None, &store, None).await, Ok(EntireExecuteRequest { action: want_action, command: want_command, @@ -621,7 +622,7 @@ async fn make_execute_request_with_timeout() { }; assert_eq!( - crate::remote::make_execute_request(&req, None, None, &store).await, + crate::remote::make_execute_request(&req, None, None, &store, None).await, Ok(EntireExecuteRequest { action: want_action, command: want_command, @@ -631,6 +632,144 @@ async fn make_execute_request_with_timeout() { ); } +#[tokio::test] +async fn make_execute_request_with_append_only_caches() { + let executor = task_executor::Executor::new(); + let store_dir = TempDir::new().unwrap(); + let store = Store::local_only(executor, store_dir).unwrap(); + + let input_directory = TestDirectory::containing_roland(); + store + .record_directory(&input_directory.directory(), false) + .await + .unwrap(); + + let req = Process { + argv: owned_string_vec(&["/bin/cat", "../.cache/xyzzy/foo.txt"]), + env: vec![("SOME".to_owned(), "value".to_owned())] + .into_iter() + .collect(), + working_directory: Some(RelativePath::new(Path::new("animals")).unwrap()), + input_digests: InputDigests::with_input_files(input_directory.directory_digest()), + output_files: BTreeSet::new(), + output_directories: BTreeSet::new(), + timeout: one_second(), + description: "some description".to_owned(), + level: log::Level::Info, + append_only_caches: btreemap! { + CacheName::new(String::from("xyzzy")).unwrap() => RelativePath::new(Path::new(".cache/xyzzy")).unwrap(), + }, + jdk_home: None, + platform: Platform::Linux_x86_64, + execution_slot_variable: None, + concurrency_available: 0, + cache_scope: ProcessCacheScope::Always, + execution_strategy: ProcessExecutionStrategy::RemoteExecution(vec![]), + remote_cache_speculation_delay: std::time::Duration::from_millis(0), + }; + + let want_command = remexec::Command { + arguments: vec![ + "./__pants_wrapper__".to_owned(), + "/bin/cat".to_owned(), + "../.cache/xyzzy/foo.txt".to_owned(), + ], + environment_variables: vec![ + remexec::command::EnvironmentVariable { + name: crate::remote::CACHE_KEY_EXECUTION_STRATEGY.to_owned(), + value: ProcessExecutionStrategy::RemoteExecution(vec![]).cache_value(), + }, + remexec::command::EnvironmentVariable { + name: crate::remote::CACHE_KEY_TARGET_PLATFORM_ENV_VAR_NAME.to_owned(), + value: "linux_x86_64".to_owned(), + }, + remexec::command::EnvironmentVariable { + name: "SOME".to_owned(), + value: "value".to_owned(), + }, + ], + platform: Some(remexec::Platform::default()), + ..Default::default() + }; + + let want_action = remexec::Action { + command_digest: Some( + (&Digest::new( + Fingerprint::from_hex_string( + "1deb19eddcefd5074263064a7df2a19caeb4e6d86a849bc07e23a5d856f886ec", + ) + .unwrap(), + 178, + )) + .into(), + ), + input_root_digest: Some( + (Digest::new( + Fingerprint::from_hex_string( + "92f5d2ff07cb6cdf4a70f2d6392781b482cd587b9dd69d6729ac73eb54110a69", + ) + .unwrap(), + 178, + )) + .into(), + ), + timeout: Some(prost_types::Duration::from(Duration::from_secs(1))), + ..Default::default() + }; + + let want_execute_request = remexec::ExecuteRequest { + action_digest: Some( + (&Digest::new( + Fingerprint::from_hex_string( + "e4196db365556cbeed4941845f448cfafc1fabb76b3c476c3f378f358235d3c4", + ) + .unwrap(), + 146, + )) + .into(), + ), + skip_cache_lookup: true, + ..Default::default() + }; + + let want_input_root_digest = DirectoryDigest::from_persisted_digest(Digest::new( + Fingerprint::from_hex_string( + "92f5d2ff07cb6cdf4a70f2d6392781b482cd587b9dd69d6729ac73eb54110a69", + ) + .unwrap(), + 178, + )); + + let got_execute_request = + crate::remote::make_execute_request(&req, None, None, &store, Some("/append-only-caches")) + .await + .unwrap(); + assert_eq!( + got_execute_request, + EntireExecuteRequest { + action: want_action, + command: want_command, + execute_request: want_execute_request, + input_root_digest: want_input_root_digest, + } + ); + + // Ensure that the wrapper script was added to the input root. + let mut files = store + .load_digest_trie(got_execute_request.input_root_digest) + .await + .unwrap() + .files(); + files.sort(); + assert_eq!( + files, + vec![ + Path::new("__pants_wrapper__").to_path_buf(), + Path::new("roland.ext").to_path_buf() + ] + ) +} + #[tokio::test] async fn make_execute_request_using_immutable_inputs() { let executor = task_executor::Executor::new(); @@ -737,7 +876,7 @@ async fn make_execute_request_using_immutable_inputs() { }; assert_eq!( - crate::remote::make_execute_request(&req, None, None, &store).await, + crate::remote::make_execute_request(&req, None, None, &store, None).await, Ok(EntireExecuteRequest { action: want_action, command: want_command, @@ -765,6 +904,7 @@ async fn successful_with_only_call_to_execute() { None, None, &store, + None, ) .await .unwrap(); @@ -815,6 +955,7 @@ async fn successful_after_reconnect_with_wait_execution() { None, None, &store, + None, ) .await .unwrap(); @@ -869,6 +1010,7 @@ async fn successful_after_reconnect_from_retryable_error() { None, None, &store, + None, ) .await .unwrap(); @@ -929,7 +1071,7 @@ async fn dropped_request_cancels() { let mock_server = { mock::execution_server::TestServer::new( mock::execution_server::MockExecution::new(vec![ExpectedAPICall::Execute { - execute_request: crate::remote::make_execute_request(&request, None, None, &store) + execute_request: crate::remote::make_execute_request(&request, None, None, &store, None) .await .unwrap() .execute_request, @@ -988,6 +1130,7 @@ async fn server_rejecting_execute_request_gives_error() { None, None, &store, + None, ) .await .unwrap() @@ -1021,6 +1164,7 @@ async fn server_sending_triggering_timeout_with_deadline_exceeded() { None, None, &store, + None, ) .await .unwrap(); @@ -1074,6 +1218,7 @@ async fn sends_headers() { None, None, &store, + None, ) .await .unwrap(); @@ -1100,6 +1245,7 @@ async fn sends_headers() { None, None, None, + None, btreemap! { String::from("cat") => String::from("roland"), String::from("authorization") => String::from("Bearer catnip-will-get-you-anywhere"), @@ -1274,6 +1420,7 @@ async fn ensure_inline_stdio_is_stored() { None, None, &store, + None, ) .await .unwrap(); @@ -1300,6 +1447,7 @@ async fn ensure_inline_stdio_is_stored() { None, None, None, + None, BTreeMap::new(), store.clone(), task_executor::Executor::new(), @@ -1358,6 +1506,7 @@ async fn bad_result_bytes() { None, None, &store, + None, ) .await .unwrap() @@ -1406,6 +1555,7 @@ async fn initial_response_error() { None, None, &store, + None, ) .await .unwrap(); @@ -1460,6 +1610,7 @@ async fn initial_response_missing_response_and_error() { None, None, &store, + None, ) .await .unwrap(); @@ -1505,6 +1656,7 @@ async fn fails_after_retry_limit_exceeded() { None, None, &store, + None, ) .await .unwrap(); @@ -1568,6 +1720,7 @@ async fn fails_after_retry_limit_exceeded_with_stream_close() { None, None, &store, + None, ) .await .unwrap(); @@ -1650,6 +1803,7 @@ async fn execute_missing_file_uploads_if_known() { None, None, &store, + None, ) .await .unwrap(); @@ -1671,6 +1825,7 @@ async fn execute_missing_file_uploads_if_known() { None, None, &store, + None, ) .await .unwrap() @@ -1703,6 +1858,7 @@ async fn execute_missing_file_uploads_if_known() { None, None, None, + None, BTreeMap::new(), store.clone(), task_executor::Executor::new(), @@ -1764,6 +1920,7 @@ async fn execute_missing_file_errors_if_unknown() { None, None, None, + None, BTreeMap::new(), store, task_executor::Executor::new(), @@ -2443,6 +2600,7 @@ fn create_command_runner(execution_address: String, cas: &mock::StubCAS) -> (Com None, None, None, + None, BTreeMap::new(), store.clone(), task_executor::Executor::new(), diff --git a/src/rust/engine/process_executor/src/main.rs b/src/rust/engine/process_executor/src/main.rs index 7ba8dc95804..81e32f24192 100644 --- a/src/rust/engine/process_executor/src/main.rs +++ b/src/rust/engine/process_executor/src/main.rs @@ -301,6 +301,7 @@ async fn main() { &address, process_metadata.instance_name.clone(), process_metadata.cache_key_gen_version.clone(), + None, root_ca_certs.clone(), headers.clone(), store.clone(), @@ -329,6 +330,9 @@ async fn main() { CacheContentBehavior::Defer, args.cache_rpc_concurrency, Duration::from_secs(2), + args + .named_cache_path + .map(|p| p.to_string_lossy().to_string()), ) .expect("Failed to make remote cache command runner"), ) diff --git a/src/rust/engine/src/context.rs b/src/rust/engine/src/context.rs index 272a8895456..a1be4952d04 100644 --- a/src/rust/engine/src/context.rs +++ b/src/rust/engine/src/context.rs @@ -102,6 +102,7 @@ pub struct RemotingOptions { pub execution_headers: BTreeMap, pub execution_overall_deadline: Duration, pub execution_rpc_concurrency: usize, + pub append_only_caches_base_path: Option, } #[derive(Clone, Debug)] @@ -265,6 +266,7 @@ impl Core { remoting_opts.execution_address.as_ref().unwrap(), instance_name, process_cache_namespace, + remoting_opts.append_only_caches_base_path.clone(), root_ca_certs.clone(), remoting_opts.execution_headers.clone(), full_store.clone(), @@ -330,6 +332,7 @@ impl Core { remoting_opts.cache_content_behavior, remoting_opts.cache_rpc_concurrency, remoting_opts.cache_read_timeout, + remoting_opts.append_only_caches_base_path.clone(), )?); } diff --git a/src/rust/engine/src/externs/interface.rs b/src/rust/engine/src/externs/interface.rs index 807d9d6afd2..fd0c66dd194 100644 --- a/src/rust/engine/src/externs/interface.rs +++ b/src/rust/engine/src/externs/interface.rs @@ -307,6 +307,7 @@ impl PyRemotingOptions { execution_headers: BTreeMap, execution_overall_deadline_secs: u64, execution_rpc_concurrency: usize, + append_only_caches_base_path: Option, ) -> Self { Self(RemotingOptions { execution_enable, @@ -329,6 +330,7 @@ impl PyRemotingOptions { execution_headers, execution_overall_deadline: Duration::from_secs(execution_overall_deadline_secs), execution_rpc_concurrency, + append_only_caches_base_path, }) } }