Skip to content

Commit

Permalink
PVF: re-preparing artifact on failed runtime construction (#3187)
Browse files Browse the repository at this point in the history
resolve #3139

- [x] use a distinguishable error for `execute_artifact`
- [x] remove artifact in case of a `RuntimeConstruction` error during
the execution
- [x] augment the `validate_candidate_with_retry` of `ValidationBackend`
with the case of retriable `RuntimeConstruction` error during the
execution
- [x] update the book
(https://paritytech.github.io/polkadot-sdk/book/node/utility/pvf-host-and-workers.html#retrying-execution-requests)
- [x] add a test
- [x] run zombienet tests

---------

Co-authored-by: s0me0ne-unkn0wn <[email protected]>
  • Loading branch information
maksimryndin and s0me0ne-unkn0wn authored Feb 28, 2024
1 parent 1453026 commit 4261366
Show file tree
Hide file tree
Showing 15 changed files with 294 additions and 49 deletions.
53 changes: 34 additions & 19 deletions polkadot/node/core/candidate-validation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,8 @@ async fn validate_candidate_exhaustive(
))),
Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::JobError(err))) =>
Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(err))),
Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::RuntimeConstruction(err))) =>
Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(err))),

Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousJobDeath(err))) =>
Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(format!(
Expand Down Expand Up @@ -780,49 +782,62 @@ trait ValidationBackend {
return validation_result
}

macro_rules! break_if_no_retries_left {
($counter:ident) => {
if $counter > 0 {
$counter -= 1;
} else {
break
}
};
}

// Allow limited retries for each kind of error.
let mut num_death_retries_left = 1;
let mut num_job_error_retries_left = 1;
let mut num_internal_retries_left = 1;
let mut num_runtime_construction_retries_left = 1;
loop {
// Stop retrying if we exceeded the timeout.
if total_time_start.elapsed() + retry_delay > exec_timeout {
break
}

let mut retry_immediately = false;
match validation_result {
Err(ValidationError::PossiblyInvalid(
PossiblyInvalidError::AmbiguousWorkerDeath |
PossiblyInvalidError::AmbiguousJobDeath(_),
)) =>
if num_death_retries_left > 0 {
num_death_retries_left -= 1;
} else {
break
},
)) => break_if_no_retries_left!(num_death_retries_left),

Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::JobError(_))) =>
if num_job_error_retries_left > 0 {
num_job_error_retries_left -= 1;
} else {
break
},
break_if_no_retries_left!(num_job_error_retries_left),

Err(ValidationError::Internal(_)) =>
if num_internal_retries_left > 0 {
num_internal_retries_left -= 1;
} else {
break
},
break_if_no_retries_left!(num_internal_retries_left),

Err(ValidationError::PossiblyInvalid(
PossiblyInvalidError::RuntimeConstruction(_),
)) => {
break_if_no_retries_left!(num_runtime_construction_retries_left);
self.precheck_pvf(pvf.clone()).await?;
// In this case the error is deterministic
// And a retry forces the ValidationBackend
// to re-prepare the artifact so
// there is no need to wait before the retry
retry_immediately = true;
},

Ok(_) | Err(ValidationError::Invalid(_) | ValidationError::Preparation(_)) => break,
}

