Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Add RestartHeaviestFork to Gossip #34161

Merged
merged 13 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ pub fn make_accounts_hashes_message(
pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>;

// TODO These messages should go through the gpu pipeline for spam filtering
#[frozen_abi(digest = "7a2P1GeQjyqCHMyBrhNPTKfPfG4iv32vki7XHahoN55z")]
#[frozen_abi(digest = "ogEqvffeEkPpojAaSiUbCv2HdJcdXDQ1ykgYyvKvLo2")]
#[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)]
#[allow(clippy::large_enum_variant)]
pub(crate) enum Protocol {
Expand Down Expand Up @@ -395,6 +395,7 @@ fn retain_staked(values: &mut Vec<CrdsValue>, stakes: &HashMap<Pubkey, u64>) {
CrdsData::LowestSlot(_, _)
| CrdsData::LegacyVersion(_)
| CrdsData::DuplicateShred(_, _)
| CrdsData::RestartHeaviestFork(_)
| CrdsData::RestartLastVotedForkSlots(_) => {
let stake = stakes.get(&value.pubkey()).copied();
stake.unwrap_or_default() >= MIN_STAKE_FOR_GOSSIP
Expand Down
4 changes: 4 additions & 0 deletions gossip/src/cluster_info_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,8 @@ pub(crate) fn submit_gossip_stats(
crds_stats.pull.counts[12],
i64
),
("RestartHeaviestFork-push", crds_stats.push.counts[13], i64),
("RestartHeaviestFork-pull", crds_stats.pull.counts[13], i64),
(
"all-push",
crds_stats.push.counts.iter().sum::<usize>(),
Expand Down Expand Up @@ -684,6 +686,8 @@ pub(crate) fn submit_gossip_stats(
crds_stats.pull.fails[12],
i64
),
("RestartHeaviestFork-push", crds_stats.push.fails[13], i64),
("RestartHeaviestFork-pull", crds_stats.pull.fails[13], i64),
("all-push", crds_stats.push.fails.iter().sum::<usize>(), i64),
("all-pull", crds_stats.pull.fails.iter().sum::<usize>(), i64),
);
Expand Down
3 changes: 2 additions & 1 deletion gossip/src/crds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ pub enum GossipRoute<'a> {
PushMessage(/*from:*/ &'a Pubkey),
}

type CrdsCountsArray = [usize; 13];
type CrdsCountsArray = [usize; 14];

pub(crate) struct CrdsDataStats {
pub(crate) counts: CrdsCountsArray,
Expand Down Expand Up @@ -722,6 +722,7 @@ impl CrdsDataStats {
CrdsData::SnapshotHashes(_) => 10,
CrdsData::ContactInfo(_) => 11,
CrdsData::RestartLastVotedForkSlots(_) => 12,
CrdsData::RestartHeaviestFork(_) => 13,
// Update CrdsCountsArray if new items are added here.
}
}
Expand Down
15 changes: 13 additions & 2 deletions gossip/src/crds_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use {
duplicate_shred::{DuplicateShred, DuplicateShredIndex, MAX_DUPLICATE_SHREDS},
epoch_slots::EpochSlots,
legacy_contact_info::LegacyContactInfo,
restart_crds_values::RestartLastVotedForkSlots,
restart_crds_values::{RestartHeaviestFork, RestartLastVotedForkSlots},
},
bincode::{serialize, serialized_size},
rand::{CryptoRng, Rng},
Expand Down Expand Up @@ -96,6 +96,7 @@ pub enum CrdsData {
SnapshotHashes(SnapshotHashes),
ContactInfo(ContactInfo),
RestartLastVotedForkSlots(RestartLastVotedForkSlots),
RestartHeaviestFork(RestartHeaviestFork),
}

impl Sanitize for CrdsData {
Expand Down Expand Up @@ -135,6 +136,7 @@ impl Sanitize for CrdsData {
CrdsData::SnapshotHashes(val) => val.sanitize(),
CrdsData::ContactInfo(node) => node.sanitize(),
CrdsData::RestartLastVotedForkSlots(slots) => slots.sanitize(),
CrdsData::RestartHeaviestFork(fork) => fork.sanitize(),
}
}
}
Expand All @@ -148,7 +150,7 @@ pub(crate) fn new_rand_timestamp<R: Rng>(rng: &mut R) -> u64 {
impl CrdsData {
/// New random CrdsData for tests and benchmarks.
fn new_rand<R: Rng>(rng: &mut R, pubkey: Option<Pubkey>) -> CrdsData {
let kind = rng.gen_range(0..8);
let kind = rng.gen_range(0..9);
// TODO: Implement other kinds of CrdsData here.
// TODO: Assign ranges to each arm proportional to their frequency in
// the mainnet crds table.
Expand All @@ -163,6 +165,7 @@ impl CrdsData {
6 => CrdsData::RestartLastVotedForkSlots(RestartLastVotedForkSlots::new_rand(
rng, pubkey,
)),
7 => CrdsData::RestartHeaviestFork(RestartHeaviestFork::new_rand(rng, pubkey)),
_ => CrdsData::EpochSlots(
rng.gen_range(0..MAX_EPOCH_SLOTS),
EpochSlots::new_rand(rng, pubkey),
Expand Down Expand Up @@ -508,6 +511,7 @@ pub enum CrdsValueLabel {
SnapshotHashes(Pubkey),
ContactInfo(Pubkey),
RestartLastVotedForkSlots(Pubkey),
RestartHeaviestFork(Pubkey),
}

impl fmt::Display for CrdsValueLabel {
Expand All @@ -534,6 +538,9 @@ impl fmt::Display for CrdsValueLabel {
CrdsValueLabel::RestartLastVotedForkSlots(_) => {
write!(f, "RestartLastVotedForkSlots({})", self.pubkey())
}
CrdsValueLabel::RestartHeaviestFork(_) => {
write!(f, "RestartHeaviestFork({})", self.pubkey())
}
}
}
}
Expand All @@ -554,6 +561,7 @@ impl CrdsValueLabel {
CrdsValueLabel::SnapshotHashes(p) => *p,
CrdsValueLabel::ContactInfo(pubkey) => *pubkey,
CrdsValueLabel::RestartLastVotedForkSlots(p) => *p,
CrdsValueLabel::RestartHeaviestFork(p) => *p,
}
}
}
Expand Down Expand Up @@ -605,6 +613,7 @@ impl CrdsValue {
CrdsData::SnapshotHashes(hash) => hash.wallclock,
CrdsData::ContactInfo(node) => node.wallclock(),
CrdsData::RestartLastVotedForkSlots(slots) => slots.wallclock,
CrdsData::RestartHeaviestFork(fork) => fork.wallclock,
}
}
pub fn pubkey(&self) -> Pubkey {
Expand All @@ -622,6 +631,7 @@ impl CrdsValue {
CrdsData::SnapshotHashes(hash) => hash.from,
CrdsData::ContactInfo(node) => *node.pubkey(),
CrdsData::RestartLastVotedForkSlots(slots) => slots.from,
CrdsData::RestartHeaviestFork(fork) => fork.from,
}
}
pub fn label(&self) -> CrdsValueLabel {
Expand All @@ -643,6 +653,7 @@ impl CrdsValue {
CrdsData::RestartLastVotedForkSlots(_) => {
CrdsValueLabel::RestartLastVotedForkSlots(self.pubkey())
}
CrdsData::RestartHeaviestFork(_) => CrdsValueLabel::RestartHeaviestFork(self.pubkey()),
}
}
pub fn contact_info(&self) -> Option<&LegacyContactInfo> {
Expand Down
54 changes: 52 additions & 2 deletions gossip/src/restart_crds_values.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use {
crate::crds_value::new_rand_timestamp,
crate::crds_value::{new_rand_timestamp, sanitize_wallclock},
bv::BitVec,
itertools::Itertools,
rand::Rng,
Expand Down Expand Up @@ -29,6 +29,16 @@ pub enum RestartLastVotedForkSlotsError {
LastVotedForkEmpty,
}

#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, AbiExample, Debug)]
pub struct RestartHeaviestFork {
pub from: Pubkey,
pub wallclock: u64,
pub last_slot: Slot,
pub last_slot_hash: Hash,
pub observed_stake: u64,
pub shred_version: u16,
}

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, AbiExample, AbiEnumVisitor)]
enum SlotsOffsets {
RunLengthEncoding(RunLengthEncoding),
Expand All @@ -48,6 +58,7 @@ struct RawOffsets(BitVec<u8>);

impl Sanitize for RestartLastVotedForkSlots {
fn sanitize(&self) -> std::result::Result<(), SanitizeError> {
sanitize_wallclock(self.wallclock)?;
self.last_voted_hash.sanitize()
}
}
Expand Down Expand Up @@ -94,7 +105,7 @@ impl RestartLastVotedForkSlots {
}

/// New random Version for tests and benchmarks.
pub fn new_rand<R: Rng>(rng: &mut R, pubkey: Option<Pubkey>) -> Self {
pub(crate) fn new_rand<R: Rng>(rng: &mut R, pubkey: Option<Pubkey>) -> Self {
let pubkey = pubkey.unwrap_or_else(solana_sdk::pubkey::new_rand);
let num_slots = rng.gen_range(2..20);
let slots = std::iter::repeat_with(|| 47825632 + rng.gen_range(0..512))
Expand Down Expand Up @@ -122,6 +133,27 @@ impl RestartLastVotedForkSlots {
}
}

impl Sanitize for RestartHeaviestFork {
fn sanitize(&self) -> Result<(), SanitizeError> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we sanitize the wallclock like the other crds values?
also probably that should be done for the previous restart crds value as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

sanitize_wallclock(self.wallclock)?;
self.last_slot_hash.sanitize()
}
}

impl RestartHeaviestFork {
pub(crate) fn new_rand<R: Rng>(rng: &mut R, from: Option<Pubkey>) -> Self {
let from = from.unwrap_or_else(solana_sdk::pubkey::new_rand);
Self {
from,
wallclock: new_rand_timestamp(rng),
last_slot: rng.gen_range(0..1000),
last_slot_hash: Hash::new_unique(),
observed_stake: rng.gen_range(1..u64::MAX),
shred_version: 1,
}
}
}

impl RunLengthEncoding {
fn new(bits: &BitVec<u8>) -> Self {
let encoded = (0..bits.len())
Expand Down Expand Up @@ -317,4 +349,22 @@ mod test {
let range: Vec<Slot> = make_rand_slots(&mut rng).take(large_length).collect();
check_run_length_encoding(range);
}

#[test]
fn test_restart_heaviest_fork() {
let keypair = Keypair::new();
let slot = 53;
let mut fork = RestartHeaviestFork {
from: keypair.pubkey(),
wallclock: timestamp(),
last_slot: slot,
last_slot_hash: Hash::default(),
observed_stake: 800_000,
shred_version: 1,
};
assert_eq!(fork.sanitize(), Ok(()));
assert_eq!(fork.observed_stake, 800_000);
fork.wallclock = crate::crds_value::MAX_WALLCLOCK;
assert_eq!(fork.sanitize(), Err(SanitizeError::ValueOutOfBounds));
}
}