Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Retry failed PVF prepare jobs #6213

Merged
merged 5 commits into from
Nov 8, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions node/core/pvf/src/artifacts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,22 @@ pub enum ArtifactState {
last_time_needed: SystemTime,
},
/// A task to prepare this artifact is scheduled.
Preparing { waiting_for_response: Vec<PrepareResultSender> },
Preparing {
/// List of result senders that are waiting for a response.
waiting_for_response: Vec<PrepareResultSender>,
/// The number of times this artifact has failed to prepare.
num_failures: u32,
},
/// The code couldn't be compiled due to an error. Such artifacts
/// never reach the executor and stay in the host's memory.
FailedToProcess(PrepareError),
FailedToProcess {
/// Keep track of the last time that processing this artifact failed.
last_time_failed: SystemTime,
/// The number of times this artifact has failed to prepare.
num_failures: u32,
/// The prepare error.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: tautology. What is this error exactly? The last one I presume?

error: PrepareError,
},
}

/// A container of all known artifact ids and their states.
Expand Down Expand Up @@ -150,7 +162,7 @@ impl Artifacts {
// See the precondition.
always!(self
.artifacts
.insert(artifact_id, ArtifactState::Preparing { waiting_for_response })
.insert(artifact_id, ArtifactState::Preparing { waiting_for_response, num_failures: 0 })
.is_none());
}

Expand Down
127 changes: 107 additions & 20 deletions node/core/pvf/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ pub const PRECHECK_COMPILATION_TIMEOUT: Duration = Duration::from_secs(60);
// NOTE: If you change this make sure to fix the buckets of `pvf_preparation_time` metric.
pub const EXECUTE_COMPILATION_TIMEOUT: Duration = Duration::from_secs(180);

/// The time period after which a failed preparation artifact is considered ready to be retried. Note that we will only
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: reaches soft wrap limit and no good reason to not wrap the line.

/// retry if another request comes in after this cooldown has passed.
pub const PREPARE_FAILURE_COOLDOWN: Duration = Duration::from_secs(15 * 60);

/// The amount of times we will retry failed prepare jobs.
pub const NUM_PREPARE_RETRIES: u32 = 5;

/// An alias to not spell the type for the oneshot sender for the PVF execution result.
pub(crate) type ResultSender = oneshot::Sender<Result<ValidationResult, ValidationError>>;

Expand Down Expand Up @@ -360,6 +367,8 @@ async fn run(
Some(to_host) => to_host,
};

