diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index aabec457c..78308d08f 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -76,7 +76,7 @@ jobs: run: cargo llvm-cov --all-features --workspace --lcov --output-path lcov.info - name: Upload coverage to Codecov - uses: codecov/codecov-action@v3 + uses: codecov/codecov-action@v5 with: files: lcov.info fail_ci_if_error: true diff --git a/.github/workflows/general.yml b/.github/workflows/general.yml index 99d9d001e..089cc1234 100644 --- a/.github/workflows/general.yml +++ b/.github/workflows/general.yml @@ -185,7 +185,7 @@ jobs: uses: actions/checkout@v4 - name: Check spelling - uses: crate-ci/typos@v1.27.3 + uses: crate-ci/typos@v1.28.2 test_prometheus_exporter: name: Prometheus exporter tests diff --git a/.github/workflows/python_code_tests.yml b/.github/workflows/python_code_tests.yml index f2ecede85..3054c6d56 100644 --- a/.github/workflows/python_code_tests.yml +++ b/.github/workflows/python_code_tests.yml @@ -33,7 +33,7 @@ jobs: - name: Run pytest run: pytest - name: Upload coverage - uses: codecov/codecov-action@v3 + uses: codecov/codecov-action@v5 with: files: ./plugins/apel/lcov.info fail_ci_if_error: true diff --git a/CHANGELOG.md b/CHANGELOG.md index 72d9f37bb..4851455b6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,18 +10,31 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - pyauditor + Apel plugin + HTCondor collector: drop support for Python 3.8 ([@dirksammel](https://github.com/dirksammel)) ### Security +- [RUSTSEC-2024-0363]: Update sqlx from 0.7.4 to 0.8.2 (missed some occurrences) ([@dirksammel](https://github.com/dirksammel)) +- [RUSTSEC-2024-0399]: Update rustls from 0.23.16 to 0.23.19 ([@dirksammel](https://github.com/dirksammel)) +- [RUSTSEC-2024-0402]: Update hashbrown from 0.15.0 to 0.15.2 ([@dirksammel](https://github.com/dirksammel)) ### Added +- Apel plugin: Add function for user->VO mapping to config ([@dirksammel](https://github.com/dirksammel)) - CI: Add workflow to test publishing to the PyPI test repo ([@dirksammel](https://github.com/dirksammel)) - Kubernetes collector: Added a Kubernetes collector ([@rkleinem](https://github.com/rkleinem)) ### Changed - Auditor Docker container: Switch from fixed to latest Rust version ([@dirksammel](https://github.com/dirksammel)) -- Dependencies: Update crate-ci/typos from 1.26.8 to 1.27.3 ([@dirksammel](https://github.com/dirksammel)) -- Dependencies: Update ruff from 0.7.1 to 0.7.3 ([@dirksammel](https://github.com/dirksammel)) -- Dependencies: Update setuptools from 75.3.0 to 75.4.0 ([@dirksammel](https://github.com/dirksammel)) +- Dependencies: Switch from pyo3-asyncio 0.20.0 to pyo3-async-runtimes 0.22.0 ([@dirksammel](https://github.com/dirksammel)) +- Dependencies: Update codecov/codecov-action from 3 to 5 ([@dirksammel](https://github.com/dirksammel)) +- Dependencies: Update crate-ci/typos from 1.26.8 to 1.28.2 ([@dirksammel](https://github.com/dirksammel)) +- Dependencies: Update cryptography from 43.0.3 to 44.0.0 ([@dirksammel](https://github.com/dirksammel)) +- Dependencies: Update pydantic from 2.9.2 to 2.10.3 ([@dirksammel](https://github.com/dirksammel)) +- Dependencies: Update pyo3 from 0.20.3 to 0.22.5 ([@dirksammel](https://github.com/dirksammel)) +- Dependencies: Update pytest from 8.3.3 to 8.3.4 ([@dirksammel](https://github.com/dirksammel)) +- Dependencies: Update ruff from 0.7.1 to 0.8.2 ([@dirksammel](https://github.com/dirksammel)) +- Dependencies: Update setuptools from 75.3.0 to 75.6.0 ([@dirksammel](https://github.com/dirksammel)) - Dependencies: Update sqlx from 0.7.4 to 0.8.2 (missed some occurrences) ([@dirksammel](https://github.com/dirksammel)) - Apel plugin: Update timestamp JSON atomically ([@maxfischer2781](https://github.com/maxfischer2781)) +- Slurm collector: Fix timezone offset of local timestamp `lastcheck` (#681, #178) ([@rkleinem](https://github.com/rkleinem)) +- Slurm collector: Ignore `.extern` steps instead of handling them as separate jobs (#812) ([@rkleinem](https://github.com/rkleinem)) +- Slurm collector: Ignore cancelled jobs which have never been started (#811) ([@rkleinem](https://github.com/rkleinem)) ### Removed - Dependencies: Remove opentelemetry_api (replaced by opentelemetry) ([@dirksammel](https://github.com/dirksammel)) @@ -32,14 +45,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - AUDITOR client: rename function `does_not_contains` to `does_not_contain` ### Security -- [RUSTSEC-2024-0363]: Update sqlx from 0.7.4 to 0.8.2 ([@raghuvar-vijay](https://github.com/raghuvar-vijay)) +- [RUSTSEC-2024-0363]: Update sqlx from 0.7.4 to 0.8.2 ([@raghuvar-vijay](https://github.com/raghuvar-vijay)) - [CVE-2024-45311]: Update quinn-proto from 0.11.3 to 0.11.8 ([@raghuvar-vijay](https://github.com/raghuvar-vijay)) ### Added - CI: Add checks for import sorting and formatting to python linting workflow ([@dirksammel](https://github.com/dirksammel)) ### Changed -- Dependencies: Update sqlx from 0.7.4 to 0.8.2 ([@raghuvar-vijay](https://github.com/raghuvar-vijay)) - Dependencies: Update black from 24.4.2 to 24.10.0 ([@dirksammel](https://github.com/dirksammel)) - Dependencies: Update build from 1.2.1 to 1.2.2.post1 ([@dirksammel](https://github.com/dirksammel)) - Dependencies: Update crate-ci/typos from 1.23.3 to 1.26.8 ([@dirksammel](https://github.com/dirksammel)) @@ -56,12 +68,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Dependencies: Update serde_json from 1.0.121 to 1.0.132 ([@raghuvar-vijay](https://github.com/raghuvar-vijay)) - Dependencies: Update setuptools from 71.1.0 to 75.3.0 ([@dirksammel](https://github.com/dirksammel)) - Dependencies: Update shalzz/zola-deploy-action from 0.19.1 to 0.19.2 ([@dirksammel](https://github.com/dirksammel)) +- Dependencies: Update sqlx from 0.7.4 to 0.8.2 ([@raghuvar-vijay](https://github.com/raghuvar-vijay)) - Dependencies: Update tarides/changelog-check-action from 2 to 3 ([@dirksammel](https://github.com/dirksammel)) - Dependencies: Update types-pyyaml from 6.0.12.20240311 to 6.0.12.20240917 ([@dirksammel](https://github.com/dirksammel)) - Dependencies: Update types-requests from 2.32.0.20240712 to 2.32.0.20241016 ([@dirksammel](https://github.com/dirksammel)) - Dependencies: update wiremock from 0.6.1 to 0.6.2 ([@raghuvar-vijay](https://github.com/raghuvar-vijay)) - HTCondor collector: Fix bug that the machine score was stored as an integer, not as a float ([@mschnepf](https://github.com/mschnepf)) -- Docs: Typo fixes in query documentation ([@dirksammel](https://github.com/dirksammel)) +- Docs: Typo fixes in query documentation ([@dirksammel](https://github.com/dirksammel)) - Docker files: Linting ([@dirksammel](https://github.com/dirksammel)) ### Removed @@ -271,7 +284,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Breaking changes ### Security - + ### Added - Docs: Add steps for creating a new release ([@QuantumDancer](https://github.com/QuantumDancer)) - CI: Add Python 3.12 to workflows ([@dirksammel](https://github.com/dirksammel)) @@ -289,7 +302,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [0.3.0] - 2023-11-17 ### Breaking changes -- Auditor: Standardize REST APIs. Routes have changed to single endpoint '/record' with methods such as 'GET', 'PUT', 'POST' (#465) ([@raghuvar-vijay](https://github.com/raghuvar-vijay)) +- Auditor: Standardize REST APIs. Routes have changed to single endpoint '/record' with methods such as 'GET', 'PUT', 'POST' (#465) ([@raghuvar-vijay](https://github.com/raghuvar-vijay)) - Priority plugin: 'auditor' configuration has to be present in the config file. 'prometheus' configuration is optional (#456) ([@raghuvar-vijay](https://github.com/raghuvar-vijay)) - Slurm collector: New filter options to filter slurm jobs are added. The `job_status` key in the config is moved to the `job_filter` section and is renamed to `status` (#472) ([@QuantumDancer](https://github.com/QuantumDancer)) diff --git a/Cargo.lock b/Cargo.lock index cb6c84f5d..cd15b8f9e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "actix-codec" @@ -1132,7 +1132,7 @@ version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1e6a265c649f3f5979b601d26f1d05ada116434c87741c9493cb56218f76cbc" dependencies = [ - "heck 0.5.0", + "heck", "proc-macro2", "quote", "syn 2.0.89", @@ -1477,12 +1477,6 @@ dependencies = [ "hashbrown 0.14.5", ] -[[package]] -name = "heck" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" - [[package]] name = "heck" version = "0.5.0" @@ -2769,9 +2763,9 @@ checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" [[package]] name = "pyo3" -version = "0.20.3" +version = "0.22.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53bdbb96d49157e65d45cc287af5f32ffadd5f4761438b527b055fb0d4bb8233" +checksum = "f402062616ab18202ae8319da13fa4279883a2b8a9d9f83f20dbade813ce1884" dependencies = [ "anyhow", "cfg-if", @@ -2779,7 +2773,7 @@ dependencies = [ "indoc", "libc", "memoffset", - "parking_lot", + "once_cell", "portable-atomic", "pyo3-build-config", "pyo3-ffi", @@ -2788,35 +2782,35 @@ dependencies = [ ] [[package]] -name = "pyo3-asyncio" -version = "0.20.0" +name = "pyo3-async-runtimes" +version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ea6b68e93db3622f3bb3bf363246cf948ed5375afe7abff98ccbdd50b184995" +checksum = "2529f0be73ffd2be0cc43c013a640796558aa12d7ca0aab5cc14f375b4733031" dependencies = [ "futures", "once_cell", "pin-project-lite", "pyo3", - "pyo3-asyncio-macros", + "pyo3-async-runtimes-macros", "tokio", ] [[package]] -name = "pyo3-asyncio-macros" -version = "0.20.0" +name = "pyo3-async-runtimes-macros" +version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56c467178e1da6252c95c29ecf898b133f742e9181dca5def15dc24e19d45a39" +checksum = "22c26fd8e9fc19f53f0c1e00bf61471de6789f7eb263056f7f944a9cceb5823e" dependencies = [ "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.89", ] [[package]] name = "pyo3-build-config" -version = "0.20.3" +version = "0.22.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "deaa5745de3f5231ce10517a1f5dd97d53e5a2fd77aa6b5842292085831d48d7" +checksum = "b14b5775b5ff446dd1056212d778012cbe8a0fbffd368029fd9e25b514479c38" dependencies = [ "once_cell", "target-lexicon", @@ -2824,9 +2818,9 @@ dependencies = [ [[package]] name = "pyo3-ffi" -version = "0.20.3" +version = "0.22.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62b42531d03e08d4ef1f6e85a2ed422eb678b8cd62b762e53891c05faf0d4afa" +checksum = "9ab5bcf04a2cdcbb50c7d6105de943f543f9ed92af55818fd17b660390fc8636" dependencies = [ "libc", "pyo3-build-config", @@ -2834,9 +2828,9 @@ dependencies = [ [[package]] name = "pyo3-macros" -version = "0.20.3" +version = "0.22.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7305c720fa01b8055ec95e484a6eca7a83c841267f0dd5280f0c8b8551d2c158" +checksum = "0fd24d897903a9e6d80b968368a34e1525aeb719d568dba8b3d4bfa5dc67d453" dependencies = [ "proc-macro2", "pyo3-macros-backend", @@ -2846,11 +2840,11 @@ dependencies = [ [[package]] name = "pyo3-macros-backend" -version = "0.20.3" +version = "0.22.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c7e9b68bb9c3149c5b0cade5d07f953d6d125eb4337723c4ccdb665f1f96185" +checksum = "36c011a03ba1e50152b4b394b479826cad97e7a21eb52df179cd91ac411cbfbe" dependencies = [ - "heck 0.4.1", + "heck", "proc-macro2", "pyo3-build-config", "quote", @@ -2866,7 +2860,7 @@ dependencies = [ "auditor-client", "chrono", "pyo3", - "pyo3-asyncio", + "pyo3-async-runtimes", "serde", "serde_json", "tokio", @@ -3690,7 +3684,7 @@ checksum = "1804e8a7c7865599c9c79be146dc8a9fd8cc86935fa641d3ea58e5f0688abaa5" dependencies = [ "dotenvy", "either", - "heck 0.5.0", + "heck", "hex", "once_cell", "proc-macro2", diff --git a/Cargo.toml b/Cargo.toml index 6fcd5cb6d..08c83be5c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,8 +30,8 @@ opentelemetry = "0.23.0" opentelemetry-prometheus = "0.16.0" opentelemetry_sdk = "0.23.0" prometheus = "0.13.4" -pyo3 = { version = "0.20.3", features = ["chrono", "extension-module", "anyhow"] } -pyo3-asyncio = { version = "0.20.0", features = ["attributes", "tokio-runtime"] } +pyo3 = { version = "0.22.5", features = ["chrono", "extension-module", "anyhow"] } +pyo3-async-runtimes = { version = "0.22.0", features = ["attributes", "tokio-runtime"] } quickcheck = "1.0.3" quickcheck_macros = "1.0.0" rand = "0.8.5" diff --git a/auditor-client/src/lib.rs b/auditor-client/src/lib.rs index 47e4f2033..26a136fdc 100644 --- a/auditor-client/src/lib.rs +++ b/auditor-client/src/lib.rs @@ -892,7 +892,7 @@ impl Operator { } } -/// Implementations of conversion traits for the `Value` enum. +// Implementations of conversion traits for the `Value` enum. /// Conversion from chrono DateTime to Value::Datetime. impl From> for Value { diff --git a/collectors/htcondor/pyproject.toml b/collectors/htcondor/pyproject.toml index 3709ee05c..d8e30f212 100644 --- a/collectors/htcondor/pyproject.toml +++ b/collectors/htcondor/pyproject.toml @@ -1,5 +1,5 @@ [build-system] -requires = ["setuptools==75.4.0"] +requires = ["setuptools==75.6.0"] build-backend = "setuptools.build_meta" [tool.setuptools] @@ -20,7 +20,7 @@ readme = "README.md" [project.optional-dependencies] build = [ "build==1.2.2.post1", - "setuptools==75.4.0", + "setuptools==75.6.0", ] [project.scripts] diff --git a/collectors/slurm/src/sacctcaller.rs b/collectors/slurm/src/sacctcaller.rs index ff11d66dc..b37009def 100644 --- a/collectors/slurm/src/sacctcaller.rs +++ b/collectors/slurm/src/sacctcaller.rs @@ -12,7 +12,7 @@ use auditor::{ constants::FORBIDDEN_CHARACTERS, domain::{Component, RecordAdd, Score}, }; -use chrono::{DateTime, FixedOffset, Local, Utc}; +use chrono::{DateTime, Local, Utc}; use color_eyre::eyre::{eyre, Result}; use itertools::Itertools; use once_cell::sync::Lazy; @@ -23,7 +23,7 @@ use crate::{ configuration::{AllowedTypes, ComponentConfig, KeyConfig, ParsableType, Settings}, database::Database, shutdown::Shutdown, - CONFIG, END, GROUP, JOBID, KEYS, START, USER, + CONFIG, END, GROUP, JOBID, KEYS, START, STATE, USER, }; type SacctRow = HashMap>; @@ -36,7 +36,12 @@ static BATCH_REGEX: Lazy = Lazy::new(|| { }); static SUB_REGEX: Lazy = Lazy::new(|| { - Regex::new(r"^[0-9]+\.[0-9]*$") + Regex::new(r"^[0-9_]+\.[0-9]*$") + .expect("Could not construct essential Regex for matching job ids.") +}); + +static EXTERN_REGEX: Lazy = Lazy::new(|| { + Regex::new(r"^[0-9_]+\.extern$") .expect("Could not construct essential Regex for matching job ids.") }); @@ -164,7 +169,6 @@ async fn get_job_info(database: &Database) -> Result> { let (nextcheck, rid) = if records.is_empty() { (lastcheck, last_record_id) } else { - let local_offset = Local::now().offset().utc_minus_local(); let (ts, rid) = records.iter().fold( (chrono::DateTime::::MIN_UTC, String::new()), |(acc, _acc_record_id), r| { @@ -175,10 +179,7 @@ async fn get_job_info(database: &Database) -> Result> { }, ); ( - DateTime::::from_naive_utc_and_offset( - ts.naive_utc(), - FixedOffset::east_opt(local_offset).unwrap(), - ), + DateTime::::from_naive_utc_and_offset(ts.naive_utc(), *Local::now().offset()), rid, ) }; @@ -227,37 +228,62 @@ fn tokenize_sacct_output(output: &str, keys: Vec) -> SacctRows { #[tracing::instrument(name = "Parse sacct rows", skip(sacct_rows, keys))] fn parse_sacct_rows(sacct_rows: SacctRows, keys: &[KeyConfig]) -> Result> { tracing::debug!("sacct_rows = {:?}", sacct_rows); - sacct_rows + let mut jobs = Vec::with_capacity(sacct_rows.len()); + for id in sacct_rows .keys() .filter(|k| !BATCH_REGEX.is_match(k)) .filter(|k| !SUB_REGEX.is_match(k)) - .map(|id| -> Result { - let map1 = sacct_rows.get(id).ok_or(eyre!("Cannot get map1"))?; - let map2 = sacct_rows.get(&format!("{id}.batch")); - Ok(keys.iter() - .cloned() - .filter_map(|KeyConfig {name: k, key_type: _, allow_empty: _}| { - let val = match map1.get(&k) { - Some(Some(v)) => Some(v.clone()), - _ => { - if let Some(map2) = map2 { - match map2.get(&k) { - Some(Some(v)) => Some(v.clone()), - _ => { - tracing::error!("Something went wrong during parsing (map1, id: {id}, key: {k}, value: {:?})", map2.get(&k)); - None - }, - } - } else { - tracing::error!("Something went wrong during parsing (map2, id: {id}, key: {k}, value: {:?})", map1.get(&k)); - None + .filter(|k| !EXTERN_REGEX.is_match(k)) + { + let map1 = sacct_rows.get(id).ok_or(eyre!("Cannot get map1"))?; + let map2 = sacct_rows.get(&format!("{id}.batch")); + // A state might look like "CANCELLED by 1000" + let cancelled = map1 + .get(STATE) + .and_then(Option::as_ref) + .ok_or(eyre!("Job {} has no state.", id))? + .extract_string()? + .starts_with("CANCELLED"); + if cancelled + && map1.get(START).and_then(Option::as_ref).is_none() + && map2 + .and_then(|m| m.get(START)) + .and_then(Option::as_ref) + .is_none() + { + tracing::debug!( + "Ignore Job {} since it was cancelled before it was started.", + id + ); + continue; + }; + let job = keys + .iter() + .cloned() + .filter_map(|KeyConfig {name: k, key_type: _, allow_empty: _}| { + let val = match map1.get(&k) { + Some(Some(v)) => Some(v.clone()), + _ => { + if let Some(map2) = map2 { + match map2.get(&k) { + Some(Some(v)) => Some(v.clone()), + _ => { + tracing::error!("Something went wrong during parsing (map1, id: {id}, key: {k}, value: {:?})", map2.get(&k)); + None + }, } - }, - }; - val.map(|val| (k, val)) - }) - .collect::()) - }).collect::>>() + } else { + tracing::error!("Something went wrong during parsing (map2, id: {id}, key: {k}, value: {:?})", map1.get(&k)); + None + } + }, + }; + val.map(|val| (k, val)) + }) + .collect::(); + jobs.push(job); + } + Ok(jobs) } #[tracing::instrument( @@ -450,7 +476,7 @@ fn construct_component_scores(job: &Job, component_config: &ComponentConfig) -> #[cfg(test)] mod tests { use auditor::domain::{ValidAmount, ValidName, ValidValue}; - use chrono::NaiveDateTime; + use chrono::{FixedOffset, NaiveDateTime}; use super::*; use crate::{ @@ -983,6 +1009,221 @@ mod tests { assert_eq!(parsed_sacct_rows, expected); } + #[test] + fn parse_sacct_rows_unstarted_cancelled_skipped() { + let keys = vec![ + KeyConfig { + name: JOBID.to_owned(), + key_type: ParsableType::String, + allow_empty: false, + }, + KeyConfig { + name: START.to_owned(), + key_type: ParsableType::DateTime, + allow_empty: false, + }, + KeyConfig { + name: END.to_owned(), + key_type: ParsableType::DateTime, + allow_empty: false, + }, + KeyConfig { + name: STATE.to_owned(), + key_type: ParsableType::String, + allow_empty: false, + }, + ]; + + let sacct_rows = SacctRows::from([ + ( + "1234567".to_owned(), + SacctRow::from([ + ( + JOBID.to_owned(), + Some(AllowedTypes::String("1234567".to_owned())), + ), + (START.to_owned(), None), + (END.to_owned(), None), + ( + STATE.to_owned(), + Some(AllowedTypes::String("CANCELLED by 1000".to_owned())), + ), + ]), + ), + ( + "1234567.batch".to_owned(), + SacctRow::from([ + ( + JOBID.to_owned(), + Some(AllowedTypes::String("1234567.batch".to_owned())), + ), + (START.to_owned(), None), + (END.to_owned(), None), + ( + STATE.to_owned(), + Some(AllowedTypes::String("CANCELLED by 1000".to_owned())), + ), + ]), + ), + ]); + + let parsed_sacct_rows = parse_sacct_rows(sacct_rows, &keys).unwrap(); + + let expected = vec![]; + assert_eq!(parsed_sacct_rows, expected); + } + + #[test] + fn parse_sacct_rows_started_cancelled_succeeds() { + let keys = vec![ + KeyConfig { + name: JOBID.to_owned(), + key_type: ParsableType::String, + allow_empty: false, + }, + KeyConfig { + name: START.to_owned(), + key_type: ParsableType::DateTime, + allow_empty: false, + }, + KeyConfig { + name: END.to_owned(), + key_type: ParsableType::DateTime, + allow_empty: false, + }, + KeyConfig { + name: STATE.to_owned(), + key_type: ParsableType::String, + allow_empty: false, + }, + ]; + + let sacct_rows = SacctRows::from([ + ( + "1234567".to_owned(), + SacctRow::from([ + ( + JOBID.to_owned(), + Some(AllowedTypes::String("1234567".to_owned())), + ), + ( + START.to_owned(), + Some(AllowedTypes::DateTime(DateTime::::from( + NaiveDateTime::parse_from_str( + "2023-11-07T10:14:01", + "%Y-%m-%dT%H:%M:%S", + ) + .unwrap() + .and_local_timezone( + FixedOffset::east_opt(Local::now().offset().local_minus_utc()) + .unwrap(), + ) + .unwrap(), + ))), + ), + ( + END.to_owned(), + Some(AllowedTypes::DateTime(DateTime::::from( + NaiveDateTime::parse_from_str( + "2023-11-07T11:39:09", + "%Y-%m-%dT%H:%M:%S", + ) + .unwrap() + .and_local_timezone( + FixedOffset::east_opt(Local::now().offset().local_minus_utc()) + .unwrap(), + ) + .unwrap(), + ))), + ), + ( + STATE.to_owned(), + Some(AllowedTypes::String("CANCELLED by 1000".to_owned())), + ), + ]), + ), + ( + "1234567.batch".to_owned(), + SacctRow::from([ + ( + JOBID.to_owned(), + Some(AllowedTypes::String("1234567.batch".to_owned())), + ), + ( + START.to_owned(), + Some(AllowedTypes::DateTime(DateTime::::from( + NaiveDateTime::parse_from_str( + "2023-11-07T10:14:01", + "%Y-%m-%dT%H:%M:%S", + ) + .unwrap() + .and_local_timezone( + FixedOffset::east_opt(Local::now().offset().local_minus_utc()) + .unwrap(), + ) + .unwrap(), + ))), + ), + ( + END.to_owned(), + Some(AllowedTypes::DateTime(DateTime::::from( + NaiveDateTime::parse_from_str( + "2023-11-07T11:39:09", + "%Y-%m-%dT%H:%M:%S", + ) + .unwrap() + .and_local_timezone( + FixedOffset::east_opt(Local::now().offset().local_minus_utc()) + .unwrap(), + ) + .unwrap(), + ))), + ), + ( + STATE.to_owned(), + Some(AllowedTypes::String("CANCELLED by 1000".to_owned())), + ), + ]), + ), + ]); + + let parsed_sacct_rows = parse_sacct_rows(sacct_rows, &keys).unwrap(); + + let expected = vec![Job::from([ + ( + "JobID".to_owned(), + AllowedTypes::String("1234567".to_owned()), + ), + ( + START.to_owned(), + AllowedTypes::DateTime(DateTime::::from( + NaiveDateTime::parse_from_str("2023-11-07T10:14:01", "%Y-%m-%dT%H:%M:%S") + .unwrap() + .and_local_timezone( + FixedOffset::east_opt(Local::now().offset().local_minus_utc()).unwrap(), + ) + .unwrap(), + )), + ), + ( + END.to_owned(), + AllowedTypes::DateTime(DateTime::::from( + NaiveDateTime::parse_from_str("2023-11-07T11:39:09", "%Y-%m-%dT%H:%M:%S") + .unwrap() + .and_local_timezone( + FixedOffset::east_opt(Local::now().offset().local_minus_utc()).unwrap(), + ) + .unwrap(), + )), + ), + ( + STATE.to_owned(), + AllowedTypes::String("CANCELLED by 1000".to_owned()), + ), + ])]; + assert_eq!(parsed_sacct_rows, expected); + } + #[test] fn construct_components_empty_config_succeeds() { let job = Job::from([( diff --git a/media/website/content/_index.md b/media/website/content/_index.md index d32b7f613..5abb1a941 100644 --- a/media/website/content/_index.md +++ b/media/website/content/_index.md @@ -755,8 +755,13 @@ summary_fields: GlobalUserName: !MetaField name: subject VO: !MetaField - name: voms - regex: (?<=%2F).*?\S(?=%2F) + name: user + function: + name: vo_mapping + parameters: + atlpr: atlas + atlsg: ops + ops: ops VOGroup: !MetaField name: voms regex: (?=%2F).*?\S(?=%2F) @@ -870,7 +875,7 @@ Different field types are available, depending on the source of the value that i `ComponentField` extracts the value from a `component` in the AUDITOR record. The mandatory parameter of this field is `name`, which gives the name of the component in the AUDITOR record. If the value needs to be modified, e.g. if it has another unit than the one expected by APEL, the optional parameter `divide_by` has to be used. -`MetaField` extracts the value from the `meta` information in the AUDITOR record. The mandatory parameter of this field is `name`, which gives the name of the component in the AUDITOR record. If the value needs to be modified, one of the optional parameters `regex` or `function` can be used. `regex` takes a regular expression, searches the value for this expression, and returns the complete match, `function` takes the name of a function and applies it to the value. The function has to be present in `config.py` and can be added via a pull request. +`MetaField` extracts the value from the `meta` information in the AUDITOR record. The mandatory parameter of this field is `name`, which gives the name of the component in the AUDITOR record. If the value needs to be modified, one of the optional parameters `regex` or `function` can be used. `regex` takes a regular expression, searches the value for this expression, and returns the complete match, `function` has the parameters `name` and `parameters`, where the latter is optional and can be used to provide additional parameters to the function. If you want to manipulate the value of the `Metafield` with a custom function, it has to be present in `utility.py` and can be added via a pull request. `ConstantField` has the mandatory parameter `value`, which is exactly what will be written in the message field. diff --git a/plugins/apel/pyproject.toml b/plugins/apel/pyproject.toml index 9868f1770..9a4cc1fd9 100644 --- a/plugins/apel/pyproject.toml +++ b/plugins/apel/pyproject.toml @@ -1,5 +1,5 @@ [build-system] -requires = ["setuptools==75.4.0"] +requires = ["setuptools==75.6.0"] build-backend = "setuptools.build_meta" [project] @@ -9,9 +9,9 @@ requires-python = ">=3.9" dependencies = [ "python-auditor==0.6.3", "requests==2.32.3", - "cryptography==43.0.3", + "cryptography==44.0.0", "pyyaml==6.0.2", - "pydantic==2.9.2", + "pydantic==2.10.3", ] description = "AUDITOR plugin for sending accounting data to APEL" readme = "README.md" @@ -19,10 +19,10 @@ readme = "README.md" [project.optional-dependencies] style = [ "black==24.10.0", - "ruff==0.7.3", + "ruff==0.8.2", ] tests = [ - "pytest==8.3.3", + "pytest==8.3.4", "pytest-cov==6.0.0", "mypy==1.13.0", "types-pyyaml==6.0.12.20240917", @@ -30,7 +30,7 @@ tests = [ ] build = [ "build==1.2.2.post1", - "setuptools==75.4.0", + "setuptools==75.6.0", ] [project.scripts] diff --git a/plugins/apel/src/auditor_apel_plugin/config.py b/plugins/apel/src/auditor_apel_plugin/config.py index f8f311158..6b646bbf4 100644 --- a/plugins/apel/src/auditor_apel_plugin/config.py +++ b/plugins/apel/src/auditor_apel_plugin/config.py @@ -7,12 +7,14 @@ import re from datetime import datetime from enum import Enum -from typing import Callable, Dict, List, Optional, Union +from typing import Any, Callable, Dict, List, Optional, Union import yaml from pyauditor import Record from pydantic import BaseModel +from .utility import vo_mapping + logger = logging.getLogger("apel_plugin") @@ -28,6 +30,11 @@ def from_yaml(cls, loader: yaml.SafeLoader, node: yaml.nodes.MappingNode): return cls(**loader.construct_mapping(node, deep=True)) +class Function(Configurable): + name: str + parameters: Any = None + + class PluginConfig(Configurable): log_level: str time_json_path: str @@ -94,10 +101,12 @@ def get_value(self, record: Record) -> int: class MetaField(Field): name: str regex: Optional[str] = None - function: Optional[str] = None + function: Optional[Function] = None def get_value(self, record: Record) -> Union[str, int, float]: - function_dict: Dict[str, Callable[[str], Union[str, int, float]]] = {} + function_dict: Dict[str, Callable[[str, Any], Union[str, int, float]]] = { + "vo_mapping": vo_mapping + } try: value = record.meta.get(self.name)[0] @@ -116,14 +125,14 @@ def get_value(self, record: Record) -> Union[str, int, float]: return "None" elif self.function is not None: try: - function = function_dict[self.function] + function = function_dict[self.function.name] except KeyError: logger.critical( - f"Function {self.function} not found in dictionary of allowed " + f"Function {self.function.name} not found in dictionary of allowed " "functions" ) raise - value = function(value) + value = function(value, self.function.parameters) return value return value diff --git a/plugins/apel/src/auditor_apel_plugin/utility.py b/plugins/apel/src/auditor_apel_plugin/utility.py index 8d7294b73..235c8bb52 100644 --- a/plugins/apel/src/auditor_apel_plugin/utility.py +++ b/plugins/apel/src/auditor_apel_plugin/utility.py @@ -1,6 +1,9 @@ +import logging import pathlib from contextlib import contextmanager -from typing import IO, ContextManager, Literal, overload +from typing import IO, ContextManager, Dict, Literal, overload + +logger = logging.getLogger("apel_plugin") @overload @@ -29,3 +32,13 @@ def write_transaction( raise else: tmp_path.rename(path) + + +def vo_mapping(user: str, vo_dict: Dict[str, str]) -> str: + for k, v in vo_dict.items(): + if user.startswith(k): + return v + + logger.warning(f"No VO for user {user} found, will use None") + + return "None" diff --git a/plugins/apel/tests/test_config.py b/plugins/apel/tests/test_config.py index 93c29a399..f52b1dc8e 100644 --- a/plugins/apel/tests/test_config.py +++ b/plugins/apel/tests/test_config.py @@ -11,6 +11,7 @@ Config, ConstantField, Field, + Function, MessageType, MetaField, NormalisedField, @@ -67,7 +68,7 @@ def test_plugin_config(self): value = all_fields["VO"].name - assert value == "voms" + assert value == "user" value = all_fields["CpuDuration"].name @@ -131,7 +132,9 @@ def test_get_value_meta_field_fail(self): assert value == "None" - meta_field = MetaField(name="meta_test", function="missing_function") + meta_field = MetaField( + name="meta_test", function=Function(name="missing_function") + ) with pytest.raises(Exception) as pytest_error: meta_field.get_value(record) @@ -336,3 +339,53 @@ def test_loaders(self): value = config.name assert value == "test_field" + + def test_vo_mapping(self): + record = pyauditor.Record( + "record_id", + datetime(1984, 3, 3, 0, 0, 0).astimezone(tz=timezone.utc), + ) + + vo_dict = {"atlpr": "atlas", "atlsg": "ops", "ops": "ops"} + + meta = pyauditor.Meta() + meta.insert("user", ["atlpr000"]) + record.with_meta(meta) + + meta_field = MetaField( + name="user", function=Function(name="vo_mapping", parameters=vo_dict) + ) + value = meta_field.get_value(record) + + assert value == "atlas" + + with open(Path.joinpath(test_dir, "test_config.yml"), "r") as f: + config: Config = yaml.load(f, Loader=get_loaders()) + + value = config.summary_fields.optional["VO"].get_value(record) + + assert value == "atlas" + + meta = pyauditor.Meta() + meta.insert("user", ["atlsg000"]) + record.with_meta(meta) + + value = config.summary_fields.optional["VO"].get_value(record) + + assert value == "ops" + + meta = pyauditor.Meta() + meta.insert("user", ["ops000"]) + record.with_meta(meta) + + value = config.summary_fields.optional["VO"].get_value(record) + + assert value == "ops" + + meta = pyauditor.Meta() + meta.insert("user", ["ilc000"]) + record.with_meta(meta) + + value = config.summary_fields.optional["VO"].get_value(record) + + assert value == "None" diff --git a/plugins/apel/tests/test_config.yml b/plugins/apel/tests/test_config.yml index 608746ce6..cd34cdc79 100644 --- a/plugins/apel/tests/test_config.yml +++ b/plugins/apel/tests/test_config.yml @@ -45,8 +45,13 @@ summary_fields: GlobalUserName: !MetaField name: subject VO: !MetaField - name: voms - regex: (?<=%2F).*?\S(?=%2F) + name: user + function: + name: vo_mapping + parameters: + atlpr: atlas + atlsg: ops + ops: ops VOGroup: !MetaField name: voms regex: (?=%2F).*?\S(?=%2F) diff --git a/plugins/apel/tests/test_utility.py b/plugins/apel/tests/test_utility.py index 185b31a9a..a3bf2965d 100644 --- a/plugins/apel/tests/test_utility.py +++ b/plugins/apel/tests/test_utility.py @@ -3,7 +3,7 @@ import pytest -from auditor_apel_plugin.utility import write_transaction +from auditor_apel_plugin.utility import vo_mapping, write_transaction CONTENT = [ "Hello World!", @@ -51,3 +51,28 @@ def test_write_transaction_failure( assert file_path.read_text() == initial raise failure assert file_path.read_text() == final + + +class TestUtility: + def test_vo_mapping(self): + vo_dict = {"atlpr": "atlas", "atlsg": "ops", "ops": "ops"} + + user = "atlpr000" + value = vo_mapping(user, vo_dict) + + assert value == "atlas" + + user = "atlsg001" + value = vo_mapping(user, vo_dict) + + assert value == "ops" + + user = "ops" + value = vo_mapping(user, vo_dict) + + assert value == "ops" + + user = "ilc002" + value = vo_mapping(user, vo_dict) + + assert value == "None" diff --git a/pyauditor/Cargo.toml b/pyauditor/Cargo.toml index 2aa25fd8a..109856976 100644 --- a/pyauditor/Cargo.toml +++ b/pyauditor/Cargo.toml @@ -29,8 +29,8 @@ anyhow.workspace = true auditor-client.workspace = true auditor.workspace = true chrono.workspace = true -pyo3-asyncio.workspace = true pyo3.workspace = true +pyo3-async-runtimes.workspace = true serde.workspace = true serde_json.workspace = true tokio.workspace = true diff --git a/pyauditor/src/blocking_client.rs b/pyauditor/src/blocking_client.rs index 925dff18a..8029b543a 100644 --- a/pyauditor/src/blocking_client.rs +++ b/pyauditor/src/blocking_client.rs @@ -62,7 +62,10 @@ impl AuditorClientBlocking { /// /// records = client.get_stopped_since(start_since) /// - fn get_started_since(self_: PyRef<'_, Self>, timestamp: &PyDateTime) -> PyResult> { + fn get_started_since( + self_: PyRef<'_, Self>, + timestamp: &Bound<'_, PyDateTime>, + ) -> PyResult> { let since: DateTime = timestamp.extract()?; let query_string = auditor_client::QueryBuilder::new() .with_start_time(auditor_client::Operator::default().gte(since.into())) @@ -99,7 +102,10 @@ impl AuditorClientBlocking { /// /// records = client.get_stopped_since(stop_since) /// - fn get_stopped_since(self_: PyRef<'_, Self>, timestamp: &PyDateTime) -> PyResult> { + fn get_stopped_since( + self_: PyRef<'_, Self>, + timestamp: &Bound<'_, PyDateTime>, + ) -> PyResult> { let since: DateTime = timestamp.extract()?; let query_string = auditor_client::QueryBuilder::new() .with_stop_time(auditor_client::Operator::default().gte(since.into())) diff --git a/pyauditor/src/builder.rs b/pyauditor/src/builder.rs index e58d684d7..c6bf0ca23 100644 --- a/pyauditor/src/builder.rs +++ b/pyauditor/src/builder.rs @@ -124,11 +124,11 @@ impl AuditorClientBuilder { } /// Build a ``QueuedAuditorClient`` from ``AuditorClientBuilder`` - pub fn build_queued<'a>(&'a self, py: Python<'a>) -> PyResult<&'a PyAny> { + pub fn build_queued<'a>(&'a self, py: Python<'a>) -> PyResult> { // Must clone here because `build` moves the builder, but python // does not allow that. Doesn't matter, Python is slow anyways. let builder = self.inner.clone(); - pyo3_asyncio::tokio::future_into_py(py, async move { + pyo3_async_runtimes::tokio::future_into_py(py, async move { Ok(QueuedAuditorClient { inner: builder .build_queued() diff --git a/pyauditor/src/client.rs b/pyauditor/src/client.rs index 3567035c5..c992a33d2 100644 --- a/pyauditor/src/client.rs +++ b/pyauditor/src/client.rs @@ -58,7 +58,7 @@ impl Value { /// /// value = Value.set_datetime(start_time) #[staticmethod] - fn set_datetime(datetime: &PyDateTime) -> Result { + fn set_datetime(datetime: &Bound<'_, PyDateTime>) -> Result { let date_time: DateTime = datetime.extract()?; Ok(Value { inner: auditor_client::Value::Datetime(auditor_client::DateTimeUtcWrapper(date_time)), @@ -606,9 +606,9 @@ pub struct AuditorClient { impl AuditorClient { /// health_check() /// Returns ``true`` if the Auditor instance is healthy, ``false`` otherwise - fn health_check<'a>(self_: PyRef<'a, Self>, py: Python<'a>) -> PyResult<&'a PyAny> { + fn health_check<'a>(self_: PyRef<'a, Self>, py: Python<'a>) -> PyResult> { let inner = self_.inner.clone(); - pyo3_asyncio::tokio::future_into_py(py, async move { + pyo3_async_runtimes::tokio::future_into_py(py, async move { Ok(inner.health_check().await) // Ok(Python::with_gil(|py| py.None())) }) @@ -616,9 +616,9 @@ impl AuditorClient { /// get() /// Gets all records from the Auditors database - fn get<'a>(self_: PyRef<'a, Self>, py: Python<'a>) -> PyResult<&'a PyAny> { + fn get<'a>(self_: PyRef<'a, Self>, py: Python<'a>) -> PyResult> { let inner = self_.inner.clone(); - pyo3_asyncio::tokio::future_into_py(py, async move { + pyo3_async_runtimes::tokio::future_into_py(py, async move { Ok(inner .get() .await @@ -654,11 +654,11 @@ impl AuditorClient { /// fn get_started_since<'a>( self_: PyRef<'a, Self>, - timestamp: &PyDateTime, + timestamp: &Bound<'_, PyDateTime>, py: Python<'a>, - ) -> PyResult<&'a PyAny> { - let message = py.get_type::(); - PyErr::warn(py, message, "get_started_since is depreciated", 0)?; + ) -> PyResult> { + let message = py.get_type_bound::(); + PyErr::warn_bound(py, &message, "get_started_since is depreciated", 0)?; let since: DateTime = timestamp.extract()?; let inner = self_.inner.clone(); @@ -666,7 +666,7 @@ impl AuditorClient { .with_start_time(auditor_client::Operator::default().gte(since.into())) .build(); - pyo3_asyncio::tokio::future_into_py(py, async move { + pyo3_async_runtimes::tokio::future_into_py(py, async move { Ok(inner .advanced_query(query_string.to_string()) .await @@ -702,18 +702,18 @@ impl AuditorClient { /// fn get_stopped_since<'a>( self_: PyRef<'a, Self>, - timestamp: &PyDateTime, + timestamp: &Bound<'_, PyDateTime>, py: Python<'a>, - ) -> PyResult<&'a PyAny> { - let message = py.get_type::(); - PyErr::warn(py, message, "get_stopped_since_since is depreciated", 0)?; + ) -> PyResult> { + let message = py.get_type_bound::(); + PyErr::warn_bound(py, &message, "get_stopped_since_since is depreciated", 0)?; let since: DateTime = timestamp.extract()?; let inner = self_.inner.clone(); let query_string = auditor_client::QueryBuilder::new() .with_stop_time(auditor_client::Operator::default().gte(since.into())) .build(); - pyo3_asyncio::tokio::future_into_py(py, async move { + pyo3_async_runtimes::tokio::future_into_py(py, async move { Ok(inner .advanced_query(query_string.to_string()) .await @@ -744,9 +744,9 @@ impl AuditorClient { self_: PyRef<'a, Self>, query_string: String, py: Python<'a>, - ) -> PyResult<&'a PyAny> { + ) -> PyResult> { let inner = self_.inner.clone(); - pyo3_asyncio::tokio::future_into_py(py, async move { + pyo3_async_runtimes::tokio::future_into_py(py, async move { Ok(inner .advanced_query(query_string) .await @@ -773,9 +773,9 @@ impl AuditorClient { self_: PyRef<'a, Self>, record_id: String, py: Python<'a>, - ) -> PyResult<&'a PyAny> { + ) -> PyResult> { let inner = self_.inner.clone(); - pyo3_asyncio::tokio::future_into_py(py, async move { + pyo3_async_runtimes::tokio::future_into_py(py, async move { inner .get_single_record(record_id) .await @@ -786,9 +786,9 @@ impl AuditorClient { /// add(record: Record) /// Push a record to the Auditor instance - fn add<'a>(&self, record: Record, py: Python<'a>) -> PyResult<&'a PyAny> { + fn add<'a>(&self, record: Record, py: Python<'a>) -> PyResult> { let inner = self.inner.clone(); - pyo3_asyncio::tokio::future_into_py(py, async move { + pyo3_async_runtimes::tokio::future_into_py(py, async move { inner .add(&auditor::domain::RecordAdd::try_from(record.inner)?) .await @@ -798,7 +798,7 @@ impl AuditorClient { /// add(record: Record) /// Push a list of records to the Auditor instance - fn bulk_insert<'a>(&self, records: Vec, py: Python<'a>) -> PyResult<&'a PyAny> { + fn bulk_insert<'a>(&self, records: Vec, py: Python<'a>) -> PyResult> { let inner = self.inner.clone(); let bulk_insert_records: Result, anyhow::Error> = records @@ -806,7 +806,7 @@ impl AuditorClient { .map(|r| auditor::domain::RecordAdd::try_from(r.inner)) .collect(); - pyo3_asyncio::tokio::future_into_py(py, async move { + pyo3_async_runtimes::tokio::future_into_py(py, async move { let bul = bulk_insert_records?; inner .bulk_insert(&bul) @@ -817,9 +817,9 @@ impl AuditorClient { /// update(record: Record) /// Update an existing record in the Auditor instance - fn update<'a>(&self, record: Record, py: Python<'a>) -> PyResult<&'a PyAny> { + fn update<'a>(&self, record: Record, py: Python<'a>) -> PyResult> { let inner = self.inner.clone(); - pyo3_asyncio::tokio::future_into_py(py, async move { + pyo3_async_runtimes::tokio::future_into_py(py, async move { inner .update(&auditor::domain::RecordUpdate::try_from(record.inner)?) .await diff --git a/pyauditor/src/domain/record.rs b/pyauditor/src/domain/record.rs index 59e7fc5d6..9568462dc 100644 --- a/pyauditor/src/domain/record.rs +++ b/pyauditor/src/domain/record.rs @@ -56,7 +56,7 @@ pub struct Record { #[pymethods] impl Record { #[new] - fn new(record_id: String, start_time: &PyDateTime) -> Result { + fn new(record_id: String, start_time: &Bound<'_, PyDateTime>) -> Result { let start_time: DateTime = start_time.extract()?; Ok(Record { inner: auditor::domain::Record { @@ -106,7 +106,7 @@ impl Record { /// :type stop_time: datetime.datetime fn with_stop_time<'a>( mut self_: PyRefMut<'a, Self>, - stop_time: &'a PyDateTime, + stop_time: &Bound<'a, PyDateTime>, ) -> Result, Error> { let stop_time: DateTime = stop_time.extract()?; self_.inner.stop_time = Some(stop_time); diff --git a/pyauditor/src/lib.rs b/pyauditor/src/lib.rs index 9d69367fc..ac8647553 100644 --- a/pyauditor/src/lib.rs +++ b/pyauditor/src/lib.rs @@ -17,7 +17,7 @@ mod queued_client; /// pyauditor is a client for interacting with an Auditor instance via Python. #[pymodule] -fn pyauditor(_py: Python, m: &PyModule) -> PyResult<()> { +fn pyauditor(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/pyauditor/src/queued_client.rs b/pyauditor/src/queued_client.rs index 30976b312..5ded4ef64 100644 --- a/pyauditor/src/queued_client.rs +++ b/pyauditor/src/queued_client.rs @@ -40,9 +40,9 @@ pub struct QueuedAuditorClient { impl QueuedAuditorClient { /// health_check() /// Returns ``true`` if the Auditor instance is healthy, ``false`` otherwise - fn health_check<'a>(self_: PyRef<'a, Self>, py: Python<'a>) -> PyResult<&'a PyAny> { + fn health_check<'a>(self_: PyRef<'a, Self>, py: Python<'a>) -> PyResult> { let inner = self_.inner.clone(); - pyo3_asyncio::tokio::future_into_py(py, async move { + pyo3_async_runtimes::tokio::future_into_py(py, async move { Ok(inner.health_check().await) // Ok(Python::with_gil(|py| py.None())) }) @@ -50,9 +50,9 @@ impl QueuedAuditorClient { /// get() /// Gets all records from the Auditors database - fn get<'a>(self_: PyRef<'a, Self>, py: Python<'a>) -> PyResult<&'a PyAny> { + fn get<'a>(self_: PyRef<'a, Self>, py: Python<'a>) -> PyResult> { let inner = self_.inner.clone(); - pyo3_asyncio::tokio::future_into_py(py, async move { + pyo3_async_runtimes::tokio::future_into_py(py, async move { Ok(inner .get() .await @@ -88,11 +88,11 @@ impl QueuedAuditorClient { /// fn get_started_since<'a>( self_: PyRef<'a, Self>, - timestamp: &PyDateTime, + timestamp: &Bound<'_, PyDateTime>, py: Python<'a>, - ) -> PyResult<&'a PyAny> { - let message = py.get_type::(); - PyErr::warn(py, message, "get_started_since is depreciated", 0)?; + ) -> PyResult> { + let message = py.get_type_bound::(); + PyErr::warn_bound(py, &message, "get_started_since is depreciated", 0)?; let since: DateTime = timestamp.extract()?; let inner = self_.inner.clone(); @@ -100,7 +100,7 @@ impl QueuedAuditorClient { .with_start_time(auditor_client::Operator::default().gte(since.into())) .build(); - pyo3_asyncio::tokio::future_into_py(py, async move { + pyo3_async_runtimes::tokio::future_into_py(py, async move { Ok(inner .advanced_query(query_string.to_string()) .await @@ -136,18 +136,18 @@ impl QueuedAuditorClient { /// fn get_stopped_since<'a>( self_: PyRef<'a, Self>, - timestamp: &PyDateTime, + timestamp: &Bound<'_, PyDateTime>, py: Python<'a>, - ) -> PyResult<&'a PyAny> { - let message = py.get_type::(); - PyErr::warn(py, message, "get_stopped_since_since is depreciated", 0)?; + ) -> PyResult> { + let message = py.get_type_bound::(); + PyErr::warn_bound(py, &message, "get_stopped_since_since is depreciated", 0)?; let since: DateTime = timestamp.extract()?; let inner = self_.inner.clone(); let query_string = auditor_client::QueryBuilder::new() .with_stop_time(auditor_client::Operator::default().gte(since.into())) .build(); - pyo3_asyncio::tokio::future_into_py(py, async move { + pyo3_async_runtimes::tokio::future_into_py(py, async move { Ok(inner .advanced_query(query_string.to_string()) .await @@ -178,9 +178,9 @@ impl QueuedAuditorClient { self_: PyRef<'a, Self>, query_string: String, py: Python<'a>, - ) -> PyResult<&'a PyAny> { + ) -> PyResult> { let inner = self_.inner.clone(); - pyo3_asyncio::tokio::future_into_py(py, async move { + pyo3_async_runtimes::tokio::future_into_py(py, async move { Ok(inner .advanced_query(query_string) .await @@ -207,9 +207,9 @@ impl QueuedAuditorClient { self_: PyRef<'a, Self>, record_id: String, py: Python<'a>, - ) -> PyResult<&'a PyAny> { + ) -> PyResult> { let inner = self_.inner.clone(); - pyo3_asyncio::tokio::future_into_py(py, async move { + pyo3_async_runtimes::tokio::future_into_py(py, async move { inner .get_single_record(record_id) .await @@ -220,9 +220,9 @@ impl QueuedAuditorClient { /// add(record: Record) /// Push a record to the Auditor instance - fn add<'a>(&self, record: Record, py: Python<'a>) -> PyResult<&'a PyAny> { + fn add<'a>(&self, record: Record, py: Python<'a>) -> PyResult> { let inner = self.inner.clone(); - pyo3_asyncio::tokio::future_into_py(py, async move { + pyo3_async_runtimes::tokio::future_into_py(py, async move { inner .add(&auditor::domain::RecordAdd::try_from(record.inner)?) .await @@ -232,7 +232,7 @@ impl QueuedAuditorClient { /// add(record: Record) /// Push a list of records to the Auditor instance - fn bulk_insert<'a>(&self, records: Vec, py: Python<'a>) -> PyResult<&'a PyAny> { + fn bulk_insert<'a>(&self, records: Vec, py: Python<'a>) -> PyResult> { let inner = self.inner.clone(); let bulk_insert_records: Result, anyhow::Error> = records @@ -240,7 +240,7 @@ impl QueuedAuditorClient { .map(|r| auditor::domain::RecordAdd::try_from(r.inner)) .collect(); - pyo3_asyncio::tokio::future_into_py(py, async move { + pyo3_async_runtimes::tokio::future_into_py(py, async move { let bul = bulk_insert_records?; inner .bulk_insert(&bul) @@ -251,9 +251,9 @@ impl QueuedAuditorClient { /// update(record: Record) /// Update an existing record in the Auditor instance - fn update<'a>(&self, record: Record, py: Python<'a>) -> PyResult<&'a PyAny> { + fn update<'a>(&self, record: Record, py: Python<'a>) -> PyResult> { let inner = self.inner.clone(); - pyo3_asyncio::tokio::future_into_py(py, async move { + pyo3_async_runtimes::tokio::future_into_py(py, async move { inner .update(&auditor::domain::RecordUpdate::try_from(record.inner)?) .await @@ -262,9 +262,9 @@ impl QueuedAuditorClient { } /// Stops the background sync task - fn stop<'a>(&self, py: Python<'a>) -> PyResult<&'a PyAny> { + fn stop<'a>(&self, py: Python<'a>) -> PyResult> { let mut inner = self.inner.clone(); - pyo3_asyncio::tokio::future_into_py(py, async move { + pyo3_async_runtimes::tokio::future_into_py(py, async move { inner .stop() .await