From d01840d5de2cb0f4bead8f1c384b24ba713e6a66 Mon Sep 17 00:00:00 2001
From: Alex Ostrovski
Date: Fri, 30 Aug 2024 13:21:22 +0300
Subject: [PATCH] feat(vm-runner): Implement batch data prefetching (#2724)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## What ❔

- Implements prefetching of storage slots / bytecodes accessed by a VM in a batch. Enables it for the VM playground. Optionally shadows prefetched snapshot storage.
- Makes RocksDB cache optional for VM playground.

## Why ❔

- Prefetching allows loading storage slots / bytecodes for a batch in O(1) DB queries, which is very efficient for local debugging etc. It may be on par with or faster than using the RocksDB cache. (There's a caveat: prefetching doesn't work w/o protective reads.)
- Disabling RocksDB cache is useful for local testing, since the cache won't catch up during a single batch run anyway.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
---
 Cargo.lock                                    |   2 +
 core/lib/config/src/configs/experimental.rs   |   9 +-
 core/lib/env_config/src/vm_runner.rs          |   4 +-
 core/lib/protobuf_config/src/experimental.rs  |   7 +-
 .../src/proto/config/experimental.proto       |   2 +-
 core/lib/state/src/lib.rs                     |   2 +-
 core/lib/state/src/rocksdb/mod.rs             |   2 +-
 core/lib/state/src/shadow_storage.rs          |  78 +++++---
 core/lib/state/src/storage_factory.rs         | 149 ++++++++++++--
 core/lib/vm_interface/Cargo.toml              |   1 +
 core/lib/vm_interface/src/storage/mod.rs      |   2 +
 core/lib/vm_interface/src/storage/snapshot.rs | 189 ++++++++++++++++++
 .../layers/vm_runner/playground.rs            |  16 +-
 .../state_keeper/src/state_keeper_storage.rs  |   4 +-
 core/node/vm_runner/Cargo.toml                |   1 +
 core/node/vm_runner/src/impls/mod.rs          |   2 +-
 core/node/vm_runner/src/impls/playground.rs   | 141 +++++++++----
 core/node/vm_runner/src/storage.rs            |  65 +++++-
 core/node/vm_runner/src/tests/mod.rs          |  70 ++-----
 core/node/vm_runner/src/tests/playground.rs   | 111 +++++++---
 core/node/vm_runner/src/tests/process.rs      |  10 +-
 core/node/vm_runner/src/tests/storage.rs      |   8 +-
 .../vm_runner/src/tests/storage_writer.rs     |  65 ++++--
 23 files changed, 734 insertions(+), 206 deletions(-)
 create mode 100644 core/lib/vm_interface/src/storage/snapshot.rs

diff --git a/Cargo.lock b/Cargo.lock
index 07519d68aac5..413f76e68e3a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -9783,6 +9783,7 @@ dependencies = [
  "assert_matches",
  "hex",
  "serde",
+ "serde_json",
  "thiserror",
  "tracing",
  "zksync_contracts",
@@ -9795,6 +9796,7 @@ name = "zksync_vm_runner"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "assert_matches",
 "async-trait",
 "backon",
 "dashmap",
diff --git a/core/lib/config/src/configs/experimental.rs b/core/lib/config/src/configs/experimental.rs
index 097f3c4112b3..618cfd3d388c 100644
--- a/core/lib/config/src/configs/experimental.rs
+++ b/core/lib/config/src/configs/experimental.rs
@@ -65,8 +65,7 @@ pub struct ExperimentalVmPlaygroundConfig {
     #[serde(default)]
     pub fast_vm_mode: FastVmMode,
     /// Path to the RocksDB cache directory.
-    #[serde(default = "ExperimentalVmPlaygroundConfig::default_db_path")]
-    pub db_path: String,
+    pub db_path: Option<String>,
     /// First L1 batch to consider processed. Will not be used if the processing cursor is persisted, unless the `reset` flag is set.
     #[serde(default)]
     pub first_processed_batch: L1BatchNumber,
@@ -83,7 +82,7 @@ impl Default for ExperimentalVmPlaygroundConfig {
     fn default() -> Self {
         Self {
             fast_vm_mode: FastVmMode::default(),
-            db_path: Self::default_db_path(),
+            db_path: None,
             first_processed_batch: L1BatchNumber(0),
             window_size: Self::default_window_size(),
             reset: false,
@@ -92,10 +91,6 @@
 }
 
 impl ExperimentalVmPlaygroundConfig {
-    pub fn default_db_path() -> String {
-        "./db/vm_playground".to_owned()
-    }
-
     pub fn default_window_size() -> NonZeroU32 {
         NonZeroU32::new(1).unwrap()
     }
diff --git a/core/lib/env_config/src/vm_runner.rs b/core/lib/env_config/src/vm_runner.rs
index efaf5d1666c3..730a79dd340a 100644
--- a/core/lib/env_config/src/vm_runner.rs
+++ b/core/lib/env_config/src/vm_runner.rs
@@ -65,7 +65,7 @@ mod tests {
         let config = ExperimentalVmConfig::from_env().unwrap();
         assert_eq!(config.state_keeper_fast_vm_mode, FastVmMode::New);
         assert_eq!(config.playground.fast_vm_mode, FastVmMode::Shadow);
-        assert_eq!(config.playground.db_path, "/db/vm_playground");
+        assert_eq!(config.playground.db_path.unwrap(), "/db/vm_playground");
         assert_eq!(config.playground.first_processed_batch, L1BatchNumber(123));
         assert!(config.playground.reset);
@@ -83,6 +83,6 @@
         lock.remove_env(&["EXPERIMENTAL_VM_PLAYGROUND_DB_PATH"]);
 
         let config = ExperimentalVmConfig::from_env().unwrap();
-        assert!(!config.playground.db_path.is_empty());
+        assert!(config.playground.db_path.is_none());
     }
 }
diff --git a/core/lib/protobuf_config/src/experimental.rs b/core/lib/protobuf_config/src/experimental.rs
index 7b71dec80344..63fa0ca51eb5 100644
--- a/core/lib/protobuf_config/src/experimental.rs
+++ b/core/lib/protobuf_config/src/experimental.rs
@@ -80,10 +80,7 @@ impl ProtoRepr for proto::VmPlayground {
                 .transpose()
                 .context("fast_vm_mode")?
                 .map_or_else(FastVmMode::default, |mode| mode.parse()),
-            db_path: self
-                .db_path
-                .clone()
-                .unwrap_or_else(Self::Type::default_db_path),
+            db_path: self.db_path.clone(),
             first_processed_batch: L1BatchNumber(self.first_processed_batch.unwrap_or(0)),
             window_size: NonZeroU32::new(self.window_size.unwrap_or(1))
                 .context("window_size cannot be 0")?,
@@ -94,7 +91,7 @@ impl ProtoRepr for proto::VmPlayground {
     fn build(this: &Self::Type) -> Self {
         Self {
             fast_vm_mode: Some(proto::FastVmMode::new(this.fast_vm_mode).into()),
-            db_path: Some(this.db_path.clone()),
+            db_path: this.db_path.clone(),
             first_processed_batch: Some(this.first_processed_batch.0),
             window_size: Some(this.window_size.get()),
             reset: Some(this.reset),
diff --git a/core/lib/protobuf_config/src/proto/config/experimental.proto b/core/lib/protobuf_config/src/proto/config/experimental.proto
index 55fb81b56325..5e1d045ca670 100644
--- a/core/lib/protobuf_config/src/proto/config/experimental.proto
+++ b/core/lib/protobuf_config/src/proto/config/experimental.proto
@@ -28,7 +28,7 @@ enum FastVmMode {
 // Experimental VM configuration
 message VmPlayground {
   optional FastVmMode fast_vm_mode = 1; // optional; if not set, fast VM is not used
-  optional string db_path = 2; // optional; defaults to `./db/vm_playground`
+  optional string db_path = 2; // optional; if not set, playground will not use RocksDB cache
   optional uint32 first_processed_batch = 3; // optional; defaults to 0
   optional bool reset = 4; // optional; defaults to false
   optional uint32 window_size = 5; // optional; non-zero; defaults to 1
diff --git a/core/lib/state/src/lib.rs b/core/lib/state/src/lib.rs
index ad5361c4608b..205579552a30 100644
--- a/core/lib/state/src/lib.rs
+++ b/core/lib/state/src/lib.rs
@@ -20,7 +20,7 @@ pub use self::{
     },
     shadow_storage::ShadowStorage,
     storage_factory::{
-        BatchDiff, OwnedStorage, PgOrRocksdbStorage, ReadStorageFactory, RocksdbWithMemory,
+        BatchDiff, CommonStorage, OwnedStorage, ReadStorageFactory, RocksdbWithMemory,
     },
 };
diff --git a/core/lib/state/src/rocksdb/mod.rs b/core/lib/state/src/rocksdb/mod.rs
index f866a22a3e52..30c58ca6a0ef 100644
--- a/core/lib/state/src/rocksdb/mod.rs
+++ b/core/lib/state/src/rocksdb/mod.rs
@@ -347,7 +347,7 @@ impl RocksdbStorage {
         let to_l1_batch_number = if let Some(to_l1_batch_number) = to_l1_batch_number {
             if to_l1_batch_number > latest_l1_batch_number {
                 let err = anyhow::anyhow!(
-                    "Requested to update RocksDB to L1 batch number ({current_l1_batch_number}) that \
+                    "Requested to update RocksDB to L1 batch number ({to_l1_batch_number}) that \
                      is greater than the last sealed L1 batch number in Postgres ({latest_l1_batch_number})"
                 );
                 return Err(err.into());
diff --git a/core/lib/state/src/shadow_storage.rs b/core/lib/state/src/shadow_storage.rs
index 28d7b997cd1f..d69491e500f2 100644
--- a/core/lib/state/src/shadow_storage.rs
+++ b/core/lib/state/src/shadow_storage.rs
@@ -1,10 +1,12 @@
+use std::fmt;
+
 use vise::{Counter, Metrics};
 use zksync_types::{L1BatchNumber, StorageKey, StorageValue, H256};
 use zksync_vm_interface::storage::ReadStorage;
 
-#[allow(clippy::struct_field_names)]
 #[derive(Debug, Metrics)]
 #[metrics(prefix = "shadow_storage")]
+#[allow(clippy::struct_field_names)] // false positive
 struct ShadowStorageMetrics {
     /// Number of mismatches when reading a value from a shadow storage.
     read_value_mismatch: Counter,
@@ -19,24 +21,28 @@ struct ShadowStorageMetrics {
 #[vise::register]
 static METRICS: vise::Global<ShadowStorageMetrics> = vise::Global::new();
 
-/// [`ReadStorage`] implementation backed by 2 different backends:
-/// source_storage -- backend that will return values for function calls and be the source of truth
-/// to_check_storage -- secondary storage, which will verify it's own return values against source_storage
-/// Note that if to_check_storage value is different than source value, execution continues and metrics/ logs are emitted.
+/// [`ReadStorage`] implementation backed by 2 different backends which are compared for each performed operation.
+///
+/// - `Ref` is the backend that will return values for function calls and be the source of truth
+/// - `Check` is the secondary storage, which will have its return values verified against `Ref`
+///
+/// If a `Check` value differs from the corresponding `Ref` value, storage behavior depends on the [panic on divergence](Self::set_panic_on_divergence()) flag.
+/// If this flag is set (which it is by default), the storage panics; otherwise, execution continues and metrics / logs are emitted.
 #[derive(Debug)]
-pub struct ShadowStorage<'a> {
-    source_storage: Box<dyn ReadStorage + 'a>,
-    to_check_storage: Box<dyn ReadStorage + 'a>,
-    metrics: &'a ShadowStorageMetrics,
+pub struct ShadowStorage<Ref, Check> {
+    source_storage: Ref,
+    to_check_storage: Check,
+    metrics: &'static ShadowStorageMetrics,
     l1_batch_number: L1BatchNumber,
+    panic_on_divergence: bool,
 }
 
-impl<'a> ShadowStorage<'a> {
+impl<Ref: ReadStorage, Check: ReadStorage> ShadowStorage<Ref, Check> {
     /// Creates a new storage using the 2 underlying [`ReadStorage`]s, first as source, the second to be checked
     /// against the source.
     pub fn new(
-        source_storage: Box<dyn ReadStorage + 'a>,
-        to_check_storage: Box<dyn ReadStorage + 'a>,
+        source_storage: Ref,
+        to_check_storage: Check,
         l1_batch_number: L1BatchNumber,
     ) -> Self {
         Self {
@@ -44,35 +50,49 @@ impl<'a> ShadowStorage<'a> {
             to_check_storage,
             metrics: &METRICS,
             l1_batch_number,
+            panic_on_divergence: true,
+        }
+    }
+
+    /// Sets behavior if a storage divergence is detected.
+    pub fn set_panic_on_divergence(&mut self, panic_on_divergence: bool) {
+        self.panic_on_divergence = panic_on_divergence;
+    }
+
+    fn error_or_panic(&self, args: fmt::Arguments<'_>) {
+        if self.panic_on_divergence {
+            panic!("{args}");
+        } else {
+            tracing::error!(l1_batch_number = self.l1_batch_number.0, "{args}");
         }
     }
 }
 
-impl ReadStorage for ShadowStorage<'_> {
+impl<Ref: ReadStorage, Check: ReadStorage> ReadStorage for ShadowStorage<Ref, Check> {
     fn read_value(&mut self, key: &StorageKey) -> StorageValue {
-        let source_value = self.source_storage.as_mut().read_value(key);
-        let expected_value = self.to_check_storage.as_mut().read_value(key);
+        let source_value = self.source_storage.read_value(key);
+        let expected_value = self.to_check_storage.read_value(key);
         if source_value != expected_value {
             self.metrics.read_value_mismatch.inc();
-            tracing::error!(
+            self.error_or_panic(format_args!(
                 "read_value({key:?}) -- l1_batch_number={:?} -- expected source={source_value:?} \
                  to be equal to to_check={expected_value:?}",
                 self.l1_batch_number
-            );
+            ));
         }
         source_value
     }
 
     fn is_write_initial(&mut self, key: &StorageKey) -> bool {
-        let source_value = self.source_storage.as_mut().is_write_initial(key);
-        let expected_value = self.to_check_storage.as_mut().is_write_initial(key);
+        let source_value = self.source_storage.is_write_initial(key);
+        let expected_value = self.to_check_storage.is_write_initial(key);
         if source_value != expected_value {
             self.metrics.is_write_initial_mismatch.inc();
-            tracing::error!(
+            self.error_or_panic(format_args!(
                 "is_write_initial({key:?}) -- l1_batch_number={:?} -- expected source={source_value:?} \
                  to be equal to to_check={expected_value:?}",
                 self.l1_batch_number
-            );
+            ));
         }
         source_value
     }
@@ -82,25 +102,25 @@ impl ReadStorage for ShadowStorage<'_> {
         let expected_value = self.to_check_storage.load_factory_dep(hash);
         if source_value != expected_value {
             self.metrics.load_factory_dep_mismatch.inc();
-            tracing::error!(
+            self.error_or_panic(format_args!(
                 "load_factory_dep({hash:?}) -- l1_batch_number={:?} -- expected source={source_value:?} \
                  to be equal to to_check={expected_value:?}",
-                self.l1_batch_number
-            );
+                self.l1_batch_number
+            ));
         }
         source_value
     }
 
     fn get_enumeration_index(&mut self, key: &StorageKey) -> Option<u64> {
-        let source_value = self.source_storage.as_mut().get_enumeration_index(key);
-        let expected_value = self.to_check_storage.as_mut().get_enumeration_index(key);
+        let source_value = self.source_storage.get_enumeration_index(key);
+        let expected_value = self.to_check_storage.get_enumeration_index(key);
         if source_value != expected_value {
-            tracing::error!(
+            self.metrics.get_enumeration_index_mismatch.inc();
+            self.error_or_panic(format_args!(
                 "get_enumeration_index({key:?}) -- l1_batch_number={:?} -- \
                  expected source={source_value:?} to be equal to to_check={expected_value:?}",
                 self.l1_batch_number
-            );
-            self.metrics.get_enumeration_index_mismatch.inc();
+            ));
         }
         source_value
     }
diff --git a/core/lib/state/src/storage_factory.rs b/core/lib/state/src/storage_factory.rs
index e2b5275c48d5..2ef9b249af2e 100644
--- a/core/lib/state/src/storage_factory.rs
+++ b/core/lib/state/src/storage_factory.rs
@@ -1,4 +1,7 @@
-use std::{collections::HashMap, fmt::Debug};
+use std::{
+    collections::{HashMap, HashSet},
+    fmt::Debug,
+};
 
 use anyhow::Context as _;
 use async_trait::async_trait;
 use tokio::{runtime::Handle, sync::watch};
 use zksync_dal::{Connection, ConnectionPool, Core, CoreDal};
 use zksync_storage::RocksDB;
 use zksync_types::{L1BatchNumber, StorageKey, StorageValue, H256};
-use zksync_vm_interface::storage::ReadStorage;
+use zksync_utils::u256_to_h256;
+use zksync_vm_interface::storage::{ReadStorage, StorageSnapshot, StorageWithSnapshot};
 
 use crate::{PostgresStorage, RocksdbStorage, RocksdbStorageBuilder, StateKeeperColumnFamily};
 
 /// Storage with a static lifetime that can be sent to Tokio tasks etc.
-pub type OwnedStorage = PgOrRocksdbStorage<'static>;
+pub type OwnedStorage = CommonStorage<'static>;
 
 /// Factory that can produce storage instances on demand. The storage type is encapsulated as a type param
 /// (mostly for testing purposes); the default is [`OwnedStorage`].
@@ -40,7 +44,7 @@ impl ReadStorageFactory for ConnectionPool<Core> {
     ) -> anyhow::Result<Option<OwnedStorage>> {
         let connection = self.connection().await?;
         let storage = OwnedStorage::postgres(connection, l1_batch_number).await?;
-        Ok(Some(storage))
+        Ok(Some(storage.into()))
     }
 }
 
@@ -65,19 +69,34 @@ pub struct RocksdbWithMemory {
     pub batch_diffs: Vec<BatchDiff>,
 }
 
-/// A [`ReadStorage`] implementation that uses either [`PostgresStorage`] or [`RocksdbStorage`]
-/// underneath.
+/// Union of all [`ReadStorage`] implementations that are returned by [`ReadStorageFactory`], such as
+/// Postgres- and RocksDB-backed storages.
+///
+/// Ordinarily, you might want to use the [`OwnedStorage`] type alias instead of using `CommonStorage` directly.
+/// The former naming signals that the storage has static lifetime and thus can be sent to Tokio tasks or other threads.
 #[derive(Debug)]
-pub enum PgOrRocksdbStorage<'a> {
+pub enum CommonStorage<'a> {
     /// Implementation over a Postgres connection.
     Postgres(PostgresStorage<'a>),
     /// Implementation over a RocksDB cache instance.
     Rocksdb(RocksdbStorage),
     /// Implementation over a RocksDB cache instance with in-memory DB diffs.
     RocksdbWithMemory(RocksdbWithMemory),
+    /// In-memory storage snapshot with the Postgres storage fallback.
+    Snapshot(StorageWithSnapshot<PostgresStorage<'static>>),
+    /// Generic implementation. Should be used for testing purposes only since it has a performance penalty because
+    /// of the dynamic dispatch.
+    Boxed(Box<dyn ReadStorage + Send + 'a>),
 }
 
-impl PgOrRocksdbStorage<'static> {
+impl<'a> CommonStorage<'a> {
+    /// Creates a boxed storage. Should be used for testing purposes only.
+    pub fn boxed(storage: impl ReadStorage + Send + 'a) -> Self {
+        Self::Boxed(Box::new(storage))
+    }
+}
+
+impl CommonStorage<'static> {
     /// Creates a Postgres-based storage. Because of the `'static` lifetime requirement, `connection` must be
     /// non-transactional.
     ///
@@ -87,7 +106,7 @@ impl PgOrRocksdbStorage<'static> {
     pub async fn postgres(
         mut connection: Connection<'static, Core>,
         l1_batch_number: L1BatchNumber,
-    ) -> anyhow::Result<Self> {
+    ) -> anyhow::Result<PostgresStorage<'static>> {
         let l2_block_number = if let Some((_, l2_block_number)) = connection
             .blocks_dal()
             .get_l2_block_range_of_l1_batch(l1_batch_number)
             .await?
@@ -110,11 +129,7 @@ impl PgOrRocksdbStorage<'static> {
             snapshot_recovery.l2_block_number
         };
         tracing::debug!(%l1_batch_number, %l2_block_number, "Using Postgres-based storage");
-        Ok(
-            PostgresStorage::new_async(Handle::current(), connection, l2_block_number, true)
-                .await?
-                .into(),
-        )
+        PostgresStorage::new_async(Handle::current(), connection, l2_block_number, true).await
     }
 
     /// Catches up RocksDB synchronously (i.e. assumes the gap is small) and
@@ -153,6 +168,92 @@ impl PgOrRocksdbStorage<'static> {
         tracing::debug!(%rocksdb_l1_batch_number, "Using RocksDB-based storage");
         Ok(Some(rocksdb.into()))
     }
+
+    /// Creates a storage snapshot. Requires protective reads to be persisted for the batch; otherwise,
+    /// it will return `Ok(None)`.
+    #[tracing::instrument(skip(connection))]
+    pub async fn snapshot(
+        connection: &mut Connection<'static, Core>,
+        l1_batch_number: L1BatchNumber,
+    ) -> anyhow::Result<Option<StorageSnapshot>> {
+        let Some(header) = connection
+            .blocks_dal()
+            .get_l1_batch_header(l1_batch_number)
+            .await?
+        else {
+            return Ok(None);
+        };
+        let bytecode_hashes: HashSet<_> = header
+            .used_contract_hashes
+            .into_iter()
+            .map(u256_to_h256)
+            .collect();
+
+        // Check protective reads early on.
+        let protective_reads = connection
+            .storage_logs_dedup_dal()
+            .get_protective_reads_for_l1_batch(l1_batch_number)
+            .await?;
+        if protective_reads.is_empty() {
+            tracing::debug!("No protective reads for batch");
+            return Ok(None);
+        }
+        let protective_reads_len = protective_reads.len();
+        tracing::debug!("Loaded {protective_reads_len} protective reads");
+
+        let touched_slots = connection
+            .storage_logs_dal()
+            .get_touched_slots_for_l1_batch(l1_batch_number)
+            .await?;
+        tracing::debug!("Loaded {} touched keys", touched_slots.len());
+
+        let all_accessed_keys: Vec<_> = protective_reads
+            .into_iter()
+            .map(|key| key.hashed_key())
+            .chain(touched_slots.into_keys())
+            .collect();
+        let previous_values = connection
+            .storage_logs_dal()
+            .get_previous_storage_values(&all_accessed_keys, l1_batch_number)
+            .await?;
+        tracing::debug!(
+            "Obtained {} previous values for accessed keys",
+            previous_values.len()
+        );
+        let initial_write_info = connection
+            .storage_logs_dal()
+            .get_l1_batches_and_indices_for_initial_writes(&all_accessed_keys)
+            .await?;
+        tracing::debug!("Obtained initial write info for accessed keys");
+
+        let bytecodes = connection
+            .factory_deps_dal()
+            .get_factory_deps(&bytecode_hashes)
+            .await;
+        tracing::debug!("Loaded {} bytecodes used in the batch", bytecodes.len());
+        let factory_deps = bytecodes
+            .into_iter()
+            .map(|(hash_u256, words)| {
+                let bytes: Vec<u8> = words.into_iter().flatten().collect();
+                (u256_to_h256(hash_u256), bytes)
+            })
+            .collect();
+
+        let storage = previous_values.into_iter().map(|(key, prev_value)| {
+            let prev_value = prev_value.unwrap_or_default();
+            let enum_index =
+                initial_write_info
+                    .get(&key)
+                    .copied()
+                    .and_then(|(l1_batch, enum_index)| {
+                        // Filter out enum indexes assigned "in the future"
+                        (l1_batch < l1_batch_number).then_some(enum_index)
+                    });
+            (key, enum_index.map(|idx| (prev_value, idx)))
+        });
+        let storage = storage.collect();
+        Ok(Some(StorageSnapshot::new(storage, factory_deps)))
+    }
 }
 
 impl ReadStorage for RocksdbWithMemory {
@@ -203,12 +304,14 @@ impl ReadStorage for RocksdbWithMemory {
     }
 }
 
-impl ReadStorage for PgOrRocksdbStorage<'_> {
+impl ReadStorage for CommonStorage<'_> {
     fn read_value(&mut self, key: &StorageKey) -> StorageValue {
         match self {
             Self::Postgres(postgres) => postgres.read_value(key),
             Self::Rocksdb(rocksdb) => rocksdb.read_value(key),
             Self::RocksdbWithMemory(rocksdb_mem) => rocksdb_mem.read_value(key),
+            Self::Snapshot(snapshot) => snapshot.read_value(key),
+            Self::Boxed(storage) => storage.read_value(key),
         }
     }
 
@@ -217,6 +320,8 @@
             Self::Postgres(postgres) => postgres.is_write_initial(key),
             Self::Rocksdb(rocksdb) => rocksdb.is_write_initial(key),
             Self::RocksdbWithMemory(rocksdb_mem) => rocksdb_mem.is_write_initial(key),
+            Self::Snapshot(snapshot) => snapshot.is_write_initial(key),
+            Self::Boxed(storage) => storage.is_write_initial(key),
         }
     }
 
@@ -225,6 +330,8 @@
             Self::Postgres(postgres) => postgres.load_factory_dep(hash),
             Self::Rocksdb(rocksdb) => rocksdb.load_factory_dep(hash),
             Self::RocksdbWithMemory(rocksdb_mem) => rocksdb_mem.load_factory_dep(hash),
+            Self::Snapshot(snapshot) => snapshot.load_factory_dep(hash),
+            Self::Boxed(storage) => storage.load_factory_dep(hash),
         }
     }
 
@@ -233,18 +340,26 @@
             Self::Postgres(postgres) => postgres.get_enumeration_index(key),
             Self::Rocksdb(rocksdb) => rocksdb.get_enumeration_index(key),
             Self::RocksdbWithMemory(rocksdb_mem) => rocksdb_mem.get_enumeration_index(key),
+            Self::Snapshot(snapshot) => snapshot.get_enumeration_index(key),
+            Self::Boxed(storage) => storage.get_enumeration_index(key),
         }
     }
 }
 
-impl<'a> From<PostgresStorage<'a>> for PgOrRocksdbStorage<'a> {
+impl<'a> From<PostgresStorage<'a>> for CommonStorage<'a> {
     fn from(value: PostgresStorage<'a>) -> Self {
         Self::Postgres(value)
     }
 }
 
-impl<'a> From<RocksdbStorage> for PgOrRocksdbStorage<'a> {
+impl From<RocksdbStorage> for CommonStorage<'_> {
     fn from(value: RocksdbStorage) -> Self {
         Self::Rocksdb(value)
     }
 }
+
+impl<'a> From<StorageWithSnapshot<PostgresStorage<'a>>> for CommonStorage<'a> {
+    fn from(value: StorageWithSnapshot<PostgresStorage<'a>>) -> Self {
+        Self::Snapshot(value)
+    }
+}
diff --git a/core/lib/vm_interface/Cargo.toml b/core/lib/vm_interface/Cargo.toml
index a82c6ddadab5..8fc7883f1df7 100644
--- a/core/lib/vm_interface/Cargo.toml
+++ b/core/lib/vm_interface/Cargo.toml
@@ -22,3 +22,4 @@ tracing.workspace = true
 
 [dev-dependencies]
 assert_matches.workspace = true
+serde_json.workspace = true
diff --git a/core/lib/vm_interface/src/storage/mod.rs b/core/lib/vm_interface/src/storage/mod.rs
index 96cc1f19862c..9b92ef8b7705 100644
--- a/core/lib/vm_interface/src/storage/mod.rs
+++ b/core/lib/vm_interface/src/storage/mod.rs
@@ -5,10 +5,12 @@ use zksync_types::{get_known_code_key, StorageKey, StorageValue, H256};
 pub use self::{
     // Note, that `test_infra` of the bootloader tests relies on this value to be exposed
     in_memory::{InMemoryStorage, IN_MEMORY_STORAGE_DEFAULT_NETWORK_ID},
+    snapshot::{StorageSnapshot, StorageWithSnapshot},
     view::{ImmutableStorageView, StorageView, StorageViewCache, StorageViewMetrics},
 };
 
 mod in_memory;
+mod snapshot;
 mod view;
 
 /// Functionality to read from the VM storage.
diff --git a/core/lib/vm_interface/src/storage/snapshot.rs b/core/lib/vm_interface/src/storage/snapshot.rs
new file mode 100644
index 000000000000..a0175ff478a3
--- /dev/null
+++ b/core/lib/vm_interface/src/storage/snapshot.rs
@@ -0,0 +1,189 @@
+use std::{collections::HashMap, fmt};
+
+use serde::{Deserialize, Serialize};
+use zksync_types::{web3, StorageKey, StorageValue, H256};
+
+use super::ReadStorage;
+
+/// Self-sufficient or almost self-sufficient storage snapshot for a particular VM execution (e.g., executing a single L1 batch).
+///
+/// `StorageSnapshot` works somewhat similarly to [`InMemoryStorage`](super::InMemoryStorage), but has different semantics
+/// and use cases. `InMemoryStorage` is intended to be a modifiable storage to be used primarily in tests / benchmarks.
+/// In contrast, `StorageSnapshot` cannot be modified once created and is intended to represent a complete or almost complete snapshot
+/// for a particular VM execution. It can serve as a preloaded cache for a certain [`ReadStorage`] implementation
+/// that significantly reduces the number of storage accesses.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct StorageSnapshot {
+    // `Option` encompasses entire map value for more efficient serialization
+    storage: HashMap<H256, Option<(H256, u64)>>,
+    // `Bytes` are used to have efficient serialization
+    factory_deps: HashMap<H256, web3::Bytes>,
+}
+
+impl StorageSnapshot {
+    /// Creates a new storage snapshot.
+    ///
+    /// # Arguments
+    ///
+    /// - `storage` should contain all storage slots accessed during VM execution, i.e. protective reads + initial / repeated writes
+    ///   for batch execution, keyed by the hashed storage key. `None` map values correspond to accessed slots without an assigned enum index.
+    ///   By definition, all these slots are guaranteed to have zero value.
+    pub fn new(
+        storage: HashMap<H256, Option<(H256, u64)>>,
+        factory_deps: HashMap<H256, Vec<u8>>,
+    ) -> Self {
+        Self {
+            storage,
+            factory_deps: factory_deps
+                .into_iter()
+                .map(|(hash, bytecode)| (hash, web3::Bytes(bytecode)))
+                .collect(),
+        }
+    }
+
+    /// Creates a [`ReadStorage`] implementation based on this snapshot and the provided fallback implementation.
+    /// The fallback will be called for storage slots / factory deps not in this snapshot (which, if this snapshot
+    /// is reasonably constructed, would be a rare occurrence). If the `shadow` flag is set, the fallback will be
+    /// consulted for *every* operation; this obviously harms performance and is mostly useful for testing.
+    ///
+    /// The caller is responsible for ensuring that the fallback actually corresponds to the snapshot.
+    pub fn with_fallback<S: ReadStorage>(
+        self,
+        fallback: S,
+        shadow: bool,
+    ) -> StorageWithSnapshot<S> {
+        StorageWithSnapshot {
+            snapshot: self,
+            fallback,
+            shadow,
+        }
+    }
+}
+
+/// [`StorageSnapshot`] wrapper implementing the [`ReadStorage`] trait. Created using [`with_fallback()`](StorageSnapshot::with_fallback()).
+///
+/// # Why fallback?
+///
+/// The reason we require a fallback is that it may be difficult to create a 100%-complete snapshot in the general case.
+/// E.g., for batch execution, the data is mostly present in Postgres (provided that protective reads are recorded),
+/// but in some scenarios, accessed slots may not be recorded anywhere (e.g., if a slot is written to and then reverted in the same block).
+/// In practice, there are on the order of 10 such slots for a mainnet batch with ~5,000 transactions / ~35,000 accessed slots;
+/// i.e., snapshots can still provide a good speed-up.
+#[derive(Debug)]
+pub struct StorageWithSnapshot<S> {
+    snapshot: StorageSnapshot,
+    fallback: S,
+    shadow: bool,
+}
+
+impl<S: ReadStorage> StorageWithSnapshot<S> {
+    fn fallback<T: fmt::Debug + PartialEq>(
+        &mut self,
+        operation: fmt::Arguments<'_>,
+        value: Option<T>,
+        f: impl FnOnce(&mut S) -> T,
+    ) -> T {
+        if let Some(value) = value {
+            if self.shadow {
+                let fallback_value = f(&mut self.fallback);
+                assert_eq!(value, fallback_value, "mismatch in {operation} output");
+            }
+            return value;
+        }
+        tracing::trace!("Output for {operation} is missing in snapshot");
+        f(&mut self.fallback)
+    }
+}
+
+impl<S: ReadStorage> ReadStorage for StorageWithSnapshot<S> {
+    fn read_value(&mut self, key: &StorageKey) -> StorageValue {
+        let value = self
+            .snapshot
+            .storage
+            .get(&key.hashed_key())
+            .map(|entry| entry.unwrap_or_default().0);
+        self.fallback(format_args!("read_value({key:?})"), value, |storage| {
+            storage.read_value(key)
+        })
+    }
+
+    fn is_write_initial(&mut self, key: &StorageKey) -> bool {
+        let is_initial = self
+            .snapshot
+            .storage
+            .get(&key.hashed_key())
+            .map(Option::is_none);
+        self.fallback(
+            format_args!("is_write_initial({key:?})"),
+            is_initial,
+            |storage| storage.is_write_initial(key),
+        )
+    }
+
+    fn load_factory_dep(&mut self, hash: H256) -> Option<Vec<u8>> {
+        let dep = self
+            .snapshot
+            .factory_deps
+            .get(&hash)
+            .map(|dep| Some(dep.0.clone()));
+        self.fallback(format_args!("load_factory_dep({hash})"), dep, |storage| {
+            storage.load_factory_dep(hash)
+        })
+    }
+
+    fn get_enumeration_index(&mut self, key: &StorageKey) -> Option<u64> {
+        let enum_index = self
+            .snapshot
+            .storage
+            .get(&key.hashed_key())
+            .map(|entry| entry.map(|(_, idx)| idx));
+        self.fallback(
+            format_args!("get_enumeration_index({key:?})"),
+            enum_index,
+            |storage| storage.get_enumeration_index(key),
+        )
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn serializing_snapshot_to_json() {
+        let snapshot = StorageSnapshot::new(
+            HashMap::from([
+                (H256::repeat_byte(1), Some((H256::from_low_u64_be(1), 10))),
+                (
+                    H256::repeat_byte(0x23),
+                    Some((H256::from_low_u64_be(100), 100)),
+                ),
+                (H256::repeat_byte(0xff), None),
+            ]),
+            HashMap::from([(H256::repeat_byte(2), (0..32).collect())]),
+        );
+        let expected_json = serde_json::json!({
+            "storage": {
+                "0x0101010101010101010101010101010101010101010101010101010101010101": [
+                    "0x0000000000000000000000000000000000000000000000000000000000000001",
+                    10,
+                ],
+                "0x2323232323232323232323232323232323232323232323232323232323232323": [
+                    "0x0000000000000000000000000000000000000000000000000000000000000064",
+                    100,
+                ],
+                "0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff": null,
+            },
+            "factory_deps": {
+                "0x0202020202020202020202020202020202020202020202020202020202020202":
+                    "0x000102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f",
+            },
+        });
+        let actual_json = serde_json::to_value(&snapshot).unwrap();
+        assert_eq!(actual_json, expected_json);
+
+        let restored: StorageSnapshot = serde_json::from_value(actual_json).unwrap();
+        assert_eq!(restored.storage, snapshot.storage);
+        assert_eq!(restored.factory_deps, snapshot.factory_deps);
+    }
+}
diff --git a/core/node/node_framework/src/implementations/layers/vm_runner/playground.rs b/core/node/node_framework/src/implementations/layers/vm_runner/playground.rs
index 4fe091f56468..ee1be98319b3 100644
--- a/core/node/node_framework/src/implementations/layers/vm_runner/playground.rs
+++ b/core/node/node_framework/src/implementations/layers/vm_runner/playground.rs
@@ -3,7 +3,10 @@ use zksync_config::configs::ExperimentalVmPlaygroundConfig;
 use zksync_node_framework_derive::{FromContext, IntoContext};
 use zksync_types::L2ChainId;
 use zksync_vm_runner::{
-    impls::{VmPlayground, VmPlaygroundCursorOptions, VmPlaygroundIo, VmPlaygroundLoaderTask},
+    impls::{
+        VmPlayground, VmPlaygroundCursorOptions, VmPlaygroundIo, VmPlaygroundLoaderTask,
+        VmPlaygroundStorageOptions,
+    },
     ConcurrentOutputHandlerFactoryTask,
 };
 
@@ -45,7 +48,7 @@ pub struct Output {
     #[context(task)]
     pub output_handler_factory_task: ConcurrentOutputHandlerFactoryTask<VmPlaygroundIo>,
     #[context(task)]
-    pub loader_task: VmPlaygroundLoaderTask,
+    pub loader_task: Option<VmPlaygroundLoaderTask>,
     #[context(task)]
     pub playground: VmPlayground,
 }
 
@@ -85,10 +88,15 @@ impl WiringLayer for VmPlaygroundLayer {
             window_size: self.config.window_size,
             reset_state: self.config.reset,
         };
+        let storage = if let Some(path) = self.config.db_path {
+            VmPlaygroundStorageOptions::Rocksdb(path)
+        } else {
+            VmPlaygroundStorageOptions::Snapshots { shadow: false }
+        };
         let (playground, tasks) = VmPlayground::new(
             connection_pool,
             self.config.fast_vm_mode,
-            self.config.db_path,
+            storage,
             self.zksync_network_id,
             cursor,
         )
@@ -125,6 +133,6 @@ impl Task for VmPlayground {
     }
 
     async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
-        (*self).run(&stop_receiver.0).await
+        (*self).run(stop_receiver.0).await
     }
 }
diff --git a/core/node/state_keeper/src/state_keeper_storage.rs b/core/node/state_keeper/src/state_keeper_storage.rs
index 1b35f8ef73d0..f29115f9570e 100644
--- a/core/node/state_keeper/src/state_keeper_storage.rs
+++ b/core/node/state_keeper/src/state_keeper_storage.rs
@@ -70,7 +70,9 @@ impl ReadStorageFactory for AsyncRocksdbCache {
             Ok(storage)
         } else {
             Ok(Some(
-                OwnedStorage::postgres(connection, l1_batch_number).await?,
+                OwnedStorage::postgres(connection, l1_batch_number)
+                    .await?
+                    .into(),
             ))
         }
     }
diff --git a/core/node/vm_runner/Cargo.toml b/core/node/vm_runner/Cargo.toml
index cc6313fa5727..565b33c0c347 100644
--- a/core/node/vm_runner/Cargo.toml
+++ b/core/node/vm_runner/Cargo.toml
@@ -37,6 +37,7 @@ vise.workspace = true
 zksync_node_test_utils.workspace = true
 zksync_node_genesis.workspace = true
 zksync_test_account.workspace = true
+assert_matches.workspace = true
 backon.workspace = true
 futures = { workspace = true, features = ["compat"] }
 rand.workspace = true
diff --git a/core/node/vm_runner/src/impls/mod.rs b/core/node/vm_runner/src/impls/mod.rs
index 0911aec0561d..6b2f5dd0667f 100644
--- a/core/node/vm_runner/src/impls/mod.rs
+++ b/core/node/vm_runner/src/impls/mod.rs
@@ -10,7 +10,7 @@ pub use self::{
     },
     playground::{
         VmPlayground, VmPlaygroundCursorOptions, VmPlaygroundIo, VmPlaygroundLoaderTask,
-        VmPlaygroundTasks,
+        VmPlaygroundStorageOptions, VmPlaygroundTasks,
     },
     protective_reads::{ProtectiveReadsIo, ProtectiveReadsWriter, ProtectiveReadsWriterTasks},
 };
diff --git a/core/node/vm_runner/src/impls/playground.rs b/core/node/vm_runner/src/impls/playground.rs
index ad5623a1329d..461d36116096 100644
--- a/core/node/vm_runner/src/impls/playground.rs
+++ b/core/node/vm_runner/src/impls/playground.rs
@@ -19,6 +19,7 @@ use zksync_state_keeper::{MainBatchExecutor, StateKeeperOutputHandler, UpdatesMa
 use zksync_types::{vm::FastVmMode, L1BatchNumber, L2ChainId};
 
 use crate::{
+    storage::{PostgresLoader, StorageLoader},
     ConcurrentOutputHandlerFactory, ConcurrentOutputHandlerFactoryTask, OutputHandlerFactory,
     StorageSyncTask, VmRunner, VmRunnerIo, VmRunnerStorage,
 };
@@ -35,6 +36,20 @@ impl From<VmPlaygroundHealth> for Health {
     }
 }
 
+/// Options configuring the storage loader for VM playground.
+#[derive(Debug)]
+#[non_exhaustive]
+pub enum VmPlaygroundStorageOptions {
+    /// Use RocksDB cache.
+    Rocksdb(String),
+    /// Use prefetched batch snapshots (with fallback to Postgres if protective reads are not available for a batch).
+    Snapshots {
+        /// Whether to shadow snapshot storage with Postgres. This degrades performance and is mostly useful
+        /// to test snapshot correctness.
+        shadow: bool,
+    },
+}
+
 /// Options related to the VM playground cursor.
 #[derive(Debug)]
 pub struct VmPlaygroundCursorOptions {
@@ -46,16 +61,29 @@ pub struct VmPlaygroundCursorOptions {
     pub reset_state: bool,
 }
 
+#[derive(Debug)]
+enum VmPlaygroundStorage {
+    Rocksdb {
+        path: String,
+        task_sender: oneshot::Sender<StorageSyncTask<VmPlaygroundIo>>,
+    },
+    Snapshots {
+        shadow: bool,
+    },
+}
+
 /// Virtual machine playground. Does not persist anything in Postgres; instead, keeps an L1 batch cursor as a plain text file in the RocksDB directory
 /// (so that the playground doesn't repeatedly process the same batches after a restart).
+///
+/// If the RocksDB directory is not specified, the playground works in ephemeral mode: it takes all inputs from Postgres, doesn't maintain a cache,
+/// and doesn't persist the processed batch cursor. This is mostly useful for debugging purposes.
 #[derive(Debug)]
 pub struct VmPlayground {
     pool: ConnectionPool<Core>,
     batch_executor: MainBatchExecutor,
-    rocksdb_path: String,
+    storage: VmPlaygroundStorage,
     chain_id: L2ChainId,
     io: VmPlaygroundIo,
-    loader_task_sender: oneshot::Sender<StorageSyncTask<VmPlaygroundIo>>,
     output_handler_factory: ConcurrentOutputHandlerFactory<VmPlaygroundIo, VmPlaygroundOutputHandler>,
     reset_to_batch: Option<L1BatchNumber>,
 }
 
@@ -66,14 +94,30 @@ impl VmPlayground {
     pub async fn new(
         pool: ConnectionPool<Core>,
         vm_mode: FastVmMode,
-        rocksdb_path: String,
+        storage: VmPlaygroundStorageOptions,
         chain_id: L2ChainId,
         cursor: VmPlaygroundCursorOptions,
     ) -> anyhow::Result<(Self, VmPlaygroundTasks)> {
-        tracing::info!("Starting VM playground with mode {vm_mode:?}, cursor options: {cursor:?}");
+        tracing::info!("Starting VM playground with mode {vm_mode:?}, storage: {storage:?}, cursor options: {cursor:?}");
 
-        let cursor_file_path = Path::new(&rocksdb_path).join("__vm_playground_cursor");
-        let latest_processed_batch = VmPlaygroundIo::read_cursor(&cursor_file_path).await?;
+        let cursor_file_path = match &storage {
+            VmPlaygroundStorageOptions::Rocksdb(path) => {
+                Some(Path::new(path).join("__vm_playground_cursor"))
+            }
+            VmPlaygroundStorageOptions::Snapshots { .. } => {
+                tracing::warn!(
+                    "RocksDB cache is disabled; this can lead to significant performance degradation. Additionally, VM playground progress won't be persisted. \
+                     If this is not intended, set the cache path in app config"
+                );
+                None
+            }
+        };
+
+        let latest_processed_batch = if let Some(path) = &cursor_file_path {
+            VmPlaygroundIo::read_cursor(path).await?
+        } else {
+            None
+        };
         tracing::info!("Latest processed batch: {latest_processed_batch:?}");
         let latest_processed_batch = if cursor.reset_state {
             cursor.first_processed_batch
@@ -97,24 +141,33 @@ impl VmPlayground {
             io.clone(),
             VmPlaygroundOutputHandler,
         );
-        let (loader_task_sender, loader_task_receiver) = oneshot::channel();
+        let (storage, loader_task) = match storage {
+            VmPlaygroundStorageOptions::Rocksdb(path) => {
+                let (task_sender, task_receiver) = oneshot::channel();
+                let rocksdb = VmPlaygroundStorage::Rocksdb { path, task_sender };
+                let loader_task = VmPlaygroundLoaderTask {
+                    inner: task_receiver,
+                };
+                (rocksdb, Some(loader_task))
+            }
+            VmPlaygroundStorageOptions::Snapshots { shadow } => {
+                (VmPlaygroundStorage::Snapshots { shadow }, None)
+            }
+        };
         let this = Self {
             pool,
             batch_executor,
-            rocksdb_path,
+            storage,
             chain_id,
             io,
-            loader_task_sender,
             output_handler_factory,
             reset_to_batch: cursor.reset_state.then_some(cursor.first_processed_batch),
         };
         Ok((
             this,
             VmPlaygroundTasks {
-                loader_task: VmPlaygroundLoaderTask {
-                    inner: loader_task_receiver,
-                },
+                loader_task,
                 output_handler_factory_task,
             },
         ))
@@ -132,7 +185,12 @@ impl VmPlayground {
 
     #[tracing::instrument(skip(self), err)]
     async fn reset_rocksdb_cache(&self, last_retained_batch: L1BatchNumber) -> anyhow::Result<()> {
-        let builder = RocksdbStorage::builder(self.rocksdb_path.as_ref()).await?;
+        let VmPlaygroundStorage::Rocksdb { path, .. } = &self.storage else {
+            tracing::warn!("No RocksDB path specified; skipping resetting cache");
+            return Ok(());
+        };
+
+        let builder = RocksdbStorage::builder(path.as_ref()).await?;
         let current_l1_batch = builder.l1_batch_number().await;
         if current_l1_batch <= Some(last_retained_batch) {
             tracing::info!("Resetting RocksDB cache is not required: its current batch #{current_l1_batch:?} is lower than the target");
@@ -150,10 +208,12 @@ impl VmPlayground {
     /// # Errors
     ///
     /// Propagates RocksDB and Postgres errors.
-    pub async fn run(self, stop_receiver: &watch::Receiver<bool>) -> anyhow::Result<()> {
-        fs::create_dir_all(&self.rocksdb_path)
-            .await
-            .with_context(|| format!("cannot create dir `{}`", self.rocksdb_path))?;
+    pub async fn run(self, stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()> {
+        if let VmPlaygroundStorage::Rocksdb { path, ..
} = &self.storage {
+            fs::create_dir_all(path)
+                .await
+                .with_context(|| format!("cannot create dir `{path}`"))?;
+        }
 
         if let Some(reset_to_batch) = self.reset_to_batch {
             self.io.health_updater.update(HealthStatus::Affected.into());
@@ -168,22 +228,28 @@ impl VmPlayground {
 
         self.io.update_health();
 
-        let (loader, loader_task) = VmRunnerStorage::new(
-            self.pool.clone(),
-            self.rocksdb_path,
-            self.io.clone(),
-            self.chain_id,
-        )
-        .await?;
-        self.loader_task_sender.send(loader_task).ok();
+        let loader: Arc<dyn StorageLoader> = match self.storage {
+            VmPlaygroundStorage::Rocksdb { path, task_sender } => {
+                let (loader, loader_task) =
+                    VmRunnerStorage::new(self.pool.clone(), path, self.io.clone(), self.chain_id)
+                        .await?;
+                task_sender.send(loader_task).ok();
+                Arc::new(loader)
+            }
+            VmPlaygroundStorage::Snapshots { shadow } => {
+                let mut loader = PostgresLoader::new(self.pool.clone(), self.chain_id).await?;
+                loader.shadow_snapshots(shadow);
+                Arc::new(loader)
+            }
+        };
         let vm_runner = VmRunner::new(
             self.pool,
             Box::new(self.io),
-            Arc::new(loader),
+            loader,
             Box::new(self.output_handler_factory),
             Box::new(self.batch_executor),
         );
-        vm_runner.run(stop_receiver).await
+        vm_runner.run(&stop_receiver).await
     }
 }
 
@@ -212,7 +278,7 @@ impl VmPlaygroundLoaderTask {
 #[derive(Debug)]
 pub struct VmPlaygroundTasks {
     /// Task that synchronizes storage with new available batches.
-    pub loader_task: VmPlaygroundLoaderTask,
+    pub loader_task: Option<VmPlaygroundLoaderTask>,
     /// Task that handles output from processed batches.
     pub output_handler_factory_task: ConcurrentOutputHandlerFactoryTask<VmPlaygroundIo>,
 }
 
 /// I/O powering [`VmPlayground`].
 #[derive(Debug, Clone)]
 pub struct VmPlaygroundIo {
-    cursor_file_path: PathBuf,
+    cursor_file_path: Option<PathBuf>,
     vm_mode: FastVmMode,
     window_size: u32,
     // We don't read this value from the cursor file in the `VmRunnerIo` implementation because reads / writes
@@ -247,15 +313,16 @@ impl VmPlaygroundIo {
     }
 
     async fn write_cursor(&self, cursor: L1BatchNumber) -> anyhow::Result<()> {
+        let Some(cursor_file_path) = &self.cursor_file_path else {
+            return Ok(());
+        };
         let buffer = cursor.to_string();
-        fs::write(&self.cursor_file_path, buffer)
-            .await
-            .with_context(|| {
-                format!(
-                    "failed writing VM playground cursor to `{}`",
-                    self.cursor_file_path.display()
-                )
-            })
+        fs::write(cursor_file_path, buffer).await.with_context(|| {
+            format!(
+                "failed writing VM playground cursor to `{}`",
+                cursor_file_path.display()
+            )
+        })
     }
 
     fn update_health(&self) {
diff --git a/core/node/vm_runner/src/storage.rs b/core/node/vm_runner/src/storage.rs
index e351b09ad2bf..d08ef2830f3f 100644
--- a/core/node/vm_runner/src/storage.rs
+++ b/core/node/vm_runner/src/storage.rs
@@ -37,6 +37,69 @@ pub trait StorageLoader: 'static + Send + Sync + fmt::Debug {
     ) -> anyhow::Result<Option<(BatchExecuteData, OwnedStorage)>>;
 }
 
+/// Simplified storage loader that always gets data from Postgres (i.e., doesn't do RocksDB caching).
+#[derive(Debug)]
+pub(crate) struct PostgresLoader {
+    pool: ConnectionPool<Core>,
+    l1_batch_params_provider: L1BatchParamsProvider,
+    chain_id: L2ChainId,
+    shadow_snapshots: bool,
+}
+
+impl PostgresLoader {
+    pub async fn new(pool: ConnectionPool<Core>, chain_id: L2ChainId) -> anyhow::Result<Self> {
+        let mut l1_batch_params_provider = L1BatchParamsProvider::new();
+        let mut conn = pool.connection().await?;
+        l1_batch_params_provider.initialize(&mut conn).await?;
+        Ok(Self {
+            pool,
+            l1_batch_params_provider,
+            chain_id,
+            shadow_snapshots: true,
+        })
+    }
+
+    /// Enables or disables snapshot storage shadowing.
+    pub fn shadow_snapshots(&mut self, shadow_snapshots: bool) {
+        self.shadow_snapshots = shadow_snapshots;
+    }
+}
+
+#[async_trait]
+impl StorageLoader for PostgresLoader {
+    #[tracing::instrument(skip_all, l1_batch_number = l1_batch_number.0)]
+    async fn load_batch(
+        &self,
+        l1_batch_number: L1BatchNumber,
+    ) -> anyhow::Result<Option<(BatchExecuteData, OwnedStorage)>> {
+        let mut conn = self.pool.connection().await?;
+        let Some(data) = load_batch_execute_data(
+            &mut conn,
+            l1_batch_number,
+            &self.l1_batch_params_provider,
+            self.chain_id,
+        )
+        .await?
+        else {
+            return Ok(None);
+        };
+
+        if let Some(snapshot) = OwnedStorage::snapshot(&mut conn, l1_batch_number).await? {
+            let postgres = OwnedStorage::postgres(conn, l1_batch_number - 1).await?;
+            let storage = snapshot.with_fallback(postgres, self.shadow_snapshots);
+            let storage = OwnedStorage::from(storage);
+            return Ok(Some((data, storage)));
+        }
+
+        tracing::info!(
+            "Incomplete data to create storage snapshot for batch; will use sequential storage"
+        );
+        let conn = self.pool.connection().await?;
+        let storage = OwnedStorage::postgres(conn, l1_batch_number - 1).await?;
+        Ok(Some((data, storage.into())))
+    }
+}
+
 /// Data needed to execute an L1 batch.
 #[derive(Debug, Clone)]
 pub struct BatchExecuteData {
@@ -142,7 +205,7 @@ impl<Io: VmRunnerIo> StorageLoader for VmRunnerStorage<Io> {
         return Ok(if let Some(data) = batch_data {
             let storage = OwnedStorage::postgres(conn, l1_batch_number - 1).await?;
-            Some((data, storage))
+            Some((data, storage.into()))
         } else {
             None
         });
diff --git a/core/node/vm_runner/src/tests/mod.rs b/core/node/vm_runner/src/tests/mod.rs
index dd14e4dd1b0e..525a306eabf5 100644
--- a/core/node/vm_runner/src/tests/mod.rs
+++ b/core/node/vm_runner/src/tests/mod.rs
@@ -3,30 +3,27 @@ use std::{collections::HashMap, ops, sync::Arc, time::Duration};
 
 use async_trait::async_trait;
 use rand::{prelude::SliceRandom, Rng};
 use tokio::sync::RwLock;
-use zksync_contracts::BaseSystemContractsHashes;
 use zksync_dal::{Connection, ConnectionPool, Core, CoreDal};
 use zksync_multivm::interface::TransactionExecutionMetrics;
+use zksync_node_genesis::GenesisParams;
 use zksync_node_test_utils::{
     create_l1_batch_metadata, create_l2_block, execute_l2_transaction,
     l1_batch_metadata_to_commitment_artifacts,
 };
-use zksync_state::OwnedStorage;
 use zksync_state_keeper::{StateKeeperOutputHandler, UpdatesManager};
 use zksync_test_account::Account;
 use zksync_types::{
-    block::{BlockGasCount, L1BatchHeader, L2BlockHasher},
+    block::{L1BatchHeader, L2BlockHasher},
     fee::Fee,
     get_intrinsic_constants,
     l2::L2Tx,
     utils::storage_key_for_standard_token_balance,
-    AccountTreeId, Address, Execute, L1BatchNumber, L2BlockNumber, L2ChainId, ProtocolVersionId,
-    StorageKey, StorageLog, StorageLogKind, StorageValue, H160, H256, L2_BASE_TOKEN_ADDRESS, U256,
+    AccountTreeId, Address, Execute, L1BatchNumber, L2BlockNumber, ProtocolVersionId, StorageKey,
+    StorageLog, StorageLogKind, StorageValue, H160, H256, L2_BASE_TOKEN_ADDRESS, U256,
 };
-use zksync_utils::u256_to_h256;
-use zksync_vm_utils::storage::L1BatchParamsProvider;
+use zksync_utils::{bytecode::hash_bytecode, h256_to_u256, u256_to_h256};
 
-use super::{BatchExecuteData, OutputHandlerFactory, VmRunnerIo};
-use crate::storage::{load_batch_execute_data, StorageLoader};
+use super::{OutputHandlerFactory, VmRunnerIo};
 
 mod output_handler;
 mod playground;
 mod process;
 mod storage;
 mod storage_writer;
 
 const TEST_TIMEOUT: Duration = Duration::from_secs(10);
 
-/// Simplified storage loader that always gets data from Postgres (i.e., doesn't do RocksDB caching).
-#[derive(Debug)]
-struct PostgresLoader(ConnectionPool<Core>);
-
-#[async_trait]
-impl StorageLoader for PostgresLoader {
-    async fn load_batch(
-        &self,
-        l1_batch_number: L1BatchNumber,
-    ) -> anyhow::Result<Option<(BatchExecuteData, OwnedStorage)>> {
-        let mut conn = self.0.connection().await?;
-        let Some(data) = load_batch_execute_data(
-            &mut conn,
-            l1_batch_number,
-            &L1BatchParamsProvider::new(),
-            L2ChainId::default(),
-        )
-        .await?
-        else {
-            return Ok(None);
-        };
-
-        let storage = OwnedStorage::postgres(conn, l1_batch_number - 1).await?;
-        Ok(Some((data, storage)))
-    }
-}
-
 #[derive(Debug, Default)]
 struct IoMock {
     current: L1BatchNumber,
@@ -244,7 +214,7 @@ pub fn create_l2_transaction(
 async fn store_l1_batches(
     conn: &mut Connection<'_, Core>,
     numbers: ops::RangeInclusive<u32>,
-    contract_hashes: BaseSystemContractsHashes,
+    genesis_params: &GenesisParams,
     accounts: &mut [Account],
 ) -> anyhow::Result<Vec<L1BatchHeader>> {
     let mut rng = rand::thread_rng();
@@ -308,7 +278,7 @@ async fn store_l1_batches(
         digest.push_tx_hash(tx.hash());
         new_l2_block.hash = digest.finalize(ProtocolVersionId::latest());
 
-        new_l2_block.base_system_contracts_hashes = contract_hashes;
+        new_l2_block.base_system_contracts_hashes = genesis_params.base_system_contracts().hashes();
         new_l2_block.l2_tx_count = 1;
         conn.blocks_dal().insert_l2_block(&new_l2_block).await?;
         last_l2_block_hash = new_l2_block.hash;
@@ -337,20 +307,24 @@ async fn store_l1_batches(
         last_l2_block_hash = fictive_l2_block.hash;
         l2_block_number += 1;
 
-        let header = L1BatchHeader::new(
+        let mut header = L1BatchHeader::new(
             l1_batch_number,
             l2_block_number.0 as u64 - 2, // Matches the first L2 block in the batch
-            BaseSystemContractsHashes::default(),
+            genesis_params.base_system_contracts().hashes(),
             ProtocolVersionId::default(),
         );
-        let predicted_gas = BlockGasCount {
-            commit: 2,
-            prove: 3,
-            execute: 10,
-        };
-        conn.blocks_dal()
-            .insert_l1_batch(&header, &[], predicted_gas, &[], &[], Default::default())
-            .await?;
+
+        // Conservatively assume that the bootloader / transactions touch *all* system contracts + default AA.
+        // By convention, bootloader hash isn't included in `used_contract_hashes`.
+        header.used_contract_hashes = genesis_params
+            .system_contracts()
+            .iter()
+            .map(|contract| hash_bytecode(&contract.bytecode))
+            .chain([genesis_params.base_system_contracts().hashes().default_aa])
+            .map(h256_to_u256)
+            .collect();
+
+        conn.blocks_dal().insert_mock_l1_batch(&header).await?;
         conn.blocks_dal()
             .mark_l2_blocks_as_executed_in_l1_batch(l1_batch_number)
             .await?;
diff --git a/core/node/vm_runner/src/tests/playground.rs b/core/node/vm_runner/src/tests/playground.rs
index 2f3caf1f85c7..aaaf4b45b1a4 100644
--- a/core/node/vm_runner/src/tests/playground.rs
+++ b/core/node/vm_runner/src/tests/playground.rs
@@ -8,9 +8,21 @@ use zksync_state::RocksdbStorage;
 use zksync_types::vm::FastVmMode;
 
 use super::*;
-use crate::impls::{VmPlayground, VmPlaygroundCursorOptions, VmPlaygroundTasks};
+use crate::impls::{
+    VmPlayground, VmPlaygroundCursorOptions, VmPlaygroundStorageOptions, VmPlaygroundTasks,
+};
 
-async fn setup_storage(pool: &ConnectionPool<Core>, batch_count: u32) -> GenesisParams {
+impl From<&tempfile::TempDir> for VmPlaygroundStorageOptions {
+    fn from(dir: &tempfile::TempDir) -> Self {
+        Self::Rocksdb(dir.path().to_str().unwrap().into())
+    }
+}
+
+async fn setup_storage(
+    pool: &ConnectionPool<Core>,
+    batch_count: u32,
+    insert_protective_reads: bool,
+) -> GenesisParams {
     let mut conn = pool.connection().await.unwrap();
     let genesis_params = GenesisParams::mock();
     if !conn.blocks_dal().is_genesis_needed().await.unwrap() {
@@ -24,35 +36,46 @@ async fn setup_storage(pool: &ConnectionPool<Core>, batch_count: u32) -> Genesis
     // Generate some batches and persist them in Postgres
     let mut accounts = [Account::random()];
     fund(&mut conn, &accounts).await;
-    store_l1_batches(
-        &mut conn,
-        1..=batch_count,
-        genesis_params.base_system_contracts().hashes(),
-        &mut accounts,
-    )
-    .await
-    .unwrap();
+    store_l1_batches(&mut conn, 1..=batch_count, &genesis_params, &mut accounts)
+        .await
+        .unwrap();
 
     // Fill in missing storage logs for all batches so that running VM for all of them works correctly.
-    storage_writer::write_storage_logs(pool.clone()).await;
+    storage_writer::write_storage_logs(pool.clone(), insert_protective_reads).await;
 
     genesis_params
 }
 
+#[derive(Debug, Clone, Copy)]
+enum StorageLoaderKind {
+    Cached,
+    Postgres,
+    Snapshot,
+}
+
+impl StorageLoaderKind {
+    const ALL: [Self; 3] = [Self::Cached, Self::Postgres, Self::Snapshot];
+}
+
 async fn run_playground(
     pool: ConnectionPool<Core>,
-    rocksdb_dir: &tempfile::TempDir,
+    storage: VmPlaygroundStorageOptions,
     reset_to: Option<L1BatchNumber>,
 ) {
-    let genesis_params = setup_storage(&pool, 5).await;
+    let insert_protective_reads = matches!(
+        storage,
+        VmPlaygroundStorageOptions::Snapshots { shadow: true }
+    );
+    let genesis_params = setup_storage(&pool, 5, insert_protective_reads).await;
     let cursor = VmPlaygroundCursorOptions {
         first_processed_batch: reset_to.unwrap_or(L1BatchNumber(0)),
         window_size: NonZeroU32::new(1).unwrap(),
         reset_state: reset_to.is_some(),
     };
+
     let (playground, playground_tasks) = VmPlayground::new(
         pool.clone(),
         FastVmMode::Shadow,
-        rocksdb_dir.path().to_str().unwrap().to_owned(),
+        storage,
         genesis_params.config().l2_chain_id,
         cursor,
     )
    .await
@@ -91,15 +114,17 @@ async fn wait_for_all_batches(
     let playground_io = playground.io().clone();
     let mut completed_batches = playground_io.subscribe_to_completed_batches();
 
-    let task_handles = [
-        tokio::spawn(playground_tasks.loader_task.run(stop_receiver.clone())),
+    let mut task_handles = vec![
         tokio::spawn(
             playground_tasks
                 .output_handler_factory_task
                 .run(stop_receiver.clone()),
         ),
-        tokio::spawn(async move { playground.run(&stop_receiver).await }),
+        tokio::spawn(playground.run(stop_receiver.clone())),
     ];
+    if let Some(loader_task) = playground_tasks.loader_task {
+        task_handles.push(tokio::spawn(loader_task.run(stop_receiver)));
+    }
 
     // Wait until all batches are processed.
    let last_batch_number = conn
@@ -149,14 +174,40 @@ async fn wait_for_all_batches(
 async fn vm_playground_basics(reset_state: bool) {
     let pool = ConnectionPool::test_pool().await;
     let rocksdb_dir = tempfile::TempDir::new().unwrap();
-    run_playground(pool, &rocksdb_dir, reset_state.then_some(L1BatchNumber(0))).await;
+    run_playground(
+        pool,
+        VmPlaygroundStorageOptions::from(&rocksdb_dir),
+        reset_state.then_some(L1BatchNumber(0)),
+    )
+    .await;
 }
 
+#[test_casing(2, [false, true])]
 #[tokio::test]
-async fn starting_from_non_zero_batch() {
+async fn vm_playground_basics_without_cache(reset_state: bool) {
     let pool = ConnectionPool::test_pool().await;
-    let rocksdb_dir = tempfile::TempDir::new().unwrap();
-    run_playground(pool, &rocksdb_dir, Some(L1BatchNumber(3))).await;
+    run_playground(
+        pool,
+        VmPlaygroundStorageOptions::Snapshots { shadow: false },
+        reset_state.then_some(L1BatchNumber(0)),
+    )
+    .await;
+}
+
+#[test_casing(3, StorageLoaderKind::ALL)]
+#[tokio::test]
+async fn starting_from_non_zero_batch(storage_loader_kind: StorageLoaderKind) {
+    let pool = ConnectionPool::test_pool().await;
+    let rocksdb_dir;
+    let storage_loader = match storage_loader_kind {
+        StorageLoaderKind::Cached => {
+            rocksdb_dir = tempfile::TempDir::new().unwrap();
+            VmPlaygroundStorageOptions::from(&rocksdb_dir)
+        }
+        StorageLoaderKind::Postgres => VmPlaygroundStorageOptions::Snapshots { shadow: false },
+        StorageLoaderKind::Snapshot => VmPlaygroundStorageOptions::Snapshots { shadow: true },
+    };
+    run_playground(pool, storage_loader, Some(L1BatchNumber(3))).await;
 }
 
 #[test_casing(2, [L1BatchNumber(0), L1BatchNumber(2)])]
 #[tokio::test]
 async fn resetting_playground_state(reset_to: L1BatchNumber) {
     let pool = ConnectionPool::test_pool().await;
     let rocksdb_dir = tempfile::TempDir::new().unwrap();
-    run_playground(pool.clone(), &rocksdb_dir, None).await;
+    run_playground(
+        pool.clone(),
+        VmPlaygroundStorageOptions::from(&rocksdb_dir),
+        None,
+    )
+    .await;
 
     // Manually catch up RocksDB to Postgres to ensure that resetting it is not trivial.
    let (_stop_sender, stop_receiver) = watch::channel(false);
    RocksdbStorage::builder(rocksdb_dir.path())
        .await
        .unwrap()
        .synchronize(&pool.connection().await.unwrap(), &stop_receiver, None)
        .await
        .unwrap();
 
-    run_playground(pool.clone(), &rocksdb_dir, Some(reset_to)).await;
+    run_playground(
+        pool.clone(),
+        VmPlaygroundStorageOptions::from(&rocksdb_dir),
+        Some(reset_to),
+    )
+    .await;
 }
 
 #[test_casing(2, [2, 3])]
 #[tokio::test]
 async fn using_larger_window_size(window_size: u32) {
     let pool = ConnectionPool::test_pool().await;
     let rocksdb_dir = tempfile::TempDir::new().unwrap();
 
-    let genesis_params = setup_storage(&pool, 5).await;
+    let genesis_params = setup_storage(&pool, 5, false).await;
     let cursor = VmPlaygroundCursorOptions {
         first_processed_batch: L1BatchNumber(0),
         window_size: NonZeroU32::new(window_size).unwrap(),
         reset_state: false,
     };
     let (playground, playground_tasks) = VmPlayground::new(
         pool.clone(),
         FastVmMode::Shadow,
-        rocksdb_dir.path().to_str().unwrap().to_owned(),
+        VmPlaygroundStorageOptions::from(&rocksdb_dir),
         genesis_params.config().l2_chain_id,
         cursor,
     )
diff --git a/core/node/vm_runner/src/tests/process.rs b/core/node/vm_runner/src/tests/process.rs
index 7ea1335db71f..2ac976021e0b 100644
--- a/core/node/vm_runner/src/tests/process.rs
+++ b/core/node/vm_runner/src/tests/process.rs
@@ -25,17 +25,11 @@ async fn process_batches((batch_count, window): (u32, u32)) -> anyhow::Result<()
     let mut accounts = vec![Account::random(), Account::random()];
     fund(&mut conn, &accounts).await;
 
-    store_l1_batches(
-        &mut conn,
-        1..=batch_count,
-        genesis_params.base_system_contracts().hashes(),
-        &mut accounts,
-    )
-    .await?;
+    store_l1_batches(&mut conn, 1..=batch_count, &genesis_params, &mut accounts).await?;
     drop(conn);
 
     // Fill in missing storage logs for all batches so that running VM for all of them works correctly.
-    storage_writer::write_storage_logs(connection_pool.clone()).await;
+    storage_writer::write_storage_logs(connection_pool.clone(), true).await;
 
     let io = Arc::new(RwLock::new(IoMock {
         current: 0.into(),
diff --git a/core/node/vm_runner/src/tests/storage.rs b/core/node/vm_runner/src/tests/storage.rs
index f6f7a2ba9e64..838b469f0ef3 100644
--- a/core/node/vm_runner/src/tests/storage.rs
+++ b/core/node/vm_runner/src/tests/storage.rs
@@ -115,7 +115,7 @@ async fn rerun_storage_on_existing_data() -> anyhow::Result<()> {
     let batches = store_l1_batches(
         &mut connection_pool.connection().await?,
         1..=10,
-        genesis_params.base_system_contracts().hashes(),
+        &genesis_params,
         &mut accounts,
     )
     .await?;
@@ -212,7 +212,7 @@ async fn continuously_load_new_batches() -> anyhow::Result<()> {
     store_l1_batches(
         &mut connection_pool.connection().await?,
         1..=1,
-        genesis_params.base_system_contracts().hashes(),
+        &genesis_params,
         &mut accounts,
     )
     .await?;
@@ -230,7 +230,7 @@ async fn continuously_load_new_batches() -> anyhow::Result<()> {
     store_l1_batches(
         &mut connection_pool.connection().await?,
         2..=2,
-        genesis_params.base_system_contracts().hashes(),
+        &genesis_params,
         &mut accounts,
     )
     .await?;
@@ -266,7 +266,7 @@ async fn access_vm_runner_storage() -> anyhow::Result<()> {
     store_l1_batches(
         &mut connection_pool.connection().await?,
         batch_range,
-        genesis_params.base_system_contracts().hashes(),
+        &genesis_params,
         &mut accounts,
     )
     .await?;
diff --git a/core/node/vm_runner/src/tests/storage_writer.rs b/core/node/vm_runner/src/tests/storage_writer.rs
index 4c7a6e0d6612..6cad2da6974a 100644
--- a/core/node/vm_runner/src/tests/storage_writer.rs
+++ b/core/node/vm_runner/src/tests/storage_writer.rs
@@ -1,14 +1,22 @@
+use assert_matches::assert_matches;
+use test_casing::test_casing;
 use tokio::sync::watch;
 use zksync_node_genesis::{insert_genesis_batch, GenesisParams};
+use zksync_state::OwnedStorage;
 use zksync_state_keeper::MainBatchExecutor;
+use zksync_types::L2ChainId;
 
 use super::*;
-use crate::{ConcurrentOutputHandlerFactory, VmRunner};
+use crate::{
+    storage::{PostgresLoader, StorageLoader},
+    ConcurrentOutputHandlerFactory, VmRunner,
+};
 
 #[derive(Debug, Clone)]
 struct StorageWriterIo {
     last_processed_batch: Arc<watch::Sender<L1BatchNumber>>,
     pool: ConnectionPool<Core>,
+    insert_protective_reads: bool,
 }
 
 impl StorageWriterIo {
@@ -115,6 +123,19 @@ impl StateKeeperOutputHandler for StorageWriterIo {
             .insert_initial_writes(updates_manager.l1_batch.number, &initial_writes)
             .await?;
 
+        if self.insert_protective_reads {
+            let protective_reads: Vec<_> = finished_batch
+                .final_execution_state
+                .deduplicated_storage_logs
+                .iter()
+                .filter(|log_query| !log_query.is_write())
+                .copied()
+                .collect();
+            conn.storage_logs_dedup_dal()
+                .insert_protective_reads(updates_manager.l1_batch.number, &protective_reads)
+                .await?;
+        }
+
         self.last_processed_batch
             .send_replace(updates_manager.l1_batch.number);
         Ok(())
@@ -134,7 +155,7 @@ impl OutputHandlerFactory for StorageWriterIo {
 
 /// Writes missing storage logs into Postgres by executing all transactions persisted in it. Useful both for testing `VmRunner`,
 /// and to fill the storage for multi-batch tests for other components.
-pub(super) async fn write_storage_logs(pool: ConnectionPool<Core>) {
+pub(super) async fn write_storage_logs(pool: ConnectionPool<Core>, insert_protective_reads: bool) {
     let mut conn = pool.connection().await.unwrap();
     let sealed_batch = conn
         .blocks_dal()
@@ -146,10 +167,14 @@ pub(super) async fn write_storage_logs(pool: ConnectionPool<Core>) {
     let io = Box::new(StorageWriterIo {
         last_processed_batch: Arc::new(watch::channel(L1BatchNumber(0)).0),
         pool: pool.clone(),
+        insert_protective_reads,
     });
     let mut processed_batch = io.last_processed_batch.subscribe();
 
-    let loader = Arc::new(PostgresLoader(pool.clone()));
+    let loader = PostgresLoader::new(pool.clone(), L2ChainId::default())
+        .await
+        .unwrap();
+    let loader = Arc::new(loader);
     let batch_executor = Box::new(MainBatchExecutor::new(false, false));
     let vm_runner = VmRunner::new(pool, io.clone(), loader, io, batch_executor);
     let (stop_sender, stop_receiver) = watch::channel(false);
@@ -163,8 +188,9 @@ pub(super) async fn write_storage_logs(pool: ConnectionPool<Core>) {
     vm_runner_handle.await.unwrap().unwrap();
 }
 
+#[test_casing(2, [false, true])]
 #[tokio::test]
-async fn storage_writer_works() {
+async fn storage_writer_works(insert_protective_reads: bool) {
     let pool = ConnectionPool::<Core>::test_pool().await;
     let mut conn = pool.connection().await.unwrap();
     let genesis_params = GenesisParams::mock();
@@ -174,17 +200,12 @@ async fn storage_writer_works() {
     let mut accounts = [Account::random()];
     fund(&mut conn, &accounts).await;
-    store_l1_batches(
-        &mut conn,
-        1..=5,
-        genesis_params.base_system_contracts().hashes(),
-        &mut accounts,
-    )
-    .await
-    .unwrap();
+    store_l1_batches(&mut conn, 1..=5, &genesis_params, &mut accounts)
+        .await
+        .unwrap();
     drop(conn);
 
-    write_storage_logs(pool.clone()).await;
+    write_storage_logs(pool.clone(), insert_protective_reads).await;
 
     // Re-run the VM on all batches to check that storage logs are persisted correctly
     let (stop_sender, stop_receiver) = watch::channel(false);
     let io = Arc::new(RwLock::new(IoMock {
         current: L1BatchNumber(0),
         max: 5,
     }));
-    let loader = Arc::new(PostgresLoader(pool.clone()));
+    let loader = PostgresLoader::new(pool.clone(), genesis_params.config().l2_chain_id)
+        .await
+        .unwrap();
+    let loader = Arc::new(loader);
+
+    // Check that the loader returns expected types of storage.
+    let (_, batch_storage) = loader
+        .load_batch(L1BatchNumber(1))
+        .await
+        .unwrap()
+        .expect("no batch loaded");
+    if insert_protective_reads {
+        assert_matches!(batch_storage, OwnedStorage::Snapshot(_));
+    } else {
+        assert_matches!(batch_storage, OwnedStorage::Postgres(_));
+    }
+
     let (output_factory, output_factory_task) =
         ConcurrentOutputHandlerFactory::new(pool.clone(), io.clone(), TestOutputFactory::default());
     let output_factory_handle = tokio::spawn(output_factory_task.run(stop_receiver.clone()));
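
To make the data flow concrete, here is a minimal sketch (illustrative only, not part of the patch) of how the `StorageSnapshot` API introduced above composes with a fallback. It assumes `InMemoryStorage` implements `Default` and stands in for the Postgres fallback used in production:

```rust
use std::collections::HashMap;

use zksync_types::{AccountTreeId, Address, StorageKey, H256};
use zksync_vm_interface::storage::{InMemoryStorage, ReadStorage, StorageSnapshot};

fn main() {
    // A storage slot accessed by the batch; the snapshot is keyed by hashed storage keys.
    let key = StorageKey::new(AccountTreeId::new(Address::zero()), H256::zero());
    let storage = HashMap::from([
        // Slot with a known value and enum index 1 (i.e., not an initial write).
        (key.hashed_key(), Some((H256::from_low_u64_be(42), 1))),
        // Accessed slot without an assigned enum index; by definition, its value is zero.
        (H256::repeat_byte(0xff), None),
    ]);
    let snapshot = StorageSnapshot::new(storage, HashMap::new());

    // The fallback is only consulted for slots missing from the snapshot; with `shadow = true`,
    // it would be consulted on *every* call and its outputs compared against the snapshot.
    let mut storage = snapshot.with_fallback(InMemoryStorage::default(), false);
    assert_eq!(storage.read_value(&key), H256::from_low_u64_be(42));
    assert!(!storage.is_write_initial(&key));
}
```

In the actual `PostgresLoader::load_batch()` above, the fallback is an `OwnedStorage::postgres(..)` instance for the previous batch, and shadowing is toggled via `shadow_snapshots()` (enabled in the playground's `Snapshots { shadow: true }` mode).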