Skip to content

Commit

Permalink
Review PR
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion committed May 30, 2024
1 parent 7a4e44b commit 35ac9f4
Showing 1 changed file with 20 additions and 7 deletions.
27 changes: 20 additions & 7 deletions beacon_node/beacon_processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ use std::time::Duration;
use task_executor::TaskExecutor;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
use types::{Attestation, BeaconState, ChainSpec, Hash256, SignedAggregateAndProof, SubnetId};
use types::{
Attestation, BeaconState, ChainSpec, Hash256, RelativeEpoch, SignedAggregateAndProof, SubnetId,
};
use types::{EthSpec, Slot};
use work_reprocessing_queue::IgnoredRpcBlock;
use work_reprocessing_queue::{
Expand All @@ -85,6 +87,11 @@ const MAX_IDLE_QUEUE_LEN: usize = 16_384;
/// The maximum size of the channel for re-processing work events.
const DEFAULT_MAX_SCHEDULED_WORK_QUEUE_LEN: usize = 3 * DEFAULT_MAX_WORK_EVENT_QUEUE_LEN / 4;

/// Over-provision queues based on active validator count by some factor. The beacon chain has
/// strict churns that prevent the validator set size from changing rapidly. By over-provisioning
/// slightly, we don't need to adjust the queues during the lifetime of a process.
const ACTIVE_VALIDATOR_COUNT_OVERPROVISION_PERCENT: usize = 110;

/// Maximum number of queued items that will be stored before dropping them
pub struct BeaconProcessorQueueLengths {
aggregate_queue: usize,
Expand Down Expand Up @@ -124,19 +131,25 @@ impl BeaconProcessorQueueLengths {
state: &BeaconState<E>,
spec: &ChainSpec,
) -> Result<Self, String> {
let active_validator_count = state
.get_active_validator_indices(state.slot().epoch(E::slots_per_epoch()), spec)
.map_err(|e| format!("Error computing active indices: {:?}", e))?
.len();
let active_validator_count =
match state.get_cached_active_validator_indices(RelativeEpoch::Current) {
Ok(indices) => indices.len(),
Err(_) => state
.get_active_validator_indices(state.current_epoch(), spec)
.map_err(|e| format!("Error computing active indices: {:?}", e))?
.len(),
};
let active_validator_count =
(ACTIVE_VALIDATOR_COUNT_OVERPROVISION_PERCENT * active_validator_count) / 100;
let slots_per_epoch = E::slots_per_epoch() as usize;

Ok(Self {
aggregate_queue: 4096,
unknown_block_aggregate_queue: 1024,
// Capacity for a full slot's worth of attestations if subscribed to all subnets
attestation_queue: active_validator_count / slots_per_epoch,
// Capacity /2 of a full slot's worth of attestations, no specific reason for 2
unknown_block_attestation_queue: active_validator_count / slots_per_epoch / 2,
// Capacity for a full slot's worth of attestations if subscribed to all subnets
unknown_block_attestation_queue: active_validator_count / slots_per_epoch,
sync_message_queue: 2048,
sync_contribution_queue: 1024,
gossip_voluntary_exit_queue: 4096,
Expand Down

0 comments on commit 35ac9f4

Please sign in to comment.