Skip to content

Commit

Permalink
Reduce attestation subscription spam from VC (sigp#4806)
Browse files Browse the repository at this point in the history
## Proposed Changes

Instead of sending every attestation subscription every slot to every BN:

- Send subscriptions 32, 16, 8, 7, 6, 5, 4, 3 slots before they occur.
- Track whether each subscription is sent successfully and retry it in subsequent slots if necessary.

## Additional Info

- [x] Add unit tests for `SubscriptionSlots`.
- [x] Test on Holesky.
- [x] Based on sigp#4774 for testing.
  • Loading branch information
michaelsproul authored and Woodpile37 committed Jan 6, 2024
1 parent 653f6a1 commit 1678b09
Showing 1 changed file with 177 additions and 15 deletions.
192 changes: 177 additions & 15 deletions validator_client/src/duties_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,19 @@ use eth2::types::{
};
use futures::{stream, StreamExt};
use parking_lot::RwLock;
use safe_arith::ArithError;
use safe_arith::{ArithError, SafeArith};
use slog::{debug, error, info, warn, Logger};
use slot_clock::SlotClock;
use std::cmp::min;
use std::collections::{hash_map, BTreeMap, HashMap, HashSet};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use sync::poll_sync_committee_duties;
use sync::SyncDutiesMap;
use tokio::{sync::mpsc::Sender, time::sleep};
use types::{ChainSpec, Epoch, EthSpec, Hash256, PublicKeyBytes, SelectionProof, Slot};

/// Since the BN does not like it when we subscribe to slots that are close to the current time, we
/// will only subscribe to slots which are further than `SUBSCRIPTION_BUFFER_SLOTS` away.
///
/// This number is based upon `MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD` value in the
/// `beacon_node::network::attestation_service` crate. It is not imported directly to avoid
/// bringing in the entire crate.
const SUBSCRIPTION_BUFFER_SLOTS: u64 = 2;

/// Only retain `HISTORICAL_DUTIES_EPOCHS` duties prior to the current epoch.
const HISTORICAL_DUTIES_EPOCHS: u64 = 2;

Expand All @@ -62,6 +55,36 @@ const VALIDATOR_METRICS_MIN_COUNT: usize = 64;
/// reduces the amount of data that needs to be transferred.
const INITIAL_DUTIES_QUERY_SIZE: usize = 1;

/// Offsets from the attestation duty slot at which a subscription should be sent.
const ATTESTATION_SUBSCRIPTION_OFFSETS: [u64; 8] = [3, 4, 5, 6, 7, 8, 16, 32];

/// Check that `ATTESTATION_SUBSCRIPTION_OFFSETS` is sorted ascendingly.
const _: () = assert!({
let mut i = 0;
loop {
let prev = if i > 0 {
ATTESTATION_SUBSCRIPTION_OFFSETS[i - 1]
} else {
0
};
let curr = ATTESTATION_SUBSCRIPTION_OFFSETS[i];
if curr < prev {
break false;
}
i += 1;
if i == ATTESTATION_SUBSCRIPTION_OFFSETS.len() {
break true;
}
}
});
/// Since the BN does not like it when we subscribe to slots that are close to the current time, we
/// will only subscribe to slots which are further than 2 slots away.
///
/// This number is based upon `MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD` value in the
/// `beacon_node::network::attestation_service` crate. It is not imported directly to avoid
/// bringing in the entire crate.
const _: () = assert!(ATTESTATION_SUBSCRIPTION_OFFSETS[0] > 2);

#[derive(Debug)]
pub enum Error {
UnableToReadSlotClock,
Expand All @@ -84,6 +107,16 @@ pub struct DutyAndProof {
pub duty: AttesterData,
/// This value is only set to `Some` if the proof indicates that the validator is an aggregator.
pub selection_proof: Option<SelectionProof>,
/// Track which slots we should send subscriptions at for this duty.
///
/// This value is updated after each subscription is successfully sent.
pub subscription_slots: Arc<SubscriptionSlots>,
}

/// Tracker containing the slots at which an attestation subscription should be sent.
pub struct SubscriptionSlots {
/// Pairs of `(slot, already_sent)` in slot-descending order.
slots: Vec<(Slot, AtomicBool)>,
}

impl DutyAndProof {
Expand Down Expand Up @@ -111,17 +144,55 @@ impl DutyAndProof {
}
})?;

let subscription_slots = SubscriptionSlots::new(duty.slot);

Ok(Self {
duty,
selection_proof,
subscription_slots,
})
}

/// Create a new `DutyAndProof` with the selection proof waiting to be filled in.
pub fn new_without_selection_proof(duty: AttesterData) -> Self {
let subscription_slots = SubscriptionSlots::new(duty.slot);
Self {
duty,
selection_proof: None,
subscription_slots,
}
}
}

