From 8e7b2968fe2f32d10b95b291039b6a8cb5354604 Mon Sep 17 00:00:00 2001 From: tomg10 Date: Sun, 31 Mar 2024 16:01:44 +0200 Subject: [PATCH 01/17] improved tree pruner Signed-off-by: tomg10 --- .../lib/merkle_tree/examples/loadtest/main.rs | 12 +- core/lib/merkle_tree/src/domain.rs | 8 +- core/lib/merkle_tree/src/lib.rs | 4 +- core/lib/merkle_tree/src/pruning.rs | 140 +++++++++++++----- .../tests/integration/merkle_tree.rs | 4 +- .../src/metadata_calculator/helpers.rs | 7 +- .../src/metadata_calculator/mod.rs | 18 ++- .../src/metadata_calculator/pruning.rs | 20 +++ 8 files changed, 169 insertions(+), 44 deletions(-) create mode 100644 core/lib/zksync_core/src/metadata_calculator/pruning.rs diff --git a/core/lib/merkle_tree/examples/loadtest/main.rs b/core/lib/merkle_tree/examples/loadtest/main.rs index 53a641750d10..199133ffd5ae 100644 --- a/core/lib/merkle_tree/examples/loadtest/main.rs +++ b/core/lib/merkle_tree/examples/loadtest/main.rs @@ -14,8 +14,8 @@ use tempfile::TempDir; use tracing_subscriber::EnvFilter; use zksync_crypto::hasher::blake2::Blake2Hasher; use zksync_merkle_tree::{ - Database, HashTree, MerkleTree, MerkleTreePruner, PatchSet, RocksDBWrapper, TreeEntry, - TreeInstruction, + Database, HashTree, KeepConstantVersionsCount, MerkleTree, MerkleTreePruner, PatchSet, + RocksDBWrapper, TreeEntry, TreeInstruction, }; use zksync_storage::{RocksDB, RocksDBOptions}; use zksync_types::{AccountTreeId, Address, StorageKey, H256, U256}; @@ -102,9 +102,13 @@ impl Cli { } if self.prune { - let (mut pruner, pruner_handle) = MerkleTreePruner::new(rocksdb.clone(), 0); + let (mut pruner, pruner_handle) = MerkleTreePruner::new(rocksdb.clone()); pruner.set_poll_interval(Duration::from_secs(10)); - let pruner_thread = thread::spawn(|| pruner.run()); + let pruner_thread = thread::spawn(|| { + pruner.run(Box::new(KeepConstantVersionsCount { + past_versions_to_keep: 0, + })) + }); pruner_handles = Some((pruner_handle, pruner_thread)); } _temp_dir = Some(dir); diff --git a/core/lib/merkle_tree/src/domain.rs b/core/lib/merkle_tree/src/domain.rs index b2515dd0c849..17d0422c08dd 100644 --- a/core/lib/merkle_tree/src/domain.rs +++ b/core/lib/merkle_tree/src/domain.rs @@ -14,7 +14,7 @@ use crate::{ Key, Root, TreeEntry, TreeEntryWithProof, TreeInstruction, TreeLogEntry, ValueHash, TREE_DEPTH, }, - BlockOutput, HashTree, MerkleTree, NoVersionError, + BlockOutput, HashTree, MerkleTree, MerkleTreePruner, MerkleTreePrunerHandle, NoVersionError, }; /// Metadata for the current tree state. @@ -61,6 +61,12 @@ impl ZkSyncTree { .expect("failed initializing `rayon` thread pool") } + /// Returns tree pruner and a handle to stop it + pub fn pruner(&self) -> (MerkleTreePruner, MerkleTreePrunerHandle) { + let db = self.tree.db.inner().clone(); + MerkleTreePruner::new(db) + } + /// Returns metadata based on `storage_logs` generated by the genesis L1 batch. This does not /// create a persistent tree. pub fn process_genesis_batch(storage_logs: &[TreeInstruction]) -> BlockOutput { diff --git a/core/lib/merkle_tree/src/lib.rs b/core/lib/merkle_tree/src/lib.rs index 687e957f8ef4..f8cc09e847bd 100644 --- a/core/lib/merkle_tree/src/lib.rs +++ b/core/lib/merkle_tree/src/lib.rs @@ -51,7 +51,9 @@ use zksync_crypto::hasher::blake2::Blake2Hasher; pub use crate::{ errors::NoVersionError, hasher::{HashTree, TreeRangeDigest}, - pruning::{MerkleTreePruner, MerkleTreePrunerHandle}, + pruning::{ + KeepConstantVersionsCount, MerkleTreePruner, MerkleTreePrunerHandle, RetainedVersionSource, + }, storage::{ Database, MerkleTreeColumnFamily, PatchSet, Patched, PruneDatabase, PrunePatchSet, RocksDBWrapper, diff --git a/core/lib/merkle_tree/src/pruning.rs b/core/lib/merkle_tree/src/pruning.rs index 5b1911ca6005..9fade70745fa 100644 --- a/core/lib/merkle_tree/src/pruning.rs +++ b/core/lib/merkle_tree/src/pruning.rs @@ -1,6 +1,6 @@ //! Tree pruning logic. -use std::{fmt, sync::mpsc, time::Duration}; +use std::{cmp::min, fmt, sync::mpsc, time::Duration}; use crate::{ metrics::{PruningStats, PRUNING_TIMINGS}, @@ -24,6 +24,27 @@ impl MerkleTreePrunerHandle { } } +/// objects implementing this trait can be passed to pruner main loop, they act as a source of info about up to which version the tree should be pruned +pub trait RetainedVersionSource { + /// Returns info up to which version (l1_batch) the tree should be pruned up to + fn target_retained_version(&self, last_prunable_version: u64) -> anyhow::Result; +} + +/// Pruner 'algorithm' simulating keeping a constant number of past versions +#[derive(Debug)] +pub struct KeepConstantVersionsCount { + /// How many past versions of tree should be kept + pub past_versions_to_keep: u64, +} + +impl RetainedVersionSource for KeepConstantVersionsCount { + fn target_retained_version(&self, last_prunable_version: u64) -> anyhow::Result { + Ok(last_prunable_version + .checked_sub(self.past_versions_to_keep) + .unwrap()) + } +} + /// Component responsible for Merkle tree pruning, i.e. removing nodes not referenced by new versions /// of the tree. A pruner should be instantiated using a [`Clone`] of the tree database, possibly /// configured and then [`run()`](Self::run()) on its own thread. [`MerkleTreePrunerHandle`] provides @@ -39,7 +60,6 @@ impl MerkleTreePrunerHandle { /// where `N` is a configurable number set when the pruner [is created](Self::new()). pub struct MerkleTreePruner { db: DB, - past_versions_to_keep: u64, target_pruned_key_count: usize, poll_interval: Duration, aborted_receiver: mpsc::Receiver<()>, @@ -49,7 +69,6 @@ impl fmt::Debug for MerkleTreePruner { fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { formatter .debug_struct("MerkleTreePruner") - .field("past_versions_to_keep", &self.past_versions_to_keep) .field("target_pruned_key_count", &self.target_pruned_key_count) .field("poll_interval", &self.poll_interval) .finish_non_exhaustive() @@ -64,12 +83,11 @@ impl MerkleTreePruner { /// /// Returns the created pruner and a handle to it. *The pruner will be aborted when its handle /// is dropped.* - pub fn new(db: DB, past_versions_to_keep: u64) -> (Self, MerkleTreePrunerHandle) { + pub fn new(db: DB) -> (Self, MerkleTreePrunerHandle) { let (aborted_sender, aborted_receiver) = mpsc::channel(); let handle = MerkleTreePrunerHandle { aborted_sender }; let this = Self { db, - past_versions_to_keep, target_pruned_key_count: 500_000, poll_interval: Duration::from_secs(60), aborted_receiver, @@ -94,17 +112,24 @@ impl MerkleTreePruner { self.poll_interval = poll_interval; } - fn target_retained_version(&self) -> Option { + /// Returns max version number that can be safely pruned, so that after pruning there is at least one version present after pruning + pub fn last_prunable_version(&self) -> Option { let manifest = self.db.manifest()?; - let latest_version = manifest.version_count.checked_sub(1)?; - latest_version.checked_sub(self.past_versions_to_keep) + manifest.version_count.checked_sub(1) } #[doc(hidden)] // Used in integration tests; logically private #[allow(clippy::range_plus_one)] // exclusive range is required by `PrunePatchSet` constructor - pub fn run_once(&mut self) -> Option { - let target_retained_version = self.target_retained_version()?; + pub fn prune_up_to(&mut self, target_retained_version: u64) -> Option { let min_stale_key_version = self.db.min_stale_key_version()?; + + //We must leave at least one version + let last_prunable_version = self.last_prunable_version(); + if last_prunable_version.is_none() { + tracing::info!("Nothing to prune; skipping"); + return None; + } + let target_retained_version = min(target_retained_version, last_prunable_version.unwrap()); let stale_key_new_versions = min_stale_key_version..=target_retained_version; tracing::info!("Collecting stale keys with new versions in {stale_key_new_versions:?}"); @@ -142,11 +167,44 @@ impl MerkleTreePruner { Some(stats) } + fn wait_for_next_iteration(&mut self, timeout: Duration) -> bool { + match self.aborted_receiver.recv_timeout(timeout) { + Ok(()) => false, // Abort was requested + Err(mpsc::RecvTimeoutError::Disconnected) => { + tracing::warn!("Pruner handle is dropped without calling `abort()`; exiting"); + false + } + // The pruner handle is alive and wasn't used to abort the pruner. + Err(mpsc::RecvTimeoutError::Timeout) => true, + } + } + /// Runs this pruner indefinitely until it is aborted by dropping its handle. - pub fn run(mut self) { + pub fn run(mut self, retained_version_source: Box) { tracing::info!("Started Merkle tree pruner {self:?}"); loop { - let timeout = if let Some(stats) = self.run_once() { + let last_version = self.last_prunable_version(); + if last_version.is_none() { + tracing::info!("Nothing to prune in tree, sleeping"); + if !self.wait_for_next_iteration(self.poll_interval) { + break; + } + continue; + } + let retained_version = + retained_version_source.target_retained_version(last_version.unwrap()); + if retained_version.is_err() { + tracing::warn!( + "Unable to determine tree target retained version, error was: {}", + retained_version.unwrap_err() + ); + if !self.wait_for_next_iteration(self.poll_interval) { + break; + } + continue; + } + + let timeout = if let Some(stats) = self.prune_up_to(retained_version.unwrap()) { let has_more_work = stats.has_more_work(); stats.report(); if has_more_work { @@ -159,15 +217,8 @@ impl MerkleTreePruner { self.poll_interval }; - match self.aborted_receiver.recv_timeout(timeout) { - Ok(()) => break, // Abort was requested - Err(mpsc::RecvTimeoutError::Disconnected) => { - tracing::warn!("Pruner handle is dropped without calling `abort()`; exiting"); - break; - } - Err(mpsc::RecvTimeoutError::Timeout) => { - // The pruner handle is alive and wasn't used to abort the pruner. - } + if !self.wait_for_next_iteration(timeout) { + break; } } } @@ -203,9 +254,11 @@ mod tests { #[test] fn pruner_basics() { let mut db = create_db(); - let (mut pruner, _handle) = MerkleTreePruner::new(&mut db, 0); + let (mut pruner, _handle) = MerkleTreePruner::new(&mut db); - let stats = pruner.run_once().unwrap(); + let stats = pruner + .prune_up_to(pruner.last_prunable_version().unwrap()) + .unwrap(); assert!(stats.pruned_key_count > 0); assert_eq!(stats.deleted_stale_key_versions, 1..5); assert_eq!(stats.target_retained_version, 4); @@ -221,11 +274,13 @@ mod tests { #[test] fn pruner_with_intermediate_commits() { let mut db = create_db(); - let (mut pruner, _handle) = MerkleTreePruner::new(&mut db, 0); + let (mut pruner, _handle) = MerkleTreePruner::new(&mut db); pruner.set_target_pruned_key_count(1); for i in 1..5 { - let stats = pruner.run_once().unwrap(); + let stats = pruner + .prune_up_to(pruner.last_prunable_version().unwrap()) + .unwrap(); assert!(stats.pruned_key_count > 0); assert_eq!(stats.deleted_stale_key_versions, i..(i + 1)); assert_eq!(stats.target_retained_version, 4); @@ -235,9 +290,13 @@ mod tests { #[test] fn pruner_is_aborted_immediately_when_requested() { - let (mut pruner, pruner_handle) = MerkleTreePruner::new(PatchSet::default(), 0); + let (mut pruner, pruner_handle) = MerkleTreePruner::new(PatchSet::default()); pruner.set_poll_interval(Duration::from_secs(30)); - let join_handle = thread::spawn(|| pruner.run()); + let join_handle = thread::spawn(|| { + pruner.run(Box::new(KeepConstantVersionsCount { + past_versions_to_keep: 0, + })) + }); pruner_handle.abort(); let start = Instant::now(); @@ -260,8 +319,10 @@ mod tests { } let latest_version = tree.latest_version().unwrap(); - let (mut pruner, _handle) = MerkleTreePruner::new(&mut db, past_versions_to_keep); - let stats = pruner.run_once().unwrap(); + let (mut pruner, _handle) = MerkleTreePruner::new(&mut db); + let stats = pruner + .prune_up_to(pruner.last_prunable_version().unwrap() - past_versions_to_keep) + .unwrap(); assert!(stats.pruned_key_count > 0); let first_retained_version = latest_version.saturating_sub(past_versions_to_keep); assert_eq!(stats.target_retained_version, first_retained_version); @@ -282,8 +343,10 @@ mod tests { } let latest_version = tree.latest_version().unwrap(); - let (mut pruner, _handle) = MerkleTreePruner::new(&mut db, past_versions_to_keep); - let stats = pruner.run_once().unwrap(); + let (mut pruner, _handle) = MerkleTreePruner::new(&mut db); + let stats = pruner + .prune_up_to(pruner.last_prunable_version().unwrap() - past_versions_to_keep) + .unwrap(); assert!(stats.pruned_key_count > 0); let first_retained_version = latest_version.saturating_sub(past_versions_to_keep); assert_eq!(stats.target_retained_version, first_retained_version); @@ -344,7 +407,10 @@ mod tests { let new_keys_in_db: HashSet<_> = db.nodes_mut().map(|(key, _)| *key).collect(); assert!(new_keys_in_db.is_superset(&keys_in_db)); - let stats = MerkleTreePruner::new(&mut db, 0).0.run_once().unwrap(); + let (pruner, _handle) = MerkleTreePruner::new(&mut db); + let stats = pruner + .prune_up_to(pruner.last_prunable_version().unwrap()) + .unwrap(); assert_eq!(stats.pruned_key_count, keys_in_db.len() + batch_count); // ^ roots are not counted in `keys_in_db` @@ -378,12 +444,18 @@ mod tests { for chunk in new_kvs.chunks(20) { MerkleTree::new(&mut db).extend(chunk.to_vec()); if prune_iteratively { - MerkleTreePruner::new(&mut db, 0).0.run_once().unwrap(); + let (mut pruner, _handle) = MerkleTreePruner::new(&mut db); + pruner + .prune_up_to(pruner.last_prunable_version().unwrap()) + .unwrap(); } } if !prune_iteratively { - MerkleTreePruner::new(&mut db, 0).0.run_once().unwrap(); + let (mut pruner, _handle) = MerkleTreePruner::new(&mut db); + pruner + .prune_up_to(pruner.last_prunable_version().unwrap()) + .unwrap(); } let new_leaf_keys_in_db = leaf_keys(&mut db); assert!(new_leaf_keys_in_db.is_disjoint(&leaf_keys_in_db)); diff --git a/core/lib/merkle_tree/tests/integration/merkle_tree.rs b/core/lib/merkle_tree/tests/integration/merkle_tree.rs index fe6731fb441c..747731e24a72 100644 --- a/core/lib/merkle_tree/tests/integration/merkle_tree.rs +++ b/core/lib/merkle_tree/tests/integration/merkle_tree.rs @@ -608,8 +608,8 @@ mod rocksdb { fn snapshot_for_pruned_tree(chunk_size: usize) { let Harness { mut db, dir: _dir } = Harness::new(); test_intermediate_commits(&mut db, chunk_size); - let (mut pruner, _) = MerkleTreePruner::new(&mut db, 0); - pruner.run_once(); + let (mut pruner, _) = MerkleTreePruner::new(&mut db); + pruner.prune_up_to(pruner.last_prunable_version().unwrap()); let raw_db = db.into_inner(); let snapshot_name = format!("db-snapshot-{chunk_size}-chunked-commits-pruned"); diff --git a/core/lib/zksync_core/src/metadata_calculator/helpers.rs b/core/lib/zksync_core/src/metadata_calculator/helpers.rs index 2745244047d8..e4765337c408 100644 --- a/core/lib/zksync_core/src/metadata_calculator/helpers.rs +++ b/core/lib/zksync_core/src/metadata_calculator/helpers.rs @@ -19,7 +19,8 @@ use zksync_health_check::{Health, HealthStatus}; use zksync_merkle_tree::{ domain::{TreeMetadata, ZkSyncTree, ZkSyncTreeReader}, recovery::MerkleTreeRecovery, - Database, Key, NoVersionError, RocksDBWrapper, TreeEntry, TreeEntryWithProof, TreeInstruction, + Database, Key, MerkleTreePruner, MerkleTreePrunerHandle, NoVersionError, RocksDBWrapper, + TreeEntry, TreeEntryWithProof, TreeInstruction, }; use zksync_storage::{RocksDB, RocksDBOptions, StalledWritesRetries}; use zksync_types::{block::L1BatchHeader, L1BatchNumber, StorageKey, H256}; @@ -157,6 +158,10 @@ impl AsyncTree { } } + pub fn pruner(&self) -> (MerkleTreePruner, MerkleTreePrunerHandle) { + self.inner.as_ref().expect(Self::INCONSISTENT_MSG).pruner() + } + pub fn is_empty(&self) -> bool { self.as_ref().is_empty() } diff --git a/core/lib/zksync_core/src/metadata_calculator/mod.rs b/core/lib/zksync_core/src/metadata_calculator/mod.rs index dffaae636806..3a10db9d3a64 100644 --- a/core/lib/zksync_core/src/metadata_calculator/mod.rs +++ b/core/lib/zksync_core/src/metadata_calculator/mod.rs @@ -3,6 +3,7 @@ use std::{ sync::Arc, + thread, time::{Duration, Instant}, }; @@ -22,9 +23,11 @@ use self::{ helpers::{create_db, Delayer, GenericAsyncTree, MerkleTreeHealth}, updater::TreeUpdater, }; +use crate::metadata_calculator::pruning::KeepPruningSyncedWithDbPruning; mod helpers; mod metrics; +mod pruning; mod recovery; #[cfg(test)] pub(crate) mod tests; @@ -158,11 +161,24 @@ impl MetadataCalculator { "Merkle tree is initialized and ready to process L1 batches: {:?}", tree_reader.clone().info().await ); + let (tree_pruner, pruner_handle) = tree.pruner(); + let pruner_pool = pool.clone(); + let join_handle = thread::spawn(move || { + tree_pruner.run(Box::new(KeepPruningSyncedWithDbPruning { + pool: pruner_pool, + })) + }); self.tree_reader.send_replace(Some(tree_reader)); let updater = TreeUpdater::new(tree, self.max_l1_batches_per_iter, self.object_store); updater .loop_updating_tree(self.delayer, &pool, stop_receiver, self.health_updater) - .await + .await?; + + // line below requests pruner to stop + drop(pruner_handle); + join_handle.join().unwrap(); + + Ok(()) } } diff --git a/core/lib/zksync_core/src/metadata_calculator/pruning.rs b/core/lib/zksync_core/src/metadata_calculator/pruning.rs new file mode 100644 index 000000000000..41930c6d4ff4 --- /dev/null +++ b/core/lib/zksync_core/src/metadata_calculator/pruning.rs @@ -0,0 +1,20 @@ +use tokio::runtime::Handle; +use zksync_dal::{Core, CoreDal}; +use zksync_db_connection::connection_pool::ConnectionPool; +use zksync_merkle_tree::RetainedVersionSource; + +#[derive(Debug)] +pub struct KeepPruningSyncedWithDbPruning { + pub pool: ConnectionPool, +} + +impl RetainedVersionSource for KeepPruningSyncedWithDbPruning { + fn target_retained_version(&self, _last_prunable_version: u64) -> anyhow::Result { + let rt_handle = Handle::current(); + //this code looks awkward as tree crate is not using async + let mut storage = rt_handle.block_on(self.pool.connection_tagged("tree_pruner"))?; + // TODO update this code once db pruning is merged + let _ = rt_handle.block_on(storage.blocks_dal().get_earliest_l1_batch_number())?; + Ok(0) + } +} From 4e33aedce9e121c857103659d5461872197c7dcd Mon Sep 17 00:00:00 2001 From: tomg10 Date: Sun, 31 Mar 2024 17:42:35 +0200 Subject: [PATCH 02/17] cargo fix Signed-off-by: tomg10 --- Cargo.lock | 1 + core/lib/merkle_tree/Cargo.toml | 1 + 2 files changed, 2 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index fc7d1d63f9a0..0988d78dc13e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8475,6 +8475,7 @@ dependencies = [ name = "zksync_merkle_tree" version = "0.1.0" dependencies = [ + "anyhow", "assert_matches", "clap 4.4.6", "insta", diff --git a/core/lib/merkle_tree/Cargo.toml b/core/lib/merkle_tree/Cargo.toml index 1170c81479b5..debe49dbffb1 100644 --- a/core/lib/merkle_tree/Cargo.toml +++ b/core/lib/merkle_tree/Cargo.toml @@ -17,6 +17,7 @@ zksync_storage.workspace = true zksync_prover_interface.workspace = true zksync_utils.workspace = true +anyhow.workspace = true leb128.workspace = true once_cell.workspace = true rayon.workspace = true From 620cebbe1f69d0db7d86dce5aef34daf6fea32d2 Mon Sep 17 00:00:00 2001 From: tomg10 Date: Mon, 1 Apr 2024 03:34:33 +0200 Subject: [PATCH 03/17] fix Signed-off-by: tomg10 --- core/lib/merkle_tree/src/pruning.rs | 4 ++-- core/lib/merkle_tree/tests/integration/merkle_tree.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/lib/merkle_tree/src/pruning.rs b/core/lib/merkle_tree/src/pruning.rs index 9fade70745fa..608bacbebd55 100644 --- a/core/lib/merkle_tree/src/pruning.rs +++ b/core/lib/merkle_tree/src/pruning.rs @@ -180,7 +180,7 @@ impl MerkleTreePruner { } /// Runs this pruner indefinitely until it is aborted by dropping its handle. - pub fn run(mut self, retained_version_source: Box) { + pub fn run(mut self, retained_version_source: &Box) { tracing::info!("Started Merkle tree pruner {self:?}"); loop { let last_version = self.last_prunable_version(); @@ -407,7 +407,7 @@ mod tests { let new_keys_in_db: HashSet<_> = db.nodes_mut().map(|(key, _)| *key).collect(); assert!(new_keys_in_db.is_superset(&keys_in_db)); - let (pruner, _handle) = MerkleTreePruner::new(&mut db); + let (mut pruner, _handle) = MerkleTreePruner::new(&mut db); let stats = pruner .prune_up_to(pruner.last_prunable_version().unwrap()) .unwrap(); diff --git a/core/lib/merkle_tree/tests/integration/merkle_tree.rs b/core/lib/merkle_tree/tests/integration/merkle_tree.rs index 747731e24a72..bead4311b30d 100644 --- a/core/lib/merkle_tree/tests/integration/merkle_tree.rs +++ b/core/lib/merkle_tree/tests/integration/merkle_tree.rs @@ -608,7 +608,7 @@ mod rocksdb { fn snapshot_for_pruned_tree(chunk_size: usize) { let Harness { mut db, dir: _dir } = Harness::new(); test_intermediate_commits(&mut db, chunk_size); - let (mut pruner, _) = MerkleTreePruner::new(&mut db); + let (mut pruner, _handle) = MerkleTreePruner::new(&mut db); pruner.prune_up_to(pruner.last_prunable_version().unwrap()); let raw_db = db.into_inner(); From 1f889cf7d8befe2d52205cf725945f28cb19d2ca Mon Sep 17 00:00:00 2001 From: tomg10 Date: Mon, 1 Apr 2024 03:58:31 +0200 Subject: [PATCH 04/17] fix Signed-off-by: tomg10 --- core/lib/merkle_tree/examples/loadtest/main.rs | 4 ++-- core/lib/merkle_tree/src/pruning.rs | 6 +++--- core/lib/zksync_core/src/metadata_calculator/mod.rs | 4 +--- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/core/lib/merkle_tree/examples/loadtest/main.rs b/core/lib/merkle_tree/examples/loadtest/main.rs index 199133ffd5ae..458f368548e4 100644 --- a/core/lib/merkle_tree/examples/loadtest/main.rs +++ b/core/lib/merkle_tree/examples/loadtest/main.rs @@ -105,9 +105,9 @@ impl Cli { let (mut pruner, pruner_handle) = MerkleTreePruner::new(rocksdb.clone()); pruner.set_poll_interval(Duration::from_secs(10)); let pruner_thread = thread::spawn(|| { - pruner.run(Box::new(KeepConstantVersionsCount { + pruner.run(&KeepConstantVersionsCount { past_versions_to_keep: 0, - })) + }) }); pruner_handles = Some((pruner_handle, pruner_thread)); } diff --git a/core/lib/merkle_tree/src/pruning.rs b/core/lib/merkle_tree/src/pruning.rs index 608bacbebd55..5b30b5512c61 100644 --- a/core/lib/merkle_tree/src/pruning.rs +++ b/core/lib/merkle_tree/src/pruning.rs @@ -180,7 +180,7 @@ impl MerkleTreePruner { } /// Runs this pruner indefinitely until it is aborted by dropping its handle. - pub fn run(mut self, retained_version_source: &Box) { + pub fn run(mut self, retained_version_source: &dyn RetainedVersionSource) { tracing::info!("Started Merkle tree pruner {self:?}"); loop { let last_version = self.last_prunable_version(); @@ -293,9 +293,9 @@ mod tests { let (mut pruner, pruner_handle) = MerkleTreePruner::new(PatchSet::default()); pruner.set_poll_interval(Duration::from_secs(30)); let join_handle = thread::spawn(|| { - pruner.run(Box::new(KeepConstantVersionsCount { + pruner.run(&KeepConstantVersionsCount { past_versions_to_keep: 0, - })) + }) }); pruner_handle.abort(); diff --git a/core/lib/zksync_core/src/metadata_calculator/mod.rs b/core/lib/zksync_core/src/metadata_calculator/mod.rs index 3a10db9d3a64..2844a30558c9 100644 --- a/core/lib/zksync_core/src/metadata_calculator/mod.rs +++ b/core/lib/zksync_core/src/metadata_calculator/mod.rs @@ -164,9 +164,7 @@ impl MetadataCalculator { let (tree_pruner, pruner_handle) = tree.pruner(); let pruner_pool = pool.clone(); let join_handle = thread::spawn(move || { - tree_pruner.run(Box::new(KeepPruningSyncedWithDbPruning { - pool: pruner_pool, - })) + tree_pruner.run(&KeepPruningSyncedWithDbPruning { pool: pruner_pool }) }); self.tree_reader.send_replace(Some(tree_reader)); From 8733fe2f04d59d6e37e9177ee731380c391c0ff9 Mon Sep 17 00:00:00 2001 From: tomg10 Date: Mon, 1 Apr 2024 17:40:56 +0200 Subject: [PATCH 05/17] fix Signed-off-by: tomg10 --- core/lib/merkle_tree/src/pruning.rs | 4 +++- .../src/metadata_calculator/mod.rs | 15 +++++++------ .../src/metadata_calculator/pruning.rs | 22 ++++++++++++++----- 3 files changed, 28 insertions(+), 13 deletions(-) diff --git a/core/lib/merkle_tree/src/pruning.rs b/core/lib/merkle_tree/src/pruning.rs index 5b30b5512c61..983508dd7b4d 100644 --- a/core/lib/merkle_tree/src/pruning.rs +++ b/core/lib/merkle_tree/src/pruning.rs @@ -27,6 +27,7 @@ impl MerkleTreePrunerHandle { /// objects implementing this trait can be passed to pruner main loop, they act as a source of info about up to which version the tree should be pruned pub trait RetainedVersionSource { /// Returns info up to which version (l1_batch) the tree should be pruned up to + #[allow(clippy::missing_errors_doc)] fn target_retained_version(&self, last_prunable_version: u64) -> anyhow::Result; } @@ -180,6 +181,7 @@ impl MerkleTreePruner { } /// Runs this pruner indefinitely until it is aborted by dropping its handle. + #[allow(clippy::missing_panics_doc)] pub fn run(mut self, retained_version_source: &dyn RetainedVersionSource) { tracing::info!("Started Merkle tree pruner {self:?}"); loop { @@ -295,7 +297,7 @@ mod tests { let join_handle = thread::spawn(|| { pruner.run(&KeepConstantVersionsCount { past_versions_to_keep: 0, - }) + }); }); pruner_handle.abort(); diff --git a/core/lib/zksync_core/src/metadata_calculator/mod.rs b/core/lib/zksync_core/src/metadata_calculator/mod.rs index 2844a30558c9..7c569d8e814c 100644 --- a/core/lib/zksync_core/src/metadata_calculator/mod.rs +++ b/core/lib/zksync_core/src/metadata_calculator/mod.rs @@ -8,7 +8,7 @@ use std::{ }; use anyhow::Context as _; -use tokio::sync::watch; +use tokio::{runtime::Handle, sync::watch}; use zksync_config::configs::{ chain::OperationsManagerConfig, database::{MerkleTreeConfig, MerkleTreeMode}, @@ -162,10 +162,11 @@ impl MetadataCalculator { tree_reader.clone().info().await ); let (tree_pruner, pruner_handle) = tree.pruner(); - let pruner_pool = pool.clone(); - let join_handle = thread::spawn(move || { - tree_pruner.run(&KeepPruningSyncedWithDbPruning { pool: pruner_pool }) - }); + let pruner_version_source = KeepPruningSyncedWithDbPruning { + pool: pool.clone(), + rt_handle: Handle::current(), + }; + let pruner_thread = thread::spawn(move || tree_pruner.run(&pruner_version_source)); self.tree_reader.send_replace(Some(tree_reader)); let updater = TreeUpdater::new(tree, self.max_l1_batches_per_iter, self.object_store); @@ -174,8 +175,8 @@ impl MetadataCalculator { .await?; // line below requests pruner to stop - drop(pruner_handle); - join_handle.join().unwrap(); + pruner_handle.abort(); + pruner_thread.join().unwrap(); Ok(()) } diff --git a/core/lib/zksync_core/src/metadata_calculator/pruning.rs b/core/lib/zksync_core/src/metadata_calculator/pruning.rs index 41930c6d4ff4..5ab66d94d6f1 100644 --- a/core/lib/zksync_core/src/metadata_calculator/pruning.rs +++ b/core/lib/zksync_core/src/metadata_calculator/pruning.rs @@ -2,19 +2,31 @@ use tokio::runtime::Handle; use zksync_dal::{Core, CoreDal}; use zksync_db_connection::connection_pool::ConnectionPool; use zksync_merkle_tree::RetainedVersionSource; +use zksync_types::L1BatchNumber; #[derive(Debug)] pub struct KeepPruningSyncedWithDbPruning { pub pool: ConnectionPool, + pub rt_handle: Handle, } impl RetainedVersionSource for KeepPruningSyncedWithDbPruning { fn target_retained_version(&self, _last_prunable_version: u64) -> anyhow::Result { - let rt_handle = Handle::current(); - //this code looks awkward as tree crate is not using async - let mut storage = rt_handle.block_on(self.pool.connection_tagged("tree_pruner"))?; + // We have to do this as as the whole tree does not work inside tokio runtime + let target_l1_batch_number = self.rt_handle.block_on(self.get_l1_batch_to_prune())?.0; + Ok(target_l1_batch_number as u64) + } +} + +impl KeepPruningSyncedWithDbPruning { + async fn get_l1_batch_to_prune(&self) -> anyhow::Result { + let mut storage = self.pool.connection_tagged("tree_pruner").await?; // TODO update this code once db pruning is merged - let _ = rt_handle.block_on(storage.blocks_dal().get_earliest_l1_batch_number())?; - Ok(0) + let _ = storage + .blocks_dal() + .get_earliest_l1_batch_number() + .await? + .unwrap_or(L1BatchNumber(0)); + Ok(L1BatchNumber(0)) } } From 27165c836ef217ae491668f6095f6d90086e3f8d Mon Sep 17 00:00:00 2001 From: tomg10 Date: Tue, 2 Apr 2024 03:09:32 +0200 Subject: [PATCH 06/17] fix Signed-off-by: tomg10 --- core/lib/zksync_core/src/metadata_calculator/mod.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/core/lib/zksync_core/src/metadata_calculator/mod.rs b/core/lib/zksync_core/src/metadata_calculator/mod.rs index 7c569d8e814c..e358005787cd 100644 --- a/core/lib/zksync_core/src/metadata_calculator/mod.rs +++ b/core/lib/zksync_core/src/metadata_calculator/mod.rs @@ -176,7 +176,12 @@ impl MetadataCalculator { // line below requests pruner to stop pruner_handle.abort(); - pruner_thread.join().unwrap(); + // we can't just join thread here as it may cause deadlock + // if tree pruner is currently executing async code, as join is blocking + tokio::task::spawn_blocking(|| { + pruner_thread.join().unwrap(); + }) + .await?; Ok(()) } From 1c4ccc9bdf199b1512aceef521b957c6e3cde130 Mon Sep 17 00:00:00 2001 From: tomg10 Date: Wed, 3 Apr 2024 02:29:45 +0200 Subject: [PATCH 07/17] PR feedback Signed-off-by: tomg10 --- .../lib/merkle_tree/examples/loadtest/main.rs | 15 +-- core/lib/merkle_tree/src/lib.rs | 4 +- core/lib/merkle_tree/src/metrics.rs | 2 +- core/lib/merkle_tree/src/pruning.rs | 118 +++++++----------- .../src/metadata_calculator/mod.rs | 56 ++++++--- .../src/metadata_calculator/pruning.rs | 32 ----- 6 files changed, 95 insertions(+), 132 deletions(-) delete mode 100644 core/lib/zksync_core/src/metadata_calculator/pruning.rs diff --git a/core/lib/merkle_tree/examples/loadtest/main.rs b/core/lib/merkle_tree/examples/loadtest/main.rs index 458f368548e4..5a43fa876f2f 100644 --- a/core/lib/merkle_tree/examples/loadtest/main.rs +++ b/core/lib/merkle_tree/examples/loadtest/main.rs @@ -14,8 +14,8 @@ use tempfile::TempDir; use tracing_subscriber::EnvFilter; use zksync_crypto::hasher::blake2::Blake2Hasher; use zksync_merkle_tree::{ - Database, HashTree, KeepConstantVersionsCount, MerkleTree, MerkleTreePruner, PatchSet, - RocksDBWrapper, TreeEntry, TreeInstruction, + Database, HashTree, MerkleTree, MerkleTreePruner, PatchSet, RocksDBWrapper, TreeEntry, + TreeInstruction, }; use zksync_storage::{RocksDB, RocksDBOptions}; use zksync_types::{AccountTreeId, Address, StorageKey, H256, U256}; @@ -104,11 +104,7 @@ impl Cli { if self.prune { let (mut pruner, pruner_handle) = MerkleTreePruner::new(rocksdb.clone()); pruner.set_poll_interval(Duration::from_secs(10)); - let pruner_thread = thread::spawn(|| { - pruner.run(&KeepConstantVersionsCount { - past_versions_to_keep: 0, - }) - }); + let pruner_thread = thread::spawn(|| pruner.run()); pruner_handles = Some((pruner_handle, pruner_thread)); } _temp_dir = Some(dir); @@ -159,6 +155,11 @@ impl Cli { let output = tree.extend(kvs.collect()); output.root_hash }; + + if let Some((ref pruner_handle, _)) = pruner_handles { + pruner_handle.set_target_retained_version(version); + } + let elapsed = start.elapsed(); tracing::info!("Processed block #{version} in {elapsed:?}, root hash = {root_hash:?}"); } diff --git a/core/lib/merkle_tree/src/lib.rs b/core/lib/merkle_tree/src/lib.rs index f8cc09e847bd..687e957f8ef4 100644 --- a/core/lib/merkle_tree/src/lib.rs +++ b/core/lib/merkle_tree/src/lib.rs @@ -51,9 +51,7 @@ use zksync_crypto::hasher::blake2::Blake2Hasher; pub use crate::{ errors::NoVersionError, hasher::{HashTree, TreeRangeDigest}, - pruning::{ - KeepConstantVersionsCount, MerkleTreePruner, MerkleTreePrunerHandle, RetainedVersionSource, - }, + pruning::{MerkleTreePruner, MerkleTreePrunerHandle}, storage::{ Database, MerkleTreeColumnFamily, PatchSet, Patched, PruneDatabase, PrunePatchSet, RocksDBWrapper, diff --git a/core/lib/merkle_tree/src/metrics.rs b/core/lib/merkle_tree/src/metrics.rs index ef1e94f9b050..8c8fdc4aeaa4 100644 --- a/core/lib/merkle_tree/src/metrics.rs +++ b/core/lib/merkle_tree/src/metrics.rs @@ -334,7 +334,7 @@ pub struct PruningStats { } impl PruningStats { - pub fn report(self) { + pub fn report(&self) { PRUNING_METRICS .target_retained_version .set(self.target_retained_version); diff --git a/core/lib/merkle_tree/src/pruning.rs b/core/lib/merkle_tree/src/pruning.rs index 983508dd7b4d..509bf5fa01d2 100644 --- a/core/lib/merkle_tree/src/pruning.rs +++ b/core/lib/merkle_tree/src/pruning.rs @@ -1,6 +1,11 @@ //! Tree pruning logic. -use std::{cmp::min, fmt, sync::mpsc, time::Duration}; +use std::{ + cmp::min, + fmt, + sync::{mpsc, Arc, RwLock}, + time::Duration, +}; use crate::{ metrics::{PruningStats, PRUNING_TIMINGS}, @@ -9,11 +14,11 @@ use crate::{ /// Handle for a [`MerkleTreePruner`] allowing to abort its operation. /// -/// The pruner is aborted once the handle is dropped. -#[must_use = "Pruner is aborted once handle is dropped"] +/// The pruner is aborted once the abort method on handle is called. #[derive(Debug)] pub struct MerkleTreePrunerHandle { aborted_sender: mpsc::Sender<()>, + target_retained_version: Arc>>, } impl MerkleTreePrunerHandle { @@ -22,27 +27,15 @@ impl MerkleTreePrunerHandle { pub fn abort(self) { self.aborted_sender.send(()).ok(); } -} -/// objects implementing this trait can be passed to pruner main loop, they act as a source of info about up to which version the tree should be pruned -pub trait RetainedVersionSource { - /// Returns info up to which version (l1_batch) the tree should be pruned up to - #[allow(clippy::missing_errors_doc)] - fn target_retained_version(&self, last_prunable_version: u64) -> anyhow::Result; -} - -/// Pruner 'algorithm' simulating keeping a constant number of past versions -#[derive(Debug)] -pub struct KeepConstantVersionsCount { - /// How many past versions of tree should be kept - pub past_versions_to_keep: u64, -} - -impl RetainedVersionSource for KeepConstantVersionsCount { - fn target_retained_version(&self, last_prunable_version: u64) -> anyhow::Result { - Ok(last_prunable_version - .checked_sub(self.past_versions_to_keep) - .unwrap()) + /// Sets the version of the tree the pruner should attempt to prune to + #[allow(clippy::missing_panics_doc)] + pub fn set_target_retained_version(&self, new_version: u64) { + let mut version = self + .target_retained_version + .write() + .expect("target_retained_version is poisoned"); + *version = Some(new_version); } } @@ -64,6 +57,7 @@ pub struct MerkleTreePruner { target_pruned_key_count: usize, poll_interval: Duration, aborted_receiver: mpsc::Receiver<()>, + target_retained_version: Arc>>, } impl fmt::Debug for MerkleTreePruner { @@ -82,16 +76,20 @@ impl MerkleTreePruner { /// /// # Return value /// - /// Returns the created pruner and a handle to it. *The pruner will be aborted when its handle - /// is dropped.* + /// Returns the created pruner and a handle to it. The pruner can be stopped by calling abort on it's handle pub fn new(db: DB) -> (Self, MerkleTreePrunerHandle) { let (aborted_sender, aborted_receiver) = mpsc::channel(); - let handle = MerkleTreePrunerHandle { aborted_sender }; + let target_retained_version = Arc::new(RwLock::new(None)); + let handle = MerkleTreePrunerHandle { + aborted_sender, + target_retained_version: target_retained_version.clone(), + }; let this = Self { db, target_pruned_key_count: 500_000, poll_interval: Duration::from_secs(60), aborted_receiver, + target_retained_version, }; (this, handle) } @@ -168,58 +166,38 @@ impl MerkleTreePruner { Some(stats) } - fn wait_for_next_iteration(&mut self, timeout: Duration) -> bool { - match self.aborted_receiver.recv_timeout(timeout) { - Ok(()) => false, // Abort was requested - Err(mpsc::RecvTimeoutError::Disconnected) => { - tracing::warn!("Pruner handle is dropped without calling `abort()`; exiting"); - false - } - // The pruner handle is alive and wasn't used to abort the pruner. - Err(mpsc::RecvTimeoutError::Timeout) => true, - } + fn wait_for_abort(&mut self, timeout: Duration) -> bool { + self.aborted_receiver.recv_timeout(timeout).is_ok() } - /// Runs this pruner indefinitely until it is aborted by dropping its handle. + /// Runs this pruner indefinitely until it is aborted by calling abort() on its handle. #[allow(clippy::missing_panics_doc)] - pub fn run(mut self, retained_version_source: &dyn RetainedVersionSource) { + pub fn run(mut self) { tracing::info!("Started Merkle tree pruner {self:?}"); loop { - let last_version = self.last_prunable_version(); - if last_version.is_none() { - tracing::info!("Nothing to prune in tree, sleeping"); - if !self.wait_for_next_iteration(self.poll_interval) { - break; - } - continue; + if self.wait_for_abort(Duration::ZERO) { + break; } - let retained_version = - retained_version_source.target_retained_version(last_version.unwrap()); - if retained_version.is_err() { - tracing::warn!( - "Unable to determine tree target retained version, error was: {}", - retained_version.unwrap_err() - ); - if !self.wait_for_next_iteration(self.poll_interval) { - break; + + let retained_version = *self + .target_retained_version + .read() + .expect("target_retained_version is poisoned"); + + if let Some(retained_version) = retained_version { + if let Some(stats) = self.prune_up_to(retained_version) { + stats.report(); + if stats.has_more_work() { + continue; + } } - continue; } - let timeout = if let Some(stats) = self.prune_up_to(retained_version.unwrap()) { - let has_more_work = stats.has_more_work(); - stats.report(); - if has_more_work { - Duration::ZERO - } else { - self.poll_interval - } - } else { - tracing::debug!("No pruning required per specified policies; waiting"); + tracing::debug!( + "Pruning was not performed; waiting {:?}", self.poll_interval - }; - - if !self.wait_for_next_iteration(timeout) { + ); + if self.wait_for_abort(self.poll_interval) { break; } } @@ -295,9 +273,7 @@ mod tests { let (mut pruner, pruner_handle) = MerkleTreePruner::new(PatchSet::default()); pruner.set_poll_interval(Duration::from_secs(30)); let join_handle = thread::spawn(|| { - pruner.run(&KeepConstantVersionsCount { - past_versions_to_keep: 0, - }); + pruner.run(); }); pruner_handle.abort(); diff --git a/core/lib/zksync_core/src/metadata_calculator/mod.rs b/core/lib/zksync_core/src/metadata_calculator/mod.rs index 56935d6317eb..7fa0e24d5657 100644 --- a/core/lib/zksync_core/src/metadata_calculator/mod.rs +++ b/core/lib/zksync_core/src/metadata_calculator/mod.rs @@ -3,19 +3,20 @@ use std::{ sync::Arc, - thread, time::{Duration, Instant}, }; use anyhow::Context as _; -use tokio::{runtime::Handle, sync::watch}; +use tokio::sync::watch; use zksync_config::configs::{ chain::OperationsManagerConfig, database::{MerkleTreeConfig, MerkleTreeMode}, }; -use zksync_dal::{ConnectionPool, Core}; +use zksync_dal::{ConnectionPool, Core, CoreDal}; use zksync_health_check::{HealthUpdater, ReactiveHealthCheck}; +use zksync_merkle_tree::{MerkleTreePruner, MerkleTreePrunerHandle, RocksDBWrapper}; use zksync_object_store::ObjectStore; +use zksync_types::L1BatchNumber; pub use self::helpers::LazyAsyncTreeReader; pub(crate) use self::helpers::{AsyncTreeReader, L1BatchWithLogs, MerkleTreeInfo}; @@ -23,11 +24,9 @@ use self::{ helpers::{create_db, Delayer, GenericAsyncTree, MerkleTreeHealth}, updater::TreeUpdater, }; -use crate::metadata_calculator::pruning::KeepPruningSyncedWithDbPruning; mod helpers; mod metrics; -mod pruning; mod recovery; #[cfg(test)] pub(crate) mod tests; @@ -149,6 +148,32 @@ impl MetadataCalculator { Ok(GenericAsyncTree::new(db, self.config.mode).await) } + async fn loop_pruning_tree( + pool: ConnectionPool, + tree_pruner: MerkleTreePruner, + pruner_handle: MerkleTreePrunerHandle, + stop_receiver: watch::Receiver, + ) -> anyhow::Result<()> { + let pruner_thread = tokio::task::spawn_blocking(|| tree_pruner.run()); + loop { + let mut storage = pool.connection_tagged("tree_pruner").await?; + // TODO update this code once db pruning is merged + let _ = storage + .blocks_dal() + .get_earliest_l1_batch_number() + .await? + .unwrap_or(L1BatchNumber(0)); + + pruner_handle.set_target_retained_version(0); + + if *stop_receiver.borrow() { + pruner_handle.abort(); + pruner_thread.await?; + return Ok(()); + } + } + } + pub async fn run( self, pool: ConnectionPool, @@ -166,12 +191,14 @@ impl MetadataCalculator { "Merkle tree is initialized and ready to process L1 batches: {:?}", tree_reader.clone().info().await ); + let (tree_pruner, pruner_handle) = tree.pruner(); - let pruner_version_source = KeepPruningSyncedWithDbPruning { - pool: pool.clone(), - rt_handle: Handle::current(), - }; - let pruner_thread = thread::spawn(move || tree_pruner.run(&pruner_version_source)); + let pruner = Self::loop_pruning_tree( + pool.clone(), + tree_pruner, + pruner_handle, + stop_receiver.clone(), + ); self.tree_reader.send_replace(Some(tree_reader)); let updater = TreeUpdater::new(tree, self.max_l1_batches_per_iter, self.object_store); @@ -179,14 +206,7 @@ impl MetadataCalculator { .loop_updating_tree(self.delayer, &pool, stop_receiver, self.health_updater) .await?; - // line below requests pruner to stop - pruner_handle.abort(); - // we can't just join thread here as it may cause deadlock - // if tree pruner is currently executing async code, as join is blocking - tokio::task::spawn_blocking(|| { - pruner_thread.join().unwrap(); - }) - .await?; + pruner.await?; Ok(()) } diff --git a/core/lib/zksync_core/src/metadata_calculator/pruning.rs b/core/lib/zksync_core/src/metadata_calculator/pruning.rs deleted file mode 100644 index 5ab66d94d6f1..000000000000 --- a/core/lib/zksync_core/src/metadata_calculator/pruning.rs +++ /dev/null @@ -1,32 +0,0 @@ -use tokio::runtime::Handle; -use zksync_dal::{Core, CoreDal}; -use zksync_db_connection::connection_pool::ConnectionPool; -use zksync_merkle_tree::RetainedVersionSource; -use zksync_types::L1BatchNumber; - -#[derive(Debug)] -pub struct KeepPruningSyncedWithDbPruning { - pub pool: ConnectionPool, - pub rt_handle: Handle, -} - -impl RetainedVersionSource for KeepPruningSyncedWithDbPruning { - fn target_retained_version(&self, _last_prunable_version: u64) -> anyhow::Result { - // We have to do this as as the whole tree does not work inside tokio runtime - let target_l1_batch_number = self.rt_handle.block_on(self.get_l1_batch_to_prune())?.0; - Ok(target_l1_batch_number as u64) - } -} - -impl KeepPruningSyncedWithDbPruning { - async fn get_l1_batch_to_prune(&self) -> anyhow::Result { - let mut storage = self.pool.connection_tagged("tree_pruner").await?; - // TODO update this code once db pruning is merged - let _ = storage - .blocks_dal() - .get_earliest_l1_batch_number() - .await? - .unwrap_or(L1BatchNumber(0)); - Ok(L1BatchNumber(0)) - } -} From 1ce7feb28f97f4f86969e2a3ff82c1483dec6fc6 Mon Sep 17 00:00:00 2001 From: tomg10 Date: Wed, 3 Apr 2024 02:34:08 +0200 Subject: [PATCH 08/17] fix Signed-off-by: tomg10 --- Cargo.lock | 1 - core/lib/merkle_tree/Cargo.toml | 1 - 2 files changed, 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0988d78dc13e..fc7d1d63f9a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8475,7 +8475,6 @@ dependencies = [ name = "zksync_merkle_tree" version = "0.1.0" dependencies = [ - "anyhow", "assert_matches", "clap 4.4.6", "insta", diff --git a/core/lib/merkle_tree/Cargo.toml b/core/lib/merkle_tree/Cargo.toml index debe49dbffb1..1170c81479b5 100644 --- a/core/lib/merkle_tree/Cargo.toml +++ b/core/lib/merkle_tree/Cargo.toml @@ -17,7 +17,6 @@ zksync_storage.workspace = true zksync_prover_interface.workspace = true zksync_utils.workspace = true -anyhow.workspace = true leb128.workspace = true once_cell.workspace = true rayon.workspace = true From 55be60ebc83ad7e5ae42ad9c9328c54beddca6b2 Mon Sep 17 00:00:00 2001 From: tomg10 Date: Wed, 3 Apr 2024 02:56:29 +0200 Subject: [PATCH 09/17] prepare for tests Signed-off-by: tomg10 --- core/lib/zksync_core/src/metadata_calculator/mod.rs | 10 ++++++++-- core/lib/zksync_core/src/metadata_calculator/tests.rs | 3 ++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/core/lib/zksync_core/src/metadata_calculator/mod.rs b/core/lib/zksync_core/src/metadata_calculator/mod.rs index 7fa0e24d5657..0b0aca61f852 100644 --- a/core/lib/zksync_core/src/metadata_calculator/mod.rs +++ b/core/lib/zksync_core/src/metadata_calculator/mod.rs @@ -53,6 +53,8 @@ pub struct MetadataCalculatorConfig { pub memtable_capacity: usize, /// Timeout to wait for the Merkle tree database to run compaction on stalled writes. pub stalled_writes_timeout: Duration, + /// Time between consecutive tree pruner loop iterations + pub tree_pruner_polling_interval: Duration, } impl MetadataCalculatorConfig { @@ -69,6 +71,7 @@ impl MetadataCalculatorConfig { block_cache_capacity: merkle_tree_config.block_cache_size(), memtable_capacity: merkle_tree_config.memtable_capacity(), stalled_writes_timeout: merkle_tree_config.stalled_writes_timeout(), + tree_pruner_polling_interval: Duration::from_secs(5), } } } @@ -149,6 +152,7 @@ impl MetadataCalculator { } async fn loop_pruning_tree( + &self, pool: ConnectionPool, tree_pruner: MerkleTreePruner, pruner_handle: MerkleTreePrunerHandle, @@ -171,6 +175,7 @@ impl MetadataCalculator { pruner_thread.await?; return Ok(()); } + tokio::time::sleep(self.config.tree_pruner_polling_interval).await; } } @@ -192,8 +197,9 @@ impl MetadataCalculator { tree_reader.clone().info().await ); - let (tree_pruner, pruner_handle) = tree.pruner(); - let pruner = Self::loop_pruning_tree( + let (mut tree_pruner, pruner_handle) = tree.pruner(); + tree_pruner.set_poll_interval(self.config.tree_pruner_polling_interval); + let pruner = self.loop_pruning_tree( pool.clone(), tree_pruner, pruner_handle, diff --git a/core/lib/zksync_core/src/metadata_calculator/tests.rs b/core/lib/zksync_core/src/metadata_calculator/tests.rs index 731799fd77a1..20ef73dee057 100644 --- a/core/lib/zksync_core/src/metadata_calculator/tests.rs +++ b/core/lib/zksync_core/src/metadata_calculator/tests.rs @@ -398,8 +398,9 @@ async fn setup_calculator_with_options( pool: &ConnectionPool, object_store: Option>, ) -> MetadataCalculator { - let calculator_config = + let mut calculator_config = MetadataCalculatorConfig::for_main_node(merkle_tree_config, operation_config); + calculator_config.tree_pruner_polling_interval = Duration::ZERO; let metadata_calculator = MetadataCalculator::new(calculator_config, object_store) .await .unwrap(); From 8cc492b87662e727d1d23898c7a10ba1837e4bdb Mon Sep 17 00:00:00 2001 From: tomg10 Date: Fri, 12 Apr 2024 17:00:23 +0200 Subject: [PATCH 10/17] PR feedback, remove code from metadata calculator Signed-off-by: tomg10 --- core/lib/merkle_tree/src/domain.rs | 17 ++++--- core/lib/merkle_tree/src/pruning.rs | 18 ++++---- .../src/metadata_calculator/helpers.rs | 1 + .../src/metadata_calculator/mod.rs | 45 +------------------ 4 files changed, 22 insertions(+), 59 deletions(-) diff --git a/core/lib/merkle_tree/src/domain.rs b/core/lib/merkle_tree/src/domain.rs index 17d0422c08dd..c87f804307a4 100644 --- a/core/lib/merkle_tree/src/domain.rs +++ b/core/lib/merkle_tree/src/domain.rs @@ -1,5 +1,7 @@ //! Tying the Merkle tree implementation to the problem domain. +use std::sync::atomic::{AtomicBool, Ordering}; + use rayon::{ThreadPool, ThreadPoolBuilder}; use zksync_crypto::hasher::blake2::Blake2Hasher; use zksync_prover_interface::inputs::{PrepareBasicCircuitsJob, StorageLogMetadata}; @@ -50,6 +52,7 @@ pub struct ZkSyncTree { tree: MerkleTree>, thread_pool: Option, mode: TreeMode, + pruning_enabled: AtomicBool, } impl ZkSyncTree { @@ -61,12 +64,6 @@ impl ZkSyncTree { .expect("failed initializing `rayon` thread pool") } - /// Returns tree pruner and a handle to stop it - pub fn pruner(&self) -> (MerkleTreePruner, MerkleTreePrunerHandle) { - let db = self.tree.db.inner().clone(); - MerkleTreePruner::new(db) - } - /// Returns metadata based on `storage_logs` generated by the genesis L1 batch. This does not /// create a persistent tree. pub fn process_genesis_batch(storage_logs: &[TreeInstruction]) -> BlockOutput { @@ -107,9 +104,17 @@ impl ZkSyncTree { tree: MerkleTree::new(Patched::new(db)), thread_pool: None, mode, + pruning_enabled: AtomicBool::new(false), } } + /// Returns tree pruner and a handle to stop it + pub fn pruner(&self) -> (MerkleTreePruner, MerkleTreePrunerHandle) { + let db = self.tree.db.inner().clone(); + self.pruning_enabled.compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed).expect("attempted to construct multiple pruners for a single tree, pruner may only run as a singleton"); + MerkleTreePruner::new(db) + } + /// Returns a readonly handle to the tree. The handle **does not** see uncommitted changes to the tree, /// only ones flushed to RocksDB. pub fn reader(&self) -> ZkSyncTreeReader { diff --git a/core/lib/merkle_tree/src/pruning.rs b/core/lib/merkle_tree/src/pruning.rs index 509bf5fa01d2..552dfaa89009 100644 --- a/core/lib/merkle_tree/src/pruning.rs +++ b/core/lib/merkle_tree/src/pruning.rs @@ -3,7 +3,10 @@ use std::{ cmp::min, fmt, - sync::{mpsc, Arc, RwLock}, + sync::{ + atomic::{AtomicU64, Ordering}, + mpsc, Arc, RwLock, + }, time::Duration, }; @@ -18,7 +21,7 @@ use crate::{ #[derive(Debug)] pub struct MerkleTreePrunerHandle { aborted_sender: mpsc::Sender<()>, - target_retained_version: Arc>>, + target_retained_version: Arc, } impl MerkleTreePrunerHandle { @@ -31,11 +34,8 @@ impl MerkleTreePrunerHandle { /// Sets the version of the tree the pruner should attempt to prune to #[allow(clippy::missing_panics_doc)] pub fn set_target_retained_version(&self, new_version: u64) { - let mut version = self - .target_retained_version - .write() - .expect("target_retained_version is poisoned"); - *version = Some(new_version); + self.target_retained_version + .store(new_version, Ordering::Relaxed); } } @@ -82,7 +82,7 @@ impl MerkleTreePruner { let target_retained_version = Arc::new(RwLock::new(None)); let handle = MerkleTreePrunerHandle { aborted_sender, - target_retained_version: target_retained_version.clone(), + target_retained_version: Arc::new(AtomicU64::new(0)), }; let this = Self { db, @@ -128,7 +128,7 @@ impl MerkleTreePruner { tracing::info!("Nothing to prune; skipping"); return None; } - let target_retained_version = min(target_retained_version, last_prunable_version.unwrap()); + let target_retained_version = min(target_retained_version, last_prunable_version?); let stale_key_new_versions = min_stale_key_version..=target_retained_version; tracing::info!("Collecting stale keys with new versions in {stale_key_new_versions:?}"); diff --git a/core/lib/zksync_core/src/metadata_calculator/helpers.rs b/core/lib/zksync_core/src/metadata_calculator/helpers.rs index 26f2f147c33a..f3f24d40a660 100644 --- a/core/lib/zksync_core/src/metadata_calculator/helpers.rs +++ b/core/lib/zksync_core/src/metadata_calculator/helpers.rs @@ -162,6 +162,7 @@ impl AsyncTree { } } + #[allow(dead_code)] pub fn pruner(&self) -> (MerkleTreePruner, MerkleTreePrunerHandle) { self.inner.as_ref().expect(Self::INCONSISTENT_MSG).pruner() } diff --git a/core/lib/zksync_core/src/metadata_calculator/mod.rs b/core/lib/zksync_core/src/metadata_calculator/mod.rs index 0b0aca61f852..e047b6c36c10 100644 --- a/core/lib/zksync_core/src/metadata_calculator/mod.rs +++ b/core/lib/zksync_core/src/metadata_calculator/mod.rs @@ -12,11 +12,9 @@ use zksync_config::configs::{ chain::OperationsManagerConfig, database::{MerkleTreeConfig, MerkleTreeMode}, }; -use zksync_dal::{ConnectionPool, Core, CoreDal}; +use zksync_dal::{ConnectionPool, Core}; use zksync_health_check::{HealthUpdater, ReactiveHealthCheck}; -use zksync_merkle_tree::{MerkleTreePruner, MerkleTreePrunerHandle, RocksDBWrapper}; use zksync_object_store::ObjectStore; -use zksync_types::L1BatchNumber; pub use self::helpers::LazyAsyncTreeReader; pub(crate) use self::helpers::{AsyncTreeReader, L1BatchWithLogs, MerkleTreeInfo}; @@ -53,8 +51,6 @@ pub struct MetadataCalculatorConfig { pub memtable_capacity: usize, /// Timeout to wait for the Merkle tree database to run compaction on stalled writes. pub stalled_writes_timeout: Duration, - /// Time between consecutive tree pruner loop iterations - pub tree_pruner_polling_interval: Duration, } impl MetadataCalculatorConfig { @@ -71,7 +67,6 @@ impl MetadataCalculatorConfig { block_cache_capacity: merkle_tree_config.block_cache_size(), memtable_capacity: merkle_tree_config.memtable_capacity(), stalled_writes_timeout: merkle_tree_config.stalled_writes_timeout(), - tree_pruner_polling_interval: Duration::from_secs(5), } } } @@ -151,34 +146,6 @@ impl MetadataCalculator { Ok(GenericAsyncTree::new(db, self.config.mode).await) } - async fn loop_pruning_tree( - &self, - pool: ConnectionPool, - tree_pruner: MerkleTreePruner, - pruner_handle: MerkleTreePrunerHandle, - stop_receiver: watch::Receiver, - ) -> anyhow::Result<()> { - let pruner_thread = tokio::task::spawn_blocking(|| tree_pruner.run()); - loop { - let mut storage = pool.connection_tagged("tree_pruner").await?; - // TODO update this code once db pruning is merged - let _ = storage - .blocks_dal() - .get_earliest_l1_batch_number() - .await? - .unwrap_or(L1BatchNumber(0)); - - pruner_handle.set_target_retained_version(0); - - if *stop_receiver.borrow() { - pruner_handle.abort(); - pruner_thread.await?; - return Ok(()); - } - tokio::time::sleep(self.config.tree_pruner_polling_interval).await; - } - } - pub async fn run( self, pool: ConnectionPool, @@ -197,14 +164,6 @@ impl MetadataCalculator { tree_reader.clone().info().await ); - let (mut tree_pruner, pruner_handle) = tree.pruner(); - tree_pruner.set_poll_interval(self.config.tree_pruner_polling_interval); - let pruner = self.loop_pruning_tree( - pool.clone(), - tree_pruner, - pruner_handle, - stop_receiver.clone(), - ); self.tree_reader.send_replace(Some(tree_reader)); let updater = TreeUpdater::new(tree, self.max_l1_batches_per_iter, self.object_store); @@ -212,8 +171,6 @@ impl MetadataCalculator { .loop_updating_tree(self.delayer, &pool, stop_receiver, self.health_updater) .await?; - pruner.await?; - Ok(()) } } From 9e809e8ea5baeadd0222b1798f8a82e856b40692 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Wed, 17 Apr 2024 14:07:42 +0300 Subject: [PATCH 11/17] Brush up `pruning` module --- .../lib/merkle_tree/examples/loadtest/main.rs | 2 +- core/lib/merkle_tree/src/domain.rs | 21 +++-- core/lib/merkle_tree/src/pruning.rs | 82 +++++++++++-------- 3 files changed, 61 insertions(+), 44 deletions(-) diff --git a/core/lib/merkle_tree/examples/loadtest/main.rs b/core/lib/merkle_tree/examples/loadtest/main.rs index 5a43fa876f2f..53920bbe4e6c 100644 --- a/core/lib/merkle_tree/examples/loadtest/main.rs +++ b/core/lib/merkle_tree/examples/loadtest/main.rs @@ -156,7 +156,7 @@ impl Cli { output.root_hash }; - if let Some((ref pruner_handle, _)) = pruner_handles { + if let Some((pruner_handle, _)) = &pruner_handles { pruner_handle.set_target_retained_version(version); } diff --git a/core/lib/merkle_tree/src/domain.rs b/core/lib/merkle_tree/src/domain.rs index c87f804307a4..e6e854387d2b 100644 --- a/core/lib/merkle_tree/src/domain.rs +++ b/core/lib/merkle_tree/src/domain.rs @@ -1,7 +1,5 @@ //! Tying the Merkle tree implementation to the problem domain. -use std::sync::atomic::{AtomicBool, Ordering}; - use rayon::{ThreadPool, ThreadPoolBuilder}; use zksync_crypto::hasher::blake2::Blake2Hasher; use zksync_prover_interface::inputs::{PrepareBasicCircuitsJob, StorageLogMetadata}; @@ -52,7 +50,7 @@ pub struct ZkSyncTree { tree: MerkleTree>, thread_pool: Option, mode: TreeMode, - pruning_enabled: AtomicBool, + pruning_enabled: bool, } impl ZkSyncTree { @@ -104,14 +102,23 @@ impl ZkSyncTree { tree: MerkleTree::new(Patched::new(db)), thread_pool: None, mode, - pruning_enabled: AtomicBool::new(false), + pruning_enabled: false, } } - /// Returns tree pruner and a handle to stop it - pub fn pruner(&self) -> (MerkleTreePruner, MerkleTreePrunerHandle) { + /// Returns tree pruner and a handle to stop it. + /// + /// # Panics + /// + /// Panics if this method was already called for the tree instance; it's logically unsound to run + /// multiple pruners for the same tree concurrently. + pub fn pruner(&mut self) -> (MerkleTreePruner, MerkleTreePrunerHandle) { + assert!( + !self.pruning_enabled, + "pruner was already obtained for the tree" + ); + self.pruning_enabled = true; let db = self.tree.db.inner().clone(); - self.pruning_enabled.compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed).expect("attempted to construct multiple pruners for a single tree, pruner may only run as a singleton"); MerkleTreePruner::new(db) } diff --git a/core/lib/merkle_tree/src/pruning.rs b/core/lib/merkle_tree/src/pruning.rs index 552dfaa89009..832c4a697cdc 100644 --- a/core/lib/merkle_tree/src/pruning.rs +++ b/core/lib/merkle_tree/src/pruning.rs @@ -1,11 +1,10 @@ //! Tree pruning logic. use std::{ - cmp::min, fmt, sync::{ atomic::{AtomicU64, Ordering}, - mpsc, Arc, RwLock, + mpsc, Arc, }, time::Duration, }; @@ -31,11 +30,11 @@ impl MerkleTreePrunerHandle { self.aborted_sender.send(()).ok(); } - /// Sets the version of the tree the pruner should attempt to prune to - #[allow(clippy::missing_panics_doc)] + /// Sets the version of the tree the pruner should attempt to prune to. Calls should provide + /// monotonically increasing versions; call with a lesser version will have no effect. pub fn set_target_retained_version(&self, new_version: u64) { self.target_retained_version - .store(new_version, Ordering::Relaxed); + .fetch_max(new_version, Ordering::Relaxed); } } @@ -57,7 +56,7 @@ pub struct MerkleTreePruner { target_pruned_key_count: usize, poll_interval: Duration, aborted_receiver: mpsc::Receiver<()>, - target_retained_version: Arc>>, + target_retained_version: Arc, } impl fmt::Debug for MerkleTreePruner { @@ -66,6 +65,7 @@ impl fmt::Debug for MerkleTreePruner { .debug_struct("MerkleTreePruner") .field("target_pruned_key_count", &self.target_pruned_key_count) .field("poll_interval", &self.poll_interval) + .field("target_retained_version", &self.target_retained_version) .finish_non_exhaustive() } } @@ -79,10 +79,10 @@ impl MerkleTreePruner { /// Returns the created pruner and a handle to it. The pruner can be stopped by calling abort on it's handle pub fn new(db: DB) -> (Self, MerkleTreePrunerHandle) { let (aborted_sender, aborted_receiver) = mpsc::channel(); - let target_retained_version = Arc::new(RwLock::new(None)); + let target_retained_version = Arc::new(AtomicU64::new(0)); let handle = MerkleTreePrunerHandle { aborted_sender, - target_retained_version: Arc::new(AtomicU64::new(0)), + target_retained_version: target_retained_version.clone(), }; let this = Self { db, @@ -111,7 +111,8 @@ impl MerkleTreePruner { self.poll_interval = poll_interval; } - /// Returns max version number that can be safely pruned, so that after pruning there is at least one version present after pruning + /// Returns max version number that can be safely pruned, so that after pruning there is at least one version present after pruning. + #[doc(hidden)] // Used in integration tests; logically private pub fn last_prunable_version(&self) -> Option { let manifest = self.db.manifest()?; manifest.version_count.checked_sub(1) @@ -122,14 +123,21 @@ impl MerkleTreePruner { pub fn prune_up_to(&mut self, target_retained_version: u64) -> Option { let min_stale_key_version = self.db.min_stale_key_version()?; - //We must leave at least one version + // We must retain at least one tree version. let last_prunable_version = self.last_prunable_version(); if last_prunable_version.is_none() { tracing::info!("Nothing to prune; skipping"); return None; } - let target_retained_version = min(target_retained_version, last_prunable_version?); + let target_retained_version = last_prunable_version?.min(target_retained_version); let stale_key_new_versions = min_stale_key_version..=target_retained_version; + if stale_key_new_versions.is_empty() { + tracing::info!( + "No Merkle tree versions can be pruned; min stale key version is {min_stale_key_version}, \ + target retained version is {target_retained_version}" + ); + return None; + } tracing::info!("Collecting stale keys with new versions in {stale_key_new_versions:?}"); let load_stale_keys_latency = PRUNING_TIMINGS.load_stale_keys.start(); @@ -167,39 +175,41 @@ impl MerkleTreePruner { } fn wait_for_abort(&mut self, timeout: Duration) -> bool { - self.aborted_receiver.recv_timeout(timeout).is_ok() + match self.aborted_receiver.recv_timeout(timeout) { + Ok(()) => true, // Abort was requested + Err(mpsc::RecvTimeoutError::Disconnected) => { + tracing::warn!("Pruner handle is dropped without calling `abort()`; exiting"); + true + } + Err(mpsc::RecvTimeoutError::Timeout) => { + // The pruner handle is alive and wasn't used to abort the pruner. + false + } + } } /// Runs this pruner indefinitely until it is aborted by calling abort() on its handle. - #[allow(clippy::missing_panics_doc)] pub fn run(mut self) { tracing::info!("Started Merkle tree pruner {self:?}"); - loop { - if self.wait_for_abort(Duration::ZERO) { - break; - } - let retained_version = *self - .target_retained_version - .read() - .expect("target_retained_version is poisoned"); - - if let Some(retained_version) = retained_version { - if let Some(stats) = self.prune_up_to(retained_version) { - stats.report(); - if stats.has_more_work() { - continue; - } + let mut wait_interval = Duration::ZERO; + while !self.wait_for_abort(wait_interval) { + let retained_version = self.target_retained_version.load(Ordering::Relaxed); + if let Some(stats) = self.prune_up_to(retained_version) { + tracing::debug!( + "Performed pruning for target retained version {retained_version}: {stats:?}" + ); + stats.report(); + if stats.has_more_work() { + continue; } + } else { + tracing::debug!( + "Pruning was not performed; waiting {:?}", + self.poll_interval + ); } - - tracing::debug!( - "Pruning was not performed; waiting {:?}", - self.poll_interval - ); - if self.wait_for_abort(self.poll_interval) { - break; - } + wait_interval = self.poll_interval; } } } From 8e7f9aec43e7ba577d2184f6ebaf40d04dae775f Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Wed, 17 Apr 2024 14:26:45 +0300 Subject: [PATCH 12/17] Sketch pruning task --- core/bin/external_node/src/main.rs | 4 +- .../zksync_core/src/api_server/tree/tests.rs | 8 +- core/lib/zksync_core/src/lib.rs | 12 +-- .../src/metadata_calculator/helpers.rs | 17 ++-- .../src/metadata_calculator/mod.rs | 35 +++++--- .../src/metadata_calculator/pruning.rs | 81 +++++++++++++++++++ .../src/metadata_calculator/recovery/tests.rs | 12 +-- .../src/metadata_calculator/tests.rs | 77 +++++++++--------- .../layers/metadata_calculator.rs | 16 ++-- 9 files changed, 175 insertions(+), 87 deletions(-) create mode 100644 core/lib/zksync_core/src/metadata_calculator/pruning.rs diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs index 84c7ddf17ba4..4b2382920620 100644 --- a/core/bin/external_node/src/main.rs +++ b/core/bin/external_node/src/main.rs @@ -139,7 +139,7 @@ async fn run_tree( memtable_capacity: config.optional.merkle_tree_memtable_capacity(), stalled_writes_timeout: config.optional.merkle_tree_stalled_writes_timeout(), }; - let metadata_calculator = MetadataCalculator::new(metadata_calculator_config, None) + let metadata_calculator = MetadataCalculator::new(metadata_calculator_config, None, tree_pool) .await .context("failed initializing metadata calculator")?; let tree_reader = Arc::new(metadata_calculator.tree_reader()); @@ -158,7 +158,7 @@ async fn run_tree( })); } - let tree_handle = task::spawn(metadata_calculator.run(tree_pool, stop_receiver)); + let tree_handle = task::spawn(metadata_calculator.run(stop_receiver)); task_futures.push(tree_handle); Ok(tree_reader) diff --git a/core/lib/zksync_core/src/api_server/tree/tests.rs b/core/lib/zksync_core/src/api_server/tree/tests.rs index d59322938236..f3d3cb62390a 100644 --- a/core/lib/zksync_core/src/api_server/tree/tests.rs +++ b/core/lib/zksync_core/src/api_server/tree/tests.rs @@ -15,12 +15,12 @@ use crate::metadata_calculator::tests::{ async fn merkle_tree_api() { let pool = ConnectionPool::::test_pool().await; let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); - let (calculator, _) = setup_calculator(temp_dir.path(), &pool).await; + let (calculator, _) = setup_calculator(temp_dir.path(), pool.clone()).await; let api_addr = (Ipv4Addr::LOCALHOST, 0).into(); reset_db_state(&pool, 5).await; let tree_reader = calculator.tree_reader(); - let calculator_task = tokio::spawn(run_calculator(calculator, pool)); + let calculator_task = tokio::spawn(run_calculator(calculator)); let (stop_sender, stop_receiver) = watch::channel(false); let api_server = tree_reader @@ -77,7 +77,7 @@ async fn merkle_tree_api() { async fn local_merkle_tree_client() { let pool = ConnectionPool::::test_pool().await; let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); - let (calculator, _) = setup_calculator(temp_dir.path(), &pool).await; + let (calculator, _) = setup_calculator(temp_dir.path(), pool.clone()).await; reset_db_state(&pool, 5).await; let tree_reader = calculator.tree_reader(); @@ -86,7 +86,7 @@ async fn local_merkle_tree_client() { assert_matches!(err, TreeApiError::NotReady); // Wait until the calculator processes initial L1 batches. - run_calculator(calculator, pool).await; + run_calculator(calculator).await; let tree_info = tree_reader.get_info().await.unwrap(); assert!(tree_info.leaf_count > 20); diff --git a/core/lib/zksync_core/src/lib.rs b/core/lib/zksync_core/src/lib.rs index 3bc6f13c9ec0..b152aa11938a 100644 --- a/core/lib/zksync_core/src/lib.rs +++ b/core/lib/zksync_core/src/lib.rs @@ -923,7 +923,11 @@ async fn run_tree( tracing::info!("Initializing Merkle tree in {mode_str} mode"); let config = MetadataCalculatorConfig::for_main_node(merkle_tree_config, operation_manager); - let metadata_calculator = MetadataCalculator::new(config, object_store) + let pool = ConnectionPool::::singleton(postgres_config.master_url()?) + .build() + .await + .context("failed to build connection pool")?; + let metadata_calculator = MetadataCalculator::new(config, object_store, pool) .await .context("failed initializing metadata_calculator")?; if let Some(api_config) = api_config { @@ -941,11 +945,7 @@ async fn run_tree( let tree_health_check = metadata_calculator.tree_health_check(); app_health.insert_component(tree_health_check); - let pool = ConnectionPool::::singleton(postgres_config.master_url()?) - .build() - .await - .context("failed to build connection pool")?; - let tree_task = tokio::spawn(metadata_calculator.run(pool, stop_receiver)); + let tree_task = tokio::spawn(metadata_calculator.run(stop_receiver)); task_futures.push(tree_task); let elapsed = started_at.elapsed(); diff --git a/core/lib/zksync_core/src/metadata_calculator/helpers.rs b/core/lib/zksync_core/src/metadata_calculator/helpers.rs index f3f24d40a660..69c76f7c585d 100644 --- a/core/lib/zksync_core/src/metadata_calculator/helpers.rs +++ b/core/lib/zksync_core/src/metadata_calculator/helpers.rs @@ -19,13 +19,15 @@ use zksync_health_check::{Health, HealthStatus}; use zksync_merkle_tree::{ domain::{TreeMetadata, ZkSyncTree, ZkSyncTreeReader}, recovery::MerkleTreeRecovery, - Database, Key, MerkleTreePruner, MerkleTreePrunerHandle, NoVersionError, RocksDBWrapper, - TreeEntry, TreeEntryWithProof, TreeInstruction, + Database, Key, NoVersionError, RocksDBWrapper, TreeEntry, TreeEntryWithProof, TreeInstruction, }; use zksync_storage::{RocksDB, RocksDBOptions, StalledWritesRetries}; use zksync_types::{block::L1BatchHeader, L1BatchNumber, StorageKey, H256}; -use super::metrics::{LoadChangesStage, TreeUpdateStage, METRICS}; +use super::{ + metrics::{LoadChangesStage, TreeUpdateStage, METRICS}, + pruning::PruningHandles, +}; /// General information about the Merkle tree. #[derive(Debug, Serialize, Deserialize)] @@ -155,6 +157,10 @@ impl AsyncTree { self.mode } + pub fn pruner(&mut self) -> PruningHandles { + self.as_mut().pruner() + } + pub fn reader(&self) -> AsyncTreeReader { AsyncTreeReader { inner: self.inner.as_ref().expect(Self::INCONSISTENT_MSG).reader(), @@ -162,11 +168,6 @@ impl AsyncTree { } } - #[allow(dead_code)] - pub fn pruner(&self) -> (MerkleTreePruner, MerkleTreePrunerHandle) { - self.inner.as_ref().expect(Self::INCONSISTENT_MSG).pruner() - } - pub fn is_empty(&self) -> bool { self.as_ref().is_empty() } diff --git a/core/lib/zksync_core/src/metadata_calculator/mod.rs b/core/lib/zksync_core/src/metadata_calculator/mod.rs index e047b6c36c10..e086d56b8086 100644 --- a/core/lib/zksync_core/src/metadata_calculator/mod.rs +++ b/core/lib/zksync_core/src/metadata_calculator/mod.rs @@ -7,7 +7,7 @@ use std::{ }; use anyhow::Context as _; -use tokio::sync::watch; +use tokio::sync::{oneshot, watch}; use zksync_config::configs::{ chain::OperationsManagerConfig, database::{MerkleTreeConfig, MerkleTreeMode}, @@ -16,15 +16,17 @@ use zksync_dal::{ConnectionPool, Core}; use zksync_health_check::{HealthUpdater, ReactiveHealthCheck}; use zksync_object_store::ObjectStore; -pub use self::helpers::LazyAsyncTreeReader; pub(crate) use self::helpers::{AsyncTreeReader, L1BatchWithLogs, MerkleTreeInfo}; +pub use self::{helpers::LazyAsyncTreeReader, pruning::MerkleTreePruningTask}; use self::{ helpers::{create_db, Delayer, GenericAsyncTree, MerkleTreeHealth}, + pruning::PruningHandles, updater::TreeUpdater, }; mod helpers; mod metrics; +mod pruning; mod recovery; #[cfg(test)] pub(crate) mod tests; @@ -75,6 +77,8 @@ impl MetadataCalculatorConfig { pub struct MetadataCalculator { config: MetadataCalculatorConfig, tree_reader: watch::Sender>, + pruning_handles_sender: oneshot::Sender, + pool: ConnectionPool, object_store: Option>, delayer: Delayer, health_updater: HealthUpdater, @@ -86,6 +90,7 @@ impl MetadataCalculator { pub async fn new( config: MetadataCalculatorConfig, object_store: Option>, + pool: ConnectionPool, ) -> anyhow::Result { anyhow::ensure!( config.max_l1_batches_per_iter > 0, @@ -100,6 +105,8 @@ impl MetadataCalculator { let (_, health_updater) = ReactiveHealthCheck::new("tree"); Ok(Self { tree_reader: watch::channel(None).0, + pruning_handles_sender: oneshot::channel().0, + pool, object_store, delayer: Delayer::new(config.delay_interval), health_updater, @@ -118,6 +125,15 @@ impl MetadataCalculator { LazyAsyncTreeReader(self.tree_reader.subscribe()) } + /// Returns a task that can be used to prune the Merkle tree according to the pruning logs in Postgres. + /// This method should be called once; only the latest returned task will do any job, all previous ones + /// will terminate immediately. + pub fn pruning_task(&mut self, poll_interval: Duration) -> MerkleTreePruningTask { + let (pruning_handles_sender, pruning_handles) = oneshot::channel(); + self.pruning_handles_sender = pruning_handles_sender; + MerkleTreePruningTask::new(pruning_handles, self.pool.clone(), poll_interval) + } + async fn create_tree(&self) -> anyhow::Result { self.health_updater .update(MerkleTreeHealth::Initialization.into()); @@ -146,16 +162,12 @@ impl MetadataCalculator { Ok(GenericAsyncTree::new(db, self.config.mode).await) } - pub async fn run( - self, - pool: ConnectionPool, - stop_receiver: watch::Receiver, - ) -> anyhow::Result<()> { + pub async fn run(self, stop_receiver: watch::Receiver) -> anyhow::Result<()> { let tree = self.create_tree().await?; let tree = tree - .ensure_ready(&pool, &stop_receiver, &self.health_updater) + .ensure_ready(&self.pool, &stop_receiver, &self.health_updater) .await?; - let Some(tree) = tree else { + let Some(mut tree) = tree else { return Ok(()); // recovery was aborted because a stop signal was received }; let tree_reader = tree.reader(); @@ -164,11 +176,14 @@ impl MetadataCalculator { tree_reader.clone().info().await ); + if !self.pruning_handles_sender.is_closed() { + self.pruning_handles_sender.send(tree.pruner()).ok(); + } self.tree_reader.send_replace(Some(tree_reader)); let updater = TreeUpdater::new(tree, self.max_l1_batches_per_iter, self.object_store); updater - .loop_updating_tree(self.delayer, &pool, stop_receiver, self.health_updater) + .loop_updating_tree(self.delayer, &self.pool, stop_receiver, self.health_updater) .await?; Ok(()) diff --git a/core/lib/zksync_core/src/metadata_calculator/pruning.rs b/core/lib/zksync_core/src/metadata_calculator/pruning.rs new file mode 100644 index 000000000000..3d9aef078a11 --- /dev/null +++ b/core/lib/zksync_core/src/metadata_calculator/pruning.rs @@ -0,0 +1,81 @@ +//! Merkle tree pruning logic. + +use std::time::Duration; + +use anyhow::Context as _; +use tokio::sync::{oneshot, watch}; +use zksync_dal::{ConnectionPool, Core, CoreDal}; +use zksync_merkle_tree::{MerkleTreePruner, MerkleTreePrunerHandle, RocksDBWrapper}; + +pub(super) type PruningHandles = (MerkleTreePruner, MerkleTreePrunerHandle); + +/// Task performing Merkle tree pruning according to the pruning entries in Postgres. +#[derive(Debug)] +#[must_use = "Task should `run()` in a managed Tokio task"] +pub struct MerkleTreePruningTask { + handles: oneshot::Receiver, + pool: ConnectionPool, + poll_interval: Duration, +} + +impl MerkleTreePruningTask { + pub(super) fn new( + handles: oneshot::Receiver, + pool: ConnectionPool, + poll_interval: Duration, + ) -> Self { + Self { + handles, + pool, + poll_interval, + } + } + + pub async fn run(self, mut stop_receiver: watch::Receiver) -> anyhow::Result<()> { + let (pruner, pruner_handle); + tokio::select! { + res = self.handles => { + match res { + Ok(handles) => (pruner, pruner_handle) = handles, + Err(_) => { + tracing::info!("Merkle tree dropped; shutting down tree pruning"); + return Ok(()); + } + } + } + _ = stop_receiver.changed() => { + tracing::info!("Stop signal received before Merkle tree is initialized; shutting down tree pruning"); + return Ok(()); + } + } + tracing::info!("Obtained pruning handles; starting Merkle tree pruning"); + + // FIXME: is this good enough (vs a managed task)? + let pruner_task_handle = tokio::task::spawn_blocking(|| pruner.run()); + + while !*stop_receiver.borrow_and_update() { + let mut storage = self.pool.connection_tagged("metadata_calculator").await?; + let l1_batch_number = storage + .blocks_dal() + .get_sealed_l1_batch_number() + .await? + .unwrap(); // FIXME: get actual pruning data + drop(storage); + + pruner_handle.set_target_retained_version(l1_batch_number.0.into()); + + if tokio::time::timeout(self.poll_interval, stop_receiver.changed()) + .await + .is_ok() + { + break; + } + } + + tracing::info!("Stop signal received, Merkle tree pruning is shutting down"); + pruner_handle.abort(); + pruner_task_handle + .await + .context("Merkle tree pruning thread panicked") + } +} diff --git a/core/lib/zksync_core/src/metadata_calculator/recovery/tests.rs b/core/lib/zksync_core/src/metadata_calculator/recovery/tests.rs index 8ff5421ae9d8..97e822b12122 100644 --- a/core/lib/zksync_core/src/metadata_calculator/recovery/tests.rs +++ b/core/lib/zksync_core/src/metadata_calculator/recovery/tests.rs @@ -62,7 +62,7 @@ async fn create_tree_recovery(path: PathBuf, l1_batch: L1BatchNumber) -> AsyncTr async fn basic_recovery_workflow() { let pool = ConnectionPool::::test_pool().await; let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); - let snapshot_recovery = prepare_recovery_snapshot_with_genesis(&pool, &temp_dir).await; + let snapshot_recovery = prepare_recovery_snapshot_with_genesis(pool.clone(), &temp_dir).await; let snapshot = SnapshotParameters::new(&pool, &snapshot_recovery) .await .unwrap(); @@ -94,7 +94,7 @@ async fn basic_recovery_workflow() { } async fn prepare_recovery_snapshot_with_genesis( - pool: &ConnectionPool, + pool: ConnectionPool, temp_dir: &TempDir, ) -> SnapshotRecoveryStatus { let mut storage = pool.connection().await.unwrap(); @@ -118,7 +118,7 @@ async fn prepare_recovery_snapshot_with_genesis( // Ensure that metadata for L1 batch #1 is present in the DB. let (calculator, _) = setup_calculator(&temp_dir.path().join("init"), pool).await; - let l1_batch_root_hash = run_calculator(calculator, pool.clone()).await; + let l1_batch_root_hash = run_calculator(calculator).await; SnapshotRecoveryStatus { l1_batch_number: L1BatchNumber(1), @@ -175,7 +175,7 @@ impl HandleRecoveryEvent for TestEventListener { async fn recovery_fault_tolerance(chunk_count: u64) { let pool = ConnectionPool::::test_pool().await; let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); - let snapshot_recovery = prepare_recovery_snapshot_with_genesis(&pool, &temp_dir).await; + let snapshot_recovery = prepare_recovery_snapshot_with_genesis(pool.clone(), &temp_dir).await; let tree_path = temp_dir.path().join("recovery"); let tree = create_tree_recovery(tree_path.clone(), L1BatchNumber(1)).await; @@ -260,7 +260,7 @@ async fn entire_recovery_workflow(case: RecoveryWorkflowCase) { &merkle_tree_config, &OperationsManagerConfig { delay_interval: 50 }, ); - let mut calculator = MetadataCalculator::new(calculator_config, None) + let mut calculator = MetadataCalculator::new(calculator_config, None, pool.clone()) .await .unwrap(); let (delay_sx, mut delay_rx) = mpsc::unbounded_channel(); @@ -268,7 +268,7 @@ async fn entire_recovery_workflow(case: RecoveryWorkflowCase) { let (stop_sender, stop_receiver) = watch::channel(false); let tree_reader = calculator.tree_reader(); - let calculator_task = tokio::spawn(calculator.run(pool.clone(), stop_receiver)); + let calculator_task = tokio::spawn(calculator.run(stop_receiver)); match case { // Wait until the tree is fully initialized and stop the calculator. diff --git a/core/lib/zksync_core/src/metadata_calculator/tests.rs b/core/lib/zksync_core/src/metadata_calculator/tests.rs index 20ef73dee057..3c280262db7c 100644 --- a/core/lib/zksync_core/src/metadata_calculator/tests.rs +++ b/core/lib/zksync_core/src/metadata_calculator/tests.rs @@ -45,9 +45,9 @@ async fn genesis_creation() { let pool = ConnectionPool::::test_pool().await; let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); - let (calculator, _) = setup_calculator(temp_dir.path(), &pool).await; - run_calculator(calculator, pool.clone()).await; - let (calculator, _) = setup_calculator(temp_dir.path(), &pool).await; + let (calculator, _) = setup_calculator(temp_dir.path(), pool.clone()).await; + run_calculator(calculator).await; + let (calculator, _) = setup_calculator(temp_dir.path(), pool).await; let tree = calculator.create_tree().await.unwrap(); let GenericAsyncTree::Ready(tree) = tree else { @@ -62,9 +62,9 @@ async fn basic_workflow() { let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); - let (calculator, object_store) = setup_calculator(temp_dir.path(), &pool).await; + let (calculator, object_store) = setup_calculator(temp_dir.path(), pool.clone()).await; reset_db_state(&pool, 1).await; - let merkle_tree_hash = run_calculator(calculator, pool.clone()).await; + let merkle_tree_hash = run_calculator(calculator).await; // Check the hash against the reference. let expected_tree_hash = expected_tree_hash(&pool).await; @@ -77,7 +77,7 @@ async fn basic_workflow() { // ^ The exact values depend on ops in genesis block assert!(merkle_paths.iter().all(|log| log.is_write)); - let (calculator, _) = setup_calculator(temp_dir.path(), &pool).await; + let (calculator, _) = setup_calculator(temp_dir.path(), pool).await; let tree = calculator.create_tree().await.unwrap(); let GenericAsyncTree::Ready(tree) = tree else { panic!("Unexpected tree state: {tree:?}"); @@ -110,7 +110,7 @@ async fn status_receiver_has_correct_states() { let pool = ConnectionPool::::test_pool().await; let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); - let (mut calculator, _) = setup_calculator(temp_dir.path(), &pool).await; + let (mut calculator, _) = setup_calculator(temp_dir.path(), pool.clone()).await; let tree_health_check = calculator.tree_health_check(); assert_eq!(tree_health_check.name(), "tree"); let health = tree_health_check.check_health().await; @@ -126,7 +126,7 @@ async fn status_receiver_has_correct_states() { let (delay_sx, mut delay_rx) = mpsc::unbounded_channel(); calculator.delayer.delay_notifier = delay_sx; - let calculator_handle = tokio::spawn(calculator.run(pool, stop_rx)); + let calculator_handle = tokio::spawn(calculator.run(stop_rx)); delay_rx.recv().await.unwrap(); assert_eq!( tree_health_check.check_health().await.status(), @@ -159,15 +159,15 @@ async fn multi_l1_batch_workflow() { // Collect all storage logs in a single L1 batch let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); - let (calculator, _) = setup_calculator(temp_dir.path(), &pool).await; + let (calculator, _) = setup_calculator(temp_dir.path(), pool.clone()).await; reset_db_state(&pool, 1).await; - let root_hash = run_calculator(calculator, pool.clone()).await; + let root_hash = run_calculator(calculator).await; // Collect the same logs in multiple L1 batches let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); - let (calculator, object_store) = setup_calculator(temp_dir.path(), &pool).await; + let (calculator, object_store) = setup_calculator(temp_dir.path(), pool.clone()).await; reset_db_state(&pool, 10).await; - let multi_block_root_hash = run_calculator(calculator, pool).await; + let multi_block_root_hash = run_calculator(calculator).await; assert_eq!(multi_block_root_hash, root_hash); let mut prev_index = None; @@ -194,16 +194,16 @@ async fn running_metadata_calculator_with_additional_blocks() { let pool = ConnectionPool::::test_pool().await; let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); - let calculator = setup_lightweight_calculator(temp_dir.path(), &pool).await; + let calculator = setup_lightweight_calculator(temp_dir.path(), pool.clone()).await; reset_db_state(&pool, 5).await; - run_calculator(calculator, pool.clone()).await; + run_calculator(calculator).await; - let mut calculator = setup_lightweight_calculator(temp_dir.path(), &pool).await; + let mut calculator = setup_lightweight_calculator(temp_dir.path(), pool.clone()).await; let (stop_sx, stop_rx) = watch::channel(false); let (delay_sx, mut delay_rx) = mpsc::unbounded_channel(); calculator.delayer.delay_notifier = delay_sx; - let calculator_handle = tokio::spawn(calculator.run(pool.clone(), stop_rx)); + let calculator_handle = tokio::spawn(calculator.run(stop_rx)); // Wait until the calculator has processed initial L1 batches. let (next_l1_batch, _) = tokio::time::timeout(RUN_TIMEOUT, delay_rx.recv()) .await @@ -234,8 +234,8 @@ async fn running_metadata_calculator_with_additional_blocks() { .unwrap(); // Switch to the full tree. It should pick up from the same spot and result in the same tree root hash. - let (calculator, _) = setup_calculator(temp_dir.path(), &pool).await; - let root_hash_for_full_tree = run_calculator(calculator, pool).await; + let (calculator, _) = setup_calculator(temp_dir.path(), pool).await; + let root_hash_for_full_tree = run_calculator(calculator).await; assert_eq!(root_hash_for_full_tree, updated_root_hash); } @@ -248,12 +248,13 @@ async fn shutting_down_calculator() { operation_config.delay_interval = 30_000; // ms; chosen to be larger than `RUN_TIMEOUT` let calculator = - setup_calculator_with_options(&merkle_tree_config, &operation_config, &pool, None).await; + setup_calculator_with_options(&merkle_tree_config, &operation_config, pool.clone(), None) + .await; reset_db_state(&pool, 5).await; let (stop_sx, stop_rx) = watch::channel(false); - let calculator_task = tokio::spawn(calculator.run(pool, stop_rx)); + let calculator_task = tokio::spawn(calculator.run(stop_rx)); tokio::time::sleep(Duration::from_millis(100)).await; stop_sx.send_replace(true); run_with_timeout(RUN_TIMEOUT, calculator_task) @@ -268,9 +269,9 @@ async fn test_postgres_backup_recovery( ) { let pool = ConnectionPool::::test_pool().await; let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); - let calculator = setup_lightweight_calculator(temp_dir.path(), &pool).await; + let calculator = setup_lightweight_calculator(temp_dir.path(), pool.clone()).await; reset_db_state(&pool, 5).await; - run_calculator(calculator, pool.clone()).await; + run_calculator(calculator).await; // Simulate recovery from a DB snapshot in which some newer L1 batches are erased. let last_batch_after_recovery = L1BatchNumber(3); @@ -293,12 +294,12 @@ async fn test_postgres_backup_recovery( } drop(storage); - let mut calculator = setup_lightweight_calculator(temp_dir.path(), &pool).await; + let mut calculator = setup_lightweight_calculator(temp_dir.path(), pool.clone()).await; let (stop_sx, stop_rx) = watch::channel(false); let (delay_sx, mut delay_rx) = mpsc::unbounded_channel(); calculator.delayer.delay_notifier = delay_sx; - let calculator_handle = tokio::spawn(calculator.run(pool.clone(), stop_rx)); + let calculator_handle = tokio::spawn(calculator.run(stop_rx)); // Wait until the calculator has processed initial L1 batches. let (next_l1_batch, _) = tokio::time::timeout(RUN_TIMEOUT, delay_rx.recv()) .await @@ -357,7 +358,7 @@ async fn postgres_backup_recovery_with_excluded_metadata() { pub(crate) async fn setup_calculator( db_path: &Path, - pool: &ConnectionPool, + pool: ConnectionPool, ) -> (MetadataCalculator, Arc) { let store_factory = ObjectStoreFactory::mock(); let store = store_factory.create_store().await; @@ -370,7 +371,7 @@ pub(crate) async fn setup_calculator( async fn setup_lightweight_calculator( db_path: &Path, - pool: &ConnectionPool, + pool: ConnectionPool, ) -> MetadataCalculator { let (db_config, operation_config) = create_config(db_path, MerkleTreeMode::Lightweight); setup_calculator_with_options(&db_config, &operation_config, pool, None).await @@ -395,33 +396,29 @@ fn create_config( async fn setup_calculator_with_options( merkle_tree_config: &MerkleTreeConfig, operation_config: &OperationsManagerConfig, - pool: &ConnectionPool, + pool: ConnectionPool, object_store: Option>, ) -> MetadataCalculator { - let mut calculator_config = - MetadataCalculatorConfig::for_main_node(merkle_tree_config, operation_config); - calculator_config.tree_pruner_polling_interval = Duration::ZERO; - let metadata_calculator = MetadataCalculator::new(calculator_config, object_store) - .await - .unwrap(); - let mut storage = pool.connection().await.unwrap(); if storage.blocks_dal().is_genesis_needed().await.unwrap() { insert_genesis_batch(&mut storage, &GenesisParams::mock()) .await .unwrap(); } - metadata_calculator + drop(storage); + + let calculator_config = + MetadataCalculatorConfig::for_main_node(merkle_tree_config, operation_config); + MetadataCalculator::new(calculator_config, object_store, pool) + .await + .unwrap() } fn path_to_string(path: &Path) -> String { path.to_str().unwrap().to_owned() } -pub(crate) async fn run_calculator( - mut calculator: MetadataCalculator, - pool: ConnectionPool, -) -> H256 { +pub(crate) async fn run_calculator(mut calculator: MetadataCalculator) -> H256 { let (stop_sx, stop_rx) = watch::channel(false); let (delay_sx, mut delay_rx) = mpsc::unbounded_channel(); calculator.delayer.delay_notifier = delay_sx; @@ -436,7 +433,7 @@ pub(crate) async fn run_calculator( root_hash }); - run_with_timeout(RUN_TIMEOUT, calculator.run(pool, stop_rx)) + run_with_timeout(RUN_TIMEOUT, calculator.run(stop_rx)) .await .unwrap(); delayer_handle.await.unwrap() diff --git a/core/node/node_framework/src/implementations/layers/metadata_calculator.rs b/core/node/node_framework/src/implementations/layers/metadata_calculator.rs index bd7bba618ba4..3bc2cd448ca1 100644 --- a/core/node/node_framework/src/implementations/layers/metadata_calculator.rs +++ b/core/node/node_framework/src/implementations/layers/metadata_calculator.rs @@ -1,5 +1,5 @@ +use anyhow::Context as _; use zksync_core::metadata_calculator::{MetadataCalculator, MetadataCalculatorConfig}; -use zksync_dal::{ConnectionPool, Core}; use zksync_storage::RocksDB; use crate::{ @@ -26,7 +26,6 @@ pub struct MetadataCalculatorLayer(pub MetadataCalculatorConfig); #[derive(Debug)] pub struct MetadataCalculatorTask { metadata_calculator: MetadataCalculator, - main_pool: ConnectionPool, } #[async_trait::async_trait] @@ -37,7 +36,7 @@ impl WiringLayer for MetadataCalculatorLayer { async fn wire(self: Box, mut context: ServiceContext<'_>) -> Result<(), WiringError> { let pool = context.get_resource::().await?; - let main_pool = pool.get().await.unwrap(); + let main_pool = pool.get().await?; let object_store = context.get_resource::().await.ok(); // OK to be None. if object_store.is_none() { @@ -47,14 +46,13 @@ impl WiringLayer for MetadataCalculatorLayer { } let metadata_calculator = - MetadataCalculator::new(self.0, object_store.map(|os| os.0)).await?; + MetadataCalculator::new(self.0, object_store.map(|os| os.0), main_pool).await?; let AppHealthCheckResource(app_health) = context.get_resource_or_default().await; app_health.insert_component(metadata_calculator.tree_health_check()); let task = Box::new(MetadataCalculatorTask { metadata_calculator, - main_pool, }); context.add_task(task); Ok(()) @@ -68,16 +66,12 @@ impl Task for MetadataCalculatorTask { } async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { - let result = self - .metadata_calculator - .run(self.main_pool, stop_receiver.0) - .await; + let result = self.metadata_calculator.run(stop_receiver.0).await; // Wait for all the instances of RocksDB to be destroyed. tokio::task::spawn_blocking(RocksDB::await_rocksdb_termination) .await - .unwrap(); - + .context("failed terminating RocksDB instances")?; result } } From 1c254124b20f71da6215e937d9cb7e1c7b5c0619 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Wed, 17 Apr 2024 17:15:16 +0300 Subject: [PATCH 13/17] Add pruning-related tree getters --- core/lib/merkle_tree/src/domain.rs | 22 ++++++++++++++++++++++ core/lib/merkle_tree/src/lib.rs | 13 +++++++++++++ core/lib/merkle_tree/src/pruning.rs | 12 +++++++++--- 3 files changed, 44 insertions(+), 3 deletions(-) diff --git a/core/lib/merkle_tree/src/domain.rs b/core/lib/merkle_tree/src/domain.rs index 16d2a72117da..09587a6ce2c4 100644 --- a/core/lib/merkle_tree/src/domain.rs +++ b/core/lib/merkle_tree/src/domain.rs @@ -6,6 +6,7 @@ use zksync_prover_interface::inputs::{PrepareBasicCircuitsJob, StorageLogMetadat use zksync_types::{L1BatchNumber, StorageKey}; use crate::{ + consistency::ConsistencyError, storage::{PatchSet, Patched, RocksDBWrapper}, types::{ Key, Root, TreeEntry, TreeEntryWithProof, TreeInstruction, TreeLogEntry, ValueHash, @@ -378,6 +379,14 @@ impl ZkSyncTreeReader { L1BatchNumber(number) } + /// Returns the minimum L1 batch number retained by the tree. + #[allow(clippy::missing_panics_doc)] + pub fn min_l1_batch_number(&self) -> Option { + self.0.first_retained_version().map(|version| { + L1BatchNumber(u32::try_from(version).expect("integer overflow for L1 batch number")) + }) + } + /// Returns the number of leaves in the tree. pub fn leaf_count(&self) -> u64 { self.0.latest_root().leaf_count() @@ -397,4 +406,17 @@ impl ZkSyncTreeReader { let version = u64::from(l1_batch_number.0); self.0.entries_with_proofs(version, keys) } + + /// Verifies consistency of the tree at the specified L1 batch number. + /// + /// # Errors + /// + /// Returns the first encountered verification error, should one occur. + pub fn verify_consistency( + &self, + l1_batch_number: L1BatchNumber, + ) -> Result<(), ConsistencyError> { + let version = l1_batch_number.0.into(); + self.0.verify_consistency(version, true) + } } diff --git a/core/lib/merkle_tree/src/lib.rs b/core/lib/merkle_tree/src/lib.rs index 738c79c3718a..caa965751578 100644 --- a/core/lib/merkle_tree/src/lib.rs +++ b/core/lib/merkle_tree/src/lib.rs @@ -241,6 +241,19 @@ impl MerkleTree { } } +impl MerkleTree { + /// Returns the first retained version of the tree. + pub fn first_retained_version(&self) -> Option { + match self.db.min_stale_key_version() { + // Min stale key version is next after the first retained version since at least + // the root is updated on each version. + Some(version) => version.checked_sub(1), + // No stale keys means all past versions of the tree have been pruned + None => self.latest_version(), + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/core/lib/merkle_tree/src/pruning.rs b/core/lib/merkle_tree/src/pruning.rs index 832c4a697cdc..8b96bf50115e 100644 --- a/core/lib/merkle_tree/src/pruning.rs +++ b/core/lib/merkle_tree/src/pruning.rs @@ -32,9 +32,11 @@ impl MerkleTreePrunerHandle { /// Sets the version of the tree the pruner should attempt to prune to. Calls should provide /// monotonically increasing versions; call with a lesser version will have no effect. - pub fn set_target_retained_version(&self, new_version: u64) { + /// + /// Returns the previously set target retained version. + pub fn set_target_retained_version(&self, new_version: u64) -> u64 { self.target_retained_version - .fetch_max(new_version, Ordering::Relaxed); + .fetch_max(new_version, Ordering::Relaxed) } } @@ -244,8 +246,9 @@ mod tests { #[test] fn pruner_basics() { let mut db = create_db(); - let (mut pruner, _handle) = MerkleTreePruner::new(&mut db); + assert_eq!(MerkleTree::new(&mut db).first_retained_version(), Some(0)); + let (mut pruner, _handle) = MerkleTreePruner::new(&mut db); let stats = pruner .prune_up_to(pruner.last_prunable_version().unwrap()) .unwrap(); @@ -259,6 +262,8 @@ mod tests { assert!(db.root_mut(version).is_none()); } assert!(db.root_mut(4).is_some()); + + assert_eq!(MerkleTree::new(&mut db).first_retained_version(), Some(4)); } #[test] @@ -321,6 +326,7 @@ mod tests { assert_no_stale_keys(&db, first_retained_version); let mut tree = MerkleTree::new(&mut db); + assert_eq!(tree.first_retained_version(), Some(first_retained_version)); for version in first_retained_version..=latest_version { tree.verify_consistency(version, true).unwrap(); } From 29f417dc839b22cf3b8310556b188f9dedc46f88 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Wed, 17 Apr 2024 17:15:55 +0300 Subject: [PATCH 14/17] Add basic pruning task tests --- .../src/metadata_calculator/helpers.rs | 10 + .../src/metadata_calculator/pruning.rs | 174 +++++++++++++++++- 2 files changed, 177 insertions(+), 7 deletions(-) diff --git a/core/lib/zksync_core/src/metadata_calculator/helpers.rs b/core/lib/zksync_core/src/metadata_calculator/helpers.rs index d9d0dacb44c9..8b72cb86f784 100644 --- a/core/lib/zksync_core/src/metadata_calculator/helpers.rs +++ b/core/lib/zksync_core/src/metadata_calculator/helpers.rs @@ -36,6 +36,7 @@ pub struct MerkleTreeInfo { pub mode: MerkleTreeMode, pub root_hash: H256, pub next_l1_batch_number: L1BatchNumber, + pub min_l1_batch_number: Option, pub leaf_count: u64, } @@ -234,12 +235,21 @@ impl AsyncTreeReader { mode: self.mode, root_hash: self.inner.root_hash(), next_l1_batch_number: self.inner.next_l1_batch_number(), + min_l1_batch_number: self.inner.min_l1_batch_number(), leaf_count: self.inner.leaf_count(), }) .await .unwrap() } + #[cfg(test)] + pub async fn verify_consistency(self, l1_batch_number: L1BatchNumber) -> anyhow::Result<()> { + tokio::task::spawn_blocking(move || self.inner.verify_consistency(l1_batch_number)) + .await + .context("tree consistency verification panicked")? + .map_err(Into::into) + } + pub async fn entries_with_proofs( self, l1_batch_number: L1BatchNumber, diff --git a/core/lib/zksync_core/src/metadata_calculator/pruning.rs b/core/lib/zksync_core/src/metadata_calculator/pruning.rs index 3d9aef078a11..3d0f0ac82be5 100644 --- a/core/lib/zksync_core/src/metadata_calculator/pruning.rs +++ b/core/lib/zksync_core/src/metadata_calculator/pruning.rs @@ -32,7 +32,7 @@ impl MerkleTreePruningTask { } pub async fn run(self, mut stop_receiver: watch::Receiver) -> anyhow::Result<()> { - let (pruner, pruner_handle); + let (mut pruner, pruner_handle); tokio::select! { res = self.handles => { match res { @@ -51,18 +51,22 @@ impl MerkleTreePruningTask { tracing::info!("Obtained pruning handles; starting Merkle tree pruning"); // FIXME: is this good enough (vs a managed task)? + pruner.set_poll_interval(self.poll_interval); let pruner_task_handle = tokio::task::spawn_blocking(|| pruner.run()); while !*stop_receiver.borrow_and_update() { let mut storage = self.pool.connection_tagged("metadata_calculator").await?; - let l1_batch_number = storage - .blocks_dal() - .get_sealed_l1_batch_number() - .await? - .unwrap(); // FIXME: get actual pruning data + let pruning_info = storage.pruning_dal().get_pruning_info().await?; drop(storage); - pruner_handle.set_target_retained_version(l1_batch_number.0.into()); + if let Some(l1_batch_number) = pruning_info.last_hard_pruned_l1_batch { + let target_retained_version = u64::from(l1_batch_number.0) + 1; + let prev_target_version = + pruner_handle.set_target_retained_version(target_retained_version); + if prev_target_version != target_retained_version { + tracing::info!("Set target retained tree version from {prev_target_version} to {target_retained_version}"); + } + } if tokio::time::timeout(self.poll_interval, stop_receiver.changed()) .await @@ -79,3 +83,159 @@ impl MerkleTreePruningTask { .context("Merkle tree pruning thread panicked") } } + +#[cfg(test)] +mod tests { + use tempfile::TempDir; + use zksync_types::{L1BatchNumber, MiniblockNumber}; + + use super::*; + use crate::{ + genesis::{insert_genesis_batch, GenesisParams}, + metadata_calculator::{ + tests::{extend_db_state_from_l1_batch, gen_storage_logs, mock_config, reset_db_state}, + MetadataCalculator, + }, + utils::testonly::prepare_recovery_snapshot, + }; + + const POLL_INTERVAL: Duration = Duration::from_millis(50); + + #[tokio::test] + async fn basic_tree_pruning_workflow() { + let pool = ConnectionPool::::test_pool().await; + let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); + let config = mock_config(temp_dir.path()); + let mut storage = pool.connection().await.unwrap(); + insert_genesis_batch(&mut storage, &GenesisParams::mock()) + .await + .unwrap(); + reset_db_state(&pool, 5).await; + + let mut calculator = MetadataCalculator::new(config, None, pool.clone()) + .await + .unwrap(); + let reader = calculator.tree_reader(); + let pruning_task = calculator.pruning_task(POLL_INTERVAL); + let (stop_sender, stop_receiver) = watch::channel(false); + let calculator_handle = tokio::spawn(calculator.run(stop_receiver.clone())); + let pruning_task_handle = tokio::spawn(pruning_task.run(stop_receiver)); + + // Wait until the calculator is initialized. + let reader = reader.wait().await; + while reader.clone().info().await.next_l1_batch_number < L1BatchNumber(6) { + tokio::time::sleep(POLL_INTERVAL).await; + } + + // Add a pruning log to force pruning. + storage + .pruning_dal() + .hard_prune_batches_range(L1BatchNumber(3), MiniblockNumber(3)) + .await + .unwrap(); + + while reader.clone().info().await.min_l1_batch_number.unwrap() <= L1BatchNumber(3) { + tokio::time::sleep(POLL_INTERVAL).await; + } + + reader.verify_consistency(L1BatchNumber(5)).await.unwrap(); + + stop_sender.send_replace(true); + calculator_handle.await.unwrap().unwrap(); + pruning_task_handle.await.unwrap().unwrap(); + } + + #[tokio::test] + async fn pruning_after_snapshot_recovery() { + let pool = ConnectionPool::::test_pool().await; + let snapshot_logs = gen_storage_logs(100..300, 1).pop().unwrap(); + let mut storage = pool.connection().await.unwrap(); + let snapshot_recovery = prepare_recovery_snapshot( + &mut storage, + L1BatchNumber(23), + MiniblockNumber(23), + &snapshot_logs, + ) + .await; + + let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); + let config = mock_config(temp_dir.path()); + + let mut calculator = MetadataCalculator::new(config, None, pool.clone()) + .await + .unwrap(); + let reader = calculator.tree_reader(); + let pruning_task = calculator.pruning_task(POLL_INTERVAL); + let (stop_sender, stop_receiver) = watch::channel(false); + let calculator_handle = tokio::spawn(calculator.run(stop_receiver.clone())); + let pruning_task_handle = tokio::spawn(pruning_task.run(stop_receiver)); + + // Wait until the calculator is initialized. + let reader = reader.wait().await; + let tree_info = reader.clone().info().await; + assert_eq!( + tree_info.next_l1_batch_number, + snapshot_recovery.l1_batch_number + 1 + ); + assert_eq!( + tree_info.min_l1_batch_number, + Some(snapshot_recovery.l1_batch_number) + ); + + // Add some new L1 batches and wait for them to be processed. + let mut new_logs = gen_storage_logs(500..600, 5); + { + let mut storage = storage.start_transaction().await.unwrap(); + // Logs must be sorted by `log.key` to match their enum index assignment + for batch_logs in &mut new_logs { + batch_logs.sort_unstable_by_key(|log| log.key); + } + + extend_db_state_from_l1_batch( + &mut storage, + snapshot_recovery.l1_batch_number + 1, + new_logs, + ) + .await; + storage.commit().await.unwrap(); + } + + let original_tree_info = loop { + let tree_info = reader.clone().info().await; + if tree_info.next_l1_batch_number == snapshot_recovery.l1_batch_number + 6 { + break tree_info; + } + tokio::time::sleep(POLL_INTERVAL).await; + }; + + // Prune first 3 created batches in Postgres. + storage + .pruning_dal() + .hard_prune_batches_range( + snapshot_recovery.l1_batch_number + 3, + snapshot_recovery.miniblock_number + 3, + ) + .await + .unwrap(); + + // Check that the batches are pruned in the tree. + let pruned_tree_info = loop { + let tree_info = reader.clone().info().await; + if tree_info.min_l1_batch_number.unwrap() > snapshot_recovery.l1_batch_number + 3 { + break tree_info; + } + tokio::time::sleep(POLL_INTERVAL).await; + }; + + assert_eq!(pruned_tree_info.root_hash, original_tree_info.root_hash); + assert_eq!(pruned_tree_info.leaf_count, original_tree_info.leaf_count); + reader + .verify_consistency(pruned_tree_info.next_l1_batch_number - 1) + .await + .unwrap(); + + stop_sender.send_replace(true); + calculator_handle.await.unwrap().unwrap(); + pruning_task_handle.await.unwrap().unwrap(); + } +} From fb13898fb19a2eca0e5c346640844106c0232092 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Fri, 19 Apr 2024 14:15:11 +0300 Subject: [PATCH 15/17] Simplify metadata calc initialization --- core/bin/external_node/src/main.rs | 8 ++++---- core/lib/zksync_core/src/lib.rs | 5 +++-- .../lib/zksync_core/src/metadata_calculator/mod.rs | 14 +++++++++----- .../zksync_core/src/metadata_calculator/pruning.rs | 4 ++-- .../src/metadata_calculator/recovery/tests.rs | 7 +++---- .../zksync_core/src/metadata_calculator/tests.rs | 2 +- .../implementations/layers/metadata_calculator.rs | 4 ++-- 7 files changed, 24 insertions(+), 20 deletions(-) diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs index 80da90c89c0f..7d6033a22307 100644 --- a/core/bin/external_node/src/main.rs +++ b/core/bin/external_node/src/main.rs @@ -160,10 +160,10 @@ async fn run_tree( .await .context("failed creating DB pool for Merkle tree recovery")?; - let metadata_calculator = - MetadataCalculator::new(metadata_calculator_config, None, tree_pool, recovery_pool) - .await - .context("failed initializing metadata calculator")?; + let metadata_calculator = MetadataCalculator::new(metadata_calculator_config, None, tree_pool) + .await + .context("failed initializing metadata calculator")? + .with_recovery_pool(recovery_pool); let tree_reader = Arc::new(metadata_calculator.tree_reader()); app_health.insert_component(metadata_calculator.tree_health_check()); diff --git a/core/lib/zksync_core/src/lib.rs b/core/lib/zksync_core/src/lib.rs index 0df47cc0373d..9db18d973cc0 100644 --- a/core/lib/zksync_core/src/lib.rs +++ b/core/lib/zksync_core/src/lib.rs @@ -998,9 +998,10 @@ async fn run_tree( .build() .await .context("failed to build connection pool for Merkle tree recovery")?; - let metadata_calculator = MetadataCalculator::new(config, object_store, pool, recovery_pool) + let metadata_calculator = MetadataCalculator::new(config, object_store, pool) .await - .context("failed initializing metadata_calculator")?; + .context("failed initializing metadata_calculator")? + .with_recovery_pool(recovery_pool); if let Some(api_config) = api_config { let address = (Ipv4Addr::UNSPECIFIED, api_config.port).into(); diff --git a/core/lib/zksync_core/src/metadata_calculator/mod.rs b/core/lib/zksync_core/src/metadata_calculator/mod.rs index 17a79e675ce6..14c130da4b54 100644 --- a/core/lib/zksync_core/src/metadata_calculator/mod.rs +++ b/core/lib/zksync_core/src/metadata_calculator/mod.rs @@ -102,14 +102,11 @@ impl MetadataCalculator { /// /// # Arguments /// - /// - `pool` can have a single connection. - /// - `recovery_pool` will only be used in case of snapshot recovery. It should have multiple connections (e.g., 10) - /// to speed up recovery. + /// - `pool` can have a single connection (but then you should set a separate recovery pool). pub async fn new( config: MetadataCalculatorConfig, object_store: Option>, pool: ConnectionPool, - recovery_pool: ConnectionPool, ) -> anyhow::Result { if let Err(err) = METRICS.info.set(ConfigLabels::new(&config)) { tracing::warn!( @@ -134,8 +131,8 @@ impl MetadataCalculator { tree_reader: watch::channel(None).0, pruning_handles_sender: oneshot::channel().0, object_store, + recovery_pool: pool.clone(), pool, - recovery_pool, delayer: Delayer::new(config.delay_interval), health_updater, max_l1_batches_per_iter: config.max_l1_batches_per_iter, @@ -143,6 +140,13 @@ impl MetadataCalculator { }) } + /// Sets a separate pool that will be used in case of snapshot recovery. It should have multiple connections + /// (e.g., 10) to speed up recovery. + pub fn with_recovery_pool(mut self, recovery_pool: ConnectionPool) -> Self { + self.recovery_pool = recovery_pool; + self + } + /// Returns a health check for this calculator. pub fn tree_health_check(&self) -> ReactiveHealthCheck { self.health_updater.subscribe() diff --git a/core/lib/zksync_core/src/metadata_calculator/pruning.rs b/core/lib/zksync_core/src/metadata_calculator/pruning.rs index cca0fa8eaec0..e07d89cac0a3 100644 --- a/core/lib/zksync_core/src/metadata_calculator/pruning.rs +++ b/core/lib/zksync_core/src/metadata_calculator/pruning.rs @@ -112,7 +112,7 @@ mod tests { .unwrap(); reset_db_state(&pool, 5).await; - let mut calculator = MetadataCalculator::new(config, None, pool.clone(), pool.clone()) + let mut calculator = MetadataCalculator::new(config, None, pool.clone()) .await .unwrap(); let reader = calculator.tree_reader(); @@ -161,7 +161,7 @@ mod tests { let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); let config = mock_config(temp_dir.path()); - let mut calculator = MetadataCalculator::new(config, None, pool.clone(), pool.clone()) + let mut calculator = MetadataCalculator::new(config, None, pool.clone()) .await .unwrap(); let reader = calculator.tree_reader(); diff --git a/core/lib/zksync_core/src/metadata_calculator/recovery/tests.rs b/core/lib/zksync_core/src/metadata_calculator/recovery/tests.rs index 5e34153db26c..ac7225409e39 100644 --- a/core/lib/zksync_core/src/metadata_calculator/recovery/tests.rs +++ b/core/lib/zksync_core/src/metadata_calculator/recovery/tests.rs @@ -252,10 +252,9 @@ async fn entire_recovery_workflow(case: RecoveryWorkflowCase) { &merkle_tree_config, &OperationsManagerConfig { delay_interval: 50 }, ); - let mut calculator = - MetadataCalculator::new(calculator_config, None, pool.clone(), pool.clone()) - .await - .unwrap(); + let mut calculator = MetadataCalculator::new(calculator_config, None, pool.clone()) + .await + .unwrap(); let (delay_sx, mut delay_rx) = mpsc::unbounded_channel(); calculator.delayer.delay_notifier = delay_sx; diff --git a/core/lib/zksync_core/src/metadata_calculator/tests.rs b/core/lib/zksync_core/src/metadata_calculator/tests.rs index fad46873433d..fd942bc5a351 100644 --- a/core/lib/zksync_core/src/metadata_calculator/tests.rs +++ b/core/lib/zksync_core/src/metadata_calculator/tests.rs @@ -424,7 +424,7 @@ async fn setup_calculator_with_options( let calculator_config = MetadataCalculatorConfig::for_main_node(merkle_tree_config, operation_config); - MetadataCalculator::new(calculator_config, object_store, pool.clone(), pool) + MetadataCalculator::new(calculator_config, object_store, pool) .await .unwrap() } diff --git a/core/node/node_framework/src/implementations/layers/metadata_calculator.rs b/core/node/node_framework/src/implementations/layers/metadata_calculator.rs index f78cede6ae99..7277dfa666a5 100644 --- a/core/node/node_framework/src/implementations/layers/metadata_calculator.rs +++ b/core/node/node_framework/src/implementations/layers/metadata_calculator.rs @@ -57,9 +57,9 @@ impl WiringLayer for MetadataCalculatorLayer { self.0, object_store.map(|store_resource| store_resource.0), main_pool, - recovery_pool, ) - .await?; + .await? + .with_recovery_pool(recovery_pool); let AppHealthCheckResource(app_health) = context.get_resource_or_default().await; app_health.insert_component(metadata_calculator.tree_health_check()); From 40df9b272d6a6377c17fff6760adfc4efaabfdf0 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Mon, 22 Apr 2024 20:19:18 +0300 Subject: [PATCH 16/17] Fix minor nits --- core/lib/merkle_tree/src/pruning.rs | 14 ++++++-------- .../zksync_core/src/metadata_calculator/pruning.rs | 3 ++- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/core/lib/merkle_tree/src/pruning.rs b/core/lib/merkle_tree/src/pruning.rs index 8b96bf50115e..456f6acf5318 100644 --- a/core/lib/merkle_tree/src/pruning.rs +++ b/core/lib/merkle_tree/src/pruning.rs @@ -51,8 +51,7 @@ impl MerkleTreePrunerHandle { /// stale keys are recorded in a separate column family. A pruner takes stale keys that were produced /// by a certain range of tree versions, and removes the corresponding nodes from the tree /// (in RocksDB, this uses simple pointwise `delete_cf()` operations). The range of versions -/// depends on pruning policies; for now, it's "remove versions older than `latest_version - N`", -/// where `N` is a configurable number set when the pruner [is created](Self::new()). +/// depends on pruning policies; for now, it's passed via the pruner handle. pub struct MerkleTreePruner { db: DB, target_pruned_key_count: usize, @@ -78,7 +77,8 @@ impl MerkleTreePruner { /// /// # Return value /// - /// Returns the created pruner and a handle to it. The pruner can be stopped by calling abort on it's handle + /// Returns the created pruner and a handle to it. The pruner can be stopped by calling abort on its handle + /// or dropping the handle. pub fn new(db: DB) -> (Self, MerkleTreePrunerHandle) { let (aborted_sender, aborted_receiver) = mpsc::channel(); let target_retained_version = Arc::new(AtomicU64::new(0)); @@ -113,7 +113,7 @@ impl MerkleTreePruner { self.poll_interval = poll_interval; } - /// Returns max version number that can be safely pruned, so that after pruning there is at least one version present after pruning. + /// Returns max version number that can be safely pruned, so that there is at least one version present after pruning. #[doc(hidden)] // Used in integration tests; logically private pub fn last_prunable_version(&self) -> Option { let manifest = self.db.manifest()?; @@ -190,7 +190,7 @@ impl MerkleTreePruner { } } - /// Runs this pruner indefinitely until it is aborted by calling abort() on its handle. + /// Runs this pruner indefinitely until it is aborted. pub fn run(mut self) { tracing::info!("Started Merkle tree pruner {self:?}"); @@ -287,9 +287,7 @@ mod tests { fn pruner_is_aborted_immediately_when_requested() { let (mut pruner, pruner_handle) = MerkleTreePruner::new(PatchSet::default()); pruner.set_poll_interval(Duration::from_secs(30)); - let join_handle = thread::spawn(|| { - pruner.run(); - }); + let join_handle = thread::spawn(|| pruner.run()); pruner_handle.abort(); let start = Instant::now(); diff --git a/core/lib/zksync_core/src/metadata_calculator/pruning.rs b/core/lib/zksync_core/src/metadata_calculator/pruning.rs index e07d89cac0a3..28ffa367c9aa 100644 --- a/core/lib/zksync_core/src/metadata_calculator/pruning.rs +++ b/core/lib/zksync_core/src/metadata_calculator/pruning.rs @@ -50,7 +50,8 @@ impl MerkleTreePruningTask { } tracing::info!("Obtained pruning handles; starting Merkle tree pruning"); - // FIXME: is this good enough (vs a managed task)? + // Pruner is not allocated a managed task because it is blocking; its cancellation awareness inherently + // depends on the pruner handle (i.e., this task). pruner.set_poll_interval(self.poll_interval); let pruner_task_handle = tokio::task::spawn_blocking(|| pruner.run()); From d075574bd147b9de2a1f72a9546dd53cb3b023f4 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Mon, 22 Apr 2024 20:40:50 +0300 Subject: [PATCH 17/17] Simplify pruner handle interface --- .../lib/merkle_tree/examples/loadtest/main.rs | 11 +++- core/lib/merkle_tree/src/pruning.rs | 56 ++++++++++--------- .../src/metadata_calculator/pruning.rs | 12 +++- 3 files changed, 48 insertions(+), 31 deletions(-) diff --git a/core/lib/merkle_tree/examples/loadtest/main.rs b/core/lib/merkle_tree/examples/loadtest/main.rs index 9c2041c6f2dc..0e51a0d956a7 100644 --- a/core/lib/merkle_tree/examples/loadtest/main.rs +++ b/core/lib/merkle_tree/examples/loadtest/main.rs @@ -162,7 +162,12 @@ impl Cli { }; if let Some((pruner_handle, _)) = &pruner_handles { - pruner_handle.set_target_retained_version(version); + if pruner_handle.set_target_retained_version(version).is_err() { + tracing::error!("Pruner unexpectedly stopped"); + let (_, pruner_thread) = pruner_handles.unwrap(); + pruner_thread.join().expect("Pruner panicked"); + return; // unreachable + } } let elapsed = start.elapsed(); @@ -177,8 +182,8 @@ impl Cli { tracing::info!("Verified tree consistency in {elapsed:?}"); if let Some((pruner_handle, pruner_thread)) = pruner_handles { - pruner_handle.abort(); - pruner_thread.join().unwrap(); + drop(pruner_handle); + pruner_thread.join().expect("Pruner panicked"); } } diff --git a/core/lib/merkle_tree/src/pruning.rs b/core/lib/merkle_tree/src/pruning.rs index 456f6acf5318..9e99098cbfd4 100644 --- a/core/lib/merkle_tree/src/pruning.rs +++ b/core/lib/merkle_tree/src/pruning.rs @@ -4,7 +4,7 @@ use std::{ fmt, sync::{ atomic::{AtomicU64, Ordering}, - mpsc, Arc, + mpsc, Arc, Weak, }, time::Duration, }; @@ -14,29 +14,41 @@ use crate::{ storage::{PruneDatabase, PrunePatchSet}, }; +/// Error returned by [`MerkleTreePrunerHandle::set_target_retained_version()`]. +#[derive(Debug)] +pub struct PrunerStoppedError(()); + +impl fmt::Display for PrunerStoppedError { + fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { + formatter.write_str("Merkle tree pruner stopped") + } +} + /// Handle for a [`MerkleTreePruner`] allowing to abort its operation. /// -/// The pruner is aborted once the abort method on handle is called. +/// The pruner is aborted once the handle is dropped. +#[must_use = "Pruner is aborted once handle is dropped"] #[derive(Debug)] pub struct MerkleTreePrunerHandle { - aborted_sender: mpsc::Sender<()>, - target_retained_version: Arc, + _aborted_sender: mpsc::Sender<()>, + target_retained_version: Weak, } impl MerkleTreePrunerHandle { - /// Aborts the pruner that this handle is attached to. If the pruner has already terminated - /// (e.g., due to a panic), this is a no-op. - pub fn abort(self) { - self.aborted_sender.send(()).ok(); - } - /// Sets the version of the tree the pruner should attempt to prune to. Calls should provide /// monotonically increasing versions; call with a lesser version will have no effect. /// /// Returns the previously set target retained version. - pub fn set_target_retained_version(&self, new_version: u64) -> u64 { - self.target_retained_version - .fetch_max(new_version, Ordering::Relaxed) + /// + /// # Errors + /// + /// If the pruner has stopped (e.g., due to a panic), this method will return an error. + pub fn set_target_retained_version(&self, new_version: u64) -> Result { + if let Some(version) = self.target_retained_version.upgrade() { + Ok(version.fetch_max(new_version, Ordering::Relaxed)) + } else { + Err(PrunerStoppedError(())) + } } } @@ -72,19 +84,17 @@ impl fmt::Debug for MerkleTreePruner { } impl MerkleTreePruner { - /// Creates a pruner with the specified database and the number of past tree versions to keep. - /// E.g., 0 means keeping only the latest version. + /// Creates a pruner with the specified database. /// /// # Return value /// - /// Returns the created pruner and a handle to it. The pruner can be stopped by calling abort on its handle - /// or dropping the handle. + /// Returns the created pruner and a handle to it. *The pruner will be aborted when its handle is dropped.* pub fn new(db: DB) -> (Self, MerkleTreePrunerHandle) { let (aborted_sender, aborted_receiver) = mpsc::channel(); let target_retained_version = Arc::new(AtomicU64::new(0)); let handle = MerkleTreePrunerHandle { - aborted_sender, - target_retained_version: target_retained_version.clone(), + _aborted_sender: aborted_sender, + target_retained_version: Arc::downgrade(&target_retained_version), }; let this = Self { db, @@ -178,11 +188,7 @@ impl MerkleTreePruner { fn wait_for_abort(&mut self, timeout: Duration) -> bool { match self.aborted_receiver.recv_timeout(timeout) { - Ok(()) => true, // Abort was requested - Err(mpsc::RecvTimeoutError::Disconnected) => { - tracing::warn!("Pruner handle is dropped without calling `abort()`; exiting"); - true - } + Ok(()) | Err(mpsc::RecvTimeoutError::Disconnected) => true, Err(mpsc::RecvTimeoutError::Timeout) => { // The pruner handle is alive and wasn't used to abort the pruner. false @@ -289,7 +295,7 @@ mod tests { pruner.set_poll_interval(Duration::from_secs(30)); let join_handle = thread::spawn(|| pruner.run()); - pruner_handle.abort(); + drop(pruner_handle); let start = Instant::now(); join_handle.join().unwrap(); assert!(start.elapsed() < Duration::from_secs(10)); diff --git a/core/lib/zksync_core/src/metadata_calculator/pruning.rs b/core/lib/zksync_core/src/metadata_calculator/pruning.rs index 28ffa367c9aa..9346a5be2c86 100644 --- a/core/lib/zksync_core/src/metadata_calculator/pruning.rs +++ b/core/lib/zksync_core/src/metadata_calculator/pruning.rs @@ -62,8 +62,14 @@ impl MerkleTreePruningTask { if let Some(l1_batch_number) = pruning_info.last_hard_pruned_l1_batch { let target_retained_version = u64::from(l1_batch_number.0) + 1; - let prev_target_version = - pruner_handle.set_target_retained_version(target_retained_version); + let Ok(prev_target_version) = + pruner_handle.set_target_retained_version(target_retained_version) + else { + tracing::error!("Merkle tree pruning thread unexpectedly stopped"); + return pruner_task_handle + .await + .context("Merkle tree pruning thread panicked"); + }; if prev_target_version != target_retained_version { tracing::info!("Set target retained tree version from {prev_target_version} to {target_retained_version}"); } @@ -78,7 +84,7 @@ impl MerkleTreePruningTask { } tracing::info!("Stop signal received, Merkle tree pruning is shutting down"); - pruner_handle.abort(); + drop(pruner_handle); pruner_task_handle .await .context("Merkle tree pruning thread panicked")