Skip to content

Commit

Permalink
Persist data columns to store (sigp#6255)
Browse files Browse the repository at this point in the history
* Persist data columns (from das PR sigp#5196)
  • Loading branch information
jimmygchen authored and AgeManning committed Sep 3, 2024
1 parent a82523a commit 63d1131
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 23 deletions.
19 changes: 9 additions & 10 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3661,16 +3661,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

if let Some(_data_columns) = data_columns {
// TODO(das): depends on https://github.com/sigp/lighthouse/pull/6073
// if !data_columns.is_empty() {
// debug!(
// self.log, "Writing data_columns to store";
// "block_root" => %block_root,
// "count" => data_columns.len(),
// );
// ops.push(StoreOp::PutDataColumns(block_root, data_columns));
// }
if let Some(data_columns) = data_columns {
if !data_columns.is_empty() {
debug!(
self.log, "Writing data_columns to store";
"block_root" => %block_root,
"count" => data_columns.len(),
);
ops.push(StoreOp::PutDataColumns(block_root, data_columns));
}
}

let txn_lock = self.store.hot_db.begin_rw_transaction();
Expand Down
10 changes: 8 additions & 2 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,21 +108,27 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
let custody_column_count =
custody_subnet_count.saturating_mul(spec.data_columns_per_subnet());

let overflow_cache = DataAvailabilityCheckerInner::new(
let inner = DataAvailabilityCheckerInner::new(
OVERFLOW_LRU_CAPACITY,
store,
custody_column_count,
spec.clone(),
)?;
Ok(Self {
availability_cache: Arc::new(overflow_cache),
availability_cache: Arc::new(inner),
slot_clock,
kzg,
log: log.clone(),
spec,
})
}

pub fn get_custody_columns_count(&self) -> usize {
self.availability_cache
.custody_subnet_count()
.saturating_mul(self.spec.data_columns_per_subnet())
}

/// Checks if the block root is currenlty in the availability cache awaiting import because
/// of missing components.
pub fn get_execution_valid_block(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use crate::data_column_verification::KzgVerifiedCustodyDataColumn;
use crate::BeaconChainTypes;
use lru::LruCache;
use parking_lot::RwLock;
use ssz_derive::{Decode, Encode};
use ssz_types::{FixedVector, VariableList};
use std::num::NonZeroUsize;
use std::sync::Arc;
Expand All @@ -20,7 +19,7 @@ use types::{BlobSidecar, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock};
///
/// The blobs are all gossip and kzg verified.
/// The block has completed all verifications except the availability check.
#[derive(Encode, Decode, Clone)]
#[derive(Clone)]
pub struct PendingComponents<E: EthSpec> {
pub block_root: Hash256,
pub verified_blobs: FixedVector<Option<KzgVerifiedBlob<E>>, E::MaxBlobsPerBlock>,
Expand Down Expand Up @@ -303,6 +302,15 @@ impl<E: EthSpec> PendingComponents<E> {
});
}
}

if let Some(kzg_verified_data_column) = self.verified_data_columns.first() {
let epoch = kzg_verified_data_column
.as_data_column()
.slot()
.epoch(E::slots_per_epoch());
return Some(epoch);
}

None
})
}
Expand Down Expand Up @@ -336,6 +344,10 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
})
}

pub fn custody_subnet_count(&self) -> usize {
self.custody_column_count
}

