Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry tasks during schedule phase of active listener #1349

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 40 additions & 24 deletions nativelink-scheduler/src/simple_scheduler_state_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ use super::awaited_action_db::{
/// can fail before giving up.
const MAX_UPDATE_RETRIES: usize = 5;

/// Maximum number of times the no-event timeout may fire for an
/// action before the action is forcefully retried, even when it is
/// still in the queued stage.
/// This is a safety mechanism to prevent an action from
/// being stuck in a state where it is not updated due to
/// an error in the inner database.
const ABS_MAX_TIMEOUTS_BEFORE_RETRY: usize = 5;

/// Simple struct that implements the ActionStateResult trait and always returns an error.
/// The single tuple field holds the `Error`; presumably this is the value handed
/// back by each trait method (confirm against the trait impl).
struct ErrorActionStateResult(Error);

Expand Down Expand Up @@ -233,6 +240,7 @@ where

async fn changed(&mut self) -> Result<Arc<ActionState>, Error> {
let mut timeout_attempts = 0;
let mut timeouts_triggered = 0;
loop {
tokio::select! {
awaited_action_result = self.awaited_action_sub.changed() => {
Expand All @@ -242,6 +250,7 @@ where
}
_ = (self.now_fn)().sleep(self.no_event_action_timeout) => {
// Timeout happened, do additional checks below.
timeouts_triggered += 1;
}
}

Expand All @@ -251,7 +260,9 @@ where
.await
.err_tip(|| "In MatchingEngineActionStateResult::changed")?;

if matches!(awaited_action.state().stage, ActionStage::Queued) {
if matches!(awaited_action.state().stage, ActionStage::Queued)
&& timeouts_triggered < ABS_MAX_TIMEOUTS_BEFORE_RETRY
{
// Actions in the queued state do not get periodically updated, so we
// don't need to time them out until the timeout has fired
// `ABS_MAX_TIMEOUTS_BEFORE_RETRY` times.
continue;
Expand Down Expand Up @@ -388,28 +399,28 @@ where
.await
.err_tip(|| "In SimpleSchedulerStateManager::timeout_operation_id")?;

// If the action is not executing, we should not timeout the action.
if !matches!(awaited_action.state().stage, ActionStage::Executing) {
return Ok(());
}

let last_worker_updated = awaited_action
.last_worker_updated_timestamp()
.duration_since(SystemTime::UNIX_EPOCH)
.map_err(|e| {
make_err!(
Code::Internal,
"Failed to convert last_worker_updated to duration since epoch {e:?}"
)
})?;
let worker_should_update_before = last_worker_updated
.checked_add(self.no_event_action_timeout)
.err_tip(|| "Timestamp too big in SimpleSchedulerStateManager::timeout_operation_id")?;
if worker_should_update_before < (self.now_fn)().elapsed() {
// The action was updated recently, we should not timeout the action.
// This is to prevent timing out actions that have recently been updated
// (like multiple clients timeout the same action at the same time).
return Ok(());
// If the action is executing, we check if the worker has updated the action recently.
if matches!(awaited_action.state().stage, ActionStage::Executing) {
let last_worker_updated = awaited_action
.last_worker_updated_timestamp()
.duration_since(SystemTime::UNIX_EPOCH)
.map_err(|e| {
make_err!(
Code::Internal,
"Failed to convert last_worker_updated to duration since epoch {e:?}"
)
})?;
let worker_should_update_before = last_worker_updated
.checked_add(self.no_event_action_timeout)
.err_tip(|| {
"Timestamp too big in SimpleSchedulerStateManager::timeout_operation_id"
})?;
if worker_should_update_before < (self.now_fn)().elapsed() {
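// The action was updated recently, so we should not time it out.
// This is to prevent timing out actions that have recently been updated
// (e.g. multiple clients timing out the same action at the same time).
// NOTE(review): the guard above returns when the update deadline is *earlier*
// than the current time, which reads as the opposite of "updated recently";
// confirm the comparison direction and the time base of `elapsed()`.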
return Ok(());
}
}

self.assign_operation(
Expand Down Expand Up @@ -497,9 +508,14 @@ where
}
UpdateOperationType::UpdateWithActionStage(stage) => stage.clone(),
UpdateOperationType::UpdateWithError(err) => {
// Don't count an attempt when an item that is already queued is placed back
// in the queue. This can happen as a safety mechanism: an action that has
// not been updated for a while is re-inserted just to make sure it is still
// in the queue.
let is_queued = awaited_action.state().stage == ActionStage::Queued;
// Don't count a backpressure failure as an attempt for an action.
let due_to_backpressure = err.code == Code::ResourceExhausted;
if !due_to_backpressure {
if !due_to_backpressure && !is_queued {
awaited_action.attempts += 1;
}

Expand Down
Loading