diff --git a/core/lib/dal/.sqlx/query-0429f2fa683bdff6fc1ff5069de69d57dbfda4be1f70232afffca82a895d43e0.json b/core/lib/dal/.sqlx/query-12c062c6a5078ebcbde378126a3773e86be9876cd198610e0792322e2a0797af.json similarity index 50% rename from core/lib/dal/.sqlx/query-0429f2fa683bdff6fc1ff5069de69d57dbfda4be1f70232afffca82a895d43e0.json rename to core/lib/dal/.sqlx/query-12c062c6a5078ebcbde378126a3773e86be9876cd198610e0792322e2a0797af.json index 5693bdf987e5..0027377ae596 100644 --- a/core/lib/dal/.sqlx/query-0429f2fa683bdff6fc1ff5069de69d57dbfda4be1f70232afffca82a895d43e0.json +++ b/core/lib/dal/.sqlx/query-12c062c6a5078ebcbde378126a3773e86be9876cd198610e0792322e2a0797af.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n WITH\n sl AS (\n SELECT\n (\n SELECT\n ARRAY[hashed_key, value] AS kv\n FROM\n storage_logs\n WHERE\n storage_logs.miniblock_number = $1\n AND storage_logs.hashed_key >= u.start_key\n AND storage_logs.hashed_key <= u.end_key\n ORDER BY\n storage_logs.hashed_key\n LIMIT\n 1\n )\n FROM\n UNNEST($2::bytea [], $3::bytea []) AS u (start_key, end_key)\n )\n \n SELECT\n sl.kv[1] AS \"hashed_key?\",\n sl.kv[2] AS \"value?\",\n initial_writes.index\n FROM\n sl\n LEFT OUTER JOIN initial_writes ON initial_writes.hashed_key = sl.kv[1]\n ", + "query": "\n WITH\n sl AS (\n SELECT\n (\n SELECT\n ARRAY[hashed_key, value] AS kv\n FROM\n storage_logs\n WHERE\n storage_logs.miniblock_number <= $1\n AND storage_logs.hashed_key >= u.start_key\n AND storage_logs.hashed_key <= u.end_key\n ORDER BY\n storage_logs.hashed_key\n LIMIT\n 1\n )\n FROM\n UNNEST($2::bytea [], $3::bytea []) AS u (start_key, end_key)\n )\n \n SELECT\n sl.kv[1] AS \"hashed_key?\",\n sl.kv[2] AS \"value?\",\n initial_writes.index\n FROM\n sl\n LEFT OUTER JOIN initial_writes ON initial_writes.hashed_key = sl.kv[1]\n ", "describe": { "columns": [ { @@ -32,5 +32,5 @@ true ] }, - "hash": "0429f2fa683bdff6fc1ff5069de69d57dbfda4be1f70232afffca82a895d43e0" + "hash": 
"12c062c6a5078ebcbde378126a3773e86be9876cd198610e0792322e2a0797af" } diff --git a/core/lib/dal/.sqlx/query-442212bb5f28f234cd624f2acc27944b2acedce201da4454aadb79f3545713ae.json b/core/lib/dal/.sqlx/query-2ae0541e9af1a9966585a25dfe772cb2ea9f2209fe2c12dda6c72c96bdb496d3.json similarity index 72% rename from core/lib/dal/.sqlx/query-442212bb5f28f234cd624f2acc27944b2acedce201da4454aadb79f3545713ae.json rename to core/lib/dal/.sqlx/query-2ae0541e9af1a9966585a25dfe772cb2ea9f2209fe2c12dda6c72c96bdb496d3.json index 621295d4ab81..b706a9df4373 100644 --- a/core/lib/dal/.sqlx/query-442212bb5f28f234cd624f2acc27944b2acedce201da4454aadb79f3545713ae.json +++ b/core/lib/dal/.sqlx/query-2ae0541e9af1a9966585a25dfe772cb2ea9f2209fe2c12dda6c72c96bdb496d3.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT\n storage_logs.hashed_key,\n storage_logs.value,\n initial_writes.index\n FROM\n storage_logs\n INNER JOIN initial_writes ON storage_logs.hashed_key = initial_writes.hashed_key\n WHERE\n storage_logs.miniblock_number = $1\n AND storage_logs.hashed_key >= $2::bytea\n AND storage_logs.hashed_key <= $3::bytea\n ORDER BY\n storage_logs.hashed_key\n ", + "query": "\n SELECT\n storage_logs.hashed_key,\n storage_logs.value,\n initial_writes.index\n FROM\n storage_logs\n INNER JOIN initial_writes ON storage_logs.hashed_key = initial_writes.hashed_key\n WHERE\n storage_logs.miniblock_number <= $1\n AND storage_logs.hashed_key >= $2::bytea\n AND storage_logs.hashed_key <= $3::bytea\n ORDER BY\n storage_logs.hashed_key\n ", "describe": { "columns": [ { @@ -32,5 +32,5 @@ false ] }, - "hash": "442212bb5f28f234cd624f2acc27944b2acedce201da4454aadb79f3545713ae" + "hash": "2ae0541e9af1a9966585a25dfe772cb2ea9f2209fe2c12dda6c72c96bdb496d3" } diff --git a/core/lib/dal/src/storage_logs_dal.rs b/core/lib/dal/src/storage_logs_dal.rs index adad6eb7e1db..ced8f594add3 100644 --- a/core/lib/dal/src/storage_logs_dal.rs +++ b/core/lib/dal/src/storage_logs_dal.rs @@ -727,7 +727,7 @@ impl 
StorageLogsDal<'_, '_> { FROM storage_logs WHERE - storage_logs.miniblock_number = $1 + storage_logs.miniblock_number <= $1 AND storage_logs.hashed_key >= u.start_key AND storage_logs.hashed_key <= u.end_key ORDER BY @@ -784,7 +784,7 @@ impl StorageLogsDal<'_, '_> { storage_logs INNER JOIN initial_writes ON storage_logs.hashed_key = initial_writes.hashed_key WHERE - storage_logs.miniblock_number = $1 + storage_logs.miniblock_number <= $1 AND storage_logs.hashed_key >= $2::bytea AND storage_logs.hashed_key <= $3::bytea ORDER BY diff --git a/core/node/metadata_calculator/src/pruning.rs b/core/node/metadata_calculator/src/pruning.rs index abbf9bf6865a..4ac05e55c302 100644 --- a/core/node/metadata_calculator/src/pruning.rs +++ b/core/node/metadata_calculator/src/pruning.rs @@ -304,6 +304,7 @@ mod tests { extend_db_state_from_l1_batch( &mut storage, snapshot_recovery.l1_batch_number + 1, + snapshot_recovery.l2_block_number + 1, new_logs, ) .await; diff --git a/core/node/metadata_calculator/src/recovery/mod.rs b/core/node/metadata_calculator/src/recovery/mod.rs index dcbc0a68af92..ce7207471791 100644 --- a/core/node/metadata_calculator/src/recovery/mod.rs +++ b/core/node/metadata_calculator/src/recovery/mod.rs @@ -32,16 +32,14 @@ use std::{ }; use anyhow::Context as _; +use async_trait::async_trait; use futures::future; use tokio::sync::{watch, Mutex, Semaphore}; use zksync_dal::{Connection, ConnectionPool, Core, CoreDal}; use zksync_health_check::HealthUpdater; use zksync_merkle_tree::TreeEntry; use zksync_shared_metrics::{SnapshotRecoveryStage, APP_METRICS}; -use zksync_types::{ - snapshots::{uniform_hashed_keys_chunk, SnapshotRecoveryStatus}, - L2BlockNumber, H256, -}; +use zksync_types::{snapshots::uniform_hashed_keys_chunk, L1BatchNumber, L2BlockNumber, H256}; use super::{ helpers::{AsyncTree, AsyncTreeRecovery, GenericAsyncTree, MerkleTreeHealth}, @@ -54,12 +52,13 @@ mod tests; /// Handler of recovery life cycle events. 
This functionality is encapsulated in a trait to be able /// to control recovery behavior in tests. +#[async_trait] trait HandleRecoveryEvent: fmt::Debug + Send + Sync { fn recovery_started(&mut self, _chunk_count: u64, _recovered_chunk_count: u64) { // Default implementation does nothing } - fn chunk_recovered(&self) { + async fn chunk_recovered(&self) { // Default implementation does nothing } } @@ -82,6 +81,7 @@ impl<'a> RecoveryHealthUpdater<'a> { } } +#[async_trait] impl HandleRecoveryEvent for RecoveryHealthUpdater<'_> { fn recovery_started(&mut self, chunk_count: u64, recovered_chunk_count: u64) { self.chunk_count = chunk_count; @@ -91,7 +91,7 @@ impl HandleRecoveryEvent for RecoveryHealthUpdater<'_> { .set(recovered_chunk_count); } - fn chunk_recovered(&self) { + async fn chunk_recovered(&self) { let recovered_chunk_count = self.recovered_chunk_count.fetch_add(1, Ordering::SeqCst) + 1; let chunks_left = self.chunk_count.saturating_sub(recovered_chunk_count); tracing::info!( @@ -110,34 +110,68 @@ impl HandleRecoveryEvent for RecoveryHealthUpdater<'_> { } #[derive(Debug, Clone, Copy)] -struct SnapshotParameters { +struct InitParameters { + l1_batch: L1BatchNumber, l2_block: L2BlockNumber, - expected_root_hash: H256, + expected_root_hash: Option, log_count: u64, desired_chunk_size: u64, } -impl SnapshotParameters { +impl InitParameters { async fn new( pool: &ConnectionPool, - recovery: &SnapshotRecoveryStatus, config: &MetadataCalculatorRecoveryConfig, - ) -> anyhow::Result { - let l2_block = recovery.l2_block_number; - let expected_root_hash = recovery.l1_batch_root_hash; - + ) -> anyhow::Result> { let mut storage = pool.connection_tagged("metadata_calculator").await?; + let recovery_status = storage + .snapshot_recovery_dal() + .get_applied_snapshot_status() + .await?; + let pruning_info = storage.pruning_dal().get_pruning_info().await?; + + let (l1_batch, l2_block); + let mut expected_root_hash = None; + match (recovery_status, 
pruning_info.last_hard_pruned_l2_block) { + (Some(recovery), None) => { + tracing::warn!( + "Snapshot recovery {recovery:?} is present on the node, but pruning info is empty; assuming no pruning happened" + ); + l1_batch = recovery.l1_batch_number; + l2_block = recovery.l2_block_number; + expected_root_hash = Some(recovery.l1_batch_root_hash); + } + (Some(recovery), Some(pruned_l2_block)) => { + // We have both recovery and some pruning on top of it. + l2_block = pruned_l2_block.max(recovery.l2_block_number); + l1_batch = pruning_info + .last_hard_pruned_l1_batch + .with_context(|| format!("malformed pruning info: {pruning_info:?}"))?; + if l1_batch == recovery.l1_batch_number { + expected_root_hash = Some(recovery.l1_batch_root_hash); + } + } + (None, Some(pruned_l2_block)) => { + l2_block = pruned_l2_block; + l1_batch = pruning_info + .last_hard_pruned_l1_batch + .with_context(|| format!("malformed pruning info: {pruning_info:?}"))?; + } + (None, None) => return Ok(None), + }; + let log_count = storage .storage_logs_dal() .get_storage_logs_row_count(l2_block) .await?; - Ok(Self { + Ok(Some(Self { + l1_batch, l2_block, expected_root_hash, log_count, desired_chunk_size: config.desired_chunk_size, - }) + })) } fn chunk_count(&self) -> u64 { @@ -168,29 +202,27 @@ impl GenericAsyncTree { stop_receiver: &watch::Receiver, ) -> anyhow::Result> { let started_at = Instant::now(); - let (tree, snapshot_recovery) = match self { + let (tree, init_params) = match self { Self::Ready(tree) => return Ok(Some(tree)), Self::Recovering(tree) => { - let snapshot_recovery = get_snapshot_recovery(main_pool).await?.context( + let params = InitParameters::new(main_pool, config).await?.context( "Merkle tree is recovering, but Postgres doesn't contain snapshot recovery information", )?; let recovered_version = tree.recovered_version(); anyhow::ensure!( - u64::from(snapshot_recovery.l1_batch_number.0) == recovered_version, - "Snapshot L1 batch in Postgres ({snapshot_recovery:?}) differs 
from the recovered Merkle tree version \ + u64::from(params.l1_batch.0) == recovered_version, + "Snapshot L1 batch in Postgres ({params:?}) differs from the recovered Merkle tree version \ ({recovered_version})" ); - tracing::info!("Resuming tree recovery with status: {snapshot_recovery:?}"); - (tree, snapshot_recovery) + tracing::info!("Resuming tree recovery with status: {params:?}"); + (tree, params) } Self::Empty { db, mode } => { - if let Some(snapshot_recovery) = get_snapshot_recovery(main_pool).await? { - tracing::info!( - "Starting Merkle tree recovery with status {snapshot_recovery:?}" - ); - let l1_batch = snapshot_recovery.l1_batch_number; + if let Some(params) = InitParameters::new(main_pool, config).await? { + tracing::info!("Starting Merkle tree recovery with status {params:?}"); + let l1_batch = params.l1_batch; let tree = AsyncTreeRecovery::new(db, l1_batch.0.into(), mode, config)?; - (tree, snapshot_recovery) + (tree, params) } else { // Start the tree from scratch. The genesis block will be filled in `TreeUpdater::loop_updating_tree()`. 
return Ok(Some(AsyncTree::new(db, mode)?)); @@ -198,17 +230,16 @@ impl GenericAsyncTree { } }; - let snapshot = SnapshotParameters::new(main_pool, &snapshot_recovery, config).await?; tracing::debug!( - "Obtained snapshot parameters: {snapshot:?} based on recovery configuration {config:?}" + "Obtained recovery init parameters: {init_params:?} based on recovery configuration {config:?}" ); let recovery_options = RecoveryOptions { - chunk_count: snapshot.chunk_count(), + chunk_count: init_params.chunk_count(), concurrency_limit: recovery_pool.max_size() as usize, events: Box::new(RecoveryHealthUpdater::new(health_updater)), }; let tree = tree - .recover(snapshot, recovery_options, &recovery_pool, stop_receiver) + .recover(init_params, recovery_options, &recovery_pool, stop_receiver) .await?; if tree.is_some() { // Only report latency if recovery wasn't canceled @@ -223,12 +254,12 @@ impl GenericAsyncTree { impl AsyncTreeRecovery { async fn recover( mut self, - snapshot: SnapshotParameters, + init_params: InitParameters, mut options: RecoveryOptions<'_>, pool: &ConnectionPool, stop_receiver: &watch::Receiver, ) -> anyhow::Result> { - self.ensure_desired_chunk_size(snapshot.desired_chunk_size) + self.ensure_desired_chunk_size(init_params.desired_chunk_size) .await?; let start_time = Instant::now(); @@ -237,13 +268,15 @@ impl AsyncTreeRecovery { .map(|chunk_id| uniform_hashed_keys_chunk(chunk_id, chunk_count)) .collect(); tracing::info!( - "Recovering Merkle tree from Postgres snapshot in {chunk_count} chunks with max concurrency {}", + "Recovering Merkle tree from Postgres snapshot in {chunk_count} chunks with max concurrency {}. 
\ + Be aware that enabling node pruning during recovery will probably result in a recovery error; always disable pruning \ + until recovery is complete", options.concurrency_limit ); let mut storage = pool.connection_tagged("metadata_calculator").await?; let remaining_chunks = self - .filter_chunks(&mut storage, snapshot.l2_block, &chunks) + .filter_chunks(&mut storage, init_params.l2_block, &chunks) .await?; drop(storage); options @@ -261,9 +294,10 @@ impl AsyncTreeRecovery { .acquire() .await .context("semaphore is never closed")?; - if Self::recover_key_chunk(&tree, snapshot.l2_block, chunk, pool, stop_receiver).await? + if Self::recover_key_chunk(&tree, init_params.l2_block, chunk, pool, stop_receiver) + .await? { - options.events.chunk_recovered(); + options.events.chunk_recovered().await; } anyhow::Ok(()) }); @@ -279,13 +313,18 @@ impl AsyncTreeRecovery { let finalize_latency = RECOVERY_METRICS.latency[&RecoveryStage::Finalize].start(); let actual_root_hash = tree.root_hash().await; - anyhow::ensure!( - actual_root_hash == snapshot.expected_root_hash, - "Root hash of recovered tree {actual_root_hash:?} differs from expected root hash {:?}. \ - If pruning is enabled and the tree is initialized some time after node recovery, \ - this is caused by snapshot storage logs getting pruned; this setup is currently not supported", - snapshot.expected_root_hash - ); + if let Some(expected_root_hash) = init_params.expected_root_hash { + anyhow::ensure!( + actual_root_hash == expected_root_hash, + "Root hash of recovered tree {actual_root_hash:?} differs from expected root hash {expected_root_hash:?}" + ); + } + + // Check pruning info one last time before finalizing the tree. 
+ let mut storage = pool.connection_tagged("metadata_calculator").await?; + Self::check_pruning_info(&mut storage, init_params.l2_block).await?; + drop(storage); + let tree = tree.finalize().await?; finalize_latency.observe(); tracing::info!( @@ -340,6 +379,21 @@ impl AsyncTreeRecovery { Ok(output) } + async fn check_pruning_info( + storage: &mut Connection<'_, Core>, + snapshot_l2_block: L2BlockNumber, + ) -> anyhow::Result<()> { + let pruning_info = storage.pruning_dal().get_pruning_info().await?; + if let Some(last_hard_pruned_l2_block) = pruning_info.last_hard_pruned_l2_block { + anyhow::ensure!( + last_hard_pruned_l2_block == snapshot_l2_block, + "Additional data was pruned compared to tree recovery L2 block #{snapshot_l2_block}: {pruning_info:?}. \ + Continuing recovery is impossible; to recover the tree, drop its RocksDB directory, stop pruning and restart recovery" + ); + } + Ok(()) + } + /// Returns `Ok(true)` if the chunk was recovered, `Ok(false)` if the recovery process was interrupted. async fn recover_key_chunk( tree: &Mutex, @@ -363,7 +417,9 @@ impl AsyncTreeRecovery { .storage_logs_dal() .get_tree_entries_for_l2_block(snapshot_l2_block, key_chunk.clone()) .await?; + Self::check_pruning_info(&mut storage, snapshot_l2_block).await?; drop(storage); + let entries_latency = entries_latency.observe(); tracing::debug!( "Loaded {} entries for chunk {key_chunk:?} in {entries_latency:?}", @@ -414,13 +470,3 @@ impl AsyncTreeRecovery { Ok(true) } } - -async fn get_snapshot_recovery( - pool: &ConnectionPool, -) -> anyhow::Result> { - let mut storage = pool.connection_tagged("metadata_calculator").await?; - Ok(storage - .snapshot_recovery_dal() - .get_applied_snapshot_status() - .await?) 
-} diff --git a/core/node/metadata_calculator/src/recovery/tests.rs b/core/node/metadata_calculator/src/recovery/tests.rs index 3861e8a5a84e..1d83c2f06031 100644 --- a/core/node/metadata_calculator/src/recovery/tests.rs +++ b/core/node/metadata_calculator/src/recovery/tests.rs @@ -1,6 +1,6 @@ //! Tests for metadata calculator snapshot recovery. -use std::{path::Path, sync::Mutex}; +use std::{collections::HashMap, path::Path, sync::Mutex}; use assert_matches::assert_matches; use tempfile::TempDir; @@ -15,7 +15,7 @@ use zksync_health_check::{CheckHealth, HealthStatus, ReactiveHealthCheck}; use zksync_merkle_tree::{domain::ZkSyncTree, recovery::PersistenceThreadHandle, TreeInstruction}; use zksync_node_genesis::{insert_genesis_batch, GenesisParams}; use zksync_node_test_utils::prepare_recovery_snapshot; -use zksync_types::{L1BatchNumber, ProtocolVersionId, StorageLog}; +use zksync_types::{L1BatchNumber, U256}; use super::*; use crate::{ @@ -29,10 +29,11 @@ use crate::{ #[test] fn calculating_chunk_count() { - let mut snapshot = SnapshotParameters { + let mut snapshot = InitParameters { + l1_batch: L1BatchNumber(1), l2_block: L2BlockNumber(1), log_count: 160_000_000, - expected_root_hash: H256::zero(), + expected_root_hash: Some(H256::zero()), desired_chunk_size: 200_000, }; assert_eq!(snapshot.chunk_count(), 800); @@ -57,13 +58,15 @@ async fn create_tree_recovery( async fn basic_recovery_workflow() { let pool = ConnectionPool::::test_pool().await; let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); - let snapshot_recovery = prepare_recovery_snapshot_with_genesis(pool.clone(), &temp_dir).await; + let root_hash = prepare_storage_logs(pool.clone(), &temp_dir).await; + prune_storage(&pool, L1BatchNumber(1)).await; + let config = MetadataCalculatorRecoveryConfig::default(); - let snapshot = SnapshotParameters::new(&pool, &snapshot_recovery, &config) + let init_params = InitParameters::new(&pool, &config) .await - .unwrap(); - - 
assert!(snapshot.log_count > 200); + .unwrap() + .expect("no init params"); + assert!(init_params.log_count > 200, "{init_params:?}"); let (_stop_sender, stop_receiver) = watch::channel(false); for chunk_count in [1, 4, 9, 16, 60, 256] { @@ -78,54 +81,94 @@ async fn basic_recovery_workflow() { events: Box::new(RecoveryHealthUpdater::new(&health_updater)), }; let tree = tree - .recover(snapshot, recovery_options, &pool, &stop_receiver) + .recover(init_params, recovery_options, &pool, &stop_receiver) .await .unwrap() .expect("Tree recovery unexpectedly aborted"); - assert_eq!(tree.root_hash(), snapshot_recovery.l1_batch_root_hash); + assert_eq!(tree.root_hash(), root_hash); let health = health_check.check_health().await; assert_matches!(health.status(), HealthStatus::Affected); } } -async fn prepare_recovery_snapshot_with_genesis( - pool: ConnectionPool, - temp_dir: &TempDir, -) -> SnapshotRecoveryStatus { +async fn prepare_storage_logs(pool: ConnectionPool, temp_dir: &TempDir) -> H256 { let mut storage = pool.connection().await.unwrap(); insert_genesis_batch(&mut storage, &GenesisParams::mock()) .await .unwrap(); - let mut logs = gen_storage_logs(100..300, 1).pop().unwrap(); - - // Add all logs from the genesis L1 batch to `logs` so that they cover all state keys. - let genesis_logs = storage - .storage_logs_dal() - .get_touched_slots_for_executed_l1_batch(L1BatchNumber(0)) - .await - .unwrap(); - let genesis_logs = genesis_logs - .into_iter() - .map(|(key, value)| StorageLog::new_write_log(key, value)); - logs.extend(genesis_logs); + let logs = gen_storage_logs(100..300, 1).pop().unwrap(); extend_db_state(&mut storage, vec![logs]).await; drop(storage); // Ensure that metadata for L1 batch #1 is present in the DB. 
let (calculator, _) = setup_calculator(&temp_dir.path().join("init"), pool, true).await; - let l1_batch_root_hash = run_calculator(calculator).await; - - SnapshotRecoveryStatus { - l1_batch_number: L1BatchNumber(1), - l1_batch_timestamp: 1, - l1_batch_root_hash, - l2_block_number: L2BlockNumber(1), - l2_block_timestamp: 1, - l2_block_hash: H256::zero(), // not used - protocol_version: ProtocolVersionId::latest(), - storage_logs_chunks_processed: vec![], - } + run_calculator(calculator).await +} + +async fn prune_storage(pool: &ConnectionPool, pruned_l1_batch: L1BatchNumber) { + // Emulate pruning batches in the storage. + let mut storage = pool.connection().await.unwrap(); + let (_, pruned_l2_block) = storage + .blocks_dal() + .get_l2_block_range_of_l1_batch(pruned_l1_batch) + .await + .unwrap() + .expect("L1 batch not present in Postgres"); + storage + .pruning_dal() + .soft_prune_batches_range(pruned_l1_batch, pruned_l2_block) + .await + .unwrap(); + let pruning_stats = storage + .pruning_dal() + .hard_prune_batches_range(pruned_l1_batch, pruned_l2_block) + .await + .unwrap(); + assert!( + pruning_stats.deleted_l1_batches > 0 && pruning_stats.deleted_l2_blocks > 0, + "{pruning_stats:?}" + ); +} + +#[tokio::test] +async fn recovery_workflow_for_partial_pruning() { + let pool = ConnectionPool::::test_pool().await; + let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); + let recovery_root_hash = prepare_storage_logs(pool.clone(), &temp_dir).await; + + // Add more storage logs and prune initial logs. 
+ let logs = gen_storage_logs(200..400, 5); + extend_db_state(&mut pool.connection().await.unwrap(), logs).await; + let (calculator, _) = setup_calculator(&temp_dir.path().join("init"), pool.clone(), true).await; + let final_root_hash = run_calculator(calculator).await; + prune_storage(&pool, L1BatchNumber(1)).await; + + let tree_path = temp_dir.path().join("recovery"); + let db = create_db(mock_config(&tree_path)).await.unwrap(); + let tree = GenericAsyncTree::Empty { + db, + mode: MerkleTreeMode::Lightweight, + }; + let (_stop_sender, stop_receiver) = watch::channel(false); + let tree = tree + .ensure_ready( + &MetadataCalculatorRecoveryConfig::default(), + &pool, + pool.clone(), + &ReactiveHealthCheck::new("tree").1, + &stop_receiver, + ) + .await + .unwrap() + .expect("Tree recovery unexpectedly aborted"); + + assert_eq!(tree.root_hash(), recovery_root_hash); + drop(tree); // Release exclusive lock on RocksDB + + // Check that tree operates as intended after recovery + let (calculator, _) = setup_calculator(&tree_path, pool, true).await; + assert_eq!(run_calculator(calculator).await, final_root_hash); } #[derive(Debug)] @@ -164,12 +207,13 @@ impl TestEventListener { } } +#[async_trait] impl HandleRecoveryEvent for TestEventListener { fn recovery_started(&mut self, _chunk_count: u64, recovered_chunk_count: u64) { assert_eq!(recovered_chunk_count, self.expected_recovered_chunks); } - fn chunk_recovered(&self) { + async fn chunk_recovered(&self) { let processed_chunk_count = self.processed_chunk_count.fetch_add(1, Ordering::SeqCst) + 1; if processed_chunk_count >= self.stop_threshold { self.stop_sender.send_replace(true); @@ -201,7 +245,8 @@ impl FaultToleranceCase { async fn recovery_fault_tolerance(chunk_count: u64, case: FaultToleranceCase) { let pool = ConnectionPool::::test_pool().await; let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); - let snapshot_recovery = prepare_recovery_snapshot_with_genesis(pool.clone(), 
&temp_dir).await; + let root_hash = prepare_storage_logs(pool.clone(), &temp_dir).await; + prune_storage(&pool, L1BatchNumber(1)).await; let tree_path = temp_dir.path().join("recovery"); let mut config = MetadataCalculatorRecoveryConfig::default(); @@ -217,18 +262,19 @@ async fn recovery_fault_tolerance(chunk_count: u64, case: FaultToleranceCase) { concurrency_limit: 1, events: Box::new(TestEventListener::new(1, stop_sender)), }; - let snapshot = SnapshotParameters::new(&pool, &snapshot_recovery, &config) + let init_params = InitParameters::new(&pool, &config) .await - .unwrap(); + .unwrap() + .expect("no init params"); assert!(tree - .recover(snapshot, recovery_options, &pool, &stop_receiver) + .recover(init_params, recovery_options, &pool, &stop_receiver) .await .unwrap() .is_none()); // Emulate a restart and recover 2 more chunks (or 1 + emulated persistence crash). let (mut tree, handle) = create_tree_recovery(&tree_path, L1BatchNumber(1), &config).await; - assert_ne!(tree.root_hash().await, snapshot_recovery.l1_batch_root_hash); + assert_ne!(tree.root_hash().await, root_hash); let (stop_sender, stop_receiver) = watch::channel(false); let mut event_listener = TestEventListener::new(2, stop_sender).expect_recovered_chunks(1); let expected_recovered_chunks = if matches!(case, FaultToleranceCase::ParallelWithCrash) { @@ -244,7 +290,7 @@ async fn recovery_fault_tolerance(chunk_count: u64, case: FaultToleranceCase) { events: Box::new(event_listener), }; let recovery_result = tree - .recover(snapshot, recovery_options, &pool, &stop_receiver) + .recover(init_params, recovery_options, &pool, &stop_receiver) .await; if matches!(case, FaultToleranceCase::ParallelWithCrash) { let err = format!("{:#}", recovery_result.unwrap_err()); @@ -255,7 +301,7 @@ async fn recovery_fault_tolerance(chunk_count: u64, case: FaultToleranceCase) { // Emulate another restart and recover remaining chunks. 
let (mut tree, _) = create_tree_recovery(&tree_path, L1BatchNumber(1), &config).await; - assert_ne!(tree.root_hash().await, snapshot_recovery.l1_batch_root_hash); + assert_ne!(tree.root_hash().await, root_hash); let (stop_sender, stop_receiver) = watch::channel(false); let recovery_options = RecoveryOptions { chunk_count, @@ -266,11 +312,11 @@ async fn recovery_fault_tolerance(chunk_count: u64, case: FaultToleranceCase) { ), }; let tree = tree - .recover(snapshot, recovery_options, &pool, &stop_receiver) + .recover(init_params, recovery_options, &pool, &stop_receiver) .await .unwrap() .expect("Tree recovery unexpectedly aborted"); - assert_eq!(tree.root_hash(), snapshot_recovery.l1_batch_root_hash); + assert_eq!(tree.root_hash(), root_hash); } #[derive(Debug)] @@ -345,6 +391,7 @@ async fn entire_recovery_workflow(case: RecoveryWorkflowCase) { extend_db_state_from_l1_batch( &mut storage, snapshot_recovery.l1_batch_number + 1, + snapshot_recovery.l2_block_number + 1, [new_logs.clone()], ) .await; @@ -376,3 +423,124 @@ async fn entire_recovery_workflow(case: RecoveryWorkflowCase) { stop_sender.send_replace(true); calculator_task.await.expect("calculator panicked").unwrap(); } + +/// `pruned_batches == 0` is a sanity check. +#[test_casing(4, [0, 1, 2, 4])] +#[tokio::test] +async fn recovery_with_further_pruning(pruned_batches: u32) { + const NEW_BATCH_COUNT: usize = 5; + + assert!( + (pruned_batches as usize) < NEW_BATCH_COUNT, + "at least 1 batch should remain in DB" + ); + + let pool = ConnectionPool::::test_pool().await; + let snapshot_logs = gen_storage_logs(100..300, 1).pop().unwrap(); + let mut storage = pool.connection().await.unwrap(); + let mut db_transaction = storage.start_transaction().await.unwrap(); + let snapshot_recovery = prepare_recovery_snapshot( + &mut db_transaction, + L1BatchNumber(23), + L2BlockNumber(42), + &snapshot_logs, + ) + .await; + + // Add some batches after recovery. 
+ let logs = gen_storage_logs(200..400, NEW_BATCH_COUNT); + extend_db_state_from_l1_batch( + &mut db_transaction, + snapshot_recovery.l1_batch_number + 1, + snapshot_recovery.l2_block_number + 1, + logs, + ) + .await; + db_transaction.commit().await.unwrap(); + + let all_logs = storage + .storage_logs_dal() + .dump_all_storage_logs_for_tests() + .await; + assert_eq!(all_logs.len(), 400); + let initial_writes = storage + .storage_logs_dedup_dal() + .dump_all_initial_writes_for_tests() + .await; + let initial_writes: HashMap<_, _> = initial_writes + .into_iter() + .map(|write| (write.hashed_key, write.index)) + .collect(); + drop(storage); + + let instructions: Vec<_> = all_logs + .iter() + .map(|log| { + let leaf_index = initial_writes[&log.hashed_key]; + let key = U256::from_little_endian(log.hashed_key.as_bytes()); + TreeInstruction::write(key, leaf_index, log.value) + }) + .collect(); + let expected_root_hash = ZkSyncTree::process_genesis_batch(&instructions).root_hash; + + if pruned_batches > 0 { + prune_storage(&pool, snapshot_recovery.l1_batch_number + pruned_batches).await; + } + + // Create a new tree instance. It should recover and process the remaining batches. 
+ let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); + let (calculator, _) = setup_calculator(temp_dir.path(), pool, true).await; + assert_eq!(run_calculator(calculator).await, expected_root_hash); +} + +#[derive(Debug)] +struct PruningEventListener { + pool: ConnectionPool, + pruned_l1_batch: L1BatchNumber, +} + +#[async_trait] +impl HandleRecoveryEvent for PruningEventListener { + async fn chunk_recovered(&self) { + prune_storage(&self.pool, self.pruned_l1_batch).await; + } +} + +#[tokio::test] +async fn pruning_during_recovery_is_detected() { + let pool = ConnectionPool::::test_pool().await; + let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); + + let mut storage = pool.connection().await.unwrap(); + insert_genesis_batch(&mut storage, &GenesisParams::mock()) + .await + .unwrap(); + let logs = gen_storage_logs(200..400, 5); + extend_db_state(&mut storage, logs).await; + drop(storage); + prune_storage(&pool, L1BatchNumber(1)).await; + + let tree_path = temp_dir.path().join("recovery"); + let config = MetadataCalculatorRecoveryConfig::default(); + let (tree, _) = create_tree_recovery(&tree_path, L1BatchNumber(1), &config).await; + let (_stop_sender, stop_receiver) = watch::channel(false); + let recovery_options = RecoveryOptions { + chunk_count: 5, + concurrency_limit: 1, + events: Box::new(PruningEventListener { + pool: pool.clone(), + pruned_l1_batch: L1BatchNumber(3), + }), + }; + let init_params = InitParameters::new(&pool, &config) + .await + .unwrap() + .expect("no init params"); + + let err = tree + .recover(init_params, recovery_options, &pool, &stop_receiver) + .await + .unwrap_err(); + let err = format!("{err:#}").to_lowercase(); + assert!(err.contains("continuing recovery is impossible"), "{err}"); +} diff --git a/core/node/metadata_calculator/src/tests.rs b/core/node/metadata_calculator/src/tests.rs index b878b0c4a533..1c003c4ecf78 100644 --- a/core/node/metadata_calculator/src/tests.rs 
+++ b/core/node/metadata_calculator/src/tests.rs @@ -696,7 +696,9 @@ async fn setup_calculator_with_options( object_store: Option>, ) -> MetadataCalculator { let mut storage = pool.connection().await.unwrap(); - if storage.blocks_dal().is_genesis_needed().await.unwrap() { + let pruning_info = storage.pruning_dal().get_pruning_info().await.unwrap(); + let has_pruning_logs = pruning_info.last_hard_pruned_l1_batch.is_some(); + if !has_pruning_logs && storage.blocks_dal().is_genesis_needed().await.unwrap() { insert_genesis_batch(&mut storage, &GenesisParams::mock()) .await .unwrap(); @@ -782,13 +784,26 @@ pub(super) async fn extend_db_state( .await .unwrap() .expect("no L1 batches in Postgres"); - extend_db_state_from_l1_batch(&mut storage, sealed_l1_batch + 1, new_logs).await; + let sealed_l2_block = storage + .blocks_dal() + .get_sealed_l2_block_number() + .await + .unwrap() + .expect("no L2 blocks in Postgres"); + extend_db_state_from_l1_batch( + &mut storage, + sealed_l1_batch + 1, + sealed_l2_block + 1, + new_logs, + ) + .await; storage.commit().await.unwrap(); } pub(super) async fn extend_db_state_from_l1_batch( storage: &mut Connection<'_, Core>, next_l1_batch: L1BatchNumber, + mut next_l2_block: L2BlockNumber, new_logs: impl IntoIterator>, ) { assert!(storage.in_transaction(), "must be called in DB transaction"); @@ -797,8 +812,7 @@ pub(super) async fn extend_db_state_from_l1_batch( let header = create_l1_batch(idx); let batch_number = header.number; // Assumes that L1 batch consists of only one L2 block. 
- let l2_block_header = create_l2_block(idx); - let l2_block_number = l2_block_header.number; + let l2_block_header = create_l2_block(next_l2_block.0); storage .blocks_dal() @@ -812,7 +826,7 @@ pub(super) async fn extend_db_state_from_l1_batch( .unwrap(); storage .storage_logs_dal() - .insert_storage_logs(l2_block_number, &batch_logs) + .insert_storage_logs(next_l2_block, &batch_logs) .await .unwrap(); storage @@ -831,6 +845,8 @@ pub(super) async fn extend_db_state_from_l1_batch( .await .unwrap(); insert_initial_writes_for_batch(storage, batch_number).await; + + next_l2_block += 1; } } diff --git a/docs/guides/external-node/08_pruning.md b/docs/guides/external-node/08_pruning.md index 7f7dfc34d4a9..06bd9f8d8a9d 100644 --- a/docs/guides/external-node/08_pruning.md +++ b/docs/guides/external-node/08_pruning.md @@ -53,6 +53,12 @@ be pruned after it has been executed on Ethereum. Pruning can be disabled or enabled and the data retention period can be freely changed during the node lifetime. +> [!WARNING] +> +> Pruning should be disabled when recovering the Merkle tree (e.g., if a node ran in +> [the treeless mode](09_treeless_mode.md) before, or if its tree needs a reset for whatever reason). Otherwise, tree +> recovery will almost certainly result in an error or, worse, in a corrupted tree. + ## Storage requirements for pruned nodes The storage requirements depend on how long you configure to retain the data, but are roughly: diff --git a/docs/guides/external-node/09_treeless_mode.md b/docs/guides/external-node/09_treeless_mode.md index 59e6f6412d31..ceeea6f86c67 100644 --- a/docs/guides/external-node/09_treeless_mode.md +++ b/docs/guides/external-node/09_treeless_mode.md @@ -59,12 +59,6 @@ or not running it when initializing a node. > (order of 2–3 hours for the mainnet) because the node no longer needs to recover the Merkle tree before starting > catching up. 
-> [!WARNING] -> -> In contrast to the tree fetcher, the Merkle tree cannot be safely switched on after a significant delay if pruning is -> enabled (some data necessary for tree update may have been pruned while the tree was off). We plan to fix this flaw in -> the future. If pruning is disabled, the Merkle tree _can_ be freely switched on / off. - ## Monitoring tree fetcher Tree fetcher information is logged with the `zksync_node_sync::tree_data_fetcher` target.