From 1b33b5e9ec04bea0010350798332a90413c482d3 Mon Sep 17 00:00:00 2001 From: Yury Akudovich Date: Tue, 5 Nov 2024 09:32:06 +0100 Subject: [PATCH] feat(prover): Move prover_autoscaler config into crate (#3222) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ Move prover_autoscaler config into crate. Remove protobuf monstrosity. Create kube client only in Agent. ## Why ❔ To simplify config management and allow using hashmaps directly in yaml config. Also this reduces number of used time crates to 2, reduces build time and size. Kube client was created for Scaler too, but was never used. ## Checklist - [x] PR title corresponds to the body of PR (we generate changelog entries from PRs). - [ ] Tests for the changes have been added / updated. - [ ] Documentation comments have been added / updated. - [x] Code has been formatted via `zkstack dev fmt` and `zkstack dev lint`. ref ZKD-1855 --- Cargo.lock | 6 - Cargo.toml | 1 - core/lib/config/Cargo.toml | 5 - core/lib/config/src/configs/mod.rs | 1 - core/lib/config/src/configs/observability.rs | 6 +- core/lib/protobuf_config/Cargo.toml | 1 - core/lib/protobuf_config/src/lib.rs | 1 - .../src/proto/config/prover_autoscaler.proto | 74 ----- .../protobuf_config/src/prover_autoscaler.rs | 301 ------------------ prover/Cargo.lock | 22 +- prover/Cargo.toml | 5 +- .../crates/bin/prover_autoscaler/Cargo.toml | 6 +- .../bin/prover_autoscaler/src/config.rs | 33 +- .../prover_autoscaler/src/global/queuer.rs | 6 +- .../prover_autoscaler/src/global/scaler.rs | 8 +- .../crates/bin/prover_autoscaler/src/lib.rs | 1 + .../crates/bin/prover_autoscaler/src/main.rs | 21 +- .../bin/prover_autoscaler/src/metrics.rs | 3 +- 18 files changed, 65 insertions(+), 436 deletions(-) delete mode 100644 core/lib/protobuf_config/src/proto/config/prover_autoscaler.proto delete mode 100644 core/lib/protobuf_config/src/prover_autoscaler.rs rename core/lib/config/src/configs/prover_autoscaler.rs => prover/crates/bin/prover_autoscaler/src/config.rs (86%) diff --git a/Cargo.lock b/Cargo.lock index d8ccfb8fd799..8d393040c03e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10589,12 +10589,7 @@ dependencies = [ "secrecy", "serde", "serde_json", - "strum", - "strum_macros", - "time", "tracing", - "url", - "vise", "zksync_basic_types", "zksync_concurrency", "zksync_consensus_utils", @@ -11852,7 +11847,6 @@ dependencies = [ "secrecy", "serde_json", "serde_yaml", - "time", "tracing", "zksync_basic_types", "zksync_config", diff --git a/Cargo.toml b/Cargo.toml index 1faac5e16418..87e0de13129f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -172,7 +172,6 @@ sqlx = "0.8.1" static_assertions = "1.1" structopt = "0.3.20" strum = "0.26" -strum_macros = "0.26.4" tempfile = "3.0.2" test-casing = "0.1.2" test-log = "0.2.15" diff --git a/core/lib/config/Cargo.toml b/core/lib/config/Cargo.toml index af39e5159ba8..46c0b27d4b03 100644 --- a/core/lib/config/Cargo.toml +++ b/core/lib/config/Cargo.toml @@ -18,15 +18,10 @@ zksync_concurrency.workspace = true zksync_vlog = { workspace = true, optional = true } tracing = { workspace = true, optional = true } -url.workspace = true anyhow.workspace = true rand.workspace = true secrecy.workspace = true serde = { workspace = true, features = ["derive"] } -time = { workspace = true, features = ["serde-human-readable"] } -strum.workspace = true -strum_macros.workspace = true -vise.workspace = true [dev-dependencies] serde_json.workspace = true diff --git a/core/lib/config/src/configs/mod.rs b/core/lib/config/src/configs/mod.rs index 2b848030d719..5bf9a49e0acf 100644 --- a/core/lib/config/src/configs/mod.rs +++ b/core/lib/config/src/configs/mod.rs @@ -60,7 +60,6 @@ pub mod house_keeper; pub mod object_store; pub mod observability; pub mod proof_data_handler; -pub mod prover_autoscaler; pub mod prover_job_monitor; pub mod pruning; pub mod secrets; diff --git a/core/lib/config/src/configs/observability.rs b/core/lib/config/src/configs/observability.rs index 42363cbcb4ff..f7ad2a97b91c 100644 --- a/core/lib/config/src/configs/observability.rs +++ b/core/lib/config/src/configs/observability.rs @@ -1,6 +1,8 @@ +use serde::Deserialize; + /// Configuration for the essential observability stack, like /// logging and sentry integration. -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Deserialize)] pub struct ObservabilityConfig { /// URL of the Sentry instance to send events to. pub sentry_url: Option, @@ -15,7 +17,7 @@ pub struct ObservabilityConfig { pub log_directives: Option, } -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Deserialize)] pub struct OpentelemetryConfig { /// Enables export of span data of specified level (and above) using opentelemetry exporters. pub level: String, diff --git a/core/lib/protobuf_config/Cargo.toml b/core/lib/protobuf_config/Cargo.toml index 87a0a63567ba..92d9bd53978c 100644 --- a/core/lib/protobuf_config/Cargo.toml +++ b/core/lib/protobuf_config/Cargo.toml @@ -26,7 +26,6 @@ rand.workspace = true hex.workspace = true secrecy.workspace = true tracing.workspace = true -time.workspace = true [build-dependencies] zksync_protobuf_build.workspace = true diff --git a/core/lib/protobuf_config/src/lib.rs b/core/lib/protobuf_config/src/lib.rs index 09f42422c511..90c0ba071c0b 100644 --- a/core/lib/protobuf_config/src/lib.rs +++ b/core/lib/protobuf_config/src/lib.rs @@ -28,7 +28,6 @@ mod observability; mod proof_data_handler; pub mod proto; mod prover; -mod prover_autoscaler; mod prover_job_monitor; mod pruning; mod secrets; diff --git a/core/lib/protobuf_config/src/proto/config/prover_autoscaler.proto b/core/lib/protobuf_config/src/proto/config/prover_autoscaler.proto deleted file mode 100644 index 742181653861..000000000000 --- a/core/lib/protobuf_config/src/proto/config/prover_autoscaler.proto +++ /dev/null @@ -1,74 +0,0 @@ -syntax = "proto3"; - -package zksync.config.prover_autoscaler; - -import "zksync/std.proto"; -import "zksync/config/observability.proto"; - -message ProverAutoscalerConfig { - optional std.Duration graceful_shutdown_timeout = 1; // optional - optional ProverAutoscalerAgentConfig agent_config = 2; // optional - optional ProverAutoscalerScalerConfig scaler_config = 3; // optional - optional observability.Observability observability = 4; // optional -} - -message ProverAutoscalerAgentConfig { - optional uint32 prometheus_port = 1; // required - optional uint32 http_port = 2; // required - repeated string namespaces = 3; // optional - optional string cluster_name = 4; // optional - optional bool dry_run = 5; // optional -} - -message ProtocolVersion { - optional string namespace = 1; // required - optional string protocol_version = 2; // required -} - -message ClusterPriority { - optional string cluster = 1; // required - optional uint32 priority = 2; // required -} - -message ProverSpeed { - optional string gpu = 1; // required - optional uint32 speed = 2; // required -} - -message MaxProver { - optional string cluster_and_gpu = 1; // required, format: / - optional uint32 max = 2; // required -} - -message MinProver { - optional string namespace = 1; // required - optional uint32 min = 2; // required -} - -message MaxReplica { - optional string cluster = 1; // required - optional uint64 max = 2; // required -} - -message ScalerTarget { - optional string queue_report_field = 1; // required - optional string deployment = 5; // required - repeated MaxReplica max_replicas = 3; // required at least one - optional uint64 speed = 4; // optional - reserved 2; reserved "pod_name_prefix"; -} - -message ProverAutoscalerScalerConfig { - optional uint32 prometheus_port = 1; // required - optional std.Duration scaler_run_interval = 2; // optional - optional string prover_job_monitor_url = 3; // required - repeated string agents = 4; // required at least one - repeated ProtocolVersion protocol_versions = 5; // required at least one - repeated ClusterPriority cluster_priorities = 6; // optional - repeated ProverSpeed prover_speed = 7; // optional - optional uint32 long_pending_duration_s = 8; // optional - repeated MaxProver max_provers = 9; // optional - repeated MinProver min_provers = 10; // optional - repeated ScalerTarget scaler_targets = 11; // optional - optional bool dry_run = 12; // optional -} diff --git a/core/lib/protobuf_config/src/prover_autoscaler.rs b/core/lib/protobuf_config/src/prover_autoscaler.rs deleted file mode 100644 index 6b67d9f620ff..000000000000 --- a/core/lib/protobuf_config/src/prover_autoscaler.rs +++ /dev/null @@ -1,301 +0,0 @@ -use std::collections::HashMap; - -use anyhow::Context; -use time::Duration; -use zksync_config::configs::{self, prover_autoscaler::Gpu}; -use zksync_protobuf::{read_optional, repr::ProtoRepr, required, ProtoFmt}; - -use crate::{proto::prover_autoscaler as proto, read_optional_repr}; - -impl ProtoRepr for proto::ProverAutoscalerConfig { - type Type = configs::prover_autoscaler::ProverAutoscalerConfig; - fn read(&self) -> anyhow::Result { - Ok(Self::Type { - graceful_shutdown_timeout: read_optional(&self.graceful_shutdown_timeout) - .context("graceful_shutdown_timeout")? - .unwrap_or(Self::Type::default_graceful_shutdown_timeout()), - agent_config: read_optional_repr(&self.agent_config), - scaler_config: read_optional_repr(&self.scaler_config), - observability: read_optional_repr(&self.observability), - }) - } - - fn build(this: &Self::Type) -> Self { - Self { - graceful_shutdown_timeout: Some(ProtoFmt::build(&this.graceful_shutdown_timeout)), - agent_config: this.agent_config.as_ref().map(ProtoRepr::build), - scaler_config: this.scaler_config.as_ref().map(ProtoRepr::build), - observability: this.observability.as_ref().map(ProtoRepr::build), - } - } -} - -impl ProtoRepr for proto::ProverAutoscalerAgentConfig { - type Type = configs::prover_autoscaler::ProverAutoscalerAgentConfig; - fn read(&self) -> anyhow::Result { - Ok(Self::Type { - prometheus_port: required(&self.prometheus_port) - .and_then(|x| Ok((*x).try_into()?)) - .context("prometheus_port")?, - http_port: required(&self.http_port) - .and_then(|x| Ok((*x).try_into()?)) - .context("http_port")?, - namespaces: self.namespaces.to_vec(), - cluster_name: Some("".to_string()), - dry_run: self.dry_run.unwrap_or(Self::Type::default_dry_run()), - }) - } - - fn build(this: &Self::Type) -> Self { - Self { - prometheus_port: Some(this.prometheus_port.into()), - http_port: Some(this.http_port.into()), - namespaces: this.namespaces.clone(), - cluster_name: this.cluster_name.clone(), - dry_run: Some(this.dry_run), - } - } -} - -impl ProtoRepr for proto::ProverAutoscalerScalerConfig { - type Type = configs::prover_autoscaler::ProverAutoscalerScalerConfig; - fn read(&self) -> anyhow::Result { - Ok(Self::Type { - prometheus_port: required(&self.prometheus_port) - .and_then(|x| Ok((*x).try_into()?)) - .context("prometheus_port")?, - scaler_run_interval: read_optional(&self.scaler_run_interval) - .context("scaler_run_interval")? - .unwrap_or(Self::Type::default_scaler_run_interval()), - prover_job_monitor_url: required(&self.prover_job_monitor_url) - .context("prover_job_monitor_url")? - .clone(), - agents: self.agents.to_vec(), - protocol_versions: self - .protocol_versions - .iter() - .enumerate() - .map(|(i, e)| e.read().context(i)) - .collect::>() - .context("protocol_versions")?, - cluster_priorities: self - .cluster_priorities - .iter() - .enumerate() - .map(|(i, e)| e.read().context(i)) - .collect::>() - .context("cluster_priorities")?, - prover_speed: self - .prover_speed - .iter() - .enumerate() - .map(|(i, e)| e.read().context(i)) - .collect::>() - .context("prover_speed")?, - long_pending_duration: match self.long_pending_duration_s { - Some(s) => Duration::seconds(s.into()), - None => Self::Type::default_long_pending_duration(), - }, - max_provers: self.max_provers.iter().fold(HashMap::new(), |mut acc, e| { - let (cluster_and_gpu, max) = e.read().expect("max_provers"); - if let Some((cluster, gpu)) = cluster_and_gpu.split_once('/') { - acc.entry(cluster.to_string()) - .or_default() - .insert(gpu.parse().expect("max_provers/gpu"), max); - } - acc - }), - min_provers: self - .min_provers - .iter() - .enumerate() - .map(|(i, e)| e.read().context(i)) - .collect::>() - .context("min_provers")?, - scaler_targets: self - .scaler_targets - .iter() - .enumerate() - .map(|(i, x)| x.read().context(i).unwrap()) - .collect::>(), - dry_run: self.dry_run.unwrap_or_default(), - }) - } - - fn build(this: &Self::Type) -> Self { - Self { - prometheus_port: Some(this.prometheus_port.into()), - scaler_run_interval: Some(ProtoFmt::build(&this.scaler_run_interval)), - prover_job_monitor_url: Some(this.prover_job_monitor_url.clone()), - agents: this.agents.clone(), - protocol_versions: this - .protocol_versions - .iter() - .map(|(k, v)| proto::ProtocolVersion::build(&(k.clone(), v.clone()))) - .collect(), - cluster_priorities: this - .cluster_priorities - .iter() - .map(|(k, v)| proto::ClusterPriority::build(&(k.clone(), *v))) - .collect(), - prover_speed: this - .prover_speed - .iter() - .map(|(k, v)| proto::ProverSpeed::build(&(*k, *v))) - .collect(), - long_pending_duration_s: Some(this.long_pending_duration.whole_seconds() as u32), - max_provers: this - .max_provers - .iter() - .flat_map(|(cluster, inner_map)| { - inner_map.iter().map(move |(gpu, max)| { - proto::MaxProver::build(&(format!("{}/{}", cluster, gpu), *max)) - }) - }) - .collect(), - min_provers: this - .min_provers - .iter() - .map(|(k, v)| proto::MinProver::build(&(k.clone(), *v))) - .collect(), - scaler_targets: this.scaler_targets.iter().map(ProtoRepr::build).collect(), - dry_run: Some(this.dry_run), - } - } -} - -impl ProtoRepr for proto::ProtocolVersion { - type Type = (String, String); - fn read(&self) -> anyhow::Result { - Ok(( - required(&self.namespace).context("namespace")?.clone(), - required(&self.protocol_version) - .context("protocol_version")? - .clone(), - )) - } - fn build(this: &Self::Type) -> Self { - Self { - namespace: Some(this.0.clone()), - protocol_version: Some(this.1.clone()), - } - } -} - -impl ProtoRepr for proto::ClusterPriority { - type Type = (String, u32); - fn read(&self) -> anyhow::Result { - Ok(( - required(&self.cluster).context("cluster")?.clone(), - *required(&self.priority).context("priority")?, - )) - } - fn build(this: &Self::Type) -> Self { - Self { - cluster: Some(this.0.clone()), - priority: Some(this.1), - } - } -} - -impl ProtoRepr for proto::ProverSpeed { - type Type = (Gpu, u32); - fn read(&self) -> anyhow::Result { - Ok(( - required(&self.gpu).context("gpu")?.parse()?, - *required(&self.speed).context("speed")?, - )) - } - fn build(this: &Self::Type) -> Self { - Self { - gpu: Some(this.0.to_string()), - speed: Some(this.1), - } - } -} - -impl ProtoRepr for proto::MaxProver { - type Type = (String, u32); - fn read(&self) -> anyhow::Result { - Ok(( - required(&self.cluster_and_gpu) - .context("cluster_and_gpu")? - .parse()?, - *required(&self.max).context("max")?, - )) - } - fn build(this: &Self::Type) -> Self { - Self { - cluster_and_gpu: Some(this.0.to_string()), - max: Some(this.1), - } - } -} - -impl ProtoRepr for proto::MinProver { - type Type = (String, u32); - fn read(&self) -> anyhow::Result { - Ok(( - required(&self.namespace).context("namespace")?.clone(), - *required(&self.min).context("min")?, - )) - } - fn build(this: &Self::Type) -> Self { - Self { - namespace: Some(this.0.to_string()), - min: Some(this.1), - } - } -} - -impl ProtoRepr for proto::MaxReplica { - type Type = (String, usize); - fn read(&self) -> anyhow::Result { - Ok(( - required(&self.cluster).context("cluster")?.parse()?, - *required(&self.max).context("max")? as usize, - )) - } - fn build(this: &Self::Type) -> Self { - Self { - cluster: Some(this.0.to_string()), - max: Some(this.1 as u64), - } - } -} - -impl ProtoRepr for proto::ScalerTarget { - type Type = configs::prover_autoscaler::ScalerTarget; - fn read(&self) -> anyhow::Result { - Ok(Self::Type { - queue_report_field: required(&self.queue_report_field) - .and_then(|x| Ok((*x).parse()?)) - .context("queue_report_field")?, - deployment: required(&self.deployment).context("deployment")?.clone(), - max_replicas: self - .max_replicas - .iter() - .enumerate() - .map(|(i, e)| e.read().context(i)) - .collect::>() - .context("max_replicas")?, - speed: match self.speed { - Some(x) => x as usize, - None => Self::Type::default_speed(), - }, - }) - } - - fn build(this: &Self::Type) -> Self { - Self { - queue_report_field: Some(this.queue_report_field.to_string()), - deployment: Some(this.deployment.clone()), - max_replicas: this - .max_replicas - .iter() - .map(|(k, v)| proto::MaxReplica::build(&(k.clone(), *v))) - .collect(), - speed: Some(this.speed as u64), - } - } -} diff --git a/prover/Cargo.lock b/prover/Cargo.lock index d13af11fde9f..8432cbb85fa9 100644 --- a/prover/Cargo.lock +++ b/prover/Cargo.lock @@ -2616,6 +2616,16 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" +[[package]] +name = "humantime-serde" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57a3db5ea5923d99402c94e9feb261dc5ee9b4efa158b0315f788cf549cc200c" +dependencies = [ + "humantime", + "serde", +] + [[package]] name = "hyper" version = "0.14.29" @@ -7887,12 +7897,7 @@ dependencies = [ "rand 0.8.5", "secrecy 0.8.0", "serde", - "strum", - "strum_macros", - "time", "tracing", - "url", - "vise", "zksync_basic_types", "zksync_concurrency", "zksync_consensus_utils", @@ -8341,7 +8346,6 @@ dependencies = [ "secrecy 0.8.0", "serde_json", "serde_yaml", - "time", "tracing", "zksync_basic_types", "zksync_config", @@ -8358,10 +8362,10 @@ dependencies = [ "async-trait", "axum", "chrono", - "clap 4.5.4", "ctrlc", "debug-map-sorted", "futures 0.3.30", + "humantime-serde", "k8s-openapi", "kube", "once_cell", @@ -8371,9 +8375,10 @@ dependencies = [ "rustls", "serde", "serde_json", + "serde_yaml", "structopt", "strum", - "time", + "strum_macros", "tokio", "tracing", "tracing-subscriber", @@ -8382,7 +8387,6 @@ dependencies = [ "vise", "zksync_config", "zksync_core_leftovers", - "zksync_protobuf_config", "zksync_prover_job_monitor", "zksync_types", "zksync_utils", diff --git a/prover/Cargo.toml b/prover/Cargo.toml index 32c3185f64c3..e53efaae1968 100644 --- a/prover/Cargo.toml +++ b/prover/Cargo.toml @@ -28,6 +28,8 @@ debug-map-sorted = "0.1.1" dialoguer = "0.11" futures = "0.3" hex = "0.4" +humantime = "2.1" +humantime-serde = "1.1" indicatif = "0.16" itertools = "0.10.5" jemallocator = "0.5" @@ -47,12 +49,13 @@ rustls = { version = "0.23.12", features = ["ring"] } serde = "1.0" serde_derive = "1.0" serde_json = "1.0" +serde_yaml = "0.9" sha3 = "0.10.8" sqlx = { version = "0.8.1", default-features = false } structopt = "0.3.26" strum = { version = "0.26" } +strum_macros = "0.26" tempfile = "3" -time = "0.3.36" tokio = "1" tokio-util = "0.7.11" toml_edit = "0.14.4" diff --git a/prover/crates/bin/prover_autoscaler/Cargo.toml b/prover/crates/bin/prover_autoscaler/Cargo.toml index fbf3ecae9098..88569aa87e94 100644 --- a/prover/crates/bin/prover_autoscaler/Cargo.toml +++ b/prover/crates/bin/prover_autoscaler/Cargo.toml @@ -16,16 +16,15 @@ zksync_utils.workspace = true zksync_types.workspace = true zksync_config = { workspace = true, features = ["observability_ext"] } zksync_prover_job_monitor.workspace = true -zksync_protobuf_config.workspace = true debug-map-sorted.workspace = true anyhow.workspace = true async-trait.workspace = true axum.workspace = true chrono.workspace = true -clap = { workspace = true, features = ["derive"] } ctrlc = { workspace = true, features = ["termination"] } futures.workspace = true +humantime-serde.workspace = true k8s-openapi = { workspace = true, features = ["v1_30"] } kube = { workspace = true, features = ["runtime", "derive"] } once_cell.workspace = true @@ -35,9 +34,10 @@ ring.workspace = true rustls = { workspace = true, features = ["ring"] } serde = { workspace = true, features = ["derive"] } serde_json.workspace = true +serde_yaml.workspace = true structopt.workspace = true strum.workspace = true -time.workspace = true +strum_macros.workspace = true tokio = { workspace = true, features = ["time", "macros"] } tracing-subscriber = { workspace = true, features = ["env-filter"] } tracing.workspace = true diff --git a/core/lib/config/src/configs/prover_autoscaler.rs b/prover/crates/bin/prover_autoscaler/src/config.rs similarity index 86% rename from core/lib/config/src/configs/prover_autoscaler.rs rename to prover/crates/bin/prover_autoscaler/src/config.rs index 4191208b96e3..c34cff15951a 100644 --- a/core/lib/config/src/configs/prover_autoscaler.rs +++ b/prover/crates/bin/prover_autoscaler/src/config.rs @@ -1,18 +1,17 @@ -use std::collections::HashMap; +use std::{collections::HashMap, path::PathBuf, time::Duration}; +use anyhow::Context; use serde::Deserialize; use strum::Display; use strum_macros::EnumString; -use time::Duration; use vise::EncodeLabelValue; - -use crate::configs::ObservabilityConfig; +use zksync_config::configs::ObservabilityConfig; /// Config used for running ProverAutoscaler (both Scaler and Agent). -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Deserialize)] pub struct ProverAutoscalerConfig { /// Amount of time ProverJobMonitor will wait all it's tasks to finish. - // TODO: find a way to use #[serde(with = "humantime_serde")] with time::Duration. + #[serde(with = "humantime_serde")] pub graceful_shutdown_timeout: Duration, pub agent_config: Option, pub scaler_config: Option, @@ -40,7 +39,10 @@ pub struct ProverAutoscalerScalerConfig { /// Port for prometheus metrics connection. pub prometheus_port: u16, /// The interval between runs for global Scaler. - #[serde(default = "ProverAutoscalerScalerConfig::default_scaler_run_interval")] + #[serde( + with = "humantime_serde", + default = "ProverAutoscalerScalerConfig::default_scaler_run_interval" + )] pub scaler_run_interval: Duration, /// URL to get queue reports from. /// In production should be "http://prover-job-monitor.stage2.svc.cluster.local:3074/queue_report". @@ -59,7 +61,10 @@ pub struct ProverAutoscalerScalerConfig { /// Minimum number of provers per namespace. pub min_provers: HashMap, /// Duration after which pending pod considered long pending. - #[serde(default = "ProverAutoscalerScalerConfig::default_long_pending_duration")] + #[serde( + with = "humantime_serde", + default = "ProverAutoscalerScalerConfig::default_long_pending_duration" + )] pub long_pending_duration: Duration, /// List of simple autoscaler targets. pub scaler_targets: Vec, @@ -136,7 +141,7 @@ pub struct ScalerTarget { impl ProverAutoscalerConfig { /// Default graceful shutdown timeout -- 5 seconds pub fn default_graceful_shutdown_timeout() -> Duration { - Duration::seconds(5) + Duration::from_secs(5) } } @@ -153,7 +158,7 @@ impl ProverAutoscalerAgentConfig { impl ProverAutoscalerScalerConfig { /// Default scaler_run_interval -- 10s pub fn default_scaler_run_interval() -> Duration { - Duration::seconds(10) + Duration::from_secs(10) } /// Default prover_job_monitor_url -- cluster local URL @@ -163,7 +168,7 @@ impl ProverAutoscalerScalerConfig { /// Default long_pending_duration -- 10m pub fn default_long_pending_duration() -> Duration { - Duration::minutes(10) + Duration::from_secs(600) } } @@ -172,3 +177,9 @@ impl ScalerTarget { 1 } } + +pub fn config_from_yaml(path: &PathBuf) -> anyhow::Result { + let yaml = std::fs::read_to_string(path) + .with_context(|| format!("failed to read {}", path.display()))?; + Ok(serde_yaml::from_str(&yaml)?) +} diff --git a/prover/crates/bin/prover_autoscaler/src/global/queuer.rs b/prover/crates/bin/prover_autoscaler/src/global/queuer.rs index 7255f479647e..baeb5b70a4ef 100644 --- a/prover/crates/bin/prover_autoscaler/src/global/queuer.rs +++ b/prover/crates/bin/prover_autoscaler/src/global/queuer.rs @@ -2,11 +2,13 @@ use std::{collections::HashMap, ops::Deref}; use anyhow::{Context, Ok}; use reqwest::Method; -use zksync_config::configs::prover_autoscaler::QueueReportFields; use zksync_prover_job_monitor::autoscaler_queue_reporter::{QueueReport, VersionedQueueReport}; use zksync_utils::http_with_retries::send_request_with_retries; -use crate::metrics::{AUTOSCALER_METRICS, DEFAULT_ERROR_CODE}; +use crate::{ + config::QueueReportFields, + metrics::{AUTOSCALER_METRICS, DEFAULT_ERROR_CODE}, +}; const MAX_RETRIES: usize = 5; diff --git a/prover/crates/bin/prover_autoscaler/src/global/scaler.rs b/prover/crates/bin/prover_autoscaler/src/global/scaler.rs index dc652999da5f..829b95dd7514 100644 --- a/prover/crates/bin/prover_autoscaler/src/global/scaler.rs +++ b/prover/crates/bin/prover_autoscaler/src/global/scaler.rs @@ -4,14 +4,12 @@ use chrono::Utc; use debug_map_sorted::SortedOutputExt; use once_cell::sync::Lazy; use regex::Regex; -use zksync_config::configs::prover_autoscaler::{ - Gpu, ProverAutoscalerScalerConfig, QueueReportFields, ScalerTarget, -}; use super::{queuer, watcher}; use crate::{ agent::{ScaleDeploymentRequest, ScaleRequest}, cluster_types::{Cluster, Clusters, Pod, PodStatus}, + config::{Gpu, ProverAutoscalerScalerConfig, QueueReportFields, ScalerTarget}, metrics::AUTOSCALER_METRICS, task_wiring::Task, }; @@ -128,7 +126,7 @@ impl Scaler { simple_scalers.push(SimpleScaler::new( c, config.cluster_priorities.clone(), - chrono::Duration::seconds(config.long_pending_duration.whole_seconds()), + chrono::Duration::seconds(config.long_pending_duration.as_secs() as i64), )) } Self { @@ -150,7 +148,7 @@ impl GpuScaler { max_provers: config.max_provers, prover_speed: config.prover_speed, long_pending_duration: chrono::Duration::seconds( - config.long_pending_duration.whole_seconds(), + config.long_pending_duration.as_secs() as i64, ), } } diff --git a/prover/crates/bin/prover_autoscaler/src/lib.rs b/prover/crates/bin/prover_autoscaler/src/lib.rs index 0b0d704c9078..019fe2b7fb4d 100644 --- a/prover/crates/bin/prover_autoscaler/src/lib.rs +++ b/prover/crates/bin/prover_autoscaler/src/lib.rs @@ -1,5 +1,6 @@ pub mod agent; pub(crate) mod cluster_types; +pub mod config; pub mod global; pub mod k8s; pub(crate) mod metrics; diff --git a/prover/crates/bin/prover_autoscaler/src/main.rs b/prover/crates/bin/prover_autoscaler/src/main.rs index ac5121dccd9c..b153cb9c4d59 100644 --- a/prover/crates/bin/prover_autoscaler/src/main.rs +++ b/prover/crates/bin/prover_autoscaler/src/main.rs @@ -6,10 +6,9 @@ use tokio::{ sync::{oneshot, watch}, task::JoinHandle, }; -use zksync_core_leftovers::temp_config_store::read_yaml_repr; -use zksync_protobuf_config::proto::prover_autoscaler; use zksync_prover_autoscaler::{ agent, + config::{config_from_yaml, ProverAutoscalerConfig}, global::{self}, k8s::{Scaler, Watcher}, task_wiring::TaskRunner, @@ -56,15 +55,11 @@ struct Opt { async fn main() -> anyhow::Result<()> { let opt = Opt::from_args(); let general_config = - read_yaml_repr::(&opt.config_path) - .context("general config")?; + config_from_yaml::(&opt.config_path).context("general config")?; let observability_config = general_config .observability .context("observability config")?; let _observability_guard = observability_config.install()?; - // That's unfortunate that there are at least 3 different Duration in rust and we use all 3 in this repo. - // TODO: Consider updating zksync_protobuf to support std::time::Duration. - let graceful_shutdown_timeout = general_config.graceful_shutdown_timeout.unsigned_abs(); let (stop_signal_sender, stop_signal_receiver) = oneshot::channel(); let mut stop_signal_sender = Some(stop_signal_sender); @@ -77,9 +72,6 @@ async fn main() -> anyhow::Result<()> { let (stop_sender, stop_receiver) = watch::channel(false); - let _ = rustls::crypto::ring::default_provider().install_default(); - let client = kube::Client::try_default().await?; - let mut tasks = vec![]; match opt.job { @@ -92,6 +84,9 @@ async fn main() -> anyhow::Result<()> { let exporter_config = PrometheusExporterConfig::pull(agent_config.prometheus_port); tasks.push(tokio::spawn(exporter_config.run(stop_receiver.clone()))); + let _ = rustls::crypto::ring::default_provider().install_default(); + let client = kube::Client::try_default().await?; + // TODO: maybe get cluster name from curl -H "Metadata-Flavor: Google" // http://metadata.google.internal/computeMetadata/v1/instance/attributes/cluster-name let watcher = Watcher::new(client.clone(), cluster, agent_config.namespaces); @@ -107,7 +102,7 @@ async fn main() -> anyhow::Result<()> { AutoscalerType::Scaler => { tracing::info!("Starting ProverAutoscaler Scaler"); let scaler_config = general_config.scaler_config.context("scaler_config")?; - let interval = scaler_config.scaler_run_interval.unsigned_abs(); + let interval = scaler_config.scaler_run_interval; let exporter_config = PrometheusExporterConfig::pull(scaler_config.prometheus_port); tasks.push(tokio::spawn(exporter_config.run(stop_receiver.clone()))); let watcher = @@ -127,7 +122,9 @@ async fn main() -> anyhow::Result<()> { } } stop_sender.send(true).ok(); - tasks.complete(graceful_shutdown_timeout).await; + tasks + .complete(general_config.graceful_shutdown_timeout) + .await; Ok(()) } diff --git a/prover/crates/bin/prover_autoscaler/src/metrics.rs b/prover/crates/bin/prover_autoscaler/src/metrics.rs index 39860a9e8f09..115ae3b74259 100644 --- a/prover/crates/bin/prover_autoscaler/src/metrics.rs +++ b/prover/crates/bin/prover_autoscaler/src/metrics.rs @@ -1,5 +1,6 @@ use vise::{Counter, Gauge, LabeledFamily, Metrics}; -use zksync_config::configs::prover_autoscaler::Gpu; + +use crate::config::Gpu; pub const DEFAULT_ERROR_CODE: u16 = 500;