Skip to content

Commit

Permalink
Re-process early sampling requests (#5569)
Browse files Browse the repository at this point in the history
* Re-process early sampling requests

# Conflicts:
#	beacon_node/beacon_processor/src/work_reprocessing_queue.rs
#	beacon_node/lighthouse_network/src/rpc/methods.rs
#	beacon_node/network/src/network_beacon_processor/rpc_methods.rs

* Update beacon_node/beacon_processor/src/work_reprocessing_queue.rs

Co-authored-by: Jimmy Chen <[email protected]>

* Add missing var

* Beta compiler fixes and small typo fixes.

* Remove duplicate method.

---------

Co-authored-by: Jimmy Chen <[email protected]>
  • Loading branch information
dapplion and jimmygchen authored Apr 30, 2024
1 parent cb11a16 commit c85c205
Show file tree
Hide file tree
Showing 10 changed files with 332 additions and 83 deletions.
68 changes: 67 additions & 1 deletion beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ use task_executor::{ShutdownReason, TaskExecutor};
use tokio_stream::Stream;
use tree_hash::TreeHash;
use types::blob_sidecar::FixedBlobSidecarList;
use types::data_column_sidecar::DataColumnSidecarList;
use types::data_column_sidecar::{ColumnIndex, DataColumnIdentifier, DataColumnSidecarList};
use types::payload::BlockProductionVersion;
use types::*;

Expand Down Expand Up @@ -541,6 +541,12 @@ pub struct BeaconBlockResponse<E: EthSpec, Payload: AbstractExecPayload<E>> {
pub consensus_block_value: u64,
}

pub enum BlockImportStatus<E: EthSpec> {
PendingImport(Arc<SignedBeaconBlock<E>>),
Imported,
Unknown,
}

impl FinalizationAndCanonicity {
pub fn is_finalized(self) -> bool {
self.slot_is_finalized && self.canonical
Expand Down Expand Up @@ -1166,6 +1172,66 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.map_or_else(|| self.get_data_columns(block_root), Ok)
}

pub fn get_selected_data_columns_checking_all_caches(
&self,
block_root: Hash256,
indices: &[ColumnIndex],
) -> Result<Vec<Arc<DataColumnSidecar<T::EthSpec>>>, Error> {
let columns_from_availability_cache = indices
.iter()
.copied()
.filter_map(|index| {
self.data_availability_checker
.get_data_column(&DataColumnIdentifier { block_root, index })
.transpose()
})
.collect::<Result<Vec<_>, _>>()?;
// Existence of a column in the data availability cache and downstream caches is exclusive.
// If there's a single match in the availability cache we can safely skip other sources.
if !columns_from_availability_cache.is_empty() {
return Ok(columns_from_availability_cache);
}

Ok(self
.early_attester_cache
.get_data_columns(block_root)
.map_or_else(|| self.get_data_columns(&block_root), Ok)?
.into_iter()
.filter(|dc| indices.contains(&dc.index))
.collect())
}

/// Returns the import status of block checking (in order) pre-import caches, fork-choice, db store
pub fn get_block_import_status(&self, block_root: &Hash256) -> BlockImportStatus<T::EthSpec> {
if let Some(block) = self
.reqresp_pre_import_cache
.read()
.get(block_root)
.map(|block| {
metrics::inc_counter(&metrics::BEACON_REQRESP_PRE_IMPORT_CACHE_HITS);
block.clone()
})
{
return BlockImportStatus::PendingImport(block);
}
// Check fork-choice before early_attester_cache as the latter is pruned lazily
if self
.canonical_head
.fork_choice_read_lock()
.contains_block(block_root)
{
return BlockImportStatus::Imported;
}
if let Some(block) = self.early_attester_cache.get_block(*block_root) {
return BlockImportStatus::PendingImport(block);
}
if let Ok(true) = self.store.block_exists(block_root) {
BlockImportStatus::Imported
} else {
BlockImportStatus::Unknown
}
}

/// Returns the block at the given root, if any.
///
/// ## Errors
Expand Down
6 changes: 5 additions & 1 deletion beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,11 @@ use store::{Error as DBError, HotStateSummary, KeyValueStore, StoreOp};
use task_executor::JoinHandle;
use tree_hash::TreeHash;
use types::data_column_sidecar::DataColumnSidecarError;
use types::{BeaconBlockRef, BeaconState, BeaconStateError, BlobSidecarList, ChainSpec, DataColumnSidecar, DataColumnSubnetId, Epoch, EthSpec, ExecutionBlockHash, FullPayload, Hash256, InconsistentFork, PublicKey, PublicKeyBytes, RelativeEpoch, SignedBeaconBlock, SignedBeaconBlockHeader, Slot};
use types::{
BeaconBlockRef, BeaconState, BeaconStateError, BlobSidecarList, ChainSpec, DataColumnSidecar,
DataColumnSubnetId, Epoch, EthSpec, ExecutionBlockHash, FullPayload, Hash256, InconsistentFork,
PublicKey, PublicKeyBytes, RelativeEpoch, SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
};
use types::{BlobSidecar, ExecPayload};

pub const POS_PANDA_BANNER: &str = r#"
Expand Down
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ pub use self::chain_config::ChainConfig;
pub use self::errors::{BeaconChainError, BlockProductionError};
pub use self::historical_blocks::HistoricalBlockError;
pub use attestation_verification::Error as AttestationError;
pub use beacon_chain::BlockImportStatus;
pub use beacon_fork_choice_store::{BeaconForkChoiceStore, Error as ForkChoiceStoreError};
pub use block_verification::{
get_block_root, BlockError, ExecutionPayloadError, ExecutionPendingBlock, GossipVerifiedBlock,
Expand Down
48 changes: 36 additions & 12 deletions beacon_node/beacon_processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
use types::{Attestation, Hash256, SignedAggregateAndProof, SubnetId};
use types::{EthSpec, Slot};
use work_reprocessing_queue::IgnoredRpcBlock;
use work_reprocessing_queue::{
spawn_reprocess_scheduler, QueuedAggregate, QueuedLightClientUpdate, QueuedRpcBlock,
QueuedUnaggregate, ReadyWork,
};
use work_reprocessing_queue::{IgnoredRpcBlock, QueuedSamplingRequest};

mod metrics;
pub mod work_reprocessing_queue;
Expand Down Expand Up @@ -141,6 +141,10 @@ const MAX_GOSSIP_OPTIMISTIC_UPDATE_QUEUE_LEN: usize = 1_024;
/// for reprocessing before we start dropping them.
const MAX_GOSSIP_OPTIMISTIC_UPDATE_REPROCESS_QUEUE_LEN: usize = 128;

/// The maximum number of queued retries of ReqResp sampling requests from the reprocess queue that
/// will be stored before dropping them.
const MAX_UNKNOWN_BLOCK_SAMPLING_REQUEST_QUEUE_LEN: usize = 16_384;

/// The maximum number of queued `SyncCommitteeMessage` objects that will be stored before we start dropping
/// them.
const MAX_SYNC_MESSAGE_QUEUE_LEN: usize = 2048;
Expand Down Expand Up @@ -272,6 +276,7 @@ pub const LIGHT_CLIENT_OPTIMISTIC_UPDATE_REQUEST: &str = "light_client_optimisti
pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation";
pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate";
pub const UNKNOWN_LIGHT_CLIENT_UPDATE: &str = "unknown_light_client_update";
pub const UNKNOWN_BLOCK_SAMPLING_REQUEST: &str = "unknown_block_sampling_request";
pub const GOSSIP_BLS_TO_EXECUTION_CHANGE: &str = "gossip_bls_to_execution_change";
pub const API_REQUEST_P0: &str = "api_request_p0";
pub const API_REQUEST_P1: &str = "api_request_p1";
Expand Down Expand Up @@ -527,6 +532,10 @@ impl<E: EthSpec> From<ReadyWork> for WorkEvent<E> {
process_fn,
},
},
ReadyWork::SamplingRequest(QueuedSamplingRequest { process_fn, .. }) => Self {
drop_during_sync: true,
work: Work::UnknownBlockSamplingRequest { process_fn },
},
ReadyWork::BackfillSync(QueuedBackfillBatch(process_fn)) => Self {
drop_during_sync: false,
work: Work::ChainSegmentBackfill(process_fn),
Expand Down Expand Up @@ -610,6 +619,9 @@ pub enum Work<E: EthSpec> {
parent_root: Hash256,
process_fn: BlockingFn,
},
UnknownBlockSamplingRequest {
process_fn: BlockingFn,
},
GossipAggregateBatch {
aggregates: Vec<GossipAggregatePackage<E>>,
process_batch: Box<dyn FnOnce(Vec<GossipAggregatePackage<E>>) + Send + Sync>,
Expand Down Expand Up @@ -699,8 +711,9 @@ impl<E: EthSpec> Work<E> {
Work::LightClientFinalityUpdateRequest(_) => LIGHT_CLIENT_FINALITY_UPDATE_REQUEST,
Work::UnknownBlockAttestation { .. } => UNKNOWN_BLOCK_ATTESTATION,
Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE,
Work::GossipBlsToExecutionChange(_) => GOSSIP_BLS_TO_EXECUTION_CHANGE,
Work::UnknownLightClientOptimisticUpdate { .. } => UNKNOWN_LIGHT_CLIENT_UPDATE,
Work::UnknownBlockSamplingRequest { .. } => UNKNOWN_BLOCK_SAMPLING_REQUEST,
Work::GossipBlsToExecutionChange(_) => GOSSIP_BLS_TO_EXECUTION_CHANGE,
Work::ApiRequestP0 { .. } => API_REQUEST_P0,
Work::ApiRequestP1 { .. } => API_REQUEST_P1,
}
Expand Down Expand Up @@ -839,6 +852,8 @@ impl<E: EthSpec> BeaconProcessor<E> {
let mut optimistic_update_queue = FifoQueue::new(MAX_GOSSIP_OPTIMISTIC_UPDATE_QUEUE_LEN);
let mut unknown_light_client_update_queue =
FifoQueue::new(MAX_GOSSIP_OPTIMISTIC_UPDATE_REPROCESS_QUEUE_LEN);
let mut unknown_block_sampling_request_queue =
FifoQueue::new(MAX_UNKNOWN_BLOCK_SAMPLING_REQUEST_QUEUE_LEN);

// Using a FIFO queue since blocks need to be imported sequentially.
let mut rpc_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN);
Expand Down Expand Up @@ -1158,16 +1173,22 @@ impl<E: EthSpec> BeaconProcessor<E> {
// and BlocksByRoot)
} else if let Some(item) = status_queue.pop() {
self.spawn_worker(item, idle_tx);
} else if let Some(item) = bbrange_queue.pop() {
self.spawn_worker(item, idle_tx);
// Prioritize by_root requests over by_range as the former are time
// sensitive for recovery
} else if let Some(item) = bbroots_queue.pop() {
self.spawn_worker(item, idle_tx);
} else if let Some(item) = blbrange_queue.pop() {
self.spawn_worker(item, idle_tx);
} else if let Some(item) = blbroots_queue.pop() {
self.spawn_worker(item, idle_tx);
} else if let Some(item) = dcbroots_queue.pop() {
self.spawn_worker(item, idle_tx);
// Prioritize sampling requests after block syncing requests
} else if let Some(item) = unknown_block_sampling_request_queue.pop() {
self.spawn_worker(item, idle_tx);
// by_range sync after sampling
} else if let Some(item) = bbrange_queue.pop() {
self.spawn_worker(item, idle_tx);
} else if let Some(item) = blbrange_queue.pop() {
self.spawn_worker(item, idle_tx);
// Check slashings after all other consensus messages so we prioritize
// following head.
//
Expand Down Expand Up @@ -1344,6 +1365,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
Work::UnknownLightClientOptimisticUpdate { .. } => {
unknown_light_client_update_queue.push(work, work_id, &self.log)
}
Work::UnknownBlockSamplingRequest { .. } => {
unknown_block_sampling_request_queue.push(work, work_id, &self.log)
}
Work::ApiRequestP0 { .. } => {
api_request_p0_queue.push(work, work_id, &self.log)
}
Expand Down Expand Up @@ -1530,12 +1554,12 @@ impl<E: EthSpec> BeaconProcessor<E> {
Work::ChainSegment(process_fn) => task_spawner.spawn_async(async move {
process_fn.await;
}),
Work::UnknownBlockAttestation { process_fn } => task_spawner.spawn_blocking(process_fn),
Work::UnknownBlockAggregate { process_fn } => task_spawner.spawn_blocking(process_fn),
Work::UnknownLightClientOptimisticUpdate {
parent_root: _,
process_fn,
} => task_spawner.spawn_blocking(process_fn),
Work::UnknownBlockAttestation { process_fn }
| Work::UnknownBlockAggregate { process_fn }
| Work::UnknownLightClientOptimisticUpdate { process_fn, .. }
| Work::UnknownBlockSamplingRequest { process_fn } => {
task_spawner.spawn_blocking(process_fn)
}
Work::DelayedImportBlock {
beacon_block_slot: _,
beacon_block_root: _,
Expand Down
7 changes: 6 additions & 1 deletion beacon_node/beacon_processor/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,12 @@ lazy_static::lazy_static! {
);
pub static ref BEACON_PROCESSOR_REPROCESSING_QUEUE_MATCHED_OPTIMISTIC_UPDATES: Result<IntCounter> = try_create_int_counter(
"beacon_processor_reprocessing_queue_matched_optimistic_updates",
"Number of queued light client optimistic updates where as matching block has been imported."
"Number of queued light client optimistic updates where a matching block has been imported."
);
// TODO: This should be labeled instead of N single metrics
pub static ref BEACON_PROCESSOR_REPROCESSING_QUEUE_MATCHED_SAMPLING_REQUESTS: Result<IntCounter> = try_create_int_counter(
"beacon_processor_reprocessing_queue_matches_sampling_requests",
"Number of queued sampling requests where a matching block has been imported."
);

/// Errors and Debugging Stats
Expand Down
Loading

0 comments on commit c85c205

Please sign in to comment.