Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

In-memory tracking for active worker count #710

Merged
merged 1 commit into from
Nov 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/agent/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,15 @@ impl ResponseExt for ::reqwest::blocking::Response {
/// Handle used by an agent to issue requests against the server's agent API.
pub struct AgentApi {
    /// Server URL, stored exactly as passed to `AgentApi::new`.
    url: String,
    /// Token passed to `AgentApi::new`.
    token: String,
    /// Random identifier generated once in `AgentApi::new`; sent as the `id`
    /// field of the heartbeat request body.
    random_id: String,
}

impl AgentApi {
pub fn new(url: &str, token: &str) -> Self {
AgentApi {
url: url.to_string(),
token: token.to_string(),
random_id: format!("{:X}{:X}", rand::random::<u64>(), rand::random::<u64>()),
}
}

Expand Down Expand Up @@ -200,6 +202,9 @@ impl AgentApi {
self.retry(|this| {
let _: bool = this
.build_request(Method::POST, "heartbeat")
.json(&json!({
"id": self.random_id,
}))
.send()?
.to_api_response()?;
Ok(())
Expand Down
30 changes: 29 additions & 1 deletion src/server/agents.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use crate::prelude::*;
use crate::server::tokens::Tokens;
use chrono::Duration;
use chrono::{DateTime, Utc};
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

/// Number of seconds without a heartbeat after which an agent should be considered unreachable.
const INACTIVE_AFTER: i64 = 300;
Expand Down Expand Up @@ -74,15 +76,41 @@ impl Agent {
/// Agent bookkeeping: a database handle plus an in-memory table of workers
/// that have recently sent a heartbeat.
///
/// The worker table sits behind an `Arc`, so clones of this value share the
/// same map.
#[derive(Clone)]
pub struct Agents {
    db: Database,
    /// Worker id -> (the `WorkerInfo` from its latest heartbeat, the instant
    /// that heartbeat was recorded).
    current_workers: Arc<Mutex<HashMap<String, (WorkerInfo, std::time::Instant)>>>,
}

/// JSON body deserialized from a heartbeat request.
#[derive(Deserialize)]
pub struct WorkerInfo {
    /// Identifier the worker reports for itself; used as the key of the
    /// in-memory active-worker table.
    id: String,
}

impl Agents {
/// Builds the agent bookkeeping on top of `db` and synchronizes it with
/// `tokens`.
///
/// The in-memory worker table starts out empty; it is only filled by
/// heartbeats received after construction.
///
/// # Errors
///
/// Returns the error produced by `synchronize` if it fails.
pub fn new(db: Database, tokens: &Tokens) -> Fallible<Self> {
    // Single initializer: every field of `Agents` must be provided, and `db`
    // can only be moved into the struct once.
    let agents = Agents {
        db,
        current_workers: Arc::new(Mutex::new(HashMap::new())),
    };
    agents.synchronize(tokens)?;
    Ok(agents)
}

/// Returns the number of workers that sent a heartbeat within the last
/// 10 minutes.
///
/// As a side effect, workers that have been silent for 10 minutes or longer
/// are removed from the in-memory table.
///
/// # Panics
///
/// Panics if the mutex guarding the worker table is poisoned.
pub fn active_worker_count(&self) -> usize {
    // How long a worker may stay silent before it stops counting as active.
    const ACTIVE_WINDOW: std::time::Duration = std::time::Duration::from_secs(60 * 10);

    let mut guard = self.current_workers.lock().unwrap();
    // `retain` KEEPS the entries for which the closure returns `true`, so the
    // predicate must describe workers that are still fresh. Testing for
    // `elapsed() > window` here would keep only the stale workers and drop
    // every active one.
    guard.retain(|_, (_, timestamp)| timestamp.elapsed() < ACTIVE_WINDOW);
    guard.len()
}

/// Records that the worker described by `id` was just heard from.
///
/// The entry is keyed by the worker's id and stamped with the current
/// instant; an existing entry with the same id is overwritten.
///
/// # Panics
///
/// Panics if the mutex guarding the worker table is poisoned.
pub fn add_worker(&self, id: WorkerInfo) {
    let mut workers = self.current_workers.lock().unwrap();
    // Clone the key before `id` is moved into the map value.
    let key = id.id.clone();
    workers.insert(key, (id, std::time::Instant::now()));
}

fn synchronize(&self, tokens: &Tokens) -> Fallible<()> {
self.db.transaction(|trans| {
let mut real = tokens.agents.values().collect::<HashSet<&String>>();
Expand Down
10 changes: 10 additions & 0 deletions src/server/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const AGENT_WORK_METRIC: &str = "crater_agent_supposed_to_work";
const AGENT_FAILED: &str = "crater_agent_failure";
const LAST_CRATES_UPDATE_METRIC: &str = "crater_last_crates_update";
const ENDPOINT_TIME: &str = "crater_endpoint_time_seconds";
/// Name of the Prometheus gauge registered with the help text "number of active workers".
const WORKER_COUNT: &str = "crater_worker_count";

#[derive(Clone)]
pub struct Metrics {
Expand All @@ -19,6 +20,7 @@ pub struct Metrics {
crater_work_status: IntGaugeVec,
crater_last_crates_update: IntGauge,
pub crater_endpoint_time: HistogramVec,
crater_worker_count: IntGauge,
}

impl Metrics {
Expand Down Expand Up @@ -46,16 +48,24 @@ impl Metrics {
&["endpoint"]
)?;

let crater_worker_count = prometheus::opts!(WORKER_COUNT, "number of active workers");
let crater_worker_count = prometheus::register_int_gauge!(crater_worker_count)?;

Ok(Metrics {
crater_completed_jobs_total,
crater_bounced_record_progress,
crater_agent_failure,
crater_work_status,
crater_last_crates_update,
crater_endpoint_time,
crater_worker_count,
})
}

/// Sets the worker-count gauge to `count`.
///
/// The gauge holds an `i64`. A `count` that does not fit saturates at
/// `i64::MAX` instead of wrapping around to a negative value.
pub fn record_worker_count(&self, count: usize) {
    // Imported locally so the checked conversion resolves on every edition.
    use std::convert::TryFrom;

    let value = i64::try_from(count).unwrap_or(i64::MAX);
    self.crater_worker_count.set(value);
}

pub fn record_error(&self, agent: &str, experiment: &str) {
self.crater_agent_failure
.with_label_values(&[agent, experiment])
Expand Down
11 changes: 10 additions & 1 deletion src/server/routes/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::agent::Capabilities;
use crate::experiments::{Assignee, Experiment};
use crate::prelude::*;
use crate::results::{DatabaseDB, EncodingType, ProgressData};
use crate::server::agents::WorkerInfo;
use crate::server::api_types::{AgentConfig, ApiResponse};
use crate::server::auth::{auth_filter, AuthDetails};
use crate::server::messages::Message;
Expand Down Expand Up @@ -68,6 +69,7 @@ pub fn routes(
let heartbeat = warp::post()
.and(warp::path("heartbeat"))
.and(warp::path::end())
.and(warp::body::json())
.and(data_filter)
.and(auth_filter(data.clone()))
.map(endpoint_heartbeat);
Expand Down Expand Up @@ -318,12 +320,19 @@ fn endpoint_record_progress(
ret
}

fn endpoint_heartbeat(data: Arc<Data>, auth: AuthDetails) -> Fallible<Response<Body>> {
fn endpoint_heartbeat(
id: WorkerInfo,
data: Arc<Data>,
auth: AuthDetails,
) -> Fallible<Response<Body>> {
data.agents.add_worker(id);
if let Some(rev) = auth.git_revision {
data.agents.set_git_revision(&auth.name, &rev)?;
}

data.agents.record_heartbeat(&auth.name)?;
data.metrics
.record_worker_count(data.agents.active_worker_count());
Ok(ApiResponse::Success { result: true }.into_response()?)
}

Expand Down
Loading