diff --git a/Cargo.lock b/Cargo.lock index c4ef17f5e7e4..3ced51cebe87 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5600,7 +5600,7 @@ dependencies = [ [[package]] name = "parity-db" version = "0.3.5" -source = "git+https://github.com/paritytech/parity-db?branch=master#3b9e57772c3f94353c2f3806bab25e9dfe3e70cf" +source = "git+https://github.com/cheme/parity-db?branch=eb#f80b78a84c1c5552bf43396a11a087895a7ae85e" dependencies = [ "blake2-rfc", "crc32fast", @@ -6686,12 +6686,14 @@ dependencies = [ "kvdb", "kvdb-shared-tests", "lazy_static", + "linked-hash-map", "log", "lru 0.7.2", "metered-channel", "parity-db", "parity-scale-codec", "parity-util-mem", + "parking_lot", "pin-project 1.0.10", "polkadot-node-jaeger", "polkadot-node-metrics", @@ -11159,8 +11161,8 @@ version = "1.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ee73e6e4924fe940354b8d4d98cad5231175d615cd855b758adc658c0aac6a0" dependencies = [ - "cfg-if 1.0.0", - "rand 0.8.4", + "cfg-if 0.1.10", + "rand 0.7.3", "static_assertions", ] diff --git a/Cargo.toml b/Cargo.toml index 1aed9674cd78..751cf289bcc8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -178,4 +178,4 @@ polkadot = { path = "/usr/bin/polkadot" } config = "./scripts/gitlab/spellcheck.toml" [patch.crates-io] -parity-db = { git = "https://github.com/paritytech/parity-db", branch = "master" } +parity-db = { git = "https://github.com/cheme/parity-db", branch = "eb" } diff --git a/node/service/src/parachains_db/mod.rs b/node/service/src/parachains_db/mod.rs index e5905004d70e..f846bba80474 100644 --- a/node/service/src/parachains_db/mod.rs +++ b/node/service/src/parachains_db/mod.rs @@ -130,7 +130,7 @@ pub fn exists_rocksdb_db(root: PathBuf) -> bool { /// Open a parity db database. 
#[cfg(feature = "full-node")] -pub fn open_creating(root: PathBuf, _cache_sizes: CacheSizes) -> io::Result<Arc<dyn Database>> { +pub fn open_creating(root: PathBuf, cache_sizes: CacheSizes) -> io::Result<Arc<dyn Database>> { let path = root.join("parachains"); let path_str = path .to_str() @@ -146,9 +146,15 @@ pub fn open_creating(root: PathBuf, _cache_sizes: CacheSizes) -> io::Result(result: parity_db::Result) -> T { match result { @@ -156,6 +157,8 @@ pub mod paritydb_impl { pub struct DbAdapter { db: Db, allowed_iter: BTreeSet<u32>, + lru: Vec<Option<Arc<RwLock<LRUMap<Vec<u8>, Option<Vec<u8>>>>>>>, + write_lock: Arc<Mutex<()>>, } impl parity_util_mem::MallocSizeOf for DbAdapter { @@ -170,6 +173,11 @@ pub mod paritydb_impl { } fn get(&self, col: u32, key: &[u8]) -> Result<Option<DBValue>> { + if let Some(lru) = self.lru.get(col as usize).map(Option::as_ref).flatten() { + if let Some(cached) = lru.write().get(key) { + return Ok(cached.clone()) + } + } map_err(self.db.get(col as u8, key)) } @@ -233,6 +241,7 @@ pub mod paritydb_impl { if let Some((prefix_iter, col, prefix)) = current_prefix_iter { if let Some((key, _value)) = handle_err(prefix_iter.next()) { if key.starts_with(prefix) { + self.set_lru(*col as u32, &key[..], &None); return Some((*col, key.to_vec(), None)) } } @@ -240,9 +249,15 @@ } return match ops.next() { None => None, - Some(DBOp::Insert { col, key, value }) => - Some((col as u8, key.to_vec(), Some(value))), - Some(DBOp::Delete { col, key }) => Some((col as u8, key.to_vec(), None)), + Some(DBOp::Insert { col, key, value }) => { + let value = Some(value); + self.set_lru(col, &key[..], &value); + Some((col as u8, key.to_vec(), value)) + }, + Some(DBOp::Delete { col, key }) => { + self.set_lru(col, &key[..], &None); + Some((col as u8, key.to_vec(), None)) + }, Some(DBOp::DeletePrefix { col, prefix }) => { let col = col as u8; let mut iter = handle_err(self.db.iter(col)); @@ -253,6 +268,8 @@ } }); + // Locking is required due to possible racy change of the content of a deleted prefix. 
+ let _lock = self.write_lock.lock(); map_err(self.db.commit(transaction)) } } @@ -265,8 +282,22 @@ impl DbAdapter { /// Implementation of of `Database` for parity-db adapter. - pub fn new(db: Db, allowed_iter: &[u32]) -> Self { - DbAdapter { db, allowed_iter: allowed_iter.iter().cloned().collect() } + pub fn new(db: Db, allowed_iter: &[u32], caches_size: &[(u32, usize)]) -> Self { + let mut lru = Vec::<Option<Arc<RwLock<LRUMap<Vec<u8>, Option<Vec<u8>>>>>>>::new(); + for (col, cache_size) in caches_size { + if *cache_size > 0 { + lru.resize(*col as usize + 1, None); + lru[*col as usize] = Some(Arc::new(RwLock::new(LRUMap::new(*cache_size)))); + } + } + let write_lock = Arc::new(Mutex::new(())); + DbAdapter { db, allowed_iter: allowed_iter.iter().cloned().collect(), lru, write_lock } + } + + fn set_lru(&self, col: u32, key: &[u8], value: &Option<Vec<u8>>) { + if let Some(lru) = self.lru.get(col as usize).map(Option::as_ref).flatten() { + lru.write().add(key.to_vec(), value.clone()); + } } } @@ -277,7 +308,7 @@ use std::io; use tempfile::Builder as TempfileBuilder; - fn create(num_col: u32) -> io::Result<(DbAdapter, tempfile::TempDir)> { + fn create(num_col: u32, with_cache: bool) -> io::Result<(DbAdapter, tempfile::TempDir)> { let tempdir = TempfileBuilder::new().prefix("").tempdir()?; let mut options = parity_db::Options::with_columns(tempdir.path(), num_col as u8); for i in 0..num_col { @@ -287,44 +318,138 @@ pub mod paritydb_impl { let db = parity_db::Db::open_or_create(&options) .map_err(|err| io::Error::new(io::ErrorKind::Other, format!("{:?}", err)))?; - let db = DbAdapter::new(db, &[0]); + let cache = if with_cache { + &[(0, 100)] + } else { + &[(0, 0)] + }; + let db = DbAdapter::new(db, &[0], cache); Ok((db, tempdir)) } #[test] fn put_and_get() -> io::Result<()> { - let (db, _temp_file) = create(1)?; + let (db, _temp_file) = create(1, false)?; + st::test_put_and_get(&db)?; + let (db, _temp_file) = create(1, true)?; st::test_put_and_get(&db) } #[test] fn 
delete_and_get() -> io::Result<()> { - let (db, _temp_file) = create(1)?; + let (db, _temp_file) = create(1, false)?; + st::test_delete_and_get(&db)?; + let (db, _temp_file) = create(1, true)?; st::test_delete_and_get(&db) } #[test] fn delete_prefix() -> io::Result<()> { - let (db, _temp_file) = create(st::DELETE_PREFIX_NUM_COLUMNS)?; + let (db, _temp_file) = create(st::DELETE_PREFIX_NUM_COLUMNS, false)?; + st::test_delete_prefix(&db)?; + let (db, _temp_file) = create(st::DELETE_PREFIX_NUM_COLUMNS, true)?; st::test_delete_prefix(&db) } #[test] fn iter() -> io::Result<()> { - let (db, _temp_file) = create(1)?; + let (db, _temp_file) = create(1, false)?; + st::test_iter(&db)?; + let (db, _temp_file) = create(1, true)?; st::test_iter(&db) } #[test] fn iter_with_prefix() -> io::Result<()> { - let (db, _temp_file) = create(1)?; + let (db, _temp_file) = create(1, false)?; + st::test_iter_with_prefix(&db)?; + let (db, _temp_file) = create(1, true)?; st::test_iter_with_prefix(&db) } #[test] fn complex() -> io::Result<()> { - let (db, _temp_file) = create(1)?; + let (db, _temp_file) = create(1, false)?; + st::test_complex(&db)?; + let (db, _temp_file) = create(1, true)?; st::test_complex(&db) } } } + +mod lru_map { + use linked_hash_map::{Entry, LinkedHashMap}; + use std::hash::Hash as StdHash; + + pub(super) struct LRUMap<K, V>(LinkedHashMap<K, V>, usize, usize); + + impl<K, V> LRUMap<K, V> + where + K: std::hash::Hash + Eq, + { + pub(crate) fn new(size_limit: usize) -> Self { + LRUMap(LinkedHashMap::new(), 0, size_limit) + } + } + + /// Internal trait similar to `heapsize` but using + /// a simple estimation. + /// + /// This should not be made public, it is an implementation + /// detail trait. If it needs to become public please + /// consider using `malloc_size_of`. + pub(super) trait EstimateSize { + /// Return a size estimation of additional size needed + /// to cache this struct (in bytes). 
+ fn estimate_size(&self) -> usize; + } + + impl EstimateSize for Vec<u8> { + fn estimate_size(&self) -> usize { + self.len() + } + } + + impl EstimateSize for Option<Vec<u8>> { + fn estimate_size(&self) -> usize { + self.as_ref().map(|v| v.capacity()).unwrap_or(0) + } + } + + impl<K: EstimateSize + Eq + StdHash, V: EstimateSize> LRUMap<K, V> { + pub(super) fn add(&mut self, k: K, v: V) { + let lmap = &mut self.0; + let storage_used_size = &mut self.1; + let limit = self.2; + let klen = k.estimate_size(); + *storage_used_size += v.estimate_size(); + match lmap.entry(k) { + Entry::Occupied(mut entry) => { + *storage_used_size -= entry.get().estimate_size(); + entry.insert(v); + }, + Entry::Vacant(entry) => { + *storage_used_size += klen; + entry.insert(v); + }, + }; + + while *storage_used_size > limit { + if let Some((k, v)) = lmap.pop_front() { + *storage_used_size -= k.estimate_size(); + *storage_used_size -= v.estimate_size(); + } else { + break + } + } + } + + pub(super) fn get<Q: ?Sized>(&mut self, k: &Q) -> Option<&mut V> + where + K: std::borrow::Borrow<Q>, + Q: StdHash + Eq, + { + self.0.get_refresh(k) + } + } +}