Skip to content

Commit

Permalink
feat: introduce RollDB (#307)
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega authored Oct 13, 2023
1 parent 986ee75 commit d1ac056
Show file tree
Hide file tree
Showing 14 changed files with 1,481 additions and 0 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ members = [
"pallas-crypto",
"pallas-configs",
"pallas-primitives",
"pallas-rolldb",
"pallas-traverse",
"pallas-utxorpc",
"pallas",
Expand Down
26 changes: 26 additions & 0 deletions pallas-rolldb/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
[package]
name = "pallas-rolldb"
description = "An opinionated Cardano storage engine built on top of RocksDB"
version = "0.19.1"
edition = "2021"
repository = "https://github.com/txpipe/pallas"
homepage = "https://github.com/txpipe/pallas"
documentation = "https://docs.rs/pallas-rolldb"
license = "Apache-2.0"
readme = "README.md"
authors = ["Santiago Carmuega <[email protected]>"]

[dependencies]
rocksdb = { version = "0.21.0", default-features = false, features = ["multi-threaded-cf"] }
bincode = "1.3.3"
serde = "1.0.188"
thiserror = "1.0.49"
pallas-crypto = { version = "=0.19.1", path = "../pallas-crypto" }
tracing = "0.1.37"
tokio = { version = "1.32.0", features = ["sync", "rt", "time", "macros"] }
async-stream = "0.3.5"
futures-core = "0.3.28"
futures-util = "0.3.28"

[dev-dependencies]
tempfile = "3.3.0"
4 changes: 4 additions & 0 deletions pallas-rolldb/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Pallas RollDB

An opinionated Cardano storage engine built on top of RocksDB.

12 changes: 12 additions & 0 deletions pallas-rolldb/src/chain/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use pallas_crypto::hash::Hash;

mod store;

#[cfg(test)]
mod tests;

pub type BlockSlot = u64;
pub type BlockHash = Hash<32>;
pub type BlockBody = Vec<u8>;

pub use store::*;
271 changes: 271 additions & 0 deletions pallas-rolldb/src/chain/store.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
use pallas_crypto::hash::Hash;
use std::{path::Path, sync::Arc};
use tracing::warn;

use rocksdb::{Options, WriteBatch, DB};

use super::{BlockBody, BlockHash, BlockSlot};

use crate::kvtable::*;

#[derive(Clone)]
pub struct Store {
db: Arc<DB>,
pub tip_change: Arc<tokio::sync::Notify>,
}

pub struct BlockByHashKV;

// hash -> block cbor
impl KVTable<DBHash, DBBytes> for BlockByHashKV {
const CF_NAME: &'static str = "BlockByHashKV";
}

// slot => block hash
pub struct HashBySlotKV;

impl KVTable<DBInt, DBHash> for HashBySlotKV {
const CF_NAME: &'static str = "HashBySlotKV";
}

pub struct ChainIterator<'a>(pub EntryIterator<'a, DBInt, DBHash>);

impl Iterator for ChainIterator<'_> {
type Item = Result<(u64, Hash<32>), Error>;

fn next(&mut self) -> Option<Self::Item> {
self.0.next().map(|v| v.map(|(seq, val)| (seq.0, val.0)))
}
}

