Skip to content

Commit

Permalink
separate workers
Browse files Browse the repository at this point in the history
  • Loading branch information
temaniarpit27 committed Aug 29, 2024
1 parent 9d0526d commit f0f4bf9
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 47 deletions.
19 changes: 3 additions & 16 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ num-bigint = "0.4.5"
num-traits = "0.2.19"
nunny = "0.2.1"
once_cell = "1.19.0"
paladin-core = "0.4.2"
paladin-core = { git = "https://github.com/0xPolygonZero/paladin.git", branch = "arpit/507-2" }
parking_lot = "0.12.3"
paste = "1.0.15"
pest = "2.7.10"
Expand Down Expand Up @@ -139,4 +139,3 @@ trybuild = "1.0"

[workspace.lints.clippy]
too_long_first_doc_paragraph = "allow"

2 changes: 1 addition & 1 deletion zero_bin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ Paladin options:
-t, --task-bus-routing-key <TASK_BUS_ROUTING_KEY>
Specifies the routing key for publishing task messages. In most cases, the default value should suffice
[default: task]
[default: ""]
-s, --serializer <SERIALIZER>
Determines the serialization format to be used
Expand Down
9 changes: 6 additions & 3 deletions zero_bin/leader/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ pub struct ProofParams {

/// The main function for the client.
pub(crate) async fn client_main(
runtime: Runtime,
block_proof_runtime: Runtime,
segment_runtime: Runtime,
rpc_params: RpcParams,
block_interval: BlockInterval,
mut params: ProofParams,
Expand Down Expand Up @@ -82,13 +83,15 @@ pub(crate) async fn client_main(
// verify the whole sequence.
let proved_blocks = prover::prove(
block_prover_inputs,
&runtime,
&block_proof_runtime,
&segment_runtime,
params.previous_proof.take(),
params.prover_config,
params.proof_output_dir.clone(),
)
.await;
runtime.close().await?;
block_proof_runtime.close().await?;
segment_runtime.close().await?;
let proved_blocks = proved_blocks?;

if params.prover_config.test_only {
Expand Down
27 changes: 20 additions & 7 deletions zero_bin/leader/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,31 @@ use tracing::{debug, error, info};

/// The main function for the HTTP mode.
pub(crate) async fn http_main(
runtime: Runtime,
block_proof_runtime: Runtime,
segment_runtime: Runtime,
port: u16,
output_dir: PathBuf,
prover_config: ProverConfig,
) -> Result<()> {
let addr = SocketAddr::from(([0, 0, 0, 0], port));
debug!("listening on {}", addr);

let runtime = Arc::new(runtime);
let block_proof_runtime = Arc::new(block_proof_runtime);
let segment_runtime = Arc::new(segment_runtime);
let app = Router::new().route(
"/prove",
post({
let runtime = runtime.clone();
move |body| prove(body, runtime, output_dir.clone(), prover_config)
let block_proof_runtime = block_proof_runtime.clone();
let segment_runtime = segment_runtime.clone();
move |body| {
prove(
body,
block_proof_runtime,
segment_runtime,
output_dir.clone(),
prover_config,
)
}
}),
);
let listener = tokio::net::TcpListener::bind(&addr).await?;
Expand Down Expand Up @@ -63,7 +74,8 @@ struct HttpProverInput {

async fn prove(
Json(payload): Json<HttpProverInput>,
runtime: Arc<Runtime>,
block_proof_runtime: Arc<Runtime>,
segment_runtime: Arc<Runtime>,
output_dir: PathBuf,
prover_config: ProverConfig,
) -> StatusCode {
Expand All @@ -75,7 +87,7 @@ async fn prove(
payload
.prover_input
.prove_test(
&runtime,
&segment_runtime,
payload.previous.map(futures::future::ok),
prover_config,
)
Expand All @@ -84,7 +96,8 @@ async fn prove(
payload
.prover_input
.prove(
&runtime,
&block_proof_runtime,
&segment_runtime,
payload.previous.map(futures::future::ok),
prover_config,
)
Expand Down
30 changes: 25 additions & 5 deletions zero_bin/leader/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,14 @@ async fn main() -> Result<()> {

let args = cli::Cli::parse();

let runtime = Runtime::from_config(&args.paladin, register()).await?;
let mut block_proof_paladin_args = args.paladin.clone();
block_proof_paladin_args.task_bus_routing_key = Some("block_proof".to_string());

let mut segment_paladin_args = args.paladin.clone();
segment_paladin_args.task_bus_routing_key = Some("segment".to_string());

let block_proof_runtime = Runtime::from_config(&block_proof_paladin_args, register()).await?;
let segment_runtime = Runtime::from_config(&segment_paladin_args, register()).await?;

let prover_config: ProverConfig = args.prover_config.into();

Expand All @@ -73,7 +80,13 @@ async fn main() -> Result<()> {
Command::Clean => zero_bin_common::prover_state::persistence::delete_all()?,
Command::Stdio { previous_proof } => {
let previous_proof = get_previous_proof(previous_proof)?;
stdio::stdio_main(runtime, previous_proof, prover_config).await?;
stdio::stdio_main(
block_proof_runtime,
segment_runtime,
previous_proof,
prover_config,
)
.await?;
}
Command::Http { port, output_dir } => {
// check if output_dir exists, is a directory, and is writable
Expand All @@ -85,7 +98,14 @@ async fn main() -> Result<()> {
panic!("output-dir is not a writable directory");
}

http::http_main(runtime, port, output_dir, prover_config).await?;
http::http_main(
block_proof_runtime,
segment_runtime,
port,
output_dir,
prover_config,
)
.await?;
}
Command::Rpc {
rpc_url,
Expand All @@ -99,7 +119,6 @@ async fn main() -> Result<()> {
backoff,
max_retries,
} => {
let runtime = Runtime::from_config(&args.paladin, register()).await?;
let previous_proof = get_previous_proof(previous_proof)?;
let mut block_interval = BlockInterval::new(&block_interval)?;

Expand All @@ -113,7 +132,8 @@ async fn main() -> Result<()> {

info!("Proving interval {block_interval}");
client_main(
runtime,
block_proof_runtime,
segment_runtime,
RpcParams {
rpc_url,
rpc_type,
Expand Down
17 changes: 13 additions & 4 deletions zero_bin/leader/src/stdio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use tracing::info;

/// The main function for the stdio mode.
pub(crate) async fn stdio_main(
runtime: Runtime,
block_proof_runtime: Runtime,
segment_runtime: Runtime,
previous: Option<GeneratedBlockProof>,
prover_config: ProverConfig,
) -> Result<()> {
Expand All @@ -21,9 +22,17 @@ pub(crate) async fn stdio_main(
.map(Into::into)
.collect::<Vec<BlockProverInputFuture>>();

let proved_blocks =
prover::prove(block_prover_inputs, &runtime, previous, prover_config, None).await;
runtime.close().await?;
let proved_blocks = prover::prove(
block_prover_inputs,
&block_proof_runtime,
&segment_runtime,
previous,
prover_config,
None,
)
.await;
block_proof_runtime.close().await?;
segment_runtime.close().await?;
let proved_blocks = proved_blocks?;

if prover_config.test_only {
Expand Down
25 changes: 16 additions & 9 deletions zero_bin/prover/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ impl BlockProverInput {

pub async fn prove(
self,
runtime: &Runtime,
block_proof_runtime: &Runtime,
segment_runtime: &Runtime,
previous: Option<impl Future<Output = Result<GeneratedBlockProof>>>,
prover_config: ProverConfig,
) -> Result<GeneratedBlockProof> {
Expand Down Expand Up @@ -99,7 +100,7 @@ impl BlockProverInput {

Directive::map(IndexedStream::from(segment_data_iterator), &seg_prove_ops)
.fold(&seg_agg_ops)
.run(runtime)
.run(segment_runtime)
.map(move |e| {
e.map(|p| (idx, proof_gen::proof_types::BatchAggregatableProof::from(p)))
})
Expand All @@ -109,7 +110,7 @@ impl BlockProverInput {
// Fold the batch aggregated proof stream into a single proof.
let final_batch_proof =
Directive::fold(IndexedStream::new(batch_proof_futs), &batch_agg_ops)
.run(runtime)
.run(block_proof_runtime)
.await?;

if let proof_gen::proof_types::BatchAggregatableProof::Agg(proof) = final_batch_proof {
Expand All @@ -126,7 +127,7 @@ impl BlockProverInput {
prev,
save_inputs_on_error,
})
.run(runtime)
.run(block_proof_runtime)
.await?;

info!("Successfully proved block {block_number}");
Expand All @@ -139,7 +140,7 @@ impl BlockProverInput {

pub async fn prove_test(
self,
runtime: &Runtime,
segment_runtime: &Runtime,
previous: Option<impl Future<Output = Result<GeneratedBlockProof>>>,
prover_config: ProverConfig,
) -> Result<GeneratedBlockProof> {
Expand Down Expand Up @@ -175,7 +176,7 @@ impl BlockProverInput {
);

simulation
.run(runtime)
.run(segment_runtime)
.await?
.try_for_each(|_| future::ok(()))
.await?;
Expand Down Expand Up @@ -204,7 +205,8 @@ impl BlockProverInput {
/// block proofs as well.
pub async fn prove(
block_prover_inputs: Vec<BlockProverInputFuture>,
runtime: &Runtime,
block_proof_runtime: &Runtime,
segment_runtime: &Runtime,
previous_proof: Option<GeneratedBlockProof>,
prover_config: ProverConfig,
proof_output_dir: Option<PathBuf>,
Expand All @@ -226,7 +228,7 @@ pub async fn prove(
// Prove the block
let block_proof = if prover_config.test_only {
block
.prove_test(runtime, previous_block_proof, prover_config)
.prove_test(segment_runtime, previous_block_proof, prover_config)
.then(move |proof| async move {
let proof = proof?;
let block_number = proof.b_height;
Expand All @@ -250,7 +252,12 @@ pub async fn prove(
.await?
} else {
block
.prove(runtime, previous_block_proof, prover_config)
.prove(
block_proof_runtime,
segment_runtime,
previous_block_proof,
prover_config,
)
.then(move |proof| async move {
let proof = proof?;
let block_number = proof.b_height;
Expand Down

0 comments on commit f0f4bf9

Please sign in to comment.