impl SubscriptionSlots {
fn new(duty_slot: Slot) -> Arc<Self> {
let slots = ATTESTATION_SUBSCRIPTION_OFFSETS
.into_iter()
.filter_map(|offset| duty_slot.safe_sub(offset).ok())
.map(|scheduled_slot| (scheduled_slot, AtomicBool::new(false)))
.collect();
Arc::new(Self { slots })
}

/// Return `true` if we should send a subscription at `slot`.
fn should_send_subscription_at(&self, slot: Slot) -> bool {
// Iterate slots from smallest to largest looking for one that hasn't been completed yet.
self.slots
.iter()
.rev()
.any(|(scheduled_slot, already_sent)| {
slot >= *scheduled_slot && !already_sent.load(Ordering::Relaxed)
})
}

/// Update our record of subscribed slots to account for successful subscription at `slot`.
fn record_successful_subscription_at(&self, slot: Slot) {
for (scheduled_slot, already_sent) in self.slots.iter().rev() {
if slot >= *scheduled_slot {
already_sent.store(true, Ordering::Relaxed);
} else {
break;
}
}
}
}
Expand Down Expand Up @@ -574,8 +645,24 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
let subscriptions_timer =
metrics::start_timer_vec(&metrics::DUTIES_SERVICE_TIMES, &[metrics::SUBSCRIPTIONS]);

// This vector is likely to be a little oversized, but it won't reallocate.
let mut subscriptions = Vec::with_capacity(local_pubkeys.len() * 2);
// This vector is intentionally oversized by 10% so that it won't reallocate.
// Each validator has 2 attestation duties occuring in the current and next epoch, for which
// they must send `ATTESTATION_SUBSCRIPTION_OFFSETS.len()` subscriptions. These subscription
// slots are approximately evenly distributed over the two epochs, usually with a slight lag
// that balances out (some subscriptions for the current epoch were sent in the previous, and
// some subscriptions for the next next epoch will be sent in the next epoch but aren't included
// in our calculation). We cancel the factor of 2 from the formula for simplicity.
let overallocation_numerator = 110;
let overallocation_denominator = 100;
let num_expected_subscriptions = overallocation_numerator
* std::cmp::max(
1,
local_pubkeys.len() * ATTESTATION_SUBSCRIPTION_OFFSETS.len()
/ E::slots_per_epoch() as usize,
)
/ overallocation_denominator;
let mut subscriptions = Vec::with_capacity(num_expected_subscriptions);
let mut subscription_slots_to_confirm = Vec::with_capacity(num_expected_subscriptions);

// For this epoch and the next epoch, produce any beacon committee subscriptions.
//
Expand All @@ -588,10 +675,10 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
.read()
.iter()
.filter_map(|(_, map)| map.get(epoch))
// The BN logs a warning if we try and subscribe to current or near-by slots. Give it a
// buffer.
.filter(|(_, duty_and_proof)| {
current_slot + SUBSCRIPTION_BUFFER_SLOTS < duty_and_proof.duty.slot
duty_and_proof
.subscription_slots
.should_send_subscription_at(current_slot)
})
.for_each(|(_, duty_and_proof)| {
let duty = &duty_and_proof.duty;
Expand All @@ -603,7 +690,8 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
committees_at_slot: duty.committees_at_slot,
slot: duty.slot,
is_aggregator,
})
});
subscription_slots_to_confirm.push(duty_and_proof.subscription_slots.clone());
});
}

Expand Down Expand Up @@ -632,6 +720,16 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
"Failed to subscribe validators";
"error" => %e
)
} else {
// Record that subscriptions were successfully sent.
debug!(
log,
"Broadcast attestation subscriptions";
"count" => subscriptions.len(),
);
for subscription_slots in subscription_slots_to_confirm {
subscription_slots.record_successful_subscription_at(current_slot);
}
}
}

Expand Down Expand Up @@ -1200,3 +1298,67 @@ async fn notify_block_production_service<T: SlotClock + 'static, E: EthSpec>(
};
}
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn subscription_slots_exact() {
for duty_slot in [
Slot::new(32),
Slot::new(47),
Slot::new(99),
Slot::new(1002003),
] {
let subscription_slots = SubscriptionSlots::new(duty_slot);

// Run twice to check idempotence (subscription slots shouldn't be marked as done until
// we mark them manually).
for _ in 0..2 {
for offset in ATTESTATION_SUBSCRIPTION_OFFSETS {
assert!(subscription_slots.should_send_subscription_at(duty_slot - offset));
}
}

// Mark each slot as complete and check that all prior slots are still marked
// incomplete.
for (i, offset) in ATTESTATION_SUBSCRIPTION_OFFSETS
.into_iter()
.rev()
.enumerate()
{
subscription_slots.record_successful_subscription_at(duty_slot - offset);
for lower_offset in ATTESTATION_SUBSCRIPTION_OFFSETS
.into_iter()
.rev()
.skip(i + 1)
{
assert!(lower_offset < offset);
assert!(
subscription_slots.should_send_subscription_at(duty_slot - lower_offset)
);
}
}
}
}
#[test]
fn subscription_slots_mark_multiple() {
for (i, offset) in ATTESTATION_SUBSCRIPTION_OFFSETS.into_iter().enumerate() {
let duty_slot = Slot::new(64);
let subscription_slots = SubscriptionSlots::new(duty_slot);

subscription_slots.record_successful_subscription_at(duty_slot - offset);

// All past offsets (earlier slots) should be marked as complete.
for (j, other_offset) in ATTESTATION_SUBSCRIPTION_OFFSETS.into_iter().enumerate() {
let past = j >= i;
assert_eq!(other_offset >= offset, past);
assert_eq!(
subscription_slots.should_send_subscription_at(duty_slot - other_offset),
!past
);
}
}
}
}

0 comments on commit 1678b09

Please sign in to comment.