diff --git a/Cargo.lock b/Cargo.lock index 80581acb75f6..b81a1e2d241b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6635,7 +6635,6 @@ dependencies = [ "zksync_env_config", "zksync_object_store", "zksync_types", - "zksync_utils", ] [[package]] @@ -9327,10 +9326,12 @@ name = "zksync_state" version = "0.1.0" dependencies = [ "anyhow", + "assert_matches", "itertools 0.10.5", "mini-moka", "rand 0.8.5", "tempfile", + "test-casing", "tokio", "tracing", "vise", diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs index 07ee4140602d..8b65bbc5cf04 100644 --- a/core/bin/external_node/src/main.rs +++ b/core/bin/external_node/src/main.rs @@ -196,7 +196,9 @@ async fn init_tasks( memtable_capacity: config.optional.merkle_tree_memtable_capacity(), stalled_writes_timeout: config.optional.merkle_tree_stalled_writes_timeout(), }; - let metadata_calculator = MetadataCalculator::new(metadata_calculator_config, None).await; + let metadata_calculator = MetadataCalculator::new(metadata_calculator_config, None) + .await + .context("failed initializing metadata calculator")?; healthchecks.push(Box::new(metadata_calculator.tree_health_check())); let consistency_checker = ConsistencyChecker::new( diff --git a/core/bin/merkle_tree_consistency_checker/src/main.rs b/core/bin/merkle_tree_consistency_checker/src/main.rs index 60a4feb750e9..6d64729f9b6f 100644 --- a/core/bin/merkle_tree_consistency_checker/src/main.rs +++ b/core/bin/merkle_tree_consistency_checker/src/main.rs @@ -27,7 +27,7 @@ impl Cli { let db_path = &config.merkle_tree.path; tracing::info!("Verifying consistency of Merkle tree at {db_path}"); let start = Instant::now(); - let db = RocksDB::new(Path::new(db_path)); + let db = RocksDB::new(Path::new(db_path)).unwrap(); let tree = ZkSyncTree::new_lightweight(db.into()); let l1_batch_number = if let Some(number) = self.l1_batch { diff --git a/core/bin/snapshots_creator/Cargo.toml b/core/bin/snapshots_creator/Cargo.toml index fe18233e7d93..f1882dd2bb7e 
100644 --- a/core/bin/snapshots_creator/Cargo.toml +++ b/core/bin/snapshots_creator/Cargo.toml @@ -16,7 +16,6 @@ prometheus_exporter = { path = "../../lib/prometheus_exporter" } zksync_config = { path = "../../lib/config" } zksync_dal = { path = "../../lib/dal" } zksync_env_config = { path = "../../lib/env_config" } -zksync_utils = { path = "../../lib/utils" } zksync_types = { path = "../../lib/types" } zksync_object_store = { path = "../../lib/object_store" } vlog = { path = "../../lib/vlog" } diff --git a/core/bin/snapshots_creator/src/chunking.rs b/core/bin/snapshots_creator/src/chunking.rs deleted file mode 100644 index 047a6a23d24e..000000000000 --- a/core/bin/snapshots_creator/src/chunking.rs +++ /dev/null @@ -1,69 +0,0 @@ -use std::ops; - -use zksync_types::{H256, U256}; -use zksync_utils::u256_to_h256; - -pub(crate) fn get_chunk_hashed_keys_range( - chunk_id: u64, - chunk_count: u64, -) -> ops::RangeInclusive { - assert!(chunk_count > 0); - let mut stride = U256::MAX / chunk_count; - let stride_minus_one = if stride < U256::MAX { - stride += U256::one(); - stride - 1 - } else { - stride // `stride` is really 1 << 256 == U256::MAX + 1 - }; - - let start = stride * chunk_id; - let (mut end, is_overflow) = stride_minus_one.overflowing_add(start); - if is_overflow { - end = U256::MAX; - } - u256_to_h256(start)..=u256_to_h256(end) -} - -#[cfg(test)] -mod tests { - use zksync_utils::h256_to_u256; - - use super::*; - - #[test] - fn chunking_is_correct() { - for chunks_count in (2..10).chain([42, 256, 500, 1_001, 12_345]) { - println!("Testing chunks_count={chunks_count}"); - let chunked_ranges: Vec<_> = (0..chunks_count) - .map(|chunk_id| get_chunk_hashed_keys_range(chunk_id, chunks_count)) - .collect(); - - assert_eq!(*chunked_ranges[0].start(), H256::zero()); - assert_eq!( - *chunked_ranges.last().unwrap().end(), - H256::repeat_byte(0xff) - ); - for window in chunked_ranges.windows(2) { - let [prev_chunk, next_chunk] = window else { - unreachable!(); - }; - 
assert_eq!( - h256_to_u256(*prev_chunk.end()) + 1, - h256_to_u256(*next_chunk.start()) - ); - } - - let chunk_sizes: Vec<_> = chunked_ranges - .iter() - .map(|chunk| h256_to_u256(*chunk.end()) - h256_to_u256(*chunk.start()) + 1) - .collect(); - - // Check that chunk sizes are roughly equal. Due to how chunks are constructed, the sizes - // of all chunks except for the last one are the same, and the last chunk size may be slightly smaller; - // the difference in sizes is lesser than the number of chunks. - let min_chunk_size = chunk_sizes.iter().copied().min().unwrap(); - let max_chunk_size = chunk_sizes.iter().copied().max().unwrap(); - assert!(max_chunk_size - min_chunk_size < U256::from(chunks_count)); - } - } -} diff --git a/core/bin/snapshots_creator/src/creator.rs b/core/bin/snapshots_creator/src/creator.rs index 51a14ce2ccae..2d2ce2335b90 100644 --- a/core/bin/snapshots_creator/src/creator.rs +++ b/core/bin/snapshots_creator/src/creator.rs @@ -9,19 +9,15 @@ use zksync_dal::{ConnectionPool, StorageProcessor}; use zksync_object_store::ObjectStore; use zksync_types::{ snapshots::{ - SnapshotFactoryDependencies, SnapshotMetadata, SnapshotStorageLogsChunk, - SnapshotStorageLogsStorageKey, + uniform_hashed_keys_chunk, SnapshotFactoryDependencies, SnapshotFactoryDependency, + SnapshotMetadata, SnapshotStorageLogsChunk, SnapshotStorageLogsStorageKey, }, L1BatchNumber, MiniblockNumber, }; -use zksync_utils::ceil_div; +use crate::metrics::{FactoryDepsStage, StorageChunkStage, METRICS}; #[cfg(test)] use crate::tests::HandleEvent; -use crate::{ - chunking::get_chunk_hashed_keys_range, - metrics::{FactoryDepsStage, StorageChunkStage, METRICS}, -}; /// Encapsulates progress of creating a particular storage snapshot. 
#[derive(Debug)] @@ -91,7 +87,7 @@ impl SnapshotCreator { return Ok(()); } - let hashed_keys_range = get_chunk_hashed_keys_range(chunk_id, chunk_count); + let hashed_keys_range = uniform_hashed_keys_chunk(chunk_id, chunk_count); let mut conn = self.connect_to_replica().await?; let latency = @@ -166,6 +162,12 @@ impl SnapshotCreator { tracing::info!("Saving factory deps to GCS..."); let latency = METRICS.factory_deps_processing_duration[&FactoryDepsStage::SaveToGcs].start(); + let factory_deps = factory_deps + .into_iter() + .map(|(_, bytecode)| SnapshotFactoryDependency { + bytecode: bytecode.into(), + }) + .collect(); let factory_deps = SnapshotFactoryDependencies { factory_deps }; let filename = self .blob_store @@ -216,8 +218,9 @@ impl SnapshotCreator { .await?; let chunk_size = config.storage_logs_chunk_size; // We force the minimum number of chunks to avoid situations where only one chunk is created in tests. - let chunk_count = - ceil_div(distinct_storage_logs_keys_count, chunk_size).max(min_chunk_count); + let chunk_count = distinct_storage_logs_keys_count + .div_ceil(chunk_size) + .max(min_chunk_count); tracing::info!( "Selected storage logs chunking for L1 batch {l1_batch_number}: \ diff --git a/core/bin/snapshots_creator/src/main.rs b/core/bin/snapshots_creator/src/main.rs index 0571500615bb..c9a52fe0d746 100644 --- a/core/bin/snapshots_creator/src/main.rs +++ b/core/bin/snapshots_creator/src/main.rs @@ -19,7 +19,6 @@ use zksync_object_store::ObjectStoreFactory; use crate::creator::SnapshotCreator; -mod chunking; mod creator; mod metrics; #[cfg(test)] diff --git a/core/bin/snapshots_creator/src/tests.rs b/core/bin/snapshots_creator/src/tests.rs index d061b0906705..987976fc9168 100644 --- a/core/bin/snapshots_creator/src/tests.rs +++ b/core/bin/snapshots_creator/src/tests.rs @@ -223,7 +223,8 @@ async fn prepare_postgres( let expected_l1_batches_and_indices = conn .storage_logs_dal() .get_l1_batches_and_indices_for_initial_writes(&hashed_keys) - .await; + 
.await + .unwrap(); let logs = logs.into_iter().map(|log| { let (l1_batch_number_of_initial_write, enumeration_index) = diff --git a/core/bin/storage_logs_dedup_migration/src/main.rs b/core/bin/storage_logs_dedup_migration/src/main.rs index 179685c40022..fcaebafc0fe1 100644 --- a/core/bin/storage_logs_dedup_migration/src/main.rs +++ b/core/bin/storage_logs_dedup_migration/src/main.rs @@ -108,7 +108,8 @@ async fn main() { let values_for_missing_keys: HashMap<_, _> = connection .storage_logs_dal() .get_storage_values(&missing_keys, miniblock_number - 1) - .await; + .await + .expect("failed getting storage values for missing keys"); in_memory_prev_values_iter .chain( diff --git a/core/lib/dal/.sqlx/query-2028ba507f3ccd474f0261e571eb19a3a7feec950cb3e503588cf55d954a493a.json b/core/lib/dal/.sqlx/query-2028ba507f3ccd474f0261e571eb19a3a7feec950cb3e503588cf55d954a493a.json deleted file mode 100644 index 8aaefe3c6ba6..000000000000 --- a/core/lib/dal/.sqlx/query-2028ba507f3ccd474f0261e571eb19a3a7feec950cb3e503588cf55d954a493a.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n SELECT\n bytecode\n FROM\n factory_deps\n WHERE\n miniblock_number <= $1\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "bytecode", - "type_info": "Bytea" - } - ], - "parameters": { - "Left": [ - "Int8" - ] - }, - "nullable": [ - false - ] - }, - "hash": "2028ba507f3ccd474f0261e571eb19a3a7feec950cb3e503588cf55d954a493a" -} diff --git a/core/lib/dal/.sqlx/query-9f637f37dc3a29ce7412ab4347071bd180729779a0e98ae7a6bb4386aca99716.json b/core/lib/dal/.sqlx/query-9f637f37dc3a29ce7412ab4347071bd180729779a0e98ae7a6bb4386aca99716.json new file mode 100644 index 000000000000..fc18e9f6cb55 --- /dev/null +++ b/core/lib/dal/.sqlx/query-9f637f37dc3a29ce7412ab4347071bd180729779a0e98ae7a6bb4386aca99716.json @@ -0,0 +1,28 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n bytecode_hash,\n bytecode\n FROM\n factory_deps\n WHERE\n miniblock_number <= $1\n ", + 
"describe": { + "columns": [ + { + "ordinal": 0, + "name": "bytecode_hash", + "type_info": "Bytea" + }, + { + "ordinal": 1, + "name": "bytecode", + "type_info": "Bytea" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + false, + false + ] + }, + "hash": "9f637f37dc3a29ce7412ab4347071bd180729779a0e98ae7a6bb4386aca99716" +} diff --git a/core/lib/dal/src/models/storage_log.rs b/core/lib/dal/src/models/storage_log.rs index adca6742d095..87ab7b3d1c03 100644 --- a/core/lib/dal/src/models/storage_log.rs +++ b/core/lib/dal/src/models/storage_log.rs @@ -30,8 +30,15 @@ impl From for StorageLog { // We don't want to rely on the Merkle tree crate to import a single type, so we duplicate `TreeEntry` here. #[derive(Debug, Clone, Copy)] -pub struct StorageTreeEntry { - pub key: U256, +pub struct StorageRecoveryLogEntry { + pub key: H256, pub value: H256, pub leaf_index: u64, } + +impl StorageRecoveryLogEntry { + /// Converts `key` to the format used by the Merkle tree (little-endian [`U256`]). + pub fn tree_key(&self) -> U256 { + U256::from_little_endian(&self.key.0) + } +} diff --git a/core/lib/dal/src/snapshots_creator_dal.rs b/core/lib/dal/src/snapshots_creator_dal.rs index 9267470878e1..d09363592d18 100644 --- a/core/lib/dal/src/snapshots_creator_dal.rs +++ b/core/lib/dal/src/snapshots_creator_dal.rs @@ -1,6 +1,6 @@ use zksync_types::{ - snapshots::{SnapshotFactoryDependency, SnapshotStorageLog}, - AccountTreeId, Address, L1BatchNumber, MiniblockNumber, StorageKey, H256, + snapshots::SnapshotStorageLog, AccountTreeId, Address, L1BatchNumber, MiniblockNumber, + StorageKey, H256, }; use crate::{instrument::InstrumentExt, StorageProcessor}; @@ -99,13 +99,15 @@ impl SnapshotsCreatorDal<'_, '_> { Ok(storage_logs) } + /// Returns all factory dependencies up to and including the specified `miniblock_number`. 
pub async fn get_all_factory_deps( &mut self, miniblock_number: MiniblockNumber, - ) -> sqlx::Result> { + ) -> sqlx::Result)>> { let rows = sqlx::query!( r#" SELECT + bytecode_hash, bytecode FROM factory_deps @@ -121,9 +123,7 @@ impl SnapshotsCreatorDal<'_, '_> { Ok(rows .into_iter() - .map(|row| SnapshotFactoryDependency { - bytecode: row.bytecode.into(), - }) + .map(|row| (H256::from_slice(&row.bytecode_hash), row.bytecode)) .collect()) } } diff --git a/core/lib/dal/src/storage_dal.rs b/core/lib/dal/src/storage_dal.rs index 106a7b2a5d4e..5ad1fd8eeb15 100644 --- a/core/lib/dal/src/storage_dal.rs +++ b/core/lib/dal/src/storage_dal.rs @@ -134,8 +134,8 @@ impl StorageDal<'_, '_> { pub async fn get_factory_deps_for_revert( &mut self, block_number: MiniblockNumber, - ) -> Vec { - sqlx::query!( + ) -> sqlx::Result> { + Ok(sqlx::query!( r#" SELECT bytecode_hash @@ -147,11 +147,10 @@ impl StorageDal<'_, '_> { block_number.0 as i64 ) .fetch_all(self.storage.conn()) - .await - .unwrap() + .await? .into_iter() .map(|row| H256::from_slice(&row.bytecode_hash)) - .collect() + .collect()) } /// Applies the specified storage logs for a miniblock. Returns the map of unique storage updates. 
diff --git a/core/lib/dal/src/storage_logs_dal.rs b/core/lib/dal/src/storage_logs_dal.rs index ad756dd08197..27707c50d3a4 100644 --- a/core/lib/dal/src/storage_logs_dal.rs +++ b/core/lib/dal/src/storage_logs_dal.rs @@ -3,10 +3,11 @@ use std::{collections::HashMap, ops, time::Instant}; use sqlx::{types::chrono::Utc, Row}; use zksync_types::{ get_code_key, AccountTreeId, Address, L1BatchNumber, MiniblockNumber, StorageKey, StorageLog, - FAILED_CONTRACT_DEPLOYMENT_BYTECODE_HASH, H256, U256, + FAILED_CONTRACT_DEPLOYMENT_BYTECODE_HASH, H256, }; -use crate::{instrument::InstrumentExt, models::storage_log::StorageTreeEntry, StorageProcessor}; +pub use crate::models::storage_log::StorageRecoveryLogEntry; +use crate::{instrument::InstrumentExt, StorageProcessor}; #[derive(Debug)] pub struct StorageLogsDal<'a, 'c> { @@ -95,11 +96,14 @@ impl StorageLogsDal<'_, '_> { } /// Rolls back storage to the specified point in time. - pub async fn rollback_storage(&mut self, last_miniblock_to_keep: MiniblockNumber) { + pub async fn rollback_storage( + &mut self, + last_miniblock_to_keep: MiniblockNumber, + ) -> sqlx::Result<()> { let stage_start = Instant::now(); let modified_keys = self .modified_keys_since_miniblock(last_miniblock_to_keep) - .await; + .await?; tracing::info!( "Loaded {} keys changed after miniblock #{last_miniblock_to_keep} in {:?}", modified_keys.len(), @@ -109,7 +113,7 @@ impl StorageLogsDal<'_, '_> { let stage_start = Instant::now(); let prev_values = self .get_storage_values(&modified_keys, last_miniblock_to_keep) - .await; + .await?; tracing::info!( "Loaded previous storage values for modified keys in {:?}", stage_start.elapsed() @@ -144,8 +148,8 @@ impl StorageLogsDal<'_, '_> { &keys_to_delete as &[&[u8]], ) .execute(self.storage.conn()) - .await - .unwrap(); + .await?; + tracing::info!( "Removed {} keys in {:?}", keys_to_delete.len(), @@ -167,21 +171,22 @@ impl StorageLogsDal<'_, '_> { &values_to_update as &[&[u8]], ) .execute(self.storage.conn()) - .await - 
.unwrap(); + .await?; + tracing::info!( "Updated {} keys to previous values in {:?}", keys_to_update.len(), stage_start.elapsed() ); + Ok(()) } /// Returns all storage keys that were modified after the specified miniblock. async fn modified_keys_since_miniblock( &mut self, miniblock_number: MiniblockNumber, - ) -> Vec { - sqlx::query!( + ) -> sqlx::Result> { + Ok(sqlx::query!( r#" SELECT DISTINCT ON (hashed_key) hashed_key @@ -198,11 +203,10 @@ impl StorageLogsDal<'_, '_> { miniblock_number.0 as i64 ) .fetch_all(self.storage.conn()) - .await - .unwrap() + .await? .into_iter() .map(|row| H256::from_slice(&row.hashed_key)) - .collect() + .collect()) } /// Removes all storage logs with a miniblock number strictly greater than the specified `block_number`. @@ -258,7 +262,7 @@ impl StorageLogsDal<'_, '_> { pub async fn get_touched_slots_for_l1_batch( &mut self, l1_batch_number: L1BatchNumber, - ) -> HashMap { + ) -> sqlx::Result> { let rows = sqlx::query!( r#" SELECT @@ -290,8 +294,7 @@ impl StorageLogsDal<'_, '_> { l1_batch_number.0 as i64 ) .fetch_all(self.storage.conn()) - .await - .unwrap(); + .await?; let touched_slots = rows.into_iter().map(|row| { let key = StorageKey::new( @@ -300,7 +303,7 @@ impl StorageLogsDal<'_, '_> { ); (key, H256::from_slice(&row.value)) }); - touched_slots.collect() + Ok(touched_slots.collect()) } /// Returns (hashed) storage keys and the corresponding values that need to be applied to a storage @@ -308,19 +311,18 @@ impl StorageLogsDal<'_, '_> { pub async fn get_storage_logs_for_revert( &mut self, l1_batch_number: L1BatchNumber, - ) -> HashMap> { + ) -> sqlx::Result>> { let miniblock_range = self .storage .blocks_dal() .get_miniblock_range_of_l1_batch(l1_batch_number) - .await - .unwrap(); + .await?; let Some((_, last_miniblock)) = miniblock_range else { - return HashMap::new(); + return Ok(HashMap::new()); }; let stage_start = Instant::now(); - let mut modified_keys = self.modified_keys_since_miniblock(last_miniblock).await; + let mut 
modified_keys = self.modified_keys_since_miniblock(last_miniblock).await?; let modified_keys_count = modified_keys.len(); tracing::info!( "Fetched {modified_keys_count} keys changed after miniblock #{last_miniblock} in {:?}", @@ -334,7 +336,7 @@ impl StorageLogsDal<'_, '_> { let stage_start = Instant::now(); let l1_batch_and_index_by_key = self .get_l1_batches_and_indices_for_initial_writes(&modified_keys) - .await; + .await?; tracing::info!( "Loaded initial write info for modified keys in {:?}", stage_start.elapsed() @@ -372,7 +374,7 @@ impl StorageLogsDal<'_, '_> { let stage_start = Instant::now(); let prev_values_for_updated_keys = self .get_storage_values(&modified_keys, last_miniblock) - .await + .await? .into_iter() .map(|(key, value)| { let value = value.unwrap(); // We already filtered out keys that weren't touched. @@ -385,15 +387,15 @@ impl StorageLogsDal<'_, '_> { stage_start.elapsed() ); output.extend(prev_values_for_updated_keys); - output + Ok(output) } pub async fn get_l1_batches_and_indices_for_initial_writes( &mut self, hashed_keys: &[H256], - ) -> HashMap { + ) -> sqlx::Result> { if hashed_keys.is_empty() { - return HashMap::new(); // Shortcut to save time on communication with DB in the common case + return Ok(HashMap::new()); // Shortcut to save time on communication with DB in the common case } let hashed_keys: Vec<_> = hashed_keys.iter().map(H256::as_bytes).collect(); @@ -413,17 +415,17 @@ impl StorageLogsDal<'_, '_> { .instrument("get_l1_batches_and_indices_for_initial_writes") .report_latency() .fetch_all(self.storage.conn()) - .await - .unwrap(); + .await?; - rows.into_iter() + Ok(rows + .into_iter() .map(|row| { ( H256::from_slice(&row.hashed_key), (L1BatchNumber(row.l1_batch_number as u32), row.index as u64), ) }) - .collect() + .collect()) } /// Gets previous values for the specified storage keys before the specified L1 batch number. 
@@ -440,17 +442,16 @@ impl StorageLogsDal<'_, '_> { &mut self, hashed_keys: &[H256], next_l1_batch: L1BatchNumber, - ) -> HashMap> { + ) -> sqlx::Result>> { let (miniblock_number, _) = self .storage .blocks_dal() .get_miniblock_range_of_l1_batch(next_l1_batch) - .await - .unwrap() + .await? .unwrap(); if miniblock_number == MiniblockNumber(0) { - hashed_keys.iter().copied().map(|key| (key, None)).collect() + Ok(hashed_keys.iter().copied().map(|key| (key, None)).collect()) } else { self.get_storage_values(hashed_keys, miniblock_number - 1) .await @@ -462,7 +463,7 @@ impl StorageLogsDal<'_, '_> { &mut self, hashed_keys: &[H256], miniblock_number: MiniblockNumber, - ) -> HashMap> { + ) -> sqlx::Result>> { let hashed_keys: Vec<_> = hashed_keys.iter().map(H256::as_bytes).collect(); let rows = sqlx::query!( @@ -490,16 +491,16 @@ impl StorageLogsDal<'_, '_> { miniblock_number.0 as i64 ) .fetch_all(self.storage.conn()) - .await - .unwrap(); + .await?; - rows.into_iter() + Ok(rows + .into_iter() .map(|row| { let key = H256::from_slice(&row.hashed_key); let value = row.value.map(|value| H256::from_slice(&value)); (key, value) }) - .collect() + .collect()) } pub async fn get_miniblock_storage_logs( @@ -531,7 +532,7 @@ impl StorageLogsDal<'_, '_> { &mut self, miniblock_number: MiniblockNumber, key_ranges: &[ops::RangeInclusive], - ) -> sqlx::Result>> { + ) -> sqlx::Result>> { let (start_keys, end_keys): (Vec<_>, Vec<_>) = key_ranges .iter() .map(|range| (range.start().as_bytes(), range.end().as_bytes())) @@ -574,8 +575,8 @@ impl StorageLogsDal<'_, '_> { .await?; let rows = rows.into_iter().map(|row| { - Some(StorageTreeEntry { - key: U256::from_little_endian(row.hashed_key.as_ref()?), + Some(StorageRecoveryLogEntry { + key: H256::from_slice(row.hashed_key.as_ref()?), value: H256::from_slice(row.value.as_ref()?), leaf_index: row.index? 
as u64, }) @@ -589,7 +590,7 @@ impl StorageLogsDal<'_, '_> { &mut self, miniblock_number: MiniblockNumber, key_range: ops::RangeInclusive, - ) -> sqlx::Result> { + ) -> sqlx::Result> { let rows = sqlx::query!( r#" SELECT @@ -613,8 +614,8 @@ impl StorageLogsDal<'_, '_> { .fetch_all(self.storage.conn()) .await?; - let rows = rows.into_iter().map(|row| StorageTreeEntry { - key: U256::from_little_endian(&row.hashed_key), + let rows = rows.into_iter().map(|row| StorageRecoveryLogEntry { + key: H256::from_slice(&row.hashed_key), value: H256::from_slice(&row.value), leaf_index: row.index as u64, }); @@ -717,12 +718,6 @@ mod tests { use super::*; use crate::{tests::create_miniblock_header, ConnectionPool}; - fn u256_to_h256_reversed(value: U256) -> H256 { - let mut bytes = [0_u8; 32]; - value.to_little_endian(&mut bytes); - H256(bytes) - } - async fn insert_miniblock(conn: &mut StorageProcessor<'_>, number: u32, logs: Vec) { let mut header = L1BatchHeader::new( L1BatchNumber(number), @@ -770,7 +765,8 @@ mod tests { let touched_slots = conn .storage_logs_dal() .get_touched_slots_for_l1_batch(L1BatchNumber(1)) - .await; + .await + .unwrap(); assert_eq!(touched_slots.len(), 2); assert_eq!(touched_slots[&first_key], H256::repeat_byte(1)); assert_eq!(touched_slots[&second_key], H256::repeat_byte(2)); @@ -786,7 +782,8 @@ mod tests { let touched_slots = conn .storage_logs_dal() .get_touched_slots_for_l1_batch(L1BatchNumber(1)) - .await; + .await + .unwrap(); assert_eq!(touched_slots.len(), 2); assert_eq!(touched_slots[&first_key], H256::repeat_byte(3)); assert_eq!(touched_slots[&second_key], H256::repeat_byte(2)); @@ -818,7 +815,8 @@ mod tests { let prev_values = conn .storage_logs_dal() .get_previous_storage_values(&prev_keys, L1BatchNumber(2)) - .await; + .await + .unwrap(); assert_eq!(prev_values.len(), 3); assert_eq!(prev_values[&prev_keys[0]], Some(H256::repeat_byte(3))); assert_eq!(prev_values[&prev_keys[1]], None); @@ -826,7 +824,8 @@ mod tests { conn.storage_logs_dal() 
.rollback_storage(MiniblockNumber(1)) - .await; + .await + .unwrap(); let value = conn.storage_dal().get_by_key(&key).await.unwrap(); assert_eq!(value, Some(H256::repeat_byte(3))); @@ -872,7 +871,8 @@ mod tests { let logs_for_revert = conn .storage_logs_dal() .get_storage_logs_for_revert(L1BatchNumber(1)) - .await; + .await + .unwrap(); assert_eq!(logs_for_revert.len(), 15); // 5 updated + 10 new keys for log in &logs[5..] { let prev_value = logs_for_revert[&log.key.hashed_key()].unwrap().0; @@ -931,7 +931,8 @@ mod tests { let logs_for_revert = conn .storage_logs_dal() .get_storage_logs_for_revert(L1BatchNumber(1)) - .await; + .await + .unwrap(); assert_eq!(logs_for_revert.len(), 3); for (i, log) in logs.iter().enumerate() { let hashed_key = log.key.hashed_key(); @@ -974,10 +975,7 @@ mod tests { .iter() .find(|&key| key_range.contains(key)); if let Some(chunk_start) = chunk_start { - assert_eq!( - u256_to_h256_reversed(chunk_start.key), - *expected_start_key.unwrap() - ); + assert_eq!(chunk_start.key, *expected_start_key.unwrap()); assert_ne!(chunk_start.value, H256::zero()); assert_ne!(chunk_start.leaf_index, 0); } else { @@ -1027,7 +1025,7 @@ mod tests { assert_eq!( tree_entries .iter() - .map(|entry| u256_to_h256_reversed(entry.key)) + .map(|entry| entry.key) .collect::>(), sorted_hashed_keys ); @@ -1040,7 +1038,7 @@ mod tests { .unwrap(); assert!(!tree_entries.is_empty() && tree_entries.len() < 10); for entry in &tree_entries { - assert!(key_range.contains(&u256_to_h256_reversed(entry.key))); + assert!(key_range.contains(&entry.key)); } } } diff --git a/core/lib/merkle_tree/examples/loadtest/main.rs b/core/lib/merkle_tree/examples/loadtest/main.rs index 185ae0543f9d..53a641750d10 100644 --- a/core/lib/merkle_tree/examples/loadtest/main.rs +++ b/core/lib/merkle_tree/examples/loadtest/main.rs @@ -90,13 +90,11 @@ impl Cli { "Created temp dir for RocksDB: {}", dir.path().to_string_lossy() ); - let db = RocksDB::with_options( - dir.path(), - RocksDBOptions { - 
block_cache_capacity: self.block_cache, - ..RocksDBOptions::default() - }, - ); + let db_options = RocksDBOptions { + block_cache_capacity: self.block_cache, + ..RocksDBOptions::default() + }; + let db = RocksDB::with_options(dir.path(), db_options).unwrap(); rocksdb = RocksDBWrapper::from(db); if let Some(chunk_size) = self.chunk_size { diff --git a/core/lib/merkle_tree/examples/recovery.rs b/core/lib/merkle_tree/examples/recovery.rs index 1b4e634e567a..8769f9a64acc 100644 --- a/core/lib/merkle_tree/examples/recovery.rs +++ b/core/lib/merkle_tree/examples/recovery.rs @@ -62,13 +62,11 @@ impl Cli { "Created temp dir for RocksDB: {}", dir.path().to_string_lossy() ); - let db = RocksDB::with_options( - dir.path(), - RocksDBOptions { - block_cache_capacity: self.block_cache, - ..RocksDBOptions::default() - }, - ); + let db_options = RocksDBOptions { + block_cache_capacity: self.block_cache, + ..RocksDBOptions::default() + }; + let db = RocksDB::with_options(dir.path(), db_options).unwrap(); rocksdb = RocksDBWrapper::from(db); _temp_dir = Some(dir); &mut rocksdb diff --git a/core/lib/merkle_tree/src/storage/rocksdb.rs b/core/lib/merkle_tree/src/storage/rocksdb.rs index 7dd4d6083d79..8fc9f202d21b 100644 --- a/core/lib/merkle_tree/src/storage/rocksdb.rs +++ b/core/lib/merkle_tree/src/storage/rocksdb.rs @@ -3,7 +3,7 @@ use std::path::Path; use rayon::prelude::*; -use zksync_storage::{db::NamedColumnFamily, rocksdb::DBPinnableSlice, RocksDB}; +use zksync_storage::{db::NamedColumnFamily, rocksdb, rocksdb::DBPinnableSlice, RocksDB}; use crate::{ errors::{DeserializeError, ErrorContext}, @@ -66,8 +66,12 @@ impl RocksDBWrapper { const MANIFEST_KEY: &'static [u8] = &[0]; /// Creates a new wrapper, initializing RocksDB at the specified directory. - pub fn new(path: &Path) -> Self { - Self::from(RocksDB::new(path)) + /// + /// # Errors + /// + /// Propagates RocksDB I/O errors. 
+ pub fn new(path: &Path) -> Result { + Ok(Self::from(RocksDB::new(path)?)) } /// Sets the chunk size for multi-get operations. The requested keys will be split @@ -295,7 +299,7 @@ mod tests { #[test] fn garbage_is_removed_on_db_reverts() { let dir = TempDir::new().expect("failed creating temporary dir for RocksDB"); - let mut db = RocksDBWrapper::new(dir.path()); + let mut db = RocksDBWrapper::new(dir.path()).unwrap(); // Insert some data to the database. let mut expected_keys = HashSet::new(); diff --git a/core/lib/merkle_tree/tests/integration/consistency.rs b/core/lib/merkle_tree/tests/integration/consistency.rs index b6b424e431ad..33ad521bc940 100644 --- a/core/lib/merkle_tree/tests/integration/consistency.rs +++ b/core/lib/merkle_tree/tests/integration/consistency.rs @@ -21,7 +21,7 @@ fn five_thousand_angry_monkeys_vs_merkle_tree() { const RNG_SEED: u64 = 42; let dir = TempDir::new().expect("failed creating temporary dir for RocksDB"); - let mut db = RocksDBWrapper::new(dir.path()); + let mut db = RocksDBWrapper::new(dir.path()).unwrap(); let mut tree = MerkleTree::new(&mut db); let kvs = generate_key_value_pairs(0..100); diff --git a/core/lib/merkle_tree/tests/integration/domain.rs b/core/lib/merkle_tree/tests/integration/domain.rs index e96b68fdade1..ee4ea973c956 100644 --- a/core/lib/merkle_tree/tests/integration/domain.rs +++ b/core/lib/merkle_tree/tests/integration/domain.rs @@ -45,7 +45,7 @@ fn basic_workflow() { let logs = gen_storage_logs(); let (metadata, expected_root_hash) = { - let db = RocksDB::new(temp_dir.as_ref()); + let db = RocksDB::new(temp_dir.as_ref()).unwrap(); let mut tree = ZkSyncTree::new_lightweight(db.into()); let metadata = tree.process_l1_batch(&logs); tree.save(); @@ -73,7 +73,7 @@ fn basic_workflow() { ]), ); - let db = RocksDB::new(temp_dir.as_ref()); + let db = RocksDB::new(temp_dir.as_ref()).unwrap(); let tree = ZkSyncTree::new_lightweight(db.into()); tree.verify_consistency(L1BatchNumber(0)); assert_eq!(tree.root_hash(), 
expected_root_hash); @@ -87,7 +87,7 @@ fn basic_workflow_multiblock() { let blocks = logs.chunks(9); let expected_root_hash = { - let db = RocksDB::new(temp_dir.as_ref()); + let db = RocksDB::new(temp_dir.as_ref()).unwrap(); let mut tree = ZkSyncTree::new_lightweight(db.into()); tree.use_dedicated_thread_pool(2); for block in blocks { @@ -105,7 +105,7 @@ fn basic_workflow_multiblock() { ]), ); - let db = RocksDB::new(temp_dir.as_ref()); + let db = RocksDB::new(temp_dir.as_ref()).unwrap(); let tree = ZkSyncTree::new_lightweight(db.into()); assert_eq!(tree.root_hash(), expected_root_hash); assert_eq!(tree.next_l1_batch_number(), L1BatchNumber(12)); @@ -114,7 +114,7 @@ fn basic_workflow_multiblock() { #[test] fn filtering_out_no_op_writes() { let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); - let db = RocksDB::new(temp_dir.as_ref()); + let db = RocksDB::new(temp_dir.as_ref()).unwrap(); let mut tree = ZkSyncTree::new(db.into()); let mut logs = gen_storage_logs(); let root_hash = tree.process_l1_batch(&logs).root_hash; @@ -152,7 +152,7 @@ fn filtering_out_no_op_writes() { #[test] fn revert_blocks() { let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); - let storage = RocksDB::new(temp_dir.as_ref()); + let storage = RocksDB::new(temp_dir.as_ref()).unwrap(); // Generate logs and save them to DB. // Produce 4 blocks with distinct values and 1 block with modified values from first block @@ -210,7 +210,7 @@ fn revert_blocks() { } // Revert the last block. - let storage = RocksDB::new(temp_dir.as_ref()); + let storage = RocksDB::new(temp_dir.as_ref()).unwrap(); { let mut tree = ZkSyncTree::new_lightweight(storage.into()); assert_eq!(tree.root_hash(), tree_metadata.last().unwrap().root_hash); @@ -220,7 +220,7 @@ fn revert_blocks() { } // Revert two more blocks. 
- let storage = RocksDB::new(temp_dir.as_ref()); + let storage = RocksDB::new(temp_dir.as_ref()).unwrap(); { let mut tree = ZkSyncTree::new_lightweight(storage.into()); tree.revert_logs(L1BatchNumber(1)); @@ -229,7 +229,7 @@ fn revert_blocks() { } // Revert two more blocks second time; the result should be the same - let storage = RocksDB::new(temp_dir.as_ref()); + let storage = RocksDB::new(temp_dir.as_ref()).unwrap(); { let mut tree = ZkSyncTree::new_lightweight(storage.into()); tree.revert_logs(L1BatchNumber(1)); @@ -238,7 +238,7 @@ fn revert_blocks() { } // Reapply one of the reverted logs - let storage = RocksDB::new(temp_dir.as_ref()); + let storage = RocksDB::new(temp_dir.as_ref()).unwrap(); { let storage_log = mirror_logs.get(3 * block_size).unwrap(); let mut tree = ZkSyncTree::new_lightweight(storage.into()); @@ -247,7 +247,7 @@ fn revert_blocks() { } // check saved block number - let storage = RocksDB::new(temp_dir.as_ref()); + let storage = RocksDB::new(temp_dir.as_ref()).unwrap(); let tree = ZkSyncTree::new_lightweight(storage.into()); assert_eq!(tree.next_l1_batch_number(), L1BatchNumber(3)); } @@ -255,7 +255,7 @@ fn revert_blocks() { #[test] fn reset_tree() { let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); - let storage = RocksDB::new(temp_dir.as_ref()); + let storage = RocksDB::new(temp_dir.as_ref()).unwrap(); let logs = gen_storage_logs(); let mut tree = ZkSyncTree::new_lightweight(storage.into()); let empty_root_hash = tree.root_hash(); @@ -278,14 +278,14 @@ fn read_logs() { logs.truncate(5); let write_metadata = { - let db = RocksDB::new(temp_dir.as_ref()); + let db = RocksDB::new(temp_dir.as_ref()).unwrap(); let mut tree = ZkSyncTree::new_lightweight(db.into()); let metadata = tree.process_l1_batch(&logs); tree.save(); metadata }; - let db = RocksDB::new(temp_dir.as_ref()); + let db = RocksDB::new(temp_dir.as_ref()).unwrap(); let mut tree = ZkSyncTree::new_lightweight(db.into()); let read_logs: Vec<_> = logs 
.into_iter() @@ -315,7 +315,7 @@ fn subtract_from_max_value(diff: u8) -> [u8; 32] { #[test] fn root_hash_compatibility() { let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); - let db = RocksDB::new(temp_dir.as_ref()); + let db = RocksDB::new(temp_dir.as_ref()).unwrap(); let mut tree = ZkSyncTree::new_lightweight(db.into()); assert_eq!( tree.root_hash(), @@ -372,7 +372,7 @@ fn root_hash_compatibility() { #[test] fn process_block_idempotency_check() { let temp_dir = TempDir::new().expect("failed to get temporary directory for RocksDB"); - let rocks_db = RocksDB::new(temp_dir.as_ref()); + let rocks_db = RocksDB::new(temp_dir.as_ref()).unwrap(); let mut tree = ZkSyncTree::new_lightweight(rocks_db.into()); let logs = gen_storage_logs(); let tree_metadata = tree.process_l1_batch(&logs); @@ -435,7 +435,7 @@ fn witness_workflow() { let logs = gen_storage_logs(); let (first_chunk, _) = logs.split_at(logs.len() / 2); - let db = RocksDB::new(temp_dir.as_ref()); + let db = RocksDB::new(temp_dir.as_ref()).unwrap(); let mut tree = ZkSyncTree::new(db.into()); let metadata = tree.process_l1_batch(first_chunk); let job = metadata.witness.unwrap(); @@ -465,7 +465,7 @@ fn witnesses_with_multiple_blocks() { let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); let logs = gen_storage_logs(); - let db = RocksDB::new(temp_dir.as_ref()); + let db = RocksDB::new(temp_dir.as_ref()).unwrap(); let mut tree = ZkSyncTree::new(db.into()); let empty_tree_hashes: Vec<_> = (0..256) .map(|i| Blake2Hasher.empty_subtree_hash(i)) diff --git a/core/lib/merkle_tree/tests/integration/merkle_tree.rs b/core/lib/merkle_tree/tests/integration/merkle_tree.rs index 117ea0db4d99..fe6731fb441c 100644 --- a/core/lib/merkle_tree/tests/integration/merkle_tree.rs +++ b/core/lib/merkle_tree/tests/integration/merkle_tree.rs @@ -550,7 +550,7 @@ mod rocksdb { impl Harness { fn new() -> Self { let dir = TempDir::new().expect("failed creating temporary dir for 
RocksDB"); - let db = RocksDBWrapper::new(dir.path()); + let db = RocksDBWrapper::new(dir.path()).unwrap(); Self { db, dir } } } @@ -661,7 +661,7 @@ mod rocksdb { tree.extend(vec![TreeEntry::new(U256::zero(), 1, H256::zero())]); drop(tree); - let db = RocksDBWrapper::new(dir.path()); + let db = RocksDBWrapper::new(dir.path()).unwrap(); MerkleTree::with_hasher(db, ()); } } diff --git a/core/lib/merkle_tree/tests/integration/recovery.rs b/core/lib/merkle_tree/tests/integration/recovery.rs index 2bac00f02c3d..2992561bb1bf 100644 --- a/core/lib/merkle_tree/tests/integration/recovery.rs +++ b/core/lib/merkle_tree/tests/integration/recovery.rs @@ -131,7 +131,7 @@ mod rocksdb { #[test_casing(8, test_casing::Product((RecoveryKind::ALL, [6, 10, 17, 42])))] fn recovery_in_chunks(kind: RecoveryKind, chunk_size: usize) { let temp_dir = TempDir::new().unwrap(); - let db = RocksDBWrapper::new(temp_dir.path()); + let db = RocksDBWrapper::new(temp_dir.path()).unwrap(); test_recovery_in_chunks(db, kind, chunk_size); } } diff --git a/core/lib/node/src/implementations/task/metadata_calculator.rs b/core/lib/node/src/implementations/task/metadata_calculator.rs index 4bfe18167b40..0a61ad00abb8 100644 --- a/core/lib/node/src/implementations/task/metadata_calculator.rs +++ b/core/lib/node/src/implementations/task/metadata_calculator.rs @@ -45,7 +45,7 @@ impl IntoZkSyncTask for MetadataCalculatorTaskBuilder { } let metadata_calculator = - MetadataCalculator::new(self.0, object_store.map(|os| os.0)).await; + MetadataCalculator::new(self.0, object_store.map(|os| os.0)).await?; let healthchecks = node .get_resource_or_default::>() diff --git a/core/lib/state/Cargo.toml b/core/lib/state/Cargo.toml index 87e433a4160e..fed7fa0c0157 100644 --- a/core/lib/state/Cargo.toml +++ b/core/lib/state/Cargo.toml @@ -23,5 +23,7 @@ tracing = "0.1" itertools = "0.10.3" [dev-dependencies] +assert_matches = "1.5.0" rand = "0.8.5" tempfile = "3.0.2" +test-casing = "0.1.2" diff --git 
a/core/lib/state/src/in_memory.rs b/core/lib/state/src/in_memory.rs index d6058649a459..6dfc98434eb9 100644 --- a/core/lib/state/src/in_memory.rs +++ b/core/lib/state/src/in_memory.rs @@ -15,7 +15,7 @@ pub const IN_MEMORY_STORAGE_DEFAULT_NETWORK_ID: u32 = 270; /// In-memory storage. #[derive(Debug, Default, Clone)] pub struct InMemoryStorage { - pub(crate) state: HashMap, + pub(crate) state: HashMap, pub(crate) factory_deps: HashMap>, last_enum_index_set: u64, } @@ -68,7 +68,7 @@ impl InMemoryStorage { let state: HashMap<_, _> = state_without_indices .into_iter() .enumerate() - .map(|(idx, (key, value))| (key, (value, idx as u64 + 1))) + .map(|(idx, (key, value))| (key.hashed_key(), (value, idx as u64 + 1))) .collect(); let factory_deps = contracts @@ -86,7 +86,7 @@ impl InMemoryStorage { /// Sets the storage `value` at the specified `key`. pub fn set_value(&mut self, key: StorageKey, value: StorageValue) { - match self.state.entry(key) { + match self.state.entry(key.hashed_key()) { Entry::Occupied(mut entry) => { entry.get_mut().0 = value; } @@ -101,24 +101,19 @@ impl InMemoryStorage { pub fn store_factory_dep(&mut self, hash: H256, bytecode: Vec) { self.factory_deps.insert(hash, bytecode); } - - /// Get internal state of the storage. 
- pub fn get_state(&self) -> &HashMap { - &self.state - } } impl ReadStorage for &InMemoryStorage { fn read_value(&mut self, key: &StorageKey) -> StorageValue { self.state - .get(key) + .get(&key.hashed_key()) .map(|(value, _)| value) .copied() .unwrap_or_default() } fn is_write_initial(&mut self, key: &StorageKey) -> bool { - !self.state.contains_key(key) + !self.state.contains_key(&key.hashed_key()) } fn load_factory_dep(&mut self, hash: H256) -> Option> { @@ -126,7 +121,7 @@ impl ReadStorage for &InMemoryStorage { } fn get_enumeration_index(&mut self, key: &StorageKey) -> Option { - self.state.get(key).map(|(_, idx)| *idx) + self.state.get(&key.hashed_key()).map(|(_, idx)| *idx) } } diff --git a/core/lib/state/src/lib.rs b/core/lib/state/src/lib.rs index 3d54967c9ad6..f76f38846566 100644 --- a/core/lib/state/src/lib.rs +++ b/core/lib/state/src/lib.rs @@ -30,7 +30,7 @@ mod witness; pub use self::{ in_memory::{InMemoryStorage, IN_MEMORY_STORAGE_DEFAULT_NETWORK_ID}, postgres::{PostgresStorage, PostgresStorageCaches}, - rocksdb::RocksdbStorage, + rocksdb::{RocksbStorageBuilder, RocksdbStorage}, shadow_storage::ShadowStorage, storage_view::{StorageView, StorageViewMetrics}, witness::WitnessStorage, diff --git a/core/lib/state/src/rocksdb/metrics.rs b/core/lib/state/src/rocksdb/metrics.rs index 997f4b42ed37..912bcdba251d 100644 --- a/core/lib/state/src/rocksdb/metrics.rs +++ b/core/lib/state/src/rocksdb/metrics.rs @@ -2,7 +2,7 @@ use std::time::Duration; -use vise::{Buckets, Gauge, Histogram, Metrics}; +use vise::{Buckets, EncodeLabelSet, EncodeLabelValue, Family, Gauge, Histogram, Metrics, Unit}; #[derive(Debug, Metrics)] #[metrics(prefix = "server_state_keeper_secondary_storage")] @@ -18,3 +18,36 @@ pub(super) struct RocksdbStorageMetrics { #[vise::register] pub(super) static METRICS: vise::Global = vise::Global::new(); + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)] +#[metrics(label = "stage", rename_all = "snake_case")] 
+pub(super) enum RecoveryStage { + LoadFactoryDeps, + SaveFactoryDeps, + LoadChunkStarts, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)] +#[metrics(label = "stage", rename_all = "snake_case")] +pub(super) enum ChunkRecoveryStage { + LoadEntries, + SaveEntries, +} + +/// Recovery-related group of metrics. +#[derive(Debug, Metrics)] +#[metrics(prefix = "server_state_keeper_secondary_storage_recovery")] +pub(super) struct RocksdbRecoveryMetrics { + /// Number of chunks recovered. + pub recovered_chunk_count: Gauge, + /// Latency of a storage recovery stage (not related to the recovery of a particular chunk; + /// those metrics are tracked in the `chunk_latency` histogram). + #[metrics(buckets = Buckets::LATENCIES, unit = Unit::Seconds)] + pub latency: Family>, + /// Latency of a chunk recovery stage. + #[metrics(buckets = Buckets::LATENCIES, unit = Unit::Seconds)] + pub chunk_latency: Family>, +} + +#[vise::register] +pub(super) static RECOVERY_METRICS: vise::Global = vise::Global::new(); diff --git a/core/lib/state/src/rocksdb/mod.rs b/core/lib/state/src/rocksdb/mod.rs index 6e0bb7233eec..956307566af3 100644 --- a/core/lib/state/src/rocksdb/mod.rs +++ b/core/lib/state/src/rocksdb/mod.rs @@ -19,24 +19,37 @@ //! | Contracts | address (20 bytes) | `Vec` | Contract contents | //! | Factory deps | hash (32 bytes) | `Vec` | Bytecodes for new contracts that a certain contract may deploy. 
| -use std::{collections::HashMap, convert::TryInto, mem, path::Path, time::Instant}; - +use std::{ + collections::HashMap, + convert::TryInto, + mem, + path::{Path, PathBuf}, + time::Instant, +}; + +use anyhow::Context as _; use itertools::{Either, Itertools}; +use tokio::sync::watch; use zksync_dal::StorageProcessor; use zksync_storage::{db::NamedColumnFamily, RocksDB}; use zksync_types::{L1BatchNumber, StorageKey, StorageValue, H256, U256}; use zksync_utils::{h256_to_u256, u256_to_h256}; use self::metrics::METRICS; +#[cfg(test)] +use self::tests::RocksdbStorageEventListener; use crate::{InMemoryStorage, ReadStorage}; mod metrics; +mod recovery; +#[cfg(test)] +mod tests; -fn serialize_block_number(block_number: u32) -> [u8; 4] { +fn serialize_l1_batch_number(block_number: u32) -> [u8; 4] { block_number.to_le_bytes() } -fn deserialize_block_number(bytes: &[u8]) -> u32 { +fn deserialize_l1_batch_number(bytes: &[u8]) -> u32 { let bytes: [u8; 4] = bytes.try_into().expect("incorrect block number format"); u32::from_le_bytes(bytes) } @@ -96,86 +109,189 @@ impl StateValue { } } +/// Error emitted when [`RocksdbStorage`] is being updated. +#[derive(Debug)] +enum RocksdbSyncError { + Internal(anyhow::Error), + Interrupted, +} + +impl From for RocksdbSyncError { + fn from(err: anyhow::Error) -> Self { + Self::Internal(err) + } +} + /// [`ReadStorage`] implementation backed by RocksDB. #[derive(Debug)] pub struct RocksdbStorage { db: RocksDB, pending_patch: InMemoryStorage, enum_index_migration_chunk_size: usize, + /// Test-only listeners to events produced by the storage. + #[cfg(test)] + listener: RocksdbStorageEventListener, } -impl RocksdbStorage { - const BLOCK_NUMBER_KEY: &'static [u8] = b"block_number"; - const ENUM_INDEX_MIGRATION_CURSOR: &'static [u8] = b"enum_index_migration_cursor"; +/// Builder of [`RocksdbStorage`]. The storage data is inaccessible until the storage is [`Self::synchronize()`]d +/// with Postgres. 
+#[derive(Debug)] +pub struct RocksbStorageBuilder(RocksdbStorage); - fn is_special_key(key: &[u8]) -> bool { - key == Self::BLOCK_NUMBER_KEY || key == Self::ENUM_INDEX_MIGRATION_CURSOR +impl RocksbStorageBuilder { + /// Enables enum indices migration. + pub fn enable_enum_index_migration(&mut self, chunk_size: usize) { + self.0.enum_index_migration_chunk_size = chunk_size; + } + + /// Returns the last processed l1 batch number + 1. + /// + /// # Panics + /// + /// Panics on RocksDB errors. + pub async fn l1_batch_number(&self) -> Option { + self.0.l1_batch_number().await } - /// Creates a new storage with the provided RocksDB `path`. - pub fn new(path: &Path) -> Self { - let db = RocksDB::new(path); - Self { - db, - pending_patch: InMemoryStorage::default(), - enum_index_migration_chunk_size: 100, + /// Synchronizes this storage with Postgres using the provided connection. + /// + /// # Return value + /// + /// Returns `Ok(None)` if the update is interrupted using `stop_receiver`. + /// + /// # Errors + /// + /// - Errors if the local L1 batch number is greater than the last sealed L1 batch number + /// in Postgres. + pub async fn synchronize( + self, + storage: &mut StorageProcessor<'_>, + stop_receiver: &watch::Receiver, + ) -> anyhow::Result> { + let mut inner = self.0; + match inner.update_from_postgres(storage, stop_receiver).await { + Ok(()) => Ok(Some(inner)), + Err(RocksdbSyncError::Interrupted) => Ok(None), + Err(RocksdbSyncError::Internal(err)) => Err(err), } } - /// Enables enum indices migration. - pub fn enable_enum_index_migration(&mut self, chunk_size: usize) { - self.enum_index_migration_chunk_size = chunk_size; + /// Rolls back the state to a previous L1 batch number. + /// + /// # Errors + /// + /// Propagates RocksDB and Postgres errors. 
+ pub async fn rollback( + mut self, + storage: &mut StorageProcessor<'_>, + last_l1_batch_to_keep: L1BatchNumber, + ) -> anyhow::Result<()> { + self.0.rollback(storage, last_l1_batch_to_keep).await } +} - /// Synchronizes this storage with Postgres using the provided connection. +impl RocksdbStorage { + const L1_BATCH_NUMBER_KEY: &'static [u8] = b"block_number"; + const ENUM_INDEX_MIGRATION_CURSOR: &'static [u8] = b"enum_index_migration_cursor"; + + /// Desired size of log chunks loaded from Postgres during snapshot recovery. + /// This is intentionally not configurable because chunks must be the same for the entire recovery + /// (i.e., not changed after a node restart). + const DESIRED_LOG_CHUNK_SIZE: u64 = 200_000; + + fn is_special_key(key: &[u8]) -> bool { + key == Self::L1_BATCH_NUMBER_KEY || key == Self::ENUM_INDEX_MIGRATION_CURSOR + } + + /// Creates a new storage builder with the provided RocksDB `path`. /// - /// # Panics + /// # Errors /// - /// Panics if the local L1 batch number is greater than the last sealed L1 batch number - /// in Postgres. - pub async fn update_from_postgres(&mut self, conn: &mut StorageProcessor<'_>) { + /// Propagates RocksDB I/O errors. + pub async fn builder(path: &Path) -> anyhow::Result { + Self::new(path.to_path_buf()) + .await + .map(RocksbStorageBuilder) + } + + async fn new(path: PathBuf) -> anyhow::Result { + tokio::task::spawn_blocking(move || { + Ok(Self { + db: RocksDB::new(&path).context("failed initializing state keeper RocksDB")?, + pending_patch: InMemoryStorage::default(), + enum_index_migration_chunk_size: 100, + #[cfg(test)] + listener: RocksdbStorageEventListener::default(), + }) + }) + .await + .context("panicked initializing state keeper RocksDB")? 
+ } + + async fn update_from_postgres( + &mut self, + storage: &mut StorageProcessor<'_>, + stop_receiver: &watch::Receiver, + ) -> Result<(), RocksdbSyncError> { + let mut current_l1_batch_number = self + .ensure_ready(storage, Self::DESIRED_LOG_CHUNK_SIZE, stop_receiver) + .await?; + let latency = METRICS.update.start(); - let Some(latest_l1_batch_number) = conn + let Some(latest_l1_batch_number) = storage .blocks_dal() .get_sealed_l1_batch_number() .await - .unwrap() + .context("failed fetching sealed L1 batch number")? else { // No L1 batches are persisted in Postgres; update is not necessary. - return; + return Ok(()); }; tracing::debug!("Loading storage for l1 batch number {latest_l1_batch_number}"); - let mut current_l1_batch_number = self.l1_batch_number().0; - assert!( - current_l1_batch_number <= latest_l1_batch_number.0 + 1, - "L1 batch number in state keeper cache ({current_l1_batch_number}) is greater than \ - the last sealed L1 batch number in Postgres ({latest_l1_batch_number})" - ); + if current_l1_batch_number > latest_l1_batch_number + 1 { + let err = anyhow::anyhow!( + "L1 batch number in state keeper cache ({current_l1_batch_number}) is greater than \ + the last sealed L1 batch number in Postgres ({latest_l1_batch_number})" + ); + return Err(err.into()); + } - while current_l1_batch_number <= latest_l1_batch_number.0 { - let current_lag = latest_l1_batch_number.0 - current_l1_batch_number + 1; + while current_l1_batch_number <= latest_l1_batch_number { + if *stop_receiver.borrow() { + return Err(RocksdbSyncError::Interrupted); + } + let current_lag = latest_l1_batch_number.0 - current_l1_batch_number.0 + 1; METRICS.lag.set(current_lag.into()); - tracing::debug!("loading state changes for l1 batch {current_l1_batch_number}"); - let storage_logs = conn + tracing::debug!("Loading state changes for l1 batch {current_l1_batch_number}"); + let storage_logs = storage .storage_logs_dal() - 
.get_touched_slots_for_l1_batch(L1BatchNumber(current_l1_batch_number)) - .await; - self.apply_storage_logs(storage_logs, conn).await; + .get_touched_slots_for_l1_batch(current_l1_batch_number) + .await + .with_context(|| { + format!("failed loading touched slots for L1 batch {current_l1_batch_number}") + })?; + self.apply_storage_logs(storage_logs, storage).await?; - tracing::debug!("loading factory deps for l1 batch {current_l1_batch_number}"); - let factory_deps = conn + tracing::debug!("Loading factory deps for L1 batch {current_l1_batch_number}"); + let factory_deps = storage .blocks_dal() - .get_l1_batch_factory_deps(L1BatchNumber(current_l1_batch_number)) + .get_l1_batch_factory_deps(current_l1_batch_number) .await - .unwrap(); + .with_context(|| { + format!("failed loading factory deps for L1 batch {current_l1_batch_number}") + })?; for (hash, bytecode) in factory_deps { self.store_factory_dep(hash, bytecode); } current_l1_batch_number += 1; - self.save(L1BatchNumber(current_l1_batch_number)).await; + self.save(Some(current_l1_batch_number)) + .await + .with_context(|| format!("failed saving L1 batch #{current_l1_batch_number}"))?; + #[cfg(test)] + (self.listener.on_l1_batch_synced)(current_l1_batch_number - 1); } latency.observe(); @@ -188,50 +304,66 @@ impl RocksdbStorage { assert!(self.enum_index_migration_chunk_size > 0); // Enum indices must be at the storage. Run migration till the end. 
- while self.enum_migration_start_from().is_some() { - self.save_missing_enum_indices(conn).await; + while self.enum_migration_start_from().await.is_some() { + if *stop_receiver.borrow() { + return Err(RocksdbSyncError::Interrupted); + } + self.save_missing_enum_indices(storage).await?; } + Ok(()) } async fn apply_storage_logs( &mut self, storage_logs: HashMap, - conn: &mut StorageProcessor<'_>, - ) { - let (logs_with_known_indices, logs_with_unknown_indices): (Vec<_>, Vec<_>) = self - .process_transaction_logs(storage_logs) + storage: &mut StorageProcessor<'_>, + ) -> anyhow::Result<()> { + let db = self.db.clone(); + let processed_logs = + tokio::task::spawn_blocking(move || Self::process_transaction_logs(&db, storage_logs)) + .await + .context("panicked processing storage logs")?; + + let (logs_with_known_indices, logs_with_unknown_indices): (Vec<_>, Vec<_>) = processed_logs + .into_iter() .partition_map(|(key, StateValue { value, enum_index })| match enum_index { - Some(index) => Either::Left((key, (value, index))), - None => Either::Right((key, value)), + Some(index) => Either::Left((key.hashed_key(), (value, index))), + None => Either::Right((key.hashed_key(), value)), }); let keys_with_unknown_indices: Vec<_> = logs_with_unknown_indices .iter() - .map(|(key, _)| key.hashed_key()) + .map(|&(key, _)| key) .collect(); - let enum_indices_and_batches = conn + let enum_indices_and_batches = storage .storage_logs_dal() .get_l1_batches_and_indices_for_initial_writes(&keys_with_unknown_indices) - .await; - assert_eq!( - keys_with_unknown_indices.len(), - enum_indices_and_batches.len() + .await + .context("failed getting enumeration indices for storage logs")?; + anyhow::ensure!( + keys_with_unknown_indices.len() == enum_indices_and_batches.len(), + "Inconsistent Postgres data: not all new storage logs have enumeration indices" ); - self.pending_patch.state = - logs_with_known_indices - .into_iter() - .chain(logs_with_unknown_indices.into_iter().map(|(key, value)| { - 
(key, (value, enum_indices_and_batches[&key.hashed_key()].1)) - })) - .collect(); + self.pending_patch.state = logs_with_known_indices + .into_iter() + .chain( + logs_with_unknown_indices + .into_iter() + .map(|(key, value)| (key, (value, enum_indices_and_batches[&key].1))), + ) + .collect(); + Ok(()) } - async fn save_missing_enum_indices(&self, conn: &mut StorageProcessor<'_>) { - let (Some(start_from), true) = ( - self.enum_migration_start_from(), + async fn save_missing_enum_indices( + &self, + storage: &mut StorageProcessor<'_>, + ) -> anyhow::Result<()> { + let (true, Some(start_from)) = ( self.enum_index_migration_chunk_size > 0, + self.enum_migration_start_from().await, ) else { - return; + return Ok(()); }; let started_at = Instant::now(); @@ -239,91 +371,107 @@ impl RocksdbStorage { "RocksDB enum index migration is not finished, starting from key {start_from:0>64x}" ); - let mut write_batch = self.db.new_write_batch(); - let (keys, values): (Vec<_>, Vec<_>) = self - .db - .from_iterator_cf(StateKeeperColumnFamily::State, start_from.as_bytes()) - .filter_map(|(key, value)| { - if Self::is_special_key(&key) { - return None; - } - let state_value = StateValue::deserialize(&value); - (state_value.enum_index.is_none()) - .then(|| (H256::from_slice(&key), state_value.value)) - }) - .take(self.enum_index_migration_chunk_size) - .unzip(); - let enum_indices_and_batches = conn + let db = self.db.clone(); + let enum_index_migration_chunk_size = self.enum_index_migration_chunk_size; + let (keys, values): (Vec<_>, Vec<_>) = tokio::task::spawn_blocking(move || { + db.from_iterator_cf(StateKeeperColumnFamily::State, start_from.as_bytes()) + .filter_map(|(key, value)| { + if Self::is_special_key(&key) { + return None; + } + let state_value = StateValue::deserialize(&value); + state_value + .enum_index + .is_none() + .then(|| (H256::from_slice(&key), state_value.value)) + }) + .take(enum_index_migration_chunk_size) + .unzip() + }) + .await + .unwrap(); + + let 
enum_indices_and_batches = storage .storage_logs_dal() .get_l1_batches_and_indices_for_initial_writes(&keys) - .await; + .await + .context("failed getting enumeration indices for storage logs")?; assert_eq!(keys.len(), enum_indices_and_batches.len()); + let key_count = keys.len(); - for (key, value) in keys.iter().zip(values) { - let index = enum_indices_and_batches[key].1; - write_batch.put_cf( - StateKeeperColumnFamily::State, - key.as_bytes(), - &StateValue::new(value, Some(index)).serialize(), - ); - } - - let next_key = keys - .last() - .and_then(|last_key| h256_to_u256(*last_key).checked_add(U256::one())) - .map(u256_to_h256); - match (next_key, keys.len()) { - (Some(next_key), keys_len) if keys_len == self.enum_index_migration_chunk_size => { + let db = self.db.clone(); + tokio::task::spawn_blocking(move || { + let mut write_batch = db.new_write_batch(); + for (key, value) in keys.iter().zip(values) { + let index = enum_indices_and_batches[key].1; write_batch.put_cf( StateKeeperColumnFamily::State, - Self::ENUM_INDEX_MIGRATION_CURSOR, - next_key.as_bytes(), + key.as_bytes(), + &StateValue::new(value, Some(index)).serialize(), ); } - _ => { - write_batch.put_cf( - StateKeeperColumnFamily::State, - Self::ENUM_INDEX_MIGRATION_CURSOR, - &[], - ); - tracing::info!("RocksDB enum index migration finished"); + + let next_key = keys + .last() + .and_then(|last_key| h256_to_u256(*last_key).checked_add(U256::one())) + .map(u256_to_h256); + match (next_key, keys.len()) { + (Some(next_key), keys_len) if keys_len == enum_index_migration_chunk_size => { + write_batch.put_cf( + StateKeeperColumnFamily::State, + Self::ENUM_INDEX_MIGRATION_CURSOR, + next_key.as_bytes(), + ); + } + _ => { + write_batch.put_cf( + StateKeeperColumnFamily::State, + Self::ENUM_INDEX_MIGRATION_CURSOR, + &[], + ); + tracing::info!("RocksDB enum index migration finished"); + } } - } - self.db - .write(write_batch) - .expect("failed to save state data into rocksdb"); + db.write(write_batch) + 
.context("failed saving enum indices to RocksDB") + }) + .await + .context("panicked while saving enum indices to RocksDB")??; + tracing::info!( - "RocksDB enum index migration chunk took {:?}, migrated {} keys", - started_at.elapsed(), - keys.len() + "RocksDB enum index migration chunk took {:?}, migrated {key_count} keys", + started_at.elapsed() ); + Ok(()) } fn read_value_inner(&self, key: &StorageKey) -> Option { - self.read_state_value(key) - .map(|state_value| state_value.value) + Self::read_state_value(&self.db, key.hashed_key()).map(|state_value| state_value.value) } - fn read_state_value(&self, key: &StorageKey) -> Option { + fn read_state_value( + db: &RocksDB, + hashed_key: H256, + ) -> Option { let cf = StateKeeperColumnFamily::State; - self.db - .get_cf(cf, &Self::serialize_state_key(key)) + db.get_cf(cf, &Self::serialize_state_key(hashed_key)) .expect("failed to read rocksdb state value") .map(|value| StateValue::deserialize(&value)) } /// Returns storage logs to apply. fn process_transaction_logs( - &self, + db: &RocksDB, updates: HashMap, - ) -> impl Iterator + '_ { - updates.into_iter().filter_map(|(key, new_value)| { - if let Some(state_value) = self.read_state_value(&key) { + ) -> Vec<(StorageKey, StateValue)> { + let it = updates.into_iter().filter_map(move |(key, new_value)| { + if let Some(state_value) = Self::read_state_value(db, key.hashed_key()) { Some((key, StateValue::new(new_value, state_value.enum_index))) } else { (!new_value.is_zero()).then_some((key, StateValue::new(new_value, None))) } - }) + }); + it.collect() } /// Stores a factory dependency with the specified `hash` and `bytecode`. @@ -331,16 +479,11 @@ impl RocksdbStorage { self.pending_patch.factory_deps.insert(hash, bytecode); } - /// Rolls back the state to a previous L1 batch number. - /// - /// # Panics - /// - /// Panics on RocksDB errors. 
- pub async fn rollback( + async fn rollback( &mut self, connection: &mut StorageProcessor<'_>, last_l1_batch_to_keep: L1BatchNumber, - ) { + ) -> anyhow::Result<()> { tracing::info!("Rolling back state keeper storage to L1 batch #{last_l1_batch_to_keep}..."); tracing::info!("Getting logs that should be applied to rollback state..."); @@ -348,7 +491,8 @@ impl RocksdbStorage { let logs = connection .storage_logs_dal() .get_storage_logs_for_revert(last_l1_batch_to_keep) - .await; + .await + .context("failed getting logs for rollback")?; tracing::info!("Got {} logs, took {:?}", logs.len(), stage_start.elapsed()); tracing::info!("Getting number of last miniblock for L1 batch #{last_l1_batch_to_keep}..."); @@ -357,8 +501,10 @@ impl RocksdbStorage { .blocks_dal() .get_miniblock_range_of_l1_batch(last_l1_batch_to_keep) .await - .unwrap() - .expect("L1 batch should contain at least one miniblock"); + .with_context(|| { + format!("failed fetching miniblock range for L1 batch #{last_l1_batch_to_keep}") + })? 
+ .context("L1 batch should contain at least one miniblock")?; tracing::info!( "Got miniblock number {last_miniblock_to_keep}, took {:?}", stage_start.elapsed() @@ -369,7 +515,10 @@ impl RocksdbStorage { let factory_deps = connection .storage_dal() .get_factory_deps_for_revert(last_miniblock_to_keep) - .await; + .await + .with_context(|| { + format!("failed fetching factory deps for miniblock #{last_miniblock_to_keep}") + })?; tracing::info!( "Got {} factory deps, took {:?}", factory_deps.len(), @@ -394,8 +543,8 @@ impl RocksdbStorage { } batch.put_cf( cf, - Self::BLOCK_NUMBER_KEY, - &serialize_block_number(last_l1_batch_to_keep.0 + 1), + Self::L1_BATCH_NUMBER_KEY, + &serialize_l1_batch_number(last_l1_batch_to_keep.0 + 1), ); let cf = StateKeeperColumnFamily::FactoryDeps; @@ -404,29 +553,31 @@ impl RocksdbStorage { } db.write(batch) - .expect("failed to save state data into RocksDB"); + .context("failed to save state data into RocksDB") }) .await - .unwrap(); + .context("panicked during revert")? } /// Saves the pending changes to RocksDB. Must be executed on a Tokio thread. 
- async fn save(&mut self, l1_batch_number: L1BatchNumber) { + async fn save(&mut self, l1_batch_number: Option) -> anyhow::Result<()> { let pending_patch = mem::take(&mut self.pending_patch); let db = self.db.clone(); let save_task = tokio::task::spawn_blocking(move || { let mut batch = db.new_write_batch(); let cf = StateKeeperColumnFamily::State; - batch.put_cf( - cf, - Self::BLOCK_NUMBER_KEY, - &serialize_block_number(l1_batch_number.0), - ); + if let Some(l1_batch_number) = l1_batch_number { + batch.put_cf( + cf, + Self::L1_BATCH_NUMBER_KEY, + &serialize_l1_batch_number(l1_batch_number.0), + ); + } for (key, (value, enum_index)) in pending_patch.state { batch.put_cf( cf, - &Self::serialize_state_key(&key), + &Self::serialize_state_key(key), &StateValue::new(value, Some(enum_index)).serialize(), ); } @@ -436,26 +587,31 @@ impl RocksdbStorage { batch.put_cf(cf, &hash.to_fixed_bytes(), value.as_ref()); } db.write(batch) - .expect("failed to save state data into rocksdb"); + .context("failed to save state data into RocksDB") }); - save_task.await.unwrap(); + save_task + .await + .context("panicked when saving state data into RocksDB")? } - /// Returns the last processed l1 batch number + 1 + /// Returns the last processed l1 batch number + 1. + /// /// # Panics + /// /// Panics on RocksDB errors. 
- pub fn l1_batch_number(&self) -> L1BatchNumber { + async fn l1_batch_number(&self) -> Option { let cf = StateKeeperColumnFamily::State; - let block_number = self - .db - .get_cf(cf, Self::BLOCK_NUMBER_KEY) - .expect("failed to fetch block number"); - let block_number = block_number.map_or(0, |bytes| deserialize_block_number(&bytes)); - L1BatchNumber(block_number) + let db = self.db.clone(); + let number_bytes = + tokio::task::spawn_blocking(move || db.get_cf(cf, Self::L1_BATCH_NUMBER_KEY)) + .await + .expect("failed getting L1 batch number from RocksDB") + .expect("failed getting L1 batch number from RocksDB"); + number_bytes.map(|bytes| L1BatchNumber(deserialize_l1_batch_number(&bytes))) } - fn serialize_state_key(key: &StorageKey) -> [u8; 32] { - key.hashed_key().to_fixed_bytes() + fn serialize_state_key(key: H256) -> [u8; 32] { + key.to_fixed_bytes() } /// Estimates the number of key–value entries in the VM state. @@ -464,16 +620,20 @@ impl RocksdbStorage { .estimated_number_of_entries(StateKeeperColumnFamily::State) } - fn enum_migration_start_from(&self) -> Option { - let value = self - .db - .get_cf( + async fn enum_migration_start_from(&self) -> Option { + let db = self.db.clone(); + let value = tokio::task::spawn_blocking(move || { + db.get_cf( StateKeeperColumnFamily::State, Self::ENUM_INDEX_MIGRATION_CURSOR, ) - .expect("failed to read `ENUM_INDEX_MIGRATION_CURSOR`"); + .expect("failed to read `ENUM_INDEX_MIGRATION_CURSOR`") + }) + .await + .unwrap(); + match value { - Some(v) if v.is_empty() => None, + Some(value) if value.is_empty() => None, Some(cursor) => Some(H256::from_slice(&cursor)), None => Some(H256::zero()), } @@ -499,250 +659,7 @@ impl ReadStorage for RocksdbStorage { fn get_enumeration_index(&mut self, key: &StorageKey) -> Option { // Can safely unwrap here since it indicates that the migration has not yet ended and boojum will // only be deployed when the migration is finished. 
- self.read_state_value(key) + Self::read_state_value(&self.db, key.hashed_key()) .map(|state_value| state_value.enum_index.unwrap()) } } - -#[cfg(test)] -mod tests { - use tempfile::TempDir; - use zksync_dal::ConnectionPool; - use zksync_types::{MiniblockNumber, StorageLog}; - - use super::*; - use crate::test_utils::{ - create_l1_batch, create_miniblock, gen_storage_logs, prepare_postgres, - }; - - #[tokio::test] - async fn rocksdb_storage_basics() { - let dir = TempDir::new().expect("cannot create temporary dir for state keeper"); - let mut storage = RocksdbStorage::new(dir.path()); - let mut storage_logs: HashMap<_, _> = gen_storage_logs(0..20) - .into_iter() - .map(|log| (log.key, log.value)) - .collect(); - let changed_keys = storage.process_transaction_logs(storage_logs.clone()); - storage.pending_patch.state = changed_keys - .map(|(key, state_value)| (key, (state_value.value, 1))) // enum index doesn't matter in the test - .collect(); - storage.save(L1BatchNumber(0)).await; - { - for (key, value) in &storage_logs { - assert!(!storage.is_write_initial(key)); - assert_eq!(storage.read_value(key), *value); - } - } - - // Overwrite some of the logs. 
- for log in storage_logs.values_mut().step_by(2) { - *log = StorageValue::zero(); - } - let changed_keys = storage.process_transaction_logs(storage_logs.clone()); - storage.pending_patch.state = changed_keys - .map(|(key, state_value)| (key, (state_value.value, 1))) // enum index doesn't matter in the test - .collect(); - storage.save(L1BatchNumber(1)).await; - - for (key, value) in &storage_logs { - assert!(!storage.is_write_initial(key)); - assert_eq!(storage.read_value(key), *value); - } - } - - #[tokio::test] - async fn rocksdb_storage_syncing_with_postgres() { - let pool = ConnectionPool::test_pool().await; - let mut conn = pool.access_storage().await.unwrap(); - prepare_postgres(&mut conn).await; - let storage_logs = gen_storage_logs(20..40); - create_miniblock(&mut conn, MiniblockNumber(1), storage_logs.clone()).await; - create_l1_batch(&mut conn, L1BatchNumber(1), &storage_logs).await; - - let dir = TempDir::new().expect("cannot create temporary dir for state keeper"); - let mut storage = RocksdbStorage::new(dir.path()); - storage.update_from_postgres(&mut conn).await; - - assert_eq!(storage.l1_batch_number(), L1BatchNumber(2)); - for log in &storage_logs { - assert_eq!(storage.read_value(&log.key), log.value); - } - } - - async fn insert_factory_deps( - conn: &mut StorageProcessor<'_>, - miniblock_number: MiniblockNumber, - indices: impl Iterator, - ) { - let factory_deps = indices - .map(|i| (H256::repeat_byte(i), vec![i; 64])) - .collect(); - conn.storage_dal() - .insert_factory_deps(miniblock_number, &factory_deps) - .await; - } - - #[tokio::test] - async fn rocksdb_storage_revert() { - let pool = ConnectionPool::test_pool().await; - let mut conn = pool.access_storage().await.unwrap(); - prepare_postgres(&mut conn).await; - let storage_logs = gen_storage_logs(20..40); - create_miniblock(&mut conn, MiniblockNumber(1), storage_logs[..10].to_vec()).await; - insert_factory_deps(&mut conn, MiniblockNumber(1), 0..1).await; - create_miniblock(&mut conn, 
MiniblockNumber(2), storage_logs[10..].to_vec()).await; - insert_factory_deps(&mut conn, MiniblockNumber(2), 1..3).await; - create_l1_batch(&mut conn, L1BatchNumber(1), &storage_logs).await; - - let inserted_storage_logs = gen_storage_logs(50..60); - let replaced_storage_logs: Vec<_> = storage_logs - .iter() - .step_by(2) - .map(|&log| StorageLog { - value: H256::repeat_byte(0xf0), - ..log - }) - .collect(); - - let mut new_storage_logs = inserted_storage_logs.clone(); - new_storage_logs.extend_from_slice(&replaced_storage_logs); - create_miniblock(&mut conn, MiniblockNumber(3), new_storage_logs).await; - insert_factory_deps(&mut conn, MiniblockNumber(3), 3..5).await; - create_l1_batch(&mut conn, L1BatchNumber(2), &inserted_storage_logs).await; - - let dir = TempDir::new().expect("cannot create temporary dir for state keeper"); - let mut storage = RocksdbStorage::new(dir.path()); - storage.update_from_postgres(&mut conn).await; - - // Perform some sanity checks before the revert. - assert_eq!(storage.l1_batch_number(), L1BatchNumber(3)); - { - for log in &inserted_storage_logs { - assert_eq!(storage.read_value(&log.key), log.value); - } - for log in &replaced_storage_logs { - assert_eq!(storage.read_value(&log.key), log.value); - } - - for i in 0..5 { - assert_eq!( - storage.load_factory_dep(H256::repeat_byte(i)).unwrap(), - [i; 64] - ); - } - } - - storage.rollback(&mut conn, L1BatchNumber(1)).await; - assert_eq!(storage.l1_batch_number(), L1BatchNumber(2)); - { - for log in &inserted_storage_logs { - assert_eq!(storage.read_value(&log.key), H256::zero()); - } - for log in &replaced_storage_logs { - assert_ne!(storage.read_value(&log.key), log.value); - } - - for i in 0..3 { - assert_eq!( - storage.load_factory_dep(H256::repeat_byte(i)).unwrap(), - [i; 64] - ); - } - for i in 3..5 { - assert!(storage.load_factory_dep(H256::repeat_byte(i)).is_none()); - } - } - } - - #[tokio::test] - async fn rocksdb_enum_index_migration() { - let pool = 
ConnectionPool::test_pool().await; - let mut conn = pool.access_storage().await.unwrap(); - prepare_postgres(&mut conn).await; - let storage_logs = gen_storage_logs(20..40); - create_miniblock(&mut conn, MiniblockNumber(1), storage_logs.clone()).await; - create_l1_batch(&mut conn, L1BatchNumber(1), &storage_logs).await; - - let enum_indices: HashMap<_, _> = conn - .storage_logs_dedup_dal() - .initial_writes_for_batch(L1BatchNumber(1)) - .await - .into_iter() - .collect(); - - let dir = TempDir::new().expect("cannot create temporary dir for state keeper"); - let mut storage = RocksdbStorage::new(dir.path()); - storage.update_from_postgres(&mut conn).await; - - assert_eq!(storage.l1_batch_number(), L1BatchNumber(2)); - // Check that enum indices are correct after syncing with Postgres. - for log in &storage_logs { - let expected_index = enum_indices[&log.key.hashed_key()]; - assert_eq!( - storage.read_state_value(&log.key).unwrap().enum_index, - Some(expected_index) - ); - } - - // Remove enum indices for some keys. - let mut write_batch = storage.db.new_write_batch(); - for log in &storage_logs { - write_batch.put_cf( - StateKeeperColumnFamily::State, - log.key.hashed_key().as_bytes(), - log.value.as_bytes(), - ); - write_batch.delete_cf( - StateKeeperColumnFamily::State, - RocksdbStorage::ENUM_INDEX_MIGRATION_CURSOR, - ); - } - storage.db.write(write_batch).unwrap(); - - // Check that migration works as expected. - let ordered_keys_to_migrate: Vec = storage_logs - .iter() - .map(|log| log.key) - .sorted_by_key(StorageKey::hashed_key) - .collect(); - - storage.enable_enum_index_migration(10); - let start_from = storage.enum_migration_start_from(); - assert_eq!(start_from, Some(H256::zero())); - - // Migrate the first half. 
- storage.save_missing_enum_indices(&mut conn).await; - for key in ordered_keys_to_migrate.iter().take(10) { - let expected_index = enum_indices[&key.hashed_key()]; - assert_eq!( - storage.read_state_value(key).unwrap().enum_index, - Some(expected_index) - ); - } - assert!(storage - .read_state_value(&ordered_keys_to_migrate[10]) - .unwrap() - .enum_index - .is_none()); - - // Migrate the second half. - storage.save_missing_enum_indices(&mut conn).await; - for key in ordered_keys_to_migrate.iter().skip(10) { - let expected_index = enum_indices[&key.hashed_key()]; - assert_eq!( - storage.read_state_value(key).unwrap().enum_index, - Some(expected_index) - ); - } - - // 20 keys were processed but we haven't checked that no keys to migrate are left. - let start_from = storage.enum_migration_start_from(); - assert!(start_from.is_some()); - - // Check that migration will be marked as completed after the next iteration. - storage.save_missing_enum_indices(&mut conn).await; - let start_from = storage.enum_migration_start_from(); - assert!(start_from.is_none()); - } -} diff --git a/core/lib/state/src/rocksdb/recovery.rs b/core/lib/state/src/rocksdb/recovery.rs new file mode 100644 index 000000000000..dae4ae144be0 --- /dev/null +++ b/core/lib/state/src/rocksdb/recovery.rs @@ -0,0 +1,254 @@ +//! Logic for [`RocksdbStorage`] related to snapshot recovery. 
+ +use std::ops; + +use anyhow::Context as _; +use tokio::sync::watch; +use zksync_dal::{storage_logs_dal::StorageRecoveryLogEntry, StorageProcessor}; +use zksync_types::{ + snapshots::{uniform_hashed_keys_chunk, SnapshotRecoveryStatus}, + L1BatchNumber, MiniblockNumber, H256, +}; + +use super::{ + metrics::{ChunkRecoveryStage, RecoveryStage, RECOVERY_METRICS}, + RocksdbStorage, RocksdbSyncError, StateValue, +}; + +#[derive(Debug)] +struct KeyChunk { + id: u64, + key_range: ops::RangeInclusive, + start_entry: Option, +} + +impl RocksdbStorage { + /// Ensures that this storage is ready for normal operation (i.e., updates by L1 batch). + /// + /// # Return value + /// + /// Returns the next L1 batch that should be fed to the storage. + pub(super) async fn ensure_ready( + &mut self, + storage: &mut StorageProcessor<'_>, + desired_log_chunk_size: u64, + stop_receiver: &watch::Receiver, + ) -> Result { + if let Some(number) = self.l1_batch_number().await { + return Ok(number); + } + + // Check whether we need to perform a snapshot migration. + let snapshot_recovery = storage + .snapshot_recovery_dal() + .get_applied_snapshot_status() + .await + .context("failed getting snapshot recovery info")?; + Ok(if let Some(snapshot_recovery) = snapshot_recovery { + self.recover_from_snapshot( + storage, + &snapshot_recovery, + desired_log_chunk_size, + stop_receiver, + ) + .await?; + snapshot_recovery.l1_batch_number + 1 + } else { + // No recovery snapshot; we're initializing the cache from the genesis + L1BatchNumber(0) + }) + } + + /// # Important + /// + /// `Self::L1_BATCH_NUMBER_KEY` must be set at the very end of the process. If it is set earlier, recovery is not fault-tolerant + /// (it would be considered complete even if it failed in the middle). 
+ async fn recover_from_snapshot( + &mut self, + storage: &mut StorageProcessor<'_>, + snapshot_recovery: &SnapshotRecoveryStatus, + desired_log_chunk_size: u64, + stop_receiver: &watch::Receiver, + ) -> Result<(), RocksdbSyncError> { + if *stop_receiver.borrow() { + return Err(RocksdbSyncError::Interrupted); + } + tracing::info!("Recovering secondary storage from snapshot: {snapshot_recovery:?}"); + + self.recover_factory_deps(storage, snapshot_recovery) + .await?; + + if *stop_receiver.borrow() { + return Err(RocksdbSyncError::Interrupted); + } + let key_chunks = + Self::load_key_chunks(storage, snapshot_recovery, desired_log_chunk_size).await?; + + RECOVERY_METRICS.recovered_chunk_count.set(0); + for key_chunk in key_chunks { + if *stop_receiver.borrow() { + return Err(RocksdbSyncError::Interrupted); + } + + let chunk_id = key_chunk.id; + let Some(chunk_start) = key_chunk.start_entry else { + tracing::info!("Chunk {chunk_id} (hashed key range {key_chunk:?}) doesn't have entries in Postgres; skipping"); + RECOVERY_METRICS.recovered_chunk_count.inc_by(1); + continue; + }; + + // Check whether the chunk is already recovered. 
+ let state_value = self.read_state_value_async(chunk_start.key).await; + if let Some(state_value) = state_value { + if state_value.value != chunk_start.value + || state_value.enum_index != Some(chunk_start.leaf_index) + { + let err = anyhow::anyhow!( + "Mismatch between entry for key {:?} in Postgres snapshot for miniblock #{} \ + ({chunk_start:?}) and RocksDB cache ({state_value:?}); the recovery procedure may be corrupted", + chunk_start.key, + snapshot_recovery.miniblock_number + ); + return Err(err.into()); + } + tracing::info!("Chunk {chunk_id} (hashed key range {key_chunk:?}) is already recovered; skipping"); + } else { + self.recover_logs_chunk( + storage, + snapshot_recovery.miniblock_number, + key_chunk.key_range.clone(), + ) + .await + .with_context(|| { + format!( + "failed recovering logs chunk {chunk_id} (hashed key range {:?})", + key_chunk.key_range + ) + })?; + + #[cfg(test)] + (self.listener.on_logs_chunk_recovered)(chunk_id); + } + RECOVERY_METRICS.recovered_chunk_count.inc_by(1); + } + + tracing::info!("All chunks recovered; finalizing recovery process"); + self.save(Some(snapshot_recovery.l1_batch_number + 1)) + .await?; + Ok(()) + } + + async fn recover_factory_deps( + &mut self, + storage: &mut StorageProcessor<'_>, + snapshot_recovery: &SnapshotRecoveryStatus, + ) -> anyhow::Result<()> { + // We don't expect that many factory deps; that's why we recover factory deps in any case. 
+ let latency = RECOVERY_METRICS.latency[&RecoveryStage::LoadFactoryDeps].start(); + let factory_deps = storage + .snapshots_creator_dal() + .get_all_factory_deps(snapshot_recovery.miniblock_number) + .await + .context("Failed getting factory dependencies")?; + let latency = latency.observe(); + tracing::info!( + "Loaded {} factory dependencies from the snapshot in {latency:?}", + factory_deps.len() + ); + + let latency = RECOVERY_METRICS.latency[&RecoveryStage::SaveFactoryDeps].start(); + for (bytecode_hash, bytecode) in factory_deps { + self.store_factory_dep(bytecode_hash, bytecode); + } + self.save(None) + .await + .context("failed saving factory deps")?; + let latency = latency.observe(); + tracing::info!("Saved factory dependencies to RocksDB in {latency:?}"); + Ok(()) + } + + async fn load_key_chunks( + storage: &mut StorageProcessor<'_>, + snapshot_recovery: &SnapshotRecoveryStatus, + desired_log_chunk_size: u64, + ) -> anyhow::Result> { + let snapshot_miniblock = snapshot_recovery.miniblock_number; + let log_count = storage + .storage_logs_dal() + .count_miniblock_storage_logs(snapshot_miniblock) + .await + .with_context(|| { + format!("Failed getting number of logs for miniblock #{snapshot_miniblock}") + })?; + let chunk_count = log_count.div_ceil(desired_log_chunk_size); + tracing::info!( + "Estimated the number of chunks for recovery based on {log_count} logs: {chunk_count}" + ); + + let latency = RECOVERY_METRICS.latency[&RecoveryStage::LoadChunkStarts].start(); + let key_chunks: Vec<_> = (0..chunk_count) + .map(|chunk_id| uniform_hashed_keys_chunk(chunk_id, chunk_count)) + .collect(); + let chunk_starts = storage + .storage_logs_dal() + .get_chunk_starts_for_miniblock(snapshot_miniblock, &key_chunks) + .await + .context("Failed getting chunk starts")?; + let latency = latency.observe(); + tracing::info!("Loaded {chunk_count} chunk starts in {latency:?}"); + + let key_chunks = (0..chunk_count) + .zip(key_chunks) + .zip(chunk_starts) + .map(|((id, 
key_range), start_entry)| KeyChunk { + id, + key_range, + start_entry, + }) + .collect(); + Ok(key_chunks) + } + + async fn read_state_value_async(&self, hashed_key: H256) -> Option { + let db = self.db.clone(); + tokio::task::spawn_blocking(move || Self::read_state_value(&db, hashed_key)) + .await + .unwrap() + } + + async fn recover_logs_chunk( + &mut self, + storage: &mut StorageProcessor<'_>, + snapshot_miniblock: MiniblockNumber, + key_chunk: ops::RangeInclusive, + ) -> anyhow::Result<()> { + let latency = RECOVERY_METRICS.chunk_latency[&ChunkRecoveryStage::LoadEntries].start(); + let all_entries = storage + .storage_logs_dal() + .get_tree_entries_for_miniblock(snapshot_miniblock, key_chunk.clone()) + .await + .with_context(|| { + format!("Failed getting entries for chunk {key_chunk:?} in snapshot for miniblock #{snapshot_miniblock}") + })?; + let latency = latency.observe(); + tracing::debug!( + "Loaded {} log entries for chunk {key_chunk:?} in {latency:?}", + all_entries.len() + ); + + let latency = RECOVERY_METRICS.chunk_latency[&ChunkRecoveryStage::SaveEntries].start(); + self.pending_patch.state = all_entries + .into_iter() + .map(|entry| (entry.key, (entry.value, entry.leaf_index))) + .collect(); + self.save(None) + .await + .context("failed saving storage logs chunk")?; + let latency = latency.observe(); + tracing::debug!("Saved logs chunk {key_chunk:?} to RocksDB in {latency:?}"); + + tracing::info!("Recovered hashed key chunk {key_chunk:?}"); + Ok(()) + } +} diff --git a/core/lib/state/src/rocksdb/tests.rs b/core/lib/state/src/rocksdb/tests.rs new file mode 100644 index 000000000000..0d127d79c901 --- /dev/null +++ b/core/lib/state/src/rocksdb/tests.rs @@ -0,0 +1,493 @@ +//! Tests for [`RocksdbStorage`]. 
+ +use std::fmt; + +use assert_matches::assert_matches; +use tempfile::TempDir; +use test_casing::test_casing; +use zksync_dal::ConnectionPool; +use zksync_types::{MiniblockNumber, StorageLog}; + +use super::*; +use crate::test_utils::{ + create_l1_batch, create_miniblock, gen_storage_logs, prepare_postgres, + prepare_postgres_for_snapshot_recovery, +}; + +pub(super) struct RocksdbStorageEventListener { + /// Called when an L1 batch is synced. + pub on_l1_batch_synced: Box, + /// Called when an storage logs chunk is recovered from a snapshot. + pub on_logs_chunk_recovered: Box, +} + +impl fmt::Debug for RocksdbStorageEventListener { + fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { + formatter + .debug_struct("RocksdbStorageEventListener") + .finish_non_exhaustive() + } +} + +impl Default for RocksdbStorageEventListener { + fn default() -> Self { + Self { + on_l1_batch_synced: Box::new(|_| { /* do nothing */ }), + on_logs_chunk_recovered: Box::new(|_| { /* do nothing */ }), + } + } +} + +#[tokio::test] +async fn rocksdb_storage_basics() { + let dir = TempDir::new().expect("cannot create temporary dir for state keeper"); + let mut storage = RocksdbStorage::new(dir.path().to_path_buf()).await.unwrap(); + let mut storage_logs: HashMap<_, _> = gen_storage_logs(0..20) + .into_iter() + .map(|log| (log.key, log.value)) + .collect(); + let changed_keys = RocksdbStorage::process_transaction_logs(&storage.db, storage_logs.clone()); + storage.pending_patch.state = changed_keys + .into_iter() + .map(|(key, state_value)| (key.hashed_key(), (state_value.value, 1))) // enum index doesn't matter in the test + .collect(); + storage.save(Some(L1BatchNumber(0))).await.unwrap(); + { + for (key, value) in &storage_logs { + assert!(!storage.is_write_initial(key)); + assert_eq!(storage.read_value(key), *value); + } + } + + // Overwrite some of the logs. 
+ for log in storage_logs.values_mut().step_by(2) { + *log = StorageValue::zero(); + } + let changed_keys = RocksdbStorage::process_transaction_logs(&storage.db, storage_logs.clone()); + storage.pending_patch.state = changed_keys + .into_iter() + .map(|(key, state_value)| (key.hashed_key(), (state_value.value, 1))) // enum index doesn't matter in the test + .collect(); + storage.save(Some(L1BatchNumber(1))).await.unwrap(); + + for (key, value) in &storage_logs { + assert!(!storage.is_write_initial(key)); + assert_eq!(storage.read_value(key), *value); + } +} + +async fn sync_test_storage(dir: &TempDir, conn: &mut StorageProcessor<'_>) -> RocksdbStorage { + let (_stop_sender, stop_receiver) = watch::channel(false); + RocksdbStorage::builder(dir.path()) + .await + .expect("Failed initializing RocksDB") + .synchronize(conn, &stop_receiver) + .await + .unwrap() + .expect("Storage synchronization unexpectedly stopped") +} + +#[tokio::test] +async fn rocksdb_storage_syncing_with_postgres() { + let pool = ConnectionPool::test_pool().await; + let mut conn = pool.access_storage().await.unwrap(); + prepare_postgres(&mut conn).await; + let storage_logs = gen_storage_logs(20..40); + create_miniblock(&mut conn, MiniblockNumber(1), storage_logs.clone()).await; + create_l1_batch(&mut conn, L1BatchNumber(1), &storage_logs).await; + + let dir = TempDir::new().expect("cannot create temporary dir for state keeper"); + let mut storage = sync_test_storage(&dir, &mut conn).await; + + assert_eq!(storage.l1_batch_number().await, Some(L1BatchNumber(2))); + for log in &storage_logs { + assert_eq!(storage.read_value(&log.key), log.value); + } +} + +#[tokio::test] +async fn rocksdb_storage_syncing_fault_tolerance() { + let pool = ConnectionPool::test_pool().await; + let mut conn = pool.access_storage().await.unwrap(); + prepare_postgres(&mut conn).await; + let storage_logs = gen_storage_logs(100..200); + for (i, block_logs) in storage_logs.chunks(20).enumerate() { + let number = 
u32::try_from(i).unwrap() + 1; + create_miniblock(&mut conn, MiniblockNumber(number), block_logs.to_vec()).await; + create_l1_batch(&mut conn, L1BatchNumber(number), block_logs).await; + } + + let dir = TempDir::new().expect("cannot create temporary dir for state keeper"); + let (stop_sender, stop_receiver) = watch::channel(false); + let mut storage = RocksdbStorage::builder(dir.path()) + .await + .expect("Failed initializing RocksDB"); + let mut expected_l1_batch_number = L1BatchNumber(0); + storage.0.listener.on_l1_batch_synced = Box::new(move |number| { + assert_eq!(number, expected_l1_batch_number); + expected_l1_batch_number += 1; + if number == L1BatchNumber(2) { + stop_sender.send_replace(true); + } + }); + let storage = storage + .synchronize(&mut conn, &stop_receiver) + .await + .unwrap(); + assert!(storage.is_none()); + + // Resume storage syncing and check that it completes. + let storage = RocksdbStorage::builder(dir.path()) + .await + .expect("Failed initializing RocksDB"); + assert_eq!(storage.l1_batch_number().await, Some(L1BatchNumber(3))); + + let (_stop_sender, stop_receiver) = watch::channel(false); + let mut storage = storage + .synchronize(&mut conn, &stop_receiver) + .await + .unwrap() + .expect("Storage synchronization unexpectedly stopped"); + assert_eq!(storage.l1_batch_number().await, Some(L1BatchNumber(6))); + for log in &storage_logs { + assert_eq!(storage.read_value(&log.key), log.value); + assert!(!storage.is_write_initial(&log.key)); + } +} + +async fn insert_factory_deps( + conn: &mut StorageProcessor<'_>, + miniblock_number: MiniblockNumber, + indices: impl Iterator, +) { + let factory_deps = indices + .map(|i| (H256::repeat_byte(i), vec![i; 64])) + .collect(); + conn.storage_dal() + .insert_factory_deps(miniblock_number, &factory_deps) + .await; +} + +#[tokio::test] +async fn rocksdb_storage_revert() { + let pool = ConnectionPool::test_pool().await; + let mut conn = pool.access_storage().await.unwrap(); + prepare_postgres(&mut 
conn).await; + let storage_logs = gen_storage_logs(20..40); + create_miniblock(&mut conn, MiniblockNumber(1), storage_logs[..10].to_vec()).await; + insert_factory_deps(&mut conn, MiniblockNumber(1), 0..1).await; + create_miniblock(&mut conn, MiniblockNumber(2), storage_logs[10..].to_vec()).await; + insert_factory_deps(&mut conn, MiniblockNumber(2), 1..3).await; + create_l1_batch(&mut conn, L1BatchNumber(1), &storage_logs).await; + + let inserted_storage_logs = gen_storage_logs(50..60); + let replaced_storage_logs: Vec<_> = storage_logs + .iter() + .step_by(2) + .map(|&log| StorageLog { + value: H256::repeat_byte(0xf0), + ..log + }) + .collect(); + + let mut new_storage_logs = inserted_storage_logs.clone(); + new_storage_logs.extend_from_slice(&replaced_storage_logs); + create_miniblock(&mut conn, MiniblockNumber(3), new_storage_logs).await; + insert_factory_deps(&mut conn, MiniblockNumber(3), 3..5).await; + create_l1_batch(&mut conn, L1BatchNumber(2), &inserted_storage_logs).await; + + let dir = TempDir::new().expect("cannot create temporary dir for state keeper"); + let mut storage = sync_test_storage(&dir, &mut conn).await; + + // Perform some sanity checks before the revert. 
+ assert_eq!(storage.l1_batch_number().await, Some(L1BatchNumber(3))); + { + for log in &inserted_storage_logs { + assert_eq!(storage.read_value(&log.key), log.value); + } + for log in &replaced_storage_logs { + assert_eq!(storage.read_value(&log.key), log.value); + } + + for i in 0..5 { + assert_eq!( + storage.load_factory_dep(H256::repeat_byte(i)).unwrap(), + [i; 64] + ); + } + } + + storage.rollback(&mut conn, L1BatchNumber(1)).await.unwrap(); + assert_eq!(storage.l1_batch_number().await, Some(L1BatchNumber(2))); + { + for log in &inserted_storage_logs { + assert_eq!(storage.read_value(&log.key), H256::zero()); + } + for log in &replaced_storage_logs { + assert_ne!(storage.read_value(&log.key), log.value); + } + + for i in 0..3 { + assert_eq!( + storage.load_factory_dep(H256::repeat_byte(i)).unwrap(), + [i; 64] + ); + } + for i in 3..5 { + assert!(storage.load_factory_dep(H256::repeat_byte(i)).is_none()); + } + } +} + +#[tokio::test] +async fn rocksdb_enum_index_migration() { + let pool = ConnectionPool::test_pool().await; + let mut conn = pool.access_storage().await.unwrap(); + prepare_postgres(&mut conn).await; + let storage_logs = gen_storage_logs(20..40); + create_miniblock(&mut conn, MiniblockNumber(1), storage_logs.clone()).await; + create_l1_batch(&mut conn, L1BatchNumber(1), &storage_logs).await; + + let enum_indices: HashMap<_, _> = conn + .storage_logs_dedup_dal() + .initial_writes_for_batch(L1BatchNumber(1)) + .await + .into_iter() + .collect(); + + let dir = TempDir::new().expect("cannot create temporary dir for state keeper"); + let mut storage = sync_test_storage(&dir, &mut conn).await; + + assert_eq!(storage.l1_batch_number().await, Some(L1BatchNumber(2))); + // Check that enum indices are correct after syncing with Postgres. + for log in &storage_logs { + let expected_index = enum_indices[&log.key.hashed_key()]; + assert_eq!( + storage.get_enumeration_index(&log.key), + Some(expected_index) + ); + } + + // Remove enum indices for some keys. 
+ let mut write_batch = storage.db.new_write_batch(); + for log in &storage_logs { + write_batch.put_cf( + StateKeeperColumnFamily::State, + log.key.hashed_key().as_bytes(), + log.value.as_bytes(), + ); + write_batch.delete_cf( + StateKeeperColumnFamily::State, + RocksdbStorage::ENUM_INDEX_MIGRATION_CURSOR, + ); + } + storage.db.write(write_batch).unwrap(); + + // Check that migration works as expected. + let ordered_keys_to_migrate: Vec = storage_logs + .iter() + .map(|log| log.key) + .sorted_by_key(StorageKey::hashed_key) + .collect(); + + storage.enum_index_migration_chunk_size = 10; + let start_from = storage.enum_migration_start_from().await; + assert_eq!(start_from, Some(H256::zero())); + + // Migrate the first half. + storage.save_missing_enum_indices(&mut conn).await.unwrap(); + for key in ordered_keys_to_migrate.iter().take(10) { + let expected_index = enum_indices[&key.hashed_key()]; + assert_eq!(storage.get_enumeration_index(key), Some(expected_index)); + } + let non_migrated_state_value = + RocksdbStorage::read_state_value(&storage.db, ordered_keys_to_migrate[10].hashed_key()) + .unwrap(); + assert!(non_migrated_state_value.enum_index.is_none()); + + // Migrate the second half. + storage.save_missing_enum_indices(&mut conn).await.unwrap(); + for key in ordered_keys_to_migrate.iter().skip(10) { + let expected_index = enum_indices[&key.hashed_key()]; + assert_eq!(storage.get_enumeration_index(key), Some(expected_index)); + } + + // 20 keys were processed but we haven't checked that no keys to migrate are left. + let start_from = storage.enum_migration_start_from().await; + assert!(start_from.is_some()); + + // Check that migration will be marked as completed after the next iteration. 
+ storage.save_missing_enum_indices(&mut conn).await.unwrap(); + let start_from = storage.enum_migration_start_from().await; + assert!(start_from.is_none()); +} + +#[test_casing(4, [RocksdbStorage::DESIRED_LOG_CHUNK_SIZE, 20, 5, 1])] +#[tokio::test] +async fn low_level_snapshot_recovery(log_chunk_size: u64) { + let pool = ConnectionPool::test_pool().await; + let mut conn = pool.access_storage().await.unwrap(); + let (snapshot_recovery, mut storage_logs) = + prepare_postgres_for_snapshot_recovery(&mut conn).await; + + let dir = TempDir::new().expect("cannot create temporary dir for state keeper"); + let mut storage = RocksdbStorage::new(dir.path().to_path_buf()).await.unwrap(); + let (_stop_sender, stop_receiver) = watch::channel(false); + let next_l1_batch = storage + .ensure_ready(&mut conn, log_chunk_size, &stop_receiver) + .await + .unwrap(); + assert_eq!(next_l1_batch, snapshot_recovery.l1_batch_number + 1); + assert_eq!( + storage.l1_batch_number().await, + Some(snapshot_recovery.l1_batch_number + 1) + ); + + // Sort logs in the same order as enum indices are assigned (by full `StorageKey`). 
+ storage_logs.sort_unstable_by_key(|log| log.key); + for (i, log) in storage_logs.iter().enumerate() { + assert_eq!(storage.read_value(&log.key), log.value); + let expected_index = i as u64 + 1; + assert_eq!( + storage.get_enumeration_index(&log.key), + Some(expected_index) + ); + } +} + +#[tokio::test] +async fn recovering_factory_deps_from_snapshot() { + let pool = ConnectionPool::test_pool().await; + let mut conn = pool.access_storage().await.unwrap(); + let (snapshot_recovery, _) = prepare_postgres_for_snapshot_recovery(&mut conn).await; + + let mut all_factory_deps = HashMap::new(); + for number in 0..snapshot_recovery.miniblock_number.0 { + let bytecode_hash = H256::from_low_u64_be(number.into()); + let bytecode = vec![u8::try_from(number).unwrap(); 1_024]; + all_factory_deps.insert(bytecode_hash, bytecode.clone()); + + let number = MiniblockNumber(number); + // FIXME (PLA-589): don't store miniblocks once the corresponding foreign keys are removed + create_miniblock(&mut conn, number, vec![]).await; + conn.storage_dal() + .insert_factory_deps(number, &HashMap::from([(bytecode_hash, bytecode)])) + .await; + } + + let dir = TempDir::new().expect("cannot create temporary dir for state keeper"); + let mut storage = sync_test_storage(&dir, &mut conn).await; + + for (bytecode_hash, bytecode) in &all_factory_deps { + assert_eq!(storage.load_factory_dep(*bytecode_hash).unwrap(), *bytecode); + } +} + +#[tokio::test] +async fn recovering_from_snapshot_and_following_logs() { + let pool = ConnectionPool::test_pool().await; + let mut conn = pool.access_storage().await.unwrap(); + let (snapshot_recovery, mut storage_logs) = + prepare_postgres_for_snapshot_recovery(&mut conn).await; + + // Add some more storage logs. 
+ let new_storage_logs = gen_storage_logs(500..600); + create_miniblock( + &mut conn, + snapshot_recovery.miniblock_number + 1, + new_storage_logs.clone(), + ) + .await; + create_l1_batch( + &mut conn, + snapshot_recovery.l1_batch_number + 1, + &new_storage_logs, + ) + .await; + + let updated_storage_logs: Vec<_> = storage_logs + .iter() + .step_by(3) + .copied() + .map(|mut log| { + log.value = H256::repeat_byte(0xff); + log + }) + .collect(); + create_miniblock( + &mut conn, + snapshot_recovery.miniblock_number + 2, + updated_storage_logs.clone(), + ) + .await; + create_l1_batch(&mut conn, snapshot_recovery.l1_batch_number + 2, &[]).await; + + let dir = TempDir::new().expect("cannot create temporary dir for state keeper"); + let mut storage = sync_test_storage(&dir, &mut conn).await; + + for (i, log) in new_storage_logs.iter().enumerate() { + assert_eq!(storage.read_value(&log.key), log.value); + let expected_index = (i + storage_logs.len()) as u64 + 1; + assert_eq!( + storage.get_enumeration_index(&log.key), + Some(expected_index) + ); + assert!(!storage.is_write_initial(&log.key)); + } + + for log in &updated_storage_logs { + assert_eq!(storage.read_value(&log.key), log.value); + assert!(storage.get_enumeration_index(&log.key).unwrap() <= storage_logs.len() as u64); + } + storage_logs.sort_unstable_by_key(|log| log.key); + for (i, log) in storage_logs.iter().enumerate() { + let expected_index = i as u64 + 1; + assert_eq!( + storage.get_enumeration_index(&log.key), + Some(expected_index) + ); + assert!(!storage.is_write_initial(&log.key)); + } +} + +#[tokio::test] +async fn recovery_fault_tolerance() { + let pool = ConnectionPool::test_pool().await; + let mut conn = pool.access_storage().await.unwrap(); + let (_, storage_logs) = prepare_postgres_for_snapshot_recovery(&mut conn).await; + let log_chunk_size = storage_logs.len() as u64 / 5; + + let dir = TempDir::new().expect("cannot create temporary dir for state keeper"); + let mut storage = 
RocksdbStorage::new(dir.path().to_path_buf()).await.unwrap(); + let (stop_sender, stop_receiver) = watch::channel(false); + let mut synced_chunk_count = 0_u64; + storage.listener.on_logs_chunk_recovered = Box::new(move |chunk_id| { + assert_eq!(chunk_id, synced_chunk_count); + synced_chunk_count += 1; + if synced_chunk_count == 2 { + stop_sender.send_replace(true); + } + }); + + let err = storage + .ensure_ready(&mut conn, log_chunk_size, &stop_receiver) + .await + .unwrap_err(); + assert_matches!(err, RocksdbSyncError::Interrupted); + drop(storage); + + // Resume recovery and check that no chunks are recovered twice. + let (_stop_sender, stop_receiver) = watch::channel(false); + let mut storage = RocksdbStorage::new(dir.path().to_path_buf()).await.unwrap(); + storage.listener.on_logs_chunk_recovered = Box::new(|chunk_id| { + assert!(chunk_id >= 2); + }); + storage + .ensure_ready(&mut conn, log_chunk_size, &stop_receiver) + .await + .unwrap(); + for log in &storage_logs { + assert_eq!(storage.read_value(&log.key), log.value); + assert!(!storage.is_write_initial(&log.key)); + } +} diff --git a/core/lib/state/src/test_utils.rs b/core/lib/state/src/test_utils.rs index 340f2ea62237..87157d05fa98 100644 --- a/core/lib/state/src/test_utils.rs +++ b/core/lib/state/src/test_utils.rs @@ -5,6 +5,7 @@ use std::ops; use zksync_dal::StorageProcessor; use zksync_types::{ block::{BlockGasCount, L1BatchHeader, MiniblockHeader}, + snapshots::SnapshotRecoveryStatus, AccountTreeId, Address, L1BatchNumber, MiniblockNumber, ProtocolVersion, StorageKey, StorageLog, H256, }; @@ -119,3 +120,40 @@ pub(crate) async fn create_l1_batch( .insert_initial_writes(l1_batch_number, &written_keys) .await; } + +pub(crate) async fn prepare_postgres_for_snapshot_recovery( + conn: &mut StorageProcessor<'_>, +) -> (SnapshotRecoveryStatus, Vec) { + conn.protocol_versions_dal() + .save_protocol_version_with_tx(ProtocolVersion::default()) + .await; + + let snapshot_recovery = SnapshotRecoveryStatus { + 
l1_batch_number: L1BatchNumber(23), + l1_batch_root_hash: H256::zero(), // not used + miniblock_number: MiniblockNumber(42), + miniblock_root_hash: H256::zero(), // not used + last_finished_chunk_id: None, + total_chunk_count: 100, + }; + conn.snapshot_recovery_dal() + .set_applied_snapshot_status(&snapshot_recovery) + .await + .unwrap(); + + // FIXME (PLA-589): don't store miniblock / L1 batch once the corresponding foreign keys are removed + let snapshot_storage_logs = gen_storage_logs(100..200); + create_miniblock( + conn, + snapshot_recovery.miniblock_number, + snapshot_storage_logs.clone(), + ) + .await; + create_l1_batch( + conn, + snapshot_recovery.l1_batch_number, + &snapshot_storage_logs, + ) + .await; + (snapshot_recovery, snapshot_storage_logs) +} diff --git a/core/lib/storage/src/db.rs b/core/lib/storage/src/db.rs index 24502493a60d..e8402b96c62a 100644 --- a/core/lib/storage/src/db.rs +++ b/core/lib/storage/src/db.rs @@ -298,11 +298,11 @@ pub struct RocksDB { } impl RocksDB { - pub fn new(path: &Path) -> Self { + pub fn new(path: &Path) -> Result { Self::with_options(path, RocksDBOptions::default()) } - pub fn with_options(path: &Path, options: RocksDBOptions) -> Self { + pub fn with_options(path: &Path, options: RocksDBOptions) -> Result { let caches = RocksDBCaches::new(options.block_cache_capacity); let db_options = Self::rocksdb_options(None, None); let existing_cfs = DB::list_cf(&db_options, path).unwrap_or_else(|err| { @@ -354,7 +354,7 @@ impl RocksDB { ColumnFamilyDescriptor::new(cf_name, cf_options) }); - let db = DB::open_cf_descriptors(&db_options, path, cfs).expect("failed to init rocksdb"); + let db = DB::open_cf_descriptors(&db_options, path, cfs)?; let inner = Arc::new(RocksDBInner { db, db_name: CF::DB_NAME, @@ -371,12 +371,12 @@ impl RocksDB { ); inner.wait_for_writes_to_resume(&options.stalled_writes_retries); - Self { + Ok(Self { inner, sync_writes: false, stalled_writes_retries: options.stalled_writes_retries, _cf: PhantomData, - } + 
}) } /// Switches on sync writes in [`Self::write()`] and [`Self::put()`]. This has a performance @@ -665,13 +665,15 @@ mod tests { #[test] fn changing_column_families() { let temp_dir = TempDir::new().unwrap(); - let db = RocksDB::::new(temp_dir.path()).with_sync_writes(); + let db = RocksDB::::new(temp_dir.path()) + .unwrap() + .with_sync_writes(); let mut batch = db.new_write_batch(); batch.put_cf(OldColumnFamilies::Default, b"test", b"value"); db.write(batch).unwrap(); drop(db); - let db = RocksDB::::new(temp_dir.path()); + let db = RocksDB::::new(temp_dir.path()).unwrap(); let value = db.get_cf(NewColumnFamilies::Default, b"test").unwrap(); assert_eq!(value.unwrap(), b"value"); } @@ -691,13 +693,15 @@ mod tests { #[test] fn default_column_family_does_not_need_to_be_explicitly_opened() { let temp_dir = TempDir::new().unwrap(); - let db = RocksDB::::new(temp_dir.path()).with_sync_writes(); + let db = RocksDB::::new(temp_dir.path()) + .unwrap() + .with_sync_writes(); let mut batch = db.new_write_batch(); batch.put_cf(OldColumnFamilies::Junk, b"test", b"value"); db.write(batch).unwrap(); drop(db); - let db = RocksDB::::new(temp_dir.path()); + let db = RocksDB::::new(temp_dir.path()).unwrap(); let value = db.get_cf(JunkColumnFamily, b"test").unwrap(); assert_eq!(value.unwrap(), b"value"); } @@ -705,7 +709,9 @@ mod tests { #[test] fn write_batch_can_be_restored_from_bytes() { let temp_dir = TempDir::new().unwrap(); - let db = RocksDB::::new(temp_dir.path()).with_sync_writes(); + let db = RocksDB::::new(temp_dir.path()) + .unwrap() + .with_sync_writes(); let mut batch = db.new_write_batch(); batch.put_cf(NewColumnFamilies::Default, b"test", b"value"); batch.put_cf(NewColumnFamilies::Default, b"test2", b"value2"); diff --git a/core/lib/types/src/snapshots.rs b/core/lib/types/src/snapshots.rs index 19f818bb5d1e..5d7b14fb6e96 100644 --- a/core/lib/types/src/snapshots.rs +++ b/core/lib/types/src/snapshots.rs @@ -1,11 +1,12 @@ -use std::convert::TryFrom; +use 
std::{convert::TryFrom, ops}; use anyhow::Context; use serde::{Deserialize, Serialize}; use zksync_basic_types::{AccountTreeId, L1BatchNumber, MiniblockNumber, H256}; use zksync_protobuf::{required, ProtoFmt}; +use zksync_utils::u256_to_h256; -use crate::{commitment::L1BatchWithMetadata, Bytes, StorageKey, StorageValue}; +use crate::{commitment::L1BatchWithMetadata, Bytes, StorageKey, StorageValue, U256}; /// Information about all snapshots persisted by the node. #[derive(Debug, Clone, Serialize, Deserialize)] @@ -196,3 +197,81 @@ pub struct SnapshotRecoveryStatus { pub last_finished_chunk_id: Option, pub total_chunk_count: u64, } + +/// Returns a chunk of `hashed_keys` with 0-based index `chunk_id` among `count`. Chunks do not intersect and jointly cover +/// the entire `hashed_key` space. If `hashed_key`s are uniformly distributed (which is the case), the returned ranges +/// are expected to contain the same number of entries. +/// +/// Used by multiple components during snapshot creation and recovery. +/// +/// # Panics +/// +/// Panics if `chunk_count == 0` or `chunk_id >= chunk_count`. 
+pub fn uniform_hashed_keys_chunk(chunk_id: u64, chunk_count: u64) -> ops::RangeInclusive { + assert!(chunk_count > 0, "`chunk_count` must be positive"); + assert!( + chunk_id < chunk_count, + "Chunk index {} exceeds count {}", + chunk_id, + chunk_count + ); + + let mut stride = U256::MAX / chunk_count; + let stride_minus_one = if stride < U256::MAX { + stride += U256::one(); + stride - 1 + } else { + stride // `stride` is really 1 << 256 == U256::MAX + 1 + }; + + let start = stride * chunk_id; + let (mut end, is_overflow) = stride_minus_one.overflowing_add(start); + if is_overflow { + end = U256::MAX; + } + u256_to_h256(start)..=u256_to_h256(end) +} + +#[cfg(test)] +mod tests { + use zksync_utils::h256_to_u256; + + use super::*; + + #[test] + fn chunking_is_correct() { + for chunks_count in (2..10).chain([42, 256, 500, 1_001, 12_345]) { + println!("Testing chunks_count={chunks_count}"); + let chunked_ranges: Vec<_> = (0..chunks_count) + .map(|chunk_id| uniform_hashed_keys_chunk(chunk_id, chunks_count)) + .collect(); + + assert_eq!(*chunked_ranges[0].start(), H256::zero()); + assert_eq!( + *chunked_ranges.last().unwrap().end(), + H256::repeat_byte(0xff) + ); + for window in chunked_ranges.windows(2) { + let [prev_chunk, next_chunk] = window else { + unreachable!(); + }; + assert_eq!( + h256_to_u256(*prev_chunk.end()) + 1, + h256_to_u256(*next_chunk.start()) + ); + } + + let chunk_sizes: Vec<_> = chunked_ranges + .iter() + .map(|chunk| h256_to_u256(*chunk.end()) - h256_to_u256(*chunk.start()) + 1) + .collect(); + + // Check that chunk sizes are roughly equal. Due to how chunks are constructed, the sizes + // of all chunks except for the last one are the same, and the last chunk size may be slightly smaller; + // the difference in sizes is lesser than the number of chunks. 
+ let min_chunk_size = chunk_sizes.iter().copied().min().unwrap(); + let max_chunk_size = chunk_sizes.iter().copied().max().unwrap(); + assert!(max_chunk_size - min_chunk_size < U256::from(chunks_count)); + } + } +} diff --git a/core/lib/zksync_core/src/block_reverter/mod.rs b/core/lib/zksync_core/src/block_reverter/mod.rs index e45bef7eb216..6a97d1c99c19 100644 --- a/core/lib/zksync_core/src/block_reverter/mod.rs +++ b/core/lib/zksync_core/src/block_reverter/mod.rs @@ -189,7 +189,7 @@ impl BlockReverter { path: &Path, storage_root_hash: H256, ) { - let db = RocksDB::new(path); + let db = RocksDB::new(path).expect("Failed initializing RocksDB for Merkle tree"); let mut tree = ZkSyncTree::new_lightweight(db.into()); if tree.next_l1_batch_number() <= last_l1_batch_to_keep { @@ -207,14 +207,19 @@ impl BlockReverter { /// Reverts blocks in the state keeper cache. async fn rollback_state_keeper_cache(&self, last_l1_batch_to_keep: L1BatchNumber) { tracing::info!("opening DB with state keeper cache..."); - let mut sk_cache = RocksdbStorage::new(self.state_keeper_cache_path.as_ref()); + let sk_cache = RocksdbStorage::builder(self.state_keeper_cache_path.as_ref()) + .await + .expect("Failed initializing state keeper cache"); - if sk_cache.l1_batch_number() > last_l1_batch_to_keep + 1 { + if sk_cache.l1_batch_number().await > Some(last_l1_batch_to_keep + 1) { let mut storage = self.connection_pool.access_storage().await.unwrap(); - tracing::info!("rolling back state keeper cache..."); - sk_cache.rollback(&mut storage, last_l1_batch_to_keep).await; + tracing::info!("Rolling back state keeper cache..."); + sk_cache + .rollback(&mut storage, last_l1_batch_to_keep) + .await + .expect("Failed rolling back state keeper cache"); } else { - tracing::info!("nothing to revert in state keeper cache"); + tracing::info!("Nothing to revert in state keeper cache"); } } @@ -260,7 +265,8 @@ impl BlockReverter { transaction .storage_logs_dal() .rollback_storage(last_miniblock_to_keep) - 
.await; + .await + .expect("failed rolling back storage"); tracing::info!("rolling back storage logs..."); transaction .storage_logs_dal() diff --git a/core/lib/zksync_core/src/lib.rs b/core/lib/zksync_core/src/lib.rs index 8d7a9d630dc0..1fd5af883d74 100644 --- a/core/lib/zksync_core/src/lib.rs +++ b/core/lib/zksync_core/src/lib.rs @@ -811,7 +811,9 @@ async fn run_tree( tracing::info!("Initializing Merkle tree in {mode_str} mode"); let config = MetadataCalculatorConfig::for_main_node(merkle_tree_config, operation_manager); - let metadata_calculator = MetadataCalculator::new(config, object_store).await; + let metadata_calculator = MetadataCalculator::new(config, object_store) + .await + .context("failed initializing metadata_calculator")?; if let Some(api_config) = api_config { let address = (Ipv4Addr::UNSPECIFIED, api_config.port).into(); let tree_reader = metadata_calculator.tree_reader(); diff --git a/core/lib/zksync_core/src/metadata_calculator/helpers.rs b/core/lib/zksync_core/src/metadata_calculator/helpers.rs index 563e643d7e11..58c51d7c4066 100644 --- a/core/lib/zksync_core/src/metadata_calculator/helpers.rs +++ b/core/lib/zksync_core/src/metadata_calculator/helpers.rs @@ -7,6 +7,7 @@ use std::{ time::Duration, }; +use anyhow::Context as _; use serde::{Deserialize, Serialize}; #[cfg(test)] use tokio::sync::mpsc; @@ -45,7 +46,7 @@ pub(super) async fn create_db( memtable_capacity: usize, stalled_writes_timeout: Duration, multi_get_chunk_size: usize, -) -> RocksDBWrapper { +) -> anyhow::Result { tokio::task::spawn_blocking(move || { create_db_sync( &path, @@ -56,7 +57,7 @@ pub(super) async fn create_db( ) }) .await - .unwrap() + .context("panicked creating Merkle tree RocksDB")? 
} fn create_db_sync( @@ -65,7 +66,7 @@ fn create_db_sync( memtable_capacity: usize, stalled_writes_timeout: Duration, multi_get_chunk_size: usize, -) -> RocksDBWrapper { +) -> anyhow::Result { tracing::info!( "Initializing Merkle tree database at `{path}` with {multi_get_chunk_size} multi-get chunk size, \ {block_cache_capacity}B block cache, {memtable_capacity}B memtable capacity, \ @@ -80,7 +81,7 @@ fn create_db_sync( large_memtable_capacity: Some(memtable_capacity), stalled_writes_retries: StalledWritesRetries::new(stalled_writes_timeout), }, - ); + )?; if cfg!(test) { // We need sync writes for the unit tests to execute reliably. With the default config, // some writes to RocksDB may occur, but not be visible to the test code. @@ -88,7 +89,7 @@ fn create_db_sync( } let mut db = RocksDBWrapper::from(db); db.set_multi_get_chunk_size(multi_get_chunk_size); - db + Ok(db) } /// Wrapper around the "main" tree implementation used by [`MetadataCalculator`]. @@ -382,7 +383,8 @@ impl L1BatchWithLogs { let mut touched_slots = storage .storage_logs_dal() .get_touched_slots_for_l1_batch(l1_batch_number) - .await; + .await + .unwrap(); touched_slots_latency.observe_with_count(touched_slots.len()); let leaf_indices_latency = METRICS.start_load_stage(LoadChangesStage::LoadLeafIndices); @@ -391,7 +393,8 @@ impl L1BatchWithLogs { let l1_batches_for_initial_writes = storage .storage_logs_dal() .get_l1_batches_and_indices_for_initial_writes(&hashed_keys_for_writes) - .await; + .await + .unwrap(); leaf_indices_latency.observe_with_count(hashed_keys_for_writes.len()); let mut storage_logs = BTreeMap::new(); @@ -460,7 +463,8 @@ mod tests { let touched_slots = storage .storage_logs_dal() .get_touched_slots_for_l1_batch(l1_batch_number) - .await; + .await + .unwrap(); let mut storage_logs = BTreeMap::new(); @@ -472,11 +476,13 @@ mod tests { let previous_values = storage .storage_logs_dal() .get_previous_storage_values(&hashed_keys, l1_batch_number) - .await; + .await + .unwrap(); let 
l1_batches_for_initial_writes = storage .storage_logs_dal() .get_l1_batches_and_indices_for_initial_writes(&hashed_keys) - .await; + .await + .unwrap(); for storage_key in protective_reads { let previous_value = previous_values[&storage_key.hashed_key()].unwrap_or_default(); @@ -566,7 +572,8 @@ mod tests { Duration::ZERO, // writes should never be stalled in tests 500, ) - .await; + .await + .unwrap(); AsyncTree::new(db, MerkleTreeMode::Full) } diff --git a/core/lib/zksync_core/src/metadata_calculator/metrics.rs b/core/lib/zksync_core/src/metadata_calculator/metrics.rs index 87ab8fb377f7..e1f631641289 100644 --- a/core/lib/zksync_core/src/metadata_calculator/metrics.rs +++ b/core/lib/zksync_core/src/metadata_calculator/metrics.rs @@ -197,7 +197,7 @@ pub(super) enum ChunkRecoveryStage { #[metrics(prefix = "server_metadata_calculator_recovery")] pub(super) struct MetadataCalculatorRecoveryMetrics { /// Number of chunks recovered. - pub recovered_chunk_count: Gauge, + pub recovered_chunk_count: Gauge, /// Latency of a tree recovery stage (not related to the recovery of a particular chunk; /// those metrics are tracked in the `chunk_latency` histogram). 
#[metrics(buckets = Buckets::LATENCIES, unit = Unit::Seconds)] diff --git a/core/lib/zksync_core/src/metadata_calculator/mod.rs b/core/lib/zksync_core/src/metadata_calculator/mod.rs index 36c8d7eb5a64..b9eeada5dd3f 100644 --- a/core/lib/zksync_core/src/metadata_calculator/mod.rs +++ b/core/lib/zksync_core/src/metadata_calculator/mod.rs @@ -93,8 +93,8 @@ impl MetadataCalculator { pub async fn new( config: MetadataCalculatorConfig, object_store: Option>, - ) -> Self { - assert!( + ) -> anyhow::Result { + anyhow::ensure!( config.max_l1_batches_per_iter > 0, "Maximum L1 batches per iteration is misconfigured to be 0; please update it to positive value" ); @@ -106,18 +106,18 @@ impl MetadataCalculator { config.stalled_writes_timeout, config.multi_get_chunk_size, ) - .await; + .await?; let tree = GenericAsyncTree::new(db, config.mode).await; let (_, health_updater) = ReactiveHealthCheck::new("tree"); - Self { + Ok(Self { tree, tree_reader: watch::channel(None).0, object_store, delayer: Delayer::new(config.delay_interval), health_updater, max_l1_batches_per_iter: config.max_l1_batches_per_iter, - } + }) } /// Returns a health check for this calculator. 
diff --git a/core/lib/zksync_core/src/metadata_calculator/recovery/mod.rs b/core/lib/zksync_core/src/metadata_calculator/recovery/mod.rs index 0d37dd024178..a0e50c1d2ca6 100644 --- a/core/lib/zksync_core/src/metadata_calculator/recovery/mod.rs +++ b/core/lib/zksync_core/src/metadata_calculator/recovery/mod.rs @@ -27,7 +27,7 @@ use std::{ fmt, ops, - sync::atomic::{AtomicUsize, Ordering}, + sync::atomic::{AtomicU64, Ordering}, }; use anyhow::Context as _; @@ -38,8 +38,10 @@ use tokio::sync::{watch, Mutex, Semaphore}; use zksync_dal::{ConnectionPool, StorageProcessor}; use zksync_health_check::{Health, HealthStatus, HealthUpdater}; use zksync_merkle_tree::TreeEntry; -use zksync_types::{snapshots::SnapshotRecoveryStatus, MiniblockNumber, H256, U256}; -use zksync_utils::u256_to_h256; +use zksync_types::{ + snapshots::{uniform_hashed_keys_chunk, SnapshotRecoveryStatus}, + MiniblockNumber, H256, +}; use super::{ helpers::{AsyncTree, AsyncTreeRecovery, GenericAsyncTree}, @@ -53,7 +55,7 @@ mod tests; /// to control recovery behavior in tests. #[async_trait] trait HandleRecoveryEvent: fmt::Debug + Send + Sync { - fn recovery_started(&mut self, _chunk_count: usize, _recovered_chunk_count: usize) { + fn recovery_started(&mut self, _chunk_count: u64, _recovered_chunk_count: u64) { // Default implementation does nothing } @@ -70,16 +72,16 @@ trait HandleRecoveryEvent: fmt::Debug + Send + Sync { #[derive(Debug, Clone, Copy, Serialize, Deserialize)] struct RecoveryMerkleTreeInfo { mode: &'static str, // always set to "recovery" to distinguish from `MerkleTreeInfo` - chunk_count: usize, - recovered_chunk_count: usize, + chunk_count: u64, + recovered_chunk_count: u64, } /// [`HealthUpdater`]-based [`HandleRecoveryEvent`] implementation. 
#[derive(Debug)] struct RecoveryHealthUpdater<'a> { inner: &'a HealthUpdater, - chunk_count: usize, - recovered_chunk_count: AtomicUsize, + chunk_count: u64, + recovered_chunk_count: AtomicU64, } impl<'a> RecoveryHealthUpdater<'a> { @@ -87,14 +89,14 @@ impl<'a> RecoveryHealthUpdater<'a> { Self { inner, chunk_count: 0, - recovered_chunk_count: AtomicUsize::new(0), + recovered_chunk_count: AtomicU64::new(0), } } } #[async_trait] impl HandleRecoveryEvent for RecoveryHealthUpdater<'_> { - fn recovery_started(&mut self, chunk_count: usize, recovered_chunk_count: usize) { + fn recovery_started(&mut self, chunk_count: u64, recovered_chunk_count: u64) { self.chunk_count = chunk_count; *self.recovered_chunk_count.get_mut() = recovered_chunk_count; RECOVERY_METRICS @@ -146,15 +148,15 @@ impl SnapshotParameters { }) } - fn chunk_count(&self) -> usize { - zksync_utils::ceil_div(self.log_count, Self::DESIRED_CHUNK_SIZE) as usize + fn chunk_count(&self) -> u64 { + self.log_count.div_ceil(Self::DESIRED_CHUNK_SIZE) } } /// Options for tree recovery. 
#[derive(Debug)] struct RecoveryOptions<'a> { - chunk_count: usize, + chunk_count: u64, concurrency_limit: usize, events: Box, } @@ -219,7 +221,9 @@ impl AsyncTreeRecovery { stop_receiver: &watch::Receiver, ) -> anyhow::Result> { let chunk_count = options.chunk_count; - let chunks: Vec<_> = Self::hashed_key_ranges(chunk_count).collect(); + let chunks: Vec<_> = (0..chunk_count) + .map(|chunk_id| uniform_hashed_keys_chunk(chunk_id, chunk_count)) + .collect(); tracing::info!( "Recovering Merkle tree from Postgres snapshot in {chunk_count} concurrent chunks" ); @@ -231,7 +235,7 @@ impl AsyncTreeRecovery { drop(storage); options .events - .recovery_started(chunk_count, chunk_count - remaining_chunks.len()); + .recovery_started(chunk_count, chunk_count - remaining_chunks.len() as u64); tracing::info!( "Filtered recovered key chunks; {} / {chunk_count} chunks remaining", remaining_chunks.len() @@ -271,26 +275,6 @@ impl AsyncTreeRecovery { Ok(Some(tree)) } - fn hashed_key_ranges(count: usize) -> impl Iterator> { - assert!(count > 0); - let mut stride = U256::MAX / count; - let stride_minus_one = if stride < U256::MAX { - stride += U256::one(); - stride - 1 - } else { - stride // `stride` is really 1 << 256 == U256::MAX + 1 - }; - - (0..count).map(move |i| { - let start = stride * i; - let (mut end, is_overflow) = stride_minus_one.overflowing_add(start); - if is_overflow { - end = U256::MAX; - } - u256_to_h256(start)..=u256_to_h256(end) - }) - } - /// Filters out `key_chunks` for which recovery was successfully performed. 
async fn filter_chunks( &mut self, @@ -317,7 +301,7 @@ impl AsyncTreeRecovery { .filter_map(|(i, &start)| Some((i, start?))); let start_keys = existing_starts .clone() - .map(|(_, start_entry)| start_entry.key) + .map(|(_, start_entry)| start_entry.tree_key()) .collect(); let tree_entries = self.entries(start_keys).await; @@ -389,7 +373,7 @@ impl AsyncTreeRecovery { let all_entries = all_entries .into_iter() .map(|entry| TreeEntry { - key: entry.key, + key: entry.tree_key(), value: entry.value, leaf_index: entry.leaf_index, }) diff --git a/core/lib/zksync_core/src/metadata_calculator/recovery/tests.rs b/core/lib/zksync_core/src/metadata_calculator/recovery/tests.rs index 180894f2fc7d..3ba91da56ae0 100644 --- a/core/lib/zksync_core/src/metadata_calculator/recovery/tests.rs +++ b/core/lib/zksync_core/src/metadata_calculator/recovery/tests.rs @@ -13,7 +13,6 @@ use zksync_config::configs::{ use zksync_health_check::{CheckHealth, ReactiveHealthCheck}; use zksync_merkle_tree::{domain::ZkSyncTree, TreeInstruction}; use zksync_types::{L1BatchNumber, L2ChainId, StorageLog}; -use zksync_utils::h256_to_u256; use super::*; use crate::{ @@ -29,45 +28,6 @@ use crate::{ utils::testonly::prepare_recovery_snapshot, }; -#[test] -fn calculating_hashed_key_ranges_with_single_chunk() { - let mut ranges = AsyncTreeRecovery::hashed_key_ranges(1); - let full_range = ranges.next().unwrap(); - assert_eq!(full_range, H256::zero()..=H256([0xff; 32])); -} - -#[test] -fn calculating_hashed_key_ranges_for_256_chunks() { - let ranges = AsyncTreeRecovery::hashed_key_ranges(256); - let mut start = H256::zero(); - let mut end = H256([0xff; 32]); - - for (i, range) in ranges.enumerate() { - let i = u8::try_from(i).unwrap(); - start.0[0] = i; - end.0[0] = i; - assert_eq!(range, start..=end); - } -} - -#[test_casing(5, [3, 7, 23, 100, 255])] -fn calculating_hashed_key_ranges_for_arbitrary_chunks(chunk_count: usize) { - let ranges: Vec<_> = AsyncTreeRecovery::hashed_key_ranges(chunk_count).collect(); - 
assert_eq!(ranges.len(), chunk_count); - - for window in ranges.windows(2) { - let [prev_range, range] = window else { - unreachable!(); - }; - assert_eq!( - h256_to_u256(*range.start()), - h256_to_u256(*prev_range.end()) + 1 - ); - } - assert_eq!(*ranges.first().unwrap().start(), H256::zero()); - assert_eq!(*ranges.last().unwrap().end(), H256([0xff; 32])); -} - #[test] fn calculating_chunk_count() { let mut snapshot = SnapshotParameters { @@ -92,7 +52,8 @@ async fn create_tree_recovery(path: PathBuf, l1_batch: L1BatchNumber) -> AsyncTr Duration::ZERO, // writes should never be stalled in tests 500, ) - .await; + .await + .unwrap(); AsyncTreeRecovery::new(db, l1_batch.0.into(), MerkleTreeMode::Full) } @@ -145,7 +106,8 @@ async fn prepare_recovery_snapshot_with_genesis( let genesis_logs = storage .storage_logs_dal() .get_touched_slots_for_l1_batch(L1BatchNumber(0)) - .await; + .await + .unwrap(); let genesis_logs = genesis_logs .into_iter() .map(|(key, value)| StorageLog::new_write_log(key, value)); @@ -169,23 +131,23 @@ async fn prepare_recovery_snapshot_with_genesis( #[derive(Debug)] struct TestEventListener { - expected_recovered_chunks: usize, - stop_threshold: usize, - processed_chunk_count: AtomicUsize, + expected_recovered_chunks: u64, + stop_threshold: u64, + processed_chunk_count: AtomicU64, stop_sender: watch::Sender, } impl TestEventListener { - fn new(stop_threshold: usize, stop_sender: watch::Sender) -> Self { + fn new(stop_threshold: u64, stop_sender: watch::Sender) -> Self { Self { expected_recovered_chunks: 0, stop_threshold, - processed_chunk_count: AtomicUsize::new(0), + processed_chunk_count: AtomicU64::new(0), stop_sender, } } - fn expect_recovered_chunks(mut self, count: usize) -> Self { + fn expect_recovered_chunks(mut self, count: u64) -> Self { self.expected_recovered_chunks = count; self } @@ -193,7 +155,7 @@ impl TestEventListener { #[async_trait] impl HandleRecoveryEvent for TestEventListener { - fn recovery_started(&mut self, 
_chunk_count: usize, recovered_chunk_count: usize) { + fn recovery_started(&mut self, _chunk_count: u64, recovered_chunk_count: u64) { assert_eq!(recovered_chunk_count, self.expected_recovered_chunks); } @@ -207,7 +169,7 @@ impl HandleRecoveryEvent for TestEventListener { #[test_casing(3, [5, 7, 8])] #[tokio::test] -async fn recovery_fault_tolerance(chunk_count: usize) { +async fn recovery_fault_tolerance(chunk_count: u64) { let pool = ConnectionPool::test_pool().await; let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); let snapshot_recovery = prepare_recovery_snapshot_with_genesis(&pool, &temp_dir).await; @@ -251,9 +213,7 @@ async fn recovery_fault_tolerance(chunk_count: usize) { let recovery_options = RecoveryOptions { chunk_count, concurrency_limit: 1, - events: Box::new( - TestEventListener::new(usize::MAX, stop_sender).expect_recovered_chunks(3), - ), + events: Box::new(TestEventListener::new(u64::MAX, stop_sender).expect_recovered_chunks(3)), }; let tree = tree .recover(snapshot, recovery_options, &pool, &stop_receiver) @@ -291,7 +251,9 @@ async fn entire_recovery_workflow(case: RecoveryWorkflowCase) { &merkle_tree_config, &OperationsManagerConfig { delay_interval: 50 }, ); - let mut calculator = MetadataCalculator::new(calculator_config, None).await; + let mut calculator = MetadataCalculator::new(calculator_config, None) + .await + .unwrap(); let (delay_sx, mut delay_rx) = mpsc::unbounded_channel(); calculator.delayer.delay_notifier = delay_sx; diff --git a/core/lib/zksync_core/src/metadata_calculator/tests.rs b/core/lib/zksync_core/src/metadata_calculator/tests.rs index da158ff11ef9..8a2b9beb6fb6 100644 --- a/core/lib/zksync_core/src/metadata_calculator/tests.rs +++ b/core/lib/zksync_core/src/metadata_calculator/tests.rs @@ -400,7 +400,9 @@ async fn setup_calculator_with_options( ) -> MetadataCalculator { let calculator_config = MetadataCalculatorConfig::for_main_node(merkle_tree_config, operation_config); - let 
metadata_calculator = MetadataCalculator::new(calculator_config, object_store).await; + let metadata_calculator = MetadataCalculator::new(calculator_config, object_store) + .await + .unwrap(); let mut storage = pool.access_storage().await.unwrap(); if storage.blocks_dal().is_genesis_needed().await.unwrap() { @@ -526,6 +528,7 @@ async fn insert_initial_writes_for_batch( .storage_logs_dal() .get_touched_slots_for_l1_batch(l1_batch_number) .await + .unwrap() .into_iter() .filter_map(|(key, value)| (!value.is_zero()).then_some(key)) .collect(); @@ -631,7 +634,8 @@ async fn deduplication_works_as_expected() { let initial_writes = storage .storage_logs_dal() .get_l1_batches_and_indices_for_initial_writes(&hashed_keys) - .await; + .await + .unwrap(); assert_eq!(initial_writes.len(), hashed_keys.len()); assert!(initial_writes .values() @@ -650,7 +654,8 @@ async fn deduplication_works_as_expected() { let initial_writes = storage .storage_logs_dal() .get_l1_batches_and_indices_for_initial_writes(&hashed_keys) - .await; + .await + .unwrap(); assert_eq!(initial_writes.len(), hashed_keys.len()); assert!(initial_writes .values() @@ -659,7 +664,8 @@ async fn deduplication_works_as_expected() { let initial_writes = storage .storage_logs_dal() .get_l1_batches_and_indices_for_initial_writes(&new_hashed_keys) - .await; + .await + .unwrap(); assert_eq!(initial_writes.len(), new_hashed_keys.len()); assert!(initial_writes .values() @@ -675,7 +681,8 @@ async fn deduplication_works_as_expected() { let initial_writes = storage .storage_logs_dal() .get_l1_batches_and_indices_for_initial_writes(&no_op_hashed_keys) - .await; + .await + .unwrap(); assert!(initial_writes.is_empty()); let updated_logs: Vec<_> = no_op_logs @@ -692,7 +699,8 @@ async fn deduplication_works_as_expected() { let initial_writes = storage .storage_logs_dal() .get_l1_batches_and_indices_for_initial_writes(&no_op_hashed_keys) - .await; + .await + .unwrap(); assert_eq!(initial_writes.len(), no_op_hashed_keys.len() / 2); 
for key in no_op_hashed_keys.iter().step_by(2) { assert_eq!(initial_writes[key].0, L1BatchNumber(4)); diff --git a/core/lib/zksync_core/src/state_keeper/batch_executor/mod.rs b/core/lib/zksync_core/src/state_keeper/batch_executor/mod.rs index 16e1e83f6771..cdc1c182abcd 100644 --- a/core/lib/zksync_core/src/state_keeper/batch_executor/mod.rs +++ b/core/lib/zksync_core/src/state_keeper/batch_executor/mod.rs @@ -12,7 +12,7 @@ use multivm::{ }; use once_cell::sync::OnceCell; use tokio::{ - sync::{mpsc, oneshot}, + sync::{mpsc, oneshot, watch}, task::JoinHandle, }; use zksync_dal::ConnectionPool; @@ -75,7 +75,8 @@ pub trait L1BatchExecutorBuilder: 'static + Send + Sync + fmt::Debug { &mut self, l1_batch_params: L1BatchEnv, system_env: SystemEnv, - ) -> BatchExecutorHandle; + stop_receiver: &watch::Receiver, + ) -> Option; } /// The default implementation of [`L1BatchExecutorBuilder`]. @@ -119,18 +120,23 @@ impl L1BatchExecutorBuilder for MainBatchExecutorBuilder { &mut self, l1_batch_params: L1BatchEnv, system_env: SystemEnv, - ) -> BatchExecutorHandle { - let mut secondary_storage = RocksdbStorage::new(self.state_keeper_db_path.as_ref()); + stop_receiver: &watch::Receiver, + ) -> Option { + let mut secondary_storage = RocksdbStorage::builder(self.state_keeper_db_path.as_ref()) + .await + .expect("Failed initializing state keeper storage"); secondary_storage.enable_enum_index_migration(self.enum_index_migration_chunk_size); let mut conn = self .pool .access_storage_tagged("state_keeper") .await .unwrap(); - secondary_storage.update_from_postgres(&mut conn).await; - drop(conn); + let secondary_storage = secondary_storage + .synchronize(&mut conn, stop_receiver) + .await + .expect("Failed synchronizing secondary state keeper storage")?; - BatchExecutorHandle::new( + Some(BatchExecutorHandle::new( self.save_call_traces, self.max_allowed_tx_gas_limit, secondary_storage, @@ -138,7 +144,7 @@ impl L1BatchExecutorBuilder for MainBatchExecutorBuilder { system_env, 
self.upload_witness_inputs_to_gcs, self.optional_bytecode_compression, - ) + )) } } diff --git a/core/lib/zksync_core/src/state_keeper/batch_executor/tests/tester.rs b/core/lib/zksync_core/src/state_keeper/batch_executor/tests/tester.rs index 413a12bdf2ed..dc7f9d0d9794 100644 --- a/core/lib/zksync_core/src/state_keeper/batch_executor/tests/tester.rs +++ b/core/lib/zksync_core/src/state_keeper/batch_executor/tests/tester.rs @@ -6,10 +6,10 @@ use multivm::{ vm_latest::constants::INITIAL_STORAGE_WRITE_PUBDATA_BYTES, }; use tempfile::TempDir; +use tokio::sync::watch; use zksync_config::configs::chain::StateKeeperConfig; use zksync_contracts::{get_loadnext_contract, test_contracts::LoadnextContractExecutionParams}; use zksync_dal::ConnectionPool; -use zksync_state::RocksdbStorage; use zksync_test_account::{Account, DeployContractsTx, TxType}; use zksync_types::{ ethabi::Token, fee::Fee, system_contracts::get_system_smart_contracts, @@ -24,6 +24,7 @@ use crate::{ state_keeper::{ batch_executor::BatchExecutorHandle, tests::{default_l1_batch_env, default_system_env, BASE_SYSTEM_CONTRACTS}, + L1BatchExecutorBuilder, MainBatchExecutorBuilder, }, }; @@ -86,34 +87,27 @@ impl Tester { /// Creates a batch executor instance. /// This function intentionally uses sensible defaults to not introduce boilerplate. pub(super) async fn create_batch_executor(&self) -> BatchExecutorHandle { + let mut builder = MainBatchExecutorBuilder::new( + self.db_dir.path().to_str().unwrap().to_owned(), + self.pool.clone(), + self.config.max_allowed_tx_gas_limit.into(), + self.config.save_call_traces, + self.config.upload_witness_inputs_to_gcs, + 100, + false, + ); + // Not really important for the batch executor - it operates over a single batch. 
- let (l1_batch, system_env) = self.batch_params( + let (l1_batch_env, system_env) = self.batch_params( L1BatchNumber(1), 100, self.config.validation_computational_gas_limit, ); - - let mut secondary_storage = RocksdbStorage::new(self.db_dir.path()); - let mut conn = self - .pool - .access_storage_tagged("state_keeper") + let (_stop_sender, stop_receiver) = watch::channel(false); + builder + .init_batch(l1_batch_env, system_env, &stop_receiver) .await - .unwrap(); - - secondary_storage.update_from_postgres(&mut conn).await; - drop(conn); - - // We don't use the builder because it would require us to clone the `ConnectionPool`, which is forbidden - // for the test pool (see the doc-comment on `TestPool` for details). - BatchExecutorHandle::new( - self.config.save_call_traces, - self.config.max_allowed_tx_gas_limit.into(), - secondary_storage, - l1_batch, - system_env, - self.config.upload_witness_inputs_to_gcs, - false, - ) + .expect("Batch executor was interrupted") } /// Creates test batch params that can be fed into the VM. diff --git a/core/lib/zksync_core/src/state_keeper/io/tests/mod.rs b/core/lib/zksync_core/src/state_keeper/io/tests/mod.rs index 69c68f3f5c1b..7ba6a12f53fc 100644 --- a/core/lib/zksync_core/src/state_keeper/io/tests/mod.rs +++ b/core/lib/zksync_core/src/state_keeper/io/tests/mod.rs @@ -270,7 +270,8 @@ async fn processing_storage_logs_when_sealing_miniblock() { let touched_slots = conn .storage_logs_dal() .get_touched_slots_for_l1_batch(l1_batch_number) - .await; + .await + .unwrap(); // Keys that are only read must not be written to `storage_logs`. 
let account = AccountTreeId::default(); diff --git a/core/lib/zksync_core/src/state_keeper/keeper.rs b/core/lib/zksync_core/src/state_keeper/keeper.rs index 209809d33f95..d504bda0f130 100644 --- a/core/lib/zksync_core/src/state_keeper/keeper.rs +++ b/core/lib/zksync_core/src/state_keeper/keeper.rs @@ -158,8 +158,13 @@ impl ZkSyncStateKeeper { let mut batch_executor = self .batch_executor_base - .init_batch(l1_batch_env.clone(), system_env.clone()) - .await; + .init_batch( + l1_batch_env.clone(), + system_env.clone(), + &self.stop_receiver, + ) + .await + .ok_or(Error::Canceled)?; self.restore_state(&batch_executor, &mut updates_manager, pending_miniblocks) .await?; @@ -210,8 +215,13 @@ impl ZkSyncStateKeeper { ); batch_executor = self .batch_executor_base - .init_batch(l1_batch_env.clone(), system_env.clone()) - .await; + .init_batch( + l1_batch_env.clone(), + system_env.clone(), + &self.stop_receiver, + ) + .await + .ok_or(Error::Canceled)?; let version_changed = system_env.version != sealed_batch_protocol_version; diff --git a/core/lib/zksync_core/src/state_keeper/tests/tester.rs b/core/lib/zksync_core/src/state_keeper/tests/tester.rs index 9ac886270d31..ace93394a44a 100644 --- a/core/lib/zksync_core/src/state_keeper/tests/tester.rs +++ b/core/lib/zksync_core/src/state_keeper/tests/tester.rs @@ -452,7 +452,8 @@ impl L1BatchExecutorBuilder for TestBatchExecutorBuilder { &mut self, _l1batch_params: L1BatchEnv, _system_env: SystemEnv, - ) -> BatchExecutorHandle { + _stop_receiver: &watch::Receiver<bool>, + ) -> Option<BatchExecutorHandle> { let (commands_sender, commands_receiver) = mpsc::channel(1); let executor = TestBatchExecutor::new( @@ -462,7 +463,7 @@ impl L1BatchExecutorBuilder for TestBatchExecutorBuilder { ); let handle = tokio::task::spawn_blocking(move || executor.run()); - BatchExecutorHandle::from_raw(handle, commands_sender) + Some(BatchExecutorHandle::from_raw(handle, commands_sender)) } } @@ -780,7 +781,8 @@ impl L1BatchExecutorBuilder for MockBatchExecutorBuilder { &mut
self, _l1batch_params: L1BatchEnv, _system_env: SystemEnv, - ) -> BatchExecutorHandle { + _stop_receiver: &watch::Receiver<bool>, + ) -> Option<BatchExecutorHandle> { let (send, recv) = mpsc::channel(1); let handle = tokio::task::spawn(async { let mut recv = recv; @@ -797,6 +799,6 @@ impl L1BatchExecutorBuilder for MockBatchExecutorBuilder { } } }); - BatchExecutorHandle::from_raw(handle, send) + Some(BatchExecutorHandle::from_raw(handle, send)) } } diff --git a/prover/Cargo.lock b/prover/Cargo.lock index bea7839b2c3c..7f36f52d54a2 100644 --- a/prover/Cargo.lock +++ b/prover/Cargo.lock @@ -7568,7 +7568,6 @@ dependencies = [ name = "zksync_crypto" version = "0.1.0" dependencies = [ - "base64 0.13.1", "blake2 0.10.6 (registry+https://github.com/rust-lang/crates.io-index)", "hex", "once_cell", @@ -7854,15 +7853,9 @@ name = "zksync_system_constants" version = "0.1.0" dependencies = [ "anyhow", - "bigdecimal", - "hex", "num 0.3.1", "once_cell", - "serde", - "serde_json", - "url", "zksync_basic_types", - "zksync_contracts", "zksync_utils", ] @@ -7893,7 +7886,6 @@ dependencies = [ "zkevm_test_harness 1.3.3", "zksync_basic_types", "zksync_config", - "zksync_consensus_roles", "zksync_contracts", "zksync_mini_merkle_tree", "zksync_protobuf",