Skip to content

Commit

Permalink
Slurm collector fixes (#811,#812,#681,#178)
Browse files Browse the repository at this point in the history
  • Loading branch information
Raphael Kleinemühl authored and raghuvar-vijay committed Dec 10, 2024
1 parent 2313c9d commit 1944857
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 36 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Dependencies: Update setuptools from 75.3.0 to 75.6.0 ([@dirksammel](https://github.com/dirksammel))
- Dependencies: Update sqlx from 0.7.4 to 0.8.2 (missed some occurrences) ([@dirksammel](https://github.com/dirksammel))
- Apel plugin: Update timestamp JSON atomically ([@maxfischer2781](https://github.com/maxfischer2781))
- Slurm collector: Fix timezone offset of local timestamp `lastcheck` (#681, #178) ([@rkleinem](https://github.com/rkleinem))
- Slurm collector: Ignore `.extern` steps instead of handling them as separate jobs (#812) ([@rkleinem](https://github.com/rkleinem))
- Slurm collector: Ignore cancelled jobs which have never been started (#811) ([@rkleinem](https://github.com/rkleinem))

### Removed
- Dependencies: Remove opentelemetry_api (replaced by opentelemetry) ([@dirksammel](https://github.com/dirksammel))
Expand Down
95 changes: 59 additions & 36 deletions collectors/slurm/src/sacctcaller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use auditor::{
constants::FORBIDDEN_CHARACTERS,
domain::{Component, RecordAdd, Score},
};
use chrono::{DateTime, FixedOffset, Local, Utc};
use chrono::{DateTime, Local, Utc};
use color_eyre::eyre::{eyre, Result};
use itertools::Itertools;
use once_cell::sync::Lazy;
Expand All @@ -23,7 +23,7 @@ use crate::{
configuration::{AllowedTypes, ComponentConfig, KeyConfig, ParsableType, Settings},
database::Database,
shutdown::Shutdown,
CONFIG, END, GROUP, JOBID, KEYS, START, USER,
CONFIG, END, GROUP, JOBID, KEYS, START, STATE, USER,
};

type SacctRow = HashMap<String, Option<AllowedTypes>>;
Expand All @@ -36,7 +36,12 @@ static BATCH_REGEX: Lazy<Regex> = Lazy::new(|| {
});

static SUB_REGEX: Lazy<Regex> = Lazy::new(|| {
Regex::new(r"^[0-9]+\.[0-9]*$")
Regex::new(r"^[0-9_]+\.[0-9]*$")
.expect("Could not construct essential Regex for matching job ids.")
});

static EXTERN_REGEX: Lazy<Regex> = Lazy::new(|| {
Regex::new(r"^[0-9_]+\.extern$")
.expect("Could not construct essential Regex for matching job ids.")
});

Expand Down Expand Up @@ -164,7 +169,6 @@ async fn get_job_info(database: &Database) -> Result<Vec<RecordAdd>> {
let (nextcheck, rid) = if records.is_empty() {
(lastcheck, last_record_id)
} else {
let local_offset = Local::now().offset().utc_minus_local();
let (ts, rid) = records.iter().fold(
(chrono::DateTime::<Utc>::MIN_UTC, String::new()),
|(acc, _acc_record_id), r| {
Expand All @@ -175,10 +179,7 @@ async fn get_job_info(database: &Database) -> Result<Vec<RecordAdd>> {
},
);
(
DateTime::<Local>::from_naive_utc_and_offset(
ts.naive_utc(),
FixedOffset::east_opt(local_offset).unwrap(),
),
DateTime::<Local>::from_naive_utc_and_offset(ts.naive_utc(), *Local::now().offset()),
rid,
)
};
Expand Down Expand Up @@ -227,37 +228,59 @@ fn tokenize_sacct_output(output: &str, keys: Vec<KeyConfig>) -> SacctRows {
#[tracing::instrument(name = "Parse sacct rows", skip(sacct_rows, keys))]
fn parse_sacct_rows(sacct_rows: SacctRows, keys: &[KeyConfig]) -> Result<Vec<Job>> {
tracing::debug!("sacct_rows = {:?}", sacct_rows);
sacct_rows
let mut jobs = Vec::with_capacity(sacct_rows.len());
for id in sacct_rows
.keys()
.filter(|k| !BATCH_REGEX.is_match(k))
.filter(|k| !SUB_REGEX.is_match(k))
.map(|id| -> Result<Job> {
let map1 = sacct_rows.get(id).ok_or(eyre!("Cannot get map1"))?;
let map2 = sacct_rows.get(&format!("{id}.batch"));
Ok(keys.iter()
.cloned()
.filter_map(|KeyConfig {name: k, key_type: _, allow_empty: _}| {
let val = match map1.get(&k) {
Some(Some(v)) => Some(v.clone()),
_ => {
if let Some(map2) = map2 {
match map2.get(&k) {
Some(Some(v)) => Some(v.clone()),
_ => {
tracing::error!("Something went wrong during parsing (map1, id: {id}, key: {k}, value: {:?})", map2.get(&k));
None
},
}
} else {
tracing::error!("Something went wrong during parsing (map2, id: {id}, key: {k}, value: {:?})", map1.get(&k));
None
.filter(|k| !EXTERN_REGEX.is_match(k))
{
let map1 = sacct_rows.get(id).ok_or(eyre!("Cannot get map1"))?;
let map2 = sacct_rows.get(&format!("{id}.batch"));
// A state might look like "CANCELLED by 1000"
let cancelled = map1
.get(STATE)
.and_then(Option::as_ref)
.ok_or(eyre!("Job {} has no state.", id))?
.extract_string()?
.starts_with("CANCELLED");
if cancelled
&& map1.get(START).and_then(Option::as_ref).is_none()
&& map2.and_then(|m| m.get(START)).is_none()
{
tracing::debug!(
"Ignore Job {} since it was cancelled before it was started.",
id
);
continue;
};
let job = keys
.iter()
.cloned()
.filter_map(|KeyConfig {name: k, key_type: _, allow_empty: _}| {
let val = match map1.get(&k) {
Some(Some(v)) => Some(v.clone()),
_ => {
if let Some(map2) = map2 {
match map2.get(&k) {
Some(Some(v)) => Some(v.clone()),
_ => {
tracing::error!("Something went wrong during parsing (map1, id: {id}, key: {k}, value: {:?})", map2.get(&k));
None
},
}
},
};
val.map(|val| (k, val))
})
.collect::<Job>())
}).collect::<Result<Vec<Job>>>()
} else {
tracing::error!("Something went wrong during parsing (map2, id: {id}, key: {k}, value: {:?})", map1.get(&k));
None
}
},
};
val.map(|val| (k, val))
})
.collect::<Job>();
jobs.push(job);
}
Ok(jobs)
}

#[tracing::instrument(
Expand Down Expand Up @@ -450,7 +473,7 @@ fn construct_component_scores(job: &Job, component_config: &ComponentConfig) ->
#[cfg(test)]
mod tests {
use auditor::domain::{ValidAmount, ValidName, ValidValue};
use chrono::NaiveDateTime;
use chrono::{FixedOffset, NaiveDateTime};

use super::*;
use crate::{
Expand Down

0 comments on commit 1944857

Please sign in to comment.