Skip to content

Commit

Permalink
Fix bug in block root storage (sigp#4663)
Browse files Browse the repository at this point in the history
## Issue Addressed

Fix a bug in the storage of the linear block roots array in the freezer DB. Previously this array was always written as part of state storage (or block backfill). With state pruning enabled by sigp#4610, these states were no longer being written and as a result neither were the block roots.

The impact is quite low, we would just log an error when trying to forwards-iterate the block roots, which for validating nodes only happens when they try to look up blocks for peers:

> Aug 25 03:42:36.980 ERRO Missing chunk in forwards iterator      chunk index: 49726, service: freezer_db

Any node checkpoint synced off `unstable` is affected and has a corrupt database. If you see the log above, you need to re-sync with the fix. Nodes that haven't checkpoint synced recently should _not_ be corrupted, even if they ran the buggy version.

## Proposed Changes

- Use a `ChunkWriter` to write the block roots when states are not being stored.
- Tweak the usage of `get_latest_restore_point` so that it doesn't return a nonsense value when state pruning is enabled.
- Tweak the guarantee on the block roots array so that block roots are assumed available up to the split slot (exclusive). This is a bit nicer than relying on anything to do with the latest restore point, which is a nonsensical concept when there aren't any restore points.

## Additional Info

I'm looking forward to deleting the chunked vector code for good when we merge tree-states :grin:
  • Loading branch information
michaelsproul authored and jxs committed Aug 28, 2023
1 parent c9d8097 commit db9aeb7
Show file tree
Hide file tree
Showing 8 changed files with 144 additions and 42 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,21 @@ where
let (_, updated_builder) = self.set_genesis_state(genesis_state)?;
self = updated_builder;

// Fill in the linear block roots between the checkpoint block's slot and the aligned
// state's slot. All slots less than the block's slot will be handled by block backfill,
// while states greater or equal to the checkpoint state will be handled by `migrate_db`.
let block_root_batch = store
.store_frozen_block_root_at_skip_slots(
weak_subj_block.slot(),
weak_subj_state.slot(),
weak_subj_block_root,
)
.map_err(|e| format!("Error writing frozen block roots: {e:?}"))?;
store
.cold_db
.do_atomically(block_root_batch)
.map_err(|e| format!("Error writing frozen block roots: {e:?}"))?;

// Write the state and block non-atomically, it doesn't matter if they're forgotten
// about on a crash restart.
store
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/tests/store_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ async fn forwards_iter_block_and_state_roots_until() {

// The last restore point slot is the point at which the hybrid forwards iterator behaviour
// changes.
let last_restore_point_slot = store.get_latest_restore_point_slot();
let last_restore_point_slot = store.get_latest_restore_point_slot().unwrap();
assert!(last_restore_point_slot > 0);

let chain = &harness.chain;
Expand Down
8 changes: 4 additions & 4 deletions beacon_node/store/src/chunked_iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,16 @@ where
/// Create a new iterator which can yield elements from `start_vindex` up to the last
/// index stored by the restore point at `last_restore_point_slot`.
///
/// The `last_restore_point` slot should be the slot of a recent restore point as obtained from
/// `HotColdDB::get_latest_restore_point_slot`. We pass it as a parameter so that the caller can
/// The `freezer_upper_limit` slot should be the slot of a recent restore point as obtained from
/// `Root::freezer_upper_limit`. We pass it as a parameter so that the caller can
/// maintain a stable view of the database (see `HybridForwardsBlockRootsIterator`).
pub fn new(
store: &'a HotColdDB<E, Hot, Cold>,
start_vindex: usize,
last_restore_point_slot: Slot,
freezer_upper_limit: Slot,
spec: &ChainSpec,
) -> Self {
let (_, end_vindex) = F::start_and_end_vindex(last_restore_point_slot, spec);
let (_, end_vindex) = F::start_and_end_vindex(freezer_upper_limit, spec);

// Set the next chunk to the one containing `start_vindex`.
let next_cindex = start_vindex / F::chunk_size();
Expand Down
54 changes: 46 additions & 8 deletions beacon_node/store/src/forwards_iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ pub trait Root<E: EthSpec>: Field<E, Value = Hash256> {
end_state: BeaconState<E>,
end_root: Hash256,
) -> Result<SimpleForwardsIterator>;

/// The first slot for which this field is *no longer* stored in the freezer database.
///
/// If `None`, then this field is not stored in the freezer database at all due to pruning
/// configuration.
fn freezer_upper_limit<Hot: ItemStore<E>, Cold: ItemStore<E>>(
store: &HotColdDB<E, Hot, Cold>,
) -> Option<Slot>;
}

impl<E: EthSpec> Root<E> for BlockRoots {
Expand All @@ -39,6 +47,13 @@ impl<E: EthSpec> Root<E> for BlockRoots {
)?;
Ok(SimpleForwardsIterator { values })
}

fn freezer_upper_limit<Hot: ItemStore<E>, Cold: ItemStore<E>>(
store: &HotColdDB<E, Hot, Cold>,
) -> Option<Slot> {
// Block roots are stored for all slots up to the split slot (exclusive).
Some(store.get_split_slot())
}
}

impl<E: EthSpec> Root<E> for StateRoots {
Expand All @@ -59,6 +74,15 @@ impl<E: EthSpec> Root<E> for StateRoots {
)?;
Ok(SimpleForwardsIterator { values })
}

fn freezer_upper_limit<Hot: ItemStore<E>, Cold: ItemStore<E>>(
store: &HotColdDB<E, Hot, Cold>,
) -> Option<Slot> {
// State roots are stored for all slots up to the latest restore point (exclusive).
// There may not be a latest restore point if state pruning is enabled, in which
// case this function will return `None`.
store.get_latest_restore_point_slot()
}
}

