From 656e7f7db00b12443996fa370076a4695e10768f Mon Sep 17 00:00:00 2001 From: Chris Staite Date: Thu, 20 Jul 2023 12:50:58 +0000 Subject: [PATCH] Add property modifying scheduler. In order to avoid modifying the Goma executable, add a wrapping scheduler that is able to modify the properties that are passed in from it. --- cas/scheduler/BUILD | 37 +++ cas/scheduler/default_scheduler_factory.rs | 15 +- cas/scheduler/property_modifier_scheduler.rs | 75 ++++++ .../tests/property_modifier_scheduler_test.rs | 246 ++++++++++++++++++ config/schedulers.rs | 30 +++ 5 files changed, 402 insertions(+), 1 deletion(-) create mode 100644 cas/scheduler/property_modifier_scheduler.rs create mode 100644 cas/scheduler/tests/property_modifier_scheduler_test.rs diff --git a/cas/scheduler/BUILD b/cas/scheduler/BUILD index 25f87cf63..4f23b6912 100644 --- a/cas/scheduler/BUILD +++ b/cas/scheduler/BUILD @@ -140,6 +140,42 @@ rust_test( ], ) +rust_library( + name = "property_modifier_scheduler", + srcs = ["property_modifier_scheduler.rs"], + visibility = [ + "//cas:__pkg__", + "//cas:__subpackages__", + ], + proc_macro_deps = ["@crate_index//:async-trait"], + deps = [ + ":action_messages", + ":platform_property_manager", + ":scheduler", + "//config", + "//util:error", + "@crate_index//:tokio", + ], +) + +rust_test( + name = "property_modifier_scheduler_test", + srcs = ["tests/property_modifier_scheduler_test.rs"], + deps = [ + ":action_messages", + ":property_modifier_scheduler", + ":mock_scheduler", + ":platform_property_manager", + ":scheduler", + ":scheduler_utils", + "//config", + "//util:common", + "//util:error", + "@crate_index//:pretty_assertions", + "@crate_index//:tokio", + ], +) + rust_library( name = "grpc_scheduler", srcs = ["grpc_scheduler.rs"], @@ -172,6 +208,7 @@ rust_library( deps = [ ":cache_lookup_scheduler", ":grpc_scheduler", + ":property_modifier_scheduler", ":scheduler", ":simple_scheduler", "//cas/store", diff --git a/cas/scheduler/default_scheduler_factory.rs b/cas/scheduler/default_scheduler_factory.rs index 6a663a07f..2f24da0ed 100644 --- a/cas/scheduler/default_scheduler_factory.rs +++ b/cas/scheduler/default_scheduler_factory.rs @@ -23,6 +23,7 @@ use cache_lookup_scheduler::CacheLookupScheduler; use config::schedulers::SchedulerConfig; use error::{Error, ResultExt}; use grpc_scheduler::GrpcScheduler; +use property_modifier_scheduler::PropertyModifierScheduler; use scheduler::{ActionScheduler, WorkerScheduler}; use simple_scheduler::SimpleScheduler; use store::StoreManager; @@ -47,7 +48,9 @@ pub fn scheduler_factory<'a>( let ac_store = store_manager .get_store(&config.ac_store) .err_tip(|| format!("'ac_store': '{}' does not exist", config.ac_store))?; - let (action_scheduler, worker_scheduler) = scheduler_factory(&config.scheduler, store_manager).await?; + let (action_scheduler, worker_scheduler) = scheduler_factory(&config.scheduler, store_manager) + .await + .err_tip(|| "In nested CacheLookupScheduler construction")?; let cache_lookup_scheduler = Arc::new(CacheLookupScheduler::new( cas_store, ac_store, @@ -55,6 +58,16 @@ pub fn scheduler_factory<'a>( )?); (Some(cache_lookup_scheduler), worker_scheduler) } + SchedulerConfig::property_modifier(config) => { + let (action_scheduler, worker_scheduler) = scheduler_factory(&config.scheduler, store_manager) + .await + .err_tip(|| "In nested PropertyModifierScheduler construction")?; + let property_modifier_scheduler = Arc::new(PropertyModifierScheduler::new( + config, + action_scheduler.err_tip(|| "Nested scheduler is not an action scheduler")?, + )); + (Some(property_modifier_scheduler), worker_scheduler) + } }; if let Some(action_scheduler) = &scheduler.0 { diff --git a/cas/scheduler/property_modifier_scheduler.rs b/cas/scheduler/property_modifier_scheduler.rs new file mode 100644 index 000000000..f08a03483 --- /dev/null +++ b/cas/scheduler/property_modifier_scheduler.rs @@ -0,0 +1,75 @@ +// Copyright 2023 The Turbo Cache Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use async_trait::async_trait; +use tokio::sync::watch; + +use action_messages::{ActionInfo, ActionInfoHashKey, ActionState}; +use config::schedulers::PropertyModification; +use error::{Error, ResultExt}; +use platform_property_manager::PlatformPropertyManager; +use scheduler::ActionScheduler; + +pub struct PropertyModifierScheduler { + modifications: Vec, + scheduler: Arc, +} + +impl PropertyModifierScheduler { + pub fn new(config: &config::schedulers::PropertyModifierScheduler, scheduler: Arc) -> Self { + Self { + modifications: config.modifications.clone(), + scheduler, + } + } +} + +#[async_trait] +impl ActionScheduler for PropertyModifierScheduler { + async fn get_platform_property_manager(&self, instance_name: &str) -> Result, Error> { + self.scheduler.get_platform_property_manager(instance_name).await + } + + async fn add_action(&self, mut action_info: ActionInfo) -> Result>, Error> { + let platform_property_manager = self + .get_platform_property_manager(&action_info.unique_qualifier.instance_name) + .await + .err_tip(|| "In PropertyModifierScheduler::add_action")?; + for modification in &self.modifications { + match modification { + PropertyModification::Add(addition) => action_info.platform_properties.properties.insert( + addition.name.clone(), + platform_property_manager + .make_prop_value(&addition.name, &addition.value) + .err_tip(|| "In PropertyModifierScheduler::add_action")?, + ), + PropertyModification::Remove(name) => action_info.platform_properties.properties.remove(name), + }; + } + self.scheduler.add_action(action_info).await + } + + async fn find_existing_action( + &self, + unique_qualifier: &ActionInfoHashKey, + ) -> Option>> { + self.scheduler.find_existing_action(unique_qualifier).await + } + + async fn clean_recently_completed_actions(&self) { + self.scheduler.clean_recently_completed_actions().await + } +} diff --git a/cas/scheduler/tests/property_modifier_scheduler_test.rs b/cas/scheduler/tests/property_modifier_scheduler_test.rs new file mode 100644 index 000000000..36f59d126 --- /dev/null +++ b/cas/scheduler/tests/property_modifier_scheduler_test.rs @@ -0,0 +1,246 @@ +// Copyright 2023 The Turbo Cache Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::sync::Arc; +use std::time::UNIX_EPOCH; + +use tokio::{join, sync::watch}; + +use action_messages::{ActionInfoHashKey, ActionStage, ActionState}; +use common::DigestInfo; +use config::schedulers::{PlatformPropertyAddition, PropertyModification, PropertyType}; +use error::Error; +use mock_scheduler::MockActionScheduler; +use platform_property_manager::{PlatformPropertyManager, PlatformPropertyValue}; +use property_modifier_scheduler::PropertyModifierScheduler; +use scheduler::ActionScheduler; +use scheduler_utils::{make_base_action_info, INSTANCE_NAME}; + +struct TestContext { + mock_scheduler: Arc, + modifier_scheduler: PropertyModifierScheduler, +} + +fn make_modifier_scheduler(modifications: Vec) -> TestContext { + let mock_scheduler = Arc::new(MockActionScheduler::new()); + let config = config::schedulers::PropertyModifierScheduler { + modifications, + scheduler: Box::new(config::schedulers::SchedulerConfig::simple( + config::schedulers::SimpleScheduler::default(), + )), + }; + let modifier_scheduler = PropertyModifierScheduler::new(&config, mock_scheduler.clone()); + TestContext { + mock_scheduler, + modifier_scheduler, + } +} + +#[cfg(test)] +mod property_modifier_scheduler_tests { + use super::*; + use pretty_assertions::assert_eq; // Must be declared in every module. + + #[tokio::test] + async fn platform_property_manager_call_passed() -> Result<(), Error> { + let context = make_modifier_scheduler(vec![]); + let platform_property_manager = Arc::new(PlatformPropertyManager::new(HashMap::new())); + let instance_name = INSTANCE_NAME.to_string(); + let (actual_manager, actual_instance_name) = join!( + context.modifier_scheduler.get_platform_property_manager(&instance_name), + context + .mock_scheduler + .expect_get_platform_property_manager(Ok(platform_property_manager.clone())), + ); + assert_eq!(Arc::as_ptr(&platform_property_manager), Arc::as_ptr(&actual_manager?)); + assert_eq!(instance_name, actual_instance_name); + Ok(()) + } + + #[tokio::test] + async fn add_action_adds_property() -> Result<(), Error> { + let name = "name".to_string(); + let value = "value".to_string(); + let context = make_modifier_scheduler(vec![PropertyModification::Add(PlatformPropertyAddition { + name: name.clone(), + value: value.clone(), + })]); + let action_info = make_base_action_info(UNIX_EPOCH); + let (_forward_watch_channel_tx, forward_watch_channel_rx) = watch::channel(Arc::new(ActionState { + unique_qualifier: action_info.unique_qualifier.clone(), + stage: ActionStage::Queued, + })); + let platform_property_manager = Arc::new(PlatformPropertyManager::new(HashMap::from([( + name.clone(), + PropertyType::Exact, + )]))); + let (_, _, action_info) = join!( + context.modifier_scheduler.add_action(action_info), + context + .mock_scheduler + .expect_get_platform_property_manager(Ok(platform_property_manager)), + context.mock_scheduler.expect_add_action(Ok(forward_watch_channel_rx)), + ); + assert_eq!( + HashMap::from([(name, PlatformPropertyValue::Exact(value))]), + action_info.platform_properties.properties + ); + Ok(()) + } + + #[tokio::test] + async fn add_action_overwrites_property() -> Result<(), Error> { + let name = "name".to_string(); + let original_value = "value".to_string(); + let replaced_value = "replaced".to_string(); + let context = make_modifier_scheduler(vec![PropertyModification::Add(PlatformPropertyAddition { + name: name.clone(), + value: replaced_value.clone(), + })]); + let mut action_info = make_base_action_info(UNIX_EPOCH); + action_info + .platform_properties + .properties + .insert(name.clone(), PlatformPropertyValue::Unknown(original_value)); + let (_forward_watch_channel_tx, forward_watch_channel_rx) = watch::channel(Arc::new(ActionState { + unique_qualifier: action_info.unique_qualifier.clone(), + stage: ActionStage::Queued, + })); + let platform_property_manager = Arc::new(PlatformPropertyManager::new(HashMap::from([( + name.clone(), + PropertyType::Exact, + )]))); + let (_, _, action_info) = join!( + context.modifier_scheduler.add_action(action_info), + context + .mock_scheduler + .expect_get_platform_property_manager(Ok(platform_property_manager)), + context.mock_scheduler.expect_add_action(Ok(forward_watch_channel_rx)), + ); + assert_eq!( + HashMap::from([(name, PlatformPropertyValue::Exact(replaced_value))]), + action_info.platform_properties.properties + ); + Ok(()) + } + + #[tokio::test] + async fn add_action_property_added_after_remove() -> Result<(), Error> { + let name = "name".to_string(); + let value = "value".to_string(); + let context = make_modifier_scheduler(vec![ + PropertyModification::Remove(name.clone()), + PropertyModification::Add(PlatformPropertyAddition { + name: name.clone(), + value: value.clone(), + }), + ]); + let action_info = make_base_action_info(UNIX_EPOCH); + let (_forward_watch_channel_tx, forward_watch_channel_rx) = watch::channel(Arc::new(ActionState { + unique_qualifier: action_info.unique_qualifier.clone(), + stage: ActionStage::Queued, + })); + let platform_property_manager = Arc::new(PlatformPropertyManager::new(HashMap::from([( + name.clone(), + PropertyType::Exact, + )]))); + let (_, _, action_info) = join!( + context.modifier_scheduler.add_action(action_info), + context + .mock_scheduler + .expect_get_platform_property_manager(Ok(platform_property_manager)), + context.mock_scheduler.expect_add_action(Ok(forward_watch_channel_rx)), + ); + assert_eq!( + HashMap::from([(name, PlatformPropertyValue::Exact(value))]), + action_info.platform_properties.properties + ); + Ok(()) + } + + #[tokio::test] + async fn add_action_property_remove_after_add() -> Result<(), Error> { + let name = "name".to_string(); + let value = "value".to_string(); + let context = make_modifier_scheduler(vec![ + PropertyModification::Add(PlatformPropertyAddition { + name: name.clone(), + value: value.clone(), + }), + PropertyModification::Remove(name.clone()), + ]); + let action_info = make_base_action_info(UNIX_EPOCH); + let (_forward_watch_channel_tx, forward_watch_channel_rx) = watch::channel(Arc::new(ActionState { + unique_qualifier: action_info.unique_qualifier.clone(), + stage: ActionStage::Queued, + })); + let platform_property_manager = Arc::new(PlatformPropertyManager::new(HashMap::from([( + name, + PropertyType::Exact, + )]))); + let (_, _, action_info) = join!( + context.modifier_scheduler.add_action(action_info), + context + .mock_scheduler + .expect_get_platform_property_manager(Ok(platform_property_manager)), + context.mock_scheduler.expect_add_action(Ok(forward_watch_channel_rx)), + ); + assert_eq!(HashMap::from([]), action_info.platform_properties.properties); + Ok(()) + } + + #[tokio::test] + async fn add_action_property_remove() -> Result<(), Error> { + let name = "name".to_string(); + let value = "value".to_string(); + let context = make_modifier_scheduler(vec![PropertyModification::Remove(name.clone())]); + let mut action_info = make_base_action_info(UNIX_EPOCH); + action_info + .platform_properties + .properties + .insert(name, PlatformPropertyValue::Unknown(value)); + let (_forward_watch_channel_tx, forward_watch_channel_rx) = watch::channel(Arc::new(ActionState { + unique_qualifier: action_info.unique_qualifier.clone(), + stage: ActionStage::Queued, + })); + let platform_property_manager = Arc::new(PlatformPropertyManager::new(HashMap::new())); + let (_, _, action_info) = join!( + context.modifier_scheduler.add_action(action_info), + context + .mock_scheduler + .expect_get_platform_property_manager(Ok(platform_property_manager)), + context.mock_scheduler.expect_add_action(Ok(forward_watch_channel_rx)), + ); + assert_eq!(HashMap::from([]), action_info.platform_properties.properties); + Ok(()) + } + + #[tokio::test] + async fn find_existing_action_call_passed() -> Result<(), Error> { + let context = make_modifier_scheduler(vec![]); + let action_name = ActionInfoHashKey { + instance_name: "instance".to_string(), + digest: DigestInfo::new([8; 32], 1), + salt: 1000, + }; + let (actual_result, actual_action_name) = join!( + context.modifier_scheduler.find_existing_action(&action_name), + context.mock_scheduler.expect_find_existing_action(None), + ); + assert_eq!(true, actual_result.is_none()); + assert_eq!(action_name, actual_action_name); + Ok(()) + } +} diff --git a/config/schedulers.rs b/config/schedulers.rs index ec58b1e26..91bb7fb83 100644 --- a/config/schedulers.rs +++ b/config/schedulers.rs @@ -25,6 +25,7 @@ pub enum SchedulerConfig { simple(SimpleScheduler), grpc(GrpcScheduler), cache_lookup(CacheLookupScheduler), + property_modifier(PropertyModifierScheduler), } /// When the scheduler matches tasks to workers that are capable of running @@ -143,3 +144,32 @@ pub struct CacheLookupScheduler { /// The nested scheduler to use if cache lookup fails. pub scheduler: Box, } + +#[derive(Deserialize, Debug, Clone)] +pub struct PlatformPropertyAddition { + /// The name of the property to add. + pub name: String, + /// The value to assign to the property. + pub value: String, +} + +#[derive(Deserialize, Debug, Clone)] +pub enum PropertyModification { + /// Add a property to the action properties. + Add(PlatformPropertyAddition), + /// Remove a named property from the action. + Remove(String), +} + +#[derive(Deserialize, Debug)] +pub struct PropertyModifierScheduler { + /// A list of modifications to perform to incoming actions for the nested + /// scheduler. These are performed in order and blindly, so removing a + /// property that doesn't exist is fine and overwriting an existing property + /// is also fine. If adding properties that do not exist in the nested + /// scheduler is not supported and will likely cause unexpected behaviour. + pub modifications: Vec, + + /// The nested scheduler to use after modifying the properties. + pub scheduler: Box, +}