From e0d672b5de93d7f923d272490eb1cf21ae80e7c6 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Fri, 17 Jan 2025 13:55:01 +0000 Subject: [PATCH] removes ContactInfo::set_{serve_repair,tpu_vote,tvu}_quic api (#4505) It is not obvious that ContactInfo::set_{serve_repair,tpu_vote,tvu}(...) are only setting the UDP port. Also having separate api for ContactInfo::set_{serve_repair,tpu_vote,tvu}_quic(...) is redundant and the intended protocol can instead be explicitly passed as an argument. For example: node.set_tvu(Protocol::QUIC, socket) or node.set_tvu(Protocol::UDP, socket) --- core/src/repair/serve_repair.rs | 21 +++++----- gossip/src/cluster_info.rs | 59 ++++++++++++++++++++--------- gossip/src/contact_info.rs | 53 +++++++++++++++++--------- gossip/src/crds_gossip_pull.rs | 6 +-- turbine/benches/retransmit_stage.rs | 4 +- 5 files changed, 95 insertions(+), 48 deletions(-) diff --git a/core/src/repair/serve_repair.rs b/core/src/repair/serve_repair.rs index 4e8c7a59d3ec42..9970941323f74e 100644 --- a/core/src/repair/serve_repair.rs +++ b/core/src/repair/serve_repair.rs @@ -1989,6 +1989,7 @@ mod tests { #[test] fn window_index_request() { + use Protocol::{QUIC, UDP}; let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); let bank = Bank::new_for_tests(&genesis_config); let bank_forks = BankForks::new_rw_arc(bank); @@ -2022,15 +2023,15 @@ mod tests { 0u16, // shred_version ); nxt.set_gossip((Ipv4Addr::LOCALHOST, 1234)).unwrap(); - nxt.set_tvu((Ipv4Addr::LOCALHOST, 1235)).unwrap(); - nxt.set_tvu_quic((Ipv4Addr::LOCALHOST, 1236)).unwrap(); + nxt.set_tvu(UDP, (Ipv4Addr::LOCALHOST, 1235)).unwrap(); + nxt.set_tvu(QUIC, (Ipv4Addr::LOCALHOST, 1236)).unwrap(); nxt.set_tpu((Ipv4Addr::LOCALHOST, 1238)).unwrap(); nxt.set_tpu_forwards((Ipv4Addr::LOCALHOST, 1239)).unwrap(); - nxt.set_tpu_vote((Ipv4Addr::LOCALHOST, 1240)).unwrap(); + nxt.set_tpu_vote(UDP, (Ipv4Addr::LOCALHOST, 1240)).unwrap(); nxt.set_rpc((Ipv4Addr::LOCALHOST, 1241)).unwrap(); nxt.set_rpc_pubsub((Ipv4Addr::LOCALHOST, 1242)).unwrap(); - nxt.set_serve_repair(serve_repair_addr).unwrap(); - nxt.set_serve_repair_quic((Ipv4Addr::LOCALHOST, 1237)) + nxt.set_serve_repair(UDP, serve_repair_addr).unwrap(); + nxt.set_serve_repair(QUIC, (Ipv4Addr::LOCALHOST, 1237)) .unwrap(); cluster_info.insert_info(nxt.clone()); let rv = serve_repair @@ -2057,15 +2058,15 @@ mod tests { 0u16, // shred_version ); nxt.set_gossip((Ipv4Addr::LOCALHOST, 1234)).unwrap(); - nxt.set_tvu((Ipv4Addr::LOCALHOST, 1235)).unwrap(); - nxt.set_tvu_quic((Ipv4Addr::LOCALHOST, 1236)).unwrap(); + nxt.set_tvu(UDP, (Ipv4Addr::LOCALHOST, 1235)).unwrap(); + nxt.set_tvu(QUIC, (Ipv4Addr::LOCALHOST, 1236)).unwrap(); nxt.set_tpu((Ipv4Addr::LOCALHOST, 1238)).unwrap(); nxt.set_tpu_forwards((Ipv4Addr::LOCALHOST, 1239)).unwrap(); - nxt.set_tpu_vote((Ipv4Addr::LOCALHOST, 1240)).unwrap(); + nxt.set_tpu_vote(UDP, (Ipv4Addr::LOCALHOST, 1240)).unwrap(); nxt.set_rpc((Ipv4Addr::LOCALHOST, 1241)).unwrap(); nxt.set_rpc_pubsub((Ipv4Addr::LOCALHOST, 1242)).unwrap(); - nxt.set_serve_repair(serve_repair_addr2).unwrap(); - nxt.set_serve_repair_quic((Ipv4Addr::LOCALHOST, 1237)) + nxt.set_serve_repair(UDP, serve_repair_addr2).unwrap(); + nxt.set_serve_repair(QUIC, (Ipv4Addr::LOCALHOST, 1237)) .unwrap(); cluster_info.insert_info(nxt); let mut one = false; diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index bdb39ba5099de4..7d376d64307877 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -2650,19 +2650,32 @@ impl Node { $name )) }; + ($method:ident, $protocol:ident, $addr:expr, $name:literal) => {{ + info.$method(contact_info::Protocol::$protocol, $addr) + .expect(&format!( + "Operator must spin up node with valid {} address", + $name + )) + }}; } set_socket!(set_gossip, gossip_addr, "gossip"); - set_socket!(set_tvu, tvu.local_addr().unwrap(), "TVU"); - set_socket!(set_tvu_quic, tvu_quic.local_addr().unwrap(), "TVU QUIC"); + set_socket!(set_tvu, UDP, tvu.local_addr().unwrap(), "TVU"); + set_socket!(set_tvu, QUIC, tvu_quic.local_addr().unwrap(), "TVU QUIC"); set_socket!(set_tpu, tpu.local_addr().unwrap(), "TPU"); set_socket!( set_tpu_forwards, tpu_forwards.local_addr().unwrap(), "TPU-forwards" ); - set_socket!(set_tpu_vote, tpu_vote.local_addr().unwrap(), "TPU-vote"); set_socket!( - set_tpu_vote_quic, + set_tpu_vote, + UDP, + tpu_vote.local_addr().unwrap(), + "TPU-vote" + ); + set_socket!( + set_tpu_vote, + QUIC, tpu_vote_quic[0].local_addr().unwrap(), "TPU-vote QUIC" ); @@ -2670,11 +2683,13 @@ impl Node { set_socket!(set_rpc_pubsub, rpc_pubsub_addr, "RPC-pubsub"); set_socket!( set_serve_repair, + UDP, serve_repair.local_addr().unwrap(), "serve-repair" ); set_socket!( - set_serve_repair_quic, + set_serve_repair, + QUIC, serve_repair_quic.local_addr().unwrap(), "serve-repair QUIC" ); @@ -2814,22 +2829,30 @@ impl Node { $name )) }; + ($method:ident, $protocol:ident, $port:ident, $name:literal) => {{ + info.$method(contact_info::Protocol::$protocol, (addr, $port)) + .expect(&format!( + "Operator must spin up node with valid {} address", + $name + )) + }}; } set_socket!(set_gossip, gossip_port, "gossip"); - set_socket!(set_tvu, tvu_port, "TVU"); - set_socket!(set_tvu_quic, tvu_quic_port, "TVU QUIC"); + set_socket!(set_tvu, UDP, tvu_port, "TVU"); + set_socket!(set_tvu, QUIC, tvu_quic_port, "TVU QUIC"); set_socket!(set_tpu, tpu_port, "TPU"); set_socket!(set_tpu_forwards, tpu_forwards_port, "TPU-forwards"); - set_socket!(set_tpu_vote, tpu_vote_port, "TPU-vote"); + set_socket!(set_tpu_vote, UDP, tpu_vote_port, "TPU-vote"); + set_socket!(set_tpu_vote, QUIC, tpu_vote_quic_port, "TPU-vote QUIC"); set_socket!(set_rpc, rpc_port, "RPC"); set_socket!(set_rpc_pubsub, rpc_pubsub_port, "RPC-pubsub"); - set_socket!(set_serve_repair, serve_repair_port, "serve-repair"); + set_socket!(set_serve_repair, UDP, serve_repair_port, "serve-repair"); set_socket!( - set_serve_repair_quic, + set_serve_repair, + QUIC, serve_repair_quic_port, "serve-repair QUIC" ); - set_socket!(set_tpu_vote_quic, tpu_vote_quic_port, "TPU-vote QUIC"); trace!("new ContactInfo: {:?}", info); @@ -2959,20 +2982,22 @@ impl Node { 0u16, // shred_version ); let addr = gossip_addr.ip(); + use contact_info::Protocol::{QUIC, UDP}; info.set_gossip((addr, gossip_port)).unwrap(); - info.set_tvu((addr, tvu_port)).unwrap(); - info.set_tvu_quic((addr, tvu_quic_port)).unwrap(); + info.set_tvu(UDP, (addr, tvu_port)).unwrap(); + info.set_tvu(QUIC, (addr, tvu_quic_port)).unwrap(); info.set_tpu(public_tpu_addr.unwrap_or_else(|| SocketAddr::new(addr, tpu_port))) .unwrap(); info.set_tpu_forwards( public_tpu_forwards_addr.unwrap_or_else(|| SocketAddr::new(addr, tpu_forwards_port)), ) .unwrap(); - info.set_tpu_vote((addr, tpu_vote_port)).unwrap(); - info.set_serve_repair((addr, serve_repair_port)).unwrap(); - info.set_serve_repair_quic((addr, serve_repair_quic_port)) + info.set_tpu_vote(UDP, (addr, tpu_vote_port)).unwrap(); + info.set_tpu_vote(QUIC, (addr, tpu_vote_quic_port)).unwrap(); + info.set_serve_repair(UDP, (addr, serve_repair_port)) + .unwrap(); + info.set_serve_repair(QUIC, (addr, serve_repair_quic_port)) .unwrap(); - info.set_tpu_vote_quic((addr, tpu_vote_quic_port)).unwrap(); trace!("new ContactInfo: {:?}", info); diff --git a/gossip/src/contact_info.rs b/gossip/src/contact_info.rs index 3e0d66f9a2f1ba..8c11ecb91f3d37 100644 --- a/gossip/src/contact_info.rs +++ b/gossip/src/contact_info.rs @@ -173,6 +173,19 @@ macro_rules! set_socket { self.set_socket($quic, get_quic_socket(&socket)?) } }; + (@multi $name:ident, $udp:ident, $quic:ident) => { + pub fn $name(&mut self, protocol: Protocol, socket: T) -> Result<(), Error> + where + SocketAddr: From, + { + let socket = SocketAddr::from(socket); + let key = match protocol { + Protocol::UDP => $udp, + Protocol::QUIC => $quic, + }; + self.set_socket(key, socket) + } + }; } macro_rules! remove_socket { @@ -259,18 +272,15 @@ impl ContactInfo { set_socket!(set_gossip, SOCKET_TAG_GOSSIP); set_socket!(set_rpc, SOCKET_TAG_RPC); set_socket!(set_rpc_pubsub, SOCKET_TAG_RPC_PUBSUB); - set_socket!(set_serve_repair, SOCKET_TAG_SERVE_REPAIR); - set_socket!(set_serve_repair_quic, SOCKET_TAG_SERVE_REPAIR_QUIC); set_socket!(set_tpu, SOCKET_TAG_TPU, SOCKET_TAG_TPU_QUIC); set_socket!( set_tpu_forwards, SOCKET_TAG_TPU_FORWARDS, SOCKET_TAG_TPU_FORWARDS_QUIC ); - set_socket!(set_tpu_vote, SOCKET_TAG_TPU_VOTE); - set_socket!(set_tpu_vote_quic, SOCKET_TAG_TPU_VOTE_QUIC); - set_socket!(set_tvu, SOCKET_TAG_TVU); - set_socket!(set_tvu_quic, SOCKET_TAG_TVU_QUIC); + set_socket!(@multi set_serve_repair, SOCKET_TAG_SERVE_REPAIR, SOCKET_TAG_SERVE_REPAIR_QUIC); + set_socket!(@multi set_tpu_vote, SOCKET_TAG_TPU_VOTE, SOCKET_TAG_TPU_VOTE_QUIC); + set_socket!(@multi set_tvu, SOCKET_TAG_TVU, SOCKET_TAG_TVU_QUIC); remove_socket!( remove_serve_repair, @@ -409,25 +419,30 @@ impl ContactInfo { // Only for tests and simulations. pub fn new_localhost(pubkey: &Pubkey, wallclock: u64) -> Self { + use Protocol::{QUIC, UDP}; let mut node = Self::new(*pubkey, wallclock, /*shred_version:*/ 0u16); node.set_gossip((Ipv4Addr::LOCALHOST, 8000)).unwrap(); - node.set_tvu((Ipv4Addr::LOCALHOST, 8001)).unwrap(); - node.set_tvu_quic((Ipv4Addr::LOCALHOST, 8002)).unwrap(); + node.set_tvu(UDP, (Ipv4Addr::LOCALHOST, 8001)).unwrap(); + node.set_tvu(QUIC, (Ipv4Addr::LOCALHOST, 8002)).unwrap(); node.set_tpu((Ipv4Addr::LOCALHOST, 8003)).unwrap(); // quic: 8009 node.set_tpu_forwards((Ipv4Addr::LOCALHOST, 8004)).unwrap(); // quic: 8010 - node.set_tpu_vote((Ipv4Addr::LOCALHOST, 8005)).unwrap(); + node.set_tpu_vote(UDP, (Ipv4Addr::LOCALHOST, 8005)).unwrap(); + node.set_tpu_vote(QUIC, (Ipv4Addr::LOCALHOST, 8007)) + .unwrap(); node.set_rpc((Ipv4Addr::LOCALHOST, DEFAULT_RPC_PORT)) .unwrap(); node.set_rpc_pubsub((Ipv4Addr::LOCALHOST, DEFAULT_RPC_PUBSUB_PORT)) .unwrap(); - node.set_serve_repair((Ipv4Addr::LOCALHOST, 8008)).unwrap(); - node.set_serve_repair_quic((Ipv4Addr::LOCALHOST, 8006)) + node.set_serve_repair(UDP, (Ipv4Addr::LOCALHOST, 8008)) + .unwrap(); + node.set_serve_repair(QUIC, (Ipv4Addr::LOCALHOST, 8006)) .unwrap(); node } // Only for tests and simulations. pub fn new_with_socketaddr(pubkey: &Pubkey, socket: &SocketAddr) -> Self { + use Protocol::{QUIC, UDP}; assert_matches!(sanitize_socket(socket), Ok(())); let mut node = Self::new( *pubkey, @@ -436,16 +451,17 @@ impl ContactInfo { ); let (addr, port) = (socket.ip(), socket.port()); node.set_gossip((addr, port + 1)).unwrap(); - node.set_tvu((addr, port + 2)).unwrap(); - node.set_tvu_quic((addr, port + 3)).unwrap(); + node.set_tvu(UDP, (addr, port + 2)).unwrap(); + node.set_tvu(QUIC, (addr, port + 3)).unwrap(); node.set_tpu((addr, port)).unwrap(); // quic: port + 6 node.set_tpu_forwards((addr, port + 5)).unwrap(); // quic: port + 11 - node.set_tpu_vote((addr, port + 7)).unwrap(); + node.set_tpu_vote(UDP, (addr, port + 7)).unwrap(); + node.set_tpu_vote(QUIC, (addr, port + 9)).unwrap(); node.set_rpc((addr, DEFAULT_RPC_PORT)).unwrap(); node.set_rpc_pubsub((addr, DEFAULT_RPC_PUBSUB_PORT)) .unwrap(); - node.set_serve_repair((addr, port + 8)).unwrap(); - node.set_serve_repair_quic((addr, port + 4)).unwrap(); + node.set_serve_repair(UDP, (addr, port + 8)).unwrap(); + node.set_serve_repair(QUIC, (addr, port + 4)).unwrap(); node } @@ -1093,7 +1109,10 @@ mod tests { { let mut other = node.clone(); while other.set_gossip(new_rand_socket(&mut rng)).is_err() {} - while other.set_serve_repair(new_rand_socket(&mut rng)).is_err() {} + while other + .set_serve_repair(Protocol::UDP, new_rand_socket(&mut rng)) + .is_err() + {} assert!(!node.check_duplicate(&other)); assert!(!other.check_duplicate(&node)); assert_eq!(node.overrides(&other), None); diff --git a/gossip/src/crds_gossip_pull.rs b/gossip/src/crds_gossip_pull.rs index 8d15c5e55ca769..e580550f3e3b68 100644 --- a/gossip/src/crds_gossip_pull.rs +++ b/gossip/src/crds_gossip_pull.rs @@ -1441,7 +1441,7 @@ pub(crate) mod tests { caller: &CrdsValue, num_items: usize, ) { - let packet_data_size_range = (PACKET_DATA_SIZE - 5)..=PACKET_DATA_SIZE; + let packet_data_size_range = (PACKET_DATA_SIZE - 7)..=PACKET_DATA_SIZE; let max_bytes = get_max_bloom_filter_bytes(caller); let filters = CrdsFilterSet::new(rng, num_items, max_bytes); let request_bytes = caller.bincode_serialized_size() as u64; @@ -1467,7 +1467,7 @@ pub(crate) mod tests { let keypair = Keypair::new(); let node = { let mut node = - ContactInfo::new_localhost(&keypair.pubkey(), /*wallclock:*/ rng.gen()); + ContactInfo::new_localhost(&keypair.pubkey(), /*wallclock:*/ timestamp()); node.set_shred_version(rng.gen()); node }; @@ -1491,7 +1491,7 @@ pub(crate) mod tests { }; { let caller = CrdsValue::new(CrdsData::from(&node), &keypair); - assert_eq!(get_max_bloom_filter_bytes(&caller), 1165); + assert_eq!(get_max_bloom_filter_bytes(&caller), 1155); verify_get_max_bloom_filter_bytes(&mut rng, &caller, num_items); } let node = LegacyContactInfo::try_from(&node).unwrap(); diff --git a/turbine/benches/retransmit_stage.rs b/turbine/benches/retransmit_stage.rs index cf40e5f871ac85..dcb6ee8ba1b2ef 100644 --- a/turbine/benches/retransmit_stage.rs +++ b/turbine/benches/retransmit_stage.rs @@ -63,7 +63,9 @@ fn bench_retransmitter(bencher: &mut Bencher) { let socket = bind_to_unspecified().unwrap(); let mut contact_info = ContactInfo::new_localhost(&id, timestamp()); let port = socket.local_addr().unwrap().port(); - contact_info.set_tvu((Ipv4Addr::LOCALHOST, port)).unwrap(); + contact_info + .set_tvu(Protocol::UDP, (Ipv4Addr::LOCALHOST, port)) + .unwrap(); info!("local: {:?}", contact_info.tvu(Protocol::UDP).unwrap()); cluster_info.insert_info(contact_info); socket.set_nonblocking(true).unwrap();