From 1d124eea4440db3b3f2e58afca86ad5a305f6a6c Mon Sep 17 00:00:00 2001 From: Stu Hood Date: Mon, 19 Jul 2021 16:10:17 -0700 Subject: [PATCH] Mark workunits blocked, and skip rendering completed workunits (#12369) Our "graph" of execution is a DAG, while the workunits (used to visualize and record traces) form a tree. Because they form a tree, workunits do not always report that they are blocked on work that they didn't start, but which some other node started (common when lots of @rules are waiting on another single @rule which only one of them started). To fix this, we make the `blocked` property of a workunit an atomic mutable, and skip rendering the parents of blocked leaves. We use the `blocked` flag both for `Tasks` (which wait directly for memoized Nodes, and so frequently block in this way), and in `BoundedCommandRunner`, which temporarily blocks the workunit while we're acquiring the semaphore. Additionally, we skip rendering or walking through `Completed` workunits, which can happen in the case of speculation if a parent workunit completes before a child. In order to toggle the `blocked` property on workunits, we expose the current `RunningWorkunit` in two new places: the `CommandRunner` and `WrappedNode`. In both cases, this is to allow the generic code to consume the workunit created by their callers and mark it blocked (for `Task` and `BoundedCommandRunner`). Fixes #12349. [ci skip-build-wheels] --- .../pants/engine/internals/engine_test.py | 21 +- src/rust/engine/async_semaphore/src/lib.rs | 4 +- .../engine/process_execution/src/cache.rs | 82 +++- .../process_execution/src/cache_tests.rs | 28 +- src/rust/engine/process_execution/src/lib.rs | 89 +--- .../engine/process_execution/src/local.rs | 10 +- .../process_execution/src/local_tests.rs | 53 +-- .../engine/process_execution/src/remote.rs | 89 ++-- .../process_execution/src/remote_cache.rs | 55 ++- .../src/remote_cache_tests.rs | 58 +-- .../process_execution/src/remote_tests.rs | 64 +-- src/rust/engine/process_executor/src/main.rs | 18 +- src/rust/engine/src/externs/interface.rs | 2 +- src/rust/engine/src/nodes.rs | 416 ++++++++++-------- src/rust/engine/src/session.rs | 16 +- src/rust/engine/workunit_store/src/lib.rs | 168 +++++-- src/rust/engine/workunit_store/src/tests.rs | 93 +++- 17 files changed, 735 insertions(+), 531 deletions(-) diff --git a/src/python/pants/engine/internals/engine_test.py b/src/python/pants/engine/internals/engine_test.py index fed350565f1..974e588bcd8 100644 --- a/src/python/pants/engine/internals/engine_test.py +++ b/src/python/pants/engine/internals/engine_test.py @@ -31,7 +31,7 @@ from pants.engine.internals.scheduler import ExecutionError, SchedulerSession from pants.engine.internals.scheduler_test_base import SchedulerTestBase from pants.engine.platform import rules as platform_rules -from pants.engine.process import MultiPlatformProcess, Process, ProcessResult +from pants.engine.process import MultiPlatformProcess, Process, ProcessCacheScope, ProcessResult from pants.engine.process import rules as process_rules from pants.engine.rules import Get, MultiGet, rule from pants.engine.streaming_workunit_handler import ( @@ -609,6 +609,7 @@ async def a_rule() -> TrueResult: proc = Process( ["/bin/sh", "-c", "true"], description="always true", + cache_scope=ProcessCacheScope.PER_SESSION, ) _ = await Get(ProcessResult, MultiPlatformProcess({None: proc})) return TrueResult() @@ -624,9 +625,9 @@ async def a_rule() -> TrueResult: finished = list(itertools.chain.from_iterable(tracker.finished_workunit_chunks)) workunits_with_counters = [item for item in finished if "counters" in item] - assert workunits_with_counters[0]["counters"]["local_execution_requests"] == 1 - assert workunits_with_counters[1]["counters"]["local_cache_requests"] == 1 - assert workunits_with_counters[1]["counters"]["local_cache_requests_uncached"] == 1 + assert workunits_with_counters[0]["counters"]["local_cache_requests"] == 1 + assert workunits_with_counters[0]["counters"]["local_cache_requests_uncached"] == 1 + assert workunits_with_counters[1]["counters"]["local_execution_requests"] == 1 assert histograms_info["version"] == 0 assert "histograms" in histograms_info @@ -736,7 +737,7 @@ def test_process_digests_on_streaming_workunits( run_tracker=run_tracker, callbacks=[tracker], report_interval_seconds=0.01, - max_workunit_verbosity=LogLevel.INFO, + max_workunit_verbosity=LogLevel.DEBUG, specs=Specs.empty(), options_bootstrapper=create_options_bootstrapper([]), pantsd=False, @@ -752,9 +753,7 @@ def test_process_digests_on_streaming_workunits( assert tracker.finished finished = list(itertools.chain.from_iterable(tracker.finished_workunit_chunks)) - process_workunit = next( - item for item in finished if item["name"] == "multi_platform_process-running" - ) + process_workunit = next(item for item in finished if item["name"] == "multi_platform_process") assert process_workunit is not None stdout_digest = process_workunit["artifacts"]["stdout_digest"] stderr_digest = process_workunit["artifacts"]["stderr_digest"] @@ -769,7 +768,7 @@ def test_process_digests_on_streaming_workunits( run_tracker=run_tracker, callbacks=[tracker], report_interval_seconds=0.01, - max_workunit_verbosity=LogLevel.INFO, + max_workunit_verbosity=LogLevel.DEBUG, specs=Specs.empty(), options_bootstrapper=create_options_bootstrapper([]), pantsd=False, @@ -782,9 +781,7 @@ def test_process_digests_on_streaming_workunits( assert tracker.finished finished = list(itertools.chain.from_iterable(tracker.finished_workunit_chunks)) - process_workunit = next( - item for item in finished if item["name"] == "multi_platform_process-running" - ) + process_workunit = next(item for item in finished if item["name"] == "multi_platform_process") assert process_workunit is not None stdout_digest = process_workunit["artifacts"]["stdout_digest"] diff --git a/src/rust/engine/async_semaphore/src/lib.rs b/src/rust/engine/async_semaphore/src/lib.rs index 2805d68d1b1..82b5731a61c 100644 --- a/src/rust/engine/async_semaphore/src/lib.rs +++ b/src/rust/engine/async_semaphore/src/lib.rs @@ -68,8 +68,8 @@ impl AsyncSemaphore { /// pub async fn with_acquired(self, f: F) -> O where - F: FnOnce(usize) -> B + Send + 'static, - B: Future + Send + 'static, + F: FnOnce(usize) -> B, + B: Future, { let permit = self.acquire().await; let res = f(permit.id).await; diff --git a/src/rust/engine/process_execution/src/cache.rs b/src/rust/engine/process_execution/src/cache.rs index a0f587283df..ca02e6f99bd 100644 --- a/src/rust/engine/process_execution/src/cache.rs +++ b/src/rust/engine/process_execution/src/cache.rs @@ -11,7 +11,9 @@ use prost::Message; use serde::{Deserialize, Serialize}; use sharded_lmdb::ShardedLmdb; use store::Store; -use workunit_store::{in_workunit, Level, Metric, ObservationMetric, WorkunitMetadata}; +use workunit_store::{ + in_workunit, Level, Metric, ObservationMetric, RunningWorkunit, WorkunitMetadata, +}; use crate::{ Context, FallibleProcessResultWithPlatform, MultiPlatformProcess, Platform, Process, @@ -57,42 +59,49 @@ impl crate::CommandRunner for CommandRunner { async fn run( &self, - req: MultiPlatformProcess, context: Context, + workunit: &mut RunningWorkunit, + req: MultiPlatformProcess, ) -> Result { let cache_lookup_start = Instant::now(); + let write_failures_to_cache = req + .0 + .values() + .any(|process| process.cache_scope == ProcessCacheScope::Always); + let digest = crate::digest(req.clone(), &self.metadata); + let key = digest.hash; + let context2 = context.clone(); - in_workunit!( - context2.workunit_store, + let cache_read_result = in_workunit!( + context.workunit_store.clone(), "local_cache_read".to_owned(), WorkunitMetadata { level: Level::Trace, + desc: Some(format!("Local cache lookup: {}", req.user_facing_name())), ..WorkunitMetadata::default() }, |workunit| async move { workunit.increment_counter(Metric::LocalCacheRequests, 1); - let digest = crate::digest(req.clone(), &self.metadata); - let key = digest.hash; - - let cache_failures = req - .0 - .values() - .any(|process| process.cache_scope == ProcessCacheScope::Always); - - let command_runner = self.clone(); match self.lookup(key).await { - Ok(Some(result)) if result.exit_code == 0 || cache_failures => { + Ok(Some(result)) if result.exit_code == 0 || write_failures_to_cache => { let lookup_elapsed = cache_lookup_start.elapsed(); workunit.increment_counter(Metric::LocalCacheRequestsCached, 1); if let Some(time_saved) = result.metadata.time_saved_from_cache(lookup_elapsed) { let time_saved = time_saved.as_millis() as u64; workunit.increment_counter(Metric::LocalCacheTotalTimeSavedMs, time_saved); - context + context2 .workunit_store .record_observation(ObservationMetric::LocalCacheTimeSavedMs, time_saved); } - return Ok(result); + // When we successfully use the cache, we change the description and increase the level + // (but not so much that it will be logged by default). + workunit.update_metadata(|initial| WorkunitMetadata { + desc: initial.desc.as_ref().map(|desc| format!("Hit: {}", desc)), + level: Level::Debug, + ..initial + }); + Ok(result) } Err(err) => { debug!( @@ -101,17 +110,43 @@ impl crate::CommandRunner for CommandRunner { ); workunit.increment_counter(Metric::LocalCacheReadErrors, 1); // Falling through to re-execute. + Err(()) } Ok(_) => { // Either we missed, or we hit for a failing result. workunit.increment_counter(Metric::LocalCacheRequestsUncached, 1); // Falling through to execute. + Err(()) } } + } + .boxed() + ) + .await; + + if let Ok(result) = cache_read_result { + workunit.update_metadata(|initial| WorkunitMetadata { + desc: initial + .desc + .as_ref() + .map(|desc| format!("Hit local cache: {}", desc)), + ..initial + }); + return Ok(result); + } - let result = command_runner.underlying.run(req, context.clone()).await?; - if result.exit_code == 0 || cache_failures { - if let Err(err) = command_runner.store(key, &result).await { + let result = self.underlying.run(context.clone(), workunit, req).await?; + if result.exit_code == 0 || write_failures_to_cache { + let result = result.clone(); + in_workunit!( + context.workunit_store.clone(), + "local_cache_write".to_owned(), + WorkunitMetadata { + level: Level::Trace, + ..WorkunitMetadata::default() + }, + |workunit| async move { + if let Err(err) = self.store(key, &result).await { warn!( "Error storing process execution result to local cache: {} - ignoring and continuing", err @@ -119,11 +154,10 @@ impl crate::CommandRunner for CommandRunner { workunit.increment_counter(Metric::LocalCacheWriteErrors, 1); } } - Ok(result) - } - .boxed() - ) - .await + ) + .await; + } + Ok(result) } } diff --git a/src/rust/engine/process_execution/src/cache_tests.rs b/src/rust/engine/process_execution/src/cache_tests.rs index d0297d4b69e..7f19f3bab87 100644 --- a/src/rust/engine/process_execution/src/cache_tests.rs +++ b/src/rust/engine/process_execution/src/cache_tests.rs @@ -7,7 +7,7 @@ use store::Store; use tempfile::TempDir; use testutil::data::TestData; use testutil::relative_paths; -use workunit_store::WorkunitStore; +use workunit_store::{RunningWorkunit, WorkunitStore}; use crate::{ CommandRunner as CommandRunnerTrait, Context, FallibleProcessResultWithPlatform, NamedCaches, @@ -85,16 +85,18 @@ fn create_script(script_exit_code: i8) -> (Process, PathBuf, TempDir) { (process, script_path, script_dir) } -async fn run_roundtrip(script_exit_code: i8) -> RoundtripResults { +async fn run_roundtrip(script_exit_code: i8, workunit: &mut RunningWorkunit) -> RoundtripResults { let (local, store, _local_runner_dir) = create_local_runner(); let (process, script_path, _script_dir) = create_script(script_exit_code); - let local_result = local.run(process.clone().into(), Context::default()).await; + let local_result = local + .run(Context::default(), workunit, process.clone().into()) + .await; let (caching, _cache_dir) = create_cached_runner(local, store.clone()); let uncached_result = caching - .run(process.clone().into(), Context::default()) + .run(Context::default(), workunit, process.clone().into()) .await; assert_eq!(local_result, uncached_result); @@ -103,7 +105,9 @@ async fn run_roundtrip(script_exit_code: i8) -> RoundtripResults { // fail due to a FileNotFound error. So, If the second run succeeds, that implies that the // cache was successfully used. std::fs::remove_file(&script_path).unwrap(); - let maybe_cached_result = caching.run(process.into(), Context::default()).await; + let maybe_cached_result = caching + .run(Context::default(), workunit, process.into()) + .await; RoundtripResults { uncached: uncached_result, @@ -113,15 +117,15 @@ async fn run_roundtrip(script_exit_code: i8) -> RoundtripResults { #[tokio::test] async fn cache_success() { - WorkunitStore::setup_for_tests(); - let results = run_roundtrip(0).await; + let (_, mut workunit) = WorkunitStore::setup_for_tests(); + let results = run_roundtrip(0, &mut workunit).await; assert_eq!(results.uncached, results.maybe_cached); } #[tokio::test] async fn failures_not_cached() { - WorkunitStore::setup_for_tests(); - let results = run_roundtrip(1).await; + let (_, mut workunit) = WorkunitStore::setup_for_tests(); + let results = run_roundtrip(1, &mut workunit).await; assert_ne!(results.uncached, results.maybe_cached); assert_eq!(results.uncached.unwrap().exit_code, 1); assert_eq!(results.maybe_cached.unwrap().exit_code, 127); // aka the return code for file not found @@ -129,7 +133,7 @@ async fn failures_not_cached() { #[tokio::test] async fn recover_from_missing_store_contents() { - WorkunitStore::setup_for_tests(); + let (_, mut workunit) = WorkunitStore::setup_for_tests(); let (local, store, _local_runner_dir) = create_local_runner(); let (caching, _cache_dir) = create_cached_runner(local, store.clone()); @@ -137,7 +141,7 @@ async fn recover_from_missing_store_contents() { // Run once to cache the process. let first_result = caching - .run(process.clone().into(), Context::default()) + .run(Context::default(), &mut workunit, process.clone().into()) .await .unwrap(); @@ -170,7 +174,7 @@ async fn recover_from_missing_store_contents() { // Ensure that we don't fail if we re-run. let second_result = caching - .run(process.clone().into(), Context::default()) + .run(Context::default(), &mut workunit, process.clone().into()) .await .unwrap(); diff --git a/src/rust/engine/process_execution/src/lib.rs b/src/rust/engine/process_execution/src/lib.rs index f50897e1442..a0f2591afcb 100644 --- a/src/rust/engine/process_execution/src/lib.rs +++ b/src/rust/engine/process_execution/src/lib.rs @@ -30,19 +30,20 @@ #[macro_use] extern crate derivative; -use async_trait::async_trait; -use bazel_protos::gen::build::bazel::remote::execution::v2 as remexec; -pub use log::Level; -use remexec::ExecutedActionMetadata; -use serde::{Deserialize, Serialize}; use std::collections::{BTreeMap, BTreeSet}; use std::convert::TryFrom; use std::path::PathBuf; use std::sync::Arc; -use workunit_store::{in_workunit, UserMetadataItem, WorkunitMetadata, WorkunitStore}; + +pub use log::Level; use async_semaphore::AsyncSemaphore; +use async_trait::async_trait; +use bazel_protos::gen::build::bazel::remote::execution::v2 as remexec; use hashing::{Digest, EMPTY_FINGERPRINT}; +use remexec::ExecutedActionMetadata; +use serde::{Deserialize, Serialize}; +use workunit_store::{RunningWorkunit, WorkunitStore}; pub mod cache; #[cfg(test)] @@ -321,10 +322,6 @@ impl MultiPlatformProcess { .map(|(_platforms, process)| process.level) .unwrap_or(Level::Info) } - - pub fn workunit_name(&self) -> String { - "multi_platform_process".to_string() - } } impl From for MultiPlatformProcess { @@ -473,8 +470,9 @@ pub trait CommandRunner: Send + Sync { /// async fn run( &self, - req: MultiPlatformProcess, context: Context, + workunit: &mut RunningWorkunit, + req: MultiPlatformProcess, ) -> Result; /// @@ -530,38 +528,21 @@ impl BoundedCommandRunner { impl CommandRunner for BoundedCommandRunner { async fn run( &self, - mut req: MultiPlatformProcess, context: Context, + workunit: &mut RunningWorkunit, + mut req: MultiPlatformProcess, ) -> Result { - let name = format!("{}-waiting", req.workunit_name()); - let desc = req.user_facing_name(); - let outer_metadata = WorkunitMetadata { - level: Level::Debug, - desc: Some(format!("(Waiting) {}", desc)), - // We don't want to display the workunit associated with processes waiting on a - // BoundedCommandRunner to show in the dynamic UI, so set the `blocked` flag - // on the workunit metadata in order to prevent this. - blocked: true, - ..WorkunitMetadata::default() - }; - - let bounded_fut = { - let inner = self.inner.clone(); - let semaphore = self.inner.1.clone(); - let context = context.clone(); - let name = format!("{}-running", req.workunit_name()); - - semaphore.with_acquired(move |concurrency_id| { + let semaphore = self.inner.1.clone(); + let inner = self.inner.clone(); + let blocking_token = workunit.blocking(); + semaphore + .with_acquired(|concurrency_id| { log::debug!( "Running {} under semaphore with concurrency id: {}", - desc, + req.user_facing_name(), concurrency_id ); - let metadata = WorkunitMetadata { - level: req.workunit_level(), - desc: Some(desc), - ..WorkunitMetadata::default() - }; + std::mem::drop(blocking_token); for (_, process) in req.0.iter_mut() { if let Some(ref execution_slot_env_var) = process.execution_slot_variable { @@ -572,39 +553,9 @@ impl CommandRunner for BoundedCommandRunner { } } - in_workunit!( - context.workunit_store.clone(), - name, - metadata, - |workunit| async move { - let res = inner.0.run(req, context).await; - if let Ok(FallibleProcessResultWithPlatform { - stdout_digest, - stderr_digest, - exit_code, - .. - }) = res - { - workunit.update_metadata(|initial| WorkunitMetadata { - stdout: Some(stdout_digest), - stderr: Some(stderr_digest), - user_metadata: vec![( - "exit_code".to_string(), - UserMetadataItem::ImmediateId(exit_code as i64), - )], - ..initial - }) - } - res - }, - ) + inner.0.run(context, workunit, req) }) - }; - - in_workunit!(context.workunit_store, name, outer_metadata, |_workunit| { - bounded_fut - }) - .await + .await } fn extract_compatible_request(&self, req: &MultiPlatformProcess) -> Option { diff --git a/src/rust/engine/process_execution/src/local.rs b/src/rust/engine/process_execution/src/local.rs index fe0cafe7980..d40fa81e10c 100644 --- a/src/rust/engine/process_execution/src/local.rs +++ b/src/rust/engine/process_execution/src/local.rs @@ -29,7 +29,7 @@ use tokio::sync::RwLock; use tokio::time::{timeout, Duration}; use tokio_util::codec::{BytesCodec, FramedRead}; use tryfuture::try_future; -use workunit_store::{in_workunit, Level, Metric, WorkunitMetadata}; +use workunit_store::{in_workunit, Metric, RunningWorkunit, WorkunitMetadata}; use crate::{ Context, FallibleProcessResultWithPlatform, MultiPlatformProcess, NamedCaches, Platform, Process, @@ -247,8 +247,9 @@ impl super::CommandRunner for CommandRunner { /// async fn run( &self, - req: MultiPlatformProcess, context: Context, + _workunit: &mut RunningWorkunit, + req: MultiPlatformProcess, ) -> Result { let req = self.extract_compatible_request(&req).unwrap(); let req_debug_repr = format!("{:#?}", req); @@ -256,7 +257,10 @@ impl super::CommandRunner for CommandRunner { context.workunit_store.clone(), "run_local_process".to_owned(), WorkunitMetadata { - level: Level::Trace, + // NB: See engine::nodes::NodeKey::workunit_level for more information on why this workunit + // renders at the Process's level. + level: req.level, + desc: Some(req.description.clone()), ..WorkunitMetadata::default() }, |workunit| async move { diff --git a/src/rust/engine/process_execution/src/local_tests.rs b/src/rust/engine/process_execution/src/local_tests.rs index 76d52a7bb78..538a8bb21bd 100644 --- a/src/rust/engine/process_execution/src/local_tests.rs +++ b/src/rust/engine/process_execution/src/local_tests.rs @@ -18,7 +18,7 @@ use tempfile::TempDir; use testutil::data::{TestData, TestDirectory}; use testutil::path::{find_bash, which}; use testutil::{owned_string_vec, relative_paths}; -use workunit_store::WorkunitStore; +use workunit_store::{RunningWorkunit, WorkunitStore}; #[derive(PartialEq, Debug)] struct LocalTestResult { @@ -30,8 +30,6 @@ struct LocalTestResult { #[tokio::test] #[cfg(unix)] async fn stdout() { - WorkunitStore::setup_for_tests(); - let result = run_command_locally(Process::new(owned_string_vec(&["/bin/echo", "-n", "foo"]))) .await .unwrap(); @@ -45,8 +43,6 @@ async fn stdout() { #[tokio::test] #[cfg(unix)] async fn stdout_and_stderr_and_exit_code() { - WorkunitStore::setup_for_tests(); - let result = run_command_locally(Process::new(owned_string_vec(&[ "/bin/bash", "-c", @@ -64,8 +60,6 @@ async fn stdout_and_stderr_and_exit_code() { #[tokio::test] #[cfg(unix)] async fn capture_exit_code_signal() { - WorkunitStore::setup_for_tests(); - // Launch a process that kills itself with a signal. let result = run_command_locally(Process::new(owned_string_vec(&[ "/bin/bash", @@ -85,8 +79,6 @@ async fn capture_exit_code_signal() { #[tokio::test] #[cfg(unix)] async fn env() { - WorkunitStore::setup_for_tests(); - let mut env: BTreeMap = BTreeMap::new(); env.insert("FOO".to_string(), "foo".to_string()); env.insert("BAR".to_string(), "not foo".to_string()); @@ -116,8 +108,6 @@ async fn env() { #[tokio::test] #[cfg(unix)] async fn env_is_deterministic() { - WorkunitStore::setup_for_tests(); - fn make_request() -> Process { let mut env = BTreeMap::new(); env.insert("FOO".to_string(), "foo".to_string()); @@ -133,8 +123,6 @@ async fn env_is_deterministic() { #[tokio::test] async fn binary_not_found() { - WorkunitStore::setup_for_tests(); - let err_string = run_command_locally(Process::new(owned_string_vec(&["echo", "-n", "foo"]))) .await .expect_err("Want Err"); @@ -144,8 +132,6 @@ async fn binary_not_found() { #[tokio::test] async fn output_files_none() { - WorkunitStore::setup_for_tests(); - let result = run_command_locally(Process::new(owned_string_vec(&[ &find_bash(), "-c", @@ -162,8 +148,6 @@ async fn output_files_none() { #[tokio::test] async fn output_files_one() { - WorkunitStore::setup_for_tests(); - let result = run_command_locally( Process::new(vec![ find_bash(), @@ -187,8 +171,6 @@ async fn output_files_one() { #[tokio::test] async fn output_dirs() { - WorkunitStore::setup_for_tests(); - let result = run_command_locally( Process::new(vec![ find_bash(), @@ -217,8 +199,6 @@ async fn output_dirs() { #[tokio::test] async fn output_files_many() { - WorkunitStore::setup_for_tests(); - let result = run_command_locally( Process::new(vec![ find_bash(), @@ -246,8 +226,6 @@ async fn output_files_many() { #[tokio::test] async fn output_files_execution_failure() { - WorkunitStore::setup_for_tests(); - let result = run_command_locally( Process::new(vec![ find_bash(), @@ -274,8 +252,6 @@ async fn output_files_execution_failure() { #[tokio::test] async fn output_files_partial_output() { - WorkunitStore::setup_for_tests(); - let result = run_command_locally( Process::new(vec![ find_bash(), @@ -303,8 +279,6 @@ async fn output_files_partial_output() { #[tokio::test] async fn output_overlapping_file_and_dir() { - WorkunitStore::setup_for_tests(); - let result = run_command_locally( Process::new(vec![ find_bash(), @@ -329,8 +303,6 @@ async fn output_overlapping_file_and_dir() { #[tokio::test] async fn append_only_cache_created() { - WorkunitStore::setup_for_tests(); - let name = "geo"; let dest = format!(".cache/{}", name); let cache_name = CacheName::new(name.to_owned()).unwrap(); @@ -351,8 +323,6 @@ async fn append_only_cache_created() { #[tokio::test] async fn jdk_symlink() { - WorkunitStore::setup_for_tests(); - let preserved_work_tmpdir = TempDir::new().unwrap(); let roland = TestData::roland().bytes(); std::fs::write( @@ -377,7 +347,7 @@ async fn jdk_symlink() { #[tokio::test] async fn test_directory_preservation() { - WorkunitStore::setup_for_tests(); + let (_, mut workunit) = WorkunitStore::setup_for_tests(); let preserved_work_tmpdir = TempDir::new().unwrap(); let preserved_work_root = preserved_work_tmpdir.path().to_owned(); @@ -414,6 +384,7 @@ async fn test_directory_preservation() { process, preserved_work_root.clone(), false, + &mut workunit, Some(store), Some(executor), ) @@ -457,7 +428,7 @@ async fn test_directory_preservation() { #[tokio::test] async fn test_directory_preservation_error() { - WorkunitStore::setup_for_tests(); + let (_, mut workunit) = WorkunitStore::setup_for_tests(); let preserved_work_tmpdir = TempDir::new().unwrap(); let preserved_work_root = preserved_work_tmpdir.path().to_owned(); @@ -469,6 +440,7 @@ async fn test_directory_preservation_error() { Process::new(vec!["doesnotexist".to_owned()]), preserved_work_root.clone(), false, + &mut workunit, None, None, ) @@ -482,8 +454,6 @@ async fn test_directory_preservation_error() { #[tokio::test] async fn all_containing_directories_for_outputs_are_created() { - WorkunitStore::setup_for_tests(); - let result = run_command_locally( Process::new(vec![ find_bash(), @@ -514,8 +484,6 @@ async fn all_containing_directories_for_outputs_are_created() { #[tokio::test] async fn output_empty_dir() { - WorkunitStore::setup_for_tests(); - let result = run_command_locally( Process::new(vec![ find_bash(), @@ -539,8 +507,6 @@ async fn output_empty_dir() { #[tokio::test] async fn timeout() { - WorkunitStore::setup_for_tests(); - let argv = vec![ find_bash(), "-c".to_owned(), @@ -561,7 +527,7 @@ async fn timeout() { #[tokio::test] async fn working_directory() { - WorkunitStore::setup_for_tests(); + let (_, mut workunit) = WorkunitStore::setup_for_tests(); let store_dir = TempDir::new().unwrap(); let executor = task_executor::Executor::new(); @@ -595,6 +561,7 @@ async fn working_directory() { process, work_dir.path().to_owned(), true, + &mut workunit, Some(store), Some(executor), ) @@ -612,15 +579,17 @@ async fn working_directory() { } async fn run_command_locally(req: Process) -> Result { + let (_, mut workunit) = WorkunitStore::setup_for_tests(); let work_dir = TempDir::new().unwrap(); let work_dir_path = work_dir.path().to_owned(); - run_command_locally_in_dir(req, work_dir_path, true, None, None).await + run_command_locally_in_dir(req, work_dir_path, true, &mut workunit, None, None).await } async fn run_command_locally_in_dir( req: Process, dir: PathBuf, cleanup: bool, + workunit: &mut RunningWorkunit, store: Option, executor: Option, ) -> Result { @@ -636,7 +605,7 @@ async fn run_command_locally_in_dir( NamedCaches::new(named_cache_dir.path().to_owned()), cleanup, ); - let original = runner.run(req.into(), Context::default()).await?; + let original = runner.run(Context::default(), workunit, req.into()).await?; let stdout_bytes: Vec = store .load_file_bytes_with(original.stdout_digest, |bytes| bytes.into()) .await? diff --git a/src/rust/engine/process_execution/src/remote.rs b/src/rust/engine/process_execution/src/remote.rs index b5ac557a2d9..4d64c32595d 100644 --- a/src/rust/engine/process_execution/src/remote.rs +++ b/src/rust/engine/process_execution/src/remote.rs @@ -731,8 +731,9 @@ impl crate::CommandRunner for CommandRunner { /// Run the given MultiPlatformProcess via the Remote Execution API. async fn run( &self, - request: MultiPlatformProcess, context: Context, + _workunit: &mut RunningWorkunit, + request: MultiPlatformProcess, ) -> Result { // Retrieve capabilities for this server. let capabilities = self.get_capabilities().await?; @@ -755,38 +756,21 @@ impl crate::CommandRunner for CommandRunner { let deadline_duration = self.overall_deadline + request.timeout.unwrap_or_default(); // Ensure the action and command are stored locally. - let (command_digest, action_digest) = in_workunit!( - context.workunit_store.clone(), - "ensure_action_stored_locally".to_owned(), - WorkunitMetadata { - level: Level::Trace, - desc: Some(format!("ensure action stored locally for {:?}", action)), - ..WorkunitMetadata::default() - }, - |_workunit| ensure_action_stored_locally(&self.store, &command, &action), - ) - .await?; + let (command_digest, action_digest) = + ensure_action_stored_locally(&self.store, &command, &action).await?; // Check the remote Action Cache to see if this request was already computed. // If so, return immediately with the result. let context2 = context.clone(); - let cached_response_opt = in_workunit!( - context.workunit_store.clone(), - "check_action_cache".to_owned(), - WorkunitMetadata { - level: Level::Trace, - desc: Some(format!("check action cache for {:?}", action_digest)), - ..WorkunitMetadata::default() - }, - |_workunit| check_action_cache( - action_digest, - &self.metadata, - self.platform, - &context2, - self.action_cache_client.clone(), - self.store.clone(), - false, - ), + let cached_response_opt = check_action_cache( + action_digest, + &command, + &self.metadata, + self.platform, + &context2, + self.action_cache_client.clone(), + self.store.clone(), + false, ) .await?; debug!( @@ -817,7 +801,10 @@ impl crate::CommandRunner for CommandRunner { context.workunit_store.clone(), "run_execute_request".to_owned(), WorkunitMetadata { - level: Level::Trace, + // NB: See engine::nodes::NodeKey::workunit_level for more information on why this workunit + // renders at the Process's level. + level: request.level, + desc: Some(request.description.clone()), ..WorkunitMetadata::default() }, |workunit| async move { @@ -1344,14 +1331,16 @@ fn apply_headers(mut request: Request, build_id: &str) -> Request { request } -/// Check the remote Action Cache for a cached result of running the given `action_digest`. +/// Check the remote Action Cache for a cached result of running the given `command` and the Action +/// with the given `action_digest`. /// -/// This check is necessary because some RE servers do not short-circuit the Execute method +/// This check is necessary because some REAPI servers do not short-circuit the Execute method /// by checking the Action Cache (e.g., BuildBarn). Thus, this client must check the cache /// explicitly in order to avoid duplicating already-cached work. This behavior matches /// the Bazel RE client. pub async fn check_action_cache( action_digest: Digest, + command: &Command, metadata: &ProcessMetadata, platform: Platform, context: &Context, @@ -1364,7 +1353,10 @@ pub async fn check_action_cache( "check_action_cache".to_owned(), WorkunitMetadata { level: Level::Trace, - desc: Some(format!("check action cache for {:?}", action_digest)), + desc: Some(format!( + "check action cache for {:?} ({:?})", + command, action_digest + )), ..WorkunitMetadata::default() }, |workunit| async move { @@ -1396,16 +1388,31 @@ pub async fn check_action_cache( let response = populate_fallible_execution_result(store.clone(), &action_result, platform, false) .await?; + // TODO: This should move inside the retry_call above, both in order to be retried, and + // to ensure that we increment a miss if we fail to eagerly fetch. if eager_fetch { - future::try_join_all(vec![ - store.ensure_local_has_file(response.stdout_digest).boxed(), - store.ensure_local_has_file(response.stderr_digest).boxed(), - store - .ensure_local_has_recursive_directory(response.output_directory) - .boxed(), - ]) + let response = response.clone(); + in_workunit!( + context.workunit_store.clone(), + "eager_fetch_action_cache".to_owned(), + WorkunitMetadata { + level: Level::Trace, + desc: Some("eagerly fetching after action cache hit".to_owned()), + ..WorkunitMetadata::default() + }, + |_workunit| async move { + future::try_join_all(vec![ + store.ensure_local_has_file(response.stdout_digest).boxed(), + store.ensure_local_has_file(response.stderr_digest).boxed(), + store + .ensure_local_has_recursive_directory(response.output_directory) + .boxed(), + ]) + .await + } + ) .await?; - }; + } workunit.increment_counter(Metric::RemoteCacheRequestsCached, 1); Ok(Some(response)) } diff --git a/src/rust/engine/process_execution/src/remote_cache.rs b/src/rust/engine/process_execution/src/remote_cache.rs index ae9acc993c1..f42c48b787d 100644 --- a/src/rust/engine/process_execution/src/remote_cache.rs +++ b/src/rust/engine/process_execution/src/remote_cache.rs @@ -17,12 +17,14 @@ use remexec::action_cache_client::ActionCacheClient; use remexec::{ActionResult, Command, FileNode, Tree}; use store::Store; use tonic::transport::Channel; -use workunit_store::{in_workunit, Level, Metric, ObservationMetric, WorkunitMetadata}; +use workunit_store::{ + in_workunit, Level, Metric, ObservationMetric, RunningWorkunit, WorkunitMetadata, +}; use crate::remote::make_execute_request; use crate::{ Context, FallibleProcessResultWithPlatform, MultiPlatformProcess, Platform, Process, - ProcessMetadata, RemoteCacheWarningsBehavior, + ProcessCacheScope, ProcessMetadata, RemoteCacheWarningsBehavior, }; use grpc_util::retry::status_is_retryable; @@ -343,13 +345,16 @@ impl CommandRunner { &self, context: Context, cache_lookup_start: Instant, + command: &Command, action_digest: Digest, + request: &Process, mut local_execution_future: BoxFuture<'_, Result>, ) -> Result<(FallibleProcessResultWithPlatform, bool), String> { // A future to read from the cache and log the results accordingly. let cache_read_future = async { let response = crate::remote::check_action_cache( action_digest, + command, &self.metadata, self.platform, &context, @@ -382,6 +387,7 @@ impl CommandRunner { "remote_cache_read_speculation".to_owned(), WorkunitMetadata { level: Level::Trace, + desc: Some(format!("Remote cache lookup: {}", request.description)), ..WorkunitMetadata::default() }, |workunit| async move { @@ -396,7 +402,18 @@ impl CommandRunner { context2 .workunit_store .record_observation(ObservationMetric::RemoteCacheTimeSavedMs, time_saved); - } + } + // When we successfully use the cache, we change the description and increase the level + // (but not so much that it will be logged by default). + workunit.update_metadata(|initial| WorkunitMetadata { + desc: initial + .desc + .as_ref() + .map(|desc| format!("Hit: {}", desc)), + level: Level::Debug, + ..initial + + }); Ok((cached_response, true)) } else { // Note that we don't increment a counter here, as there is nothing of note in this @@ -525,8 +542,9 @@ enum CacheErrorType { impl crate::CommandRunner for CommandRunner { async fn run( &self, - req: MultiPlatformProcess, context: Context, + workunit: &mut RunningWorkunit, + req: MultiPlatformProcess, ) -> Result { let cache_lookup_start = Instant::now(); // Construct the REv2 ExecuteRequest and related data for this execution request. @@ -535,35 +553,34 @@ impl crate::CommandRunner for CommandRunner { .ok_or_else(|| "No compatible Process found for checking remote cache.".to_owned())?; let (action, command, _execute_request) = make_execute_request(&request, self.metadata.clone())?; + let write_failures_to_cache = req + .0 + .values() + .any(|process| process.cache_scope == ProcessCacheScope::Always); // Ensure the action and command are stored locally. - let command2 = command.clone(); - let (command_digest, action_digest) = in_workunit!( - context.workunit_store.clone(), - "ensure_action_stored_locally".to_owned(), - WorkunitMetadata { - level: Level::Trace, - desc: Some(format!("ensure action stored locally for {:?}", action)), - ..WorkunitMetadata::default() - }, - |_workunit| crate::remote::ensure_action_stored_locally(&self.store, &command2, &action), - ) - .await?; + let (command_digest, action_digest) = + crate::remote::ensure_action_stored_locally(&self.store, &command, &action).await?; let (result, hit_cache) = if self.cache_read { self .speculate_read_action_cache( context.clone(), cache_lookup_start, + &command, action_digest, - self.underlying.run(req, context.clone()), + &request, + self.underlying.run(context.clone(), workunit, req), ) .await? } else { - (self.underlying.run(req, context.clone()).await?, false) + ( + self.underlying.run(context.clone(), workunit, req).await?, + false, + ) }; - if !hit_cache && result.exit_code == 0 && self.cache_write { + if !hit_cache && (result.exit_code == 0 || write_failures_to_cache) && self.cache_write { // NB: We use a distinct workunit for the start of the cache write so that we guarantee the // counter is recorded, given that the cache write is async and may still be executing after // the Pants session has finished and workunits are no longer processed. diff --git a/src/rust/engine/process_execution/src/remote_cache_tests.rs b/src/rust/engine/process_execution/src/remote_cache_tests.rs index 14a16ac6610..14e4430dde8 100644 --- a/src/rust/engine/process_execution/src/remote_cache_tests.rs +++ b/src/rust/engine/process_execution/src/remote_cache_tests.rs @@ -16,7 +16,7 @@ use store::Store; use tempfile::TempDir; use testutil::data::{TestData, TestDirectory, TestTree}; use tokio::time::sleep; -use workunit_store::WorkunitStore; +use workunit_store::{RunningWorkunit, WorkunitStore}; use crate::remote::{ensure_action_stored_locally, make_execute_request}; use crate::{ @@ -58,8 +58,9 @@ impl MockLocalCommandRunner { impl CommandRunnerTrait for MockLocalCommandRunner { async fn run( &self, - _req: MultiPlatformProcess, _context: Context, + _workunit: &mut RunningWorkunit, + _req: MultiPlatformProcess, ) -> Result { sleep(self.delay).await; self.call_counter.fetch_add(1, Ordering::SeqCst); @@ -180,7 +181,7 @@ fn insert_into_action_cache( #[tokio::test] async fn cache_read_success() { - WorkunitStore::setup_for_tests(); + let (_, mut workunit) = WorkunitStore::setup_for_tests(); let store_setup = StoreSetup::new(); let (local_runner, local_runner_call_counter) = create_local_runner(1, 1000); let (cache_runner, action_cache) = create_cached_runner(local_runner, &store_setup, 0, 0, false); @@ -190,7 +191,7 @@ async fn cache_read_success() { assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0); let remote_result = cache_runner - .run(process.clone().into(), Context::default()) + .run(Context::default(), &mut workunit, process.clone().into()) .await .unwrap(); assert_eq!(remote_result.exit_code, 0); @@ -200,7 +201,7 @@ async fn cache_read_success() { /// If the cache has any issues during reads, we should gracefully fallback to the local runner. #[tokio::test] async fn cache_read_skipped_on_errors() { - WorkunitStore::setup_for_tests(); + let (_, mut workunit) = WorkunitStore::setup_for_tests(); let store_setup = StoreSetup::new(); let (local_runner, local_runner_call_counter) = create_local_runner(1, 100); let (cache_runner, action_cache) = create_cached_runner(local_runner, &store_setup, 0, 0, false); @@ -211,7 +212,7 @@ async fn cache_read_skipped_on_errors() { assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0); let remote_result = cache_runner - .run(process.clone().into(), Context::default()) + .run(Context::default(), &mut workunit, process.clone().into()) .await .unwrap(); assert_eq!(remote_result.exit_code, 1); @@ -223,9 +224,9 @@ async fn cache_read_skipped_on_errors() { /// the cached result with its non-existent digests. #[tokio::test] async fn cache_read_eager_fetch() { - WorkunitStore::setup_for_tests(); + let (_, mut workunit) = WorkunitStore::setup_for_tests(); - async fn run_process(eager_fetch: bool) -> (i32, usize) { + async fn run_process(eager_fetch: bool, workunit: &mut RunningWorkunit) -> (i32, usize) { let store_setup = StoreSetup::new(); let (local_runner, local_runner_call_counter) = create_local_runner(1, 1000); let (cache_runner, action_cache) = @@ -242,7 +243,7 @@ async fn cache_read_eager_fetch() { assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0); let remote_result = cache_runner - .run(process.clone().into(), Context::default()) + .run(Context::default(), workunit, process.clone().into()) .await .unwrap(); @@ -250,20 +251,25 @@ async fn cache_read_eager_fetch() { (remote_result.exit_code, final_local_count) } - let (lazy_exit_code, lazy_local_call_count) = run_process(false).await; + let (lazy_exit_code, lazy_local_call_count) = run_process(false, &mut workunit).await; assert_eq!(lazy_exit_code, 0); assert_eq!(lazy_local_call_count, 0); - let (eager_exit_code, eager_local_call_count) = run_process(true).await; + let (eager_exit_code, eager_local_call_count) = run_process(true, &mut workunit).await; assert_eq!(eager_exit_code, 1); assert_eq!(eager_local_call_count, 1); } #[tokio::test] async fn cache_read_speculation() { - WorkunitStore::setup_for_tests(); - - async fn run_process(local_delay_ms: u64, remote_delay_ms: u64, cache_hit: bool) -> (i32, usize) { + let (_, mut workunit) = WorkunitStore::setup_for_tests(); + + async fn run_process( + local_delay_ms: u64, + remote_delay_ms: u64, + cache_hit: bool, + workunit: &mut RunningWorkunit, + ) -> (i32, usize) { let store_setup = StoreSetup::new(); let (local_runner, local_runner_call_counter) = create_local_runner(1, local_delay_ms); let (cache_runner, action_cache) = @@ -276,7 +282,7 @@ async fn cache_read_speculation() { assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0); let remote_result = cache_runner - .run(process.clone().into(), Context::default()) + .run(Context::default(), workunit, process.clone().into()) .await .unwrap(); @@ -285,24 +291,24 @@ async fn cache_read_speculation() { } // Case 1: remote is faster than local. - let (exit_code, local_call_count) = run_process(200, 0, true).await; + let (exit_code, local_call_count) = run_process(200, 0, true, &mut workunit).await; assert_eq!(exit_code, 0); assert_eq!(local_call_count, 0); // Case 2: local is faster than remote. - let (exit_code, local_call_count) = run_process(0, 200, true).await; + let (exit_code, local_call_count) = run_process(0, 200, true, &mut workunit).await; assert_eq!(exit_code, 1); assert_eq!(local_call_count, 1); // Case 3: the remote lookup wins, but there is no cache entry so we fallback to local execution. - let (exit_code, local_call_count) = run_process(200, 0, false).await; + let (exit_code, local_call_count) = run_process(200, 0, false, &mut workunit).await; assert_eq!(exit_code, 1); assert_eq!(local_call_count, 1); } #[tokio::test] async fn cache_write_success() { - WorkunitStore::setup_for_tests(); + let (_, mut workunit) = WorkunitStore::setup_for_tests(); let store_setup = StoreSetup::new(); let (local_runner, local_runner_call_counter) = create_local_runner(0, 100); let (cache_runner, action_cache) = create_cached_runner(local_runner, &store_setup, 0, 0, false); @@ -312,7 +318,7 @@ async fn cache_write_success() { assert!(action_cache.action_map.lock().is_empty()); let local_result = cache_runner - .run(process.clone().into(), Context::default()) + .run(Context::default(), &mut workunit, process.clone().into()) .await .unwrap(); assert_eq!(local_result.exit_code, 0); @@ -333,7 +339,7 @@ async fn cache_write_success() { #[tokio::test] async fn cache_write_not_for_failures() { - WorkunitStore::setup_for_tests(); + let (_, mut workunit) = WorkunitStore::setup_for_tests(); let store_setup = StoreSetup::new(); let (local_runner, local_runner_call_counter) = create_local_runner(1, 100); let (cache_runner, action_cache) = create_cached_runner(local_runner, &store_setup, 0, 0, false); @@ -343,7 +349,7 @@ async fn cache_write_not_for_failures() { assert!(action_cache.action_map.lock().is_empty()); let local_result = cache_runner - .run(process.clone().into(), Context::default()) + .run(Context::default(), &mut workunit, process.clone().into()) .await .unwrap(); assert_eq!(local_result.exit_code, 1); @@ -357,7 +363,7 @@ async fn cache_write_not_for_failures() { /// Cache writes should be async and not block the CommandRunner from returning. #[tokio::test] async fn cache_write_does_not_block() { - WorkunitStore::setup_for_tests(); + let (_, mut workunit) = WorkunitStore::setup_for_tests(); let store_setup = StoreSetup::new(); let (local_runner, local_runner_call_counter) = create_local_runner(0, 100); let (cache_runner, action_cache) = @@ -368,7 +374,7 @@ async fn cache_write_does_not_block() { assert!(action_cache.action_map.lock().is_empty()); let local_result = cache_runner - .run(process.clone().into(), Context::default()) + .run(Context::default(), &mut workunit, process.clone().into()) .await .unwrap(); assert_eq!(local_result.exit_code, 0); @@ -535,8 +541,9 @@ async fn make_action_result_basic() { impl CommandRunnerTrait for MockCommandRunner { async fn run( &self, - _req: MultiPlatformProcess, _context: Context, + _workunit: &mut RunningWorkunit, + _req: MultiPlatformProcess, ) -> Result { unimplemented!() } @@ -546,7 +553,6 @@ async fn make_action_result_basic() { } } - WorkunitStore::setup_for_tests(); let store_dir = TempDir::new().unwrap(); let executor = task_executor::Executor::new(); let store = Store::local_only(executor.clone(), store_dir.path()).unwrap(); diff --git a/src/rust/engine/process_execution/src/remote_tests.rs b/src/rust/engine/process_execution/src/remote_tests.rs index 7246f1e8f89..56ce19cb08e 100644 --- a/src/rust/engine/process_execution/src/remote_tests.rs +++ b/src/rust/engine/process_execution/src/remote_tests.rs @@ -18,7 +18,7 @@ use store::Store; use tempfile::TempDir; use testutil::data::{TestData, TestDirectory, TestTree}; use testutil::{owned_string_vec, relative_paths}; -use workunit_store::{WorkunitState, WorkunitStore}; +use workunit_store::WorkunitStore; use crate::remote::{digest, CommandRunner, ExecutionError, OperationOrStatus}; use crate::{ @@ -817,7 +817,7 @@ async fn server_sending_triggering_timeout_with_deadline_exceeded() { #[tokio::test] async fn sends_headers() { - WorkunitStore::setup_for_tests(); + let (_, mut workunit) = WorkunitStore::setup_for_tests(); let execute_request = echo_foo_request(); let op_name = "gimme-foo".to_string(); @@ -889,7 +889,7 @@ async fn sends_headers() { build_id: String::from("marmosets"), }; command_runner - .run(execute_request, context) + .run(context, &mut workunit, execute_request) .await .expect("Execution failed"); @@ -1208,7 +1208,6 @@ async fn initial_response_error() { #[tokio::test] async fn initial_response_missing_response_and_error() { - WorkunitStore::setup_for_tests(); let execute_request = echo_foo_request(); let mock_server = { @@ -1478,7 +1477,7 @@ async fn execute_missing_file_uploads_if_known() { #[tokio::test] async fn execute_missing_file_errors_if_unknown() { - WorkunitStore::setup_for_tests(); + let (_, mut workunit) = WorkunitStore::setup_for_tests(); let missing_digest = TestDirectory::containing_roland().digest(); let mock_server = { @@ -1530,7 +1529,7 @@ async fn execute_missing_file_errors_if_unknown() { .unwrap(); let error = runner - .run(cat_roland_request(), Context::default()) + .run(Context::default(), &mut workunit, cat_roland_request()) .await .expect_err("Want error"); assert_contains(&error, &format!("{}", missing_digest.hash)); @@ -1716,7 +1715,7 @@ async fn extract_execute_response_other_status() { #[tokio::test] async fn remote_workunits_are_stored() { - let mut workunit_store = WorkunitStore::setup_for_tests(); + let (mut workunit_store, _) = WorkunitStore::setup_for_tests(); let op_name = "gimme-foo".to_string(); let testdata = TestData::roland(); let testdata_empty = TestData::empty(); @@ -1739,48 +1738,19 @@ async fn remote_workunits_are_stored() { .await .unwrap(); - let got_workunit_items: HashSet<(String, WorkunitState)> = + let got_workunit_items: HashSet = workunit_store.with_latest_workunits(log::Level::Trace, |_, completed| { completed .iter() - .map(|workunit| (workunit.name.clone(), workunit.state.clone())) + .map(|workunit| workunit.name.clone()) .collect() }); - use concrete_time::Duration; - use concrete_time::TimeSpan; - let wanted_workunit_items = hashset! { - (String::from("remote execution action scheduling"), - WorkunitState::Completed { - time_span: TimeSpan { - start: Duration::new(0, 0), - duration: Duration::new(1, 0), - } - }, - ), - (String::from("remote execution worker input fetching"), - WorkunitState::Completed { - time_span: TimeSpan { - start: Duration::new(2, 0), - duration: Duration::new(1, 0), - } - }), - (String::from("remote execution worker command executing"), - WorkunitState::Completed { - time_span: TimeSpan { - start: Duration::new(4, 0), - duration: Duration::new(1, 0), - } - }), - (String::from("remote execution worker output uploading"), - WorkunitState::Completed { - time_span: TimeSpan { - start: Duration::new(6, 0), - duration: Duration::new(1, 0), - } - }), - + String::from("remote execution action scheduling"), + String::from("remote execution worker input fetching"), + String::from("remote execution worker command executing"), + String::from("remote execution worker output uploading"), }; assert!(got_workunit_items.is_superset(&wanted_workunit_items)); @@ -2189,7 +2159,10 @@ pub(crate) async fn run_cmd_runner( command_runner: R, store: Store, ) -> Result { - let original = command_runner.run(request, Context::default()).await?; + let (_, mut workunit) = WorkunitStore::setup_for_tests(); + let original = command_runner + .run(Context::default(), &mut workunit, request) + .await?; let stdout_bytes: Vec = store .load_file_bytes_with(original.stdout_digest, |bytes| bytes.into()) .await? @@ -2234,13 +2207,16 @@ async fn run_command_remote( address: String, request: MultiPlatformProcess, ) -> Result { + let (_, mut workunit) = WorkunitStore::setup_for_tests(); let cas = mock::StubCAS::builder() .file(&TestData::roland()) .directory(&TestDirectory::containing_roland()) .tree(&TestTree::roland_at_root()) .build(); let (command_runner, store) = create_command_runner(address, &cas, Platform::Linux); - let original = command_runner.run(request, Context::default()).await?; + let original = command_runner + .run(Context::default(), &mut workunit, request) + .await?; let stdout_bytes: Vec = store .load_file_bytes_with(original.stdout_digest, |bytes| bytes.into()) diff --git a/src/rust/engine/process_executor/src/main.rs b/src/rust/engine/process_executor/src/main.rs index e0d1e8b2285..3a1f69ced6d 100644 --- a/src/rust/engine/process_executor/src/main.rs +++ b/src/rust/engine/process_executor/src/main.rs @@ -43,7 +43,7 @@ use process_execution::{Context, NamedCaches, Platform, ProcessCacheScope, Proce use prost::Message; use store::{Store, StoreWrapper}; use structopt::StructOpt; -use workunit_store::WorkunitStore; +use workunit_store::{in_workunit, WorkunitMetadata, WorkunitStore}; #[derive(StructOpt)] struct CommandSpec { @@ -300,10 +300,18 @@ async fn main() { )) as Box, }; - let result = runner - .run(request.into(), Context::default()) - .await - .expect("Error executing"); + let result = in_workunit!( + workunit_store.clone(), + "process_executor".to_owned(), + WorkunitMetadata::default(), + |workunit| async move { + runner + .run(Context::default(), workunit, request.into()) + .await + } + ) + .await + .expect("Error executing"); if let Some(output) = args.materialize_output_to { store diff --git a/src/rust/engine/src/externs/interface.rs b/src/rust/engine/src/externs/interface.rs index 507856eb69b..613859cabe2 100644 --- a/src/rust/engine/src/externs/interface.rs +++ b/src/rust/engine/src/externs/interface.rs @@ -915,7 +915,7 @@ async fn workunit_to_py_value( } match workunit.state { - WorkunitState::Started { start_time } => { + WorkunitState::Started { start_time, .. } => { let duration = start_time .duration_since(UNIX_EPOCH) .unwrap_or_else(|_| Duration::default()); diff --git a/src/rust/engine/src/nodes.rs b/src/rust/engine/src/nodes.rs index 5f21bbf44af..8afb01bb676 100644 --- a/src/rust/engine/src/nodes.rs +++ b/src/rust/engine/src/nodes.rs @@ -11,7 +11,7 @@ use std::time::Duration; use std::{self, fmt}; use async_trait::async_trait; -use futures::future::{self, FutureExt, TryFutureExt}; +use futures::future::{self, BoxFuture, FutureExt, TryFutureExt}; use futures::stream::StreamExt; use url::Url; @@ -39,7 +39,7 @@ use reqwest::Error; use std::pin::Pin; use store::{self, StoreFileByDigest}; use workunit_store::{ - in_workunit, ArtifactOutput, Level, UserMetadataItem, UserMetadataPyValue, WorkunitMetadata, + in_workunit, Level, RunningWorkunit, UserMetadataItem, UserMetadataPyValue, WorkunitMetadata, }; pub type NodeResult = Result; @@ -87,7 +87,11 @@ impl StoreFileByDigest for Context { pub trait WrappedNode: Into { type Item: TryFrom; - async fn run_wrapped_node(self, context: Context) -> NodeResult; + async fn run_wrapped_node( + self, + context: Context, + _workunit: &mut RunningWorkunit, + ) -> NodeResult; } /// @@ -127,12 +131,12 @@ impl Select { Select::new(params, product, entry) } - async fn select_product( + fn select_product( &self, context: &Context, product: TypeId, caller_description: &str, - ) -> NodeResult { + ) -> BoxFuture> { let edges = context .core .rule_graph @@ -142,33 +146,32 @@ impl Select { "Tried to select product {} for {} but found no edges", product, caller_description )) - })?; + }); + let params = self.params.clone(); let context = context.clone(); - Select::new_from_edges(self.params.clone(), product, &edges) - .run_wrapped_node(context) - .await + async move { + let edges = edges?; + Select::new_from_edges(params, product, &edges) + .run(context) + .await + } + .boxed() } -} - -// TODO: This is a Node only because it is used as a root in the graph, but it should never be -// requested using context.get -#[async_trait] -impl WrappedNode for Select { - type Item = Value; - async fn run_wrapped_node(self, context: Context) -> NodeResult { + async fn run(self, context: Context) -> NodeResult { match &self.entry { &rule_graph::Entry::WithDeps(rule_graph::EntryWithDeps::Inner(ref inner)) => { match inner.rule() { - &tasks::Rule::Task(ref task) => context - .get(Task { - params: self.params.clone(), - product: self.product, - task: task.clone(), - entry: Arc::new(self.entry.clone()), - }) - .await - .map(|output| output.value), + &tasks::Rule::Task(ref task) => { + context + .get(Task { + params: self.params.clone(), + product: self.product, + task: task.clone(), + entry: Arc::new(self.entry.clone()), + }) + .await + } &Rule::Intrinsic(ref intrinsic) => { let intrinsic = intrinsic.clone(); let values = future::try_join_all( @@ -204,6 +207,26 @@ impl WrappedNode for Select { } } +/// +/// NB: This is a Node so that it can be used as a root in the graph, but it should otherwise +/// never be requested as a Node using context.get. Select is a thin proxy to other Node types +/// (which it requests using context.get), and memoizing it would be redundant. +/// +/// Instead, use `Select::run` to run the Select logic without memoizing it. +/// +#[async_trait] +impl WrappedNode for Select { + type Item = Value; + + async fn run_wrapped_node( + self, + context: Context, + _workunit: &mut RunningWorkunit, + ) -> NodeResult { + self.run(context).await + } +} + impl From