From e1ae8f96aa46e66643e3730d6a51bce5832670be Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Thu, 9 Jan 2025 13:29:20 -0600 Subject: [PATCH] reduces std::mem::size_of::() Because ContactInfo.cache is huge, currently std::mem::size_of::() stands at 560 bytes. However only a small % of entries in gossip CRDS table are ContactInfo so this wastes a lot of memory. In order to reduce the size of CrdsData enum, the commit adds type parameter for ContactInfo.cache. For the ContactInfo stored in CRDS table we use ContactInfo.cache: Box<[SocketAddr; N]> whereas outside gossip CRDS table we avoid Box<...> overhead by just using the plain array: ContactInfo.cache: [SocketAddr; N] Doing so, * std::mem::size_of::() reduces from 560 bytes to 176 bytes. * Accessing other fields of ContactInfo does not incur Box<...> overhead. * Outside gossip CRDS table, we avoid Box<...> overhead entirely. --- core/src/repair/serve_repair.rs | 3 +- gossip/src/cluster_info.rs | 29 ++++++----- gossip/src/contact_info.rs | 86 +++++++++++++++++++++++++-------- gossip/src/crds.rs | 8 +-- gossip/src/crds_data.rs | 24 +++++++-- gossip/src/crds_entry.rs | 5 +- gossip/src/crds_gossip.rs | 2 +- gossip/src/crds_gossip_pull.rs | 12 ++--- gossip/src/crds_value.rs | 4 +- gossip/src/gossip_service.rs | 2 +- gossip/tests/crds_gossip.rs | 5 +- gossip/tests/gossip.rs | 2 +- turbine/src/cluster_nodes.rs | 39 ++++++++------- 13 files changed, 149 insertions(+), 72 deletions(-) diff --git a/core/src/repair/serve_repair.rs b/core/src/repair/serve_repair.rs index 4e8c7a59d3ec42..5048d63d082bb2 100644 --- a/core/src/repair/serve_repair.rs +++ b/core/src/repair/serve_repair.rs @@ -1274,7 +1274,8 @@ impl ServeRepair { .iter() .filter_map(|key| { if *key != self.my_id() { - self.cluster_info.lookup_contact_info(key, |ci| ci.clone()) + self.cluster_info + .lookup_contact_info(key, |node| ContactInfo::from(node)) } else { None } diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index b2c38340891dc7..7709dd92120a1e 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -282,8 +282,11 @@ impl ClusterInfo { } // TODO kill insert_info, only used by tests - pub fn insert_info(&self, node: ContactInfo) { - let entry = CrdsValue::new(CrdsData::ContactInfo(node), &self.keypair()); + pub fn insert_info(&self, data: T) + where + CrdsData: From, + { + let entry = CrdsValue::new(CrdsData::from(data), &self.keypair()); if let Err(err) = { let mut gossip_crds = self.gossip.crds.write().unwrap(); gossip_crds.insert(entry, timestamp(), GossipRoute::LocalMessage) @@ -474,7 +477,7 @@ impl ClusterInfo { let mut nodes = gossip_crds.get_nodes_contact_info(); nodes .find(|node| node.gossip() == Some(*gossip_addr)) - .cloned() + .map(ContactInfo::from) } pub fn my_contact_info(&self) -> ContactInfo { @@ -1048,7 +1051,7 @@ impl ClusterInfo { pub fn get_node_version(&self, pubkey: &Pubkey) -> Option { let gossip_crds = self.gossip.crds.read().unwrap(); gossip_crds - .get::<&ContactInfo>(*pubkey) + .get::<&ContactInfo<_>>(*pubkey) .map(ContactInfo::version) .cloned() } @@ -1068,7 +1071,7 @@ impl ClusterInfo { .filter(|node| { node.pubkey() != &self_pubkey && self.check_socket_addr_space(&node.rpc()) }) - .cloned() + .map(ContactInfo::from) .collect() } @@ -1077,7 +1080,7 @@ impl ClusterInfo { let gossip_crds = self.gossip.crds.read().unwrap(); gossip_crds .get_nodes() - .map(|x| (x.value.contact_info().unwrap().clone(), x.local_timestamp)) + .map(|x| (x.value.contact_info().unwrap().into(), x.local_timestamp)) .collect() } @@ -1088,7 +1091,7 @@ impl ClusterInfo { .get_nodes_contact_info() // shred_version not considered for gossip peers (ie, spy nodes do not set shred_version) .filter(|node| node.pubkey() != &me && self.check_socket_addr_space(&node.gossip())) - .cloned() + .map(ContactInfo::from) .collect() } @@ -1101,7 +1104,7 @@ impl ClusterInfo { node.pubkey() != &self_pubkey && self.check_socket_addr_space(&node.tvu(contact_info::Protocol::UDP)) }) - .cloned() + .map(ContactInfo::from) .collect() } @@ -1138,7 +1141,7 @@ impl ClusterInfo { Some(lowest_slot) => lowest_slot.lowest <= slot, } }) - .cloned() + .map(ContactInfo::from) .collect() } @@ -1165,7 +1168,7 @@ impl ClusterInfo { node.pubkey() != &self_pubkey && self.check_socket_addr_space(&node.tpu(contact_info::Protocol::UDP)) }) - .cloned() + .map(ContactInfo::from) .collect() } @@ -1175,7 +1178,7 @@ impl ClusterInfo { let node = { let mut node = self.my_contact_info.write().unwrap(); node.set_wallclock(timestamp()); - node.clone() + ContactInfo::from(node.deref()) }; let entries: Vec<_> = [ CrdsData::ContactInfo(node), @@ -1328,7 +1331,7 @@ impl ClusterInfo { push_messages .into_iter() .filter_map(|(pubkey, messages)| { - let peer: &ContactInfo = gossip_crds.get(pubkey)?; + let peer: &ContactInfo<_> = gossip_crds.get(pubkey)?; Some((peer.gossip()?, messages)) }) .collect() @@ -2069,7 +2072,7 @@ impl ClusterInfo { prunes .into_par_iter() .filter_map(|(pubkey, prunes)| { - let addr = gossip_crds.get::<&ContactInfo>(pubkey)?.gossip()?; + let addr = gossip_crds.get::<&ContactInfo<_>>(pubkey)?.gossip()?; Some((pubkey, addr, prunes)) }) .collect() diff --git a/gossip/src/contact_info.rs b/gossip/src/contact_info.rs index 3e0d66f9a2f1ba..97d1c3076088d1 100644 --- a/gossip/src/contact_info.rs +++ b/gossip/src/contact_info.rs @@ -13,6 +13,7 @@ use { solana_streamer::socket::SocketAddrSpace, static_assertions::const_assert_eq, std::{ + borrow::Borrow, cmp::Ordering, collections::HashSet, net::{IpAddr, Ipv4Addr, SocketAddr}, @@ -42,10 +43,12 @@ const SOCKET_TAG_TVU_QUIC: u8 = 11; const_assert_eq!(SOCKET_CACHE_SIZE, 13); const SOCKET_CACHE_SIZE: usize = SOCKET_TAG_TPU_VOTE_QUIC as usize + 1usize; +pub(crate) type SocketAddrCache = [SocketAddr; SOCKET_CACHE_SIZE]; + // An alias for a function that reads data from a ContactInfo entry stored in // the gossip CRDS table. -pub trait ContactInfoQuery: Fn(&ContactInfo) -> R {} -impl R> ContactInfoQuery for F {} +pub trait ContactInfoQuery: Fn(&ContactInfo>) -> R {} +impl>) -> R> ContactInfoQuery for F {} #[derive(Copy, Clone, Debug, Eq, Error, PartialEq)] pub enum Error { @@ -74,7 +77,7 @@ pub enum Error { } #[derive(Clone, Debug, Eq, PartialEq, Serialize)] -pub struct ContactInfo { +pub struct ContactInfo { pubkey: Pubkey, #[serde(with = "serde_varint")] wallclock: u64, @@ -93,7 +96,7 @@ pub struct ContactInfo { extensions: Vec, // Only sanitized socket-addrs can be cached! #[serde(skip_serializing)] - cache: [SocketAddr; SOCKET_CACHE_SIZE], + cache: T, } #[cfg_attr(feature = "frozen-abi", derive(AbiExample))] @@ -132,7 +135,7 @@ macro_rules! get_socket { ($name:ident, $key:ident) => { #[inline] pub fn $name(&self) -> Option { - let socket = &self.cache[usize::from($key)]; + let socket = &self.cache.borrow()[usize::from($key)]; (socket != &SOCKET_ADDR_UNSPECIFIED) .then_some(socket) .copied() @@ -145,7 +148,7 @@ macro_rules! get_socket { Protocol::QUIC => $quic, Protocol::UDP => $udp, }; - let socket = &self.cache[usize::from(key)]; + let socket = &self.cache.borrow()[usize::from(key)]; (socket != &SOCKET_ADDR_UNSPECIFIED) .then_some(socket) .copied() @@ -189,7 +192,7 @@ macro_rules! remove_socket { }; } -impl ContactInfo { +impl ContactInfo { pub fn new(pubkey: Pubkey, wallclock: u64, shred_version: u16) -> Self { Self { pubkey, @@ -203,7 +206,9 @@ impl ContactInfo { cache: EMPTY_SOCKET_ADDR_CACHE, } } +} +impl ContactInfo { #[inline] pub fn pubkey(&self) -> &Pubkey { &self.pubkey @@ -238,7 +243,9 @@ impl ContactInfo { pub fn set_shred_version(&mut self, shred_version: u16) { self.shred_version = shred_version } +} +impl> ContactInfo { get_socket!(gossip, SOCKET_TAG_GOSSIP); get_socket!(rpc, SOCKET_TAG_RPC); get_socket!(rpc_pubsub, SOCKET_TAG_RPC_PUBSUB); @@ -255,7 +262,9 @@ impl ContactInfo { ); get_socket!(tpu_vote, SOCKET_TAG_TPU_VOTE, SOCKET_TAG_TPU_VOTE_QUIC); get_socket!(tvu, SOCKET_TAG_TVU, SOCKET_TAG_TVU_QUIC); +} +impl ContactInfo { set_socket!(set_gossip, SOCKET_TAG_GOSSIP); set_socket!(set_rpc, SOCKET_TAG_RPC); set_socket!(set_rpc_pubsub, SOCKET_TAG_RPC_PUBSUB); @@ -389,7 +398,7 @@ impl ContactInfo { let delay = 10 * 60 * 1000; // 10 minutes let now = solana_sdk::timing::timestamp() - delay + rng.gen_range(0..2 * delay); let pubkey = pubkey.unwrap_or_else(solana_pubkey::new_rand); - let mut node = ContactInfo::new_localhost(&pubkey, now); + let mut node = Self::new_localhost(&pubkey, now); let _ = node.set_gossip((Ipv4Addr::LOCALHOST, rng.gen_range(1024..u16::MAX))); node } @@ -448,12 +457,14 @@ impl ContactInfo { node.set_serve_repair_quic((addr, port + 4)).unwrap(); node } +} +impl ContactInfo { // Returns true if the other contact-info is a duplicate instance of this // node, with a more recent `outset` timestamp. #[inline] #[must_use] - pub(crate) fn check_duplicate(&self, other: &ContactInfo) -> bool { + pub(crate) fn check_duplicate(&self, other: &ContactInfo) -> bool { self.pubkey == other.pubkey && self.outset < other.outset } @@ -463,7 +474,7 @@ impl ContactInfo { // If the tuples are equal it returns None. #[inline] #[must_use] - pub(crate) fn overrides(&self, other: &ContactInfo) -> Option { + pub(crate) fn overrides(&self, other: &ContactInfo) -> Option { if self.pubkey != other.pubkey { return None; } @@ -482,7 +493,7 @@ fn get_node_outset() -> u64 { u64::try_from(elapsed.as_micros()).unwrap() } -impl Default for ContactInfo { +impl Default for ContactInfo { fn default() -> Self { Self::new( Pubkey::default(), @@ -492,17 +503,17 @@ impl Default for ContactInfo { } } -impl<'de> Deserialize<'de> for ContactInfo { +impl<'de> Deserialize<'de> for ContactInfo> { fn deserialize(deserializer: D) -> Result where D: Deserializer<'de>, { let node = ContactInfoLite::deserialize(deserializer)?; - ContactInfo::try_from(node).map_err(serde::de::Error::custom) + Self::try_from(node).map_err(serde::de::Error::custom) } } -impl TryFrom for ContactInfo { +impl TryFrom for ContactInfo> { type Error = Error; fn try_from(node: ContactInfoLite) -> Result { @@ -517,7 +528,7 @@ impl TryFrom for ContactInfo { extensions, } = node; sanitize_entries(&addrs, &sockets)?; - let mut node = ContactInfo { + let mut node = Self { pubkey, wallclock, outset, @@ -526,7 +537,7 @@ impl TryFrom for ContactInfo { addrs, sockets, extensions, - cache: EMPTY_SOCKET_ADDR_CACHE, + cache: Box::new(EMPTY_SOCKET_ADDR_CACHE), }; // Populate node.cache. // Only sanitized socket-addrs can be cached! @@ -548,7 +559,42 @@ impl TryFrom for ContactInfo { } } -impl Sanitize for ContactInfo { +macro_rules! impl_convert_from { + ($a:ty, $b:ty, $f:ident, $g:tt) => { + impl From<$a> for ContactInfo<$b> { + #[inline] + fn from(node: $a) -> Self { + Self { + pubkey: node.pubkey, + wallclock: node.wallclock, + outset: node.outset, + shred_version: node.shred_version, + version: node.version.$f(), + addrs: node.addrs.$f(), + sockets: node.sockets.$f(), + extensions: node.extensions.$f(), + cache: $g(node.cache), + } + } + } + }; +} + +impl_convert_from!( + ContactInfo, + Box, + into, + (Box::new) +); +impl_convert_from!( + &ContactInfo, + Box, + clone, + (Box::new) +); +impl_convert_from!(&ContactInfo>, SocketAddrCache, clone, *); + +impl Sanitize for ContactInfo> { fn sanitize(&self) -> Result<(), SanitizeError> { if self.wallclock >= MAX_WALLCLOCK { return Err(SanitizeError::ValueOutOfBounds); @@ -642,7 +688,7 @@ pub(crate) fn get_quic_socket(socket: &SocketAddr) -> Result } #[cfg(all(test, feature = "frozen-abi"))] -impl solana_frozen_abi::abi_example::AbiExample for ContactInfo { +impl solana_frozen_abi::abi_example::AbiExample for ContactInfo { fn example() -> Self { Self { pubkey: Pubkey::example(), @@ -930,8 +976,8 @@ mod tests { .is_ok()); // Assert that serde round trips. let bytes = bincode::serialize(&node).unwrap(); - let other: ContactInfo = bincode::deserialize(&bytes).unwrap(); - assert_eq!(node, other); + let other: ContactInfo<_> = bincode::deserialize(&bytes).unwrap(); + assert_eq!(node, ContactInfo::from(&other)); } } diff --git a/gossip/src/crds.rs b/gossip/src/crds.rs index b30b7df94d85e3..b986c7d2da9aba 100644 --- a/gossip/src/crds.rs +++ b/gossip/src/crds.rs @@ -27,7 +27,7 @@ use { crate::{ - contact_info::ContactInfo, + contact_info::{ContactInfo, SocketAddrCache}, crds_data::CrdsData, crds_entry::CrdsEntry, crds_gossip_pull::CrdsTimeouts, @@ -347,7 +347,9 @@ impl Crds { } /// Returns ContactInfo of all known nodes. - pub(crate) fn get_nodes_contact_info(&self) -> impl Iterator { + pub(crate) fn get_nodes_contact_info( + &self, + ) -> impl Iterator>> { self.get_nodes().map(|v| match v.value.data() { CrdsData::ContactInfo(info) => info, _ => panic!("this should not happen!"), @@ -1379,7 +1381,7 @@ mod tests { // Remove contact-info. Shred version should stay there since there // are still values associated with the pubkey. crds.remove(&CrdsValueLabel::ContactInfo(pubkey), timestamp()); - assert_eq!(crds.get::<&ContactInfo>(pubkey), None); + assert_eq!(crds.get::<&ContactInfo<_>>(pubkey), None); assert_eq!(crds.get_shred_version(&pubkey), Some(8)); // Remove the remaining entry with the same pubkey. crds.remove(&CrdsValueLabel::AccountsHashes(pubkey), timestamp()); diff --git a/gossip/src/crds_data.rs b/gossip/src/crds_data.rs index f122f61ed9aea4..a2e34a7010eea2 100644 --- a/gossip/src/crds_data.rs +++ b/gossip/src/crds_data.rs @@ -1,6 +1,6 @@ use { crate::{ - contact_info::ContactInfo, + contact_info::{ContactInfo, SocketAddrCache}, deprecated, duplicate_shred::{DuplicateShred, DuplicateShredIndex, MAX_DUPLICATE_SHREDS}, epoch_slots::EpochSlots, @@ -44,7 +44,7 @@ pub(crate) const MAX_EPOCH_SLOTS: EpochSlotsIndex = 255; #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] pub enum CrdsData { #[allow(private_interfaces)] - LegacyContactInfo(LegacyContactInfo), + LegacyContactInfo(Box), Vote(VoteIndex, Vote), LowestSlot(/*DEPRECATED:*/ u8, LowestSlot), #[allow(private_interfaces)] @@ -60,7 +60,7 @@ pub enum CrdsData { NodeInstance(NodeInstance), DuplicateShred(DuplicateShredIndex, DuplicateShred), SnapshotHashes(SnapshotHashes), - ContactInfo(ContactInfo), + ContactInfo(ContactInfo>), RestartLastVotedForkSlots(RestartLastVotedForkSlots), RestartHeaviestFork(RestartHeaviestFork), } @@ -203,14 +203,21 @@ impl CrdsData { impl From for CrdsData { #[inline] fn from(node: ContactInfo) -> Self { - Self::ContactInfo(node) + Self::ContactInfo(ContactInfo::from(node)) } } impl From<&ContactInfo> for CrdsData { #[inline] fn from(node: &ContactInfo) -> Self { - Self::ContactInfo(node.clone()) + Self::ContactInfo(ContactInfo::from(node)) + } +} + +impl From>> for CrdsData { + #[inline] + fn from(node: ContactInfo>) -> Self { + Self::ContactInfo(node) } } @@ -545,6 +552,13 @@ mod test { solana_vote_program::{vote_instruction, vote_state}, }; + #[test] + fn test_crds_data_size() { + // This is just so that we can keep an eye on std::mem::size_of + // CrdsData and if a change unknowingly increases it by too much. + assert_eq!(std::mem::size_of::(), 176); + } + #[test] fn test_lowest_slot_sanitize() { let ls = LowestSlot::new(Pubkey::default(), 0, 0); diff --git a/gossip/src/crds_entry.rs b/gossip/src/crds_entry.rs index 43aeda1a686d38..f848c273395bd5 100644 --- a/gossip/src/crds_entry.rs +++ b/gossip/src/crds_entry.rs @@ -1,6 +1,6 @@ use { crate::{ - contact_info::ContactInfo, + contact_info::{self, SocketAddrCache}, crds::VersionedCrdsValue, crds_data::{CrdsData, LowestSlot, SnapshotHashes}, crds_value::{CrdsValue, CrdsValueLabel}, @@ -10,6 +10,7 @@ use { }; type CrdsTable = IndexMap; +type ContactInfo = contact_info::ContactInfo>; /// Represents types which can be looked up from crds table given a key. e.g. /// CrdsValueLabel -> VersionedCrdsValue, CrdsValue, CrdsData @@ -24,6 +25,7 @@ macro_rules! impl_crds_entry ( ($name:ident, |$entry:ident| $body:expr) => ( impl<'a, 'b> CrdsEntry<'a, 'b> for &'a $name { type Key = &'b CrdsValueLabel; + #[inline] fn get_entry(table:&'a CrdsTable, key: Self::Key) -> Option { let $entry = table.get(key); $body @@ -34,6 +36,7 @@ macro_rules! impl_crds_entry ( ($name:ident, $pat:pat, $expr:expr) => ( impl<'a, 'b> CrdsEntry<'a, 'b> for &'a $name { type Key = Pubkey; + #[inline] fn get_entry(table:&'a CrdsTable, key: Self::Key) -> Option { let key = CrdsValueLabel::$name(key); match table.get(&key)?.value.data() { diff --git a/gossip/src/crds_gossip.rs b/gossip/src/crds_gossip.rs index c546a483cb1aa0..bd1f36aac00ffe 100644 --- a/gossip/src/crds_gossip.rs +++ b/gossip/src/crds_gossip.rs @@ -352,7 +352,7 @@ pub(crate) fn get_gossip_nodes( None => true, } }) - .cloned() + .map(ContactInfo::from) .collect() } diff --git a/gossip/src/crds_gossip_pull.rs b/gossip/src/crds_gossip_pull.rs index 8d15c5e55ca769..3723011aa1e224 100644 --- a/gossip/src/crds_gossip_pull.rs +++ b/gossip/src/crds_gossip_pull.rs @@ -335,7 +335,7 @@ impl CrdsGossipPull { } else if now <= response.wallclock().saturating_add(timeout) { active_values.push(response); None - } else if crds.get::<&ContactInfo>(owner).is_some() { + } else if crds.get::<&ContactInfo<_>>(owner).is_some() { // Silently insert this old value without bumping record // timestamps expired_values.push(response); @@ -923,7 +923,7 @@ pub(crate) mod tests { &SocketAddrSpace::Unspecified, ); let peers: Vec<_> = req.unwrap().into_iter().map(|(node, _)| node).collect(); - assert_eq!(peers, vec![new.contact_info().unwrap().clone()]); + assert_eq!(peers, vec![new.contact_info().unwrap().into()]); let offline = ContactInfo::new_localhost(&solana_pubkey::new_rand(), now); let offline = CrdsValue::new_unsigned(CrdsData::from(offline)); @@ -947,7 +947,7 @@ pub(crate) mod tests { // Even though the offline node should have higher weight, we shouldn't request from it // until we receive a ping. let peers: Vec<_> = req.unwrap().into_iter().map(|(node, _)| node).collect(); - assert_eq!(peers, vec![new.contact_info().unwrap().clone()]); + assert_eq!(peers, vec![new.contact_info().unwrap().into()]); } #[test] @@ -1002,7 +1002,7 @@ pub(crate) mod tests { }) .flatten() .take(100) - .filter(|peer| peer != old) + .filter(|peer| peer != &ContactInfo::from(old)) .count(); assert!(count < 75, "count of peer != old: {count}"); } @@ -1478,7 +1478,7 @@ pub(crate) mod tests { } let node = LegacyContactInfo::try_from(&node).unwrap(); { - let caller = CrdsValue::new(CrdsData::LegacyContactInfo(node), &keypair); + let caller = CrdsValue::new(CrdsData::LegacyContactInfo(Box::new(node)), &keypair); assert_eq!(get_max_bloom_filter_bytes(&caller), 1136); verify_get_max_bloom_filter_bytes(&mut rng, &caller, num_items); } @@ -1496,7 +1496,7 @@ pub(crate) mod tests { } let node = LegacyContactInfo::try_from(&node).unwrap(); { - let caller = CrdsValue::new(CrdsData::LegacyContactInfo(node), &keypair); + let caller = CrdsValue::new(CrdsData::LegacyContactInfo(Box::new(node)), &keypair); assert_eq!(get_max_bloom_filter_bytes(&caller), 992); verify_get_max_bloom_filter_bytes(&mut rng, &caller, num_items); } diff --git a/gossip/src/crds_value.rs b/gossip/src/crds_value.rs index b40d7e6f6221c0..581f1ee77f983c 100644 --- a/gossip/src/crds_value.rs +++ b/gossip/src/crds_value.rs @@ -1,6 +1,6 @@ use { crate::{ - contact_info::ContactInfo, + contact_info::{ContactInfo, SocketAddrCache}, crds_data::{CrdsData, EpochSlotsIndex, VoteIndex}, duplicate_shred::DuplicateShredIndex, epoch_slots::EpochSlots, @@ -185,7 +185,7 @@ impl CrdsValue { } } - pub(crate) fn contact_info(&self) -> Option<&ContactInfo> { + pub(crate) fn contact_info(&self) -> Option<&ContactInfo>> { let CrdsData::ContactInfo(node) = &self.data else { return None; }; diff --git a/gossip/src/gossip_service.rs b/gossip/src/gossip_service.rs index 23abc6d2dc194c..7d82f7e563dc97 100644 --- a/gossip/src/gossip_service.rs +++ b/gossip/src/gossip_service.rs @@ -400,7 +400,7 @@ mod tests { let (met_criteria, elapsed, _, tvu_peers) = spy(spy_ref.clone(), None, TIMEOUT, None, None); assert!(!met_criteria); assert!((TIMEOUT..TIMEOUT + Duration::from_secs(1)).contains(&elapsed)); - assert_eq!(tvu_peers, spy_ref.tvu_peers(ContactInfo::clone)); + assert_eq!(tvu_peers, spy_ref.tvu_peers(|node| ContactInfo::from(node))); // Find num_nodes let (met_criteria, _, _, _) = spy(spy_ref.clone(), Some(1), TIMEOUT, None, None); diff --git a/gossip/tests/crds_gossip.rs b/gossip/tests/crds_gossip.rs index 4bf69c480f76ac..81092565c2531b 100644 --- a/gossip/tests/crds_gossip.rs +++ b/gossip/tests/crds_gossip.rs @@ -313,7 +313,10 @@ fn network_simulator(thread_pool: &ThreadPool, network: &mut Network, max_conver let node_pubkey = node.keypair.pubkey(); let mut m = { let node_crds = node.gossip.crds.read().unwrap(); - node_crds.get::<&ContactInfo>(node_pubkey).cloned().unwrap() + node_crds + .get::<&ContactInfo<_>>(node_pubkey) + .cloned() + .unwrap() }; m.set_wallclock(now); node.gossip.process_push_message( diff --git a/gossip/tests/gossip.rs b/gossip/tests/gossip.rs index cb13722509f12b..d296276827cf94 100644 --- a/gossip/tests/gossip.rs +++ b/gossip/tests/gossip.rs @@ -257,7 +257,7 @@ pub fn cluster_info_retransmit() { assert!(done); let mut p = Packet::default(); p.meta_mut().size = 10; - let peers = c1.tvu_peers(ContactInfo::clone); + let peers = c1.tvu_peers(|node| ContactInfo::from(node)); let retransmit_peers: Vec<_> = peers.iter().collect(); retransmit_to( &retransmit_peers, diff --git a/turbine/src/cluster_nodes.rs b/turbine/src/cluster_nodes.rs index 7c5f17dd272794..99076f60247f03 100644 --- a/turbine/src/cluster_nodes.rs +++ b/turbine/src/cluster_nodes.rs @@ -141,7 +141,7 @@ impl ContactInfo { // Removes respective TVU address from the ContactInfo so that no more // shreds are sent to that socket address. #[inline] - fn remove_tvu_addr(&mut self, protocol: Protocol) { + fn remove_tvu(&mut self, protocol: Protocol) { match protocol { Protocol::QUIC => { self.tvu_quic = None; @@ -309,6 +309,23 @@ pub fn new_cluster_nodes( } } +// Defines a closure which converts from &GossipContactInfo<_> to ContactInfo. +// Same can be achieved with +// impl> From<&GossipContactInfo> for ContactInfo +// but SocketAddrCache type is private to gossip. +macro_rules! from_gossip_node { + () => { + |node: &GossipContactInfo<_>| -> ContactInfo { + ContactInfo { + pubkey: *node.pubkey(), + wallclock: node.wallclock(), + tvu_quic: node.tvu(Protocol::QUIC), + tvu_udp: node.tvu(Protocol::UDP), + } + } + }; +} + // All staked nodes + other known tvu-peers + the node itself; // sorted by (stake, pubkey) in descending order. fn get_nodes( @@ -324,14 +341,14 @@ fn get_nodes( let mut nodes: Vec = std::iter::once({ // The local node itself. let stake = stakes.get(&self_pubkey).copied().unwrap_or_default(); - let node = ContactInfo::from(&cluster_info.my_contact_info()); + let node = from_gossip_node!()(&cluster_info.my_contact_info()); let node = NodeId::from(node); Node { node, stake } }) // All known tvu-peers from gossip. .chain( cluster_info - .tvu_peers(|node| ContactInfo::from(node)) + .tvu_peers(from_gossip_node!()) .into_iter() .map(|node| { let stake = stakes.get(node.pubkey()).copied().unwrap_or_default(); @@ -411,7 +428,7 @@ fn dedup_tvu_addrs(nodes: &mut Vec) { if !addrs.insert((protocol, addr)) || count > MAX_NUM_NODES_PER_IP_ADDRESS { // Remove the respective TVU address so that no more shreds are // sent to this socket address. - node.remove_tvu_addr(protocol); + node.remove_tvu(protocol); } } // Always keep staked nodes for deterministic shuffle, @@ -571,18 +588,6 @@ impl From for NodeId { } } -impl From<&GossipContactInfo> for ContactInfo { - #[inline] - fn from(node: &GossipContactInfo) -> Self { - Self { - pubkey: *node.pubkey(), - wallclock: node.wallclock(), - tvu_quic: node.tvu(Protocol::QUIC), - tvu_udp: node.tvu(Protocol::UDP), - } - } -} - #[inline] pub(crate) fn get_broadcast_protocol(_: &ShredId) -> Protocol { Protocol::UDP @@ -991,7 +996,7 @@ mod tests { let node = GossipContactInfo::new_localhost(&pubkey, /*wallclock:*/ timestamp()); [ Node { - node: NodeId::from(ContactInfo::from(&node)), + node: NodeId::from(from_gossip_node!()(&node)), stake, }, Node {