Skip to content

Commit

Permalink
Fix block backfill with genesis skip slots
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelsproul committed Oct 10, 2023
1 parent c3321dd commit b6a78e2
Show file tree
Hide file tree
Showing 11 changed files with 205 additions and 101 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 6 additions & 7 deletions beacon_node/beacon_chain/src/historical_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,15 +119,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
prev_block_slot = block.slot();
expected_block_root = block.message().parent_root();

// If we've reached genesis, add the genesis block root to the batch and set the
// anchor slot to 0 to indicate completion.
// If we've reached genesis, add the genesis block root to the batch for all slots
// between 0 and the first block slot, and set the anchor slot to 0 to indicate
// completion.
if expected_block_root == self.genesis_block_root {
let genesis_slot = self.spec.genesis_slot;
chunk_writer.set(
genesis_slot.as_usize(),
self.genesis_block_root,
&mut cold_batch,
)?;
for slot in genesis_slot.as_usize()..block.slot().as_usize() {
chunk_writer.set(slot, self.genesis_block_root, &mut cold_batch)?;
}
prev_block_slot = genesis_slot;
expected_block_root = Hash256::zero();
break;
Expand Down
11 changes: 7 additions & 4 deletions beacon_node/beacon_chain/src/otb_verification_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,13 @@ pub fn start_otb_verification_service<T: BeaconChainTypes>(
pub fn load_optimistic_transition_blocks<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
) -> Result<Vec<OptimisticTransitionBlock>, StoreError> {
process_results(chain.store.hot_db.iter_column(OTBColumn), |iter| {
iter.map(|(_, bytes)| OptimisticTransitionBlock::from_store_bytes(&bytes))
.collect()
})?
process_results(
chain.store.hot_db.iter_column::<Hash256>(OTBColumn),
|iter| {
iter.map(|(_, bytes)| OptimisticTransitionBlock::from_store_bytes(&bytes))
.collect()
},
)?
}

#[derive(Debug)]
Expand Down
12 changes: 12 additions & 0 deletions beacon_node/beacon_chain/tests/store_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2074,6 +2074,18 @@ async fn weak_subjectivity_sync_unaligned_unadvanced_checkpoint() {
weak_subjectivity_sync_test(slots, checkpoint_slot).await
}

// Regression test for https://github.com/sigp/lighthouse/issues/4817
// Skip 3 slots immediately after genesis, creating a gap between the genesis block and the first
// real block.
#[tokio::test]
async fn weak_subjectivity_sync_skips_at_genesis() {
let start_slot = 4;
let end_slot = E::slots_per_epoch() * 4;
let slots = (start_slot..end_slot).map(Slot::new).collect();
let checkpoint_slot = Slot::new(E::slots_per_epoch() * 2);
weak_subjectivity_sync_test(slots, checkpoint_slot).await
}

async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
// Build an initial chain on one harness, representing a synced node with full history.
let num_final_blocks = E::slots_per_epoch() * 2;
Expand Down
2 changes: 2 additions & 0 deletions beacon_node/store/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ pub enum Error {
BlockReplayError(BlockReplayError),
AddPayloadLogicError,
SlotClockUnavailableForMigration,
InvalidKey,
InvalidBytes,
UnableToDowngrade,
InconsistentFork(InconsistentFork),
}
Expand Down
13 changes: 10 additions & 3 deletions beacon_node/store/src/hot_cold_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1251,10 +1251,17 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
let split_slot = self.get_split_slot();
let anchor = self.get_anchor_info();

// There are no restore points stored if the state upper limit lies in the hot database.
// It hasn't been reached yet, and may never be.
if anchor.map_or(false, |a| a.state_upper_limit >= split_slot) {
// There are no restore points stored if the state upper limit lies in the hot database,
// and the lower limit is zero. It hasn't been reached yet, and may never be.
if anchor.as_ref().map_or(false, |a| {
a.state_upper_limit >= split_slot && a.state_lower_limit == 0
}) {
None
} else if let Some(lower_limit) = anchor
.map(|a| a.state_lower_limit)
.filter(|limit| *limit > 0)
{
Some(lower_limit)
} else {
Some(
(split_slot - 1) / self.config.slots_per_restore_point
Expand Down
38 changes: 21 additions & 17 deletions beacon_node/store/src/leveldb_store.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use super::*;
use crate::hot_cold_store::HotColdDBError;
use crate::metrics;
use db_key::Key;
use leveldb::compaction::Compaction;
use leveldb::database::batch::{Batch, Writebatch};
use leveldb::database::kv::KV;
Expand Down Expand Up @@ -176,24 +175,21 @@ impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
Ok(())
}

/// Iterate through all keys and values in a particular column.
fn iter_column(&self, column: DBColumn) -> ColumnIter {
let start_key =
BytesKey::from_vec(get_key_for_col(column.into(), Hash256::zero().as_bytes()));
fn iter_column_from<K: Key>(&self, column: DBColumn, from: &[u8]) -> ColumnIter<K> {
let start_key = BytesKey::from_vec(get_key_for_col(column.into(), from));

let iter = self.db.iter(self.read_options());
iter.seek(&start_key);

Box::new(
iter.take_while(move |(key, _)| key.matches_column(column))
.map(move |(bytes_key, value)| {
let key =
bytes_key
.remove_column(column)
.ok_or(HotColdDBError::IterationError {
unexpected_key: bytes_key,
})?;
Ok((key, value))
let key = bytes_key.remove_column_variable(column).ok_or_else(|| {
HotColdDBError::IterationError {
unexpected_key: bytes_key.clone(),
}
})?;
Ok((K::from_bytes(key)?, value))
}),
)
}
Expand Down Expand Up @@ -224,12 +220,12 @@ impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
impl<E: EthSpec> ItemStore<E> for LevelDB<E> {}

/// Used for keying leveldb.
#[derive(Debug, PartialEq)]
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct BytesKey {
key: Vec<u8>,
}

impl Key for BytesKey {
impl db_key::Key for BytesKey {
fn from_u8(key: &[u8]) -> Self {
Self { key: key.to_vec() }
}
Expand All @@ -245,12 +241,20 @@ impl BytesKey {
self.key.starts_with(column.as_bytes())
}

/// Remove the column from a key, returning its `Hash256` portion.
/// Remove the column from a 32 byte key, yielding the `Hash256` key.
pub fn remove_column(&self, column: DBColumn) -> Option<Hash256> {
let key = self.remove_column_variable(column)?;
(column.key_size() == 32).then(|| Hash256::from_slice(key))
}

/// Remove the column from a key.
///
/// Will return `None` if the value doesn't match the column or has the wrong length.
pub fn remove_column_variable(&self, column: DBColumn) -> Option<&[u8]> {
if self.matches_column(column) {
let subkey = &self.key[column.as_bytes().len()..];
if subkey.len() == 32 {
return Some(Hash256::from_slice(subkey));
if subkey.len() == column.key_size() {
return Some(subkey);
}
}
None
Expand Down
59 changes: 52 additions & 7 deletions beacon_node/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use std::sync::Arc;
use strum::{EnumString, IntoStaticStr};
pub use types::*;

pub type ColumnIter<'a> = Box<dyn Iterator<Item = Result<(Hash256, Vec<u8>), Error>> + 'a>;
pub type ColumnIter<'a, K> = Box<dyn Iterator<Item = Result<(K, Vec<u8>), Error>> + 'a>;
pub type ColumnKeyIter<'a> = Box<dyn Iterator<Item = Result<Hash256, Error>> + 'a>;

pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
Expand Down Expand Up @@ -80,15 +80,33 @@ pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
fn compact(&self) -> Result<(), Error>;

/// Iterate through all keys and values in a particular column.
fn iter_column(&self, _column: DBColumn) -> ColumnIter {
// Default impl for non LevelDB databases
Box::new(std::iter::empty())
fn iter_column<K: Key>(&self, column: DBColumn) -> ColumnIter<K> {
self.iter_column_from(column, &vec![0; column.key_size()])
}

fn iter_column_from<K: Key>(&self, column: DBColumn, from: &[u8]) -> ColumnIter<K>;

/// Iterate through all keys in a particular column.
fn iter_column_keys(&self, _column: DBColumn) -> ColumnKeyIter {
// Default impl for non LevelDB databases
Box::new(std::iter::empty())
fn iter_column_keys(&self, column: DBColumn) -> ColumnKeyIter;
}

pub trait Key: Sized + 'static {
fn from_bytes(key: &[u8]) -> Result<Self, Error>;
}

impl Key for Hash256 {
fn from_bytes(key: &[u8]) -> Result<Self, Error> {
if key.len() == 32 {
Ok(Hash256::from_slice(key))
} else {
Err(Error::InvalidKey)
}
}
}

impl Key for Vec<u8> {
fn from_bytes(key: &[u8]) -> Result<Self, Error> {
Ok(key.to_vec())
}
}

Expand Down Expand Up @@ -230,6 +248,33 @@ impl DBColumn {
pub fn as_bytes(self) -> &'static [u8] {
self.as_str().as_bytes()
}

/// Most database keys are 32 bytes, but some freezer DB keys are 8 bytes.
///
/// This function returns the number of bytes used by keys in a given column.
pub fn key_size(self) -> usize {
match self {
Self::BeaconMeta
| Self::BeaconBlock
| Self::BeaconState
| Self::BeaconStateSummary
| Self::BeaconStateTemporary
| Self::ExecPayload
| Self::BeaconChain
| Self::OpPool
| Self::Eth1Cache
| Self::ForkChoice
| Self::PubkeyCache
| Self::BeaconRestorePoint
| Self::DhtEnrs
| Self::OptimisticTransitionBlock => 32,
Self::BeaconBlockRoots
| Self::BeaconStateRoots
| Self::BeaconHistoricalRoots
| Self::BeaconHistoricalSummaries
| Self::BeaconRandaoMixes => 8,
}
}
}

/// An item that may stored in a `Store` by serializing and deserializing from bytes.
Expand Down
Loading

0 comments on commit b6a78e2

Please sign in to comment.