-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Retry failed PVF prepare jobs #6213
Changes from 1 commit
adc9460
ebd600c
08d103f
086b2fd
84e5764
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -48,6 +48,13 @@ pub const PRECHECK_COMPILATION_TIMEOUT: Duration = Duration::from_secs(60); | |
// NOTE: If you change this make sure to fix the buckets of `pvf_preparation_time` metric. | ||
pub const EXECUTE_COMPILATION_TIMEOUT: Duration = Duration::from_secs(180); | ||
|
||
/// The time period after which a failed preparation artifact is considered ready to be retried. Note that we will only | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: reaches soft wrap limit and no good reason to not wrap the line. |
||
/// retry if another request comes in after this cooldown has passed. | ||
pub const PREPARE_FAILURE_COOLDOWN: Duration = Duration::from_secs(15 * 60); | ||
|
||
/// The amount of times we will retry failed prepare jobs. | ||
pub const NUM_PREPARE_RETRIES: u32 = 5; | ||
|
||
/// An alias to not spell the type for the oneshot sender for the PVF execution result. | ||
pub(crate) type ResultSender = oneshot::Sender<Result<ValidationResult, ValidationError>>; | ||
|
||
|
@@ -360,6 +367,8 @@ async fn run( | |
Some(to_host) => to_host, | ||
}; | ||
|
||
// If the artifact failed before, it could be re-scheduled for preparation here if | ||
// the preparation failure cooldown has elapsed. | ||
break_if_fatal!(handle_to_host( | ||
&cache_path, | ||
&mut artifacts, | ||
|
@@ -376,9 +385,9 @@ async fn run( | |
// Note that preparation always succeeds. | ||
// | ||
// That's because the error conditions are written into the artifact and will be | ||
// reported at the time of the execution. It potentially, but not necessarily, | ||
// can be scheduled as a result of this function call, in case there are pending | ||
// executions. | ||
// reported at the time of the execution. It potentially, but not necessarily, can | ||
// be scheduled for execution as a result of this function call, in case there are | ||
// pending executions. | ||
// | ||
// We could be eager in terms of reporting and plumb the result from the preparation | ||
// worker but we don't for the sake of simplicity. | ||
|
@@ -432,6 +441,8 @@ async fn handle_to_host( | |
/// Handles PVF prechecking. | ||
/// | ||
/// This tries to prepare the PVF by compiling the WASM blob within a given timeout ([`PRECHECK_COMPILATION_TIMEOUT`]). | ||
/// | ||
/// If the prepare job failed previously, we may retry it under certain conditions. | ||
async fn handle_precheck_pvf( | ||
artifacts: &mut Artifacts, | ||
prepare_queue: &mut mpsc::Sender<prepare::ToQueue>, | ||
|
@@ -446,11 +457,28 @@ async fn handle_precheck_pvf( | |
*last_time_needed = SystemTime::now(); | ||
let _ = result_sender.send(Ok(())); | ||
}, | ||
ArtifactState::Preparing { waiting_for_response } => | ||
ArtifactState::Preparing { waiting_for_response, num_failures: _ } => | ||
waiting_for_response.push(result_sender), | ||
ArtifactState::FailedToProcess(result) => { | ||
let _ = result_sender.send(PrepareResult::Err(result.clone())); | ||
}, | ||
ArtifactState::FailedToProcess { last_time_failed, num_failures, error } => | ||
if can_retry_prepare_after_failure(*last_time_failed, *num_failures) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. NIT: For preparation we actually agreed that retrying is not required/desired here. The pre-checking logic will not actually retry anyway, so this code will never get executed - so it is fine, but given that not doing it actually simplifies things, instead of complicating things (due to this complete separate implementation) - I would suggest to just not do it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would also remove some code duplication. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Makes sense, removed. |
||
// If we are allowed to retry the failed prepare job, change the state to | ||
// Preparing and re-queue this job. | ||
*state = ArtifactState::Preparing { | ||
waiting_for_response: vec![result_sender], | ||
num_failures: *num_failures, | ||
}; | ||
send_prepare( | ||
prepare_queue, | ||
prepare::ToQueue::Enqueue { | ||
priority: Priority::Normal, | ||
pvf, | ||
compilation_timeout: PRECHECK_COMPILATION_TIMEOUT, | ||
}, | ||
) | ||
.await?; | ||
} else { | ||
let _ = result_sender.send(PrepareResult::Err(error.clone())); | ||
}, | ||
} | ||
} else { | ||
artifacts.insert_preparing(artifact_id, vec![result_sender]); | ||
|
@@ -469,9 +497,13 @@ async fn handle_precheck_pvf( | |
|
||
/// Handles PVF execution. | ||
/// | ||
/// This will first try to prepare the PVF, if a prepared artifact does not already exist. If there is already a | ||
/// preparation job, we coalesce the two preparation jobs. When preparing for execution, we use a more lenient timeout | ||
/// ([`EXECUTE_COMPILATION_TIMEOUT`]) than when prechecking. | ||
/// This will try to prepare the PVF, if a prepared artifact does not already exist. If there is already a | ||
/// preparation job, we coalesce the two preparation jobs. | ||
/// | ||
/// If the prepare job failed previously, we may retry it under certain conditions. | ||
/// | ||
/// When preparing for execution, we use a more lenient timeout ([`EXECUTE_COMPILATION_TIMEOUT`]) | ||
/// than when prechecking. | ||
async fn handle_execute_pvf( | ||
cache_path: &Path, | ||
artifacts: &mut Artifacts, | ||
|
@@ -491,6 +523,7 @@ async fn handle_execute_pvf( | |
ArtifactState::Prepared { last_time_needed } => { | ||
*last_time_needed = SystemTime::now(); | ||
|
||
// This artifact has already been prepared, send it to the execute queue. | ||
send_execute( | ||
execute_queue, | ||
execute::ToQueue::Enqueue { | ||
|
@@ -502,11 +535,29 @@ async fn handle_execute_pvf( | |
) | ||
.await?; | ||
}, | ||
ArtifactState::Preparing { waiting_for_response: _ } => { | ||
ArtifactState::Preparing { .. } => { | ||
awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx); | ||
}, | ||
ArtifactState::FailedToProcess(error) => { | ||
let _ = result_tx.send(Err(ValidationError::from(error.clone()))); | ||
ArtifactState::FailedToProcess { last_time_failed, num_failures, error } => { | ||
if can_retry_prepare_after_failure(*last_time_failed, *num_failures) { | ||
// If we are allowed to retry the failed prepare job, change the state to | ||
// Preparing and re-queue this job. | ||
*state = ArtifactState::Preparing { | ||
waiting_for_response: Vec::new(), | ||
num_failures: *num_failures, | ||
}; | ||
send_prepare( | ||
prepare_queue, | ||
prepare::ToQueue::Enqueue { | ||
priority, | ||
pvf, | ||
compilation_timeout: EXECUTE_COMPILATION_TIMEOUT, | ||
}, | ||
) | ||
.await?; | ||
} else { | ||
let _ = result_tx.send(Err(ValidationError::from(error.clone()))); | ||
} | ||
}, | ||
} | ||
} else { | ||
|
@@ -523,6 +574,7 @@ async fn handle_execute_pvf( | |
) | ||
.await?; | ||
|
||
// Add an execution request that will wait to run after this prepare job has finished. | ||
awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx); | ||
} | ||
|
||
|
@@ -543,10 +595,29 @@ async fn handle_heads_up( | |
ArtifactState::Prepared { last_time_needed, .. } => { | ||
*last_time_needed = now; | ||
}, | ||
ArtifactState::Preparing { waiting_for_response: _ } => { | ||
ArtifactState::Preparing { .. } => { | ||
// The artifact is already being prepared, so we don't need to do anything. | ||
}, | ||
ArtifactState::FailedToProcess(_) => {}, | ||
ArtifactState::FailedToProcess { last_time_failed, num_failures, error: _ } => { | ||
// TODO: Do we want to retry for heads-up requests? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Figured we wanted the same logic for heads-up requests, but wanted to double-check. |
||
if can_retry_prepare_after_failure(*last_time_failed, *num_failures) { | ||
// If we are allowed to retry the failed prepare job, change the state to | ||
// Preparing and re-queue this job. | ||
*state = ArtifactState::Preparing { | ||
waiting_for_response: vec![], | ||
num_failures: *num_failures, | ||
}; | ||
send_prepare( | ||
prepare_queue, | ||
prepare::ToQueue::Enqueue { | ||
priority: Priority::Normal, | ||
pvf: active_pvf, | ||
compilation_timeout: EXECUTE_COMPILATION_TIMEOUT, | ||
}, | ||
) | ||
.await?; | ||
} | ||
}, | ||
} | ||
} else { | ||
// It's not in the artifacts, so we need to enqueue a job to prepare it. | ||
|
@@ -596,20 +667,26 @@ async fn handle_prepare_done( | |
never!("the artifact is already prepared: {:?}", artifact_id); | ||
return Ok(()) | ||
}, | ||
Some(ArtifactState::FailedToProcess(_)) => { | ||
Some(ArtifactState::FailedToProcess { .. }) => { | ||
// The reasoning is similar to the above, the artifact cannot be | ||
// processed at this point. | ||
never!("the artifact is already processed unsuccessfully: {:?}", artifact_id); | ||
return Ok(()) | ||
}, | ||
Some(state @ ArtifactState::Preparing { waiting_for_response: _ }) => state, | ||
Some(state @ ArtifactState::Preparing { .. }) => state, | ||
}; | ||
|
||
if let ArtifactState::Preparing { waiting_for_response } = state { | ||
let num_failures = if let ArtifactState::Preparing { waiting_for_response, num_failures } = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: I wonder if it's time to move it under the match above? Then, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I gave it a try, but I wasn't able to work around the compiler errors when I do that. |
||
state | ||
{ | ||
for result_sender in waiting_for_response.drain(..) { | ||
let _ = result_sender.send(result.clone()); | ||
} | ||
} | ||
num_failures | ||
} else { | ||
never!("The reasoning is similar to the above, the artifact can only be preparing at this point; qed"); | ||
return Ok(()) | ||
}; | ||
|
||
// It's finally time to dispatch all the execution requests that were waiting for this artifact | ||
// to be prepared. | ||
|
@@ -641,7 +718,11 @@ async fn handle_prepare_done( | |
|
||
*state = match result { | ||
Ok(()) => ArtifactState::Prepared { last_time_needed: SystemTime::now() }, | ||
Err(error) => ArtifactState::FailedToProcess(error.clone()), | ||
Err(error) => ArtifactState::FailedToProcess { | ||
last_time_failed: SystemTime::now(), | ||
num_failures: *num_failures + 1, | ||
error: error.clone(), | ||
}, | ||
}; | ||
|
||
Ok(()) | ||
|
@@ -704,6 +785,12 @@ async fn sweeper_task(mut sweeper_rx: mpsc::Receiver<PathBuf>) { | |
} | ||
} | ||
|
||
/// Check if the conditions to retry a prepare job have been met. | ||
fn can_retry_prepare_after_failure(last_time_failed: SystemTime, num_failures: u32) -> bool { | ||
SystemTime::now() >= last_time_failed + PREPARE_FAILURE_COOLDOWN && | ||
num_failures <= NUM_PREPARE_RETRIES | ||
} | ||
|
||
/// A stream that yields a pulse continuously at a given interval. | ||
fn pulse_every(interval: std::time::Duration) -> impl futures::Stream<Item = ()> { | ||
futures::stream::unfold(interval, { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: tautology. What is this error exactly? The last one I presume?