Skip to content

Commit

Permalink
Manually iterate through transaction locations to re-create blocks
Browse files Browse the repository at this point in the history
Also:
- re-write disk read API to avoid iterator hangs
- move disk read API to ReadDisk
- re-write impl rocksdb::AsColumnFamilyRef to a where clause, for consistency
  • Loading branch information
teor2345 committed Mar 25, 2022
1 parent 2e3aa58 commit cb0ee0c
Show file tree
Hide file tree
Showing 5 changed files with 203 additions and 91 deletions.
219 changes: 145 additions & 74 deletions zebra-state/src/service/finalized_state/disk_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,19 @@ pub type DBThreadMode = rocksdb::SingleThreaded;
pub type DB = rocksdb::DBWithThreadMode<DBThreadMode>;

/// Wrapper struct to ensure low-level database access goes through the correct API.
///
/// # Correctness
///
/// Reading transactions from the database using RocksDB iterators causes hangs.
/// But creating iterators and reading the tip height works fine.
///
/// So these hangs are probably caused by holding column family locks to read:
/// - multiple values, or
/// - large values.
///
/// This bug might be fixed by moving database operations to blocking threads (#2188),
/// so that they don't block the tokio executor.
/// (Or it might be fixed by future RocksDB upgrades.)
#[derive(Clone, Debug)]
pub struct DiskDb {
/// The shared inner RocksDB database.
Expand Down Expand Up @@ -63,23 +76,28 @@ pub struct DiskWriteBatch {

/// Helper trait for inserting (Key, Value) pairs into rocksdb with a consistently
/// defined format
//
// TODO: just implement these methods directly on WriteBatch
pub trait WriteDisk {
/// Serialize and insert the given key and value into a rocksdb column family,
/// overwriting any existing `value` for `key`.
fn zs_insert<K, V>(&mut self, cf: &impl rocksdb::AsColumnFamilyRef, key: K, value: V)
fn zs_insert<C, K, V>(&mut self, cf: &C, key: K, value: V)
where
C: rocksdb::AsColumnFamilyRef,
K: IntoDisk + Debug,
V: IntoDisk;

/// Remove the given key form rocksdb column family if it exists.
fn zs_delete<K>(&mut self, cf: &impl rocksdb::AsColumnFamilyRef, key: K)
fn zs_delete<C, K>(&mut self, cf: &C, key: K)
where
C: rocksdb::AsColumnFamilyRef,
K: IntoDisk + Debug;
}

impl WriteDisk for DiskWriteBatch {
fn zs_insert<K, V>(&mut self, cf: &impl rocksdb::AsColumnFamilyRef, key: K, value: V)
fn zs_insert<C, K, V>(&mut self, cf: &C, key: K, value: V)
where
C: rocksdb::AsColumnFamilyRef,
K: IntoDisk + Debug,
V: IntoDisk,
{
Expand All @@ -88,8 +106,9 @@ impl WriteDisk for DiskWriteBatch {
self.batch.put_cf(cf, key_bytes, value_bytes);
}

fn zs_delete<K>(&mut self, cf: &impl rocksdb::AsColumnFamilyRef, key: K)
fn zs_delete<C, K>(&mut self, cf: &C, key: K)
where
C: rocksdb::AsColumnFamilyRef,
K: IntoDisk + Debug,
{
let key_bytes = key.as_bytes();
Expand All @@ -99,16 +118,65 @@ impl WriteDisk for DiskWriteBatch {

/// Helper trait for retrieving values from rocksdb column familys with a consistently
/// defined format
//
// TODO: just implement these methods directly on DiskDb
pub trait ReadDisk {
/// Returns true if a rocksdb column family `cf` does not contain any entries.
fn zs_is_empty<C>(&self, cf: &C) -> bool
where
C: rocksdb::AsColumnFamilyRef;

/// Returns the value for `key` in the rocksdb column family `cf`, if present.
fn zs_get<K, V>(&self, cf: &impl rocksdb::AsColumnFamilyRef, key: &K) -> Option<V>
fn zs_get<C, K, V>(&self, cf: &C, key: &K) -> Option<V>
where
C: rocksdb::AsColumnFamilyRef,
K: IntoDisk,
V: FromDisk;

/// Check if a rocksdb column family `cf` contains the serialized form of `key`.
fn zs_contains<K>(&self, cf: &impl rocksdb::AsColumnFamilyRef, key: &K) -> bool
fn zs_contains<C, K>(&self, cf: &C, key: &K) -> bool
where
C: rocksdb::AsColumnFamilyRef,
K: IntoDisk;

/// Returns the lowest key in `cf`, and the corresponding value.
///
/// Returns `None` if the column family is empty.
fn zs_first_key_value<C>(&self, cf: &C) -> Option<(Box<[u8]>, Box<[u8]>)>
where
C: rocksdb::AsColumnFamilyRef;

/// Returns the highest key in `cf`, and the corresponding value.
///
/// Returns `None` if the column family is empty.
fn zs_last_key_value<C>(&self, cf: &C) -> Option<(Box<[u8]>, Box<[u8]>)>
where
C: rocksdb::AsColumnFamilyRef;

/// Returns the first key greater than or equal to `lower_bound` in `cf`,
/// and the corresponding value.
///
/// Returns `None` if there are no keys greater than or equal to `lower_bound`.
fn zs_next_key_value_from<C, K>(
&self,
cf: &C,
lower_bound: &K,
) -> Option<(Box<[u8]>, Box<[u8]>)>
where
C: rocksdb::AsColumnFamilyRef,
K: IntoDisk;

/// Returns the first key less than or equal to `upper_bound` in `cf`,
/// and the corresponding value.
///
/// Returns `None` if there are no keys less than or equal to `upper_bound`.
fn zs_prev_key_value_back_from<C, K>(
&self,
cf: &C,
upper_bound: &K,
) -> Option<(Box<[u8]>, Box<[u8]>)>
where
C: rocksdb::AsColumnFamilyRef,
K: IntoDisk;
}

Expand All @@ -129,8 +197,22 @@ impl PartialEq for DiskDb {
impl Eq for DiskDb {}

impl ReadDisk for DiskDb {
fn zs_get<K, V>(&self, cf: &impl rocksdb::AsColumnFamilyRef, key: &K) -> Option<V>
fn zs_is_empty<C>(&self, cf: &C) -> bool
where
C: rocksdb::AsColumnFamilyRef,
{
// Empty column families return invalid forward iterators.
//
// Checking iterator validity does not seem to cause database hangs.
!self
.db
.iterator_cf(cf, rocksdb::IteratorMode::Start)
.valid()
}

fn zs_get<C, K, V>(&self, cf: &C, key: &K) -> Option<V>
where
C: rocksdb::AsColumnFamilyRef,
K: IntoDisk,
V: FromDisk,
{
Expand All @@ -149,8 +231,9 @@ impl ReadDisk for DiskDb {
value_bytes.map(V::from_bytes)
}

fn zs_contains<K>(&self, cf: &impl rocksdb::AsColumnFamilyRef, key: &K) -> bool
fn zs_contains<C, K>(&self, cf: &C, key: &K) -> bool
where
C: rocksdb::AsColumnFamilyRef,
K: IntoDisk,
{
let key_bytes = key.as_bytes();
Expand All @@ -164,6 +247,54 @@ impl ReadDisk for DiskDb {
.expect("expected that disk errors would not occur")
.is_some()
}

fn zs_first_key_value<C>(&self, cf: &C) -> Option<(Box<[u8]>, Box<[u8]>)>
where
C: rocksdb::AsColumnFamilyRef,
{
// Reading individual values from iterators does not seem to cause database hangs.
self.db.iterator_cf(cf, rocksdb::IteratorMode::Start).next()
}

fn zs_last_key_value<C>(&self, cf: &C) -> Option<(Box<[u8]>, Box<[u8]>)>
where
C: rocksdb::AsColumnFamilyRef,
{
// Reading individual values from iterators does not seem to cause database hangs.
self.db.iterator_cf(cf, rocksdb::IteratorMode::End).next()
}

fn zs_next_key_value_from<C, K>(
&self,
cf: &C,
lower_bound: &K,
) -> Option<(Box<[u8]>, Box<[u8]>)>
where
C: rocksdb::AsColumnFamilyRef,
K: IntoDisk,
{
let lower_bound = lower_bound.as_bytes();
let from = rocksdb::IteratorMode::From(lower_bound.as_ref(), rocksdb::Direction::Forward);

// Reading individual values from iterators does not seem to cause database hangs.
self.db.iterator_cf(cf, from).next()
}

fn zs_prev_key_value_back_from<C, K>(
&self,
cf: &C,
upper_bound: &K,
) -> Option<(Box<[u8]>, Box<[u8]>)>
where
C: rocksdb::AsColumnFamilyRef,
K: IntoDisk,
{
let upper_bound = upper_bound.as_bytes();
let from = rocksdb::IteratorMode::From(upper_bound.as_ref(), rocksdb::Direction::Reverse);

// Reading individual values from iterators does not seem to cause database hangs.
self.db.iterator_cf(cf, from).next()
}
}

impl DiskWriteBatch {
Expand Down Expand Up @@ -201,15 +332,6 @@ impl DiskDb {
let path = config.db_path(network);
let db_options = DiskDb::options();

// # Correctness
//
// We can't use prefix extractors here, because they cause hangs,
// probably due to column family locks.
//
// This bug might be fixed by moving database operations to blocking threads (#2188),
// so that they don't block the tokio executor.
// (Or it might be fixed by future RocksDB upgrades.)

let column_families = vec![
// Blocks
// TODO: rename to block_header_by_height (#3151)
Expand Down Expand Up @@ -277,7 +399,7 @@ impl DiskDb {
}
}

// Read methods
// Accessor methods

/// Returns the `Path` where the files used by this database are located.
pub fn path(&self) -> &Path {
Expand All @@ -289,63 +411,10 @@ impl DiskDb {
self.db.cf_handle(cf_name)
}

/// Returns a forward iterator over the keys in `cf_name`, starting from the first key.
///
/// TODO: add an iterator wrapper struct that does disk reads in a blocking thread (#2188)
pub fn forward_iterator(
&self,
cf_handle: impl rocksdb::AsColumnFamilyRef,
) -> rocksdb::DBIteratorWithThreadMode<DB> {
self.db
.iterator_cf(&cf_handle, rocksdb::IteratorMode::Start)
}

/// Returns a forward iterator over the keys in `cf_name`, starting from `lowest_key`.
///
/// TODO: add an iterator wrapper struct that does disk reads in a blocking thread (#2188)
pub fn forward_iterator_from(
&self,
cf_handle: impl rocksdb::AsColumnFamilyRef,
lowest_key: &impl IntoDisk,
) -> rocksdb::DBIteratorWithThreadMode<DB> {
let lowest_key = lowest_key.as_bytes();
let from = rocksdb::IteratorMode::From(lowest_key.as_ref(), rocksdb::Direction::Forward);

self.db.iterator_cf(&cf_handle, from)
}

/// Returns a reverse iterator over the keys in `cf_name`, starting from `highest_key`.
///
/// TODO: add an iterator wrapper struct that does disk reads in a blocking thread (#2188)
#[allow(dead_code)]
pub fn reverse_iterator_from(
&self,
cf_handle: impl rocksdb::AsColumnFamilyRef,
highest_key: &impl IntoDisk,
) -> rocksdb::DBIteratorWithThreadMode<DB> {
let highest_key = highest_key.as_bytes();
let from = rocksdb::IteratorMode::From(highest_key.as_ref(), rocksdb::Direction::Reverse);

self.db.iterator_cf(&cf_handle, from)
}

/// Returns a reverse iterator over the keys in `cf_name`, starting from the last key.
///
/// TODO: add an iterator wrapper struct that does disk reads in a blocking thread (#2188)
pub fn reverse_iterator(
&self,
cf_handle: impl rocksdb::AsColumnFamilyRef,
) -> rocksdb::DBIteratorWithThreadMode<DB> {
self.db.iterator_cf(&cf_handle, rocksdb::IteratorMode::End)
}

/// Returns true if `cf` does not contain any entries.
pub fn is_empty(&self, cf_handle: impl rocksdb::AsColumnFamilyRef) -> bool {
// Empty column families return invalid iterators.
!self.forward_iterator(cf_handle).valid()
}
// Read methods are located in the ReadDisk trait

// Write methods
// Low-level write methods are located in the WriteDisk trait

/// Writes `batch` to the database.
pub fn write(&self, batch: DiskWriteBatch) -> Result<(), rocksdb::Error> {
Expand Down Expand Up @@ -475,6 +544,8 @@ impl DiskDb {
current_limit
}

// Cleanup methods

/// Shut down the database, cleaning up background tasks and ephemeral data.
///
/// If `force` is true, clean up regardless of any shared references.
Expand Down Expand Up @@ -605,7 +676,7 @@ impl DiskDb {
fn assert_default_cf_is_empty(&self) {
if let Some(default_cf) = self.cf_handle("default") {
assert!(
self.is_empty(default_cf),
self.zs_is_empty(&default_cf),
"Zebra should not store data in the 'default' column family"
);
}
Expand Down
13 changes: 12 additions & 1 deletion zebra-state/src/service/finalized_state/disk_db/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,18 @@
#![allow(dead_code)]

use crate::service::finalized_state::disk_db::DiskDb;
use std::ops::Deref;

use crate::service::finalized_state::disk_db::{DiskDb, DB};

// Enable older test code to automatically access the inner database via Deref coercion.
impl Deref for DiskDb {
type Target = DB;

fn deref(&self) -> &Self::Target {
&self.db
}
}

impl DiskDb {
/// Returns a list of column family names in this database.
Expand Down
Loading

0 comments on commit cb0ee0c

Please sign in to comment.