impl Store {
pub fn open(path: impl AsRef<Path>) -> Result<Self, Error> {
let mut opts = Options::default();
opts.create_if_missing(true);
opts.create_missing_column_families(true);

let db = DB::open_cf(&opts, path, [BlockByHashKV::CF_NAME, HashBySlotKV::CF_NAME])
.map_err(|_| Error::IO)?;

let out = Self {
db: Arc::new(db),
tip_change: Arc::new(tokio::sync::Notify::new()),
};

Ok(out)
}

pub fn get_block(&self, hash: Hash<32>) -> Result<Option<BlockBody>, Error> {
let dbval = BlockByHashKV::get_by_key(&self.db, DBHash(hash))?;
Ok(dbval.map(|x| x.0))
}

pub fn roll_forward(
&mut self,
slot: BlockSlot,
hash: BlockHash,
body: BlockBody,
) -> Result<(), Error> {
let mut batch = WriteBatch::default();

// keep track of the new block body
BlockByHashKV::stage_upsert(&self.db, DBHash(hash), DBBytes(body), &mut batch);

// add new block to HashBySlotKV
HashBySlotKV::stage_upsert(&self.db, DBInt(slot), DBHash(hash), &mut batch);

self.db.write(batch).map_err(|_| Error::IO)?;
self.tip_change.notify_waiters();

Ok(())
}

pub fn roll_back(&mut self, until: BlockSlot) -> Result<(), Error> {
let mut batch = WriteBatch::default();

// remove rollback-ed blocks from HashBySlotKV
let to_remove = HashBySlotKV::iter_keys_from(&self.db, DBInt(until)).skip(1);

for key in to_remove {
HashBySlotKV::stage_delete(&self.db, key?, &mut batch);
}

self.db.write(batch).map_err(|_| Error::IO)?;
self.tip_change.notify_waiters();

Ok(())
}

pub fn roll_back_origin(&mut self) -> Result<(), Error> {
HashBySlotKV::reset(&self.db)?;
BlockByHashKV::reset(&self.db)?;

self.tip_change.notify_waiters();

Ok(())
}

pub fn find_tip(&self) -> Result<Option<(BlockSlot, BlockHash)>, Error> {
let mut iter = HashBySlotKV::iter_entries(&self.db, rocksdb::IteratorMode::End);

if let Some(last) = iter.next() {
let (slot, hash) = last?;
Ok(Some((slot.0, hash.0)))
} else {
Ok(None)
}
}

pub fn intersect_options(
&self,
max_items: usize,
) -> Result<Vec<(BlockSlot, BlockHash)>, Error> {
let mut iter = HashBySlotKV::iter_entries(&self.db, rocksdb::IteratorMode::End)
.filter_map(|res| res.ok())
.map(|(k, v)| (k.0, v.0));

let mut out = Vec::with_capacity(max_items);

while let Some((slot, hash)) = iter.next() {
out.push((slot, hash));

if out.len() >= max_items {
break;
}

// skip exponentially
let skip = 2usize.pow(out.len() as u32) - 1;
for _ in 0..skip {
iter.next();
}
}

Ok(out)
}

pub fn crawl_after(&self, slot: Option<u64>) -> ChainIterator {
if let Some(slot) = slot {
let slot = Box::<[u8]>::from(DBInt(slot));
let from = rocksdb::IteratorMode::From(&slot, rocksdb::Direction::Forward);
let mut iter = HashBySlotKV::iter_entries(&self.db, from);

// skip current
iter.next();

ChainIterator(iter)
} else {
let from = rocksdb::IteratorMode::Start;
let iter = HashBySlotKV::iter_entries(&self.db, from);
ChainIterator(iter)
}
}

pub fn crawl(&self) -> ChainIterator {
self.crawl_after(None)
}

pub fn read_chain_page(
&self,
from: BlockSlot,
len: usize,
) -> impl Iterator<Item = Result<(BlockSlot, BlockHash), Error>> + '_ {
HashBySlotKV::iter_entries_from(&self.db, DBInt(from))
.map(|res| res.map(|(x, y)| (x.0, y.0)))
.take(len)
}

/// Iterator over chain between two points (inclusive)
///
/// To use Origin as start point set `from` to None.
///
/// Returns None if either point in range don't exist or `to` point is
/// earlier in chain than `from`.
pub fn read_chain_range(
&self,
from: Option<(BlockSlot, BlockHash)>,
to: (BlockSlot, BlockHash),
) -> Result<Option<impl Iterator<Item = Result<(BlockSlot, BlockHash), Error>> + '_>, Error>
{
// TODO: We want to use a snapshot here to avoid race condition where
// point is checked to be in the HashBySlotKV but it is rolled-back before we
// create the iterator. Problem is `HashBySlotKV` etc must take `DB`, not
// `Snapshot<DB>`, so maybe we need a new way of creating something like
// a "KVTableSnapshot" in addition to the current "KVTable" type, which
// has methods on snapshots, but here I was having issues as there is
// no `cf` method on Snapshot but it is used is KVTable.

// let snapshot = self.db.snapshot();

// check p2 not before p1
let p1_slot = if let Some((slot, _)) = from {
if to.0 < slot {
warn!("chain range end slot before start slot");
return Ok(None);
} else {
slot
}
} else {
0 // Use 0 as slot for Origin
};

// check p1 exists in HashBySlotKV if provided
if let Some((slot, hash)) = from {
match HashBySlotKV::get_by_key(&self.db, DBInt(slot))? {
Some(DBHash(found_hash)) => {
if hash != found_hash {
warn!("chain range start hash mismatch");
return Ok(None);
}
}
None => {
warn!("chain range start slot not found");
return Ok(None);
}
}
}

// check p2 exists in HashBySlotKV
match HashBySlotKV::get_by_key(&self.db, DBInt(to.0))? {
Some(DBHash(found_hash)) => {
if to.1 != found_hash {
warn!("chain range end hash mismatch");
return Ok(None);
}
}
None => {
warn!("chain range end slot not found");
return Ok(None);
}
};

// return iterator between p1 and p2 inclusive
Ok(Some(
HashBySlotKV::iter_entries_from(&self.db, DBInt(p1_slot))
.map(|res| res.map(|(x, y)| (x.0, y.0)))
.take_while(move |x| {
if let Ok((slot, _)) = x {
// iter returns None once point is after `to` slot
*slot <= to.0
} else {
false
}
}),
))
}

/// Check if a point (pair of slot and block hash) exists in the
/// HashBySlotKV
pub fn chain_contains(&self, slot: BlockSlot, hash: &BlockHash) -> Result<bool, Error> {
if let Some(DBHash(found)) = HashBySlotKV::get_by_key(&self.db, DBInt(slot))? {
if found == *hash {
return Ok(true);
}
}

Ok(false)
}

pub fn destroy(path: impl AsRef<Path>) -> Result<(), Error> {
DB::destroy(&Options::default(), path).map_err(|_| Error::IO)
}
}
Loading

0 comments on commit d1ac056

Please sign in to comment.