/// Forwards root iterator that makes use of a flat field table in the freezer DB.
Expand Down Expand Up @@ -118,6 +142,7 @@ impl Iterator for SimpleForwardsIterator {
pub enum HybridForwardsIterator<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, Cold: ItemStore<E>> {
PreFinalization {
iter: Box<FrozenForwardsIterator<'a, E, F, Hot, Cold>>,
end_slot: Option<Slot>,
/// Data required by the `PostFinalization` iterator when we get to it.
continuation_data: Option<Box<(BeaconState<E>, Hash256)>>,
},
Expand All @@ -129,6 +154,7 @@ pub enum HybridForwardsIterator<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, C
PostFinalization {
iter: SimpleForwardsIterator,
},
Finished,
}

impl<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, Cold: ItemStore<E>>
Expand All @@ -138,8 +164,8 @@ impl<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, Cold: ItemStore<E>>
///
/// The `get_state` closure should return a beacon state and final block/state root to backtrack
/// from in the case where the iterated range does not lie entirely within the frozen portion of
/// the database. If an `end_slot` is provided and it is before the database's latest restore
/// point slot then the `get_state` closure will not be called at all.
/// the database. If an `end_slot` is provided and it is before the database's freezer upper
/// limit for the field then the `get_state` closure will not be called at all.
///
/// It is OK for `get_state` to hold a lock while this function is evaluated, as the returned
/// iterator is as lazy as possible and won't do any work apart from calling `get_state`.
Expand All @@ -155,27 +181,30 @@ impl<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, Cold: ItemStore<E>>
) -> Result<Self> {
use HybridForwardsIterator::*;

let latest_restore_point_slot = store.get_latest_restore_point_slot();
// First slot at which this field is *not* available in the freezer. i.e. all slots less
// than this slot have their data available in the freezer.
let freezer_upper_limit = F::freezer_upper_limit(store).unwrap_or(Slot::new(0));

let result = if start_slot < latest_restore_point_slot {
let result = if start_slot < freezer_upper_limit {
let iter = Box::new(FrozenForwardsIterator::new(
store,
start_slot,
latest_restore_point_slot,
freezer_upper_limit,
spec,
));

// No continuation data is needed if the forwards iterator plans to halt before
// `end_slot`. If it tries to continue further a `NoContinuationData` error will be
// returned.
let continuation_data =
if end_slot.map_or(false, |end_slot| end_slot < latest_restore_point_slot) {
if end_slot.map_or(false, |end_slot| end_slot < freezer_upper_limit) {
None
} else {
Some(Box::new(get_state()))
};
PreFinalization {
iter,
end_slot,
continuation_data,
}
} else {
Expand All @@ -195,6 +224,7 @@ impl<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, Cold: ItemStore<E>>
match self {
PreFinalization {
iter,
end_slot,
continuation_data,
} => {
match iter.next() {
Expand All @@ -203,10 +233,17 @@ impl<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, Cold: ItemStore<E>>
// to a post-finalization iterator beginning from the last slot
// of the pre iterator.
None => {
// If the iterator has an end slot (inclusive) which has already been
// covered by the (exclusive) frozen forwards iterator, then we're done!
let iter_end_slot = Slot::from(iter.inner.end_vindex);
if end_slot.map_or(false, |end_slot| iter_end_slot == end_slot + 1) {
*self = Finished;
return Ok(None);
}

let continuation_data = continuation_data.take();
let store = iter.inner.store;
let start_slot = Slot::from(iter.inner.end_vindex);

let start_slot = iter_end_slot;
*self = PostFinalizationLazy {
continuation_data,
store,
Expand All @@ -230,6 +267,7 @@ impl<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, Cold: ItemStore<E>>
self.do_next()
}
PostFinalization { iter } => iter.next().transpose(),
Finished => Ok(None),
}
}
}
Expand Down
82 changes: 65 additions & 17 deletions beacon_node/store/src/hot_cold_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::metadata::{
};
use crate::metrics;
use crate::{
get_key_for_col, DBColumn, DatabaseBlock, Error, ItemStore, KeyValueStoreOp,
get_key_for_col, ChunkWriter, DBColumn, DatabaseBlock, Error, ItemStore, KeyValueStoreOp,
PartialBeaconState, StoreItem, StoreOp,
};
use itertools::process_results;
Expand Down Expand Up @@ -963,6 +963,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
ops.push(op);

// 2. Store updated vector entries.
// Block roots need to be written here as well as by the `ChunkWriter` in `migrate_db`
// because states may require older block roots, and the writer only stores block roots
// between the previous split point and the new split point.
let db = &self.cold_db;
store_updated_vector(BlockRoots, db, state, &self.spec, ops)?;
store_updated_vector(StateRoots, db, state, &self.spec, ops)?;
Expand Down Expand Up @@ -1243,10 +1246,21 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
};
}

/// Fetch the slot of the most recently stored restore point.
pub fn get_latest_restore_point_slot(&self) -> Slot {
(self.get_split_slot() - 1) / self.config.slots_per_restore_point
* self.config.slots_per_restore_point
/// Fetch the slot of the most recently stored restore point (if any).
pub fn get_latest_restore_point_slot(&self) -> Option<Slot> {
let split_slot = self.get_split_slot();
let anchor = self.get_anchor_info();

// There are no restore points stored if the state upper limit lies in the hot database.
// It hasn't been reached yet, and may never be.
if anchor.map_or(false, |a| a.state_upper_limit >= split_slot) {
None
} else {
Some(
(split_slot - 1) / self.config.slots_per_restore_point
* self.config.slots_per_restore_point,
)
}
}

/// Load the database schema version from disk.
Expand Down Expand Up @@ -1585,6 +1599,25 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
)
}

/// Update the linear array of frozen block roots with the block root for several skipped slots.
///
/// Write the block root at all slots from `start_slot` (inclusive) to `end_slot` (exclusive).
pub fn store_frozen_block_root_at_skip_slots(
&self,
start_slot: Slot,
end_slot: Slot,
block_root: Hash256,
) -> Result<Vec<KeyValueStoreOp>, Error> {
let mut ops = vec![];
let mut block_root_writer =
ChunkWriter::<BlockRoots, _, _>::new(&self.cold_db, start_slot.as_usize())?;
for slot in start_slot.as_usize()..end_slot.as_usize() {
block_root_writer.set(slot, block_root, &mut ops)?;
}
block_root_writer.write(&mut ops)?;
Ok(ops)
}

/// Try to prune all execution payloads, returning early if there is no need to prune.
pub fn try_prune_execution_payloads(&self, force: bool) -> Result<(), Error> {
let split = self.get_split_info();
Expand Down Expand Up @@ -1725,7 +1758,14 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
return Err(HotColdDBError::FreezeSlotUnaligned(finalized_state.slot()).into());
}

let mut hot_db_ops: Vec<StoreOp<E>> = Vec::new();
let mut hot_db_ops = vec![];
let mut cold_db_ops = vec![];

// Chunk writer for the linear block roots in the freezer DB.
// Start at the new upper limit because we iterate backwards.
let new_frozen_block_root_upper_limit = finalized_state.slot().as_usize().saturating_sub(1);
let mut block_root_writer =
ChunkWriter::<BlockRoots, _, _>::new(&store.cold_db, new_frozen_block_root_upper_limit)?;

// 1. Copy all of the states between the new finalized state and the split slot, from the hot DB
// to the cold DB. Delete the execution payloads of these now-finalized blocks.
Expand All @@ -1750,6 +1790,9 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
// Delete the old summary, and the full state if we lie on an epoch boundary.
hot_db_ops.push(StoreOp::DeleteState(state_root, Some(slot)));

// Store the block root for this slot in the linear array of frozen block roots.
block_root_writer.set(slot.as_usize(), block_root, &mut cold_db_ops)?;

// Do not try to store states if a restore point is yet to be stored, or will never be
// stored (see `STATE_UPPER_LIMIT_NO_RETAIN`). Make an exception for the genesis state
// which always needs to be copied from the hot DB to the freezer and should not be deleted.
Expand All @@ -1759,29 +1802,34 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
.map_or(false, |anchor| slot < anchor.state_upper_limit)
{
debug!(store.log, "Pruning finalized state"; "slot" => slot);

continue;
}

let mut cold_db_ops: Vec<KeyValueStoreOp> = Vec::new();
// Store a pointer from this state root to its slot, so we can later reconstruct states
// from their state root alone.
let cold_state_summary = ColdStateSummary { slot };
let op = cold_state_summary.as_kv_store_op(state_root);
cold_db_ops.push(op);

if slot % store.config.slots_per_restore_point == 0 {
let state: BeaconState<E> = get_full_state(&store.hot_db, &state_root, &store.spec)?
.ok_or(HotColdDBError::MissingStateToFreeze(state_root))?;

store.store_cold_state(&state_root, &state, &mut cold_db_ops)?;
}

// Store a pointer from this state root to its slot, so we can later reconstruct states
// from their state root alone.
let cold_state_summary = ColdStateSummary { slot };
let op = cold_state_summary.as_kv_store_op(state_root);
cold_db_ops.push(op);

// There are data dependencies between calls to `store_cold_state()` that prevent us from
// doing one big call to `store.cold_db.do_atomically()` at end of the loop.
store.cold_db.do_atomically(cold_db_ops)?;
// Commit the batch of cold DB ops whenever a full state is written. Each state stored
// may read the linear fields of previous states stored.
store
.cold_db
.do_atomically(std::mem::take(&mut cold_db_ops))?;
}
}

