feat(merkle tree): Snapshot recovery for Merkle tree (#163)
## What ❔

Allows recovering a Merkle tree from a snapshot (a collection of tree
entries ordered by ascending key). The recovery procedure is
fault-tolerant: it may be paused and restarted.
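
For context, the recovery flow boils down to the sketch below, mirroring the
`examples/recovery.rs` load test added in this PR. The helper function and the
in-memory `PatchSet` database are illustrative (a real node would use
`RocksDBWrapper`), and exact signatures may differ.

```rust
use zksync_crypto::hasher::blake2::Blake2Hasher;
use zksync_merkle_tree::{
    recovery::{MerkleTreeRecovery, RecoveryEntry},
    HashTree, PatchSet, PruneDatabase,
};

// Hypothetical helper: recovers a tree from snapshot entries ordered by ascending key.
fn recover_from_snapshot(snapshot_entries: Vec<RecoveryEntry>, recovered_version: u64) {
    // In-memory DB for illustration.
    let mut patch_set = PatchSet::default();
    let db: &mut dyn PruneDatabase = &mut patch_set;
    let hasher: &dyn HashTree = &Blake2Hasher;

    let mut recovery = MerkleTreeRecovery::with_hasher(db, recovered_version, hasher);
    // Entries may be split across multiple `extend()` calls; recovery can be paused
    // and resumed between such chunks, which is what makes it fault-tolerant.
    recovery.extend(snapshot_entries);

    // Finalization turns the recovery instance into an ordinary tree.
    let tree = recovery.finalize();
    tree.verify_consistency(recovered_version).unwrap();
}
```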

## Why ❔

This is one of the components needed to recover a node from a snapshot.

## Checklist

- [x] PR title corresponds to the body of the PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
slowli authored Oct 20, 2023
1 parent 44f7179 commit 9e20703
Showing 25 changed files with 2,390 additions and 369 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

1 change: 1 addition & 0 deletions core/lib/merkle_tree/Cargo.toml
@@ -32,3 +32,4 @@ serde = { version = "1", features = ["derive"] }
serde_json = "1"
serde_with = { version = "1", features = ["hex"] }
tempfile = "3.0.2"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
23 changes: 16 additions & 7 deletions core/lib/merkle_tree/examples/loadtest/main.rs
@@ -6,6 +6,7 @@
use clap::Parser;
use rand::{rngs::StdRng, seq::IteratorRandom, SeedableRng};
use tempfile::TempDir;
use tracing_subscriber::EnvFilter;

use std::{
thread,
@@ -66,8 +67,16 @@ struct Cli {
}

impl Cli {
fn init_logging() {
tracing_subscriber::fmt()
.pretty()
.with_env_filter(EnvFilter::from_default_env())
.init();
}

fn run(self) {
println!("Launched with options: {self:?}");
Self::init_logging();
tracing::info!("Launched with options: {self:?}");

let (mut mock_db, mut rocksdb);
let mut _temp_dir = None;
@@ -77,7 +86,7 @@ impl Cli {
&mut mock_db
} else {
let dir = TempDir::new().expect("failed creating temp dir for RocksDB");
println!(
tracing::info!(
"Created temp dir for RocksDB: {}",
dir.path().to_string_lossy()
);
@@ -127,7 +136,7 @@ impl Cli {
let updated_keys = Self::generate_keys(updated_indices.into_iter());
let kvs = new_keys.into_iter().chain(updated_keys).zip(values);

println!("Processing block #{version}");
tracing::info!("Processing block #{version}");
let start = Instant::now();
let root_hash = if self.proofs {
let reads = Self::generate_keys(read_indices.into_iter())
@@ -143,15 +152,15 @@ impl Cli {
output.root_hash
};
let elapsed = start.elapsed();
println!("Processed block #{version} in {elapsed:?}, root hash = {root_hash:?}");
tracing::info!("Processed block #{version} in {elapsed:?}, root hash = {root_hash:?}");
}

println!("Verifying tree consistency...");
tracing::info!("Verifying tree consistency...");
let start = Instant::now();
tree.verify_consistency(self.commit_count - 1)
.expect("tree consistency check failed");
let elapsed = start.elapsed();
println!("Verified tree consistency in {elapsed:?}");
tracing::info!("Verified tree consistency in {elapsed:?}");

if let Some((pruner_handle, pruner_thread)) = pruner_handles {
pruner_handle.abort();
@@ -170,5 +179,5 @@ impl Cli {
}

fn main() {
Cli::parse().run()
Cli::parse().run();
}
120 changes: 120 additions & 0 deletions core/lib/merkle_tree/examples/recovery.rs
@@ -0,0 +1,120 @@
//! Tree recovery load test.
use clap::Parser;
use rand::{rngs::StdRng, Rng, SeedableRng};
use tempfile::TempDir;
use tracing_subscriber::EnvFilter;

use std::time::Instant;

use zksync_crypto::hasher::blake2::Blake2Hasher;
use zksync_merkle_tree::{
recovery::{MerkleTreeRecovery, RecoveryEntry},
HashTree, Key, PatchSet, PruneDatabase, RocksDBWrapper, ValueHash,
};
use zksync_storage::RocksDB;

/// CLI for load-testing Merkle tree recovery.
#[derive(Debug, Parser)]
struct Cli {
/// Number of updates to perform.
#[arg(name = "updates")]
update_count: u64,
/// Number of entries per update.
#[arg(name = "ops")]
writes_per_update: usize,
/// Use a no-op hashing function.
#[arg(name = "no-hash", long)]
no_hashing: bool,
/// Perform testing on in-memory DB rather than RocksDB (i.e., with focus on hashing logic).
#[arg(long = "in-memory", short = 'M')]
in_memory: bool,
/// Block cache capacity for RocksDB in bytes.
#[arg(long = "block-cache", conflicts_with = "in_memory")]
block_cache: Option<usize>,
/// Seed to use in the RNG for reproducibility.
#[arg(long = "rng-seed", default_value = "0")]
rng_seed: u64,
}

impl Cli {
fn init_logging() {
tracing_subscriber::fmt()
.pretty()
.with_env_filter(EnvFilter::from_default_env())
.init();
}

fn run(self) {
Self::init_logging();
tracing::info!("Launched with options: {self:?}");

let (mut mock_db, mut rocksdb);
let mut _temp_dir = None;
let db: &mut dyn PruneDatabase = if self.in_memory {
mock_db = PatchSet::default();
&mut mock_db
} else {
let dir = TempDir::new().expect("failed creating temp dir for RocksDB");
tracing::info!(
"Created temp dir for RocksDB: {}",
dir.path().to_string_lossy()
);
rocksdb = if let Some(block_cache_capacity) = self.block_cache {
let db = RocksDB::with_cache(dir.path(), Some(block_cache_capacity));
RocksDBWrapper::from(db)
} else {
RocksDBWrapper::new(dir.path())
};
_temp_dir = Some(dir);
&mut rocksdb
};

let hasher: &dyn HashTree = if self.no_hashing { &() } else { &Blake2Hasher };
let mut rng = StdRng::seed_from_u64(self.rng_seed);

let recovered_version = 123;
let key_step =
Key::MAX / (Key::from(self.update_count) * Key::from(self.writes_per_update));
assert!(key_step > Key::from(u64::MAX));
// ^ Total number of generated keys is <2^128.

let mut last_key = Key::zero();
let mut last_leaf_index = 0;
let mut recovery = MerkleTreeRecovery::with_hasher(db, recovered_version, hasher);
let recovery_started_at = Instant::now();
for updated_idx in 0..self.update_count {
let started_at = Instant::now();
let recovery_entries = (0..self.writes_per_update)
.map(|_| {
last_key += key_step - Key::from(rng.gen::<u64>());
// ^ Increases the key by an increment close to `key_step`, with some randomness.
last_leaf_index += 1;
RecoveryEntry {
key: last_key,
value: ValueHash::zero(),
leaf_index: last_leaf_index,
}
})
.collect();
recovery.extend(recovery_entries);
tracing::info!(
"Updated tree with recovery chunk #{updated_idx} in {:?}",
started_at.elapsed()
);
}

let tree = recovery.finalize();
tracing::info!(
"Recovery finished in {:?}; verifying consistency...",
recovery_started_at.elapsed()
);
let started_at = Instant::now();
tree.verify_consistency(recovered_version).unwrap();
tracing::info!("Verified consistency in {:?}", started_at.elapsed());
}
}

fn main() {
Cli::parse().run();
}
89 changes: 85 additions & 4 deletions core/lib/merkle_tree/src/consistency.rs
@@ -55,6 +55,14 @@ pub enum ConsistencyError {
},
#[error("leaf with key {full_key} has same index {index} as another key")]
DuplicateLeafIndex { index: u64, full_key: Key },
#[error("internal node with key {key} does not have children")]
EmptyInternalNode { key: NodeKey },
#[error(
"internal node with key {key} should have version {expected_version} (max among child ref versions)"
)]
KeyVersionMismatch { key: NodeKey, expected_version: u64 },
#[error("root node should have version >={max_child_version} (max among child ref versions)")]
RootVersionMismatch { max_child_version: u64 },
}

impl<DB> MerkleTree<'_, DB>
@@ -109,6 +117,21 @@ where
}

Node::Internal(node) => {
let expected_version = node.child_refs().map(|child_ref| child_ref.version).max();
let Some(expected_version) = expected_version else {
return Err(ConsistencyError::EmptyInternalNode { key });
};
if !key.is_empty() && expected_version != key.version {
return Err(ConsistencyError::KeyVersionMismatch {
key,
expected_version,
});
} else if key.is_empty() && expected_version > key.version {
return Err(ConsistencyError::RootVersionMismatch {
max_child_version: expected_version,
});
}

// `.into_par_iter()` below is the only place where `rayon`-based parallelism
// is used in tree verification.
let children: Vec<_> = node.children().collect();
@@ -239,7 +262,7 @@ mod tests {
use std::num::NonZeroU64;

use super::*;
use crate::PatchSet;
use crate::{types::InternalNode, PatchSet};
use zksync_types::{H256, U256};

const FIRST_KEY: Key = U256([0, 0, 0, 0x_dead_beef_0000_0000]);
@@ -284,7 +307,7 @@ mod tests {
#[test]
fn missing_root_error() {
let mut db = prepare_database();
db.roots_mut().remove(&0);
db.remove_root(0);

let err = MerkleTree::new(db).verify_consistency(0).unwrap_err();
assert_matches!(err, ConsistencyError::MissingRoot(0));
@@ -311,7 +334,7 @@
fn leaf_count_mismatch_error() {
let mut db = prepare_database();

let root = db.roots_mut().get_mut(&0).unwrap();
let root = db.root_mut(0).unwrap();
let Root::Filled { leaf_count, .. } = root else {
panic!("unexpected root: {root:?}");
};
@@ -331,7 +354,7 @@
fn hash_mismatch_error() {
let mut db = prepare_database();

let root = db.roots_mut().get_mut(&0).unwrap();
let root = db.root_mut(0).unwrap();
let Root::Filled {
node: Node::Internal(node),
..
@@ -412,4 +435,62 @@
let err = MerkleTree::new(db).verify_consistency(0).unwrap_err();
assert_matches!(err, ConsistencyError::DuplicateLeafIndex { index: 1, .. });
}

#[test]
fn empty_internal_node_error() {
let mut db = prepare_database();
let node_key = db.nodes_mut().find_map(|(key, node)| {
if let Node::Internal(node) = node {
*node = InternalNode::default();
return Some(*key);
}
None
});
let node_key = node_key.unwrap();

let err = MerkleTree::new(db).verify_consistency(0).unwrap_err();
assert_matches!(err, ConsistencyError::EmptyInternalNode { key } if key == node_key);
}

#[test]
fn version_mismatch_error() {
let mut db = prepare_database();
let node_key = db.nodes_mut().find_map(|(key, node)| {
if let Node::Internal(node) = node {
let (nibble, _) = node.children().next().unwrap();
node.child_ref_mut(nibble).unwrap().version = 1;
return Some(*key);
}
None
});
let node_key = node_key.unwrap();

let err = MerkleTree::new(db).verify_consistency(0).unwrap_err();
assert_matches!(
err,
ConsistencyError::KeyVersionMismatch { key, expected_version: 1 } if key == node_key
);
}

#[test]
fn root_version_mismatch_error() {
let mut db = prepare_database();
let Some(Root::Filled {
node: Node::Internal(node),
..
}) = db.root_mut(0)
else {
unreachable!();
};
let (nibble, _) = node.children().next().unwrap();
node.child_ref_mut(nibble).unwrap().version = 42;

let err = MerkleTree::new(db).verify_consistency(0).unwrap_err();
assert_matches!(
err,
ConsistencyError::RootVersionMismatch {
max_child_version: 42,
}
);
}
}
10 changes: 7 additions & 3 deletions core/lib/merkle_tree/src/lib.rs
@@ -48,6 +48,7 @@ mod getters;
mod hasher;
mod metrics;
mod pruning;
pub mod recovery;
mod storage;
mod types;
mod utils;
@@ -146,7 +147,7 @@ impl<'a, DB: Database> MerkleTree<'a, DB> {
pub fn with_hasher(db: DB, hasher: &'a dyn HashTree) -> Self {
let tags = db.manifest().and_then(|manifest| manifest.tags);
if let Some(tags) = tags {
tags.assert_consistency(hasher);
tags.assert_consistency(hasher, false);
}
// If there are currently no tags in the tree, we consider that it fits
// for backward compatibility. The tags will be added the next time the tree is saved.
@@ -208,7 +209,7 @@ impl<'a, DB: Database> MerkleTree<'a, DB> {
/// Returns information about the update such as the final tree hash.
pub fn extend(&mut self, key_value_pairs: Vec<(Key, ValueHash)>) -> BlockOutput {
let next_version = self.db.manifest().unwrap_or_default().version_count;
let storage = Storage::new(&self.db, self.hasher, next_version);
let storage = Storage::new(&self.db, self.hasher, next_version, true);
let (output, patch) = storage.extend(key_value_pairs);
self.db.apply_patch(patch);
output
@@ -226,7 +227,7 @@ impl<'a, DB: Database> MerkleTree<'a, DB> {
instructions: Vec<(Key, TreeInstruction)>,
) -> BlockOutputWithProofs {
let next_version = self.db.manifest().unwrap_or_default().version_count;
let storage = Storage::new(&self.db, self.hasher, next_version);
let storage = Storage::new(&self.db, self.hasher, next_version, true);
let (output, patch) = storage.extend_with_proofs(instructions);
self.db.apply_patch(patch);
output
@@ -246,6 +247,7 @@ mod tests {
architecture: "AR64MT".to_owned(),
depth: 256,
hasher: "blake2s256".to_string(),
is_recovering: false,
});

MerkleTree::new(db);
@@ -259,6 +261,7 @@
architecture: "AR16MT".to_owned(),
depth: 128,
hasher: "blake2s256".to_string(),
is_recovering: false,
});

MerkleTree::new(db);
@@ -272,6 +275,7 @@
architecture: "AR16MT".to_owned(),
depth: 256,
hasher: "sha256".to_string(),
is_recovering: false,
});

MerkleTree::new(db);