// If the artifact failed before, it could be re-scheduled for preparation here if
// the preparation failure cooldown has elapsed.
break_if_fatal!(handle_to_host(
&cache_path,
&mut artifacts,
Expand All @@ -376,9 +385,9 @@ async fn run(
// Note that preparation always succeeds.
//
// That's because the error conditions are written into the artifact and will be
// reported at the time of the execution. It potentially, but not necessarily,
// can be scheduled as a result of this function call, in case there are pending
// executions.
// reported at the time of the execution. It potentially, but not necessarily, can
// be scheduled for execution as a result of this function call, in case there are
// pending executions.
//
// We could be eager in terms of reporting and plumb the result from the preparation
// worker but we don't for the sake of simplicity.
Expand Down Expand Up @@ -432,6 +441,8 @@ async fn handle_to_host(
/// Handles PVF prechecking.
///
/// This tries to prepare the PVF by compiling the WASM blob within a given timeout ([`PRECHECK_COMPILATION_TIMEOUT`]).
///
/// If the prepare job failed previously, we may retry it under certain conditions.
async fn handle_precheck_pvf(
artifacts: &mut Artifacts,
prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
Expand All @@ -446,11 +457,28 @@ async fn handle_precheck_pvf(
*last_time_needed = SystemTime::now();
let _ = result_sender.send(Ok(()));
},
ArtifactState::Preparing { waiting_for_response } =>
ArtifactState::Preparing { waiting_for_response, num_failures: _ } =>
waiting_for_response.push(result_sender),
ArtifactState::FailedToProcess(result) => {
let _ = result_sender.send(PrepareResult::Err(result.clone()));
},
ArtifactState::FailedToProcess { last_time_failed, num_failures, error } =>
if can_retry_prepare_after_failure(*last_time_failed, *num_failures) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: For preparation we actually agreed that retrying is not required/desired here. The pre-checking logic will not actually retry anyway, so this code will never get executed - so it is fine, but given that not doing it actually simplifies things, instead of complicating things (due to this complete separate implementation) - I would suggest to just not do it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would also remove some code duplication.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, removed.

// If we are allowed to retry the failed prepare job, change the state to
// Preparing and re-queue this job.
*state = ArtifactState::Preparing {
waiting_for_response: vec![result_sender],
num_failures: *num_failures,
};
send_prepare(
prepare_queue,
prepare::ToQueue::Enqueue {
priority: Priority::Normal,
pvf,
compilation_timeout: PRECHECK_COMPILATION_TIMEOUT,
},
)
.await?;
} else {
let _ = result_sender.send(PrepareResult::Err(error.clone()));
},
}
} else {
artifacts.insert_preparing(artifact_id, vec![result_sender]);
Expand All @@ -469,9 +497,13 @@ async fn handle_precheck_pvf(

/// Handles PVF execution.
///
/// This will first try to prepare the PVF, if a prepared artifact does not already exist. If there is already a
/// preparation job, we coalesce the two preparation jobs. When preparing for execution, we use a more lenient timeout
/// ([`EXECUTE_COMPILATION_TIMEOUT`]) than when prechecking.
/// This will try to prepare the PVF, if a prepared artifact does not already exist. If there is already a
/// preparation job, we coalesce the two preparation jobs.
///
/// If the prepare job failed previously, we may retry it under certain conditions.
///
/// When preparing for execution, we use a more lenient timeout ([`EXECUTE_COMPILATION_TIMEOUT`])
/// than when prechecking.
async fn handle_execute_pvf(
cache_path: &Path,
artifacts: &mut Artifacts,
Expand All @@ -491,6 +523,7 @@ async fn handle_execute_pvf(
ArtifactState::Prepared { last_time_needed } => {
*last_time_needed = SystemTime::now();

// This artifact has already been prepared, send it to the execute queue.
send_execute(
execute_queue,
execute::ToQueue::Enqueue {
Expand All @@ -502,11 +535,29 @@ async fn handle_execute_pvf(
)
.await?;
},
ArtifactState::Preparing { waiting_for_response: _ } => {
ArtifactState::Preparing { .. } => {
awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx);
},
ArtifactState::FailedToProcess(error) => {
let _ = result_tx.send(Err(ValidationError::from(error.clone())));
ArtifactState::FailedToProcess { last_time_failed, num_failures, error } => {
if can_retry_prepare_after_failure(*last_time_failed, *num_failures) {
// If we are allowed to retry the failed prepare job, change the state to
// Preparing and re-queue this job.
*state = ArtifactState::Preparing {
waiting_for_response: Vec::new(),
num_failures: *num_failures,
};
send_prepare(
prepare_queue,
prepare::ToQueue::Enqueue {
priority,
pvf,
compilation_timeout: EXECUTE_COMPILATION_TIMEOUT,
},
)
.await?;
} else {
let _ = result_tx.send(Err(ValidationError::from(error.clone())));
}
},
}
} else {
Expand All @@ -523,6 +574,7 @@ async fn handle_execute_pvf(
)
.await?;

// Add an execution request that will wait to run after this prepare job has finished.
awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx);
}

Expand All @@ -543,10 +595,29 @@ async fn handle_heads_up(
ArtifactState::Prepared { last_time_needed, .. } => {
*last_time_needed = now;
},
ArtifactState::Preparing { waiting_for_response: _ } => {
ArtifactState::Preparing { .. } => {
// The artifact is already being prepared, so we don't need to do anything.
},
ArtifactState::FailedToProcess(_) => {},
ArtifactState::FailedToProcess { last_time_failed, num_failures, error: _ } => {
// TODO: Do we want to retry for heads-up requests?
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Figured we wanted the same logic for heads-up requests, but wanted to double-check.

if can_retry_prepare_after_failure(*last_time_failed, *num_failures) {
// If we are allowed to retry the failed prepare job, change the state to
// Preparing and re-queue this job.
*state = ArtifactState::Preparing {
waiting_for_response: vec![],
num_failures: *num_failures,
};
send_prepare(
prepare_queue,
prepare::ToQueue::Enqueue {
priority: Priority::Normal,
pvf: active_pvf,
compilation_timeout: EXECUTE_COMPILATION_TIMEOUT,
},
)
.await?;
}
},
}
} else {
// It's not in the artifacts, so we need to enqueue a job to prepare it.
Expand Down Expand Up @@ -596,20 +667,26 @@ async fn handle_prepare_done(
never!("the artifact is already prepared: {:?}", artifact_id);
return Ok(())
},
Some(ArtifactState::FailedToProcess(_)) => {
Some(ArtifactState::FailedToProcess { .. }) => {
// The reasoning is similar to the above, the artifact cannot be
// processed at this point.
never!("the artifact is already processed unsuccessfully: {:?}", artifact_id);
return Ok(())
},
Some(state @ ArtifactState::Preparing { waiting_for_response: _ }) => state,
Some(state @ ArtifactState::Preparing { .. }) => state,
};

if let ArtifactState::Preparing { waiting_for_response } = state {
let num_failures = if let ArtifactState::Preparing { waiting_for_response, num_failures } =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I wonder if it's time to move it under the match above? Then, let state would become let (state, num_failures) and/or waiting_for_response.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I gave it a try, but I wasn't able to work around the compiler errors when I do that.

state
{
for result_sender in waiting_for_response.drain(..) {
let _ = result_sender.send(result.clone());
}
}
num_failures
} else {
never!("The reasoning is similar to the above, the artifact can only be preparing at this point; qed");
return Ok(())
};

// It's finally time to dispatch all the execution requests that were waiting for this artifact
// to be prepared.
Expand Down Expand Up @@ -641,7 +718,11 @@ async fn handle_prepare_done(

*state = match result {
Ok(()) => ArtifactState::Prepared { last_time_needed: SystemTime::now() },
Err(error) => ArtifactState::FailedToProcess(error.clone()),
Err(error) => ArtifactState::FailedToProcess {
last_time_failed: SystemTime::now(),
num_failures: *num_failures + 1,
error: error.clone(),
},
};

Ok(())
Expand Down Expand Up @@ -704,6 +785,12 @@ async fn sweeper_task(mut sweeper_rx: mpsc::Receiver<PathBuf>) {
}
}

/// Check if the conditions to retry a prepare job have been met.
fn can_retry_prepare_after_failure(last_time_failed: SystemTime, num_failures: u32) -> bool {
SystemTime::now() >= last_time_failed + PREPARE_FAILURE_COOLDOWN &&
num_failures <= NUM_PREPARE_RETRIES
}

/// A stream that yields a pulse continuously at a given interval.
fn pulse_every(interval: std::time::Duration) -> impl futures::Stream<Item = ()> {
futures::stream::unfold(interval, {
Expand Down