Skip to content

Commit

Permalink
Mark workunits blocked, and skip rendering completed workunits (#12369)…
Browse files Browse the repository at this point in the history
… (#12376)

Our "graph" of execution is a DAG, while the workunits (used to visualize and record traces) form a tree. Because they form a tree, workunits do not always report that they are blocked on work that they didn't start, but which some other node started (common when lots of @rules are waiting on a single @rule which only one of them started).

To fix this, we make the `blocked` property of a workunit an atomically mutable flag, and skip rendering the parents of blocked leaves. We use the `blocked` flag both for `Tasks` (which wait directly for memoized Nodes, and so frequently block in this way) and for `BoundedCommandRunner`, which temporarily blocks the workunit while we're acquiring the semaphore. Additionally, we skip rendering or walking through `Completed` workunits, which can happen in the case of speculation if a parent workunit completes before a child.

In order to toggle the `blocked` property on workunits, we expose the current `RunningWorkunit` in two new places: the `CommandRunner` and `WrappedNode`. In both cases, this is to allow the generic code to consume the workunit created by its callers and mark it blocked (for `Task` and `BoundedCommandRunner`).

Fixes #12349.

[ci skip-build-wheels]
  • Loading branch information
stuhood authored Jul 20, 2021
1 parent 790c206 commit eadca88
Show file tree
Hide file tree
Showing 17 changed files with 735 additions and 531 deletions.
21 changes: 9 additions & 12 deletions src/python/pants/engine/internals/engine_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from pants.engine.internals.scheduler import ExecutionError, SchedulerSession
from pants.engine.internals.scheduler_test_base import SchedulerTestBase
from pants.engine.platform import rules as platform_rules
from pants.engine.process import MultiPlatformProcess, Process, ProcessResult
from pants.engine.process import MultiPlatformProcess, Process, ProcessCacheScope, ProcessResult
from pants.engine.process import rules as process_rules
from pants.engine.rules import Get, MultiGet, rule
from pants.engine.streaming_workunit_handler import (
Expand Down Expand Up @@ -609,6 +609,7 @@ async def a_rule() -> TrueResult:
proc = Process(
["/bin/sh", "-c", "true"],
description="always true",
cache_scope=ProcessCacheScope.PER_SESSION,
)
_ = await Get(ProcessResult, MultiPlatformProcess({None: proc}))
return TrueResult()
Expand All @@ -624,9 +625,9 @@ async def a_rule() -> TrueResult:

finished = list(itertools.chain.from_iterable(tracker.finished_workunit_chunks))
workunits_with_counters = [item for item in finished if "counters" in item]
assert workunits_with_counters[0]["counters"]["local_execution_requests"] == 1
assert workunits_with_counters[1]["counters"]["local_cache_requests"] == 1
assert workunits_with_counters[1]["counters"]["local_cache_requests_uncached"] == 1
assert workunits_with_counters[0]["counters"]["local_cache_requests"] == 1
assert workunits_with_counters[0]["counters"]["local_cache_requests_uncached"] == 1
assert workunits_with_counters[1]["counters"]["local_execution_requests"] == 1

assert histograms_info["version"] == 0
assert "histograms" in histograms_info
Expand Down Expand Up @@ -736,7 +737,7 @@ def test_process_digests_on_streaming_workunits(
run_tracker=run_tracker,
callbacks=[tracker],
report_interval_seconds=0.01,
max_workunit_verbosity=LogLevel.INFO,
max_workunit_verbosity=LogLevel.DEBUG,
specs=Specs.empty(),
options_bootstrapper=create_options_bootstrapper([]),
pantsd=False,
Expand All @@ -752,9 +753,7 @@ def test_process_digests_on_streaming_workunits(
assert tracker.finished
finished = list(itertools.chain.from_iterable(tracker.finished_workunit_chunks))

process_workunit = next(
item for item in finished if item["name"] == "multi_platform_process-running"
)
process_workunit = next(item for item in finished if item["name"] == "multi_platform_process")
assert process_workunit is not None
stdout_digest = process_workunit["artifacts"]["stdout_digest"]
stderr_digest = process_workunit["artifacts"]["stderr_digest"]
Expand All @@ -769,7 +768,7 @@ def test_process_digests_on_streaming_workunits(
run_tracker=run_tracker,
callbacks=[tracker],
report_interval_seconds=0.01,
max_workunit_verbosity=LogLevel.INFO,
max_workunit_verbosity=LogLevel.DEBUG,
specs=Specs.empty(),
options_bootstrapper=create_options_bootstrapper([]),
pantsd=False,
Expand All @@ -782,9 +781,7 @@ def test_process_digests_on_streaming_workunits(

assert tracker.finished
finished = list(itertools.chain.from_iterable(tracker.finished_workunit_chunks))
process_workunit = next(
item for item in finished if item["name"] == "multi_platform_process-running"
)
process_workunit = next(item for item in finished if item["name"] == "multi_platform_process")

assert process_workunit is not None
stdout_digest = process_workunit["artifacts"]["stdout_digest"]
Expand Down
4 changes: 2 additions & 2 deletions src/rust/engine/async_semaphore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ impl AsyncSemaphore {
///
pub async fn with_acquired<F, B, O>(self, f: F) -> O
where
F: FnOnce(usize) -> B + Send + 'static,
B: Future<Output = O> + Send + 'static,
F: FnOnce(usize) -> B,
B: Future<Output = O>,
{
let permit = self.acquire().await;
let res = f(permit.id).await;
Expand Down
82 changes: 58 additions & 24 deletions src/rust/engine/process_execution/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ use prost::Message;
use serde::{Deserialize, Serialize};
use sharded_lmdb::ShardedLmdb;
use store::Store;
use workunit_store::{in_workunit, Level, Metric, ObservationMetric, WorkunitMetadata};
use workunit_store::{
in_workunit, Level, Metric, ObservationMetric, RunningWorkunit, WorkunitMetadata,
};

use crate::{
Context, FallibleProcessResultWithPlatform, MultiPlatformProcess, Platform, Process,
Expand Down Expand Up @@ -57,42 +59,49 @@ impl crate::CommandRunner for CommandRunner {

async fn run(
&self,
req: MultiPlatformProcess,
context: Context,
workunit: &mut RunningWorkunit,
req: MultiPlatformProcess,
) -> Result<FallibleProcessResultWithPlatform, String> {
let cache_lookup_start = Instant::now();
let write_failures_to_cache = req
.0
.values()
.any(|process| process.cache_scope == ProcessCacheScope::Always);
let digest = crate::digest(req.clone(), &self.metadata);
let key = digest.hash;

let context2 = context.clone();
in_workunit!(
context2.workunit_store,
let cache_read_result = in_workunit!(
context.workunit_store.clone(),
"local_cache_read".to_owned(),
WorkunitMetadata {
level: Level::Trace,
desc: Some(format!("Local cache lookup: {}", req.user_facing_name())),
..WorkunitMetadata::default()
},
|workunit| async move {
workunit.increment_counter(Metric::LocalCacheRequests, 1);

let digest = crate::digest(req.clone(), &self.metadata);
let key = digest.hash;

let cache_failures = req
.0
.values()
.any(|process| process.cache_scope == ProcessCacheScope::Always);

let command_runner = self.clone();
match self.lookup(key).await {
Ok(Some(result)) if result.exit_code == 0 || cache_failures => {
Ok(Some(result)) if result.exit_code == 0 || write_failures_to_cache => {
let lookup_elapsed = cache_lookup_start.elapsed();
workunit.increment_counter(Metric::LocalCacheRequestsCached, 1);
if let Some(time_saved) = result.metadata.time_saved_from_cache(lookup_elapsed) {
let time_saved = time_saved.as_millis() as u64;
workunit.increment_counter(Metric::LocalCacheTotalTimeSavedMs, time_saved);
context
context2
.workunit_store
.record_observation(ObservationMetric::LocalCacheTimeSavedMs, time_saved);
}
return Ok(result);
// When we successfully use the cache, we change the description and increase the level
// (but not so much that it will be logged by default).
workunit.update_metadata(|initial| WorkunitMetadata {
desc: initial.desc.as_ref().map(|desc| format!("Hit: {}", desc)),
level: Level::Debug,
..initial
});
Ok(result)
}
Err(err) => {
debug!(
Expand All @@ -101,29 +110,54 @@ impl crate::CommandRunner for CommandRunner {
);
workunit.increment_counter(Metric::LocalCacheReadErrors, 1);
// Falling through to re-execute.
Err(())
}
Ok(_) => {
// Either we missed, or we hit for a failing result.
workunit.increment_counter(Metric::LocalCacheRequestsUncached, 1);
// Falling through to execute.
Err(())
}
}
}
.boxed()
)
.await;

if let Ok(result) = cache_read_result {
workunit.update_metadata(|initial| WorkunitMetadata {
desc: initial
.desc
.as_ref()
.map(|desc| format!("Hit local cache: {}", desc)),
..initial
});
return Ok(result);
}

let result = command_runner.underlying.run(req, context.clone()).await?;
if result.exit_code == 0 || cache_failures {
if let Err(err) = command_runner.store(key, &result).await {
let result = self.underlying.run(context.clone(), workunit, req).await?;
if result.exit_code == 0 || write_failures_to_cache {
let result = result.clone();
in_workunit!(
context.workunit_store.clone(),
"local_cache_write".to_owned(),
WorkunitMetadata {
level: Level::Trace,
..WorkunitMetadata::default()
},
|workunit| async move {
if let Err(err) = self.store(key, &result).await {
warn!(
"Error storing process execution result to local cache: {} - ignoring and continuing",
err
);
workunit.increment_counter(Metric::LocalCacheWriteErrors, 1);
}
}
Ok(result)
}
.boxed()
)
.await
)
.await;
}
Ok(result)
}
}

Expand Down
28 changes: 16 additions & 12 deletions src/rust/engine/process_execution/src/cache_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use store::Store;
use tempfile::TempDir;
use testutil::data::TestData;
use testutil::relative_paths;
use workunit_store::WorkunitStore;
use workunit_store::{RunningWorkunit, WorkunitStore};

use crate::{
CommandRunner as CommandRunnerTrait, Context, FallibleProcessResultWithPlatform, NamedCaches,
Expand Down Expand Up @@ -85,16 +85,18 @@ fn create_script(script_exit_code: i8) -> (Process, PathBuf, TempDir) {
(process, script_path, script_dir)
}

async fn run_roundtrip(script_exit_code: i8) -> RoundtripResults {
async fn run_roundtrip(script_exit_code: i8, workunit: &mut RunningWorkunit) -> RoundtripResults {
let (local, store, _local_runner_dir) = create_local_runner();
let (process, script_path, _script_dir) = create_script(script_exit_code);

let local_result = local.run(process.clone().into(), Context::default()).await;
let local_result = local
.run(Context::default(), workunit, process.clone().into())
.await;

let (caching, _cache_dir) = create_cached_runner(local, store.clone());

let uncached_result = caching
.run(process.clone().into(), Context::default())
.run(Context::default(), workunit, process.clone().into())
.await;

assert_eq!(local_result, uncached_result);
Expand All @@ -103,7 +105,9 @@ async fn run_roundtrip(script_exit_code: i8) -> RoundtripResults {
// fail due to a FileNotFound error. So, If the second run succeeds, that implies that the
// cache was successfully used.
std::fs::remove_file(&script_path).unwrap();
let maybe_cached_result = caching.run(process.into(), Context::default()).await;
let maybe_cached_result = caching
.run(Context::default(), workunit, process.into())
.await;

RoundtripResults {
uncached: uncached_result,
Expand All @@ -113,31 +117,31 @@ async fn run_roundtrip(script_exit_code: i8) -> RoundtripResults {

#[tokio::test]
async fn cache_success() {
WorkunitStore::setup_for_tests();
let results = run_roundtrip(0).await;
let (_, mut workunit) = WorkunitStore::setup_for_tests();
let results = run_roundtrip(0, &mut workunit).await;
assert_eq!(results.uncached, results.maybe_cached);
}

#[tokio::test]
async fn failures_not_cached() {
WorkunitStore::setup_for_tests();
let results = run_roundtrip(1).await;
let (_, mut workunit) = WorkunitStore::setup_for_tests();
let results = run_roundtrip(1, &mut workunit).await;
assert_ne!(results.uncached, results.maybe_cached);
assert_eq!(results.uncached.unwrap().exit_code, 1);
assert_eq!(results.maybe_cached.unwrap().exit_code, 127); // aka the return code for file not found
}

#[tokio::test]
async fn recover_from_missing_store_contents() {
WorkunitStore::setup_for_tests();
let (_, mut workunit) = WorkunitStore::setup_for_tests();

let (local, store, _local_runner_dir) = create_local_runner();
let (caching, _cache_dir) = create_cached_runner(local, store.clone());
let (process, _script_path, _script_dir) = create_script(0);

// Run once to cache the process.
let first_result = caching
.run(process.clone().into(), Context::default())
.run(Context::default(), &mut workunit, process.clone().into())
.await
.unwrap();

Expand Down Expand Up @@ -170,7 +174,7 @@ async fn recover_from_missing_store_contents() {

// Ensure that we don't fail if we re-run.
let second_result = caching
.run(process.clone().into(), Context::default())
.run(Context::default(), &mut workunit, process.clone().into())
.await
.unwrap();

Expand Down
Loading

0 comments on commit eadca88

Please sign in to comment.