From e22cfb6cffd2c4b2ad1ec3f3f433616fcd738511 Mon Sep 17 00:00:00 2001 From: EmilLuta Date: Mon, 19 Aug 2024 14:06:18 +0200 Subject: [PATCH] feat(prover): Add ProverJobMonitor (#2666) ProverJobMonitor will be house keeper's counterpart in the prover subsystem. TL;DR; it's a singleton component, monitoring prover subsystem jobs. The TL;DR; is that prover and core won't share any databases. This enables: - core deployments without affecting prover - removing prover infrastructure (DB) in proverless envs The release plan is as follows: - release a component (PJM) that runs in parallel with HK - migrate all jobs/metrics/dashboards to PJM - delete their counterparts in HK - remove redundant infrastructure This PR contains: - a new component (PJM) - fixes for bugs/issues with old metrics (backported to HK) - refactoring of metrics (PJM metrics cover same metrics as HK, but they are different, as we can cover more with less) - various other small nits P.S. Name is up for discussion, feel free to suggest a better name. 
--- core/bin/zksync_server/src/main.rs | 1 + core/bin/zksync_server/src/node_builder.rs | 2 +- core/lib/basic_types/src/basic_fri_types.rs | 170 ++++++++++++++- core/lib/basic_types/src/prover_dal.rs | 2 + core/lib/config/src/configs/general.rs | 4 +- core/lib/config/src/configs/mod.rs | 2 + .../config/src/configs/prover_job_monitor.rs | 185 ++++++++++++++++ core/lib/config/src/testonly.rs | 27 ++- core/lib/env_config/src/lib.rs | 1 + core/lib/env_config/src/prover_job_monitor.rs | 89 ++++++++ core/lib/protobuf_config/src/general.rs | 9 +- core/lib/protobuf_config/src/lib.rs | 1 + .../src/proto/config/general.proto | 2 + .../src/proto/config/prover_job_monitor.proto | 20 ++ .../protobuf_config/src/prover_job_monitor.rs | 131 ++++++++++++ .../src/temp_config_store/mod.rs | 7 +- .../archiver/fri_gpu_prover_archiver.rs | 4 +- .../archiver/fri_prover_jobs_archiver.rs | 4 +- .../fri_prover_queue_reporter.rs | 75 ++++--- ...ri_witness_generator_jobs_retry_manager.rs | 12 +- etc/env/base/prover_job_monitor.toml | 15 ++ etc/env/file_based/general.yaml | 17 ++ prover/Cargo.lock | 19 ++ prover/Cargo.toml | 1 + .../crates/bin/prover_job_monitor/Cargo.toml | 27 +++ .../src/archiver/gpu_prover_archiver.rs | 39 ++++ .../prover_job_monitor/src/archiver/mod.rs | 5 + .../src/archiver/prover_jobs_archiver.rs | 37 ++++ .../src/job_requeuer/mod.rs | 7 + .../proof_compressor_job_requeuer.rs | 42 ++++ .../src/job_requeuer/prover_job_requeuer.rs | 42 ++++ .../witness_generator_job_requeuer.rs | 90 ++++++++ .../crates/bin/prover_job_monitor/src/lib.rs | 6 + .../crates/bin/prover_job_monitor/src/main.rs | 201 ++++++++++++++++++ .../bin/prover_job_monitor/src/metrics.rs | 98 +++++++++ .../src/queue_reporter/mod.rs | 7 + .../proof_compressor_queue_reporter.rs | 68 ++++++ .../queue_reporter/prover_queue_reporter.rs | 83 ++++++++ .../witness_generator_queue_reporter.rs | 71 +++++++ .../bin/prover_job_monitor/src/task_wiring.rs | 86 ++++++++ .../src/witness_job_queuer.rs | 121 +++++++++++ 
.../proptest-regressions/tests.txt | 9 + .../crates/bin/witness_generator/src/main.rs | 2 +- ...80c233a9fd3e892b5a867a5517c2e04497a8.json} | 18 +- ...32b826708800a2f72f09bd7aea08cf724e1a.json} | 18 +- ...e4aa4f797bcc6e05ec2f2e435a7e940d8cf9.json} | 18 +- ...36b9704e8a76de95811cb23e3aa9f2512ade.json} | 4 +- ...9d0c658093dede5eb61489205aa751ad5b8ec.json | 22 -- ...6d9065bf4494daf8f7632ab2bfe055773f7b.json} | 18 +- ...b4d3d6a762761e45af2a73fe96da804e627e.json} | 18 +- ...e8e0ed14ad1f42ffd0b383fbfb38e78df8ae3.json | 22 ++ ...1570fc88c17822bebd5b92e3b2f726d9af3a.json} | 18 +- ...b93bfd5d96fdc68732fe38c79ccd44b84def.json} | 18 +- ...6dde4142e09330557cc627fee2db278ace50.json} | 18 +- ...f89bbd72934e1405e320e746158e6d395d96.json} | 18 +- .../src/fri_gpu_prover_queue_dal.rs | 5 +- .../src/fri_proof_compressor_dal.rs | 13 +- .../lib/prover_dal/src/fri_prover_dal.rs | 83 ++++---- .../src/fri_witness_generator_dal.rs | 77 +++++-- 59 files changed, 2047 insertions(+), 182 deletions(-) create mode 100644 core/lib/config/src/configs/prover_job_monitor.rs create mode 100644 core/lib/env_config/src/prover_job_monitor.rs create mode 100644 core/lib/protobuf_config/src/proto/config/prover_job_monitor.proto create mode 100644 core/lib/protobuf_config/src/prover_job_monitor.rs create mode 100644 etc/env/base/prover_job_monitor.toml create mode 100644 prover/crates/bin/prover_job_monitor/Cargo.toml create mode 100644 prover/crates/bin/prover_job_monitor/src/archiver/gpu_prover_archiver.rs create mode 100644 prover/crates/bin/prover_job_monitor/src/archiver/mod.rs create mode 100644 prover/crates/bin/prover_job_monitor/src/archiver/prover_jobs_archiver.rs create mode 100644 prover/crates/bin/prover_job_monitor/src/job_requeuer/mod.rs create mode 100644 prover/crates/bin/prover_job_monitor/src/job_requeuer/proof_compressor_job_requeuer.rs create mode 100644 prover/crates/bin/prover_job_monitor/src/job_requeuer/prover_job_requeuer.rs create mode 100644 
prover/crates/bin/prover_job_monitor/src/job_requeuer/witness_generator_job_requeuer.rs create mode 100644 prover/crates/bin/prover_job_monitor/src/lib.rs create mode 100644 prover/crates/bin/prover_job_monitor/src/main.rs create mode 100644 prover/crates/bin/prover_job_monitor/src/metrics.rs create mode 100644 prover/crates/bin/prover_job_monitor/src/queue_reporter/mod.rs create mode 100644 prover/crates/bin/prover_job_monitor/src/queue_reporter/proof_compressor_queue_reporter.rs create mode 100644 prover/crates/bin/prover_job_monitor/src/queue_reporter/prover_queue_reporter.rs create mode 100644 prover/crates/bin/prover_job_monitor/src/queue_reporter/witness_generator_queue_reporter.rs create mode 100644 prover/crates/bin/prover_job_monitor/src/task_wiring.rs create mode 100644 prover/crates/bin/prover_job_monitor/src/witness_job_queuer.rs create mode 100644 prover/crates/bin/vk_setup_data_generator_server_fri/proptest-regressions/tests.txt rename prover/crates/lib/prover_dal/.sqlx/{query-860846c9bcad1edd1a2906542c178815e29440592b2bb00adacf02730b526458.json => query-102b79726652d9150c802350bdca80c233a9fd3e892b5a867a5517c2e04497a8.json} (68%) rename prover/crates/lib/prover_dal/.sqlx/{query-3c3abbf689fa64c6da7de69fd916769dbb04d3a61cf232892236c974660ffe64.json => query-216d0c263539739b53975a96a10332b826708800a2f72f09bd7aea08cf724e1a.json} (71%) rename prover/crates/lib/prover_dal/.sqlx/{query-8719c090a9ad2488d556e495238cdce6412e2725cf5162ce7a733f6dceaecb11.json => query-2b12c5d469e6220cc8ddc997c666e4aa4f797bcc6e05ec2f2e435a7e940d8cf9.json} (76%) rename prover/crates/lib/prover_dal/.sqlx/{query-ca9d06141265b8524ee28c55569cb21a635037d89ce24dd3ad58ffaadb59594a.json => query-5f18efe2fb3a16cdf3c23379f36536b9704e8a76de95811cb23e3aa9f2512ade.json} (65%) delete mode 100644 prover/crates/lib/prover_dal/.sqlx/query-6cfc59d2fc039c706f30ae91b7d9d0c658093dede5eb61489205aa751ad5b8ec.json rename 
prover/crates/lib/prover_dal/.sqlx/{query-a0f60a97f09b2467ca73bb6fbebb210d65149cdd4a3411a79b717aadbffb43af.json => query-8357972a21b39644e4cbe4bedc3b6d9065bf4494daf8f7632ab2bfe055773f7b.json} (71%) rename prover/crates/lib/prover_dal/.sqlx/{query-e3194873d24e67f8d0e98bf8bf2d4f9a3b98458746972c9860fb9473947d59ff.json => query-9895b2ded08be3e81a5357decf76b4d3d6a762761e45af2a73fe96da804e627e.json} (74%) create mode 100644 prover/crates/lib/prover_dal/.sqlx/query-a9e9399edfcaf7569869d5ac72ae8e0ed14ad1f42ffd0b383fbfb38e78df8ae3.json rename prover/crates/lib/prover_dal/.sqlx/{query-bfb80956a18eabf266f5b5a9d62912d57f8eb2a38bdb7884fc812a2897a3a660.json => query-bcc5d3d35652f49b41d4ee673b171570fc88c17822bebd5b92e3b2f726d9af3a.json} (63%) rename prover/crates/lib/prover_dal/.sqlx/{query-c156004a0e5ad5bcc33d3b894fd69718349ac4fc08b455c7f4265d7443f2ec13.json => query-d0be28042b50199075cb0eca26f6b93bfd5d96fdc68732fe38c79ccd44b84def.json} (50%) rename prover/crates/lib/prover_dal/.sqlx/{query-e32c0d85cb2841efb0b7cea6b049bae42849574731d33539bfdcca21c9b64f4e.json => query-d5bb897092bce2788fe02f31c9de6dde4142e09330557cc627fee2db278ace50.json} (76%) rename prover/crates/lib/prover_dal/.sqlx/{query-5e781f84ec41edd0941fa84de837effac442434c6e734d977e6682a7484abe7f.json => query-eb2a85cb60c680a71203769db7baf89bbd72934e1405e320e746158e6d395d96.json} (75%) diff --git a/core/bin/zksync_server/src/main.rs b/core/bin/zksync_server/src/main.rs index 1c22ce5c41a2..7e0ff0e49201 100644 --- a/core/bin/zksync_server/src/main.rs +++ b/core/bin/zksync_server/src/main.rs @@ -210,5 +210,6 @@ fn load_env_config() -> anyhow::Result { external_price_api_client_config: ExternalPriceApiClientConfig::from_env().ok(), external_proof_integration_api_config: ExternalProofIntegrationApiConfig::from_env().ok(), experimental_vm_config: ExperimentalVmConfig::from_env().ok(), + prover_job_monitor_config: None, }) } diff --git a/core/bin/zksync_server/src/node_builder.rs b/core/bin/zksync_server/src/node_builder.rs 
index d9bc46903000..7c4503876e9d 100644 --- a/core/bin/zksync_server/src/node_builder.rs +++ b/core/bin/zksync_server/src/node_builder.rs @@ -442,7 +442,7 @@ impl MainNodeBuilder { fn add_house_keeper_layer(mut self) -> anyhow::Result { let house_keeper_config = try_load_config!(self.configs.house_keeper_config); let fri_prover_config = try_load_config!(self.configs.prover_config); - let fri_witness_generator_config = try_load_config!(self.configs.witness_generator); + let fri_witness_generator_config = try_load_config!(self.configs.witness_generator_config); let fri_prover_group_config = try_load_config!(self.configs.prover_group_config); let fri_proof_compressor_config = try_load_config!(self.configs.proof_compressor_config); diff --git a/core/lib/basic_types/src/basic_fri_types.rs b/core/lib/basic_types/src/basic_fri_types.rs index 9765435f0973..5969cca6b8c0 100644 --- a/core/lib/basic_types/src/basic_fri_types.rs +++ b/core/lib/basic_types/src/basic_fri_types.rs @@ -2,11 +2,19 @@ // TODO (PLA-773): Should be moved to the prover workspace. 
-use std::{convert::TryFrom, str::FromStr}; +use std::{ + collections::{hash_map::IntoIter, HashMap}, + convert::TryFrom, + iter::once, + str::FromStr, +}; use serde::{Deserialize, Serialize}; -use crate::protocol_version::{ProtocolSemanticVersion, ProtocolVersionId, VersionPatch}; +use crate::{ + protocol_version::{ProtocolSemanticVersion, ProtocolVersionId, VersionPatch}, + prover_dal::JobCountStatistics, +}; const BLOB_CHUNK_SIZE: usize = 31; const ELEMENTS_PER_4844_BLOCK: usize = 4096; @@ -127,6 +135,14 @@ impl From for AggregationRound { } impl AggregationRound { + pub const ALL_ROUNDS: [AggregationRound; 5] = [ + AggregationRound::BasicCircuits, + AggregationRound::LeafAggregation, + AggregationRound::NodeAggregation, + AggregationRound::RecursionTip, + AggregationRound::Scheduler, + ]; + pub fn next(&self) -> Option { match self { AggregationRound::BasicCircuits => Some(AggregationRound::LeafAggregation), @@ -187,6 +203,156 @@ impl TryFrom for AggregationRound { } } +/// Wrapper for mapping from protocol version to prover circuits job stats +#[derive(Debug)] +pub struct ProtocolVersionedCircuitProverStats { + protocol_versioned_circuit_stats: HashMap, +} + +impl FromIterator for ProtocolVersionedCircuitProverStats { + fn from_iter>(iter: I) -> Self { + let mut mapping = HashMap::new(); + for entry in iter { + let protocol_semantic_version = entry.protocol_semantic_version; + let circuit_prover_stats: &mut CircuitProverStats = + mapping.entry(protocol_semantic_version).or_default(); + circuit_prover_stats.add(entry.circuit_id_round_tuple, entry.job_count_statistics); + } + Self { + protocol_versioned_circuit_stats: mapping, + } + } +} + +impl IntoIterator for ProtocolVersionedCircuitProverStats { + type Item = (ProtocolSemanticVersion, CircuitProverStats); + type IntoIter = IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.protocol_versioned_circuit_stats.into_iter() + } +} + +/// Wrapper for mapping between circuit/aggregation round to number of 
such jobs (queued and in progress) +#[derive(Debug)] +pub struct CircuitProverStats { + circuits_prover_stats: HashMap, +} + +impl IntoIterator for CircuitProverStats { + type Item = (CircuitIdRoundTuple, JobCountStatistics); + type IntoIter = IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.circuits_prover_stats.into_iter() + } +} + +impl CircuitProverStats { + fn add( + &mut self, + circuit_id_round_tuple: CircuitIdRoundTuple, + job_count_statistics: JobCountStatistics, + ) { + let stats = self + .circuits_prover_stats + .entry(circuit_id_round_tuple) + .or_default(); + stats.queued += job_count_statistics.queued; + stats.in_progress += job_count_statistics.in_progress; + } +} + +impl Default for CircuitProverStats { + fn default() -> Self { + let mut stats = HashMap::new(); + for circuit in (1..=15).chain(once(255)) { + stats.insert( + CircuitIdRoundTuple::new(circuit, 0), + JobCountStatistics::default(), + ); + } + for circuit in 3..=18 { + stats.insert( + CircuitIdRoundTuple::new(circuit, 1), + JobCountStatistics::default(), + ); + } + stats.insert( + CircuitIdRoundTuple::new(2, 2), + JobCountStatistics::default(), + ); + stats.insert( + CircuitIdRoundTuple::new(255, 3), + JobCountStatistics::default(), + ); + stats.insert( + CircuitIdRoundTuple::new(1, 4), + JobCountStatistics::default(), + ); + Self { + circuits_prover_stats: stats, + } + } +} + +/// DTO for communication between DAL and prover_job_monitor. +/// Represents an entry -- count (queued & in progress) of jobs (circuit_id, aggregation_round) for a given protocol version. 
+#[derive(Debug)] +pub struct CircuitProverStatsEntry { + circuit_id_round_tuple: CircuitIdRoundTuple, + protocol_semantic_version: ProtocolSemanticVersion, + job_count_statistics: JobCountStatistics, +} + +impl CircuitProverStatsEntry { + pub fn new( + circuit_id: i16, + aggregation_round: i16, + protocol_version: i32, + protocol_version_patch: i32, + status: &str, + count: i64, + ) -> Self { + let mut queued = 0; + let mut in_progress = 0; + match status { + "queued" => queued = count as usize, + "in_progress" => in_progress = count as usize, + _ => unreachable!("received {:?}, expected only 'queued'/'in_progress' from DB as part of query filter", status), + }; + + let job_count_statistics = JobCountStatistics { + queued, + in_progress, + }; + let protocol_semantic_version = ProtocolSemanticVersion::new( + ProtocolVersionId::try_from(protocol_version as u16) + .expect("received protocol version is broken"), + VersionPatch(protocol_version_patch as u32), + ); + + // BEWARE, HERE BE DRAGONS. + // In database, the `circuit_id` stored is the circuit for which the aggregation is done, + // not the circuit which is running. + // There is a single node level aggregation circuit, which is circuit 2. + // This can aggregate multiple leaf nodes (which may belong to different circuits). + // This "conversion" is a forced hacky way to use `circuit_id` 2 for nodes. + // A proper fix will be later provided to solve this once new auto-scaler is in place. 
+ let circuit_id = if aggregation_round == 2 { + 2 + } else { + circuit_id as u8 + }; + let circuit_id_round_tuple = CircuitIdRoundTuple::new(circuit_id, aggregation_round as u8); + CircuitProverStatsEntry { + circuit_id_round_tuple, + protocol_semantic_version, + job_count_statistics, + } + } +} + #[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq, Hash)] pub struct JobIdentifiers { pub circuit_id: u8, diff --git a/core/lib/basic_types/src/prover_dal.rs b/core/lib/basic_types/src/prover_dal.rs index edaad3798e82..52de0eae919c 100644 --- a/core/lib/basic_types/src/prover_dal.rs +++ b/core/lib/basic_types/src/prover_dal.rs @@ -52,6 +52,8 @@ pub struct StuckJobs { pub status: String, pub attempts: u64, pub circuit_id: Option, + pub picked_by: Option, + pub error: Option, } // TODO (PLA-774): Redundant structure, should be replaced with `std::net::SocketAddr`. diff --git a/core/lib/config/src/configs/general.rs b/core/lib/config/src/configs/general.rs index 3e6b05d8003e..38ffd3d45fac 100644 --- a/core/lib/config/src/configs/general.rs +++ b/core/lib/config/src/configs/general.rs @@ -6,6 +6,7 @@ use crate::{ da_dispatcher::DADispatcherConfig, fri_prover_group::FriProverGroupConfig, house_keeper::HouseKeeperConfig, + prover_job_monitor::ProverJobMonitorConfig, pruning::PruningConfig, snapshot_recovery::SnapshotRecoveryConfig, vm_runner::{BasicWitnessInputProducerConfig, ProtectiveReadsWriterConfig}, @@ -33,7 +34,7 @@ pub struct GeneralConfig { pub prover_gateway: Option, pub witness_vector_generator: Option, pub prover_group_config: Option, - pub witness_generator: Option, + pub witness_generator_config: Option, pub prometheus_config: Option, pub proof_data_handler_config: Option, pub db_config: Option, @@ -52,4 +53,5 @@ pub struct GeneralConfig { pub consensus_config: Option, pub external_proof_integration_api_config: Option, pub experimental_vm_config: Option, + pub prover_job_monitor_config: Option, } diff --git a/core/lib/config/src/configs/mod.rs 
b/core/lib/config/src/configs/mod.rs index 0ecd8ee0df98..b213060f7ced 100644 --- a/core/lib/config/src/configs/mod.rs +++ b/core/lib/config/src/configs/mod.rs @@ -22,6 +22,7 @@ pub use self::{ object_store::ObjectStoreConfig, observability::{ObservabilityConfig, OpentelemetryConfig}, proof_data_handler::ProofDataHandlerConfig, + prover_job_monitor::ProverJobMonitorConfig, pruning::PruningConfig, secrets::{DatabaseSecrets, L1Secrets, Secrets}, snapshot_recovery::SnapshotRecoveryConfig, @@ -57,6 +58,7 @@ pub mod house_keeper; pub mod object_store; pub mod observability; pub mod proof_data_handler; +pub mod prover_job_monitor; pub mod pruning; pub mod secrets; pub mod snapshot_recovery; diff --git a/core/lib/config/src/configs/prover_job_monitor.rs b/core/lib/config/src/configs/prover_job_monitor.rs new file mode 100644 index 000000000000..c16b1db81b7a --- /dev/null +++ b/core/lib/config/src/configs/prover_job_monitor.rs @@ -0,0 +1,185 @@ +use std::time::Duration; + +use serde::{Deserialize, Serialize}; + +/// Config used for running ProverJobMonitor. +/// It handles configuration for setup of the binary (like database connections, prometheus) and configuration for jobs that are being ran. +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] +pub struct ProverJobMonitorConfig { + /// Port for prometheus metrics connection. + pub prometheus_port: u16, + /// Maximum number of database connections per pool. + /// In a balanced system it should match the number of Tasks ran by ProverJobMonitor. + /// If lower, components will wait on one another for a connection. + /// If more, database will use more resources for idle connections (which drains DB resources needed for other components in Prover Subsystems). + pub max_db_connections: u32, + /// Amount of time ProverJobMonitor will wait all it's tasks to finish. 
+ #[serde(default = "ProverJobMonitorConfig::default_graceful_shutdown_timeout_ms")] + pub graceful_shutdown_timeout_ms: u64, + /// The interval between runs for GPU Prover Archiver. + #[serde(default = "ProverJobMonitorConfig::default_gpu_prover_archiver_run_interval_ms")] + pub gpu_prover_archiver_run_interval_ms: u64, + /// The amount of time after which 'dead' provers can be archived. + #[serde( + default = "ProverJobMonitorConfig::default_gpu_prover_archiver_archive_prover_after_ms" + )] + pub gpu_prover_archiver_archive_prover_after_ms: u64, + /// The interval between runs for Prover Jobs Archiver. + #[serde(default = "ProverJobMonitorConfig::default_prover_jobs_archiver_run_interval_ms")] + pub prover_jobs_archiver_run_interval_ms: u64, + /// The amount of time after which completed jobs (that belong to completed batches) can be archived. + #[serde( + default = "ProverJobMonitorConfig::default_prover_jobs_archiver_archive_jobs_after_ms" + )] + pub prover_jobs_archiver_archive_jobs_after_ms: u64, + /// The interval between runs for Proof Compressor Job Requeuer. + #[serde( + default = "ProverJobMonitorConfig::default_proof_compressor_job_requeuer_run_interval_ms" + )] + pub proof_compressor_job_requeuer_run_interval_ms: u64, + /// The interval between runs for Prover Job Requeuer. + #[serde(default = "ProverJobMonitorConfig::default_prover_job_requeuer_run_interval_ms")] + pub prover_job_requeuer_run_interval_ms: u64, + /// The interval between runs for Witness Generator Job Requeuer. + #[serde( + default = "ProverJobMonitorConfig::default_witness_generator_job_requeuer_run_interval_ms" + )] + pub witness_generator_job_requeuer_run_interval_ms: u64, + /// The interval between runs for Proof Compressor Queue Reporter. + #[serde( + default = "ProverJobMonitorConfig::default_proof_compressor_queue_reporter_run_interval_ms" + )] + pub proof_compressor_queue_reporter_run_interval_ms: u64, + /// The interval between runs for Prover Queue Reporter. 
+ #[serde(default = "ProverJobMonitorConfig::default_prover_queue_reporter_run_interval_ms")] + pub prover_queue_reporter_run_interval_ms: u64, + /// The interval between runs for Witness Generator Queue Reporter. + #[serde( + default = "ProverJobMonitorConfig::default_witness_generator_queue_reporter_run_interval_ms" + )] + pub witness_generator_queue_reporter_run_interval_ms: u64, + /// The interval between runs for Witness Job Queuer. + #[serde(default = "ProverJobMonitorConfig::default_witness_job_queuer_run_interval_ms")] + pub witness_job_queuer_run_interval_ms: u64, +} + +impl ProverJobMonitorConfig { + /// Default graceful shutdown timeout -- 5 seconds + pub fn default_graceful_shutdown_timeout_ms() -> u64 { + 5_000 + } + + /// Amount of time ProverJobMonitor will wait all it's tasks to finish. + pub fn graceful_shutdown_timeout(&self) -> Duration { + Duration::from_millis(self.graceful_shutdown_timeout_ms) + } + + /// The interval between runs for GPU Prover Archiver. + pub fn gpu_prover_archiver_run_interval(&self) -> Duration { + Duration::from_millis(self.gpu_prover_archiver_run_interval_ms) + } + + /// Default gpu_prover_archiver_archive_prover_after_secs -- 1 day + pub fn default_gpu_prover_archiver_run_interval_ms() -> u64 { + 86_400_000 + } + + /// The amount of time after which 'dead' provers can be archived. + pub fn archive_gpu_prover_duration(&self) -> Duration { + Duration::from_millis(self.gpu_prover_archiver_archive_prover_after_ms) + } + + /// Default gpu_prover_archiver_archive_prover_after_ms -- 2 days + pub fn default_gpu_prover_archiver_archive_prover_after_ms() -> u64 { + 172_800_000 + } + + /// The amount of time after which completed jobs (that belong to completed batches) can be archived. 
+ pub fn prover_jobs_archiver_run_interval(&self) -> Duration { + Duration::from_millis(self.prover_jobs_archiver_run_interval_ms) + } + + /// Default prover_jobs_archiver_run_interval_ms -- 30 minutes + pub fn default_prover_jobs_archiver_run_interval_ms() -> u64 { + 1_800_000 + } + /// The interval between runs for Prover Jobs Archiver. + pub fn archive_prover_jobs_duration(&self) -> Duration { + Duration::from_millis(self.prover_jobs_archiver_archive_jobs_after_ms) + } + + /// Default prover_jobs_archiver_archive_jobs_after_ms -- 2 days + pub fn default_prover_jobs_archiver_archive_jobs_after_ms() -> u64 { + 172_800_000 + } + + /// The interval between runs for Proof Compressor Job Requeuer. + pub fn proof_compressor_job_requeuer_run_interval(&self) -> Duration { + Duration::from_millis(self.proof_compressor_job_requeuer_run_interval_ms) + } + + /// Default proof_compressor_job_requeuer_run_interval_ms -- 10 seconds + pub fn default_proof_compressor_job_requeuer_run_interval_ms() -> u64 { + 10_000 + } + + /// The interval between runs for Prover Job Requeuer. + pub fn prover_job_requeuer_run_interval(&self) -> Duration { + Duration::from_millis(self.prover_job_requeuer_run_interval_ms) + } + + /// Default prover_job_requeuer_run_interval_ms -- 10 seconds + pub fn default_prover_job_requeuer_run_interval_ms() -> u64 { + 10_000 + } + + /// The interval between runs for Witness Generator Job Requeuer. + pub fn witness_generator_job_requeuer_run_interval(&self) -> Duration { + Duration::from_millis(self.witness_generator_job_requeuer_run_interval_ms) + } + + /// Default witness_generator_job_requeuer_run_interval_ms -- 10 seconds + pub fn default_witness_generator_job_requeuer_run_interval_ms() -> u64 { + 10_000 + } + + /// The interval between runs for Proof Compressor Queue Reporter. 
+ pub fn proof_compressor_queue_reporter_run_interval(&self) -> Duration { + Duration::from_millis(self.proof_compressor_queue_reporter_run_interval_ms) + } + + /// Default proof_compressor_queue_reporter_run_interval_ms -- 10 seconds + pub fn default_proof_compressor_queue_reporter_run_interval_ms() -> u64 { + 10_000 + } + + /// The interval between runs for Prover Queue Reporter. + pub fn prover_queue_reporter_run_interval(&self) -> Duration { + Duration::from_millis(self.prover_queue_reporter_run_interval_ms) + } + + /// Default prover_queue_reporter_run_interval_ms -- 10 seconds + pub fn default_prover_queue_reporter_run_interval_ms() -> u64 { + 10_000 + } + + /// The interval between runs for Witness Generator Queue Reporter. + pub fn witness_generator_queue_reporter_run_interval(&self) -> Duration { + Duration::from_millis(self.witness_generator_queue_reporter_run_interval_ms) + } + + /// Default witness_generator_queue_reporter_run_interval_ms -- 10 seconds + pub fn default_witness_generator_queue_reporter_run_interval_ms() -> u64 { + 10_000 + } + + /// The interval between runs for Witness Job Queuer. 
+ pub fn witness_job_queuer_run_interval(&self) -> Duration { + Duration::from_millis(self.witness_job_queuer_run_interval_ms) + } + + /// Default witness_job_queuer_run_interval_ms -- 10 seconds + pub fn default_witness_job_queuer_run_interval_ms() -> u64 { + 10_000 + } +} diff --git a/core/lib/config/src/testonly.rs b/core/lib/config/src/testonly.rs index 162f1d1617d8..632030e8f1da 100644 --- a/core/lib/config/src/testonly.rs +++ b/core/lib/config/src/testonly.rs @@ -1057,6 +1057,30 @@ impl Distribution for EncodeDist { + fn sample( + &self, + rng: &mut R, + ) -> configs::prover_job_monitor::ProverJobMonitorConfig { + configs::prover_job_monitor::ProverJobMonitorConfig { + prometheus_port: self.sample(rng), + max_db_connections: self.sample(rng), + graceful_shutdown_timeout_ms: self.sample(rng), + gpu_prover_archiver_run_interval_ms: self.sample(rng), + gpu_prover_archiver_archive_prover_after_ms: self.sample(rng), + prover_jobs_archiver_run_interval_ms: self.sample(rng), + prover_jobs_archiver_archive_jobs_after_ms: self.sample(rng), + proof_compressor_job_requeuer_run_interval_ms: self.sample(rng), + prover_job_requeuer_run_interval_ms: self.sample(rng), + witness_generator_job_requeuer_run_interval_ms: self.sample(rng), + proof_compressor_queue_reporter_run_interval_ms: self.sample(rng), + prover_queue_reporter_run_interval_ms: self.sample(rng), + witness_generator_queue_reporter_run_interval_ms: self.sample(rng), + witness_job_queuer_run_interval_ms: self.sample(rng), + } + } +} + impl Distribution for EncodeDist { fn sample(&self, rng: &mut R) -> configs::GeneralConfig { configs::GeneralConfig { @@ -1073,7 +1097,7 @@ impl Distribution for EncodeDist { prover_gateway: self.sample(rng), witness_vector_generator: self.sample(rng), prover_group_config: self.sample(rng), - witness_generator: self.sample(rng), + witness_generator_config: self.sample(rng), prometheus_config: self.sample(rng), proof_data_handler_config: self.sample(rng), db_config: self.sample(rng), 
@@ -1092,6 +1116,7 @@ impl Distribution for EncodeDist { consensus_config: self.sample(rng), external_proof_integration_api_config: self.sample(rng), experimental_vm_config: self.sample(rng), + prover_job_monitor_config: self.sample(rng), } } } diff --git a/core/lib/env_config/src/lib.rs b/core/lib/env_config/src/lib.rs index fcb0f3625ea1..8cfa7b58a31c 100644 --- a/core/lib/env_config/src/lib.rs +++ b/core/lib/env_config/src/lib.rs @@ -26,6 +26,7 @@ mod da_dispatcher; mod external_price_api_client; mod external_proof_integration_api; mod genesis; +mod prover_job_monitor; #[cfg(test)] mod test_utils; mod vm_runner; diff --git a/core/lib/env_config/src/prover_job_monitor.rs b/core/lib/env_config/src/prover_job_monitor.rs new file mode 100644 index 000000000000..3a8f80473eb1 --- /dev/null +++ b/core/lib/env_config/src/prover_job_monitor.rs @@ -0,0 +1,89 @@ +use zksync_config::configs::ProverJobMonitorConfig; + +use crate::{envy_load, FromEnv}; + +impl FromEnv for ProverJobMonitorConfig { + fn from_env() -> anyhow::Result { + envy_load("prover_job_monitor", "PROVER_JOB_MONITOR_") + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_utils::EnvMutex; + + static MUTEX: EnvMutex = EnvMutex::new(); + + fn expected_config() -> ProverJobMonitorConfig { + ProverJobMonitorConfig { + prometheus_port: 3317, + max_db_connections: 9, + graceful_shutdown_timeout_ms: 5000, + gpu_prover_archiver_run_interval_ms: 86400000, + gpu_prover_archiver_archive_prover_after_ms: 172800000, + prover_jobs_archiver_run_interval_ms: 1800000, + prover_jobs_archiver_archive_jobs_after_ms: 172800000, + proof_compressor_job_requeuer_run_interval_ms: 10000, + prover_job_requeuer_run_interval_ms: 10000, + witness_generator_job_requeuer_run_interval_ms: 10000, + proof_compressor_queue_reporter_run_interval_ms: 10000, + prover_queue_reporter_run_interval_ms: 10000, + witness_generator_queue_reporter_run_interval_ms: 10000, + witness_job_queuer_run_interval_ms: 10000, + } + } + + fn 
expected_changed_config() -> ProverJobMonitorConfig { + let mut config = expected_config(); + config.graceful_shutdown_timeout_ms += 1; + config.gpu_prover_archiver_run_interval_ms += 1; + config.gpu_prover_archiver_archive_prover_after_ms += 1; + config.prover_jobs_archiver_run_interval_ms += 1; + config.prover_jobs_archiver_archive_jobs_after_ms += 1; + config.proof_compressor_job_requeuer_run_interval_ms += 1; + config.prover_job_requeuer_run_interval_ms += 1; + config.witness_generator_job_requeuer_run_interval_ms += 1; + config.proof_compressor_queue_reporter_run_interval_ms += 1; + config.prover_queue_reporter_run_interval_ms += 1; + config.witness_generator_queue_reporter_run_interval_ms += 1; + config.witness_job_queuer_run_interval_ms += 1; + config + } + + #[test] + fn from_env_with_default() { + let config = r#" + PROVER_JOB_MONITOR_PROMETHEUS_PORT=3317 + PROVER_JOB_MONITOR_MAX_DB_CONNECTIONS=9 + "#; + let mut lock = MUTEX.lock(); + lock.set_env(config); + let actual = ProverJobMonitorConfig::from_env().unwrap(); + assert_eq!(actual, expected_config()); + } + + #[test] + fn from_env() { + let config = r#" + PROVER_JOB_MONITOR_PROMETHEUS_PORT=3317 + PROVER_JOB_MONITOR_MAX_DB_CONNECTIONS=9 + PROVER_JOB_MONITOR_GRACEFUL_SHUTDOWN_TIMEOUT_MS=5001 + PROVER_JOB_MONITOR_GPU_PROVER_ARCHIVER_RUN_INTERVAL_MS=86400001 + PROVER_JOB_MONITOR_GPU_PROVER_ARCHIVER_ARCHIVE_PROVER_AFTER_MS=172800001 + PROVER_JOB_MONITOR_PROVER_JOBS_ARCHIVER_RUN_INTERVAL_MS=1800001 + PROVER_JOB_MONITOR_PROVER_JOBS_ARCHIVER_ARCHIVE_JOBS_AFTER_MS=172800001 + PROVER_JOB_MONITOR_PROOF_COMPRESSOR_JOB_REQUEUER_RUN_INTERVAL_MS=10001 + PROVER_JOB_MONITOR_PROVER_JOB_REQUEUER_RUN_INTERVAL_MS=10001 + PROVER_JOB_MONITOR_WITNESS_GENERATOR_JOB_REQUEUER_RUN_INTERVAL_MS=10001 + PROVER_JOB_MONITOR_PROOF_COMPRESSOR_QUEUE_REPORTER_RUN_INTERVAL_MS=10001 + PROVER_JOB_MONITOR_PROVER_QUEUE_REPORTER_RUN_INTERVAL_MS=10001 + PROVER_JOB_MONITOR_WITNESS_GENERATOR_QUEUE_REPORTER_RUN_INTERVAL_MS=10001 + 
PROVER_JOB_MONITOR_WITNESS_JOB_QUEUER_RUN_INTERVAL_MS=10001 + "#; + let mut lock = MUTEX.lock(); + lock.set_env(config); + let actual = ProverJobMonitorConfig::from_env().unwrap(); + assert_eq!(actual, expected_changed_config()); + } +} diff --git a/core/lib/protobuf_config/src/general.rs b/core/lib/protobuf_config/src/general.rs index af6f690dfc8f..87bca88db387 100644 --- a/core/lib/protobuf_config/src/general.rs +++ b/core/lib/protobuf_config/src/general.rs @@ -22,7 +22,7 @@ impl ProtoRepr for proto::GeneralConfig { prover_group_config: read_optional_repr(&self.prover_group), prometheus_config: read_optional_repr(&self.prometheus), proof_data_handler_config: read_optional_repr(&self.data_handler), - witness_generator: read_optional_repr(&self.witness_generator), + witness_generator_config: read_optional_repr(&self.witness_generator), api_config: read_optional_repr(&self.api), db_config: read_optional_repr(&self.db), eth: read_optional_repr(&self.eth), @@ -44,6 +44,7 @@ impl ProtoRepr for proto::GeneralConfig { &self.external_proof_integration_api, ), experimental_vm_config: read_optional_repr(&self.experimental_vm), + prover_job_monitor_config: read_optional_repr(&self.prover_job_monitor), }) } @@ -62,7 +63,7 @@ impl ProtoRepr for proto::GeneralConfig { proof_compressor: this.proof_compressor_config.as_ref().map(ProtoRepr::build), prover: this.prover_config.as_ref().map(ProtoRepr::build), prover_group: this.prover_group_config.as_ref().map(ProtoRepr::build), - witness_generator: this.witness_generator.as_ref().map(ProtoRepr::build), + witness_generator: this.witness_generator_config.as_ref().map(ProtoRepr::build), prover_gateway: this.prover_gateway.as_ref().map(ProtoRepr::build), witness_vector_generator: this.witness_vector_generator.as_ref().map(ProtoRepr::build), prometheus: this.prometheus_config.as_ref().map(ProtoRepr::build), @@ -99,6 +100,10 @@ impl ProtoRepr for proto::GeneralConfig { .as_ref() .map(ProtoRepr::build), experimental_vm: 
this.experimental_vm_config.as_ref().map(ProtoRepr::build), + prover_job_monitor: this + .prover_job_monitor_config + .as_ref() + .map(ProtoRepr::build), } } } diff --git a/core/lib/protobuf_config/src/lib.rs b/core/lib/protobuf_config/src/lib.rs index ee526b2bb67f..f4d0188ea20f 100644 --- a/core/lib/protobuf_config/src/lib.rs +++ b/core/lib/protobuf_config/src/lib.rs @@ -31,6 +31,7 @@ mod snapshots_creator; mod external_price_api_client; mod external_proof_integration_api; +mod prover_job_monitor; mod snapshot_recovery; #[cfg(test)] mod tests; diff --git a/core/lib/protobuf_config/src/proto/config/general.proto b/core/lib/protobuf_config/src/proto/config/general.proto index 373559e73516..3595468949b1 100644 --- a/core/lib/protobuf_config/src/proto/config/general.proto +++ b/core/lib/protobuf_config/src/proto/config/general.proto @@ -24,6 +24,7 @@ import "zksync/config/base_token_adjuster.proto"; import "zksync/config/external_price_api_client.proto"; import "zksync/config/external_proof_integration_api.proto"; import "zksync/core/consensus.proto"; +import "zksync/config/prover_job_monitor.proto"; message GeneralConfig { optional database.Postgres postgres = 1; @@ -58,4 +59,5 @@ message GeneralConfig { optional core.consensus.Config consensus = 42; optional external_proof_integration_api.ExternalProofIntegrationApi external_proof_integration_api = 43; optional experimental.Vm experimental_vm = 44; + optional prover_job_monitor.ProverJobMonitor prover_job_monitor = 45; } diff --git a/core/lib/protobuf_config/src/proto/config/prover_job_monitor.proto b/core/lib/protobuf_config/src/proto/config/prover_job_monitor.proto new file mode 100644 index 000000000000..7b505aa3bcfb --- /dev/null +++ b/core/lib/protobuf_config/src/proto/config/prover_job_monitor.proto @@ -0,0 +1,20 @@ +syntax = "proto3"; + +package zksync.config.prover_job_monitor; + +message ProverJobMonitor { + optional uint32 prometheus_port = 1; // required; u32 + optional uint32 max_db_connections = 2; // 
required; u32 + optional uint64 graceful_shutdown_timeout_ms = 3; // optional; ms + optional uint64 gpu_prover_archiver_run_interval_ms = 4; // optional; ms + optional uint64 gpu_prover_archiver_archive_prover_after_ms = 5; // optional; ms + optional uint64 prover_jobs_archiver_run_interval_ms = 6; // optional; ms + optional uint64 prover_jobs_archiver_archive_jobs_after_ms = 7; // optional; ms + optional uint64 proof_compressor_job_requeuer_run_interval_ms = 8; // optional; ms + optional uint64 prover_job_requeuer_run_interval_ms = 9; // optional; ms + optional uint64 witness_generator_job_requeuer_run_interval_ms = 10; // optional; ms + optional uint64 proof_compressor_queue_reporter_run_interval_ms = 11; // optional; ms + optional uint64 prover_queue_reporter_run_interval_ms = 12; // optional; ms + optional uint64 witness_generator_queue_reporter_run_interval_ms = 13; // optional; ms + optional uint64 witness_job_queuer_run_interval_ms = 14; // optional; ms +} diff --git a/core/lib/protobuf_config/src/prover_job_monitor.rs b/core/lib/protobuf_config/src/prover_job_monitor.rs new file mode 100644 index 000000000000..a1c5a7c05995 --- /dev/null +++ b/core/lib/protobuf_config/src/prover_job_monitor.rs @@ -0,0 +1,131 @@ +use anyhow::Context as _; +use zksync_config::configs; +use zksync_protobuf::{repr::ProtoRepr, required}; + +use crate::proto::prover_job_monitor as proto; + +impl ProtoRepr for proto::ProverJobMonitor { + type Type = configs::prover_job_monitor::ProverJobMonitorConfig; + fn read(&self) -> anyhow::Result { + Ok(Self::Type { + prometheus_port: required(&self.prometheus_port) + .and_then(|x| Ok((*x).try_into()?)) + .context("prometheus_port")?, + max_db_connections: *required(&self.max_db_connections) + .context("max_db_connections")?, + graceful_shutdown_timeout_ms: *required( + &self + .graceful_shutdown_timeout_ms + .or_else(|| Some(Self::Type::default_graceful_shutdown_timeout_ms())), + ) + .context("graceful_shutdown_timeout_ms")?, + 
gpu_prover_archiver_run_interval_ms: *required( + &self + .gpu_prover_archiver_run_interval_ms + .or_else(|| Some(Self::Type::default_gpu_prover_archiver_run_interval_ms())), + ) + .context("gpu_prover_archiver_run_interval_ms")?, + gpu_prover_archiver_archive_prover_after_ms: *required( + &self + .gpu_prover_archiver_archive_prover_after_ms + .or_else(|| { + Some(Self::Type::default_gpu_prover_archiver_archive_prover_after_ms()) + }), + ) + .context("gpu_prover_archiver_archive_prover_after_ms")?, + prover_jobs_archiver_run_interval_ms: *required( + &self + .prover_jobs_archiver_run_interval_ms + .or_else(|| Some(Self::Type::default_prover_jobs_archiver_run_interval_ms())), + ) + .context("prover_jobs_archiver_run_interval_ms")?, + prover_jobs_archiver_archive_jobs_after_ms: *required( + &self.prover_jobs_archiver_archive_jobs_after_ms.or_else(|| { + Some(Self::Type::default_prover_jobs_archiver_archive_jobs_after_ms()) + }), + ) + .context("prover_jobs_archiver_archive_jobs_after_ms")?, + proof_compressor_job_requeuer_run_interval_ms: *required( + &self + .proof_compressor_job_requeuer_run_interval_ms + .or_else(|| { + Some(Self::Type::default_proof_compressor_job_requeuer_run_interval_ms()) + }), + ) + .context("proof_compressor_job_requeuer_run_interval_ms")?, + prover_job_requeuer_run_interval_ms: *required( + &self + .prover_job_requeuer_run_interval_ms + .or_else(|| Some(Self::Type::default_prover_job_requeuer_run_interval_ms())), + ) + .context("prover_job_requeuer_run_interval_ms")?, + witness_generator_job_requeuer_run_interval_ms: *required( + &self + .witness_generator_job_requeuer_run_interval_ms + .or_else(|| { + Some(Self::Type::default_witness_generator_job_requeuer_run_interval_ms()) + }), + ) + .context("witness_generator_job_requeuer_run_interval_ms")?, + proof_compressor_queue_reporter_run_interval_ms: *required( + &self + .proof_compressor_queue_reporter_run_interval_ms + .or_else(|| { + 
Some(Self::Type::default_proof_compressor_queue_reporter_run_interval_ms()) + }), + ) + .context("proof_compressor_queue_reporter_run_interval_ms")?, + prover_queue_reporter_run_interval_ms: *required( + &self + .prover_queue_reporter_run_interval_ms + .or_else(|| Some(Self::Type::default_prover_queue_reporter_run_interval_ms())), + ) + .context("prover_queue_reporter_run_interval_ms")?, + witness_generator_queue_reporter_run_interval_ms: *required( + &self + .witness_generator_queue_reporter_run_interval_ms + .or_else(|| { + Some(Self::Type::default_witness_generator_queue_reporter_run_interval_ms()) + }), + ) + .context("witness_generator_queue_reporter_run_interval_ms")?, + witness_job_queuer_run_interval_ms: *required( + &self + .witness_job_queuer_run_interval_ms + .or_else(|| Some(Self::Type::default_witness_job_queuer_run_interval_ms())), + ) + .context("witness_job_queuer_run_interval_ms")?, + }) + } + + fn build(this: &Self::Type) -> Self { + Self { + prometheus_port: Some(this.prometheus_port.into()), + max_db_connections: Some(this.max_db_connections), + graceful_shutdown_timeout_ms: Some(this.graceful_shutdown_timeout_ms), + gpu_prover_archiver_run_interval_ms: Some(this.gpu_prover_archiver_run_interval_ms), + gpu_prover_archiver_archive_prover_after_ms: Some( + this.gpu_prover_archiver_archive_prover_after_ms, + ), + prover_jobs_archiver_run_interval_ms: Some(this.prover_jobs_archiver_run_interval_ms), + prover_jobs_archiver_archive_jobs_after_ms: Some( + this.prover_jobs_archiver_archive_jobs_after_ms, + ), + proof_compressor_job_requeuer_run_interval_ms: Some( + this.proof_compressor_job_requeuer_run_interval_ms, + ), + prover_job_requeuer_run_interval_ms: Some(this.prover_job_requeuer_run_interval_ms), + witness_generator_job_requeuer_run_interval_ms: Some( + this.witness_generator_job_requeuer_run_interval_ms, + ), + proof_compressor_queue_reporter_run_interval_ms: Some( + this.proof_compressor_queue_reporter_run_interval_ms, + ), + 
prover_queue_reporter_run_interval_ms: Some(this.prover_queue_reporter_run_interval_ms), + witness_generator_queue_reporter_run_interval_ms: Some( + this.witness_generator_queue_reporter_run_interval_ms, + ), + witness_job_queuer_run_interval_ms: Some(this.witness_job_queuer_run_interval_ms), + } + } +} diff --git a/core/lib/zksync_core_leftovers/src/temp_config_store/mod.rs b/core/lib/zksync_core_leftovers/src/temp_config_store/mod.rs index d25c46bda083..4d2606dcf12d 100644 --- a/core/lib/zksync_core_leftovers/src/temp_config_store/mod.rs +++ b/core/lib/zksync_core_leftovers/src/temp_config_store/mod.rs @@ -16,7 +16,7 @@ use zksync_config::{ ExternalPriceApiClientConfig, FriProofCompressorConfig, FriProverConfig, FriProverGatewayConfig, FriWitnessGeneratorConfig, FriWitnessVectorGeneratorConfig, GeneralConfig, ObservabilityConfig, PrometheusConfig, ProofDataHandlerConfig, - ProtectiveReadsWriterConfig, PruningConfig, SnapshotRecoveryConfig, + ProtectiveReadsWriterConfig, ProverJobMonitorConfig, PruningConfig, SnapshotRecoveryConfig, }, ApiConfig, BaseTokenAdjusterConfig, ContractVerifierConfig, DADispatcherConfig, DBConfig, EthConfig, EthWatchConfig, ExternalProofIntegrationApiConfig, GasAdjusterConfig, @@ -79,6 +79,7 @@ pub struct TempConfigStore { pub external_price_api_client_config: Option, pub external_proof_integration_api_config: Option, pub experimental_vm_config: Option, + pub prover_job_monitor_config: Option, } impl TempConfigStore { @@ -97,7 +98,7 @@ impl TempConfigStore { prover_gateway: self.fri_prover_gateway_config.clone(), witness_vector_generator: self.fri_witness_vector_generator.clone(), prover_group_config: self.fri_prover_group_config.clone(), - witness_generator: self.fri_witness_generator_config.clone(), + witness_generator_config: self.fri_witness_generator_config.clone(), prometheus_config: self.prometheus_config.clone(), proof_data_handler_config: self.proof_data_handler_config.clone(), db_config: self.db_config.clone(), @@ -118,6 +119,7 
@@ impl TempConfigStore { .external_proof_integration_api_config .clone(), experimental_vm_config: self.experimental_vm_config.clone(), + prover_job_monitor_config: self.prover_job_monitor_config.clone(), } } @@ -191,6 +193,7 @@ fn load_env_config() -> anyhow::Result { external_price_api_client_config: ExternalPriceApiClientConfig::from_env().ok(), external_proof_integration_api_config: ExternalProofIntegrationApiConfig::from_env().ok(), experimental_vm_config: ExperimentalVmConfig::from_env().ok(), + prover_job_monitor_config: ProverJobMonitorConfig::from_env().ok(), }) } diff --git a/core/node/house_keeper/src/prover/archiver/fri_gpu_prover_archiver.rs b/core/node/house_keeper/src/prover/archiver/fri_gpu_prover_archiver.rs index 5db53710733c..b0f5ff23fe3f 100644 --- a/core/node/house_keeper/src/prover/archiver/fri_gpu_prover_archiver.rs +++ b/core/node/house_keeper/src/prover/archiver/fri_gpu_prover_archiver.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use zksync_dal::ConnectionPool; use zksync_prover_dal::{Prover, ProverDal}; @@ -38,7 +40,7 @@ impl PeriodicJob for FriGpuProverArchiver { .await .unwrap() .fri_gpu_prover_queue_dal() - .archive_old_provers(self.archive_prover_after_secs) + .archive_old_provers(Duration::from_secs(self.archive_prover_after_secs)) .await; tracing::info!("Archived {:?} fri gpu prover records", archived_provers); HOUSE_KEEPER_METRICS diff --git a/core/node/house_keeper/src/prover/archiver/fri_prover_jobs_archiver.rs b/core/node/house_keeper/src/prover/archiver/fri_prover_jobs_archiver.rs index 02268c60e5f5..684c955231cf 100644 --- a/core/node/house_keeper/src/prover/archiver/fri_prover_jobs_archiver.rs +++ b/core/node/house_keeper/src/prover/archiver/fri_prover_jobs_archiver.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use zksync_dal::ConnectionPool; use zksync_prover_dal::{Prover, ProverDal}; @@ -38,7 +40,7 @@ impl PeriodicJob for FriProverJobsArchiver { .await .unwrap() .fri_prover_jobs_dal() - 
.archive_old_jobs(self.archiving_interval_secs) + .archive_old_jobs(Duration::from_secs(self.archiving_interval_secs)) .await; tracing::info!("Archived {:?} fri prover jobs", archived_jobs); HOUSE_KEEPER_METRICS diff --git a/core/node/house_keeper/src/prover/queue_reporter/fri_prover_queue_reporter.rs b/core/node/house_keeper/src/prover/queue_reporter/fri_prover_queue_reporter.rs index f429367c44a1..12dfae86ab46 100644 --- a/core/node/house_keeper/src/prover/queue_reporter/fri_prover_queue_reporter.rs +++ b/core/node/house_keeper/src/prover/queue_reporter/fri_prover_queue_reporter.rs @@ -2,9 +2,9 @@ use async_trait::async_trait; use zksync_config::configs::fri_prover_group::FriProverGroupConfig; use zksync_dal::{ConnectionPool, Core, CoreDal}; use zksync_prover_dal::{Prover, ProverDal}; +use zksync_types::{basic_fri_types::CircuitIdRoundTuple, prover_dal::JobCountStatistics}; use crate::{periodic_job::PeriodicJob, prover::metrics::FRI_PROVER_METRICS}; - /// `FriProverQueueReporter` is a task that periodically reports prover jobs status. /// Note: these values will be used for auto-scaling provers and Witness Vector Generators. #[derive(Debug)] @@ -39,45 +39,42 @@ impl PeriodicJob for FriProverQueueReporter { let mut conn = self.prover_connection_pool.connection().await.unwrap(); let stats = conn.fri_prover_jobs_dal().get_prover_jobs_stats().await; - for (job_identifiers, stats) in &stats { - // BEWARE, HERE BE DRAGONS. - // In database, the `circuit_id` stored is the circuit for which the aggregation is done, - // not the circuit which is running. - // There is a single node level aggregation circuit, which is circuit 2. - // This can aggregate multiple leaf nodes (which may belong to different circuits). - // This reporting is a hacky forced way to use `circuit_id` 2 which will solve auto scalers. - // A proper fix will be later provided to solve this at database level. 
- let circuit_id = if job_identifiers.aggregation_round == 2 { - 2 - } else { - job_identifiers.circuit_id - }; - - let group_id = self - .config - .get_group_id_for_circuit_id_and_aggregation_round( + for (protocol_semantic_version, circuit_prover_stats) in stats { + for (tuple, stat) in circuit_prover_stats { + let CircuitIdRoundTuple { + circuit_id, + aggregation_round, + } = tuple; + let JobCountStatistics { + queued, + in_progress, + } = stat; + let group_id = self + .config + .get_group_id_for_circuit_id_and_aggregation_round( + circuit_id, + aggregation_round, + ) + .unwrap_or(u8::MAX); + + FRI_PROVER_METRICS.report_prover_jobs( + "queued", circuit_id, - job_identifiers.aggregation_round, - ) - .unwrap_or(u8::MAX); - - FRI_PROVER_METRICS.report_prover_jobs( - "queued", - circuit_id, - job_identifiers.aggregation_round, - group_id, - job_identifiers.get_semantic_protocol_version(), - stats.queued as u64, - ); - - FRI_PROVER_METRICS.report_prover_jobs( - "in_progress", - circuit_id, - job_identifiers.aggregation_round, - group_id, - job_identifiers.get_semantic_protocol_version(), - stats.in_progress as u64, - ); + aggregation_round, + group_id, + protocol_semantic_version, + queued as u64, + ); + + FRI_PROVER_METRICS.report_prover_jobs( + "in_progress", + circuit_id, + aggregation_round, + group_id, + protocol_semantic_version, + in_progress as u64, + ); + } } let lag_by_circuit_type = conn diff --git a/core/node/house_keeper/src/prover/retry_manager/fri_witness_generator_jobs_retry_manager.rs b/core/node/house_keeper/src/prover/retry_manager/fri_witness_generator_jobs_retry_manager.rs index 817d1e290252..b3d990e2754f 100644 --- a/core/node/house_keeper/src/prover/retry_manager/fri_witness_generator_jobs_retry_manager.rs +++ b/core/node/house_keeper/src/prover/retry_manager/fri_witness_generator_jobs_retry_manager.rs @@ -48,7 +48,7 @@ impl FriWitnessGeneratorJobRetryManager { .await .unwrap() .fri_witness_generator_dal() - 
.requeue_stuck_jobs(self.processing_timeouts.basic(), self.max_attempts) + .requeue_stuck_basic_jobs(self.processing_timeouts.basic(), self.max_attempts) .await; self.emit_telemetry("witness_inputs_fri", &stuck_jobs); } @@ -60,10 +60,7 @@ impl FriWitnessGeneratorJobRetryManager { .await .unwrap() .fri_witness_generator_dal() - .requeue_stuck_leaf_aggregations_jobs( - self.processing_timeouts.leaf(), - self.max_attempts, - ) + .requeue_stuck_leaf_jobs(self.processing_timeouts.leaf(), self.max_attempts) .await; self.emit_telemetry("leaf_aggregations_jobs_fri", &stuck_jobs); } @@ -75,10 +72,7 @@ impl FriWitnessGeneratorJobRetryManager { .await .unwrap() .fri_witness_generator_dal() - .requeue_stuck_node_aggregations_jobs( - self.processing_timeouts.node(), - self.max_attempts, - ) + .requeue_stuck_node_jobs(self.processing_timeouts.node(), self.max_attempts) .await; self.emit_telemetry("node_aggregations_jobs_fri", &stuck_jobs); } diff --git a/etc/env/base/prover_job_monitor.toml b/etc/env/base/prover_job_monitor.toml new file mode 100644 index 000000000000..40cdf76b8b10 --- /dev/null +++ b/etc/env/base/prover_job_monitor.toml @@ -0,0 +1,15 @@ +[prover_job_monitor] +prometheus_port = 3317 +max_db_connections = 9 +graceful_shutdown_timeout_ms = 5000 +gpu_prover_archiver_run_interval_ms = 86400000 +gpu_prover_archiver_archive_prover_after_ms = 172800000 +prover_jobs_archiver_run_interval_ms = 1800000 +prover_jobs_archiver_archive_jobs_after_ms = 172800000 +proof_compressor_job_requeuer_run_interval_ms = 10000 +prover_job_requeuer_run_interval_ms = 10000 +witness_generator_job_requeuer_run_interval_ms = 10000 +proof_compressor_queue_reporter_run_interval_ms = 10000 +prover_queue_reporter_run_interval_ms = 10000 +witness_generator_queue_reporter_run_interval_ms = 10000 +witness_job_queuer_run_interval_ms = 10000 diff --git a/etc/env/file_based/general.yaml b/etc/env/file_based/general.yaml index 670bfc1cc776..90a509638c61 100644 --- a/etc/env/file_based/general.yaml +++ 
b/etc/env/file_based/general.yaml @@ -272,6 +272,23 @@ prover_group: aggregation_round: 1 - circuit_id: 18 aggregation_round: 1 +prover_job_monitor: + prometheus_port: 3317 + max_db_connections: 9 + graceful_shutdown_timeout_ms: 5000 + gpu_prover_archiver_run_interval_ms: 86400000 + gpu_prover_archiver_archive_prover_after_ms: 172800000 + prover_jobs_archiver_run_interval_ms: 1800000 + prover_jobs_archiver_archive_jobs_after_ms: 172800000 + proof_compressor_job_requeuer_run_interval_ms: 10000 + prover_job_requeuer_run_interval_ms: 10000 + witness_generator_job_requeuer_run_interval_ms: 10000 + proof_compressor_queue_reporter_run_interval_ms: 10000 + prover_queue_reporter_run_interval_ms: 10000 + witness_generator_queue_reporter_run_interval_ms: 10000 + witness_job_queuer_run_interval_ms: 10000 + + base_token_adjuster: price_polling_interval_ms: 30000 price_cache_update_interval_ms: 2000 diff --git a/prover/Cargo.lock b/prover/Cargo.lock index e48dc075b2f5..8268b121847c 100644 --- a/prover/Cargo.lock +++ b/prover/Cargo.lock @@ -8221,6 +8221,25 @@ dependencies = [ "zksync_types", ] +[[package]] +name = "zksync_prover_job_monitor" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "clap 4.5.4", + "ctrlc", + "tokio", + "tracing", + "vise", + "zksync_config", + "zksync_core_leftovers", + "zksync_prover_dal", + "zksync_types", + "zksync_utils", + "zksync_vlog", +] + [[package]] name = "zksync_queued_job_processor" version = "0.1.0" diff --git a/prover/Cargo.toml b/prover/Cargo.toml index 4ce858332502..9a1a50a2ddb5 100644 --- a/prover/Cargo.toml +++ b/prover/Cargo.toml @@ -81,6 +81,7 @@ zksync_utils = { path = "../core/lib/utils" } zksync_eth_client = { path = "../core/lib/eth_client" } zksync_contracts = { path = "../core/lib/contracts" } zksync_core_leftovers = { path = "../core/lib/zksync_core_leftovers" } +zksync_periodic_job = { path = "../core/lib/periodic_job" } # Prover workspace dependencies zksync_prover_dal = { path = "crates/lib/prover_dal" } 
diff --git a/prover/crates/bin/prover_job_monitor/Cargo.toml b/prover/crates/bin/prover_job_monitor/Cargo.toml new file mode 100644 index 000000000000..160d3a603e36 --- /dev/null +++ b/prover/crates/bin/prover_job_monitor/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "zksync_prover_job_monitor" +version.workspace = true +edition.workspace = true +authors.workspace = true +homepage.workspace = true +repository.workspace = true +license.workspace = true +keywords.workspace = true +categories.workspace = true + +[dependencies] +zksync_core_leftovers.workspace = true +zksync_vlog.workspace = true +zksync_prover_dal.workspace = true +zksync_utils.workspace = true +zksync_types.workspace = true +zksync_config = { workspace = true, features = ["observability_ext"] } + +vise.workspace = true + +tokio = { workspace = true, features = ["time", "macros"] } +anyhow.workspace = true +clap = { workspace = true, features = ["derive"] } +ctrlc = { workspace = true, features = ["termination"] } +tracing.workspace = true +async-trait.workspace = true diff --git a/prover/crates/bin/prover_job_monitor/src/archiver/gpu_prover_archiver.rs b/prover/crates/bin/prover_job_monitor/src/archiver/gpu_prover_archiver.rs new file mode 100644 index 000000000000..cebec06218df --- /dev/null +++ b/prover/crates/bin/prover_job_monitor/src/archiver/gpu_prover_archiver.rs @@ -0,0 +1,39 @@ +use std::time::Duration; + +use zksync_prover_dal::{Connection, Prover, ProverDal}; + +use crate::{metrics::PROVER_JOB_MONITOR_METRICS, task_wiring::Task}; + +/// `GpuProverArchiver` is a task that archives old fri GPU provers. +/// The task will archive the `dead` prover records that have not been updated for a certain amount of time. +/// Note: This component speeds up provers, in their absence, queries would slow down due to state growth. 
+#[derive(Debug)]
+pub struct GpuProverArchiver {
+    /// duration after which a prover can be archived
+    archive_prover_after: Duration,
+}
+
+impl GpuProverArchiver {
+    pub fn new(archive_prover_after: Duration) -> Self {
+        Self {
+            archive_prover_after,
+        }
+    }
+}
+
+#[async_trait::async_trait]
+impl Task for GpuProverArchiver {
+    async fn invoke(&self, connection: &mut Connection<'_, Prover>) -> anyhow::Result<()> {
+        let archived_provers = connection
+            .fri_gpu_prover_queue_dal()
+            .archive_old_provers(self.archive_prover_after)
+            .await;
+        if archived_provers > 0 {
+            tracing::info!("Archived {:?} gpu provers", archived_provers);
+        }
+        PROVER_JOB_MONITOR_METRICS
+            .archived_gpu_provers
+            .inc_by(archived_provers as u64);
+        Ok(())
+    }
+}
diff --git a/prover/crates/bin/prover_job_monitor/src/archiver/mod.rs b/prover/crates/bin/prover_job_monitor/src/archiver/mod.rs
new file mode 100644
index 000000000000..7e33e2165969
--- /dev/null
+++ b/prover/crates/bin/prover_job_monitor/src/archiver/mod.rs
@@ -0,0 +1,5 @@
+pub use gpu_prover_archiver::GpuProverArchiver;
+pub use prover_jobs_archiver::ProverJobsArchiver;
+
+mod gpu_prover_archiver;
+mod prover_jobs_archiver;
diff --git a/prover/crates/bin/prover_job_monitor/src/archiver/prover_jobs_archiver.rs b/prover/crates/bin/prover_job_monitor/src/archiver/prover_jobs_archiver.rs
new file mode 100644
index 000000000000..41e6d6cf4e44
--- /dev/null
+++ b/prover/crates/bin/prover_job_monitor/src/archiver/prover_jobs_archiver.rs
@@ -0,0 +1,37 @@
+use std::time::Duration;
+
+use zksync_prover_dal::{Connection, Prover, ProverDal};
+
+use crate::{metrics::PROVER_JOB_MONITOR_METRICS, task_wiring::Task};
+
+/// `ProverJobsArchiver` is a task that archives old finalized prover jobs.
+/// The task will archive the `successful` prover jobs that have been done for a certain amount of time.
+/// Note: This component speeds up provers; in its absence, queries would slow down due to state growth.
+#[derive(Debug)] +pub struct ProverJobsArchiver { + /// duration after which a prover job can be archived + archive_jobs_after: Duration, +} + +impl ProverJobsArchiver { + pub fn new(archive_jobs_after: Duration) -> Self { + Self { archive_jobs_after } + } +} + +#[async_trait::async_trait] +impl Task for ProverJobsArchiver { + async fn invoke(&self, connection: &mut Connection) -> anyhow::Result<()> { + let archived_jobs = connection + .fri_prover_jobs_dal() + .archive_old_jobs(self.archive_jobs_after) + .await; + if archived_jobs > 0 { + tracing::info!("Archived {:?} prover jobs", archived_jobs); + } + PROVER_JOB_MONITOR_METRICS + .archived_prover_jobs + .inc_by(archived_jobs as u64); + Ok(()) + } +} diff --git a/prover/crates/bin/prover_job_monitor/src/job_requeuer/mod.rs b/prover/crates/bin/prover_job_monitor/src/job_requeuer/mod.rs new file mode 100644 index 000000000000..5130849b7fee --- /dev/null +++ b/prover/crates/bin/prover_job_monitor/src/job_requeuer/mod.rs @@ -0,0 +1,7 @@ +pub use proof_compressor_job_requeuer::ProofCompressorJobRequeuer; +pub use prover_job_requeuer::ProverJobRequeuer; +pub use witness_generator_job_requeuer::WitnessGeneratorJobRequeuer; + +mod proof_compressor_job_requeuer; +mod prover_job_requeuer; +mod witness_generator_job_requeuer; diff --git a/prover/crates/bin/prover_job_monitor/src/job_requeuer/proof_compressor_job_requeuer.rs b/prover/crates/bin/prover_job_monitor/src/job_requeuer/proof_compressor_job_requeuer.rs new file mode 100644 index 000000000000..baeba3ce369c --- /dev/null +++ b/prover/crates/bin/prover_job_monitor/src/job_requeuer/proof_compressor_job_requeuer.rs @@ -0,0 +1,42 @@ +use std::time::Duration; + +use async_trait::async_trait; +use zksync_prover_dal::{Connection, Prover, ProverDal}; + +use crate::{metrics::PROVER_JOB_MONITOR_METRICS, task_wiring::Task}; + +/// `ProofCompressorJobRequeuer` is a task that requeues compressor jobs that have not made progress in a given unit of time. 
+#[derive(Debug)] +pub struct ProofCompressorJobRequeuer { + /// max attempts before giving up on the job + max_attempts: u32, + /// the amount of time that must have passed before a job is considered to have not made progress + processing_timeout: Duration, +} + +impl ProofCompressorJobRequeuer { + pub fn new(max_attempts: u32, processing_timeout: Duration) -> Self { + Self { + max_attempts, + processing_timeout, + } + } +} + +#[async_trait] +impl Task for ProofCompressorJobRequeuer { + async fn invoke(&self, connection: &mut Connection) -> anyhow::Result<()> { + let stuck_jobs = connection + .fri_proof_compressor_dal() + .requeue_stuck_jobs(self.processing_timeout, self.max_attempts) + .await; + let job_len = stuck_jobs.len(); + for stuck_job in stuck_jobs { + tracing::info!("requeued proof compressor job {:?}", stuck_job); + } + PROVER_JOB_MONITOR_METRICS + .requeued_proof_compressor_jobs + .inc_by(job_len as u64); + Ok(()) + } +} diff --git a/prover/crates/bin/prover_job_monitor/src/job_requeuer/prover_job_requeuer.rs b/prover/crates/bin/prover_job_monitor/src/job_requeuer/prover_job_requeuer.rs new file mode 100644 index 000000000000..7f5e97203d69 --- /dev/null +++ b/prover/crates/bin/prover_job_monitor/src/job_requeuer/prover_job_requeuer.rs @@ -0,0 +1,42 @@ +use std::time::Duration; + +use async_trait::async_trait; +use zksync_prover_dal::{Connection, Prover, ProverDal}; + +use crate::{metrics::PROVER_JOB_MONITOR_METRICS, task_wiring::Task}; + +/// `ProverJobRequeuer` is a task that requeues prover jobs that have not made progress in a given unit of time. 
+#[derive(Debug)] +pub struct ProverJobRequeuer { + /// max attempts before giving up on the job + max_attempts: u32, + /// the amount of time that must have passed before a job is considered to have not made progress + processing_timeout: Duration, +} + +impl ProverJobRequeuer { + pub fn new(max_attempts: u32, processing_timeout: Duration) -> Self { + Self { + max_attempts, + processing_timeout, + } + } +} + +#[async_trait] +impl Task for ProverJobRequeuer { + async fn invoke(&self, connection: &mut Connection) -> anyhow::Result<()> { + let stuck_jobs = connection + .fri_prover_jobs_dal() + .requeue_stuck_jobs(self.processing_timeout, self.max_attempts) + .await; + let job_len = stuck_jobs.len(); + for stuck_job in stuck_jobs { + tracing::info!("requeued circuit prover job {:?}", stuck_job); + } + PROVER_JOB_MONITOR_METRICS + .requeued_circuit_prover_jobs + .inc_by(job_len as u64); + Ok(()) + } +} diff --git a/prover/crates/bin/prover_job_monitor/src/job_requeuer/witness_generator_job_requeuer.rs b/prover/crates/bin/prover_job_monitor/src/job_requeuer/witness_generator_job_requeuer.rs new file mode 100644 index 000000000000..e7d89f7d25d4 --- /dev/null +++ b/prover/crates/bin/prover_job_monitor/src/job_requeuer/witness_generator_job_requeuer.rs @@ -0,0 +1,90 @@ +use async_trait::async_trait; +use zksync_config::configs::fri_witness_generator::WitnessGenerationTimeouts; +use zksync_prover_dal::{Connection, Prover, ProverDal}; +use zksync_types::prover_dal::StuckJobs; + +use crate::{ + metrics::{WitnessType, PROVER_JOB_MONITOR_METRICS}, + task_wiring::Task, +}; + +/// `WitnessGeneratorJobRequeuer` is a task that requeues witness generator jobs that have not made progress in a given unit of time. 
+#[derive(Debug)] +pub struct WitnessGeneratorJobRequeuer { + /// max attempts before giving up on the job + max_attempts: u32, + /// the amount of time that must have passed before a job is considered to have not made progress + processing_timeouts: WitnessGenerationTimeouts, +} + +impl WitnessGeneratorJobRequeuer { + pub fn new(max_attempts: u32, processing_timeouts: WitnessGenerationTimeouts) -> Self { + Self { + max_attempts, + processing_timeouts, + } + } + + fn emit_telemetry(&self, witness_type: WitnessType, stuck_jobs: &Vec) { + for stuck_job in stuck_jobs { + tracing::info!("requeued {:?} {:?}", witness_type, stuck_job); + } + PROVER_JOB_MONITOR_METRICS.requeued_witness_generator_jobs[&witness_type] + .inc_by(stuck_jobs.len() as u64); + } + + async fn requeue_stuck_basic_jobs(&self, connection: &mut Connection<'_, Prover>) { + let stuck_jobs = connection + .fri_witness_generator_dal() + .requeue_stuck_basic_jobs(self.processing_timeouts.basic(), self.max_attempts) + .await; + self.emit_telemetry(WitnessType::BasicWitnessGenerator, &stuck_jobs); + } + + async fn requeue_stuck_leaf_jobs(&self, connection: &mut Connection<'_, Prover>) { + let stuck_jobs = connection + .fri_witness_generator_dal() + .requeue_stuck_leaf_jobs(self.processing_timeouts.leaf(), self.max_attempts) + .await; + self.emit_telemetry(WitnessType::LeafWitnessGenerator, &stuck_jobs); + } + + async fn requeue_stuck_node_jobs(&self, connection: &mut Connection<'_, Prover>) { + let stuck_jobs = connection + .fri_witness_generator_dal() + .requeue_stuck_node_jobs(self.processing_timeouts.node(), self.max_attempts) + .await; + self.emit_telemetry(WitnessType::NodeWitnessGenerator, &stuck_jobs); + } + + async fn requeue_stuck_recursion_tip_jobs(&self, connection: &mut Connection<'_, Prover>) { + let stuck_jobs = connection + .fri_witness_generator_dal() + .requeue_stuck_recursion_tip_jobs( + self.processing_timeouts.recursion_tip(), + self.max_attempts, + ) + .await; + 
self.emit_telemetry(WitnessType::RecursionTipWitnessGenerator, &stuck_jobs); + } + + async fn requeue_stuck_scheduler_jobs(&self, connection: &mut Connection<'_, Prover>) { + let stuck_jobs = connection + .fri_witness_generator_dal() + .requeue_stuck_scheduler_jobs(self.processing_timeouts.scheduler(), self.max_attempts) + .await; + self.emit_telemetry(WitnessType::SchedulerWitnessGenerator, &stuck_jobs); + } +} + +#[async_trait] +impl Task for WitnessGeneratorJobRequeuer { + async fn invoke(&self, connection: &mut Connection) -> anyhow::Result<()> { + self.requeue_stuck_basic_jobs(connection).await; + self.requeue_stuck_leaf_jobs(connection).await; + self.requeue_stuck_node_jobs(connection).await; + self.requeue_stuck_recursion_tip_jobs(connection).await; + self.requeue_stuck_scheduler_jobs(connection).await; + Ok(()) + } +} diff --git a/prover/crates/bin/prover_job_monitor/src/lib.rs b/prover/crates/bin/prover_job_monitor/src/lib.rs new file mode 100644 index 000000000000..60d8be297cfe --- /dev/null +++ b/prover/crates/bin/prover_job_monitor/src/lib.rs @@ -0,0 +1,6 @@ +pub mod archiver; +pub mod job_requeuer; +pub(crate) mod metrics; +pub mod queue_reporter; +pub mod task_wiring; +pub mod witness_job_queuer; diff --git a/prover/crates/bin/prover_job_monitor/src/main.rs b/prover/crates/bin/prover_job_monitor/src/main.rs new file mode 100644 index 000000000000..e585c06ad779 --- /dev/null +++ b/prover/crates/bin/prover_job_monitor/src/main.rs @@ -0,0 +1,201 @@ +use anyhow::Context as _; +use clap::Parser; +use tokio::{ + sync::{oneshot, watch}, + task::JoinHandle, +}; +use zksync_config::configs::{ + fri_prover_group::FriProverGroupConfig, FriProofCompressorConfig, FriProverConfig, + FriWitnessGeneratorConfig, ProverJobMonitorConfig, +}; +use zksync_core_leftovers::temp_config_store::{load_database_secrets, load_general_config}; +use zksync_prover_dal::{ConnectionPool, Prover}; +use zksync_prover_job_monitor::{ + archiver::{GpuProverArchiver, ProverJobsArchiver}, + 
job_requeuer::{ProofCompressorJobRequeuer, ProverJobRequeuer, WitnessGeneratorJobRequeuer}, + queue_reporter::{ + ProofCompressorQueueReporter, ProverQueueReporter, WitnessGeneratorQueueReporter, + }, + task_wiring::TaskRunner, + witness_job_queuer::WitnessJobQueuer, +}; +use zksync_utils::wait_for_tasks::ManagedTasks; +use zksync_vlog::prometheus::PrometheusExporterConfig; + +#[derive(Debug, Parser)] +#[command(author = "Matter Labs", version)] +pub(crate) struct CliOpts { + #[arg(long)] + pub(crate) config_path: Option, + #[arg(long)] + pub(crate) secrets_path: Option, +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let opt = CliOpts::parse(); + + let general_config = load_general_config(opt.config_path).context("general config")?; + + println!("general_config = {general_config:?}"); + let database_secrets = load_database_secrets(opt.secrets_path).context("database secrets")?; + + let observability_config = general_config + .observability + .context("observability config")?; + let _observability_guard = observability_config.install()?; + + let prover_job_monitor_config = general_config + .prover_job_monitor_config + .context("prover_job_monitor_config")?; + let proof_compressor_config = general_config + .proof_compressor_config + .context("proof_compressor_config")?; + let prover_config = general_config.prover_config.context("prover_config")?; + let witness_generator_config = general_config + .witness_generator_config + .context("witness_generator_config")?; + let prover_group_config = general_config + .prover_group_config + .context("fri_prover_group_config")?; + let exporter_config = PrometheusExporterConfig::pull(prover_job_monitor_config.prometheus_port); + + let (stop_signal_sender, stop_signal_receiver) = oneshot::channel(); + let mut stop_signal_sender = Some(stop_signal_sender); + ctrlc::set_handler(move || { + if let Some(sender) = stop_signal_sender.take() { + sender.send(()).ok(); + } + }) + .context("Error setting Ctrl+C handler")?; + 
+ let (stop_sender, stop_receiver) = watch::channel(false); + + tracing::info!("Starting ProverJobMonitoring"); + + let connection_pool = ConnectionPool::::builder( + database_secrets.prover_url()?, + prover_job_monitor_config.max_db_connections, + ) + .build() + .await + .context("failed to build a connection pool")?; + + let graceful_shutdown_timeout = prover_job_monitor_config.graceful_shutdown_timeout(); + + let mut tasks = vec![tokio::spawn(exporter_config.run(stop_receiver.clone()))]; + + tasks.extend(get_tasks( + connection_pool, + prover_job_monitor_config, + proof_compressor_config, + prover_config, + witness_generator_config, + prover_group_config, + stop_receiver, + )?); + let mut tasks = ManagedTasks::new(tasks); + + tokio::select! { + _ = tasks.wait_single() => {}, + _ = stop_signal_receiver => { + tracing::info!("Stop signal received, shutting down"); + } + } + stop_sender.send(true).ok(); + tasks.complete(graceful_shutdown_timeout).await; + + Ok(()) +} + +fn get_tasks( + connection_pool: ConnectionPool, + prover_job_monitor_config: ProverJobMonitorConfig, + proof_compressor_config: FriProofCompressorConfig, + prover_config: FriProverConfig, + witness_generator_config: FriWitnessGeneratorConfig, + prover_group_config: FriProverGroupConfig, + stop_receiver: watch::Receiver, +) -> anyhow::Result>>> { + let mut task_runner = TaskRunner::new(connection_pool); + + // archivers + let gpu_prover_archiver = + GpuProverArchiver::new(prover_job_monitor_config.archive_gpu_prover_duration()); + task_runner.add( + "GpuProverArchiver", + prover_job_monitor_config.gpu_prover_archiver_run_interval(), + gpu_prover_archiver, + ); + + let prover_jobs_archiver = + ProverJobsArchiver::new(prover_job_monitor_config.archive_prover_jobs_duration()); + task_runner.add( + "ProverJobsArchiver", + prover_job_monitor_config.prover_jobs_archiver_run_interval(), + prover_jobs_archiver, + ); + + // job requeuers + let proof_compressor_job_requeuer = ProofCompressorJobRequeuer::new( 
+ proof_compressor_config.max_attempts, + proof_compressor_config.generation_timeout(), + ); + task_runner.add( + "ProofCompressorJobRequeuer", + prover_job_monitor_config.proof_compressor_job_requeuer_run_interval(), + proof_compressor_job_requeuer, + ); + + let prover_job_requeuer = ProverJobRequeuer::new( + prover_config.max_attempts, + prover_config.proof_generation_timeout(), + ); + task_runner.add( + "ProverJobRequeuer", + prover_job_monitor_config.prover_job_requeuer_run_interval(), + prover_job_requeuer, + ); + + let witness_generator_job_requeuer = WitnessGeneratorJobRequeuer::new( + witness_generator_config.max_attempts, + witness_generator_config.witness_generation_timeouts(), + ); + task_runner.add( + "WitnessGeneratorJobRequeuer", + prover_job_monitor_config.witness_generator_job_requeuer_run_interval(), + witness_generator_job_requeuer, + ); + + // queue reporters + let proof_compressor_queue_reporter = ProofCompressorQueueReporter {}; + task_runner.add( + "ProofCompressorQueueReporter", + prover_job_monitor_config.proof_compressor_queue_reporter_run_interval(), + proof_compressor_queue_reporter, + ); + + let prover_queue_reporter = ProverQueueReporter::new(prover_group_config); + task_runner.add( + "ProverQueueReporter", + prover_job_monitor_config.prover_queue_reporter_run_interval(), + prover_queue_reporter, + ); + + let witness_generator_queue_reporter = WitnessGeneratorQueueReporter {}; + task_runner.add( + "WitnessGeneratorQueueReporter", + prover_job_monitor_config.witness_generator_queue_reporter_run_interval(), + witness_generator_queue_reporter, + ); + + // witness job queuer + let witness_job_queuer = WitnessJobQueuer {}; + task_runner.add( + "WitnessJobQueuer", + prover_job_monitor_config.witness_job_queuer_run_interval(), + witness_job_queuer, + ); + + Ok(task_runner.spawn(stop_receiver)) +} diff --git a/prover/crates/bin/prover_job_monitor/src/metrics.rs b/prover/crates/bin/prover_job_monitor/src/metrics.rs new file mode 100644 index 
000000000000..fa5e22111ae4 --- /dev/null +++ b/prover/crates/bin/prover_job_monitor/src/metrics.rs @@ -0,0 +1,98 @@ +use vise::{Counter, EncodeLabelSet, EncodeLabelValue, Family, Gauge, LabeledFamily, Metrics}; +use zksync_types::protocol_version::ProtocolSemanticVersion; + +#[derive(Debug, Metrics)] +#[metrics(prefix = "prover_job_monitor")] +pub(crate) struct ProverJobMonitorMetrics { + // archivers + /// number of dead GPU provers archived + pub archived_gpu_provers: Counter, + /// number of finished prover job archived + pub archived_prover_jobs: Counter, + + // job requeuers + /// number of proof compressor jobs that have been requeued for execution + pub requeued_proof_compressor_jobs: Counter, + /// number of circuit prover jobs that have been requeued for execution + pub requeued_circuit_prover_jobs: Counter, + /// number of witness generator jobs that have been requeued for execution + pub requeued_witness_generator_jobs: Family>, + + // queues reporters + /// number of proof compressor jobs that are queued/in_progress per protocol version + #[metrics(labels = ["type", "protocol_version"])] + pub proof_compressor_jobs: LabeledFamily<(JobStatus, String), Gauge, 2>, + /// the oldest batch that has not been compressed yet + pub oldest_uncompressed_batch: Gauge, + /// number of prover jobs per circuit, per round, per protocol version, per status + /// Sets a specific value for a struct as follows: + /// { + /// status: Queued, + /// circuit_id: 1, + /// round: 0, + /// group_id: + /// protocol_version: 0.24.2, + /// } + pub prover_jobs: Family>, + /// the oldest batch that has not been proven yet, per circuit id and aggregation round + #[metrics(labels = ["circuit_id", "aggregation_round"])] + pub oldest_unprocessed_batch: LabeledFamily<(String, String), Gauge, 2>, + /// number of witness generator jobs per "round" + #[metrics(labels = ["type", "round", "protocol_version"])] + pub witness_generator_jobs_by_round: LabeledFamily<(JobStatus, String, String), 
Gauge, 3>, + + // witness job queuer + /// number of jobs queued per type of witness generator + pub queued_witness_generator_jobs: Family>, +} + +impl ProverJobMonitorMetrics { + pub fn report_prover_jobs( + &self, + status: JobStatus, + circuit_id: u8, + round: u8, + group_id: u8, + protocol_version: ProtocolSemanticVersion, + amount: u64, + ) { + self.prover_jobs[&ProverJobsLabels { + status, + circuit_id: circuit_id.to_string(), + round: round.to_string(), + group_id: group_id.to_string(), + protocol_version: protocol_version.to_string(), + }] + .set(amount); + } +} +#[vise::register] +pub(crate) static PROVER_JOB_MONITOR_METRICS: vise::Global = + vise::Global::new(); + +#[derive(Debug, Clone, PartialEq, Eq, Hash, EncodeLabelSet)] +pub(crate) struct ProverJobsLabels { + pub status: JobStatus, + pub circuit_id: String, + pub round: String, + pub group_id: String, + pub protocol_version: String, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)] +#[metrics(label = "type", rename_all = "snake_case")] +#[allow(clippy::enum_variant_names)] +pub(crate) enum WitnessType { + BasicWitnessGenerator, + LeafWitnessGenerator, + NodeWitnessGenerator, + RecursionTipWitnessGenerator, + SchedulerWitnessGenerator, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue)] +#[metrics(rename_all = "snake_case")] +pub enum JobStatus { + Queued, + InProgress, +} diff --git a/prover/crates/bin/prover_job_monitor/src/queue_reporter/mod.rs b/prover/crates/bin/prover_job_monitor/src/queue_reporter/mod.rs new file mode 100644 index 000000000000..f325f1fcba7a --- /dev/null +++ b/prover/crates/bin/prover_job_monitor/src/queue_reporter/mod.rs @@ -0,0 +1,7 @@ +pub use proof_compressor_queue_reporter::ProofCompressorQueueReporter; +pub use prover_queue_reporter::ProverQueueReporter; +pub use witness_generator_queue_reporter::WitnessGeneratorQueueReporter; + +mod proof_compressor_queue_reporter; +mod prover_queue_reporter; +mod 
witness_generator_queue_reporter; diff --git a/prover/crates/bin/prover_job_monitor/src/queue_reporter/proof_compressor_queue_reporter.rs b/prover/crates/bin/prover_job_monitor/src/queue_reporter/proof_compressor_queue_reporter.rs new file mode 100644 index 000000000000..f31af8e247aa --- /dev/null +++ b/prover/crates/bin/prover_job_monitor/src/queue_reporter/proof_compressor_queue_reporter.rs @@ -0,0 +1,68 @@ +use std::collections::HashMap; + +use async_trait::async_trait; +use zksync_prover_dal::{Connection, Prover, ProverDal}; +use zksync_types::{protocol_version::ProtocolSemanticVersion, prover_dal::JobCountStatistics}; + +use crate::{ + metrics::{JobStatus, PROVER_JOB_MONITOR_METRICS}, + task_wiring::Task, +}; + +/// `ProofCompressorQueueReporter` is a task that reports compression jobs status. +/// Note: these values will be used for auto-scaling proof compressor. +#[derive(Debug)] +pub struct ProofCompressorQueueReporter {} + +impl ProofCompressorQueueReporter { + async fn get_job_statistics( + connection: &mut Connection<'_, Prover>, + ) -> HashMap { + connection.fri_proof_compressor_dal().get_jobs_stats().await + } +} + +#[async_trait] +impl Task for ProofCompressorQueueReporter { + async fn invoke(&self, connection: &mut Connection) -> anyhow::Result<()> { + let stats = Self::get_job_statistics(connection).await; + + for (protocol_version, stats) in &stats { + if stats.queued > 0 { + tracing::info!( + "Found {} queued proof compressor jobs for protocol version {}.", + stats.queued, + protocol_version + ); + } + if stats.in_progress > 0 { + tracing::info!( + "Found {} in progress proof compressor jobs for protocol version {}.", + stats.in_progress, + protocol_version + ); + } + + PROVER_JOB_MONITOR_METRICS.proof_compressor_jobs + [&(JobStatus::Queued, protocol_version.to_string())] + .set(stats.queued as u64); + + PROVER_JOB_MONITOR_METRICS.proof_compressor_jobs + [&(JobStatus::InProgress, protocol_version.to_string())] + .set(stats.in_progress as u64); + } 
+ + let oldest_not_compressed_batch = connection + .fri_proof_compressor_dal() + .get_oldest_not_compressed_batch() + .await; + + if let Some(l1_batch_number) = oldest_not_compressed_batch { + PROVER_JOB_MONITOR_METRICS + .oldest_uncompressed_batch + .set(l1_batch_number.0 as u64); + } + + Ok(()) + } +} diff --git a/prover/crates/bin/prover_job_monitor/src/queue_reporter/prover_queue_reporter.rs b/prover/crates/bin/prover_job_monitor/src/queue_reporter/prover_queue_reporter.rs new file mode 100644 index 000000000000..365000acb59b --- /dev/null +++ b/prover/crates/bin/prover_job_monitor/src/queue_reporter/prover_queue_reporter.rs @@ -0,0 +1,83 @@ +use async_trait::async_trait; +use zksync_config::configs::fri_prover_group::FriProverGroupConfig; +use zksync_prover_dal::{Connection, Prover, ProverDal}; +use zksync_types::{basic_fri_types::CircuitIdRoundTuple, prover_dal::JobCountStatistics}; + +use crate::{ + metrics::{JobStatus, PROVER_JOB_MONITOR_METRICS}, + task_wiring::Task, +}; + +/// `ProverQueueReporter` is a task that reports prover jobs status. +/// Note: these values will be used for auto-scaling provers and Witness Vector Generators. 
+#[derive(Debug)] +pub struct ProverQueueReporter { + config: FriProverGroupConfig, +} + +impl ProverQueueReporter { + pub fn new(config: FriProverGroupConfig) -> Self { + Self { config } + } +} + +#[async_trait] +impl Task for ProverQueueReporter { + async fn invoke(&self, connection: &mut Connection) -> anyhow::Result<()> { + let stats = connection + .fri_prover_jobs_dal() + .get_prover_jobs_stats() + .await; + + for (protocol_semantic_version, circuit_prover_stats) in stats { + for (tuple, stat) in circuit_prover_stats { + let CircuitIdRoundTuple { + circuit_id, + aggregation_round, + } = tuple; + let JobCountStatistics { + queued, + in_progress, + } = stat; + let group_id = self + .config + .get_group_id_for_circuit_id_and_aggregation_round( + circuit_id, + aggregation_round, + ) + .unwrap_or(u8::MAX); + + PROVER_JOB_MONITOR_METRICS.report_prover_jobs( + JobStatus::Queued, + circuit_id, + aggregation_round, + group_id, + protocol_semantic_version, + queued as u64, + ); + + PROVER_JOB_MONITOR_METRICS.report_prover_jobs( + JobStatus::InProgress, + circuit_id, + aggregation_round, + group_id, + protocol_semantic_version, + in_progress as u64, + ) + } + } + + let lag_by_circuit_type = connection + .fri_prover_jobs_dal() + .min_unproved_l1_batch_number() + .await; + + for ((circuit_id, aggregation_round), l1_batch_number) in lag_by_circuit_type { + PROVER_JOB_MONITOR_METRICS.oldest_unprocessed_batch + [&(circuit_id.to_string(), aggregation_round.to_string())] + .set(l1_batch_number.0 as u64); + } + + Ok(()) + } +} diff --git a/prover/crates/bin/prover_job_monitor/src/queue_reporter/witness_generator_queue_reporter.rs b/prover/crates/bin/prover_job_monitor/src/queue_reporter/witness_generator_queue_reporter.rs new file mode 100644 index 000000000000..0d222f129d33 --- /dev/null +++ b/prover/crates/bin/prover_job_monitor/src/queue_reporter/witness_generator_queue_reporter.rs @@ -0,0 +1,71 @@ +use async_trait::async_trait; +use zksync_prover_dal::{Connection, Prover, 
ProverDal}; +use zksync_types::{ + basic_fri_types::AggregationRound, protocol_version::ProtocolSemanticVersion, + prover_dal::JobCountStatistics, +}; + +use crate::{ + metrics::{JobStatus, PROVER_JOB_MONITOR_METRICS}, + task_wiring::Task, +}; + +/// `WitnessGeneratorQueueReporter` is a task that reports witness generator jobs status. +/// Note: these values will be used for auto-scaling witness generators (Basic, Leaf, Node, Recursion Tip and Scheduler). +#[derive(Debug)] +pub struct WitnessGeneratorQueueReporter; + +impl WitnessGeneratorQueueReporter { + fn emit_metrics_for_round( + round: AggregationRound, + protocol_version: ProtocolSemanticVersion, + stats: &JobCountStatistics, + ) { + if stats.queued > 0 { + tracing::info!( + "Found {} queued {} witness generator jobs for protocol version {}.", + stats.queued, + round, + protocol_version + ); + } + if stats.in_progress > 0 { + tracing::info!( + "Found {} in progress {} witness generator jobs for protocol version {}.", + stats.in_progress, + round, + protocol_version + ); + } + + PROVER_JOB_MONITOR_METRICS.witness_generator_jobs_by_round[&( + JobStatus::Queued, + round.to_string(), + protocol_version.to_string(), + )] + .set(stats.queued as u64); + PROVER_JOB_MONITOR_METRICS.witness_generator_jobs_by_round[&( + JobStatus::InProgress, + round.to_string(), + protocol_version.to_string(), + )] + .set(stats.in_progress as u64); + } +} + +#[async_trait] +impl Task for WitnessGeneratorQueueReporter { + async fn invoke(&self, connection: &mut Connection) -> anyhow::Result<()> { + for round in AggregationRound::ALL_ROUNDS { + let stats = connection + .fri_witness_generator_dal() + .get_witness_jobs_stats(round) + .await; + for ((round, semantic_protocol_version), job_stats) in stats { + Self::emit_metrics_for_round(round, semantic_protocol_version, &job_stats); + } + } + + Ok(()) + } +} diff --git a/prover/crates/bin/prover_job_monitor/src/task_wiring.rs b/prover/crates/bin/prover_job_monitor/src/task_wiring.rs new 
file mode 100644 index 000000000000..d6539141b1db --- /dev/null +++ b/prover/crates/bin/prover_job_monitor/src/task_wiring.rs @@ -0,0 +1,86 @@ +use std::time::Duration; + +use anyhow::Context; +use tracing::Instrument; +use zksync_prover_dal::{Connection, ConnectionPool, Prover}; + +/// Task trait to be run in ProverJobMonitor. +#[async_trait::async_trait] +pub trait Task { + async fn invoke(&self, connection: &mut Connection) -> anyhow::Result<()>; +} + +/// Wrapper for Task with a periodic interface. Holds information about the task and provides DB connectivity. +struct PeriodicTask { + job: Box, + name: String, + interval: Duration, +} + +impl PeriodicTask { + async fn run( + &self, + mut stop_receiver: tokio::sync::watch::Receiver, + connection_pool: ConnectionPool, + ) -> anyhow::Result<()> { + tracing::info!( + "Started Task {} with run interval: {:?}", + self.name, + self.interval + ); + + let mut interval = tokio::time::interval(self.interval); + + while !*stop_receiver.borrow_and_update() { + interval.tick().await; + let mut connection = connection_pool + .connection() + .await + .context("failed to get database connection")?; + self.job + .invoke(&mut connection) + .instrument(tracing::info_span!("run", service_name = %self.name)) + .await + .context("failed to invoke task")?; + } + tracing::info!("Stop signal received; Task {} is shut down", self.name); + Ok(()) + } +} + +/// Wrapper on a vector of tasks. Makes adding/spawning tasks and sharing resources ergonomic. 
+pub struct TaskRunner { + pool: ConnectionPool, + tasks: Vec, +} + +impl TaskRunner { + pub fn new(pool: ConnectionPool) -> Self { + Self { + pool, + tasks: Vec::new(), + } + } + + pub fn add(&mut self, name: &str, interval: Duration, job: T) { + self.tasks.push(PeriodicTask { + name: name.into(), + interval, + job: Box::new(job), + }); + } + + pub fn spawn( + self, + stop_receiver: tokio::sync::watch::Receiver, + ) -> Vec>> { + self.tasks + .into_iter() + .map(|task| { + let pool = self.pool.clone(); + let receiver = stop_receiver.clone(); + tokio::spawn(async move { task.run(receiver, pool).await }) + }) + .collect() + } +} diff --git a/prover/crates/bin/prover_job_monitor/src/witness_job_queuer.rs b/prover/crates/bin/prover_job_monitor/src/witness_job_queuer.rs new file mode 100644 index 000000000000..d8d12df4abe3 --- /dev/null +++ b/prover/crates/bin/prover_job_monitor/src/witness_job_queuer.rs @@ -0,0 +1,121 @@ +use async_trait::async_trait; +use zksync_prover_dal::{Connection, Prover, ProverDal}; + +use crate::{ + metrics::{WitnessType, PROVER_JOB_MONITOR_METRICS}, + task_wiring::Task, +}; + +/// `WitnessJobQueuer` is a task that moves witness generator jobs from 'waiting_for_proofs' to 'queued'. +/// Note: this task is the backbone of scheduling/getting ready witness jobs to execute. +#[derive(Debug)] +pub struct WitnessJobQueuer; + +impl WitnessJobQueuer { + /// Marks leaf witness jobs as queued. + /// The trigger condition is all prover jobs on round 0 for a given circuit, per batch, have been completed. 
+ async fn queue_leaf_jobs(&self, connection: &mut Connection<'_, Prover>) { + let l1_batch_numbers = connection + .fri_witness_generator_dal() + .move_leaf_aggregation_jobs_from_waiting_to_queued() + .await; + let len = l1_batch_numbers.len(); + for (l1_batch_number, circuit_id) in l1_batch_numbers { + tracing::info!( + "Marked leaf job for l1_batch {} and circuit_id {} as queued.", + l1_batch_number, + circuit_id + ); + } + + PROVER_JOB_MONITOR_METRICS.queued_witness_generator_jobs + [&WitnessType::LeafWitnessGenerator] + .inc_by(len as u64); + } + + async fn move_node_aggregation_jobs_from_waiting_to_queued( + &self, + connection: &mut Connection<'_, Prover>, + ) -> Vec<(i64, u8, u16)> { + let mut jobs = connection + .fri_witness_generator_dal() + .move_depth_zero_node_aggregation_jobs() + .await; + jobs.extend( + connection + .fri_witness_generator_dal() + .move_depth_non_zero_node_aggregation_jobs() + .await, + ); + jobs + } + + /// Marks node witness jobs as queued. + /// The trigger condition is all prover jobs on round 1 (or 2 if recursing) for a given circuit, per batch, have been completed. + async fn queue_node_jobs(&self, connection: &mut Connection<'_, Prover>) { + let l1_batch_numbers = self + .move_node_aggregation_jobs_from_waiting_to_queued(connection) + .await; + let len = l1_batch_numbers.len(); + for (l1_batch_number, circuit_id, depth) in l1_batch_numbers { + tracing::info!( + "Marked node job for l1_batch {} and circuit_id {} at depth {} as queued.", + l1_batch_number, + circuit_id, + depth + ); + } + PROVER_JOB_MONITOR_METRICS.queued_witness_generator_jobs + [&WitnessType::NodeWitnessGenerator] + .inc_by(len as u64); + } + + /// Marks recursion tip witness jobs as queued. + /// The trigger condition is all final node proving jobs for the batch have been completed. 
+ async fn queue_recursion_tip_jobs(&self, connection: &mut Connection<'_, Prover>) { + let l1_batch_numbers = connection + .fri_witness_generator_dal() + .move_recursion_tip_jobs_from_waiting_to_queued() + .await; + for l1_batch_number in &l1_batch_numbers { + tracing::info!( + "Marked recursion tip job for l1_batch {} as queued.", + l1_batch_number, + ); + } + PROVER_JOB_MONITOR_METRICS.queued_witness_generator_jobs + [&WitnessType::RecursionTipWitnessGenerator] + .inc_by(l1_batch_numbers.len() as u64); + } + + /// Marks scheduler witness jobs as queued. + /// The trigger condition is the recursion tip proving job for the batch has been completed. + async fn queue_scheduler_jobs(&self, connection: &mut Connection<'_, Prover>) { + let l1_batch_numbers = connection + .fri_witness_generator_dal() + .move_scheduler_jobs_from_waiting_to_queued() + .await; + for l1_batch_number in &l1_batch_numbers { + tracing::info!( + "Marked scheduler job for l1_batch {} as queued.", + l1_batch_number, + ); + } + PROVER_JOB_MONITOR_METRICS.queued_witness_generator_jobs + [&WitnessType::SchedulerWitnessGenerator] + .inc_by(l1_batch_numbers.len() as u64); + } +} + +#[async_trait] +impl Task for WitnessJobQueuer { + async fn invoke(&self, connection: &mut Connection) -> anyhow::Result<()> { + // Note that there's no basic jobs here; basic witness generation is ready by the time it reaches prover subsystem. + // It doesn't need to wait for any proof to start, as it is the process that maps the future execution (how many proofs and future witness generators). 
+ self.queue_leaf_jobs(connection).await; + self.queue_node_jobs(connection).await; + self.queue_recursion_tip_jobs(connection).await; + self.queue_scheduler_jobs(connection).await; + Ok(()) + } +} diff --git a/prover/crates/bin/vk_setup_data_generator_server_fri/proptest-regressions/tests.txt b/prover/crates/bin/vk_setup_data_generator_server_fri/proptest-regressions/tests.txt new file mode 100644 index 000000000000..7e50d86cb4f8 --- /dev/null +++ b/prover/crates/bin/vk_setup_data_generator_server_fri/proptest-regressions/tests.txt @@ -0,0 +1,9 @@ +# Seeds for failure cases proptest has generated in the past. It is +# automatically read and these particular cases re-run before any +# novel cases are generated. +# +# It is recommended to check this file in to source control so that +# everyone who runs the test benefits from these saved cases. +cc ca181a7669a6e07b68bce71c8c723efcb8fd2a4e895fc962ca1d33ce5f8188f7 # shrinks to circuit_id = 1 +cc ce71957c410fa7af30e04b3e85423555a8e1bbd26b4682b748fa67162bc5687f # shrinks to circuit_id = 1 +cc 6d3b0c60d8a5e7d7dc3bb4a2a21cce97461827583ae01b2414345175a02a1221 # shrinks to key = ProverServiceDataKey { circuit_id: 1, round: BasicCircuits } diff --git a/prover/crates/bin/witness_generator/src/main.rs b/prover/crates/bin/witness_generator/src/main.rs index e914d3742b5b..a88dd8726d39 100644 --- a/prover/crates/bin/witness_generator/src/main.rs +++ b/prover/crates/bin/witness_generator/src/main.rs @@ -79,7 +79,7 @@ async fn main() -> anyhow::Result<()> { ); let store_factory = ObjectStoreFactory::new(object_store_config.0); let config = general_config - .witness_generator + .witness_generator_config .context("witness generator config")?; let prometheus_config = general_config.prometheus_config; diff --git a/prover/crates/lib/prover_dal/.sqlx/query-860846c9bcad1edd1a2906542c178815e29440592b2bb00adacf02730b526458.json b/prover/crates/lib/prover_dal/.sqlx/query-102b79726652d9150c802350bdca80c233a9fd3e892b5a867a5517c2e04497a8.json 
similarity index 68% rename from prover/crates/lib/prover_dal/.sqlx/query-860846c9bcad1edd1a2906542c178815e29440592b2bb00adacf02730b526458.json rename to prover/crates/lib/prover_dal/.sqlx/query-102b79726652d9150c802350bdca80c233a9fd3e892b5a867a5517c2e04497a8.json index f3ed6e34148d..f912d06de810 100644 --- a/prover/crates/lib/prover_dal/.sqlx/query-860846c9bcad1edd1a2906542c178815e29440592b2bb00adacf02730b526458.json +++ b/prover/crates/lib/prover_dal/.sqlx/query-102b79726652d9150c802350bdca80c233a9fd3e892b5a867a5517c2e04497a8.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n UPDATE proof_compression_jobs_fri\n SET\n status = 'queued',\n error = 'Manually requeued',\n attempts = 2,\n updated_at = NOW(),\n processing_started_at = NOW()\n WHERE\n l1_batch_number = $1\n AND attempts >= $2\n AND (\n status = 'in_progress'\n OR status = 'failed'\n )\n RETURNING\n status,\n attempts\n ", + "query": "\n UPDATE proof_compression_jobs_fri\n SET\n status = 'queued',\n error = 'Manually requeued',\n attempts = 2,\n updated_at = NOW(),\n processing_started_at = NOW()\n WHERE\n l1_batch_number = $1\n AND attempts >= $2\n AND (\n status = 'in_progress'\n OR status = 'failed'\n )\n RETURNING\n status,\n attempts,\n error,\n picked_by\n ", "describe": { "columns": [ { @@ -12,6 +12,16 @@ "ordinal": 1, "name": "attempts", "type_info": "Int2" + }, + { + "ordinal": 2, + "name": "error", + "type_info": "Text" + }, + { + "ordinal": 3, + "name": "picked_by", + "type_info": "Text" } ], "parameters": { @@ -22,8 +32,10 @@ }, "nullable": [ false, - false + false, + true, + true ] }, - "hash": "860846c9bcad1edd1a2906542c178815e29440592b2bb00adacf02730b526458" + "hash": "102b79726652d9150c802350bdca80c233a9fd3e892b5a867a5517c2e04497a8" } diff --git a/prover/crates/lib/prover_dal/.sqlx/query-3c3abbf689fa64c6da7de69fd916769dbb04d3a61cf232892236c974660ffe64.json b/prover/crates/lib/prover_dal/.sqlx/query-216d0c263539739b53975a96a10332b826708800a2f72f09bd7aea08cf724e1a.json 
similarity index 71% rename from prover/crates/lib/prover_dal/.sqlx/query-3c3abbf689fa64c6da7de69fd916769dbb04d3a61cf232892236c974660ffe64.json rename to prover/crates/lib/prover_dal/.sqlx/query-216d0c263539739b53975a96a10332b826708800a2f72f09bd7aea08cf724e1a.json index 56d8b1fa9956..ec503eabee01 100644 --- a/prover/crates/lib/prover_dal/.sqlx/query-3c3abbf689fa64c6da7de69fd916769dbb04d3a61cf232892236c974660ffe64.json +++ b/prover/crates/lib/prover_dal/.sqlx/query-216d0c263539739b53975a96a10332b826708800a2f72f09bd7aea08cf724e1a.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n UPDATE scheduler_witness_jobs_fri\n SET\n status = 'queued',\n updated_at = NOW(),\n processing_started_at = NOW()\n WHERE\n (\n status = 'in_progress'\n AND processing_started_at <= NOW() - $1::INTERVAL\n AND attempts < $2\n )\n OR (\n status = 'failed'\n AND attempts < $2\n )\n RETURNING\n l1_batch_number,\n status,\n attempts\n ", + "query": "\n UPDATE scheduler_witness_jobs_fri\n SET\n status = 'queued',\n updated_at = NOW(),\n processing_started_at = NOW()\n WHERE\n (\n status = 'in_progress'\n AND processing_started_at <= NOW() - $1::INTERVAL\n AND attempts < $2\n )\n OR (\n status = 'failed'\n AND attempts < $2\n )\n RETURNING\n l1_batch_number,\n status,\n attempts,\n error,\n picked_by\n ", "describe": { "columns": [ { @@ -17,6 +17,16 @@ "ordinal": 2, "name": "attempts", "type_info": "Int2" + }, + { + "ordinal": 3, + "name": "error", + "type_info": "Text" + }, + { + "ordinal": 4, + "name": "picked_by", + "type_info": "Text" } ], "parameters": { @@ -28,8 +38,10 @@ "nullable": [ false, false, - false + false, + true, + true ] }, - "hash": "3c3abbf689fa64c6da7de69fd916769dbb04d3a61cf232892236c974660ffe64" + "hash": "216d0c263539739b53975a96a10332b826708800a2f72f09bd7aea08cf724e1a" } diff --git a/prover/crates/lib/prover_dal/.sqlx/query-8719c090a9ad2488d556e495238cdce6412e2725cf5162ce7a733f6dceaecb11.json 
b/prover/crates/lib/prover_dal/.sqlx/query-2b12c5d469e6220cc8ddc997c666e4aa4f797bcc6e05ec2f2e435a7e940d8cf9.json similarity index 76% rename from prover/crates/lib/prover_dal/.sqlx/query-8719c090a9ad2488d556e495238cdce6412e2725cf5162ce7a733f6dceaecb11.json rename to prover/crates/lib/prover_dal/.sqlx/query-2b12c5d469e6220cc8ddc997c666e4aa4f797bcc6e05ec2f2e435a7e940d8cf9.json index 6493053b122c..14b64e8122e5 100644 --- a/prover/crates/lib/prover_dal/.sqlx/query-8719c090a9ad2488d556e495238cdce6412e2725cf5162ce7a733f6dceaecb11.json +++ b/prover/crates/lib/prover_dal/.sqlx/query-2b12c5d469e6220cc8ddc997c666e4aa4f797bcc6e05ec2f2e435a7e940d8cf9.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n UPDATE node_aggregation_witness_jobs_fri\n SET\n status = 'queued',\n updated_at = NOW(),\n processing_started_at = NOW()\n WHERE\n (\n status = 'in_progress'\n AND processing_started_at <= NOW() - $1::INTERVAL\n AND attempts < $2\n )\n OR (\n status = 'failed'\n AND attempts < $2\n )\n RETURNING\n id,\n status,\n attempts,\n circuit_id\n ", + "query": "\n UPDATE node_aggregation_witness_jobs_fri\n SET\n status = 'queued',\n updated_at = NOW(),\n processing_started_at = NOW()\n WHERE\n (\n status = 'in_progress'\n AND processing_started_at <= NOW() - $1::INTERVAL\n AND attempts < $2\n )\n OR (\n status = 'failed'\n AND attempts < $2\n )\n RETURNING\n id,\n status,\n attempts,\n circuit_id,\n error,\n picked_by\n ", "describe": { "columns": [ { @@ -22,6 +22,16 @@ "ordinal": 3, "name": "circuit_id", "type_info": "Int2" + }, + { + "ordinal": 4, + "name": "error", + "type_info": "Text" + }, + { + "ordinal": 5, + "name": "picked_by", + "type_info": "Text" } ], "parameters": { @@ -34,8 +44,10 @@ false, false, false, - false + false, + true, + true ] }, - "hash": "8719c090a9ad2488d556e495238cdce6412e2725cf5162ce7a733f6dceaecb11" + "hash": "2b12c5d469e6220cc8ddc997c666e4aa4f797bcc6e05ec2f2e435a7e940d8cf9" } diff --git 
a/prover/crates/lib/prover_dal/.sqlx/query-ca9d06141265b8524ee28c55569cb21a635037d89ce24dd3ad58ffaadb59594a.json b/prover/crates/lib/prover_dal/.sqlx/query-5f18efe2fb3a16cdf3c23379f36536b9704e8a76de95811cb23e3aa9f2512ade.json similarity index 65% rename from prover/crates/lib/prover_dal/.sqlx/query-ca9d06141265b8524ee28c55569cb21a635037d89ce24dd3ad58ffaadb59594a.json rename to prover/crates/lib/prover_dal/.sqlx/query-5f18efe2fb3a16cdf3c23379f36536b9704e8a76de95811cb23e3aa9f2512ade.json index ff49f615ab50..a9c675855baf 100644 --- a/prover/crates/lib/prover_dal/.sqlx/query-ca9d06141265b8524ee28c55569cb21a635037d89ce24dd3ad58ffaadb59594a.json +++ b/prover/crates/lib/prover_dal/.sqlx/query-5f18efe2fb3a16cdf3c23379f36536b9704e8a76de95811cb23e3aa9f2512ade.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT\n l1_batch_number\n FROM\n proof_compression_jobs_fri\n WHERE\n status <> 'successful'\n ORDER BY\n l1_batch_number ASC\n LIMIT\n 1\n ", + "query": "\n SELECT\n l1_batch_number\n FROM\n proof_compression_jobs_fri\n WHERE\n status <> 'successful'\n AND status <> 'sent_to_server'\n ORDER BY\n l1_batch_number ASC\n LIMIT\n 1\n ", "describe": { "columns": [ { @@ -16,5 +16,5 @@ false ] }, - "hash": "ca9d06141265b8524ee28c55569cb21a635037d89ce24dd3ad58ffaadb59594a" + "hash": "5f18efe2fb3a16cdf3c23379f36536b9704e8a76de95811cb23e3aa9f2512ade" } diff --git a/prover/crates/lib/prover_dal/.sqlx/query-6cfc59d2fc039c706f30ae91b7d9d0c658093dede5eb61489205aa751ad5b8ec.json b/prover/crates/lib/prover_dal/.sqlx/query-6cfc59d2fc039c706f30ae91b7d9d0c658093dede5eb61489205aa751ad5b8ec.json deleted file mode 100644 index 02b7862517fb..000000000000 --- a/prover/crates/lib/prover_dal/.sqlx/query-6cfc59d2fc039c706f30ae91b7d9d0c658093dede5eb61489205aa751ad5b8ec.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n WITH deleted AS (\n DELETE FROM prover_jobs_fri\n WHERE\n status NOT IN ('queued', 'in_progress', 'in_gpu_proof', 'failed')\n AND 
updated_at < NOW() - $1::INTERVAL\n RETURNING *\n ),\n inserted_count AS (\n INSERT INTO prover_jobs_fri_archive\n SELECT * FROM deleted\n )\n SELECT COUNT(*) FROM deleted\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "count", - "type_info": "Int8" - } - ], - "parameters": { - "Left": [ - "Interval" - ] - }, - "nullable": [ - null - ] - }, - "hash": "6cfc59d2fc039c706f30ae91b7d9d0c658093dede5eb61489205aa751ad5b8ec" -} diff --git a/prover/crates/lib/prover_dal/.sqlx/query-a0f60a97f09b2467ca73bb6fbebb210d65149cdd4a3411a79b717aadbffb43af.json b/prover/crates/lib/prover_dal/.sqlx/query-8357972a21b39644e4cbe4bedc3b6d9065bf4494daf8f7632ab2bfe055773f7b.json similarity index 71% rename from prover/crates/lib/prover_dal/.sqlx/query-a0f60a97f09b2467ca73bb6fbebb210d65149cdd4a3411a79b717aadbffb43af.json rename to prover/crates/lib/prover_dal/.sqlx/query-8357972a21b39644e4cbe4bedc3b6d9065bf4494daf8f7632ab2bfe055773f7b.json index f718a93a590d..54fba3bbeac0 100644 --- a/prover/crates/lib/prover_dal/.sqlx/query-a0f60a97f09b2467ca73bb6fbebb210d65149cdd4a3411a79b717aadbffb43af.json +++ b/prover/crates/lib/prover_dal/.sqlx/query-8357972a21b39644e4cbe4bedc3b6d9065bf4494daf8f7632ab2bfe055773f7b.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n UPDATE recursion_tip_witness_jobs_fri\n SET\n status = 'queued',\n updated_at = NOW(),\n processing_started_at = NOW()\n WHERE\n (\n status = 'in_progress'\n AND processing_started_at <= NOW() - $1::INTERVAL\n AND attempts < $2\n )\n OR (\n status = 'failed'\n AND attempts < $2\n )\n RETURNING\n l1_batch_number,\n status,\n attempts\n ", + "query": "\n UPDATE recursion_tip_witness_jobs_fri\n SET\n status = 'queued',\n updated_at = NOW(),\n processing_started_at = NOW()\n WHERE\n (\n status = 'in_progress'\n AND processing_started_at <= NOW() - $1::INTERVAL\n AND attempts < $2\n )\n OR (\n status = 'failed'\n AND attempts < $2\n )\n RETURNING\n l1_batch_number,\n status,\n attempts,\n error,\n picked_by\n ", 
"describe": { "columns": [ { @@ -17,6 +17,16 @@ "ordinal": 2, "name": "attempts", "type_info": "Int2" + }, + { + "ordinal": 3, + "name": "error", + "type_info": "Text" + }, + { + "ordinal": 4, + "name": "picked_by", + "type_info": "Text" } ], "parameters": { @@ -28,8 +38,10 @@ "nullable": [ false, false, - false + false, + true, + true ] }, - "hash": "a0f60a97f09b2467ca73bb6fbebb210d65149cdd4a3411a79b717aadbffb43af" + "hash": "8357972a21b39644e4cbe4bedc3b6d9065bf4494daf8f7632ab2bfe055773f7b" } diff --git a/prover/crates/lib/prover_dal/.sqlx/query-e3194873d24e67f8d0e98bf8bf2d4f9a3b98458746972c9860fb9473947d59ff.json b/prover/crates/lib/prover_dal/.sqlx/query-9895b2ded08be3e81a5357decf76b4d3d6a762761e45af2a73fe96da804e627e.json similarity index 74% rename from prover/crates/lib/prover_dal/.sqlx/query-e3194873d24e67f8d0e98bf8bf2d4f9a3b98458746972c9860fb9473947d59ff.json rename to prover/crates/lib/prover_dal/.sqlx/query-9895b2ded08be3e81a5357decf76b4d3d6a762761e45af2a73fe96da804e627e.json index 0264238ee484..90ea99942062 100644 --- a/prover/crates/lib/prover_dal/.sqlx/query-e3194873d24e67f8d0e98bf8bf2d4f9a3b98458746972c9860fb9473947d59ff.json +++ b/prover/crates/lib/prover_dal/.sqlx/query-9895b2ded08be3e81a5357decf76b4d3d6a762761e45af2a73fe96da804e627e.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n UPDATE prover_jobs_fri\n SET\n status = 'queued',\n error = 'Manually requeued',\n attempts = 2,\n updated_at = NOW(),\n processing_started_at = NOW()\n WHERE\n l1_batch_number = $1\n AND attempts >= $2\n AND (\n status = 'in_progress'\n OR status = 'failed'\n )\n RETURNING\n id,\n status,\n attempts,\n circuit_id\n ", + "query": "\n UPDATE prover_jobs_fri\n SET\n status = 'queued',\n error = 'Manually requeued',\n attempts = 2,\n updated_at = NOW(),\n processing_started_at = NOW()\n WHERE\n l1_batch_number = $1\n AND attempts >= $2\n AND (\n status = 'in_progress'\n OR status = 'failed'\n )\n RETURNING\n id,\n status,\n attempts,\n circuit_id,\n error,\n 
picked_by\n ", "describe": { "columns": [ { @@ -22,6 +22,16 @@ "ordinal": 3, "name": "circuit_id", "type_info": "Int2" + }, + { + "ordinal": 4, + "name": "error", + "type_info": "Text" + }, + { + "ordinal": 5, + "name": "picked_by", + "type_info": "Text" } ], "parameters": { @@ -34,8 +44,10 @@ false, false, false, - false + false, + true, + true ] }, - "hash": "e3194873d24e67f8d0e98bf8bf2d4f9a3b98458746972c9860fb9473947d59ff" + "hash": "9895b2ded08be3e81a5357decf76b4d3d6a762761e45af2a73fe96da804e627e" } diff --git a/prover/crates/lib/prover_dal/.sqlx/query-a9e9399edfcaf7569869d5ac72ae8e0ed14ad1f42ffd0b383fbfb38e78df8ae3.json b/prover/crates/lib/prover_dal/.sqlx/query-a9e9399edfcaf7569869d5ac72ae8e0ed14ad1f42ffd0b383fbfb38e78df8ae3.json new file mode 100644 index 000000000000..ea6e6c23e6a0 --- /dev/null +++ b/prover/crates/lib/prover_dal/.sqlx/query-a9e9399edfcaf7569869d5ac72ae8e0ed14ad1f42ffd0b383fbfb38e78df8ae3.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n WITH deleted AS (\n DELETE FROM prover_jobs_fri AS p\n USING proof_compression_jobs_fri AS c\n WHERE\n p.status NOT IN ('queued', 'in_progress', 'in_gpu_proof', 'failed')\n AND p.updated_at < NOW() - $1::INTERVAL\n AND p.l1_batch_number = c.l1_batch_number\n AND c.status = 'sent_to_server'\n RETURNING p.*\n ),\n inserted_count AS (\n INSERT INTO prover_jobs_fri_archive\n SELECT * FROM deleted\n )\n SELECT COUNT(*) FROM deleted\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "count", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Interval" + ] + }, + "nullable": [ + null + ] + }, + "hash": "a9e9399edfcaf7569869d5ac72ae8e0ed14ad1f42ffd0b383fbfb38e78df8ae3" +} diff --git a/prover/crates/lib/prover_dal/.sqlx/query-bfb80956a18eabf266f5b5a9d62912d57f8eb2a38bdb7884fc812a2897a3a660.json b/prover/crates/lib/prover_dal/.sqlx/query-bcc5d3d35652f49b41d4ee673b171570fc88c17822bebd5b92e3b2f726d9af3a.json similarity index 63% rename from 
prover/crates/lib/prover_dal/.sqlx/query-bfb80956a18eabf266f5b5a9d62912d57f8eb2a38bdb7884fc812a2897a3a660.json rename to prover/crates/lib/prover_dal/.sqlx/query-bcc5d3d35652f49b41d4ee673b171570fc88c17822bebd5b92e3b2f726d9af3a.json index 550cb5ec7438..ab1c2dd6552a 100644 --- a/prover/crates/lib/prover_dal/.sqlx/query-bfb80956a18eabf266f5b5a9d62912d57f8eb2a38bdb7884fc812a2897a3a660.json +++ b/prover/crates/lib/prover_dal/.sqlx/query-bcc5d3d35652f49b41d4ee673b171570fc88c17822bebd5b92e3b2f726d9af3a.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n UPDATE witness_inputs_fri\n SET\n status = 'queued',\n updated_at = NOW(),\n processing_started_at = NOW()\n WHERE\n (\n status = 'in_progress'\n AND processing_started_at <= NOW() - $1::INTERVAL\n AND attempts < $2\n )\n OR (\n status = 'in_gpu_proof'\n AND processing_started_at <= NOW() - $1::INTERVAL\n AND attempts < $2\n )\n OR (\n status = 'failed'\n AND attempts < $2\n )\n RETURNING\n l1_batch_number,\n status,\n attempts\n ", + "query": "\n UPDATE witness_inputs_fri\n SET\n status = 'queued',\n updated_at = NOW(),\n processing_started_at = NOW()\n WHERE\n (\n status = 'in_progress'\n AND processing_started_at <= NOW() - $1::INTERVAL\n AND attempts < $2\n )\n OR (\n status = 'failed'\n AND attempts < $2\n )\n RETURNING\n l1_batch_number,\n status,\n attempts,\n error,\n picked_by\n ", "describe": { "columns": [ { @@ -17,6 +17,16 @@ "ordinal": 2, "name": "attempts", "type_info": "Int2" + }, + { + "ordinal": 3, + "name": "error", + "type_info": "Text" + }, + { + "ordinal": 4, + "name": "picked_by", + "type_info": "Text" } ], "parameters": { @@ -28,8 +38,10 @@ "nullable": [ false, false, - false + false, + true, + true ] }, - "hash": "bfb80956a18eabf266f5b5a9d62912d57f8eb2a38bdb7884fc812a2897a3a660" + "hash": "bcc5d3d35652f49b41d4ee673b171570fc88c17822bebd5b92e3b2f726d9af3a" } diff --git a/prover/crates/lib/prover_dal/.sqlx/query-c156004a0e5ad5bcc33d3b894fd69718349ac4fc08b455c7f4265d7443f2ec13.json 
b/prover/crates/lib/prover_dal/.sqlx/query-d0be28042b50199075cb0eca26f6b93bfd5d96fdc68732fe38c79ccd44b84def.json similarity index 50% rename from prover/crates/lib/prover_dal/.sqlx/query-c156004a0e5ad5bcc33d3b894fd69718349ac4fc08b455c7f4265d7443f2ec13.json rename to prover/crates/lib/prover_dal/.sqlx/query-d0be28042b50199075cb0eca26f6b93bfd5d96fdc68732fe38c79ccd44b84def.json index 60f8a0df709a..3943480b896d 100644 --- a/prover/crates/lib/prover_dal/.sqlx/query-c156004a0e5ad5bcc33d3b894fd69718349ac4fc08b455c7f4265d7443f2ec13.json +++ b/prover/crates/lib/prover_dal/.sqlx/query-d0be28042b50199075cb0eca26f6b93bfd5d96fdc68732fe38c79ccd44b84def.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n UPDATE prover_jobs_fri\n SET\n status = 'queued',\n updated_at = NOW(),\n processing_started_at = NOW()\n WHERE\n id IN (\n SELECT\n id\n FROM\n prover_jobs_fri\n WHERE\n (\n status = 'in_progress'\n AND processing_started_at <= NOW() - $1::INTERVAL\n AND attempts < $2\n )\n OR (\n status = 'in_gpu_proof'\n AND processing_started_at <= NOW() - $1::INTERVAL\n AND attempts < $2\n )\n OR (\n status = 'failed'\n AND attempts < $2\n )\n FOR UPDATE\n SKIP LOCKED\n )\n RETURNING\n id,\n status,\n attempts,\n circuit_id\n ", + "query": "\n UPDATE prover_jobs_fri\n SET\n status = 'queued',\n updated_at = NOW(),\n processing_started_at = NOW()\n WHERE\n id IN (\n SELECT\n id\n FROM\n prover_jobs_fri\n WHERE\n (\n status IN ('in_progress', 'in_gpu_proof')\n AND processing_started_at <= NOW() - $1::INTERVAL\n AND attempts < $2\n )\n OR (\n status = 'failed'\n AND attempts < $2\n )\n FOR UPDATE\n SKIP LOCKED\n )\n RETURNING\n id,\n status,\n attempts,\n circuit_id,\n error,\n picked_by\n ", "describe": { "columns": [ { @@ -22,6 +22,16 @@ "ordinal": 3, "name": "circuit_id", "type_info": "Int2" + }, + { + "ordinal": 4, + "name": "error", + "type_info": "Text" + }, + { + "ordinal": 5, + "name": "picked_by", + "type_info": "Text" } ], "parameters": { @@ -34,8 +44,10 @@ false, false, 
false, - false + false, + true, + true ] }, - "hash": "c156004a0e5ad5bcc33d3b894fd69718349ac4fc08b455c7f4265d7443f2ec13" + "hash": "d0be28042b50199075cb0eca26f6b93bfd5d96fdc68732fe38c79ccd44b84def" } diff --git a/prover/crates/lib/prover_dal/.sqlx/query-e32c0d85cb2841efb0b7cea6b049bae42849574731d33539bfdcca21c9b64f4e.json b/prover/crates/lib/prover_dal/.sqlx/query-d5bb897092bce2788fe02f31c9de6dde4142e09330557cc627fee2db278ace50.json similarity index 76% rename from prover/crates/lib/prover_dal/.sqlx/query-e32c0d85cb2841efb0b7cea6b049bae42849574731d33539bfdcca21c9b64f4e.json rename to prover/crates/lib/prover_dal/.sqlx/query-d5bb897092bce2788fe02f31c9de6dde4142e09330557cc627fee2db278ace50.json index 3a8362d2866d..9df8f1c849cb 100644 --- a/prover/crates/lib/prover_dal/.sqlx/query-e32c0d85cb2841efb0b7cea6b049bae42849574731d33539bfdcca21c9b64f4e.json +++ b/prover/crates/lib/prover_dal/.sqlx/query-d5bb897092bce2788fe02f31c9de6dde4142e09330557cc627fee2db278ace50.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n UPDATE leaf_aggregation_witness_jobs_fri\n SET\n status = 'queued',\n updated_at = NOW(),\n processing_started_at = NOW()\n WHERE\n (\n status = 'in_progress'\n AND processing_started_at <= NOW() - $1::INTERVAL\n AND attempts < $2\n )\n OR (\n status = 'failed'\n AND attempts < $2\n )\n RETURNING\n id,\n status,\n attempts,\n circuit_id\n ", + "query": "\n UPDATE leaf_aggregation_witness_jobs_fri\n SET\n status = 'queued',\n updated_at = NOW(),\n processing_started_at = NOW()\n WHERE\n (\n status = 'in_progress'\n AND processing_started_at <= NOW() - $1::INTERVAL\n AND attempts < $2\n )\n OR (\n status = 'failed'\n AND attempts < $2\n )\n RETURNING\n id,\n status,\n attempts,\n circuit_id,\n error,\n picked_by\n ", "describe": { "columns": [ { @@ -22,6 +22,16 @@ "ordinal": 3, "name": "circuit_id", "type_info": "Int2" + }, + { + "ordinal": 4, + "name": "error", + "type_info": "Text" + }, + { + "ordinal": 5, + "name": "picked_by", + "type_info": "Text" 
} ], "parameters": { @@ -34,8 +44,10 @@ false, false, false, - false + false, + true, + true ] }, - "hash": "e32c0d85cb2841efb0b7cea6b049bae42849574731d33539bfdcca21c9b64f4e" + "hash": "d5bb897092bce2788fe02f31c9de6dde4142e09330557cc627fee2db278ace50" } diff --git a/prover/crates/lib/prover_dal/.sqlx/query-5e781f84ec41edd0941fa84de837effac442434c6e734d977e6682a7484abe7f.json b/prover/crates/lib/prover_dal/.sqlx/query-eb2a85cb60c680a71203769db7baf89bbd72934e1405e320e746158e6d395d96.json similarity index 75% rename from prover/crates/lib/prover_dal/.sqlx/query-5e781f84ec41edd0941fa84de837effac442434c6e734d977e6682a7484abe7f.json rename to prover/crates/lib/prover_dal/.sqlx/query-eb2a85cb60c680a71203769db7baf89bbd72934e1405e320e746158e6d395d96.json index 4958f38f5358..27680c0bb46e 100644 --- a/prover/crates/lib/prover_dal/.sqlx/query-5e781f84ec41edd0941fa84de837effac442434c6e734d977e6682a7484abe7f.json +++ b/prover/crates/lib/prover_dal/.sqlx/query-eb2a85cb60c680a71203769db7baf89bbd72934e1405e320e746158e6d395d96.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n UPDATE proof_compression_jobs_fri\n SET\n status = 'queued',\n updated_at = NOW(),\n processing_started_at = NOW()\n WHERE\n (\n status = 'in_progress'\n AND processing_started_at <= NOW() - $1::INTERVAL\n AND attempts < $2\n )\n OR (\n status = 'failed'\n AND attempts < $2\n )\n RETURNING\n l1_batch_number,\n status,\n attempts\n ", + "query": "\n UPDATE proof_compression_jobs_fri\n SET\n status = 'queued',\n updated_at = NOW(),\n processing_started_at = NOW()\n WHERE\n (\n status = 'in_progress'\n AND processing_started_at <= NOW() - $1::INTERVAL\n AND attempts < $2\n )\n OR (\n status = 'failed'\n AND attempts < $2\n )\n RETURNING\n l1_batch_number,\n status,\n attempts,\n error,\n picked_by\n ", "describe": { "columns": [ { @@ -17,6 +17,16 @@ "ordinal": 2, "name": "attempts", "type_info": "Int2" + }, + { + "ordinal": 3, + "name": "error", + "type_info": "Text" + }, + { + "ordinal": 4, + "name": 
"picked_by", + "type_info": "Text" } ], "parameters": { @@ -28,8 +38,10 @@ "nullable": [ false, false, - false + false, + true, + true ] }, - "hash": "5e781f84ec41edd0941fa84de837effac442434c6e734d977e6682a7484abe7f" + "hash": "eb2a85cb60c680a71203769db7baf89bbd72934e1405e320e746158e6d395d96" } diff --git a/prover/crates/lib/prover_dal/src/fri_gpu_prover_queue_dal.rs b/prover/crates/lib/prover_dal/src/fri_gpu_prover_queue_dal.rs index 753b65b4ef06..aa4810ad2f6f 100644 --- a/prover/crates/lib/prover_dal/src/fri_gpu_prover_queue_dal.rs +++ b/prover/crates/lib/prover_dal/src/fri_gpu_prover_queue_dal.rs @@ -198,9 +198,8 @@ impl FriGpuProverQueueDal<'_, '_> { .map(|row| GpuProverInstanceStatus::from_str(&row.instance_status).unwrap()) } - pub async fn archive_old_provers(&mut self, archive_prover_after_secs: u64) -> usize { - let prover_max_age = - pg_interval_from_duration(Duration::from_secs(archive_prover_after_secs)); + pub async fn archive_old_provers(&mut self, archive_prover_after: Duration) -> usize { + let prover_max_age = pg_interval_from_duration(archive_prover_after); sqlx::query_scalar!( r#" diff --git a/prover/crates/lib/prover_dal/src/fri_proof_compressor_dal.rs b/prover/crates/lib/prover_dal/src/fri_proof_compressor_dal.rs index 7adc08b680dc..31b121e51e42 100644 --- a/prover/crates/lib/prover_dal/src/fri_proof_compressor_dal.rs +++ b/prover/crates/lib/prover_dal/src/fri_proof_compressor_dal.rs @@ -288,6 +288,7 @@ impl FriProofCompressorDal<'_, '_> { proof_compression_jobs_fri WHERE status <> 'successful' + AND status <> 'sent_to_server' ORDER BY l1_batch_number ASC LIMIT @@ -329,7 +330,9 @@ impl FriProofCompressorDal<'_, '_> { RETURNING l1_batch_number, status, - attempts + attempts, + error, + picked_by "#, &processing_timeout, max_attempts as i32, @@ -343,6 +346,8 @@ impl FriProofCompressorDal<'_, '_> { status: row.status, attempts: row.attempts as u64, circuit_id: None, + error: row.error, + picked_by: row.picked_by, }) .collect() } @@ -431,7 +436,9 
@@ impl FriProofCompressorDal<'_, '_> { ) RETURNING status, - attempts + attempts, + error, + picked_by "#, i64::from(block_number.0), max_attempts as i32, @@ -445,6 +452,8 @@ impl FriProofCompressorDal<'_, '_> { status: row.status, attempts: row.attempts as u64, circuit_id: None, + error: row.error, + picked_by: row.picked_by, }) .collect() } diff --git a/prover/crates/lib/prover_dal/src/fri_prover_dal.rs b/prover/crates/lib/prover_dal/src/fri_prover_dal.rs index f6efc6afa6ad..c2dadae58d0b 100644 --- a/prover/crates/lib/prover_dal/src/fri_prover_dal.rs +++ b/prover/crates/lib/prover_dal/src/fri_prover_dal.rs @@ -2,11 +2,12 @@ use std::{collections::HashMap, convert::TryFrom, str::FromStr, time::Duration}; use zksync_basic_types::{ - basic_fri_types::{AggregationRound, CircuitIdRoundTuple, JobIdentifiers}, - protocol_version::{ProtocolSemanticVersion, ProtocolVersionId}, - prover_dal::{ - FriProverJobMetadata, JobCountStatistics, ProverJobFriInfo, ProverJobStatus, StuckJobs, + basic_fri_types::{ + AggregationRound, CircuitIdRoundTuple, CircuitProverStatsEntry, + ProtocolVersionedCircuitProverStats, }, + protocol_version::{ProtocolSemanticVersion, ProtocolVersionId}, + prover_dal::{FriProverJobMetadata, ProverJobFriInfo, ProverJobStatus, StuckJobs}, L1BatchNumber, }; use zksync_db_connection::{ @@ -310,12 +311,7 @@ impl FriProverDal<'_, '_> { prover_jobs_fri WHERE ( - status = 'in_progress' - AND processing_started_at <= NOW() - $1::INTERVAL - AND attempts < $2 - ) - OR ( - status = 'in_gpu_proof' + status IN ('in_progress', 'in_gpu_proof') AND processing_started_at <= NOW() - $1::INTERVAL AND attempts < $2 ) @@ -330,7 +326,9 @@ impl FriProverDal<'_, '_> { id, status, attempts, - circuit_id + circuit_id, + error, + picked_by "#, &processing_timeout, max_attempts as i32, @@ -344,6 +342,8 @@ impl FriProverDal<'_, '_> { status: row.status, attempts: row.attempts as u64, circuit_id: Some(row.circuit_id as u32), + error: row.error, + picked_by: row.picked_by, }) 
.collect() } @@ -400,9 +400,9 @@ impl FriProverDal<'_, '_> { .unwrap(); } - pub async fn get_prover_jobs_stats(&mut self) -> HashMap { + pub async fn get_prover_jobs_stats(&mut self) -> ProtocolVersionedCircuitProverStats { { - let rows = sqlx::query!( + sqlx::query!( r#" SELECT COUNT(*) AS "count!", @@ -429,27 +429,19 @@ impl FriProverDal<'_, '_> { ) .fetch_all(self.storage.conn()) .await - .unwrap(); - - let mut result = HashMap::new(); - - for row in &rows { - let stats: &mut JobCountStatistics = result - .entry(JobIdentifiers { - circuit_id: row.circuit_id as u8, - aggregation_round: row.aggregation_round as u8, - protocol_version: row.protocol_version as u16, - protocol_version_patch: row.protocol_version_patch as u32, - }) - .or_default(); - match row.status.as_ref() { - "queued" => stats.queued = row.count as usize, - "in_progress" => stats.in_progress = row.count as usize, - _ => (), - } - } - - result + .unwrap() + .iter() + .map(|row| { + CircuitProverStatsEntry::new( + row.circuit_id, + row.aggregation_round, + row.protocol_version, + row.protocol_version_patch, + &row.status, + row.count, + ) + }) + .collect() } } @@ -577,19 +569,20 @@ impl FriProverDal<'_, '_> { .ok()? 
.map(|row| row.id as u32) } - - pub async fn archive_old_jobs(&mut self, archiving_interval_secs: u64) -> usize { - let archiving_interval_secs = - pg_interval_from_duration(Duration::from_secs(archiving_interval_secs)); + pub async fn archive_old_jobs(&mut self, archiving_interval: Duration) -> usize { + let archiving_interval_secs = pg_interval_from_duration(archiving_interval); sqlx::query_scalar!( r#" WITH deleted AS ( - DELETE FROM prover_jobs_fri + DELETE FROM prover_jobs_fri AS p + USING proof_compression_jobs_fri AS c WHERE - status NOT IN ('queued', 'in_progress', 'in_gpu_proof', 'failed') - AND updated_at < NOW() - $1::INTERVAL - RETURNING * + p.status NOT IN ('queued', 'in_progress', 'in_gpu_proof', 'failed') + AND p.updated_at < NOW() - $1::INTERVAL + AND p.l1_batch_number = c.l1_batch_number + AND c.status = 'sent_to_server' + RETURNING p.* ), inserted_count AS ( INSERT INTO prover_jobs_fri_archive @@ -744,7 +737,9 @@ impl FriProverDal<'_, '_> { id, status, attempts, - circuit_id + circuit_id, + error, + picked_by "#, i64::from(block_number.0), max_attempts as i32, @@ -758,6 +753,8 @@ impl FriProverDal<'_, '_> { status: row.status, attempts: row.attempts as u64, circuit_id: Some(row.circuit_id as u32), + error: row.error, + picked_by: row.picked_by, }) .collect() } diff --git a/prover/crates/lib/prover_dal/src/fri_witness_generator_dal.rs b/prover/crates/lib/prover_dal/src/fri_witness_generator_dal.rs index 488d5b3a5ec9..65d490ee4e08 100644 --- a/prover/crates/lib/prover_dal/src/fri_witness_generator_dal.rs +++ b/prover/crates/lib/prover_dal/src/fri_witness_generator_dal.rs @@ -248,7 +248,7 @@ impl FriWitnessGeneratorDal<'_, '_> { .unwrap(); } - pub async fn requeue_stuck_jobs( + pub async fn requeue_stuck_basic_jobs( &mut self, processing_timeout: Duration, max_attempts: u32, @@ -267,11 +267,6 @@ impl FriWitnessGeneratorDal<'_, '_> { AND processing_started_at <= NOW() - $1::INTERVAL AND attempts < $2 ) - OR ( - status = 'in_gpu_proof' - AND 
processing_started_at <= NOW() - $1::INTERVAL - AND attempts < $2 - ) OR ( status = 'failed' AND attempts < $2 @@ -279,7 +274,9 @@ impl FriWitnessGeneratorDal<'_, '_> { RETURNING l1_batch_number, status, - attempts + attempts, + error, + picked_by "#, &processing_timeout, max_attempts as i32, @@ -293,6 +290,8 @@ impl FriWitnessGeneratorDal<'_, '_> { status: row.status, attempts: row.attempts as u64, circuit_id: None, + error: row.error, + picked_by: row.picked_by, }) .collect() } @@ -928,15 +927,15 @@ impl FriWitnessGeneratorDal<'_, '_> { "#, AggregationRound::RecursionTip as i64, ) - .fetch_all(self.storage.conn()) - .await - .unwrap() - .into_iter() - .map(|row| (row.l1_batch_number as u64)) - .collect() + .fetch_all(self.storage.conn()) + .await + .unwrap() + .into_iter() + .map(|row| (row.l1_batch_number as u64)) + .collect() } - pub async fn requeue_stuck_leaf_aggregations_jobs( + pub async fn requeue_stuck_leaf_jobs( &mut self, processing_timeout: Duration, max_attempts: u32, @@ -963,7 +962,9 @@ impl FriWitnessGeneratorDal<'_, '_> { id, status, attempts, - circuit_id + circuit_id, + error, + picked_by "#, &processing_timeout, max_attempts as i32, @@ -977,11 +978,13 @@ impl FriWitnessGeneratorDal<'_, '_> { status: row.status, attempts: row.attempts as u64, circuit_id: Some(row.circuit_id as u32), + error: row.error, + picked_by: row.picked_by, }) .collect() } - pub async fn requeue_stuck_node_aggregations_jobs( + pub async fn requeue_stuck_node_jobs( &mut self, processing_timeout: Duration, max_attempts: u32, @@ -1008,7 +1011,9 @@ impl FriWitnessGeneratorDal<'_, '_> { id, status, attempts, - circuit_id + circuit_id, + error, + picked_by "#, &processing_timeout, max_attempts as i32, @@ -1022,6 +1027,8 @@ impl FriWitnessGeneratorDal<'_, '_> { status: row.status, attempts: row.attempts as u64, circuit_id: Some(row.circuit_id as u32), + error: row.error, + picked_by: row.picked_by, }) .collect() } @@ -1052,7 +1059,9 @@ impl FriWitnessGeneratorDal<'_, '_> { 
RETURNING l1_batch_number, status, - attempts + attempts, + error, + picked_by "#, &processing_timeout, max_attempts as i32, @@ -1066,6 +1075,8 @@ impl FriWitnessGeneratorDal<'_, '_> { status: row.status, attempts: row.attempts as u64, circuit_id: None, + error: row.error, + picked_by: row.picked_by, }) .collect() } @@ -1164,7 +1175,9 @@ impl FriWitnessGeneratorDal<'_, '_> { RETURNING l1_batch_number, status, - attempts + attempts, + error, + picked_by "#, &processing_timeout, max_attempts as i32, @@ -1178,6 +1191,8 @@ impl FriWitnessGeneratorDal<'_, '_> { status: row.status, attempts: row.attempts as u64, circuit_id: None, + error: row.error, + picked_by: row.picked_by, }) .collect() } @@ -1708,7 +1723,9 @@ impl FriWitnessGeneratorDal<'_, '_> { RETURNING l1_batch_number, status, - attempts + attempts, + error, + picked_by "#, i64::from(block_number.0), max_attempts @@ -1723,6 +1740,8 @@ impl FriWitnessGeneratorDal<'_, '_> { status: row.get("status"), attempts: row.get::("attempts") as u64, circuit_id: None, + error: row.get("error"), + picked_by: row.get("picked_by"), }) .collect() } @@ -1772,7 +1791,9 @@ impl FriWitnessGeneratorDal<'_, '_> { RETURNING l1_batch_number, status, - attempts + attempts, + error, + picked_by "#, i64::from(block_number.0), max_attempts @@ -1787,6 +1808,8 @@ impl FriWitnessGeneratorDal<'_, '_> { status: row.get("status"), attempts: row.get::("attempts") as u64, circuit_id: None, + error: row.get("error"), + picked_by: row.get("picked_by"), }) .collect() } @@ -1810,7 +1833,9 @@ impl FriWitnessGeneratorDal<'_, '_> { RETURNING l1_batch_number, status, - attempts + attempts, + error, + picked_by "#, i64::from(block_number.0), max_attempts @@ -1825,6 +1850,8 @@ impl FriWitnessGeneratorDal<'_, '_> { status: row.get("status"), attempts: row.get::("attempts") as u64, circuit_id: None, + error: row.get("error"), + picked_by: row.get("picked_by"), }) .collect() } @@ -1852,7 +1879,9 @@ impl FriWitnessGeneratorDal<'_, '_> { {}, status, attempts, - 
circuit_id + circuit_id, + error, + picked_by "#, table_name, i64::from(block_number.0), @@ -1869,6 +1898,8 @@ impl FriWitnessGeneratorDal<'_, '_> { status: row.get("status"), attempts: row.get::("attempts") as u64, circuit_id: Some(row.get::("circuit_id") as u32), + error: row.get("error"), + picked_by: row.get("picked_by"), }) .collect() }