diff --git a/cas/grpc_service/tests/worker_api_server_test.rs b/cas/grpc_service/tests/worker_api_server_test.rs
index c39685fe4..8f07e1648 100644
--- a/cas/grpc_service/tests/worker_api_server_test.rs
+++ b/cas/grpc_service/tests/worker_api_server_test.rs
@@ -108,10 +108,7 @@ pub mod connect_worker_tests {
     pub async fn connect_worker_adds_worker_to_scheduler_test() -> Result<(), Box<dyn std::error::Error>> {
         let test_context = setup_api_server(BASE_WORKER_TIMEOUT_S, Box::new(static_now_fn)).await?;
 
-        let worker_exists = test_context
-            .scheduler
-            .contains_worker_for_test(&test_context.worker_id)
-            .await;
+        let worker_exists = test_context.scheduler.contains_worker_for_test(&test_context.worker_id);
         assert!(worker_exists, "Expected worker to exist in worker map");
 
         Ok(())
@@ -132,20 +129,14 @@ pub mod keep_alive_tests {
             // Now change time to 1 second before timeout and ensure the worker is still in the pool.
             now_timestamp += BASE_WORKER_TIMEOUT_S - 1;
             test_context.scheduler.remove_timedout_workers(now_timestamp).await?;
-            let worker_exists = test_context
-                .scheduler
-                .contains_worker_for_test(&test_context.worker_id)
-                .await;
+            let worker_exists = test_context.scheduler.contains_worker_for_test(&test_context.worker_id);
             assert!(worker_exists, "Expected worker to exist in worker map");
         }
         {
             // Now add 1 second and our worker should have been evicted due to timeout.
             now_timestamp += 1;
             test_context.scheduler.remove_timedout_workers(now_timestamp).await?;
-            let worker_exists = test_context
-                .scheduler
-                .contains_worker_for_test(&test_context.worker_id)
-                .await;
+            let worker_exists = test_context.scheduler.contains_worker_for_test(&test_context.worker_id);
             assert!(!worker_exists, "Expected worker to not exist in map");
         }
 
@@ -171,10 +162,7 @@ pub mod keep_alive_tests {
             // Now change time to 1 second before timeout and ensure the worker is still in the pool.
             let timestamp = add_and_return_timestamp(BASE_WORKER_TIMEOUT_S - 1);
             test_context.scheduler.remove_timedout_workers(timestamp).await?;
-            let worker_exists = test_context
-                .scheduler
-                .contains_worker_for_test(&test_context.worker_id)
-                .await;
+            let worker_exists = test_context.scheduler.contains_worker_for_test(&test_context.worker_id);
             assert!(worker_exists, "Expected worker to exist in worker map");
         }
         {
@@ -191,10 +179,7 @@ pub mod keep_alive_tests {
             // Now add 1 second and our worker should still exist in our map.
             let timestamp = add_and_return_timestamp(1);
             test_context.scheduler.remove_timedout_workers(timestamp).await?;
-            let worker_exists = test_context
-                .scheduler
-                .contains_worker_for_test(&test_context.worker_id)
-                .await;
+            let worker_exists = test_context.scheduler.contains_worker_for_test(&test_context.worker_id);
             assert!(worker_exists, "Expected worker to exist in map");
         }
 
@@ -209,7 +194,6 @@ pub mod keep_alive_tests {
         test_context
             .scheduler
             .send_keep_alive_to_worker_for_test(&test_context.worker_id)
-            .await
            .err_tip(|| "Could not send keep alive to worker")?;
 
         {
@@ -240,18 +224,12 @@ pub mod going_away_tests {
     pub async fn going_away_removes_worker_test() -> Result<(), Box<dyn std::error::Error>> {
         let test_context = setup_api_server(BASE_WORKER_TIMEOUT_S, Box::new(static_now_fn)).await?;
 
-        let worker_exists = test_context
-            .scheduler
-            .contains_worker_for_test(&test_context.worker_id)
-            .await;
+        let worker_exists = test_context.scheduler.contains_worker_for_test(&test_context.worker_id);
         assert!(worker_exists, "Expected worker to exist in worker map");
 
         test_context.scheduler.remove_worker(test_context.worker_id).await;
 
-        let worker_exists = test_context
-            .scheduler
-            .contains_worker_for_test(&test_context.worker_id)
-            .await;
+        let worker_exists = test_context.scheduler.contains_worker_for_test(&test_context.worker_id);
         assert!(!worker_exists, "Expected worker to be removed from worker map");
 
         Ok(())
diff --git a/cas/scheduler/default_scheduler_factory.rs b/cas/scheduler/default_scheduler_factory.rs
index ddad4846a..bbe379678 100644
--- a/cas/scheduler/default_scheduler_factory.rs
+++ b/cas/scheduler/default_scheduler_factory.rs
@@ -34,7 +34,7 @@ pub fn scheduler_factory<'a>(
     Box::pin(async move {
         let scheduler: SchedulerFactoryResults = match scheduler_type_cfg {
             SchedulerConfig::simple(config) => {
-                let scheduler = Arc::new(SimpleScheduler::new(&config));
+                let scheduler = Arc::new(SimpleScheduler::new(config));
                 (Some(scheduler.clone()), Some(scheduler))
             }
             SchedulerConfig::grpc(config) => (Some(Arc::new(GrpcScheduler::new(config).await?)), None),
diff --git a/cas/scheduler/grpc_scheduler.rs b/cas/scheduler/grpc_scheduler.rs
index 803f40322..d9278dec5 100644
--- a/cas/scheduler/grpc_scheduler.rs
+++ b/cas/scheduler/grpc_scheduler.rs
@@ -22,7 +22,6 @@ use tonic::{transport, Request};
 
 use action_messages::{ActionInfo, ActionState, DEFAULT_EXECUTION_PRIORITY};
 use common::log;
-use config;
 use error::{make_err, Code, Error, ResultExt};
 use platform_property_manager::PlatformPropertyManager;
 use proto::build::bazel::remote::execution::v2::{
diff --git a/cas/scheduler/platform_property_manager.rs b/cas/scheduler/platform_property_manager.rs
index cace77ea4..1703ccfd1 100644
--- a/cas/scheduler/platform_property_manager.rs
+++ b/cas/scheduler/platform_property_manager.rs
@@ -31,29 +31,31 @@ pub struct PlatformProperties {
 }
 
 impl PlatformProperties {
-    pub fn new(map: HashMap<String, PlatformPropertyValue>) -> Self {
+    #[must_use]
+    pub const fn new(map: HashMap<String, PlatformPropertyValue>) -> Self {
         Self { properties: map }
     }
 
     /// Determines if the worker's `PlatformProperties` is satisfied by this struct.
-    pub fn is_satisfied_by(&self, worker_properties: &PlatformProperties) -> bool {
+    #[must_use]
+    pub fn is_satisfied_by(&self, worker_properties: &Self) -> bool {
         for (property, check_value) in &self.properties {
             if let Some(worker_value) = worker_properties.properties.get(property) {
-                if !check_value.is_satisfied_by(&worker_value) {
+                if !check_value.is_satisfied_by(worker_value) {
                     return false;
                 }
             } else {
                 return false;
             }
         }
-        return true;
+        true
     }
 }
 
 impl From<ProtoPlatform> for PlatformProperties {
     fn from(platform: ProtoPlatform) -> Self {
         let mut properties = HashMap::with_capacity(platform.properties.len());
-        for property in platform.properties.into_iter() {
+        for property in platform.properties {
             properties.insert(property.name, PlatformPropertyValue::Unknown(property.value));
         }
         Self { properties }
@@ -82,13 +84,14 @@ pub enum PlatformPropertyValue {
 
 impl PlatformPropertyValue {
     /// Same as `PlatformProperties::is_satisfied_by`, but on an individual value.
-    pub fn is_satisfied_by(&self, worker_value: &PlatformPropertyValue) -> bool {
+    #[must_use]
+    pub fn is_satisfied_by(&self, worker_value: &Self) -> bool {
         if self == worker_value {
             return true;
         }
         match self {
-            PlatformPropertyValue::Minimum(v) => {
-                if let PlatformPropertyValue::Minimum(worker_v) = worker_value {
+            Self::Minimum(v) => {
+                if let Self::Minimum(worker_v) = worker_value {
                     return worker_v >= v;
                 }
                 false
@@ -96,27 +99,27 @@ impl PlatformPropertyValue {
             // Priority is used to pass info to the worker and not restrict which
             // workers can be selected, but might be used to prefer certain workers
             // over others.
-            PlatformPropertyValue::Priority(_) => true,
+            Self::Priority(_) => true,
             // Success exact case is handled above.
-            PlatformPropertyValue::Exact(_) => false,
-            // Used mostly for transporting data. This should not be relied upon when this value.
-            PlatformPropertyValue::Unknown(_) => false,
+            Self::Exact(_) | Self::Unknown(_) => false,
         }
     }
 }
 
-/// Helps manage known properties and conversion into PlatformPropertyValue.
+/// Helps manage known properties and conversion into `PlatformPropertyValue`.
 pub struct PlatformPropertyManager {
     known_properties: HashMap<String, PropertyType>,
 }
 
 impl PlatformPropertyManager {
-    pub fn new(known_properties: HashMap<String, PropertyType>) -> Self {
+    #[must_use]
+    pub const fn new(known_properties: HashMap<String, PropertyType>) -> Self {
         Self { known_properties }
     }
 
     /// Returns the `known_properties` map.
-    pub fn get_known_properties(&self) -> &HashMap<String, PropertyType> {
+    #[must_use]
+    pub const fn get_known_properties(&self) -> &HashMap<String, PropertyType> {
         &self.known_properties
     }
 
@@ -126,14 +129,14 @@ impl PlatformPropertyManager {
     pub fn make_prop_value(&self, key: &str, value: &str) -> Result<PlatformPropertyValue, Error> {
         if let Some(prop_type) = self.known_properties.get(key) {
             return match prop_type {
-                PropertyType::Minimum => Ok(PlatformPropertyValue::Minimum(
-                    u64::from_str_radix(value, 10).err_tip_with_code(|e| {
+                PropertyType::Minimum => Ok(PlatformPropertyValue::Minimum(value.parse::<u64>().err_tip_with_code(
+                    |e| {
                         (
                             Code::InvalidArgument,
-                            format!("Cannot convert to platform property to u64: {} - {}", value, e),
+                            format!("Cannot convert platform property to u64: {value} - {e}"),
                         )
-                    })?,
-                )),
+                    },
+                )?)),
                 PropertyType::Exact => Ok(PlatformPropertyValue::Exact(value.to_string())),
                 PropertyType::Priority => Ok(PlatformPropertyValue::Priority(value.to_string())),
             };
diff --git a/cas/scheduler/simple_scheduler.rs b/cas/scheduler/simple_scheduler.rs
index b2c68a5c9..2f811071c 100644
--- a/cas/scheduler/simple_scheduler.rs
+++ b/cas/scheduler/simple_scheduler.rs
@@ -26,7 +26,6 @@ use tokio::time::Duration;
 
 use action_messages::{ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ActionState};
 use common::log;
-use config;
 use error::{error_if, make_err, make_input_err, Code, Error, ResultExt};
 use platform_property_manager::PlatformPropertyManager;
 use scheduler::{ActionScheduler, WorkerScheduler};
@@ -220,7 +219,7 @@ impl SimpleSchedulerImpl {
 
         // Action needs to be added to queue or is not cacheable.
         let action_info = Arc::new(action_info);
-        let action_digest = action_info.digest().clone();
+        let action_digest = *action_info.digest();
 
         // TODO(allada) This name field needs to be indexable. The client might perform operations
         // based on the name later. It cannot be the same index used as the workers though, because
@@ -255,7 +254,7 @@ impl SimpleSchedulerImpl {
         let mut awaited_action = running_action.action;
         let send_result = if awaited_action.attempts >= self.max_job_retries {
             let mut default_action_result = ActionResult::default();
-            default_action_result.execution_metadata.worker = format!("{}", worker_id);
+            default_action_result.execution_metadata.worker = format!("{worker_id}");
             Arc::make_mut(&mut awaited_action.current_state).stage = ActionStage::Error((
                 awaited_action.last_error.unwrap_or_else(|| {
                     make_err!(
@@ -294,13 +293,13 @@ impl SimpleSchedulerImpl {
 
     /// Evicts the worker from the pool and puts items back into the queue if anything was being executed on it.
     fn immediate_evict_worker(&mut self, worker_id: &WorkerId) {
-        if let Some(mut worker) = self.workers.remove_worker(&worker_id) {
+        if let Some(mut worker) = self.workers.remove_worker(worker_id) {
             // We don't care if we fail to send message to worker, this is only a best attempt.
             let _ = worker.notify_update(WorkerUpdate::Disconnect);
             // We create a temporary Vec to avoid doubt about a possible code
             // path touching the worker.running_action_infos elsewhere.
             for action_info in worker.running_action_infos.drain() {
-                self.retry_action(&action_info, &worker_id);
+                self.retry_action(&action_info, worker_id);
             }
         }
         // Note: Calling this many time is very cheap, it'll only trigger `do_try_match` once.
@@ -318,24 +317,19 @@ impl SimpleSchedulerImpl {
        // unstable feature [see: https://github.com/rust-lang/rust/issues/70530]).
         let action_infos: Vec<Arc<ActionInfo>> = self.queued_actions.keys().rev().cloned().collect();
         for action_info in action_infos {
-            let awaited_action = match self.queued_actions.get(action_info.as_ref()) {
-                Some(awaited_action) => awaited_action,
-                _ => {
-                    log::error!(
-                        "queued_actions out of sync with itself for action {}",
-                        action_info.digest().str()
-                    );
-                    continue;
-                }
+            let Some(awaited_action) = self.queued_actions.get(action_info.as_ref()) else {
+                log::error!(
+                    "queued_actions out of sync with itself for action {}",
+                    action_info.digest().str()
+                );
+                continue;
             };
-            let worker = if let Some(worker) = self.workers.find_worker_for_action_mut(&awaited_action) {
-                worker
-            } else {
+            let Some(worker) = self.workers.find_worker_for_action_mut(awaited_action) else {
                 // No worker found, check the next action to see if there's a
                 // matching one for that.
                 continue;
             };
-            let worker_id = worker.id.clone();
+            let worker_id = worker.id;
 
             // Try to notify our worker of the new action to run, if it fails remove the worker from the
             // pool and try to find another worker.
@@ -381,15 +375,9 @@ impl SimpleSchedulerImpl {
         action_info_hash_key: &ActionInfoHashKey,
         err: Error,
     ) {
-        let (action_info, mut running_action) = match self.active_actions.remove_entry(action_info_hash_key) {
-            Some((action_info, running_action)) => (action_info, running_action),
-            None => {
-                log::error!(
-                    "Could not find action info in active actions : {:?}",
-                    action_info_hash_key
-                );
-                return;
-            }
+        let Some((action_info, mut running_action)) = self.active_actions.remove_entry(action_info_hash_key) else {
+            log::error!("Could not find action info in active actions : {action_info_hash_key:?}");
+            return;
         };
 
         let due_to_backpressure = err.code == Code::ResourceExhausted;
@@ -398,15 +386,15 @@ impl SimpleSchedulerImpl {
             running_action.action.attempts -= 1;
         }
 
-        if running_action.worker_id != *worker_id {
+        if running_action.worker_id == *worker_id {
+            // Don't set the error on an action that's running somewhere else.
+            log::warn!("Internal error for worker {}: {}", worker_id, err);
+            running_action.action.last_error = Some(err);
+        } else {
             log::error!(
                 "Got a result from a worker that should not be running the action, Removing worker. Expected worker {} got worker {}",
                 running_action.worker_id, worker_id
             );
-        } else {
-            // Don't set the error on an action that's running somewhere else.
-            log::warn!("Internal error for worker {}: {}", worker_id, err);
-            running_action.action.last_error = Some(err);
         }
 
         // Now put it back. retry_action() needs it to be there to send errors properly.
@@ -425,7 +413,7 @@ impl SimpleSchedulerImpl {
         }
 
         // Re-queue the action or fail on max attempts.
-        self.retry_action(&action_info, &worker_id);
+        self.retry_action(&action_info, worker_id);
     }
 
     fn update_action(
@@ -435,22 +423,16 @@ impl SimpleSchedulerImpl {
         action_stage: ActionStage,
     ) -> Result<(), Error> {
         if !action_stage.has_action_result() {
-            let msg = format!(
-                "Worker '{}' set the action_stage of running action {:?} to {:?}. Removing worker.",
-                worker_id, action_info_hash_key, action_stage
-            );
+            let msg = format!("Worker '{worker_id}' set the action_stage of running action {action_info_hash_key:?} to {action_stage:?}. Removing worker.");
Removing worker."); log::error!("{}", msg); self.immediate_evict_worker(worker_id); return Err(make_input_err!("{}", msg)); } - let (action_info, mut running_action) = - self.active_actions.remove_entry(action_info_hash_key).err_tip(|| { - format!( - "Could not find action info in active actions : {:?}", - action_info_hash_key - ) - })?; + let (action_info, mut running_action) = self + .active_actions + .remove_entry(action_info_hash_key) + .err_tip(|| format!("Could not find action info in active actions : {action_info_hash_key:?}"))?; if running_action.worker_id != *worker_id { let msg = format!( @@ -462,7 +444,7 @@ impl SimpleSchedulerImpl { ); log::error!("{}", msg); // First put it back in our active_actions or we will drop the task. - self.active_actions.insert(action_info.clone(), running_action); + self.active_actions.insert(action_info, running_action); self.immediate_evict_worker(worker_id); return Err(make_input_err!("{}", msg)); } @@ -512,6 +494,7 @@ pub struct SimpleScheduler { impl SimpleScheduler { #[inline] + #[must_use] pub fn new(scheduler_cfg: &config::schedulers::SimpleScheduler) -> Self { Self::new_with_callback(scheduler_cfg, || { // The cost of running `do_try_match()` is very high, but constant @@ -530,10 +513,7 @@ impl SimpleScheduler { on_matching_engine_run: F, ) -> Self { let platform_property_manager = Arc::new(PlatformPropertyManager::new( - scheduler_cfg - .supported_platform_properties - .clone() - .unwrap_or(HashMap::new()), + scheduler_cfg.supported_platform_properties.clone().unwrap_or_default(), )); let mut worker_timeout_s = scheduler_cfg.worker_timeout_s; @@ -551,7 +531,7 @@ impl SimpleScheduler { let inner = Arc::new(Mutex::new(SimpleSchedulerImpl { queued_actions_set: HashSet::new(), queued_actions: BTreeMap::new(), - workers: Workers::new(scheduler_cfg.allocation_strategy.clone()), + workers: Workers::new(scheduler_cfg.allocation_strategy), active_actions: HashMap::new(), worker_timeout_s, max_job_retries, @@ -559,7 +539,7 @@ impl SimpleScheduler { })); let weak_inner = Arc::downgrade(&inner); Self { - inner: inner.clone(), + inner, platform_property_manager, task_worker_matching_future: tokio::spawn(async move { // Break out of the loop only when the inner is dropped. @@ -586,13 +566,14 @@ impl SimpleScheduler { } /// Checks to see if the worker exists in the worker pool. Should only be used in unit tests. - pub async fn contains_worker_for_test(&self, worker_id: &WorkerId) -> bool { + #[must_use] + pub fn contains_worker_for_test(&self, worker_id: &WorkerId) -> bool { let inner = self.inner.lock(); inner.workers.workers.contains(worker_id) } /// A unit test function used to send the keep alive message to the worker from the server. 
-    pub async fn send_keep_alive_to_worker_for_test(&self, worker_id: &WorkerId) -> Result<(), Error> {
+    pub fn send_keep_alive_to_worker_for_test(&self, worker_id: &WorkerId) -> Result<(), Error> {
         let mut inner = self.inner.lock();
         let worker = inner
             .workers
@@ -626,7 +607,7 @@ impl WorkerScheduler for SimpleScheduler {
     }
 
     async fn add_worker(&self, worker: Worker) -> Result<(), Error> {
-        let worker_id = worker.id.clone();
+        let worker_id = worker.id;
         let mut inner = self.inner.lock();
         let res = inner
             .workers
@@ -646,7 +627,7 @@ impl WorkerScheduler for SimpleScheduler {
         err: Error,
     ) {
         let mut inner = self.inner.lock();
-        inner.update_action_with_internal_error(worker_id, action_info_hash_key, err)
+        inner.update_action_with_internal_error(worker_id, action_info_hash_key, err);
     }
 
     async fn update_action(
@@ -689,9 +670,9 @@ impl WorkerScheduler for SimpleScheduler {
                 }
             })
             .collect();
-        for worker_id in worker_ids_to_remove.iter() {
+        for worker_id in &worker_ids_to_remove {
             log::warn!("Worker {} timed out, removing from pool", worker_id);
-            inner.immediate_evict_worker(&worker_id);
+            inner.immediate_evict_worker(worker_id);
         }
         Ok(())
     }
diff --git a/cas/scheduler/tests/action_messages_test.rs b/cas/scheduler/tests/action_messages_test.rs
index 5cda773cf..c412b6ef0 100644
--- a/cas/scheduler/tests/action_messages_test.rs
+++ b/cas/scheduler/tests/action_messages_test.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::{BTreeMap, HashMap};
+use std::collections::{BTreeSet, HashMap};
 use std::sync::Arc;
 use std::time::{Duration, SystemTime};
 
@@ -51,7 +51,7 @@ mod action_messages_tests {
                 any.type_url,
                 "type.googleapis.com/build.bazel.remote.execution.v2.ExecuteResponse"
             ),
-            other => assert!(false, "Expected Some(Result(Any)), got: {:?}", other),
+            other => panic!("Expected Some(Result(Any)), got: {other:?}"),
         }
 
         Ok(())
@@ -79,7 +79,7 @@ mod action_messages_tests {
                 output_upload_start_timestamp: SystemTime::UNIX_EPOCH,
                 output_upload_completed_timestamp: SystemTime::UNIX_EPOCH,
             },
-            server_logs: Default::default(),
+            server_logs: HashMap::default(),
         })
         .into();
 
@@ -127,13 +127,13 @@ mod action_messages_tests {
             },
             skip_cache_lookup: true,
         });
-        let mut action_map = BTreeMap::<Arc<ActionInfo>, ()>::new();
-        action_map.insert(lowest_priority_action.clone(), ());
-        action_map.insert(high_priority_action.clone(), ());
+        let mut action_set = BTreeSet::<Arc<ActionInfo>>::new();
+        action_set.insert(lowest_priority_action.clone());
+        action_set.insert(high_priority_action.clone());
 
         assert_eq!(
             vec![high_priority_action, lowest_priority_action],
-            action_map.keys().rev().cloned().collect::<Vec<Arc<ActionInfo>>>()
+            action_set.iter().rev().cloned().collect::<Vec<Arc<ActionInfo>>>()
         );
 
         Ok(())
@@ -177,13 +177,13 @@ mod action_messages_tests {
             },
             skip_cache_lookup: true,
         });
-        let mut action_map = BTreeMap::<Arc<ActionInfo>, ()>::new();
-        action_map.insert(current_action.clone(), ());
-        action_map.insert(first_action.clone(), ());
+        let mut action_set = BTreeSet::<Arc<ActionInfo>>::new();
+        action_set.insert(current_action.clone());
+        action_set.insert(first_action.clone());
 
         assert_eq!(
             vec![first_action, current_action],
-            action_map.keys().rev().cloned().collect::<Vec<Arc<ActionInfo>>>()
+            action_set.iter().rev().cloned().collect::<Vec<Arc<ActionInfo>>>()
         );
 
         Ok(())
diff --git a/cas/scheduler/tests/simple_scheduler_test.rs b/cas/scheduler/tests/simple_scheduler_test.rs
index ba5d36266..25720cc8f 100644
--- a/cas/scheduler/tests/simple_scheduler_test.rs
+++ b/cas/scheduler/tests/simple_scheduler_test.rs
@@ -24,7 +24,6 @@ use action_messages::{
     SymlinkInfo, INTERNAL_ERROR_EXIT_CODE,
 };
 use common::DigestInfo;
-use config;
 use error::{make_err, Code, Error, ResultExt};
 use platform_property_manager::{PlatformProperties, PlatformPropertyValue};
 use proto::build::bazel::remote::execution::v2::ExecuteRequest;
@@ -92,16 +91,21 @@ mod scheduler_tests {
     #[tokio::test]
     async fn basic_add_action_with_one_worker_test() -> Result<(), Error> {
-        const WORKER_ID: WorkerId = WorkerId(0x123456789111);
+        const WORKER_ID: WorkerId = WorkerId(0x1234_5678_9111);
 
         let scheduler =
             SimpleScheduler::new_with_callback(&config::schedulers::SimpleScheduler::default(), || async move {});
         let action_digest = DigestInfo::new([99u8; 32], 512);
 
-        let mut rx_from_worker = setup_new_worker(&scheduler, WORKER_ID, Default::default()).await?;
+        let mut rx_from_worker = setup_new_worker(&scheduler, WORKER_ID, PlatformProperties::default()).await?;
         let insert_timestamp = make_system_time(1);
-        let mut client_rx =
-            setup_action(&scheduler, action_digest.clone(), Default::default(), insert_timestamp).await?;
+        let mut client_rx = setup_action(
+            &scheduler,
+            action_digest,
+            PlatformProperties::default(),
+            insert_timestamp,
+        )
+        .await?;
 
         {
             // Worker should have been sent an execute command.
@@ -110,7 +114,7 @@ mod scheduler_tests {
                 execute_request: Some(ExecuteRequest {
                     instance_name: INSTANCE_NAME.to_string(),
                     skip_cache_lookup: true,
-                    action_digest: Some(action_digest.clone().into()),
+                    action_digest: Some(action_digest.into()),
                     ..Default::default()
                 }),
                 salt: 0,
@@ -126,7 +130,7 @@ mod scheduler_tests {
             let expected_action_state = ActionState {
                 // Name is a random string, so we ignore it and just make it the same.
                 name: action_state.name.clone(),
-                action_digest: action_digest.clone(),
+                action_digest,
                 stage: ActionStage::Executing,
             };
             assert_eq!(action_state.as_ref(), &expected_action_state);
@@ -137,8 +141,8 @@ mod scheduler_tests {
 
     #[tokio::test]
     async fn remove_worker_reschedules_multiple_running_job_test() -> Result<(), Error> {
-        const WORKER_ID1: WorkerId = WorkerId(0x111111);
-        const WORKER_ID2: WorkerId = WorkerId(0x222222);
+        const WORKER_ID1: WorkerId = WorkerId(0x0011_1111);
+        const WORKER_ID2: WorkerId = WorkerId(0x0022_2222);
         let scheduler = SimpleScheduler::new_with_callback(
             &config::schedulers::SimpleScheduler {
                 worker_timeout_s: WORKER_TIMEOUT_S,
@@ -149,20 +153,20 @@ mod scheduler_tests {
         let action_digest1 = DigestInfo::new([99u8; 32], 512);
         let action_digest2 = DigestInfo::new([88u8; 32], 512);
 
-        let mut rx_from_worker1 = setup_new_worker(&scheduler, WORKER_ID1, Default::default()).await?;
+        let mut rx_from_worker1 = setup_new_worker(&scheduler, WORKER_ID1, PlatformProperties::default()).await?;
         let insert_timestamp1 = make_system_time(1);
         let mut client_rx1 = setup_action(
             &scheduler,
-            action_digest1.clone(),
-            Default::default(),
+            action_digest1,
+            PlatformProperties::default(),
             insert_timestamp1,
         )
         .await?;
         let insert_timestamp2 = make_system_time(2);
         let mut client_rx2 = setup_action(
             &scheduler,
-            action_digest2.clone(),
-            Default::default(),
+            action_digest2,
+            PlatformProperties::default(),
             insert_timestamp2,
         )
         .await?;
 
@@ -170,13 +174,13 @@ mod scheduler_tests {
         let mut expected_action_state1 = ActionState {
             // Name is a random string, so we ignore it and just make it the same.
             name: "UNKNOWN_HERE".to_string(),
-            action_digest: action_digest1.clone(),
+            action_digest: action_digest1,
             stage: ActionStage::Executing,
         };
         let mut expected_action_state2 = ActionState {
             // Name is a random string, so we ignore it and just make it the same.
name: "UNKNOWN_HERE".to_string(), - action_digest: action_digest2.clone(), + action_digest: action_digest2, stage: ActionStage::Executing, }; @@ -185,7 +189,7 @@ mod scheduler_tests { execute_request: Some(ExecuteRequest { instance_name: INSTANCE_NAME.to_string(), skip_cache_lookup: true, - action_digest: Some(action_digest1.clone().into()), + action_digest: Some(action_digest1.into()), ..Default::default() }), salt: 0, @@ -202,7 +206,7 @@ mod scheduler_tests { execute_request: Some(ExecuteRequest { instance_name: INSTANCE_NAME.to_string(), skip_cache_lookup: true, - action_digest: Some(action_digest2.clone().into()), + action_digest: Some(action_digest2.into()), ..Default::default() }), salt: 0, @@ -216,7 +220,7 @@ mod scheduler_tests { } // Add a second worker that can take jobs if the first dies. - let mut rx_from_worker2 = setup_new_worker(&scheduler, WORKER_ID2, Default::default()).await?; + let mut rx_from_worker2 = setup_new_worker(&scheduler, WORKER_ID2, PlatformProperties::default()).await?; { // Client should get notification saying it's being executed. @@ -275,6 +279,9 @@ mod scheduler_tests { #[tokio::test] async fn worker_should_not_queue_if_properties_dont_match_test() -> Result<(), Error> { + const WORKER_ID1: WorkerId = WorkerId(0x0010_0001); + const WORKER_ID2: WorkerId = WorkerId(0x0010_0002); + let scheduler = SimpleScheduler::new_with_callback(&config::schedulers::SimpleScheduler::default(), || async move {}); let action_digest = DigestInfo::new([99u8; 32], 512); @@ -287,17 +294,10 @@ mod scheduler_tests { .properties .insert("prop".to_string(), PlatformPropertyValue::Exact("2".to_string())); - const WORKER_ID1: WorkerId = WorkerId(0x100001); - const WORKER_ID2: WorkerId = WorkerId(0x100002); let mut rx_from_worker1 = setup_new_worker(&scheduler, WORKER_ID1, platform_properties.clone()).await?; let insert_timestamp = make_system_time(1); - let mut client_rx = setup_action( - &scheduler, - action_digest.clone(), - worker_properties.clone(), - insert_timestamp, - ) - .await?; + let mut client_rx = + setup_action(&scheduler, action_digest, worker_properties.clone(), insert_timestamp).await?; { // Client should get notification saying it's been queued. @@ -305,7 +305,7 @@ mod scheduler_tests { let expected_action_state = ActionState { // Name is a random string, so we ignore it and just make it the same. name: action_state.name.clone(), - action_digest: action_digest.clone(), + action_digest, stage: ActionStage::Queued, }; assert_eq!(action_state.as_ref(), &expected_action_state); @@ -319,7 +319,7 @@ mod scheduler_tests { execute_request: Some(ExecuteRequest { instance_name: INSTANCE_NAME.to_string(), skip_cache_lookup: true, - action_digest: Some(action_digest.clone().into()), + action_digest: Some(action_digest.into()), ..Default::default() }), salt: 0, @@ -335,7 +335,7 @@ mod scheduler_tests { let expected_action_state = ActionState { // Name is a random string, so we ignore it and just make it the same. 
                 name: action_state.name.clone(),
-                action_digest: action_digest.clone(),
+                action_digest,
                 stage: ActionStage::Executing,
             };
             assert_eq!(action_state.as_ref(), &expected_action_state);
@@ -349,24 +349,34 @@ mod scheduler_tests {
 
     #[tokio::test]
     async fn cacheable_items_join_same_action_queued_test() -> Result<(), Error> {
-        const WORKER_ID: WorkerId = WorkerId(0x100009);
+        const WORKER_ID: WorkerId = WorkerId(0x0010_0009);
 
         let scheduler =
             SimpleScheduler::new_with_callback(&config::schedulers::SimpleScheduler::default(), || async move {});
         let action_digest = DigestInfo::new([99u8; 32], 512);
 
         let mut expected_action_state = ActionState {
-            name: "".to_string(), // Will be filled later.
-            action_digest: action_digest.clone(),
+            name: String::new(), // Will be filled later.
+            action_digest,
             stage: ActionStage::Queued,
         };
 
         let insert_timestamp1 = make_system_time(1);
         let insert_timestamp2 = make_system_time(2);
-        let mut client1_rx =
-            setup_action(&scheduler, action_digest.clone(), Default::default(), insert_timestamp1).await?;
-        let mut client2_rx =
-            setup_action(&scheduler, action_digest.clone(), Default::default(), insert_timestamp2).await?;
+        let mut client1_rx = setup_action(
+            &scheduler,
+            action_digest,
+            PlatformProperties::default(),
+            insert_timestamp1,
+        )
+        .await?;
+        let mut client2_rx = setup_action(
+            &scheduler,
+            action_digest,
+            PlatformProperties::default(),
+            insert_timestamp2,
+        )
+        .await?;
 
         {
             // Clients should get notification saying it's been queued.
@@ -378,7 +388,7 @@ mod scheduler_tests {
             assert_eq!(action_state2.as_ref(), &expected_action_state);
         }
 
-        let mut rx_from_worker = setup_new_worker(&scheduler, WORKER_ID, Default::default()).await?;
+        let mut rx_from_worker = setup_new_worker(&scheduler, WORKER_ID, PlatformProperties::default()).await?;
 
         {
             // Worker should have been sent an execute command.
@@ -387,7 +397,7 @@ mod scheduler_tests {
                 execute_request: Some(ExecuteRequest {
                     instance_name: INSTANCE_NAME.to_string(),
                     skip_cache_lookup: true,
-                    action_digest: Some(action_digest.clone().into()),
+                    action_digest: Some(action_digest.into()),
                     ..Default::default()
                 }),
                 salt: 0,
@@ -410,8 +420,13 @@ mod scheduler_tests {
         {
             // Now if another action is requested it should also join with executing action.
             let insert_timestamp3 = make_system_time(2);
-            let mut client3_rx =
-                setup_action(&scheduler, action_digest.clone(), Default::default(), insert_timestamp3).await?;
+            let mut client3_rx = setup_action(
+                &scheduler,
+                action_digest,
+                PlatformProperties::default(),
+                insert_timestamp3,
+            )
+            .await?;
             assert_eq!(client3_rx.borrow_and_update().as_ref(), &expected_action_state);
         }
 
@@ -420,26 +435,31 @@ mod scheduler_tests {
 
     #[tokio::test]
     async fn worker_disconnects_does_not_schedule_for_execution_test() -> Result<(), Error> {
-        const WORKER_ID: WorkerId = WorkerId(0x100010);
+        const WORKER_ID: WorkerId = WorkerId(0x0010_0010);
 
         let scheduler =
             SimpleScheduler::new_with_callback(&config::schedulers::SimpleScheduler::default(), || async move {});
         let action_digest = DigestInfo::new([99u8; 32], 512);
 
-        let rx_from_worker = setup_new_worker(&scheduler, WORKER_ID, Default::default()).await?;
+        let rx_from_worker = setup_new_worker(&scheduler, WORKER_ID, PlatformProperties::default()).await?;
 
         // Now act like the worker disconnected.
         drop(rx_from_worker);
 
         let insert_timestamp = make_system_time(1);
-        let mut client_rx =
-            setup_action(&scheduler, action_digest.clone(), Default::default(), insert_timestamp).await?;
+        let mut client_rx = setup_action(
+            &scheduler,
+            action_digest,
+            PlatformProperties::default(),
+            insert_timestamp,
+        )
+        .await?;
         {
             // Client should get notification saying it's being queued not executed.
             let action_state = client_rx.borrow_and_update();
             let expected_action_state = ActionState {
                 // Name is a random string, so we ignore it and just make it the same.
                 name: action_state.name.clone(),
-                action_digest: action_digest.clone(),
+                action_digest,
                 stage: ActionStage::Queued,
             };
             assert_eq!(action_state.as_ref(), &expected_action_state);
@@ -450,8 +470,8 @@ mod scheduler_tests {
 
     #[tokio::test]
     async fn worker_timesout_reschedules_running_job_test() -> Result<(), Error> {
-        const WORKER_ID1: WorkerId = WorkerId(0x111111);
-        const WORKER_ID2: WorkerId = WorkerId(0x222222);
+        const WORKER_ID1: WorkerId = WorkerId(0x0011_1111);
+        const WORKER_ID2: WorkerId = WorkerId(0x0022_2222);
         let scheduler = SimpleScheduler::new_with_callback(
             &config::schedulers::SimpleScheduler {
                 worker_timeout_s: WORKER_TIMEOUT_S,
@@ -462,18 +482,23 @@ mod scheduler_tests {
         let action_digest = DigestInfo::new([99u8; 32], 512);
 
         // Note: This needs to stay in scope or a disconnect will trigger.
-        let mut rx_from_worker1 = setup_new_worker(&scheduler, WORKER_ID1, Default::default()).await?;
+        let mut rx_from_worker1 = setup_new_worker(&scheduler, WORKER_ID1, PlatformProperties::default()).await?;
         let insert_timestamp = make_system_time(1);
-        let mut client_rx =
-            setup_action(&scheduler, action_digest.clone(), Default::default(), insert_timestamp).await?;
+        let mut client_rx = setup_action(
+            &scheduler,
+            action_digest,
+            PlatformProperties::default(),
+            insert_timestamp,
+        )
+        .await?;
 
         // Note: This needs to stay in scope or a disconnect will trigger.
-        let mut rx_from_worker2 = setup_new_worker(&scheduler, WORKER_ID2, Default::default()).await?;
+        let mut rx_from_worker2 = setup_new_worker(&scheduler, WORKER_ID2, PlatformProperties::default()).await?;
 
         let mut expected_action_state = ActionState {
             // Name is a random string, so we ignore it and just make it the same.
name: "UNKNOWN_HERE".to_string(), - action_digest: action_digest.clone(), + action_digest, stage: ActionStage::Executing, }; @@ -482,7 +507,7 @@ mod scheduler_tests { execute_request: Some(ExecuteRequest { instance_name: INSTANCE_NAME.to_string(), skip_cache_lookup: true, - action_digest: Some(action_digest.clone().into()), + action_digest: Some(action_digest.into()), ..Default::default() }), salt: 0, @@ -539,29 +564,34 @@ mod scheduler_tests { #[tokio::test] async fn update_action_sends_completed_result_to_client_test() -> Result<(), Error> { - const WORKER_ID: WorkerId = WorkerId(0x123456789111); + const WORKER_ID: WorkerId = WorkerId(0x1234_5678_9111); let scheduler = SimpleScheduler::new_with_callback(&config::schedulers::SimpleScheduler::default(), || async move {}); let action_digest = DigestInfo::new([99u8; 32], 512); - let mut rx_from_worker = setup_new_worker(&scheduler, WORKER_ID, Default::default()).await?; + let mut rx_from_worker = setup_new_worker(&scheduler, WORKER_ID, PlatformProperties::default()).await?; let insert_timestamp = make_system_time(1); - let mut client_rx = - setup_action(&scheduler, action_digest.clone(), Default::default(), insert_timestamp).await?; + let mut client_rx = setup_action( + &scheduler, + action_digest, + PlatformProperties::default(), + insert_timestamp, + ) + .await?; { // Other tests check full data. We only care if we got StartAction. match rx_from_worker.recv().await.unwrap().update { Some(update_for_worker::Update::StartAction(_)) => { /* Success */ } - v => assert!(false, "Expected StartAction, got : {:?}", v), + v => panic!("Expected StartAction, got : {v:?}"), } // Other tests check full data. We only care if client thinks we are Executing. assert_eq!(client_rx.borrow_and_update().stage, ActionStage::Executing); } let action_info_hash_key = ActionInfoHashKey { - digest: action_digest.clone(), + digest: action_digest, salt: 0, }; let action_result = ActionResult { @@ -597,7 +627,7 @@ mod scheduler_tests { output_upload_start_timestamp: make_system_time(12), output_upload_completed_timestamp: make_system_time(13), }, - server_logs: Default::default(), + server_logs: HashMap::default(), }; scheduler .update_action( @@ -613,7 +643,7 @@ mod scheduler_tests { let expected_action_state = ActionState { // Name is a random string, so we ignore it and just make it the same. name: action_state.name.clone(), - action_digest: action_digest.clone(), + action_digest, stage: ActionStage::Completed(action_result), }; assert_eq!(action_state.as_ref(), &expected_action_state); @@ -621,7 +651,7 @@ mod scheduler_tests { { // Update info for the action should now be closed (notification happens through Err). 
             let result = client_rx.changed().await;
-            assert!(result.is_err(), "Expected result to be an error : {:?}", result);
+            assert!(result.is_err(), "Expected result to be an error : {result:?}");
         }
 
         Ok(())
@@ -629,37 +659,42 @@ mod scheduler_tests {
 
     #[tokio::test]
     async fn update_action_with_wrong_worker_id_errors_test() -> Result<(), Error> {
-        const GOOD_WORKER_ID: WorkerId = WorkerId(0x123456789111);
-        const ROGUE_WORKER_ID: WorkerId = WorkerId(0x987654321);
+        const GOOD_WORKER_ID: WorkerId = WorkerId(0x1234_5678_9111);
+        const ROGUE_WORKER_ID: WorkerId = WorkerId(0x0009_8765_4321);
 
         let scheduler =
             SimpleScheduler::new_with_callback(&config::schedulers::SimpleScheduler::default(), || async move {});
         let action_digest = DigestInfo::new([99u8; 32], 512);
 
-        let mut rx_from_worker = setup_new_worker(&scheduler, GOOD_WORKER_ID, Default::default()).await?;
+        let mut rx_from_worker = setup_new_worker(&scheduler, GOOD_WORKER_ID, PlatformProperties::default()).await?;
         let insert_timestamp = make_system_time(1);
-        let mut client_rx =
-            setup_action(&scheduler, action_digest.clone(), Default::default(), insert_timestamp).await?;
+        let mut client_rx = setup_action(
+            &scheduler,
+            action_digest,
+            PlatformProperties::default(),
+            insert_timestamp,
+        )
+        .await?;
 
         {
             // Other tests check full data. We only care if we got StartAction.
             match rx_from_worker.recv().await.unwrap().update {
                 Some(update_for_worker::Update::StartAction(_)) => { /* Success */ }
-                v => assert!(false, "Expected StartAction, got : {:?}", v),
+                v => panic!("Expected StartAction, got : {v:?}"),
             }
             // Other tests check full data. We only care if client thinks we are Executing.
             assert_eq!(client_rx.borrow_and_update().stage, ActionStage::Executing);
         }
 
         let action_info_hash_key = ActionInfoHashKey {
-            digest: action_digest.clone(),
+            digest: action_digest,
             salt: 0,
         };
         let action_result = ActionResult {
-            output_files: Default::default(),
-            output_folders: Default::default(),
-            output_file_symlinks: Default::default(),
-            output_directory_symlinks: Default::default(),
+            output_files: Vec::default(),
+            output_folders: Vec::default(),
+            output_file_symlinks: Vec::default(),
+            output_directory_symlinks: Vec::default(),
             exit_code: 0,
             stdout_digest: DigestInfo::new([6u8; 32], 19),
             stderr_digest: DigestInfo::new([7u8; 32], 20),
@@ -675,7 +710,7 @@ mod scheduler_tests {
                 output_upload_start_timestamp: make_system_time(12),
                 output_upload_completed_timestamp: make_system_time(13),
             },
-            server_logs: Default::default(),
+            server_logs: HashMap::default(),
         };
         let update_action_result = scheduler
             .update_action(
                 &ROGUE_WORKER_ID,
                 &action_info_hash_key,
                 ActionStage::Completed(action_result.clone()),
             )
             .await;
 
@@ -686,19 +721,17 @@ mod scheduler_tests {
         {
+            const EXPECTED_ERR: &str = "Got a result from a worker that should not be running the action";
             // Our request should have sent an error back.
             assert!(
                 update_action_result.is_err(),
                 "Expected error, got: {:?}",
                 &update_action_result
             );
-            const EXPECTED_ERR: &str = "Got a result from a worker that should not be running the action";
             let err = update_action_result.unwrap_err();
             assert!(
                 err.to_string().contains(EXPECTED_ERR),
-                "Error should contain '{}', got: {:?}",
-                EXPECTED_ERR,
-                err
+                "Error should contain '{EXPECTED_ERR}', got: {err:?}",
             );
         }
         {
@@ -715,22 +748,27 @@ mod scheduler_tests {
 
     #[tokio::test]
     async fn does_not_crash_if_operation_joined_then_relaunched() -> Result<(), Error> {
-        const WORKER_ID: WorkerId = WorkerId(0x10000f);
+        const WORKER_ID: WorkerId = WorkerId(0x0010_000f);
 
         let scheduler =
             SimpleScheduler::new_with_callback(&config::schedulers::SimpleScheduler::default(), || async move {});
         let action_digest = DigestInfo::new([99u8; 32], 512);
 
         let mut expected_action_state = ActionState {
-            name: "".to_string(), // Will be filled later.
-            action_digest: action_digest.clone(),
+            name: String::new(), // Will be filled later.
+            action_digest,
             stage: ActionStage::Executing,
         };
 
         let insert_timestamp = make_system_time(1);
-        let mut client_rx =
-            setup_action(&scheduler, action_digest.clone(), Default::default(), insert_timestamp).await?;
-        let mut rx_from_worker = setup_new_worker(&scheduler, WORKER_ID, Default::default()).await?;
+        let mut client_rx = setup_action(
+            &scheduler,
+            action_digest,
+            PlatformProperties::default(),
+            insert_timestamp,
+        )
+        .await?;
+        let mut rx_from_worker = setup_new_worker(&scheduler, WORKER_ID, PlatformProperties::default()).await?;
 
         {
             // Worker should have been sent an execute command.
@@ -739,7 +777,7 @@ mod scheduler_tests {
                 execute_request: Some(ExecuteRequest {
                     instance_name: INSTANCE_NAME.to_string(),
                     skip_cache_lookup: true,
-                    action_digest: Some(action_digest.clone().into()),
+                    action_digest: Some(action_digest.into()),
                     ..Default::default()
                 }),
                 salt: 0,
@@ -759,15 +797,15 @@ mod scheduler_tests {
         }
 
         let action_result = ActionResult {
-            output_files: Default::default(),
-            output_folders: Default::default(),
-            output_directory_symlinks: Default::default(),
-            output_file_symlinks: Default::default(),
+            output_files: Vec::default(),
+            output_folders: Vec::default(),
+            output_directory_symlinks: Vec::default(),
+            output_file_symlinks: Vec::default(),
             exit_code: Default::default(),
             stdout_digest: DigestInfo::new([1u8; 32], 512),
             stderr_digest: DigestInfo::new([2u8; 32], 512),
             execution_metadata: ExecutionMetadata {
-                worker: "".to_string(),
+                worker: String::new(),
                 queued_timestamp: SystemTime::UNIX_EPOCH,
                 worker_start_timestamp: SystemTime::UNIX_EPOCH,
                 worker_completed_timestamp: SystemTime::UNIX_EPOCH,
@@ -778,14 +816,14 @@ mod scheduler_tests {
                 output_upload_start_timestamp: SystemTime::UNIX_EPOCH,
                 output_upload_completed_timestamp: SystemTime::UNIX_EPOCH,
             },
-            server_logs: Default::default(),
+            server_logs: HashMap::default(),
         };
 
         scheduler
             .update_action(
                 &WORKER_ID,
                 &ActionInfoHashKey {
-                    digest: action_digest.clone(),
+                    digest: action_digest,
                     salt: 0,
                 },
                 ActionStage::Completed(action_result.clone()),
@@ -803,8 +841,13 @@ mod scheduler_tests {
 
         {
             let insert_timestamp = make_system_time(1);
-            let mut client_rx =
-                setup_action(&scheduler, action_digest.clone(), Default::default(), insert_timestamp).await?;
+            let mut client_rx = setup_action(
+                &scheduler,
+                action_digest,
+                PlatformProperties::default(),
+                insert_timestamp,
+            )
+            .await?;
 
             // We didn't disconnect our worker, so it will have scheduled it to the worker.
             expected_action_state.stage = ActionStage::Executing;
             let action_state = client_rx.borrow_and_update();
@@ -820,7 +863,7 @@ mod scheduler_tests {
     /// a job finished on a specific worker (eg: restore platform properties).
     #[tokio::test]
     async fn run_two_jobs_on_same_worker_with_platform_properties_restrictions() -> Result<(), Error> {
-        const WORKER_ID: WorkerId = WorkerId(0x123456789111);
+        const WORKER_ID: WorkerId = WorkerId(0x1234_5678_9111);
 
         let scheduler =
             SimpleScheduler::new_with_callback(&config::schedulers::SimpleScheduler::default(), || async move {});
@@ -834,23 +877,17 @@ mod scheduler_tests {
         let insert_timestamp1 = make_system_time(1);
         let mut client1_rx = setup_action(
             &scheduler,
-            action_digest1.clone(),
+            action_digest1,
             platform_properties.clone(),
             insert_timestamp1,
         )
         .await?;
         let insert_timestamp2 = make_system_time(1);
-        let mut client2_rx = setup_action(
-            &scheduler,
-            action_digest2.clone(),
-            platform_properties,
-            insert_timestamp2,
-        )
-        .await?;
+        let mut client2_rx = setup_action(&scheduler, action_digest2, platform_properties, insert_timestamp2).await?;
 
         match rx_from_worker.recv().await.unwrap().update {
             Some(update_for_worker::Update::StartAction(_)) => { /* Success */ }
-            v => assert!(false, "Expected StartAction, got : {:?}", v),
+            v => panic!("Expected StartAction, got : {v:?}"),
         }
         {
             // First client should be in an Executing state.
@@ -860,10 +897,10 @@ mod scheduler_tests {
         }
 
         let action_result = ActionResult {
-            output_files: Default::default(),
-            output_folders: Default::default(),
-            output_file_symlinks: Default::default(),
-            output_directory_symlinks: Default::default(),
+            output_files: Vec::default(),
+            output_folders: Vec::default(),
+            output_file_symlinks: Vec::default(),
+            output_directory_symlinks: Vec::default(),
             exit_code: 0,
             stdout_digest: DigestInfo::new([6u8; 32], 19),
             stderr_digest: DigestInfo::new([7u8; 32], 20),
@@ -879,7 +916,7 @@ mod scheduler_tests {
                 output_upload_start_timestamp: make_system_time(12),
                 output_upload_completed_timestamp: make_system_time(13),
             },
-            server_logs: Default::default(),
+            server_logs: HashMap::default(),
         };
 
         // Tell scheduler our first task is completed.
@@ -887,7 +924,7 @@ mod scheduler_tests {
             .update_action(
                 &WORKER_ID,
                 &ActionInfoHashKey {
-                    digest: action_digest1.clone(),
+                    digest: action_digest1,
                     salt: 0,
                 },
                 ActionStage::Completed(action_result.clone()),
@@ -906,7 +943,7 @@ mod scheduler_tests {
             let mut expected_action_state = ActionState {
                 // Name is a random string, so we ignore it and just make it the same.
                 name: action_state.name.clone(),
-                action_digest: action_digest1.clone(),
+                action_digest: action_digest1,
                 stage: ActionStage::Completed(action_result.clone()),
             };
             // We now know the name of the action so populate it.
@@ -921,7 +958,7 @@ mod scheduler_tests {
             // Our second client should now executing.
             match rx_from_worker.recv().await.unwrap().update {
                 Some(update_for_worker::Update::StartAction(_)) => { /* Success */ }
-                v => assert!(false, "Expected StartAction, got : {:?}", v),
+                v => panic!("Expected StartAction, got : {v:?}"),
             }
             // Other tests check full data. We only care if client thinks we are Executing.
             assert_eq!(client2_rx.borrow_and_update().stage, ActionStage::Executing);
@@ -932,7 +969,7 @@ mod scheduler_tests {
             .update_action(
                 &WORKER_ID,
                 &ActionInfoHashKey {
-                    digest: action_digest2.clone(),
+                    digest: action_digest2,
                     salt: 0,
                 },
                 ActionStage::Completed(action_result.clone()),
@@ -945,7 +982,7 @@ mod scheduler_tests {
             let mut expected_action_state = ActionState {
                 // Name is a random string, so we ignore it and just make it the same.
                 name: action_state.name.clone(),
-                action_digest: action_digest2.clone(),
+                action_digest: action_digest2,
                 stage: ActionStage::Completed(action_result.clone()),
             };
             // We now know the name of the action so populate it.
@@ -959,7 +996,7 @@ mod scheduler_tests {
     /// This tests that actions are performed in the order they were queued.
     #[tokio::test]
     async fn run_jobs_in_the_order_they_were_queued() -> Result<(), Error> {
-        const WORKER_ID: WorkerId = WorkerId(0x123456789111);
+        const WORKER_ID: WorkerId = WorkerId(0x1234_5678_9111);
 
         let scheduler =
             SimpleScheduler::new_with_callback(&config::schedulers::SimpleScheduler::default(), || async move {});
@@ -975,7 +1012,7 @@ mod scheduler_tests {
         let insert_timestamp2 = make_system_time(2);
         let mut client2_rx = setup_action(
             &scheduler,
-            action_digest2.clone(),
+            action_digest2,
             platform_properties.clone(),
             insert_timestamp2,
         )
@@ -983,7 +1020,7 @@ mod scheduler_tests {
         let insert_timestamp1 = make_system_time(1);
         let mut client1_rx = setup_action(
             &scheduler,
-            action_digest1.clone(),
+            action_digest1,
             platform_properties.clone(),
             insert_timestamp1,
         )
@@ -994,7 +1031,7 @@ mod scheduler_tests {
 
         match rx_from_worker.recv().await.unwrap().update {
             Some(update_for_worker::Update::StartAction(_)) => { /* Success */ }
-            v => assert!(false, "Expected StartAction, got : {:?}", v),
+            v => panic!("Expected StartAction, got : {v:?}"),
         }
         {
             // First client should be in an Executing state.
@@ -1008,7 +1045,7 @@ mod scheduler_tests {
 
     #[tokio::test]
     async fn worker_retries_on_internal_error_and_fails_test() -> Result<(), Error> {
-        const WORKER_ID: WorkerId = WorkerId(0x123456789111);
+        const WORKER_ID: WorkerId = WorkerId(0x1234_5678_9111);
 
         let scheduler = SimpleScheduler::new_with_callback(
             &config::schedulers::SimpleScheduler {
@@ -1019,23 +1056,28 @@ mod scheduler_tests {
         );
         let action_digest = DigestInfo::new([99u8; 32], 512);
 
-        let mut rx_from_worker = setup_new_worker(&scheduler, WORKER_ID, Default::default()).await?;
+        let mut rx_from_worker = setup_new_worker(&scheduler, WORKER_ID, PlatformProperties::default()).await?;
         let insert_timestamp = make_system_time(1);
-        let mut client_rx =
-            setup_action(&scheduler, action_digest.clone(), Default::default(), insert_timestamp).await?;
+        let mut client_rx = setup_action(
+            &scheduler,
+            action_digest,
+            PlatformProperties::default(),
+            insert_timestamp,
+        )
+        .await?;
 
         {
             // Other tests check full data. We only care if we got StartAction.
             match rx_from_worker.recv().await.unwrap().update {
                 Some(update_for_worker::Update::StartAction(_)) => { /* Success */ }
-                v => assert!(false, "Expected StartAction, got : {:?}", v),
+                v => panic!("Expected StartAction, got : {v:?}"),
             }
             // Other tests check full data. We only care if client thinks we are Executing.
             assert_eq!(client_rx.borrow_and_update().stage, ActionStage::Executing);
         }
 
         let action_info_hash_key = ActionInfoHashKey {
-            digest: action_digest.clone(),
+            digest: action_digest,
             salt: 0,
         };
         scheduler
@@ -1052,19 +1094,19 @@ mod scheduler_tests {
             let expected_action_state = ActionState {
                 // Name is a random string, so we ignore it and just make it the same.
                 name: action_state.name.clone(),
-                action_digest: action_digest.clone(),
+                action_digest,
                 stage: ActionStage::Queued,
             };
             assert_eq!(action_state.as_ref(), &expected_action_state);
         }
 
         // Now connect a new worker and it should pickup the action.
-        let mut rx_from_worker = setup_new_worker(&scheduler, WORKER_ID, Default::default()).await?;
+        let mut rx_from_worker = setup_new_worker(&scheduler, WORKER_ID, PlatformProperties::default()).await?;
         {
             // Other tests check full data. We only care if we got StartAction.
             match rx_from_worker.recv().await.unwrap().update {
                 Some(update_for_worker::Update::StartAction(_)) => { /* Success */ }
-                v => assert!(false, "Expected StartAction, got : {:?}", v),
+                v => panic!("Expected StartAction, got : {v:?}"),
             }
             // Other tests check full data. We only care if client thinks we are Executing.
             assert_eq!(client_rx.borrow_and_update().stage, ActionStage::Executing);
@@ -1085,14 +1127,14 @@ mod scheduler_tests {
             let expected_action_state = ActionState {
                 // Name is a random string, so we ignore it and just make it the same.
                 name: action_state.name.clone(),
-                action_digest: action_digest.clone(),
+                action_digest,
                 stage: ActionStage::Error((
                     make_err!(Code::Internal, "Some error"),
                     ActionResult {
-                        output_files: Default::default(),
-                        output_folders: Default::default(),
-                        output_file_symlinks: Default::default(),
-                        output_directory_symlinks: Default::default(),
+                        output_files: Vec::default(),
+                        output_folders: Vec::default(),
+                        output_file_symlinks: Vec::default(),
+                        output_directory_symlinks: Vec::default(),
                         exit_code: INTERNAL_ERROR_EXIT_CODE,
                         stdout_digest: DigestInfo::empty_digest(),
                         stderr_digest: DigestInfo::empty_digest(),
@@ -1108,7 +1150,7 @@ mod scheduler_tests {
                         output_upload_start_timestamp: SystemTime::UNIX_EPOCH,
                         output_upload_completed_timestamp: SystemTime::UNIX_EPOCH,
                     },
-                    server_logs: Default::default(),
+                    server_logs: HashMap::default(),
                 },
             )),
         };