Mark workunits blocked, and skip rendering completed workunits (cherrypick of #12369) #12376

Merged · 1 commit · Jul 20, 2021
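For orientation before the hunks: the core of this cherry-pick threads the caller's `RunningWorkunit` through `CommandRunner::run`, so runners can update or relabel that workunit (for example, marking it blocked or renaming it on a cache hit). Below is a minimal sketch of the signature shape the hunks move to — a hedged illustration, not the authoritative trait definition; it uses stand-in structs for `Context`, `MultiPlatformProcess`, and `FallibleProcessResultWithPlatform` (which actually live in the `process_execution` crate) and assumes the internal `workunit_store` crate and `async-trait` are available.

```rust
use workunit_store::RunningWorkunit;

// Stand-ins for the real process_execution types, for illustration only.
struct Context;
struct MultiPlatformProcess;
struct FallibleProcessResultWithPlatform;

#[async_trait::async_trait]
trait CommandRunner {
    // New parameter order per the hunks below: the context, then the caller's
    // mutable workunit handle, then the request to execute.
    async fn run(
        &self,
        context: Context,
        workunit: &mut RunningWorkunit,
        req: MultiPlatformProcess,
    ) -> Result<FallibleProcessResultWithPlatform, String>;
}
```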
21 changes: 9 additions & 12 deletions src/python/pants/engine/internals/engine_test.py
@@ -31,7 +31,7 @@
from pants.engine.internals.scheduler import ExecutionError, SchedulerSession
from pants.engine.internals.scheduler_test_base import SchedulerTestBase
from pants.engine.platform import rules as platform_rules
from pants.engine.process import MultiPlatformProcess, Process, ProcessResult
from pants.engine.process import MultiPlatformProcess, Process, ProcessCacheScope, ProcessResult
from pants.engine.process import rules as process_rules
from pants.engine.rules import Get, MultiGet, rule
from pants.engine.streaming_workunit_handler import (
@@ -609,6 +609,7 @@ async def a_rule() -> TrueResult:
proc = Process(
["/bin/sh", "-c", "true"],
description="always true",
cache_scope=ProcessCacheScope.PER_SESSION,
)
_ = await Get(ProcessResult, MultiPlatformProcess({None: proc}))
return TrueResult()
@@ -624,9 +625,9 @@

finished = list(itertools.chain.from_iterable(tracker.finished_workunit_chunks))
workunits_with_counters = [item for item in finished if "counters" in item]
assert workunits_with_counters[0]["counters"]["local_execution_requests"] == 1
assert workunits_with_counters[1]["counters"]["local_cache_requests"] == 1
assert workunits_with_counters[1]["counters"]["local_cache_requests_uncached"] == 1
assert workunits_with_counters[0]["counters"]["local_cache_requests"] == 1
assert workunits_with_counters[0]["counters"]["local_cache_requests_uncached"] == 1
assert workunits_with_counters[1]["counters"]["local_execution_requests"] == 1

assert histograms_info["version"] == 0
assert "histograms" in histograms_info
@@ -736,7 +737,7 @@ def test_process_digests_on_streaming_workunits(
run_tracker=run_tracker,
callbacks=[tracker],
report_interval_seconds=0.01,
max_workunit_verbosity=LogLevel.INFO,
max_workunit_verbosity=LogLevel.DEBUG,
specs=Specs.empty(),
options_bootstrapper=create_options_bootstrapper([]),
pantsd=False,
@@ -752,9 +753,7 @@ def test_process_digests_on_streaming_workunits(
assert tracker.finished
finished = list(itertools.chain.from_iterable(tracker.finished_workunit_chunks))

process_workunit = next(
item for item in finished if item["name"] == "multi_platform_process-running"
)
process_workunit = next(item for item in finished if item["name"] == "multi_platform_process")
assert process_workunit is not None
stdout_digest = process_workunit["artifacts"]["stdout_digest"]
stderr_digest = process_workunit["artifacts"]["stderr_digest"]
@@ -769,7 +768,7 @@ def test_process_digests_on_streaming_workunits(
run_tracker=run_tracker,
callbacks=[tracker],
report_interval_seconds=0.01,
max_workunit_verbosity=LogLevel.INFO,
max_workunit_verbosity=LogLevel.DEBUG,
specs=Specs.empty(),
options_bootstrapper=create_options_bootstrapper([]),
pantsd=False,
@@ -782,9 +781,7 @@

assert tracker.finished
finished = list(itertools.chain.from_iterable(tracker.finished_workunit_chunks))
process_workunit = next(
item for item in finished if item["name"] == "multi_platform_process-running"
)
process_workunit = next(item for item in finished if item["name"] == "multi_platform_process")

assert process_workunit is not None
stdout_digest = process_workunit["artifacts"]["stdout_digest"]
4 changes: 2 additions & 2 deletions src/rust/engine/async_semaphore/src/lib.rs
@@ -68,8 +68,8 @@ impl AsyncSemaphore {
///
pub async fn with_acquired<F, B, O>(self, f: F) -> O
where
F: FnOnce(usize) -> B + Send + 'static,
B: Future<Output = O> + Send + 'static,
F: FnOnce(usize) -> B,
B: Future<Output = O>,
{
let permit = self.acquire().await;
let res = f(permit.id).await;
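The relaxed bounds above drop `Send + 'static` from the closure and its future, which lets callers borrow local state (such as a `&mut RunningWorkunit`) across the acquired section. A hedged usage sketch — assuming the crate is importable as `async_semaphore`, exposes a constructor like `AsyncSemaphore::new(permits)`, and that tokio is available for the runtime:

```rust
use async_semaphore::AsyncSemaphore;

#[tokio::main]
async fn main() {
    let sema = AsyncSemaphore::new(2);
    let name = String::from("local state");
    // With the old `Send + 'static` bounds this borrow of `name` would not
    // compile; after this change the closure's future may hold non-'static
    // references, because it is awaited before `name` goes out of scope.
    let msg = sema
        .with_acquired(|permit_id| async {
            format!("{} ran with permit {}", &name, permit_id)
        })
        .await;
    println!("{}", msg);
}
```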
82 changes: 58 additions & 24 deletions src/rust/engine/process_execution/src/cache.rs
@@ -11,7 +11,9 @@ use prost::Message;
use serde::{Deserialize, Serialize};
use sharded_lmdb::ShardedLmdb;
use store::Store;
use workunit_store::{in_workunit, Level, Metric, ObservationMetric, WorkunitMetadata};
use workunit_store::{
in_workunit, Level, Metric, ObservationMetric, RunningWorkunit, WorkunitMetadata,
};

use crate::{
Context, FallibleProcessResultWithPlatform, MultiPlatformProcess, Platform, Process,
@@ -57,42 +59,49 @@ impl crate::CommandRunner for CommandRunner {

async fn run(
&self,
req: MultiPlatformProcess,
context: Context,
workunit: &mut RunningWorkunit,
req: MultiPlatformProcess,
) -> Result<FallibleProcessResultWithPlatform, String> {
let cache_lookup_start = Instant::now();
let write_failures_to_cache = req
.0
.values()
.any(|process| process.cache_scope == ProcessCacheScope::Always);
let digest = crate::digest(req.clone(), &self.metadata);
let key = digest.hash;

let context2 = context.clone();
in_workunit!(
context2.workunit_store,
let cache_read_result = in_workunit!(
context.workunit_store.clone(),
"local_cache_read".to_owned(),
WorkunitMetadata {
level: Level::Trace,
desc: Some(format!("Local cache lookup: {}", req.user_facing_name())),
..WorkunitMetadata::default()
},
|workunit| async move {
workunit.increment_counter(Metric::LocalCacheRequests, 1);

let digest = crate::digest(req.clone(), &self.metadata);
let key = digest.hash;

let cache_failures = req
.0
.values()
.any(|process| process.cache_scope == ProcessCacheScope::Always);

let command_runner = self.clone();
match self.lookup(key).await {
Ok(Some(result)) if result.exit_code == 0 || cache_failures => {
Ok(Some(result)) if result.exit_code == 0 || write_failures_to_cache => {
let lookup_elapsed = cache_lookup_start.elapsed();
workunit.increment_counter(Metric::LocalCacheRequestsCached, 1);
if let Some(time_saved) = result.metadata.time_saved_from_cache(lookup_elapsed) {
let time_saved = time_saved.as_millis() as u64;
workunit.increment_counter(Metric::LocalCacheTotalTimeSavedMs, time_saved);
context
context2
.workunit_store
.record_observation(ObservationMetric::LocalCacheTimeSavedMs, time_saved);
}
return Ok(result);
// When we successfully use the cache, we change the description and increase the level
// (but not so much that it will be logged by default).
workunit.update_metadata(|initial| WorkunitMetadata {
desc: initial.desc.as_ref().map(|desc| format!("Hit: {}", desc)),
level: Level::Debug,
..initial
});
Ok(result)
}
Err(err) => {
debug!(
@@ -101,29 +110,54 @@
);
workunit.increment_counter(Metric::LocalCacheReadErrors, 1);
// Falling through to re-execute.
Err(())
}
Ok(_) => {
// Either we missed, or we hit for a failing result.
workunit.increment_counter(Metric::LocalCacheRequestsUncached, 1);
// Falling through to execute.
Err(())
}
}
}
.boxed()
)
.await;

if let Ok(result) = cache_read_result {
workunit.update_metadata(|initial| WorkunitMetadata {
desc: initial
.desc
.as_ref()
.map(|desc| format!("Hit local cache: {}", desc)),
..initial
});
return Ok(result);
}

let result = command_runner.underlying.run(req, context.clone()).await?;
if result.exit_code == 0 || cache_failures {
if let Err(err) = command_runner.store(key, &result).await {
let result = self.underlying.run(context.clone(), workunit, req).await?;
if result.exit_code == 0 || write_failures_to_cache {
let result = result.clone();
in_workunit!(
context.workunit_store.clone(),
"local_cache_write".to_owned(),
WorkunitMetadata {
level: Level::Trace,
..WorkunitMetadata::default()
},
|workunit| async move {
if let Err(err) = self.store(key, &result).await {
warn!(
"Error storing process execution result to local cache: {} - ignoring and continuing",
err
);
workunit.increment_counter(Metric::LocalCacheWriteErrors, 1);
}
}
Ok(result)
}
.boxed()
)
.await
)
.await;
}
Ok(result)
}
}

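The cache.rs hunks above split the runner into a "local_cache_read" workunit around the lookup, a relabel of the caller's workunit on a usable hit, and a separate "local_cache_write" workunit for the write-back. A hedged, self-contained sketch of the `in_workunit!` pattern they rely on — assuming the macro's argument order shown in the hunks (store, name, metadata, closure returning a boxed future) and that the first element returned by `WorkunitStore::setup_for_tests()` is the store itself:

```rust
use futures::FutureExt;
use workunit_store::{in_workunit, Level, WorkunitMetadata, WorkunitStore};

#[tokio::main]
async fn main() {
    // setup_for_tests() also yields a RunningWorkunit, per the cache_tests.rs hunks below.
    let (store, _workunit) = WorkunitStore::setup_for_tests();
    let value = in_workunit!(
        store.clone(),
        "example".to_owned(),
        WorkunitMetadata {
            level: Level::Trace,
            ..WorkunitMetadata::default()
        },
        |workunit| async move {
            // Relabel the workunit, mirroring the cache-hit path above.
            workunit.update_metadata(|initial| WorkunitMetadata {
                desc: Some("relabeled".to_owned()),
                ..initial
            });
            42
        }
        .boxed()
    )
    .await;
    assert_eq!(value, 42);
}
```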
28 changes: 16 additions & 12 deletions src/rust/engine/process_execution/src/cache_tests.rs
@@ -7,7 +7,7 @@ use store::Store;
use tempfile::TempDir;
use testutil::data::TestData;
use testutil::relative_paths;
use workunit_store::WorkunitStore;
use workunit_store::{RunningWorkunit, WorkunitStore};

use crate::{
CommandRunner as CommandRunnerTrait, Context, FallibleProcessResultWithPlatform, NamedCaches,
@@ -85,16 +85,18 @@ fn create_script(script_exit_code: i8) -> (Process, PathBuf, TempDir) {
(process, script_path, script_dir)
}

async fn run_roundtrip(script_exit_code: i8) -> RoundtripResults {
async fn run_roundtrip(script_exit_code: i8, workunit: &mut RunningWorkunit) -> RoundtripResults {
let (local, store, _local_runner_dir) = create_local_runner();
let (process, script_path, _script_dir) = create_script(script_exit_code);

let local_result = local.run(process.clone().into(), Context::default()).await;
let local_result = local
.run(Context::default(), workunit, process.clone().into())
.await;

let (caching, _cache_dir) = create_cached_runner(local, store.clone());

let uncached_result = caching
.run(process.clone().into(), Context::default())
.run(Context::default(), workunit, process.clone().into())
.await;

assert_eq!(local_result, uncached_result);
@@ -103,7 +105,9 @@ async fn run_roundtrip(script_exit_code: i8) -> RoundtripResults {
// fail due to a FileNotFound error. So, If the second run succeeds, that implies that the
// cache was successfully used.
std::fs::remove_file(&script_path).unwrap();
let maybe_cached_result = caching.run(process.into(), Context::default()).await;
let maybe_cached_result = caching
.run(Context::default(), workunit, process.into())
.await;

RoundtripResults {
uncached: uncached_result,
@@ -113,31 +117,31 @@ async fn run_roundtrip(script_exit_code: i8) -> RoundtripResults {

#[tokio::test]
async fn cache_success() {
WorkunitStore::setup_for_tests();
let results = run_roundtrip(0).await;
let (_, mut workunit) = WorkunitStore::setup_for_tests();
let results = run_roundtrip(0, &mut workunit).await;
assert_eq!(results.uncached, results.maybe_cached);
}

#[tokio::test]
async fn failures_not_cached() {
WorkunitStore::setup_for_tests();
let results = run_roundtrip(1).await;
let (_, mut workunit) = WorkunitStore::setup_for_tests();
let results = run_roundtrip(1, &mut workunit).await;
assert_ne!(results.uncached, results.maybe_cached);
assert_eq!(results.uncached.unwrap().exit_code, 1);
assert_eq!(results.maybe_cached.unwrap().exit_code, 127); // aka the return code for file not found
}

#[tokio::test]
async fn recover_from_missing_store_contents() {
WorkunitStore::setup_for_tests();
let (_, mut workunit) = WorkunitStore::setup_for_tests();

let (local, store, _local_runner_dir) = create_local_runner();
let (caching, _cache_dir) = create_cached_runner(local, store.clone());
let (process, _script_path, _script_dir) = create_script(0);

// Run once to cache the process.
let first_result = caching
.run(process.clone().into(), Context::default())
.run(Context::default(), &mut workunit, process.clone().into())
.await
.unwrap();

@@ -170,7 +174,7 @@

// Ensure that we don't fail if we re-run.
let second_result = caching
.run(process.clone().into(), Context::default())
.run(Context::default(), &mut workunit, process.clone().into())
.await
.unwrap();
