From ce81bbcda8c1c8d8b42739ac76da6c470a1996db Mon Sep 17 00:00:00 2001 From: Wen Date: Wed, 13 Sep 2023 11:46:55 -0700 Subject: [PATCH 01/13] Add RestartLastVotedForkSlots and RestartHeaviestFork for wen_restart. --- gossip/src/cluster_info.rs | 310 ++++++++++++++++++++++++++--- gossip/src/cluster_info_metrics.rs | 24 +++ gossip/src/crds.rs | 198 +++++++++++++++++- gossip/src/crds_value.rs | 144 +++++++++++++- 4 files changed, 646 insertions(+), 30 deletions(-) diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index d5b2b447ea9422..7363a143d638fd 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -33,8 +33,8 @@ use { }, crds_value::{ self, AccountsHashes, CrdsData, CrdsValue, CrdsValueLabel, EpochSlotsIndex, - LegacySnapshotHashes, LowestSlot, NodeInstance, SnapshotHashes, Version, Vote, - MAX_WALLCLOCK, + LegacySnapshotHashes, LowestSlot, NodeInstance, Percent, SnapshotHashes, Version, Vote, + MAX_PERCENT, MAX_WALLCLOCK, }, duplicate_shred::DuplicateShred, epoch_slots::EpochSlots, @@ -397,7 +397,9 @@ fn retain_staked(values: &mut Vec, stakes: &HashMap) { CrdsData::AccountsHashes(_) => true, CrdsData::LowestSlot(_, _) | CrdsData::LegacyVersion(_) - | CrdsData::DuplicateShred(_, _) => { + | CrdsData::DuplicateShred(_, _) + | CrdsData::RestartLastVotedForkSlots(_, _, _, _) + | CrdsData::RestartHeaviestFork(_, _, _) => { let stake = stakes.get(&value.pubkey()).copied(); stake.unwrap_or_default() >= MIN_STAKE_FOR_GOSSIP } @@ -722,9 +724,8 @@ impl ClusterInfo { self.my_contact_info.read().unwrap().shred_version() } - fn lookup_epoch_slots(&self, ix: EpochSlotsIndex) -> EpochSlots { + fn lookup_epoch_slots(&self, label: CrdsValueLabel) -> EpochSlots { let self_pubkey = self.id(); - let label = CrdsValueLabel::EpochSlots(ix, self_pubkey); let gossip_crds = self.gossip.crds.read().unwrap(); gossip_crds .get::<&CrdsValue>(&label) @@ -899,16 +900,42 @@ impl ClusterInfo { } } + pub fn push_epoch_slots(&self, update: &[Slot]) { + self.push_epoch_slots_or_restart_last_voted_fork_slots(update, None) + } + + pub fn push_restart_last_voted_fork_slots(&self, update: &[Slot], last_vote_bankhash: Hash) { + self.push_epoch_slots_or_restart_last_voted_fork_slots(update, Some(last_vote_bankhash)) + } + // TODO: If two threads call into this function then epoch_slot_index has a // race condition and the threads will overwrite each other in crds table. - pub fn push_epoch_slots(&self, mut update: &[Slot]) { + fn push_epoch_slots_or_restart_last_voted_fork_slots( + &self, + mut update: &[Slot], + last_vote_bankhash: Option, + ) { + let is_epoch_slot = last_vote_bankhash.is_none(); let self_pubkey = self.id(); + let create_label = |ix| { + if is_epoch_slot { + CrdsValueLabel::EpochSlots(ix, self_pubkey) + } else { + CrdsValueLabel::RestartLastVotedForkSlots(ix, self_pubkey) + } + }; + let max_entries = if is_epoch_slot { + crds_value::MAX_EPOCH_SLOTS + } else { + crds_value::MAX_RESTART_LAST_VOTED_FORK_SLOTS + }; + let last_vote_slot = last_vote_bankhash.map(|_| *update.last().unwrap()); let current_slots: Vec<_> = { let gossip_crds = self.time_gossip_read_lock("lookup_epoch_slots", &self.stats.epoch_slots_lookup); - (0..crds_value::MAX_EPOCH_SLOTS) + (0..max_entries) .filter_map(|ix| { - let label = CrdsValueLabel::EpochSlots(ix, self_pubkey); + let label = create_label(ix); let crds_value = gossip_crds.get::<&CrdsValue>(&label)?; let epoch_slots = crds_value.epoch_slots()?; let first_slot = epoch_slots.first_slot()?; @@ -922,17 +949,19 @@ impl ClusterInfo { .min() .unwrap_or_default(); let max_slot: Slot = update.iter().max().cloned().unwrap_or(0); - let total_slots = max_slot as isize - min_slot as isize; - // WARN if CRDS is not storing at least a full epoch worth of slots - if DEFAULT_SLOTS_PER_EPOCH as isize > total_slots - && crds_value::MAX_EPOCH_SLOTS as usize <= current_slots.len() - { - self.stats.epoch_slots_filled.add_relaxed(1); - warn!( - "EPOCH_SLOTS are filling up FAST {}/{}", - total_slots, - current_slots.len() - ); + if is_epoch_slot { + let total_slots = max_slot as isize - min_slot as isize; + // WARN if CRDS is not storing at least a full epoch worth of slots + if DEFAULT_SLOTS_PER_EPOCH as isize > total_slots + && crds_value::MAX_EPOCH_SLOTS as usize <= current_slots.len() + { + self.stats.epoch_slots_filled.add_relaxed(1); + warn!( + "EPOCH_SLOTS are filling up FAST {}/{}", + total_slots, + current_slots.len() + ); + } } let mut reset = false; let mut epoch_slot_index = match current_slots.iter().max() { @@ -942,18 +971,27 @@ impl ClusterInfo { let mut entries = Vec::default(); let keypair = self.keypair(); while !update.is_empty() { - let ix = epoch_slot_index % crds_value::MAX_EPOCH_SLOTS; + let ix = epoch_slot_index % max_entries; let now = timestamp(); let mut slots = if !reset { - self.lookup_epoch_slots(ix) + self.lookup_epoch_slots(create_label(ix)) } else { EpochSlots::new(self_pubkey, now) }; let n = slots.fill(update, now); update = &update[n..]; if n > 0 { - let epoch_slots = CrdsData::EpochSlots(ix, slots); - let entry = CrdsValue::new_signed(epoch_slots, &keypair); + let data = if is_epoch_slot { + CrdsData::EpochSlots(ix, slots) + } else { + CrdsData::RestartLastVotedForkSlots( + ix, + slots, + last_vote_slot.unwrap(), + last_vote_bankhash.unwrap(), + ) + }; + let entry = CrdsValue::new_signed(data, &keypair); entries.push(entry); } epoch_slot_index += 1; @@ -963,11 +1001,36 @@ impl ClusterInfo { let now = timestamp(); for entry in entries { if let Err(err) = gossip_crds.insert(entry, now, GossipRoute::LocalMessage) { - error!("push_epoch_slots failed: {:?}", err); + error!( + "push_epoch_slots_or_restart_last_voted_fork_slots failed: {:?}", + err + ); } } } + pub fn push_restart_heaviest_fork( + &self, + slot: Slot, + hash: Hash, + percent: f64, + ) -> Result<(), String> { + if !(0.0..1.0).contains(&percent) { + return Err(format!( + "heaviest_fork with out of bound percent, ignored: {}", + percent + )); + } + + let message = CrdsData::RestartHeaviestFork( + slot, + hash, + Percent::new(self.id(), (percent * MAX_PERCENT as f64) as u16), + ); + self.push_message(CrdsValue::new_signed(message, &self.keypair())); + Ok(()) + } + fn time_gossip_read_lock<'a>( &'a self, label: &'static str, @@ -1243,6 +1306,49 @@ impl ClusterInfo { .collect() } + /// Returns last-voted-fork-slots inserted since the given cursor. + /// Excludes entries from nodes with unkown or different shred version. + pub fn get_restart_last_voted_fork_slots( + &self, + cursor: &mut Cursor, + ) -> Vec<(EpochSlotsIndex, EpochSlots, Slot, Hash)> { + let self_shred_version = Some(self.my_shred_version()); + let gossip_crds = self.gossip.crds.read().unwrap(); + gossip_crds + .get_restart_last_voted_fork_slots(cursor) + .filter(|entry| { + let origin = entry.value.pubkey(); + gossip_crds.get_shred_version(&origin) == self_shred_version + }) + .map(|entry| match &entry.value.data { + CrdsData::RestartLastVotedForkSlots(index, slots, last_vote, last_vote_hash) => { + (*index, slots.clone(), *last_vote, *last_vote_hash) + } + _ => panic!("this should not happen!"), + }) + .collect() + } + + /// Returns heaviest-fork inserted since the given cursor. + /// Excludes entries from nodes with unkown or different shred version. + pub fn get_restart_heaviest_fork(&self, cursor: &mut Cursor) -> Vec<(Slot, Hash, Percent)> { + let self_shred_version = Some(self.my_shred_version()); + let gossip_crds = self.gossip.crds.read().unwrap(); + gossip_crds + .get_restart_heaviest_fork(cursor) + .filter(|entry| { + let origin = entry.value.pubkey(); + gossip_crds.get_shred_version(&origin) == self_shred_version + }) + .map(|entry| match &entry.value.data { + CrdsData::RestartHeaviestFork(last_vote, last_vote_hash, percent) => { + (*last_vote, *last_vote_hash, percent.clone()) + } + _ => panic!("this should not happen!"), + }) + .collect() + } + /// Returns duplicate-shreds inserted since the given cursor. pub(crate) fn get_duplicate_shreds(&self, cursor: &mut Cursor) -> Vec { let gossip_crds = self.gossip.crds.read().unwrap(); @@ -4003,6 +4109,162 @@ mod tests { assert_eq!(slots[1].from, node_pubkey); } + #[test] + fn test_push_restart_last_voted_fork_slots() { + solana_logger::setup(); + let keypair = Arc::new(Keypair::new()); + let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), 0); + let cluster_info = ClusterInfo::new(contact_info, keypair, SocketAddrSpace::Unspecified); + let slots = cluster_info.get_restart_last_voted_fork_slots(&mut Cursor::default()); + assert!(slots.is_empty()); + let mut update: Vec = vec![0]; + for i in 0..81 { + for j in 0..1000 { + update.push(i * 1050 + j); + } + } + cluster_info.push_restart_last_voted_fork_slots(&update, Hash::default()); + + let mut cursor = Cursor::default(); + let slots = cluster_info.get_restart_last_voted_fork_slots(&mut cursor); + assert_eq!(slots.len(), 2); + assert_eq!(slots[0].1.to_slots(0).len(), 42468); + assert_eq!(slots[1].1.to_slots(0).len(), 38532); + + let slots = cluster_info.get_restart_last_voted_fork_slots(&mut cursor); + assert!(slots.is_empty()); + + // Test with different shred versions. + let mut rng = rand::thread_rng(); + let node_pubkey = Pubkey::new_unique(); + let mut node = LegacyContactInfo::new_rand(&mut rng, Some(node_pubkey)); + node.set_shred_version(42); + let epoch_slots = EpochSlots::new_rand(&mut rng, Some(node_pubkey)); + let entries = vec![ + CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(node)), + CrdsValue::new_unsigned(CrdsData::RestartLastVotedForkSlots( + 0, + epoch_slots, + 0, + Hash::default(), + )), + ]; + { + let mut gossip_crds = cluster_info.gossip.crds.write().unwrap(); + for entry in entries { + assert!(gossip_crds + .insert(entry, /*now=*/ 0, GossipRoute::LocalMessage) + .is_ok()); + } + } + // Should exclude other node's last-voted-fork-slot because of different + // shred-version. + let slots = cluster_info.get_restart_last_voted_fork_slots(&mut Cursor::default()); + assert_eq!(slots.len(), 2); + assert_eq!(slots[0].1.from, cluster_info.id()); + assert_eq!(slots[1].1.from, cluster_info.id()); + // Match shred versions. + { + let mut node = cluster_info.my_contact_info.write().unwrap(); + node.set_shred_version(42); + } + cluster_info.push_self(); + cluster_info.flush_push_queue(); + // Should now include both epoch slots. + let slots = cluster_info.get_restart_last_voted_fork_slots(&mut Cursor::default()); + assert_eq!(slots.len(), 3); + assert_eq!(slots[0].1.from, cluster_info.id()); + assert_eq!(slots[1].1.from, cluster_info.id()); + assert_eq!(slots[2].1.from, node_pubkey); + } + + #[test] + fn test_push_restart_heaviest_fork() { + solana_logger::setup(); + let keypair = Arc::new(Keypair::new()); + let pubkey = keypair.pubkey(); + let contact_info = ContactInfo::new_localhost(&pubkey, 0); + let cluster_info = ClusterInfo::new(contact_info, keypair, SocketAddrSpace::Unspecified); + + // make sure empty crds is handled correctly + let mut cursor = Cursor::default(); + let heaviest_forks = cluster_info.get_restart_heaviest_fork(&mut cursor); + assert_eq!(heaviest_forks, vec![]); + + // add new message + let slot1 = 53; + let hash1 = Hash::new_unique(); + let percent1 = 0.015; + assert!(cluster_info + .push_restart_heaviest_fork(slot1, hash1, percent1) + .is_ok()); + cluster_info.flush_push_queue(); + + let heaviest_forks = cluster_info.get_restart_heaviest_fork(&mut cursor); + assert_eq!(heaviest_forks.len(), 1); + let (slot, hash, percent) = &heaviest_forks[0]; + assert_eq!(slot, &slot1); + assert_eq!(hash, &hash1); + assert!((percent.percent as f64 - percent1 * MAX_PERCENT as f64).abs() < f64::EPSILON); + assert_eq!(percent.from, pubkey); + + // ignore bad input + assert_eq!( + cluster_info.push_restart_heaviest_fork(slot1, hash1, 1.5), + Err(format!( + "heaviest_fork with out of bound percent, ignored: 1.5" + )) + ); + assert_eq!( + cluster_info.push_restart_heaviest_fork(slot1, hash1, -0.3), + Err(format!( + "heaviest_fork with out of bound percent, ignored: -0.3" + )) + ); + + // Test with different shred versions. + let mut rng = rand::thread_rng(); + let pubkey2 = Pubkey::new_unique(); + let mut new_node = LegacyContactInfo::new_rand(&mut rng, Some(pubkey2)); + new_node.set_shred_version(42); + let slot2 = 54; + let hash2 = Hash::new_unique(); + let percent2 = 0.023; + let entries = vec![ + CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(new_node)), + CrdsValue::new_unsigned(CrdsData::RestartHeaviestFork( + slot2, + hash2, + Percent::new(pubkey2, (percent2 * MAX_PERCENT as f64) as u16), + )), + ]; + { + let mut gossip_crds = cluster_info.gossip.crds.write().unwrap(); + for entry in entries { + assert!(gossip_crds + .insert(entry, /*now=*/ 0, GossipRoute::LocalMessage) + .is_ok()); + } + } + // Should exclude other node's heaviest_fork because of different + // shred-version. + let heaviest_forks = cluster_info.get_restart_heaviest_fork(&mut Cursor::default()); + assert_eq!(heaviest_forks.len(), 1); + assert_eq!(heaviest_forks[0].2.from, pubkey); + // Match shred versions. + { + let mut node = cluster_info.my_contact_info.write().unwrap(); + node.set_shred_version(42); + } + cluster_info.push_self(); + cluster_info.flush_push_queue(); + // Should now include both epoch slots. + let heaviest_forks = cluster_info.get_restart_heaviest_fork(&mut Cursor::default()); + assert_eq!(heaviest_forks.len(), 2); + assert_eq!(heaviest_forks[0].2.from, pubkey); + assert_eq!(heaviest_forks[1].2.from, pubkey2); + } + #[test] fn test_append_entrypoint_to_pulls() { let thread_pool = ThreadPoolBuilder::new().build().unwrap(); diff --git a/gossip/src/cluster_info_metrics.rs b/gossip/src/cluster_info_metrics.rs index 095848fd2932ca..0e474d3cf5284f 100644 --- a/gossip/src/cluster_info_metrics.rs +++ b/gossip/src/cluster_info_metrics.rs @@ -627,6 +627,18 @@ pub(crate) fn submit_gossip_stats( ("SnapshotHashes-pull", crds_stats.pull.counts[10], i64), ("ContactInfo-push", crds_stats.push.counts[11], i64), ("ContactInfo-pull", crds_stats.pull.counts[11], i64), + ( + "RestartLastVotedForkSlots-push", + crds_stats.push.counts[12], + i64 + ), + ( + "RestartLastVotedForkSlots-pull", + crds_stats.pull.counts[12], + i64 + ), + ("RestartHeaviestFork-push", crds_stats.push.counts[13], i64), + ("RestartHeaviestFork-pull", crds_stats.pull.counts[13], i64), ( "all-push", crds_stats.push.counts.iter().sum::(), @@ -664,6 +676,18 @@ pub(crate) fn submit_gossip_stats( ("SnapshotHashes-pull", crds_stats.pull.fails[10], i64), ("ContactInfo-push", crds_stats.push.fails[11], i64), ("ContactInfo-pull", crds_stats.pull.fails[11], i64), + ( + "RestartLastVotedForkSlots-push", + crds_stats.push.fails[12], + i64 + ), + ( + "RestartLastVotedForkSlots-pull", + crds_stats.pull.fails[12], + i64 + ), + ("RestartHeaviestFork-push", crds_stats.push.fails[13], i64), + ("RestartHeaviestFork-pull", crds_stats.pull.fails[13], i64), ("all-push", crds_stats.push.fails.iter().sum::(), i64), ("all-pull", crds_stats.pull.fails.iter().sum::(), i64), ); diff --git a/gossip/src/crds.rs b/gossip/src/crds.rs index d8ab6e45b3d593..39383dd61597e7 100644 --- a/gossip/src/crds.rs +++ b/gossip/src/crds.rs @@ -77,6 +77,10 @@ pub struct Crds { epoch_slots: BTreeMap, // Indices of DuplicateShred keyed by insert order. duplicate_shreds: BTreeMap, + // Indices of RestartLastVotedForkSlots keyed by insert order. + restart_last_voted_fork_slots: BTreeMap, + // Indices of RestartHeaviestFork keyed by insert order. + restart_heaviest_fork: BTreeMap, // Indices of all crds values associated with a node. records: HashMap>, // Indices of all entries keyed by insert order. @@ -103,7 +107,7 @@ pub enum GossipRoute<'a> { PushMessage(/*from:*/ &'a Pubkey), } -type CrdsCountsArray = [usize; 12]; +type CrdsCountsArray = [usize; 14]; pub(crate) struct CrdsDataStats { pub(crate) counts: CrdsCountsArray, @@ -169,6 +173,8 @@ impl Default for Crds { votes: BTreeMap::default(), epoch_slots: BTreeMap::default(), duplicate_shreds: BTreeMap::default(), + restart_last_voted_fork_slots: BTreeMap::default(), + restart_heaviest_fork: BTreeMap::default(), records: HashMap::default(), entries: BTreeMap::default(), purged: VecDeque::default(), @@ -242,6 +248,14 @@ impl Crds { CrdsData::DuplicateShred(_, _) => { self.duplicate_shreds.insert(value.ordinal, entry_index); } + CrdsData::RestartLastVotedForkSlots(_, _, _, _) => { + self.restart_last_voted_fork_slots + .insert(value.ordinal, entry_index); + } + CrdsData::RestartHeaviestFork(_, _, _) => { + self.restart_heaviest_fork + .insert(value.ordinal, entry_index); + } _ => (), }; self.entries.insert(value.ordinal, entry_index); @@ -277,6 +291,17 @@ impl Crds { self.duplicate_shreds.remove(&entry.get().ordinal); self.duplicate_shreds.insert(value.ordinal, entry_index); } + CrdsData::RestartLastVotedForkSlots(_, _, _, _) => { + self.restart_last_voted_fork_slots + .remove(&entry.get().ordinal); + self.restart_last_voted_fork_slots + .insert(value.ordinal, entry_index); + } + CrdsData::RestartHeaviestFork(_, _, _) => { + self.restart_heaviest_fork.remove(&entry.get().ordinal); + self.restart_heaviest_fork + .insert(value.ordinal, entry_index); + } _ => (), } self.entries.remove(&entry.get().ordinal); @@ -377,6 +402,36 @@ impl Crds { }) } + /// Returns last_voted_fork_slots inserted since the given cursor. + /// Updates the cursor as the values are consumed. + pub(crate) fn get_restart_last_voted_fork_slots<'a>( + &'a self, + cursor: &'a mut Cursor, + ) -> impl Iterator { + let range = (Bound::Included(cursor.ordinal()), Bound::Unbounded); + self.restart_last_voted_fork_slots + .range(range) + .map(move |(ordinal, index)| { + cursor.consume(*ordinal); + self.table.index(*index) + }) + } + + /// Returns heaviest_fork inserted since the given cursor. + /// Updates the cursor as the values are consumed. + pub(crate) fn get_restart_heaviest_fork<'a>( + &'a self, + cursor: &'a mut Cursor, + ) -> impl Iterator { + let range = (Bound::Included(cursor.ordinal()), Bound::Unbounded); + self.restart_heaviest_fork + .range(range) + .map(move |(ordinal, index)| { + cursor.consume(*ordinal); + self.table.index(*index) + }) + } + /// Returns all entries inserted since the given cursor. pub(crate) fn get_entries<'a>( &'a self, @@ -544,6 +599,12 @@ impl Crds { CrdsData::DuplicateShred(_, _) => { self.duplicate_shreds.remove(&value.ordinal); } + CrdsData::RestartLastVotedForkSlots(_, _, _, _) => { + self.restart_last_voted_fork_slots.remove(&value.ordinal); + } + CrdsData::RestartHeaviestFork(_, _, _) => { + self.restart_heaviest_fork.remove(&value.ordinal); + } _ => (), } self.entries.remove(&value.ordinal); @@ -581,6 +642,13 @@ impl Crds { CrdsData::DuplicateShred(_, _) => { self.duplicate_shreds.insert(value.ordinal, index); } + CrdsData::RestartLastVotedForkSlots(_, _, _, _) => { + self.restart_last_voted_fork_slots + .insert(value.ordinal, index); + } + CrdsData::RestartHeaviestFork(_, _, _) => { + self.restart_heaviest_fork.insert(value.ordinal, index); + } _ => (), }; self.entries.insert(value.ordinal, index); @@ -721,6 +789,8 @@ impl CrdsDataStats { CrdsData::DuplicateShred(_, _) => 9, CrdsData::SnapshotHashes(_) => 10, CrdsData::ContactInfo(_) => 11, + CrdsData::RestartLastVotedForkSlots(_, _, _, _) => 12, + CrdsData::RestartHeaviestFork(_, _, _) => 13, // Update CrdsCountsArray if new items are added here. } } @@ -1118,6 +1188,8 @@ mod tests { usize, // number of nodes usize, // number of votes usize, // number of epoch slots + usize, // number of restart last voted fork slots + usize, // number of restart heaviest forks ) { let size = crds.table.len(); let since = if size == 0 || rng.gen() { @@ -1146,6 +1218,65 @@ mod tests { assert!(value.ordinal >= since); assert_matches!(value.value.data, CrdsData::EpochSlots(_, _)); } + let num_last_voted_fork_slots = crds + .table + .values() + .filter(|v| v.ordinal >= since) + .filter(|v| { + matches!( + v.value.data, + CrdsData::RestartLastVotedForkSlots(_, _, _, _) + ) + }) + .count(); + let mut cursor = Cursor(since); + assert_eq!( + num_last_voted_fork_slots, + crds.get_restart_last_voted_fork_slots(&mut cursor).count() + ); + assert_eq!( + cursor.0, + crds.restart_last_voted_fork_slots + .iter() + .last() + .map(|(k, _)| k + 1) + .unwrap_or_default() + .max(since) + ); + for value in crds.get_restart_last_voted_fork_slots(&mut Cursor(since)) { + assert!(value.ordinal >= since); + match value.value.data { + CrdsData::RestartLastVotedForkSlots(_, _, _, _) => (), + _ => panic!("not a last-voted-fork-slot!"), + } + } + let num_heaviest_forks = crds + .table + .values() + .filter(|v| v.ordinal >= since) + .filter(|v| matches!(v.value.data, CrdsData::RestartHeaviestFork(_, _, _))) + .count(); + let mut cursor = Cursor(since); + assert_eq!( + num_heaviest_forks, + crds.get_restart_heaviest_fork(&mut cursor).count() + ); + assert_eq!( + cursor.0, + crds.restart_heaviest_fork + .iter() + .last() + .map(|(k, _)| k + 1) + .unwrap_or_default() + .max(since) + ); + for value in crds.get_restart_heaviest_fork(&mut Cursor(since)) { + assert!(value.ordinal >= since); + match value.value.data { + CrdsData::RestartHeaviestFork(_, _, _) => (), + _ => panic!("not a heaviest-fork!"), + } + } let num_votes = crds .table .values() @@ -1203,6 +1334,21 @@ mod tests { .values() .filter(|v| matches!(v.value.data, CrdsData::EpochSlots(_, _))) .count(); + let num_restart_last_voted_fork_slots = crds + .table + .values() + .filter(|v| { + matches!( + v.value.data, + CrdsData::RestartLastVotedForkSlots(_, _, _, _) + ) + }) + .count(); + let num_restart_heaviest_forks = crds + .table + .values() + .filter(|v| matches!(v.value.data, CrdsData::RestartHeaviestFork(_, _, _))) + .count(); assert_eq!( crds.table.len(), crds.get_entries(&mut Cursor::default()).count() @@ -1213,13 +1359,43 @@ mod tests { num_epoch_slots, crds.get_epoch_slots(&mut Cursor::default()).count() ); + assert_eq!( + num_restart_last_voted_fork_slots, + crds.get_restart_last_voted_fork_slots(&mut Cursor::default()) + .count() + ); + assert_eq!( + num_restart_heaviest_forks, + crds.get_restart_heaviest_fork(&mut Cursor::default()) + .count() + ); for vote in crds.get_votes(&mut Cursor::default()) { assert_matches!(vote.value.data, CrdsData::Vote(_, _)); } for epoch_slots in crds.get_epoch_slots(&mut Cursor::default()) { assert_matches!(epoch_slots.value.data, CrdsData::EpochSlots(_, _)); } - (num_nodes, num_votes, num_epoch_slots) + for restart_last_voted_fork_slots in + crds.get_restart_last_voted_fork_slots(&mut Cursor::default()) + { + match restart_last_voted_fork_slots.value.data { + CrdsData::RestartLastVotedForkSlots(_, _, _, _) => (), + _ => panic!("not a restart-last-voted-fork-slot!"), + } + } + for restart_heaviest_fork in crds.get_restart_heaviest_fork(&mut Cursor::default()) { + match restart_heaviest_fork.value.data { + CrdsData::RestartHeaviestFork(_, _, _) => (), + _ => panic!("not a restart-heaviest-fork!"), + } + } + ( + num_nodes, + num_votes, + num_epoch_slots, + num_last_voted_fork_slots, + num_heaviest_forks, + ) } #[test] @@ -1232,7 +1408,7 @@ mod tests { let keypair = &keypairs[rng.gen_range(0..keypairs.len())]; let value = CrdsValue::new_rand(&mut rng, Some(keypair)); let local_timestamp = new_rand_timestamp(&mut rng); - if let Ok(()) = crds.insert(value, local_timestamp, GossipRoute::LocalMessage) { + if let Ok(()) = crds.insert(value.clone(), local_timestamp, GossipRoute::LocalMessage) { num_inserts += 1; } if k % 16 == 0 { @@ -1245,11 +1421,25 @@ mod tests { assert!(crds.table.len() > 200); assert_eq!(crds.num_purged() + crds.table.len(), 4096); assert!(num_inserts > crds.table.len()); - let (num_nodes, num_votes, num_epoch_slots) = check_crds_value_indices(&mut rng, &crds); + let ( + num_nodes, + num_votes, + num_epoch_slots, + num_restart_last_voted_fork_slots, + num_restart_heaviest_forks, + ) = check_crds_value_indices(&mut rng, &crds); assert!(num_nodes * 3 < crds.table.len()); assert!(num_nodes > 100, "num nodes: {num_nodes}"); assert!(num_votes > 100, "num votes: {num_votes}"); assert!(num_epoch_slots > 100, "num epoch slots: {num_epoch_slots}"); + assert!( + num_restart_last_voted_fork_slots > 0, + "num restart last voted fork slots: {num_restart_last_voted_fork_slots}" + ); + assert!( + num_restart_heaviest_forks > 0, + "num restart heaviest forks: {num_restart_heaviest_forks}" + ); // Remove values one by one and assert that nodes indices stay valid. while !crds.table.is_empty() { let index = rng.gen_range(0..crds.table.len()); diff --git a/gossip/src/crds_value.rs b/gossip/src/crds_value.rs index 24d66e3c520b71..c2c9ed527079f8 100644 --- a/gossip/src/crds_value.rs +++ b/gossip/src/crds_value.rs @@ -38,6 +38,9 @@ pub const MAX_VOTES: VoteIndex = 32; pub type EpochSlotsIndex = u8; pub const MAX_EPOCH_SLOTS: EpochSlotsIndex = 255; +// We now keep 81000 slots, 81000/MAX_SLOTS_PER_ENTRY = 5. +pub const MAX_RESTART_LAST_VOTED_FORK_SLOTS: EpochSlotsIndex = 5; +pub const MAX_PERCENT: u16 = 10000; /// CrdsValue that is replicated across the cluster #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, AbiExample)] @@ -94,6 +97,8 @@ pub enum CrdsData { DuplicateShred(DuplicateShredIndex, DuplicateShred), SnapshotHashes(SnapshotHashes), ContactInfo(ContactInfo), + RestartLastVotedForkSlots(EpochSlotsIndex, EpochSlots, Slot, Hash), + RestartHeaviestFork(Slot, Hash, Percent), } impl Sanitize for CrdsData { @@ -132,6 +137,18 @@ impl Sanitize for CrdsData { } CrdsData::SnapshotHashes(val) => val.sanitize(), CrdsData::ContactInfo(node) => node.sanitize(), + CrdsData::RestartLastVotedForkSlots(ix, slots, _last_vote_slot, last_vote_hash) => { + if *ix as usize >= MAX_RESTART_LAST_VOTED_FORK_SLOTS as usize { + return Err(SanitizeError::ValueOutOfBounds); + } + slots.sanitize().and(last_vote_hash.sanitize()) + } + CrdsData::RestartHeaviestFork(_slot, hash, percent) => { + if percent.percent > MAX_PERCENT { + return Err(SanitizeError::ValueOutOfBounds); + } + hash.sanitize() + } } } } @@ -145,7 +162,7 @@ pub(crate) fn new_rand_timestamp(rng: &mut R) -> u64 { impl CrdsData { /// New random CrdsData for tests and benchmarks. fn new_rand(rng: &mut R, pubkey: Option) -> CrdsData { - let kind = rng.gen_range(0..7); + let kind = rng.gen_range(0..9); // TODO: Implement other kinds of CrdsData here. // TODO: Assign ranges to each arm proportional to their frequency in // the mainnet crds table. @@ -157,9 +174,20 @@ impl CrdsData { 3 => CrdsData::AccountsHashes(AccountsHashes::new_rand(rng, pubkey)), 4 => CrdsData::Version(Version::new_rand(rng, pubkey)), 5 => CrdsData::Vote(rng.gen_range(0..MAX_VOTES), Vote::new_rand(rng, pubkey)), - _ => CrdsData::EpochSlots( + 6 => CrdsData::RestartLastVotedForkSlots( rng.gen_range(0..MAX_EPOCH_SLOTS), EpochSlots::new_rand(rng, pubkey), + rng.gen_range(0..512), + Hash::new_unique(), + ), + 7 => CrdsData::RestartHeaviestFork( + rng.gen_range(0..512), + Hash::new_unique(), + Percent::new_rand(rng, pubkey), + ), + _ => CrdsData::EpochSlots( + rng.gen_range(0..MAX_RESTART_LAST_VOTED_FORK_SLOTS), + EpochSlots::new_rand(rng, pubkey), ), } } @@ -485,6 +513,41 @@ impl Sanitize for NodeInstance { } } +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, AbiExample)] +pub struct Percent { + pub from: Pubkey, + pub(crate) percent: u16, + pub(crate) wallclock: u64, +} + +impl Percent { + pub fn new(from: Pubkey, percent: u16) -> Self { + Self { + from, + percent, + wallclock: timestamp(), + } + } + + fn new_rand(rng: &mut R, pubkey: Option) -> Self { + Self { + from: pubkey.unwrap_or_else(pubkey::new_rand), + wallclock: new_rand_timestamp(rng), + percent: rng.gen_range(1..80), + } + } +} + +impl Sanitize for Percent { + fn sanitize(&self) -> Result<(), SanitizeError> { + sanitize_wallclock(self.wallclock)?; + if self.percent > MAX_PERCENT { + return Err(SanitizeError::ValueOutOfBounds); + } + self.from.sanitize() + } +} + /// Type of the replicated value /// These are labels for values in a record that is associated with `Pubkey` #[derive(PartialEq, Hash, Eq, Clone, Debug)] @@ -501,6 +564,8 @@ pub enum CrdsValueLabel { DuplicateShred(DuplicateShredIndex, Pubkey), SnapshotHashes(Pubkey), ContactInfo(Pubkey), + RestartLastVotedForkSlots(EpochSlotsIndex, Pubkey), + RestartHeaviestFork(Pubkey), } impl fmt::Display for CrdsValueLabel { @@ -524,6 +589,12 @@ impl fmt::Display for CrdsValueLabel { write!(f, "SnapshotHashes({})", self.pubkey()) } CrdsValueLabel::ContactInfo(_) => write!(f, "ContactInfo({})", self.pubkey()), + CrdsValueLabel::RestartLastVotedForkSlots(ix, _) => { + write!(f, "RestartLastVotedForkSlots({}, {})", ix, self.pubkey()) + } + CrdsValueLabel::RestartHeaviestFork(_) => { + write!(f, "RestartHeaviestFork({})", self.pubkey()) + } } } } @@ -543,6 +614,8 @@ impl CrdsValueLabel { CrdsValueLabel::DuplicateShred(_, p) => *p, CrdsValueLabel::SnapshotHashes(p) => *p, CrdsValueLabel::ContactInfo(pubkey) => *pubkey, + CrdsValueLabel::RestartLastVotedForkSlots(_, p) => *p, + CrdsValueLabel::RestartHeaviestFork(p) => *p, } } } @@ -593,6 +666,8 @@ impl CrdsValue { CrdsData::DuplicateShred(_, shred) => shred.wallclock, CrdsData::SnapshotHashes(hash) => hash.wallclock, CrdsData::ContactInfo(node) => node.wallclock(), + CrdsData::RestartLastVotedForkSlots(_, slots, _, _) => slots.wallclock, + CrdsData::RestartHeaviestFork(_, _, percent) => percent.wallclock, } } pub fn pubkey(&self) -> Pubkey { @@ -609,6 +684,8 @@ impl CrdsValue { CrdsData::DuplicateShred(_, shred) => shred.from, CrdsData::SnapshotHashes(hash) => hash.from, CrdsData::ContactInfo(node) => *node.pubkey(), + CrdsData::RestartLastVotedForkSlots(_, slots, _, _) => slots.from, + CrdsData::RestartHeaviestFork(_, _, percent) => percent.from, } } pub fn label(&self) -> CrdsValueLabel { @@ -627,6 +704,12 @@ impl CrdsValue { CrdsData::DuplicateShred(ix, shred) => CrdsValueLabel::DuplicateShred(*ix, shred.from), CrdsData::SnapshotHashes(_) => CrdsValueLabel::SnapshotHashes(self.pubkey()), CrdsData::ContactInfo(node) => CrdsValueLabel::ContactInfo(*node.pubkey()), + CrdsData::RestartLastVotedForkSlots(ix, _, _, _) => { + CrdsValueLabel::RestartLastVotedForkSlots(*ix, self.pubkey()) + } + CrdsData::RestartHeaviestFork(_, _, percent) => { + CrdsValueLabel::RestartHeaviestFork(percent.from) + } } } pub fn contact_info(&self) -> Option<&LegacyContactInfo> { @@ -646,6 +729,7 @@ impl CrdsValue { pub(crate) fn epoch_slots(&self) -> Option<&EpochSlots> { match &self.data { CrdsData::EpochSlots(_, slots) => Some(slots), + CrdsData::RestartLastVotedForkSlots(_, slots, _, _) => Some(slots), _ => None, } } @@ -1073,4 +1157,60 @@ mod test { assert!(node.should_force_push(&pubkey)); assert!(!node.should_force_push(&Pubkey::new_unique())); } + + #[test] + fn test_restart_last_voted_fork_slots() { + let keypair = Keypair::new(); + let mut epoch_slots = EpochSlots::new(keypair.pubkey(), timestamp()); + let slot = 53; + epoch_slots.fill(&vec![slot], timestamp()); + let ix = 1; + let value = CrdsValue::new_signed( + CrdsData::RestartLastVotedForkSlots(ix, epoch_slots.clone(), slot, Hash::default()), + &keypair, + ); + assert_eq!(value.sanitize(), Ok(())); + let label = value.label(); + assert_eq!( + label, + CrdsValueLabel::RestartLastVotedForkSlots(ix, keypair.pubkey()) + ); + assert_eq!(label.pubkey(), keypair.pubkey()); + let retrieved_epoch_slots = value.epoch_slots().unwrap(); + assert_eq!(value.wallclock(), epoch_slots.wallclock); + assert_eq!(retrieved_epoch_slots, &epoch_slots); + + let bad_value = CrdsValue::new_signed( + CrdsData::RestartLastVotedForkSlots( + MAX_RESTART_LAST_VOTED_FORK_SLOTS, + epoch_slots, + slot, + Hash::default(), + ), + &keypair, + ); + assert_eq!(bad_value.sanitize(), Err(SanitizeError::ValueOutOfBounds)) + } + + #[test] + fn test_restart_heaviest_fork() { + let keypair = Keypair::new(); + let slot = 53; + let hash = Hash::new_unique(); + let percent = Percent::new(keypair.pubkey(), 150); + let value = + CrdsValue::new_signed(CrdsData::RestartHeaviestFork(slot, hash, percent), &keypair); + assert_eq!(value.sanitize(), Ok(())); + let label = value.label(); + assert_eq!(label, CrdsValueLabel::RestartHeaviestFork(keypair.pubkey())); + assert_eq!(label.pubkey(), keypair.pubkey()); + + let bad_percent = Percent::new(keypair.pubkey(), MAX_PERCENT + 1); + assert_eq!(bad_percent.sanitize(), Err(SanitizeError::ValueOutOfBounds)); + let bad_value = CrdsValue::new_signed( + CrdsData::RestartHeaviestFork(slot, hash, bad_percent), + &keypair, + ); + assert_eq!(bad_value.sanitize(), Err(SanitizeError::ValueOutOfBounds)); + } } From 0edf40e481f0a20ba7ef1163ca1d4b501546a5c4 Mon Sep 17 00:00:00 2001 From: Wen Date: Wed, 13 Sep 2023 12:17:05 -0700 Subject: [PATCH 02/13] Fix linter errors. --- gossip/src/cluster_info.rs | 8 ++------ gossip/src/crds_value.rs | 2 +- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 7363a143d638fd..36266911c9135c 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -4211,15 +4211,11 @@ mod tests { // ignore bad input assert_eq!( cluster_info.push_restart_heaviest_fork(slot1, hash1, 1.5), - Err(format!( - "heaviest_fork with out of bound percent, ignored: 1.5" - )) + Err("heaviest_fork with out of bound percent, ignored: 1.5".to_string()) ); assert_eq!( cluster_info.push_restart_heaviest_fork(slot1, hash1, -0.3), - Err(format!( - "heaviest_fork with out of bound percent, ignored: -0.3" - )) + Err("heaviest_fork with out of bound percent, ignored: -0.3".to_string()) ); // Test with different shred versions. diff --git a/gossip/src/crds_value.rs b/gossip/src/crds_value.rs index c2c9ed527079f8..5f41c584d5b2a6 100644 --- a/gossip/src/crds_value.rs +++ b/gossip/src/crds_value.rs @@ -1163,7 +1163,7 @@ mod test { let keypair = Keypair::new(); let mut epoch_slots = EpochSlots::new(keypair.pubkey(), timestamp()); let slot = 53; - epoch_slots.fill(&vec![slot], timestamp()); + epoch_slots.fill(&[slot], timestamp()); let ix = 1; let value = CrdsValue::new_signed( CrdsData::RestartLastVotedForkSlots(ix, epoch_slots.clone(), slot, Hash::default()), From a3749b2d21073cee6585fa88b728273dec5cb135 Mon Sep 17 00:00:00 2001 From: Wen Date: Wed, 13 Sep 2023 14:39:15 -0700 Subject: [PATCH 03/13] Revert RestartHeaviestFork, it will be added in another PR. --- gossip/src/cluster_info.rs | 132 +---------------------------- gossip/src/cluster_info_metrics.rs | 4 - gossip/src/crds.rs | 94 +------------------- gossip/src/crds_value.rs | 82 +----------------- 4 files changed, 7 insertions(+), 305 deletions(-) diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 36266911c9135c..840c2bcc114b25 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -33,8 +33,8 @@ use { }, crds_value::{ self, AccountsHashes, CrdsData, CrdsValue, CrdsValueLabel, EpochSlotsIndex, - LegacySnapshotHashes, LowestSlot, NodeInstance, Percent, SnapshotHashes, Version, Vote, - MAX_PERCENT, MAX_WALLCLOCK, + LegacySnapshotHashes, LowestSlot, NodeInstance, SnapshotHashes, Version, Vote, + MAX_WALLCLOCK, }, duplicate_shred::DuplicateShred, epoch_slots::EpochSlots, @@ -398,8 +398,7 @@ fn retain_staked(values: &mut Vec, stakes: &HashMap) { CrdsData::LowestSlot(_, _) | CrdsData::LegacyVersion(_) | CrdsData::DuplicateShred(_, _) - | CrdsData::RestartLastVotedForkSlots(_, _, _, _) - | CrdsData::RestartHeaviestFork(_, _, _) => { + | CrdsData::RestartLastVotedForkSlots(_, _, _, _) => { let stake = stakes.get(&value.pubkey()).copied(); stake.unwrap_or_default() >= MIN_STAKE_FOR_GOSSIP } @@ -1009,28 +1008,6 @@ impl ClusterInfo { } } - pub fn push_restart_heaviest_fork( - &self, - slot: Slot, - hash: Hash, - percent: f64, - ) -> Result<(), String> { - if !(0.0..1.0).contains(&percent) { - return Err(format!( - "heaviest_fork with out of bound percent, ignored: {}", - percent - )); - } - - let message = CrdsData::RestartHeaviestFork( - slot, - hash, - Percent::new(self.id(), (percent * MAX_PERCENT as f64) as u16), - ); - self.push_message(CrdsValue::new_signed(message, &self.keypair())); - Ok(()) - } - fn time_gossip_read_lock<'a>( &'a self, label: &'static str, @@ -1329,26 +1306,6 @@ impl ClusterInfo { .collect() } - /// Returns heaviest-fork inserted since the given cursor. - /// Excludes entries from nodes with unkown or different shred version. - pub fn get_restart_heaviest_fork(&self, cursor: &mut Cursor) -> Vec<(Slot, Hash, Percent)> { - let self_shred_version = Some(self.my_shred_version()); - let gossip_crds = self.gossip.crds.read().unwrap(); - gossip_crds - .get_restart_heaviest_fork(cursor) - .filter(|entry| { - let origin = entry.value.pubkey(); - gossip_crds.get_shred_version(&origin) == self_shred_version - }) - .map(|entry| match &entry.value.data { - CrdsData::RestartHeaviestFork(last_vote, last_vote_hash, percent) => { - (*last_vote, *last_vote_hash, percent.clone()) - } - _ => panic!("this should not happen!"), - }) - .collect() - } - /// Returns duplicate-shreds inserted since the given cursor. pub(crate) fn get_duplicate_shreds(&self, cursor: &mut Cursor) -> Vec { let gossip_crds = self.gossip.crds.read().unwrap(); @@ -4178,89 +4135,6 @@ mod tests { assert_eq!(slots[2].1.from, node_pubkey); } - #[test] - fn test_push_restart_heaviest_fork() { - solana_logger::setup(); - let keypair = Arc::new(Keypair::new()); - let pubkey = keypair.pubkey(); - let contact_info = ContactInfo::new_localhost(&pubkey, 0); - let cluster_info = ClusterInfo::new(contact_info, keypair, SocketAddrSpace::Unspecified); - - // make sure empty crds is handled correctly - let mut cursor = Cursor::default(); - let heaviest_forks = cluster_info.get_restart_heaviest_fork(&mut cursor); - assert_eq!(heaviest_forks, vec![]); - - // add new message - let slot1 = 53; - let hash1 = Hash::new_unique(); - let percent1 = 0.015; - assert!(cluster_info - .push_restart_heaviest_fork(slot1, hash1, percent1) - .is_ok()); - cluster_info.flush_push_queue(); - - let heaviest_forks = cluster_info.get_restart_heaviest_fork(&mut cursor); - assert_eq!(heaviest_forks.len(), 1); - let (slot, hash, percent) = &heaviest_forks[0]; - assert_eq!(slot, &slot1); - assert_eq!(hash, &hash1); - assert!((percent.percent as f64 - percent1 * MAX_PERCENT as f64).abs() < f64::EPSILON); - assert_eq!(percent.from, pubkey); - - // ignore bad input - assert_eq!( - cluster_info.push_restart_heaviest_fork(slot1, hash1, 1.5), - Err("heaviest_fork with out of bound percent, ignored: 1.5".to_string()) - ); - assert_eq!( - cluster_info.push_restart_heaviest_fork(slot1, hash1, -0.3), - Err("heaviest_fork with out of bound percent, ignored: -0.3".to_string()) - ); - - // Test with different shred versions. - let mut rng = rand::thread_rng(); - let pubkey2 = Pubkey::new_unique(); - let mut new_node = LegacyContactInfo::new_rand(&mut rng, Some(pubkey2)); - new_node.set_shred_version(42); - let slot2 = 54; - let hash2 = Hash::new_unique(); - let percent2 = 0.023; - let entries = vec![ - CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(new_node)), - CrdsValue::new_unsigned(CrdsData::RestartHeaviestFork( - slot2, - hash2, - Percent::new(pubkey2, (percent2 * MAX_PERCENT as f64) as u16), - )), - ]; - { - let mut gossip_crds = cluster_info.gossip.crds.write().unwrap(); - for entry in entries { - assert!(gossip_crds - .insert(entry, /*now=*/ 0, GossipRoute::LocalMessage) - .is_ok()); - } - } - // Should exclude other node's heaviest_fork because of different - // shred-version. - let heaviest_forks = cluster_info.get_restart_heaviest_fork(&mut Cursor::default()); - assert_eq!(heaviest_forks.len(), 1); - assert_eq!(heaviest_forks[0].2.from, pubkey); - // Match shred versions. - { - let mut node = cluster_info.my_contact_info.write().unwrap(); - node.set_shred_version(42); - } - cluster_info.push_self(); - cluster_info.flush_push_queue(); - // Should now include both epoch slots. - let heaviest_forks = cluster_info.get_restart_heaviest_fork(&mut Cursor::default()); - assert_eq!(heaviest_forks.len(), 2); - assert_eq!(heaviest_forks[0].2.from, pubkey); - assert_eq!(heaviest_forks[1].2.from, pubkey2); - } - #[test] fn test_append_entrypoint_to_pulls() { let thread_pool = ThreadPoolBuilder::new().build().unwrap(); diff --git a/gossip/src/cluster_info_metrics.rs b/gossip/src/cluster_info_metrics.rs index 0e474d3cf5284f..fbb7365387aad9 100644 --- a/gossip/src/cluster_info_metrics.rs +++ b/gossip/src/cluster_info_metrics.rs @@ -637,8 +637,6 @@ pub(crate) fn submit_gossip_stats( crds_stats.pull.counts[12], i64 ), - ("RestartHeaviestFork-push", crds_stats.push.counts[13], i64), - ("RestartHeaviestFork-pull", crds_stats.pull.counts[13], i64), ( "all-push", crds_stats.push.counts.iter().sum::(), @@ -686,8 +684,6 @@ pub(crate) fn submit_gossip_stats( crds_stats.pull.fails[12], i64 ), - ("RestartHeaviestFork-push", crds_stats.push.fails[13], i64), - ("RestartHeaviestFork-pull", crds_stats.pull.fails[13], i64), ("all-push", crds_stats.push.fails.iter().sum::(), i64), ("all-pull", crds_stats.pull.fails.iter().sum::(), i64), ); diff --git a/gossip/src/crds.rs b/gossip/src/crds.rs index 39383dd61597e7..f828ad2960f3b4 100644 --- a/gossip/src/crds.rs +++ b/gossip/src/crds.rs @@ -79,8 +79,6 @@ pub struct Crds { duplicate_shreds: BTreeMap, // Indices of RestartLastVotedForkSlots keyed by insert order. restart_last_voted_fork_slots: BTreeMap, - // Indices of RestartHeaviestFork keyed by insert order. - restart_heaviest_fork: BTreeMap, // Indices of all crds values associated with a node. records: HashMap>, // Indices of all entries keyed by insert order. @@ -107,7 +105,7 @@ pub enum GossipRoute<'a> { PushMessage(/*from:*/ &'a Pubkey), } -type CrdsCountsArray = [usize; 14]; +type CrdsCountsArray = [usize; 13]; pub(crate) struct CrdsDataStats { pub(crate) counts: CrdsCountsArray, @@ -174,7 +172,6 @@ impl Default for Crds { epoch_slots: BTreeMap::default(), duplicate_shreds: BTreeMap::default(), restart_last_voted_fork_slots: BTreeMap::default(), - restart_heaviest_fork: BTreeMap::default(), records: HashMap::default(), entries: BTreeMap::default(), purged: VecDeque::default(), @@ -252,10 +249,6 @@ impl Crds { self.restart_last_voted_fork_slots .insert(value.ordinal, entry_index); } - CrdsData::RestartHeaviestFork(_, _, _) => { - self.restart_heaviest_fork - .insert(value.ordinal, entry_index); - } _ => (), }; self.entries.insert(value.ordinal, entry_index); @@ -297,11 +290,6 @@ impl Crds { self.restart_last_voted_fork_slots .insert(value.ordinal, entry_index); } - CrdsData::RestartHeaviestFork(_, _, _) => { - self.restart_heaviest_fork.remove(&entry.get().ordinal); - self.restart_heaviest_fork - .insert(value.ordinal, entry_index); - } _ => (), } self.entries.remove(&entry.get().ordinal); @@ -417,21 +405,6 @@ impl Crds { }) } - /// Returns heaviest_fork inserted since the given cursor. - /// Updates the cursor as the values are consumed. - pub(crate) fn get_restart_heaviest_fork<'a>( - &'a self, - cursor: &'a mut Cursor, - ) -> impl Iterator { - let range = (Bound::Included(cursor.ordinal()), Bound::Unbounded); - self.restart_heaviest_fork - .range(range) - .map(move |(ordinal, index)| { - cursor.consume(*ordinal); - self.table.index(*index) - }) - } - /// Returns all entries inserted since the given cursor. pub(crate) fn get_entries<'a>( &'a self, @@ -602,9 +575,6 @@ impl Crds { CrdsData::RestartLastVotedForkSlots(_, _, _, _) => { self.restart_last_voted_fork_slots.remove(&value.ordinal); } - CrdsData::RestartHeaviestFork(_, _, _) => { - self.restart_heaviest_fork.remove(&value.ordinal); - } _ => (), } self.entries.remove(&value.ordinal); @@ -646,9 +616,6 @@ impl Crds { self.restart_last_voted_fork_slots .insert(value.ordinal, index); } - CrdsData::RestartHeaviestFork(_, _, _) => { - self.restart_heaviest_fork.insert(value.ordinal, index); - } _ => (), }; self.entries.insert(value.ordinal, index); @@ -790,7 +757,6 @@ impl CrdsDataStats { CrdsData::SnapshotHashes(_) => 10, CrdsData::ContactInfo(_) => 11, CrdsData::RestartLastVotedForkSlots(_, _, _, _) => 12, - CrdsData::RestartHeaviestFork(_, _, _) => 13, // Update CrdsCountsArray if new items are added here. } } @@ -1189,7 +1155,6 @@ mod tests { usize, // number of votes usize, // number of epoch slots usize, // number of restart last voted fork slots - usize, // number of restart heaviest forks ) { let size = crds.table.len(); let since = if size == 0 || rng.gen() { @@ -1250,33 +1215,6 @@ mod tests { _ => panic!("not a last-voted-fork-slot!"), } } - let num_heaviest_forks = crds - .table - .values() - .filter(|v| v.ordinal >= since) - .filter(|v| matches!(v.value.data, CrdsData::RestartHeaviestFork(_, _, _))) - .count(); - let mut cursor = Cursor(since); - assert_eq!( - num_heaviest_forks, - crds.get_restart_heaviest_fork(&mut cursor).count() - ); - assert_eq!( - cursor.0, - crds.restart_heaviest_fork - .iter() - .last() - .map(|(k, _)| k + 1) - .unwrap_or_default() - .max(since) - ); - for value in crds.get_restart_heaviest_fork(&mut Cursor(since)) { - assert!(value.ordinal >= since); - match value.value.data { - CrdsData::RestartHeaviestFork(_, _, _) => (), - _ => panic!("not a heaviest-fork!"), - } - } let num_votes = crds .table .values() @@ -1344,11 +1282,6 @@ mod tests { ) }) .count(); - let num_restart_heaviest_forks = crds - .table - .values() - .filter(|v| matches!(v.value.data, CrdsData::RestartHeaviestFork(_, _, _))) - .count(); assert_eq!( crds.table.len(), crds.get_entries(&mut Cursor::default()).count() @@ -1364,11 +1297,6 @@ mod tests { crds.get_restart_last_voted_fork_slots(&mut Cursor::default()) .count() ); - assert_eq!( - num_restart_heaviest_forks, - crds.get_restart_heaviest_fork(&mut Cursor::default()) - .count() - ); for vote in crds.get_votes(&mut Cursor::default()) { assert_matches!(vote.value.data, CrdsData::Vote(_, _)); } @@ -1383,18 +1311,11 @@ mod tests { _ => panic!("not a restart-last-voted-fork-slot!"), } } - for restart_heaviest_fork in crds.get_restart_heaviest_fork(&mut Cursor::default()) { - match restart_heaviest_fork.value.data { - CrdsData::RestartHeaviestFork(_, _, _) => (), - _ => panic!("not a restart-heaviest-fork!"), - } - } ( num_nodes, num_votes, num_epoch_slots, num_last_voted_fork_slots, - num_heaviest_forks, ) } @@ -1421,13 +1342,8 @@ mod tests { assert!(crds.table.len() > 200); assert_eq!(crds.num_purged() + crds.table.len(), 4096); assert!(num_inserts > crds.table.len()); - let ( - num_nodes, - num_votes, - num_epoch_slots, - num_restart_last_voted_fork_slots, - num_restart_heaviest_forks, - ) = check_crds_value_indices(&mut rng, &crds); + let (num_nodes, num_votes, num_epoch_slots, num_restart_last_voted_fork_slots) = + check_crds_value_indices(&mut rng, &crds); assert!(num_nodes * 3 < crds.table.len()); assert!(num_nodes > 100, "num nodes: {num_nodes}"); assert!(num_votes > 100, "num votes: {num_votes}"); @@ -1436,10 +1352,6 @@ mod tests { num_restart_last_voted_fork_slots > 0, "num restart last voted fork slots: {num_restart_last_voted_fork_slots}" ); - assert!( - num_restart_heaviest_forks > 0, - "num restart heaviest forks: {num_restart_heaviest_forks}" - ); // Remove values one by one and assert that nodes indices stay valid. while !crds.table.is_empty() { let index = rng.gen_range(0..crds.table.len()); diff --git a/gossip/src/crds_value.rs b/gossip/src/crds_value.rs index 5f41c584d5b2a6..f91f656a27e9f8 100644 --- a/gossip/src/crds_value.rs +++ b/gossip/src/crds_value.rs @@ -40,7 +40,6 @@ pub type EpochSlotsIndex = u8; pub const MAX_EPOCH_SLOTS: EpochSlotsIndex = 255; // We now keep 81000 slots, 81000/MAX_SLOTS_PER_ENTRY = 5. pub const MAX_RESTART_LAST_VOTED_FORK_SLOTS: EpochSlotsIndex = 5; -pub const MAX_PERCENT: u16 = 10000; /// CrdsValue that is replicated across the cluster #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, AbiExample)] @@ -98,7 +97,6 @@ pub enum CrdsData { SnapshotHashes(SnapshotHashes), ContactInfo(ContactInfo), RestartLastVotedForkSlots(EpochSlotsIndex, EpochSlots, Slot, Hash), - RestartHeaviestFork(Slot, Hash, Percent), } impl Sanitize for CrdsData { @@ -143,12 +141,6 @@ impl Sanitize for CrdsData { } slots.sanitize().and(last_vote_hash.sanitize()) } - CrdsData::RestartHeaviestFork(_slot, hash, percent) => { - if percent.percent > MAX_PERCENT { - return Err(SanitizeError::ValueOutOfBounds); - } - hash.sanitize() - } } } } @@ -162,7 +154,7 @@ pub(crate) fn new_rand_timestamp(rng: &mut R) -> u64 { impl CrdsData { /// New random CrdsData for tests and benchmarks. fn new_rand(rng: &mut R, pubkey: Option) -> CrdsData { - let kind = rng.gen_range(0..9); + let kind = rng.gen_range(0..8); // TODO: Implement other kinds of CrdsData here. // TODO: Assign ranges to each arm proportional to their frequency in // the mainnet crds table. @@ -180,11 +172,6 @@ impl CrdsData { rng.gen_range(0..512), Hash::new_unique(), ), - 7 => CrdsData::RestartHeaviestFork( - rng.gen_range(0..512), - Hash::new_unique(), - Percent::new_rand(rng, pubkey), - ), _ => CrdsData::EpochSlots( rng.gen_range(0..MAX_RESTART_LAST_VOTED_FORK_SLOTS), EpochSlots::new_rand(rng, pubkey), @@ -513,41 +500,6 @@ impl Sanitize for NodeInstance { } } -#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, AbiExample)] -pub struct Percent { - pub from: Pubkey, - pub(crate) percent: u16, - pub(crate) wallclock: u64, -} - -impl Percent { - pub fn new(from: Pubkey, percent: u16) -> Self { - Self { - from, - percent, - wallclock: timestamp(), - } - } - - fn new_rand(rng: &mut R, pubkey: Option) -> Self { - Self { - from: pubkey.unwrap_or_else(pubkey::new_rand), - wallclock: new_rand_timestamp(rng), - percent: rng.gen_range(1..80), - } - } -} - -impl Sanitize for Percent { - fn sanitize(&self) -> Result<(), SanitizeError> { - sanitize_wallclock(self.wallclock)?; - if self.percent > MAX_PERCENT { - return Err(SanitizeError::ValueOutOfBounds); - } - self.from.sanitize() - } -} - /// Type of the replicated value /// These are labels for values in a record that is associated with `Pubkey` #[derive(PartialEq, Hash, Eq, Clone, Debug)] @@ -565,7 +517,6 @@ pub enum CrdsValueLabel { SnapshotHashes(Pubkey), ContactInfo(Pubkey), RestartLastVotedForkSlots(EpochSlotsIndex, Pubkey), - RestartHeaviestFork(Pubkey), } impl fmt::Display for CrdsValueLabel { @@ -592,9 +543,6 @@ impl fmt::Display for CrdsValueLabel { CrdsValueLabel::RestartLastVotedForkSlots(ix, _) => { write!(f, "RestartLastVotedForkSlots({}, {})", ix, self.pubkey()) } - CrdsValueLabel::RestartHeaviestFork(_) => { - write!(f, "RestartHeaviestFork({})", self.pubkey()) - } } } } @@ -615,7 +563,6 @@ impl CrdsValueLabel { CrdsValueLabel::SnapshotHashes(p) => *p, CrdsValueLabel::ContactInfo(pubkey) => *pubkey, CrdsValueLabel::RestartLastVotedForkSlots(_, p) => *p, - CrdsValueLabel::RestartHeaviestFork(p) => *p, } } } @@ -667,7 +614,6 @@ impl CrdsValue { CrdsData::SnapshotHashes(hash) => hash.wallclock, CrdsData::ContactInfo(node) => node.wallclock(), CrdsData::RestartLastVotedForkSlots(_, slots, _, _) => slots.wallclock, - CrdsData::RestartHeaviestFork(_, _, percent) => percent.wallclock, } } pub fn pubkey(&self) -> Pubkey { @@ -685,7 +631,6 @@ impl CrdsValue { CrdsData::SnapshotHashes(hash) => hash.from, CrdsData::ContactInfo(node) => *node.pubkey(), CrdsData::RestartLastVotedForkSlots(_, slots, _, _) => slots.from, - CrdsData::RestartHeaviestFork(_, _, percent) => percent.from, } } pub fn label(&self) -> CrdsValueLabel { @@ -707,9 +652,6 @@ impl CrdsValue { CrdsData::RestartLastVotedForkSlots(ix, _, _, _) => { CrdsValueLabel::RestartLastVotedForkSlots(*ix, self.pubkey()) } - CrdsData::RestartHeaviestFork(_, _, percent) => { - CrdsValueLabel::RestartHeaviestFork(percent.from) - } } } pub fn contact_info(&self) -> Option<&LegacyContactInfo> { @@ -1191,26 +1133,4 @@ mod test { ); assert_eq!(bad_value.sanitize(), Err(SanitizeError::ValueOutOfBounds)) } - - #[test] - fn test_restart_heaviest_fork() { - let keypair = Keypair::new(); - let slot = 53; - let hash = Hash::new_unique(); - let percent = Percent::new(keypair.pubkey(), 150); - let value = - CrdsValue::new_signed(CrdsData::RestartHeaviestFork(slot, hash, percent), &keypair); - assert_eq!(value.sanitize(), Ok(())); - let label = value.label(); - assert_eq!(label, CrdsValueLabel::RestartHeaviestFork(keypair.pubkey())); - assert_eq!(label.pubkey(), keypair.pubkey()); - - let bad_percent = Percent::new(keypair.pubkey(), MAX_PERCENT + 1); - assert_eq!(bad_percent.sanitize(), Err(SanitizeError::ValueOutOfBounds)); - let bad_value = CrdsValue::new_signed( - CrdsData::RestartHeaviestFork(slot, hash, bad_percent), - &keypair, - ); - assert_eq!(bad_value.sanitize(), Err(SanitizeError::ValueOutOfBounds)); - } } From 25e0117fc6b317aec5cf913bcc53fbe2dd08b65a Mon Sep 17 00:00:00 2001 From: Wen Date: Fri, 15 Sep 2023 09:31:38 -0700 Subject: [PATCH 04/13] Update frozen abi message. --- gossip/src/cluster_info.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 840c2bcc114b25..63ee19da564bac 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -271,7 +271,7 @@ pub fn make_accounts_hashes_message( pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>; // TODO These messages should go through the gpu pipeline for spam filtering -#[frozen_abi(digest = "EnbW8mYTsPMndq9NkHLTkHJgduXvWSfSD6bBdmqQ8TiF")] +#[frozen_abi(digest = "66QJmpJDqGW5NRc8LEzjrHQPb5shECT1tqxnocAS4KYW")] #[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)] #[allow(clippy::large_enum_variant)] pub(crate) enum Protocol { From 1fcc81de63cb195fe0bdafd5be83c35b3f7c790b Mon Sep 17 00:00:00 2001 From: Wen Date: Fri, 15 Sep 2023 10:21:06 -0700 Subject: [PATCH 05/13] Fix wrong number in test generation, change to pub(crate) to limit scope. --- gossip/src/crds_value.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/gossip/src/crds_value.rs b/gossip/src/crds_value.rs index f91f656a27e9f8..28ca9bac44ed73 100644 --- a/gossip/src/crds_value.rs +++ b/gossip/src/crds_value.rs @@ -39,7 +39,7 @@ pub const MAX_VOTES: VoteIndex = 32; pub type EpochSlotsIndex = u8; pub const MAX_EPOCH_SLOTS: EpochSlotsIndex = 255; // We now keep 81000 slots, 81000/MAX_SLOTS_PER_ENTRY = 5. -pub const MAX_RESTART_LAST_VOTED_FORK_SLOTS: EpochSlotsIndex = 5; +pub(crate) const MAX_RESTART_LAST_VOTED_FORK_SLOTS: EpochSlotsIndex = 5; /// CrdsValue that is replicated across the cluster #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, AbiExample)] @@ -167,13 +167,13 @@ impl CrdsData { 4 => CrdsData::Version(Version::new_rand(rng, pubkey)), 5 => CrdsData::Vote(rng.gen_range(0..MAX_VOTES), Vote::new_rand(rng, pubkey)), 6 => CrdsData::RestartLastVotedForkSlots( - rng.gen_range(0..MAX_EPOCH_SLOTS), + rng.gen_range(0..MAX_RESTART_LAST_VOTED_FORK_SLOTS), EpochSlots::new_rand(rng, pubkey), rng.gen_range(0..512), Hash::new_unique(), ), _ => CrdsData::EpochSlots( - rng.gen_range(0..MAX_RESTART_LAST_VOTED_FORK_SLOTS), + rng.gen_range(0..MAX_EPOCH_SLOTS), EpochSlots::new_rand(rng, pubkey), ), } From b7077a7f1472411c4873e7dc4db4665bcb47cfd2 Mon Sep 17 00:00:00 2001 From: Wen Date: Fri, 15 Sep 2023 16:04:30 -0700 Subject: [PATCH 06/13] Separate push_epoch_slots and push_restart_last_voted_fork_slots. --- gossip/src/cluster_info.rs | 112 ++++++++++++++++++------------------- 1 file changed, 55 insertions(+), 57 deletions(-) diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 63ee19da564bac..fd0c842bc3b03c 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -723,8 +723,9 @@ impl ClusterInfo { self.my_contact_info.read().unwrap().shred_version() } - fn lookup_epoch_slots(&self, label: CrdsValueLabel) -> EpochSlots { + fn lookup_epoch_slots(&self, ix: EpochSlotsIndex) -> EpochSlots { let self_pubkey = self.id(); + let label = CrdsValueLabel::EpochSlots(ix, self_pubkey); let gossip_crds = self.gossip.crds.read().unwrap(); gossip_crds .get::<&CrdsValue>(&label) @@ -899,42 +900,16 @@ impl ClusterInfo { } } - pub fn push_epoch_slots(&self, update: &[Slot]) { - self.push_epoch_slots_or_restart_last_voted_fork_slots(update, None) - } - - pub fn push_restart_last_voted_fork_slots(&self, update: &[Slot], last_vote_bankhash: Hash) { - self.push_epoch_slots_or_restart_last_voted_fork_slots(update, Some(last_vote_bankhash)) - } - // TODO: If two threads call into this function then epoch_slot_index has a // race condition and the threads will overwrite each other in crds table. - fn push_epoch_slots_or_restart_last_voted_fork_slots( - &self, - mut update: &[Slot], - last_vote_bankhash: Option, - ) { - let is_epoch_slot = last_vote_bankhash.is_none(); + pub fn push_epoch_slots(&self, mut update: &[Slot]) { let self_pubkey = self.id(); - let create_label = |ix| { - if is_epoch_slot { - CrdsValueLabel::EpochSlots(ix, self_pubkey) - } else { - CrdsValueLabel::RestartLastVotedForkSlots(ix, self_pubkey) - } - }; - let max_entries = if is_epoch_slot { - crds_value::MAX_EPOCH_SLOTS - } else { - crds_value::MAX_RESTART_LAST_VOTED_FORK_SLOTS - }; - let last_vote_slot = last_vote_bankhash.map(|_| *update.last().unwrap()); let current_slots: Vec<_> = { let gossip_crds = self.time_gossip_read_lock("lookup_epoch_slots", &self.stats.epoch_slots_lookup); - (0..max_entries) + (0..crds_value::MAX_EPOCH_SLOTS) .filter_map(|ix| { - let label = create_label(ix); + let label = CrdsValueLabel::EpochSlots(ix, self_pubkey); let crds_value = gossip_crds.get::<&CrdsValue>(&label)?; let epoch_slots = crds_value.epoch_slots()?; let first_slot = epoch_slots.first_slot()?; @@ -948,19 +923,17 @@ impl ClusterInfo { .min() .unwrap_or_default(); let max_slot: Slot = update.iter().max().cloned().unwrap_or(0); - if is_epoch_slot { - let total_slots = max_slot as isize - min_slot as isize; - // WARN if CRDS is not storing at least a full epoch worth of slots - if DEFAULT_SLOTS_PER_EPOCH as isize > total_slots - && crds_value::MAX_EPOCH_SLOTS as usize <= current_slots.len() - { - self.stats.epoch_slots_filled.add_relaxed(1); - warn!( - "EPOCH_SLOTS are filling up FAST {}/{}", - total_slots, - current_slots.len() - ); - } + let total_slots = max_slot as isize - min_slot as isize; + // WARN if CRDS is not storing at least a full epoch worth of slots + if DEFAULT_SLOTS_PER_EPOCH as isize > total_slots + && crds_value::MAX_EPOCH_SLOTS as usize <= current_slots.len() + { + self.stats.epoch_slots_filled.add_relaxed(1); + warn!( + "EPOCH_SLOTS are filling up FAST {}/{}", + total_slots, + current_slots.len() + ); } let mut reset = false; let mut epoch_slot_index = match current_slots.iter().max() { @@ -970,27 +943,18 @@ impl ClusterInfo { let mut entries = Vec::default(); let keypair = self.keypair(); while !update.is_empty() { - let ix = epoch_slot_index % max_entries; + let ix = epoch_slot_index % crds_value::MAX_EPOCH_SLOTS; let now = timestamp(); let mut slots = if !reset { - self.lookup_epoch_slots(create_label(ix)) + self.lookup_epoch_slots(ix) } else { EpochSlots::new(self_pubkey, now) }; let n = slots.fill(update, now); update = &update[n..]; if n > 0 { - let data = if is_epoch_slot { - CrdsData::EpochSlots(ix, slots) - } else { - CrdsData::RestartLastVotedForkSlots( - ix, - slots, - last_vote_slot.unwrap(), - last_vote_bankhash.unwrap(), - ) - }; - let entry = CrdsValue::new_signed(data, &keypair); + let epoch_slots = CrdsData::EpochSlots(ix, slots); + let entry = CrdsValue::new_signed(epoch_slots, &keypair); entries.push(entry); } epoch_slot_index += 1; @@ -998,10 +962,44 @@ impl ClusterInfo { } let mut gossip_crds = self.gossip.crds.write().unwrap(); let now = timestamp(); + for entry in entries { + if let Err(err) = gossip_crds.insert(entry, now, GossipRoute::LocalMessage) { + error!("push_epoch_slots failed: {:?}", err); + } + } + } + + // This is currently only called from one thread during startup, so no race conditions. + // The data format is similar to EpochSlots, but we overwrite previous results each time, + // so here we always start from 0. All pieces of the same push use the same timestamp. + pub fn push_restart_last_voted_fork_slots(&self, mut update: &[Slot], last_vote_bankhash: Hash) { + let self_pubkey = self.id(); + let last_vote_slot = *update.last().unwrap(); + let mut epoch_slot_index = 0; + let mut entries = Vec::default(); + let keypair = self.keypair(); + let now = timestamp(); + while !update.is_empty() { + let mut slots = EpochSlots::new(self_pubkey, now); + let n = slots.fill(update, now); + update = &update[n..]; + if n > 0 { + let data = CrdsData::RestartLastVotedForkSlots( + epoch_slot_index, + slots, + last_vote_slot, + last_vote_bankhash, + ); + let entry = CrdsValue::new_signed(data, &keypair); + entries.push(entry); + } + epoch_slot_index += 1; + } + let mut gossip_crds = self.gossip.crds.write().unwrap(); for entry in entries { if let Err(err) = gossip_crds.insert(entry, now, GossipRoute::LocalMessage) { error!( - "push_epoch_slots_or_restart_last_voted_fork_slots failed: {:?}", + "push_restart_last_voted_fork_slots failed: {:?}", err ); } From 354673a7847da7221d321e75b040952639ca2582 Mon Sep 17 00:00:00 2001 From: Wen Date: Sun, 17 Sep 2023 11:54:33 -0700 Subject: [PATCH 07/13] Add RestartLastVotedForkSlots data structure. --- gossip/src/cluster_info.rs | 64 +++++++++++------------ gossip/src/crds.rs | 28 ++++------ gossip/src/crds_value.rs | 101 +++++++++++++++++++++++++++++-------- 3 files changed, 117 insertions(+), 76 deletions(-) diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index fd0c842bc3b03c..2acc9248f596d6 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -33,8 +33,8 @@ use { }, crds_value::{ self, AccountsHashes, CrdsData, CrdsValue, CrdsValueLabel, EpochSlotsIndex, - LegacySnapshotHashes, LowestSlot, NodeInstance, SnapshotHashes, Version, Vote, - MAX_WALLCLOCK, + LegacySnapshotHashes, LowestSlot, NodeInstance, RestartLastVotedForkSlots, + SnapshotHashes, Version, Vote, MAX_WALLCLOCK, }, duplicate_shred::DuplicateShred, epoch_slots::EpochSlots, @@ -271,7 +271,7 @@ pub fn make_accounts_hashes_message( pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>; // TODO These messages should go through the gpu pipeline for spam filtering -#[frozen_abi(digest = "66QJmpJDqGW5NRc8LEzjrHQPb5shECT1tqxnocAS4KYW")] +#[frozen_abi(digest = "DfMmcwQriDV8hqHW1WYWuTCES41hLz6sXZZtXNL2wx7u")] #[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)] #[allow(clippy::large_enum_variant)] pub(crate) enum Protocol { @@ -398,7 +398,7 @@ fn retain_staked(values: &mut Vec, stakes: &HashMap) { CrdsData::LowestSlot(_, _) | CrdsData::LegacyVersion(_) | CrdsData::DuplicateShred(_, _) - | CrdsData::RestartLastVotedForkSlots(_, _, _, _) => { + | CrdsData::RestartLastVotedForkSlots(_, _) => { let stake = stakes.get(&value.pubkey()).copied(); stake.unwrap_or_default() >= MIN_STAKE_FOR_GOSSIP } @@ -972,7 +972,11 @@ impl ClusterInfo { // This is currently only called from one thread during startup, so no race conditions. // The data format is similar to EpochSlots, but we overwrite previous results each time, // so here we always start from 0. All pieces of the same push use the same timestamp. - pub fn push_restart_last_voted_fork_slots(&self, mut update: &[Slot], last_vote_bankhash: Hash) { + pub fn push_restart_last_voted_fork_slots( + &self, + mut update: &[Slot], + last_vote_bankhash: Hash, + ) { let self_pubkey = self.id(); let last_vote_slot = *update.last().unwrap(); let mut epoch_slot_index = 0; @@ -980,16 +984,16 @@ impl ClusterInfo { let keypair = self.keypair(); let now = timestamp(); while !update.is_empty() { - let mut slots = EpochSlots::new(self_pubkey, now); - let n = slots.fill(update, now); + let mut slots = RestartLastVotedForkSlots::new( + self_pubkey, + now, + last_vote_slot, + last_vote_bankhash, + ); + let n = slots.fill(update); update = &update[n..]; if n > 0 { - let data = CrdsData::RestartLastVotedForkSlots( - epoch_slot_index, - slots, - last_vote_slot, - last_vote_bankhash, - ); + let data = CrdsData::RestartLastVotedForkSlots(epoch_slot_index, slots); let entry = CrdsValue::new_signed(data, &keypair); entries.push(entry); } @@ -998,10 +1002,7 @@ impl ClusterInfo { let mut gossip_crds = self.gossip.crds.write().unwrap(); for entry in entries { if let Err(err) = gossip_crds.insert(entry, now, GossipRoute::LocalMessage) { - error!( - "push_restart_last_voted_fork_slots failed: {:?}", - err - ); + error!("push_restart_last_voted_fork_slots failed: {:?}", err); } } } @@ -1286,7 +1287,7 @@ impl ClusterInfo { pub fn get_restart_last_voted_fork_slots( &self, cursor: &mut Cursor, - ) -> Vec<(EpochSlotsIndex, EpochSlots, Slot, Hash)> { + ) -> Vec { let self_shred_version = Some(self.my_shred_version()); let gossip_crds = self.gossip.crds.read().unwrap(); gossip_crds @@ -1296,9 +1297,7 @@ impl ClusterInfo { gossip_crds.get_shred_version(&origin) == self_shred_version }) .map(|entry| match &entry.value.data { - CrdsData::RestartLastVotedForkSlots(index, slots, last_vote, last_vote_hash) => { - (*index, slots.clone(), *last_vote, *last_vote_hash) - } + CrdsData::RestartLastVotedForkSlots(_index, slots) => slots.clone(), _ => panic!("this should not happen!"), }) .collect() @@ -4083,8 +4082,8 @@ mod tests { let mut cursor = Cursor::default(); let slots = cluster_info.get_restart_last_voted_fork_slots(&mut cursor); assert_eq!(slots.len(), 2); - assert_eq!(slots[0].1.to_slots(0).len(), 42468); - assert_eq!(slots[1].1.to_slots(0).len(), 38532); + assert_eq!(slots[0].slots.to_slots(0).len(), 42468); + assert_eq!(slots[1].slots.to_slots(0).len(), 38532); let slots = cluster_info.get_restart_last_voted_fork_slots(&mut cursor); assert!(slots.is_empty()); @@ -4094,15 +4093,10 @@ mod tests { let node_pubkey = Pubkey::new_unique(); let mut node = LegacyContactInfo::new_rand(&mut rng, Some(node_pubkey)); node.set_shred_version(42); - let epoch_slots = EpochSlots::new_rand(&mut rng, Some(node_pubkey)); + let slots = RestartLastVotedForkSlots::new_rand(&mut rng, Some(node_pubkey)); let entries = vec![ CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(node)), - CrdsValue::new_unsigned(CrdsData::RestartLastVotedForkSlots( - 0, - epoch_slots, - 0, - Hash::default(), - )), + CrdsValue::new_unsigned(CrdsData::RestartLastVotedForkSlots(0, slots)), ]; { let mut gossip_crds = cluster_info.gossip.crds.write().unwrap(); @@ -4116,8 +4110,8 @@ mod tests { // shred-version. let slots = cluster_info.get_restart_last_voted_fork_slots(&mut Cursor::default()); assert_eq!(slots.len(), 2); - assert_eq!(slots[0].1.from, cluster_info.id()); - assert_eq!(slots[1].1.from, cluster_info.id()); + assert_eq!(slots[0].slots.from, cluster_info.id()); + assert_eq!(slots[1].slots.from, cluster_info.id()); // Match shred versions. { let mut node = cluster_info.my_contact_info.write().unwrap(); @@ -4128,9 +4122,9 @@ mod tests { // Should now include both epoch slots. let slots = cluster_info.get_restart_last_voted_fork_slots(&mut Cursor::default()); assert_eq!(slots.len(), 3); - assert_eq!(slots[0].1.from, cluster_info.id()); - assert_eq!(slots[1].1.from, cluster_info.id()); - assert_eq!(slots[2].1.from, node_pubkey); + assert_eq!(slots[0].slots.from, cluster_info.id()); + assert_eq!(slots[1].slots.from, cluster_info.id()); + assert_eq!(slots[2].slots.from, node_pubkey); } #[test] diff --git a/gossip/src/crds.rs b/gossip/src/crds.rs index f828ad2960f3b4..27099b4c975eb3 100644 --- a/gossip/src/crds.rs +++ b/gossip/src/crds.rs @@ -245,7 +245,7 @@ impl Crds { CrdsData::DuplicateShred(_, _) => { self.duplicate_shreds.insert(value.ordinal, entry_index); } - CrdsData::RestartLastVotedForkSlots(_, _, _, _) => { + CrdsData::RestartLastVotedForkSlots(_, _) => { self.restart_last_voted_fork_slots .insert(value.ordinal, entry_index); } @@ -284,7 +284,7 @@ impl Crds { self.duplicate_shreds.remove(&entry.get().ordinal); self.duplicate_shreds.insert(value.ordinal, entry_index); } - CrdsData::RestartLastVotedForkSlots(_, _, _, _) => { + CrdsData::RestartLastVotedForkSlots(_, _) => { self.restart_last_voted_fork_slots .remove(&entry.get().ordinal); self.restart_last_voted_fork_slots @@ -572,7 +572,7 @@ impl Crds { CrdsData::DuplicateShred(_, _) => { self.duplicate_shreds.remove(&value.ordinal); } - CrdsData::RestartLastVotedForkSlots(_, _, _, _) => { + CrdsData::RestartLastVotedForkSlots(_, _) => { self.restart_last_voted_fork_slots.remove(&value.ordinal); } _ => (), @@ -612,7 +612,7 @@ impl Crds { CrdsData::DuplicateShred(_, _) => { self.duplicate_shreds.insert(value.ordinal, index); } - CrdsData::RestartLastVotedForkSlots(_, _, _, _) => { + CrdsData::RestartLastVotedForkSlots(_, _) => { self.restart_last_voted_fork_slots .insert(value.ordinal, index); } @@ -756,7 +756,7 @@ impl CrdsDataStats { CrdsData::DuplicateShred(_, _) => 9, CrdsData::SnapshotHashes(_) => 10, CrdsData::ContactInfo(_) => 11, - CrdsData::RestartLastVotedForkSlots(_, _, _, _) => 12, + CrdsData::RestartLastVotedForkSlots(_, _) => 12, // Update CrdsCountsArray if new items are added here. } } @@ -1187,12 +1187,7 @@ mod tests { .table .values() .filter(|v| v.ordinal >= since) - .filter(|v| { - matches!( - v.value.data, - CrdsData::RestartLastVotedForkSlots(_, _, _, _) - ) - }) + .filter(|v| matches!(v.value.data, CrdsData::RestartLastVotedForkSlots(_, _))) .count(); let mut cursor = Cursor(since); assert_eq!( @@ -1211,7 +1206,7 @@ mod tests { for value in crds.get_restart_last_voted_fork_slots(&mut Cursor(since)) { assert!(value.ordinal >= since); match value.value.data { - CrdsData::RestartLastVotedForkSlots(_, _, _, _) => (), + CrdsData::RestartLastVotedForkSlots(_, _) => (), _ => panic!("not a last-voted-fork-slot!"), } } @@ -1275,12 +1270,7 @@ mod tests { let num_restart_last_voted_fork_slots = crds .table .values() - .filter(|v| { - matches!( - v.value.data, - CrdsData::RestartLastVotedForkSlots(_, _, _, _) - ) - }) + .filter(|v| matches!(v.value.data, CrdsData::RestartLastVotedForkSlots(_, _))) .count(); assert_eq!( crds.table.len(), @@ -1307,7 +1297,7 @@ mod tests { crds.get_restart_last_voted_fork_slots(&mut Cursor::default()) { match restart_last_voted_fork_slots.value.data { - CrdsData::RestartLastVotedForkSlots(_, _, _, _) => (), + CrdsData::RestartLastVotedForkSlots(_, _) => (), _ => panic!("not a restart-last-voted-fork-slot!"), } } diff --git a/gossip/src/crds_value.rs b/gossip/src/crds_value.rs index 28ca9bac44ed73..550f48431b41ff 100644 --- a/gossip/src/crds_value.rs +++ b/gossip/src/crds_value.rs @@ -96,7 +96,7 @@ pub enum CrdsData { DuplicateShred(DuplicateShredIndex, DuplicateShred), SnapshotHashes(SnapshotHashes), ContactInfo(ContactInfo), - RestartLastVotedForkSlots(EpochSlotsIndex, EpochSlots, Slot, Hash), + RestartLastVotedForkSlots(EpochSlotsIndex, RestartLastVotedForkSlots), } impl Sanitize for CrdsData { @@ -135,11 +135,11 @@ impl Sanitize for CrdsData { } CrdsData::SnapshotHashes(val) => val.sanitize(), CrdsData::ContactInfo(node) => node.sanitize(), - CrdsData::RestartLastVotedForkSlots(ix, slots, _last_vote_slot, last_vote_hash) => { + CrdsData::RestartLastVotedForkSlots(ix, slots) => { if *ix as usize >= MAX_RESTART_LAST_VOTED_FORK_SLOTS as usize { return Err(SanitizeError::ValueOutOfBounds); } - slots.sanitize().and(last_vote_hash.sanitize()) + slots.sanitize() } } } @@ -167,10 +167,8 @@ impl CrdsData { 4 => CrdsData::Version(Version::new_rand(rng, pubkey)), 5 => CrdsData::Vote(rng.gen_range(0..MAX_VOTES), Vote::new_rand(rng, pubkey)), 6 => CrdsData::RestartLastVotedForkSlots( - rng.gen_range(0..MAX_RESTART_LAST_VOTED_FORK_SLOTS), - EpochSlots::new_rand(rng, pubkey), - rng.gen_range(0..512), - Hash::new_unique(), + 0, + RestartLastVotedForkSlots::new_rand(rng, pubkey), ), _ => CrdsData::EpochSlots( rng.gen_range(0..MAX_EPOCH_SLOTS), @@ -500,6 +498,69 @@ impl Sanitize for NodeInstance { } } +#[derive(Serialize, Deserialize, Clone, Default, PartialEq, Eq, AbiExample)] +pub struct RestartLastVotedForkSlots { + pub slots: EpochSlots, + pub last_voted_slot: Slot, + pub last_voted_hash: Hash, +} + +impl Sanitize for RestartLastVotedForkSlots { + fn sanitize(&self) -> std::result::Result<(), SanitizeError> { + self.slots.sanitize()?; + self.last_voted_hash.sanitize() + } +} +impl fmt::Debug for RestartLastVotedForkSlots { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "RestartLastVotedForkSlots {{ slots: {:?} last_voted: {}({}) }}", + self.slots, self.last_voted_slot, self.last_voted_hash + ) + } +} + +impl RestartLastVotedForkSlots { + pub fn new(from: Pubkey, now: u64, last_voted_slot: Slot, last_voted_hash: Hash) -> Self { + Self { + slots: EpochSlots { + from, + slots: vec![], + wallclock: now, + }, + last_voted_slot, + last_voted_hash, + } + } + + pub fn new_from_fields( + slots: EpochSlots, + last_voted_slot: Slot, + last_voted_hash: Hash, + ) -> Self { + Self { + slots, + last_voted_slot, + last_voted_hash, + } + } + + /// New random Version for tests and benchmarks. + pub fn new_rand(rng: &mut R, pubkey: Option) -> Self { + let pubkey = pubkey.unwrap_or_else(solana_sdk::pubkey::new_rand); + Self { + slots: EpochSlots::new_rand(rng, Some(pubkey)), + last_voted_slot: rng.gen_range(0..512), + last_voted_hash: Hash::new_unique(), + } + } + + pub fn fill(&mut self, update: &[Slot]) -> usize { + self.slots.fill(update, timestamp()) + } +} + /// Type of the replicated value /// These are labels for values in a record that is associated with `Pubkey` #[derive(PartialEq, Hash, Eq, Clone, Debug)] @@ -613,7 +674,7 @@ impl CrdsValue { CrdsData::DuplicateShred(_, shred) => shred.wallclock, CrdsData::SnapshotHashes(hash) => hash.wallclock, CrdsData::ContactInfo(node) => node.wallclock(), - CrdsData::RestartLastVotedForkSlots(_, slots, _, _) => slots.wallclock, + CrdsData::RestartLastVotedForkSlots(_, slots) => slots.slots.wallclock, } } pub fn pubkey(&self) -> Pubkey { @@ -630,7 +691,7 @@ impl CrdsValue { CrdsData::DuplicateShred(_, shred) => shred.from, CrdsData::SnapshotHashes(hash) => hash.from, CrdsData::ContactInfo(node) => *node.pubkey(), - CrdsData::RestartLastVotedForkSlots(_, slots, _, _) => slots.from, + CrdsData::RestartLastVotedForkSlots(_, slots) => slots.slots.from, } } pub fn label(&self) -> CrdsValueLabel { @@ -649,7 +710,7 @@ impl CrdsValue { CrdsData::DuplicateShred(ix, shred) => CrdsValueLabel::DuplicateShred(*ix, shred.from), CrdsData::SnapshotHashes(_) => CrdsValueLabel::SnapshotHashes(self.pubkey()), CrdsData::ContactInfo(node) => CrdsValueLabel::ContactInfo(*node.pubkey()), - CrdsData::RestartLastVotedForkSlots(ix, _, _, _) => { + CrdsData::RestartLastVotedForkSlots(ix, _) => { CrdsValueLabel::RestartLastVotedForkSlots(*ix, self.pubkey()) } } @@ -671,7 +732,7 @@ impl CrdsValue { pub(crate) fn epoch_slots(&self) -> Option<&EpochSlots> { match &self.data { CrdsData::EpochSlots(_, slots) => Some(slots), - CrdsData::RestartLastVotedForkSlots(_, slots, _, _) => Some(slots), + CrdsData::RestartLastVotedForkSlots(_, slots) => Some(&slots.slots), _ => None, } } @@ -1103,12 +1164,13 @@ mod test { #[test] fn test_restart_last_voted_fork_slots() { let keypair = Keypair::new(); - let mut epoch_slots = EpochSlots::new(keypair.pubkey(), timestamp()); let slot = 53; - epoch_slots.fill(&[slot], timestamp()); + let mut slots = + RestartLastVotedForkSlots::new(keypair.pubkey(), timestamp(), slot, Hash::default()); + slots.fill(&[slot]); let ix = 1; let value = CrdsValue::new_signed( - CrdsData::RestartLastVotedForkSlots(ix, epoch_slots.clone(), slot, Hash::default()), + CrdsData::RestartLastVotedForkSlots(ix, slots.clone()), &keypair, ); assert_eq!(value.sanitize(), Ok(())); @@ -1119,16 +1181,11 @@ mod test { ); assert_eq!(label.pubkey(), keypair.pubkey()); let retrieved_epoch_slots = value.epoch_slots().unwrap(); - assert_eq!(value.wallclock(), epoch_slots.wallclock); - assert_eq!(retrieved_epoch_slots, &epoch_slots); + assert_eq!(value.wallclock(), slots.slots.wallclock); + assert_eq!(retrieved_epoch_slots, &slots.slots); let bad_value = CrdsValue::new_signed( - CrdsData::RestartLastVotedForkSlots( - MAX_RESTART_LAST_VOTED_FORK_SLOTS, - epoch_slots, - slot, - Hash::default(), - ), + CrdsData::RestartLastVotedForkSlots(MAX_RESTART_LAST_VOTED_FORK_SLOTS, slots), &keypair, ); assert_eq!(bad_value.sanitize(), Err(SanitizeError::ValueOutOfBounds)) From 0028f648365488cc260eb58a638387cd6b1223e1 Mon Sep 17 00:00:00 2001 From: Wen Date: Sun, 17 Sep 2023 23:31:17 -0700 Subject: [PATCH 08/13] Remove unused parts to make PR smaller. --- gossip/src/cluster_info.rs | 127 +------------------------------------ gossip/src/crds.rs | 95 +-------------------------- gossip/src/crds_value.rs | 27 +------- 3 files changed, 7 insertions(+), 242 deletions(-) diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 2acc9248f596d6..2f81df42396075 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -33,8 +33,8 @@ use { }, crds_value::{ self, AccountsHashes, CrdsData, CrdsValue, CrdsValueLabel, EpochSlotsIndex, - LegacySnapshotHashes, LowestSlot, NodeInstance, RestartLastVotedForkSlots, - SnapshotHashes, Version, Vote, MAX_WALLCLOCK, + LegacySnapshotHashes, LowestSlot, NodeInstance, SnapshotHashes, Version, Vote, + MAX_WALLCLOCK, }, duplicate_shred::DuplicateShred, epoch_slots::EpochSlots, @@ -969,44 +969,6 @@ impl ClusterInfo { } } - // This is currently only called from one thread during startup, so no race conditions. - // The data format is similar to EpochSlots, but we overwrite previous results each time, - // so here we always start from 0. All pieces of the same push use the same timestamp. - pub fn push_restart_last_voted_fork_slots( - &self, - mut update: &[Slot], - last_vote_bankhash: Hash, - ) { - let self_pubkey = self.id(); - let last_vote_slot = *update.last().unwrap(); - let mut epoch_slot_index = 0; - let mut entries = Vec::default(); - let keypair = self.keypair(); - let now = timestamp(); - while !update.is_empty() { - let mut slots = RestartLastVotedForkSlots::new( - self_pubkey, - now, - last_vote_slot, - last_vote_bankhash, - ); - let n = slots.fill(update); - update = &update[n..]; - if n > 0 { - let data = CrdsData::RestartLastVotedForkSlots(epoch_slot_index, slots); - let entry = CrdsValue::new_signed(data, &keypair); - entries.push(entry); - } - epoch_slot_index += 1; - } - let mut gossip_crds = self.gossip.crds.write().unwrap(); - for entry in entries { - if let Err(err) = gossip_crds.insert(entry, now, GossipRoute::LocalMessage) { - error!("push_restart_last_voted_fork_slots failed: {:?}", err); - } - } - } - fn time_gossip_read_lock<'a>( &'a self, label: &'static str, @@ -1282,27 +1244,6 @@ impl ClusterInfo { .collect() } - /// Returns last-voted-fork-slots inserted since the given cursor. - /// Excludes entries from nodes with unkown or different shred version. - pub fn get_restart_last_voted_fork_slots( - &self, - cursor: &mut Cursor, - ) -> Vec { - let self_shred_version = Some(self.my_shred_version()); - let gossip_crds = self.gossip.crds.read().unwrap(); - gossip_crds - .get_restart_last_voted_fork_slots(cursor) - .filter(|entry| { - let origin = entry.value.pubkey(); - gossip_crds.get_shred_version(&origin) == self_shred_version - }) - .map(|entry| match &entry.value.data { - CrdsData::RestartLastVotedForkSlots(_index, slots) => slots.clone(), - _ => panic!("this should not happen!"), - }) - .collect() - } - /// Returns duplicate-shreds inserted since the given cursor. pub(crate) fn get_duplicate_shreds(&self, cursor: &mut Cursor) -> Vec { let gossip_crds = self.gossip.crds.read().unwrap(); @@ -4063,70 +4004,6 @@ mod tests { assert_eq!(slots[1].from, node_pubkey); } - #[test] - fn test_push_restart_last_voted_fork_slots() { - solana_logger::setup(); - let keypair = Arc::new(Keypair::new()); - let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), 0); - let cluster_info = ClusterInfo::new(contact_info, keypair, SocketAddrSpace::Unspecified); - let slots = cluster_info.get_restart_last_voted_fork_slots(&mut Cursor::default()); - assert!(slots.is_empty()); - let mut update: Vec = vec![0]; - for i in 0..81 { - for j in 0..1000 { - update.push(i * 1050 + j); - } - } - cluster_info.push_restart_last_voted_fork_slots(&update, Hash::default()); - - let mut cursor = Cursor::default(); - let slots = cluster_info.get_restart_last_voted_fork_slots(&mut cursor); - assert_eq!(slots.len(), 2); - assert_eq!(slots[0].slots.to_slots(0).len(), 42468); - assert_eq!(slots[1].slots.to_slots(0).len(), 38532); - - let slots = cluster_info.get_restart_last_voted_fork_slots(&mut cursor); - assert!(slots.is_empty()); - - // Test with different shred versions. - let mut rng = rand::thread_rng(); - let node_pubkey = Pubkey::new_unique(); - let mut node = LegacyContactInfo::new_rand(&mut rng, Some(node_pubkey)); - node.set_shred_version(42); - let slots = RestartLastVotedForkSlots::new_rand(&mut rng, Some(node_pubkey)); - let entries = vec![ - CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(node)), - CrdsValue::new_unsigned(CrdsData::RestartLastVotedForkSlots(0, slots)), - ]; - { - let mut gossip_crds = cluster_info.gossip.crds.write().unwrap(); - for entry in entries { - assert!(gossip_crds - .insert(entry, /*now=*/ 0, GossipRoute::LocalMessage) - .is_ok()); - } - } - // Should exclude other node's last-voted-fork-slot because of different - // shred-version. - let slots = cluster_info.get_restart_last_voted_fork_slots(&mut Cursor::default()); - assert_eq!(slots.len(), 2); - assert_eq!(slots[0].slots.from, cluster_info.id()); - assert_eq!(slots[1].slots.from, cluster_info.id()); - // Match shred versions. - { - let mut node = cluster_info.my_contact_info.write().unwrap(); - node.set_shred_version(42); - } - cluster_info.push_self(); - cluster_info.flush_push_queue(); - // Should now include both epoch slots. - let slots = cluster_info.get_restart_last_voted_fork_slots(&mut Cursor::default()); - assert_eq!(slots.len(), 3); - assert_eq!(slots[0].slots.from, cluster_info.id()); - assert_eq!(slots[1].slots.from, cluster_info.id()); - assert_eq!(slots[2].slots.from, node_pubkey); - } - #[test] fn test_append_entrypoint_to_pulls() { let thread_pool = ThreadPoolBuilder::new().build().unwrap(); diff --git a/gossip/src/crds.rs b/gossip/src/crds.rs index 27099b4c975eb3..20aebf3670f807 100644 --- a/gossip/src/crds.rs +++ b/gossip/src/crds.rs @@ -77,8 +77,6 @@ pub struct Crds { epoch_slots: BTreeMap, // Indices of DuplicateShred keyed by insert order. duplicate_shreds: BTreeMap, - // Indices of RestartLastVotedForkSlots keyed by insert order. - restart_last_voted_fork_slots: BTreeMap, // Indices of all crds values associated with a node. records: HashMap>, // Indices of all entries keyed by insert order. @@ -171,7 +169,6 @@ impl Default for Crds { votes: BTreeMap::default(), epoch_slots: BTreeMap::default(), duplicate_shreds: BTreeMap::default(), - restart_last_voted_fork_slots: BTreeMap::default(), records: HashMap::default(), entries: BTreeMap::default(), purged: VecDeque::default(), @@ -245,10 +242,6 @@ impl Crds { CrdsData::DuplicateShred(_, _) => { self.duplicate_shreds.insert(value.ordinal, entry_index); } - CrdsData::RestartLastVotedForkSlots(_, _) => { - self.restart_last_voted_fork_slots - .insert(value.ordinal, entry_index); - } _ => (), }; self.entries.insert(value.ordinal, entry_index); @@ -284,12 +277,6 @@ impl Crds { self.duplicate_shreds.remove(&entry.get().ordinal); self.duplicate_shreds.insert(value.ordinal, entry_index); } - CrdsData::RestartLastVotedForkSlots(_, _) => { - self.restart_last_voted_fork_slots - .remove(&entry.get().ordinal); - self.restart_last_voted_fork_slots - .insert(value.ordinal, entry_index); - } _ => (), } self.entries.remove(&entry.get().ordinal); @@ -390,21 +377,6 @@ impl Crds { }) } - /// Returns last_voted_fork_slots inserted since the given cursor. - /// Updates the cursor as the values are consumed. - pub(crate) fn get_restart_last_voted_fork_slots<'a>( - &'a self, - cursor: &'a mut Cursor, - ) -> impl Iterator { - let range = (Bound::Included(cursor.ordinal()), Bound::Unbounded); - self.restart_last_voted_fork_slots - .range(range) - .map(move |(ordinal, index)| { - cursor.consume(*ordinal); - self.table.index(*index) - }) - } - /// Returns all entries inserted since the given cursor. pub(crate) fn get_entries<'a>( &'a self, @@ -572,9 +544,6 @@ impl Crds { CrdsData::DuplicateShred(_, _) => { self.duplicate_shreds.remove(&value.ordinal); } - CrdsData::RestartLastVotedForkSlots(_, _) => { - self.restart_last_voted_fork_slots.remove(&value.ordinal); - } _ => (), } self.entries.remove(&value.ordinal); @@ -612,10 +581,6 @@ impl Crds { CrdsData::DuplicateShred(_, _) => { self.duplicate_shreds.insert(value.ordinal, index); } - CrdsData::RestartLastVotedForkSlots(_, _) => { - self.restart_last_voted_fork_slots - .insert(value.ordinal, index); - } _ => (), }; self.entries.insert(value.ordinal, index); @@ -1154,7 +1119,6 @@ mod tests { usize, // number of nodes usize, // number of votes usize, // number of epoch slots - usize, // number of restart last voted fork slots ) { let size = crds.table.len(); let since = if size == 0 || rng.gen() { @@ -1183,33 +1147,6 @@ mod tests { assert!(value.ordinal >= since); assert_matches!(value.value.data, CrdsData::EpochSlots(_, _)); } - let num_last_voted_fork_slots = crds - .table - .values() - .filter(|v| v.ordinal >= since) - .filter(|v| matches!(v.value.data, CrdsData::RestartLastVotedForkSlots(_, _))) - .count(); - let mut cursor = Cursor(since); - assert_eq!( - num_last_voted_fork_slots, - crds.get_restart_last_voted_fork_slots(&mut cursor).count() - ); - assert_eq!( - cursor.0, - crds.restart_last_voted_fork_slots - .iter() - .last() - .map(|(k, _)| k + 1) - .unwrap_or_default() - .max(since) - ); - for value in crds.get_restart_last_voted_fork_slots(&mut Cursor(since)) { - assert!(value.ordinal >= since); - match value.value.data { - CrdsData::RestartLastVotedForkSlots(_, _) => (), - _ => panic!("not a last-voted-fork-slot!"), - } - } let num_votes = crds .table .values() @@ -1267,11 +1204,6 @@ mod tests { .values() .filter(|v| matches!(v.value.data, CrdsData::EpochSlots(_, _))) .count(); - let num_restart_last_voted_fork_slots = crds - .table - .values() - .filter(|v| matches!(v.value.data, CrdsData::RestartLastVotedForkSlots(_, _))) - .count(); assert_eq!( crds.table.len(), crds.get_entries(&mut Cursor::default()).count() @@ -1282,31 +1214,13 @@ mod tests { num_epoch_slots, crds.get_epoch_slots(&mut Cursor::default()).count() ); - assert_eq!( - num_restart_last_voted_fork_slots, - crds.get_restart_last_voted_fork_slots(&mut Cursor::default()) - .count() - ); for vote in crds.get_votes(&mut Cursor::default()) { assert_matches!(vote.value.data, CrdsData::Vote(_, _)); } for epoch_slots in crds.get_epoch_slots(&mut Cursor::default()) { assert_matches!(epoch_slots.value.data, CrdsData::EpochSlots(_, _)); } - for restart_last_voted_fork_slots in - crds.get_restart_last_voted_fork_slots(&mut Cursor::default()) - { - match restart_last_voted_fork_slots.value.data { - CrdsData::RestartLastVotedForkSlots(_, _) => (), - _ => panic!("not a restart-last-voted-fork-slot!"), - } - } - ( - num_nodes, - num_votes, - num_epoch_slots, - num_last_voted_fork_slots, - ) + (num_nodes, num_votes, num_epoch_slots) } #[test] @@ -1332,16 +1246,11 @@ mod tests { assert!(crds.table.len() > 200); assert_eq!(crds.num_purged() + crds.table.len(), 4096); assert!(num_inserts > crds.table.len()); - let (num_nodes, num_votes, num_epoch_slots, num_restart_last_voted_fork_slots) = - check_crds_value_indices(&mut rng, &crds); + let (num_nodes, num_votes, num_epoch_slots) = check_crds_value_indices(&mut rng, &crds); assert!(num_nodes * 3 < crds.table.len()); assert!(num_nodes > 100, "num nodes: {num_nodes}"); assert!(num_votes > 100, "num votes: {num_votes}"); assert!(num_epoch_slots > 100, "num epoch slots: {num_epoch_slots}"); - assert!( - num_restart_last_voted_fork_slots > 0, - "num restart last voted fork slots: {num_restart_last_voted_fork_slots}" - ); // Remove values one by one and assert that nodes indices stay valid. while !crds.table.is_empty() { let index = rng.gen_range(0..crds.table.len()); diff --git a/gossip/src/crds_value.rs b/gossip/src/crds_value.rs index 550f48431b41ff..d3e2a4b1ef0ec5 100644 --- a/gossip/src/crds_value.rs +++ b/gossip/src/crds_value.rs @@ -136,7 +136,7 @@ impl Sanitize for CrdsData { CrdsData::SnapshotHashes(val) => val.sanitize(), CrdsData::ContactInfo(node) => node.sanitize(), CrdsData::RestartLastVotedForkSlots(ix, slots) => { - if *ix as usize >= MAX_RESTART_LAST_VOTED_FORK_SLOTS as usize { + if *ix >= MAX_RESTART_LAST_VOTED_FORK_SLOTS { return Err(SanitizeError::ValueOutOfBounds); } slots.sanitize() @@ -167,7 +167,7 @@ impl CrdsData { 4 => CrdsData::Version(Version::new_rand(rng, pubkey)), 5 => CrdsData::Vote(rng.gen_range(0..MAX_VOTES), Vote::new_rand(rng, pubkey)), 6 => CrdsData::RestartLastVotedForkSlots( - 0, + rng.gen_range(0..MAX_RESTART_LAST_VOTED_FORK_SLOTS), RestartLastVotedForkSlots::new_rand(rng, pubkey), ), _ => CrdsData::EpochSlots( @@ -498,7 +498,7 @@ impl Sanitize for NodeInstance { } } -#[derive(Serialize, Deserialize, Clone, Default, PartialEq, Eq, AbiExample)] +#[derive(Serialize, Deserialize, Clone, Default, PartialEq, Eq, AbiExample, Debug)] pub struct RestartLastVotedForkSlots { pub slots: EpochSlots, pub last_voted_slot: Slot, @@ -511,15 +511,6 @@ impl Sanitize for RestartLastVotedForkSlots { self.last_voted_hash.sanitize() } } -impl fmt::Debug for RestartLastVotedForkSlots { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - f, - "RestartLastVotedForkSlots {{ slots: {:?} last_voted: {}({}) }}", - self.slots, self.last_voted_slot, self.last_voted_hash - ) - } -} impl RestartLastVotedForkSlots { pub fn new(from: Pubkey, now: u64, last_voted_slot: Slot, last_voted_hash: Hash) -> Self { @@ -534,18 +525,6 @@ impl RestartLastVotedForkSlots { } } - pub fn new_from_fields( - slots: EpochSlots, - last_voted_slot: Slot, - last_voted_hash: Hash, - ) -> Self { - Self { - slots, - last_voted_slot, - last_voted_hash, - } - } - /// New random Version for tests and benchmarks. pub fn new_rand(rng: &mut R, pubkey: Option) -> Self { let pubkey = pubkey.unwrap_or_else(solana_sdk::pubkey::new_rand); From 46d205475028d70245d7f5f6f5be8d88b2ac645e Mon Sep 17 00:00:00 2001 From: Wen Date: Mon, 18 Sep 2023 09:57:50 -0700 Subject: [PATCH 09/13] Remove unused clone. --- gossip/src/crds.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gossip/src/crds.rs b/gossip/src/crds.rs index 20aebf3670f807..16aaba17b1a4c5 100644 --- a/gossip/src/crds.rs +++ b/gossip/src/crds.rs @@ -1233,7 +1233,7 @@ mod tests { let keypair = &keypairs[rng.gen_range(0..keypairs.len())]; let value = CrdsValue::new_rand(&mut rng, Some(keypair)); let local_timestamp = new_rand_timestamp(&mut rng); - if let Ok(()) = crds.insert(value.clone(), local_timestamp, GossipRoute::LocalMessage) { + if let Ok(()) = crds.insert(value, local_timestamp, GossipRoute::LocalMessage) { num_inserts += 1; } if k % 16 == 0 { From cc11d42c0a7c1531354152006012718385e8079e Mon Sep 17 00:00:00 2001 From: Wen Date: Mon, 18 Sep 2023 11:50:32 -0700 Subject: [PATCH 10/13] Use CompressedSlotsVec to share code between EpochSlots and RestartLastVotedForkSlots. --- gossip/src/cluster_info.rs | 2 +- gossip/src/crds_value.rs | 37 ++++++----- gossip/src/epoch_slots.rs | 131 ++++++++++++++++++++++++------------- 3 files changed, 109 insertions(+), 61 deletions(-) diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 2f81df42396075..8b7a7da3c58e4a 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -271,7 +271,7 @@ pub fn make_accounts_hashes_message( pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>; // TODO These messages should go through the gpu pipeline for spam filtering -#[frozen_abi(digest = "DfMmcwQriDV8hqHW1WYWuTCES41hLz6sXZZtXNL2wx7u")] +#[frozen_abi(digest = "DxPH4Cj8QHbGb8daVf8uX3HckPkB9XqKE2AHtQzYYZLr")] #[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)] #[allow(clippy::large_enum_variant)] pub(crate) enum Protocol { diff --git a/gossip/src/crds_value.rs b/gossip/src/crds_value.rs index d3e2a4b1ef0ec5..3be49da4e1f620 100644 --- a/gossip/src/crds_value.rs +++ b/gossip/src/crds_value.rs @@ -4,7 +4,7 @@ use { contact_info::ContactInfo, deprecated, duplicate_shred::{DuplicateShred, DuplicateShredIndex, MAX_DUPLICATE_SHREDS}, - epoch_slots::EpochSlots, + epoch_slots::{CompressedSlotsVec, EpochSlots}, legacy_contact_info::LegacyContactInfo, }, bincode::{serialize, serialized_size}, @@ -500,7 +500,9 @@ impl Sanitize for NodeInstance { #[derive(Serialize, Deserialize, Clone, Default, PartialEq, Eq, AbiExample, Debug)] pub struct RestartLastVotedForkSlots { - pub slots: EpochSlots, + pub from: Pubkey, + pub wallclock: u64, + pub slots: CompressedSlotsVec, pub last_voted_slot: Slot, pub last_voted_hash: Hash, } @@ -515,11 +517,9 @@ impl Sanitize for RestartLastVotedForkSlots { impl RestartLastVotedForkSlots { pub fn new(from: Pubkey, now: u64, last_voted_slot: Slot, last_voted_hash: Hash) -> Self { Self { - slots: EpochSlots { - from, - slots: vec![], - wallclock: now, - }, + from, + wallclock: now, + slots: CompressedSlotsVec::new(), last_voted_slot, last_voted_hash, } @@ -529,14 +529,16 @@ impl RestartLastVotedForkSlots { pub fn new_rand(rng: &mut R, pubkey: Option) -> Self { let pubkey = pubkey.unwrap_or_else(solana_sdk::pubkey::new_rand); Self { - slots: EpochSlots::new_rand(rng, Some(pubkey)), + from: pubkey, + wallclock: new_rand_timestamp(rng), + slots: CompressedSlotsVec::new_rand(rng), last_voted_slot: rng.gen_range(0..512), last_voted_hash: Hash::new_unique(), } } pub fn fill(&mut self, update: &[Slot]) -> usize { - self.slots.fill(update, timestamp()) + self.slots.fill(update) } } @@ -653,7 +655,7 @@ impl CrdsValue { CrdsData::DuplicateShred(_, shred) => shred.wallclock, CrdsData::SnapshotHashes(hash) => hash.wallclock, CrdsData::ContactInfo(node) => node.wallclock(), - CrdsData::RestartLastVotedForkSlots(_, slots) => slots.slots.wallclock, + CrdsData::RestartLastVotedForkSlots(_, slots) => slots.wallclock, } } pub fn pubkey(&self) -> Pubkey { @@ -670,7 +672,7 @@ impl CrdsValue { CrdsData::DuplicateShred(_, shred) => shred.from, CrdsData::SnapshotHashes(hash) => hash.from, CrdsData::ContactInfo(node) => *node.pubkey(), - CrdsData::RestartLastVotedForkSlots(_, slots) => slots.slots.from, + CrdsData::RestartLastVotedForkSlots(_, slots) => slots.from, } } pub fn label(&self) -> CrdsValueLabel { @@ -711,7 +713,6 @@ impl CrdsValue { pub(crate) fn epoch_slots(&self) -> Option<&EpochSlots> { match &self.data { CrdsData::EpochSlots(_, slots) => Some(slots), - CrdsData::RestartLastVotedForkSlots(_, slots) => Some(&slots.slots), _ => None, } } @@ -1144,9 +1145,11 @@ mod test { fn test_restart_last_voted_fork_slots() { let keypair = Keypair::new(); let slot = 53; + let slot_parent = slot - 5; let mut slots = RestartLastVotedForkSlots::new(keypair.pubkey(), timestamp(), slot, Hash::default()); - slots.fill(&[slot]); + let original_slots_vec = [slot_parent, slot]; + slots.fill(&original_slots_vec); let ix = 1; let value = CrdsValue::new_signed( CrdsData::RestartLastVotedForkSlots(ix, slots.clone()), @@ -1159,9 +1162,11 @@ mod test { CrdsValueLabel::RestartLastVotedForkSlots(ix, keypair.pubkey()) ); assert_eq!(label.pubkey(), keypair.pubkey()); - let retrieved_epoch_slots = value.epoch_slots().unwrap(); - assert_eq!(value.wallclock(), slots.slots.wallclock); - assert_eq!(retrieved_epoch_slots, &slots.slots); + assert_eq!(value.wallclock(), slots.wallclock); + let retrived_slots = slots.slots.to_slots(0); + assert_eq!(retrived_slots.len(), 2); + assert_eq!(retrived_slots[0], slot_parent); + assert_eq!(retrived_slots[1], slot); let bad_value = CrdsValue::new_signed( CrdsData::RestartLastVotedForkSlots(MAX_RESTART_LAST_VOTED_FORK_SLOTS, slots), diff --git a/gossip/src/epoch_slots.rs b/gossip/src/epoch_slots.rs index dc94380b33e5de..02d9d4200ad48f 100644 --- a/gossip/src/epoch_slots.rs +++ b/gossip/src/epoch_slots.rs @@ -11,6 +11,7 @@ use { pubkey::Pubkey, sanitize::{Sanitize, SanitizeError}, }, + std::fmt, }; const MAX_SLOTS_PER_ENTRY: usize = 2048 * 8; @@ -225,24 +226,17 @@ impl CompressedSlots { } #[derive(Serialize, Deserialize, Clone, Default, PartialEq, Eq, AbiExample)] -pub struct EpochSlots { - pub from: Pubkey, - pub slots: Vec, - pub wallclock: u64, +pub struct CompressedSlotsVec { + slots: Vec, } -impl Sanitize for EpochSlots { +impl Sanitize for CompressedSlotsVec { fn sanitize(&self) -> std::result::Result<(), SanitizeError> { - if self.wallclock >= MAX_WALLCLOCK { - return Err(SanitizeError::ValueOutOfBounds); - } - self.from.sanitize()?; self.slots.sanitize() } } -use std::fmt; -impl fmt::Debug for EpochSlots { +impl fmt::Debug for CompressedSlotsVec { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let num_slots: usize = self.slots.iter().map(|s| s.num_slots()).sum(); let lowest_slot = self @@ -250,25 +244,17 @@ impl fmt::Debug for EpochSlots { .iter() .map(|s| s.first_slot()) .fold(0, std::cmp::min); - write!( - f, - "EpochSlots {{ from: {} num_slots: {} lowest_slot: {} wallclock: {} }}", - self.from, num_slots, lowest_slot, self.wallclock - ) + write!(f, "num_slots: {} lowest_slot: {}", num_slots, lowest_slot,) } } -impl EpochSlots { - pub fn new(from: Pubkey, now: u64) -> Self { - Self { - from, - wallclock: now, - slots: vec![], - } +impl CompressedSlotsVec { + pub fn new() -> Self { + Self { slots: vec![] } } - pub fn fill(&mut self, slots: &[Slot], now: u64) -> usize { + + pub fn fill(&mut self, slots: &[Slot]) -> usize { let mut num = 0; - self.wallclock = std::cmp::max(now, self.wallclock + 1); while num < slots.len() { num += self.add(&slots[num..]); if num < slots.len() { @@ -321,17 +307,73 @@ impl EpochSlots { .collect() } - /// New random EpochSlots for tests and simulations. - pub(crate) fn new_rand(rng: &mut R, pubkey: Option) -> Self { - let now = crds_value::new_rand_timestamp(rng); - let pubkey = pubkey.unwrap_or_else(solana_sdk::pubkey::new_rand); - let mut epoch_slots = Self::new(pubkey, now); + /// New random CompressedSlotsVec for tests and simulations. + pub(crate) fn new_rand(rng: &mut R) -> Self { + let mut result = Self::new(); let num_slots = rng.gen_range(0..20); let slots: Vec<_> = std::iter::repeat_with(|| 47825632 + rng.gen_range(0..512)) .take(num_slots) .collect(); - epoch_slots.add(&slots); - epoch_slots + result.add(&slots); + result + } +} + +#[derive(Serialize, Deserialize, Clone, Default, PartialEq, Eq, AbiExample, Debug)] +pub struct EpochSlots { + pub from: Pubkey, + slots_vec: CompressedSlotsVec, + pub wallclock: u64, +} + +impl Sanitize for EpochSlots { + fn sanitize(&self) -> std::result::Result<(), SanitizeError> { + if self.wallclock >= MAX_WALLCLOCK { + return Err(SanitizeError::ValueOutOfBounds); + } + self.from.sanitize()?; + self.slots_vec.slots.sanitize() + } +} + +impl EpochSlots { + pub fn new(from: Pubkey, now: u64) -> Self { + Self { + from, + wallclock: now, + slots_vec: CompressedSlotsVec::new(), + } + } + pub fn fill(&mut self, slots: &[Slot], now: u64) -> usize { + self.wallclock = std::cmp::max(now, self.wallclock + 1); + self.slots_vec.fill(slots) + } + + pub fn add(&mut self, slots: &[Slot]) -> usize { + self.slots_vec.add(slots) + } + pub fn deflate(&mut self) -> Result<()> { + self.slots_vec.deflate() + } + pub fn max_compressed_slot_size(&self) -> isize { + self.slots_vec.max_compressed_slot_size() + } + + pub fn first_slot(&self) -> Option { + self.slots_vec.first_slot() + } + + pub fn to_slots(&self, min_slot: Slot) -> Vec { + self.slots_vec.to_slots(min_slot) + } + + /// New random EpochSlots for tests and simulations. + pub(crate) fn new_rand(rng: &mut R, pubkey: Option) -> Self { + Self { + from: pubkey.unwrap_or_else(solana_sdk::pubkey::new_rand), + wallclock: crds_value::new_rand_timestamp(rng), + slots_vec: CompressedSlotsVec::new_rand(rng), + } } } @@ -466,15 +508,16 @@ mod tests { let mut slots = EpochSlots::default(); assert_eq!(slots.fill(&range, 2), 5000); assert_eq!(slots.wallclock, 2); - assert_eq!(slots.slots.len(), 3); - assert_eq!(slots.slots[0].first_slot(), 0); - assert_ne!(slots.slots[0].num_slots(), 0); - let next = slots.slots[0].num_slots() as u64 + slots.slots[0].first_slot(); - assert!(slots.slots[1].first_slot() >= next); - assert_ne!(slots.slots[1].num_slots(), 0); - assert_ne!(slots.slots[2].num_slots(), 0); - assert_eq!(slots.to_slots(0), range); - assert_eq!(slots.to_slots(4999 * 3), vec![4999 * 3]); + assert_eq!(slots.slots_vec.slots.len(), 3); + assert_eq!(slots.slots_vec.slots[0].first_slot(), 0); + assert_ne!(slots.slots_vec.slots[0].num_slots(), 0); + let next = + slots.slots_vec.slots[0].num_slots() as u64 + slots.slots_vec.slots[0].first_slot(); + assert!(slots.slots_vec.slots[1].first_slot() >= next); + assert_ne!(slots.slots_vec.slots[1].num_slots(), 0); + assert_ne!(slots.slots_vec.slots[2].num_slots(), 0); + assert_eq!(slots.slots_vec.to_slots(0), range); + assert_eq!(slots.slots_vec.to_slots(4999 * 3), vec![4999 * 3]); } #[test] @@ -532,11 +575,11 @@ mod tests { let last = range[sz - 1]; assert_eq!( last, - slots.slots.last().unwrap().first_slot() - + slots.slots.last().unwrap().num_slots() as u64 + slots.slots_vec.slots.last().unwrap().first_slot() + + slots.slots_vec.slots.last().unwrap().num_slots() as u64 - 1 ); - for s in &slots.slots { + for s in &slots.slots_vec.slots { assert!(s.to_slots(0).is_ok()); } let slots = slots.to_slots(0); From 5b7a724bb3d451dc7630aea5c397c9f41b212f59 Mon Sep 17 00:00:00 2001 From: Wen Date: Sun, 1 Oct 2023 22:50:33 -0700 Subject: [PATCH 11/13] Add total_messages to show how many messages are there. --- gossip/src/crds_value.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/gossip/src/crds_value.rs b/gossip/src/crds_value.rs index 3be49da4e1f620..5aa5fe9f17a451 100644 --- a/gossip/src/crds_value.rs +++ b/gossip/src/crds_value.rs @@ -505,6 +505,7 @@ pub struct RestartLastVotedForkSlots { pub slots: CompressedSlotsVec, pub last_voted_slot: Slot, pub last_voted_hash: Hash, + pub total_messages: u8, } impl Sanitize for RestartLastVotedForkSlots { @@ -522,6 +523,7 @@ impl RestartLastVotedForkSlots { slots: CompressedSlotsVec::new(), last_voted_slot, last_voted_hash, + total_messages: 1, } } @@ -534,6 +536,7 @@ impl RestartLastVotedForkSlots { slots: CompressedSlotsVec::new_rand(rng), last_voted_slot: rng.gen_range(0..512), last_voted_hash: Hash::new_unique(), + total_messages: 1, } } From ae8d01d0f551ffb976f58f0f65633187df86551d Mon Sep 17 00:00:00 2001 From: Wen Date: Mon, 2 Oct 2023 13:30:41 -0700 Subject: [PATCH 12/13] Reduce RestartLastVotedForkSlots to one packet (16k slots). --- gossip/src/cluster_info.rs | 4 +-- gossip/src/crds.rs | 2 +- gossip/src/crds_value.rs | 59 ++++++++++++++++---------------------- gossip/src/epoch_slots.rs | 4 +++ 4 files changed, 31 insertions(+), 38 deletions(-) diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 1a171ba3649206..6a70b00d680d39 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -272,7 +272,7 @@ pub fn make_accounts_hashes_message( pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>; // TODO These messages should go through the gpu pipeline for spam filtering -#[frozen_abi(digest = "DxPH4Cj8QHbGb8daVf8uX3HckPkB9XqKE2AHtQzYYZLr")] +#[frozen_abi(digest = "BNsMvqo5qaqbiKzj3XvAqxKbWhFBxNVXmhEGHFmhwhKC")] #[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)] #[allow(clippy::large_enum_variant)] pub(crate) enum Protocol { @@ -399,7 +399,7 @@ fn retain_staked(values: &mut Vec, stakes: &HashMap) { CrdsData::LowestSlot(_, _) | CrdsData::LegacyVersion(_) | CrdsData::DuplicateShred(_, _) - | CrdsData::RestartLastVotedForkSlots(_, _) => { + | CrdsData::RestartLastVotedForkSlots(_) => { let stake = stakes.get(&value.pubkey()).copied(); stake.unwrap_or_default() >= MIN_STAKE_FOR_GOSSIP } diff --git a/gossip/src/crds.rs b/gossip/src/crds.rs index 16aaba17b1a4c5..dd98461118900b 100644 --- a/gossip/src/crds.rs +++ b/gossip/src/crds.rs @@ -721,7 +721,7 @@ impl CrdsDataStats { CrdsData::DuplicateShred(_, _) => 9, CrdsData::SnapshotHashes(_) => 10, CrdsData::ContactInfo(_) => 11, - CrdsData::RestartLastVotedForkSlots(_, _) => 12, + CrdsData::RestartLastVotedForkSlots(_) => 12, // Update CrdsCountsArray if new items are added here. } } diff --git a/gossip/src/crds_value.rs b/gossip/src/crds_value.rs index 88f6d4c79b92bc..eb45becf3643af 100644 --- a/gossip/src/crds_value.rs +++ b/gossip/src/crds_value.rs @@ -38,8 +38,6 @@ pub const MAX_VOTES: VoteIndex = 32; pub type EpochSlotsIndex = u8; pub const MAX_EPOCH_SLOTS: EpochSlotsIndex = 255; -// We now keep 81000 slots, 81000/MAX_SLOTS_PER_ENTRY = 5. -pub(crate) const MAX_RESTART_LAST_VOTED_FORK_SLOTS: EpochSlotsIndex = 5; /// CrdsValue that is replicated across the cluster #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, AbiExample)] @@ -96,7 +94,7 @@ pub enum CrdsData { DuplicateShred(DuplicateShredIndex, DuplicateShred), SnapshotHashes(SnapshotHashes), ContactInfo(ContactInfo), - RestartLastVotedForkSlots(EpochSlotsIndex, RestartLastVotedForkSlots), + RestartLastVotedForkSlots(RestartLastVotedForkSlots), } impl Sanitize for CrdsData { @@ -135,12 +133,7 @@ impl Sanitize for CrdsData { } CrdsData::SnapshotHashes(val) => val.sanitize(), CrdsData::ContactInfo(node) => node.sanitize(), - CrdsData::RestartLastVotedForkSlots(ix, slots) => { - if *ix >= MAX_RESTART_LAST_VOTED_FORK_SLOTS { - return Err(SanitizeError::ValueOutOfBounds); - } - slots.sanitize() - } + CrdsData::RestartLastVotedForkSlots(slots) => slots.sanitize(), } } } @@ -166,10 +159,9 @@ impl CrdsData { 3 => CrdsData::AccountsHashes(AccountsHashes::new_rand(rng, pubkey)), 4 => CrdsData::Version(Version::new_rand(rng, pubkey)), 5 => CrdsData::Vote(rng.gen_range(0..MAX_VOTES), Vote::new_rand(rng, pubkey)), - 6 => CrdsData::RestartLastVotedForkSlots( - rng.gen_range(0..MAX_RESTART_LAST_VOTED_FORK_SLOTS), - RestartLastVotedForkSlots::new_rand(rng, pubkey), - ), + 6 => CrdsData::RestartLastVotedForkSlots(RestartLastVotedForkSlots::new_rand( + rng, pubkey, + )), _ => CrdsData::EpochSlots( rng.gen_range(0..MAX_EPOCH_SLOTS), EpochSlots::new_rand(rng, pubkey), @@ -505,11 +497,13 @@ pub struct RestartLastVotedForkSlots { pub slots: CompressedSlotsVec, pub last_voted_slot: Slot, pub last_voted_hash: Hash, - pub total_messages: u8, } impl Sanitize for RestartLastVotedForkSlots { fn sanitize(&self) -> std::result::Result<(), SanitizeError> { + if self.slots.is_empty() { + return Err(SanitizeError::InvalidValue); + } self.slots.sanitize()?; self.last_voted_hash.sanitize() } @@ -523,7 +517,6 @@ impl RestartLastVotedForkSlots { slots: CompressedSlotsVec::new(), last_voted_slot, last_voted_hash, - total_messages: 1, } } @@ -536,7 +529,6 @@ impl RestartLastVotedForkSlots { slots: CompressedSlotsVec::new_rand(rng), last_voted_slot: rng.gen_range(0..512), last_voted_hash: Hash::new_unique(), - total_messages: 1, } } @@ -561,7 +553,7 @@ pub enum CrdsValueLabel { DuplicateShred(DuplicateShredIndex, Pubkey), SnapshotHashes(Pubkey), ContactInfo(Pubkey), - RestartLastVotedForkSlots(EpochSlotsIndex, Pubkey), + RestartLastVotedForkSlots(Pubkey), } impl fmt::Display for CrdsValueLabel { @@ -585,8 +577,8 @@ impl fmt::Display for CrdsValueLabel { write!(f, "SnapshotHashes({})", self.pubkey()) } CrdsValueLabel::ContactInfo(_) => write!(f, "ContactInfo({})", self.pubkey()), - CrdsValueLabel::RestartLastVotedForkSlots(ix, _) => { - write!(f, "RestartLastVotedForkSlots({}, {})", ix, self.pubkey()) + CrdsValueLabel::RestartLastVotedForkSlots(_) => { + write!(f, "RestartLastVotedForkSlots({})", self.pubkey()) } } } @@ -607,7 +599,7 @@ impl CrdsValueLabel { CrdsValueLabel::DuplicateShred(_, p) => *p, CrdsValueLabel::SnapshotHashes(p) => *p, CrdsValueLabel::ContactInfo(pubkey) => *pubkey, - CrdsValueLabel::RestartLastVotedForkSlots(_, p) => *p, + CrdsValueLabel::RestartLastVotedForkSlots(p) => *p, } } } @@ -658,7 +650,7 @@ impl CrdsValue { CrdsData::DuplicateShred(_, shred) => shred.wallclock, CrdsData::SnapshotHashes(hash) => hash.wallclock, CrdsData::ContactInfo(node) => node.wallclock(), - CrdsData::RestartLastVotedForkSlots(_, slots) => slots.wallclock, + CrdsData::RestartLastVotedForkSlots(slots) => slots.wallclock, } } pub fn pubkey(&self) -> Pubkey { @@ -675,7 +667,7 @@ impl CrdsValue { CrdsData::DuplicateShred(_, shred) => shred.from, CrdsData::SnapshotHashes(hash) => hash.from, CrdsData::ContactInfo(node) => *node.pubkey(), - CrdsData::RestartLastVotedForkSlots(_, slots) => slots.from, + CrdsData::RestartLastVotedForkSlots(slots) => slots.from, } } pub fn label(&self) -> CrdsValueLabel { @@ -694,8 +686,8 @@ impl CrdsValue { CrdsData::DuplicateShred(ix, shred) => CrdsValueLabel::DuplicateShred(*ix, shred.from), CrdsData::SnapshotHashes(_) => CrdsValueLabel::SnapshotHashes(self.pubkey()), CrdsData::ContactInfo(node) => CrdsValueLabel::ContactInfo(*node.pubkey()), - CrdsData::RestartLastVotedForkSlots(ix, _) => { - CrdsValueLabel::RestartLastVotedForkSlots(*ix, self.pubkey()) + CrdsData::RestartLastVotedForkSlots(_) => { + CrdsValueLabel::RestartLastVotedForkSlots(self.pubkey()) } } } @@ -1153,16 +1145,13 @@ mod test { RestartLastVotedForkSlots::new(keypair.pubkey(), timestamp(), slot, Hash::default()); let original_slots_vec = [slot_parent, slot]; slots.fill(&original_slots_vec); - let ix = 1; - let value = CrdsValue::new_signed( - CrdsData::RestartLastVotedForkSlots(ix, slots.clone()), - &keypair, - ); + let value = + CrdsValue::new_signed(CrdsData::RestartLastVotedForkSlots(slots.clone()), &keypair); assert_eq!(value.sanitize(), Ok(())); let label = value.label(); assert_eq!( label, - CrdsValueLabel::RestartLastVotedForkSlots(ix, keypair.pubkey()) + CrdsValueLabel::RestartLastVotedForkSlots(keypair.pubkey()) ); assert_eq!(label.pubkey(), keypair.pubkey()); assert_eq!(value.wallclock(), slots.wallclock); @@ -1171,10 +1160,10 @@ mod test { assert_eq!(retrived_slots[0], slot_parent); assert_eq!(retrived_slots[1], slot); - let bad_value = CrdsValue::new_signed( - CrdsData::RestartLastVotedForkSlots(MAX_RESTART_LAST_VOTED_FORK_SLOTS, slots), - &keypair, - ); - assert_eq!(bad_value.sanitize(), Err(SanitizeError::ValueOutOfBounds)) + let empty_slots = + RestartLastVotedForkSlots::new(keypair.pubkey(), timestamp(), slot, Hash::default()); + let bad_value = + CrdsValue::new_signed(CrdsData::RestartLastVotedForkSlots(empty_slots), &keypair); + assert_eq!(bad_value.sanitize(), Err(SanitizeError::InvalidValue)) } } diff --git a/gossip/src/epoch_slots.rs b/gossip/src/epoch_slots.rs index 02d9d4200ad48f..a929aedcbc4424 100644 --- a/gossip/src/epoch_slots.rs +++ b/gossip/src/epoch_slots.rs @@ -317,6 +317,10 @@ impl CompressedSlotsVec { result.add(&slots); result } + + pub fn is_empty(&self) -> bool { + self.slots.iter().all(|s| s.num_slots() == 0) + } } #[derive(Serialize, Deserialize, Clone, Default, PartialEq, Eq, AbiExample, Debug)] From 1ee2699ceab380b53913c4362043d69e5c90f54c Mon Sep 17 00:00:00 2001 From: Wen Date: Thu, 5 Oct 2023 20:13:04 -0700 Subject: [PATCH 13/13] Replace last_vote_slot with shred_version, revert CompressedSlotsVec. --- gossip/src/cluster_info.rs | 4 +- gossip/src/crds_value.rs | 99 ++++++++++++++++++++------ gossip/src/epoch_slots.rs | 139 ++++++++++++------------------------- 3 files changed, 126 insertions(+), 116 deletions(-) diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 6a70b00d680d39..1593904bbddb2e 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -272,7 +272,7 @@ pub fn make_accounts_hashes_message( pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>; // TODO These messages should go through the gpu pipeline for spam filtering -#[frozen_abi(digest = "BNsMvqo5qaqbiKzj3XvAqxKbWhFBxNVXmhEGHFmhwhKC")] +#[frozen_abi(digest = "CVvKB495YW6JN4w1rWwajyZmG5wvNhmD97V99rSv9fGw")] #[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)] #[allow(clippy::large_enum_variant)] pub(crate) enum Protocol { @@ -4079,7 +4079,7 @@ mod tests { ClusterInfo::split_gossip_messages(PUSH_MESSAGE_MAX_PAYLOAD_SIZE, values.clone()) .collect(); let self_pubkey = solana_sdk::pubkey::new_rand(); - assert!(splits.len() * 3 < NUM_CRDS_VALUES); + assert!(splits.len() * 2 < NUM_CRDS_VALUES); // Assert that all messages are included in the splits. assert_eq!(NUM_CRDS_VALUES, splits.iter().map(Vec::len).sum::()); splits diff --git a/gossip/src/crds_value.rs b/gossip/src/crds_value.rs index eb45becf3643af..1769afbbb4dcdf 100644 --- a/gossip/src/crds_value.rs +++ b/gossip/src/crds_value.rs @@ -1,10 +1,10 @@ use { crate::{ - cluster_info::MAX_LEGACY_SNAPSHOT_HASHES, + cluster_info::{MAX_CRDS_OBJECT_SIZE, MAX_LEGACY_SNAPSHOT_HASHES}, contact_info::ContactInfo, deprecated, duplicate_shred::{DuplicateShred, DuplicateShredIndex, MAX_DUPLICATE_SHREDS}, - epoch_slots::{CompressedSlotsVec, EpochSlots}, + epoch_slots::{CompressedSlots, EpochSlots, MAX_SLOTS_PER_ENTRY}, legacy_contact_info::LegacyContactInfo, }, bincode::{serialize, serialized_size}, @@ -494,9 +494,9 @@ impl Sanitize for NodeInstance { pub struct RestartLastVotedForkSlots { pub from: Pubkey, pub wallclock: u64, - pub slots: CompressedSlotsVec, - pub last_voted_slot: Slot, + pub slots: Vec, pub last_voted_hash: Hash, + pub shred_version: u16, } impl Sanitize for RestartLastVotedForkSlots { @@ -510,30 +510,64 @@ impl Sanitize for RestartLastVotedForkSlots { } impl RestartLastVotedForkSlots { - pub fn new(from: Pubkey, now: u64, last_voted_slot: Slot, last_voted_hash: Hash) -> Self { + pub fn new(from: Pubkey, now: u64, last_voted_hash: Hash, shred_version: u16) -> Self { Self { from, wallclock: now, - slots: CompressedSlotsVec::new(), - last_voted_slot, + slots: Vec::new(), last_voted_hash, + shred_version, } } /// New random Version for tests and benchmarks. pub fn new_rand(rng: &mut R, pubkey: Option) -> Self { let pubkey = pubkey.unwrap_or_else(solana_sdk::pubkey::new_rand); - Self { - from: pubkey, - wallclock: new_rand_timestamp(rng), - slots: CompressedSlotsVec::new_rand(rng), - last_voted_slot: rng.gen_range(0..512), - last_voted_hash: Hash::new_unique(), + let mut result = + RestartLastVotedForkSlots::new(pubkey, new_rand_timestamp(rng), Hash::new_unique(), 1); + let num_slots = rng.gen_range(2..20); + let mut slots = std::iter::repeat_with(|| 47825632 + rng.gen_range(0..512)) + .take(num_slots) + .collect::>(); + slots.sort(); + result.fill(&slots); + result + } + + pub fn fill(&mut self, slots: &[Slot]) -> usize { + let slots = &slots[slots.len().saturating_sub(MAX_SLOTS_PER_ENTRY)..]; + let mut num = 0; + let space = self.max_compressed_slot_size(); + if space == 0 { + return 0; + } + while num < slots.len() { + let mut cslot = CompressedSlots::new(space as usize); + num += cslot.add(&slots[num..]); + self.slots.push(cslot); + } + num + } + + pub fn deflate(&mut self) { + for s in self.slots.iter_mut() { + let _ = s.deflate(); } } - pub fn fill(&mut self, update: &[Slot]) -> usize { - self.slots.fill(update) + pub fn max_compressed_slot_size(&self) -> isize { + let len_header = serialized_size(self).unwrap(); + let len_slot = serialized_size(&CompressedSlots::default()).unwrap(); + MAX_CRDS_OBJECT_SIZE as isize - (len_header + len_slot) as isize + } + + pub fn to_slots(&self, min_slot: Slot) -> Vec { + self.slots + .iter() + .filter(|s| min_slot < s.first_slot() + s.num_slots() as u64) + .filter_map(|s| s.to_slots(min_slot).ok()) + .flatten() + .collect() } } @@ -1141,8 +1175,13 @@ mod test { let keypair = Keypair::new(); let slot = 53; let slot_parent = slot - 5; - let mut slots = - RestartLastVotedForkSlots::new(keypair.pubkey(), timestamp(), slot, Hash::default()); + let shred_version = 21; + let mut slots = RestartLastVotedForkSlots::new( + keypair.pubkey(), + timestamp(), + Hash::default(), + shred_version, + ); let original_slots_vec = [slot_parent, slot]; slots.fill(&original_slots_vec); let value = @@ -1155,15 +1194,33 @@ mod test { ); assert_eq!(label.pubkey(), keypair.pubkey()); assert_eq!(value.wallclock(), slots.wallclock); - let retrived_slots = slots.slots.to_slots(0); + let retrived_slots = slots.to_slots(0); assert_eq!(retrived_slots.len(), 2); assert_eq!(retrived_slots[0], slot_parent); assert_eq!(retrived_slots[1], slot); - let empty_slots = - RestartLastVotedForkSlots::new(keypair.pubkey(), timestamp(), slot, Hash::default()); + let empty_slots = RestartLastVotedForkSlots::new( + keypair.pubkey(), + timestamp(), + Hash::default(), + shred_version, + ); let bad_value = CrdsValue::new_signed(CrdsData::RestartLastVotedForkSlots(empty_slots), &keypair); - assert_eq!(bad_value.sanitize(), Err(SanitizeError::InvalidValue)) + assert_eq!(bad_value.sanitize(), Err(SanitizeError::InvalidValue)); + + let last_slot: Slot = (MAX_SLOTS_PER_ENTRY + 10).try_into().unwrap(); + let mut large_slots = RestartLastVotedForkSlots::new( + keypair.pubkey(), + timestamp(), + Hash::default(), + shred_version, + ); + let large_slots_vec: Vec = (0..last_slot + 1).collect(); + large_slots.fill(&large_slots_vec); + let retrived_slots = large_slots.to_slots(0); + assert_eq!(retrived_slots.len(), MAX_SLOTS_PER_ENTRY); + assert_eq!(retrived_slots.first(), Some(&11)); + assert_eq!(retrived_slots.last(), Some(&last_slot)); } } diff --git a/gossip/src/epoch_slots.rs b/gossip/src/epoch_slots.rs index a929aedcbc4424..3e41fca9a73150 100644 --- a/gossip/src/epoch_slots.rs +++ b/gossip/src/epoch_slots.rs @@ -11,10 +11,9 @@ use { pubkey::Pubkey, sanitize::{Sanitize, SanitizeError}, }, - std::fmt, }; -const MAX_SLOTS_PER_ENTRY: usize = 2048 * 8; +pub(crate) const MAX_SLOTS_PER_ENTRY: usize = 2048 * 8; #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, AbiExample)] pub struct Uncompressed { pub first_slot: Slot, @@ -179,7 +178,7 @@ impl Default for CompressedSlots { } impl CompressedSlots { - fn new(max_size: usize) -> Self { + pub(crate) fn new(max_size: usize) -> Self { CompressedSlots::Uncompressed(Uncompressed::new(max_size)) } @@ -226,17 +225,24 @@ impl CompressedSlots { } #[derive(Serialize, Deserialize, Clone, Default, PartialEq, Eq, AbiExample)] -pub struct CompressedSlotsVec { - slots: Vec, +pub struct EpochSlots { + pub from: Pubkey, + pub slots: Vec, + pub wallclock: u64, } -impl Sanitize for CompressedSlotsVec { +impl Sanitize for EpochSlots { fn sanitize(&self) -> std::result::Result<(), SanitizeError> { + if self.wallclock >= MAX_WALLCLOCK { + return Err(SanitizeError::ValueOutOfBounds); + } + self.from.sanitize()?; self.slots.sanitize() } } -impl fmt::Debug for CompressedSlotsVec { +use std::fmt; +impl fmt::Debug for EpochSlots { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let num_slots: usize = self.slots.iter().map(|s| s.num_slots()).sum(); let lowest_slot = self @@ -244,17 +250,25 @@ impl fmt::Debug for CompressedSlotsVec { .iter() .map(|s| s.first_slot()) .fold(0, std::cmp::min); - write!(f, "num_slots: {} lowest_slot: {}", num_slots, lowest_slot,) + write!( + f, + "EpochSlots {{ from: {} num_slots: {} lowest_slot: {} wallclock: {} }}", + self.from, num_slots, lowest_slot, self.wallclock + ) } } -impl CompressedSlotsVec { - pub fn new() -> Self { - Self { slots: vec![] } +impl EpochSlots { + pub fn new(from: Pubkey, now: u64) -> Self { + Self { + from, + wallclock: now, + slots: vec![], + } } - - pub fn fill(&mut self, slots: &[Slot]) -> usize { + pub fn fill(&mut self, slots: &[Slot], now: u64) -> usize { let mut num = 0; + self.wallclock = std::cmp::max(now, self.wallclock + 1); while num < slots.len() { num += self.add(&slots[num..]); if num < slots.len() { @@ -307,77 +321,17 @@ impl CompressedSlotsVec { .collect() } - /// New random CompressedSlotsVec for tests and simulations. - pub(crate) fn new_rand(rng: &mut R) -> Self { - let mut result = Self::new(); + /// New random EpochSlots for tests and simulations. + pub(crate) fn new_rand(rng: &mut R, pubkey: Option) -> Self { + let now = crds_value::new_rand_timestamp(rng); + let pubkey = pubkey.unwrap_or_else(solana_sdk::pubkey::new_rand); + let mut epoch_slots = Self::new(pubkey, now); let num_slots = rng.gen_range(0..20); let slots: Vec<_> = std::iter::repeat_with(|| 47825632 + rng.gen_range(0..512)) .take(num_slots) .collect(); - result.add(&slots); - result - } - - pub fn is_empty(&self) -> bool { - self.slots.iter().all(|s| s.num_slots() == 0) - } -} - -#[derive(Serialize, Deserialize, Clone, Default, PartialEq, Eq, AbiExample, Debug)] -pub struct EpochSlots { - pub from: Pubkey, - slots_vec: CompressedSlotsVec, - pub wallclock: u64, -} - -impl Sanitize for EpochSlots { - fn sanitize(&self) -> std::result::Result<(), SanitizeError> { - if self.wallclock >= MAX_WALLCLOCK { - return Err(SanitizeError::ValueOutOfBounds); - } - self.from.sanitize()?; - self.slots_vec.slots.sanitize() - } -} - -impl EpochSlots { - pub fn new(from: Pubkey, now: u64) -> Self { - Self { - from, - wallclock: now, - slots_vec: CompressedSlotsVec::new(), - } - } - pub fn fill(&mut self, slots: &[Slot], now: u64) -> usize { - self.wallclock = std::cmp::max(now, self.wallclock + 1); - self.slots_vec.fill(slots) - } - - pub fn add(&mut self, slots: &[Slot]) -> usize { - self.slots_vec.add(slots) - } - pub fn deflate(&mut self) -> Result<()> { - self.slots_vec.deflate() - } - pub fn max_compressed_slot_size(&self) -> isize { - self.slots_vec.max_compressed_slot_size() - } - - pub fn first_slot(&self) -> Option { - self.slots_vec.first_slot() - } - - pub fn to_slots(&self, min_slot: Slot) -> Vec { - self.slots_vec.to_slots(min_slot) - } - - /// New random EpochSlots for tests and simulations. - pub(crate) fn new_rand(rng: &mut R, pubkey: Option) -> Self { - Self { - from: pubkey.unwrap_or_else(solana_sdk::pubkey::new_rand), - wallclock: crds_value::new_rand_timestamp(rng), - slots_vec: CompressedSlotsVec::new_rand(rng), - } + epoch_slots.add(&slots); + epoch_slots } } @@ -512,16 +466,15 @@ mod tests { let mut slots = EpochSlots::default(); assert_eq!(slots.fill(&range, 2), 5000); assert_eq!(slots.wallclock, 2); - assert_eq!(slots.slots_vec.slots.len(), 3); - assert_eq!(slots.slots_vec.slots[0].first_slot(), 0); - assert_ne!(slots.slots_vec.slots[0].num_slots(), 0); - let next = - slots.slots_vec.slots[0].num_slots() as u64 + slots.slots_vec.slots[0].first_slot(); - assert!(slots.slots_vec.slots[1].first_slot() >= next); - assert_ne!(slots.slots_vec.slots[1].num_slots(), 0); - assert_ne!(slots.slots_vec.slots[2].num_slots(), 0); - assert_eq!(slots.slots_vec.to_slots(0), range); - assert_eq!(slots.slots_vec.to_slots(4999 * 3), vec![4999 * 3]); + assert_eq!(slots.slots.len(), 3); + assert_eq!(slots.slots[0].first_slot(), 0); + assert_ne!(slots.slots[0].num_slots(), 0); + let next = slots.slots[0].num_slots() as u64 + slots.slots[0].first_slot(); + assert!(slots.slots[1].first_slot() >= next); + assert_ne!(slots.slots[1].num_slots(), 0); + assert_ne!(slots.slots[2].num_slots(), 0); + assert_eq!(slots.to_slots(0), range); + assert_eq!(slots.to_slots(4999 * 3), vec![4999 * 3]); } #[test] @@ -579,11 +532,11 @@ mod tests { let last = range[sz - 1]; assert_eq!( last, - slots.slots_vec.slots.last().unwrap().first_slot() - + slots.slots_vec.slots.last().unwrap().num_slots() as u64 + slots.slots.last().unwrap().first_slot() + + slots.slots.last().unwrap().num_slots() as u64 - 1 ); - for s in &slots.slots_vec.slots { + for s in &slots.slots { assert!(s.to_slots(0).is_ok()); } let slots = slots.to_slots(0);