From d7925531bd551f1f13a897735221262ca25d944c Mon Sep 17 00:00:00 2001 From: Stefan Kroboth Date: Tue, 13 Sep 2022 17:08:58 +0200 Subject: [PATCH] server --- .../src/collectors/slurm_epilog_2/server.rs | 112 ++++++++---------- 1 file changed, 51 insertions(+), 61 deletions(-) diff --git a/auditor/src/collectors/slurm_epilog_2/server.rs b/auditor/src/collectors/slurm_epilog_2/server.rs index b1fce6d5e..87c402f2a 100644 --- a/auditor/src/collectors/slurm_epilog_2/server.rs +++ b/auditor/src/collectors/slurm_epilog_2/server.rs @@ -42,6 +42,7 @@ use once_cell::sync::Lazy; static CONFIG: Lazy = Lazy::new(|| get_configuration().expect("Failed loading configuration")); +static KEYS: Lazy> = Lazy::new(|| CONFIG.get_keys()); type Job = HashMap; type TransmitChannel = mpsc::Sender<(u64, Responder)>; @@ -69,19 +70,16 @@ async fn main() -> Result<(), anyhow::Error> { tracing::debug!("Listening on {}", addr); - let client = Arc::new(AuditorClient::new(&CONFIG.addr, CONFIG.port)?); - let (tx, rx) = mpsc::channel(1024); - let manager = Manager::new(rx); + let _manager = Manager::new(rx); loop { match listener.accept().await { Ok((socket, _)) => { let tx = tx.clone(); - let client = client.clone(); tokio::spawn(async move { - if let Err(e) = handle_connection(socket, client, tx).await { + if let Err(e) = handle_connection(socket, tx).await { tracing::error!("Failure during handling of the conection: {}", e); } }); @@ -92,33 +90,61 @@ async fn main() -> Result<(), anyhow::Error> { } pub struct QueueProcessor { - job_queue: Arc>>, queue_processor: tokio::task::JoinHandle>, } impl QueueProcessor { #[tracing::instrument(name = "Starting queue processor")] pub fn new(job_queue: Arc>>) -> QueueProcessor { - let jq = job_queue.clone(); let queue_processor = tokio::spawn(async move { + let client = AuditorClient::new(&CONFIG.addr, CONFIG.port)?; loop { - let mut jq = job_queue.lock().unwrap(); - if !jq.is_empty() { - if let Some(jobid) = jq.pop_front() { - //todo - } else { - tracing::error!("Job queue unexpectedly empty."); - } + let job_id = { + let mut jq = job_queue.lock().unwrap(); + jq.pop_front() + }; + + //todo + if let Some(job_id) = job_id { + match get_slurm_job_info(job_id).await { + Ok(job) => { + tracing::debug!(?job, "Acquired SLURM job info"); + let record = RecordAdd::new( + format!("{}-{}", make_string_valid(&CONFIG.record_prefix), job_id), + make_string_valid(&CONFIG.site_id), + make_string_valid(&job["UserId"].extract_string()?), + make_string_valid(&job["GroupId"].extract_string()?), + construct_components(&CONFIG, &job), + job["StartTime"].extract_datetime()?, + ) + .expect("Could not construct record") + .with_stop_time(job["EndTime"].extract_datetime()?); + + tracing::debug!(?record, "Constructed record."); + + tracing::info!("Sending record to AUDITOR instance."); + client.add(&record).await?; + // do something with job + Message::Ok + } + Err(e) => { + tracing::error!( + "Could not obtain job info for job {}: {:?}", + job_id, + e + ); + Message::Error { + msg: "Could not obtain job info".to_string(), + } + } + }; } std::thread::sleep(std::time::Duration::from_secs(5)); } // Ok::<(), anyhow::Error>(()) }); - QueueProcessor { - job_queue: jq, - queue_processor, - } + QueueProcessor { queue_processor } } #[tracing::instrument(name = "Stopping queue processor", skip(self))] @@ -133,16 +159,15 @@ pub struct Manager { } impl Manager { - #[tracing::instrument(name = "Starting manager")] + #[tracing::instrument(name = "Starting manager", skip(rx))] pub fn new(mut rx: ReceiveChannel) -> Manager { let job_queue = Arc::new(Mutex::new(VecDeque::new())); let queue_processor = QueueProcessor::new(job_queue.clone()); - let jq = job_queue.clone(); let manager = tokio::spawn(async move { // Start receiving messages while let Some((job_id, responder)) = rx.recv().await { - let mut job_queue = jq.lock().unwrap(); - job_queue.push_back(job_id); + let mut jq = job_queue.lock().unwrap(); + jq.push_back(job_id); let _ = responder.send(()); } Ok::<(), anyhow::Error>(()) @@ -161,10 +186,9 @@ impl Manager { } } -#[tracing::instrument(name = "Handling new connection", skip(stream, client, tx))] +#[tracing::instrument(name = "Handling new connection", skip(stream, tx))] async fn handle_connection( mut stream: TcpStream, - client: Arc, tx: TransmitChannel, ) -> Result<(), anyhow::Error> { let mut buffer = [0; 1024]; @@ -178,8 +202,6 @@ async fn handle_connection( .context("Failed to deserialize message.")?; tracing::debug!("Received message: {:?}", message); - let keys = CONFIG.get_keys(); - let response = match message { Message::JobInfo { job_id } => { tracing::info!("Received job id {}", job_id); @@ -188,38 +210,8 @@ async fn handle_connection( tx.send((job_id, resp_tx)).await?; - let _ = resp_rx.await?; + resp_rx.await?; Message::Ok - - // match get_slurm_job_info(job_id, keys).await { - // Ok(job) => { - // tracing::debug!(?job, "Acquired SLURM job info"); - // let record = RecordAdd::new( - // format!("{}-{}", make_string_valid(&CONFIG.record_prefix), job_id), - // make_string_valid(&CONFIG.site_id), - // make_string_valid(&job["UserId"].extract_string()?), - // make_string_valid(&job["GroupId"].extract_string()?), - // construct_components(&CONFIG, &job), - // job["StartTime"].extract_datetime()?, - // ) - // .expect("Could not construct record") - // .with_stop_time(job["EndTime"].extract_datetime()?); - - // tracing::debug!(?record, "Constructed record."); - - // tracing::info!("Sending record to AUDITOR instance."); - // client.add(&record).await?; - // // do something with job - // Message::Ok - // } - // Err(e) => { - // tracing::error!("Could not obtain job info for job {}: {:?}", job_id, e); - // Message::Error { - // msg: "Could not obtain job info".to_string(), - // } - // } - // }; - // Message::Ok } msg => { tracing::warn!("Received unacceptable message: {:?}", msg); @@ -234,10 +226,8 @@ async fn handle_connection( } #[tracing::instrument(name = "Getting Slurm job info via sacct")] -async fn get_slurm_job_info( - job_id: u64, - mut keys: Vec<(String, ParsableType)>, -) -> Result { +async fn get_slurm_job_info(job_id: u64) -> Result { + let mut keys = KEYS.clone(); keys.push(("Start".to_owned(), ParsableType::DateTime)); keys.push(("End".to_owned(), ParsableType::DateTime)); keys.push(("User".to_owned(), ParsableType::Id));