From 9460d584edebf5b9ab0cdd2c20904e8b8efc12a3 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Fri, 12 Apr 2024 15:17:27 +0900 Subject: [PATCH 1/4] Make beacon processor queue sizes dynamic --- beacon_node/beacon_processor/src/lib.rs | 263 ++++++++++-------------- beacon_node/client/src/builder.rs | 10 +- beacon_node/http_api/src/test_utils.rs | 9 +- 3 files changed, 131 insertions(+), 151 deletions(-) diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index fee55b39adc..a515307f118 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -60,7 +60,7 @@ use std::time::Duration; use task_executor::TaskExecutor; use tokio::sync::mpsc; use tokio::sync::mpsc::error::TrySendError; -use types::{Attestation, Hash256, SignedAggregateAndProof, SubnetId}; +use types::{Attestation, BeaconState, ChainSpec, Hash256, SignedAggregateAndProof, SubnetId}; use types::{EthSpec, Slot}; use work_reprocessing_queue::IgnoredRpcBlock; use work_reprocessing_queue::{ @@ -85,123 +85,87 @@ const MAX_IDLE_QUEUE_LEN: usize = 16_384; /// The maximum size of the channel for re-processing work events. const DEFAULT_MAX_SCHEDULED_WORK_QUEUE_LEN: usize = 3 * DEFAULT_MAX_WORK_EVENT_QUEUE_LEN / 4; -/// The maximum number of queued `Attestation` objects that will be stored before we start dropping -/// them. -const MAX_UNAGGREGATED_ATTESTATION_QUEUE_LEN: usize = 16_384; - -/// The maximum number of queued `Attestation` objects that will be stored before we start dropping -/// them. -const MAX_UNAGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN: usize = 8_192; - -/// The maximum number of queued `SignedAggregateAndProof` objects that will be stored before we -/// start dropping them. -const MAX_AGGREGATED_ATTESTATION_QUEUE_LEN: usize = 4_096; - -/// The maximum number of queued `SignedAggregateAndProof` objects that will be stored before we -/// start dropping them. -const MAX_AGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN: usize = 1_024; - -/// The maximum number of queued `SignedBeaconBlock` objects received on gossip that will be stored -/// before we start dropping them. -const MAX_GOSSIP_BLOCK_QUEUE_LEN: usize = 1_024; - -/// The maximum number of queued `BlobSidecar` objects received on gossip that -/// will be stored before we start dropping them. -const MAX_GOSSIP_BLOB_QUEUE_LEN: usize = 1_024; - -/// The maximum number of queued `SignedBeaconBlock` objects received prior to their slot (but -/// within acceptable clock disparity) that will be queued before we start dropping them. -const MAX_DELAYED_BLOCK_QUEUE_LEN: usize = 1_024; - -/// The maximum number of queued `SignedVoluntaryExit` objects received on gossip that will be stored -/// before we start dropping them. -const MAX_GOSSIP_EXIT_QUEUE_LEN: usize = 4_096; - -/// The maximum number of queued `ProposerSlashing` objects received on gossip that will be stored -/// before we start dropping them. -const MAX_GOSSIP_PROPOSER_SLASHING_QUEUE_LEN: usize = 4_096; - -/// The maximum number of queued `AttesterSlashing` objects received on gossip that will be stored -/// before we start dropping them. -const MAX_GOSSIP_ATTESTER_SLASHING_QUEUE_LEN: usize = 4_096; - -/// The maximum number of queued `LightClientFinalityUpdate` objects received on gossip that will be stored -/// before we start dropping them. -const MAX_GOSSIP_FINALITY_UPDATE_QUEUE_LEN: usize = 1_024; - -/// The maximum number of queued `LightClientOptimisticUpdate` objects received on gossip that will be stored -/// before we start dropping them. -const MAX_GOSSIP_OPTIMISTIC_UPDATE_QUEUE_LEN: usize = 1_024; - -/// The maximum number of queued `LightClientOptimisticUpdate` objects received on gossip that will be stored -/// for reprocessing before we start dropping them. -const MAX_GOSSIP_OPTIMISTIC_UPDATE_REPROCESS_QUEUE_LEN: usize = 128; - -/// The maximum number of queued `SyncCommitteeMessage` objects that will be stored before we start dropping -/// them. -const MAX_SYNC_MESSAGE_QUEUE_LEN: usize = 2048; - -/// The maximum number of queued `SignedContributionAndProof` objects that will be stored before we -/// start dropping them. -const MAX_SYNC_CONTRIBUTION_QUEUE_LEN: usize = 1024; - -/// The maximum number of queued `SignedBeaconBlock` objects received from the network RPC that -/// will be stored before we start dropping them. -const MAX_RPC_BLOCK_QUEUE_LEN: usize = 1_024; - -/// The maximum number of queued `BlobSidecar` objects received from the network RPC that -/// will be stored before we start dropping them. -const MAX_RPC_BLOB_QUEUE_LEN: usize = 1_024; - -/// The maximum number of queued `Vec` objects received during syncing that will -/// be stored before we start dropping them. -const MAX_CHAIN_SEGMENT_QUEUE_LEN: usize = 64; - -/// The maximum number of queued `StatusMessage` objects received from the network RPC that will be -/// stored before we start dropping them. -const MAX_STATUS_QUEUE_LEN: usize = 1_024; - -/// The maximum number of queued `BlocksByRangeRequest` objects received from the network RPC that -/// will be stored before we start dropping them. -const MAX_BLOCKS_BY_RANGE_QUEUE_LEN: usize = 1_024; - -/// The maximum number of queued `BlobsByRangeRequest` objects received from the network RPC that -/// will be stored before we start dropping them. -const MAX_BLOBS_BY_RANGE_QUEUE_LEN: usize = 1024; - -/// The maximum number of queued `BlocksByRootRequest` objects received from the network RPC that -/// will be stored before we start dropping them. -const MAX_BLOCKS_BY_ROOTS_QUEUE_LEN: usize = 1_024; - -/// The maximum number of queued `BlobsByRootRequest` objects received from the network RPC that -/// will be stored before we start dropping them. -const MAX_BLOBS_BY_ROOTS_QUEUE_LEN: usize = 1_024; - -/// Maximum number of `SignedBlsToExecutionChange` messages to queue before dropping them. -/// -/// This value is set high to accommodate the large spike that is expected immediately after Capella -/// is activated. -const MAX_BLS_TO_EXECUTION_CHANGE_QUEUE_LEN: usize = 16_384; - -/// The maximum number of queued `LightClientBootstrapRequest` objects received from the network RPC that -/// will be stored before we start dropping them. -const MAX_LIGHT_CLIENT_BOOTSTRAP_QUEUE_LEN: usize = 1_024; - -/// The maximum number of queued `LightClientOptimisticUpdateRequest` objects received from the network RPC that -/// will be stored before we start dropping them. -const MAX_LIGHT_CLIENT_OPTIMISTIC_UPDATE_QUEUE_LEN: usize = 512; - -/// The maximum number of queued `LightClientFinalityUpdateRequest` objects received from the network RPC that -/// will be stored before we start dropping them. -const MAX_LIGHT_CLIENT_FINALITY_UPDATE_QUEUE_LEN: usize = 512; - -/// The maximum number of priority-0 (highest priority) messages that will be queued before -/// they begin to be dropped. -const MAX_API_REQUEST_P0_QUEUE_LEN: usize = 1_024; +/// Maximum number of queued items that will be stored before dropping them +pub struct BeaconProcessorQueueLengths { + aggregate_queue: usize, + attestation_queue: usize, + unknown_block_aggregate_queue: usize, + unknown_block_attestation_queue: usize, + sync_message_queue: usize, + sync_contribution_queue: usize, + gossip_voluntary_exit_queue: usize, + gossip_proposer_slashing_queue: usize, + gossip_attester_slashing_queue: usize, + finality_update_queue: usize, + optimistic_update_queue: usize, + unknown_light_client_update_queue: usize, + rpc_block_queue: usize, + rpc_blob_queue: usize, + chain_segment_queue: usize, + backfill_chain_segment: usize, + gossip_block_queue: usize, + gossip_blob_queue: usize, + delayed_block_queue: usize, + status_queue: usize, + bbrange_queue: usize, + bbroots_queue: usize, + blbroots_queue: usize, + blbrange_queue: usize, + gossip_bls_to_execution_change_queue: usize, + lc_bootstrap_queue: usize, + lc_optimistic_update_queue: usize, + lc_finality_update_queue: usize, + api_request_p0_queue: usize, + api_request_p1_queue: usize, +} -/// The maximum number of priority-1 (second-highest priority) messages that will be queued before -/// they begin to be dropped. -const MAX_API_REQUEST_P1_QUEUE_LEN: usize = 1_024; +impl BeaconProcessorQueueLengths { + pub fn from_state( + state: &BeaconState, + spec: &ChainSpec, + ) -> Result { + let active_validator_count = state + .get_active_validator_indices(state.slot().epoch(E::slots_per_epoch()), spec) + .map_err(|e| format!("Error computing active indices: {:?}", e))? + .len() as usize; + let slots_per_epoch = E::slots_per_epoch() as usize; + + Ok(Self { + aggregate_queue: 4096, + unknown_block_aggregate_queue: 1024, + // Capacity for a full slot's worth of attestations if subscribed to all subnets + attestation_queue: active_validator_count / slots_per_epoch, + // Capacity /2 of a full slot's worth of attestations, no specific reason for 2 + unknown_block_attestation_queue: active_validator_count / slots_per_epoch / 2, + sync_message_queue: 2048, + sync_contribution_queue: 1024, + gossip_voluntary_exit_queue: 4096, + gossip_proposer_slashing_queue: 4096, + gossip_attester_slashing_queue: 4096, + finality_update_queue: 1024, + optimistic_update_queue: 1024, + unknown_light_client_update_queue: 128, + rpc_block_queue: 1024, + rpc_blob_queue: 1024, + chain_segment_queue: 64, + backfill_chain_segment: 64, + gossip_block_queue: 1024, + gossip_blob_queue: 1024, + delayed_block_queue: 1024, + status_queue: 1024, + bbrange_queue: 1024, + bbroots_queue: 1024, + blbroots_queue: 1024, + blbrange_queue: 1024, + gossip_bls_to_execution_change_queue: 16384, + lc_bootstrap_queue: 1024, + lc_optimistic_update_queue: 512, + lc_finality_update_queue: 512, + api_request_p0_queue: 1024, + api_request_p1_queue: 1024, + }) + } +} /// The name of the manager tokio task. const MANAGER_TASK_NAME: &str = "beacon_processor_manager"; @@ -780,6 +744,7 @@ impl BeaconProcessor { work_journal_tx: Option>, slot_clock: S, maximum_gossip_clock_disparity: Duration, + queue_lengths: BeaconProcessorQueueLengths, ) -> Result<(), String> { // Used by workers to communicate that they are finished a task. let (idle_tx, idle_rx) = mpsc::channel::<()>(MAX_IDLE_QUEUE_LEN); @@ -787,61 +752,61 @@ impl BeaconProcessor { // Using LIFO queues for attestations since validator profits rely upon getting fresh // attestations into blocks. Additionally, later attestations contain more information than // earlier ones, so we consider them more valuable. - let mut aggregate_queue = LifoQueue::new(MAX_AGGREGATED_ATTESTATION_QUEUE_LEN); + let mut aggregate_queue = LifoQueue::new(queue_lengths.aggregate_queue); let mut aggregate_debounce = TimeLatch::default(); - let mut attestation_queue = LifoQueue::new(MAX_UNAGGREGATED_ATTESTATION_QUEUE_LEN); + let mut attestation_queue = LifoQueue::new(queue_lengths.attestation_queue); let mut attestation_debounce = TimeLatch::default(); let mut unknown_block_aggregate_queue = - LifoQueue::new(MAX_AGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN); + LifoQueue::new(queue_lengths.unknown_block_aggregate_queue); let mut unknown_block_attestation_queue = - LifoQueue::new(MAX_UNAGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN); + LifoQueue::new(queue_lengths.unknown_block_attestation_queue); - let mut sync_message_queue = LifoQueue::new(MAX_SYNC_MESSAGE_QUEUE_LEN); - let mut sync_contribution_queue = LifoQueue::new(MAX_SYNC_CONTRIBUTION_QUEUE_LEN); + let mut sync_message_queue = LifoQueue::new(queue_lengths.sync_message_queue); + let mut sync_contribution_queue = LifoQueue::new(queue_lengths.sync_contribution_queue); // Using a FIFO queue for voluntary exits since it prevents exit censoring. I don't have // a strong feeling about queue type for exits. - let mut gossip_voluntary_exit_queue = FifoQueue::new(MAX_GOSSIP_EXIT_QUEUE_LEN); + let mut gossip_voluntary_exit_queue = + FifoQueue::new(queue_lengths.gossip_voluntary_exit_queue); // Using a FIFO queue for slashing to prevent people from flushing their slashings from the // queues with lots of junk messages. let mut gossip_proposer_slashing_queue = - FifoQueue::new(MAX_GOSSIP_PROPOSER_SLASHING_QUEUE_LEN); + FifoQueue::new(queue_lengths.gossip_proposer_slashing_queue); let mut gossip_attester_slashing_queue = - FifoQueue::new(MAX_GOSSIP_ATTESTER_SLASHING_QUEUE_LEN); + FifoQueue::new(queue_lengths.gossip_attester_slashing_queue); // Using a FIFO queue for light client updates to maintain sequence order. - let mut finality_update_queue = FifoQueue::new(MAX_GOSSIP_FINALITY_UPDATE_QUEUE_LEN); - let mut optimistic_update_queue = FifoQueue::new(MAX_GOSSIP_OPTIMISTIC_UPDATE_QUEUE_LEN); + let mut finality_update_queue = FifoQueue::new(queue_lengths.finality_update_queue); + let mut optimistic_update_queue = FifoQueue::new(queue_lengths.optimistic_update_queue); let mut unknown_light_client_update_queue = - FifoQueue::new(MAX_GOSSIP_OPTIMISTIC_UPDATE_REPROCESS_QUEUE_LEN); + FifoQueue::new(queue_lengths.unknown_light_client_update_queue); // Using a FIFO queue since blocks need to be imported sequentially. - let mut rpc_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN); - let mut rpc_blob_queue = FifoQueue::new(MAX_RPC_BLOB_QUEUE_LEN); - let mut chain_segment_queue = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN); - let mut backfill_chain_segment = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN); - let mut gossip_block_queue = FifoQueue::new(MAX_GOSSIP_BLOCK_QUEUE_LEN); - let mut gossip_blob_queue = FifoQueue::new(MAX_GOSSIP_BLOB_QUEUE_LEN); - let mut delayed_block_queue = FifoQueue::new(MAX_DELAYED_BLOCK_QUEUE_LEN); - - let mut status_queue = FifoQueue::new(MAX_STATUS_QUEUE_LEN); - let mut bbrange_queue = FifoQueue::new(MAX_BLOCKS_BY_RANGE_QUEUE_LEN); - let mut bbroots_queue = FifoQueue::new(MAX_BLOCKS_BY_ROOTS_QUEUE_LEN); - let mut blbroots_queue = FifoQueue::new(MAX_BLOBS_BY_ROOTS_QUEUE_LEN); - let mut blbrange_queue = FifoQueue::new(MAX_BLOBS_BY_RANGE_QUEUE_LEN); + let mut rpc_block_queue = FifoQueue::new(queue_lengths.rpc_block_queue); + let mut rpc_blob_queue = FifoQueue::new(queue_lengths.rpc_blob_queue); + let mut chain_segment_queue = FifoQueue::new(queue_lengths.chain_segment_queue); + let mut backfill_chain_segment = FifoQueue::new(queue_lengths.backfill_chain_segment); + let mut gossip_block_queue = FifoQueue::new(queue_lengths.gossip_block_queue); + let mut gossip_blob_queue = FifoQueue::new(queue_lengths.gossip_blob_queue); + let mut delayed_block_queue = FifoQueue::new(queue_lengths.delayed_block_queue); + + let mut status_queue = FifoQueue::new(queue_lengths.status_queue); + let mut bbrange_queue = FifoQueue::new(queue_lengths.bbrange_queue); + let mut bbroots_queue = FifoQueue::new(queue_lengths.bbroots_queue); + let mut blbroots_queue = FifoQueue::new(queue_lengths.blbroots_queue); + let mut blbrange_queue = FifoQueue::new(queue_lengths.blbrange_queue); let mut gossip_bls_to_execution_change_queue = - FifoQueue::new(MAX_BLS_TO_EXECUTION_CHANGE_QUEUE_LEN); + FifoQueue::new(queue_lengths.gossip_bls_to_execution_change_queue); - let mut lc_bootstrap_queue = FifoQueue::new(MAX_LIGHT_CLIENT_BOOTSTRAP_QUEUE_LEN); + let mut lc_bootstrap_queue = FifoQueue::new(queue_lengths.lc_bootstrap_queue); let mut lc_optimistic_update_queue = - FifoQueue::new(MAX_LIGHT_CLIENT_OPTIMISTIC_UPDATE_QUEUE_LEN); - let mut lc_finality_update_queue = - FifoQueue::new(MAX_LIGHT_CLIENT_FINALITY_UPDATE_QUEUE_LEN); + FifoQueue::new(queue_lengths.lc_optimistic_update_queue); + let mut lc_finality_update_queue = FifoQueue::new(queue_lengths.lc_finality_update_queue); - let mut api_request_p0_queue = FifoQueue::new(MAX_API_REQUEST_P0_QUEUE_LEN); - let mut api_request_p1_queue = FifoQueue::new(MAX_API_REQUEST_P1_QUEUE_LEN); + let mut api_request_p0_queue = FifoQueue::new(queue_lengths.api_request_p0_queue); + let mut api_request_p1_queue = FifoQueue::new(queue_lengths.api_request_p1_queue); // Channels for sending work to the re-process scheduler (`work_reprocessing_tx`) and to // receive them back once they are ready (`ready_work_rx`). diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 2af4e74c224..393ce35f000 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -19,8 +19,8 @@ use beacon_chain::{ store::{HotColdDB, ItemStore, LevelDB, StoreConfig}, BeaconChain, BeaconChainTypes, Eth1ChainBackend, MigratorConfig, ServerSentEventHandler, }; -use beacon_processor::BeaconProcessorConfig; use beacon_processor::{BeaconProcessor, BeaconProcessorChannels}; +use beacon_processor::{BeaconProcessorConfig, BeaconProcessorQueueLengths}; use environment::RuntimeContext; use eth1::{Config as Eth1Config, Service as Eth1Service}; use eth2::{ @@ -884,6 +884,14 @@ where None, beacon_chain.slot_clock.clone(), beacon_chain.spec.maximum_gossip_clock_disparity(), + BeaconProcessorQueueLengths::from_state( + &beacon_chain + .canonical_head + .cached_head() + .snapshot + .beacon_state, + &beacon_chain.spec, + )?, )?; } diff --git a/beacon_node/http_api/src/test_utils.rs b/beacon_node/http_api/src/test_utils.rs index c1313168bcd..88112de10b6 100644 --- a/beacon_node/http_api/src/test_utils.rs +++ b/beacon_node/http_api/src/test_utils.rs @@ -3,7 +3,9 @@ use beacon_chain::{ test_utils::{BeaconChainHarness, BoxedMutator, Builder, EphemeralHarnessType}, BeaconChain, BeaconChainTypes, }; -use beacon_processor::{BeaconProcessor, BeaconProcessorChannels, BeaconProcessorConfig}; +use beacon_processor::{ + BeaconProcessor, BeaconProcessorChannels, BeaconProcessorConfig, BeaconProcessorQueueLengths, +}; use directory::DEFAULT_ROOT_DIR; use eth2::{BeaconNodeHttpClient, Timeouts}; use lighthouse_network::{ @@ -206,6 +208,11 @@ pub async fn create_api_server( None, chain.slot_clock.clone(), chain.spec.maximum_gossip_clock_disparity(), + BeaconProcessorQueueLengths::from_state( + &chain.canonical_head.cached_head().snapshot.beacon_state, + &chain.spec, + ) + .unwrap(), ) .unwrap(); From 7590494a427d8107d2ab4f0f9afb8c4b25f3544e Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Fri, 12 Apr 2024 16:05:06 +0900 Subject: [PATCH 2/4] Update tests --- beacon_node/network/src/network_beacon_processor/tests.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 4ba4c4ddd1d..06b12c14ae9 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -239,6 +239,11 @@ impl TestRig { Some(work_journal_tx), harness.chain.slot_clock.clone(), chain.spec.maximum_gossip_clock_disparity(), + BeaconProcessorQueueLengths::from_state( + &chain.canonical_head.cached_head().snapshot.beacon_state, + &chain.spec, + ) + .unwrap(), ); assert!(beacon_processor.is_ok()); From 7a4e44b38b310c1932b28bac3260d943f198faaf Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Fri, 12 Apr 2024 16:24:44 +0900 Subject: [PATCH 3/4] lint --- beacon_node/beacon_processor/src/lib.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index a515307f118..2a89ff2b591 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -127,7 +127,7 @@ impl BeaconProcessorQueueLengths { let active_validator_count = state .get_active_validator_indices(state.slot().epoch(E::slots_per_epoch()), spec) .map_err(|e| format!("Error computing active indices: {:?}", e))? - .len() as usize; + .len(); let slots_per_epoch = E::slots_per_epoch() as usize; Ok(Self { @@ -736,6 +736,7 @@ impl BeaconProcessor { /// /// The optional `work_journal_tx` allows for an outside process to receive a log of all work /// events processed by `self`. This should only be used during testing. + #[allow(clippy::too_many_arguments)] pub fn spawn_manager( mut self, event_rx: mpsc::Receiver>, From 35ac9f4602a7c68d258f039f93828fe604f37ab3 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Thu, 30 May 2024 13:15:50 +0300 Subject: [PATCH 4/4] Review PR --- beacon_node/beacon_processor/src/lib.rs | 27 ++++++++++++++++++------- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index 2a89ff2b591..5bf13d82b7b 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -60,7 +60,9 @@ use std::time::Duration; use task_executor::TaskExecutor; use tokio::sync::mpsc; use tokio::sync::mpsc::error::TrySendError; -use types::{Attestation, BeaconState, ChainSpec, Hash256, SignedAggregateAndProof, SubnetId}; +use types::{ + Attestation, BeaconState, ChainSpec, Hash256, RelativeEpoch, SignedAggregateAndProof, SubnetId, +}; use types::{EthSpec, Slot}; use work_reprocessing_queue::IgnoredRpcBlock; use work_reprocessing_queue::{ @@ -85,6 +87,11 @@ const MAX_IDLE_QUEUE_LEN: usize = 16_384; /// The maximum size of the channel for re-processing work events. const DEFAULT_MAX_SCHEDULED_WORK_QUEUE_LEN: usize = 3 * DEFAULT_MAX_WORK_EVENT_QUEUE_LEN / 4; +/// Over-provision queues based on active validator count by some factor. The beacon chain has +/// strict churns that prevent the validator set size from changing rapidly. By over-provisioning +/// slightly, we don't need to adjust the queues during the lifetime of a process. +const ACTIVE_VALIDATOR_COUNT_OVERPROVISION_PERCENT: usize = 110; + /// Maximum number of queued items that will be stored before dropping them pub struct BeaconProcessorQueueLengths { aggregate_queue: usize, @@ -124,10 +131,16 @@ impl BeaconProcessorQueueLengths { state: &BeaconState, spec: &ChainSpec, ) -> Result { - let active_validator_count = state - .get_active_validator_indices(state.slot().epoch(E::slots_per_epoch()), spec) - .map_err(|e| format!("Error computing active indices: {:?}", e))? - .len(); + let active_validator_count = + match state.get_cached_active_validator_indices(RelativeEpoch::Current) { + Ok(indices) => indices.len(), + Err(_) => state + .get_active_validator_indices(state.current_epoch(), spec) + .map_err(|e| format!("Error computing active indices: {:?}", e))? + .len(), + }; + let active_validator_count = + (ACTIVE_VALIDATOR_COUNT_OVERPROVISION_PERCENT * active_validator_count) / 100; let slots_per_epoch = E::slots_per_epoch() as usize; Ok(Self { @@ -135,8 +148,8 @@ impl BeaconProcessorQueueLengths { unknown_block_aggregate_queue: 1024, // Capacity for a full slot's worth of attestations if subscribed to all subnets attestation_queue: active_validator_count / slots_per_epoch, - // Capacity /2 of a full slot's worth of attestations, no specific reason for 2 - unknown_block_attestation_queue: active_validator_count / slots_per_epoch / 2, + // Capacity for a full slot's worth of attestations if subscribed to all subnets + unknown_block_attestation_queue: active_validator_count / slots_per_epoch, sync_message_queue: 2048, sync_contribution_queue: 1024, gossip_voluntary_exit_queue: 4096,