diff --git a/beacon_node/network/src/sync/block_lookups/common.rs b/beacon_node/network/src/sync/block_lookups/common.rs index 3bd39301b21..7193dd6e216 100644 --- a/beacon_node/network/src/sync/block_lookups/common.rs +++ b/beacon_node/network/src/sync/block_lookups/common.rs @@ -29,34 +29,12 @@ pub enum LookupType { Parent, } -/// This trait helps differentiate `SingleBlockLookup`s from `ParentLookup`s .This is useful in -/// ensuring requests and responses are handled separately and enables us to use different failure -/// tolerances for each, while re-using the same basic request and retry logic. -pub trait Lookup { - const MAX_ATTEMPTS: u8; - fn lookup_type() -> LookupType; - fn max_attempts() -> u8 { - Self::MAX_ATTEMPTS - } -} - -/// A `Lookup` that is a part of a `ParentLookup`. -pub struct Parent; - -impl Lookup for Parent { - const MAX_ATTEMPTS: u8 = PARENT_FAIL_TOLERANCE; - fn lookup_type() -> LookupType { - LookupType::Parent - } -} - -/// A `Lookup` that part of a single block lookup. -pub struct Current; - -impl Lookup for Current { - const MAX_ATTEMPTS: u8 = SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS; - fn lookup_type() -> LookupType { - LookupType::Current +impl LookupType { + fn max_attempts(&self) -> u8 { + match self { + LookupType::Current => SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS, + LookupType::Parent => PARENT_FAIL_TOLERANCE, + } } } @@ -68,7 +46,7 @@ impl Lookup for Current { /// The use of the `ResponseType` associated type gives us a degree of type /// safety when handling a block/blob response ensuring we only mutate the correct corresponding /// state. -pub trait RequestState { +pub trait RequestState { /// The type of the request . type RequestType; @@ -81,9 +59,12 @@ pub trait RequestState { /* Request building methods */ /// Construct a new request. - fn build_request(&mut self) -> Result<(PeerId, Self::RequestType), LookupRequestError> { + fn build_request( + &mut self, + lookup_type: LookupType, + ) -> Result<(PeerId, Self::RequestType), LookupRequestError> { // Verify and construct request. - self.too_many_attempts()?; + self.too_many_attempts(lookup_type)?; let peer = self.get_peer()?; let request = self.new_request(); Ok((peer, request)) @@ -93,6 +74,7 @@ pub trait RequestState { fn build_request_and_send( &mut self, id: Id, + lookup_type: LookupType, cx: &mut SyncNetworkContext, ) -> Result<(), LookupRequestError> { // Check if request is necessary. @@ -101,7 +83,7 @@ pub trait RequestState { } // Construct request. - let (peer_id, request) = self.build_request()?; + let (peer_id, request) = self.build_request(lookup_type)?; // Update request state. let req_counter = self.get_state_mut().on_download_start(peer_id); @@ -110,17 +92,16 @@ pub trait RequestState { let id = SingleLookupReqId { id, req_counter, - lookup_type: L::lookup_type(), + lookup_type, }; Self::make_request(id, peer_id, request, cx) } /// Verify the current request has not exceeded the maximum number of attempts. - fn too_many_attempts(&self) -> Result<(), LookupRequestError> { - let max_attempts = L::max_attempts(); + fn too_many_attempts(&self, lookup_type: LookupType) -> Result<(), LookupRequestError> { let request_state = self.get_state(); - if request_state.failed_attempts() >= max_attempts { + if request_state.failed_attempts() >= lookup_type.max_attempts() { let cannot_process = request_state.more_failed_processing_attempts(); Err(LookupRequestError::TooManyAttempts { cannot_process }) } else { @@ -187,7 +168,7 @@ pub trait RequestState { fn response_type() -> ResponseType; /// A getter for the `BlockRequestState` or `BlobRequestState` associated with this trait. - fn request_state_mut(request: &mut SingleBlockLookup) -> &mut Self; + fn request_state_mut(request: &mut SingleBlockLookup) -> &mut Self; /// A getter for a reference to the `SingleLookupRequestState` associated with this trait. fn get_state(&self) -> &SingleLookupRequestState; @@ -196,7 +177,7 @@ pub trait RequestState { fn get_state_mut(&mut self) -> &mut SingleLookupRequestState; } -impl RequestState for BlockRequestState { +impl RequestState for BlockRequestState { type RequestType = BlocksByRootSingleRequest; type VerifiedResponseType = Arc>; type ReconstructedResponseType = RpcBlock; @@ -253,7 +234,7 @@ impl RequestState for BlockRequestState fn response_type() -> ResponseType { ResponseType::Block } - fn request_state_mut(request: &mut SingleBlockLookup) -> &mut Self { + fn request_state_mut(request: &mut SingleBlockLookup) -> &mut Self { &mut request.block_request_state } fn get_state(&self) -> &SingleLookupRequestState { @@ -264,7 +245,7 @@ impl RequestState for BlockRequestState } } -impl RequestState for BlobRequestState { +impl RequestState for BlobRequestState { type RequestType = BlobsByRootSingleBlockRequest; type VerifiedResponseType = FixedBlobSidecarList; type ReconstructedResponseType = FixedBlobSidecarList; @@ -328,7 +309,7 @@ impl RequestState for BlobRequestState ResponseType { ResponseType::Blob } - fn request_state_mut(request: &mut SingleBlockLookup) -> &mut Self { + fn request_state_mut(request: &mut SingleBlockLookup) -> &mut Self { &mut request.blob_request_state } fn get_state(&self) -> &SingleLookupRequestState { diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index fa2683fb0f0..a2909b49dd1 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -16,9 +16,6 @@ use beacon_chain::data_availability_checker::{ }; use beacon_chain::validator_monitor::timestamp_now; use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError}; -pub use common::Current; -pub use common::Lookup; -pub use common::Parent; pub use common::RequestState; use fnv::FnvHashMap; use lighthouse_network::{PeerAction, PeerId}; @@ -55,12 +52,12 @@ pub struct BlockLookups { /// Parent chain lookups being downloaded. parent_lookups: SmallVec<[ParentLookup; 3]>, - processing_parent_lookups: HashMap, SingleBlockLookup)>, + processing_parent_lookups: HashMap, SingleBlockLookup)>, /// A cache of failed chain lookups to prevent duplicate searches. failed_chains: LRUTimeCache, - single_block_lookups: FnvHashMap>, + single_block_lookups: FnvHashMap>, pub(crate) da_checker: Arc>, @@ -131,7 +128,7 @@ impl BlockLookups { /// Attempts to trigger the request matching the given `block_root`. pub fn trigger_single_lookup( &mut self, - mut single_block_lookup: SingleBlockLookup, + mut single_block_lookup: SingleBlockLookup, cx: &mut SyncNetworkContext, ) { let block_root = single_block_lookup.block_root(); @@ -147,7 +144,7 @@ impl BlockLookups { } /// Adds a lookup to the `single_block_lookups` map. - pub fn add_single_lookup(&mut self, single_block_lookup: SingleBlockLookup) { + pub fn add_single_lookup(&mut self, single_block_lookup: SingleBlockLookup) { self.single_block_lookups .insert(single_block_lookup.id, single_block_lookup); @@ -212,6 +209,7 @@ impl BlockLookups { peers, self.da_checker.clone(), cx.next_id(), + LookupType::Current, ); debug!( @@ -284,10 +282,10 @@ impl BlockLookups { /// Get a single block lookup by its ID. This method additionally ensures the `req_counter` /// matches the current `req_counter` for the lookup. This ensures any stale responses from requests /// that have been retried are ignored. - fn get_single_lookup>( + fn get_single_lookup>( &mut self, id: SingleLookupReqId, - ) -> Option> { + ) -> Option> { let mut lookup = self.single_block_lookups.remove(&id.id)?; let request_state = R::request_state_mut(&mut lookup); @@ -314,7 +312,7 @@ impl BlockLookups { } /// Process a block or blob response received from a single lookup request. - pub fn single_lookup_response>( + pub fn single_lookup_response>( &mut self, lookup_id: SingleLookupReqId, peer_id: PeerId, @@ -345,7 +343,7 @@ impl BlockLookups { "response_type" => ?response_type, ); - match self.handle_verified_response::( + match self.handle_verified_response::( seen_timestamp, cx, BlockProcessType::SingleBlock { id: lookup.id }, @@ -372,13 +370,13 @@ impl BlockLookups { /// Consolidates error handling for `single_lookup_response`. An `Err` here should always mean /// the lookup is dropped. - fn handle_verified_response>( + fn handle_verified_response>( &self, seen_timestamp: Duration, cx: &mut SyncNetworkContext, process_type: BlockProcessType, verified_response: R::VerifiedResponseType, - lookup: &mut SingleBlockLookup, + lookup: &mut SingleBlockLookup, ) -> Result<(), LookupRequestError> { let id = lookup.id; let block_root = lookup.block_root(); @@ -389,7 +387,7 @@ impl BlockLookups { // If we have an outstanding parent request for this block, delay sending the response until // all parent blocks have been processed, otherwise we will fail validation with an // `UnknownParent`. - let delay_send = match L::lookup_type() { + let delay_send = match lookup.lookup_type { LookupType::Parent => false, LookupType::Current => self.has_pending_parent_request(lookup.block_root()), }; @@ -453,7 +451,7 @@ impl BlockLookups { /// Get a parent block lookup by its ID. This method additionally ensures the `req_counter` /// matches the current `req_counter` for the lookup. This any stale responses from requests /// that have been retried are ignored. - fn get_parent_lookup>( + fn get_parent_lookup>( &mut self, id: SingleLookupReqId, ) -> Option> { @@ -479,7 +477,7 @@ impl BlockLookups { } /// Process a response received from a parent lookup request. - pub fn parent_lookup_response>( + pub fn parent_lookup_response>( &mut self, id: SingleLookupReqId, peer_id: PeerId, @@ -523,7 +521,7 @@ impl BlockLookups { /// Consolidates error handling for `parent_lookup_response`. An `Err` here should always mean /// the lookup is dropped. - fn parent_lookup_response_inner>( + fn parent_lookup_response_inner>( &mut self, peer_id: PeerId, response: R::VerifiedResponseType, @@ -554,7 +552,7 @@ impl BlockLookups { } } - self.handle_verified_response::( + self.handle_verified_response::( seen_timestamp, cx, BlockProcessType::ParentLookup { @@ -633,7 +631,7 @@ impl BlockLookups { } /// An RPC error has occurred during a parent lookup. This function handles this case. - pub fn parent_lookup_failed>( + pub fn parent_lookup_failed>( &mut self, id: SingleLookupReqId, peer_id: &PeerId, @@ -669,7 +667,7 @@ impl BlockLookups { } /// An RPC error has occurred during a single lookup. This function handles this case.\ - pub fn single_block_lookup_failed>( + pub fn single_block_lookup_failed>( &mut self, id: SingleLookupReqId, peer_id: &PeerId, @@ -717,7 +715,7 @@ impl BlockLookups { /* Processing responses */ - pub fn single_block_component_processed>( + pub fn single_block_component_processed>( &mut self, target_id: Id, result: BlockProcessingResult, diff --git a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs index b7a71860bff..11eb908953f 100644 --- a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs @@ -1,6 +1,6 @@ +use super::common::LookupType; use super::single_block_lookup::{LookupRequestError, SingleBlockLookup}; use super::{DownloadedBlock, PeerId}; -use crate::sync::block_lookups::common::Parent; use crate::sync::{manager::SLOT_IMPORT_TOLERANCE, network_context::SyncNetworkContext}; use beacon_chain::block_verification_types::AsBlock; use beacon_chain::block_verification_types::RpcBlock; @@ -24,7 +24,7 @@ pub(crate) struct ParentLookup { /// The blocks that have currently been downloaded. downloaded_blocks: Vec>, /// Request of the last parent. - pub current_parent_request: SingleBlockLookup, + pub current_parent_request: SingleBlockLookup, } #[derive(Debug, PartialEq, Eq)] @@ -55,6 +55,7 @@ impl ParentLookup { &[peer_id], da_checker, cx.next_id(), + LookupType::Parent, ); Self { @@ -132,7 +133,7 @@ impl ParentLookup { Hash256, VecDeque>, Vec, - SingleBlockLookup, + SingleBlockLookup, ) { let ParentLookup { chain_hash, diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 5bb663967d7..077af7c3d19 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -1,5 +1,6 @@ +use super::common::LookupType; use super::PeerId; -use crate::sync::block_lookups::common::{Lookup, RequestState}; +use crate::sync::block_lookups::common::RequestState; use crate::sync::block_lookups::Id; use crate::sync::network_context::SyncNetworkContext; use beacon_chain::block_verification_types::RpcBlock; @@ -14,7 +15,6 @@ use rand::seq::IteratorRandom; use slog::{debug, Logger}; use std::collections::HashSet; use std::fmt::Debug; -use std::marker::PhantomData; use std::sync::Arc; use store::Hash256; use strum::IntoStaticStr; @@ -33,27 +33,30 @@ pub enum LookupRequestError { BadState(String), } -pub struct SingleBlockLookup { +pub struct SingleBlockLookup { pub id: Id, - pub block_request_state: BlockRequestState, - pub blob_request_state: BlobRequestState, + pub lookup_type: LookupType, + pub block_request_state: BlockRequestState, + pub blob_request_state: BlobRequestState, pub da_checker: Arc>, /// Only necessary for requests triggered by an `UnknownBlockParent` or `UnknownBlockParent` /// because any blocks or blobs without parents won't hit the data availability cache. pub child_components: Option>, } -impl SingleBlockLookup { +impl SingleBlockLookup { pub fn new( requested_block_root: Hash256, child_components: Option>, peers: &[PeerId], da_checker: Arc>, id: Id, + lookup_type: LookupType, ) -> Self { let is_deneb = da_checker.is_deneb(); Self { id, + lookup_type, block_request_state: BlockRequestState::new(requested_block_root, peers), blob_request_state: BlobRequestState::new(requested_block_root, peers, is_deneb), da_checker, @@ -103,11 +106,11 @@ impl SingleBlockLookup { if !block_already_downloaded { self.block_request_state - .build_request_and_send(self.id, cx)?; + .build_request_and_send(self.id, self.lookup_type, cx)?; } if !blobs_already_downloaded { self.blob_request_state - .build_request_and_send(self.id, cx)?; + .build_request_and_send(self.id, self.lookup_type, cx)?; } Ok(()) } @@ -144,7 +147,7 @@ impl SingleBlockLookup { /// Accepts a verified response, and adds it to the child components if required. This method /// returns a `CachedChild` which provides a completed block + blob response if all components have been /// received, or information about whether the child is required and if it has been downloaded. - pub fn add_response>( + pub fn add_response>( &mut self, verified_response: R::VerifiedResponseType, ) -> CachedChild { @@ -301,7 +304,7 @@ impl SingleBlockLookup { } /// The state of the blob request component of a `SingleBlockLookup`. -pub struct BlobRequestState { +pub struct BlobRequestState { /// The latest picture of which blobs still need to be requested. This includes information /// from both block/blobs downloaded in the network layer and any blocks/blobs that exist in /// the data availability checker. @@ -310,10 +313,9 @@ pub struct BlobRequestState { /// Where we store blobs until we receive the stream terminator. pub blob_download_queue: FixedBlobSidecarList, pub state: SingleLookupRequestState, - _phantom: PhantomData, } -impl BlobRequestState { +impl BlobRequestState { pub fn new(block_root: Hash256, peer_source: &[PeerId], is_deneb: bool) -> Self { let default_ids = MissingBlobs::new_without_block(block_root, is_deneb); Self { @@ -321,24 +323,21 @@ impl BlobRequestState { requested_ids: default_ids, blob_download_queue: <_>::default(), state: SingleLookupRequestState::new(peer_source), - _phantom: PhantomData, } } } /// The state of the block request component of a `SingleBlockLookup`. -pub struct BlockRequestState { +pub struct BlockRequestState { pub requested_block_root: Hash256, pub state: SingleLookupRequestState, - _phantom: PhantomData, } -impl BlockRequestState { +impl BlockRequestState { pub fn new(block_root: Hash256, peers: &[PeerId]) -> Self { Self { requested_block_root: block_root, state: SingleLookupRequestState::new(peers), - _phantom: PhantomData, } } } @@ -525,7 +524,7 @@ impl SingleLookupRequestState { } } -impl slog::Value for SingleBlockLookup { +impl slog::Value for SingleBlockLookup { fn serialize( &self, _record: &slog::Record, @@ -533,7 +532,7 @@ impl slog::Value for SingleBlockLookup { serializer: &mut dyn slog::Serializer, ) -> slog::Result { serializer.emit_str("request", key)?; - serializer.emit_arguments("lookup_type", &format_args!("{:?}", L::lookup_type()))?; + serializer.emit_arguments("lookup_type", &format_args!("{:?}", self.lookup_type))?; serializer.emit_arguments("hash", &format_args!("{}", self.block_root()))?; serializer.emit_arguments( "blob_ids", @@ -587,138 +586,3 @@ impl std::fmt::Display for State { } } } - -#[cfg(test)] -mod tests { - use super::*; - use crate::sync::block_lookups::common::LookupType; - use beacon_chain::builder::Witness; - use beacon_chain::eth1_chain::CachingEth1Backend; - use sloggers::null::NullLoggerBuilder; - use sloggers::Build; - use slot_clock::{SlotClock, TestingSlotClock}; - use std::time::Duration; - use store::{HotColdDB, MemoryStore, StoreConfig}; - use types::{ - test_utils::{SeedableRng, TestRandom, XorShiftRng}, - ChainSpec, MinimalEthSpec as E, SignedBeaconBlock, Slot, - }; - - fn rand_block() -> SignedBeaconBlock { - let mut rng = XorShiftRng::from_seed([42; 16]); - SignedBeaconBlock::from_block( - types::BeaconBlock::Base(types::BeaconBlockBase { - ..<_>::random_for_test(&mut rng) - }), - types::Signature::random_for_test(&mut rng), - ) - } - type T = Witness, E, MemoryStore, MemoryStore>; - - struct TestLookup1; - - impl Lookup for TestLookup1 { - const MAX_ATTEMPTS: u8 = 3; - - fn lookup_type() -> LookupType { - panic!() - } - } - - struct TestLookup2; - - impl Lookup for TestLookup2 { - const MAX_ATTEMPTS: u8 = 4; - - fn lookup_type() -> LookupType { - panic!() - } - } - - #[test] - fn test_happy_path() { - let peer_id = PeerId::random(); - let block = rand_block(); - let spec = E::default_spec(); - let slot_clock = TestingSlotClock::new( - Slot::new(0), - Duration::from_secs(0), - Duration::from_secs(spec.seconds_per_slot), - ); - let log = NullLoggerBuilder.build().expect("logger should build"); - let store = - HotColdDB::open_ephemeral(StoreConfig::default(), ChainSpec::minimal(), log.clone()) - .expect("store"); - let da_checker = Arc::new( - DataAvailabilityChecker::new(slot_clock, None, store.into(), &log, spec.clone()) - .expect("data availability checker"), - ); - let mut sl = SingleBlockLookup::::new( - block.canonical_root(), - None, - &[peer_id], - da_checker, - 1, - ); - as RequestState>::build_request( - &mut sl.block_request_state, - ) - .unwrap(); - sl.block_request_state.state.state = State::Downloading { peer_id }; - } - - #[test] - fn test_block_lookup_failures() { - let peer_id = PeerId::random(); - let block = rand_block(); - let spec = E::default_spec(); - let slot_clock = TestingSlotClock::new( - Slot::new(0), - Duration::from_secs(0), - Duration::from_secs(spec.seconds_per_slot), - ); - let log = NullLoggerBuilder.build().expect("logger should build"); - let store = - HotColdDB::open_ephemeral(StoreConfig::default(), ChainSpec::minimal(), log.clone()) - .expect("store"); - - let da_checker = Arc::new( - DataAvailabilityChecker::new(slot_clock, None, store.into(), &log, spec.clone()) - .expect("data availability checker"), - ); - - let mut sl = SingleBlockLookup::::new( - block.canonical_root(), - None, - &[peer_id], - da_checker, - 1, - ); - for _ in 1..TestLookup2::MAX_ATTEMPTS { - as RequestState>::build_request( - &mut sl.block_request_state, - ) - .unwrap(); - sl.block_request_state.state.on_download_failure(); - } - - // Now we receive the block and send it for processing - as RequestState>::build_request( - &mut sl.block_request_state, - ) - .unwrap(); - sl.block_request_state.state.state = State::Downloading { peer_id }; - - // One processing failure maxes the available attempts - sl.block_request_state.state.on_processing_failure(); - assert_eq!( - as RequestState>::build_request( - &mut sl.block_request_state, - ) - .unwrap_err(), - LookupRequestError::TooManyAttempts { - cannot_process: false - } - ) - } -} diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 23bd1010bfe..9c17c6a1512 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -42,7 +42,6 @@ use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; use crate::network_beacon_processor::{ChainSegmentProcessId, NetworkBeaconProcessor}; use crate::service::NetworkMessage; use crate::status::ToStatusMessage; -use crate::sync::block_lookups::common::{Current, Parent}; use crate::sync::block_lookups::{BlobRequestState, BlockRequestState}; use crate::sync::block_sidecar_coupling::BlocksAndBlobsRequestInfo; use beacon_chain::block_verification_types::AsBlock; @@ -621,14 +620,14 @@ impl SyncManager { } => match process_type { BlockProcessType::SingleBlock { id } => self .block_lookups - .single_block_component_processed::>( + .single_block_component_processed::( id, result, &mut self.network, ), BlockProcessType::SingleBlob { id } => self .block_lookups - .single_block_component_processed::>( + .single_block_component_processed::>( id, result, &mut self.network, @@ -834,7 +833,7 @@ impl SyncManager { Ok((block, seen_timestamp)) => match id.lookup_type { LookupType::Current => self .block_lookups - .single_lookup_response::>( + .single_lookup_response::( id, peer_id, block, @@ -843,7 +842,7 @@ impl SyncManager { ), LookupType::Parent => self .block_lookups - .parent_lookup_response::>( + .parent_lookup_response::( id, peer_id, block, @@ -854,7 +853,7 @@ impl SyncManager { Err(error) => match id.lookup_type { LookupType::Current => self .block_lookups - .single_block_lookup_failed::>( + .single_block_lookup_failed::( id, &peer_id, &mut self.network, @@ -862,7 +861,7 @@ impl SyncManager { ), LookupType::Parent => self .block_lookups - .parent_lookup_failed::>( + .parent_lookup_failed::( id, &peer_id, &mut self.network, @@ -909,7 +908,7 @@ impl SyncManager { Ok((blobs, seen_timestamp)) => match id.lookup_type { LookupType::Current => self .block_lookups - .single_lookup_response::>( + .single_lookup_response::>( id, peer_id, blobs, @@ -918,7 +917,7 @@ impl SyncManager { ), LookupType::Parent => self .block_lookups - .parent_lookup_response::>( + .parent_lookup_response::>( id, peer_id, blobs, @@ -930,7 +929,7 @@ impl SyncManager { Err(error) => match id.lookup_type { LookupType::Current => self .block_lookups - .single_block_lookup_failed::>( + .single_block_lookup_failed::>( id, &peer_id, &mut self.network, @@ -938,7 +937,7 @@ impl SyncManager { ), LookupType::Parent => self .block_lookups - .parent_lookup_failed::>( + .parent_lookup_failed::>( id, &peer_id, &mut self.network,