diff --git a/src/check_executor.rs b/src/check_executor.rs
index eb020e4..d095245 100644
--- a/src/check_executor.rs
+++ b/src/check_executor.rs
@@ -42,7 +42,7 @@ impl ScheduledCheck {
 
 impl CheckResult {
     /// Produce a missed check result from a scheduled check.
-    pub fn missed_from(config: &CheckConfig, tick: &Tick) -> Self {
+    pub fn missed_from(config: &CheckConfig, tick: &Tick, region: &str) -> Self {
         Self {
             guid: Uuid::new_v4(),
             subscription_id: config.subscription_id,
@@ -54,6 +54,7 @@ impl CheckResult {
             actual_check_time: Utc::now(),
             duration: None,
             request_info: None,
+            region: region.to_string(),
         }
     }
 }
@@ -64,12 +65,14 @@ pub fn run_executor(
     concurrency: usize,
     checker: Arc<impl Checker + 'static>,
     producer: Arc<impl ResultsProducer + 'static>,
+    region: String,
 ) -> (CheckSender, JoinHandle<()>) {
     tracing::info!("executor.starting");
     let (sender, reciever) = mpsc::unbounded_channel();
 
-    let executor =
-        tokio::spawn(async move { executor_loop(concurrency, checker, producer, reciever).await });
+    let executor = tokio::spawn(async move {
+        executor_loop(concurrency, checker, producer, reciever, region).await
+    });
 
     (sender, executor)
 }
@@ -100,6 +103,7 @@ async fn executor_loop(
     concurrency: usize,
     checker: Arc<impl Checker>,
     producer: Arc<impl ResultsProducer>,
     reciever: UnboundedReceiver<ScheduledCheck>,
+    region: String,
 ) {
     let schedule_check_stream: UnboundedReceiverStream<_> = reciever.into();
@@ -107,6 +111,7 @@ async fn executor_loop(
         .for_each_concurrent(concurrency, |scheduled_check| {
             let job_checker = checker.clone();
             let job_producer = producer.clone();
+            let job_region = region.clone();
 
             // TODO(epurkhiser): Record metrics on the size of the size of the queue
 
@@ -121,9 +126,9 @@ async fn executor_loop(
             let interval = TimeDelta::seconds(config.interval as i64);
 
             let check_result = if late_by > interval {
-                CheckResult::missed_from(config, tick)
+                CheckResult::missed_from(config, tick, &job_region)
             } else {
-                job_checker.check_url(config, tick).await
+                job_checker.check_url(config, tick, &job_region).await
             };
 
             if let Err(e) = job_producer.produce_checker_result(&check_result) {
@@ -178,6 +183,7 @@ fn record_result_metrics(result: &CheckResult) {
             "status" => status_label,
             "failure_reason" => failure_reason.unwrap_or("ok"),
             "status_code" => status_code.clone(),
+            "uptime_region" => result.region.clone(),
         )
         .record(duration.to_std().unwrap().as_secs_f64());
     }
@@ -194,6 +200,7 @@ fn record_result_metrics(result: &CheckResult) {
             "status" => status_label,
             "failure_reason" => failure_reason.unwrap_or("ok"),
             "status_code" => status_code.clone(),
+            "uptime_region" => result.region.clone(),
         )
         .record(delay);
 
@@ -203,6 +210,7 @@ fn record_result_metrics(result: &CheckResult) {
             "status" => status_label,
             "failure_reason" => failure_reason.unwrap_or("ok"),
             "status_code" => status_code,
+            "uptime_region" => result.region.clone(),
         )
         .increment(1);
     }
@@ -228,7 +236,7 @@ mod tests {
         let checker = Arc::new(DummyChecker::new(Duration::from_secs(1)));
         let producer = Arc::new(DummyResultsProducer::new("uptime-results"));
 
-        let (sender, _) = run_executor(1, checker, producer);
+        let (sender, _) = run_executor(1, checker, producer, "us-west".to_string());
 
         let tick = Tick::from_time(Utc::now());
         let config = Arc::new(CheckConfig {
@@ -254,7 +262,7 @@ mod tests {
         let producer = Arc::new(DummyResultsProducer::new("uptime-results"));
 
         // Only allow 2 configs to execute concurrently
-        let (sender, _) = run_executor(2, checker, producer);
+        let (sender, _) = run_executor(2, checker, producer, "us-west".to_string());
 
         // Send 4 configs into the executor
         let mut configs: Vec<Arc<CheckConfig>> = (0..4)
@@ -304,7 +312,7 @@ mod tests {
         let checker = Arc::new(DummyChecker::new(Duration::from_secs(1)));
         let producer = Arc::new(DummyResultsProducer::new("uptime-results"));
 
-        let (sender, _) = run_executor(1, checker, producer);
+        let (sender, _) = run_executor(1, checker, producer, "us-west".to_string());
 
         let tick = Tick::from_time(Utc::now() - TimeDelta::minutes(2));
         let config = Arc::new(CheckConfig {
diff --git a/src/checker.rs b/src/checker.rs
index 71b3a8b..f83cf1b 100644
--- a/src/checker.rs
+++ b/src/checker.rs
@@ -18,5 +18,6 @@ pub trait Checker: Send + Sync {
         &self,
         config: &CheckConfig,
         tick: &Tick,
+        region: &str,
     ) -> impl Future<Output = CheckResult> + Send;
 }
diff --git a/src/checker/dummy_checker.rs b/src/checker/dummy_checker.rs
index 9df470e..28b670c 100644
--- a/src/checker/dummy_checker.rs
+++ b/src/checker/dummy_checker.rs
@@ -24,7 +24,7 @@ impl DummyChecker {
 }
 
 impl Checker for DummyChecker {
-    async fn check_url(&self, config: &CheckConfig, tick: &Tick) -> CheckResult {
+    async fn check_url(&self, config: &CheckConfig, tick: &Tick, region: &str) -> CheckResult {
         let scheduled_check_time = tick.time();
         let actual_check_time = Utc::now();
         let trace_id = TraceId::default();
@@ -49,6 +49,7 @@ impl Checker for DummyChecker {
             actual_check_time,
             duration,
             request_info,
+            region: region.to_string(),
         }
     }
 }
diff --git a/src/checker/http_checker.rs b/src/checker/http_checker.rs
index 610310b..a2d04a0 100644
--- a/src/checker/http_checker.rs
+++ b/src/checker/http_checker.rs
@@ -127,7 +127,7 @@ impl Checker for HttpChecker {
     /// Makes a request to a url to determine whether it is up.
     /// Up is defined as returning a 2xx within a specific timeframe.
     #[tracing::instrument]
-    async fn check_url(&self, config: &CheckConfig, tick: &Tick) -> CheckResult {
+    async fn check_url(&self, config: &CheckConfig, tick: &Tick, region: &str) -> CheckResult {
         let scheduled_check_time = tick.time();
         let actual_check_time = Utc::now();
         let trace_id = TraceId::default();
@@ -192,6 +192,7 @@ impl Checker for HttpChecker {
             actual_check_time,
             duration,
             request_info,
+            region: region.to_string(),
         }
     }
 }
@@ -236,7 +237,7 @@ mod tests {
         };
 
         let tick = make_tick();
-        let result = checker.check_url(&config, &tick).await;
+        let result = checker.check_url(&config, &tick, "us-west").await;
 
         assert_eq!(result.status, CheckStatus::Success);
         assert_eq!(
@@ -277,7 +278,7 @@ mod tests {
         };
 
         let tick = make_tick();
-        let result = checker.check_url(&config, &tick).await;
+        let result = checker.check_url(&config, &tick, "us-west").await;
 
         assert_eq!(result.status, CheckStatus::Success);
         assert_eq!(
@@ -313,7 +314,7 @@ mod tests {
         };
 
         let tick = make_tick();
-        let result = checker.check_url(&config, &tick).await;
+        let result = checker.check_url(&config, &tick, "us-west").await;
 
         assert_eq!(result.status, CheckStatus::Failure);
         assert!(result.duration.is_some_and(|d| d > timeout));
@@ -346,7 +347,7 @@ mod tests {
         };
 
         let tick = make_tick();
-        let result = checker.check_url(&config, &tick).await;
+        let result = checker.check_url(&config, &tick, "us-west").await;
 
         assert_eq!(result.status, CheckStatus::Failure);
         assert_eq!(
@@ -375,7 +376,7 @@ mod tests {
         };
 
         let tick = make_tick();
-        let result = checker.check_url(&localhost_config, &tick).await;
+        let result = checker.check_url(&localhost_config, &tick, "us-west").await;
 
         assert_eq!(result.status, CheckStatus::Failure);
         assert_eq!(result.request_info.and_then(|i| i.http_status_code), None);
@@ -400,7 +401,9 @@ mod tests {
             ..Default::default()
         };
         let tick = make_tick();
-        let result = checker.check_url(&restricted_ip_config, &tick).await;
+        let result = checker
+            .check_url(&restricted_ip_config, &tick, "us-west")
+            .await;
 
         assert_eq!(result.status, CheckStatus::Failure);
         assert_eq!(result.request_info.and_then(|i| i.http_status_code), None);
@@ -420,7 +423,9 @@ mod tests {
             ..Default::default()
         };
         let tick = make_tick();
-        let result = checker.check_url(&restricted_ipv6_config, &tick).await;
+        let result = checker
+            .check_url(&restricted_ipv6_config, &tick, "us-west")
+            .await;
         assert_eq!(
             result.status_reason.map(|r| r.description),
             Some("destination is restricted".to_string())
diff --git a/src/config_waiter.rs b/src/config_waiter.rs
index 8469616..d453d18 100644
--- a/src/config_waiter.rs
+++ b/src/config_waiter.rs
@@ -39,6 +39,7 @@ pub fn wait_for_partition_boot(
     config_store: Arc<RwConfigStore>,
     partition: u16,
     shutdown: CancellationToken,
+    region: String,
 ) -> Receiver<()> {
     let start = Instant::now();
     let (boot_finished, boot_finished_rx) = oneshot::channel::<()>();
@@ -97,9 +98,9 @@ pub fn wait_for_partition_boot(
             "config_consumer.partition_boot_complete",
         );
 
-        metrics::gauge!("config_consumer.partition_boot_time_ms", "partition" => partition.to_string())
+        metrics::gauge!("config_consumer.partition_boot_time_ms", "partition" => partition.to_string(), "uptime_region" => region.to_string())
             .set(boot_time_ms as f64);
-        metrics::gauge!("config_consumer.partition_total_configs", "partition" => partition.to_string())
+        metrics::gauge!("config_consumer.partition_total_configs", "partition" => partition.to_string(), "uptime_region" => region.to_string())
             .set(total_configs as f64);
 
         boot_finished
@@ -125,7 +126,12 @@ mod tests {
         let config_store = Arc::new(ConfigStore::new_rw());
         let shutdown_signal = CancellationToken::new();
 
-        let wait_booted = wait_for_partition_boot(config_store.clone(), 0, shutdown_signal.clone());
+        let wait_booted = wait_for_partition_boot(
+            config_store.clone(),
+            0,
+            shutdown_signal.clone(),
+            "us-west".to_string(),
+        );
         tokio::pin!(wait_booted);
 
         // nothing produced yet. Move time right before to the BOOT_MAX_IDLE.
@@ -166,7 +172,12 @@ mod tests {
         let config_store = Arc::new(ConfigStore::new_rw());
         let shutdown_signal = CancellationToken::new();
 
-        let wait_booted = wait_for_partition_boot(config_store.clone(), 0, shutdown_signal.clone());
+        let wait_booted = wait_for_partition_boot(
+            config_store.clone(),
+            0,
+            shutdown_signal.clone(),
+            "us-west".to_string(),
+        );
         tokio::pin!(wait_booted);
 
         // nothing produced yet. Move time right before to the BOOT_MAX_IDLE.
diff --git a/src/manager.rs b/src/manager.rs
index 5a6437f..3e49117 100644
--- a/src/manager.rs
+++ b/src/manager.rs
@@ -46,8 +46,12 @@ impl PartitionedService {
         // otherwise we may execute checks for old configs in a partition that are removed later in
         // the log.
         let shutdown_signal = CancellationToken::new();
-        let config_loaded =
-            wait_for_partition_boot(waiter_config_store, partition, shutdown_signal.clone());
+        let config_loaded = wait_for_partition_boot(
+            waiter_config_store,
+            partition,
+            shutdown_signal.clone(),
+            config.region.clone(),
+        );
 
         let scheduler_join_handle = run_scheduler(
             partition,
@@ -117,8 +121,12 @@ impl Manager {
 
         // XXX: Executor will shutdown once the sender goes out of scope. This will happen once all
         // referneces of the Sender (executor_sender) are dropped.
-        let (executor_sender, executor_join_handle) =
-            run_executor(config.checker_concurrency, checker, producer);
+        let (executor_sender, executor_join_handle) = run_executor(
+            config.checker_concurrency,
+            checker,
+            producer,
+            config.region.clone(),
+        );
 
         let (shutdown_sender, shutdown_service_rx) = mpsc::unbounded_channel();
 
diff --git a/src/producer/kafka_producer.rs b/src/producer/kafka_producer.rs
index 7d8295b..afea7d2 100644
--- a/src/producer/kafka_producer.rs
+++ b/src/producer/kafka_producer.rs
@@ -84,6 +84,7 @@ mod tests {
                 request_type: RequestMethod::Get,
                 http_status_code: Some(200),
             }),
+            region: "us-west-1".to_string(),
         };
 
         // TODO: Have an actual Kafka running for a real test. At the moment this is fine since
         // it will fail async
diff --git a/src/scheduler.rs b/src/scheduler.rs
index 3be8073..94214bc 100644
--- a/src/scheduler.rs
+++ b/src/scheduler.rs
@@ -85,8 +85,8 @@ async fn scheduler_loop(
             .expect("Lock poisoned")
             .get_configs(tick);
         tracing::debug!(%tick, bucket_size = configs.len(), "scheduler.tick_scheduled");
-
-        metrics::gauge!("scheduler.bucket_size").set(configs.len() as f64);
+        metrics::gauge!("scheduler.bucket_size", "uptime_region" => region.clone())
+            .set(configs.len() as f64);
 
         let mut results = vec![];
 
@@ -98,7 +98,7 @@ async fn scheduler_loop(
 
                 metrics::counter!(
                     "scheduler.skipped_region",
-                    "checker_region" => region.clone(),
+                    "uptime_region" => region.clone(),
                 )
                 .increment(1);
             }
@@ -269,6 +269,7 @@ mod tests {
             actual_check_time: Utc::now(),
             duration: Some(Duration::seconds(1)),
             request_info: None,
+            region: config.region.clone(),
         });
         scheduled_check2.record_result(CheckResult {
             guid: Uuid::new_v4(),
@@ -281,6 +282,7 @@ mod tests {
             actual_check_time: Utc::now(),
             duration: Some(Duration::seconds(1)),
             request_info: None,
+            region: config.region.clone(),
         });
 
         shutdown_token.cancel();
@@ -377,6 +379,7 @@ mod tests {
             actual_check_time: Utc::now(),
             duration: Some(Duration::seconds(1)),
             request_info: None,
+            region: config.region.clone(),
         });
         scheduled_check2.record_result(CheckResult {
             guid: Uuid::new_v4(),
@@ -389,6 +392,7 @@ mod tests {
             actual_check_time: Utc::now(),
             duration: Some(Duration::seconds(1)),
             request_info: None,
+            region: config.region.clone(),
         });
 
         shutdown_token.cancel();
@@ -483,6 +487,7 @@ mod tests {
             actual_check_time: Utc::now(),
             duration: Some(Duration::seconds(1)),
             request_info: None,
+            region: config.region.clone(),
         });
         scheduled_check2.record_result(CheckResult {
             guid: Uuid::new_v4(),
@@ -495,6 +500,7 @@ mod tests {
             actual_check_time: Utc::now(),
             duration: Some(Duration::seconds(1)),
             request_info: None,
+            region: config.region.clone(),
         });
 
         shutdown_token.cancel();
diff --git a/src/types/result.rs b/src/types/result.rs
index 8c0ef17..ea99180 100644
--- a/src/types/result.rs
+++ b/src/types/result.rs
@@ -96,6 +96,9 @@ pub struct CheckResult {
 
     /// Information about the check request made. Will be empty if the check was missed
     pub request_info: Option<RequestInfo>,
+
+    /// Region slug that produced the check result
+    pub region: String,
 }
 
 #[cfg(test)]
@@ -122,7 +125,8 @@ mod tests {
            "request_info": {
                "request_type": "HEAD",
                "http_status_code": 500
-           }
+           },
+           "region": "us-west-1"
        }"#;
 
         let check_result = serde_json::from_str::<CheckResult>(json).unwrap();
@@ -146,7 +150,8 @@ mod tests {
            "request_info": {
                "request_type": "HEAD",
                "http_status_code": 200
-           }
+           },
+           "region": "us-west-1"
        }"#;
 
         let check_result = serde_json::from_str::<CheckResult>(json).unwrap();
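
Note on the shape of this change: the patch threads a region slug through every path that can produce a `CheckResult`. The `Checker` trait gains a `region: &str` parameter, the executor owns the region as a `String` and clones it once per scheduled job, and the `uptime_region` tag is applied uniformly to the executor, scheduler, and config-consumer metrics. Below is a minimal sketch of what an implementor of the updated trait looks like; `StaticChecker` and the trimmed-down structs are illustrative stand-ins, not part of the patch.

```rust
// Illustrative stand-ins for the crate's real types, reduced to the
// fields this diff is concerned with.
pub struct CheckConfig {
    pub subscription_id: u64,
}
pub struct Tick;
pub struct CheckResult {
    pub subscription_id: u64,
    pub region: String,
}

pub trait Checker: Send + Sync {
    // After this patch, every check is told which region it is running in.
    async fn check_url(&self, config: &CheckConfig, tick: &Tick, region: &str) -> CheckResult;
}

// Hypothetical implementor: it copies the region it was handed into the
// result, mirroring what DummyChecker and HttpChecker now do.
pub struct StaticChecker;

impl Checker for StaticChecker {
    async fn check_url(&self, config: &CheckConfig, _tick: &Tick, region: &str) -> CheckResult {
        CheckResult {
            subscription_id: config.subscription_id,
            // The executor lends out `&job_region`; each result takes its own
            // owned copy, matching the `region: region.to_string()` lines above.
            region: region.to_string(),
        }
    }
}
```

Passing `&str` through the trait while the executor owns a `String` keeps the cost to one clone per job plus one `to_string()` per result; if that ever shows up in profiles, an `Arc<str>` would be the natural next step.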