Make beacon processor queue sizes dynamic #5573

Merged · 4 commits · Jun 3, 2024
277 changes: 128 additions & 149 deletions beacon_node/beacon_processor/src/lib.rs
@@ -60,7 +60,9 @@ use std::time::Duration;
use task_executor::TaskExecutor;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
use types::{Attestation, Hash256, SignedAggregateAndProof, SubnetId};
use types::{
Attestation, BeaconState, ChainSpec, Hash256, RelativeEpoch, SignedAggregateAndProof, SubnetId,
};
use types::{EthSpec, Slot};
use work_reprocessing_queue::IgnoredRpcBlock;
use work_reprocessing_queue::{
@@ -85,123 +87,98 @@ const MAX_IDLE_QUEUE_LEN: usize = 16_384;
/// The maximum size of the channel for re-processing work events.
const DEFAULT_MAX_SCHEDULED_WORK_QUEUE_LEN: usize = 3 * DEFAULT_MAX_WORK_EVENT_QUEUE_LEN / 4;

/// The maximum number of queued `Attestation` objects that will be stored before we start dropping
/// them.
const MAX_UNAGGREGATED_ATTESTATION_QUEUE_LEN: usize = 16_384;

/// The maximum number of queued `Attestation` objects that will be stored before we start dropping
/// them.
const MAX_UNAGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN: usize = 8_192;

/// The maximum number of queued `SignedAggregateAndProof` objects that will be stored before we
/// start dropping them.
const MAX_AGGREGATED_ATTESTATION_QUEUE_LEN: usize = 4_096;

/// The maximum number of queued `SignedAggregateAndProof` objects that will be stored before we
/// start dropping them.
const MAX_AGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN: usize = 1_024;

/// The maximum number of queued `SignedBeaconBlock` objects received on gossip that will be stored
/// before we start dropping them.
const MAX_GOSSIP_BLOCK_QUEUE_LEN: usize = 1_024;

/// The maximum number of queued `BlobSidecar` objects received on gossip that
/// will be stored before we start dropping them.
const MAX_GOSSIP_BLOB_QUEUE_LEN: usize = 1_024;

/// The maximum number of queued `SignedBeaconBlock` objects received prior to their slot (but
/// within acceptable clock disparity) that will be queued before we start dropping them.
const MAX_DELAYED_BLOCK_QUEUE_LEN: usize = 1_024;

/// The maximum number of queued `SignedVoluntaryExit` objects received on gossip that will be stored
/// before we start dropping them.
const MAX_GOSSIP_EXIT_QUEUE_LEN: usize = 4_096;

/// The maximum number of queued `ProposerSlashing` objects received on gossip that will be stored
/// before we start dropping them.
const MAX_GOSSIP_PROPOSER_SLASHING_QUEUE_LEN: usize = 4_096;

/// The maximum number of queued `AttesterSlashing` objects received on gossip that will be stored
/// before we start dropping them.
const MAX_GOSSIP_ATTESTER_SLASHING_QUEUE_LEN: usize = 4_096;

/// The maximum number of queued `LightClientFinalityUpdate` objects received on gossip that will be stored
/// before we start dropping them.
const MAX_GOSSIP_FINALITY_UPDATE_QUEUE_LEN: usize = 1_024;

/// The maximum number of queued `LightClientOptimisticUpdate` objects received on gossip that will be stored
/// before we start dropping them.
const MAX_GOSSIP_OPTIMISTIC_UPDATE_QUEUE_LEN: usize = 1_024;

/// The maximum number of queued `LightClientOptimisticUpdate` objects received on gossip that will be stored
/// for reprocessing before we start dropping them.
const MAX_GOSSIP_OPTIMISTIC_UPDATE_REPROCESS_QUEUE_LEN: usize = 128;

/// The maximum number of queued `SyncCommitteeMessage` objects that will be stored before we start dropping
/// them.
const MAX_SYNC_MESSAGE_QUEUE_LEN: usize = 2048;

/// The maximum number of queued `SignedContributionAndProof` objects that will be stored before we
/// start dropping them.
const MAX_SYNC_CONTRIBUTION_QUEUE_LEN: usize = 1024;

/// The maximum number of queued `SignedBeaconBlock` objects received from the network RPC that
/// will be stored before we start dropping them.
const MAX_RPC_BLOCK_QUEUE_LEN: usize = 1_024;

/// The maximum number of queued `BlobSidecar` objects received from the network RPC that
/// will be stored before we start dropping them.
const MAX_RPC_BLOB_QUEUE_LEN: usize = 1_024;

/// The maximum number of queued `Vec<SignedBeaconBlock>` objects received during syncing that will
/// be stored before we start dropping them.
const MAX_CHAIN_SEGMENT_QUEUE_LEN: usize = 64;

/// The maximum number of queued `StatusMessage` objects received from the network RPC that will be
/// stored before we start dropping them.
const MAX_STATUS_QUEUE_LEN: usize = 1_024;

/// The maximum number of queued `BlocksByRangeRequest` objects received from the network RPC that
/// will be stored before we start dropping them.
const MAX_BLOCKS_BY_RANGE_QUEUE_LEN: usize = 1_024;

/// The maximum number of queued `BlobsByRangeRequest` objects received from the network RPC that
/// will be stored before we start dropping them.
const MAX_BLOBS_BY_RANGE_QUEUE_LEN: usize = 1024;

/// The maximum number of queued `BlocksByRootRequest` objects received from the network RPC that
/// will be stored before we start dropping them.
const MAX_BLOCKS_BY_ROOTS_QUEUE_LEN: usize = 1_024;

/// The maximum number of queued `BlobsByRootRequest` objects received from the network RPC that
/// will be stored before we start dropping them.
const MAX_BLOBS_BY_ROOTS_QUEUE_LEN: usize = 1_024;

/// Maximum number of `SignedBlsToExecutionChange` messages to queue before dropping them.
///
/// This value is set high to accommodate the large spike that is expected immediately after Capella
/// is activated.
const MAX_BLS_TO_EXECUTION_CHANGE_QUEUE_LEN: usize = 16_384;

/// The maximum number of queued `LightClientBootstrapRequest` objects received from the network RPC that
/// will be stored before we start dropping them.
const MAX_LIGHT_CLIENT_BOOTSTRAP_QUEUE_LEN: usize = 1_024;

/// The maximum number of queued `LightClientOptimisticUpdateRequest` objects received from the network RPC that
/// will be stored before we start dropping them.
const MAX_LIGHT_CLIENT_OPTIMISTIC_UPDATE_QUEUE_LEN: usize = 512;

/// The maximum number of queued `LightClientFinalityUpdateRequest` objects received from the network RPC that
/// will be stored before we start dropping them.
const MAX_LIGHT_CLIENT_FINALITY_UPDATE_QUEUE_LEN: usize = 512;

/// The maximum number of priority-0 (highest priority) messages that will be queued before
/// they begin to be dropped.
const MAX_API_REQUEST_P0_QUEUE_LEN: usize = 1_024;
/// Over-provision queues based on active validator count by some factor. The beacon chain has
/// a strict churn limit that prevents the validator set size from changing rapidly. By
/// over-provisioning slightly, we don't need to adjust the queues during the lifetime of a process.
const ACTIVE_VALIDATOR_COUNT_OVERPROVISION_PERCENT: usize = 110;

/// Maximum number of queued items of each type that will be stored before dropping them.
pub struct BeaconProcessorQueueLengths {
aggregate_queue: usize,
attestation_queue: usize,
unknown_block_aggregate_queue: usize,
unknown_block_attestation_queue: usize,
sync_message_queue: usize,
sync_contribution_queue: usize,
gossip_voluntary_exit_queue: usize,
gossip_proposer_slashing_queue: usize,
gossip_attester_slashing_queue: usize,
finality_update_queue: usize,
optimistic_update_queue: usize,
unknown_light_client_update_queue: usize,
rpc_block_queue: usize,
rpc_blob_queue: usize,
chain_segment_queue: usize,
backfill_chain_segment: usize,
gossip_block_queue: usize,
gossip_blob_queue: usize,
delayed_block_queue: usize,
status_queue: usize,
bbrange_queue: usize,
bbroots_queue: usize,
blbroots_queue: usize,
blbrange_queue: usize,
gossip_bls_to_execution_change_queue: usize,
lc_bootstrap_queue: usize,
lc_optimistic_update_queue: usize,
lc_finality_update_queue: usize,
api_request_p0_queue: usize,
api_request_p1_queue: usize,
}

/// The maximum number of priority-1 (second-highest priority) messages that will be queued before
/// they begin to be dropped.
const MAX_API_REQUEST_P1_QUEUE_LEN: usize = 1_024;
impl BeaconProcessorQueueLengths {
pub fn from_state<E: EthSpec>(
state: &BeaconState<E>,
spec: &ChainSpec,
) -> Result<Self, String> {
let active_validator_count =
match state.get_cached_active_validator_indices(RelativeEpoch::Current) {
Ok(indices) => indices.len(),
Err(_) => state
.get_active_validator_indices(state.current_epoch(), spec)
.map_err(|e| format!("Error computing active indices: {:?}", e))?
.len(),
};
let active_validator_count =
(ACTIVE_VALIDATOR_COUNT_OVERPROVISION_PERCENT * active_validator_count) / 100;
let slots_per_epoch = E::slots_per_epoch() as usize;

Ok(Self {
aggregate_queue: 4096,
unknown_block_aggregate_queue: 1024,
// Capacity for a full slot's worth of attestations if subscribed to all subnets
attestation_queue: active_validator_count / slots_per_epoch,
// Capacity for a full slot's worth of attestations if subscribed to all subnets
unknown_block_attestation_queue: active_validator_count / slots_per_epoch,
sync_message_queue: 2048,
sync_contribution_queue: 1024,
gossip_voluntary_exit_queue: 4096,
gossip_proposer_slashing_queue: 4096,
gossip_attester_slashing_queue: 4096,
finality_update_queue: 1024,
optimistic_update_queue: 1024,
unknown_light_client_update_queue: 128,
rpc_block_queue: 1024,
rpc_blob_queue: 1024,
chain_segment_queue: 64,
backfill_chain_segment: 64,
gossip_block_queue: 1024,
gossip_blob_queue: 1024,
delayed_block_queue: 1024,
status_queue: 1024,
bbrange_queue: 1024,
bbroots_queue: 1024,
blbroots_queue: 1024,
blbrange_queue: 1024,
gossip_bls_to_execution_change_queue: 16384,
lc_bootstrap_queue: 1024,
lc_optimistic_update_queue: 512,
lc_finality_update_queue: 512,
api_request_p0_queue: 1024,
api_request_p1_queue: 1024,
})
}
}

/// The name of the manager tokio task.
const MANAGER_TASK_NAME: &str = "beacon_processor_manager";
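
For a sense of scale, here is a minimal, self-contained sketch of the sizing arithmetic performed by `BeaconProcessorQueueLengths::from_state` above, assuming a hypothetical active validator set of 1,000,000 and mainnet's 32 slots per epoch (the validator count is an assumption for illustration, not a figure from this PR):

```rust
// Standalone sketch of the dynamic sizing math; mirrors `from_state` above.
fn main() {
    const ACTIVE_VALIDATOR_COUNT_OVERPROVISION_PERCENT: usize = 110;

    let active_validator_count: usize = 1_000_000; // assumed for illustration
    let slots_per_epoch: usize = 32; // mainnet value

    // Over-provision by 10% so the queues need no adjustment as the set grows.
    let overprovisioned =
        (ACTIVE_VALIDATOR_COUNT_OVERPROVISION_PERCENT * active_validator_count) / 100;

    // Capacity for a full slot's worth of attestations if subscribed to all subnets.
    let attestation_queue = overprovisioned / slots_per_epoch;

    // Prints 34375, roughly double the old static
    // `MAX_UNAGGREGATED_ATTESTATION_QUEUE_LEN` of 16_384.
    println!("attestation_queue = {attestation_queue}");
}
```

With these inputs the dynamic queue comfortably exceeds the old fixed bound, which illustrates why the sizes are now derived from the state rather than from hard-coded constants.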
@@ -772,6 +749,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
///
/// The optional `work_journal_tx` allows for an outside process to receive a log of all work
/// events processed by `self`. This should only be used during testing.
#[allow(clippy::too_many_arguments)]
pub fn spawn_manager<S: SlotClock + 'static>(
mut self,
event_rx: mpsc::Receiver<WorkEvent<E>>,
@@ -780,68 +758,69 @@
work_journal_tx: Option<mpsc::Sender<&'static str>>,
slot_clock: S,
maximum_gossip_clock_disparity: Duration,
queue_lengths: BeaconProcessorQueueLengths,
) -> Result<(), String> {
// Used by workers to communicate that they have finished a task.
let (idle_tx, idle_rx) = mpsc::channel::<()>(MAX_IDLE_QUEUE_LEN);

// Using LIFO queues for attestations since validator profits rely upon getting fresh
// attestations into blocks. Additionally, later attestations contain more information than
// earlier ones, so we consider them more valuable.
let mut aggregate_queue = LifoQueue::new(MAX_AGGREGATED_ATTESTATION_QUEUE_LEN);
let mut aggregate_queue = LifoQueue::new(queue_lengths.aggregate_queue);
let mut aggregate_debounce = TimeLatch::default();
let mut attestation_queue = LifoQueue::new(MAX_UNAGGREGATED_ATTESTATION_QUEUE_LEN);
let mut attestation_queue = LifoQueue::new(queue_lengths.attestation_queue);
let mut attestation_debounce = TimeLatch::default();
let mut unknown_block_aggregate_queue =
LifoQueue::new(MAX_AGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN);
LifoQueue::new(queue_lengths.unknown_block_aggregate_queue);
let mut unknown_block_attestation_queue =
LifoQueue::new(MAX_UNAGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN);
LifoQueue::new(queue_lengths.unknown_block_attestation_queue);

let mut sync_message_queue = LifoQueue::new(MAX_SYNC_MESSAGE_QUEUE_LEN);
let mut sync_contribution_queue = LifoQueue::new(MAX_SYNC_CONTRIBUTION_QUEUE_LEN);
let mut sync_message_queue = LifoQueue::new(queue_lengths.sync_message_queue);
let mut sync_contribution_queue = LifoQueue::new(queue_lengths.sync_contribution_queue);

// Using a FIFO queue for voluntary exits since it prevents exit censoring. I don't have
// a strong feeling about queue type for exits.
let mut gossip_voluntary_exit_queue = FifoQueue::new(MAX_GOSSIP_EXIT_QUEUE_LEN);
let mut gossip_voluntary_exit_queue =
FifoQueue::new(queue_lengths.gossip_voluntary_exit_queue);

// Using a FIFO queue for slashing to prevent people from flushing their slashings from the
// queues with lots of junk messages.
let mut gossip_proposer_slashing_queue =
FifoQueue::new(MAX_GOSSIP_PROPOSER_SLASHING_QUEUE_LEN);
FifoQueue::new(queue_lengths.gossip_proposer_slashing_queue);
let mut gossip_attester_slashing_queue =
FifoQueue::new(MAX_GOSSIP_ATTESTER_SLASHING_QUEUE_LEN);
FifoQueue::new(queue_lengths.gossip_attester_slashing_queue);

// Using a FIFO queue for light client updates to maintain sequence order.
let mut finality_update_queue = FifoQueue::new(MAX_GOSSIP_FINALITY_UPDATE_QUEUE_LEN);
let mut optimistic_update_queue = FifoQueue::new(MAX_GOSSIP_OPTIMISTIC_UPDATE_QUEUE_LEN);
let mut finality_update_queue = FifoQueue::new(queue_lengths.finality_update_queue);
let mut optimistic_update_queue = FifoQueue::new(queue_lengths.optimistic_update_queue);
let mut unknown_light_client_update_queue =
FifoQueue::new(MAX_GOSSIP_OPTIMISTIC_UPDATE_REPROCESS_QUEUE_LEN);
FifoQueue::new(queue_lengths.unknown_light_client_update_queue);

// Using a FIFO queue since blocks need to be imported sequentially.
let mut rpc_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN);
let mut rpc_blob_queue = FifoQueue::new(MAX_RPC_BLOB_QUEUE_LEN);
let mut chain_segment_queue = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN);
let mut backfill_chain_segment = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN);
let mut gossip_block_queue = FifoQueue::new(MAX_GOSSIP_BLOCK_QUEUE_LEN);
let mut gossip_blob_queue = FifoQueue::new(MAX_GOSSIP_BLOB_QUEUE_LEN);
let mut delayed_block_queue = FifoQueue::new(MAX_DELAYED_BLOCK_QUEUE_LEN);

let mut status_queue = FifoQueue::new(MAX_STATUS_QUEUE_LEN);
let mut bbrange_queue = FifoQueue::new(MAX_BLOCKS_BY_RANGE_QUEUE_LEN);
let mut bbroots_queue = FifoQueue::new(MAX_BLOCKS_BY_ROOTS_QUEUE_LEN);
let mut blbroots_queue = FifoQueue::new(MAX_BLOBS_BY_ROOTS_QUEUE_LEN);
let mut blbrange_queue = FifoQueue::new(MAX_BLOBS_BY_RANGE_QUEUE_LEN);
let mut rpc_block_queue = FifoQueue::new(queue_lengths.rpc_block_queue);
let mut rpc_blob_queue = FifoQueue::new(queue_lengths.rpc_blob_queue);
let mut chain_segment_queue = FifoQueue::new(queue_lengths.chain_segment_queue);
let mut backfill_chain_segment = FifoQueue::new(queue_lengths.backfill_chain_segment);
let mut gossip_block_queue = FifoQueue::new(queue_lengths.gossip_block_queue);
let mut gossip_blob_queue = FifoQueue::new(queue_lengths.gossip_blob_queue);
let mut delayed_block_queue = FifoQueue::new(queue_lengths.delayed_block_queue);

let mut status_queue = FifoQueue::new(queue_lengths.status_queue);
let mut bbrange_queue = FifoQueue::new(queue_lengths.bbrange_queue);
let mut bbroots_queue = FifoQueue::new(queue_lengths.bbroots_queue);
let mut blbroots_queue = FifoQueue::new(queue_lengths.blbroots_queue);
let mut blbrange_queue = FifoQueue::new(queue_lengths.blbrange_queue);

let mut gossip_bls_to_execution_change_queue =
FifoQueue::new(MAX_BLS_TO_EXECUTION_CHANGE_QUEUE_LEN);
FifoQueue::new(queue_lengths.gossip_bls_to_execution_change_queue);

let mut lc_bootstrap_queue = FifoQueue::new(MAX_LIGHT_CLIENT_BOOTSTRAP_QUEUE_LEN);
let mut lc_bootstrap_queue = FifoQueue::new(queue_lengths.lc_bootstrap_queue);
let mut lc_optimistic_update_queue =
FifoQueue::new(MAX_LIGHT_CLIENT_OPTIMISTIC_UPDATE_QUEUE_LEN);
let mut lc_finality_update_queue =
FifoQueue::new(MAX_LIGHT_CLIENT_FINALITY_UPDATE_QUEUE_LEN);
FifoQueue::new(queue_lengths.lc_optimistic_update_queue);
let mut lc_finality_update_queue = FifoQueue::new(queue_lengths.lc_finality_update_queue);

let mut api_request_p0_queue = FifoQueue::new(MAX_API_REQUEST_P0_QUEUE_LEN);
let mut api_request_p1_queue = FifoQueue::new(MAX_API_REQUEST_P1_QUEUE_LEN);
let mut api_request_p0_queue = FifoQueue::new(queue_lengths.api_request_p0_queue);
let mut api_request_p1_queue = FifoQueue::new(queue_lengths.api_request_p1_queue);

// Channels for sending work to the re-process scheduler (`work_reprocessing_tx`) and to
// receive them back once they are ready (`ready_work_rx`).
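
As a side note on the queue types used throughout `spawn_manager`: the comments above explain that attestations use LIFO queues (fresh attestations are the most valuable) while blocks, exits and slashings use FIFO queues. A minimal sketch of the bounded LIFO idea is shown below, under the assumption that a full queue evicts its oldest entry; this is an illustration only, not Lighthouse's actual `LifoQueue` implementation (the FIFO variant is simply the mirror image, dropping or rejecting new items once full):

```rust
use std::collections::VecDeque;

/// Illustrative bounded LIFO queue: newest items are served first, and when
/// the queue is full the oldest item is dropped, since stale work (e.g. old
/// attestations) is the least valuable. Not the actual Lighthouse type.
struct BoundedLifo<T> {
    items: VecDeque<T>,
    max_len: usize,
}

impl<T> BoundedLifo<T> {
    fn new(max_len: usize) -> Self {
        Self { items: VecDeque::new(), max_len }
    }

    /// Push a new item, evicting the oldest one if the queue is at capacity.
    fn push(&mut self, item: T) {
        if self.items.len() >= self.max_len {
            self.items.pop_front();
        }
        self.items.push_back(item);
    }

    /// Pop the most recently pushed item (last in, first out).
    fn pop(&mut self) -> Option<T> {
        self.items.pop_back()
    }
}
```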
10 changes: 9 additions & 1 deletion beacon_node/client/src/builder.rs
@@ -19,8 +19,8 @@ use beacon_chain::{
store::{HotColdDB, ItemStore, LevelDB, StoreConfig},
BeaconChain, BeaconChainTypes, Eth1ChainBackend, MigratorConfig, ServerSentEventHandler,
};
use beacon_processor::BeaconProcessorConfig;
use beacon_processor::{BeaconProcessor, BeaconProcessorChannels};
use beacon_processor::{BeaconProcessorConfig, BeaconProcessorQueueLengths};
use environment::RuntimeContext;
use eth1::{Config as Eth1Config, Service as Eth1Service};
use eth2::{
@@ -884,6 +884,14 @@ where
None,
beacon_chain.slot_clock.clone(),
beacon_chain.spec.maximum_gossip_clock_disparity(),
BeaconProcessorQueueLengths::from_state(
&beacon_chain
.canonical_head
.cached_head()
.snapshot
.beacon_state,
&beacon_chain.spec,
)?,
)?;
}

9 changes: 8 additions & 1 deletion beacon_node/http_api/src/test_utils.rs
@@ -3,7 +3,9 @@ use beacon_chain::{
test_utils::{BeaconChainHarness, BoxedMutator, Builder, EphemeralHarnessType},
BeaconChain, BeaconChainTypes,
};
use beacon_processor::{BeaconProcessor, BeaconProcessorChannels, BeaconProcessorConfig};
use beacon_processor::{
BeaconProcessor, BeaconProcessorChannels, BeaconProcessorConfig, BeaconProcessorQueueLengths,
};
use directory::DEFAULT_ROOT_DIR;
use eth2::{BeaconNodeHttpClient, Timeouts};
use lighthouse_network::{
@@ -206,6 +208,11 @@ pub async fn create_api_server<T: BeaconChainTypes>(
None,
chain.slot_clock.clone(),
chain.spec.maximum_gossip_clock_disparity(),
BeaconProcessorQueueLengths::from_state(
&chain.canonical_head.cached_head().snapshot.beacon_state,
&chain.spec,
)
.unwrap(),
)
.unwrap();

5 changes: 5 additions & 0 deletions beacon_node/network/src/network_beacon_processor/tests.rs
@@ -239,6 +239,11 @@ impl TestRig {
Some(work_journal_tx),
harness.chain.slot_clock.clone(),
chain.spec.maximum_gossip_clock_disparity(),
BeaconProcessorQueueLengths::from_state(
&chain.canonical_head.cached_head().snapshot.beacon_state,
&chain.spec,
)
.unwrap(),
);

assert!(beacon_processor.is_ok());