// If we got a possibly transient error, retry once after a brief delay, on the
// assumption that the conditions that caused this error may have resolved on their own.
{
// Wait a brief delay before retrying.
futures_timer::Delay::new(retry_delay).await;
// In case of many transient errors it is necessary to wait a little bit
// for the error to be probably resolved
if !retry_immediately {
futures_timer::Delay::new(retry_delay).await;
}

let new_timeout = exec_timeout.saturating_sub(total_time_start.elapsed());

Expand Down
1 change: 1 addition & 0 deletions polkadot/node/core/pvf/common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

use crate::prepare::{PrepareSuccess, PrepareWorkerSuccess};
use parity_scale_codec::{Decode, Encode};
pub use sc_executor_common::error::Error as ExecuteError;

/// Result of PVF preparation from a worker, with checksum of the compiled PVF and stats of the
/// preparation if successful.
Expand Down
15 changes: 15 additions & 0 deletions polkadot/node/core/pvf/common/src/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ pub enum WorkerResponse {
},
/// The candidate is invalid.
InvalidCandidate(String),
/// Instantiation of the WASM module instance failed during an execution.
/// Possibly related to local issues or dirty node update. May be retried with re-preparation.
RuntimeConstruction(String),
/// The job timed out.
JobTimedOut,
/// The job process has died. We must kill the worker just in case.
Expand Down Expand Up @@ -68,6 +71,9 @@ pub enum JobResponse {
/// The result of parachain validation.
result_descriptor: ValidationResult,
},
/// A possibly transient runtime instantiation error happened during the execution; may be
/// retried with re-preparation
RuntimeConstruction(String),
/// The candidate is invalid.
InvalidCandidate(String),
}
Expand All @@ -81,6 +87,15 @@ impl JobResponse {
Self::InvalidCandidate(format!("{}: {}", ctx, msg))
}
}

/// Creates a may retry response from a context `ctx` and a message `msg` (which can be empty).
pub fn runtime_construction(ctx: &'static str, msg: &str) -> Self {
if msg.is_empty() {
Self::RuntimeConstruction(ctx.to_string())
} else {
Self::RuntimeConstruction(format!("{}: {}", ctx, msg))
}
}
}

/// An unexpected error occurred in the execution job process. Because this comes from the job,
Expand Down
4 changes: 2 additions & 2 deletions polkadot/node/core/pvf/common/src/executor_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

