From ecadfb0ee7bb2062ad0023d8307572a3df44010f Mon Sep 17 00:00:00 2001 From: Zach Birenbaum Date: Sun, 17 Mar 2024 00:55:10 +0000 Subject: [PATCH] Remove old actions with no listeners Implement scheduler side removal of actions with no listeners. Adds disconnect_timeout_s configuration field with default of 60s. If the client waiting on a given action is disconnected for longer than this duration without reconnecting the scheduler will stop tracking it. This does not remove it from the worker if the job has already been dispatched. fixes #338 --- nativelink-config/src/schedulers.rs | 6 ++ nativelink-scheduler/src/simple_scheduler.rs | 58 +++++++++++++++- .../tests/simple_scheduler_test.rs | 68 +++++++++++++++++++ 3 files changed, 131 insertions(+), 1 deletion(-) diff --git a/nativelink-config/src/schedulers.rs b/nativelink-config/src/schedulers.rs index 09f79d6e0..170b5f273 100644 --- a/nativelink-config/src/schedulers.rs +++ b/nativelink-config/src/schedulers.rs @@ -119,6 +119,12 @@ pub struct SimpleScheduler { /// The strategy used to assign workers jobs. #[serde(default)] pub allocation_strategy: WorkerAllocationStrategy, + + /// Remove action from queue after this much time has elapsed without a listener + /// amount of time in seconds. + /// Default: 60 (seconds) + #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")] + pub disconnect_timeout_s: u64, } /// A scheduler that simply forwards requests to an upstream scheduler. This diff --git a/nativelink-scheduler/src/simple_scheduler.rs b/nativelink-scheduler/src/simple_scheduler.rs index f35fd03c9..6aa9adab4 100644 --- a/nativelink-scheduler/src/simple_scheduler.rs +++ b/nativelink-scheduler/src/simple_scheduler.rs @@ -18,7 +18,7 @@ use std::collections::BTreeMap; use std::hash::{Hash, Hasher}; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; -use std::time::{Instant, SystemTime}; +use std::time::{Instant, SystemTime, UNIX_EPOCH}; use async_trait::async_trait; use futures::Future; @@ -57,6 +57,10 @@ const DEFAULT_RETAIN_COMPLETED_FOR_S: u64 = 60; /// If this changes, remember to change the documentation in the config. const DEFAULT_MAX_JOB_RETRIES: usize = 3; +/// Default timeout for actions without any listeners +/// If this changes, remember to change the documentation in the config. +const DEFAULT_DISCONNECT_TIMEOUT_S: u64 = 60; + /// An action that is being awaited on and last known state. struct AwaitedAction { action_info: Arc, @@ -68,14 +72,36 @@ struct AwaitedAction { /// Possible last error set by the worker. If empty and attempts is set, it may be due to /// something like a worker timeout. last_error: Option, + + /// Updated on every client connect and periodically while it has listeners. + last_update_timestamp: Mutex } +impl AwaitedAction { + pub fn set_last_update_timestamp(&self, timestamp: u64) { + let mut guard = self.last_update_timestamp.lock(); + *guard = timestamp; + } + pub fn get_last_update_timestamp(&self) -> u64 { + let guard = self.last_update_timestamp.lock(); + *guard + } +} /// Holds the relationship of a worker that is executing a specific action. struct RunningAction { worker_id: WorkerId, action: AwaitedAction, } +impl RunningAction { + pub fn set_last_update_timestamp(&self, timestamp: u64) { + self.action.set_last_update_timestamp(timestamp); + } + pub fn get_last_update_timestamp(&self) -> u64 { + self.action.get_last_update_timestamp() + } +} + struct Workers { workers: LruCache, /// The allocation strategy for workers. @@ -230,6 +256,8 @@ struct SimpleSchedulerImpl { /// Notify task<->worker matching engine that work needs to be done. tasks_or_workers_change_notify: Arc, metrics: Arc, + /// How long the server will wait for a client to reconnect before removing the action from the queue. + disconnect_timeout_s: u64, } impl SimpleSchedulerImpl { @@ -307,6 +335,7 @@ impl SimpleSchedulerImpl { notify_channel: tx, attempts: 0, last_error: None, + last_update_timestamp: Mutex::new(0) }, ); @@ -428,6 +457,7 @@ impl SimpleSchedulerImpl { Ok(()) } + // TODO(blaise.bruer) This is an O(n*m) (aka n^2) algorithm. In theory we can create a map // of capabilities of each worker and then try and match the actions to the worker using // the map lookup (ie. map reduce). @@ -440,6 +470,15 @@ impl SimpleSchedulerImpl { let action_infos: Vec> = self.queued_actions.keys().rev().cloned().collect(); for action_info in action_infos { + // add update to queued action update timestamp here + let action = self.queued_actions.get_mut(&action_info).unwrap(); + let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(); + if action.notify_channel.receiver_count() > 0 { + action.set_last_update_timestamp(now) + } else if action.get_last_update_timestamp() + self.disconnect_timeout_s < now { + self.queued_actions_set.remove(&action_info); + self.queued_actions.remove(&action_info); + } let Some(awaited_action) = self.queued_actions.get(action_info.as_ref()) else { error!( "queued_actions out of sync with itself for action {}", @@ -501,6 +540,17 @@ impl SimpleSchedulerImpl { }, ); } + + let mut remove_actions = Vec::new(); + let running_actions = &mut self.active_actions.values().collect::>(); + for running_action in running_actions { + if running_action.action.notify_channel.receiver_count() > 0 { + running_action.set_last_update_timestamp(SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs()); + } else if running_action.get_last_update_timestamp() + self.disconnect_timeout_s < SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() { + remove_actions.push(running_action.action.action_info.clone()) + } + } + self.active_actions.retain(|x, _| { !remove_actions.contains(x) }); } fn update_action_with_internal_error( @@ -694,6 +744,11 @@ impl SimpleScheduler { max_job_retries = DEFAULT_MAX_JOB_RETRIES; } + let mut disconnect_timeout_s = scheduler_cfg.disconnect_timeout_s; + if disconnect_timeout_s == 0 { + disconnect_timeout_s = DEFAULT_DISCONNECT_TIMEOUT_S; + } + let tasks_or_workers_change_notify = Arc::new(Notify::new()); let metrics = Arc::new(Metrics::default()); @@ -709,6 +764,7 @@ impl SimpleScheduler { max_job_retries, tasks_or_workers_change_notify: tasks_or_workers_change_notify.clone(), metrics: metrics.clone(), + disconnect_timeout_s })); let weak_inner = Arc::downgrade(&inner); Self { diff --git a/nativelink-scheduler/tests/simple_scheduler_test.rs b/nativelink-scheduler/tests/simple_scheduler_test.rs index 0653d7b73..f4be9064d 100644 --- a/nativelink-scheduler/tests/simple_scheduler_test.rs +++ b/nativelink-scheduler/tests/simple_scheduler_test.rs @@ -96,6 +96,7 @@ async fn setup_action( #[cfg(test)] mod scheduler_tests { use pretty_assertions::assert_eq; + use tokio::time::sleep; use super::*; // Must be declared in every module. @@ -1602,4 +1603,71 @@ mod scheduler_tests { Ok(()) } + + #[tokio::test] + async fn ensure_actions_with_disconnected_clients_are_dropped() -> Result<(), Error> { + const WORKER_ID: WorkerId = WorkerId(0x1234_5678_9111); + const DISCONNECT_TIMEOUT_S: u64 = 1; + + let scheduler = SimpleScheduler::new_with_callback( + &nativelink_config::schedulers::SimpleScheduler { + disconnect_timeout_s: DISCONNECT_TIMEOUT_S, + ..Default::default() + }, + || async move {}, + ); + let action1_digest = DigestInfo::new([98u8; 32], 512); + let action2_digest = DigestInfo::new([99u8; 32], 512); + + let mut rx_from_worker = + setup_new_worker(&scheduler, WORKER_ID, PlatformProperties::default()).await?; + let insert_timestamp = make_system_time(1); + + let client_rx = setup_action( + &scheduler, + action1_digest, + PlatformProperties::default(), + insert_timestamp, + ) + .await?; + + // Drop our receiver + let unique_qualifier = client_rx.borrow().unique_qualifier.clone(); + drop(client_rx); + + // Allow task<->worker matcher to run. + tokio::task::yield_now().await; + + // Sleep for longer than disconnect_timeout_s + let _ = sleep(Duration::from_secs(DISCONNECT_TIMEOUT_S + 1)).await; + + { + // Other tests check full data. We only care if we got StartAction. + match rx_from_worker.recv().await.unwrap().update { + Some(update_for_worker::Update::StartAction(_)) => { /* Success */ } + v => panic!("Expected StartAction, got : {v:?}"), + } + } + + // Setup a second action so matching engine is scheduled to rerun + let client_rx = setup_action( + &scheduler, + action2_digest, + PlatformProperties::default(), + insert_timestamp, + ) + .await?; + drop(client_rx); + + // Allow task<->worker matcher to run. + tokio::task::yield_now().await; + + // Check to make sure that the action was removed + assert!(scheduler + .find_existing_action(&unique_qualifier) + .await + .is_none(),); + + Ok(()) + } }