From ebb33bc929a8ed4888ebf6547b9fea6272581f18 Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Wed, 10 Jul 2019 18:25:10 +0100 Subject: [PATCH] Local caching CommandRunner has default-on flag --- src/python/pants/engine/native.py | 1 + src/python/pants/option/global_options.py | 5 +++ src/rust/engine/Cargo.lock | 1 + src/rust/engine/Cargo.toml | 1 + src/rust/engine/engine_cffi/src/lib.rs | 2 ++ .../engine/process_execution/src/cache.rs | 10 +++--- src/rust/engine/src/context.rs | 36 ++++++++++++++----- src/rust/engine/src/nodes.rs | 1 - 8 files changed, 43 insertions(+), 14 deletions(-) diff --git a/src/python/pants/engine/native.py b/src/python/pants/engine/native.py index 6946b6c4b6b3..15007621930a 100644 --- a/src/python/pants/engine/native.py +++ b/src/python/pants/engine/native.py @@ -841,6 +841,7 @@ def ti(type_obj): execution_options.process_execution_local_parallelism, execution_options.process_execution_remote_parallelism, execution_options.process_execution_cleanup_local_dirs, + execution_options.process_execution_use_local_cache, ) return self.gc(scheduler, self.lib.scheduler_destroy) diff --git a/src/python/pants/option/global_options.py b/src/python/pants/option/global_options.py index a8db0f577185..b1e6cac35e28 100644 --- a/src/python/pants/option/global_options.py +++ b/src/python/pants/option/global_options.py @@ -36,6 +36,7 @@ class ExecutionOptions(datatype([ 'process_execution_local_parallelism', 'process_execution_remote_parallelism', 'process_execution_cleanup_local_dirs', + 'process_execution_use_local_cache', 'remote_execution_process_cache_namespace', 'remote_instance_name', 'remote_ca_certs_path', @@ -61,6 +62,7 @@ def from_bootstrap_options(cls, bootstrap_options): process_execution_local_parallelism=bootstrap_options.process_execution_local_parallelism, process_execution_remote_parallelism=bootstrap_options.process_execution_remote_parallelism, process_execution_cleanup_local_dirs=bootstrap_options.process_execution_cleanup_local_dirs, + process_execution_use_local_cache=bootstrap_options.process_execution_use_local_cache, remote_execution_process_cache_namespace=bootstrap_options.remote_execution_process_cache_namespace, remote_instance_name=bootstrap_options.remote_instance_name, remote_ca_certs_path=bootstrap_options.remote_ca_certs_path, @@ -80,6 +82,7 @@ def from_bootstrap_options(cls, bootstrap_options): process_execution_local_parallelism=multiprocessing.cpu_count()*2, process_execution_remote_parallelism=128, process_execution_cleanup_local_dirs=True, + process_execution_use_local_cache=True, remote_execution_process_cache_namespace=None, remote_instance_name=None, remote_ca_certs_path=None, @@ -415,6 +418,8 @@ def register_bootstrap_options(cls, register): register('--process-execution-cleanup-local-dirs', type=bool, default=True, advanced=True, help='Whether or not to cleanup directories used for local process execution ' '(primarily useful for e.g. debugging).') + register('--process-execution-use-local-cache', type=bool, default=True, advanced=True, + help='Whether to keep process executions in a local cache persisted to disk.') @classmethod def register_options(cls, register): diff --git a/src/rust/engine/Cargo.lock b/src/rust/engine/Cargo.lock index 9863e8abfc16..098f87690797 100644 --- a/src/rust/engine/Cargo.lock +++ b/src/rust/engine/Cargo.lock @@ -622,6 +622,7 @@ dependencies = [ "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", "reqwest 0.9.18 (registry+https://github.com/rust-lang/crates.io-index)", "rule_graph 0.0.1", + "sharded_lmdb 0.0.1", "smallvec 0.6.10 (registry+https://github.com/rust-lang/crates.io-index)", "store 0.1.0", "task_executor 0.0.1", diff --git a/src/rust/engine/Cargo.toml b/src/rust/engine/Cargo.toml index c269620658c5..ce121b9ea84f 100644 --- a/src/rust/engine/Cargo.toml +++ b/src/rust/engine/Cargo.toml @@ -94,6 +94,7 @@ process_execution = { path = "process_execution" } rand = "0.6" reqwest = { version = "0.9.10", default_features = false, features = ["rustls-tls"] } rule_graph = { path = "rule_graph" } +sharded_lmdb = { path = "sharded_lmdb" } smallvec = "0.6" store = { path = "fs/store" } tempfile = "3" diff --git a/src/rust/engine/engine_cffi/src/lib.rs b/src/rust/engine/engine_cffi/src/lib.rs index 9f52223e8fd3..708bf3d98ba7 100644 --- a/src/rust/engine/engine_cffi/src/lib.rs +++ b/src/rust/engine/engine_cffi/src/lib.rs @@ -202,6 +202,7 @@ pub extern "C" fn scheduler_create( process_execution_local_parallelism: u64, process_execution_remote_parallelism: u64, process_execution_cleanup_local_dirs: bool, + process_execution_use_local_cache: bool, ) -> *const Scheduler { let root_type_ids = root_type_ids.to_vec(); let ignore_patterns = ignore_patterns_buf @@ -310,6 +311,7 @@ pub extern "C" fn scheduler_create( process_execution_local_parallelism as usize, process_execution_remote_parallelism as usize, process_execution_cleanup_local_dirs, + process_execution_use_local_cache, )))) } diff --git a/src/rust/engine/process_execution/src/cache.rs b/src/rust/engine/process_execution/src/cache.rs index e45127eb42f8..3a736cb8a514 100644 --- a/src/rust/engine/process_execution/src/cache.rs +++ b/src/rust/engine/process_execution/src/cache.rs @@ -11,11 +11,11 @@ use store::Store; use workunit_store::WorkUnitStore; #[derive(Clone)] -struct CommandRunner { - underlying: Arc, - process_execution_store: ShardedLmdb, - file_store: Store, - metadata: ExecuteProcessRequestMetadata, +pub struct CommandRunner { + pub underlying: Arc, + pub process_execution_store: ShardedLmdb, + pub file_store: Store, + pub metadata: ExecuteProcessRequestMetadata, } impl crate::CommandRunner for CommandRunner { diff --git a/src/rust/engine/src/context.rs b/src/rust/engine/src/context.rs index 26468fab13e4..30ba609f0b2d 100644 --- a/src/rust/engine/src/context.rs +++ b/src/rust/engine/src/context.rs @@ -19,10 +19,11 @@ use boxfuture::{BoxFuture, Boxable}; use core::clone::Clone; use fs::{safe_create_dir_all_ioerror, PosixFS}; use graph::{EntryId, Graph, NodeContext}; -use process_execution::{self, BoundedCommandRunner, ExecuteProcessRequestMetadata}; +use process_execution::{self, BoundedCommandRunner, CommandRunner, ExecuteProcessRequestMetadata}; use rand::seq::SliceRandom; use reqwest; use rule_graph::RuleGraph; +use sharded_lmdb::ShardedLmdb; use std::collections::btree_map::BTreeMap; use store::Store; @@ -41,7 +42,7 @@ pub struct Core { pub types: Types, pub executor: task_executor::Executor, store: Store, - pub command_runner: BoundedCommandRunner, + pub command_runner: Box, pub http_client: reqwest::r#async::Client, pub vfs: PosixFS, pub build_root: PathBuf, @@ -71,6 +72,7 @@ impl Core { process_execution_local_parallelism: usize, process_execution_remote_parallelism: usize, process_execution_cleanup_local_dirs: bool, + process_execution_use_local_cache: bool, ) -> Core { // Randomize CAS address order to avoid thundering herds from common config. let mut remote_store_servers = remote_store_servers; @@ -97,7 +99,7 @@ impl Core { None }; - let local_store_dir = local_store_dir.clone(); + let local_store_dir2 = local_store_dir.clone(); let store = safe_create_dir_all_ioerror(&local_store_dir) .map_err(|e| format!("Error making directory {:?}: {:?}", local_store_dir, e)) .and_then(|()| { @@ -123,15 +125,17 @@ impl Core { }) .unwrap_or_else(|e| panic!("Could not initialize Store: {:?}", e)); + let process_execution_metadata = ExecuteProcessRequestMetadata { + instance_name: remote_instance_name.clone(), + cache_key_gen_version: remote_execution_process_cache_namespace.clone(), + platform_properties: remote_execution_extra_platform_properties.clone(), + }; + let command_runner = match &remote_execution_server { Some(ref address) if remote_execution => BoundedCommandRunner::new( Box::new(process_execution::remote::CommandRunner::new( address, - ExecuteProcessRequestMetadata { - instance_name: remote_instance_name.clone(), - cache_key_gen_version: remote_execution_process_cache_namespace.clone(), - platform_properties: remote_execution_extra_platform_properties.clone(), - }, + process_execution_metadata.clone(), root_ca_certs.clone(), oauth_bearer_token.clone(), store.clone(), @@ -148,6 +152,22 @@ impl Core { process_execution_local_parallelism, ), }; + let mut command_runner: Box = Box::new(command_runner); + + if process_execution_use_local_cache { + let process_execution_store = ShardedLmdb::new( + local_store_dir2.join("processes"), + 5 * 1024 * 1024 * 1024, + executor.clone(), + ).expect("Could not initialize store for process cache: {:?}"); + command_runner = Box::new(process_execution::cache::CommandRunner { + underlying: command_runner.into(), + process_execution_store, + file_store: store.clone(), + metadata: process_execution_metadata, + }) + + } let http_client = reqwest::r#async::Client::new(); let rule_graph = RuleGraph::new(tasks.as_map(), root_subject_types); diff --git a/src/rust/engine/src/nodes.rs b/src/rust/engine/src/nodes.rs index 519a806b920f..29eecbcc9f9e 100644 --- a/src/rust/engine/src/nodes.rs +++ b/src/rust/engine/src/nodes.rs @@ -26,7 +26,6 @@ use fs::{ PathGlobs, PathStat, StrictGlobMatching, VFS, }; use hashing; -use process_execution::{self, CommandRunner}; use rule_graph; use graph::{Entry, Node, NodeError, NodeTracer, NodeVisualizer};