From 60651790071d3af861f6cb6176040008d515f923 Mon Sep 17 00:00:00 2001 From: Jamie Winsor Date: Fri, 13 Nov 2015 06:58:09 -0800 Subject: [PATCH] refactor Sidecar into an actor health check hook can now be templatized with mustache --- src/bldr/command/start.rs | 1 - src/bldr/pkg.rs | 9 ++- src/bldr/sidecar.rs | 136 ++++++++++++++++++++------------ src/bldr/topology/mod.rs | 129 ++++++++++++++++++------------ src/bldr/topology/standalone.rs | 4 +- src/main.rs | 2 - 6 files changed, 175 insertions(+), 106 deletions(-) diff --git a/src/bldr/command/start.rs b/src/bldr/command/start.rs index 604cd185f6..0ee601914d 100644 --- a/src/bldr/command/start.rs +++ b/src/bldr/command/start.rs @@ -82,4 +82,3 @@ pub fn package(config: &Config) -> BldrResult<()> { } Ok(()) } - diff --git a/src/bldr/pkg.rs b/src/bldr/pkg.rs index 22fee1487a..f645cbc052 100644 --- a/src/bldr/pkg.rs +++ b/src/bldr/pkg.rs @@ -285,6 +285,11 @@ impl Package { } } + /// Get status of running package. + pub fn status(&self) -> BldrResult { + self.signal(Signal::Status) + } + pub fn exposes(&self) -> Vec { match fs::metadata(self.join_path("EXPOSES")) { Ok(_) => { @@ -419,9 +424,9 @@ impl Package { } } - pub fn health_check(&self) -> BldrResult { + pub fn health_check(&self, config: &ServiceConfig) -> BldrResult { if let Some(hook) = self.hooks().health_check_hook { - match hook.run(None) { + match hook.run(Some(config)) { Ok(output) => Ok(health_check::CheckResult::ok(output)), Err(BldrError::HookFailed(_, 1, output)) => Ok(health_check::CheckResult::warning(output)), Err(BldrError::HookFailed(_, 2, output)) => Ok(health_check::CheckResult::critical(output)), diff --git a/src/bldr/sidecar.rs b/src/bldr/sidecar.rs index 8a0c8a6b0b..585fa6fd31 100644 --- a/src/bldr/sidecar.rs +++ b/src/bldr/sidecar.rs @@ -27,29 +27,84 @@ use iron::prelude::*; use iron::status; use router::Router; -use std::sync::Arc; -use std::thread; +use std::sync::{Arc, RwLock}; -use error::{BldrError, BldrResult}; +use wonder; +use wonder::actor::{GenServer, InitResult, HandleResult, StopReason, ActorSender}; -use pkg::{Package, Signal}; +use error::BldrError; use health_check; +use pkg::Package; +use service_config::ServiceConfig; -/// The sidecar state -struct Sidecar { +const GET_HEALTH: &'static str = "/health"; +const GET_CONFIG: &'static str = "/config"; +const GET_STATUS: &'static str = "/status"; +const LISTEN_ADDR: &'static str = "0.0.0.0:9631"; + +pub type SidecarActor = wonder::actor::Actor; + +pub struct Sidecar; + +pub struct SidecarState { /// The package this sidecar is helping out - pub package: Package, + pub package: Arc>, + /// The configuration of the supervised service + pub config: Arc>, +} + +#[derive(Debug)] +pub enum SidecarMessage { + Ok, + Stop, +} + +impl SidecarState { + pub fn new(package: Arc>, config: Arc>) -> Self { + SidecarState { + package: package, + config: config, + } + } } impl Sidecar { - /// Returns a new sidecar. - /// - /// # Failures - /// - /// * If the package cannot be found - fn new(pkg: &str) -> BldrResult> { - let package = try!(Package::latest(pkg, None)); - Ok(Arc::new(Sidecar{package: package})) + /// Start the sidecar. + pub fn start(package: Arc>, config: Arc>) -> SidecarActor { + let state = SidecarState::new(package, config); + wonder::actor::Builder::new(Sidecar).name("sidecar".to_string()).start(state).unwrap() + } +} + +impl GenServer for Sidecar { + type T = SidecarMessage; + type S = SidecarState; + type E = BldrError; + + fn init(&self, _tx: &ActorSender, _state: &mut Self::S) -> InitResult { + Ok(Some(0)) + } + + fn handle_timeout( + &self, + _tx: &ActorSender, + _me: &ActorSender, + state: &mut Self::S + ) -> HandleResult { + let mut router = Router::new(); + let package_1 = state.package.clone(); + let package_2 = state.package.clone(); + let package_3 = state.package.clone(); + let config_1 = state.config.clone(); + + router.get(GET_CONFIG, move |r: &mut Request| config(&package_1, r)); + router.get(GET_STATUS, move |r: &mut Request| status(&package_2, r)); + router.get(GET_HEALTH, move |r: &mut Request| health(&package_3, &config_1, r)); + + match Iron::new(router).http(LISTEN_ADDR) { + Ok(_) => HandleResult::NoReply(None), + Err(_) => HandleResult::Stop(StopReason::Fatal("couldn't start router".to_string()), None), + } } } @@ -60,8 +115,9 @@ impl Sidecar { /// # Failures /// /// * Fails if the configuration cannot be found. -fn config(sidecar: &Sidecar, _req: &mut Request) -> IronResult { - let last_config = try!(sidecar.package.last_config()); +fn config(lock: &Arc>, _req: &mut Request) -> IronResult { + let package = lock.read().unwrap(); + let last_config = try!(package.last_config()); Ok(Response::with((status::Ok, last_config))) } @@ -72,8 +128,9 @@ fn config(sidecar: &Sidecar, _req: &mut Request) -> IronResult { /// # Failures /// /// * Fails if the supervisor cannot return the status. -fn status(sidecar: &Sidecar, _req: &mut Request) -> IronResult { - let output = try!(sidecar.package.signal(Signal::Status)); +fn status(lock: &Arc>, _req: &mut Request) -> IronResult { + let package = lock.read().unwrap(); + let output = try!(package.status()); Ok(Response::with((status::Ok, output))) } @@ -85,8 +142,16 @@ fn status(sidecar: &Sidecar, _req: &mut Request) -> IronResult { /// # Failures /// /// * If the health_check cannot be run. -fn health(sidecar: &Sidecar, _req: &mut Request) -> IronResult { - let result = try!(sidecar.package.health_check()); +fn health( + package_lock: &Arc>, + config_lock: &Arc>, + _req: &mut Request +) -> IronResult { + let result = { + let package = package_lock.read().unwrap(); + let config = config_lock.read().unwrap(); + try!(package.health_check(&config)) + }; match result.status { health_check::Status::Ok | health_check::Status::Warning => { @@ -101,38 +166,9 @@ fn health(sidecar: &Sidecar, _req: &mut Request) -> IronResult { } } -/// Start the sidecar. -/// -/// # Failures -/// -/// * If the thread cannot be spawned -pub fn run(pkg: &str) -> BldrResult<()> { - let pkg_name = String::from(pkg); - try!(thread::Builder::new().name(String::from("sidecar")).spawn(move || -> BldrResult<()> { - // The sidecar is in an Arc. The clones are - // creating instances to share, and when they all go away, we'll - // reap the instance. Turns out they won't really ever go away, - // but you do what you need to :) - let sidecar = try!(Sidecar::new(&pkg_name)); - let sidecar2 = sidecar.clone(); - let sidecar3 = sidecar.clone(); - - let mut router = Router::new(); - - router.get("/config", move |r: &mut Request| config(&sidecar, r)); - router.get("/status", move |r: &mut Request| status(&sidecar2, r)); - router.get("/health", move |r: &mut Request| health(&sidecar3, r)); - - Iron::new(router).http("0.0.0.0:9631").unwrap(); - Ok(()) - })); - Ok(()) -} - /// Translates BldrErrors into IronErrors impl From for IronError { fn from(err: BldrError) -> IronError { IronError{error: Box::new(err), response: Response::with((status::InternalServerError, "Internal bldr error"))} } } - diff --git a/src/bldr/topology/mod.rs b/src/bldr/topology/mod.rs index 7c9bc6977a..cb16c35122 100644 --- a/src/bldr/topology/mod.rs +++ b/src/bldr/topology/mod.rs @@ -30,6 +30,7 @@ pub mod leader; use ansi_term::Colour::White; use std::thread; +use std::sync::{Arc, RwLock}; use std::sync::mpsc::{TryRecvError}; use libc::{pid_t, c_int}; @@ -43,6 +44,7 @@ use util::signals::SignalNotifier; use error::{BldrResult, BldrError}; use config::Config; use service_config::ServiceConfig; +use sidecar; use user_config; use watch_config; @@ -126,7 +128,9 @@ pub enum State { /// The topology `Worker` is where everything our state machine needs between states lives. pub struct Worker<'a> { /// The package we are supervising - pub package: Package, + pub package: Arc>, + /// Name of the package being supervised + pub package_name: String, /// A pointer to our current Config pub config: &'a Config, /// The topology we are running @@ -134,11 +138,13 @@ pub struct Worker<'a> { /// Our census pub census: census::Census, /// Our Service Configuration; manages changes to our configuration, - pub service_config: ServiceConfig, + pub service_config: Arc>, /// Our Census Entry Actor; writes our entry periodically pub census_entry_actor: wonder::actor::Actor, /// Our Census Actor; reads the census periodically pub census_actor: wonder::actor::Actor, + /// Our Sidecar Actor; exposes a restful HTTP interface to the outside world + pub sidecar_actor: sidecar::SidecarActor, /// Our User Configuration; reads the config periodically pub user_actor: wonder::actor::Actor, /// Our User Configuration; reads the config periodically @@ -146,7 +152,7 @@ pub struct Worker<'a> { /// A pointer to the supervisor thread pub supervisor_thread: Option>>, /// The PID of the Supervisor itself - pub supervisor_id: Option + pub supervisor_id: Option, } impl<'a> Worker<'a> { @@ -161,25 +167,33 @@ impl<'a> Worker<'a> { ce.port(Some(port)); ce.exposes(Some(exposes)); let census_data = ce.as_etcd_write(&package, &config); + let package_name = package.name.clone(); - println!(" {}({}): Supervisor ID {}", package.name, White.bold().paint("T"), ce.candidate_string()); + println!(" {}({}): Supervisor ID {}", package_name, White.bold().paint("T"), ce.candidate_string()); // Setup the Census let census = census::Census::new(ce); - let census_actor_state = census::CensusActorState::new(format!("{}/{}/census", package.name, config.group())); + let census_actor_state = census::CensusActorState::new(format!("{}/{}/census", package_name, config.group())); // Setup the Service Configuration let service_config = try!(ServiceConfig::new(&package)); // Setup the User Data Configuration - let user_actor_state = user_config::UserActorState::new(format!("{}/{}/config", package.name, config.group())); + let user_actor_state = user_config::UserActorState::new(format!("{}/{}/config", package_name, config.group())); // Setup the Watches let mut watch_actor_state = watch_config::WatchActorState::new(); try!(watch_actor_state.set_watches(&config)); + let pkg_lock = Arc::new(RwLock::new(package)); + let pkg_lock_1 = pkg_lock.clone(); + + let service_config_lock = Arc::new(RwLock::new(service_config)); + let service_config_lock_1 = service_config_lock.clone(); + Ok(Worker{ - package: package, + package: pkg_lock, + package_name: package_name, topology: topology, config: config, census: census, @@ -191,7 +205,8 @@ impl<'a> Worker<'a> { .name("census-reader".to_string()) .start(census_actor_state) .unwrap(), - service_config: service_config, + service_config: service_config_lock, + sidecar_actor: sidecar::Sidecar::start(pkg_lock_1, service_config_lock_1), user_actor: wonder::actor::Builder::new(user_config::UserActor) .name("user-config".to_string()) .start(user_actor_state) @@ -207,7 +222,12 @@ impl<'a> Worker<'a> { /// Prints a preamble for the topology's println statements pub fn preamble(&self) -> String { - format!("{}({})", self.package.name, White.bold().paint("T")) + format!("{}({})", self.package_name, White.bold().paint("T")) + } + + pub fn signal_package(&self, signal: Signal) -> BldrResult { + let package = self.package.read().unwrap(); + package.signal(signal) } /// Join the supervisor thread, and check for errors @@ -252,45 +272,49 @@ impl<'a> Worker<'a> { /// * The discovery subsystem returns an error /// * The topology state machine returns an error fn run_internal<'a>(sm: &mut StateMachine, BldrError>, worker: &mut Worker<'a>) -> BldrResult<()> { - try!(worker.package.create_srvc_path()); - try!(worker.package.copy_run(&worker.service_config)); + { + let package = worker.package.read().unwrap(); + let service_config = worker.service_config.read().unwrap(); + try!(package.create_srvc_path()); + try!(package.copy_run(&service_config)); + } let handler = wonder::actor::Builder::new(SignalNotifier).name("signal-handler".to_string()).start(()).unwrap(); - println!(" {}({}): Watching census", worker.package.name, White.bold().paint("D")); - println!(" {}({}): Watching config", worker.package.name, White.bold().paint("D")); + println!(" {}({}): Watching census", worker.package_name, White.bold().paint("D")); + println!(" {}({}): Watching config", worker.package_name, White.bold().paint("D")); loop { match handler.receiver.try_recv() { Ok(wonder::actor::Message::Cast(signals::Message::Signal(signals::Signal::SIGHUP))) => { println!(" {}: Sending SIGHUP", worker.preamble()); - try!(worker.package.signal(Signal::Hup)); + try!(worker.signal_package(Signal::Hup)); }, Ok(wonder::actor::Message::Cast(signals::Message::Signal(signals::Signal::SIGINT))) => { println!(" {}: Sending 'force-shutdown' on SIGINT", worker.preamble()); - try!(worker.package.signal(Signal::ForceShutdown)); + try!(worker.signal_package(Signal::ForceShutdown)); try!(worker.join_supervisor()); break; }, Ok(wonder::actor::Message::Cast(signals::Message::Signal(signals::Signal::SIGQUIT))) => { - try!(worker.package.signal(Signal::Quit)); + try!(worker.signal_package(Signal::Quit)); println!(" {}: Sending SIGQUIT", worker.preamble()); }, Ok(wonder::actor::Message::Cast(signals::Message::Signal(signals::Signal::SIGALRM))) => { - try!(worker.package.signal(Signal::Alarm)); + try!(worker.signal_package(Signal::Alarm)); println!(" {}: Sending SIGALRM", worker.preamble()); }, Ok(wonder::actor::Message::Cast(signals::Message::Signal(signals::Signal::SIGTERM))) => { println!(" {}: Sending 'force-shutdown' on SIGTERM", worker.preamble()); - try!(worker.package.signal(Signal::ForceShutdown)); + try!(worker.signal_package(Signal::ForceShutdown)); try!(worker.join_supervisor()); break; }, Ok(wonder::actor::Message::Cast(signals::Message::Signal(signals::Signal::SIGUSR1))) => { println!(" {}: Sending SIGUSR1", worker.preamble()); - try!(worker.package.signal(Signal::One)); + try!(worker.signal_package(Signal::One)); }, Ok(wonder::actor::Message::Cast(signals::Message::Signal(signals::Signal::SIGUSR2))) => { println!(" {}: Sending SIGUSR1", worker.preamble()); - try!(worker.package.signal(Signal::Two)); + try!(worker.signal_package(Signal::Two)); }, Ok(_) => {}, Err(TryRecvError::Empty) => {}, @@ -336,46 +360,53 @@ fn run_internal<'a>(sm: &mut StateMachine, BldrError>, worker: { let mut ce = try!(worker.census.me_mut()); if ce.needs_write.is_some() { - try!(census::CensusEntryActor::write(&worker.census_entry_actor, ce.as_etcd_write(&worker.package, &worker.config))); + let package = worker.package.read().unwrap(); + try!(census::CensusEntryActor::write(&worker.census_entry_actor, ce.as_etcd_write(&package, &worker.config))); } } - // Manage the entire census + // Obtain write lock on service config { - if let Some(census_string) = try!(census::CensusActor::census_string(&worker.census_actor)) { - try!(worker.census.update(&census_string)); - } - if !worker.census.in_event { - if worker.census.needs_write { - let census_toml = try!(worker.census.to_toml()); - worker.service_config.census(census_toml); + let mut service_config = worker.service_config.write().unwrap(); + + // Manage the entire census + { + if let Some(census_string) = try!(census::CensusActor::census_string(&worker.census_actor)) { + try!(worker.census.update(&census_string)); + } + if !worker.census.in_event { + if worker.census.needs_write { + let census_toml = try!(worker.census.to_toml()); + service_config.census(census_toml); + } } } - } - // Manage the user configuration from discovery - { - match try!(user_config::UserActor::config_string(&worker.user_actor)) { - Some(user_string) => worker.service_config.user(user_string), - None => worker.service_config.user(String::new()), + // Manage the user configuration from discovery + { + match try!(user_config::UserActor::config_string(&worker.user_actor)) { + Some(user_string) => service_config.user(user_string), + None => service_config.user(String::new()), + } } - } - // Manage the watch configuration from discovery - { - match try!(watch_config::WatchActor::config_string(&worker.watch_actor)) { - Some(watch_string) => worker.service_config.watch(watch_string), - None => worker.service_config.watch(String::new()), + // Manage the watch configuration from discovery + { + match try!(watch_config::WatchActor::config_string(&worker.watch_actor)) { + Some(watch_string) => service_config.watch(watch_string), + None => service_config.watch(String::new()), + } } - } - // Don't bother trying to reconfigure if we are in an event - just wait till - // everything settles down. - if !worker.census.in_event { - // Write the configuration, and restart if needed - if try!(worker.service_config.write(&worker.package)) { - try!(worker.package.copy_run(&worker.service_config)); - try!(worker.package.reconfigure(&worker.service_config)); + // Don't bother trying to reconfigure if we are in an event - just wait till + // everything settles down. + if !worker.census.in_event { + let package = worker.package.read().unwrap(); + // Write the configuration, and restart if needed + if try!(service_config.write(&package)) { + try!(package.copy_run(&service_config)); + try!(package.reconfigure(&service_config)); + } } } diff --git a/src/bldr/topology/standalone.rs b/src/bldr/topology/standalone.rs index 8b86b8943b..d662377a15 100644 --- a/src/bldr/topology/standalone.rs +++ b/src/bldr/topology/standalone.rs @@ -79,14 +79,14 @@ pub fn state_starting(worker: &mut Worker) -> Result<(State, u32), BldrError> { let runit_pkg = try!(Package::latest("runit", None)); let mut child = try!( Command::new(runit_pkg.join_path("bin/runsv")) - .arg(&format!("/opt/bldr/srvc/{}", worker.package.name)) + .arg(&format!("/opt/bldr/srvc/{}", worker.package_name)) .stdin(Stdio::null()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .spawn() ); worker.supervisor_id = Some(child.id()); - let pkg = worker.package.name.clone(); + let pkg = worker.package_name.clone(); let supervisor_thread = try!(thread::Builder::new().name(String::from("supervisor")).spawn(move|| -> BldrResult<()> { { let mut c_stdout = match child.stdout { diff --git a/src/main.rs b/src/main.rs index 308a4310f1..c490808fa1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -30,7 +30,6 @@ use std::ffi::CString; use std::ptr; use bldr::config::{Command, Config}; -use bldr::sidecar; use bldr::error::{BldrResult, BldrError}; use bldr::command::*; @@ -212,7 +211,6 @@ fn install(config: &Config) -> BldrResult<()> { fn start(config: &Config) -> BldrResult<()> { banner(); println!("Starting {}", Yellow.bold().paint(config.package())); - try!(sidecar::run(config.package())); try!(start::package(config)); println!("Finished with {}", Yellow.bold().paint(config.package())); Ok(())