// Finish writing the block roots and commit the remaining cold DB ops.
block_root_writer.write(&mut cold_db_ops)?;
store.cold_db.do_atomically(cold_db_ops)?;

// Warning: Critical section. We have to take care not to put any of the two databases in an
// inconsistent state if the OS process dies at any point during the freezeing
// procedure.
Expand Down
1 change: 1 addition & 0 deletions watch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,4 @@ network = { path = "../beacon_node/network" }
testcontainers = { git = "https://github.com/testcontainers/testcontainers-rs/", rev = "0f2c9851" }
unused_port = { path = "../common/unused_port" }
task_executor = { path = "../common/task_executor" }
logging = { path = "../common/logging" }
23 changes: 11 additions & 12 deletions watch/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,21 @@ use beacon_chain::{
};
use eth2::{types::BlockId, BeaconNodeHttpClient, SensitiveUrl, Timeouts};
use http_api::test_utils::{create_api_server, ApiServer};
use log::error;
use logging::test_logger;
use network::NetworkReceivers;

use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
use std::collections::HashMap;
use std::env;
use std::net::SocketAddr;
use std::time::Duration;
use testcontainers::{clients::Cli, core::WaitFor, Image, RunnableImage};
use tokio::sync::oneshot;
use tokio::{runtime, task::JoinHandle};
use tokio_postgres::{config::Config as PostgresConfig, Client, NoTls};
use types::{Hash256, MainnetEthSpec, Slot};
use unused_port::unused_tcp4_port;
use url::Url;
use watch::{
client::WatchHttpClient,
Expand All @@ -22,17 +31,6 @@ use watch::{
updater::{handler::*, run_updater, Config as UpdaterConfig, WatchSpec},
};

use log::error;
use std::collections::HashMap;
use std::env;
use std::net::SocketAddr;
use std::time::Duration;
use tokio::{runtime, task::JoinHandle};
use tokio_postgres::{config::Config as PostgresConfig, Client, NoTls};
use unused_port::unused_tcp4_port;

use testcontainers::{clients::Cli, core::WaitFor, Image, RunnableImage};

#[derive(Debug)]
pub struct Postgres(HashMap<String, String>);

Expand Down Expand Up @@ -132,6 +130,7 @@ impl TesterBuilder {
reconstruct_historic_states: true,
..ChainConfig::default()
})
.logger(test_logger())
.deterministic_keypairs(VALIDATOR_COUNT)
.fresh_ephemeral_store()
.build();
Expand Down

0 comments on commit db9aeb7

Please sign in to comment.