Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Add RestartHeaviestFork to Gossip #34161

Merged
merged 13 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ pub fn make_accounts_hashes_message(
pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>;

// TODO These messages should go through the gpu pipeline for spam filtering
#[frozen_abi(digest = "7a2P1GeQjyqCHMyBrhNPTKfPfG4iv32vki7XHahoN55z")]
#[frozen_abi(digest = "9kjVBD9aCZDcBvdtyTEoHVnQ6JUDJb6opYXvmhV43goJ")]
#[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)]
#[allow(clippy::large_enum_variant)]
pub(crate) enum Protocol {
Expand Down Expand Up @@ -395,6 +395,7 @@ fn retain_staked(values: &mut Vec<CrdsValue>, stakes: &HashMap<Pubkey, u64>) {
CrdsData::LowestSlot(_, _)
| CrdsData::LegacyVersion(_)
| CrdsData::DuplicateShred(_, _)
| CrdsData::RestartHeaviestFork(_)
| CrdsData::RestartLastVotedForkSlots(_) => {
let stake = stakes.get(&value.pubkey()).copied();
stake.unwrap_or_default() >= MIN_STAKE_FOR_GOSSIP
Expand Down
4 changes: 4 additions & 0 deletions gossip/src/cluster_info_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,8 @@ pub(crate) fn submit_gossip_stats(
crds_stats.pull.counts[12],
i64
),
("RestartHeaviestFork-push", crds_stats.push.counts[13], i64),
("RestartHeaviestFork-pull", crds_stats.pull.counts[13], i64),
(
"all-push",
crds_stats.push.counts.iter().sum::<usize>(),
Expand Down Expand Up @@ -684,6 +686,8 @@ pub(crate) fn submit_gossip_stats(
crds_stats.pull.fails[12],
i64
),
("RestartHeaviestFork-push", crds_stats.push.fails[13], i64),
("RestartHeaviestFork-pull", crds_stats.pull.fails[13], i64),
("all-push", crds_stats.push.fails.iter().sum::<usize>(), i64),
("all-pull", crds_stats.pull.fails.iter().sum::<usize>(), i64),
);
Expand Down
3 changes: 2 additions & 1 deletion gossip/src/crds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ pub enum GossipRoute<'a> {
PushMessage(/*from:*/ &'a Pubkey),
}

type CrdsCountsArray = [usize; 13];
type CrdsCountsArray = [usize; 14];

pub(crate) struct CrdsDataStats {
pub(crate) counts: CrdsCountsArray,
Expand Down Expand Up @@ -722,6 +722,7 @@ impl CrdsDataStats {
CrdsData::SnapshotHashes(_) => 10,
CrdsData::ContactInfo(_) => 11,
CrdsData::RestartLastVotedForkSlots(_) => 12,
CrdsData::RestartHeaviestFork(_) => 13,
// Update CrdsCountsArray if new items are added here.
}
}
Expand Down
15 changes: 13 additions & 2 deletions gossip/src/crds_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use {
duplicate_shred::{DuplicateShred, DuplicateShredIndex, MAX_DUPLICATE_SHREDS},
epoch_slots::EpochSlots,
legacy_contact_info::LegacyContactInfo,
restart_crds_values::RestartLastVotedForkSlots,
restart_crds_values::{RestartHeaviestFork, RestartLastVotedForkSlots},
},
bincode::{serialize, serialized_size},
rand::{CryptoRng, Rng},
Expand Down Expand Up @@ -96,6 +96,7 @@ pub enum CrdsData {
SnapshotHashes(SnapshotHashes),
ContactInfo(ContactInfo),
RestartLastVotedForkSlots(RestartLastVotedForkSlots),
RestartHeaviestFork(RestartHeaviestFork),
}

impl Sanitize for CrdsData {
Expand Down Expand Up @@ -135,6 +136,7 @@ impl Sanitize for CrdsData {
CrdsData::SnapshotHashes(val) => val.sanitize(),
CrdsData::ContactInfo(node) => node.sanitize(),
CrdsData::RestartLastVotedForkSlots(slots) => slots.sanitize(),
CrdsData::RestartHeaviestFork(fork) => fork.sanitize(),
}
}
}
Expand All @@ -148,7 +150,7 @@ pub(crate) fn new_rand_timestamp<R: Rng>(rng: &mut R) -> u64 {
impl CrdsData {
/// New random CrdsData for tests and benchmarks.
fn new_rand<R: Rng>(rng: &mut R, pubkey: Option<Pubkey>) -> CrdsData {
let kind = rng.gen_range(0..8);
let kind = rng.gen_range(0..9);
// TODO: Implement other kinds of CrdsData here.
// TODO: Assign ranges to each arm proportional to their frequency in
// the mainnet crds table.
Expand All @@ -163,6 +165,7 @@ impl CrdsData {
6 => CrdsData::RestartLastVotedForkSlots(RestartLastVotedForkSlots::new_rand(
rng, pubkey,
)),
7 => CrdsData::RestartHeaviestFork(RestartHeaviestFork::new_rand(rng, pubkey)),
_ => CrdsData::EpochSlots(
rng.gen_range(0..MAX_EPOCH_SLOTS),
EpochSlots::new_rand(rng, pubkey),
Expand Down Expand Up @@ -508,6 +511,7 @@ pub enum CrdsValueLabel {
SnapshotHashes(Pubkey),
ContactInfo(Pubkey),
RestartLastVotedForkSlots(Pubkey),
RestartHeaviestFork(Pubkey),
}

impl fmt::Display for CrdsValueLabel {
Expand All @@ -534,6 +538,9 @@ impl fmt::Display for CrdsValueLabel {
CrdsValueLabel::RestartLastVotedForkSlots(_) => {
write!(f, "RestartLastVotedForkSlots({})", self.pubkey())
}
CrdsValueLabel::RestartHeaviestFork(_) => {
write!(f, "RestartHeaviestFork({})", self.pubkey())
}
}
}
}
Expand All @@ -554,6 +561,7 @@ impl CrdsValueLabel {
CrdsValueLabel::SnapshotHashes(p) => *p,
CrdsValueLabel::ContactInfo(pubkey) => *pubkey,
CrdsValueLabel::RestartLastVotedForkSlots(p) => *p,
CrdsValueLabel::RestartHeaviestFork(p) => *p,
}
}
}
Expand Down Expand Up @@ -605,6 +613,7 @@ impl CrdsValue {
CrdsData::SnapshotHashes(hash) => hash.wallclock,
CrdsData::ContactInfo(node) => node.wallclock(),
CrdsData::RestartLastVotedForkSlots(slots) => slots.wallclock,
CrdsData::RestartHeaviestFork(fork) => fork.wallclock,
}
}
pub fn pubkey(&self) -> Pubkey {
Expand All @@ -622,6 +631,7 @@ impl CrdsValue {
CrdsData::SnapshotHashes(hash) => hash.from,
CrdsData::ContactInfo(node) => *node.pubkey(),
CrdsData::RestartLastVotedForkSlots(slots) => slots.from,
CrdsData::RestartHeaviestFork(fork) => fork.from,
}
}
pub fn label(&self) -> CrdsValueLabel {
Expand All @@ -643,6 +653,7 @@ impl CrdsValue {
CrdsData::RestartLastVotedForkSlots(_) => {
CrdsValueLabel::RestartLastVotedForkSlots(self.pubkey())
}
CrdsData::RestartHeaviestFork(_) => CrdsValueLabel::RestartHeaviestFork(self.pubkey()),
}
}
pub fn contact_info(&self) -> Option<&LegacyContactInfo> {
Expand Down
136 changes: 136 additions & 0 deletions gossip/src/restart_crds_values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,25 @@ pub enum RestartLastVotedForkSlotsError {
LastVotedForkEmpty,
}

#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, AbiExample, Debug)]
pub struct RestartHeaviestFork {
pub from: Pubkey,
pub wallclock: u64,
pub last_slot: Slot,
pub last_slot_hash: Hash,
// Sum of received heaviest fork validator stake / total stake * u16::MAX.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is "total stake"?
Is that the epoch stake or the stake observed in gossip?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed.

received_heaviest_fork_ratio: u16,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would probably call this stake_ratio or observed_stake_ratio.
heaviest_fork is already in the name of struct; it is redundant to repeat it here again.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also why this u16 better than sending a f64 or the (stake, total_stake) as a tuple?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original idea is we can save some bytes, but now you mention it, it's not really worth losing the precision by switching from 16 bytes to 2 bytes, I'm changing to (stake, total_stake) now.

pub shred_version: u16,
}

#[derive(Debug, Error, PartialEq)]
pub enum RestartHeaviestForkError {
#[error("Received stake larger than total stake")]
ReceivedHeaviestForkStakeLargerThanTotalStake,
#[error("Total stake of received heaviest fork cannot be zero")]
ReceivedHeaviestForkStakeZero,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to repeat the enum name in the variants.
The variants are already qualified by the enum name, ie. RestartHeaviestForkError::....

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed.

}

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, AbiExample, AbiEnumVisitor)]
enum SlotsOffsets {
RunLengthEncoding(RunLengthEncoding),
Expand Down Expand Up @@ -122,6 +141,64 @@ impl RestartLastVotedForkSlots {
}
}

impl Sanitize for RestartHeaviestFork {
fn sanitize(&self) -> Result<(), SanitizeError> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we sanitize the wallclock like the other crds values?
also probably that should be done for the previous restart crds value as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

if self.received_heaviest_fork_ratio == 0 {
// this should at least include its own stake.
return Err(SanitizeError::ValueOutOfBounds);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if node_stake / total_stake * u16::MAX < 1?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed.

self.last_slot_hash.sanitize()
}
}

impl RestartHeaviestFork {
pub fn new(
from: Pubkey,
now: u64,
last_slot: Slot,
last_slot_hash: Hash,
received_heaviest_fork_stake: u64,
total_stake: u64,
shred_version: u16,
) -> Result<Self, RestartHeaviestForkError> {
if received_heaviest_fork_stake == 0 {
return Err(RestartHeaviestForkError::ReceivedHeaviestForkStakeZero);
}
if received_heaviest_fork_stake > total_stake {
return Err(RestartHeaviestForkError::ReceivedHeaviestForkStakeLargerThanTotalStake);
}
let received_heaviest_fork_ratio =
((received_heaviest_fork_stake as f64) / (total_stake as f64) * (u16::MAX as f64))
.round() as u16;
Ok(Self {
from,
wallclock: now,
last_slot,
last_slot_hash,
received_heaviest_fork_ratio,
shred_version,
})
}

pub fn new_rand<R: Rng>(rng: &mut R, pubkey: Option<Pubkey>) -> Self {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be pub?
I feel like every review I have to ask not to pollute public interface.

Can you go over all pubs in this code and remove those not needed?
or at least change them to pub(crate).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is used in tests, and I don't see any benchmarks needing it now. So I changed it to pub(crate).

let pubkey = pubkey.unwrap_or_else(solana_sdk::pubkey::new_rand);
Self::new(
pubkey,
new_rand_timestamp(rng),
rng.gen_range(0..1000),
Hash::new_unique(),
rng.gen_range(1..u64::MAX),
u64::MAX,
1,
)
.unwrap()
}

pub fn get_ratio(&self) -> f64 {
self.received_heaviest_fork_ratio as f64 / u16::MAX as f64
}
}

impl RunLengthEncoding {
fn new(bits: &BitVec<u8>) -> Self {
let encoded = (0..bits.len())
Expand Down Expand Up @@ -188,6 +265,7 @@ mod test {
crds_value::{CrdsData, CrdsValue, CrdsValueLabel},
},
bincode::serialized_size,
num_traits::abs,
solana_sdk::{signature::Signer, signer::keypair::Keypair, timing::timestamp},
std::iter::repeat_with,
};
Expand Down Expand Up @@ -317,4 +395,62 @@ mod test {
let range: Vec<Slot> = make_rand_slots(&mut rng).take(large_length).collect();
check_run_length_encoding(range);
}

#[test]
fn test_restart_heaviest_fork() {
let keypair = Keypair::new();
let slot = 53;
// received_heaviest_fork_percent can never be 0.
assert_eq!(
RestartHeaviestFork::new(
keypair.pubkey(),
timestamp(),
slot,
Hash::default(),
0,
1_000_000,
1,
),
Err(RestartHeaviestForkError::ReceivedHeaviestForkStakeZero)
);
assert_eq!(
RestartHeaviestFork::new(
keypair.pubkey(),
timestamp(),
slot,
Hash::default(),
1_000_000,
0,
1,
),
Err(RestartHeaviestForkError::ReceivedHeaviestForkStakeLargerThanTotalStake)
);
assert_eq!(
RestartHeaviestFork::new(
keypair.pubkey(),
timestamp(),
slot,
Hash::default(),
1_000_001,
1_000_000,
1,
),
Err(RestartHeaviestForkError::ReceivedHeaviestForkStakeLargerThanTotalStake)
);
let mut fork = RestartHeaviestFork::new(
keypair.pubkey(),
timestamp(),
slot,
Hash::default(),
800_000,
1_000_000,
1,
)
.unwrap();
assert_eq!(fork.sanitize(), Ok(()));
assert!(abs(fork.get_ratio() - 0.8) < 0.0001);

fork.received_heaviest_fork_ratio = 0;
assert_eq!(fork.sanitize(), Err(SanitizeError::ValueOutOfBounds));
}
}