Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reduces std::mem::size_of::<gossip::CrdsData>() #4391

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion core/src/repair/serve_repair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1274,7 +1274,8 @@ impl ServeRepair {
.iter()
.filter_map(|key| {
if *key != self.my_id() {
self.cluster_info.lookup_contact_info(key, |ci| ci.clone())
self.cluster_info
.lookup_contact_info(key, |node| ContactInfo::from(node))
} else {
None
}
Expand Down
29 changes: 16 additions & 13 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,11 @@ impl ClusterInfo {
}

// TODO kill insert_info, only used by tests
pub fn insert_info(&self, node: ContactInfo) {
let entry = CrdsValue::new(CrdsData::ContactInfo(node), &self.keypair());
pub fn insert_info<T>(&self, data: T)
where
CrdsData: From<T>,
{
let entry = CrdsValue::new(CrdsData::from(data), &self.keypair());
if let Err(err) = {
let mut gossip_crds = self.gossip.crds.write().unwrap();
gossip_crds.insert(entry, timestamp(), GossipRoute::LocalMessage)
Expand Down Expand Up @@ -474,7 +477,7 @@ impl ClusterInfo {
let mut nodes = gossip_crds.get_nodes_contact_info();
nodes
.find(|node| node.gossip() == Some(*gossip_addr))
.cloned()
.map(ContactInfo::from)
}

pub fn my_contact_info(&self) -> ContactInfo {
Expand Down Expand Up @@ -1048,7 +1051,7 @@ impl ClusterInfo {
pub fn get_node_version(&self, pubkey: &Pubkey) -> Option<solana_version::Version> {
let gossip_crds = self.gossip.crds.read().unwrap();
gossip_crds
.get::<&ContactInfo>(*pubkey)
.get::<&ContactInfo<_>>(*pubkey)
.map(ContactInfo::version)
.cloned()
}
Expand All @@ -1068,7 +1071,7 @@ impl ClusterInfo {
.filter(|node| {
node.pubkey() != &self_pubkey && self.check_socket_addr_space(&node.rpc())
})
.cloned()
.map(ContactInfo::from)
.collect()
}

Expand All @@ -1077,7 +1080,7 @@ impl ClusterInfo {
let gossip_crds = self.gossip.crds.read().unwrap();
gossip_crds
.get_nodes()
.map(|x| (x.value.contact_info().unwrap().clone(), x.local_timestamp))
.map(|x| (x.value.contact_info().unwrap().into(), x.local_timestamp))
.collect()
}

Expand All @@ -1088,7 +1091,7 @@ impl ClusterInfo {
.get_nodes_contact_info()
// shred_version not considered for gossip peers (ie, spy nodes do not set shred_version)
.filter(|node| node.pubkey() != &me && self.check_socket_addr_space(&node.gossip()))
.cloned()
.map(ContactInfo::from)
.collect()
}

Expand All @@ -1101,7 +1104,7 @@ impl ClusterInfo {
node.pubkey() != &self_pubkey
&& self.check_socket_addr_space(&node.tvu(contact_info::Protocol::UDP))
})
.cloned()
.map(ContactInfo::from)
.collect()
}

Expand Down Expand Up @@ -1138,7 +1141,7 @@ impl ClusterInfo {
Some(lowest_slot) => lowest_slot.lowest <= slot,
}
})
.cloned()
.map(ContactInfo::from)
.collect()
}

Expand All @@ -1165,7 +1168,7 @@ impl ClusterInfo {
node.pubkey() != &self_pubkey
&& self.check_socket_addr_space(&node.tpu(contact_info::Protocol::UDP))
})
.cloned()
.map(ContactInfo::from)
.collect()
}

Expand All @@ -1175,7 +1178,7 @@ impl ClusterInfo {
let node = {
let mut node = self.my_contact_info.write().unwrap();
node.set_wallclock(timestamp());
node.clone()
ContactInfo::from(node.deref())
};
let entries: Vec<_> = [
CrdsData::ContactInfo(node),
Expand Down Expand Up @@ -1328,7 +1331,7 @@ impl ClusterInfo {
push_messages
.into_iter()
.filter_map(|(pubkey, messages)| {
let peer: &ContactInfo = gossip_crds.get(pubkey)?;
let peer: &ContactInfo<_> = gossip_crds.get(pubkey)?;
Some((peer.gossip()?, messages))
})
.collect()
Expand Down Expand Up @@ -2069,7 +2072,7 @@ impl ClusterInfo {
prunes
.into_par_iter()
.filter_map(|(pubkey, prunes)| {
let addr = gossip_crds.get::<&ContactInfo>(pubkey)?.gossip()?;
let addr = gossip_crds.get::<&ContactInfo<_>>(pubkey)?.gossip()?;
Some((pubkey, addr, prunes))
})
.collect()
Expand Down
86 changes: 66 additions & 20 deletions gossip/src/contact_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use {
solana_streamer::socket::SocketAddrSpace,
static_assertions::const_assert_eq,
std::{
borrow::Borrow,
cmp::Ordering,
collections::HashSet,
net::{IpAddr, Ipv4Addr, SocketAddr},
Expand Down Expand Up @@ -42,10 +43,12 @@ const SOCKET_TAG_TVU_QUIC: u8 = 11;
const_assert_eq!(SOCKET_CACHE_SIZE, 13);
const SOCKET_CACHE_SIZE: usize = SOCKET_TAG_TPU_VOTE_QUIC as usize + 1usize;

pub(crate) type SocketAddrCache = [SocketAddr; SOCKET_CACHE_SIZE];

// An alias for a function that reads data from a ContactInfo entry stored in
// the gossip CRDS table.
pub trait ContactInfoQuery<R>: Fn(&ContactInfo) -> R {}
impl<R, F: Fn(&ContactInfo) -> R> ContactInfoQuery<R> for F {}
pub trait ContactInfoQuery<R>: Fn(&ContactInfo<Box<SocketAddrCache>>) -> R {}
impl<R, F: Fn(&ContactInfo<Box<SocketAddrCache>>) -> R> ContactInfoQuery<R> for F {}

#[derive(Copy, Clone, Debug, Eq, Error, PartialEq)]
pub enum Error {
Expand Down Expand Up @@ -74,7 +77,7 @@ pub enum Error {
}

#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
pub struct ContactInfo {
pub struct ContactInfo<T = SocketAddrCache> {
pubkey: Pubkey,
#[serde(with = "serde_varint")]
wallclock: u64,
Expand All @@ -93,7 +96,7 @@ pub struct ContactInfo {
extensions: Vec<Extension>,
// Only sanitized socket-addrs can be cached!
#[serde(skip_serializing)]
cache: [SocketAddr; SOCKET_CACHE_SIZE],
cache: T,
}

#[cfg_attr(feature = "frozen-abi", derive(AbiExample))]
Expand Down Expand Up @@ -132,7 +135,7 @@ macro_rules! get_socket {
($name:ident, $key:ident) => {
#[inline]
pub fn $name(&self) -> Option<SocketAddr> {
let socket = &self.cache[usize::from($key)];
let socket = &self.cache.borrow()[usize::from($key)];
(socket != &SOCKET_ADDR_UNSPECIFIED)
.then_some(socket)
.copied()
Expand All @@ -145,7 +148,7 @@ macro_rules! get_socket {
Protocol::QUIC => $quic,
Protocol::UDP => $udp,
};
let socket = &self.cache[usize::from(key)];
let socket = &self.cache.borrow()[usize::from(key)];
(socket != &SOCKET_ADDR_UNSPECIFIED)
.then_some(socket)
.copied()
Expand Down Expand Up @@ -189,7 +192,7 @@ macro_rules! remove_socket {
};
}

impl ContactInfo {
impl ContactInfo<SocketAddrCache> {
pub fn new(pubkey: Pubkey, wallclock: u64, shred_version: u16) -> Self {
Self {
pubkey,
Expand All @@ -203,7 +206,9 @@ impl ContactInfo {
cache: EMPTY_SOCKET_ADDR_CACHE,
}
}
}

impl<T> ContactInfo<T> {
#[inline]
pub fn pubkey(&self) -> &Pubkey {
&self.pubkey
Expand Down Expand Up @@ -238,7 +243,9 @@ impl ContactInfo {
pub fn set_shred_version(&mut self, shred_version: u16) {
self.shred_version = shred_version
}
}

impl<T: Borrow<SocketAddrCache>> ContactInfo<T> {
get_socket!(gossip, SOCKET_TAG_GOSSIP);
get_socket!(rpc, SOCKET_TAG_RPC);
get_socket!(rpc_pubsub, SOCKET_TAG_RPC_PUBSUB);
Expand All @@ -255,7 +262,9 @@ impl ContactInfo {
);
get_socket!(tpu_vote, SOCKET_TAG_TPU_VOTE, SOCKET_TAG_TPU_VOTE_QUIC);
get_socket!(tvu, SOCKET_TAG_TVU, SOCKET_TAG_TVU_QUIC);
}

impl ContactInfo<SocketAddrCache> {
set_socket!(set_gossip, SOCKET_TAG_GOSSIP);
set_socket!(set_rpc, SOCKET_TAG_RPC);
set_socket!(set_rpc_pubsub, SOCKET_TAG_RPC_PUBSUB);
Expand Down Expand Up @@ -389,7 +398,7 @@ impl ContactInfo {
let delay = 10 * 60 * 1000; // 10 minutes
let now = solana_sdk::timing::timestamp() - delay + rng.gen_range(0..2 * delay);
let pubkey = pubkey.unwrap_or_else(solana_pubkey::new_rand);
let mut node = ContactInfo::new_localhost(&pubkey, now);
let mut node = Self::new_localhost(&pubkey, now);
let _ = node.set_gossip((Ipv4Addr::LOCALHOST, rng.gen_range(1024..u16::MAX)));
node
}
Expand Down Expand Up @@ -448,12 +457,14 @@ impl ContactInfo {
node.set_serve_repair_quic((addr, port + 4)).unwrap();
node
}
}

impl<T> ContactInfo<T> {
// Returns true if the other contact-info is a duplicate instance of this
// node, with a more recent `outset` timestamp.
#[inline]
#[must_use]
pub(crate) fn check_duplicate(&self, other: &ContactInfo) -> bool {
pub(crate) fn check_duplicate<S>(&self, other: &ContactInfo<S>) -> bool {
self.pubkey == other.pubkey && self.outset < other.outset
}

Expand All @@ -463,7 +474,7 @@ impl ContactInfo {
// If the tuples are equal it returns None.
#[inline]
#[must_use]
pub(crate) fn overrides(&self, other: &ContactInfo) -> Option<bool> {
pub(crate) fn overrides<S>(&self, other: &ContactInfo<S>) -> Option<bool> {
if self.pubkey != other.pubkey {
return None;
}
Expand All @@ -482,7 +493,7 @@ fn get_node_outset() -> u64 {
u64::try_from(elapsed.as_micros()).unwrap()
}

impl Default for ContactInfo {
impl Default for ContactInfo<SocketAddrCache> {
fn default() -> Self {
Self::new(
Pubkey::default(),
Expand All @@ -492,17 +503,17 @@ impl Default for ContactInfo {
}
}

impl<'de> Deserialize<'de> for ContactInfo {
impl<'de> Deserialize<'de> for ContactInfo<Box<SocketAddrCache>> {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let node = ContactInfoLite::deserialize(deserializer)?;
ContactInfo::try_from(node).map_err(serde::de::Error::custom)
Self::try_from(node).map_err(serde::de::Error::custom)
}
}

impl TryFrom<ContactInfoLite> for ContactInfo {
impl TryFrom<ContactInfoLite> for ContactInfo<Box<SocketAddrCache>> {
type Error = Error;

fn try_from(node: ContactInfoLite) -> Result<Self, Self::Error> {
Expand All @@ -517,7 +528,7 @@ impl TryFrom<ContactInfoLite> for ContactInfo {
extensions,
} = node;
sanitize_entries(&addrs, &sockets)?;
let mut node = ContactInfo {
let mut node = Self {
pubkey,
wallclock,
outset,
Expand All @@ -526,7 +537,7 @@ impl TryFrom<ContactInfoLite> for ContactInfo {
addrs,
sockets,
extensions,
cache: EMPTY_SOCKET_ADDR_CACHE,
cache: Box::new(EMPTY_SOCKET_ADDR_CACHE),
};
// Populate node.cache.
// Only sanitized socket-addrs can be cached!
Expand All @@ -548,7 +559,42 @@ impl TryFrom<ContactInfoLite> for ContactInfo {
}
}

impl Sanitize for ContactInfo {
macro_rules! impl_convert_from {
($a:ty, $b:ty, $f:ident, $g:tt) => {
impl From<$a> for ContactInfo<$b> {
#[inline]
fn from(node: $a) -> Self {
Self {
pubkey: node.pubkey,
wallclock: node.wallclock,
outset: node.outset,
shred_version: node.shred_version,
version: node.version.$f(),
addrs: node.addrs.$f(),
sockets: node.sockets.$f(),
extensions: node.extensions.$f(),
cache: $g(node.cache),
}
}
}
};
}

impl_convert_from!(
ContactInfo<SocketAddrCache>,
Box<SocketAddrCache>,
into,
(Box::new)
);
impl_convert_from!(
&ContactInfo<SocketAddrCache>,
Box<SocketAddrCache>,
clone,
(Box::new)
);
impl_convert_from!(&ContactInfo<Box<SocketAddrCache>>, SocketAddrCache, clone, *);

impl Sanitize for ContactInfo<Box<SocketAddrCache>> {
fn sanitize(&self) -> Result<(), SanitizeError> {
if self.wallclock >= MAX_WALLCLOCK {
return Err(SanitizeError::ValueOutOfBounds);
Expand Down Expand Up @@ -642,7 +688,7 @@ pub(crate) fn get_quic_socket(socket: &SocketAddr) -> Result<SocketAddr, Error>
}

#[cfg(all(test, feature = "frozen-abi"))]
impl solana_frozen_abi::abi_example::AbiExample for ContactInfo {
impl solana_frozen_abi::abi_example::AbiExample for ContactInfo<SocketAddrCache> {
fn example() -> Self {
Self {
pubkey: Pubkey::example(),
Expand Down Expand Up @@ -930,8 +976,8 @@ mod tests {
.is_ok());
// Assert that serde round trips.
let bytes = bincode::serialize(&node).unwrap();
let other: ContactInfo = bincode::deserialize(&bytes).unwrap();
assert_eq!(node, other);
let other: ContactInfo<_> = bincode::deserialize(&bytes).unwrap();
assert_eq!(node, ContactInfo::from(&other));
}
}

Expand Down
8 changes: 5 additions & 3 deletions gossip/src/crds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

use {
crate::{
contact_info::ContactInfo,
contact_info::{ContactInfo, SocketAddrCache},
crds_data::CrdsData,
crds_entry::CrdsEntry,
crds_gossip_pull::CrdsTimeouts,
Expand Down Expand Up @@ -347,7 +347,9 @@ impl Crds {
}

/// Returns ContactInfo of all known nodes.
pub(crate) fn get_nodes_contact_info(&self) -> impl Iterator<Item = &ContactInfo> {
pub(crate) fn get_nodes_contact_info(
&self,
) -> impl Iterator<Item = &ContactInfo<Box<SocketAddrCache>>> {
self.get_nodes().map(|v| match v.value.data() {
CrdsData::ContactInfo(info) => info,
_ => panic!("this should not happen!"),
Expand Down Expand Up @@ -1379,7 +1381,7 @@ mod tests {
// Remove contact-info. Shred version should stay there since there
// are still values associated with the pubkey.
crds.remove(&CrdsValueLabel::ContactInfo(pubkey), timestamp());
assert_eq!(crds.get::<&ContactInfo>(pubkey), None);
assert_eq!(crds.get::<&ContactInfo<_>>(pubkey), None);
assert_eq!(crds.get_shred_version(&pubkey), Some(8));
// Remove the remaining entry with the same pubkey.
crds.remove(&CrdsValueLabel::AccountsHashes(pubkey), timestamp());
Expand Down
Loading
Loading