Skip to content

Commit

Permalink
lru cache for db column with cache
Browse files Browse the repository at this point in the history
  • Loading branch information
cheme committed Feb 2, 2022
1 parent 4cea166 commit ae177bc
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 24 deletions.
8 changes: 5 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -178,4 +178,4 @@ polkadot = { path = "/usr/bin/polkadot" }
config = "./scripts/gitlab/spellcheck.toml"

[patch.crates-io]
parity-db = { git = "https://github.com/paritytech/parity-db", branch = "master" }
parity-db = { git = "https://github.com/cheme/parity-db", branch = "eb" }
8 changes: 7 additions & 1 deletion node/service/src/parachains_db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ pub fn exists_rocksdb_db(root: PathBuf) -> bool {

/// Open a parity db database.
#[cfg(feature = "full-node")]
pub fn open_creating(root: PathBuf, _cache_sizes: CacheSizes) -> io::Result<Arc<dyn Database>> {
pub fn open_creating(root: PathBuf, cache_sizes: CacheSizes) -> io::Result<Arc<dyn Database>> {
let path = root.join("parachains");
let path_str = path
.to_str()
Expand All @@ -146,9 +146,15 @@ pub fn open_creating(root: PathBuf, _cache_sizes: CacheSizes) -> io::Result<Arc<
let db = parity_db::Db::open_or_create(&options)
.map_err(|err| io::Error::new(io::ErrorKind::Other, format!("{:?}", err)))?;

let cache_sizes = [
(columns::COL_AVAILABILITY_DATA, cache_sizes.availability_data),
(columns::COL_AVAILABILITY_META, cache_sizes.availability_meta),
(columns::COL_APPROVAL_DATA, cache_sizes.approval_data),
];
let db = polkadot_node_subsystem_util::database::paritydb_impl::DbAdapter::new(
db,
columns::ORDERED_COL,
&cache_sizes,
);
Ok(Arc::new(db))
}
9 changes: 5 additions & 4 deletions node/subsystem-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@ description = "Subsystem traits and message definitions"
async-trait = "0.1.52"
futures = "0.3.19"
itertools = "0.10"
kvdb = "0.10.0"
linked-hash-map = "0.5.4"
parity-db = { version = "0.3.5" }
parity-scale-codec = { version = "2.3.1", default-features = false, features = ["derive"] }
parity-util-mem = { version = "0.10", default-features = false }
parking_lot = "0.11.2"
pin-project = "1.0.9"
rand = "0.8.3"
thiserror = "1.0.30"
Expand All @@ -30,10 +35,6 @@ sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }

kvdb = "0.10.0"
parity-util-mem = { version = "0.10", default-features = false }
parity-db = { version = "0.3.5" }

[dev-dependencies]
assert_matches = "1.4.0"
env_logger = "0.9.0"
Expand Down
155 changes: 140 additions & 15 deletions node/subsystem-util/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,11 @@ pub mod kvdb_impl {

/// Utils to use parity-db base database.
pub mod paritydb_impl {
use super::{DBTransaction, DBValue, Database, KeyValueDB};
use super::{lru_map::LRUMap, DBTransaction, DBValue, Database, KeyValueDB};
use kvdb::{DBOp, IoStats, IoStatsKind};
use parity_db::Db;
use std::{collections::BTreeSet, io::Result};
use parking_lot::{RwLock, Mutex};
use std::{collections::BTreeSet, io::Result, sync::Arc};

fn handle_err<T>(result: parity_db::Result<T>) -> T {
match result {
Expand All @@ -156,6 +157,8 @@ pub mod paritydb_impl {
pub struct DbAdapter {
db: Db,
allowed_iter: BTreeSet<u32>,
lru: Vec<Option<Arc<RwLock<LRUMap<Vec<u8>, Option<Vec<u8>>>>>>>,
write_lock: Arc<Mutex<()>>,
}

impl parity_util_mem::MallocSizeOf for DbAdapter {
Expand All @@ -170,6 +173,11 @@ pub mod paritydb_impl {
}

fn get(&self, col: u32, key: &[u8]) -> Result<Option<DBValue>> {
if let Some(lru) = self.lru.get(col as usize).map(Option::as_ref).flatten() {
if let Some(cached) = lru.write().get(key) {
return Ok(cached.clone())
}
}
map_err(self.db.get(col as u8, key))
}

Expand Down Expand Up @@ -233,16 +241,23 @@ pub mod paritydb_impl {
if let Some((prefix_iter, col, prefix)) = current_prefix_iter {
if let Some((key, _value)) = handle_err(prefix_iter.next()) {
if key.starts_with(prefix) {
self.set_lru(*col as u32, &key[..], &None);
return Some((*col, key.to_vec(), None))
}
}
*current_prefix_iter = None;
}
return match ops.next() {
None => None,
Some(DBOp::Insert { col, key, value }) =>
Some((col as u8, key.to_vec(), Some(value))),
Some(DBOp::Delete { col, key }) => Some((col as u8, key.to_vec(), None)),
Some(DBOp::Insert { col, key, value }) => {
let value = Some(value);
self.set_lru(col, &key[..], &value);
Some((col as u8, key.to_vec(), value))
},
Some(DBOp::Delete { col, key }) => {
self.set_lru(col, &key[..], &None);
Some((col as u8, key.to_vec(), None))
},
Some(DBOp::DeletePrefix { col, prefix }) => {
let col = col as u8;
let mut iter = handle_err(self.db.iter(col));
Expand All @@ -253,6 +268,8 @@ pub mod paritydb_impl {
}
});

// Locking is required due to possible racy change of the content of a deleted prefix.
let _lock = self.write_lock.lock();
map_err(self.db.commit(transaction))
}
}
Expand All @@ -265,8 +282,22 @@ pub mod paritydb_impl {

impl DbAdapter {
/// Implementation of of `Database` for parity-db adapter.
pub fn new(db: Db, allowed_iter: &[u32]) -> Self {
DbAdapter { db, allowed_iter: allowed_iter.iter().cloned().collect() }
pub fn new(db: Db, allowed_iter: &[u32], caches_size: &[(u32, usize)]) -> Self {
let mut lru = Vec::<Option<Arc<RwLock<LRUMap<Vec<u8>, Option<Vec<u8>>>>>>>::new();
for (col, cache_size) in caches_size {
if *cache_size > 0 {
lru.resize(*col as usize + 1, None);
lru[*col as usize] = Some(Arc::new(RwLock::new(LRUMap::new(*cache_size))));
}
}
let write_lock = Arc::new(Mutex::new(()));
DbAdapter { db, allowed_iter: allowed_iter.iter().cloned().collect(), lru, write_lock }
}

fn set_lru(&self, col: u32, key: &[u8], value: &Option<Vec<u8>>) {
if let Some(lru) = self.lru.get(col as usize).map(Option::as_ref).flatten() {
lru.write().add(key.to_vec(), value.clone());
}
}
}

Expand All @@ -277,7 +308,7 @@ pub mod paritydb_impl {
use std::io;
use tempfile::Builder as TempfileBuilder;

fn create(num_col: u32) -> io::Result<(DbAdapter, tempfile::TempDir)> {
fn create(num_col: u32, with_cache: bool) -> io::Result<(DbAdapter, tempfile::TempDir)> {
let tempdir = TempfileBuilder::new().prefix("").tempdir()?;
let mut options = parity_db::Options::with_columns(tempdir.path(), num_col as u8);
for i in 0..num_col {
Expand All @@ -287,44 +318,138 @@ pub mod paritydb_impl {
let db = parity_db::Db::open_or_create(&options)
.map_err(|err| io::Error::new(io::ErrorKind::Other, format!("{:?}", err)))?;

let db = DbAdapter::new(db, &[0]);
let cache = if with_cache {
&[(0, 100)]
} else {
&[(0, 0)]
};
let db = DbAdapter::new(db, &[0], cache);
Ok((db, tempdir))
}

#[test]
fn put_and_get() -> io::Result<()> {
let (db, _temp_file) = create(1)?;
let (db, _temp_file) = create(1, false)?;
st::test_put_and_get(&db)?;
let (db, _temp_file) = create(1, true)?;
st::test_put_and_get(&db)
}

#[test]
fn delete_and_get() -> io::Result<()> {
let (db, _temp_file) = create(1)?;
let (db, _temp_file) = create(1, false)?;
st::test_delete_and_get(&db)?;
let (db, _temp_file) = create(1, true)?;
st::test_delete_and_get(&db)
}

#[test]
fn delete_prefix() -> io::Result<()> {
let (db, _temp_file) = create(st::DELETE_PREFIX_NUM_COLUMNS)?;
let (db, _temp_file) = create(st::DELETE_PREFIX_NUM_COLUMNS, false)?;
st::test_delete_prefix(&db)?;
let (db, _temp_file) = create(st::DELETE_PREFIX_NUM_COLUMNS, true)?;
st::test_delete_prefix(&db)
}

#[test]
fn iter() -> io::Result<()> {
let (db, _temp_file) = create(1)?;
let (db, _temp_file) = create(1, false)?;
st::test_iter(&db)?;
let (db, _temp_file) = create(1, true)?;
st::test_iter(&db)
}

#[test]
fn iter_with_prefix() -> io::Result<()> {
let (db, _temp_file) = create(1)?;
let (db, _temp_file) = create(1, false)?;
st::test_iter_with_prefix(&db)?;
let (db, _temp_file) = create(1, true)?;
st::test_iter_with_prefix(&db)
}

#[test]
fn complex() -> io::Result<()> {
let (db, _temp_file) = create(1)?;
let (db, _temp_file) = create(1, false)?;
st::test_complex(&db)?;
let (db, _temp_file) = create(1, true)?;
st::test_complex(&db)
}
}
}

mod lru_map {
use linked_hash_map::{Entry, LinkedHashMap};
use std::hash::Hash as StdHash;

pub(super) struct LRUMap<K, V>(LinkedHashMap<K, V>, usize, usize);

impl<K, V> LRUMap<K, V>
where
K: std::hash::Hash + Eq,
{
pub(crate) fn new(size_limit: usize) -> Self {
LRUMap(LinkedHashMap::new(), 0, size_limit)
}
}

/// Internal trait similar to `heapsize` but using
/// a simple estimation.
///
/// This should not be made public, it is implementation
/// detail trait. If it need to become public please
/// consider using `malloc_size_of`.
pub(super) trait EstimateSize {
/// Return a size estimation of additional size needed
/// to cache this struct (in bytes).
fn estimate_size(&self) -> usize;
}

impl EstimateSize for Vec<u8> {
fn estimate_size(&self) -> usize {
self.len()
}
}

impl EstimateSize for Option<Vec<u8>> {
fn estimate_size(&self) -> usize {
self.as_ref().map(|v| v.capacity()).unwrap_or(0)
}
}

impl<K: EstimateSize + Eq + StdHash, V: EstimateSize> LRUMap<K, V> {
pub(super) fn add(&mut self, k: K, v: V) {
let lmap = &mut self.0;
let storage_used_size = &mut self.1;
let limit = self.2;
let klen = k.estimate_size();
*storage_used_size += v.estimate_size();
match lmap.entry(k) {
Entry::Occupied(mut entry) => {
*storage_used_size -= entry.get().estimate_size();
entry.insert(v);
},
Entry::Vacant(entry) => {
*storage_used_size += klen;
entry.insert(v);
},
};

while *storage_used_size > limit {
if let Some((k, v)) = lmap.pop_front() {
*storage_used_size -= k.estimate_size();
*storage_used_size -= v.estimate_size();
} else {
break
}
}
}

pub(super) fn get<Q: ?Sized>(&mut self, k: &Q) -> Option<&mut V>
where
K: std::borrow::Borrow<Q>,
Q: StdHash + Eq,
{
self.0.get_refresh(k)
}
}
}

0 comments on commit ae177bc

Please sign in to comment.