Skip to content

Commit

Permalink
feat(uptime): Add region to uptime results and stats (#196)
Browse files Browse the repository at this point in the history
We need to start returning regions in our results. Also including them
in our various stats so that we can break things down by uptime region
if necessary
  • Loading branch information
wedamija authored Jan 7, 2025
1 parent 8317d06 commit b9aad03
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 30 deletions.
24 changes: 16 additions & 8 deletions src/check_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl ScheduledCheck {

impl CheckResult {
/// Produce a missed check result from a scheduled check.
pub fn missed_from(config: &CheckConfig, tick: &Tick) -> Self {
pub fn missed_from(config: &CheckConfig, tick: &Tick, region: &str) -> Self {
Self {
guid: Uuid::new_v4(),
subscription_id: config.subscription_id,
Expand All @@ -54,6 +54,7 @@ impl CheckResult {
actual_check_time: Utc::now(),
duration: None,
request_info: None,
region: region.to_string(),
}
}
}
Expand All @@ -64,12 +65,14 @@ pub fn run_executor(
concurrency: usize,
checker: Arc<impl Checker + 'static>,
producer: Arc<impl ResultsProducer + 'static>,
region: String,
) -> (CheckSender, JoinHandle<()>) {
tracing::info!("executor.starting");

let (sender, reciever) = mpsc::unbounded_channel();
let executor =
tokio::spawn(async move { executor_loop(concurrency, checker, producer, reciever).await });
let executor = tokio::spawn(async move {
executor_loop(concurrency, checker, producer, reciever, region).await
});

(sender, executor)
}
Expand Down Expand Up @@ -100,13 +103,15 @@ async fn executor_loop(
checker: Arc<impl Checker + 'static>,
producer: Arc<impl ResultsProducer + 'static>,
reciever: UnboundedReceiver<ScheduledCheck>,
region: String,
) {
let schedule_check_stream: UnboundedReceiverStream<_> = reciever.into();

schedule_check_stream
.for_each_concurrent(concurrency, |scheduled_check| {
let job_checker = checker.clone();
let job_producer = producer.clone();
let job_region = region.clone();

// TODO(epurkhiser): Record metrics on the size of the size of the queue

Expand All @@ -121,9 +126,9 @@ async fn executor_loop(
let interval = TimeDelta::seconds(config.interval as i64);

let check_result = if late_by > interval {
CheckResult::missed_from(config, tick)
CheckResult::missed_from(config, tick, &job_region)
} else {
job_checker.check_url(config, tick).await
job_checker.check_url(config, tick, &job_region).await
};

if let Err(e) = job_producer.produce_checker_result(&check_result) {
Expand Down Expand Up @@ -178,6 +183,7 @@ fn record_result_metrics(result: &CheckResult) {
"status" => status_label,
"failure_reason" => failure_reason.unwrap_or("ok"),
"status_code" => status_code.clone(),
"uptime_region" => result.region.clone(),
)
.record(duration.to_std().unwrap().as_secs_f64());
}
Expand All @@ -194,6 +200,7 @@ fn record_result_metrics(result: &CheckResult) {
"status" => status_label,
"failure_reason" => failure_reason.unwrap_or("ok"),
"status_code" => status_code.clone(),
"uptime_region" => result.region.clone(),
)
.record(delay);

Expand All @@ -203,6 +210,7 @@ fn record_result_metrics(result: &CheckResult) {
"status" => status_label,
"failure_reason" => failure_reason.unwrap_or("ok"),
"status_code" => status_code,
"uptime_region" => result.region.clone(),
)
.increment(1);
}
Expand All @@ -228,7 +236,7 @@ mod tests {
let checker = Arc::new(DummyChecker::new(Duration::from_secs(1)));
let producer = Arc::new(DummyResultsProducer::new("uptime-results"));

let (sender, _) = run_executor(1, checker, producer);
let (sender, _) = run_executor(1, checker, producer, "us-west".to_string());

let tick = Tick::from_time(Utc::now());
let config = Arc::new(CheckConfig {
Expand All @@ -254,7 +262,7 @@ mod tests {
let producer = Arc::new(DummyResultsProducer::new("uptime-results"));

// Only allow 2 configs to execute concurrently
let (sender, _) = run_executor(2, checker, producer);
let (sender, _) = run_executor(2, checker, producer, "us-west".to_string());

// Send 4 configs into the executor
let mut configs: Vec<Receiver<CheckResult>> = (0..4)
Expand Down Expand Up @@ -304,7 +312,7 @@ mod tests {
let checker = Arc::new(DummyChecker::new(Duration::from_secs(1)));
let producer = Arc::new(DummyResultsProducer::new("uptime-results"));

let (sender, _) = run_executor(1, checker, producer);
let (sender, _) = run_executor(1, checker, producer, "us-west".to_string());

let tick = Tick::from_time(Utc::now() - TimeDelta::minutes(2));
let config = Arc::new(CheckConfig {
Expand Down
1 change: 1 addition & 0 deletions src/checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@ pub trait Checker: Send + Sync {
&self,
config: &CheckConfig,
tick: &Tick,
region: &str,
) -> impl Future<Output = CheckResult> + Send;
}
3 changes: 2 additions & 1 deletion src/checker/dummy_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl DummyChecker {
}

impl Checker for DummyChecker {
async fn check_url(&self, config: &CheckConfig, tick: &Tick) -> CheckResult {
async fn check_url(&self, config: &CheckConfig, tick: &Tick, region: &str) -> CheckResult {
let scheduled_check_time = tick.time();
let actual_check_time = Utc::now();
let trace_id = TraceId::default();
Expand All @@ -49,6 +49,7 @@ impl Checker for DummyChecker {
actual_check_time,
duration,
request_info,
region: region.to_string(),
}
}
}
21 changes: 13 additions & 8 deletions src/checker/http_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ impl Checker for HttpChecker {
/// Makes a request to a url to determine whether it is up.
/// Up is defined as returning a 2xx within a specific timeframe.
#[tracing::instrument]
async fn check_url(&self, config: &CheckConfig, tick: &Tick) -> CheckResult {
async fn check_url(&self, config: &CheckConfig, tick: &Tick, region: &str) -> CheckResult {
let scheduled_check_time = tick.time();
let actual_check_time = Utc::now();
let trace_id = TraceId::default();
Expand Down Expand Up @@ -192,6 +192,7 @@ impl Checker for HttpChecker {
actual_check_time,
duration,
request_info,
region: region.to_string(),
}
}
}
Expand Down Expand Up @@ -236,7 +237,7 @@ mod tests {
};

let tick = make_tick();
let result = checker.check_url(&config, &tick).await;
let result = checker.check_url(&config, &tick, "us-west").await;

assert_eq!(result.status, CheckStatus::Success);
assert_eq!(
Expand Down Expand Up @@ -277,7 +278,7 @@ mod tests {
};

let tick = make_tick();
let result = checker.check_url(&config, &tick).await;
let result = checker.check_url(&config, &tick, "us-west").await;

assert_eq!(result.status, CheckStatus::Success);
assert_eq!(
Expand Down Expand Up @@ -313,7 +314,7 @@ mod tests {
};

let tick = make_tick();
let result = checker.check_url(&config, &tick).await;
let result = checker.check_url(&config, &tick, "us-west").await;

assert_eq!(result.status, CheckStatus::Failure);
assert!(result.duration.is_some_and(|d| d > timeout));
Expand Down Expand Up @@ -346,7 +347,7 @@ mod tests {
};

let tick = make_tick();
let result = checker.check_url(&config, &tick).await;
let result = checker.check_url(&config, &tick, "us-west").await;

assert_eq!(result.status, CheckStatus::Failure);
assert_eq!(
Expand Down Expand Up @@ -375,7 +376,7 @@ mod tests {
};

let tick = make_tick();
let result = checker.check_url(&localhost_config, &tick).await;
let result = checker.check_url(&localhost_config, &tick, "us-west").await;

assert_eq!(result.status, CheckStatus::Failure);
assert_eq!(result.request_info.and_then(|i| i.http_status_code), None);
Expand All @@ -400,7 +401,9 @@ mod tests {
..Default::default()
};
let tick = make_tick();
let result = checker.check_url(&restricted_ip_config, &tick).await;
let result = checker
.check_url(&restricted_ip_config, &tick, "us-west")
.await;

assert_eq!(result.status, CheckStatus::Failure);
assert_eq!(result.request_info.and_then(|i| i.http_status_code), None);
Expand All @@ -420,7 +423,9 @@ mod tests {
..Default::default()
};
let tick = make_tick();
let result = checker.check_url(&restricted_ipv6_config, &tick).await;
let result = checker
.check_url(&restricted_ipv6_config, &tick, "us-west")
.await;
assert_eq!(
result.status_reason.map(|r| r.description),
Some("destination is restricted".to_string())
Expand Down
19 changes: 15 additions & 4 deletions src/config_waiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub fn wait_for_partition_boot(
config_store: Arc<RwConfigStore>,
partition: u16,
shutdown: CancellationToken,
region: String,
) -> Receiver<BootResult> {
let start = Instant::now();
let (boot_finished, boot_finished_rx) = oneshot::channel::<BootResult>();
Expand Down Expand Up @@ -97,9 +98,9 @@ pub fn wait_for_partition_boot(
"config_consumer.partition_boot_complete",
);

metrics::gauge!("config_consumer.partition_boot_time_ms", "partition" => partition.to_string())
metrics::gauge!("config_consumer.partition_boot_time_ms", "partition" => partition.to_string(), "uptime_region" => region.to_string())
.set(boot_time_ms as f64);
metrics::gauge!("config_consumer.partition_total_configs", "partition" => partition.to_string())
metrics::gauge!("config_consumer.partition_total_configs", "partition" => partition.to_string(), "uptime_region" => region.to_string())
.set(total_configs as f64);

boot_finished
Expand All @@ -125,7 +126,12 @@ mod tests {
let config_store = Arc::new(ConfigStore::new_rw());

let shutdown_signal = CancellationToken::new();
let wait_booted = wait_for_partition_boot(config_store.clone(), 0, shutdown_signal.clone());
let wait_booted = wait_for_partition_boot(
config_store.clone(),
0,
shutdown_signal.clone(),
"us-west".to_string(),
);
tokio::pin!(wait_booted);

// nothing produced yet. Move time right before to the BOOT_MAX_IDLE.
Expand Down Expand Up @@ -166,7 +172,12 @@ mod tests {
let config_store = Arc::new(ConfigStore::new_rw());

let shutdown_signal = CancellationToken::new();
let wait_booted = wait_for_partition_boot(config_store.clone(), 0, shutdown_signal.clone());
let wait_booted = wait_for_partition_boot(
config_store.clone(),
0,
shutdown_signal.clone(),
"us-west".to_string(),
);
tokio::pin!(wait_booted);

// nothing produced yet. Move time right before to the BOOT_MAX_IDLE.
Expand Down
16 changes: 12 additions & 4 deletions src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,12 @@ impl PartitionedService {
// otherwise we may execute checks for old configs in a partition that are removed later in
// the log.
let shutdown_signal = CancellationToken::new();
let config_loaded =
wait_for_partition_boot(waiter_config_store, partition, shutdown_signal.clone());
let config_loaded = wait_for_partition_boot(
waiter_config_store,
partition,
shutdown_signal.clone(),
config.region.clone(),
);

let scheduler_join_handle = run_scheduler(
partition,
Expand Down Expand Up @@ -117,8 +121,12 @@ impl Manager {

// XXX: Executor will shutdown once the sender goes out of scope. This will happen once all
// referneces of the Sender (executor_sender) are dropped.
let (executor_sender, executor_join_handle) =
run_executor(config.checker_concurrency, checker, producer);
let (executor_sender, executor_join_handle) = run_executor(
config.checker_concurrency,
checker,
producer,
config.region.clone(),
);

let (shutdown_sender, shutdown_service_rx) = mpsc::unbounded_channel();

Expand Down
1 change: 1 addition & 0 deletions src/producer/kafka_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ mod tests {
request_type: RequestMethod::Get,
http_status_code: Some(200),
}),
region: "us-west-1".to_string(),
};
// TODO: Have an actual Kafka running for a real test. At the moment this is fine since
// it will fail async
Expand Down
12 changes: 9 additions & 3 deletions src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ async fn scheduler_loop(
.expect("Lock poisoned")
.get_configs(tick);
tracing::debug!(%tick, bucket_size = configs.len(), "scheduler.tick_scheduled");

metrics::gauge!("scheduler.bucket_size").set(configs.len() as f64);
metrics::gauge!("scheduler.bucket_size", "uptime_region" => region.clone())
.set(configs.len() as f64);

let mut results = vec![];

Expand All @@ -98,7 +98,7 @@ async fn scheduler_loop(

metrics::counter!(
"scheduler.skipped_region",
"checker_region" => region.clone(),
"uptime_region" => region.clone(),
)
.increment(1);
}
Expand Down Expand Up @@ -269,6 +269,7 @@ mod tests {
actual_check_time: Utc::now(),
duration: Some(Duration::seconds(1)),
request_info: None,
region: config.region.clone(),
});
scheduled_check2.record_result(CheckResult {
guid: Uuid::new_v4(),
Expand All @@ -281,6 +282,7 @@ mod tests {
actual_check_time: Utc::now(),
duration: Some(Duration::seconds(1)),
request_info: None,
region: config.region.clone(),
});

shutdown_token.cancel();
Expand Down Expand Up @@ -377,6 +379,7 @@ mod tests {
actual_check_time: Utc::now(),
duration: Some(Duration::seconds(1)),
request_info: None,
region: config.region.clone(),
});
scheduled_check2.record_result(CheckResult {
guid: Uuid::new_v4(),
Expand All @@ -389,6 +392,7 @@ mod tests {
actual_check_time: Utc::now(),
duration: Some(Duration::seconds(1)),
request_info: None,
region: config.region.clone(),
});

shutdown_token.cancel();
Expand Down Expand Up @@ -483,6 +487,7 @@ mod tests {
actual_check_time: Utc::now(),
duration: Some(Duration::seconds(1)),
request_info: None,
region: config.region.clone(),
});
scheduled_check2.record_result(CheckResult {
guid: Uuid::new_v4(),
Expand All @@ -495,6 +500,7 @@ mod tests {
actual_check_time: Utc::now(),
duration: Some(Duration::seconds(1)),
request_info: None,
region: config.region.clone(),
});

shutdown_token.cancel();
Expand Down
Loading

0 comments on commit b9aad03

Please sign in to comment.