diff --git a/src/rust/engine/async_semaphore/src/lib.rs b/src/rust/engine/async_semaphore/src/lib.rs index 82b5731a61c..e50b359e0b8 100644 --- a/src/rust/engine/async_semaphore/src/lib.rs +++ b/src/rust/engine/async_semaphore/src/lib.rs @@ -77,7 +77,7 @@ impl AsyncSemaphore { res } - async fn acquire(&self) -> Permit<'_> { + pub async fn acquire(&self) -> Permit<'_> { let permit = self.inner.sema.acquire().await.expect("semaphore closed"); let id = { let mut available_ids = self.inner.available_ids.lock(); @@ -100,6 +100,12 @@ pub struct Permit<'a> { id: usize, } +impl Permit<'_> { + pub fn concurrency_slot(&self) -> usize { + self.id + } +} + impl<'a> Drop for Permit<'a> { fn drop(&mut self) { let mut available_ids = self.inner.available_ids.lock(); diff --git a/src/rust/engine/process_execution/src/lib.rs b/src/rust/engine/process_execution/src/lib.rs index 8120551491d..6181f5c2127 100644 --- a/src/rust/engine/process_execution/src/lib.rs +++ b/src/rust/engine/process_execution/src/lib.rs @@ -43,7 +43,7 @@ use bazel_protos::gen::build::bazel::remote::execution::v2 as remexec; use hashing::{Digest, EMPTY_FINGERPRINT}; use remexec::ExecutedActionMetadata; use serde::{Deserialize, Serialize}; -use workunit_store::{RunningWorkunit, WorkunitStore}; +use workunit_store::{in_workunit, RunningWorkunit, WorkunitMetadata, WorkunitStore}; pub mod cache; #[cfg(test)] @@ -586,30 +586,37 @@ impl CommandRunner for BoundedCommandRunner { workunit: &mut RunningWorkunit, mut req: MultiPlatformProcess, ) -> Result { - let semaphore = self.inner.1.clone(); - let inner = self.inner.clone(); - let blocking_token = workunit.blocking(); - semaphore - .with_acquired(|concurrency_id| { - log::debug!( - "Running {} under semaphore with concurrency id: {}", - req.user_facing_name(), - concurrency_id + let semaphore_acquisition = self.inner.1.acquire(); + let permit = in_workunit!( + context.workunit_store.clone(), + "acquire_command_runner_slot".to_owned(), + WorkunitMetadata { + level: Level::Trace, + ..WorkunitMetadata::default() + }, + |workunit| async move { + let _blocking_token = workunit.blocking(); + semaphore_acquisition.await + } + ) + .await; + + log::debug!( + "Running {} under semaphore with concurrency id: {}", + req.user_facing_name(), + permit.concurrency_slot() + ); + + for (_, process) in req.0.iter_mut() { + if let Some(ref execution_slot_env_var) = process.execution_slot_variable { + process.env.insert( + execution_slot_env_var.clone(), + format!("{}", permit.concurrency_slot()), ); - std::mem::drop(blocking_token); - - for (_, process) in req.0.iter_mut() { - if let Some(ref execution_slot_env_var) = process.execution_slot_variable { - let execution_slot = format!("{}", concurrency_id); - process - .env - .insert(execution_slot_env_var.clone(), execution_slot); - } - } - - inner.0.run(context, workunit, req) - }) - .await + } + } + + self.inner.0.run(context, workunit, req).await } fn extract_compatible_request(&self, req: &MultiPlatformProcess) -> Option { diff --git a/src/rust/engine/process_execution/src/remote.rs b/src/rust/engine/process_execution/src/remote.rs index 32eb7ca419e..483bd7487aa 100644 --- a/src/rust/engine/process_execution/src/remote.rs +++ b/src/rust/engine/process_execution/src/remote.rs @@ -762,7 +762,7 @@ impl crate::CommandRunner for CommandRunner { let context2 = context.clone(); let cached_response_opt = check_action_cache( action_digest, - &command, + &request.description, &self.metadata, self.platform, &context2, @@ -1337,7 +1337,7 @@ fn apply_headers(mut request: Request, build_id: &str) -> Request { /// the Bazel RE client. pub async fn check_action_cache( action_digest: Digest, - command: &Command, + command_description: &str, metadata: &ProcessMetadata, platform: Platform, context: &Context, @@ -1349,11 +1349,8 @@ pub async fn check_action_cache( context.workunit_store.clone(), "check_action_cache".to_owned(), WorkunitMetadata { - level: Level::Trace, - desc: Some(format!( - "check action cache for {:?} ({:?})", - command, action_digest - )), + level: Level::Debug, + desc: Some(format!("Remote cache lookup for: {}", command_description)), ..WorkunitMetadata::default() }, |workunit| async move { diff --git a/src/rust/engine/process_execution/src/remote_cache.rs b/src/rust/engine/process_execution/src/remote_cache.rs index 262e0af2fc4..054f56e88a4 100644 --- a/src/rust/engine/process_execution/src/remote_cache.rs +++ b/src/rust/engine/process_execution/src/remote_cache.rs @@ -349,7 +349,6 @@ impl CommandRunner { &self, context: Context, cache_lookup_start: Instant, - command: &Command, action_digest: Digest, request: &Process, mut local_execution_future: BoxFuture<'_, Result>, @@ -358,7 +357,7 @@ impl CommandRunner { let cache_read_future = async { let response = crate::remote::check_action_cache( action_digest, - command, + &request.description, &self.metadata, self.platform, &context, @@ -391,7 +390,6 @@ impl CommandRunner { "remote_cache_read_speculation".to_owned(), WorkunitMetadata { level: Level::Trace, - desc: Some(format!("Remote cache lookup: {}", request.description)), ..WorkunitMetadata::default() }, |workunit| async move { @@ -562,7 +560,6 @@ impl crate::CommandRunner for CommandRunner { .speculate_read_action_cache( context.clone(), cache_lookup_start, - &command, action_digest, &request, self.underlying.run(context.clone(), workunit, req), diff --git a/src/rust/engine/workunit_store/src/tests.rs b/src/rust/engine/workunit_store/src/tests.rs index 8088dc03233..fa5e622f5ab 100644 --- a/src/rust/engine/workunit_store/src/tests.rs +++ b/src/rust/engine/workunit_store/src/tests.rs @@ -23,6 +23,13 @@ fn heavy_hitters_only_running() { ); } +#[test] +fn heavy_hitters_blocked_path() { + // Test that a chain of blocked workunits do not cause their parents to be rendered. + let ws = create_store(vec![wu_root(0)], vec![wu(1, 0), wu(2, 1)], vec![]); + assert!(ws.heavy_hitters(1).is_empty()); +} + #[test] fn straggling_workunits_basic() { let ws = create_store(vec![wu_root(0), wu(1, 0)], vec![], vec![]); @@ -36,12 +43,19 @@ fn straggling_workunits_basic() { } #[test] -fn straggling_workunits_blocked() { - // Test that a blocked leaf is not eligible to be rendered. +fn straggling_workunits_blocked_leaf() { + // Test that a blocked leaf does not cause its parents to be rendered. let ws = create_store(vec![wu_root(0)], vec![wu(1, 0)], vec![]); assert!(ws.straggling_workunits(Duration::from_secs(0)).is_empty()); } +#[test] +fn straggling_workunits_blocked_path() { + // Test that a chain of blocked workunits do not cause their parents to be rendered. + let ws = create_store(vec![wu_root(0)], vec![wu(1, 0), wu(2, 1)], vec![]); + assert!(ws.straggling_workunits(Duration::from_secs(0)).is_empty()); +} + #[test] fn workunit_span_id_has_16_digits_len_hex_format() { let number: u64 = 1;