diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index 2a89ff2b591..5bf13d82b7b 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -60,7 +60,9 @@ use std::time::Duration; use task_executor::TaskExecutor; use tokio::sync::mpsc; use tokio::sync::mpsc::error::TrySendError; -use types::{Attestation, BeaconState, ChainSpec, Hash256, SignedAggregateAndProof, SubnetId}; +use types::{ + Attestation, BeaconState, ChainSpec, Hash256, RelativeEpoch, SignedAggregateAndProof, SubnetId, +}; use types::{EthSpec, Slot}; use work_reprocessing_queue::IgnoredRpcBlock; use work_reprocessing_queue::{ @@ -85,6 +87,11 @@ const MAX_IDLE_QUEUE_LEN: usize = 16_384; /// The maximum size of the channel for re-processing work events. const DEFAULT_MAX_SCHEDULED_WORK_QUEUE_LEN: usize = 3 * DEFAULT_MAX_WORK_EVENT_QUEUE_LEN / 4; +/// Over-provision queues based on active validator count by some factor. The beacon chain has +/// strict churns that prevent the validator set size from changing rapidly. By over-provisioning +/// slightly, we don't need to adjust the queues during the lifetime of a process. +const ACTIVE_VALIDATOR_COUNT_OVERPROVISION_PERCENT: usize = 110; + /// Maximum number of queued items that will be stored before dropping them pub struct BeaconProcessorQueueLengths { aggregate_queue: usize, @@ -124,10 +131,16 @@ impl BeaconProcessorQueueLengths { state: &BeaconState, spec: &ChainSpec, ) -> Result { - let active_validator_count = state - .get_active_validator_indices(state.slot().epoch(E::slots_per_epoch()), spec) - .map_err(|e| format!("Error computing active indices: {:?}", e))? - .len(); + let active_validator_count = + match state.get_cached_active_validator_indices(RelativeEpoch::Current) { + Ok(indices) => indices.len(), + Err(_) => state + .get_active_validator_indices(state.current_epoch(), spec) + .map_err(|e| format!("Error computing active indices: {:?}", e))? + .len(), + }; + let active_validator_count = + (ACTIVE_VALIDATOR_COUNT_OVERPROVISION_PERCENT * active_validator_count) / 100; let slots_per_epoch = E::slots_per_epoch() as usize; Ok(Self { @@ -135,8 +148,8 @@ impl BeaconProcessorQueueLengths { unknown_block_aggregate_queue: 1024, // Capacity for a full slot's worth of attestations if subscribed to all subnets attestation_queue: active_validator_count / slots_per_epoch, - // Capacity /2 of a full slot's worth of attestations, no specific reason for 2 - unknown_block_attestation_queue: active_validator_count / slots_per_epoch / 2, + // Capacity for a full slot's worth of attestations if subscribed to all subnets + unknown_block_attestation_queue: active_validator_count / slots_per_epoch, sync_message_queue: 2048, sync_contribution_queue: 1024, gossip_voluntary_exit_queue: 4096,