Skip to content

Commit

Permalink
client-side append-only caches for remote execution
Browse files Browse the repository at this point in the history
  • Loading branch information
Tom Dyas committed Oct 31, 2022
1 parent d05c437 commit 653f016
Show file tree
Hide file tree
Showing 11 changed files with 169 additions and 29 deletions.
16 changes: 16 additions & 0 deletions src/python/pants/option/global_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,8 @@ class ExecutionOptions:
remote_execution_overall_deadline_secs: int
remote_execution_rpc_concurrency: int

remote_execution_append_only_caches_base_path: str | None

@classmethod
def from_options(
cls,
Expand Down Expand Up @@ -555,6 +557,7 @@ def from_options(
remote_execution_headers=dynamic_remote_options.execution_headers,
remote_execution_overall_deadline_secs=bootstrap_options.remote_execution_overall_deadline_secs,
remote_execution_rpc_concurrency=dynamic_remote_options.execution_rpc_concurrency,
remote_execution_append_only_caches_base_path=bootstrap_options.remote_execution_append_only_caches_base_path,
)


Expand Down Expand Up @@ -642,6 +645,7 @@ def from_options(cls, options: OptionValueContainer) -> LocalStoreOptions:
},
remote_execution_overall_deadline_secs=60 * 60, # one hour
remote_execution_rpc_concurrency=128,
remote_execution_append_only_caches_base_path=None,
)

