fix(block-producer): handle reverted batches #557

Merged · 4 commits · Dec 4, 2024
17 changes: 17 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions crates/block-producer/Cargo.toml
@@ -42,6 +42,7 @@ miden-lib = { workspace = true, features = ["testing"] }
miden-node-test-macro = { path = "../test-macro" }
miden-objects = { workspace = true, features = ["testing"] }
miden-tx = { workspace = true, features = ["testing"] }
pretty_assertions = "1.4.1"
rand_chacha = { version = "0.3", default-features = false }
tokio = { workspace = true, features = ["test-util"] }
winterfell = { version = "0.10" }
7 changes: 6 additions & 1 deletion crates/block-producer/src/mempool/batch_graph.rs
@@ -50,7 +50,7 @@ use crate::batch_builder::batch::TransactionBatch;
/// │ <null> ◄────┘
/// └───────────┘
/// ```
#[derive(Default, Debug, Clone)]
#[derive(Default, Debug, Clone, PartialEq)]
pub struct BatchGraph {
/// Tracks the interdependencies between batches.
inner: DependencyGraph<BatchJobId, TransactionBatch>,
@@ -239,6 +239,11 @@ impl BatchGraph {

batches
}

/// Returns `true` if the graph contains the given batch.
pub fn contains(&self, id: &BatchJobId) -> bool {
self.batches.contains_key(id)
}
}

#[cfg(any(test, doctest))]
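The new `contains` accessor lets callers cheaply check whether a batch is still tracked before acting on a completion or failure callback. A minimal sketch of the intended guard pattern, mirroring the mempool change further down (the function name and surrounding code are illustrative, not part of the diff):

```rust
// Hypothetical caller holding a `BatchGraph`, as the mempool does below.
fn on_batch_failure(batches: &mut BatchGraph, batch: BatchJobId) {
    // The batch may already have been purged when an ancestor failed,
    // so bail out instead of panicking on the `expect` below.
    if !batches.contains(&batch) {
        return;
    }

    let removed = batches
        .remove_batches([batch].into())
        .expect("batch is present");

    // ...requeue the removed transactions, log, etc.
    let _ = removed;
}
```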
22 changes: 19 additions & 3 deletions crates/block-producer/src/mempool/dependency_graph.rs
@@ -1,6 +1,6 @@
use std::{
collections::{BTreeMap, BTreeSet},
fmt::Display,
fmt::{Debug, Display},
};

// DEPENDENCY GRAPH
@@ -35,7 +35,7 @@ use std::{
/// │ <null> ◄────┘
/// └───────────┘
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Clone, PartialEq, Eq)]
pub struct DependencyGraph<K, V> {
/// Nodes whose data is still pending.
pending: BTreeSet<K>,
@@ -61,6 +61,22 @@ pub struct DependencyGraph<K, V> {
processed: BTreeSet<K>,
}

impl<K, V> Debug for DependencyGraph<K, V>
where
K: Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DependencyGraph")
.field("pending", &self.pending)
.field("vertices", &self.vertices.keys())
.field("processed", &self.processed)
.field("roots", &self.roots)
.field("parents", &self.parents)
.field("children", &self.children)
.finish()
}
}

#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)]
pub enum GraphError<K> {
#[error("Node {0} already exists")]
@@ -99,7 +115,7 @@ impl<K, V> Default for DependencyGraph<K, V> {
}
}

impl<K: Ord + Copy + Display + std::fmt::Debug, V: Clone> DependencyGraph<K, V> {
impl<K: Ord + Copy + Display + Debug, V: Clone> DependencyGraph<K, V> {
/// Inserts a new pending node into the graph.
///
/// # Errors
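A note on the hand-written `Debug` above (my reading, not stated in the diff): the mempool now derives `Debug` so that `assert_eq!` failures print something useful, and this impl keeps that output readable by listing only the vertex keys instead of the full `V` payloads, while requiring only `K: Debug`. The same technique in miniature, with illustrative names:

```rust
use std::{collections::BTreeMap, fmt};

struct KeyedStore<K, V> {
    items: BTreeMap<K, V>,
}

// Manual Debug: only `K: fmt::Debug` is required, and only the keys are
// printed, so large or non-Debug values never bloat the output.
impl<K: fmt::Debug, V> fmt::Debug for KeyedStore<K, V> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("KeyedStore")
            .field("keys", &self.items.keys())
            .finish()
    }
}
```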
127 changes: 113 additions & 14 deletions crates/block-producer/src/mempool/mod.rs
@@ -177,6 +177,7 @@ impl BlockBudget {

pub type SharedMempool = Arc<Mutex<Mempool>>;

#[derive(Clone, Debug, PartialEq)]
pub struct Mempool {
/// The latest inflight state of each account.
///
@@ -198,19 +199,28 @@ pub struct Mempool {
/// The current inflight block, if any.
block_in_progress: Option<BTreeSet<BatchJobId>>,

batch_budget: BatchBudget,
block_budget: BlockBudget,
batch_budget: BatchBudget,
}

impl Mempool {
/// Creates a new [Mempool] with the provided configuration.
pub fn new(
/// Creates a new [SharedMempool] with the provided configuration.
pub fn shared(
chain_tip: BlockNumber,
batch_budget: BatchBudget,
block_budget: BlockBudget,
state_retention: usize,
) -> SharedMempool {
Arc::new(Mutex::new(Self {
Arc::new(Mutex::new(Self::new(chain_tip, batch_budget, block_budget, state_retention)))
}

fn new(
chain_tip: BlockNumber,
batch_budget: BatchBudget,
block_budget: BlockBudget,
state_retention: usize,
) -> Mempool {
Self {
chain_tip,
batch_budget,
block_budget,
@@ -219,7 +229,7 @@ impl Mempool {
transactions: Default::default(),
batches: Default::default(),
next_batch_id: Default::default(),
}))
}
}

/// Adds a transaction to the mempool.
@@ -271,27 +281,34 @@ impl Mempool {
///
/// Transactions are placed back in the queue.
pub fn batch_failed(&mut self, batch: BatchJobId) {
let removed_batches =
self.batches.remove_batches([batch].into()).expect("Batch was not present");

// Its possible to receive failures for batches which were already removed
// as part of a prior failure. Early exit to prevent logging these no-ops.
if removed_batches.is_empty() {
// Batch may already have been removed as part of a parent batch's failure.
if !self.batches.contains(&batch) {
return;
}

let batches = removed_batches.keys().copied().collect::<Vec<_>>();
let transactions = removed_batches.into_values().flatten().collect();
let removed_batches =
self.batches.remove_batches([batch].into()).expect("Batch was not present");

let transactions = removed_batches.values().flatten().copied().collect();

self.transactions
.requeue_transactions(transactions)
.expect("Transaction should requeue");

tracing::warn!(%batch, descendents=?batches, "Batch failed, dropping all inflight descendent batches, impacted transactions are back in queue.");
tracing::warn!(
%batch,
descendents=?removed_batches.keys(),
"Batch failed, dropping all inflight descendent batches, impacted transactions are back in queue."
);
}

/// Marks a batch as proven if it exists.
pub fn batch_proved(&mut self, batch_id: BatchJobId, batch: TransactionBatch) {
// Batch may have been removed as part of a parent batch's failure.
if !self.batches.contains(&batch_id) {
return;
}

self.batches.submit_proof(batch_id, batch).expect("Batch proof should submit");
}

@@ -356,3 +373,85 @@ impl Mempool {
self.state.revert_transactions(transactions);
}
}

#[cfg(test)]
mod tests {
use pretty_assertions::assert_eq;

use super::*;
use crate::test_utils::MockProvenTxBuilder;

impl Mempool {
fn for_tests() -> Self {
Self::new(BlockNumber::new(0), Default::default(), Default::default(), 5)
}
}

// BATCH REVERSION TESTS
// ================================================================================================

#[test]
fn children_of_reverted_batches_are_ignored() {
// Batches are proved concurrently. This makes it possible for a child job to complete after
// the parent has been reverted (which also reverts the child job). Such a child job
// should be ignored.
let txs = MockProvenTxBuilder::sequential();

let mut uut = Mempool::for_tests();
uut.add_transaction(txs[0].clone()).unwrap();
let (parent_batch, batch_txs) = uut.select_batch().unwrap();
assert_eq!(batch_txs, vec![txs[0].clone()]);

uut.add_transaction(txs[1].clone()).unwrap();
let (child_batch_a, batch_txs) = uut.select_batch().unwrap();
assert_eq!(batch_txs, vec![txs[1].clone()]);

uut.add_transaction(txs[2].clone()).unwrap();
let (child_batch_b, batch_txs) = uut.select_batch().unwrap();
assert_eq!(batch_txs, vec![txs[2].clone()]);

// Child batch jobs are now dangling.
uut.batch_failed(parent_batch);
let reference = uut.clone();

// Success or failure of the child job should effectively do nothing.
uut.batch_failed(child_batch_a);
assert_eq!(uut, reference);

let proof = TransactionBatch::new(
vec![txs[2].raw_proven_transaction().clone()],
Default::default(),
)
.unwrap();
uut.batch_proved(child_batch_b, proof);
assert_eq!(uut, reference);
}

#[test]
fn reverted_batch_transactions_are_requeued() {
let txs = MockProvenTxBuilder::sequential();

let mut uut = Mempool::for_tests();
uut.add_transaction(txs[0].clone()).unwrap();
uut.select_batch().unwrap();

uut.add_transaction(txs[1].clone()).unwrap();
let (failed_batch, _) = uut.select_batch().unwrap();

uut.add_transaction(txs[2].clone()).unwrap();
uut.select_batch().unwrap();

// The middle batch failed, so its transaction and its child batch's transaction should be re-entered into the queue.
uut.batch_failed(failed_batch);

let mut reference = Mempool::for_tests();
reference.add_transaction(txs[0].clone()).unwrap();
reference.select_batch().unwrap();
reference.add_transaction(txs[1].clone()).unwrap();
reference.add_transaction(txs[2].clone()).unwrap();
reference.next_batch_id.increment();
reference.next_batch_id.increment();

assert_eq!(uut, reference);
}
}
2 changes: 1 addition & 1 deletion crates/block-producer/src/server/mod.rs
@@ -96,7 +96,7 @@ impl BlockProducer {
chain_tip,
} = self;

let mempool = Mempool::new(chain_tip, batch_budget, block_budget, state_retention);
let mempool = Mempool::shared(chain_tip, batch_budget, block_budget, state_retention);

// Spawn rpc server and batch and block provers.
//
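For context on this one-line change: `Mempool::shared` is now the public constructor and wraps the private plain constructor in `Arc<Mutex<_>>`, so the server still receives a `SharedMempool` handle while in-crate tests can build, clone, and compare plain `Mempool` values. Roughly how the two sides use it, paraphrased from the diff rather than new API:

```rust
// Server side (as in BlockProducer::serve above): one shared handle.
let mempool: SharedMempool =
    Mempool::shared(chain_tip, batch_budget, block_budget, state_retention);

// Test side (inside the crate, see the mempool tests above): a plain value
// that can be cloned into a reference snapshot and compared with assert_eq!.
let mut uut = Mempool::for_tests();
let reference = uut.clone();
assert_eq!(uut, reference);
```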
22 changes: 22 additions & 0 deletions crates/block-producer/src/test_utils/proven_tx.rs
@@ -1,5 +1,6 @@
use std::ops::Range;

use itertools::Itertools;
use miden_air::HashFunction;
use miden_objects::{
accounts::AccountId,
@@ -8,9 +9,11 @@ use miden_objects::{
vm::ExecutionProof,
Digest, Felt, Hasher, ONE,
};
use rand::Rng;
use winterfell::Proof;

use super::MockPrivateAccount;
use crate::domain::transaction::AuthenticatedTransaction;

pub struct MockProvenTxBuilder {
account_id: AccountId,
@@ -29,6 +32,25 @@ impl MockProvenTxBuilder {
Self::with_account(mock_account.id, mock_account.states[0], mock_account.states[1])
}

/// Generates 3 random, sequential transactions acting on the same account.
pub fn sequential() -> [AuthenticatedTransaction; 3] {
let mut rng = rand::thread_rng();
let mock_account: MockPrivateAccount<4> = rng.gen::<u32>().into();

(0..3)
.map(|i| {
Self::with_account(
mock_account.id,
mock_account.states[i],
mock_account.states[i + 1],
)
})
.map(|tx| AuthenticatedTransaction::from_inner(tx.build()))
.collect_vec()
.try_into()
.expect("Sizes should match")
}

pub fn with_account(
account_id: AccountId,
initial_account_hash: Digest,
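For reference, the mempool tests above consume this helper roughly as follows (illustrative snippet, not part of the diff):

```rust
// Three transactions against the same account, chained through its states,
// so each depends on the previous one being committed first.
let txs = MockProvenTxBuilder::sequential();

let mut uut = Mempool::for_tests();
uut.add_transaction(txs[0].clone()).unwrap();
let (parent_batch, batch_txs) = uut.select_batch().unwrap();
assert_eq!(batch_txs, vec![txs[0].clone()]);
```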