From d23f0eb344360c317609285a3e433a5032c9f49f Mon Sep 17 00:00:00 2001 From: Stu Hood Date: Fri, 16 Jul 2021 15:02:55 -0700 Subject: [PATCH] Render the MultiPlatformExecuteProcess node's workunit rather than the BoundedCommandRunner's. [ci skip-build-wheels] --- src/rust/engine/process_execution/src/lib.rs | 107 ++++++++----------- src/rust/engine/src/nodes.rs | 1 + 2 files changed, 45 insertions(+), 63 deletions(-) diff --git a/src/rust/engine/process_execution/src/lib.rs b/src/rust/engine/process_execution/src/lib.rs index d189205d9414..b861ff46408c 100644 --- a/src/rust/engine/process_execution/src/lib.rs +++ b/src/rust/engine/process_execution/src/lib.rs @@ -43,9 +43,7 @@ use bazel_protos::gen::build::bazel::remote::execution::v2 as remexec; use hashing::{Digest, EMPTY_FINGERPRINT}; use remexec::ExecutedActionMetadata; use serde::{Deserialize, Serialize}; -use workunit_store::{ - in_workunit, RunningWorkunit, UserMetadataItem, WorkunitMetadata, WorkunitStore, -}; +use workunit_store::{RunningWorkunit, UserMetadataItem, WorkunitMetadata, WorkunitStore}; pub mod cache; #[cfg(test)] @@ -537,69 +535,52 @@ impl CommandRunner for BoundedCommandRunner { async fn run( &self, context: Context, - // TODO - _workunit: &mut RunningWorkunit, + workunit: &mut RunningWorkunit, mut req: MultiPlatformProcess, ) -> Result { - let name = format!("{}-running", req.workunit_name()); - let desc = req.user_facing_name(); - let metadata = WorkunitMetadata { - level: req.workunit_level(), - desc: Some(desc.clone()), - ..WorkunitMetadata::default() - }; - - in_workunit!( - context.workunit_store.clone(), - name, - metadata, - |workunit| async move { - let semaphore = self.inner.1.clone(); - let inner = self.inner.clone(); - let blocking_token = workunit.blocking(); - let res: (Result<_, _>) = semaphore - .with_acquired(|concurrency_id| { - log::debug!( - "Running {} under semaphore with concurrency id: {}", - desc, - concurrency_id - ); - std::mem::drop(blocking_token); - - for (_, process) in req.0.iter_mut() { - if let Some(ref execution_slot_env_var) = process.execution_slot_variable { - let execution_slot = format!("{}", concurrency_id); - process - .env - .insert(execution_slot_env_var.clone(), execution_slot); - } - } - - inner.0.run(context, workunit, req) - }) - .await; - - if let Ok(FallibleProcessResultWithPlatform { - stdout_digest, - stderr_digest, - exit_code, - .. - }) = res - { - workunit.update_metadata(|initial| WorkunitMetadata { - stdout: Some(stdout_digest), - stderr: Some(stderr_digest), - user_metadata: vec![( - "exit_code".to_string(), - UserMetadataItem::ImmediateId(exit_code as i64), - )], - ..initial - }) + let semaphore = self.inner.1.clone(); + let inner = self.inner.clone(); + let blocking_token = workunit.blocking(); + let res: (Result<_, _>) = semaphore + .with_acquired(|concurrency_id| { + log::debug!( + "Running {} under semaphore with concurrency id: {}", + req.user_facing_name(), + concurrency_id + ); + std::mem::drop(blocking_token); + + for (_, process) in req.0.iter_mut() { + if let Some(ref execution_slot_env_var) = process.execution_slot_variable { + let execution_slot = format!("{}", concurrency_id); + process + .env + .insert(execution_slot_env_var.clone(), execution_slot); + } } - res - } - ) - .await + + inner.0.run(context, workunit, req) + }) + .await; + + if let Ok(FallibleProcessResultWithPlatform { + stdout_digest, + stderr_digest, + exit_code, + .. + }) = res + { + workunit.update_metadata(|initial| WorkunitMetadata { + stdout: Some(stdout_digest), + stderr: Some(stderr_digest), + user_metadata: vec![( + "exit_code".to_string(), + UserMetadataItem::ImmediateId(exit_code as i64), + )], + ..initial + }) + } + res } fn extract_compatible_request(&self, req: &MultiPlatformProcess) -> Option { diff --git a/src/rust/engine/src/nodes.rs b/src/rust/engine/src/nodes.rs index 877ba702063a..3f8eeb83f859 100644 --- a/src/rust/engine/src/nodes.rs +++ b/src/rust/engine/src/nodes.rs @@ -1257,6 +1257,7 @@ impl NodeKey { fn workunit_level(&self) -> Level { match self { NodeKey::Task(ref task) => task.task.display_info.level, + NodeKey::MultiPlatformExecuteProcess(ref mpp) => mpp.process.workunit_level(), NodeKey::DownloadedFile(..) => Level::Debug, _ => Level::Trace, }