diff --git a/Cargo.lock b/Cargo.lock index 2405dcc9a..bf90bf222 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1955,6 +1955,7 @@ dependencies = [ "futures", "hashbrown 0.14.5", "lru", + "mock_instant", "nativelink-config", "nativelink-error", "nativelink-macro", diff --git a/nativelink-config/src/stores.rs b/nativelink-config/src/stores.rs index b08c2ba82..ca77ac43f 100644 --- a/nativelink-config/src/stores.rs +++ b/nativelink-config/src/stores.rs @@ -712,7 +712,8 @@ pub struct EvictionPolicy { #[serde(default, deserialize_with = "convert_data_size_with_shellexpand")] pub evict_bytes: usize, - /// Maximum number of seconds for an entry to live before an eviction. + /// Maximum number of seconds for an entry to live since it was last + /// accessed before it is evicted. /// Default: 0. Zero means never evict based on time. #[serde(default, deserialize_with = "convert_duration_with_shellexpand")] pub max_seconds: u32, diff --git a/nativelink-scheduler/BUILD.bazel b/nativelink-scheduler/BUILD.bazel index e7251c78e..0b1b156e8 100644 --- a/nativelink-scheduler/BUILD.bazel +++ b/nativelink-scheduler/BUILD.bazel @@ -83,6 +83,7 @@ rust_test_suite( "//nativelink-store", "//nativelink-util", "@crates//:futures", + "@crates//:mock_instant", "@crates//:pretty_assertions", "@crates//:prost", "@crates//:tokio", diff --git a/nativelink-scheduler/Cargo.toml b/nativelink-scheduler/Cargo.toml index 9c766f300..119f5e4bf 100644 --- a/nativelink-scheduler/Cargo.toml +++ b/nativelink-scheduler/Cargo.toml @@ -21,6 +21,7 @@ uuid = { version = "1.8.0", features = ["v4"] } futures = "0.3.30" hashbrown = "0.14" lru = "0.12.3" +mock_instant = "0.3.2" parking_lot = "0.12.2" rand = "0.8.5" scopeguard = "1.2.0" diff --git a/nativelink-scheduler/src/memory_awaited_action_db.rs b/nativelink-scheduler/src/memory_awaited_action_db.rs index 10992108c..d8ddf4aa9 100644 --- a/nativelink-scheduler/src/memory_awaited_action_db.rs +++ b/nativelink-scheduler/src/memory_awaited_action_db.rs @@ -15,7 +15,7 @@ use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::ops::{Bound, RangeBounds}; use std::sync::Arc; -use std::time::{Duration, Instant, SystemTime}; +use std::time::Duration; use async_lock::Mutex; use async_trait::async_trait; @@ -28,6 +28,7 @@ use nativelink_util::action_messages::{ }; use nativelink_util::chunked_stream::ChunkedStream; use nativelink_util::evicting_map::{EvictingMap, LenEntry}; +use nativelink_util::instant_wrapper::InstantWrapper; use nativelink_util::metrics_utils::{CollectorState, MetricsComponent}; use nativelink_util::operation_state_manager::ActionStateResult; use nativelink_util::spawn; @@ -48,21 +49,21 @@ const CLIENT_KEEPALIVE_DURATION: Duration = Duration::from_secs(10); /// Represents a client that is currently listening to an action. /// When the client is dropped, it will send the [`AwaitedAction`] to the -/// `drop_tx` if there are other cleanups needed. +/// `event_tx` if there are other cleanups needed. #[derive(Debug)] struct ClientAwaitedAction { /// The OperationId that the client is listening to. operation_id: OperationId, /// The sender to notify of this struct being dropped. - drop_tx: mpsc::UnboundedSender, + event_tx: mpsc::UnboundedSender, } impl ClientAwaitedAction { - pub fn new(operation_id: OperationId, drop_tx: mpsc::UnboundedSender) -> Self { + pub fn new(operation_id: OperationId, event_tx: mpsc::UnboundedSender) -> Self { Self { operation_id, - drop_tx, + event_tx, } } @@ -74,7 +75,7 @@ impl ClientAwaitedAction { impl Drop for ClientAwaitedAction { fn drop(&mut self) { // If we failed to send it means noone is listening. - let _ = self.drop_tx.send(ActionEvent::ClientDroppedOperation( + let _ = self.event_tx.send(ActionEvent::ClientDroppedOperation( self.operation_id.clone(), )); } @@ -105,50 +106,61 @@ pub(crate) enum ActionEvent { /// Information required to track an individual client /// keep alive config and state. -struct ClientKeepAlive { +struct ClientInfo I> { /// The client operation id. client_operation_id: ClientOperationId, /// The last time a keep alive was sent. - last_keep_alive: Instant, - /// The sender to notify of this struct being dropped. - drop_tx: mpsc::UnboundedSender, + last_keep_alive: I, + /// The function to get the current time. + now_fn: NowFn, + /// The sender to notify of this struct had an event. + event_tx: mpsc::UnboundedSender, } -/// Subscriber that can be used to monitor when AwaitedActions change. -pub struct MemoryAwaitedActionSubscriber { +/// Subscriber that clients can be used to monitor when AwaitedActions change. +pub struct MemoryAwaitedActionSubscriber I> { /// The receiver to listen for changes. awaited_action_rx: watch::Receiver, - /// The client operation id and keep alive information. - client_operation_info: Option, + /// If a client id is known this is the info needed to keep the client + /// action alive. + client_info: Option>, } -impl MemoryAwaitedActionSubscriber { +impl I> MemoryAwaitedActionSubscriber { pub fn new(mut awaited_action_rx: watch::Receiver) -> Self { awaited_action_rx.mark_changed(); Self { awaited_action_rx, - client_operation_info: None, + client_info: None, } } - pub fn new_with_client( mut awaited_action_rx: watch::Receiver, client_operation_id: ClientOperationId, - drop_tx: mpsc::UnboundedSender, - ) -> Self { + event_tx: mpsc::UnboundedSender, + now_fn: NowFn, + ) -> Self + where + NowFn: Fn() -> I, + { awaited_action_rx.mark_changed(); Self { awaited_action_rx, - client_operation_info: Some(ClientKeepAlive { + client_info: Some(ClientInfo { client_operation_id, - last_keep_alive: Instant::now(), - drop_tx, + last_keep_alive: I::from_secs(0), + now_fn, + event_tx, }), } } } -impl AwaitedActionSubscriber for MemoryAwaitedActionSubscriber { +impl AwaitedActionSubscriber for MemoryAwaitedActionSubscriber +where + I: InstantWrapper, + NowFn: Fn() -> I + Send + Sync + 'static, +{ async fn changed(&mut self) -> Result { { let changed_fut = self.awaited_action_rx.changed().map(|r| { @@ -159,25 +171,26 @@ impl AwaitedActionSubscriber for MemoryAwaitedActionSubscriber { ) }) }); - let Some(client_keep_alive) = self.client_operation_info.as_mut() else { + let Some(client_info) = self.client_info.as_mut() else { changed_fut.await?; return Ok(self.awaited_action_rx.borrow().clone()); }; tokio::pin!(changed_fut); loop { - if client_keep_alive.last_keep_alive.elapsed() > CLIENT_KEEPALIVE_DURATION { - client_keep_alive.last_keep_alive = Instant::now(); + if client_info.last_keep_alive.elapsed() > CLIENT_KEEPALIVE_DURATION { + client_info.last_keep_alive = (client_info.now_fn)(); // Failing to send just means our receiver dropped. - let _ = client_keep_alive.drop_tx.send(ActionEvent::ClientKeepAlive( - client_keep_alive.client_operation_id.clone(), + let _ = client_info.event_tx.send(ActionEvent::ClientKeepAlive( + client_info.client_operation_id.clone(), )); } + let sleep_fut = (client_info.now_fn)().sleep(CLIENT_KEEPALIVE_DURATION); tokio::select! { result = &mut changed_fut => { result?; break; } - _ = tokio::time::sleep(CLIENT_KEEPALIVE_DURATION) => { + _ = sleep_fut => { // If we haven't received any updates for a while, we should // let the database know that we are still listening to prevent // the action from being dropped. @@ -329,10 +342,9 @@ impl SortedAwaitedActions { } /// The database for storing the state of all actions. -pub struct AwaitedActionDbImpl { +pub struct AwaitedActionDbImpl I> { /// A lookup table to lookup the state of an action by its client operation id. - client_operation_to_awaited_action: - EvictingMap, SystemTime>, + client_operation_to_awaited_action: EvictingMap, I>, /// A lookup table to lookup the state of an action by its worker operation id. operation_id_to_awaited_action: BTreeMap>, @@ -351,13 +363,16 @@ pub struct AwaitedActionDbImpl { /// Where to send notifications about important events related to actions. action_event_tx: mpsc::UnboundedSender, + + /// The function to get the current time. + now_fn: NowFn, } -impl AwaitedActionDbImpl { +impl I + Clone + Send + Sync> AwaitedActionDbImpl { async fn get_awaited_action_by_id( &self, client_operation_id: &ClientOperationId, - ) -> Result, Error> { + ) -> Result>, Error> { let maybe_client_awaited_action = self .client_operation_to_awaited_action .get(client_operation_id) @@ -369,7 +384,14 @@ impl AwaitedActionDbImpl { self.operation_id_to_awaited_action .get(client_awaited_action.operation_id()) - .map(|tx| Some(MemoryAwaitedActionSubscriber::new(tx.subscribe()))) + .map(|tx| { + Some(MemoryAwaitedActionSubscriber::new_with_client( + tx.subscribe(), + client_operation_id.clone(), + self.action_event_tx.clone(), + self.now_fn.clone(), + )) + }) .ok_or_else(|| { make_err!( Code::Internal, @@ -487,13 +509,13 @@ impl AwaitedActionDbImpl { &self, start: Bound<&OperationId>, end: Bound<&OperationId>, - ) -> impl Iterator { + ) -> impl Iterator)> { self.operation_id_to_awaited_action .range((start, end)) .map(|(operation_id, tx)| { ( operation_id, - MemoryAwaitedActionSubscriber::new(tx.subscribe()), + MemoryAwaitedActionSubscriber::::new(tx.subscribe()), ) }) } @@ -501,10 +523,10 @@ impl AwaitedActionDbImpl { fn get_by_operation_id( &self, operation_id: &OperationId, - ) -> Option { + ) -> Option> { self.operation_id_to_awaited_action .get(operation_id) - .map(|tx| MemoryAwaitedActionSubscriber::new(tx.subscribe())) + .map(|tx| MemoryAwaitedActionSubscriber::::new(tx.subscribe())) } fn get_range_of_actions<'a, 'b>( @@ -512,7 +534,13 @@ impl AwaitedActionDbImpl { state: SortedAwaitedActionState, range: impl RangeBounds + 'b, ) -> impl DoubleEndedIterator< - Item = Result<(&'a SortedAwaitedAction, MemoryAwaitedActionSubscriber), Error>, + Item = Result< + ( + &'a SortedAwaitedAction, + MemoryAwaitedActionSubscriber, + ), + Error, + >, > + 'a { let btree = match state { SortedAwaitedActionState::CacheCheck => &self.sorted_action_info_hash_keys.cache_check, @@ -674,7 +702,7 @@ impl AwaitedActionDbImpl { &mut self, client_operation_id: ClientOperationId, action_info: Arc, - ) -> Result { + ) -> Result, Error> { // Check to see if the action is already known and subscribe if it is. let subscription_result = self .try_subscribe( @@ -738,6 +766,7 @@ impl AwaitedActionDbImpl { rx, client_operation_id, self.action_event_tx.clone(), + self.now_fn.clone(), )) } @@ -749,7 +778,7 @@ impl AwaitedActionDbImpl { // removed the ability to upgrade priorities of actions. // we should add priority upgrades back in. _priority: i32, - ) -> Result, Error> { + ) -> Result>, Error> { let unique_key = match unique_qualifier { ActionUniqueQualifier::Cachable(unique_key) => unique_key, ActionUniqueQualifier::Uncachable(_unique_key) => return Ok(None), @@ -795,28 +824,33 @@ impl AwaitedActionDbImpl { ) .await; - Ok(Some(MemoryAwaitedActionSubscriber::new(subscription))) + Ok(Some(MemoryAwaitedActionSubscriber::new_with_client( + subscription, + client_operation_id.clone(), + self.action_event_tx.clone(), + self.now_fn.clone(), + ))) } } -pub struct MemoryAwaitedActionDb { - inner: Arc>, +pub struct MemoryAwaitedActionDb I> { + inner: Arc>>, _handle_awaited_action_events: JoinHandleDropGuard<()>, } -impl MemoryAwaitedActionDb { - pub fn new(eviction_config: &EvictionPolicy) -> Self { +impl I + Clone + Send + Sync + 'static> + MemoryAwaitedActionDb +{ + pub fn new(eviction_config: &EvictionPolicy, now_fn: NowFn) -> Self { let (action_event_tx, mut action_event_rx) = mpsc::unbounded_channel(); let inner = Arc::new(Mutex::new(AwaitedActionDbImpl { - client_operation_to_awaited_action: EvictingMap::new( - eviction_config, - SystemTime::now(), - ), + client_operation_to_awaited_action: EvictingMap::new(eviction_config, (now_fn)()), operation_id_to_awaited_action: BTreeMap::new(), action_info_hash_key_to_awaited_action: HashMap::new(), sorted_action_info_hash_keys: SortedAwaitedActions::default(), connected_clients_for_operation_id: HashMap::new(), action_event_tx, + now_fn, })); let weak_inner = Arc::downgrade(&inner); Self { @@ -841,8 +875,10 @@ impl MemoryAwaitedActionDb { } } -impl AwaitedActionDb for MemoryAwaitedActionDb { - type Subscriber = MemoryAwaitedActionSubscriber; +impl I + Clone + Send + Sync + 'static> AwaitedActionDb + for MemoryAwaitedActionDb +{ + type Subscriber = MemoryAwaitedActionSubscriber; async fn get_awaited_action_by_id( &self, @@ -943,7 +979,9 @@ impl AwaitedActionDb for MemoryAwaitedActionDb { } } -impl MetricsComponent for MemoryAwaitedActionDb { +impl I + Send + Sync + 'static> MetricsComponent + for MemoryAwaitedActionDb +{ fn gather_metrics(&self, c: &mut CollectorState) { let inner = self.inner.lock_blocking(); c.publish( diff --git a/nativelink-scheduler/src/simple_scheduler.rs b/nativelink-scheduler/src/simple_scheduler.rs index a0a27cf18..d0f30ae44 100644 --- a/nativelink-scheduler/src/simple_scheduler.rs +++ b/nativelink-scheduler/src/simple_scheduler.rs @@ -14,6 +14,7 @@ use std::pin::Pin; use std::sync::Arc; +use std::time::SystemTime; use async_trait::async_trait; use futures::Future; @@ -22,6 +23,7 @@ use nativelink_error::{Error, ResultExt}; use nativelink_util::action_messages::{ ActionInfo, ActionStage, ActionState, ClientOperationId, OperationId, WorkerId, }; +use nativelink_util::instant_wrapper::InstantWrapper; use nativelink_util::metrics_utils::Registry; use nativelink_util::operation_state_manager::{ ActionStateResult, ActionStateResultStream, ClientStateManager, MatchingEngineStateManager, @@ -245,25 +247,32 @@ impl SimpleScheduler { pub fn new( scheduler_cfg: &nativelink_config::schedulers::SimpleScheduler, ) -> (Arc, Arc) { - Self::new_with_callback(scheduler_cfg, || { - // The cost of running `do_try_match()` is very high, but constant - // in relation to the number of changes that have happened. This - // means that grabbing this lock to process `do_try_match()` should - // always yield to any other tasks that might want the lock. The - // easiest and most fair way to do this is to sleep for a small - // amount of time. Using something like tokio::task::yield_now() - // does not yield as aggresively as we'd like if new futures are - // scheduled within a future. - tokio::time::sleep(Duration::from_millis(1)) - }) + Self::new_with_callback( + scheduler_cfg, + || { + // The cost of running `do_try_match()` is very high, but constant + // in relation to the number of changes that have happened. This + // means that grabbing this lock to process `do_try_match()` should + // always yield to any other tasks that might want the lock. The + // easiest and most fair way to do this is to sleep for a small + // amount of time. Using something like tokio::task::yield_now() + // does not yield as aggresively as we'd like if new futures are + // scheduled within a future. + tokio::time::sleep(Duration::from_millis(1)) + }, + SystemTime::now, + ) } pub fn new_with_callback< Fut: Future + Send, F: Fn() -> Fut + Send + Sync + 'static, + I: InstantWrapper, + NowFn: Fn() -> I + Clone + Send + Sync + 'static, >( scheduler_cfg: &nativelink_config::schedulers::SimpleScheduler, on_matching_engine_run: F, + now_fn: NowFn, ) -> (Arc, Arc) { let platform_property_manager = Arc::new(PlatformPropertyManager::new( scheduler_cfg @@ -291,10 +300,13 @@ impl SimpleScheduler { let state_manager = SimpleSchedulerStateManager::new( tasks_or_worker_change_notify.clone(), max_job_retries, - MemoryAwaitedActionDb::new(&EvictionPolicy { - max_seconds: retain_completed_for_s, - ..Default::default() - }), + MemoryAwaitedActionDb::new( + &EvictionPolicy { + max_seconds: retain_completed_for_s, + ..Default::default() + }, + now_fn, + ), ); let worker_scheduler = ApiWorkerScheduler::new( diff --git a/nativelink-scheduler/tests/simple_scheduler_test.rs b/nativelink-scheduler/tests/simple_scheduler_test.rs index 88ae447a4..d74956222 100644 --- a/nativelink-scheduler/tests/simple_scheduler_test.rs +++ b/nativelink-scheduler/tests/simple_scheduler_test.rs @@ -20,6 +20,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use futures::poll; use futures::task::Poll; +use mock_instant::MockClock; use nativelink_error::{make_err, Code, Error, ResultExt}; use nativelink_macro::nativelink_test; use nativelink_proto::build::bazel::remote::execution::v2::{digest_function, ExecuteRequest}; @@ -37,6 +38,7 @@ use nativelink_util::action_messages::{ }; use nativelink_util::common::DigestInfo; use nativelink_util::digest_hasher::DigestHasherFunc; +use nativelink_util::instant_wrapper::MockInstantWrapped; use nativelink_util::platform_properties::{PlatformProperties, PlatformPropertyValue}; use pretty_assertions::assert_eq; use tokio::sync::mpsc; @@ -149,6 +151,7 @@ async fn basic_add_action_with_one_worker_test() -> Result<(), Error> { let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( &nativelink_config::schedulers::SimpleScheduler::default(), || async move {}, + MockInstantWrapped::default, ); let action_digest = DigestInfo::new([99u8; 32], 512); @@ -203,6 +206,7 @@ async fn find_executing_action() -> Result<(), Error> { let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( &nativelink_config::schedulers::SimpleScheduler::default(), || async move {}, + MockInstantWrapped::default, ); let action_digest = DigestInfo::new([99u8; 32], 512); @@ -269,6 +273,7 @@ async fn remove_worker_reschedules_multiple_running_job_test() -> Result<(), Err ..Default::default() }, || async move {}, + MockInstantWrapped::default, ); let action_digest1 = DigestInfo::new([99u8; 32], 512); let action_digest2 = DigestInfo::new([88u8; 32], 512); @@ -436,6 +441,7 @@ async fn set_drain_worker_pauses_and_resumes_worker_test() -> Result<(), Error> let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( &nativelink_config::schedulers::SimpleScheduler::default(), || async move {}, + MockInstantWrapped::default, ); let action_digest = DigestInfo::new([99u8; 32], 512); @@ -517,6 +523,7 @@ async fn worker_should_not_queue_if_properties_dont_match_test() -> Result<(), E let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( &nativelink_config::schedulers::SimpleScheduler::default(), || async move {}, + MockInstantWrapped::default, ); let action_digest = DigestInfo::new([99u8; 32], 512); let mut platform_properties = PlatformProperties::default(); @@ -597,6 +604,7 @@ async fn cacheable_items_join_same_action_queued_test() -> Result<(), Error> { let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( &nativelink_config::schedulers::SimpleScheduler::default(), || async move {}, + MockInstantWrapped::default, ); let action_digest = DigestInfo::new([99u8; 32], 512); @@ -700,6 +708,7 @@ async fn worker_disconnects_does_not_schedule_for_execution_test() -> Result<(), let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( &nativelink_config::schedulers::SimpleScheduler::default(), || async move {}, + MockInstantWrapped::default, ); let action_digest = DigestInfo::new([99u8; 32], 512); @@ -741,6 +750,7 @@ async fn worker_timesout_reschedules_running_job_test() -> Result<(), Error> { ..Default::default() }, || async move {}, + MockInstantWrapped::default, ); let action_digest = DigestInfo::new([99u8; 32], 512); @@ -859,6 +869,7 @@ async fn update_action_sends_completed_result_to_client_test() -> Result<(), Err let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( &nativelink_config::schedulers::SimpleScheduler::default(), || async move {}, + MockInstantWrapped::default, ); let action_digest = DigestInfo::new([99u8; 32], 512); @@ -954,6 +965,7 @@ async fn update_action_sends_completed_result_after_disconnect() -> Result<(), E let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( &nativelink_config::schedulers::SimpleScheduler::default(), || async move {}, + MockInstantWrapped::default, ); let action_digest = DigestInfo::new([99u8; 32], 512); @@ -1056,6 +1068,7 @@ async fn update_action_with_wrong_worker_id_errors_test() -> Result<(), Error> { let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( &nativelink_config::schedulers::SimpleScheduler::default(), || async move {}, + MockInstantWrapped::default, ); let action_digest = DigestInfo::new([99u8; 32], 512); @@ -1154,6 +1167,7 @@ async fn does_not_crash_if_operation_joined_then_relaunched() -> Result<(), Erro let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( &nativelink_config::schedulers::SimpleScheduler::default(), || async move {}, + MockInstantWrapped::default, ); let action_digest = DigestInfo::new([99u8; 32], 512); @@ -1281,6 +1295,7 @@ async fn run_two_jobs_on_same_worker_with_platform_properties_restrictions() -> let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( &nativelink_config::schedulers::SimpleScheduler::default(), || async move {}, + MockInstantWrapped::default, ); let action_digest1 = DigestInfo::new([11u8; 32], 512); let action_digest2 = DigestInfo::new([99u8; 32], 512); @@ -1417,6 +1432,7 @@ async fn run_jobs_in_the_order_they_were_queued() -> Result<(), Error> { let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( &nativelink_config::schedulers::SimpleScheduler::default(), || async move {}, + MockInstantWrapped::default, ); let action_digest1 = DigestInfo::new([11u8; 32], 512); let action_digest2 = DigestInfo::new([99u8; 32], 512); @@ -1477,6 +1493,7 @@ async fn worker_retries_on_internal_error_and_fails_test() -> Result<(), Error> ..Default::default() }, || async move {}, + MockInstantWrapped::default, ); let action_digest = DigestInfo::new([99u8; 32], 512); @@ -1621,6 +1638,7 @@ async fn ensure_scheduler_drops_inner_spawn() -> Result<(), Error> { let _drop_checker = drop_checker.clone(); async move {} }, + MockInstantWrapped::default, ); assert_eq!(dropped.load(Ordering::Relaxed), false); @@ -1642,6 +1660,7 @@ async fn ensure_task_or_worker_change_notification_received_test() -> Result<(), let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( &nativelink_config::schedulers::SimpleScheduler::default(), || async move {}, + MockInstantWrapped::default, ); let action_digest = DigestInfo::new([99u8; 32], 512); @@ -1698,3 +1717,64 @@ async fn ensure_task_or_worker_change_notification_received_test() -> Result<(), Ok(()) } + +// Note: This is a regression test for: +// https://github.com/TraceMachina/nativelink/issues/1197 +#[nativelink_test] +async fn client_reconnect_keeps_action_alive() -> Result<(), Error> { + let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( + &nativelink_config::schedulers::SimpleScheduler::default(), + || async move {}, + MockInstantWrapped::default, + ); + let action_digest = DigestInfo::new([99u8; 32], 512); + + let insert_timestamp = make_system_time(1); + let action_listener = setup_action( + &scheduler, + action_digest, + PlatformProperties::default(), + insert_timestamp, + ) + .await + .unwrap(); + + let client_id = action_listener.client_operation_id().clone(); + + // Simulate client disconnecting. + drop(action_listener); + + let mut new_action_listener = scheduler + .find_by_client_operation_id(&client_id) + .await + .unwrap() + .expect("Action not found"); + + // We should get one notification saying it's queued. + assert_eq!( + new_action_listener.changed().await.unwrap().stage, + ActionStage::Queued + ); + + let changed_fut = new_action_listener.changed(); + tokio::pin!(changed_fut); + + // Now increment time and ensure the action does not get evicted. + for _ in 0..500 { + MockClock::advance(Duration::from_secs(2)); + // All others should be pending. + assert_eq!(poll!(&mut changed_fut), Poll::Pending); + tokio::task::yield_now().await; + // Eviction happens when someone touches the internal + // evicting map. So we constantly ask for some other client + // to trigger eviction logic. + scheduler + .find_by_client_operation_id(&ClientOperationId::from_raw_string( + "dummy_client_id".to_string(), + )) + .await + .unwrap(); + } + + Ok(()) +} diff --git a/nativelink-util/BUILD.bazel b/nativelink-util/BUILD.bazel index 7175719ca..8cb9fe668 100644 --- a/nativelink-util/BUILD.bazel +++ b/nativelink-util/BUILD.bazel @@ -20,6 +20,7 @@ rust_library( "src/fastcdc.rs", "src/fs.rs", "src/health_utils.rs", + "src/instant_wrapper.rs", "src/lib.rs", "src/metrics_utils.rs", "src/operation_state_manager.rs", @@ -51,6 +52,7 @@ rust_library( "@crates//:hyper", "@crates//:hyper-util", "@crates//:lru", + "@crates//:mock_instant", "@crates//:parking_lot", "@crates//:pin-project", "@crates//:pin-project-lite", diff --git a/nativelink-util/Cargo.toml b/nativelink-util/Cargo.toml index a11c6a5e6..ab0cbb83c 100644 --- a/nativelink-util/Cargo.toml +++ b/nativelink-util/Cargo.toml @@ -35,10 +35,10 @@ tonic = { version = "0.11.0", features = ["tls"] } tracing = "0.1.40" tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] } uuid = { version = "1.8.0", features = ["v4", "serde"] } +mock_instant = "0.3.2" [dev-dependencies] nativelink-macro = { path = "../nativelink-macro" } pretty_assertions = "1.4.0" rand = "0.8.5" -mock_instant = "0.3.2" diff --git a/nativelink-util/src/evicting_map.rs b/nativelink-util/src/evicting_map.rs index 237ef1d92..a67f2fb60 100644 --- a/nativelink-util/src/evicting_map.rs +++ b/nativelink-util/src/evicting_map.rs @@ -20,7 +20,6 @@ use std::future::Future; use std::hash::Hash; use std::ops::{DerefMut, RangeBounds}; use std::sync::Arc; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; use async_lock::Mutex; use lru::LruCache; @@ -28,6 +27,7 @@ use nativelink_config::stores::EvictionPolicy; use serde::{Deserialize, Serialize}; use tracing::{event, Level}; +use crate::instant_wrapper::InstantWrapper; use crate::metrics_utils::{CollectorState, Counter, CounterWithTime, MetricsComponent}; #[derive(Serialize, Deserialize, PartialEq, Debug, Clone)] @@ -36,30 +36,6 @@ pub struct SerializedLRU { pub anchor_time: u64, } -/// Wrapper used to abstract away which underlying Instant impl we are using. -/// This is needed for testing. -pub trait InstantWrapper: 'static { - fn from_secs(secs: u64) -> Self; - fn unix_timestamp(&self) -> u64; - fn elapsed(&self) -> Duration; -} - -impl InstantWrapper for SystemTime { - fn from_secs(secs: u64) -> SystemTime { - SystemTime::UNIX_EPOCH - .checked_add(Duration::from_secs(secs)) - .unwrap() - } - - fn unix_timestamp(&self) -> u64 { - self.duration_since(UNIX_EPOCH).unwrap().as_secs() - } - - fn elapsed(&self) -> Duration { - ::elapsed(self).unwrap() - } -} - #[derive(Debug)] struct EvictionItem { seconds_since_anchor: i32, @@ -345,7 +321,7 @@ where let lru_len = state.lru.len(); for (key, result) in keys.into_iter().zip(results.iter_mut()) { - match state.lru.get(key.borrow()) { + match state.lru.get_mut(key.borrow()) { Some(entry) => { // Since we are not inserting anythign we don't need to evict based // on the size of the store. @@ -354,6 +330,7 @@ where // we are here. let should_evict = self.should_evict(lru_len, entry, 0, u64::MAX); if !should_evict && entry.data.touch().await { + entry.seconds_since_anchor = self.anchor_time.elapsed().as_secs() as i32; *result = Some(entry.data.len()); } else { *result = None; @@ -383,6 +360,7 @@ where let entry = state.lru.get_mut(key.borrow())?; if entry.data.touch().await { + entry.seconds_since_anchor = self.anchor_time.elapsed().as_secs() as i32; return Some(entry.data.clone()); } diff --git a/nativelink-util/src/instant_wrapper.rs b/nativelink-util/src/instant_wrapper.rs new file mode 100644 index 000000000..4b3075908 --- /dev/null +++ b/nativelink-util/src/instant_wrapper.rs @@ -0,0 +1,84 @@ +// Copyright 2024 The NativeLink Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::future::Future; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +use mock_instant::Instant as MockInstant; + +/// Wrapper used to abstract away which underlying Instant impl we are using. +/// This is needed for testing. +pub trait InstantWrapper: Send + Sync + 'static { + fn from_secs(secs: u64) -> Self; + fn unix_timestamp(&self) -> u64; + fn elapsed(&self) -> Duration; + fn sleep(self, duration: Duration) -> impl Future + Send + Sync + 'static; +} + +impl InstantWrapper for SystemTime { + fn from_secs(secs: u64) -> SystemTime { + SystemTime::UNIX_EPOCH + .checked_add(Duration::from_secs(secs)) + .unwrap() + } + + fn unix_timestamp(&self) -> u64 { + self.duration_since(UNIX_EPOCH).unwrap().as_secs() + } + + fn elapsed(&self) -> Duration { + ::elapsed(self).unwrap() + } + + async fn sleep(self, duration: Duration) { + tokio::time::sleep(duration).await; + } +} + +pub fn default_instant_wrapper() -> impl InstantWrapper { + SystemTime::now() +} + +/// Our mocked out instant that we can pass to our EvictionMap. +pub struct MockInstantWrapped(MockInstant); + +impl Default for MockInstantWrapped { + fn default() -> Self { + Self(MockInstant::now()) + } +} + +impl InstantWrapper for MockInstantWrapped { + fn from_secs(_secs: u64) -> Self { + MockInstantWrapped(MockInstant::now()) + } + + fn unix_timestamp(&self) -> u64 { + 100 + } + + fn elapsed(&self) -> Duration { + self.0.elapsed() + } + + async fn sleep(self, duration: Duration) { + let baseline = self.0.elapsed(); + loop { + tokio::task::yield_now().await; + if self.0.elapsed() - baseline >= duration { + break; + } + } + } +} diff --git a/nativelink-util/src/lib.rs b/nativelink-util/src/lib.rs index 04d7571b8..a4b22aac3 100644 --- a/nativelink-util/src/lib.rs +++ b/nativelink-util/src/lib.rs @@ -23,6 +23,7 @@ pub mod evicting_map; pub mod fastcdc; pub mod fs; pub mod health_utils; +pub mod instant_wrapper; pub mod metrics_utils; pub mod operation_state_manager; pub mod origin_context; diff --git a/nativelink-util/tests/evicting_map_test.rs b/nativelink-util/tests/evicting_map_test.rs index de0e01ecc..1e33cbc66 100644 --- a/nativelink-util/tests/evicting_map_test.rs +++ b/nativelink-util/tests/evicting_map_test.rs @@ -17,12 +17,13 @@ use std::sync::Arc; use std::time::Duration; use bytes::Bytes; -use mock_instant::{Instant as MockInstant, MockClock}; +use mock_instant::MockClock; use nativelink_config::stores::EvictionPolicy; use nativelink_error::Error; use nativelink_macro::nativelink_test; use nativelink_util::common::DigestInfo; -use nativelink_util::evicting_map::{EvictingMap, InstantWrapper, LenEntry}; +use nativelink_util::evicting_map::{EvictingMap, LenEntry}; +use nativelink_util::instant_wrapper::MockInstantWrapped; use pretty_assertions::assert_eq; #[derive(Clone, PartialEq, Debug)] @@ -47,23 +48,6 @@ impl From for BytesWrapper { } } -/// Our mocked out instant that we can pass to our EvictionMap. -struct MockInstantWrapped(MockInstant); - -impl InstantWrapper for MockInstantWrapped { - fn from_secs(_secs: u64) -> Self { - MockInstantWrapped(MockInstant::now()) - } - - fn unix_timestamp(&self) -> u64 { - 100 - } - - fn elapsed(&self) -> Duration { - self.0.elapsed() - } -} - const HASH1: &str = "0123456789abcdef000000000000000000000000000000000123456789abcdef"; const HASH2: &str = "123456789abcdef000000000000000000000000000000000123456789abcdef1"; const HASH3: &str = "23456789abcdef000000000000000000000000000000000123456789abcdef12"; @@ -78,7 +62,7 @@ async fn insert_purges_at_max_count() -> Result<(), Error> { max_bytes: 0, evict_bytes: 0, }, - MockInstantWrapped(MockInstant::now()), + MockInstantWrapped::default(), ); evicting_map .insert(DigestInfo::try_new(HASH1, 0)?, Bytes::new().into()) @@ -134,7 +118,7 @@ async fn insert_purges_at_max_bytes() -> Result<(), Error> { max_bytes: 17, evict_bytes: 0, }, - MockInstantWrapped(MockInstant::now()), + MockInstantWrapped::default(), ); const DATA: &str = "12345678"; evicting_map @@ -191,7 +175,7 @@ async fn insert_purges_to_low_watermark_at_max_bytes() -> Result<(), Error> { max_bytes: 17, evict_bytes: 9, }, - MockInstantWrapped(MockInstant::now()), + MockInstantWrapped::default(), ); const DATA: &str = "12345678"; evicting_map @@ -248,7 +232,7 @@ async fn insert_purges_at_max_seconds() -> Result<(), Error> { max_bytes: 0, evict_bytes: 0, }, - MockInstantWrapped(MockInstant::now()), + MockInstantWrapped::default(), ); const DATA: &str = "12345678"; @@ -309,7 +293,7 @@ async fn get_refreshes_time() -> Result<(), Error> { max_bytes: 0, evict_bytes: 0, }, - MockInstantWrapped(MockInstant::now()), + MockInstantWrapped::default(), ); const DATA: &str = "12345678"; @@ -387,7 +371,7 @@ async fn unref_called_on_replace() -> Result<(), Error> { max_bytes: 0, evict_bytes: 0, }, - MockInstantWrapped(MockInstant::now()), + MockInstantWrapped::default(), ); const DATA1: &str = "12345678"; @@ -433,7 +417,7 @@ async fn contains_key_refreshes_time() -> Result<(), Error> { max_bytes: 0, evict_bytes: 0, }, - MockInstantWrapped(MockInstant::now()), + MockInstantWrapped::default(), ); const DATA: &str = "12345678"; @@ -487,7 +471,7 @@ async fn hashes_equal_sizes_different_doesnt_override() -> Result<(), Error> { max_bytes: 0, evict_bytes: 0, }, - MockInstantWrapped(MockInstant::now()), + MockInstantWrapped::default(), ); let value1 = BytesWrapper(Bytes::from_static(b"12345678")); @@ -540,7 +524,7 @@ async fn get_evicts_on_time() -> Result<(), Error> { max_bytes: 0, evict_bytes: 0, }, - MockInstantWrapped(MockInstant::now()), + MockInstantWrapped::default(), ); const DATA: &str = "12345678"; @@ -572,7 +556,7 @@ async fn remove_evicts_on_time() -> Result<(), Error> { max_bytes: 0, evict_bytes: 0, }, - MockInstantWrapped(MockInstant::now()), + MockInstantWrapped::default(), ); const DATA: &str = "12345678"; @@ -606,7 +590,7 @@ async fn range_multiple_items_test() -> Result<(), Error> { max_bytes: 0, evict_bytes: 0, }, - MockInstantWrapped(MockInstant::now()), + MockInstantWrapped::default(), ); const KEY1: &str = "key-123";