Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1. feat(db): Store transactions in a separate database index, to improve query speed #3934

Merged
merged 18 commits into from
Apr 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion zebra-state/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub use zebra_chain::transparent::MIN_TRANSPARENT_COINBASE_MATURITY;
pub const MAX_BLOCK_REORG_HEIGHT: u32 = MIN_TRANSPARENT_COINBASE_MATURITY - 1;

/// The database format version, incremented each time the database format changes.
pub const DATABASE_FORMAT_VERSION: u32 = 14;
pub const DATABASE_FORMAT_VERSION: u32 = 16;
teor2345 marked this conversation as resolved.
Show resolved Hide resolved

/// The maximum number of blocks to check for NU5 transactions,
/// before we assume we are on a pre-NU5 legacy chain.
Expand Down
220 changes: 182 additions & 38 deletions zebra-state/src/service/finalized_state/disk_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,40 @@ use crate::{
#[cfg(any(test, feature = "proptest-impl"))]
mod tests;

/// The [`rocksdb::ThreadMode`] used by the database.
pub type DBThreadMode = rocksdb::SingleThreaded;

/// The [`rocksdb`] database type, including thread mode.
///
/// Also the [`rocksdb::DBAccess`] used by database iterators.
pub type DB = rocksdb::DBWithThreadMode<DBThreadMode>;

/// Wrapper struct to ensure low-level database access goes through the correct API.
///
/// # Correctness
///
/// Reading transactions from the database using RocksDB iterators causes hangs.
/// But creating iterators and reading the tip height works fine.
///
/// So these hangs are probably caused by holding column family locks to read:
/// - multiple values, or
/// - large values.
///
/// This bug might be fixed by moving database operations to blocking threads (#2188),
/// so that they don't block the tokio executor.
/// (Or it might be fixed by future RocksDB upgrades.)
#[derive(Clone, Debug)]
pub struct DiskDb {
/// The shared inner RocksDB database.
///
/// RocksDB allows reads and writes via a shared reference.
/// Only column family changes and [`Drop`] require exclusive access.
db: Arc<rocksdb::DB>,
///
/// In [`SingleThreaded`](rocksdb::SingleThreaded) mode,
/// column family changes and [`Drop`] require exclusive access.
///
/// In [`MultiThreaded`](rocksdb::MultiThreaded) mode,
/// only [`Drop`] requires exclusive access.
db: Arc<DB>,

/// The configured temporary database setting.
///
Expand All @@ -50,23 +76,28 @@ pub struct DiskWriteBatch {

/// Helper trait for inserting (Key, Value) pairs into rocksdb with a consistently
/// defined format
//
// TODO: just implement these methods directly on WriteBatch
pub trait WriteDisk {
/// Serialize and insert the given key and value into a rocksdb column family,
/// overwriting any existing `value` for `key`.
fn zs_insert<K, V>(&mut self, cf: &rocksdb::ColumnFamily, key: K, value: V)
fn zs_insert<C, K, V>(&mut self, cf: &C, key: K, value: V)
where
C: rocksdb::AsColumnFamilyRef,
K: IntoDisk + Debug,
V: IntoDisk;

/// Remove the given key from the rocksdb column family, if it exists.
fn zs_delete<K>(&mut self, cf: &rocksdb::ColumnFamily, key: K)
fn zs_delete<C, K>(&mut self, cf: &C, key: K)
where
C: rocksdb::AsColumnFamilyRef,
K: IntoDisk + Debug;
}

impl WriteDisk for DiskWriteBatch {
fn zs_insert<K, V>(&mut self, cf: &rocksdb::ColumnFamily, key: K, value: V)
fn zs_insert<C, K, V>(&mut self, cf: &C, key: K, value: V)
where
C: rocksdb::AsColumnFamilyRef,
K: IntoDisk + Debug,
V: IntoDisk,
{
Expand All @@ -75,8 +106,9 @@ impl WriteDisk for DiskWriteBatch {
self.batch.put_cf(cf, key_bytes, value_bytes);
}

fn zs_delete<K>(&mut self, cf: &rocksdb::ColumnFamily, key: K)
fn zs_delete<C, K>(&mut self, cf: &C, key: K)
where
C: rocksdb::AsColumnFamilyRef,
K: IntoDisk + Debug,
{
let key_bytes = key.as_bytes();
Expand All @@ -86,16 +118,65 @@ impl WriteDisk for DiskWriteBatch {

/// Helper trait for retrieving values from rocksdb column families with a consistently
/// defined format
//
// TODO: just implement these methods directly on DiskDb
pub trait ReadDisk {
/// Returns true if a rocksdb column family `cf` does not contain any entries.
fn zs_is_empty<C>(&self, cf: &C) -> bool
where
C: rocksdb::AsColumnFamilyRef;

/// Returns the value for `key` in the rocksdb column family `cf`, if present.
fn zs_get<K, V>(&self, cf: &rocksdb::ColumnFamily, key: &K) -> Option<V>
fn zs_get<C, K, V>(&self, cf: &C, key: &K) -> Option<V>
where
C: rocksdb::AsColumnFamilyRef,
K: IntoDisk,
V: FromDisk;

/// Check if a rocksdb column family `cf` contains the serialized form of `key`.
fn zs_contains<K>(&self, cf: &rocksdb::ColumnFamily, key: &K) -> bool
fn zs_contains<C, K>(&self, cf: &C, key: &K) -> bool
where
C: rocksdb::AsColumnFamilyRef,
K: IntoDisk;

/// Returns the lowest key in `cf`, and the corresponding value.
///
/// Returns `None` if the column family is empty.
fn zs_first_key_value<C>(&self, cf: &C) -> Option<(Box<[u8]>, Box<[u8]>)>
where
C: rocksdb::AsColumnFamilyRef;

/// Returns the highest key in `cf`, and the corresponding value.
///
/// Returns `None` if the column family is empty.
fn zs_last_key_value<C>(&self, cf: &C) -> Option<(Box<[u8]>, Box<[u8]>)>
where
C: rocksdb::AsColumnFamilyRef;

/// Returns the first key greater than or equal to `lower_bound` in `cf`,
/// and the corresponding value.
///
/// Returns `None` if there are no keys greater than or equal to `lower_bound`.
fn zs_next_key_value_from<C, K>(
&self,
cf: &C,
lower_bound: &K,
) -> Option<(Box<[u8]>, Box<[u8]>)>
where
C: rocksdb::AsColumnFamilyRef,
K: IntoDisk;

/// Returns the first key less than or equal to `upper_bound` in `cf`,
/// and the corresponding value.
///
/// Returns `None` if there are no keys less than or equal to `upper_bound`.
fn zs_prev_key_value_back_from<C, K>(
&self,
cf: &C,
upper_bound: &K,
) -> Option<(Box<[u8]>, Box<[u8]>)>
where
C: rocksdb::AsColumnFamilyRef,
K: IntoDisk;
}

Expand All @@ -116,8 +197,22 @@ impl PartialEq for DiskDb {
impl Eq for DiskDb {}

impl ReadDisk for DiskDb {
fn zs_get<K, V>(&self, cf: &rocksdb::ColumnFamily, key: &K) -> Option<V>
fn zs_is_empty<C>(&self, cf: &C) -> bool
where
    C: rocksdb::AsColumnFamilyRef,
{
    // Returns `true` when the column family `cf` has no entries at all.
    //
    // Empty column families return invalid forward iterators.
    //
    // Checking iterator validity does not seem to cause database hangs.
    // (Unlike reading values through an iterator — see the `DiskDb` docs.)
    !self
        .db
        .iterator_cf(cf, rocksdb::IteratorMode::Start)
        .valid()
}

fn zs_get<C, K, V>(&self, cf: &C, key: &K) -> Option<V>
where
C: rocksdb::AsColumnFamilyRef,
K: IntoDisk,
V: FromDisk,
{
Expand All @@ -136,8 +231,9 @@ impl ReadDisk for DiskDb {
value_bytes.map(V::from_bytes)
}

fn zs_contains<K>(&self, cf: &rocksdb::ColumnFamily, key: &K) -> bool
fn zs_contains<C, K>(&self, cf: &C, key: &K) -> bool
where
C: rocksdb::AsColumnFamilyRef,
K: IntoDisk,
{
let key_bytes = key.as_bytes();
Expand All @@ -151,6 +247,54 @@ impl ReadDisk for DiskDb {
.expect("expected that disk errors would not occur")
.is_some()
}

/// Returns the entry with the lowest key in `cf`, or `None` if `cf` is empty.
fn zs_first_key_value<C>(&self, cf: &C) -> Option<(Box<[u8]>, Box<[u8]>)>
where
    C: rocksdb::AsColumnFamilyRef,
{
    // Fetching a single item from an iterator has not been observed to
    // cause the database hangs described on `DiskDb`.
    let mut forward_iterator = self.db.iterator_cf(cf, rocksdb::IteratorMode::Start);

    forward_iterator.next()
}

/// Returns the entry with the highest key in `cf`, or `None` if `cf` is empty.
fn zs_last_key_value<C>(&self, cf: &C) -> Option<(Box<[u8]>, Box<[u8]>)>
where
    C: rocksdb::AsColumnFamilyRef,
{
    // Fetching a single item from an iterator has not been observed to
    // cause the database hangs described on `DiskDb`.
    let mut reverse_iterator = self.db.iterator_cf(cf, rocksdb::IteratorMode::End);

    reverse_iterator.next()
}

/// Seeks forward from the serialized `lower_bound`, returning the first entry
/// whose key is greater than or equal to it, or `None` if no such entry exists.
fn zs_next_key_value_from<C, K>(
    &self,
    cf: &C,
    lower_bound: &K,
) -> Option<(Box<[u8]>, Box<[u8]>)>
where
    C: rocksdb::AsColumnFamilyRef,
    K: IntoDisk,
{
    let seek_key = lower_bound.as_bytes();
    let mode = rocksdb::IteratorMode::From(seek_key.as_ref(), rocksdb::Direction::Forward);

    // Fetching a single item from an iterator has not been observed to
    // cause the database hangs described on `DiskDb`.
    self.db.iterator_cf(cf, mode).next()
}

/// Seeks backward from the serialized `upper_bound`, returning the first entry
/// whose key is less than or equal to it, or `None` if no such entry exists.
fn zs_prev_key_value_back_from<C, K>(
    &self,
    cf: &C,
    upper_bound: &K,
) -> Option<(Box<[u8]>, Box<[u8]>)>
where
    C: rocksdb::AsColumnFamilyRef,
    K: IntoDisk,
{
    let seek_key = upper_bound.as_bytes();
    let mode = rocksdb::IteratorMode::From(seek_key.as_ref(), rocksdb::Direction::Reverse);

    // Fetching a single item from an iterator has not been observed to
    // cause the database hangs described on `DiskDb`.
    self.db.iterator_cf(cf, mode).next()
}
}

impl DiskWriteBatch {
Expand Down Expand Up @@ -189,32 +333,47 @@ impl DiskDb {
let db_options = DiskDb::options();

let column_families = vec![
// Blocks
// TODO: rename to block_header_by_height (#3151)
rocksdb::ColumnFamilyDescriptor::new("block_by_height", db_options.clone()),
rocksdb::ColumnFamilyDescriptor::new("hash_by_height", db_options.clone()),
rocksdb::ColumnFamilyDescriptor::new("height_by_hash", db_options.clone()),
rocksdb::ColumnFamilyDescriptor::new("block_by_height", db_options.clone()),
// Transactions
rocksdb::ColumnFamilyDescriptor::new("tx_by_loc", db_options.clone()),
rocksdb::ColumnFamilyDescriptor::new("hash_by_tx_loc", db_options.clone()),
// TODO: rename to tx_loc_by_hash (#3151)
rocksdb::ColumnFamilyDescriptor::new("tx_by_hash", db_options.clone()),
// Transparent
rocksdb::ColumnFamilyDescriptor::new("utxo_by_outpoint", db_options.clone()),
// Sprout
rocksdb::ColumnFamilyDescriptor::new("sprout_nullifiers", db_options.clone()),
rocksdb::ColumnFamilyDescriptor::new("sapling_nullifiers", db_options.clone()),
rocksdb::ColumnFamilyDescriptor::new("orchard_nullifiers", db_options.clone()),
rocksdb::ColumnFamilyDescriptor::new("sprout_anchors", db_options.clone()),
rocksdb::ColumnFamilyDescriptor::new("sapling_anchors", db_options.clone()),
rocksdb::ColumnFamilyDescriptor::new("orchard_anchors", db_options.clone()),
rocksdb::ColumnFamilyDescriptor::new("sprout_note_commitment_tree", db_options.clone()),
// Sapling
rocksdb::ColumnFamilyDescriptor::new("sapling_nullifiers", db_options.clone()),
rocksdb::ColumnFamilyDescriptor::new("sapling_anchors", db_options.clone()),
rocksdb::ColumnFamilyDescriptor::new(
"sapling_note_commitment_tree",
db_options.clone(),
),
// Orchard
rocksdb::ColumnFamilyDescriptor::new("orchard_nullifiers", db_options.clone()),
rocksdb::ColumnFamilyDescriptor::new("orchard_anchors", db_options.clone()),
rocksdb::ColumnFamilyDescriptor::new(
"orchard_note_commitment_tree",
db_options.clone(),
),
// Chain
rocksdb::ColumnFamilyDescriptor::new("history_tree", db_options.clone()),
rocksdb::ColumnFamilyDescriptor::new("tip_chain_value_pool", db_options.clone()),
];

// TODO: move opening the database to a blocking thread (#2188)
let db_result = rocksdb::DB::open_cf_descriptors(&db_options, &path, column_families);
let db_result = rocksdb::DBWithThreadMode::<DBThreadMode>::open_cf_descriptors(
&db_options,
&path,
column_families,
);

match db_result {
Ok(db) => {
Expand All @@ -240,39 +399,22 @@ impl DiskDb {
}
}

// Read methods
// Accessor methods

/// Returns the `Path` where the files used by this database are located.
pub fn path(&self) -> &Path {
self.db.path()
}

/// Returns the column family handle for `cf_name`.
pub fn cf_handle(&self, cf_name: &str) -> Option<&rocksdb::ColumnFamily> {
pub fn cf_handle(&self, cf_name: &str) -> Option<impl rocksdb::AsColumnFamilyRef + '_> {
self.db.cf_handle(cf_name)
}

/// Returns an iterator over the keys in `cf_name`, starting from the first key.
///
/// TODO: add an iterator wrapper struct that does disk reads in a blocking thread (#2188)
pub fn forward_iterator(&self, cf_handle: &rocksdb::ColumnFamily) -> rocksdb::DBIterator {
self.db.iterator_cf(cf_handle, rocksdb::IteratorMode::Start)
}

/// Returns a reverse iterator over the keys in `cf_name`, starting from the last key.
///
/// TODO: add an iterator wrapper struct that does disk reads in a blocking thread (#2188)
pub fn reverse_iterator(&self, cf_handle: &rocksdb::ColumnFamily) -> rocksdb::DBIterator {
self.db.iterator_cf(cf_handle, rocksdb::IteratorMode::End)
}

/// Returns true if `cf` does not contain any entries.
pub fn is_empty(&self, cf_handle: &rocksdb::ColumnFamily) -> bool {
// Empty column families return invalid iterators.
!self.forward_iterator(cf_handle).valid()
}
// Read methods are located in the ReadDisk trait

// Write methods
// Low-level write methods are located in the WriteDisk trait

/// Writes `batch` to the database.
pub fn write(&self, batch: DiskWriteBatch) -> Result<(), rocksdb::Error> {
Expand Down Expand Up @@ -402,6 +544,8 @@ impl DiskDb {
current_limit
}

// Cleanup methods

/// Shut down the database, cleaning up background tasks and ephemeral data.
///
/// If `force` is true, clean up regardless of any shared references.
Expand Down Expand Up @@ -532,7 +676,7 @@ impl DiskDb {
fn assert_default_cf_is_empty(&self) {
if let Some(default_cf) = self.cf_handle("default") {
assert!(
self.is_empty(default_cf),
self.zs_is_empty(&default_cf),
"Zebra should not store data in the 'default' column family"
);
}
Expand Down
13 changes: 12 additions & 1 deletion zebra-state/src/service/finalized_state/disk_db/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,18 @@

#![allow(dead_code)]

use crate::service::finalized_state::disk_db::DiskDb;
use std::ops::Deref;

use crate::service::finalized_state::disk_db::{DiskDb, DB};

// Enable older test code to automatically access the inner database via Deref coercion.
impl Deref for DiskDb {
    /// The shared inner RocksDB database type, including its thread mode.
    type Target = DB;

    /// Returns a reference to the inner RocksDB database,
    /// so test code can call low-level database methods directly.
    //
    // NOTE(review): implementing `Deref` to fake field access is usually an
    // anti-pattern outside smart-pointer types. It looks acceptable here only
    // because this impl lives in the test module — confirm it is never
    // compiled into production builds.
    fn deref(&self) -> &Self::Target {
        &self.db
    }
}

impl DiskDb {
/// Returns a list of column family names in this database.
Expand Down
Loading