Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: dropping installed filters #670

Merged
merged 13 commits into from
Dec 13, 2023
Merged
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/lib/zksync_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ num = { version = "0.3.1", features = ["serde"] }
bigdecimal = { version = "0.2.2", features = ["serde"] }
reqwest = { version = "0.11", features = ["blocking", "json"] }
hex = "0.4"
lru = { version = "0.12.1", default-features = false }
governor = "0.4.2"
tower-http = { version = "0.4.1", features = ["full"] }
tower = { version = "0.4.13", features = ["full"] }
Expand Down
4 changes: 1 addition & 3 deletions core/lib/zksync_core/src/api_server/web3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,9 +292,7 @@ impl<G: 'static + Send + Sync + L1GasPriceProvider> ApiBuilder<G> {
tokio::spawn(update_task);

RpcState {
installed_filters: Arc::new(Mutex::new(Filters::new(
self.filters_limit.unwrap_or(usize::MAX),
))),
installed_filters: Arc::new(Mutex::new(Filters::new(self.filters_limit))),
connection_pool: self.pool,
tx_sender: self.tx_sender.expect("TxSender is not provided"),
sync_state: self.sync_state,
Expand Down
81 changes: 57 additions & 24 deletions core/lib/zksync_core/src/api_server/web3/state.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::{
collections::HashMap,
convert::TryFrom,
future::Future,
sync::{
Expand All @@ -9,6 +8,7 @@ use std::{
time::{Duration, Instant},
};

use lru::LruCache;
use tokio::sync::Mutex;
use vise::GaugeGuard;
use zksync_config::configs::{api::Web3JsonRpcConfig, chain::NetworkConfig, ContractsConfig};
Expand Down Expand Up @@ -541,12 +541,9 @@ impl<E> RpcState<E> {
}
}

/// Contains mapping from index to `Filter` with optional location.
#[derive(Default, Debug)]
pub(crate) struct Filters {
state: HashMap<U256, InstalledFilter>,
max_cap: usize,
}
/// Contains mapping from index to `Filter`x with optional location.
Artemka374 marked this conversation as resolved.
Show resolved Hide resolved
#[derive(Debug)]
pub(crate) struct Filters(LruCache<U256, InstalledFilter>);

#[derive(Debug)]
struct InstalledFilter {
Expand Down Expand Up @@ -592,37 +589,33 @@ impl Drop for InstalledFilter {

impl Filters {
/// Instantiates `Filters` with given max capacity.
pub fn new(max_cap: usize) -> Self {
Self {
state: Default::default(),
max_cap,
}
pub fn new(max_cap: Option<usize>) -> Self {
let state = match max_cap {
Some(max_cap) => {
LruCache::new(max_cap.try_into().expect("Filter capacity should not be 0"))
}
None => LruCache::unbounded(),
};
Self(state)
}

/// Adds filter to the state and returns its key.
pub fn add(&mut self, filter: TypedFilter) -> U256 {
let idx = loop {
let val = H256::random().to_fixed_bytes().into();
if !self.state.contains_key(&val) {
if !self.0.contains(&val) {
break val;
}
};

self.state.insert(idx, InstalledFilter::new(filter));

// Check if we reached max capacity
if self.state.len() > self.max_cap {
if let Some(first) = self.state.keys().next().cloned() {
self.remove(first);
}
}
self.0.push(idx, InstalledFilter::new(filter));

idx
}

/// Retrieves filter from the state.
pub fn get_and_update_stats(&mut self, index: U256) -> Option<TypedFilter> {
let installed_filter = self.state.get_mut(&index)?;
let installed_filter = self.0.get_mut(&index)?;

installed_filter.update_stats();

Expand All @@ -631,13 +624,53 @@ impl Filters {

/// Updates filter in the state.
pub fn update(&mut self, index: U256, new_filter: TypedFilter) {
if let Some(installed_filter) = self.state.get_mut(&index) {
if let Some(installed_filter) = self.0.get_mut(&index) {
installed_filter.filter = new_filter;
}
}

/// Removes filter from the map.
pub fn remove(&mut self, index: U256) -> bool {
self.state.remove(&index).is_some()
self.0.pop(&index).is_some()
}
}

#[cfg(test)]
mod tests {
Artemka374 marked this conversation as resolved.
Show resolved Hide resolved
use chrono::NaiveDateTime;

#[test]
fn test_filters_functionality() {
use super::*;

let mut filters = Filters::new(Some(2));

let filter1 = TypedFilter::Events(Filter::default(), MiniblockNumber::default());
let filter2 = TypedFilter::Blocks(MiniblockNumber::default());
let filter3 = TypedFilter::PendingTransactions(NaiveDateTime::default());

let idx1 = filters.add(filter1.clone());
let idx2 = filters.add(filter2);
let idx3 = filters.add(filter3);

assert_eq!(filters.0.len(), 2);
assert!(!filters.0.contains(&idx1));
assert!(filters.0.contains(&idx2));
assert!(filters.0.contains(&idx3));

filters.get_and_update_stats(idx2);

let idx1 = filters.add(filter1);
assert_eq!(filters.0.len(), 2);
assert!(filters.0.contains(&idx1));
assert!(filters.0.contains(&idx2));
assert!(!filters.0.contains(&idx3));

filters.remove(idx1);

assert_eq!(filters.0.len(), 1);
assert!(!filters.0.contains(&idx1));
assert!(filters.0.contains(&idx2));
assert!(!filters.0.contains(&idx3));
}
}