Add basic implementation of serving RPC data column from DA checker.
jimmygchen committed Feb 8, 2024
1 parent f14e9de commit fbfacc4
Showing 14 changed files with 625 additions and 70 deletions.
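
This commit threads data columns through the `DataAvailabilityChecker` so that networking code can serve them over RPC. As a rough sketch of the intended call path (the helper below is hypothetical and not part of the commit; only `get_data_column` and `DataColumnIdentifier` are taken from the diff), a by-root request would be answered directly from the availability cache:

    use std::sync::Arc;

    use beacon_chain::data_availability_checker::AvailabilityCheckError;
    use beacon_chain::{BeaconChain, BeaconChainTypes};
    use types::data_column_sidecar::DataColumnIdentifier;
    use types::DataColumnSidecar;

    /// Hypothetical RPC-side helper: look up a single column by (block_root, index) in the
    /// DA checker's availability cache. Columns that were never received (or have already
    /// been pruned after import) simply come back as `None`.
    fn serve_data_column_from_da_checker<T: BeaconChainTypes>(
        chain: &BeaconChain<T>,
        id: &DataColumnIdentifier,
    ) -> Result<Option<Arc<DataColumnSidecar<T::EthSpec>>>, AvailabilityCheckError> {
        chain.data_availability_checker.get_data_column(id)
    }
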
58 changes: 42 additions & 16 deletions beacon_node/beacon_chain/src/beacon_chain.rs
@@ -7,9 +7,7 @@ use crate::attester_cache::{AttesterCache, AttesterCacheKey};
use crate::beacon_block_streamer::{BeaconBlockStreamer, CheckEarlyAttesterCache};
use crate::beacon_proposer_cache::compute_proposer_duties_from_head;
use crate::beacon_proposer_cache::BeaconProposerCache;
use crate::blob_verification::{
GossipBlobError, GossipVerifiedBlob, GossipVerifiedDataColumnSidecar,
};
use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob};
use crate::block_times_cache::BlockTimesCache;
use crate::block_verification::POS_PANDA_BANNER;
use crate::block_verification::{
@@ -25,6 +23,7 @@ use crate::chain_config::ChainConfig;
use crate::data_availability_checker::{
Availability, AvailabilityCheckError, AvailableBlock, DataAvailabilityChecker,
};
use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
use crate::early_attester_cache::EarlyAttesterCache;
use crate::errors::{BeaconChainError as Error, BlockProductionError};
use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend};
@@ -2092,10 +2091,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self: &Arc<Self>,
data_column_sidecar: Arc<DataColumnSidecar<T::EthSpec>>,
subnet_id: u64,
) -> Result<GossipVerifiedDataColumnSidecar<T>, GossipBlobError<T::EthSpec>> {
) -> Result<GossipVerifiedDataColumn<T>, GossipDataColumnError<T::EthSpec>> {
metrics::inc_counter(&metrics::BLOBS_COLUMN_SIDECAR_PROCESSING_REQUESTS);
let _timer = metrics::start_timer(&metrics::DATA_COLUMN_SIDECAR_GOSSIP_VERIFICATION_TIMES);
GossipVerifiedDataColumnSidecar::new(data_column_sidecar, subnet_id, self).map(|v| {
GossipVerifiedDataColumn::new(data_column_sidecar, subnet_id, self).map(|v| {
metrics::inc_counter(&metrics::DATA_COLUMNS_SIDECAR_PROCESSING_SUCCESSES);
v
})
@@ -2916,18 +2915,28 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.remove_notified(&block_root, r)
}

pub fn process_gossip_data_column(
/// Cache the data column in the processing cache, process it, then evict it from the cache once it
/// is imported or errors.
pub async fn process_gossip_data_column(
self: &Arc<Self>,
gossip_verified_data_column: GossipVerifiedDataColumnSidecar<T>,
) {
let data_column = gossip_verified_data_column.as_data_column();
// TODO(das) send to DA checker
info!(
self.log,
"Processed gossip data column";
"index" => data_column.index,
"slot" => data_column.slot().as_u64()
);
data_column: GossipVerifiedDataColumn<T>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
let block_root = data_column.block_root();

// If this block has already been imported to forkchoice it must have been available, so
// we don't need to process its samples again.
if self
.canonical_head
.fork_choice_read_lock()
.contains_block(&block_root)
{
return Err(BlockError::BlockIsAlreadyKnown);
}

let r = self
.check_gossip_data_column_availability_and_import(data_column)
.await;
self.remove_notified(&block_root, r)
}

/// Cache the blobs in the processing cache, process it, then evict it from the cache if it was
@@ -3212,6 +3221,23 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.process_availability(slot, availability).await
}

/// Checks if the provided data column can make any cached blocks available, and imports immediately
/// if so, otherwise caches the data column in the data availability checker.
async fn check_gossip_data_column_availability_and_import(
self: &Arc<Self>,
data_column: GossipVerifiedDataColumn<T>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
let slot = data_column.slot();
if let Some(slasher) = self.slasher.as_ref() {
slasher.accept_block_header(data_column.signed_block_header());
}
let availability = self
.data_availability_checker
.put_gossip_data_column(data_column)?;

self.process_availability(slot, availability).await
}

/// Checks if the provided blobs can make any cached blocks available, and imports immediately
/// if so, otherwise caches the blob in the data availability checker.
async fn check_rpc_blob_availability_and_import(
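
Taken together, the `beacon_chain.rs` changes make gossip data columns flow through the same verify-then-import pipeline as blobs. A hedged sketch of how a caller is expected to drive it; the gossip-verification method name and import paths are assumed by analogy with the blob path, since only the changed signature appears in the hunk above:

    use std::sync::Arc;

    use beacon_chain::data_column_verification::GossipVerifiedDataColumn;
    use beacon_chain::{AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes};
    use types::DataColumnSidecar;

    // Illustrative only; error handling and peer scoring are omitted.
    async fn handle_gossip_data_column<T: BeaconChainTypes>(
        chain: Arc<BeaconChain<T>>,
        sidecar: Arc<DataColumnSidecar<T::EthSpec>>,
        subnet_id: u64,
    ) {
        // Gossip verification returns the typed wrapper used everywhere below.
        let verified: GossipVerifiedDataColumn<T> =
            match chain.verify_data_column_sidecar_for_gossip(sidecar, subnet_id) {
                Ok(verified) => verified,
                Err(_e) => return, // in real code: penalise the peer / log the error
            };
        // Hand the verified column to the DA checker: it either completes a pending block
        // (`Imported`) or stays cached until the remaining components arrive.
        match chain.process_gossip_data_column(verified).await {
            Ok(AvailabilityProcessingStatus::Imported(_block_root)) => {
                // the block became fully available and was imported
            }
            Ok(AvailabilityProcessingStatus::MissingComponents(_slot, _block_root)) => {
                // keep waiting for the block and/or further columns
            }
            Err(_e) => {
                // e.g. BlockError::BlockIsAlreadyKnown if the block is already in fork choice
            }
        }
    }
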
41 changes: 1 addition & 40 deletions beacon_node/beacon_chain/src/blob_verification.rs
@@ -17,8 +17,7 @@ use ssz_types::VariableList;
use tree_hash::TreeHash;
use types::blob_sidecar::BlobIdentifier;
use types::{
BeaconStateError, BlobSidecar, CloneConfig, DataColumnSidecar, EthSpec, Hash256,
SignedBeaconBlockHeader, Slot,
BeaconStateError, BlobSidecar, CloneConfig, EthSpec, Hash256, SignedBeaconBlockHeader, Slot,
};

/// An error occurred while validating a gossip blob.
@@ -185,33 +184,6 @@ pub type GossipVerifiedBlobList<T> = VariableList<
<<T as BeaconChainTypes>::EthSpec as EthSpec>::MaxBlobsPerBlock,
>;

#[derive(Debug)]
pub struct GossipVerifiedDataColumnSidecar<T: BeaconChainTypes> {
data_column_sidecar: Arc<DataColumnSidecar<T::EthSpec>>,
}

impl<T: BeaconChainTypes> GossipVerifiedDataColumnSidecar<T> {
pub fn new(
column_sidecar: Arc<DataColumnSidecar<T::EthSpec>>,
subnet_id: u64,
chain: &BeaconChain<T>,
) -> Result<Self, GossipBlobError<T::EthSpec>> {
let header = column_sidecar.signed_block_header.clone();
// We only process slashing info if the gossip verification failed
// since we do not process the blob any further in that case.
validate_data_column_sidecar_for_gossip(column_sidecar, subnet_id, chain).map_err(|e| {
process_block_slash_info::<_, GossipBlobError<T::EthSpec>>(
chain,
BlockSlashInfo::from_early_error_blob(header, e),
)
})
}

pub fn as_data_column(&self) -> &Arc<DataColumnSidecar<T::EthSpec>> {
&self.data_column_sidecar
}
}

/// A wrapper around a `BlobSidecar` that indicates it has been approved for re-gossiping on
/// the p2p network.
#[derive(Debug)]
@@ -675,17 +647,6 @@ pub fn validate_blob_sidecar_for_gossip<T: BeaconChainTypes>(
})
}

pub fn validate_data_column_sidecar_for_gossip<T: BeaconChainTypes>(
data_column_sidecar: Arc<DataColumnSidecar<T::EthSpec>>,
_subnet: u64,
_chain: &BeaconChain<T>,
) -> Result<GossipVerifiedDataColumnSidecar<T>, GossipBlobError<T::EthSpec>> {
// TODO(das): validate kzg commitments, cell proofs etc
Ok(GossipVerifiedDataColumnSidecar {
data_column_sidecar: data_column_sidecar.clone(),
})
}

/// Returns the canonical root of the given `blob`.
///
/// Use this function to ensure that we report the blob hashing time Prometheus metric.
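
The wrapper and validation stub removed here are superseded by `GossipVerifiedDataColumn` in the new `data_column_verification` module, which is part of this commit but not shown in this excerpt. A hedged reconstruction of its constructor, mirroring the removed blob-file code but using the data-column error type and the slash-info helper added in `block_verification.rs` below (imports omitted; not a verbatim copy of the new module):

    impl<T: BeaconChainTypes> GossipVerifiedDataColumn<T> {
        pub fn new(
            column_sidecar: Arc<DataColumnSidecar<T::EthSpec>>,
            subnet_id: u64,
            chain: &BeaconChain<T>,
        ) -> Result<Self, GossipDataColumnError<T::EthSpec>> {
            let header = column_sidecar.signed_block_header.clone();
            // Slashing info is only processed when gossip verification fails, since the
            // column is not processed any further in that case.
            validate_data_column_sidecar_for_gossip(column_sidecar, subnet_id, chain).map_err(|e| {
                process_block_slash_info::<_, GossipDataColumnError<T::EthSpec>>(
                    chain,
                    BlockSlashInfo::from_early_error_data_column(header, e),
                )
            })
        }
    }
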
32 changes: 32 additions & 0 deletions beacon_node/beacon_chain/src/block_verification.rs
@@ -53,6 +53,7 @@ use crate::block_verification_types::{
AsBlock, BlockContentsError, BlockImportData, GossipVerifiedBlockContents, RpcBlock,
};
use crate::data_availability_checker::{AvailabilityCheckError, MaybeAvailableBlock};
use crate::data_column_verification::GossipDataColumnError;
use crate::eth1_finalization_cache::Eth1FinalizationData;
use crate::execution_payload::{
is_optimistic_candidate_block, validate_execution_payload_for_gossip, validate_merge_block,
@@ -528,6 +529,20 @@ impl<E: EthSpec> BlockSlashInfo<GossipBlobError<E>> {
}
}

impl<E: EthSpec> BlockSlashInfo<GossipDataColumnError<E>> {
pub fn from_early_error_data_column(
header: SignedBeaconBlockHeader,
e: GossipDataColumnError<E>,
) -> Self {
match e {
GossipDataColumnError::ProposalSignatureInvalid => BlockSlashInfo::SignatureInvalid(e),
// `InvalidSignature` could indicate any signature in the block, so we want
// to recheck the proposer signature alone.
_ => BlockSlashInfo::SignatureNotChecked(header, e),
}
}
}

/// Process invalid blocks to see if they are suitable for the slasher.
///
/// If no slasher is configured, this is a no-op.
@@ -1985,6 +2000,23 @@ impl<E: EthSpec> BlockBlobError for GossipBlobError<E> {
}
}

impl<E: EthSpec> BlockBlobError for GossipDataColumnError<E> {
fn not_later_than_parent_error(data_column_slot: Slot, parent_slot: Slot) -> Self {
GossipDataColumnError::DataColumnIsNotLaterThanParent {
data_column_slot,
parent_slot,
}
}

fn unknown_validator_error(validator_index: u64) -> Self {
GossipDataColumnError::UnknownValidator(validator_index)
}

fn proposer_signature_invalid() -> Self {
GossipDataColumnError::ProposalSignatureInvalid
}
}

/// Performs a cheap (time-efficient) state advancement so the committees and proposer shuffling for
/// `slot` can be obtained from `state`.
///
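
Implementing `BlockBlobError` for `GossipDataColumnError` lets the sidecar-agnostic verification helpers in this file construct the correct error type for blobs and data columns alike. A minimal illustration of the kind of generic check this enables (the helper itself is illustrative, not taken from the commit):

    use types::Slot;

    // Written once against the trait, this can surface either a GossipBlobError or a
    // GossipDataColumnError depending on which sidecar type the caller is verifying.
    fn check_sidecar_later_than_parent<Err: BlockBlobError>(
        sidecar_slot: Slot,
        parent_slot: Slot,
    ) -> Result<(), Err> {
        if sidecar_slot <= parent_slot {
            return Err(Err::not_later_than_parent_error(sidecar_slot, parent_slot));
        }
        Ok(())
    }
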
29 changes: 28 additions & 1 deletion beacon_node/beacon_chain/src/data_availability_checker.rs
@@ -22,7 +22,9 @@ use std::sync::Arc;
use task_executor::TaskExecutor;
use types::beacon_block_body::{KzgCommitmentOpts, KzgCommitments};
use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList};
use types::{BlobSidecarList, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot};
use types::{
BlobSidecarList, ChainSpec, DataColumnSidecar, Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot,
};

mod availability_view;
mod child_components;
@@ -31,7 +33,9 @@ mod overflow_lru_cache;
mod processing_cache;
mod state_lru_cache;

use crate::data_column_verification::GossipVerifiedDataColumn;
pub use error::{Error as AvailabilityCheckError, ErrorCategory as AvailabilityCheckErrorCategory};
use types::data_column_sidecar::DataColumnIdentifier;
use types::non_zero_usize::new_non_zero_usize;

/// The LRU Cache stores `PendingComponents` which can store up to
@@ -192,6 +196,14 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
self.availability_cache.peek_blob(blob_id)
}

/// Get a data column from the availability cache.
pub fn get_data_column(
&self,
data_column_id: &DataColumnIdentifier,
) -> Result<Option<Arc<DataColumnSidecar<T::EthSpec>>>, AvailabilityCheckError> {
self.availability_cache.peek_data_column(data_column_id)
}

/// Put a list of blobs received via RPC into the availability cache. This performs KZG
/// verification on the blobs in the list.
pub fn put_rpc_blobs(
@@ -223,6 +235,21 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
.put_kzg_verified_blobs(gossip_blob.block_root(), vec![gossip_blob.into_inner()])
}

/// Check if we've cached other data columns for this block. If it completes a set and we also
/// have a block cached, return the `Availability` variant triggering block import.
/// Otherwise cache the data column sidecar.
///
/// This should only accept gossip verified data columns, so we should not have to worry about dupes.
pub fn put_gossip_data_column(
&self,
gossip_data_column: GossipVerifiedDataColumn<T>,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
self.availability_cache.put_kzg_verified_data_columns(
gossip_data_column.block_root(),
vec![gossip_data_column.into_inner()],
)
}

/// Check if we have all the blobs for a block. Returns `Availability` which has information
/// about whether all components have been received or more are required.
pub fn put_pending_executed_block(
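
The two entry points added above cover both directions: gossip columns enter the cache through `put_gossip_data_column`, and RPC can later read them back with `get_data_column`. A short usage sketch; the variables `da_checker`, `gossip_data_column`, `block_root` and `index` are assumed to be in scope, and the `Availability` variant names follow the existing blob path rather than this diff:

    // Feed a verified gossip column into the availability cache; the result says whether
    // the corresponding block is now fully available or still missing components.
    let availability = da_checker.put_gossip_data_column(gossip_data_column)?;
    if let Availability::MissingComponents(_root) = &availability {
        // the column is cached; the block and/or further columns are still needed
    }

    // Later, serve the same column over RPC by its (block_root, index) identifier.
    let id = DataColumnIdentifier { block_root, index };
    if let Some(_column) = da_checker.get_data_column(&id)? {
        // send the column back to the requesting peer
    }
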