diff --git a/beacon_node/beacon_chain/src/beacon_block_streamer.rs b/beacon_node/beacon_chain/src/beacon_block_streamer.rs
index 0c92b7c1f62..f0a68b6be55 100644
--- a/beacon_node/beacon_chain/src/beacon_block_streamer.rs
+++ b/beacon_node/beacon_chain/src/beacon_block_streamer.rs
@@ -1,4 +1,4 @@
-use crate::{metrics, BeaconChain, BeaconChainError, BeaconChainTypes};
+use crate::{metrics, BeaconChain, BeaconChainError, BeaconChainTypes, BlockProcessStatus};
 use execution_layer::{ExecutionLayer, ExecutionPayloadBodyV1};
 use slog::{crit, debug, error, Logger};
 use std::collections::HashMap;
@@ -410,15 +410,14 @@ impl BeaconBlockStreamer {
 
     fn check_caches(&self, root: Hash256) -> Option<Arc<SignedBeaconBlock<T::EthSpec>>> {
         if self.check_caches == CheckCaches::Yes {
-            self.beacon_chain
-                .reqresp_pre_import_cache
-                .read()
-                .get(&root)
-                .map(|block| {
+            match self.beacon_chain.get_block_process_status(&root) {
+                BlockProcessStatus::Unknown => None,
+                BlockProcessStatus::NotValidated(block)
+                | BlockProcessStatus::ExecutionValidated(block) => {
                     metrics::inc_counter(&metrics::BEACON_REQRESP_PRE_IMPORT_CACHE_HITS);
-                    block.clone()
-                })
-                .or(self.beacon_chain.early_attester_cache.get_block(root))
+                    Some(block)
+                }
+            }
         } else {
             None
         }
diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs
index 9584b2e29f1..0bc0a4a2d3d 100644
--- a/beacon_node/beacon_chain/src/beacon_chain.rs
+++ b/beacon_node/beacon_chain/src/beacon_chain.rs
@@ -337,6 +337,20 @@ struct PartialBeaconBlock {
     bls_to_execution_changes: Vec<SignedBlsToExecutionChange>,
 }
 
+pub enum BlockProcessStatus<E: EthSpec> {
+    /// Block is not in any pre-import cache. The block may be in the database or in fork choice.
+    Unknown,
+    /// Block is currently processing but not yet validated.
+    NotValidated(Arc<SignedBeaconBlock<E>>),
+    /// Block is fully valid, but not yet imported. It's cached in the da_checker while awaiting
+    /// missing block components.
+    ExecutionValidated(Arc<SignedBeaconBlock<E>>),
+}
+
+pub struct BeaconChainMetrics {
+    pub reqresp_pre_import_cache_len: usize,
+}
+
 pub type LightClientProducerEvent<E> = (Hash256, Slot, SyncAggregate<E>);
 
 pub type BeaconForkChoice<T> = ForkChoice<
@@ -1237,6 +1251,27 @@ impl BeaconChain {
         Ok(self.store.get_blinded_block(block_root)?)
     }
 
+    /// Return the status of a block as it progresses through the various caches of the beacon
+    /// chain. Used by sync to learn the status of a block and prevent repeated downloads /
+    /// processing attempts.
+    pub fn get_block_process_status(&self, block_root: &Hash256) -> BlockProcessStatus<T::EthSpec> {
+        if let Some(block) = self
+            .data_availability_checker
+            .get_execution_valid_block(block_root)
+        {
+            return BlockProcessStatus::ExecutionValidated(block);
+        }
+
+        if let Some(block) = self.reqresp_pre_import_cache.read().get(block_root) {
+            // A block is on the `reqresp_pre_import_cache` but NOT in the
+            // `data_availability_checker` only if it is actively processing. We can expect a future
+            // event with the result of processing.
+            return BlockProcessStatus::NotValidated(block.clone());
+        }
+
+        BlockProcessStatus::Unknown
+    }
+
     /// Returns the state at the given root, if any.
     ///
     /// ## Errors
@@ -6630,6 +6665,12 @@ impl BeaconChain {
             ForkName::Base => Err(Error::UnsupportedFork),
         }
     }
+
+    pub fn metrics(&self) -> BeaconChainMetrics {
+        BeaconChainMetrics {
+            reqresp_pre_import_cache_len: self.reqresp_pre_import_cache.read().len(),
+        }
+    }
 }
 
 impl<T: BeaconChainTypes> Drop for BeaconChain<T> {
diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs
index a981d31e554..57f718f62d9 100644
--- a/beacon_node/beacon_chain/src/data_availability_checker.rs
+++ b/beacon_node/beacon_chain/src/data_availability_checker.rs
@@ -86,17 +86,12 @@ impl DataAvailabilityChecker {
 
     /// Checks if the block root is currenlty in the availability cache awaiting import because
     /// of missing components.
-    pub fn has_execution_valid_block(&self, block_root: &Hash256) -> bool {
-        self.availability_cache
-            .has_execution_valid_block(block_root)
-    }
-
-    /// Return the required blobs `block_root` expects if the block is currenlty in the cache.
-    pub fn num_expected_blobs(&self, block_root: &Hash256) -> Option<usize> {
+    pub fn get_execution_valid_block(
+        &self,
+        block_root: &Hash256,
+    ) -> Option<Arc<SignedBeaconBlock<T::EthSpec>>> {
         self.availability_cache
-            .peek_pending_components(block_root, |components| {
-                components.and_then(|components| components.num_expected_blobs())
-            })
+            .get_execution_valid_block(block_root)
     }
 
     /// Return the set of imported blob indexes for `block_root`. Returns None if there is no block
diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
index 2e3c4aac558..e350181c867 100644
--- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
+++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
@@ -44,7 +44,7 @@ use ssz_types::{FixedVector, VariableList};
 use std::num::NonZeroUsize;
 use std::{collections::HashSet, sync::Arc};
 use types::blob_sidecar::BlobIdentifier;
-use types::{BlobSidecar, ChainSpec, Epoch, EthSpec, Hash256};
+use types::{BlobSidecar, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock};
 
 /// This represents the components of a partially available block
 ///
@@ -544,12 +544,19 @@ impl OverflowLRUCache {
     }
 
     /// Returns true if the block root is known, without altering the LRU ordering
-    pub fn has_execution_valid_block(&self, block_root: &Hash256) -> bool {
-        if let Some(pending_components) = self.critical.read().peek_pending_components(block_root) {
-            pending_components.executed_block.is_some()
-        } else {
-            false
-        }
+    pub fn get_execution_valid_block(
+        &self,
+        block_root: &Hash256,
+    ) -> Option<Arc<SignedBeaconBlock<T::EthSpec>>> {
+        self.critical
+            .read()
+            .peek_pending_components(block_root)
+            .and_then(|pending_components| {
+                pending_components
+                    .executed_block
+                    .as_ref()
+                    .map(|block| block.block_cloned())
+            })
     }
 
     /// Fetch a blob from the cache without affecting the LRU ordering
diff --git a/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs
index f8a243bd9e8..9775d54c024 100644
--- a/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs
+++ b/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs
@@ -37,6 +37,10 @@ impl DietAvailabilityPendingExecutedBlock {
         &self.block
     }
 
+    pub fn block_cloned(&self) -> Arc<SignedBeaconBlock<E>> {
+        self.block.clone()
+    }
+
     pub fn num_blobs_expected(&self) -> usize {
         self.block
             .message()
diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs
index 221bb8b2922..f419429e090 100644
--- a/beacon_node/beacon_chain/src/lib.rs
+++ b/beacon_node/beacon_chain/src/lib.rs
@@ -62,9 +62,10 @@ pub mod validator_pubkey_cache;
 
 pub use self::beacon_chain::{
     AttestationProcessingOutcome, AvailabilityProcessingStatus, BeaconBlockResponse,
-    BeaconBlockResponseWrapper, BeaconChain, BeaconChainTypes, BeaconStore, ChainSegmentResult,
-    ForkChoiceError, LightClientProducerEvent, OverrideForkchoiceUpdate, ProduceBlockVerification,
-    StateSkipConfig, WhenSlotSkipped, INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON,
+    BeaconBlockResponseWrapper, BeaconChain, BeaconChainTypes, BeaconStore, BlockProcessStatus,
+    ChainSegmentResult, ForkChoiceError, LightClientProducerEvent, OverrideForkchoiceUpdate,
+    ProduceBlockVerification, StateSkipConfig, WhenSlotSkipped,
+    INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON,
     INVALID_JUSTIFIED_PAYLOAD_SHUTDOWN_REASON,
 };
 pub use self::beacon_snapshot::BeaconSnapshot;
diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs
index 3b2453c3112..4ceaf675cec 100644
--- a/beacon_node/beacon_chain/src/metrics.rs
+++ b/beacon_node/beacon_chain/src/metrics.rs
@@ -1192,6 +1192,7 @@ pub fn scrape_for_metrics(beacon_chain: &BeaconChain) {
     }
 
     let attestation_stats = beacon_chain.op_pool.attestation_stats();
+    let chain_metrics = beacon_chain.metrics();
 
     set_gauge_by_usize(
         &BLOCK_PROCESSING_SNAPSHOT_CACHE_SIZE,
@@ -1200,7 +1201,7 @@
 
     set_gauge_by_usize(
         &BEACON_REQRESP_PRE_IMPORT_CACHE_SIZE,
-        beacon_chain.reqresp_pre_import_cache.read().len(),
+        chain_metrics.reqresp_pre_import_cache_len,
     );
 
     let da_checker_metrics = beacon_chain.data_availability_checker.metrics();
diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs
index 5a85e57f632..70a8c6174d4 100644
--- a/beacon_node/network/src/sync/block_lookups/tests.rs
+++ b/beacon_node/network/src/sync/block_lookups/tests.rs
@@ -1452,13 +1452,16 @@ fn block_in_processing_cache_becomes_invalid() {
     let peer_id = r.new_connected_peer();
     r.insert_block_to_processing_cache(block.clone().into());
     r.trigger_unknown_block_from_attestation(block_root, peer_id);
+    // Should trigger blob request
+    let id = r.expect_blob_lookup_request(block_root);
     // Should not trigger block request
     r.expect_empty_network();
     // Simulate invalid block, removing it from processing cache
     r.simulate_block_gossip_processing_becomes_invalid(block_root);
     // Should download block, then issue blobs request
     r.complete_lookup_block_download(block);
-    let id = r.expect_blob_lookup_request(block_root);
+    // Should not trigger block or blob request
+    r.expect_empty_network();
     r.complete_lookup_block_import_valid(block_root, false);
     // Resolve blob and expect lookup completed
     r.complete_single_lookup_blob_lookup_valid(id, peer_id, blobs, true);
@@ -1475,11 +1478,14 @@ fn block_in_processing_cache_becomes_valid_imported() {
     let peer_id = r.new_connected_peer();
     r.insert_block_to_processing_cache(block.clone().into());
     r.trigger_unknown_block_from_attestation(block_root, peer_id);
+    // Should trigger blob request
+    let id = r.expect_blob_lookup_request(block_root);
     // Should not trigger block request
     r.expect_empty_network();
     // Resolve the block from processing step
    r.simulate_block_gossip_processing_becomes_valid_missing_components(block.into());
-    let id = r.expect_blob_lookup_request(block_root);
+    // Should not trigger block or blob request
+    r.expect_empty_network();
     // Resolve blob and expect lookup completed
     r.complete_single_lookup_blob_lookup_valid(id, peer_id, blobs, true);
     r.expect_no_active_lookups();
diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs
index fa1f50cee06..1b6c820a2f6 100644
--- a/beacon_node/network/src/sync/network_context.rs
+++ b/beacon_node/network/src/sync/network_context.rs
@@ -12,7 +12,7 @@ use crate::status::ToStatusMessage;
 use crate::sync::block_lookups::SingleLookupId;
 use crate::sync::manager::{BlockProcessType, SingleLookupReqId};
 use beacon_chain::block_verification_types::RpcBlock;
-use beacon_chain::{BeaconChain, BeaconChainTypes, EngineState};
+use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineState};
 use fnv::FnvHashMap;
 use lighthouse_network::rpc::methods::BlobsByRangeRequest;
 use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError};
@@ -337,26 +337,17 @@ impl SyncNetworkContext {
         peer_id: PeerId,
         block_root: Hash256,
     ) -> Result<LookupRequestResult, &'static str> {
-        // da_checker includes block that are execution verified, but are missing components
-        if self
-            .chain
-            .data_availability_checker
-            .has_execution_valid_block(&block_root)
-        {
-            return Ok(LookupRequestResult::NoRequestNeeded);
-        }
-
-        // reqresp_pre_import_cache includes blocks that may not be yet execution verified
-        if self
-            .chain
-            .reqresp_pre_import_cache
-            .read()
-            .contains_key(&block_root)
-        {
-            // A block is on the `reqresp_pre_import_cache` but NOT in the
-            // `data_availability_checker` only if it is actively processing. We can expect a future
-            // event with the result of processing
-            return Ok(LookupRequestResult::Pending);
+        match self.chain.get_block_process_status(&block_root) {
+            // Unknown block, continue request to download
+            BlockProcessStatus::Unknown => {}
+            // Block is known and is currently processing; expect a future event with the result
+            // of processing.
+            BlockProcessStatus::NotValidated { .. } => return Ok(LookupRequestResult::Pending),
+            // Block is fully validated. If it's not yet imported it's waiting for missing block
+            // components. Consider this request completed and do nothing.
+            BlockProcessStatus::ExecutionValidated { .. } => {
+                return Ok(LookupRequestResult::NoRequestNeeded)
+            }
         }
 
         let req_id = self.next_id();
@@ -401,9 +392,14 @@ impl SyncNetworkContext {
         downloaded_block_expected_blobs: Option<usize>,
     ) -> Result<LookupRequestResult, &'static str> {
         let Some(expected_blobs) = downloaded_block_expected_blobs.or_else(|| {
-            self.chain
-                .data_availability_checker
-                .num_expected_blobs(&block_root)
+            // If the block is already being processed or fully validated, retrieve how many blobs
+            // it expects. Consider any stage of the block. If the block root has been validated, we
+            // can assert that this is the correct value of `blob_kzg_commitments_count`.
+            match self.chain.get_block_process_status(&block_root) {
+                BlockProcessStatus::Unknown => None,
+                BlockProcessStatus::NotValidated(block)
+                | BlockProcessStatus::ExecutionValidated(block) => Some(block.num_expected_blobs()),
+            }
         }) else {
             // Wait to download the block before downloading blobs. Then we can be sure that the
             // block has data, so there's no need to do "blind" requests for all possible blobs and
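The diff consolidates every pre-import cache probe behind `BeaconChain::get_block_process_status`. The sketch below illustrates the resulting dispatch pattern in isolation; `Block`, `LookupAction`, and `decide_block_action` are simplified stand-ins invented for this example, not types or functions from the diff.

```rust
use std::sync::Arc;

/// Stand-in for `Arc<SignedBeaconBlock<E>>` from the diff above.
type Block = Arc<String>;

/// Mirrors the `BlockProcessStatus` enum added to `beacon_chain.rs`.
enum BlockProcessStatus {
    Unknown,
    NotValidated(Block),
    ExecutionValidated(Block),
}

/// Simplified stand-in for the `LookupRequestResult` variants used in `network_context.rs`.
#[derive(Debug, PartialEq)]
enum LookupAction {
    RequestBlock,
    AwaitProcessingEvent,
    NothingToDo,
}

/// The dispatch pattern `block_lookup_request` follows after this change: one status query
/// replaces the two separate cache probes (da_checker + reqresp_pre_import_cache).
fn decide_block_action(status: BlockProcessStatus) -> LookupAction {
    match status {
        // Not in any pre-import cache: sync must download the block.
        BlockProcessStatus::Unknown => LookupAction::RequestBlock,
        // Still processing: a processing event will arrive, so don't re-download.
        BlockProcessStatus::NotValidated(_) => LookupAction::AwaitProcessingEvent,
        // Execution-valid and awaiting missing components in the da_checker: nothing to request.
        BlockProcessStatus::ExecutionValidated(_) => LookupAction::NothingToDo,
    }
}

fn main() {
    let block: Block = Arc::new("block bytes".to_string());
    assert_eq!(
        decide_block_action(BlockProcessStatus::Unknown),
        LookupAction::RequestBlock
    );
    assert_eq!(
        decide_block_action(BlockProcessStatus::NotValidated(block.clone())),
        LookupAction::AwaitProcessingEvent
    );
    assert_eq!(
        decide_block_action(BlockProcessStatus::ExecutionValidated(block)),
        LookupAction::NothingToDo
    );
}
```

The same single query also feeds the blob-count lookup in `blob_lookup_request`, which is why the updated tests expect the blob request to fire as soon as the block is found in a processing cache.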