//! Interface to the Substrate Executor
use crate::error::ExecuteError;
use polkadot_primitives::{
executor_params::{DEFAULT_LOGICAL_STACK_MAX, DEFAULT_NATIVE_STACK_MAX},
ExecutorParam, ExecutorParams,
Expand Down Expand Up @@ -109,7 +110,7 @@ pub unsafe fn execute_artifact(
compiled_artifact_blob: &[u8],
executor_params: &ExecutorParams,
params: &[u8],
) -> Result<Vec<u8>, String> {
) -> Result<Vec<u8>, ExecuteError> {
let mut extensions = sp_externalities::Extensions::new();

extensions.register(sp_core::traits::ReadRuntimeVersionExt::new(ReadRuntimeVersion));
Expand All @@ -123,7 +124,6 @@ pub unsafe fn execute_artifact(
Ok(Ok(ok)) => Ok(ok),
Ok(Err(err)) | Err(err) => Err(err),
}
.map_err(|err| format!("execute error: {:?}", err))
}

/// Constructs the runtime for the given PVF, given the artifact bytes.
Expand Down
10 changes: 8 additions & 2 deletions polkadot/node/core/pvf/execute-worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

//! Contains the logic for executing PVFs. Used by the polkadot-execute-worker binary.
pub use polkadot_node_core_pvf_common::executor_interface::execute_artifact;
pub use polkadot_node_core_pvf_common::{
error::ExecuteError, executor_interface::execute_artifact,
};

// NOTE: Initializing logging in e.g. tests will not have an effect in the workers, as they are
// separate spawned processes. Run with e.g. `RUST_LOG=parachain::pvf-execute-worker=trace`.
Expand Down Expand Up @@ -237,7 +239,9 @@ fn validate_using_artifact(
// [`executor_interface::prepare`].
execute_artifact(compiled_artifact_blob, executor_params, params)
} {
Err(err) => return JobResponse::format_invalid("execute", &err),
Err(ExecuteError::RuntimeConstruction(wasmerr)) =>
return JobResponse::runtime_construction("execute", &wasmerr.to_string()),
Err(err) => return JobResponse::format_invalid("execute", &err.to_string()),
Ok(d) => d,
};

Expand Down Expand Up @@ -550,6 +554,8 @@ fn handle_parent_process(
Ok(WorkerResponse::Ok { result_descriptor, duration: cpu_tv })
},
Ok(JobResponse::InvalidCandidate(err)) => Ok(WorkerResponse::InvalidCandidate(err)),
Ok(JobResponse::RuntimeConstruction(err)) =>
Ok(WorkerResponse::RuntimeConstruction(err)),
Err(job_error) => {
gum::warn!(
target: LOG_TARGET,
Expand Down
8 changes: 8 additions & 0 deletions polkadot/node/core/pvf/src/artifacts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,14 @@ impl Artifacts {
.is_none());
}

/// Remove artifact by its id.
pub fn remove(&mut self, artifact_id: ArtifactId) -> Option<(ArtifactId, PathBuf)> {
self.inner.remove(&artifact_id).and_then(|state| match state {
ArtifactState::Prepared { path, .. } => Some((artifact_id, path)),
_ => None,
})
}

/// Remove artifacts older than the given TTL and return id and path of the removed ones.
pub fn prune(&mut self, artifact_ttl: Duration) -> Vec<(ArtifactId, PathBuf)> {
let now = SystemTime::now();
Expand Down
4 changes: 4 additions & 0 deletions polkadot/node/core/pvf/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ pub enum PossiblyInvalidError {
/// vote invalid.
#[error("possibly invalid: job error: {0}")]
JobError(String),
/// Instantiation of the WASM module instance failed during an execution.
/// Possibly related to local issues or dirty node update. May be retried with re-preparation.
#[error("possibly invalid: runtime construction: {0}")]
RuntimeConstruction(String),
}

impl From<PrepareError> for ValidationError {
Expand Down
2 changes: 1 addition & 1 deletion polkadot/node/core/pvf/src/execute/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@
mod queue;
mod worker_interface;

pub use queue::{start, PendingExecutionRequest, ToQueue};
pub use queue::{start, FromQueue, PendingExecutionRequest, ToQueue};
68 changes: 57 additions & 11 deletions polkadot/node/core/pvf/src/execute/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::{
InvalidCandidate, PossiblyInvalidError, ValidationError, LOG_TARGET,
};
use futures::{
channel::mpsc,
channel::{mpsc, oneshot},
future::BoxFuture,
stream::{FuturesUnordered, StreamExt as _},
Future, FutureExt,
Expand Down Expand Up @@ -54,6 +54,12 @@ pub enum ToQueue {
Enqueue { artifact: ArtifactPathId, pending_execution_request: PendingExecutionRequest },
}

/// A response from queue.
#[derive(Debug)]
pub enum FromQueue {
RemoveArtifact { artifact: ArtifactId, reply_to: oneshot::Sender<()> },
}

/// An execution request that should execute the PVF (known in the context) and send the results
/// to the given result sender.
#[derive(Debug)]
Expand Down Expand Up @@ -137,6 +143,8 @@ struct Queue {

/// The receiver that receives messages to the pool.
to_queue_rx: mpsc::Receiver<ToQueue>,
/// The sender to send messages back to validation host.
from_queue_tx: mpsc::UnboundedSender<FromQueue>,

// Some variables related to the current session.
program_path: PathBuf,
Expand All @@ -161,6 +169,7 @@ impl Queue {
node_version: Option<String>,
security_status: SecurityStatus,
to_queue_rx: mpsc::Receiver<ToQueue>,
from_queue_tx: mpsc::UnboundedSender<FromQueue>,
) -> Self {
Self {
metrics,
Expand All @@ -170,6 +179,7 @@ impl Queue {
node_version,
security_status,
to_queue_rx,
from_queue_tx,
queue: VecDeque::new(),
mux: Mux::new(),
workers: Workers {
Expand Down Expand Up @@ -301,7 +311,7 @@ async fn handle_mux(queue: &mut Queue, event: QueueEvent) {
handle_worker_spawned(queue, idle, handle, job);
},
QueueEvent::StartWork(worker, outcome, artifact_id, result_tx) => {
handle_job_finish(queue, worker, outcome, artifact_id, result_tx);
handle_job_finish(queue, worker, outcome, artifact_id, result_tx).await;
},
}
}
Expand All @@ -327,42 +337,69 @@ fn handle_worker_spawned(

/// If there are pending jobs in the queue, schedules the next of them onto the just freed up
/// worker. Otherwise, puts back into the available workers list.
fn handle_job_finish(
async fn handle_job_finish(
queue: &mut Queue,
worker: Worker,
outcome: Outcome,
artifact_id: ArtifactId,
result_tx: ResultSender,
) {
let (idle_worker, result, duration) = match outcome {
let (idle_worker, result, duration, sync_channel) = match outcome {
Outcome::Ok { result_descriptor, duration, idle_worker } => {
// TODO: propagate the soft timeout

(Some(idle_worker), Ok(result_descriptor), Some(duration))
(Some(idle_worker), Ok(result_descriptor), Some(duration), None)
},
Outcome::InvalidCandidate { err, idle_worker } => (
Some(idle_worker),
Err(ValidationError::Invalid(InvalidCandidate::WorkerReportedInvalid(err))),
None,
None,
),
Outcome::InternalError { err } => (None, Err(ValidationError::Internal(err)), None),
Outcome::RuntimeConstruction { err, idle_worker } => {
// The task for artifact removal is executed concurrently with
// the message to the host on the execution result.
let (result_tx, result_rx) = oneshot::channel();
queue
.from_queue_tx
.unbounded_send(FromQueue::RemoveArtifact {
artifact: artifact_id.clone(),
reply_to: result_tx,
})
.expect("from execute queue receiver is listened by the host; qed");
(
Some(idle_worker),
Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::RuntimeConstruction(
err,
))),
None,
Some(result_rx),
)
},
Outcome::InternalError { err } => (None, Err(ValidationError::Internal(err)), None, None),
// Either the worker or the job timed out. Kill the worker in either case. Treated as
// definitely-invalid, because if we timed out, there's no time left for a retry.
Outcome::HardTimeout =>
(None, Err(ValidationError::Invalid(InvalidCandidate::HardTimeout)), None),
(None, Err(ValidationError::Invalid(InvalidCandidate::HardTimeout)), None, None),
// "Maybe invalid" errors (will retry).
Outcome::WorkerIntfErr => (
None,
Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)),
None,
None,
),
Outcome::JobDied { err } => (
None,
Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousJobDeath(err))),
None,
None,
),
Outcome::JobError { err } => (
None,
Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::JobError(err))),
None,
None,
),
Outcome::JobError { err } =>
(None, Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::JobError(err))), None),
};

queue.metrics.execute_finished();
Expand All @@ -386,6 +423,12 @@ fn handle_job_finish(
);
}

if let Some(sync_channel) = sync_channel {
// err means the sender is dropped (the artifact is already removed from the cache)
// so that's legitimate to ignore the result
let _ = sync_channel.await;
}

// First we send the result. It may fail due to the other end of the channel being dropped,
// that's legitimate and we don't treat that as an error.
let _ = result_tx.send(result);
Expand Down Expand Up @@ -521,8 +564,10 @@ pub fn start(
spawn_timeout: Duration,
node_version: Option<String>,
security_status: SecurityStatus,
) -> (mpsc::Sender<ToQueue>, impl Future<Output = ()>) {
) -> (mpsc::Sender<ToQueue>, mpsc::UnboundedReceiver<FromQueue>, impl Future<Output = ()>) {
let (to_queue_tx, to_queue_rx) = mpsc::channel(20);
let (from_queue_tx, from_queue_rx) = mpsc::unbounded();

let run = Queue::new(
metrics,
program_path,
Expand All @@ -532,7 +577,8 @@ pub fn start(
node_version,
security_status,
to_queue_rx,
from_queue_tx,
)
.run();
(to_queue_tx, run)
(to_queue_tx, from_queue_rx, run)
}
Loading

0 comments on commit 4261366

Please sign in to comment.