Skip to content

Commit

Permalink
server
Browse files Browse the repository at this point in the history
  • Loading branch information
stefan-k committed Sep 14, 2022
1 parent 16957bd commit d792553
Showing 1 changed file with 51 additions and 61 deletions.
112 changes: 51 additions & 61 deletions auditor/src/collectors/slurm_epilog_2/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use once_cell::sync::Lazy;

/// Process-wide settings, loaded on first access through `get_configuration()`.
///
/// # Panics
///
/// The first access panics with "Failed loading configuration" if
/// `get_configuration()` returns an error.
static CONFIG: Lazy<Settings> =
Lazy::new(|| get_configuration().expect("Failed loading configuration"));
/// Result of `CONFIG.get_keys()`, computed once on first access so callers can
/// clone the list (as `get_slurm_job_info` does) instead of asking `CONFIG`
/// again on every call.
static KEYS: Lazy<Vec<(String, ParsableType)>> = Lazy::new(|| CONFIG.get_keys());

/// A job as returned by `get_slurm_job_info`: string keys mapped to `AllowedTypes` values.
type Job = HashMap<String, AllowedTypes>;
/// Sending half of the channel on which `handle_connection` submits
/// `(job_id, responder)` pairs.
type TransmitChannel = mpsc::Sender<(u64, Responder)>;
Expand Down Expand Up @@ -69,19 +70,16 @@ async fn main() -> Result<(), anyhow::Error> {

tracing::debug!("Listening on {}", addr);

let client = Arc::new(AuditorClient::new(&CONFIG.addr, CONFIG.port)?);

let (tx, rx) = mpsc::channel(1024);

let manager = Manager::new(rx);
let _manager = Manager::new(rx);

loop {
match listener.accept().await {
Ok((socket, _)) => {
let tx = tx.clone();
let client = client.clone();
tokio::spawn(async move {
if let Err(e) = handle_connection(socket, client, tx).await {
if let Err(e) = handle_connection(socket, tx).await {
tracing::error!("Failure during handling of the conection: {}", e);
}
});
Expand All @@ -92,33 +90,61 @@ async fn main() -> Result<(), anyhow::Error> {
}

pub struct QueueProcessor {
job_queue: Arc<Mutex<VecDeque<u64>>>,
queue_processor: tokio::task::JoinHandle<Result<(), anyhow::Error>>,
}

impl QueueProcessor {
#[tracing::instrument(name = "Starting queue processor")]
pub fn new(job_queue: Arc<Mutex<VecDeque<u64>>>) -> QueueProcessor {
let jq = job_queue.clone();
let queue_processor = tokio::spawn(async move {
let client = AuditorClient::new(&CONFIG.addr, CONFIG.port)?;
loop {
let mut jq = job_queue.lock().unwrap();
if !jq.is_empty() {
if let Some(jobid) = jq.pop_front() {
//todo
} else {
tracing::error!("Job queue unexpectedly empty.");
}
let job_id = {
let mut jq = job_queue.lock().unwrap();
jq.pop_front()
};

//todo
if let Some(job_id) = job_id {
match get_slurm_job_info(job_id).await {
Ok(job) => {
tracing::debug!(?job, "Acquired SLURM job info");
let record = RecordAdd::new(
format!("{}-{}", make_string_valid(&CONFIG.record_prefix), job_id),
make_string_valid(&CONFIG.site_id),
make_string_valid(&job["UserId"].extract_string()?),
make_string_valid(&job["GroupId"].extract_string()?),
construct_components(&CONFIG, &job),
job["StartTime"].extract_datetime()?,
)
.expect("Could not construct record")
.with_stop_time(job["EndTime"].extract_datetime()?);

tracing::debug!(?record, "Constructed record.");

tracing::info!("Sending record to AUDITOR instance.");
client.add(&record).await?;
// do something with job
Message::Ok
}
Err(e) => {
tracing::error!(
"Could not obtain job info for job {}: {:?}",
job_id,
e
);
Message::Error {
msg: "Could not obtain job info".to_string(),
}
}
};
}
std::thread::sleep(std::time::Duration::from_secs(5));
}

// Ok::<(), anyhow::Error>(())
});
QueueProcessor {
job_queue: jq,
queue_processor,
}
QueueProcessor { queue_processor }
}

#[tracing::instrument(name = "Stopping queue processor", skip(self))]
Expand All @@ -133,16 +159,15 @@ pub struct Manager {
}

impl Manager {
#[tracing::instrument(name = "Starting manager")]
#[tracing::instrument(name = "Starting manager", skip(rx))]
pub fn new(mut rx: ReceiveChannel) -> Manager {
let job_queue = Arc::new(Mutex::new(VecDeque::new()));
let queue_processor = QueueProcessor::new(job_queue.clone());
let jq = job_queue.clone();
let manager = tokio::spawn(async move {
// Start receiving messages
while let Some((job_id, responder)) = rx.recv().await {
let mut job_queue = jq.lock().unwrap();
job_queue.push_back(job_id);
let mut jq = job_queue.lock().unwrap();
jq.push_back(job_id);
let _ = responder.send(());
}
Ok::<(), anyhow::Error>(())
Expand All @@ -161,10 +186,9 @@ impl Manager {
}
}

#[tracing::instrument(name = "Handling new connection", skip(stream, client, tx))]
#[tracing::instrument(name = "Handling new connection", skip(stream, tx))]
async fn handle_connection(
mut stream: TcpStream,
client: Arc<AuditorClient>,
tx: TransmitChannel,
) -> Result<(), anyhow::Error> {
let mut buffer = [0; 1024];
Expand All @@ -178,8 +202,6 @@ async fn handle_connection(
.context("Failed to deserialize message.")?;
tracing::debug!("Received message: {:?}", message);

let keys = CONFIG.get_keys();

let response = match message {
Message::JobInfo { job_id } => {
tracing::info!("Received job id {}", job_id);
Expand All @@ -188,38 +210,8 @@ async fn handle_connection(

tx.send((job_id, resp_tx)).await?;

let _ = resp_rx.await?;
resp_rx.await?;
Message::Ok

// match get_slurm_job_info(job_id, keys).await {
// Ok(job) => {
// tracing::debug!(?job, "Acquired SLURM job info");
// let record = RecordAdd::new(
// format!("{}-{}", make_string_valid(&CONFIG.record_prefix), job_id),
// make_string_valid(&CONFIG.site_id),
// make_string_valid(&job["UserId"].extract_string()?),
// make_string_valid(&job["GroupId"].extract_string()?),
// construct_components(&CONFIG, &job),
// job["StartTime"].extract_datetime()?,
// )
// .expect("Could not construct record")
// .with_stop_time(job["EndTime"].extract_datetime()?);

// tracing::debug!(?record, "Constructed record.");

// tracing::info!("Sending record to AUDITOR instance.");
// client.add(&record).await?;
// // do something with job
// Message::Ok
// }
// Err(e) => {
// tracing::error!("Could not obtain job info for job {}: {:?}", job_id, e);
// Message::Error {
// msg: "Could not obtain job info".to_string(),
// }
// }
// };
// Message::Ok
}
msg => {
tracing::warn!("Received unacceptable message: {:?}", msg);
Expand All @@ -234,10 +226,8 @@ async fn handle_connection(
}

#[tracing::instrument(name = "Getting Slurm job info via sacct")]
async fn get_slurm_job_info(
job_id: u64,
mut keys: Vec<(String, ParsableType)>,
) -> Result<Job, anyhow::Error> {
async fn get_slurm_job_info(job_id: u64) -> Result<Job, anyhow::Error> {
let mut keys = KEYS.clone();
keys.push(("Start".to_owned(), ParsableType::DateTime));
keys.push(("End".to_owned(), ParsableType::DateTime));
keys.push(("User".to_owned(), ParsableType::Id));
Expand Down

0 comments on commit d792553

Please sign in to comment.