From 60d5e133d9c57fb0c9fe8c2875f083b2f4df3885 Mon Sep 17 00:00:00 2001 From: ElysaSrc <101974839+ElysaSrc@users.noreply.github.com> Date: Wed, 4 Sep 2024 12:25:53 +0200 Subject: [PATCH 1/2] osrdyne: support keda autoscaling for k8s driver Signed-off-by: ElysaSrc <101974839+ElysaSrc@users.noreply.github.com> --- osrdyne/Cargo.lock | 348 ++++++++++++++++--------- osrdyne/Cargo.toml | 6 +- osrdyne/src/drivers/docker.rs | 1 + osrdyne/src/drivers/kubernetes.rs | 235 ++++++++++++++--- osrdyne/src/drivers/kubernetes/keda.rs | 49 ++++ osrdyne/src/drivers/noop.rs | 1 + osrdyne/src/drivers/process_compose.rs | 1 + osrdyne/src/drivers/worker_driver.rs | 1 + osrdyne/src/pool.rs | 11 +- 9 files changed, 483 insertions(+), 170 deletions(-) create mode 100644 osrdyne/src/drivers/kubernetes/keda.rs diff --git a/osrdyne/Cargo.lock b/osrdyne/Cargo.lock index 813d4c1996c..901f2e24887 100644 --- a/osrdyne/Cargo.lock +++ b/osrdyne/Cargo.lock @@ -4,9 +4,9 @@ version = 3 [[package]] name = "addr2line" -version = "0.24.1" +version = "0.24.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f5fb1d8e4442bd405fdfd1dacb42792696b0cf9cb15882e5d097b742a676d375" +checksum = "dfbe277e56a376000877090da837660b4427aad530e3028d44e0bffe4f89a1c1" dependencies = [ "gimli", ] @@ -596,9 +596,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.1.24" +version = "1.1.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "812acba72f0a070b003d3697490d2b55b837230ae7c6c6497f05cc2ddbb8d938" +checksum = "2e80e3b6a3ab07840e1cae9b0666a63970dc28e8ed5ffbcdacbfc760c281bfc1" dependencies = [ "shlex", ] @@ -712,6 +712,41 @@ dependencies = [ "typenum", ] +[[package]] +name = "darling" +version = "0.20.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f63b86c8a8826a49b8c21f08a2d07338eec8d900540f8630dc76284be802989" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.20.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95133861a8032aaea082871032f5815eb9e98cef03fa916ab4500513994df9e5" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn", +] + +[[package]] +name = "darling_macro" +version = "0.20.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806" +dependencies = [ + "darling_core", + "quote", + "syn", +] + [[package]] name = "data-encoding" version = "2.6.0" @@ -803,6 +838,12 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" +[[package]] +name = "dyn-clone" +version = "1.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d6ef0072f8a535281e4876be788938b528e9a1d43900b82c2569af7da799125" + [[package]] name = "either" version = "1.13.0" @@ -947,13 +988,12 @@ dependencies = [ [[package]] name = "futures" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" dependencies = [ "futures-channel", "futures-core", - "futures-executor", "futures-io", "futures-sink", "futures-task", @@ -962,9 +1002,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" dependencies = [ "futures-core", "futures-sink", @@ -972,26 +1012,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" - -[[package]] -name = "futures-executor" -version = "0.3.30" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" -dependencies = [ - "futures-core", - "futures-task", - "futures-util", -] +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" [[package]] name = "futures-io" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" [[package]] name = "futures-lite" @@ -1023,9 +1052,9 @@ dependencies = [ [[package]] name = "futures-macro" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", @@ -1034,21 +1063,21 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" [[package]] name = "futures-task" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" [[package]] name = "futures-util" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ "futures-channel", "futures-core", @@ -1085,9 +1114,9 @@ dependencies = [ [[package]] name = "gimli" -version = "0.31.0" +version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32085ea23f3234fc7846555e85283ba4de91e21016dc0455a16286d87a292d64" +checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" [[package]] name = "hashbrown" @@ -1101,6 +1130,30 @@ version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e087f84d4f86bf4b218b927129862374b72199ae7d8657835f1e89000eea4fb" +[[package]] +name = "headers" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "322106e6bd0cba2d5ead589ddb8150a13d7c4217cf80d7c4f682ca994ccc6aa9" +dependencies = [ + "base64 0.21.7", + "bytes", + "headers-core", + "http", + "httpdate", + "mime", + "sha1", +] + +[[package]] +name = "headers-core" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54b4a22553d4242c49fddb9ba998a99962b5cc6f22cb5a3482bec22522403ce4" +dependencies = [ + "http", +] + [[package]] name = "hermit-abi" version = "0.3.9" @@ -1210,37 +1263,38 @@ dependencies = [ ] [[package]] -name = "hyper-named-pipe" -version = "0.1.0" +name = "hyper-http-proxy" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73b7d8abf35697b81a825e386fc151e0d503e8cb5fcb93cc8669c376dfd6f278" +checksum = "5d06dbdfbacf34d996c6fb540a71a684a7aae9056c71951163af8a8a4c07b9a4" dependencies = [ - "hex", + "bytes", + "futures-util", + "headers", + "http", "hyper", + "hyper-rustls", "hyper-util", "pin-project-lite", + "rustls-native-certs 0.7.3", "tokio", + "tokio-rustls", "tower-service", - "winapi", ] [[package]] -name = "hyper-rustls" -version = "0.26.0" +name = "hyper-named-pipe" +version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c" +checksum = "73b7d8abf35697b81a825e386fc151e0d503e8cb5fcb93cc8669c376dfd6f278" dependencies = [ - "futures-util", - "http", + "hex", "hyper", "hyper-util", - "log", - "rustls 0.22.4", - "rustls-native-certs", - "rustls-pki-types", + "pin-project-lite", "tokio", - "tokio-rustls 0.25.0", "tower-service", + "winapi", ] [[package]] @@ -1253,10 +1307,12 @@ dependencies = [ "http", "hyper", "hyper-util", - "rustls 0.23.13", + "log", + "rustls", + "rustls-native-certs 0.8.0", "rustls-pki-types", "tokio", - "tokio-rustls 0.26.0", + "tokio-rustls", "tower-service", "webpki-roots", ] @@ -1331,6 +1387,12 @@ dependencies = [ "cc", ] +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "idna" version = "0.5.0" @@ -1415,9 +1477,9 @@ dependencies = [ [[package]] name = "ipnet" -version = "2.10.0" +version = "2.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "187674a687eed5fe42285b40c6291f9a01517d415fad1c3cbc6a9f778af7fcd4" +checksum = "ddc24109865250148c2e0f3d25d4f0f479571723792d3802153c60922a4fb708" [[package]] name = "is_terminal_polyfill" @@ -1457,11 +1519,11 @@ dependencies = [ [[package]] name = "k8s-openapi" -version = "0.21.1" +version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "550f99d93aa4c2b25de527bce492d772caf5e21d7ac9bd4b508ba781c8d91e30" +checksum = "19501afb943ae5806548bc3ebd7f3374153ca057a38f480ef30adfde5ef09755" dependencies = [ - "base64 0.21.7", + "base64 0.22.1", "chrono", "serde", "serde-value", @@ -1470,20 +1532,21 @@ dependencies = [ [[package]] name = "kube" -version = "0.89.0" +version = "0.93.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92cd10d00ad38b2f72a5223cd8f2827968466a5d32ae89672d2b0df06488c499" +checksum = "0365920075af1a2d23619c1ca801c492f2400157de42627f041a061716e76416" dependencies = [ "k8s-openapi", "kube-client", "kube-core", + "kube-derive", ] [[package]] name = "kube-client" -version = "0.89.0" +version = "0.93.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23b4ee4e409c9afb4e38a30802875acb108902387a41346bbc2fd8610df5f729" +checksum = "d81336eb3a5b10a40c97a5a97ad66622e92bad942ce05ee789edd730aa4f8603" dependencies = [ "base64 0.22.1", "bytes", @@ -1495,15 +1558,15 @@ dependencies = [ "http-body", "http-body-util", "hyper", - "hyper-rustls 0.26.0", + "hyper-http-proxy", + "hyper-rustls", "hyper-timeout", "hyper-util", "jsonpath-rust", "k8s-openapi", "kube-core", "pem", - "pin-project", - "rustls 0.22.4", + "rustls", "rustls-pemfile", "secrecy", "serde", @@ -1519,20 +1582,33 @@ dependencies = [ [[package]] name = "kube-core" -version = "0.89.0" +version = "0.93.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "beab9186726ed0c2420ff8a37b02f26dc62b3c33330ac60d0cc7605e1a6f6678" +checksum = "cce373a74d787d439063cdefab0f3672860bd7bac01a38e39019177e764a0fe6" dependencies = [ "chrono", "form_urlencoded", "http", "k8s-openapi", - "once_cell", + "schemars", "serde", "serde_json", "thiserror", ] +[[package]] +name = "kube-derive" +version = "0.93.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04a26c9844791e127329be5dce9298b03f9e2ff5939076d5438c92dea5eb78f2" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "serde_json", + "syn", +] + [[package]] name = "lapin" version = "2.5.0" @@ -1686,9 +1762,9 @@ dependencies = [ [[package]] name = "object" -version = "0.36.4" +version = "0.36.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "084f1a5821ac4c651660a94a7153d27ac9d8a53736203f58b31945ded098070a" +checksum = "aedf0a2d09c573ed1d8d85b30c119153926a2b36dce0ab28322c09a117a4683e" dependencies = [ "memchr", ] @@ -1704,12 +1780,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.20.1" +version = "1.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "82881c4be219ab5faaf2ad5e5e5ecdff8c66bd7402ca3160975c93b24961afd1" -dependencies = [ - "portable-atomic", -] +checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775" [[package]] name = "openssl-probe" @@ -1746,6 +1819,7 @@ dependencies = [ "percent-encoding", "priority-queue", "reqwest", + "schemars", "serde", "serde_json", "smallvec", @@ -1912,18 +1986,18 @@ dependencies = [ [[package]] name = "pin-project" -version = "1.1.5" +version = "1.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6bf43b791c5b9e34c3d182969b4abb522f9343702850a2e57f460d00d09b4b3" +checksum = "baf123a161dde1e524adf36f90bc5d8d3462824a9c43553ad07a8183161189ec" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.1.5" +version = "1.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" +checksum = "a4502d8515ca9f32f1fb543d987f63d95a14934883db45bdb48060b6b69257f8" dependencies = [ "proc-macro2", "quote", @@ -2026,12 +2100,6 @@ dependencies = [ "windows-sys 0.59.0", ] -[[package]] -name = "portable-atomic" -version = "1.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc9c68a3f6da06753e9335d63e27f6b9754dd1920d941135b7ea8224f141adb2" - [[package]] name = "powerfmt" version = "0.2.0" @@ -2060,9 +2128,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.86" +version = "1.0.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e719e8df665df0d1c8fbfd238015744736151d4445ec0836b8e628aae103b77" +checksum = "b3e4daa0dcf6feba26f985457cdf104d4b4256fc5a09547140f3631bb076b19a" dependencies = [ "unicode-ident", ] @@ -2091,7 +2159,7 @@ dependencies = [ "quinn-proto", "quinn-udp", "rustc-hash", - "rustls 0.23.13", + "rustls", "socket2 0.5.7", "thiserror", "tokio", @@ -2108,7 +2176,7 @@ dependencies = [ "rand", "ring", "rustc-hash", - "rustls 0.23.13", + "rustls", "slab", "thiserror", "tinyvec", @@ -2248,7 +2316,7 @@ dependencies = [ "http-body", "http-body-util", "hyper", - "hyper-rustls 0.27.3", + "hyper-rustls", "hyper-util", "ipnet", "js-sys", @@ -2258,7 +2326,7 @@ dependencies = [ "percent-encoding", "pin-project-lite", "quinn", - "rustls 0.23.13", + "rustls", "rustls-pemfile", "rustls-pki-types", "serde", @@ -2266,7 +2334,7 @@ dependencies = [ "serde_urlencoded", "sync_wrapper 1.0.1", "tokio", - "tokio-rustls 0.26.0", + "tokio-rustls", "tower-service", "url", "wasm-bindgen", @@ -2341,24 +2409,11 @@ dependencies = [ [[package]] name = "rustls" -version = "0.22.4" +version = "0.23.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf4ef73721ac7bcd79b2b315da7779d8fc09718c6b3d2d1b2d94850eb8c18432" +checksum = "415d9944693cb90382053259f89fbb077ea730ad7273047ec63b19bc9b160ba8" dependencies = [ "log", - "ring", - "rustls-pki-types", - "rustls-webpki", - "subtle", - "zeroize", -] - -[[package]] -name = "rustls" -version = "0.23.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2dabaac7466917e566adb06783a81ca48944c6898a1b08b9374106dd671f4c8" -dependencies = [ "once_cell", "ring", "rustls-pki-types", @@ -2374,8 +2429,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a980454b497c439c274f2feae2523ed8138bbd3d323684e1435fec62f800481" dependencies = [ "log", - "rustls 0.23.13", - "rustls-native-certs", + "rustls", + "rustls-native-certs 0.7.3", "rustls-pki-types", "rustls-webpki", ] @@ -2393,6 +2448,19 @@ dependencies = [ "security-framework", ] +[[package]] +name = "rustls-native-certs" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcaf18a4f2be7326cd874a5fa579fae794320a0f388d365dca7e480e55f83f8a" +dependencies = [ + "openssl-probe", + "rustls-pemfile", + "rustls-pki-types", + "schannel", + "security-framework", +] + [[package]] name = "rustls-pemfile" version = "2.2.0" @@ -2442,13 +2510,37 @@ dependencies = [ [[package]] name = "schannel" -version = "0.1.24" +version = "0.1.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e9aaafd5a2b6e3d657ff009d82fbd630b6bd54dd4eb06f21693925cdf80f9b8b" +checksum = "01227be5826fa0690321a2ba6c5cd57a19cf3f6a09e76973b58e61de6ab9d1c1" dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "schemars" +version = "0.8.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09c024468a378b7e36765cd36702b7a90cc3cba11654f6685c8f233408e89e92" +dependencies = [ + "dyn-clone", + "schemars_derive", + "serde", + "serde_json", +] + +[[package]] +name = "schemars_derive" +version = "0.8.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1eee588578aff73f856ab961cd2f79e36bc45d7ded33a7562adba4667aecc0e" +dependencies = [ + "proc-macro2", + "quote", + "serde_derive_internals", + "syn", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -2529,6 +2621,17 @@ dependencies = [ "syn", ] +[[package]] +name = "serde_derive_internals" +version = "0.29.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "serde_html_form" version = "0.2.6" @@ -2589,9 +2692,9 @@ dependencies = [ [[package]] name = "serde_with" -version = "3.10.0" +version = "3.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9720086b3357bcb44fce40117d769a4d068c70ecfa190850a980a71755f66fcc" +checksum = "8e28bdad6db2b8340e449f7108f020b3b092e8583a9e3fb82713e1d4e71fe817" dependencies = [ "base64 0.22.1", "chrono", @@ -2721,6 +2824,12 @@ dependencies = [ "der", ] +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + [[package]] name = "subtle" version = "2.6.1" @@ -2877,24 +2986,13 @@ dependencies = [ "syn", ] -[[package]] -name = "tokio-rustls" -version = "0.25.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f" -dependencies = [ - "rustls 0.22.4", - "rustls-pki-types", - "tokio", -] - [[package]] name = "tokio-rustls" version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" dependencies = [ - "rustls 0.23.13", + "rustls", "rustls-pki-types", "tokio", ] @@ -3201,9 +3299,9 @@ dependencies = [ [[package]] name = "webpki-roots" -version = "0.26.5" +version = "0.26.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0bd24728e5af82c6c4ec1b66ac4844bdf8156257fccda846ec58b42cd0cdbe6a" +checksum = "841c67bff177718f1d4dfefde8d8f0e78f9b6589319ba88312f567fc5841a958" dependencies = [ "rustls-pki-types", ] diff --git a/osrdyne/Cargo.toml b/osrdyne/Cargo.toml index f624f77bfe1..75e1541d291 100644 --- a/osrdyne/Cargo.toml +++ b/osrdyne/Cargo.toml @@ -15,9 +15,10 @@ figment = { version = "0.10.19", features = ["env", "yaml"] } futures-lite = "2.3.0" http = "1.1.0" im = "15.1.0" -k8s-openapi = { version = "0.21", features = ["v1_29"] } -kube = { version = "0.89", default-features = false, features = [ +k8s-openapi = { version = "0.22", features = ["v1_29"] } +kube = { version = "0.93", default-features = false, features = [ "client", + "derive", "rustls-tls", ] } lapin = "2.3.4" @@ -28,6 +29,7 @@ reqwest = { version = "0.12.4", default-features = false, features = [ "json", "rustls-tls", ] } +schemars = "0.8.21" serde = "1.0.203" serde_json = "1.0.117" smallvec = { version = "1.13.2", features = ["serde"] } diff --git a/osrdyne/src/drivers/docker.rs b/osrdyne/src/drivers/docker.rs index af09c8fdaec..67eb8079876 100644 --- a/osrdyne/src/drivers/docker.rs +++ b/osrdyne/src/drivers/docker.rs @@ -59,6 +59,7 @@ impl DockerDriver { impl WorkerDriver for DockerDriver { fn get_or_create_worker_group( &mut self, + _queue_name: String, worker_key: Key, ) -> Pin> + Send + '_>> { Box::pin(async move { diff --git a/osrdyne/src/drivers/kubernetes.rs b/osrdyne/src/drivers/kubernetes.rs index 1ba0d71af69..9340152ba1b 100644 --- a/osrdyne/src/drivers/kubernetes.rs +++ b/osrdyne/src/drivers/kubernetes.rs @@ -15,21 +15,72 @@ use k8s_openapi::{ }, apimachinery::pkg::apis::meta::v1::LabelSelector, }; +use keda::MetricType; use kube::{api::ObjectMeta, Client}; +use log::debug; use serde::{Deserialize, Serialize}; -use std::{collections::BTreeMap, future::Future, pin::Pin}; +use std::{ + collections::{BTreeMap, HashMap}, + future::Future, + pin::Pin, +}; use uuid::Uuid; +mod keda; + #[derive(Serialize, Deserialize, Clone, Debug)] -pub struct AutoscalingOptions { +pub struct HPAOptions { /// The minimum number of replicas pub min_replicas: i32, + /// The maximum number of replicas pub max_replicas: i32, + /// The target CPU utilization percentage pub target_cpu_utilization_percentage: i32, } +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct KedaOptions { + /// Polling Interval in seconds + pub polling_interval: i32, + + /// Cooldown Period in seconds + pub cooldown_period: i32, + + /// Min Replicas for the HPA of the ScaledObject + pub min_replicas: i32, + + /// Max Replicas for the HPA of the ScaledObject + pub max_replicas: i32, + + /// Amqp Host + pub amqp_host: String, + + /// Mode + pub mode: String, + + /// Value + pub value: String, + + /// Activation Value + pub activation_value: String, + + /// Use cached metrics + pub use_cached_metrics: bool, + + /// Metric Type + pub metric_type: MetricType, +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +#[serde(tag = "type")] +pub enum AutoscalingOptions { + Hpa(HPAOptions), + Keda(KedaOptions), + NoScaling, +} + #[derive(Serialize, Deserialize, Clone, Debug)] pub struct KubernetesDeploymentOptions { /// The default environment variables to set for the worker (passthrough to kubernetes deployment) @@ -102,9 +153,120 @@ impl KubernetesDriver { } } +impl KubernetesDriver { + async fn create_hpa_autoscaler( + &self, + worker_key: Key, + hpa: HPAOptions, + worker_deployment_name: String, + ) -> Result<(), super::worker_driver::DriverError> { + let hpa = HorizontalPodAutoscaler { + metadata: ObjectMeta { + name: Some(worker_deployment_name.clone()), + namespace: Some(self.options.namespace.clone()), + labels: Some({ + let mut labels = BTreeMap::new(); + labels.insert(LABEL_MANAGED_BY.to_owned(), MANAGED_BY_VALUE.to_owned()); + labels.insert(LABEL_WORKER_ID.to_owned(), worker_key.to_string()); + labels + }), + ..Default::default() + }, + spec: Some({ + k8s_openapi::api::autoscaling::v1::HorizontalPodAutoscalerSpec { + scale_target_ref: + k8s_openapi::api::autoscaling::v1::CrossVersionObjectReference { + api_version: Some("apps/v1".to_string()), + kind: "Deployment".to_string(), + name: worker_deployment_name.clone(), + }, + min_replicas: Some(hpa.min_replicas), + max_replicas: hpa.max_replicas, + target_cpu_utilization_percentage: Some(hpa.target_cpu_utilization_percentage), + } + }), + ..Default::default() + }; + + debug!("Creating HPA: {:?}", hpa); + + kube::api::Api::::namespaced( + self.client.clone(), + &self.options.namespace, + ) + .create(&kube::api::PostParams::default(), &hpa) + .await + .map_err(super::worker_driver::DriverError::KubernetesError)?; + + Ok(()) + } + + async fn create_keda_autoscaler( + &self, + worker_key: Key, + keda: KedaOptions, + queue_name: String, + worker_deployment_name: String, + ) -> Result<(), super::worker_driver::DriverError> { + let scaled_object = keda::ScaledObject { + metadata: ObjectMeta { + name: Some(worker_deployment_name.clone()), + namespace: Some(self.options.namespace.clone()), + labels: Some({ + let mut labels = BTreeMap::new(); + labels.insert(LABEL_MANAGED_BY.to_owned(), MANAGED_BY_VALUE.to_owned()); + labels.insert(LABEL_WORKER_ID.to_owned(), worker_key.to_string()); + labels + }), + ..Default::default() + }, + spec: keda::ScaledObjectSpec { + scale_target_ref: keda::ScaleTargetRef { + api_version: "apps/v1".to_string(), + kind: "Deployment".to_string(), + name: worker_deployment_name.clone(), + env_source_container_name: None, + }, + polling_interval: Some(keda.polling_interval), + cooldown_period: Some(keda.cooldown_period), + initial_cooldown_period: None, + idle_replica_count: None, + min_replica_count: Some(keda.min_replicas), + max_replicas_count: Some(keda.max_replicas), + triggers: vec![keda::Trigger { + type_: "rabbitmq".to_string(), + use_cached_metrics: keda.use_cached_metrics, + metric_type: keda.metric_type.clone(), + metadata: { + let mut metadata = HashMap::new(); + metadata.insert("host".to_string(), keda.amqp_host.clone()); + metadata.insert("protocol".to_string(), "amqp".to_string()); + metadata.insert("mode".to_string(), keda.mode.clone()); + metadata.insert("value".to_string(), keda.value.clone()); + metadata + .insert("activationValue".to_string(), keda.activation_value.clone()); + metadata.insert("queueName".to_string(), queue_name); + metadata + }, + }], + }, + }; + + debug!("Creating Keda ScaledObject: {:?}", scaled_object); + + kube::api::Api::namespaced(self.client.clone(), &self.options.namespace) + .create(&kube::api::PostParams::default(), &scaled_object) + .await + .map_err(super::worker_driver::DriverError::KubernetesError)?; + + Ok(()) + } +} + impl WorkerDriver for KubernetesDriver { fn get_or_create_worker_group( &mut self, + queue_name: String, worker_key: Key, ) -> Pin> + Send + '_>> { Box::pin(async move { @@ -143,6 +305,11 @@ impl WorkerDriver for KubernetesDriver { value: Some(self.amqp_uri.clone()), ..Default::default() }); + env.push(EnvVar { + name: "WORKEY_ID_USE_HOSTNAME".to_string(), + value: Some("1".to_string()), + ..Default::default() + }); env }; @@ -208,46 +375,7 @@ impl WorkerDriver for KubernetesDriver { ..Default::default() }; - // Create the autoscaler if needed - if let Some(autoscaling) = &self.options.autoscaling { - let hpa = HorizontalPodAutoscaler { - metadata: ObjectMeta { - name: Some(worker_deployment_name.clone()), - namespace: Some(self.options.namespace.clone()), - labels: Some({ - let mut labels = BTreeMap::new(); - labels.insert(LABEL_MANAGED_BY.to_owned(), MANAGED_BY_VALUE.to_owned()); - labels.insert(LABEL_WORKER_ID.to_owned(), worker_key.to_string()); - labels - }), - ..Default::default() - }, - spec: Some({ - k8s_openapi::api::autoscaling::v1::HorizontalPodAutoscalerSpec { - scale_target_ref: - k8s_openapi::api::autoscaling::v1::CrossVersionObjectReference { - api_version: Some("apps/v1".to_string()), - kind: "Deployment".to_string(), - name: worker_deployment_name.clone(), - }, - min_replicas: Some(autoscaling.min_replicas), - max_replicas: autoscaling.max_replicas, - target_cpu_utilization_percentage: Some( - autoscaling.target_cpu_utilization_percentage, - ), - } - }), - ..Default::default() - }; - - kube::api::Api::::namespaced( - self.client.clone(), - &self.options.namespace, - ) - .create(&kube::api::PostParams::default(), &hpa) - .await - .map_err(super::worker_driver::DriverError::KubernetesError)?; - } + debug!("Creating deployment: {:?}", deployment); // Create the deployment kube::api::Api::::namespaced(self.client.clone(), &self.options.namespace) @@ -255,6 +383,29 @@ impl WorkerDriver for KubernetesDriver { .await .map_err(super::worker_driver::DriverError::KubernetesError)?; + // Create the autoscaler if needed + match &self.options.autoscaling { + // Using HorizontalPodAutoscaler + Some(AutoscalingOptions::Hpa(hpa)) => { + self.create_hpa_autoscaler(worker_key, hpa.clone(), worker_deployment_name) + .await?; + } + + // Using Keda as the autoscaler + Some(AutoscalingOptions::Keda(keda)) => { + self.create_keda_autoscaler( + worker_key, + keda.clone(), + queue_name, + worker_deployment_name, + ) + .await?; + } + + // No autoscaler configured + Some(AutoscalingOptions::NoScaling) | None => {} + } + Ok(new_id) }) } diff --git a/osrdyne/src/drivers/kubernetes/keda.rs b/osrdyne/src/drivers/kubernetes/keda.rs new file mode 100644 index 00000000000..e41a9f08a3c --- /dev/null +++ b/osrdyne/src/drivers/kubernetes/keda.rs @@ -0,0 +1,49 @@ +use std::collections::HashMap; + +use kube::CustomResource; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +#[derive(CustomResource, Clone, Debug, Deserialize, Serialize, JsonSchema)] +#[serde(rename_all = "camelCase")] +#[kube( + group = "keda.sh", + version = "v1alpha1", + kind = "ScaledObject", + namespaced +)] +pub struct ScaledObjectSpec { + pub scale_target_ref: ScaleTargetRef, + pub polling_interval: Option, + pub cooldown_period: Option, + pub initial_cooldown_period: Option, + pub idle_replica_count: Option, + pub min_replica_count: Option, + pub max_replicas_count: Option, + pub triggers: Vec, +} + +#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct ScaleTargetRef { + pub api_version: String, + pub kind: String, + pub name: String, + pub env_source_container_name: Option, +} + +#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)] +pub enum MetricType { + AverageValue, + Value, + Utilization, +} + +#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct Trigger { + pub type_: String, + pub metadata: HashMap, + pub use_cached_metrics: bool, + pub metric_type: MetricType, +} diff --git a/osrdyne/src/drivers/noop.rs b/osrdyne/src/drivers/noop.rs index 27304f47d4c..759922dc5d2 100644 --- a/osrdyne/src/drivers/noop.rs +++ b/osrdyne/src/drivers/noop.rs @@ -19,6 +19,7 @@ impl NoopDriver { impl WorkerDriver for NoopDriver { fn get_or_create_worker_group( &mut self, + _queue_name: String, _worker_key: Key, ) -> Pin> + Send + '_>> { Box::pin(async move { Ok(self.fixed_pool_id) }) diff --git a/osrdyne/src/drivers/process_compose.rs b/osrdyne/src/drivers/process_compose.rs index 24579e37621..380a66bc385 100644 --- a/osrdyne/src/drivers/process_compose.rs +++ b/osrdyne/src/drivers/process_compose.rs @@ -104,6 +104,7 @@ pub struct PCDriver { impl WorkerDriver for PCDriver { fn get_or_create_worker_group( &mut self, + _queue_name: String, worker_key: Key, ) -> Pin> + Send + '_>> { Box::pin(async move { diff --git a/osrdyne/src/drivers/worker_driver.rs b/osrdyne/src/drivers/worker_driver.rs index 8eb2adafcdb..807a3b2ef95 100644 --- a/osrdyne/src/drivers/worker_driver.rs +++ b/osrdyne/src/drivers/worker_driver.rs @@ -46,6 +46,7 @@ pub trait WorkerDriver: Send { /// Returns the internal UUID of the worker. fn get_or_create_worker_group( &mut self, + queue_name: String, worker_key: Key, ) -> Pin> + Send + '_>>; diff --git a/osrdyne/src/pool.rs b/osrdyne/src/pool.rs index 6d083e286d3..489ef6fa428 100644 --- a/osrdyne/src/pool.rs +++ b/osrdyne/src/pool.rs @@ -246,10 +246,13 @@ impl Pool { Ok(()) }); } + { + let arc_clone = self.clone(); let expected_state = expected_state.clone(); tasks.spawn(async move { worker_control_loop( + arc_clone, expected_state, running_worker_watch, driver, @@ -467,6 +470,7 @@ async fn activity_processor( } async fn worker_control_loop( + pool: Arc, expected_state: tokio::sync::watch::Receiver, running_workers_watch: tokio::sync::watch::Sender>>, mut driver: Box, @@ -494,6 +498,7 @@ async fn worker_control_loop( .iter() .map(|c| &c.worker_key) .collect::>(); + let wanted_worker_keys = target .queues .into_iter() @@ -516,7 +521,11 @@ async fn worker_control_loop( // Add wanted groups for worker_key in wanted_worker_keys { - if let Err(e) = driver.get_or_create_worker_group(worker_key).await { + let queue_name = pool.key_queue_name(&worker_key); + if let Err(e) = driver + .get_or_create_worker_group(queue_name, worker_key) + .await + { log::error!( "Failed to get or create worker group: {:?}. Aborting current loop iteration.", e From 633007c9650b7fe7ccd4b1e268344de86de46237 Mon Sep 17 00:00:00 2001 From: Younes Khoudli Date: Wed, 2 Oct 2024 17:02:06 +0200 Subject: [PATCH 2/2] core: add the posibility to use hostname as WORKEY_ID This is useful when using keda or other autoscalers, as they won't support assigning WORKER_IDs. Signed-off-by: Younes Khoudli --- .../main/java/fr/sncf/osrd/cli/WorkerCommand.kt | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/fr/sncf/osrd/cli/WorkerCommand.kt b/core/src/main/java/fr/sncf/osrd/cli/WorkerCommand.kt index 8c33eb40ed8..17a1016f536 100644 --- a/core/src/main/java/fr/sncf/osrd/cli/WorkerCommand.kt +++ b/core/src/main/java/fr/sncf/osrd/cli/WorkerCommand.kt @@ -39,6 +39,7 @@ class WorkerCommand : CliCommand { private var editoastAuthorization: String? = null val WORKER_ID: String? + val WORKER_ID_USE_HOSTNAME: String? val WORKER_KEY: String? val WORKER_AMQP_URI: String val WORKER_POOL: String @@ -46,7 +47,7 @@ class WorkerCommand : CliCommand { val WORKER_ACTIVITY_EXCHANGE: String init { - WORKER_ID = System.getenv("WORKER_ID") + WORKER_ID_USE_HOSTNAME = System.getenv("WORKER_ID_USE_HOSTNAME") WORKER_KEY = System.getenv("WORKER_KEY") WORKER_AMQP_URI = System.getenv("WORKER_AMQP_URI") ?: "amqp://osrd:password@127.0.0.1:5672/%2f" @@ -55,6 +56,18 @@ class WorkerCommand : CliCommand { System.getenv("WORKER_REQUESTS_QUEUE") ?: "$WORKER_POOL-req-$WORKER_KEY" WORKER_ACTIVITY_EXCHANGE = System.getenv("WORKER_ACTIVITY_EXCHANGE") ?: "$WORKER_POOL-activity-xchg" + + WORKER_ID = + if ( + WORKER_ID_USE_HOSTNAME == null || + WORKER_ID_USE_HOSTNAME == "" || + WORKER_ID_USE_HOSTNAME == "0" || + WORKER_ID_USE_HOSTNAME.lowercase() == "false" + ) { + System.getenv("WORKER_ID") + } else { + java.net.InetAddress.getLocalHost().hostName + } } override fun run(): Int {