/// Returns true if the block root is known, without altering the LRU ordering
pub fn get_execution_valid_block(
&self,
Expand Down
4 changes: 4 additions & 0 deletions beacon_node/beacon_chain/src/data_column_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,10 @@ impl<E: EthSpec> KzgVerifiedCustodyDataColumn<E> {
pub fn into_inner(self) -> Arc<DataColumnSidecar<E>> {
self.data
}

pub fn as_data_column(&self) -> &DataColumnSidecar<E> {
&self.data
}
}

/// Complete kzg verification for a `DataColumnSidecar`.
Expand Down
41 changes: 32 additions & 9 deletions beacon_node/beacon_chain/src/historical_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use state_processing::{
use std::borrow::Cow;
use std::iter;
use std::time::Duration;
use store::metadata::DataColumnInfo;
use store::{chunked_vector::BlockRoots, AnchorInfo, BlobInfo, ChunkWriter, KeyValueStore};
use types::{Hash256, Slot};

Expand Down Expand Up @@ -66,6 +67,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.get_anchor_info()
.ok_or(HistoricalBlockError::NoAnchorInfo)?;
let blob_info = self.store.get_blob_info();
let data_column_info = self.store.get_data_column_info();

// Take all blocks with slots less than the oldest block slot.
let num_relevant = blocks.partition_point(|available_block| {
Expand All @@ -90,18 +92,27 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
return Ok(0);
}

let n_blobs_lists_to_import = blocks_to_import
// Blobs are stored per block, and data columns are each stored individually
let n_blob_ops_per_block = if self.spec.is_peer_das_scheduled() {
self.data_availability_checker.get_custody_columns_count()
} else {
1
};

let blob_batch_size = blocks_to_import
.iter()
.filter(|available_block| available_block.blobs().is_some())
.count();
.count()
.saturating_mul(n_blob_ops_per_block);

let mut expected_block_root = anchor_info.oldest_block_parent;
let mut prev_block_slot = anchor_info.oldest_block_slot;
let mut chunk_writer =
ChunkWriter::<BlockRoots, _, _>::new(&self.store.cold_db, prev_block_slot.as_usize())?;
let mut new_oldest_blob_slot = blob_info.oldest_blob_slot;
let mut new_oldest_data_column_slot = data_column_info.oldest_data_column_slot;

let mut blob_batch = Vec::with_capacity(n_blobs_lists_to_import);
let mut blob_batch = Vec::with_capacity(blob_batch_size);
let mut cold_batch = Vec::with_capacity(blocks_to_import.len());
let mut hot_batch = Vec::with_capacity(blocks_to_import.len());
let mut signed_blocks = Vec::with_capacity(blocks_to_import.len());
Expand Down Expand Up @@ -129,11 +140,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.blobs_as_kv_store_ops(&block_root, blobs, &mut blob_batch);
}
// Store the data columns too
if let Some(_data_columns) = maybe_data_columns {
// TODO(das): depends on https://github.com/sigp/lighthouse/pull/6073
// new_oldest_data_column_slot = Some(block.slot());
// self.store
// .data_columns_as_kv_store_ops(&block_root, data_columns, &mut blob_batch);
if let Some(data_columns) = maybe_data_columns {
new_oldest_data_column_slot = Some(block.slot());
self.store
.data_columns_as_kv_store_ops(&block_root, data_columns, &mut blob_batch);
}

// Store block roots, including at all skip slots in the freezer DB.
Expand Down Expand Up @@ -212,7 +222,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.store.hot_db.do_atomically(hot_batch)?;
self.store.cold_db.do_atomically(cold_batch)?;

let mut anchor_and_blob_batch = Vec::with_capacity(2);
let mut anchor_and_blob_batch = Vec::with_capacity(3);

// Update the blob info.
if new_oldest_blob_slot != blob_info.oldest_blob_slot {
Expand All @@ -228,6 +238,19 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

// Update the data column info.
if new_oldest_data_column_slot != data_column_info.oldest_data_column_slot {
if let Some(oldest_data_column_slot) = new_oldest_data_column_slot {
let new_data_column_info = DataColumnInfo {
oldest_data_column_slot: Some(oldest_data_column_slot),
};
anchor_and_blob_batch.push(
self.store
.compare_and_set_data_column_info(data_column_info, new_data_column_info)?,
);
}
}

// Update the anchor.
let new_anchor = AnchorInfo {
oldest_block_slot: prev_block_slot,
Expand Down

0 comments on commit 63d1131

Please sign in to comment.