From 183b25dd1a9a4dbe3b625b4e45a85018567b2e64 Mon Sep 17 00:00:00 2001 From: Mirko von Leipzig Date: Fri, 15 Nov 2024 16:26:25 +0200 Subject: [PATCH 1/3] Move shared mempool constructor out-of-line --- crates/block-producer/src/mempool/mod.rs | 6 +++--- crates/block-producer/src/server/mod.rs | 9 +++++++-- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/crates/block-producer/src/mempool/mod.rs b/crates/block-producer/src/mempool/mod.rs index e793ef1df..6e753a511 100644 --- a/crates/block-producer/src/mempool/mod.rs +++ b/crates/block-producer/src/mempool/mod.rs @@ -109,8 +109,8 @@ impl Mempool { batch_limit: usize, block_limit: usize, state_retention: usize, - ) -> SharedMempool { - Arc::new(Mutex::new(Self { + ) -> Self { + Self { chain_tip, batch_transaction_limit: batch_limit, block_batch_limit: block_limit, @@ -119,7 +119,7 @@ impl Mempool { transactions: Default::default(), batches: Default::default(), next_batch_id: Default::default(), - })) + } } /// Adds a transaction to the mempool. diff --git a/crates/block-producer/src/server/mod.rs b/crates/block-producer/src/server/mod.rs index e1f3bf0b7..b621abcac 100644 --- a/crates/block-producer/src/server/mod.rs +++ b/crates/block-producer/src/server/mod.rs @@ -1,4 +1,4 @@ -use std::net::ToSocketAddrs; +use std::{net::ToSocketAddrs, sync::Arc}; use miden_node_proto::generated::{ block_producer::api_server, requests::SubmitProvenTransactionRequest, @@ -97,7 +97,12 @@ impl BlockProducer { chain_tip, } = self; - let mempool = Mempool::new(chain_tip, batch_limit, block_limit, state_retention); + let mempool = Arc::new(Mutex::new(Mempool::new( + chain_tip, + batch_limit, + block_limit, + state_retention, + ))); // Spawn rpc server and batch and block provers. // From 06aa0e9ddbf4daf2e9a3dadb577ece12f474cbce Mon Sep 17 00:00:00 2001 From: Mirko von Leipzig Date: Fri, 15 Nov 2024 16:26:25 +0200 Subject: [PATCH 2/3] Test and fix batch failures --- Cargo.lock | 17 +++ crates/block-producer/Cargo.toml | 1 + .../block-producer/src/mempool/batch_graph.rs | 2 +- .../src/mempool/dependency_graph.rs | 18 ++- crates/block-producer/src/mempool/mod.rs | 119 ++++++++++++++++-- .../src/test_utils/proven_tx.rs | 22 ++++ 6 files changed, 167 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4de7bc004..d0c59fede 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -677,6 +677,12 @@ dependencies = [ "syn", ] +[[package]] +name = "diff" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56254986775e3233ffa9c4d7d3faaf6d36a2c09d30b20687e9f88bc8bafc16c8" + [[package]] name = "digest" version = "0.10.7" @@ -1596,6 +1602,7 @@ dependencies = [ "miden-processor", "miden-stdlib", "miden-tx", + "pretty_assertions", "rand", "rand_chacha", "serde", @@ -2195,6 +2202,16 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "925383efa346730478fb4838dbe9137d2a47675ad789c546d150a6e1dd4ab31c" +[[package]] +name = "pretty_assertions" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ae130e2f271fbc2ac3a40fb1d07180839cdbbe443c7a27e1e3c13c5cac0116d" +dependencies = [ + "diff", + "yansi", +] + [[package]] name = "prettyplease" version = "0.2.25" diff --git a/crates/block-producer/Cargo.toml b/crates/block-producer/Cargo.toml index ec4eec1b1..75fc61d06 100644 --- a/crates/block-producer/Cargo.toml +++ b/crates/block-producer/Cargo.toml @@ -42,6 +42,7 @@ miden-lib = { workspace = true, features = ["testing"] } miden-node-test-macro = { path = "../test-macro" } miden-objects = { workspace = true, features = ["testing"] } miden-tx = { workspace = true, features = ["testing"] } +pretty_assertions = "1.4.1" rand_chacha = { version = "0.3", default-features = false } tokio = { workspace = true, features = ["test-util"] } winterfell = { version = "0.9" } diff --git a/crates/block-producer/src/mempool/batch_graph.rs b/crates/block-producer/src/mempool/batch_graph.rs index b83af277d..a3e450f5d 100644 --- a/crates/block-producer/src/mempool/batch_graph.rs +++ b/crates/block-producer/src/mempool/batch_graph.rs @@ -50,7 +50,7 @@ use crate::batch_builder::batch::TransactionBatch; /// │ ◄────┘ /// └───────────┘ /// ``` -#[derive(Default, Clone)] +#[derive(Default, Debug, Clone, PartialEq)] pub struct BatchGraph { /// Tracks the interdependencies between batches. inner: DependencyGraph, diff --git a/crates/block-producer/src/mempool/dependency_graph.rs b/crates/block-producer/src/mempool/dependency_graph.rs index 37b247590..d0d6be0bd 100644 --- a/crates/block-producer/src/mempool/dependency_graph.rs +++ b/crates/block-producer/src/mempool/dependency_graph.rs @@ -1,6 +1,6 @@ use std::{ collections::{BTreeMap, BTreeSet}, - fmt::Display, + fmt::{Debug, Display}, }; // DEPENDENCY GRAPH @@ -35,7 +35,7 @@ use std::{ /// │ ◄────┘ /// └───────────┘ /// ``` -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Clone, PartialEq, Eq)] pub struct DependencyGraph { /// Node's who's data is still pending. pending: BTreeSet, @@ -61,6 +61,18 @@ pub struct DependencyGraph { processed: BTreeSet, } +impl Debug for DependencyGraph +where + K: Debug, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DependencyGraph") + .field("pending", &self.pending) + .field("vertices", &self.vertices.keys()) + .finish() + } +} + #[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)] pub enum GraphError { #[error("Node {0} already exists")] @@ -99,7 +111,7 @@ impl Default for DependencyGraph { } } -impl DependencyGraph { +impl DependencyGraph { /// Inserts a new pending node into the graph. /// /// # Errors diff --git a/crates/block-producer/src/mempool/mod.rs b/crates/block-producer/src/mempool/mod.rs index 6e753a511..6fc4f8526 100644 --- a/crates/block-producer/src/mempool/mod.rs +++ b/crates/block-producer/src/mempool/mod.rs @@ -78,6 +78,7 @@ impl BlockNumber { pub type SharedMempool = Arc>; +#[derive(Clone, Debug, PartialEq)] pub struct Mempool { /// The latest inflight state of each account. /// @@ -98,6 +99,13 @@ pub struct Mempool { block_in_progress: Option>, + /// Batches which are currently being proven. + /// + /// This is used to identify jobs which have been cancelled by the mempool but might still be + /// submitted by the batch prover. This is achieved by ignoring all batch proofs which are not + /// in this set. + batches_in_progress: BTreeSet, + batch_transaction_limit: usize, block_batch_limit: usize, } @@ -116,6 +124,7 @@ impl Mempool { block_batch_limit: block_limit, state: InflightState::new(chain_tip, state_retention), block_in_progress: Default::default(), + batches_in_progress: Default::default(), transactions: Default::default(), batches: Default::default(), next_batch_id: Default::default(), @@ -159,6 +168,7 @@ impl Mempool { self.next_batch_id.increment(); self.batches.insert(batch_id, tx_ids, parents).expect("Malformed graph"); + self.batches_in_progress.insert(batch_id); Some((batch_id, batch)) } @@ -167,25 +177,37 @@ impl Mempool { /// /// Transactions are placed back in the queue. pub fn batch_failed(&mut self, batch: BatchJobId) { + // Batch may already have been removed as part of a parent batches failure. + if !self.batches_in_progress.contains(&batch) { + return; + } + let removed_batches = self.batches.remove_batches([batch].into()).expect("Batch was not present"); - // Its possible to receive failures for batches which were already removed - // as part of a prior failure. Early exit to prevent logging these no-ops. - if removed_batches.is_empty() { - return; - } + let transactions = removed_batches.values().flatten().copied().collect(); - let batches = removed_batches.keys().copied().collect::>(); - let transactions = removed_batches.into_values().flatten().collect(); + // Remove these batches from the active list so we can ignore any subsequent submissions. + removed_batches.keys().for_each(|batch| { + self.batches_in_progress.remove(batch); + }); self.transactions.requeue_transactions(transactions).expect("Malformed graph"); - tracing::warn!(%batch, descendents=?batches, "Batch failed, dropping all inflight descendent batches, impacted transactions are back in queue."); + tracing::warn!( + %batch, + descendents=?removed_batches.keys(), + "Batch failed, dropping all inflight descendent batches, impacted transactions are back in queue." + ); } /// Marks a batch as proven if it exists. pub fn batch_proved(&mut self, batch_id: BatchJobId, batch: TransactionBatch) { + if !self.batches_in_progress.remove(&batch_id) { + // Batch may have been removed as part of a parent batches failure. + return; + } + self.batches.submit_proof(batch_id, batch).expect("Malformed graph"); } @@ -250,3 +272,84 @@ impl Mempool { self.state.revert_transactions(transactions); } } + +#[cfg(test)] +mod tests { + use pretty_assertions::assert_eq; + + use super::*; + use crate::test_utils::MockProvenTxBuilder; + + impl Mempool { + fn for_tests() -> Self { + Self::new(BlockNumber::new(0), 5, 10, 5) + } + } + + // BATCH REVERSION TESTS + // ================================================================================================ + + #[test] + fn children_of_reverted_batches_are_ignored() { + //! Batches are proved concurrently. This makes it possible for a child job to complete + //! after the parent has been reverted. Such a child job should be ignored. + let txs = MockProvenTxBuilder::sequential(); + + let mut uut = Mempool::for_tests(); + uut.add_transaction(txs[0].clone()).unwrap(); + let (parent_batch, batch_txs) = uut.select_batch().unwrap(); + assert_eq!(batch_txs, vec![txs[0].clone()]); + + uut.add_transaction(txs[1].clone()).unwrap(); + let (child_batch_a, batch_txs) = uut.select_batch().unwrap(); + assert_eq!(batch_txs, vec![txs[1].clone()]); + + uut.add_transaction(txs[2].clone()).unwrap(); + let (child_batch_b, batch_txs) = uut.select_batch().unwrap(); + assert_eq!(batch_txs, vec![txs[2].clone()]); + + // Child batch jobs are now dangling. + uut.batch_failed(parent_batch); + let reference = uut.clone(); + + // Success or failure of the child job should effectively do nothing. + uut.batch_failed(child_batch_a); + assert_eq!(uut, reference); + + let proof = TransactionBatch::new( + vec![txs[2].raw_proven_transaction().clone()], + Default::default(), + ) + .unwrap(); + uut.batch_proved(child_batch_b, proof); + assert_eq!(uut, reference); + } + + #[test] + fn reverted_batch_transactions_are_requeued() { + let txs = MockProvenTxBuilder::sequential(); + + let mut uut = Mempool::for_tests(); + uut.add_transaction(txs[0].clone()).unwrap(); + uut.select_batch().unwrap(); + + uut.add_transaction(txs[1].clone()).unwrap(); + let (failed_batch, _) = uut.select_batch().unwrap(); + + uut.add_transaction(txs[2].clone()).unwrap(); + uut.select_batch().unwrap(); + + // Middle batch failed, so it and its child transaction should be re-entered into the queue. + uut.batch_failed(failed_batch); + + let mut reference = Mempool::for_tests(); + reference.add_transaction(txs[0].clone()).unwrap(); + reference.select_batch().unwrap(); + reference.add_transaction(txs[1].clone()).unwrap(); + reference.add_transaction(txs[2].clone()).unwrap(); + reference.next_batch_id.increment(); + reference.next_batch_id.increment(); + + assert_eq!(uut, reference); + } +} diff --git a/crates/block-producer/src/test_utils/proven_tx.rs b/crates/block-producer/src/test_utils/proven_tx.rs index cf804d8d7..271466341 100644 --- a/crates/block-producer/src/test_utils/proven_tx.rs +++ b/crates/block-producer/src/test_utils/proven_tx.rs @@ -1,5 +1,6 @@ use std::ops::Range; +use itertools::Itertools; use miden_air::HashFunction; use miden_objects::{ accounts::AccountId, @@ -8,9 +9,11 @@ use miden_objects::{ vm::ExecutionProof, Digest, Felt, Hasher, ONE, }; +use rand::Rng; use winterfell::Proof; use super::MockPrivateAccount; +use crate::domain::transaction::AuthenticatedTransaction; pub struct MockProvenTxBuilder { account_id: AccountId, @@ -29,6 +32,25 @@ impl MockProvenTxBuilder { Self::with_account(mock_account.id, mock_account.states[0], mock_account.states[1]) } + /// Generates 3 random, sequential transactions acting on the same account. + pub fn sequential() -> [AuthenticatedTransaction; 3] { + let mut rng = rand::thread_rng(); + let mock_account: MockPrivateAccount<4> = rng.gen::().into(); + + (0..3) + .map(|i| { + Self::with_account( + mock_account.id, + mock_account.states[i], + mock_account.states[i + 1], + ) + }) + .map(|tx| AuthenticatedTransaction::from_inner(tx.build())) + .collect_vec() + .try_into() + .expect("Sizes should match") + } + pub fn with_account( account_id: AccountId, initial_account_hash: Digest, From 8b1fca44e927321f531cb7bb4205bca7b0ffdc21 Mon Sep 17 00:00:00 2001 From: Mirko von Leipzig Date: Wed, 4 Dec 2024 12:32:19 +0200 Subject: [PATCH 3/3] Review changes --- .../block-producer/src/mempool/batch_graph.rs | 5 +++++ .../src/mempool/dependency_graph.rs | 2 ++ crates/block-producer/src/mempool/mod.rs | 20 +++---------------- 3 files changed, 10 insertions(+), 17 deletions(-) diff --git a/crates/block-producer/src/mempool/batch_graph.rs b/crates/block-producer/src/mempool/batch_graph.rs index 5f66bb6eb..c489cddde 100644 --- a/crates/block-producer/src/mempool/batch_graph.rs +++ b/crates/block-producer/src/mempool/batch_graph.rs @@ -239,6 +239,11 @@ impl BatchGraph { batches } + + /// Returns `true` if the graph contains the given batch. + pub fn contains(&self, id: &BatchJobId) -> bool { + self.batches.contains_key(id) + } } #[cfg(any(test, doctest))] diff --git a/crates/block-producer/src/mempool/dependency_graph.rs b/crates/block-producer/src/mempool/dependency_graph.rs index ed38f7f33..5fb2766a8 100644 --- a/crates/block-producer/src/mempool/dependency_graph.rs +++ b/crates/block-producer/src/mempool/dependency_graph.rs @@ -71,6 +71,8 @@ where .field("vertices", &self.vertices.keys()) .field("processed", &self.processed) .field("roots", &self.roots) + .field("parents", &self.parents) + .field("children", &self.children) .finish() } } diff --git a/crates/block-producer/src/mempool/mod.rs b/crates/block-producer/src/mempool/mod.rs index b23c7fcd8..3ebc98622 100644 --- a/crates/block-producer/src/mempool/mod.rs +++ b/crates/block-producer/src/mempool/mod.rs @@ -199,13 +199,6 @@ pub struct Mempool { /// The current inflight block, if any. block_in_progress: Option>, - /// Batches which are currently being proven. - /// - /// This is used to identify jobs which have been cancelled by the mempool but might still be - /// submitted by the batch prover. This is achieved by ignoring all batch proofs which are not - /// in this set. - batches_in_progress: BTreeSet, - block_budget: BlockBudget, batch_budget: BatchBudget, } @@ -233,7 +226,6 @@ impl Mempool { block_budget, state: InflightState::new(chain_tip, state_retention), block_in_progress: Default::default(), - batches_in_progress: Default::default(), transactions: Default::default(), batches: Default::default(), next_batch_id: Default::default(), @@ -278,7 +270,6 @@ impl Mempool { let batch_id = self.next_batch_id; self.next_batch_id.increment(); - self.batches_in_progress.insert(batch_id); self.batches .insert(batch_id, tx_ids, parents) .expect("Selected batch should insert"); @@ -291,7 +282,7 @@ impl Mempool { /// Transactions are placed back in the queue. pub fn batch_failed(&mut self, batch: BatchJobId) { // Batch may already have been removed as part of a parent batches failure. - if !self.batches_in_progress.contains(&batch) { + if !self.batches.contains(&batch) { return; } @@ -300,11 +291,6 @@ impl Mempool { let transactions = removed_batches.values().flatten().copied().collect(); - // Remove these batches from the active list so we can ignore any subsequent submissions. - removed_batches.keys().for_each(|batch| { - self.batches_in_progress.remove(batch); - }); - self.transactions .requeue_transactions(transactions) .expect("Transaction should requeue"); @@ -318,8 +304,8 @@ impl Mempool { /// Marks a batch as proven if it exists. pub fn batch_proved(&mut self, batch_id: BatchJobId, batch: TransactionBatch) { - if !self.batches_in_progress.remove(&batch_id) { - // Batch may have been removed as part of a parent batches failure. + // Batch may have been removed as part of a parent batches failure. + if !self.batches.contains(&batch_id) { return; }