From 0ef9a1a2ed189b0964331bfaad8befa8dae7d19d Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Tue, 23 Jul 2024 00:04:59 +1000 Subject: [PATCH 1/7] Add `DataColumnSidecar` gossip topic and verification (#5050 and #5783). --- Cargo.lock | 1 + beacon_node/beacon_chain/src/beacon_chain.rs | 93 +++- .../beacon_chain/src/block_verification.rs | 39 ++ beacon_node/beacon_chain/src/builder.rs | 4 +- .../src/data_availability_checker.rs | 9 + .../src/data_column_verification.rs | 496 ++++++++++++++++++ beacon_node/beacon_chain/src/errors.rs | 2 +- beacon_node/beacon_chain/src/lib.rs | 3 +- beacon_node/beacon_chain/src/metrics.rs | 12 + .../src/observed_data_sidecars.rs | 474 +++++++++++++++++ beacon_node/beacon_processor/src/lib.rs | 25 +- beacon_node/beacon_processor/src/metrics.rs | 5 + beacon_node/lighthouse_network/Cargo.toml | 1 + .../lighthouse_network/src/discovery/mod.rs | 15 +- .../src/discovery/subnet_predicate.rs | 22 +- .../src/peer_manager/mod.rs | 4 + .../src/peer_manager/peerdb/peer_info.rs | 9 + .../src/service/gossip_cache.rs | 1 + .../lighthouse_network/src/service/mod.rs | 3 +- .../lighthouse_network/src/types/pubsub.rs | 61 ++- .../lighthouse_network/src/types/subnet.rs | 4 +- .../lighthouse_network/src/types/topics.rs | 17 +- beacon_node/network/src/metrics.rs | 20 + .../gossip_methods.rs | 221 +++++++- .../src/network_beacon_processor/mod.rs | 30 ++ beacon_node/network/src/router.rs | 14 + beacon_node/network/src/sync/manager.rs | 9 +- consensus/types/src/data_column_subnet_id.rs | 198 +++++++ consensus/types/src/lib.rs | 5 + lcli/src/generate_bootnode_enr.rs | 1 + 30 files changed, 1769 insertions(+), 29 deletions(-) create mode 100644 beacon_node/beacon_chain/src/data_column_verification.rs create mode 100644 beacon_node/beacon_chain/src/observed_data_sidecars.rs create mode 100644 consensus/types/src/data_column_subnet_id.rs diff --git a/Cargo.lock b/Cargo.lock index 4bea9618af4..e49a900304a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4968,6 +4968,7 @@ dependencies = [ "futures", "gossipsub", "hex", + "itertools", "lazy_static", "libp2p", "libp2p-mplex", diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 19ee3d116c1..b188ef5f222 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -23,6 +23,7 @@ use crate::chain_config::ChainConfig; use crate::data_availability_checker::{ Availability, AvailabilityCheckError, AvailableBlock, DataAvailabilityChecker, }; +use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn}; use crate::early_attester_cache::EarlyAttesterCache; use crate::errors::{BeaconChainError as Error, BlockProductionError}; use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend}; @@ -51,8 +52,8 @@ use crate::observed_aggregates::{ use crate::observed_attesters::{ ObservedAggregators, ObservedAttesters, ObservedSyncAggregators, ObservedSyncContributors, }; -use crate::observed_blob_sidecars::ObservedBlobSidecars; use crate::observed_block_producers::ObservedBlockProducers; +use crate::observed_data_sidecars::ObservedDataSidecars; use crate::observed_operations::{ObservationOutcome, ObservedOperations}; use crate::observed_slashable::ObservedSlashable; use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_BLOCK_ROOT}; @@ -426,7 +427,9 @@ pub struct BeaconChain { /// Maintains a record of which validators have proposed blocks for each slot. pub observed_block_producers: RwLock>, /// Maintains a record of blob sidecars seen over the gossip network. - pub observed_blob_sidecars: RwLock>, + pub observed_blob_sidecars: RwLock>>, + /// Maintains a record of column sidecars seen over the gossip network. + pub observed_column_sidecars: RwLock>>, /// Maintains a record of slashable message seen over the gossip network or RPC. pub observed_slashable: RwLock>, /// Maintains a record of which validators have submitted voluntary exits. @@ -2118,6 +2121,19 @@ impl BeaconChain { }) } + pub fn verify_data_column_sidecar_for_gossip( + self: &Arc, + data_column_sidecar: Arc>, + subnet_id: u64, + ) -> Result, GossipDataColumnError> { + metrics::inc_counter(&metrics::BLOBS_COLUMN_SIDECAR_PROCESSING_REQUESTS); + let _timer = metrics::start_timer(&metrics::DATA_COLUMN_SIDECAR_GOSSIP_VERIFICATION_TIMES); + GossipVerifiedDataColumn::new(data_column_sidecar, subnet_id, self).map(|v| { + metrics::inc_counter(&metrics::DATA_COLUMNS_SIDECAR_PROCESSING_SUCCESSES); + v + }) + } + pub fn verify_blob_sidecar_for_gossip( self: &Arc, blob_sidecar: Arc>, @@ -2964,6 +2980,39 @@ impl BeaconChain { self.remove_notified(&block_root, r) } + /// Cache the data columns in the processing cache, process it, then evict it from the cache if it was + /// imported or errors. + pub async fn process_gossip_data_columns( + self: &Arc, + data_columns: Vec>, + ) -> Result> { + let Ok(block_root) = data_columns + .iter() + .map(|c| c.block_root()) + .unique() + .exactly_one() + else { + return Err(BlockError::InternalError( + "Columns should be from the same block".to_string(), + )); + }; + + // If this block has already been imported to forkchoice it must have been available, so + // we don't need to process its samples again. + if self + .canonical_head + .fork_choice_read_lock() + .contains_block(&block_root) + { + return Err(BlockError::BlockIsAlreadyKnown(block_root)); + } + + let r = self + .check_gossip_data_columns_availability_and_import(data_columns) + .await; + self.remove_notified_custody_columns(&block_root, r) + } + /// Cache the blobs in the processing cache, process it, then evict it from the cache if it was /// imported or errors. pub async fn process_rpc_blobs( @@ -3013,6 +3062,21 @@ impl BeaconChain { r } + /// Remove any block components from the *processing cache* if we no longer require them. If the + /// block was imported full or erred, we no longer require them. + fn remove_notified_custody_columns( + &self, + block_root: &Hash256, + r: Result>, + ) -> Result> { + let has_missing_components = + matches!(r, Ok(AvailabilityProcessingStatus::MissingComponents(_, _))); + if !has_missing_components { + self.reqresp_pre_import_cache.write().remove(block_root); + } + r + } + /// Wraps `process_block` in logic to cache the block's commitments in the processing cache /// and evict if the block was imported or errored. pub async fn process_block_with_early_caching>( @@ -3257,6 +3321,31 @@ impl BeaconChain { self.process_availability(slot, availability).await } + /// Checks if the provided data column can make any cached blocks available, and imports immediately + /// if so, otherwise caches the data column in the data availability checker. + async fn check_gossip_data_columns_availability_and_import( + self: &Arc, + data_columns: Vec>, + ) -> Result> { + if let Some(slasher) = self.slasher.as_ref() { + for data_colum in &data_columns { + slasher.accept_block_header(data_colum.signed_block_header()); + } + } + + let Ok(slot) = data_columns.iter().map(|c| c.slot()).unique().exactly_one() else { + return Err(BlockError::InternalError( + "Columns for the same block should have matching slot".to_string(), + )); + }; + + let availability = self + .data_availability_checker + .put_gossip_data_columns(data_columns)?; + + self.process_availability(slot, availability).await + } + /// Checks if the provided blobs can make any cached blocks available, and imports immediately /// if so, otherwise caches the blob in the data availability checker. async fn check_rpc_blob_availability_and_import( diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index d906518ff5a..2ac26607228 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -54,6 +54,7 @@ use crate::block_verification_types::{ AsBlock, BlockContentsError, BlockImportData, GossipVerifiedBlockContents, RpcBlock, }; use crate::data_availability_checker::{AvailabilityCheckError, MaybeAvailableBlock}; +use crate::data_column_verification::GossipDataColumnError; use crate::eth1_finalization_cache::Eth1FinalizationData; use crate::execution_payload::{ is_optimistic_candidate_block, validate_execution_payload_for_gossip, validate_merge_block, @@ -303,6 +304,13 @@ pub enum BlockError { /// TODO: We may need to penalize the peer that gave us a potentially invalid rpc blob. /// https://github.com/sigp/lighthouse/issues/4546 AvailabilityCheck(AvailabilityCheckError), + /// An internal error has occurred when processing the block or sidecars. + /// + /// ## Peer scoring + /// + /// We were unable to process this block due to an internal error. It's unclear if the block is + /// valid. + InternalError(String), } impl From for BlockError { @@ -523,6 +531,20 @@ impl BlockSlashInfo> { } } +impl BlockSlashInfo> { + pub fn from_early_error_data_column( + header: SignedBeaconBlockHeader, + e: GossipDataColumnError, + ) -> Self { + match e { + GossipDataColumnError::ProposalSignatureInvalid => BlockSlashInfo::SignatureInvalid(e), + // `InvalidSignature` could indicate any signature in the block, so we want + // to recheck the proposer signature alone. + _ => BlockSlashInfo::SignatureNotChecked(header, e), + } + } +} + /// Process invalid blocks to see if they are suitable for the slasher. /// /// If no slasher is configured, this is a no-op. @@ -2007,6 +2029,23 @@ impl BlockBlobError for GossipBlobError { } } +impl BlockBlobError for GossipDataColumnError { + fn not_later_than_parent_error(data_column_slot: Slot, parent_slot: Slot) -> Self { + GossipDataColumnError::IsNotLaterThanParent { + data_column_slot, + parent_slot, + } + } + + fn unknown_validator_error(validator_index: u64) -> Self { + GossipDataColumnError::UnknownValidator(validator_index) + } + + fn proposer_signature_invalid() -> Self { + GossipDataColumnError::ProposalSignatureInvalid + } +} + /// Performs a cheap (time-efficient) state advancement so the committees and proposer shuffling for /// `slot` can be obtained from `state`. /// diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 7217f2c640f..637538b6f3e 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -11,6 +11,7 @@ use crate::graffiti_calculator::{GraffitiCalculator, GraffitiOrigin}; use crate::head_tracker::HeadTracker; use crate::light_client_server_cache::LightClientServerCache; use crate::migrate::{BackgroundMigrator, MigratorConfig}; +use crate::observed_data_sidecars::ObservedDataSidecars; use crate::persisted_beacon_chain::PersistedBeaconChain; use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache}; use crate::timeout_rw_lock::TimeoutRwLock; @@ -918,7 +919,8 @@ where observed_sync_aggregators: <_>::default(), // TODO: allow for persisting and loading the pool from disk. observed_block_producers: <_>::default(), - observed_blob_sidecars: <_>::default(), + observed_column_sidecars: RwLock::new(ObservedDataSidecars::new(self.spec.clone())), + observed_blob_sidecars: RwLock::new(ObservedDataSidecars::new(self.spec.clone())), observed_slashable: <_>::default(), observed_voluntary_exits: <_>::default(), observed_proposer_slashings: <_>::default(), diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 2431769ddb0..e6bcb8bc384 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -20,6 +20,7 @@ mod error; mod overflow_lru_cache; mod state_lru_cache; +use crate::data_column_verification::GossipVerifiedDataColumn; pub use error::{Error as AvailabilityCheckError, ErrorCategory as AvailabilityCheckErrorCategory}; use types::non_zero_usize::new_non_zero_usize; @@ -175,6 +176,14 @@ impl DataAvailabilityChecker { .put_kzg_verified_blobs(gossip_blob.block_root(), vec![gossip_blob.into_inner()]) } + pub fn put_gossip_data_columns( + &self, + _gossip_data_columns: Vec>, + ) -> Result, AvailabilityCheckError> { + // TODO(das) to be implemented + unimplemented!("not implemented") + } + /// Check if we have all the blobs for a block. Returns `Availability` which has information /// about whether all components have been received or more are required. pub fn put_pending_executed_block( diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs new file mode 100644 index 00000000000..d2ae1deeef5 --- /dev/null +++ b/beacon_node/beacon_chain/src/data_column_verification.rs @@ -0,0 +1,496 @@ +use crate::block_verification::{ + cheap_state_advance_to_obtain_committees, get_validator_pubkey_cache, process_block_slash_info, + BlockSlashInfo, +}; +use crate::{BeaconChain, BeaconChainError, BeaconChainTypes}; +use derivative::Derivative; +use fork_choice::ProtoBlock; +use kzg::{Error as KzgError, Kzg}; +use proto_array::Block; +use slasher::test_utils::E; +use slog::debug; +use slot_clock::SlotClock; +use ssz_derive::{Decode, Encode}; +use std::sync::Arc; +use types::data_column_sidecar::{ColumnIndex, DataColumnIdentifier}; +use types::{ + BeaconStateError, ChainSpec, DataColumnSidecar, DataColumnSubnetId, EthSpec, Hash256, + RuntimeVariableList, SignedBeaconBlockHeader, Slot, +}; + +/// An error occurred while validating a gossip data column. +#[derive(Debug)] +pub enum GossipDataColumnError { + /// There was an error whilst processing the data column. It is not known if it is + /// valid or invalid. + /// + /// ## Peer scoring + /// + /// We were unable to process this data column due to an internal error. It's + /// unclear if the data column is valid. + BeaconChainError(BeaconChainError), + /// The proposal signature in invalid. + /// + /// ## Peer scoring + /// + /// The data column is invalid and the peer is faulty. + ProposalSignatureInvalid, + /// The proposal_index corresponding to data column.beacon_block_root is not known. + /// + /// ## Peer scoring + /// + /// The data column is invalid and the peer is faulty. + UnknownValidator(u64), + /// The provided data column is not from a later slot than its parent. + /// + /// ## Peer scoring + /// + /// The data column is invalid and the peer is faulty. + IsNotLaterThanParent { + data_column_slot: Slot, + parent_slot: Slot, + }, + /// `Kzg` struct hasn't been initialized. This is an internal error. + /// + /// ## Peer scoring + /// + /// The peer isn't faulty, This is an internal error. + KzgNotInitialized, + /// The kzg verification failed. + /// + /// ## Peer scoring + /// + /// The data column sidecar is invalid and the peer is faulty. + InvalidKzgProof(kzg::Error), + /// The column was gossiped over an incorrect subnet. + /// + /// ## Peer scoring + /// + /// The column is invalid or the peer is faulty. + InvalidSubnetId { received: u64, expected: u64 }, + /// The column sidecar is from a slot that is later than the current slot (with respect to the + /// gossip clock disparity). + /// + /// ## Peer scoring + /// + /// Assuming the local clock is correct, the peer has sent an invalid message. + FutureSlot { + message_slot: Slot, + latest_permissible_slot: Slot, + }, + /// The sidecar corresponds to a slot older than the finalized head slot. + /// + /// ## Peer scoring + /// + /// It's unclear if this column is valid, but this column is for a finalized slot and is + /// therefore useless to us. + PastFinalizedSlot { + column_slot: Slot, + finalized_slot: Slot, + }, + /// The pubkey cache timed out. + /// + /// ## Peer scoring + /// + /// The column sidecar may be valid, this is an internal error. + PubkeyCacheTimeout, + /// The proposer index specified in the sidecar does not match the locally computed + /// proposer index. + /// + /// ## Peer scoring + /// + /// The column is invalid and the peer is faulty. + ProposerIndexMismatch { sidecar: usize, local: usize }, + /// The provided columns's parent block is unknown. + /// + /// ## Peer scoring + /// + /// We cannot process the columns without validating its parent, the peer isn't necessarily faulty. + ParentUnknown(Arc>), + /// The column conflicts with finalization, no need to propagate. + /// + /// ## Peer scoring + /// + /// It's unclear if this column is valid, but it conflicts with finality and shouldn't be + /// imported. + NotFinalizedDescendant { block_parent_root: Hash256 }, + /// Invalid kzg commitment inclusion proof + /// + /// ## Peer scoring + /// + /// The column sidecar is invalid and the peer is faulty + InvalidInclusionProof, + /// A column has already been seen for the given `(sidecar.block_root, sidecar.index)` tuple + /// over gossip or no gossip sources. + /// + /// ## Peer scoring + /// + /// The peer isn't faulty, but we do not forward it over gossip. + PriorKnown { + proposer: u64, + slot: Slot, + index: ColumnIndex, + }, +} + +impl From for GossipDataColumnError { + fn from(e: BeaconChainError) -> Self { + GossipDataColumnError::BeaconChainError(e) + } +} + +impl From for GossipDataColumnError { + fn from(e: BeaconStateError) -> Self { + GossipDataColumnError::BeaconChainError(BeaconChainError::BeaconStateError(e)) + } +} + +pub type GossipVerifiedDataColumnList = RuntimeVariableList>; + +/// A wrapper around a `DataColumnSidecar` that indicates it has been approved for re-gossiping on +/// the p2p network. +#[derive(Debug)] +pub struct GossipVerifiedDataColumn { + block_root: Hash256, + data_column: KzgVerifiedDataColumn, +} + +impl GossipVerifiedDataColumn { + pub fn new( + column_sidecar: Arc>, + subnet_id: u64, + chain: &BeaconChain, + ) -> Result> { + let header = column_sidecar.signed_block_header.clone(); + // We only process slashing info if the gossip verification failed + // since we do not process the data column any further in that case. + validate_data_column_sidecar_for_gossip(column_sidecar, subnet_id, chain).map_err(|e| { + process_block_slash_info::<_, GossipDataColumnError>( + chain, + BlockSlashInfo::from_early_error_data_column(header, e), + ) + }) + } + + pub fn id(&self) -> DataColumnIdentifier { + DataColumnIdentifier { + block_root: self.block_root, + index: self.data_column.data_column_index(), + } + } + + pub fn block_root(&self) -> Hash256 { + self.block_root + } + + pub fn slot(&self) -> Slot { + self.data_column.data.slot() + } + + pub fn signed_block_header(&self) -> SignedBeaconBlockHeader { + self.data_column.data.signed_block_header.clone() + } +} + +/// Wrapper over a `DataColumnSidecar` for which we have completed kzg verification. +#[derive(Debug, Derivative, Clone, Encode, Decode)] +#[derivative(PartialEq, Eq)] +#[ssz(struct_behaviour = "transparent")] +pub struct KzgVerifiedDataColumn { + data: Arc>, +} + +impl KzgVerifiedDataColumn { + pub fn new(data_column: Arc>, kzg: &Kzg) -> Result { + verify_kzg_for_data_column(data_column, kzg) + } + pub fn as_data_column(&self) -> &DataColumnSidecar { + &self.data + } + /// This is cheap as we're calling clone on an Arc + pub fn clone_data_column(&self) -> Arc> { + self.data.clone() + } + + pub fn data_column_index(&self) -> u64 { + self.data.index + } +} + +/// Complete kzg verification for a `DataColumnSidecar`. +/// +/// Returns an error if the kzg verification check fails. +pub fn verify_kzg_for_data_column( + data_column: Arc>, + _kzg: &Kzg, +) -> Result, KzgError> { + // TODO(das): KZG verification to be implemented + Ok(KzgVerifiedDataColumn { data: data_column }) +} + +/// Complete kzg verification for a list of `DataColumnSidecar`s. +/// Returns an error if any of the `DataColumnSidecar`s fails kzg verification. +/// +/// Note: This function should be preferred over calling `verify_kzg_for_data_column` +/// in a loop since this function kzg verifies a list of data columns more efficiently. +pub fn verify_kzg_for_data_column_list<'a, E: EthSpec, I>( + _data_column_iter: I, + _kzg: &'a Kzg, +) -> Result<(), KzgError> +where + I: Iterator>> + Clone, +{ + // TODO(das): KZG verification to be implemented + Ok(()) +} + +pub fn validate_data_column_sidecar_for_gossip( + data_column: Arc>, + subnet: u64, + chain: &BeaconChain, +) -> Result, GossipDataColumnError> { + let column_slot = data_column.slot(); + + verify_index_matches_subnet(&data_column, subnet, &chain.spec)?; + verify_sidecar_not_from_future_slot(chain, column_slot)?; + verify_slot_greater_than_latest_finalized_slot(chain, column_slot)?; + verify_is_first_sidecar(chain, &data_column)?; + verify_column_inclusion_proof(&data_column)?; + let parent_block = verify_parent_block_and_finalized_descendant(data_column.clone(), chain)?; + verify_slot_higher_than_parent(&parent_block, column_slot)?; + verify_proposer_and_signature(&data_column, &parent_block, chain)?; + + let kzg = chain + .kzg + .clone() + .ok_or(GossipDataColumnError::KzgNotInitialized)?; + let kzg_verified_data_column = verify_kzg_for_data_column(data_column.clone(), &kzg) + .map_err(GossipDataColumnError::InvalidKzgProof)?; + + chain + .observed_slashable + .write() + .observe_slashable( + column_slot, + data_column.block_proposer_index(), + data_column.block_root(), + ) + .map_err(|e| GossipDataColumnError::BeaconChainError(e.into()))?; + + Ok(GossipVerifiedDataColumn { + block_root: data_column.block_root(), + data_column: kzg_verified_data_column, + }) +} + +// Verify that this is the first column sidecar received for the tuple: +// (block_header.slot, block_header.proposer_index, column_sidecar.index) +fn verify_is_first_sidecar( + chain: &BeaconChain, + data_column: &DataColumnSidecar, +) -> Result<(), GossipDataColumnError> { + if chain + .observed_column_sidecars + .read() + .proposer_is_known(data_column) + .map_err(|e| GossipDataColumnError::BeaconChainError(e.into()))? + { + return Err(GossipDataColumnError::PriorKnown { + proposer: data_column.block_proposer_index(), + slot: data_column.slot(), + index: data_column.index, + }); + } + Ok(()) +} + +fn verify_column_inclusion_proof( + _data_column: &DataColumnSidecar, +) -> Result<(), GossipDataColumnError> { + // TODO(das): to be implemented + Ok(()) +} + +fn verify_slot_higher_than_parent( + parent_block: &Block, + data_column_slot: Slot, +) -> Result<(), GossipDataColumnError> { + if parent_block.slot >= data_column_slot { + return Err(GossipDataColumnError::IsNotLaterThanParent { + data_column_slot, + parent_slot: parent_block.slot, + }); + } + Ok(()) +} + +fn verify_parent_block_and_finalized_descendant( + data_column: Arc>, + chain: &BeaconChain, +) -> Result> { + let fork_choice = chain.canonical_head.fork_choice_read_lock(); + + // We have already verified that the column is past finalization, so we can + // just check fork choice for the block's parent. + let block_parent_root = data_column.block_parent_root(); + let Some(parent_block) = fork_choice.get_block(&block_parent_root) else { + return Err(GossipDataColumnError::ParentUnknown(data_column.clone())); + }; + + // Do not process a column that does not descend from the finalized root. + // We just loaded the parent_block, so we can be sure that it exists in fork choice. + if !fork_choice.is_finalized_checkpoint_or_descendant(block_parent_root) { + return Err(GossipDataColumnError::NotFinalizedDescendant { block_parent_root }); + } + + Ok(parent_block) +} + +fn verify_proposer_and_signature( + data_column: &DataColumnSidecar, + parent_block: &ProtoBlock, + chain: &BeaconChain, +) -> Result<(), GossipDataColumnError> { + let column_slot = data_column.slot(); + let column_epoch = column_slot.epoch(E::slots_per_epoch()); + let column_index = data_column.index; + let block_root = data_column.block_root(); + let block_parent_root = data_column.block_parent_root(); + + let proposer_shuffling_root = + if parent_block.slot.epoch(T::EthSpec::slots_per_epoch()) == column_epoch { + parent_block + .next_epoch_shuffling_id + .shuffling_decision_block + } else { + parent_block.root + }; + + let proposer_opt = chain + .beacon_proposer_cache + .lock() + .get_slot::(proposer_shuffling_root, column_slot); + + let (proposer_index, fork) = if let Some(proposer) = proposer_opt { + (proposer.index, proposer.fork) + } else { + debug!( + chain.log, + "Proposer shuffling cache miss for column verification"; + "block_root" => %block_root, + "index" => %column_index, + ); + let (parent_state_root, mut parent_state) = chain + .store + .get_advanced_hot_state(block_parent_root, column_slot, parent_block.state_root) + .map_err(|e| GossipDataColumnError::BeaconChainError(e.into()))? + .ok_or_else(|| { + BeaconChainError::DBInconsistent(format!( + "Missing state for parent block {block_parent_root:?}", + )) + })?; + + let state = cheap_state_advance_to_obtain_committees::<_, GossipDataColumnError>( + &mut parent_state, + Some(parent_state_root), + column_slot, + &chain.spec, + )?; + + let proposers = state.get_beacon_proposer_indices(&chain.spec)?; + let proposer_index = *proposers + .get(column_slot.as_usize() % T::EthSpec::slots_per_epoch() as usize) + .ok_or_else(|| BeaconChainError::NoProposerForSlot(column_slot))?; + + // Prime the proposer shuffling cache with the newly-learned value. + chain.beacon_proposer_cache.lock().insert( + column_epoch, + proposer_shuffling_root, + proposers, + state.fork(), + )?; + (proposer_index, state.fork()) + }; + + // Signature verify the signed block header. + let signature_is_valid = { + let pubkey_cache = get_validator_pubkey_cache(chain) + .map_err(|_| GossipDataColumnError::PubkeyCacheTimeout)?; + + let pubkey = pubkey_cache + .get(proposer_index) + .ok_or_else(|| GossipDataColumnError::UnknownValidator(proposer_index as u64))?; + let signed_block_header = &data_column.signed_block_header; + signed_block_header.verify_signature::( + pubkey, + &fork, + chain.genesis_validators_root, + &chain.spec, + ) + }; + + if !signature_is_valid { + return Err(GossipDataColumnError::ProposalSignatureInvalid); + } + + let column_proposer_index = data_column.block_proposer_index(); + if proposer_index != column_proposer_index as usize { + return Err(GossipDataColumnError::ProposerIndexMismatch { + sidecar: column_proposer_index as usize, + local: proposer_index, + }); + } + + Ok(()) +} + +fn verify_index_matches_subnet( + data_column: &DataColumnSidecar, + subnet: u64, + spec: &ChainSpec, +) -> Result<(), GossipDataColumnError> { + let expected_subnet: u64 = + DataColumnSubnetId::from_column_index::(data_column.index as usize, spec).into(); + if expected_subnet != subnet { + return Err(GossipDataColumnError::InvalidSubnetId { + received: subnet, + expected: expected_subnet, + }); + } + Ok(()) +} + +fn verify_slot_greater_than_latest_finalized_slot( + chain: &BeaconChain, + column_slot: Slot, +) -> Result<(), GossipDataColumnError> { + let latest_finalized_slot = chain + .head() + .finalized_checkpoint() + .epoch + .start_slot(T::EthSpec::slots_per_epoch()); + if column_slot <= latest_finalized_slot { + return Err(GossipDataColumnError::PastFinalizedSlot { + column_slot, + finalized_slot: latest_finalized_slot, + }); + } + Ok(()) +} + +fn verify_sidecar_not_from_future_slot( + chain: &BeaconChain, + column_slot: Slot, +) -> Result<(), GossipDataColumnError> { + let latest_permissible_slot = chain + .slot_clock + .now_with_future_tolerance(chain.spec.maximum_gossip_clock_disparity()) + .ok_or(BeaconChainError::UnableToReadSlot)?; + if column_slot > latest_permissible_slot { + return Err(GossipDataColumnError::FutureSlot { + message_slot: column_slot, + latest_permissible_slot, + }); + } + Ok(()) +} diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 819de1f5c19..03e8d1a64ae 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -9,8 +9,8 @@ use crate::migrate::PruningError; use crate::naive_aggregation_pool::Error as NaiveAggregationError; use crate::observed_aggregates::Error as ObservedAttestationsError; use crate::observed_attesters::Error as ObservedAttestersError; -use crate::observed_blob_sidecars::Error as ObservedBlobSidecarsError; use crate::observed_block_producers::Error as ObservedBlockProducersError; +use crate::observed_data_sidecars::Error as ObservedBlobSidecarsError; use execution_layer::PayloadStatus; use fork_choice::ExecutionStatus; use futures::channel::mpsc::TrySendError; diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 466ab0b67e7..2bdbc9d3e9f 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -19,6 +19,7 @@ pub mod canonical_head; pub mod capella_readiness; pub mod chain_config; pub mod data_availability_checker; +pub mod data_column_verification; pub mod deneb_readiness; mod early_attester_cache; pub mod electra_readiness; @@ -41,8 +42,8 @@ pub mod migrate; mod naive_aggregation_pool; pub mod observed_aggregates; mod observed_attesters; -mod observed_blob_sidecars; pub mod observed_block_producers; +mod observed_data_sidecars; pub mod observed_operations; mod observed_slashable; pub mod otb_verification_service; diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index 064b2b199ff..001b6609043 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -1032,14 +1032,26 @@ lazy_static! { "beacon_blobs_sidecar_processing_requests_total", "Count of all blob sidecars submitted for processing" ); + pub static ref BLOBS_COLUMN_SIDECAR_PROCESSING_REQUESTS: Result = try_create_int_counter( + "beacon_blobs_column_sidecar_processing_requests_total", + "Count of all data column sidecars submitted for processing" + ); pub static ref BLOBS_SIDECAR_PROCESSING_SUCCESSES: Result = try_create_int_counter( "beacon_blobs_sidecar_processing_successes_total", "Number of blob sidecars verified for gossip" ); + pub static ref DATA_COLUMNS_SIDECAR_PROCESSING_SUCCESSES: Result = try_create_int_counter( + "beacon_blobs_column_sidecar_processing_successes_total", + "Number of data column sidecars verified for gossip" + ); pub static ref BLOBS_SIDECAR_GOSSIP_VERIFICATION_TIMES: Result = try_create_histogram( "beacon_blobs_sidecar_gossip_verification_seconds", "Full runtime of blob sidecars gossip verification" ); + pub static ref DATA_COLUMN_SIDECAR_GOSSIP_VERIFICATION_TIMES: Result = try_create_histogram( + "beacon_blobs_column_sidecar_gossip_verification_seconds", + "Full runtime of data column sidecars gossip verification" + ); pub static ref BLOB_SIDECAR_INCLUSION_PROOF_VERIFICATION: Result = try_create_histogram( "blob_sidecar_inclusion_proof_verification_seconds", "Time taken to verify blob sidecar inclusion proof" diff --git a/beacon_node/beacon_chain/src/observed_data_sidecars.rs b/beacon_node/beacon_chain/src/observed_data_sidecars.rs new file mode 100644 index 00000000000..59b343eaa7c --- /dev/null +++ b/beacon_node/beacon_chain/src/observed_data_sidecars.rs @@ -0,0 +1,474 @@ +//! Provides the `ObservedBlobSidecars` struct which allows for rejecting `BlobSidecar`s +//! that we have already seen over the gossip network. +//! Only `BlobSidecar`s that have completed proposer signature verification can be added +//! to this cache to reduce DoS risks. + +use crate::observed_block_producers::ProposalKey; +use std::collections::{HashMap, HashSet}; +use std::marker::PhantomData; +use types::{BlobSidecar, ChainSpec, DataColumnSidecar, EthSpec, Slot}; + +#[derive(Debug, PartialEq)] +pub enum Error { + /// The slot of the provided `ObservableSidecar` is prior to finalization and should not have been provided + /// to this function. This is an internal error. + FinalizedDataSidecar { slot: Slot, finalized_slot: Slot }, + /// The data sidecar contains an invalid index, the data sidecar is invalid. + /// Note: The invalid data should have been caught and flagged as an error much before reaching + /// here. + InvalidDataIndex(u64), +} + +pub trait ObservableDataSidecar { + fn slot(&self) -> Slot; + fn block_proposer_index(&self) -> u64; + fn index(&self) -> u64; + fn max_num_of_items(spec: &ChainSpec) -> usize; +} + +impl ObservableDataSidecar for BlobSidecar { + fn slot(&self) -> Slot { + self.slot() + } + + fn block_proposer_index(&self) -> u64 { + self.block_proposer_index() + } + + fn index(&self) -> u64 { + self.index + } + + fn max_num_of_items(_spec: &ChainSpec) -> usize { + E::max_blobs_per_block() + } +} + +impl ObservableDataSidecar for DataColumnSidecar { + fn slot(&self) -> Slot { + self.slot() + } + + fn block_proposer_index(&self) -> u64 { + self.block_proposer_index() + } + + fn index(&self) -> u64 { + self.index + } + + fn max_num_of_items(spec: &ChainSpec) -> usize { + spec.number_of_columns + } +} + +/// Maintains a cache of seen `ObservableSidecar`s that are received over gossip +/// and have been gossip verified. +/// +/// The cache supports pruning based upon the finalized epoch. It does not automatically prune, you +/// must call `Self::prune` manually. +/// +/// Note: To prevent DoS attacks, this cache must include only items that have received some DoS resistance +/// like checking the proposer signature. +pub struct ObservedDataSidecars { + finalized_slot: Slot, + /// Stores all received data indices for a given `(ValidatorIndex, Slot)` tuple. + items: HashMap>, + spec: ChainSpec, + _phantom: PhantomData, +} + +impl ObservedDataSidecars { + /// Instantiates `Self` with `finalized_slot == 0`. + pub fn new(spec: ChainSpec) -> Self { + Self { + finalized_slot: Slot::new(0), + items: HashMap::new(), + spec, + _phantom: PhantomData, + } + } + + /// Observe the `data_sidecar` at (`data_sidecar.block_proposer_index, data_sidecar.slot`). + /// This will update `self` so future calls to it indicate that this `data_sidecar` is known. + /// + /// The supplied `data_sidecar` **MUST** have completed proposer signature verification. + pub fn observe_sidecar(&mut self, data_sidecar: &T) -> Result { + self.sanitize_data_sidecar(data_sidecar)?; + + let data_indices = self + .items + .entry(ProposalKey { + slot: data_sidecar.slot(), + proposer: data_sidecar.block_proposer_index(), + }) + .or_insert_with(|| HashSet::with_capacity(T::max_num_of_items(&self.spec))); + let did_not_exist = data_indices.insert(data_sidecar.index()); + + Ok(!did_not_exist) + } + + /// Returns `true` if the `data_sidecar` has already been observed in the cache within the prune window. + pub fn proposer_is_known(&self, data_sidecar: &T) -> Result { + self.sanitize_data_sidecar(data_sidecar)?; + let is_known = self + .items + .get(&ProposalKey { + slot: data_sidecar.slot(), + proposer: data_sidecar.block_proposer_index(), + }) + .map_or(false, |indices| indices.contains(&data_sidecar.index())); + Ok(is_known) + } + + fn sanitize_data_sidecar(&self, data_sidecar: &T) -> Result<(), Error> { + if data_sidecar.index() >= T::max_num_of_items(&self.spec) as u64 { + return Err(Error::InvalidDataIndex(data_sidecar.index())); + } + let finalized_slot = self.finalized_slot; + if finalized_slot > 0 && data_sidecar.slot() <= finalized_slot { + return Err(Error::FinalizedDataSidecar { + slot: data_sidecar.slot(), + finalized_slot, + }); + } + + Ok(()) + } + + /// Prune `data_sidecar` observations for slots less than or equal to the given slot. + pub fn prune(&mut self, finalized_slot: Slot) { + if finalized_slot == 0 { + return; + } + + self.finalized_slot = finalized_slot; + self.items.retain(|k, _| k.slot > finalized_slot); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_utils::test_spec; + use bls::Hash256; + use std::sync::Arc; + use types::MainnetEthSpec; + + type E = MainnetEthSpec; + + fn get_blob_sidecar(slot: u64, proposer_index: u64, index: u64) -> Arc> { + let mut blob_sidecar = BlobSidecar::empty(); + blob_sidecar.signed_block_header.message.slot = slot.into(); + blob_sidecar.signed_block_header.message.proposer_index = proposer_index; + blob_sidecar.index = index; + Arc::new(blob_sidecar) + } + + #[test] + fn pruning() { + let spec = test_spec::(); + let mut cache = ObservedDataSidecars::>::new(spec); + + assert_eq!(cache.finalized_slot, 0, "finalized slot is zero"); + assert_eq!(cache.items.len(), 0, "no slots should be present"); + + // Slot 0, index 0 + let proposer_index_a = 420; + let sidecar_a = get_blob_sidecar(0, proposer_index_a, 0); + + assert_eq!( + cache.observe_sidecar(&sidecar_a), + Ok(false), + "can observe proposer, indicates proposer unobserved" + ); + + /* + * Preconditions. + */ + + assert_eq!(cache.finalized_slot, 0, "finalized slot is zero"); + assert_eq!( + cache.items.len(), + 1, + "only one (validator_index, slot) tuple should be present" + ); + + let cached_blob_indices = cache + .items + .get(&ProposalKey::new(proposer_index_a, Slot::new(0))) + .expect("slot zero should be present"); + assert_eq!( + cached_blob_indices.len(), + 1, + "only one proposer should be present" + ); + + /* + * Check that a prune at the genesis slot does nothing. + */ + + cache.prune(Slot::new(0)); + + assert_eq!(cache.finalized_slot, 0, "finalized slot is zero"); + assert_eq!(cache.items.len(), 1, "only one slot should be present"); + let cached_blob_indices = cache + .items + .get(&ProposalKey::new(proposer_index_a, Slot::new(0))) + .expect("slot zero should be present"); + assert_eq!( + cached_blob_indices.len(), + 1, + "only one proposer should be present" + ); + + /* + * Check that a prune empties the cache + */ + + cache.prune(E::slots_per_epoch().into()); + assert_eq!( + cache.finalized_slot, + Slot::from(E::slots_per_epoch()), + "finalized slot is updated" + ); + assert_eq!(cache.items.len(), 0, "no items left"); + + /* + * Check that we can't insert a finalized sidecar + */ + + // First slot of finalized epoch + let block_b = get_blob_sidecar(E::slots_per_epoch(), 419, 0); + + assert_eq!( + cache.observe_sidecar(&block_b), + Err(Error::FinalizedDataSidecar { + slot: E::slots_per_epoch().into(), + finalized_slot: E::slots_per_epoch().into(), + }), + "cant insert finalized sidecar" + ); + + assert_eq!(cache.items.len(), 0, "sidecar was not added"); + + /* + * Check that we _can_ insert a non-finalized block + */ + + let three_epochs = E::slots_per_epoch() * 3; + + // First slot of finalized epoch + let proposer_index_b = 421; + let block_b = get_blob_sidecar(three_epochs, proposer_index_b, 0); + + assert_eq!( + cache.observe_sidecar(&block_b), + Ok(false), + "can insert non-finalized block" + ); + + assert_eq!(cache.items.len(), 1, "only one slot should be present"); + let cached_blob_indices = cache + .items + .get(&ProposalKey::new(proposer_index_b, Slot::new(three_epochs))) + .expect("the three epochs slot should be present"); + assert_eq!( + cached_blob_indices.len(), + 1, + "only one proposer should be present" + ); + + /* + * Check that a prune doesnt wipe later blocks + */ + + let two_epochs = E::slots_per_epoch() * 2; + cache.prune(two_epochs.into()); + + assert_eq!( + cache.finalized_slot, + Slot::from(two_epochs), + "finalized slot is updated" + ); + + assert_eq!(cache.items.len(), 1, "only one slot should be present"); + let cached_blob_indices = cache + .items + .get(&ProposalKey::new(proposer_index_b, Slot::new(three_epochs))) + .expect("the three epochs slot should be present"); + assert_eq!( + cached_blob_indices.len(), + 1, + "only one proposer should be present" + ); + } + + #[test] + fn simple_observations() { + let spec = test_spec::(); + let mut cache = ObservedDataSidecars::>::new(spec); + + // Slot 0, index 0 + let proposer_index_a = 420; + let sidecar_a = get_blob_sidecar(0, proposer_index_a, 0); + + assert_eq!( + cache.proposer_is_known(&sidecar_a), + Ok(false), + "no observation in empty cache" + ); + + assert_eq!( + cache.observe_sidecar(&sidecar_a), + Ok(false), + "can observe proposer, indicates proposer unobserved" + ); + + assert_eq!( + cache.proposer_is_known(&sidecar_a), + Ok(true), + "observed block is indicated as true" + ); + + assert_eq!( + cache.observe_sidecar(&sidecar_a), + Ok(true), + "observing again indicates true" + ); + + assert_eq!(cache.finalized_slot, 0, "finalized slot is zero"); + assert_eq!(cache.items.len(), 1, "only one slot should be present"); + let cached_blob_indices = cache + .items + .get(&ProposalKey::new(proposer_index_a, Slot::new(0))) + .expect("slot zero should be present"); + assert_eq!( + cached_blob_indices.len(), + 1, + "only one proposer should be present" + ); + + // Slot 1, proposer 0 + + let proposer_index_b = 421; + let sidecar_b = get_blob_sidecar(1, proposer_index_b, 0); + + assert_eq!( + cache.proposer_is_known(&sidecar_b), + Ok(false), + "no observation for new slot" + ); + assert_eq!( + cache.observe_sidecar(&sidecar_b), + Ok(false), + "can observe proposer for new slot, indicates proposer unobserved" + ); + assert_eq!( + cache.proposer_is_known(&sidecar_b), + Ok(true), + "observed block in slot 1 is indicated as true" + ); + assert_eq!( + cache.observe_sidecar(&sidecar_b), + Ok(true), + "observing slot 1 again indicates true" + ); + + assert_eq!(cache.finalized_slot, 0, "finalized slot is zero"); + assert_eq!(cache.items.len(), 2, "two slots should be present"); + let cached_blob_indices = cache + .items + .get(&ProposalKey::new(proposer_index_a, Slot::new(0))) + .expect("slot zero should be present"); + assert_eq!( + cached_blob_indices.len(), + 1, + "only one proposer should be present in slot 0" + ); + let cached_blob_indices = cache + .items + .get(&ProposalKey::new(proposer_index_b, Slot::new(1))) + .expect("slot zero should be present"); + assert_eq!( + cached_blob_indices.len(), + 1, + "only one proposer should be present in slot 1" + ); + + // Slot 0, index 1 + let sidecar_c = get_blob_sidecar(0, proposer_index_a, 1); + + assert_eq!( + cache.proposer_is_known(&sidecar_c), + Ok(false), + "no observation for new index" + ); + assert_eq!( + cache.observe_sidecar(&sidecar_c), + Ok(false), + "can observe new index, indicates sidecar unobserved for new index" + ); + assert_eq!( + cache.proposer_is_known(&sidecar_c), + Ok(true), + "observed new sidecar is indicated as true" + ); + assert_eq!( + cache.observe_sidecar(&sidecar_c), + Ok(true), + "observing new sidecar again indicates true" + ); + + assert_eq!(cache.finalized_slot, 0, "finalized slot is zero"); + assert_eq!(cache.items.len(), 2, "two slots should be present"); + let cached_blob_indices = cache + .items + .get(&ProposalKey::new(proposer_index_a, Slot::new(0))) + .expect("slot zero should be present"); + assert_eq!( + cached_blob_indices.len(), + 2, + "two blob indices should be present in slot 0" + ); + + // Create a sidecar sharing slot and proposer but with a different block root. + let mut sidecar_d: BlobSidecar = BlobSidecar { + index: sidecar_c.index, + blob: sidecar_c.blob.clone(), + kzg_commitment: sidecar_c.kzg_commitment, + kzg_proof: sidecar_c.kzg_proof, + signed_block_header: sidecar_c.signed_block_header.clone(), + kzg_commitment_inclusion_proof: sidecar_c.kzg_commitment_inclusion_proof.clone(), + }; + sidecar_d.signed_block_header.message.body_root = Hash256::repeat_byte(7); + assert_eq!( + cache.proposer_is_known(&sidecar_d), + Ok(true), + "there has been an observation for this proposer index" + ); + assert_eq!( + cache.observe_sidecar(&sidecar_d), + Ok(true), + "indicates sidecar proposer was observed" + ); + let cached_blob_indices = cache + .items + .get(&ProposalKey::new(proposer_index_a, Slot::new(0))) + .expect("slot zero should be present"); + assert_eq!( + cached_blob_indices.len(), + 2, + "two blob indices should be present in slot 0" + ); + + // Try adding an out of bounds index + let invalid_index = E::max_blobs_per_block() as u64; + let sidecar_d = get_blob_sidecar(0, proposer_index_a, invalid_index); + assert_eq!( + cache.observe_sidecar(&sidecar_d), + Err(Error::InvalidDataIndex(invalid_index)), + "cannot add an index > MaxBlobsPerBlock" + ); + } +} diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index 5bf13d82b7b..f491dc7ffb0 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -112,6 +112,7 @@ pub struct BeaconProcessorQueueLengths { backfill_chain_segment: usize, gossip_block_queue: usize, gossip_blob_queue: usize, + gossip_data_column_queue: usize, delayed_block_queue: usize, status_queue: usize, bbrange_queue: usize, @@ -164,6 +165,7 @@ impl BeaconProcessorQueueLengths { backfill_chain_segment: 64, gossip_block_queue: 1024, gossip_blob_queue: 1024, + gossip_data_column_queue: 1024, delayed_block_queue: 1024, status_queue: 1024, bbrange_queue: 1024, @@ -209,6 +211,7 @@ pub const GOSSIP_AGGREGATE: &str = "gossip_aggregate"; pub const GOSSIP_AGGREGATE_BATCH: &str = "gossip_aggregate_batch"; pub const GOSSIP_BLOCK: &str = "gossip_block"; pub const GOSSIP_BLOBS_SIDECAR: &str = "gossip_blobs_sidecar"; +pub const GOSSIP_BLOBS_COLUMN_SIDECAR: &str = "gossip_blobs_column_sidecar"; pub const DELAYED_IMPORT_BLOCK: &str = "delayed_import_block"; pub const GOSSIP_VOLUNTARY_EXIT: &str = "gossip_voluntary_exit"; pub const GOSSIP_PROPOSER_SLASHING: &str = "gossip_proposer_slashing"; @@ -577,6 +580,7 @@ pub enum Work { }, GossipBlock(AsyncFn), GossipBlobSidecar(AsyncFn), + GossipDataColumnSidecar(AsyncFn), DelayedImportBlock { beacon_block_slot: Slot, beacon_block_root: Hash256, @@ -629,6 +633,7 @@ impl Work { Work::GossipAggregateBatch { .. } => GOSSIP_AGGREGATE_BATCH, Work::GossipBlock(_) => GOSSIP_BLOCK, Work::GossipBlobSidecar(_) => GOSSIP_BLOBS_SIDECAR, + Work::GossipDataColumnSidecar(_) => GOSSIP_BLOBS_COLUMN_SIDECAR, Work::DelayedImportBlock { .. } => DELAYED_IMPORT_BLOCK, Work::GossipVoluntaryExit(_) => GOSSIP_VOLUNTARY_EXIT, Work::GossipProposerSlashing(_) => GOSSIP_PROPOSER_SLASHING, @@ -803,6 +808,7 @@ impl BeaconProcessor { let mut backfill_chain_segment = FifoQueue::new(queue_lengths.backfill_chain_segment); let mut gossip_block_queue = FifoQueue::new(queue_lengths.gossip_block_queue); let mut gossip_blob_queue = FifoQueue::new(queue_lengths.gossip_blob_queue); + let mut gossip_data_column_queue = FifoQueue::new(queue_lengths.gossip_data_column_queue); let mut delayed_block_queue = FifoQueue::new(queue_lengths.delayed_block_queue); let mut status_queue = FifoQueue::new(queue_lengths.status_queue); @@ -961,6 +967,8 @@ impl BeaconProcessor { self.spawn_worker(item, idle_tx); } else if let Some(item) = gossip_blob_queue.pop() { self.spawn_worker(item, idle_tx); + } else if let Some(item) = gossip_data_column_queue.pop() { + self.spawn_worker(item, idle_tx); // Check the priority 0 API requests after blocks and blobs, but before attestations. } else if let Some(item) = api_request_p0_queue.pop() { self.spawn_worker(item, idle_tx); @@ -1208,6 +1216,9 @@ impl BeaconProcessor { Work::GossipBlobSidecar { .. } => { gossip_blob_queue.push(work, work_id, &self.log) } + Work::GossipDataColumnSidecar { .. } => { + gossip_data_column_queue.push(work, work_id, &self.log) + } Work::DelayedImportBlock { .. } => { delayed_block_queue.push(work, work_id, &self.log) } @@ -1312,6 +1323,10 @@ impl BeaconProcessor { &metrics::BEACON_PROCESSOR_GOSSIP_BLOB_QUEUE_TOTAL, gossip_blob_queue.len() as i64, ); + metrics::set_gauge( + &metrics::BEACON_PROCESSOR_GOSSIP_DATA_COLUMN_QUEUE_TOTAL, + gossip_data_column_queue.len() as i64, + ); metrics::set_gauge( &metrics::BEACON_PROCESSOR_RPC_BLOCK_QUEUE_TOTAL, rpc_block_queue.len() as i64, @@ -1463,11 +1478,11 @@ impl BeaconProcessor { task_spawner.spawn_async(process_fn) } Work::IgnoredRpcBlock { process_fn } => task_spawner.spawn_blocking(process_fn), - Work::GossipBlock(work) | Work::GossipBlobSidecar(work) => { - task_spawner.spawn_async(async move { - work.await; - }) - } + Work::GossipBlock(work) + | Work::GossipBlobSidecar(work) + | Work::GossipDataColumnSidecar(work) => task_spawner.spawn_async(async move { + work.await; + }), Work::BlobsByRangeRequest(process_fn) | Work::BlobsByRootsRequest(process_fn) => { task_spawner.spawn_blocking(process_fn) } diff --git a/beacon_node/beacon_processor/src/metrics.rs b/beacon_node/beacon_processor/src/metrics.rs index fa7d7d7b9a3..bcd422b357d 100644 --- a/beacon_node/beacon_processor/src/metrics.rs +++ b/beacon_node/beacon_processor/src/metrics.rs @@ -51,6 +51,11 @@ lazy_static::lazy_static! { "beacon_processor_gossip_blob_queue_total", "Count of blobs from gossip waiting to be verified." ); + // Gossip data column sidecars. + pub static ref BEACON_PROCESSOR_GOSSIP_DATA_COLUMN_QUEUE_TOTAL: Result = try_create_int_gauge( + "beacon_processor_gossip_data_column_queue_total", + "Count of data column sidecars from gossip waiting to be verified." + ); // Gossip Exits. pub static ref BEACON_PROCESSOR_EXIT_QUEUE_TOTAL: Result = try_create_int_gauge( "beacon_processor_exit_queue_total", diff --git a/beacon_node/lighthouse_network/Cargo.toml b/beacon_node/lighthouse_network/Cargo.toml index 56a8fe99c70..3dfa24d467b 100644 --- a/beacon_node/lighthouse_network/Cargo.toml +++ b/beacon_node/lighthouse_network/Cargo.toml @@ -43,6 +43,7 @@ unused_port = { workspace = true } delay_map = { workspace = true } bytes = { workspace = true } either = { workspace = true } +itertools = { workspace = true } # Local dependencies void = "1.0.2" diff --git a/beacon_node/lighthouse_network/src/discovery/mod.rs b/beacon_node/lighthouse_network/src/discovery/mod.rs index 73f51c001a7..2cc3a27953f 100644 --- a/beacon_node/lighthouse_network/src/discovery/mod.rs +++ b/beacon_node/lighthouse_network/src/discovery/mod.rs @@ -43,7 +43,7 @@ use std::{ time::{Duration, Instant}, }; use tokio::sync::mpsc; -use types::{EnrForkId, EthSpec}; +use types::{ChainSpec, EnrForkId, EthSpec}; mod subnet_predicate; pub use subnet_predicate::subnet_predicate; @@ -192,6 +192,7 @@ pub struct Discovery { /// Logger for the discovery behaviour. log: slog::Logger, + spec: ChainSpec, } impl Discovery { @@ -201,6 +202,7 @@ impl Discovery { config: &NetworkConfig, network_globals: Arc>, log: &slog::Logger, + spec: &ChainSpec, ) -> error::Result { let log = log.clone(); @@ -325,6 +327,7 @@ impl Discovery { update_ports, log, enr_dir, + spec: spec.clone(), }) } @@ -548,6 +551,8 @@ impl Discovery { ) .map_err(|e| format!("{:?}", e))?; } + // Data column subnets are computed from node ID. No subnet bitfield in the ENR. + Subnet::DataColumn(_) => return Ok(()), } // replace the global version @@ -753,7 +758,7 @@ impl Discovery { // Only start a discovery query if we have a subnet to look for. if !filtered_subnet_queries.is_empty() { // build the subnet predicate as a combination of the eth2_fork_predicate and the subnet predicate - let subnet_predicate = subnet_predicate::(filtered_subnets, &self.log); + let subnet_predicate = subnet_predicate::(filtered_subnets, &self.log, &self.spec); debug!( self.log, @@ -867,6 +872,7 @@ impl Discovery { let query_str = match query.subnet { Subnet::Attestation(_) => "attestation", Subnet::SyncCommittee(_) => "sync_committee", + Subnet::DataColumn(_) => "data_column", }; if let Some(v) = metrics::get_int_counter( @@ -880,7 +886,7 @@ impl Discovery { // Check the specific subnet against the enr let subnet_predicate = - subnet_predicate::(vec![query.subnet], &self.log); + subnet_predicate::(vec![query.subnet], &self.log, &self.spec); r.clone() .into_iter() @@ -1194,6 +1200,7 @@ mod tests { } async fn build_discovery() -> Discovery { + let spec = ChainSpec::default(); let keypair = secp256k1::Keypair::generate(); let mut config = NetworkConfig::default(); config.set_listening_addr(crate::ListenAddress::unused_v4_ports()); @@ -1212,7 +1219,7 @@ mod tests { &log, ); let keypair = keypair.into(); - Discovery::new(keypair, &config, Arc::new(globals), &log) + Discovery::new(keypair, &config, Arc::new(globals), &log, &spec) .await .unwrap() } diff --git a/beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs b/beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs index 1aabf12b725..a1f45767eec 100644 --- a/beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs +++ b/beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs @@ -1,15 +1,22 @@ //! The subnet predicate used for searching for a particular subnet. use super::*; use crate::types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield}; +use itertools::Itertools; use slog::trace; use std::ops::Deref; +use types::{ChainSpec, DataColumnSubnetId}; /// Returns the predicate for a given subnet. -pub fn subnet_predicate(subnets: Vec, log: &slog::Logger) -> impl Fn(&Enr) -> bool + Send +pub fn subnet_predicate( + subnets: Vec, + log: &slog::Logger, + spec: &ChainSpec, +) -> impl Fn(&Enr) -> bool + Send where E: EthSpec, { let log_clone = log.clone(); + let spec_clone = spec.clone(); move |enr: &Enr| { let attestation_bitfield: EnrAttestationBitfield = match enr.attestation_bitfield::() @@ -19,10 +26,13 @@ where }; // Pre-fork/fork-boundary enrs may not contain a syncnets field. - // Don't return early here + // Don't return early here. let sync_committee_bitfield: Result, _> = enr.sync_committee_bitfield::(); + // TODO(das): compute from enr + let custody_subnet_count = spec_clone.custody_requirement; + let predicate = subnets.iter().any(|subnet| match subnet { Subnet::Attestation(s) => attestation_bitfield .get(*s.deref() as usize) @@ -30,6 +40,14 @@ where Subnet::SyncCommittee(s) => sync_committee_bitfield .as_ref() .map_or(false, |b| b.get(*s.deref() as usize).unwrap_or(false)), + Subnet::DataColumn(s) => { + let mut subnets = DataColumnSubnetId::compute_custody_subnets::( + enr.node_id().raw().into(), + custody_subnet_count, + &spec_clone, + ); + subnets.contains(s) + } }); if !predicate { diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index 0d9a7c60dd2..c86c2098d6f 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -1027,6 +1027,10 @@ impl PeerManager { .or_default() .insert(id); } + // TODO(das) to be implemented. We're not pruning data column peers yet + // because data column topics are subscribed as core topics until we + // implement recomputing data column subnets. + Subnet::DataColumn(_) => {} } } } diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs index ab113a1a044..0745cc26008 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs @@ -94,6 +94,15 @@ impl PeerInfo { .syncnets() .map_or(false, |s| s.get(**id as usize).unwrap_or(false)) } + Subnet::DataColumn(_) => { + // TODO(das): Pending spec PR https://github.com/ethereum/consensus-specs/pull/3821 + // We should use MetaDataV3 for peer selection rather than + // looking at subscribed peers (current behavior). Until MetaDataV3 is + // implemented, this is the perhaps the only viable option on the current devnet + // as the peer count is low and it's important to identify supernodes to get a + // good distribution of peers across subnets. + return true; + } } } false diff --git a/beacon_node/lighthouse_network/src/service/gossip_cache.rs b/beacon_node/lighthouse_network/src/service/gossip_cache.rs index 158c7a994a3..fd71ec9aafb 100644 --- a/beacon_node/lighthouse_network/src/service/gossip_cache.rs +++ b/beacon_node/lighthouse_network/src/service/gossip_cache.rs @@ -194,6 +194,7 @@ impl GossipCache { let expire_timeout = match topic.kind() { GossipKind::BeaconBlock => self.beacon_block, GossipKind::BlobSidecar(_) => self.blob_sidecar, + GossipKind::DataColumnSidecar(_) => self.blob_sidecar, GossipKind::BeaconAggregateAndProof => self.aggregates, GossipKind::Attestation(_) => self.attestation, GossipKind::VoluntaryExit => self.voluntary_exit, diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 2868c616bdd..033446ad650 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -327,6 +327,7 @@ impl Network { &config, network_globals.clone(), &log, + ctx.chain_spec, ) .await?; // start searching for peers @@ -1218,7 +1219,7 @@ impl Network { /// Dial cached Enrs in discovery service that are in the given `subnet_id` and aren't /// in Connected, Dialing or Banned state. fn dial_cached_enrs_in_subnet(&mut self, subnet: Subnet) { - let predicate = subnet_predicate::(vec![subnet], &self.log); + let predicate = subnet_predicate::(vec![subnet], &self.log, &self.fork_context.spec); let peers_to_dial: Vec = self .discovery() .cached_enrs() diff --git a/beacon_node/lighthouse_network/src/types/pubsub.rs b/beacon_node/lighthouse_network/src/types/pubsub.rs index b443ecd1b9b..18353c39fd5 100644 --- a/beacon_node/lighthouse_network/src/types/pubsub.rs +++ b/beacon_node/lighthouse_network/src/types/pubsub.rs @@ -8,13 +8,13 @@ use std::io::{Error, ErrorKind}; use std::sync::Arc; use types::{ Attestation, AttestationBase, AttestationElectra, AttesterSlashing, AttesterSlashingBase, - AttesterSlashingElectra, BlobSidecar, EthSpec, ForkContext, ForkName, - LightClientFinalityUpdate, LightClientOptimisticUpdate, ProposerSlashing, - SignedAggregateAndProof, SignedAggregateAndProofBase, SignedAggregateAndProofElectra, - SignedBeaconBlock, SignedBeaconBlockAltair, SignedBeaconBlockBase, SignedBeaconBlockBellatrix, - SignedBeaconBlockCapella, SignedBeaconBlockDeneb, SignedBeaconBlockElectra, - SignedBlsToExecutionChange, SignedContributionAndProof, SignedVoluntaryExit, SubnetId, - SyncCommitteeMessage, SyncSubnetId, + AttesterSlashingElectra, BlobSidecar, DataColumnSidecar, DataColumnSubnetId, EthSpec, + ForkContext, ForkName, LightClientFinalityUpdate, LightClientOptimisticUpdate, + ProposerSlashing, SignedAggregateAndProof, SignedAggregateAndProofBase, + SignedAggregateAndProofElectra, SignedBeaconBlock, SignedBeaconBlockAltair, + SignedBeaconBlockBase, SignedBeaconBlockBellatrix, SignedBeaconBlockCapella, + SignedBeaconBlockDeneb, SignedBeaconBlockElectra, SignedBlsToExecutionChange, + SignedContributionAndProof, SignedVoluntaryExit, SubnetId, SyncCommitteeMessage, SyncSubnetId, }; #[derive(Debug, Clone, PartialEq)] @@ -23,6 +23,8 @@ pub enum PubsubMessage { BeaconBlock(Arc>), /// Gossipsub message providing notification of a [`BlobSidecar`] along with the subnet id where it was received. BlobSidecar(Box<(u64, Arc>)>), + /// Gossipsub message providing notification of a [`DataColumnSidecar`] along with the subnet id where it was received. + DataColumnSidecar(Box<(DataColumnSubnetId, Arc>)>), /// Gossipsub message providing notification of a Aggregate attestation and associated proof. AggregateAndProofAttestation(Box>), /// Gossipsub message providing notification of a raw un-aggregated attestation with its shard id. @@ -119,6 +121,9 @@ impl PubsubMessage { PubsubMessage::BlobSidecar(blob_sidecar_data) => { GossipKind::BlobSidecar(blob_sidecar_data.0) } + PubsubMessage::DataColumnSidecar(column_sidecar_data) => { + GossipKind::DataColumnSidecar(column_sidecar_data.0) + } PubsubMessage::AggregateAndProofAttestation(_) => GossipKind::BeaconAggregateAndProof, PubsubMessage::Attestation(attestation_data) => { GossipKind::Attestation(attestation_data.0) @@ -270,6 +275,41 @@ impl PubsubMessage { )), } } + GossipKind::DataColumnSidecar(subnet_id) => { + match fork_context.from_context_bytes(gossip_topic.fork_digest) { + Some(ForkName::Deneb | ForkName::Electra) => { + let col_sidecar = Arc::new( + DataColumnSidecar::from_ssz_bytes(data) + .map_err(|e| format!("{:?}", e))?, + ); + let peer_das_enabled = + fork_context.spec.is_peer_das_enabled_for_epoch( + col_sidecar.slot().epoch(E::slots_per_epoch()), + ); + if peer_das_enabled { + Ok(PubsubMessage::DataColumnSidecar(Box::new(( + *subnet_id, + col_sidecar, + )))) + } else { + Err(format!( + "data_column_sidecar topic invalid for given fork digest {:?}", + gossip_topic.fork_digest + )) + } + } + Some( + ForkName::Base + | ForkName::Altair + | ForkName::Bellatrix + | ForkName::Capella, + ) + | None => Err(format!( + "data_column_sidecar topic invalid for given fork digest {:?}", + gossip_topic.fork_digest + )), + } + } GossipKind::VoluntaryExit => { let voluntary_exit = SignedVoluntaryExit::from_ssz_bytes(data) .map_err(|e| format!("{:?}", e))?; @@ -373,6 +413,7 @@ impl PubsubMessage { match &self { PubsubMessage::BeaconBlock(data) => data.as_ssz_bytes(), PubsubMessage::BlobSidecar(data) => data.1.as_ssz_bytes(), + PubsubMessage::DataColumnSidecar(data) => data.1.as_ssz_bytes(), PubsubMessage::AggregateAndProofAttestation(data) => data.as_ssz_bytes(), PubsubMessage::VoluntaryExit(data) => data.as_ssz_bytes(), PubsubMessage::ProposerSlashing(data) => data.as_ssz_bytes(), @@ -402,6 +443,12 @@ impl std::fmt::Display for PubsubMessage { data.1.slot(), data.1.index, ), + PubsubMessage::DataColumnSidecar(data) => write!( + f, + "DataColumnSidecar: slot: {}, column index: {}", + data.1.slot(), + data.1.index, + ), PubsubMessage::AggregateAndProofAttestation(att) => write!( f, "Aggregate and Proof: slot: {}, index: {:?}, aggregator_index: {}", diff --git a/beacon_node/lighthouse_network/src/types/subnet.rs b/beacon_node/lighthouse_network/src/types/subnet.rs index 50d28542bec..1892dcc83af 100644 --- a/beacon_node/lighthouse_network/src/types/subnet.rs +++ b/beacon_node/lighthouse_network/src/types/subnet.rs @@ -1,6 +1,6 @@ use serde::Serialize; use std::time::Instant; -use types::{SubnetId, SyncSubnetId}; +use types::{DataColumnSubnetId, SubnetId, SyncSubnetId}; /// Represents a subnet on an attestation or sync committee `SubnetId`. /// @@ -12,6 +12,8 @@ pub enum Subnet { Attestation(SubnetId), /// Represents a gossipsub sync committee subnet and the metadata `syncnets` field. SyncCommittee(SyncSubnetId), + /// Represents a gossipsub data column subnet. + DataColumn(DataColumnSubnetId), } /// A subnet to discover peers on along with the instant after which it's no longer useful. diff --git a/beacon_node/lighthouse_network/src/types/topics.rs b/beacon_node/lighthouse_network/src/types/topics.rs index c5f4b0c9ebb..174787f999c 100644 --- a/beacon_node/lighthouse_network/src/types/topics.rs +++ b/beacon_node/lighthouse_network/src/types/topics.rs @@ -1,7 +1,7 @@ use gossipsub::{IdentTopic as Topic, TopicHash}; use serde::{Deserialize, Serialize}; use strum::AsRefStr; -use types::{ChainSpec, EthSpec, ForkName, SubnetId, SyncSubnetId, Unsigned}; +use types::{ChainSpec, DataColumnSubnetId, EthSpec, ForkName, SubnetId, SyncSubnetId, Unsigned}; use crate::Subnet; @@ -14,6 +14,7 @@ pub const BEACON_BLOCK_TOPIC: &str = "beacon_block"; pub const BEACON_AGGREGATE_AND_PROOF_TOPIC: &str = "beacon_aggregate_and_proof"; pub const BEACON_ATTESTATION_PREFIX: &str = "beacon_attestation_"; pub const BLOB_SIDECAR_PREFIX: &str = "blob_sidecar_"; +pub const DATA_COLUMN_SIDECAR_PREFIX: &str = "data_column_sidecar_"; pub const VOLUNTARY_EXIT_TOPIC: &str = "voluntary_exit"; pub const PROPOSER_SLASHING_TOPIC: &str = "proposer_slashing"; pub const ATTESTER_SLASHING_TOPIC: &str = "attester_slashing"; @@ -112,6 +113,8 @@ pub enum GossipKind { BeaconAggregateAndProof, /// Topic for publishing BlobSidecars. BlobSidecar(u64), + /// Topic for publishing DataColumnSidecars. + DataColumnSidecar(DataColumnSubnetId), /// Topic for publishing raw attestations on a particular subnet. #[strum(serialize = "beacon_attestation")] Attestation(SubnetId), @@ -144,6 +147,9 @@ impl std::fmt::Display for GossipKind { GossipKind::BlobSidecar(blob_index) => { write!(f, "{}{}", BLOB_SIDECAR_PREFIX, blob_index) } + GossipKind::DataColumnSidecar(column_index) => { + write!(f, "{}{}", DATA_COLUMN_SIDECAR_PREFIX, **column_index) + } x => f.write_str(x.as_ref()), } } @@ -231,6 +237,7 @@ impl GossipTopic { match self.kind() { GossipKind::Attestation(subnet_id) => Some(Subnet::Attestation(*subnet_id)), GossipKind::SyncCommitteeMessage(subnet_id) => Some(Subnet::SyncCommittee(*subnet_id)), + GossipKind::DataColumnSidecar(subnet_id) => Some(Subnet::DataColumn(*subnet_id)), _ => None, } } @@ -269,6 +276,9 @@ impl std::fmt::Display for GossipTopic { GossipKind::BlobSidecar(blob_index) => { format!("{}{}", BLOB_SIDECAR_PREFIX, blob_index) } + GossipKind::DataColumnSidecar(index) => { + format!("{}{}", DATA_COLUMN_SIDECAR_PREFIX, *index) + } GossipKind::BlsToExecutionChange => BLS_TO_EXECUTION_CHANGE_TOPIC.into(), GossipKind::LightClientFinalityUpdate => LIGHT_CLIENT_FINALITY_UPDATE.into(), GossipKind::LightClientOptimisticUpdate => LIGHT_CLIENT_OPTIMISTIC_UPDATE.into(), @@ -289,6 +299,7 @@ impl From for GossipKind { match subnet_id { Subnet::Attestation(s) => GossipKind::Attestation(s), Subnet::SyncCommittee(s) => GossipKind::SyncCommitteeMessage(s), + Subnet::DataColumn(s) => GossipKind::DataColumnSidecar(s), } } } @@ -312,6 +323,10 @@ fn subnet_topic_index(topic: &str) -> Option { ))); } else if let Some(index) = topic.strip_prefix(BLOB_SIDECAR_PREFIX) { return Some(GossipKind::BlobSidecar(index.parse::().ok()?)); + } else if let Some(index) = topic.strip_prefix(DATA_COLUMN_SIDECAR_PREFIX) { + return Some(GossipKind::DataColumnSidecar(DataColumnSubnetId::new( + index.parse::().ok()?, + ))); } None } diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index f0dba8d9655..0fadb51edb2 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -72,6 +72,10 @@ lazy_static! { "beacon_processor_gossip_blob_verified_total", "Total number of gossip blob verified for propagation." ); + pub static ref BEACON_PROCESSOR_GOSSIP_DATA_COLUMN_SIDECAR_VERIFIED_TOTAL: Result = try_create_int_counter( + "beacon_processor_gossip_data_column_verified_total", + "Total number of gossip data column sidecar verified for propagation." + ); // Gossip Exits. pub static ref BEACON_PROCESSOR_EXIT_VERIFIED_TOTAL: Result = try_create_int_counter( "beacon_processor_exit_verified_total", @@ -357,6 +361,22 @@ lazy_static! { "Count of times when a gossip blob arrived from the network later than the attestation deadline.", ); + pub static ref BEACON_DATA_COLUMN_DELAY_GOSSIP: Result = try_create_int_gauge( + "beacon_data_column_delay_gossip_last_delay", + "The first time we see this data column as a delay from the start of the slot" + ); + + pub static ref BEACON_DATA_COLUMN_DELAY_GOSSIP_VERIFICATION: Result = try_create_int_gauge( + "beacon_data_column_delay_gossip_verification", + "Keeps track of the time delay from the start of the slot to the point we propagate the data column" + ); + + pub static ref BEACON_DATA_COLUMN_DELAY_FULL_VERIFICATION: Result = try_create_int_gauge( + "beacon_data_column_last_full_verification_delay", + "The time it takes to verify a beacon data column" + ); + + /* * Light client update reprocessing queue metrics. */ diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index ab250532587..f1f501e225b 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -6,6 +6,7 @@ use crate::{ }; use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob}; use beacon_chain::block_verification_types::AsBlock; +use beacon_chain::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn}; use beacon_chain::store::Error; use beacon_chain::{ attestation_verification::{self, Error as AttnError, VerifiedAttestation}, @@ -32,8 +33,9 @@ use store::hot_cold_store::HotColdDBError; use tokio::sync::mpsc; use types::{ beacon_block::BlockImportSource, Attestation, AttestationRef, AttesterSlashing, BlobSidecar, - EthSpec, Hash256, IndexedAttestation, LightClientFinalityUpdate, LightClientOptimisticUpdate, - ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedBlsToExecutionChange, + DataColumnSidecar, DataColumnSubnetId, EthSpec, Hash256, IndexedAttestation, + LightClientFinalityUpdate, LightClientOptimisticUpdate, ProposerSlashing, + SignedAggregateAndProof, SignedBeaconBlock, SignedBlsToExecutionChange, SignedContributionAndProof, SignedVoluntaryExit, Slot, SubnetId, SyncCommitteeMessage, SyncSubnetId, }; @@ -599,6 +601,140 @@ impl NetworkBeaconProcessor { } } + pub async fn process_gossip_data_column_sidecar( + self: &Arc, + message_id: MessageId, + peer_id: PeerId, + _peer_client: Client, + subnet_id: DataColumnSubnetId, + column_sidecar: Arc>, + seen_duration: Duration, + ) { + let slot = column_sidecar.slot(); + let block_root = column_sidecar.block_root(); + let index = column_sidecar.index; + let delay = get_slot_delay_ms(seen_duration, slot, &self.chain.slot_clock); + // Log metrics to track delay from other nodes on the network. + metrics::set_gauge( + &metrics::BEACON_DATA_COLUMN_DELAY_GOSSIP, + delay.as_millis() as i64, + ); + match self + .chain + .verify_data_column_sidecar_for_gossip(column_sidecar, *subnet_id) + { + Ok(gossip_verified_data_column) => { + metrics::inc_counter( + &metrics::BEACON_PROCESSOR_GOSSIP_DATA_COLUMN_SIDECAR_VERIFIED_TOTAL, + ); + + debug!( + self.log, + "Successfully verified gossip data column sidecar"; + "slot" => %slot, + "block_root" => %block_root, + "index" => %index, + ); + + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); + + // Log metrics to keep track of propagation delay times. + if let Some(duration) = SystemTime::now() + .duration_since(UNIX_EPOCH) + .ok() + .and_then(|now| now.checked_sub(seen_duration)) + { + metrics::set_gauge( + &metrics::BEACON_DATA_COLUMN_DELAY_GOSSIP_VERIFICATION, + duration.as_millis() as i64, + ); + } + self.process_gossip_verified_data_column( + peer_id, + gossip_verified_data_column, + seen_duration, + ) + .await + } + Err(err) => { + match err { + GossipDataColumnError::ParentUnknown(column) => { + debug!( + self.log, + "Unknown parent hash for column"; + "action" => "requesting parent", + "block_root" => %column.block_root(), + "parent_root" => %column.block_parent_root(), + ); + self.send_sync_message(SyncMessage::UnknownParentDataColumn( + peer_id, column, + )); + } + GossipDataColumnError::KzgNotInitialized + | GossipDataColumnError::PubkeyCacheTimeout + | GossipDataColumnError::BeaconChainError(_) => { + crit!( + self.log, + "Internal error when verifying column sidecar"; + "error" => ?err, + ) + } + GossipDataColumnError::ProposalSignatureInvalid + | GossipDataColumnError::UnknownValidator(_) + | GossipDataColumnError::ProposerIndexMismatch { .. } + | GossipDataColumnError::IsNotLaterThanParent { .. } + | GossipDataColumnError::InvalidSubnetId { .. } + | GossipDataColumnError::InvalidInclusionProof { .. } + | GossipDataColumnError::InvalidKzgProof { .. } + | GossipDataColumnError::NotFinalizedDescendant { .. } => { + debug!( + self.log, + "Could not verify column sidecar for gossip. Rejecting the column sidecar"; + "error" => ?err, + "slot" => %slot, + "block_root" => %block_root, + "index" => %index, + ); + // Prevent recurring behaviour by penalizing the peer slightly. + self.gossip_penalize_peer( + peer_id, + PeerAction::LowToleranceError, + "gossip_data_column_low", + ); + self.propagate_validation_result( + message_id, + peer_id, + MessageAcceptance::Reject, + ); + } + GossipDataColumnError::FutureSlot { .. } + | GossipDataColumnError::PriorKnown { .. } + | GossipDataColumnError::PastFinalizedSlot { .. } => { + debug!( + self.log, + "Could not verify column sidecar for gossip. Ignoring the column sidecar"; + "error" => ?err, + "slot" => %slot, + "block_root" => %block_root, + "index" => %index, + ); + // Prevent recurring behaviour by penalizing the peer slightly. + self.gossip_penalize_peer( + peer_id, + PeerAction::HighToleranceError, + "gossip_data_column_high", + ); + self.propagate_validation_result( + message_id, + peer_id, + MessageAcceptance::Ignore, + ); + } + } + } + } + } + #[allow(clippy::too_many_arguments)] pub async fn process_gossip_blob( self: &Arc, @@ -837,6 +973,81 @@ impl NetworkBeaconProcessor { } } + pub async fn process_gossip_verified_data_column( + self: &Arc, + peer_id: PeerId, + verified_data_column: GossipVerifiedDataColumn, + // This value is not used presently, but it might come in handy for debugging. + _seen_duration: Duration, + ) { + let processing_start_time = Instant::now(); + let block_root = verified_data_column.block_root(); + let data_column_slot = verified_data_column.slot(); + let data_column_index = verified_data_column.id().index; + + match self + .chain + .process_gossip_data_columns(vec![verified_data_column]) + .await + { + Ok(availability) => { + match availability { + AvailabilityProcessingStatus::Imported(block_root) => { + // Note: Reusing block imported metric here + metrics::inc_counter( + &metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL, + ); + info!( + self.log, + "Gossipsub data column processed, imported fully available block"; + "block_root" => %block_root + ); + self.chain.recompute_head_at_current_slot().await; + + metrics::set_gauge( + &metrics::BEACON_BLOB_DELAY_FULL_VERIFICATION, + processing_start_time.elapsed().as_millis() as i64, + ); + } + AvailabilityProcessingStatus::MissingComponents(slot, block_root) => { + trace!( + self.log, + "Processed data column, waiting for other components"; + "slot" => %slot, + "data_column_index" => %data_column_index, + "block_root" => %block_root, + ); + + // Potentially trigger reconstruction + } + } + } + Err(BlockError::BlockIsAlreadyKnown(_)) => { + debug!( + self.log, + "Ignoring gossip column already imported"; + "block_root" => ?block_root, + "data_column_index" => data_column_index, + ); + } + Err(err) => { + debug!( + self.log, + "Invalid gossip data column"; + "outcome" => ?err, + "block root" => ?block_root, + "block slot" => data_column_slot, + "data column index" => data_column_index, + ); + self.gossip_penalize_peer( + peer_id, + PeerAction::MidToleranceError, + "bad_gossip_data_column_ssz", + ); + } + } + } + /// Process the beacon block received from the gossip network and: /// /// - If it passes gossip propagation criteria, tell the network thread to forward it. @@ -1086,6 +1297,12 @@ impl NetworkBeaconProcessor { ); return None; } + Err(e @ BlockError::InternalError(_)) => { + error!(self.log, "Internal block gossip validation error"; + "error" => %e + ); + return None; + } }; metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_VERIFIED_TOTAL); diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index ccdbb10720c..ffb01a99efb 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -223,6 +223,36 @@ impl NetworkBeaconProcessor { }) } + /// Create a new `Work` event for some data column sidecar. + pub fn send_gossip_data_column_sidecar( + self: &Arc, + message_id: MessageId, + peer_id: PeerId, + peer_client: Client, + subnet_id: DataColumnSubnetId, + column_sidecar: Arc>, + seen_timestamp: Duration, + ) -> Result<(), Error> { + let processor = self.clone(); + let process_fn = async move { + processor + .process_gossip_data_column_sidecar( + message_id, + peer_id, + peer_client, + subnet_id, + column_sidecar, + seen_timestamp, + ) + .await + }; + + self.try_send(BeaconWorkEvent { + drop_during_sync: false, + work: Work::GossipDataColumnSidecar(Box::pin(process_fn)), + }) + } + /// Create a new `Work` event for some sync committee signature. pub fn send_gossip_sync_signature( self: &Arc, diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index e125c13f4c2..c162d52d026 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -319,6 +319,20 @@ impl Router { ), ) } + PubsubMessage::DataColumnSidecar(data) => { + let (subnet_id, column_sidecar) = *data; + self.handle_beacon_processor_send_result( + self.network_beacon_processor + .send_gossip_data_column_sidecar( + message_id, + peer_id, + self.network_globals.client(&peer_id), + subnet_id, + column_sidecar, + timestamp_now(), + ), + ) + } PubsubMessage::VoluntaryExit(exit) => { debug!(self.log, "Received a voluntary exit"; "peer_id" => %peer_id); self.handle_beacon_processor_send_result( diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index dd4fa56d537..8b7cef7494a 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -63,7 +63,7 @@ use std::ops::Sub; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; -use types::{BlobSidecar, EthSpec, Hash256, SignedBeaconBlock, Slot}; +use types::{BlobSidecar, DataColumnSidecar, EthSpec, Hash256, SignedBeaconBlock, Slot}; /// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync /// from a peer. If a peer is within this tolerance (forwards or backwards), it is treated as a @@ -107,6 +107,9 @@ pub enum SyncMessage { /// A blob with an unknown parent has been received. UnknownParentBlob(PeerId, Arc>), + /// A data column with an unknown parent has been received. + UnknownParentDataColumn(PeerId, Arc>), + /// A peer has sent an attestation that references a block that is unknown. This triggers the /// manager to attempt to find the block matching the unknown hash. UnknownBlockHashFromAttestation(PeerId, Hash256), @@ -646,6 +649,10 @@ impl SyncManager { }), ); } + SyncMessage::UnknownParentDataColumn(_peer_id, _data_column) => { + // TODO(das): data column parent lookup to be implemented + unimplemented!("data column parent lookup to be implemented") + } SyncMessage::UnknownBlockHashFromAttestation(peer_id, block_root) => { if !self.notified_unknown_roots.contains(&(peer_id, block_root)) { self.notified_unknown_roots.insert((peer_id, block_root)); diff --git a/consensus/types/src/data_column_subnet_id.rs b/consensus/types/src/data_column_subnet_id.rs new file mode 100644 index 00000000000..cf5b1dd549d --- /dev/null +++ b/consensus/types/src/data_column_subnet_id.rs @@ -0,0 +1,198 @@ +//! Identifies each data column subnet by an integer identifier. +use crate::data_column_sidecar::ColumnIndex; +use crate::{ChainSpec, EthSpec}; +use ethereum_types::U256; +use itertools::Itertools; +use safe_arith::{ArithError, SafeArith}; +use serde::{Deserialize, Serialize}; +use std::collections::HashSet; +use std::fmt::{self, Display}; +use std::ops::{Deref, DerefMut}; + +#[derive(arbitrary::Arbitrary, Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(transparent)] +pub struct DataColumnSubnetId(#[serde(with = "serde_utils::quoted_u64")] u64); + +impl DataColumnSubnetId { + pub fn new(id: u64) -> Self { + id.into() + } + + pub fn from_column_index(column_index: usize, spec: &ChainSpec) -> Self { + (column_index + .safe_rem(spec.data_column_sidecar_subnet_count as usize) + .expect( + "data_column_sidecar_subnet_count should never be zero if this function is called", + ) as u64) + .into() + } + + #[allow(clippy::arithmetic_side_effects)] + pub fn columns(&self, spec: &ChainSpec) -> impl Iterator { + let subnet = self.0; + let data_column_sidecar_subnet = spec.data_column_sidecar_subnet_count; + let columns_per_subnet = spec.data_columns_per_subnet() as u64; + (0..columns_per_subnet).map(move |i| data_column_sidecar_subnet * i + subnet) + } + + /// Compute required subnets to subscribe to given the node id. + #[allow(clippy::arithmetic_side_effects)] + pub fn compute_custody_subnets( + node_id: U256, + custody_subnet_count: u64, + spec: &ChainSpec, + ) -> impl Iterator { + // TODO(das): we could perform check on `custody_subnet_count` here to ensure that it is a valid + // value, but here we assume it is valid. + + let mut subnets: HashSet = HashSet::new(); + let mut current_id = node_id; + while (subnets.len() as u64) < custody_subnet_count { + let mut node_id_bytes = [0u8; 32]; + current_id.to_little_endian(&mut node_id_bytes); + let hash = ethereum_hashing::hash_fixed(&node_id_bytes); + let hash_prefix = [ + hash[0], hash[1], hash[2], hash[3], hash[4], hash[5], hash[6], hash[7], + ]; + let hash_prefix_u64 = u64::from_le_bytes(hash_prefix); + let subnet = hash_prefix_u64 % spec.data_column_sidecar_subnet_count; + + if !subnets.contains(&subnet) { + subnets.insert(subnet); + } + + if current_id == U256::MAX { + current_id = U256::zero() + } + current_id += U256::one() + } + subnets.into_iter().map(DataColumnSubnetId::new) + } + + pub fn compute_custody_columns( + node_id: U256, + custody_subnet_count: u64, + spec: &ChainSpec, + ) -> impl Iterator { + Self::compute_custody_subnets::(node_id, custody_subnet_count, spec) + .flat_map(|subnet| subnet.columns::(spec)) + .sorted() + } +} + +impl Display for DataColumnSubnetId { + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + write!(f, "{}", self.0) + } +} + +impl Deref for DataColumnSubnetId { + type Target = u64; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for DataColumnSubnetId { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl From for DataColumnSubnetId { + fn from(x: u64) -> Self { + Self(x) + } +} + +impl From for u64 { + fn from(val: DataColumnSubnetId) -> Self { + val.0 + } +} + +impl From<&DataColumnSubnetId> for u64 { + fn from(val: &DataColumnSubnetId) -> Self { + val.0 + } +} + +#[derive(Debug)] +pub enum Error { + ArithError(ArithError), +} + +impl From for Error { + fn from(e: ArithError) -> Self { + Error::ArithError(e) + } +} + +#[cfg(test)] +mod test { + use crate::data_column_subnet_id::DataColumnSubnetId; + use crate::EthSpec; + use crate::MainnetEthSpec; + + type E = MainnetEthSpec; + + #[test] + fn test_compute_subnets_for_data_column() { + let spec = E::default_spec(); + let node_ids = [ + "0", + "88752428858350697756262172400162263450541348766581994718383409852729519486397", + "18732750322395381632951253735273868184515463718109267674920115648614659369468", + "27726842142488109545414954493849224833670205008410190955613662332153332462900", + "39755236029158558527862903296867805548949739810920318269566095185775868999998", + "31899136003441886988955119620035330314647133604576220223892254902004850516297", + "58579998103852084482416614330746509727562027284701078483890722833654510444626", + "28248042035542126088870192155378394518950310811868093527036637864276176517397", + "60930578857433095740782970114409273483106482059893286066493409689627770333527", + "103822458477361691467064888613019442068586830412598673713899771287914656699997", + ] + .into_iter() + .map(|v| ethereum_types::U256::from_dec_str(v).unwrap()) + .collect::>(); + + let custody_requirement = 4; + for node_id in node_ids { + let computed_subnets = DataColumnSubnetId::compute_custody_subnets::( + node_id, + custody_requirement, + &spec, + ); + let computed_subnets: Vec<_> = computed_subnets.collect(); + + // the number of subnets is equal to the custody requirement + assert_eq!(computed_subnets.len() as u64, custody_requirement); + + let subnet_count = spec.data_column_sidecar_subnet_count; + for subnet in computed_subnets { + let columns: Vec<_> = subnet.columns::(&spec).collect(); + // the number of columns is equal to the specified number of columns per subnet + assert_eq!(columns.len(), spec.data_columns_per_subnet()); + + for pair in columns.windows(2) { + // each successive column index is offset by the number of subnets + assert_eq!(pair[1] - pair[0], subnet_count); + } + } + } + } + + #[test] + fn test_columns_subnet_conversion() { + let spec = E::default_spec(); + for subnet in 0..spec.data_column_sidecar_subnet_count { + let subnet_id = DataColumnSubnetId::new(subnet); + for column_index in subnet_id.columns::(&spec) { + assert_eq!( + subnet_id, + DataColumnSubnetId::from_column_index::(column_index as usize, &spec) + ); + } + } + } +} diff --git a/consensus/types/src/lib.rs b/consensus/types/src/lib.rs index b5c500f0b22..5a60508c24c 100644 --- a/consensus/types/src/lib.rs +++ b/consensus/types/src/lib.rs @@ -105,6 +105,7 @@ pub mod sqlite; pub mod blob_sidecar; pub mod data_column_sidecar; +pub mod data_column_subnet_id; pub mod light_client_header; pub mod non_zero_usize; pub mod runtime_var_list; @@ -147,6 +148,10 @@ pub use crate::config_and_preset::{ }; pub use crate::consolidation::Consolidation; pub use crate::contribution_and_proof::ContributionAndProof; +pub use crate::data_column_sidecar::{ + ColumnIndex, DataColumnIdentifier, DataColumnSidecar, DataColumnSidecarList, +}; +pub use crate::data_column_subnet_id::DataColumnSubnetId; pub use crate::deposit::{Deposit, DEPOSIT_TREE_DEPTH}; pub use crate::deposit_data::DepositData; pub use crate::deposit_message::DepositMessage; diff --git a/lcli/src/generate_bootnode_enr.rs b/lcli/src/generate_bootnode_enr.rs index 52960b929d8..77c7ed9ffee 100644 --- a/lcli/src/generate_bootnode_enr.rs +++ b/lcli/src/generate_bootnode_enr.rs @@ -37,6 +37,7 @@ pub fn run(matches: &ArgMatches) -> Result<(), String> { next_fork_version: genesis_fork_version, next_fork_epoch: Epoch::max_value(), // FAR_FUTURE_EPOCH }; + let enr = build_enr::(&enr_key, &config, &enr_fork_id) .map_err(|e| format!("Unable to create ENR: {:?}", e))?; From 8e0fa1b789a8f714ce3b21895229850a1dd8b319 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Tue, 23 Jul 2024 00:38:16 +1000 Subject: [PATCH 2/7] Remove gossip verification changes (#5783). --- beacon_node/beacon_chain/src/beacon_chain.rs | 6 +- beacon_node/beacon_chain/src/builder.rs | 4 +- .../src/data_column_verification.rs | 253 +--------- beacon_node/beacon_chain/src/errors.rs | 2 +- beacon_node/beacon_chain/src/lib.rs | 2 +- .../src/observed_data_sidecars.rs | 474 ------------------ .../gossip_methods.rs | 79 +-- 7 files changed, 14 insertions(+), 806 deletions(-) delete mode 100644 beacon_node/beacon_chain/src/observed_data_sidecars.rs diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index b188ef5f222..425193f3ded 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -52,8 +52,8 @@ use crate::observed_aggregates::{ use crate::observed_attesters::{ ObservedAggregators, ObservedAttesters, ObservedSyncAggregators, ObservedSyncContributors, }; +use crate::observed_blob_sidecars::ObservedBlobSidecars; use crate::observed_block_producers::ObservedBlockProducers; -use crate::observed_data_sidecars::ObservedDataSidecars; use crate::observed_operations::{ObservationOutcome, ObservedOperations}; use crate::observed_slashable::ObservedSlashable; use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_BLOCK_ROOT}; @@ -427,9 +427,7 @@ pub struct BeaconChain { /// Maintains a record of which validators have proposed blocks for each slot. pub observed_block_producers: RwLock>, /// Maintains a record of blob sidecars seen over the gossip network. - pub observed_blob_sidecars: RwLock>>, - /// Maintains a record of column sidecars seen over the gossip network. - pub observed_column_sidecars: RwLock>>, + pub observed_blob_sidecars: RwLock>, /// Maintains a record of slashable message seen over the gossip network or RPC. pub observed_slashable: RwLock>, /// Maintains a record of which validators have submitted voluntary exits. diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 637538b6f3e..7217f2c640f 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -11,7 +11,6 @@ use crate::graffiti_calculator::{GraffitiCalculator, GraffitiOrigin}; use crate::head_tracker::HeadTracker; use crate::light_client_server_cache::LightClientServerCache; use crate::migrate::{BackgroundMigrator, MigratorConfig}; -use crate::observed_data_sidecars::ObservedDataSidecars; use crate::persisted_beacon_chain::PersistedBeaconChain; use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache}; use crate::timeout_rw_lock::TimeoutRwLock; @@ -919,8 +918,7 @@ where observed_sync_aggregators: <_>::default(), // TODO: allow for persisting and loading the pool from disk. observed_block_producers: <_>::default(), - observed_column_sidecars: RwLock::new(ObservedDataSidecars::new(self.spec.clone())), - observed_blob_sidecars: RwLock::new(ObservedDataSidecars::new(self.spec.clone())), + observed_blob_sidecars: <_>::default(), observed_slashable: <_>::default(), observed_voluntary_exits: <_>::default(), observed_proposer_slashings: <_>::default(), diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs index d2ae1deeef5..2e09c19d74a 100644 --- a/beacon_node/beacon_chain/src/data_column_verification.rs +++ b/beacon_node/beacon_chain/src/data_column_verification.rs @@ -1,21 +1,13 @@ -use crate::block_verification::{ - cheap_state_advance_to_obtain_committees, get_validator_pubkey_cache, process_block_slash_info, - BlockSlashInfo, -}; +use crate::block_verification::{process_block_slash_info, BlockSlashInfo}; use crate::{BeaconChain, BeaconChainError, BeaconChainTypes}; use derivative::Derivative; -use fork_choice::ProtoBlock; use kzg::{Error as KzgError, Kzg}; -use proto_array::Block; -use slasher::test_utils::E; -use slog::debug; -use slot_clock::SlotClock; use ssz_derive::{Decode, Encode}; use std::sync::Arc; use types::data_column_sidecar::{ColumnIndex, DataColumnIdentifier}; use types::{ - BeaconStateError, ChainSpec, DataColumnSidecar, DataColumnSubnetId, EthSpec, Hash256, - RuntimeVariableList, SignedBeaconBlockHeader, Slot, + BeaconStateError, DataColumnSidecar, EthSpec, Hash256, RuntimeVariableList, + SignedBeaconBlockHeader, Slot, }; /// An error occurred while validating a gossip data column. @@ -240,257 +232,24 @@ pub fn verify_kzg_for_data_column_list<'a, E: EthSpec, I>( where I: Iterator>> + Clone, { - // TODO(das): KZG verification to be implemented + // TODO(das): implement KZG verification Ok(()) } pub fn validate_data_column_sidecar_for_gossip( data_column: Arc>, - subnet: u64, + _subnet: u64, chain: &BeaconChain, ) -> Result, GossipDataColumnError> { - let column_slot = data_column.slot(); - - verify_index_matches_subnet(&data_column, subnet, &chain.spec)?; - verify_sidecar_not_from_future_slot(chain, column_slot)?; - verify_slot_greater_than_latest_finalized_slot(chain, column_slot)?; - verify_is_first_sidecar(chain, &data_column)?; - verify_column_inclusion_proof(&data_column)?; - let parent_block = verify_parent_block_and_finalized_descendant(data_column.clone(), chain)?; - verify_slot_higher_than_parent(&parent_block, column_slot)?; - verify_proposer_and_signature(&data_column, &parent_block, chain)?; - + // TODO(das): implement gossip verification let kzg = chain .kzg .clone() .ok_or(GossipDataColumnError::KzgNotInitialized)?; let kzg_verified_data_column = verify_kzg_for_data_column(data_column.clone(), &kzg) .map_err(GossipDataColumnError::InvalidKzgProof)?; - - chain - .observed_slashable - .write() - .observe_slashable( - column_slot, - data_column.block_proposer_index(), - data_column.block_root(), - ) - .map_err(|e| GossipDataColumnError::BeaconChainError(e.into()))?; - Ok(GossipVerifiedDataColumn { block_root: data_column.block_root(), data_column: kzg_verified_data_column, }) } - -// Verify that this is the first column sidecar received for the tuple: -// (block_header.slot, block_header.proposer_index, column_sidecar.index) -fn verify_is_first_sidecar( - chain: &BeaconChain, - data_column: &DataColumnSidecar, -) -> Result<(), GossipDataColumnError> { - if chain - .observed_column_sidecars - .read() - .proposer_is_known(data_column) - .map_err(|e| GossipDataColumnError::BeaconChainError(e.into()))? - { - return Err(GossipDataColumnError::PriorKnown { - proposer: data_column.block_proposer_index(), - slot: data_column.slot(), - index: data_column.index, - }); - } - Ok(()) -} - -fn verify_column_inclusion_proof( - _data_column: &DataColumnSidecar, -) -> Result<(), GossipDataColumnError> { - // TODO(das): to be implemented - Ok(()) -} - -fn verify_slot_higher_than_parent( - parent_block: &Block, - data_column_slot: Slot, -) -> Result<(), GossipDataColumnError> { - if parent_block.slot >= data_column_slot { - return Err(GossipDataColumnError::IsNotLaterThanParent { - data_column_slot, - parent_slot: parent_block.slot, - }); - } - Ok(()) -} - -fn verify_parent_block_and_finalized_descendant( - data_column: Arc>, - chain: &BeaconChain, -) -> Result> { - let fork_choice = chain.canonical_head.fork_choice_read_lock(); - - // We have already verified that the column is past finalization, so we can - // just check fork choice for the block's parent. - let block_parent_root = data_column.block_parent_root(); - let Some(parent_block) = fork_choice.get_block(&block_parent_root) else { - return Err(GossipDataColumnError::ParentUnknown(data_column.clone())); - }; - - // Do not process a column that does not descend from the finalized root. - // We just loaded the parent_block, so we can be sure that it exists in fork choice. - if !fork_choice.is_finalized_checkpoint_or_descendant(block_parent_root) { - return Err(GossipDataColumnError::NotFinalizedDescendant { block_parent_root }); - } - - Ok(parent_block) -} - -fn verify_proposer_and_signature( - data_column: &DataColumnSidecar, - parent_block: &ProtoBlock, - chain: &BeaconChain, -) -> Result<(), GossipDataColumnError> { - let column_slot = data_column.slot(); - let column_epoch = column_slot.epoch(E::slots_per_epoch()); - let column_index = data_column.index; - let block_root = data_column.block_root(); - let block_parent_root = data_column.block_parent_root(); - - let proposer_shuffling_root = - if parent_block.slot.epoch(T::EthSpec::slots_per_epoch()) == column_epoch { - parent_block - .next_epoch_shuffling_id - .shuffling_decision_block - } else { - parent_block.root - }; - - let proposer_opt = chain - .beacon_proposer_cache - .lock() - .get_slot::(proposer_shuffling_root, column_slot); - - let (proposer_index, fork) = if let Some(proposer) = proposer_opt { - (proposer.index, proposer.fork) - } else { - debug!( - chain.log, - "Proposer shuffling cache miss for column verification"; - "block_root" => %block_root, - "index" => %column_index, - ); - let (parent_state_root, mut parent_state) = chain - .store - .get_advanced_hot_state(block_parent_root, column_slot, parent_block.state_root) - .map_err(|e| GossipDataColumnError::BeaconChainError(e.into()))? - .ok_or_else(|| { - BeaconChainError::DBInconsistent(format!( - "Missing state for parent block {block_parent_root:?}", - )) - })?; - - let state = cheap_state_advance_to_obtain_committees::<_, GossipDataColumnError>( - &mut parent_state, - Some(parent_state_root), - column_slot, - &chain.spec, - )?; - - let proposers = state.get_beacon_proposer_indices(&chain.spec)?; - let proposer_index = *proposers - .get(column_slot.as_usize() % T::EthSpec::slots_per_epoch() as usize) - .ok_or_else(|| BeaconChainError::NoProposerForSlot(column_slot))?; - - // Prime the proposer shuffling cache with the newly-learned value. - chain.beacon_proposer_cache.lock().insert( - column_epoch, - proposer_shuffling_root, - proposers, - state.fork(), - )?; - (proposer_index, state.fork()) - }; - - // Signature verify the signed block header. - let signature_is_valid = { - let pubkey_cache = get_validator_pubkey_cache(chain) - .map_err(|_| GossipDataColumnError::PubkeyCacheTimeout)?; - - let pubkey = pubkey_cache - .get(proposer_index) - .ok_or_else(|| GossipDataColumnError::UnknownValidator(proposer_index as u64))?; - let signed_block_header = &data_column.signed_block_header; - signed_block_header.verify_signature::( - pubkey, - &fork, - chain.genesis_validators_root, - &chain.spec, - ) - }; - - if !signature_is_valid { - return Err(GossipDataColumnError::ProposalSignatureInvalid); - } - - let column_proposer_index = data_column.block_proposer_index(); - if proposer_index != column_proposer_index as usize { - return Err(GossipDataColumnError::ProposerIndexMismatch { - sidecar: column_proposer_index as usize, - local: proposer_index, - }); - } - - Ok(()) -} - -fn verify_index_matches_subnet( - data_column: &DataColumnSidecar, - subnet: u64, - spec: &ChainSpec, -) -> Result<(), GossipDataColumnError> { - let expected_subnet: u64 = - DataColumnSubnetId::from_column_index::(data_column.index as usize, spec).into(); - if expected_subnet != subnet { - return Err(GossipDataColumnError::InvalidSubnetId { - received: subnet, - expected: expected_subnet, - }); - } - Ok(()) -} - -fn verify_slot_greater_than_latest_finalized_slot( - chain: &BeaconChain, - column_slot: Slot, -) -> Result<(), GossipDataColumnError> { - let latest_finalized_slot = chain - .head() - .finalized_checkpoint() - .epoch - .start_slot(T::EthSpec::slots_per_epoch()); - if column_slot <= latest_finalized_slot { - return Err(GossipDataColumnError::PastFinalizedSlot { - column_slot, - finalized_slot: latest_finalized_slot, - }); - } - Ok(()) -} - -fn verify_sidecar_not_from_future_slot( - chain: &BeaconChain, - column_slot: Slot, -) -> Result<(), GossipDataColumnError> { - let latest_permissible_slot = chain - .slot_clock - .now_with_future_tolerance(chain.spec.maximum_gossip_clock_disparity()) - .ok_or(BeaconChainError::UnableToReadSlot)?; - if column_slot > latest_permissible_slot { - return Err(GossipDataColumnError::FutureSlot { - message_slot: column_slot, - latest_permissible_slot, - }); - } - Ok(()) -} diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 03e8d1a64ae..819de1f5c19 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -9,8 +9,8 @@ use crate::migrate::PruningError; use crate::naive_aggregation_pool::Error as NaiveAggregationError; use crate::observed_aggregates::Error as ObservedAttestationsError; use crate::observed_attesters::Error as ObservedAttestersError; +use crate::observed_blob_sidecars::Error as ObservedBlobSidecarsError; use crate::observed_block_producers::Error as ObservedBlockProducersError; -use crate::observed_data_sidecars::Error as ObservedBlobSidecarsError; use execution_layer::PayloadStatus; use fork_choice::ExecutionStatus; use futures::channel::mpsc::TrySendError; diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 2bdbc9d3e9f..e1d0f61c58c 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -42,8 +42,8 @@ pub mod migrate; mod naive_aggregation_pool; pub mod observed_aggregates; mod observed_attesters; +mod observed_blob_sidecars; pub mod observed_block_producers; -mod observed_data_sidecars; pub mod observed_operations; mod observed_slashable; pub mod otb_verification_service; diff --git a/beacon_node/beacon_chain/src/observed_data_sidecars.rs b/beacon_node/beacon_chain/src/observed_data_sidecars.rs deleted file mode 100644 index 59b343eaa7c..00000000000 --- a/beacon_node/beacon_chain/src/observed_data_sidecars.rs +++ /dev/null @@ -1,474 +0,0 @@ -//! Provides the `ObservedBlobSidecars` struct which allows for rejecting `BlobSidecar`s -//! that we have already seen over the gossip network. -//! Only `BlobSidecar`s that have completed proposer signature verification can be added -//! to this cache to reduce DoS risks. - -use crate::observed_block_producers::ProposalKey; -use std::collections::{HashMap, HashSet}; -use std::marker::PhantomData; -use types::{BlobSidecar, ChainSpec, DataColumnSidecar, EthSpec, Slot}; - -#[derive(Debug, PartialEq)] -pub enum Error { - /// The slot of the provided `ObservableSidecar` is prior to finalization and should not have been provided - /// to this function. This is an internal error. - FinalizedDataSidecar { slot: Slot, finalized_slot: Slot }, - /// The data sidecar contains an invalid index, the data sidecar is invalid. - /// Note: The invalid data should have been caught and flagged as an error much before reaching - /// here. - InvalidDataIndex(u64), -} - -pub trait ObservableDataSidecar { - fn slot(&self) -> Slot; - fn block_proposer_index(&self) -> u64; - fn index(&self) -> u64; - fn max_num_of_items(spec: &ChainSpec) -> usize; -} - -impl ObservableDataSidecar for BlobSidecar { - fn slot(&self) -> Slot { - self.slot() - } - - fn block_proposer_index(&self) -> u64 { - self.block_proposer_index() - } - - fn index(&self) -> u64 { - self.index - } - - fn max_num_of_items(_spec: &ChainSpec) -> usize { - E::max_blobs_per_block() - } -} - -impl ObservableDataSidecar for DataColumnSidecar { - fn slot(&self) -> Slot { - self.slot() - } - - fn block_proposer_index(&self) -> u64 { - self.block_proposer_index() - } - - fn index(&self) -> u64 { - self.index - } - - fn max_num_of_items(spec: &ChainSpec) -> usize { - spec.number_of_columns - } -} - -/// Maintains a cache of seen `ObservableSidecar`s that are received over gossip -/// and have been gossip verified. -/// -/// The cache supports pruning based upon the finalized epoch. It does not automatically prune, you -/// must call `Self::prune` manually. -/// -/// Note: To prevent DoS attacks, this cache must include only items that have received some DoS resistance -/// like checking the proposer signature. -pub struct ObservedDataSidecars { - finalized_slot: Slot, - /// Stores all received data indices for a given `(ValidatorIndex, Slot)` tuple. - items: HashMap>, - spec: ChainSpec, - _phantom: PhantomData, -} - -impl ObservedDataSidecars { - /// Instantiates `Self` with `finalized_slot == 0`. - pub fn new(spec: ChainSpec) -> Self { - Self { - finalized_slot: Slot::new(0), - items: HashMap::new(), - spec, - _phantom: PhantomData, - } - } - - /// Observe the `data_sidecar` at (`data_sidecar.block_proposer_index, data_sidecar.slot`). - /// This will update `self` so future calls to it indicate that this `data_sidecar` is known. - /// - /// The supplied `data_sidecar` **MUST** have completed proposer signature verification. - pub fn observe_sidecar(&mut self, data_sidecar: &T) -> Result { - self.sanitize_data_sidecar(data_sidecar)?; - - let data_indices = self - .items - .entry(ProposalKey { - slot: data_sidecar.slot(), - proposer: data_sidecar.block_proposer_index(), - }) - .or_insert_with(|| HashSet::with_capacity(T::max_num_of_items(&self.spec))); - let did_not_exist = data_indices.insert(data_sidecar.index()); - - Ok(!did_not_exist) - } - - /// Returns `true` if the `data_sidecar` has already been observed in the cache within the prune window. - pub fn proposer_is_known(&self, data_sidecar: &T) -> Result { - self.sanitize_data_sidecar(data_sidecar)?; - let is_known = self - .items - .get(&ProposalKey { - slot: data_sidecar.slot(), - proposer: data_sidecar.block_proposer_index(), - }) - .map_or(false, |indices| indices.contains(&data_sidecar.index())); - Ok(is_known) - } - - fn sanitize_data_sidecar(&self, data_sidecar: &T) -> Result<(), Error> { - if data_sidecar.index() >= T::max_num_of_items(&self.spec) as u64 { - return Err(Error::InvalidDataIndex(data_sidecar.index())); - } - let finalized_slot = self.finalized_slot; - if finalized_slot > 0 && data_sidecar.slot() <= finalized_slot { - return Err(Error::FinalizedDataSidecar { - slot: data_sidecar.slot(), - finalized_slot, - }); - } - - Ok(()) - } - - /// Prune `data_sidecar` observations for slots less than or equal to the given slot. - pub fn prune(&mut self, finalized_slot: Slot) { - if finalized_slot == 0 { - return; - } - - self.finalized_slot = finalized_slot; - self.items.retain(|k, _| k.slot > finalized_slot); - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::test_utils::test_spec; - use bls::Hash256; - use std::sync::Arc; - use types::MainnetEthSpec; - - type E = MainnetEthSpec; - - fn get_blob_sidecar(slot: u64, proposer_index: u64, index: u64) -> Arc> { - let mut blob_sidecar = BlobSidecar::empty(); - blob_sidecar.signed_block_header.message.slot = slot.into(); - blob_sidecar.signed_block_header.message.proposer_index = proposer_index; - blob_sidecar.index = index; - Arc::new(blob_sidecar) - } - - #[test] - fn pruning() { - let spec = test_spec::(); - let mut cache = ObservedDataSidecars::>::new(spec); - - assert_eq!(cache.finalized_slot, 0, "finalized slot is zero"); - assert_eq!(cache.items.len(), 0, "no slots should be present"); - - // Slot 0, index 0 - let proposer_index_a = 420; - let sidecar_a = get_blob_sidecar(0, proposer_index_a, 0); - - assert_eq!( - cache.observe_sidecar(&sidecar_a), - Ok(false), - "can observe proposer, indicates proposer unobserved" - ); - - /* - * Preconditions. - */ - - assert_eq!(cache.finalized_slot, 0, "finalized slot is zero"); - assert_eq!( - cache.items.len(), - 1, - "only one (validator_index, slot) tuple should be present" - ); - - let cached_blob_indices = cache - .items - .get(&ProposalKey::new(proposer_index_a, Slot::new(0))) - .expect("slot zero should be present"); - assert_eq!( - cached_blob_indices.len(), - 1, - "only one proposer should be present" - ); - - /* - * Check that a prune at the genesis slot does nothing. - */ - - cache.prune(Slot::new(0)); - - assert_eq!(cache.finalized_slot, 0, "finalized slot is zero"); - assert_eq!(cache.items.len(), 1, "only one slot should be present"); - let cached_blob_indices = cache - .items - .get(&ProposalKey::new(proposer_index_a, Slot::new(0))) - .expect("slot zero should be present"); - assert_eq!( - cached_blob_indices.len(), - 1, - "only one proposer should be present" - ); - - /* - * Check that a prune empties the cache - */ - - cache.prune(E::slots_per_epoch().into()); - assert_eq!( - cache.finalized_slot, - Slot::from(E::slots_per_epoch()), - "finalized slot is updated" - ); - assert_eq!(cache.items.len(), 0, "no items left"); - - /* - * Check that we can't insert a finalized sidecar - */ - - // First slot of finalized epoch - let block_b = get_blob_sidecar(E::slots_per_epoch(), 419, 0); - - assert_eq!( - cache.observe_sidecar(&block_b), - Err(Error::FinalizedDataSidecar { - slot: E::slots_per_epoch().into(), - finalized_slot: E::slots_per_epoch().into(), - }), - "cant insert finalized sidecar" - ); - - assert_eq!(cache.items.len(), 0, "sidecar was not added"); - - /* - * Check that we _can_ insert a non-finalized block - */ - - let three_epochs = E::slots_per_epoch() * 3; - - // First slot of finalized epoch - let proposer_index_b = 421; - let block_b = get_blob_sidecar(three_epochs, proposer_index_b, 0); - - assert_eq!( - cache.observe_sidecar(&block_b), - Ok(false), - "can insert non-finalized block" - ); - - assert_eq!(cache.items.len(), 1, "only one slot should be present"); - let cached_blob_indices = cache - .items - .get(&ProposalKey::new(proposer_index_b, Slot::new(three_epochs))) - .expect("the three epochs slot should be present"); - assert_eq!( - cached_blob_indices.len(), - 1, - "only one proposer should be present" - ); - - /* - * Check that a prune doesnt wipe later blocks - */ - - let two_epochs = E::slots_per_epoch() * 2; - cache.prune(two_epochs.into()); - - assert_eq!( - cache.finalized_slot, - Slot::from(two_epochs), - "finalized slot is updated" - ); - - assert_eq!(cache.items.len(), 1, "only one slot should be present"); - let cached_blob_indices = cache - .items - .get(&ProposalKey::new(proposer_index_b, Slot::new(three_epochs))) - .expect("the three epochs slot should be present"); - assert_eq!( - cached_blob_indices.len(), - 1, - "only one proposer should be present" - ); - } - - #[test] - fn simple_observations() { - let spec = test_spec::(); - let mut cache = ObservedDataSidecars::>::new(spec); - - // Slot 0, index 0 - let proposer_index_a = 420; - let sidecar_a = get_blob_sidecar(0, proposer_index_a, 0); - - assert_eq!( - cache.proposer_is_known(&sidecar_a), - Ok(false), - "no observation in empty cache" - ); - - assert_eq!( - cache.observe_sidecar(&sidecar_a), - Ok(false), - "can observe proposer, indicates proposer unobserved" - ); - - assert_eq!( - cache.proposer_is_known(&sidecar_a), - Ok(true), - "observed block is indicated as true" - ); - - assert_eq!( - cache.observe_sidecar(&sidecar_a), - Ok(true), - "observing again indicates true" - ); - - assert_eq!(cache.finalized_slot, 0, "finalized slot is zero"); - assert_eq!(cache.items.len(), 1, "only one slot should be present"); - let cached_blob_indices = cache - .items - .get(&ProposalKey::new(proposer_index_a, Slot::new(0))) - .expect("slot zero should be present"); - assert_eq!( - cached_blob_indices.len(), - 1, - "only one proposer should be present" - ); - - // Slot 1, proposer 0 - - let proposer_index_b = 421; - let sidecar_b = get_blob_sidecar(1, proposer_index_b, 0); - - assert_eq!( - cache.proposer_is_known(&sidecar_b), - Ok(false), - "no observation for new slot" - ); - assert_eq!( - cache.observe_sidecar(&sidecar_b), - Ok(false), - "can observe proposer for new slot, indicates proposer unobserved" - ); - assert_eq!( - cache.proposer_is_known(&sidecar_b), - Ok(true), - "observed block in slot 1 is indicated as true" - ); - assert_eq!( - cache.observe_sidecar(&sidecar_b), - Ok(true), - "observing slot 1 again indicates true" - ); - - assert_eq!(cache.finalized_slot, 0, "finalized slot is zero"); - assert_eq!(cache.items.len(), 2, "two slots should be present"); - let cached_blob_indices = cache - .items - .get(&ProposalKey::new(proposer_index_a, Slot::new(0))) - .expect("slot zero should be present"); - assert_eq!( - cached_blob_indices.len(), - 1, - "only one proposer should be present in slot 0" - ); - let cached_blob_indices = cache - .items - .get(&ProposalKey::new(proposer_index_b, Slot::new(1))) - .expect("slot zero should be present"); - assert_eq!( - cached_blob_indices.len(), - 1, - "only one proposer should be present in slot 1" - ); - - // Slot 0, index 1 - let sidecar_c = get_blob_sidecar(0, proposer_index_a, 1); - - assert_eq!( - cache.proposer_is_known(&sidecar_c), - Ok(false), - "no observation for new index" - ); - assert_eq!( - cache.observe_sidecar(&sidecar_c), - Ok(false), - "can observe new index, indicates sidecar unobserved for new index" - ); - assert_eq!( - cache.proposer_is_known(&sidecar_c), - Ok(true), - "observed new sidecar is indicated as true" - ); - assert_eq!( - cache.observe_sidecar(&sidecar_c), - Ok(true), - "observing new sidecar again indicates true" - ); - - assert_eq!(cache.finalized_slot, 0, "finalized slot is zero"); - assert_eq!(cache.items.len(), 2, "two slots should be present"); - let cached_blob_indices = cache - .items - .get(&ProposalKey::new(proposer_index_a, Slot::new(0))) - .expect("slot zero should be present"); - assert_eq!( - cached_blob_indices.len(), - 2, - "two blob indices should be present in slot 0" - ); - - // Create a sidecar sharing slot and proposer but with a different block root. - let mut sidecar_d: BlobSidecar = BlobSidecar { - index: sidecar_c.index, - blob: sidecar_c.blob.clone(), - kzg_commitment: sidecar_c.kzg_commitment, - kzg_proof: sidecar_c.kzg_proof, - signed_block_header: sidecar_c.signed_block_header.clone(), - kzg_commitment_inclusion_proof: sidecar_c.kzg_commitment_inclusion_proof.clone(), - }; - sidecar_d.signed_block_header.message.body_root = Hash256::repeat_byte(7); - assert_eq!( - cache.proposer_is_known(&sidecar_d), - Ok(true), - "there has been an observation for this proposer index" - ); - assert_eq!( - cache.observe_sidecar(&sidecar_d), - Ok(true), - "indicates sidecar proposer was observed" - ); - let cached_blob_indices = cache - .items - .get(&ProposalKey::new(proposer_index_a, Slot::new(0))) - .expect("slot zero should be present"); - assert_eq!( - cached_blob_indices.len(), - 2, - "two blob indices should be present in slot 0" - ); - - // Try adding an out of bounds index - let invalid_index = E::max_blobs_per_block() as u64; - let sidecar_d = get_blob_sidecar(0, proposer_index_a, invalid_index); - assert_eq!( - cache.observe_sidecar(&sidecar_d), - Err(Error::InvalidDataIndex(invalid_index)), - "cannot add an index > MaxBlobsPerBlock" - ); - } -} diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index f1f501e225b..781c447f811 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -6,7 +6,7 @@ use crate::{ }; use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob}; use beacon_chain::block_verification_types::AsBlock; -use beacon_chain::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn}; +use beacon_chain::data_column_verification::GossipVerifiedDataColumn; use beacon_chain::store::Error; use beacon_chain::{ attestation_verification::{self, Error as AttnError, VerifiedAttestation}, @@ -656,81 +656,8 @@ impl NetworkBeaconProcessor { ) .await } - Err(err) => { - match err { - GossipDataColumnError::ParentUnknown(column) => { - debug!( - self.log, - "Unknown parent hash for column"; - "action" => "requesting parent", - "block_root" => %column.block_root(), - "parent_root" => %column.block_parent_root(), - ); - self.send_sync_message(SyncMessage::UnknownParentDataColumn( - peer_id, column, - )); - } - GossipDataColumnError::KzgNotInitialized - | GossipDataColumnError::PubkeyCacheTimeout - | GossipDataColumnError::BeaconChainError(_) => { - crit!( - self.log, - "Internal error when verifying column sidecar"; - "error" => ?err, - ) - } - GossipDataColumnError::ProposalSignatureInvalid - | GossipDataColumnError::UnknownValidator(_) - | GossipDataColumnError::ProposerIndexMismatch { .. } - | GossipDataColumnError::IsNotLaterThanParent { .. } - | GossipDataColumnError::InvalidSubnetId { .. } - | GossipDataColumnError::InvalidInclusionProof { .. } - | GossipDataColumnError::InvalidKzgProof { .. } - | GossipDataColumnError::NotFinalizedDescendant { .. } => { - debug!( - self.log, - "Could not verify column sidecar for gossip. Rejecting the column sidecar"; - "error" => ?err, - "slot" => %slot, - "block_root" => %block_root, - "index" => %index, - ); - // Prevent recurring behaviour by penalizing the peer slightly. - self.gossip_penalize_peer( - peer_id, - PeerAction::LowToleranceError, - "gossip_data_column_low", - ); - self.propagate_validation_result( - message_id, - peer_id, - MessageAcceptance::Reject, - ); - } - GossipDataColumnError::FutureSlot { .. } - | GossipDataColumnError::PriorKnown { .. } - | GossipDataColumnError::PastFinalizedSlot { .. } => { - debug!( - self.log, - "Could not verify column sidecar for gossip. Ignoring the column sidecar"; - "error" => ?err, - "slot" => %slot, - "block_root" => %block_root, - "index" => %index, - ); - // Prevent recurring behaviour by penalizing the peer slightly. - self.gossip_penalize_peer( - peer_id, - PeerAction::HighToleranceError, - "gossip_data_column_high", - ); - self.propagate_validation_result( - message_id, - peer_id, - MessageAcceptance::Ignore, - ); - } - } + Err(_) => { + // TODO(das) implement gossip error handling } } } From c99730f592cdcb4b2f8c2ded8a22f90264095661 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Tue, 23 Jul 2024 00:56:25 +1000 Subject: [PATCH 3/7] Add gossip cache timeout for data columns. Rename data column metrics for consistency. --- beacon_node/beacon_chain/src/beacon_chain.rs | 4 ++-- beacon_node/beacon_chain/src/metrics.rs | 24 +++++++++---------- .../src/service/gossip_cache.rs | 8 ++++++- lcli/src/generate_bootnode_enr.rs | 1 - 4 files changed, 21 insertions(+), 16 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 1aa415ab18a..69f8b88ed50 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -2124,10 +2124,10 @@ impl BeaconChain { data_column_sidecar: Arc>, subnet_id: u64, ) -> Result, GossipDataColumnError> { - metrics::inc_counter(&metrics::BLOBS_COLUMN_SIDECAR_PROCESSING_REQUESTS); + metrics::inc_counter(&metrics::DATA_COLUMN_SIDECAR_PROCESSING_REQUESTS); let _timer = metrics::start_timer(&metrics::DATA_COLUMN_SIDECAR_GOSSIP_VERIFICATION_TIMES); GossipVerifiedDataColumn::new(data_column_sidecar, subnet_id, self).map(|v| { - metrics::inc_counter(&metrics::DATA_COLUMNS_SIDECAR_PROCESSING_SUCCESSES); + metrics::inc_counter(&metrics::DATA_COLUMN_SIDECAR_PROCESSING_SUCCESSES); v }) } diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index 001b6609043..ab547cb6006 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -1032,26 +1032,14 @@ lazy_static! { "beacon_blobs_sidecar_processing_requests_total", "Count of all blob sidecars submitted for processing" ); - pub static ref BLOBS_COLUMN_SIDECAR_PROCESSING_REQUESTS: Result = try_create_int_counter( - "beacon_blobs_column_sidecar_processing_requests_total", - "Count of all data column sidecars submitted for processing" - ); pub static ref BLOBS_SIDECAR_PROCESSING_SUCCESSES: Result = try_create_int_counter( "beacon_blobs_sidecar_processing_successes_total", "Number of blob sidecars verified for gossip" ); - pub static ref DATA_COLUMNS_SIDECAR_PROCESSING_SUCCESSES: Result = try_create_int_counter( - "beacon_blobs_column_sidecar_processing_successes_total", - "Number of data column sidecars verified for gossip" - ); pub static ref BLOBS_SIDECAR_GOSSIP_VERIFICATION_TIMES: Result = try_create_histogram( "beacon_blobs_sidecar_gossip_verification_seconds", "Full runtime of blob sidecars gossip verification" ); - pub static ref DATA_COLUMN_SIDECAR_GOSSIP_VERIFICATION_TIMES: Result = try_create_histogram( - "beacon_blobs_column_sidecar_gossip_verification_seconds", - "Full runtime of data column sidecars gossip verification" - ); pub static ref BLOB_SIDECAR_INCLUSION_PROOF_VERIFICATION: Result = try_create_histogram( "blob_sidecar_inclusion_proof_verification_seconds", "Time taken to verify blob sidecar inclusion proof" @@ -1060,6 +1048,18 @@ lazy_static! { "blob_sidecar_inclusion_proof_computation_seconds", "Time taken to compute blob sidecar inclusion proof" ); + pub static ref DATA_COLUMN_SIDECAR_PROCESSING_REQUESTS: Result = try_create_int_counter( + "beacon_data_column_sidecar_processing_requests_total", + "Count of all data column sidecars submitted for processing" + ); + pub static ref DATA_COLUMN_SIDECAR_PROCESSING_SUCCESSES: Result = try_create_int_counter( + "beacon_data_column_sidecar_processing_successes_total", + "Number of data column sidecars verified for gossip" + ); + pub static ref DATA_COLUMN_SIDECAR_GOSSIP_VERIFICATION_TIMES: Result = try_create_histogram( + "beacon_data_column_sidecar_gossip_verification_seconds", + "Full runtime of data column sidecars gossip verification" + ); } // Fifth lazy-static block is used to account for macro recursion limit. diff --git a/beacon_node/lighthouse_network/src/service/gossip_cache.rs b/beacon_node/lighthouse_network/src/service/gossip_cache.rs index fd71ec9aafb..0ad31ff2e80 100644 --- a/beacon_node/lighthouse_network/src/service/gossip_cache.rs +++ b/beacon_node/lighthouse_network/src/service/gossip_cache.rs @@ -22,6 +22,8 @@ pub struct GossipCache { beacon_block: Option, /// Timeout for blobs. blob_sidecar: Option, + /// Timeout for data columns. + data_column_sidecar: Option, /// Timeout for aggregate attestations. aggregates: Option, /// Timeout for attestations. @@ -51,6 +53,8 @@ pub struct GossipCacheBuilder { beacon_block: Option, /// Timeout for blob sidecars. blob_sidecar: Option, + /// Timeout for data column sidecars. + data_column_sidecar: Option, /// Timeout for aggregate attestations. aggregates: Option, /// Timeout for attestations. @@ -152,6 +156,7 @@ impl GossipCacheBuilder { default_timeout, beacon_block, blob_sidecar, + data_column_sidecar, aggregates, attestation, voluntary_exit, @@ -168,6 +173,7 @@ impl GossipCacheBuilder { topic_msgs: HashMap::default(), beacon_block: beacon_block.or(default_timeout), blob_sidecar: blob_sidecar.or(default_timeout), + data_column_sidecar: data_column_sidecar.or(default_timeout), aggregates: aggregates.or(default_timeout), attestation: attestation.or(default_timeout), voluntary_exit: voluntary_exit.or(default_timeout), @@ -194,7 +200,7 @@ impl GossipCache { let expire_timeout = match topic.kind() { GossipKind::BeaconBlock => self.beacon_block, GossipKind::BlobSidecar(_) => self.blob_sidecar, - GossipKind::DataColumnSidecar(_) => self.blob_sidecar, + GossipKind::DataColumnSidecar(_) => self.data_column_sidecar, GossipKind::BeaconAggregateAndProof => self.aggregates, GossipKind::Attestation(_) => self.attestation, GossipKind::VoluntaryExit => self.voluntary_exit, diff --git a/lcli/src/generate_bootnode_enr.rs b/lcli/src/generate_bootnode_enr.rs index 77c7ed9ffee..52960b929d8 100644 --- a/lcli/src/generate_bootnode_enr.rs +++ b/lcli/src/generate_bootnode_enr.rs @@ -37,7 +37,6 @@ pub fn run(matches: &ArgMatches) -> Result<(), String> { next_fork_version: genesis_fork_version, next_fork_epoch: Epoch::max_value(), // FAR_FUTURE_EPOCH }; - let enr = build_enr::(&enr_key, &config, &enr_fork_id) .map_err(|e| format!("Unable to create ENR: {:?}", e))?; From 29aab7a11afd862b09291947698c017e088bc872 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Wed, 24 Jul 2024 10:58:36 +1000 Subject: [PATCH 4/7] Remove usage of `unimplemented!` and address review comments. --- beacon_node/beacon_chain/src/data_availability_checker.rs | 2 +- beacon_node/lighthouse_network/src/types/pubsub.rs | 1 + beacon_node/network/src/sync/manager.rs | 1 - consensus/types/src/data_column_subnet_id.rs | 6 +++--- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 195d07fd626..fdba60a69ac 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -194,7 +194,7 @@ impl DataAvailabilityChecker { _gossip_data_columns: Vec>, ) -> Result, AvailabilityCheckError> { // TODO(das) to be implemented - unimplemented!("not implemented") + Err(AvailabilityCheckError::Unexpected) } /// Check if we have all the blobs for a block. Returns `Availability` which has information diff --git a/beacon_node/lighthouse_network/src/types/pubsub.rs b/beacon_node/lighthouse_network/src/types/pubsub.rs index 18353c39fd5..a4d5f2c6d75 100644 --- a/beacon_node/lighthouse_network/src/types/pubsub.rs +++ b/beacon_node/lighthouse_network/src/types/pubsub.rs @@ -277,6 +277,7 @@ impl PubsubMessage { } GossipKind::DataColumnSidecar(subnet_id) => { match fork_context.from_context_bytes(gossip_topic.fork_digest) { + // TODO(das): Remove Deneb fork Some(ForkName::Deneb | ForkName::Electra) => { let col_sidecar = Arc::new( DataColumnSidecar::from_ssz_bytes(data) diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 8b7cef7494a..c9894c8b243 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -651,7 +651,6 @@ impl SyncManager { } SyncMessage::UnknownParentDataColumn(_peer_id, _data_column) => { // TODO(das): data column parent lookup to be implemented - unimplemented!("data column parent lookup to be implemented") } SyncMessage::UnknownBlockHashFromAttestation(peer_id, block_root) => { if !self.notified_unknown_roots.contains(&(peer_id, block_root)) { diff --git a/consensus/types/src/data_column_subnet_id.rs b/consensus/types/src/data_column_subnet_id.rs index cf5b1dd549d..dd58c6c36b4 100644 --- a/consensus/types/src/data_column_subnet_id.rs +++ b/consensus/types/src/data_column_subnet_id.rs @@ -51,9 +51,9 @@ impl DataColumnSubnetId { let mut node_id_bytes = [0u8; 32]; current_id.to_little_endian(&mut node_id_bytes); let hash = ethereum_hashing::hash_fixed(&node_id_bytes); - let hash_prefix = [ - hash[0], hash[1], hash[2], hash[3], hash[4], hash[5], hash[6], hash[7], - ]; + let hash_prefix: [u8; 8] = hash[0..8] + .try_into() + .expect("hash_fixed produces a 32 byte array"); let hash_prefix_u64 = u64::from_le_bytes(hash_prefix); let subnet = hash_prefix_u64 % spec.data_column_sidecar_subnet_count; From f4cd1fa47c750a73b32760109e7a0ec91a1e0704 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Wed, 24 Jul 2024 22:27:23 +1000 Subject: [PATCH 5/7] Remove unnused `GossipDataColumnError` variants and address review comments. --- beacon_node/beacon_chain/src/beacon_chain.rs | 2 +- .../beacon_chain/src/block_verification.rs | 6 +- .../src/data_column_verification.rs | 81 ++----------------- .../lighthouse_network/src/types/pubsub.rs | 10 +-- 4 files changed, 12 insertions(+), 87 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 69f8b88ed50..1fa77a20433 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -2123,7 +2123,7 @@ impl BeaconChain { self: &Arc, data_column_sidecar: Arc>, subnet_id: u64, - ) -> Result, GossipDataColumnError> { + ) -> Result, GossipDataColumnError> { metrics::inc_counter(&metrics::DATA_COLUMN_SIDECAR_PROCESSING_REQUESTS); let _timer = metrics::start_timer(&metrics::DATA_COLUMN_SIDECAR_GOSSIP_VERIFICATION_TIMES); GossipVerifiedDataColumn::new(data_column_sidecar, subnet_id, self).map(|v| { diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 2ac26607228..5ae98cefbe0 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -531,10 +531,10 @@ impl BlockSlashInfo> { } } -impl BlockSlashInfo> { +impl BlockSlashInfo { pub fn from_early_error_data_column( header: SignedBeaconBlockHeader, - e: GossipDataColumnError, + e: GossipDataColumnError, ) -> Self { match e { GossipDataColumnError::ProposalSignatureInvalid => BlockSlashInfo::SignatureInvalid(e), @@ -2029,7 +2029,7 @@ impl BlockBlobError for GossipBlobError { } } -impl BlockBlobError for GossipDataColumnError { +impl BlockBlobError for GossipDataColumnError { fn not_later_than_parent_error(data_column_slot: Slot, parent_slot: Slot) -> Self { GossipDataColumnError::IsNotLaterThanParent { data_column_slot, diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs index e1495aca270..2e88da8f6ad 100644 --- a/beacon_node/beacon_chain/src/data_column_verification.rs +++ b/beacon_node/beacon_chain/src/data_column_verification.rs @@ -12,7 +12,7 @@ use types::{ /// An error occurred while validating a gossip data column. #[derive(Debug)] -pub enum GossipDataColumnError { +pub enum GossipDataColumnError { /// There was an error whilst processing the data column. It is not known if it is /// valid or invalid. /// @@ -54,84 +54,15 @@ pub enum GossipDataColumnError { /// /// The data column sidecar is invalid and the peer is faulty. InvalidKzgProof(kzg::Error), - /// The column was gossiped over an incorrect subnet. - /// - /// ## Peer scoring - /// - /// The column is invalid or the peer is faulty. - InvalidSubnetId { received: u64, expected: u64 }, - /// The column sidecar is from a slot that is later than the current slot (with respect to the - /// gossip clock disparity). - /// - /// ## Peer scoring - /// - /// Assuming the local clock is correct, the peer has sent an invalid message. - FutureSlot { - message_slot: Slot, - latest_permissible_slot: Slot, - }, - /// The sidecar corresponds to a slot older than the finalized head slot. - /// - /// ## Peer scoring - /// - /// It's unclear if this column is valid, but this column is for a finalized slot and is - /// therefore useless to us. - PastFinalizedSlot { - column_slot: Slot, - finalized_slot: Slot, - }, - /// The pubkey cache timed out. - /// - /// ## Peer scoring - /// - /// The column sidecar may be valid, this is an internal error. - PubkeyCacheTimeout, - /// The proposer index specified in the sidecar does not match the locally computed - /// proposer index. - /// - /// ## Peer scoring - /// - /// The column is invalid and the peer is faulty. - ProposerIndexMismatch { sidecar: usize, local: usize }, - /// The provided columns's parent block is unknown. - /// - /// ## Peer scoring - /// - /// We cannot process the columns without validating its parent, the peer isn't necessarily faulty. - ParentUnknown(Arc>), - /// The column conflicts with finalization, no need to propagate. - /// - /// ## Peer scoring - /// - /// It's unclear if this column is valid, but it conflicts with finality and shouldn't be - /// imported. - NotFinalizedDescendant { block_parent_root: Hash256 }, - /// Invalid kzg commitment inclusion proof - /// - /// ## Peer scoring - /// - /// The column sidecar is invalid and the peer is faulty - InvalidInclusionProof, - /// A column has already been seen for the given `(sidecar.block_root, sidecar.index)` tuple - /// over gossip or no gossip sources. - /// - /// ## Peer scoring - /// - /// The peer isn't faulty, but we do not forward it over gossip. - PriorKnown { - proposer: u64, - slot: Slot, - index: ColumnIndex, - }, } -impl From for GossipDataColumnError { +impl From for GossipDataColumnError { fn from(e: BeaconChainError) -> Self { GossipDataColumnError::BeaconChainError(e) } } -impl From for GossipDataColumnError { +impl From for GossipDataColumnError { fn from(e: BeaconStateError) -> Self { GossipDataColumnError::BeaconChainError(BeaconChainError::BeaconStateError(e)) } @@ -152,12 +83,12 @@ impl GossipVerifiedDataColumn { column_sidecar: Arc>, subnet_id: u64, chain: &BeaconChain, - ) -> Result> { + ) -> Result { let header = column_sidecar.signed_block_header.clone(); // We only process slashing info if the gossip verification failed // since we do not process the data column any further in that case. validate_data_column_sidecar_for_gossip(column_sidecar, subnet_id, chain).map_err(|e| { - process_block_slash_info::<_, GossipDataColumnError>( + process_block_slash_info::<_, GossipDataColumnError>( chain, BlockSlashInfo::from_early_error_data_column(header, e), ) @@ -254,7 +185,7 @@ pub fn validate_data_column_sidecar_for_gossip( data_column: Arc>, _subnet: u64, chain: &BeaconChain, -) -> Result, GossipDataColumnError> { +) -> Result, GossipDataColumnError> { // TODO(das): implement gossip verification let kzg = chain .kzg diff --git a/beacon_node/lighthouse_network/src/types/pubsub.rs b/beacon_node/lighthouse_network/src/types/pubsub.rs index a4d5f2c6d75..1bc99f9a6c4 100644 --- a/beacon_node/lighthouse_network/src/types/pubsub.rs +++ b/beacon_node/lighthouse_network/src/types/pubsub.rs @@ -278,7 +278,7 @@ impl PubsubMessage { GossipKind::DataColumnSidecar(subnet_id) => { match fork_context.from_context_bytes(gossip_topic.fork_digest) { // TODO(das): Remove Deneb fork - Some(ForkName::Deneb | ForkName::Electra) => { + Some(fork) if fork.deneb_enabled() => { let col_sidecar = Arc::new( DataColumnSidecar::from_ssz_bytes(data) .map_err(|e| format!("{:?}", e))?, @@ -299,13 +299,7 @@ impl PubsubMessage { )) } } - Some( - ForkName::Base - | ForkName::Altair - | ForkName::Bellatrix - | ForkName::Capella, - ) - | None => Err(format!( + Some(_) | None => Err(format!( "data_column_sidecar topic invalid for given fork digest {:?}", gossip_topic.fork_digest )), From e06a32895224eb4487485c34830cfe2699e456f2 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Wed, 24 Jul 2024 23:13:54 +1000 Subject: [PATCH 6/7] Update Cargo.lock --- Cargo.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 2396ccb54b9..605cb4d2a58 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5012,7 +5012,7 @@ dependencies = [ "futures", "gossipsub", "hex", - "itertools", + "itertools 0.10.5", "lazy_static", "libp2p", "libp2p-mplex", From a037711c81e885e12d0396e20e66028fe2064e87 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Thu, 25 Jul 2024 14:09:04 +1000 Subject: [PATCH 7/7] Arc `ChainSpec` in discovery to avoid performance regression when needing to clone it repeatedly. --- .../lighthouse_network/src/discovery/mod.rs | 14 +++++++++----- .../src/discovery/subnet_predicate.rs | 7 +++---- beacon_node/lighthouse_network/src/service/mod.rs | 9 +++++---- 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/beacon_node/lighthouse_network/src/discovery/mod.rs b/beacon_node/lighthouse_network/src/discovery/mod.rs index 2cc3a27953f..017db26049c 100644 --- a/beacon_node/lighthouse_network/src/discovery/mod.rs +++ b/beacon_node/lighthouse_network/src/discovery/mod.rs @@ -192,7 +192,7 @@ pub struct Discovery { /// Logger for the discovery behaviour. log: slog::Logger, - spec: ChainSpec, + spec: Arc, } impl Discovery { @@ -327,7 +327,7 @@ impl Discovery { update_ports, log, enr_dir, - spec: spec.clone(), + spec: Arc::new(spec.clone()), }) } @@ -758,7 +758,8 @@ impl Discovery { // Only start a discovery query if we have a subnet to look for. if !filtered_subnet_queries.is_empty() { // build the subnet predicate as a combination of the eth2_fork_predicate and the subnet predicate - let subnet_predicate = subnet_predicate::(filtered_subnets, &self.log, &self.spec); + let subnet_predicate = + subnet_predicate::(filtered_subnets, &self.log, self.spec.clone()); debug!( self.log, @@ -885,8 +886,11 @@ impl Discovery { self.add_subnet_query(query.subnet, query.min_ttl, query.retries + 1); // Check the specific subnet against the enr - let subnet_predicate = - subnet_predicate::(vec![query.subnet], &self.log, &self.spec); + let subnet_predicate = subnet_predicate::( + vec![query.subnet], + &self.log, + self.spec.clone(), + ); r.clone() .into_iter() diff --git a/beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs b/beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs index a1f45767eec..b53afe556db 100644 --- a/beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs +++ b/beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs @@ -10,13 +10,12 @@ use types::{ChainSpec, DataColumnSubnetId}; pub fn subnet_predicate( subnets: Vec, log: &slog::Logger, - spec: &ChainSpec, + spec: Arc, ) -> impl Fn(&Enr) -> bool + Send where E: EthSpec, { let log_clone = log.clone(); - let spec_clone = spec.clone(); move |enr: &Enr| { let attestation_bitfield: EnrAttestationBitfield = match enr.attestation_bitfield::() @@ -31,7 +30,7 @@ where enr.sync_committee_bitfield::(); // TODO(das): compute from enr - let custody_subnet_count = spec_clone.custody_requirement; + let custody_subnet_count = spec.custody_requirement; let predicate = subnets.iter().any(|subnet| match subnet { Subnet::Attestation(s) => attestation_bitfield @@ -44,7 +43,7 @@ where let mut subnets = DataColumnSubnetId::compute_custody_subnets::( enr.node_id().raw().into(), custody_subnet_count, - &spec_clone, + &spec, ); subnets.contains(s) } diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 3ef4ec0f952..2b825e9c87b 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -39,10 +39,10 @@ use std::{ sync::Arc, task::{Context, Poll}, }; -use types::ForkName; use types::{ consts::altair::SYNC_COMMITTEE_SUBNET_COUNT, EnrForkId, EthSpec, ForkContext, Slot, SubnetId, }; +use types::{ChainSpec, ForkName}; use utils::{build_transport, strip_peer_id, Context as ServiceContext, MAX_CONNECTIONS_PER_PEER}; pub mod api_types; @@ -1019,6 +1019,7 @@ impl Network { return; } + let spec = Arc::new(self.fork_context.spec.clone()); let filtered: Vec = subnets_to_discover .into_iter() .filter(|s| { @@ -1054,7 +1055,7 @@ impl Network { // If we connect to the cached peers before the discovery query starts, then we potentially // save a costly discovery query. } else { - self.dial_cached_enrs_in_subnet(s.subnet); + self.dial_cached_enrs_in_subnet(s.subnet, spec.clone()); true } }) @@ -1218,8 +1219,8 @@ impl Network { /// Dial cached Enrs in discovery service that are in the given `subnet_id` and aren't /// in Connected, Dialing or Banned state. - fn dial_cached_enrs_in_subnet(&mut self, subnet: Subnet) { - let predicate = subnet_predicate::(vec![subnet], &self.log, &self.fork_context.spec); + fn dial_cached_enrs_in_subnet(&mut self, subnet: Subnet, spec: Arc) { + let predicate = subnet_predicate::(vec![subnet], &self.log, spec); let peers_to_dial: Vec = self .discovery() .cached_enrs()