From 2eefa75afe702c0fe6d1e5761bd5cc32c74bbba4 Mon Sep 17 00:00:00 2001 From: Adam Singer Date: Fri, 28 Jun 2024 10:01:12 -0700 Subject: [PATCH] Reduce references to self.state_manager.inner (#1060) Removed unused test method and relocated notify function in `add_action`. --- .../src/scheduler_state/state_manager.rs | 4 +++- nativelink-scheduler/src/simple_scheduler.rs | 22 ------------------- 2 files changed, 3 insertions(+), 23 deletions(-) diff --git a/nativelink-scheduler/src/scheduler_state/state_manager.rs b/nativelink-scheduler/src/scheduler_state/state_manager.rs index 79386c4d8..05ece2885 100644 --- a/nativelink-scheduler/src/scheduler_state/state_manager.rs +++ b/nativelink-scheduler/src/scheduler_state/state_manager.rs @@ -466,6 +466,7 @@ impl ClientStateManager for StateManager { // Check to see if the action is running, if it is and cacheable, merge the actions. if let Some(running_action) = self.inner.active_actions.get_mut(&action_info) { self.inner.metrics.add_action_joined_running_action.inc(); + self.inner.tasks_or_workers_change_notify.notify_one(); return Ok(Arc::new(ClientActionStateResult::new( running_action.notify_channel.subscribe(), ))); @@ -497,6 +498,7 @@ impl ClientStateManager for StateManager { .queued_actions .insert(arc_action_info.clone(), queued_action); self.inner.queued_actions_set.insert(arc_action_info); + self.inner.tasks_or_workers_change_notify.notify_one(); return Ok(result); } @@ -525,7 +527,7 @@ impl ClientStateManager for StateManager { worker_id: None, }, ); - + self.inner.tasks_or_workers_change_notify.notify_one(); return Ok(Arc::new(ClientActionStateResult::new(rx))); } diff --git a/nativelink-scheduler/src/simple_scheduler.rs b/nativelink-scheduler/src/simple_scheduler.rs index cb7040d12..6894f5c5c 100644 --- a/nativelink-scheduler/src/simple_scheduler.rs +++ b/nativelink-scheduler/src/simple_scheduler.rs @@ -88,10 +88,6 @@ impl SimpleSchedulerImpl { action_info: ActionInfo, ) -> Result>, Error> { let add_action_result = self.state_manager.add_action(action_info).await?; - self.state_manager - .inner - .tasks_or_workers_change_notify - .notify_one(); add_action_result.as_receiver().await.cloned() } @@ -484,24 +480,6 @@ impl SimpleScheduler { .contains(worker_id) } - /// Checks to see if the worker can accept work. Should only be used in unit tests. - pub async fn can_worker_accept_work_for_test( - &self, - worker_id: &WorkerId, - ) -> Result { - let mut inner = self.get_inner_lock().await; - let worker = inner - .state_manager - .inner - .workers - .workers - .get_mut(worker_id) - .ok_or_else(|| { - make_input_err!("WorkerId '{}' does not exist in workers map", worker_id) - })?; - Ok(worker.can_accept_work()) - } - /// A unit test function used to send the keep alive message to the worker from the server. pub async fn send_keep_alive_to_worker_for_test( &self,