Skip to content

Commit

Permalink
fix: dropping installed filters (#670)
Browse files Browse the repository at this point in the history
## What ❔

Fix dropping installed filters:

* use LRU instead of HashMap for InstalledFIlters

## Why ❔

Sometimes filters are dropped very quickly.

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `cargo spellcheck
--cfg=./spellcheck/era.cfg --code 1`.
  • Loading branch information
Artemka374 authored Dec 13, 2023
1 parent 1153c42 commit 985c737
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 27 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/lib/zksync_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ num = { version = "0.3.1", features = ["serde"] }
bigdecimal = { version = "0.2.2", features = ["serde"] }
reqwest = { version = "0.11", features = ["blocking", "json"] }
hex = "0.4"
lru = { version = "0.12.1", default-features = false }
governor = "0.4.2"
tower-http = { version = "0.4.1", features = ["full"] }
tower = { version = "0.4.13", features = ["full"] }
Expand Down
4 changes: 1 addition & 3 deletions core/lib/zksync_core/src/api_server/web3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,9 +292,7 @@ impl<G: 'static + Send + Sync + L1GasPriceProvider> ApiBuilder<G> {
tokio::spawn(update_task);

RpcState {
installed_filters: Arc::new(Mutex::new(Filters::new(
self.filters_limit.unwrap_or(usize::MAX),
))),
installed_filters: Arc::new(Mutex::new(Filters::new(self.filters_limit))),
connection_pool: self.pool,
tx_sender: self.tx_sender.expect("TxSender is not provided"),
sync_state: self.sync_state,
Expand Down
81 changes: 57 additions & 24 deletions core/lib/zksync_core/src/api_server/web3/state.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::{
collections::HashMap,
convert::TryFrom,
future::Future,
sync::{
Expand All @@ -9,6 +8,7 @@ use std::{
time::{Duration, Instant},
};

use lru::LruCache;
use tokio::sync::Mutex;
use vise::GaugeGuard;
use zksync_config::configs::{api::Web3JsonRpcConfig, chain::NetworkConfig, ContractsConfig};
Expand Down Expand Up @@ -541,12 +541,9 @@ impl<E> RpcState<E> {
}
}

/// Contains mapping from index to `Filter` with optional location.
#[derive(Default, Debug)]
pub(crate) struct Filters {
state: HashMap<U256, InstalledFilter>,
max_cap: usize,
}
/// Contains mapping from index to `Filter`x with optional location.
#[derive(Debug)]
pub(crate) struct Filters(LruCache<U256, InstalledFilter>);

#[derive(Debug)]
struct InstalledFilter {
Expand Down Expand Up @@ -592,37 +589,33 @@ impl Drop for InstalledFilter {

impl Filters {
/// Instantiates `Filters` with given max capacity.
pub fn new(max_cap: usize) -> Self {
Self {
state: Default::default(),
max_cap,
}
pub fn new(max_cap: Option<usize>) -> Self {
let state = match max_cap {
Some(max_cap) => {
LruCache::new(max_cap.try_into().expect("Filter capacity should not be 0"))
}
None => LruCache::unbounded(),
};
Self(state)
}

/// Adds filter to the state and returns its key.
pub fn add(&mut self, filter: TypedFilter) -> U256 {
let idx = loop {
let val = H256::random().to_fixed_bytes().into();
if !self.state.contains_key(&val) {
if !self.0.contains(&val) {
break val;
}
};

self.state.insert(idx, InstalledFilter::new(filter));

// Check if we reached max capacity
if self.state.len() > self.max_cap {
if let Some(first) = self.state.keys().next().cloned() {
self.remove(first);
}
}
self.0.push(idx, InstalledFilter::new(filter));

idx
}

/// Retrieves filter from the state.
pub fn get_and_update_stats(&mut self, index: U256) -> Option<TypedFilter> {
let installed_filter = self.state.get_mut(&index)?;
let installed_filter = self.0.get_mut(&index)?;

installed_filter.update_stats();

Expand All @@ -631,13 +624,53 @@ impl Filters {

/// Updates filter in the state.
pub fn update(&mut self, index: U256, new_filter: TypedFilter) {
if let Some(installed_filter) = self.state.get_mut(&index) {
if let Some(installed_filter) = self.0.get_mut(&index) {
installed_filter.filter = new_filter;
}
}

/// Removes filter from the map.
pub fn remove(&mut self, index: U256) -> bool {
self.state.remove(&index).is_some()
self.0.pop(&index).is_some()
}
}

#[cfg(test)]
mod tests {
use chrono::NaiveDateTime;

#[test]
fn test_filters_functionality() {
use super::*;

let mut filters = Filters::new(Some(2));

let filter1 = TypedFilter::Events(Filter::default(), MiniblockNumber::default());
let filter2 = TypedFilter::Blocks(MiniblockNumber::default());
let filter3 = TypedFilter::PendingTransactions(NaiveDateTime::default());

let idx1 = filters.add(filter1.clone());
let idx2 = filters.add(filter2);
let idx3 = filters.add(filter3);

assert_eq!(filters.0.len(), 2);
assert!(!filters.0.contains(&idx1));
assert!(filters.0.contains(&idx2));
assert!(filters.0.contains(&idx3));

filters.get_and_update_stats(idx2);

let idx1 = filters.add(filter1);
assert_eq!(filters.0.len(), 2);
assert!(filters.0.contains(&idx1));
assert!(filters.0.contains(&idx2));
assert!(!filters.0.contains(&idx3));

filters.remove(idx1);

assert_eq!(filters.0.len(), 1);
assert!(!filters.0.contains(&idx1));
assert!(filters.0.contains(&idx2));
assert!(!filters.0.contains(&idx3));
}
}

0 comments on commit 985c737

Please sign in to comment.