diff --git a/core/bin/external_node/src/node_builder.rs b/core/bin/external_node/src/node_builder.rs
index 14e09b9c2a7a..7d8489013535 100644
--- a/core/bin/external_node/src/node_builder.rs
+++ b/core/bin/external_node/src/node_builder.rs
@@ -378,6 +378,7 @@ impl ExternalNodeBuilder {
             factory_deps_cache_size: self.config.optional.factory_deps_cache_size() as u64,
             initial_writes_cache_size: self.config.optional.initial_writes_cache_size() as u64,
             latest_values_cache_size: self.config.optional.latest_values_cache_size() as u64,
+            latest_values_max_block_lag: 20, // reasonable default
         };
         let max_vm_concurrency = self.config.optional.vm_concurrency_limit;
         let tx_sender_layer = TxSenderLayer::new(
diff --git a/core/bin/zksync_server/src/node_builder.rs b/core/bin/zksync_server/src/node_builder.rs
index 4600b0f9e543..902141ab4ca5 100644
--- a/core/bin/zksync_server/src/node_builder.rs
+++ b/core/bin/zksync_server/src/node_builder.rs
@@ -307,6 +307,7 @@ impl MainNodeBuilder {
             factory_deps_cache_size: rpc_config.factory_deps_cache_size() as u64,
             initial_writes_cache_size: rpc_config.initial_writes_cache_size() as u64,
             latest_values_cache_size: rpc_config.latest_values_cache_size() as u64,
+            latest_values_max_block_lag: rpc_config.latest_values_max_block_lag(),
         };

         // On main node we always use master pool sink.
diff --git a/core/lib/config/src/configs/api.rs b/core/lib/config/src/configs/api.rs
index 86c9ebd074d8..ce0d96129584 100644
--- a/core/lib/config/src/configs/api.rs
+++ b/core/lib/config/src/configs/api.rs
@@ -189,6 +189,10 @@ pub struct Web3JsonRpcConfig {
     /// Latest values cache size in MiBs. The default value is 128 MiB. If set to 0, the latest
     /// values cache will be disabled.
     pub latest_values_cache_size_mb: Option<usize>,
+    /// Maximum lag in the number of blocks for the latest values cache after which the cache is reset. Greater values
+    /// lead to increased cache update latency, i.e., fewer storage queries being processed by the cache. OTOH, smaller values
+    /// can lead to spurious resets when Postgres lags for whatever reason (e.g., when sealing L1 batches).
+    pub latest_values_max_block_lag: Option<NonZeroU32>,
     /// Limit for fee history block range.
     pub fee_history_limit: Option<u64>,
     /// Maximum number of requests in a single batch JSON RPC request. Default is 500.
@@ -243,20 +247,21 @@ impl Web3JsonRpcConfig {
             estimate_gas_acceptable_overestimation: 1000,
             estimate_gas_optimize_search: false,
             max_tx_size: 1000000,
-            vm_execution_cache_misses_limit: Default::default(),
-            vm_concurrency_limit: Default::default(),
-            factory_deps_cache_size_mb: Default::default(),
-            initial_writes_cache_size_mb: Default::default(),
-            latest_values_cache_size_mb: Default::default(),
-            fee_history_limit: Default::default(),
-            max_batch_request_size: Default::default(),
-            max_response_body_size_mb: Default::default(),
+            vm_execution_cache_misses_limit: None,
+            vm_concurrency_limit: None,
+            factory_deps_cache_size_mb: None,
+            initial_writes_cache_size_mb: None,
+            latest_values_cache_size_mb: None,
+            latest_values_max_block_lag: None,
+            fee_history_limit: None,
+            max_batch_request_size: None,
+            max_response_body_size_mb: None,
             max_response_body_size_overrides_mb: MaxResponseSizeOverrides::empty(),
-            websocket_requests_per_minute_limit: Default::default(),
-            mempool_cache_update_interval: Default::default(),
-            mempool_cache_size: Default::default(),
+            websocket_requests_per_minute_limit: None,
+            mempool_cache_update_interval: None,
+            mempool_cache_size: None,
             tree_api_url: None,
-            whitelisted_tokens_for_aa: Default::default(),
+            whitelisted_tokens_for_aa: vec![],
             api_namespaces: None,
             extended_api_tracing: false,
         }
@@ -308,6 +313,11 @@ impl Web3JsonRpcConfig {
         self.latest_values_cache_size_mb.unwrap_or(128) * super::BYTES_IN_MEGABYTE
     }

+    /// Returns the maximum lag in the number of blocks for the latest values cache.
+    pub fn latest_values_max_block_lag(&self) -> u32 {
+        self.latest_values_max_block_lag.map_or(20, NonZeroU32::get)
+    }
+
     pub fn fee_history_limit(&self) -> u64 {
         self.fee_history_limit.unwrap_or(1024)
     }
diff --git a/core/lib/config/src/testonly.rs b/core/lib/config/src/testonly.rs
index 960808aa6a69..0fdd927d19f0 100644
--- a/core/lib/config/src/testonly.rs
+++ b/core/lib/config/src/testonly.rs
@@ -86,6 +86,7 @@ impl Distribution for EncodeDist {
             factory_deps_cache_size_mb: self.sample(rng),
             initial_writes_cache_size_mb: self.sample(rng),
             latest_values_cache_size_mb: self.sample(rng),
+            latest_values_max_block_lag: self.sample(rng),
             fee_history_limit: self.sample(rng),
             max_batch_request_size: self.sample(rng),
             max_response_body_size_mb: self.sample(rng),
diff --git a/core/lib/env_config/src/api.rs b/core/lib/env_config/src/api.rs
index 53efea9a7848..ecc2343d49f4 100644
--- a/core/lib/env_config/src/api.rs
+++ b/core/lib/env_config/src/api.rs
@@ -76,6 +76,7 @@ mod tests {
             factory_deps_cache_size_mb: Some(128),
             initial_writes_cache_size_mb: Some(32),
             latest_values_cache_size_mb: Some(256),
+            latest_values_max_block_lag: Some(NonZeroU32::new(50).unwrap()),
             fee_history_limit: Some(100),
             max_batch_request_size: Some(200),
             max_response_body_size_mb: Some(10),
@@ -136,6 +137,7 @@ mod tests {
             API_WEB3_JSON_RPC_FACTORY_DEPS_CACHE_SIZE_MB=128
             API_WEB3_JSON_RPC_INITIAL_WRITES_CACHE_SIZE_MB=32
             API_WEB3_JSON_RPC_LATEST_VALUES_CACHE_SIZE_MB=256
+            API_WEB3_JSON_RPC_LATEST_VALUES_MAX_BLOCK_LAG=50
             API_WEB3_JSON_RPC_FEE_HISTORY_LIMIT=100
             API_WEB3_JSON_RPC_MAX_BATCH_REQUEST_SIZE=200
             API_WEB3_JSON_RPC_WEBSOCKET_REQUESTS_PER_MINUTE_LIMIT=10
diff --git a/core/lib/protobuf_config/src/api.rs b/core/lib/protobuf_config/src/api.rs
index a0c3825228af..3db80c6d691a 100644
--- a/core/lib/protobuf_config/src/api.rs
+++ b/core/lib/protobuf_config/src/api.rs
@@ -1,4 +1,4 @@
-use std::num::NonZeroUsize;
+use std::num::{NonZeroU32, NonZeroUsize};

 use anyhow::Context as _;
 use zksync_config::configs::{api, ApiConfig};
@@ -113,6 +113,11 @@ impl ProtoRepr for proto::Web3JsonRpc {
                 .map(|x| x.try_into())
                 .transpose()
                 .context("latest_values_cache_size_mb")?,
+            latest_values_max_block_lag: self
+                .latest_values_max_block_lag
+                .map(|x| x.try_into())
+                .transpose()
+                .context("latest_values_max_block_lag")?,
             fee_history_limit: self.fee_history_limit,
             max_batch_request_size: self
                 .max_batch_request_size
@@ -183,6 +188,7 @@ impl ProtoRepr for proto::Web3JsonRpc {
             latest_values_cache_size_mb: this
                 .latest_values_cache_size_mb
                 .map(|x| x.try_into().unwrap()),
+            latest_values_max_block_lag: this.latest_values_max_block_lag.map(NonZeroU32::get),
             fee_history_limit: this.fee_history_limit,
             max_batch_request_size: this.max_batch_request_size.map(|x| x.try_into().unwrap()),
             max_response_body_size_mb: this
diff --git a/core/lib/protobuf_config/src/proto/config/api.proto b/core/lib/protobuf_config/src/proto/config/api.proto
index 68475e442fd6..89ba0a6bcd2c 100644
--- a/core/lib/protobuf_config/src/proto/config/api.proto
+++ b/core/lib/protobuf_config/src/proto/config/api.proto
@@ -41,6 +41,7 @@ message Web3JsonRpc {
   repeated string api_namespaces = 32; // Optional, if empty all namespaces are available
   optional bool extended_api_tracing = 33; // optional, default false
   optional bool estimate_gas_optimize_search = 34; // optional, default false
+  optional uint32 latest_values_max_block_lag = 35; // optional
   reserved 15; reserved "l1_to_l2_transactions_compatibility_mode";
   reserved 11; reserved "request_timeout";
diff --git a/core/lib/state/src/cache/lru_cache.rs b/core/lib/state/src/cache/lru_cache.rs
index fa37bdb3e227..55b037bbb8c2 100644
--- a/core/lib/state/src/cache/lru_cache.rs
+++ b/core/lib/state/src/cache/lru_cache.rs
@@ -46,6 +46,13 @@ where
         Self { name, cache }
     }

+    /// Returns the capacity of this cache in bytes.
+    pub fn capacity(&self) -> u64 {
+        self.cache
+            .as_ref()
+            .map_or(0, |cache| cache.policy().max_capacity().unwrap_or(u64::MAX))
+    }
+
     /// Gets an entry and pulls it to the front if it exists.
     pub fn get(&self, key: &K) -> Option<V> {
         let latency = METRICS.latency[&(self.name, Method::Get)].start();
diff --git a/core/lib/state/src/postgres/mod.rs b/core/lib/state/src/postgres/mod.rs
index 67866634ee4b..f689f1487f35 100644
--- a/core/lib/state/src/postgres/mod.rs
+++ b/core/lib/state/src/postgres/mod.rs
@@ -72,8 +72,7 @@ impl CacheValue for TimestampedStorageValue {
     #[allow(clippy::cast_possible_truncation)] // doesn't happen in practice
     fn cache_weight(&self) -> u32 {
         const WEIGHT: usize = mem::size_of::<TimestampedStorageValue>() + mem::size_of::<H256>();
-        // ^ Since values are small in size, we want to account for key sizes as well
-
+        // ^ Since values are small, we want to account for key sizes as well
         WEIGHT as u32
     }
 }
@@ -114,6 +113,14 @@ impl ValuesCache {
         Self(Arc::new(RwLock::new(inner)))
     }

+    fn capacity(&self) -> u64 {
+        self.0
+            .read()
+            .expect("values cache is poisoned")
+            .values
+            .capacity()
+    }
+
     /// *NB.* The returned value should be considered immediately stale; at best, it can be
     /// the lower boundary on the current `valid_for` value.
     fn valid_for(&self) -> L2BlockNumber {
@@ -154,80 +161,86 @@ impl ValuesCache {
         }
     }

+    fn reset(
+        &self,
+        from_l2_block: L2BlockNumber,
+        to_l2_block: L2BlockNumber,
+    ) -> anyhow::Result<()> {
+        // We can spend too much time loading data from Postgres, so we opt for an easier "update" route:
+        // evict *everything* from cache and call it a day. This should not happen too often in practice.
+        tracing::info!(
+            "Storage values cache is too far behind (current L2 block is {from_l2_block}; \
+             requested update to {to_l2_block}); resetting the cache"
+        );
+        let mut lock = self
+            .0
+            .write()
+            .map_err(|_| anyhow::anyhow!("values cache is poisoned"))?;
+        anyhow::ensure!(
+            lock.valid_for == from_l2_block,
+            "sanity check failed: values cache was expected to be valid for L2 block #{from_l2_block}, but it's actually \
+             valid for L2 block #{}",
+            lock.valid_for
+        );
+        lock.valid_for = to_l2_block;
+        lock.values.clear();
+
+        CACHE_METRICS.values_emptied.inc();
+        CACHE_METRICS
+            .values_valid_for_miniblock
+            .set(u64::from(to_l2_block.0));
+        Ok(())
+    }
+
     async fn update(
         &self,
         from_l2_block: L2BlockNumber,
         to_l2_block: L2BlockNumber,
         connection: &mut Connection<'_, Core>,
     ) -> anyhow::Result<()> {
-        const MAX_L2_BLOCKS_LAG: u32 = 5;
-
         tracing::debug!(
             "Updating storage values cache from L2 block {from_l2_block} to {to_l2_block}"
         );
-        if to_l2_block.0 - from_l2_block.0 > MAX_L2_BLOCKS_LAG {
-            // We can spend too much time loading data from Postgres, so we opt for an easier "update" route:
-            // evict *everything* from cache and call it a day. This should not happen too often in practice.
-            tracing::info!(
-                "Storage values cache is too far behind (current L2 block is {from_l2_block}; \
-                 requested update to {to_l2_block}); resetting the cache"
-            );
-            let mut lock = self
-                .0
-                .write()
-                .map_err(|_| anyhow::anyhow!("values cache is poisoned"))?;
-            anyhow::ensure!(
-                lock.valid_for == from_l2_block,
-                "sanity check failed: values cache was expected to be valid for L2 block #{from_l2_block}, but it's actually \
-                 valid for L2 block #{}",
-                lock.valid_for
-            );
-            lock.valid_for = to_l2_block;
-            lock.values.clear();
+        let update_latency = CACHE_METRICS.values_update[&ValuesUpdateStage::LoadKeys].start();
+        let l2_blocks = (from_l2_block + 1)..=to_l2_block;
+        let modified_keys = connection
+            .storage_logs_dal()
+            .modified_keys_in_l2_blocks(l2_blocks.clone())
+            .await?;

-            CACHE_METRICS.values_emptied.inc();
-        } else {
-            let update_latency = CACHE_METRICS.values_update[&ValuesUpdateStage::LoadKeys].start();
-            let l2_blocks = (from_l2_block + 1)..=to_l2_block;
-            let modified_keys = connection
-                .storage_logs_dal()
-                .modified_keys_in_l2_blocks(l2_blocks.clone())
-                .await?;
-
-            let elapsed = update_latency.observe();
-            CACHE_METRICS
-                .values_update_modified_keys
-                .observe(modified_keys.len());
-            tracing::debug!(
-                "Loaded {modified_keys_len} modified storage keys from L2 blocks {l2_blocks:?}; \
-                 took {elapsed:?}",
-                modified_keys_len = modified_keys.len()
-            );
+        let elapsed = update_latency.observe();
+        CACHE_METRICS
+            .values_update_modified_keys
+            .observe(modified_keys.len());
+        tracing::debug!(
+            "Loaded {modified_keys_len} modified storage keys from L2 blocks {l2_blocks:?}; \
+             took {elapsed:?}",
+            modified_keys_len = modified_keys.len()
+        );

-            let update_latency =
-                CACHE_METRICS.values_update[&ValuesUpdateStage::RemoveStaleKeys].start();
-            let mut lock = self
-                .0
-                .write()
-                .map_err(|_| anyhow::anyhow!("values cache is poisoned"))?;
-            // The code below holding onto the write `lock` is the only code that can theoretically poison the `RwLock`
-            // (other than emptying the cache above). Thus, it's kept as simple and tight as possible.
-            // E.g., we load data from Postgres beforehand.
-            anyhow::ensure!(
-                lock.valid_for == from_l2_block,
-                "sanity check failed: values cache was expected to be valid for L2 block #{from_l2_block}, but it's actually \
-                 valid for L2 block #{}",
-                lock.valid_for
-            );
-            lock.valid_for = to_l2_block;
-            for modified_key in &modified_keys {
-                lock.values.remove(modified_key);
-            }
-            lock.values.report_size();
-            drop(lock);
-            update_latency.observe();
+        let update_latency =
+            CACHE_METRICS.values_update[&ValuesUpdateStage::RemoveStaleKeys].start();
+        let mut lock = self
+            .0
+            .write()
+            .map_err(|_| anyhow::anyhow!("values cache is poisoned"))?;
+        // The code below holding onto the write `lock` is the only code that can theoretically poison the `RwLock`
+        // (other than emptying the cache above). Thus, it's kept as simple and tight as possible.
+        // E.g., we load data from Postgres beforehand.
+        anyhow::ensure!(
+            lock.valid_for == from_l2_block,
+            "sanity check failed: values cache was expected to be valid for L2 block #{from_l2_block}, but it's actually \
+             valid for L2 block #{}",
+            lock.valid_for
+        );
+        lock.valid_for = to_l2_block;
+        for modified_key in &modified_keys {
+            lock.values.remove(modified_key);
         }
+        lock.values.report_size();
+        drop(lock);
+        update_latency.observe();

         CACHE_METRICS
             .values_valid_for_miniblock
@@ -298,6 +311,7 @@ impl PostgresStorageCaches {
     pub fn configure_storage_values_cache(
         &mut self,
         capacity: u64,
+        max_l2_blocks_lag: u32,
         connection_pool: ConnectionPool<Core>,
     ) -> PostgresStorageCachesTask {
         assert!(
@@ -320,6 +334,7 @@ impl PostgresStorageCaches {
         PostgresStorageCachesTask {
             connection_pool,
             values_cache,
+            max_l2_blocks_lag,
             command_receiver,
         }
     }
@@ -349,6 +364,7 @@ impl PostgresStorageCaches {
 pub struct PostgresStorageCachesTask {
     connection_pool: ConnectionPool<Core>,
     values_cache: ValuesCache,
+    max_l2_blocks_lag: u32,
     command_receiver: UnboundedReceiver<L2BlockNumber>,
 }
@@ -359,32 +375,41 @@ impl PostgresStorageCachesTask {
     ///
     /// - Propagates Postgres errors.
     /// - Propagates errors from the cache update task.
+    #[tracing::instrument(name = "PostgresStorageCachesTask::run", skip_all)]
     pub async fn run(mut self, mut stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()> {
+        tracing::info!(
+            max_l2_blocks_lag = self.max_l2_blocks_lag,
+            values_cache.capacity = self.values_cache.capacity(),
+            "Starting task"
+        );
+
         let mut current_l2_block = self.values_cache.valid_for();
         loop {
-            tokio::select! {
-                _ = stop_receiver.changed() => {
-                    break;
-                }
-                Some(to_l2_block) = self.command_receiver.recv() => {
-                    if to_l2_block <= current_l2_block {
-                        continue;
-                    }
-                    let mut connection = self
-                        .connection_pool
-                        .connection_tagged("values_cache_updater")
-                        .await?;
-                    self.values_cache
-                        .update(current_l2_block, to_l2_block, &mut connection)
-                        .await?;
-                    current_l2_block = to_l2_block;
-                }
+            let to_l2_block = tokio::select! {
+                _ = stop_receiver.changed() => break,
+                Some(to_l2_block) = self.command_receiver.recv() => to_l2_block,
                 else => {
                     // The command sender has been dropped, which means that we must receive the stop signal soon.
                     stop_receiver.changed().await?;
                     break;
                 }
+            };
+            if to_l2_block <= current_l2_block {
+                continue;
+            }
+
+            if to_l2_block.0 - current_l2_block.0 > self.max_l2_blocks_lag {
+                self.values_cache.reset(current_l2_block, to_l2_block)?;
+            } else {
+                let mut connection = self
+                    .connection_pool
+                    .connection_tagged("values_cache_updater")
+                    .await?;
+                self.values_cache
+                    .update(current_l2_block, to_l2_block, &mut connection)
+                    .await?;
             }
+            current_l2_block = to_l2_block;
         }
         Ok(())
     }
 }
diff --git a/core/lib/state/src/postgres/tests.rs b/core/lib/state/src/postgres/tests.rs
index f88055fa0479..029df60cb461 100644
--- a/core/lib/state/src/postgres/tests.rs
+++ b/core/lib/state/src/postgres/tests.rs
@@ -462,7 +462,7 @@ async fn wait_for_cache_update(values_cache: &ValuesCache, target_l2_block: L2Bl
 fn test_values_cache(pool: &ConnectionPool<Core>, rt_handle: Handle) {
     let mut caches = PostgresStorageCaches::new(1_024, 1_024);
-    let task = caches.configure_storage_values_cache(1_024 * 1_024, pool.clone());
+    let task = caches.configure_storage_values_cache(1_024 * 1_024, 5, pool.clone());
     let (stop_sender, stop_receiver) = watch::channel(false);
     let update_task_handle = tokio::task::spawn(task.run(stop_receiver));
@@ -595,7 +595,7 @@ fn mini_fuzz_values_cache_inner(
     mut rt_handle: Handle,
 ) {
     let mut caches = PostgresStorageCaches::new(1_024, 1_024);
-    let _ = caches.configure_storage_values_cache(1_024 * 1_024, pool.clone());
+    let _ = caches.configure_storage_values_cache(1_024 * 1_024, 5, pool.clone());
     let values_cache = caches.values.as_ref().unwrap().cache.clone();
     let mut connection = rt_handle.block_on(pool.connection()).unwrap();
diff --git a/core/node/node_framework/src/implementations/layers/web3_api/tx_sender.rs b/core/node/node_framework/src/implementations/layers/web3_api/tx_sender.rs
index a09938055fae..ba1a69e23bb6 100644
--- a/core/node/node_framework/src/implementations/layers/web3_api/tx_sender.rs
+++ b/core/node/node_framework/src/implementations/layers/web3_api/tx_sender.rs
@@ -32,6 +32,7 @@ pub struct PostgresStorageCachesConfig {
     pub factory_deps_cache_size: u64,
     pub initial_writes_cache_size: u64,
     pub latest_values_cache_size: u64,
+    pub latest_values_max_block_lag: u32,
 }

 /// Wiring layer for the `TxSender`.
@@ -133,10 +134,13 @@ impl WiringLayer for TxSenderLayer {
             PostgresStorageCaches::new(factory_deps_capacity, initial_writes_capacity);

         let postgres_storage_caches_task = if values_capacity > 0 {
-            Some(
-                storage_caches
-                    .configure_storage_values_cache(values_capacity, replica_pool.clone()),
-            )
+            let update_task = storage_caches.configure_storage_values_cache(
+                values_capacity,
+                self.postgres_storage_caches_config
+                    .latest_values_max_block_lag,
+                replica_pool.clone(),
+            );
+            Some(update_task)
         } else {
             None
         };
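
Reviewer note (not part of the patch): the behavioral core of this change is that the hard-coded `const MAX_L2_BLOCKS_LAG: u32 = 5` check moves out of `ValuesCache::update` into `PostgresStorageCachesTask::run`, driven by the new configurable `latest_values_max_block_lag` (default 20). A minimal, self-contained sketch of that decision logic follows; `CacheAction` and `decide` are illustrative names, not identifiers from the codebase.

    // Illustrative-only model of the updater's new branch: reset the whole cache when the
    // requested block is too far ahead of the cached one, otherwise evict incrementally.
    #[derive(Debug, PartialEq)]
    enum CacheAction {
        Skip,              // `to <= current`: nothing to do
        Reset,             // lag exceeds `max_l2_blocks_lag`: clear everything
        IncrementalUpdate, // evict only keys modified in `(current, to]`
    }

    fn decide(current: u32, to: u32, max_l2_blocks_lag: u32) -> CacheAction {
        if to <= current {
            CacheAction::Skip
        } else if to - current > max_l2_blocks_lag {
            CacheAction::Reset
        } else {
            CacheAction::IncrementalUpdate
        }
    }

    fn main() {
        // With the default lag of 20 blocks (see `latest_values_max_block_lag()`), a burst of
        // up to 20 blocks is still handled key-by-key; anything larger clears the cache in one go.
        assert_eq!(decide(100, 100, 20), CacheAction::Skip);
        assert_eq!(decide(100, 115, 20), CacheAction::IncrementalUpdate);
        assert_eq!(decide(100, 130, 20), CacheAction::Reset);
        println!("ok");
    }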