From f0f4bf9a736fc7e96d0afe817361643bdd02d9cb Mon Sep 17 00:00:00 2001 From: Arpit Temani Date: Thu, 29 Aug 2024 12:33:16 +0530 Subject: [PATCH] separate workers --- Cargo.lock | 19 +++---------------- Cargo.toml | 3 +-- zero_bin/README.md | 2 +- zero_bin/leader/src/client.rs | 9 ++++++--- zero_bin/leader/src/http.rs | 27 ++++++++++++++++++++------- zero_bin/leader/src/main.rs | 30 +++++++++++++++++++++++++----- zero_bin/leader/src/stdio.rs | 17 +++++++++++++---- zero_bin/prover/src/lib.rs | 25 ++++++++++++++++--------- 8 files changed, 85 insertions(+), 47 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2b3eaf1d8..062983741 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3347,8 +3347,7 @@ dependencies = [ [[package]] name = "paladin-core" version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5af1955eaab1506a43d046628c218b7b3915539554838feb85ed31f54aace2f2" +source = "git+https://github.com/0xPolygonZero/paladin.git?branch=arpit/507-2#2ba9ac248eb22b4480b01d78c8182609bbdff0fe" dependencies = [ "anyhow", "async-trait", @@ -3366,6 +3365,7 @@ dependencies = [ "paladin-opkind-derive", "pin-project", "postcard", + "rand", "serde", "thiserror", "tokio", @@ -3374,14 +3374,12 @@ dependencies = [ "tokio-util", "tracing", "tracing-subscriber", - "uuid", ] [[package]] name = "paladin-opkind-derive" version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af25dcb10b7c0ce99abee8694e2e79e4787d7f778b9339dc5a50ba6fc45e5cc9" +source = "git+https://github.com/0xPolygonZero/paladin.git?branch=arpit/507-2#2ba9ac248eb22b4480b01d78c8182609bbdff0fe" dependencies = [ "quote", "syn 2.0.70", @@ -5383,17 +5381,6 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" -[[package]] -name = "uuid" -version = "1.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314" -dependencies = [ - "getrandom", - "rand", - "serde", -] - [[package]] name = "valuable" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index cd48fcca2..c5b87de1e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -78,7 +78,7 @@ num-bigint = "0.4.5" num-traits = "0.2.19" nunny = "0.2.1" once_cell = "1.19.0" -paladin-core = "0.4.2" +paladin-core = { git = "https://github.com/0xPolygonZero/paladin.git", branch = "arpit/507-2" } parking_lot = "0.12.3" paste = "1.0.15" pest = "2.7.10" @@ -139,4 +139,3 @@ trybuild = "1.0" [workspace.lints.clippy] too_long_first_doc_paragraph = "allow" - diff --git a/zero_bin/README.md b/zero_bin/README.md index b6221a724..d8b03ce8f 100644 --- a/zero_bin/README.md +++ b/zero_bin/README.md @@ -101,7 +101,7 @@ Paladin options: -t, --task-bus-routing-key Specifies the routing key for publishing task messages. In most cases, the default value should suffice - [default: task] + [default: ""] -s, --serializer Determines the serialization format to be used diff --git a/zero_bin/leader/src/client.rs b/zero_bin/leader/src/client.rs index 4ef5ed3cd..d110d2a38 100644 --- a/zero_bin/leader/src/client.rs +++ b/zero_bin/leader/src/client.rs @@ -33,7 +33,8 @@ pub struct ProofParams { /// The main function for the client. pub(crate) async fn client_main( - runtime: Runtime, + block_proof_runtime: Runtime, + segment_runtime: Runtime, rpc_params: RpcParams, block_interval: BlockInterval, mut params: ProofParams, @@ -82,13 +83,15 @@ pub(crate) async fn client_main( // verify the whole sequence. let proved_blocks = prover::prove( block_prover_inputs, - &runtime, + &block_proof_runtime, + &segment_runtime, params.previous_proof.take(), params.prover_config, params.proof_output_dir.clone(), ) .await; - runtime.close().await?; + block_proof_runtime.close().await?; + segment_runtime.close().await?; let proved_blocks = proved_blocks?; if params.prover_config.test_only { diff --git a/zero_bin/leader/src/http.rs b/zero_bin/leader/src/http.rs index 39c7333e1..d05e9252f 100644 --- a/zero_bin/leader/src/http.rs +++ b/zero_bin/leader/src/http.rs @@ -12,7 +12,8 @@ use tracing::{debug, error, info}; /// The main function for the HTTP mode. pub(crate) async fn http_main( - runtime: Runtime, + block_proof_runtime: Runtime, + segment_runtime: Runtime, port: u16, output_dir: PathBuf, prover_config: ProverConfig, @@ -20,12 +21,22 @@ pub(crate) async fn http_main( let addr = SocketAddr::from(([0, 0, 0, 0], port)); debug!("listening on {}", addr); - let runtime = Arc::new(runtime); + let block_proof_runtime = Arc::new(block_proof_runtime); + let segment_runtime = Arc::new(segment_runtime); let app = Router::new().route( "/prove", post({ - let runtime = runtime.clone(); - move |body| prove(body, runtime, output_dir.clone(), prover_config) + let block_proof_runtime = block_proof_runtime.clone(); + let segment_runtime = segment_runtime.clone(); + move |body| { + prove( + body, + block_proof_runtime, + segment_runtime, + output_dir.clone(), + prover_config, + ) + } }), ); let listener = tokio::net::TcpListener::bind(&addr).await?; @@ -63,7 +74,8 @@ struct HttpProverInput { async fn prove( Json(payload): Json, - runtime: Arc, + block_proof_runtime: Arc, + segment_runtime: Arc, output_dir: PathBuf, prover_config: ProverConfig, ) -> StatusCode { @@ -75,7 +87,7 @@ async fn prove( payload .prover_input .prove_test( - &runtime, + &segment_runtime, payload.previous.map(futures::future::ok), prover_config, ) @@ -84,7 +96,8 @@ async fn prove( payload .prover_input .prove( - &runtime, + &block_proof_runtime, + &segment_runtime, payload.previous.map(futures::future::ok), prover_config, ) diff --git a/zero_bin/leader/src/main.rs b/zero_bin/leader/src/main.rs index f4a448a3b..e466d7c4b 100644 --- a/zero_bin/leader/src/main.rs +++ b/zero_bin/leader/src/main.rs @@ -55,7 +55,14 @@ async fn main() -> Result<()> { let args = cli::Cli::parse(); - let runtime = Runtime::from_config(&args.paladin, register()).await?; + let mut block_proof_paladin_args = args.paladin.clone(); + block_proof_paladin_args.task_bus_routing_key = Some("block_proof".to_string()); + + let mut segment_paladin_args = args.paladin.clone(); + segment_paladin_args.task_bus_routing_key = Some("segment".to_string()); + + let block_proof_runtime = Runtime::from_config(&block_proof_paladin_args, register()).await?; + let segment_runtime = Runtime::from_config(&segment_paladin_args, register()).await?; let prover_config: ProverConfig = args.prover_config.into(); @@ -73,7 +80,13 @@ async fn main() -> Result<()> { Command::Clean => zero_bin_common::prover_state::persistence::delete_all()?, Command::Stdio { previous_proof } => { let previous_proof = get_previous_proof(previous_proof)?; - stdio::stdio_main(runtime, previous_proof, prover_config).await?; + stdio::stdio_main( + block_proof_runtime, + segment_runtime, + previous_proof, + prover_config, + ) + .await?; } Command::Http { port, output_dir } => { // check if output_dir exists, is a directory, and is writable @@ -85,7 +98,14 @@ async fn main() -> Result<()> { panic!("output-dir is not a writable directory"); } - http::http_main(runtime, port, output_dir, prover_config).await?; + http::http_main( + block_proof_runtime, + segment_runtime, + port, + output_dir, + prover_config, + ) + .await?; } Command::Rpc { rpc_url, @@ -99,7 +119,6 @@ async fn main() -> Result<()> { backoff, max_retries, } => { - let runtime = Runtime::from_config(&args.paladin, register()).await?; let previous_proof = get_previous_proof(previous_proof)?; let mut block_interval = BlockInterval::new(&block_interval)?; @@ -113,7 +132,8 @@ async fn main() -> Result<()> { info!("Proving interval {block_interval}"); client_main( - runtime, + block_proof_runtime, + segment_runtime, RpcParams { rpc_url, rpc_type, diff --git a/zero_bin/leader/src/stdio.rs b/zero_bin/leader/src/stdio.rs index 88dd20aac..ef9392564 100644 --- a/zero_bin/leader/src/stdio.rs +++ b/zero_bin/leader/src/stdio.rs @@ -8,7 +8,8 @@ use tracing::info; /// The main function for the stdio mode. pub(crate) async fn stdio_main( - runtime: Runtime, + block_proof_runtime: Runtime, + segment_runtime: Runtime, previous: Option, prover_config: ProverConfig, ) -> Result<()> { @@ -21,9 +22,17 @@ pub(crate) async fn stdio_main( .map(Into::into) .collect::>(); - let proved_blocks = - prover::prove(block_prover_inputs, &runtime, previous, prover_config, None).await; - runtime.close().await?; + let proved_blocks = prover::prove( + block_prover_inputs, + &block_proof_runtime, + &segment_runtime, + previous, + prover_config, + None, + ) + .await; + block_proof_runtime.close().await?; + segment_runtime.close().await?; let proved_blocks = proved_blocks?; if prover_config.test_only { diff --git a/zero_bin/prover/src/lib.rs b/zero_bin/prover/src/lib.rs index 46472bc7f..cb80bdccb 100644 --- a/zero_bin/prover/src/lib.rs +++ b/zero_bin/prover/src/lib.rs @@ -50,7 +50,8 @@ impl BlockProverInput { pub async fn prove( self, - runtime: &Runtime, + block_proof_runtime: &Runtime, + segment_runtime: &Runtime, previous: Option>>, prover_config: ProverConfig, ) -> Result { @@ -99,7 +100,7 @@ impl BlockProverInput { Directive::map(IndexedStream::from(segment_data_iterator), &seg_prove_ops) .fold(&seg_agg_ops) - .run(runtime) + .run(segment_runtime) .map(move |e| { e.map(|p| (idx, proof_gen::proof_types::BatchAggregatableProof::from(p))) }) @@ -109,7 +110,7 @@ impl BlockProverInput { // Fold the batch aggregated proof stream into a single proof. let final_batch_proof = Directive::fold(IndexedStream::new(batch_proof_futs), &batch_agg_ops) - .run(runtime) + .run(block_proof_runtime) .await?; if let proof_gen::proof_types::BatchAggregatableProof::Agg(proof) = final_batch_proof { @@ -126,7 +127,7 @@ impl BlockProverInput { prev, save_inputs_on_error, }) - .run(runtime) + .run(block_proof_runtime) .await?; info!("Successfully proved block {block_number}"); @@ -139,7 +140,7 @@ impl BlockProverInput { pub async fn prove_test( self, - runtime: &Runtime, + segment_runtime: &Runtime, previous: Option>>, prover_config: ProverConfig, ) -> Result { @@ -175,7 +176,7 @@ impl BlockProverInput { ); simulation - .run(runtime) + .run(segment_runtime) .await? .try_for_each(|_| future::ok(())) .await?; @@ -204,7 +205,8 @@ impl BlockProverInput { /// block proofs as well. pub async fn prove( block_prover_inputs: Vec, - runtime: &Runtime, + block_proof_runtime: &Runtime, + segment_runtime: &Runtime, previous_proof: Option, prover_config: ProverConfig, proof_output_dir: Option, @@ -226,7 +228,7 @@ pub async fn prove( // Prove the block let block_proof = if prover_config.test_only { block - .prove_test(runtime, previous_block_proof, prover_config) + .prove_test(segment_runtime, previous_block_proof, prover_config) .then(move |proof| async move { let proof = proof?; let block_number = proof.b_height; @@ -250,7 +252,12 @@ pub async fn prove( .await? } else { block - .prove(runtime, previous_block_proof, prover_config) + .prove( + block_proof_runtime, + segment_runtime, + previous_block_proof, + prover_config, + ) .then(move |proof| async move { let proof = proof?; let block_number = proof.b_height;