Skip to content

Commit

Permalink
Render the MultiPlatformExecuteProcess node's workunit rather than th…
Browse files Browse the repository at this point in the history
…e BoundedCommandRunner's.

[ci skip-build-wheels]
  • Loading branch information
stuhood committed Jul 16, 2021
1 parent 43cd242 commit d23f0eb
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 63 deletions.
107 changes: 44 additions & 63 deletions src/rust/engine/process_execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@ use bazel_protos::gen::build::bazel::remote::execution::v2 as remexec;
use hashing::{Digest, EMPTY_FINGERPRINT};
use remexec::ExecutedActionMetadata;
use serde::{Deserialize, Serialize};
use workunit_store::{
in_workunit, RunningWorkunit, UserMetadataItem, WorkunitMetadata, WorkunitStore,
};
use workunit_store::{RunningWorkunit, UserMetadataItem, WorkunitMetadata, WorkunitStore};

pub mod cache;
#[cfg(test)]
Expand Down Expand Up @@ -537,69 +535,52 @@ impl CommandRunner for BoundedCommandRunner {
async fn run(
&self,
context: Context,
// TODO
_workunit: &mut RunningWorkunit,
workunit: &mut RunningWorkunit,
mut req: MultiPlatformProcess,
) -> Result<FallibleProcessResultWithPlatform, String> {
let name = format!("{}-running", req.workunit_name());
let desc = req.user_facing_name();
let metadata = WorkunitMetadata {
level: req.workunit_level(),
desc: Some(desc.clone()),
..WorkunitMetadata::default()
};

in_workunit!(
context.workunit_store.clone(),
name,
metadata,
|workunit| async move {
let semaphore = self.inner.1.clone();
let inner = self.inner.clone();
let blocking_token = workunit.blocking();
let res: (Result<_, _>) = semaphore
.with_acquired(|concurrency_id| {
log::debug!(
"Running {} under semaphore with concurrency id: {}",
desc,
concurrency_id
);
std::mem::drop(blocking_token);

for (_, process) in req.0.iter_mut() {
if let Some(ref execution_slot_env_var) = process.execution_slot_variable {
let execution_slot = format!("{}", concurrency_id);
process
.env
.insert(execution_slot_env_var.clone(), execution_slot);
}
}

inner.0.run(context, workunit, req)
})
.await;

if let Ok(FallibleProcessResultWithPlatform {
stdout_digest,
stderr_digest,
exit_code,
..
}) = res
{
workunit.update_metadata(|initial| WorkunitMetadata {
stdout: Some(stdout_digest),
stderr: Some(stderr_digest),
user_metadata: vec![(
"exit_code".to_string(),
UserMetadataItem::ImmediateId(exit_code as i64),
)],
..initial
})
let semaphore = self.inner.1.clone();
let inner = self.inner.clone();
let blocking_token = workunit.blocking();
let res: (Result<_, _>) = semaphore
.with_acquired(|concurrency_id| {
log::debug!(
"Running {} under semaphore with concurrency id: {}",
req.user_facing_name(),
concurrency_id
);
std::mem::drop(blocking_token);

for (_, process) in req.0.iter_mut() {
if let Some(ref execution_slot_env_var) = process.execution_slot_variable {
let execution_slot = format!("{}", concurrency_id);
process
.env
.insert(execution_slot_env_var.clone(), execution_slot);
}
}
res
}
)
.await

inner.0.run(context, workunit, req)
})
.await;

if let Ok(FallibleProcessResultWithPlatform {
stdout_digest,
stderr_digest,
exit_code,
..
}) = res
{
workunit.update_metadata(|initial| WorkunitMetadata {
stdout: Some(stdout_digest),
stderr: Some(stderr_digest),
user_metadata: vec![(
"exit_code".to_string(),
UserMetadataItem::ImmediateId(exit_code as i64),
)],
..initial
})
}
res
}

fn extract_compatible_request(&self, req: &MultiPlatformProcess) -> Option<Process> {
Expand Down
1 change: 1 addition & 0 deletions src/rust/engine/src/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1257,6 +1257,7 @@ impl NodeKey {
fn workunit_level(&self) -> Level {
match self {
NodeKey::Task(ref task) => task.task.display_info.level,
NodeKey::MultiPlatformExecuteProcess(ref mpp) => mpp.process.workunit_level(),
NodeKey::DownloadedFile(..) => Level::Debug,
_ => Level::Trace,
}
Expand Down

0 comments on commit d23f0eb

Please sign in to comment.