diff --git a/Cargo.lock b/Cargo.lock index 478372734bdd..a35ccbf237ab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3936,6 +3936,12 @@ dependencies = [ "logos-codegen", ] +[[package]] +name = "lru" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2994eeba8ed550fd9b47a0b38f0242bc3344e496483c6180b69139cc2fa5d1d7" + [[package]] name = "mach" version = "0.3.2" @@ -8704,6 +8710,7 @@ dependencies = [ "jsonrpc-http-server", "jsonrpc-pubsub", "jsonrpc-ws-server", + "lru", "metrics", "multivm", "num 0.3.1", diff --git a/core/lib/zksync_core/Cargo.toml b/core/lib/zksync_core/Cargo.toml index b2904c0b3de4..10fe43bdf404 100644 --- a/core/lib/zksync_core/Cargo.toml +++ b/core/lib/zksync_core/Cargo.toml @@ -74,6 +74,7 @@ num = { version = "0.3.1", features = ["serde"] } bigdecimal = { version = "0.2.2", features = ["serde"] } reqwest = { version = "0.11", features = ["blocking", "json"] } hex = "0.4" +lru = { version = "0.12.1", default-features = false } governor = "0.4.2" tower-http = { version = "0.4.1", features = ["full"] } tower = { version = "0.4.13", features = ["full"] } diff --git a/core/lib/zksync_core/src/api_server/web3/mod.rs b/core/lib/zksync_core/src/api_server/web3/mod.rs index 6ee53088ecff..56004f924689 100644 --- a/core/lib/zksync_core/src/api_server/web3/mod.rs +++ b/core/lib/zksync_core/src/api_server/web3/mod.rs @@ -292,9 +292,7 @@ impl ApiBuilder { tokio::spawn(update_task); RpcState { - installed_filters: Arc::new(Mutex::new(Filters::new( - self.filters_limit.unwrap_or(usize::MAX), - ))), + installed_filters: Arc::new(Mutex::new(Filters::new(self.filters_limit))), connection_pool: self.pool, tx_sender: self.tx_sender.expect("TxSender is not provided"), sync_state: self.sync_state, diff --git a/core/lib/zksync_core/src/api_server/web3/state.rs b/core/lib/zksync_core/src/api_server/web3/state.rs index 0bae4eda03cf..5f524df1f09a 100644 --- a/core/lib/zksync_core/src/api_server/web3/state.rs +++ b/core/lib/zksync_core/src/api_server/web3/state.rs @@ -1,5 +1,4 @@ use std::{ - collections::HashMap, convert::TryFrom, future::Future, sync::{ @@ -9,6 +8,7 @@ use std::{ time::{Duration, Instant}, }; +use lru::LruCache; use tokio::sync::Mutex; use vise::GaugeGuard; use zksync_config::configs::{api::Web3JsonRpcConfig, chain::NetworkConfig, ContractsConfig}; @@ -541,12 +541,9 @@ impl RpcState { } } -/// Contains mapping from index to `Filter` with optional location. -#[derive(Default, Debug)] -pub(crate) struct Filters { - state: HashMap, - max_cap: usize, -} +/// Contains mapping from index to `Filter`x with optional location. +#[derive(Debug)] +pub(crate) struct Filters(LruCache); #[derive(Debug)] struct InstalledFilter { @@ -592,37 +589,33 @@ impl Drop for InstalledFilter { impl Filters { /// Instantiates `Filters` with given max capacity. - pub fn new(max_cap: usize) -> Self { - Self { - state: Default::default(), - max_cap, - } + pub fn new(max_cap: Option) -> Self { + let state = match max_cap { + Some(max_cap) => { + LruCache::new(max_cap.try_into().expect("Filter capacity should not be 0")) + } + None => LruCache::unbounded(), + }; + Self(state) } /// Adds filter to the state and returns its key. pub fn add(&mut self, filter: TypedFilter) -> U256 { let idx = loop { let val = H256::random().to_fixed_bytes().into(); - if !self.state.contains_key(&val) { + if !self.0.contains(&val) { break val; } }; - self.state.insert(idx, InstalledFilter::new(filter)); - - // Check if we reached max capacity - if self.state.len() > self.max_cap { - if let Some(first) = self.state.keys().next().cloned() { - self.remove(first); - } - } + self.0.push(idx, InstalledFilter::new(filter)); idx } /// Retrieves filter from the state. pub fn get_and_update_stats(&mut self, index: U256) -> Option { - let installed_filter = self.state.get_mut(&index)?; + let installed_filter = self.0.get_mut(&index)?; installed_filter.update_stats(); @@ -631,13 +624,53 @@ impl Filters { /// Updates filter in the state. pub fn update(&mut self, index: U256, new_filter: TypedFilter) { - if let Some(installed_filter) = self.state.get_mut(&index) { + if let Some(installed_filter) = self.0.get_mut(&index) { installed_filter.filter = new_filter; } } /// Removes filter from the map. pub fn remove(&mut self, index: U256) -> bool { - self.state.remove(&index).is_some() + self.0.pop(&index).is_some() + } +} + +#[cfg(test)] +mod tests { + use chrono::NaiveDateTime; + + #[test] + fn test_filters_functionality() { + use super::*; + + let mut filters = Filters::new(Some(2)); + + let filter1 = TypedFilter::Events(Filter::default(), MiniblockNumber::default()); + let filter2 = TypedFilter::Blocks(MiniblockNumber::default()); + let filter3 = TypedFilter::PendingTransactions(NaiveDateTime::default()); + + let idx1 = filters.add(filter1.clone()); + let idx2 = filters.add(filter2); + let idx3 = filters.add(filter3); + + assert_eq!(filters.0.len(), 2); + assert!(!filters.0.contains(&idx1)); + assert!(filters.0.contains(&idx2)); + assert!(filters.0.contains(&idx3)); + + filters.get_and_update_stats(idx2); + + let idx1 = filters.add(filter1); + assert_eq!(filters.0.len(), 2); + assert!(filters.0.contains(&idx1)); + assert!(filters.0.contains(&idx2)); + assert!(!filters.0.contains(&idx3)); + + filters.remove(idx1); + + assert_eq!(filters.0.len(), 1); + assert!(!filters.0.contains(&idx1)); + assert!(filters.0.contains(&idx2)); + assert!(!filters.0.contains(&idx3)); } }