DEFAULT_LOCAL_STORE_OPTIONS = LocalStoreOptions()
Expand Down Expand Up @@ -1549,6 +1553,18 @@ class BootstrapOptions:
default=DEFAULT_EXECUTION_OPTIONS.remote_execution_rpc_concurrency,
help="The number of concurrent requests allowed to the remote execution service.",
)
remote_execution_append_only_caches_base_path = StrOption(
default=None,
advanced=True,
help=softwrap(
"""
Sets the base path to use when setting up an append-only cache for a process running remotely.
If this option is not set, then append-only caches will not be used with remote execution.
The option should be set to the absolute path of a writable directory in the remote execution
environment where Pants can create append-only caches for use with remotely executing processes.
"""
),
)
watch_filesystem = BoolOption(
default=True,
advanced=True,
Expand Down
1 change: 1 addition & 0 deletions src/rust/engine/process_execution/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ impl crate::CommandRunner for CommandRunner {
None,
self.process_cache_namespace.clone(),
&self.file_store,
None,
)
.await
.into(),
Expand Down
13 changes: 10 additions & 3 deletions src/rust/engine/process_execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -962,12 +962,19 @@ pub async fn digest(
instance_name: Option<String>,
process_cache_namespace: Option<String>,
store: &Store,
append_only_caches_base_path: Option<&str>,
) -> Digest {
let EntireExecuteRequest {
execute_request, ..
} = remote::make_execute_request(process, instance_name, process_cache_namespace, store)
.await
.unwrap();
} = remote::make_execute_request(
process,
instance_name,
process_cache_namespace,
store,
append_only_caches_base_path,
)
.await
.unwrap();
execute_request.action_digest.unwrap().try_into().unwrap()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ struct NailgunProcessFingerprint {

impl NailgunProcessFingerprint {
pub async fn new(name: String, nailgun_req: &Process, store: &Store) -> Result<Self, String> {
let nailgun_req_digest = crate::digest(nailgun_req, None, None, store).await;
let nailgun_req_digest = crate::digest(nailgun_req, None, None, store, None).await;
Ok(NailgunProcessFingerprint {
name,
fingerprint: nailgun_req_digest.hash,
Expand Down
4 changes: 4 additions & 0 deletions src/rust/engine/process_execution/src/named_caches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ impl CacheName {
))
}
}

pub fn name(&self) -> &str {
&self.0
}
}

#[derive(Clone)]
Expand Down
110 changes: 94 additions & 16 deletions src/rust/engine/process_execution/src/remote.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::cmp::Ordering;
use std::collections::{BTreeMap, HashMap};
use std::convert::TryInto;
use std::fmt::{self, Debug};
use std::fmt::{self, Debug, Write};
use std::io::Cursor;
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};

Expand Down Expand Up @@ -45,8 +45,8 @@ use workunit_store::{
};

use crate::{
Context, FallibleProcessResultWithPlatform, Platform, Process, ProcessCacheScope, ProcessError,
ProcessExecutionStrategy, ProcessResultMetadata, ProcessResultSource,
CacheName, Context, FallibleProcessResultWithPlatform, Platform, Process, ProcessCacheScope,
ProcessError, ProcessExecutionStrategy, ProcessResultMetadata, ProcessResultSource,
};

// Environment variable which is exclusively used for cache key invalidation.
Expand Down Expand Up @@ -106,6 +106,7 @@ pub enum ExecutionError {
pub struct CommandRunner {
instance_name: Option<String>,
process_cache_namespace: Option<String>,
append_only_caches_base_path: Option<String>,
store: Store,
executor: Executor,
execution_client: Arc<ExecutionClient<LayeredService>>,
Expand Down Expand Up @@ -167,6 +168,7 @@ impl CommandRunner {
execution_address: &str,
instance_name: Option<String>,
process_cache_namespace: Option<String>,
append_only_caches_base_path: Option<String>,
root_ca_certs: Option<Vec<u8>>,
headers: BTreeMap<String, String>,
store: Store,
Expand Down Expand Up @@ -203,6 +205,7 @@ impl CommandRunner {
let command_runner = CommandRunner {
instance_name,
process_cache_namespace,
append_only_caches_base_path,
execution_client,
operations_client,
store,
Expand Down Expand Up @@ -818,6 +821,10 @@ impl crate::CommandRunner for CommandRunner {
self.instance_name.clone(),
self.process_cache_namespace.clone(),
&self.store,
self
.append_only_caches_base_path
.as_ref()
.map(|s| s.as_ref()),
)
.await?;
let build_id = context.build_id.clone();
Expand Down Expand Up @@ -920,6 +927,37 @@ fn maybe_add_workunit(
}
}

fn make_wrapper_for_append_only_caches(
caches: &BTreeMap<CacheName, RelativePath>,
base_path: &str,
) -> Result<String, String> {
let mut script = String::new();
writeln!(&mut script, "#!/bin/sh").map_err(|err| format!("write! failed: {err:?}"))?;
for (cache_name, path) in caches {
writeln!(
&mut script,
"mkdir -p '{}/{}'",
base_path,
cache_name.name()
)
.map_err(|err| format!("write! failed: {err:?}"))?;
if let Some(parent) = path.parent() {
writeln!(&mut script, "mkdir -p '{}'", parent.to_string_lossy())
.map_err(|err| format!("write! failed: {err}"))?;
}
writeln!(
&mut script,
"ln -s '{}/{}' '{}'",
base_path,
cache_name.name(),
path.as_path().to_string_lossy()
)
.map_err(|err| format!("write! failed: {err}"))?;
}
writeln!(&mut script, "exec \"$@\"").map_err(|err| format!("write! failed: {err:?}"))?;
Ok(script)
}

/// Return type for `make_execute_request`. Contains all of the generated REAPI protobufs for
/// a particular `Process`.
#[derive(Clone, Debug, PartialEq)]
Expand All @@ -934,12 +972,49 @@ pub async fn make_execute_request(
req: &Process,
instance_name: Option<String>,
cache_key_gen_version: Option<String>,
_store: &Store,
store: &Store,
append_only_caches_base_path: Option<&str>,
) -> Result<EntireExecuteRequest, String> {
const CACHES_WRAPPER: &str = "./__wrapper__";

// Implement append-only caches by running a wrapper script before the actual

let wrapper_script_digest_opt = match (append_only_caches_base_path, &req.append_only_caches) {
(Some(base_path), caches) if !caches.is_empty() => {
if req.working_directory.is_some() {
return Err(
"TODO-DEV: Append-only caches and working directory cannot be set together in remote execution.".into()
);
}
// TODO-WIP: Make the base path configurable.
let script = make_wrapper_for_append_only_caches(caches, base_path)?;
let digest = store
.store_file_bytes(Bytes::from(script), false)
.await
.map_err(|err| format!("Failed to store wrapper script: {err}"))?;
let path = RelativePath::new(Path::new(CACHES_WRAPPER))?;
let snapshot = store.snapshot_of_one_file(path, digest, true).await?;
let directory_digest = DirectoryDigest::new(snapshot.digest, snapshot.tree);
Some(directory_digest)
}
_ => None,
};

let arguments = match &wrapper_script_digest_opt {
Some(_) => {
let mut args = Vec::with_capacity(req.argv.len() + 1);
args.push(CACHES_WRAPPER.to_string());
args.extend(req.argv.iter().cloned());
args
}
None => req.argv.clone(),
};

let mut command = remexec::Command {
arguments: req.argv.clone(),
arguments,
..remexec::Command::default()
};

for (name, value) in &req.env {
if name == CACHE_KEY_GEN_VERSION_ENV_VAR_NAME
|| name == CACHE_KEY_TARGET_PLATFORM_ENV_VAR_NAME
Expand All @@ -964,15 +1039,6 @@ pub async fn make_execute_request(
_ => vec![],
};

// TODO: Disabling append-only caches in remoting until server support exists due to
// interaction with how servers match platform properties.
// if !req.append_only_caches.is_empty() {
// platform_properties.extend(NamedCaches::platform_properties(
// &req.append_only_caches,
// &cache_key_gen_version,
// ));
// }

if let Some(cache_key_gen_version) = cache_key_gen_version {
command
.environment_variables
Expand Down Expand Up @@ -1091,7 +1157,19 @@ pub async fn make_execute_request(
.environment_variables
.sort_by(|x, y| x.name.cmp(&y.name));

let input_root_digest = req.input_digests.complete.clone();
let input_root_digest: DirectoryDigest = match &wrapper_script_digest_opt {
Some(wrapper_digest) => {
let digests = vec![
req.input_digests.complete.clone(),
wrapper_digest.to_owned(),
];
store
.merge(digests)
.await
.map_err(|err| format!("store error: {err}"))?
}
None => req.input_digests.complete.clone(),
};

let mut action = remexec::Action {
command_digest: Some((&digest(&command)?).into()),
Expand Down
7 changes: 7 additions & 0 deletions src/rust/engine/process_execution/src/remote_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub struct CommandRunner {
inner: Arc<dyn crate::CommandRunner>,
instance_name: Option<String>,
process_cache_namespace: Option<String>,
append_only_caches_base_path: Option<String>,
executor: task_executor::Executor,
store: Store,
action_cache_client: Arc<ActionCacheClient<LayeredService>>,
Expand Down Expand Up @@ -79,6 +80,7 @@ impl CommandRunner {
cache_content_behavior: CacheContentBehavior,
concurrency_limit: usize,
read_timeout: Duration,
append_only_caches_base_path: Option<String>,
) -> Result<Self, String> {
let tls_client_config = if action_cache_address.starts_with("https://") {
Some(grpc_util::tls::Config::new_without_mtls(root_ca_certs).try_into()?)
Expand All @@ -103,6 +105,7 @@ impl CommandRunner {
inner,
instance_name,
process_cache_namespace,
append_only_caches_base_path,
executor,
store,
action_cache_client,
Expand Down Expand Up @@ -475,6 +478,10 @@ impl crate::CommandRunner for CommandRunner {
self.instance_name.clone(),
self.process_cache_namespace.clone(),
&self.store,
self
.append_only_caches_base_path
.as_ref()
.map(|s| s.as_ref()),
)
.await?;
let failures_cached = request.cache_scope == ProcessCacheScope::Always;
Expand Down
4 changes: 3 additions & 1 deletion src/rust/engine/process_execution/src/remote_cache_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ fn create_cached_runner(
cache_content_behavior,
256,
CACHE_READ_TIMEOUT,
None,
)
.expect("caching command runner"),
)
Expand All @@ -162,7 +163,7 @@ async fn create_process(store_setup: &StoreSetup) -> (Process, Digest) {
]);
let EntireExecuteRequest {
action, command, ..
} = make_execute_request(&process, None, None, &store_setup.store)
} = make_execute_request(&process, None, None, &store_setup.store, None)
.await
.unwrap();
let (_command_digest, action_digest) =
Expand Down Expand Up @@ -733,6 +734,7 @@ async fn make_action_result_basic() {
CacheContentBehavior::Defer,
256,
CACHE_READ_TIMEOUT,
None,
)
.expect("caching command runner");

Expand Down
Loading

0 comments on commit 653f016

Please sign in to comment.