From 327dafdd783ce0103fcc07d51afca0dc97e87489 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Mon, 4 Dec 2023 14:51:04 +0000 Subject: [PATCH 1/8] split ControlAction variants into its own structs which will help for the goal of prioritizing GRAFT and PRUNE messages over IWANT/IHAVE. --- protocols/gossipsub/src/behaviour.rs | 76 +++++++++------- protocols/gossipsub/src/behaviour/tests.rs | 100 +++++++++++---------- protocols/gossipsub/src/protocol.rs | 43 +++++---- protocols/gossipsub/src/types.rs | 88 ++++++++++-------- 4 files changed, 172 insertions(+), 135 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 8f72acb63bf..95781daa370 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -45,10 +45,6 @@ use libp2p_swarm::{ THandlerOutEvent, ToSwarm, }; -use crate::gossip_promises::GossipPromises; -use crate::handler::{Handler, HandlerEvent, HandlerIn}; -use crate::mcache::MessageCache; -use crate::metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty}; use crate::peer_score::{PeerScore, PeerScoreParams, PeerScoreThresholds, RejectReason}; use crate::protocol::SIGNING_PREFIX; use crate::subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFilter}; @@ -65,6 +61,16 @@ use crate::{ config::{Config, ValidationMode}, types::RpcOut, }; +use crate::{gossip_promises::GossipPromises, types::Graft}; +use crate::{ + handler::{Handler, HandlerEvent, HandlerIn}, + types::Prune, +}; +use crate::{mcache::MessageCache, types::IWant}; +use crate::{ + metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty}, + types::IHave, +}; use crate::{rpc_proto::proto, TopicScoreParams}; use crate::{PublishError, SubscriptionError, ValidationError}; use instant::SystemTime; @@ -1030,9 +1036,9 @@ where Self::control_pool_add( &mut self.control_pool, peer_id, - ControlAction::Graft { + ControlAction::Graft(Graft { topic_hash: 
topic_hash.clone(), - }, + }), ); // If the peer did not previously exist in any mesh, inform the handler @@ -1061,7 +1067,7 @@ where peer: &PeerId, do_px: bool, on_unsubscribe: bool, - ) -> ControlAction { + ) -> Prune { if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.prune(peer, topic_hash.clone()); } @@ -1072,7 +1078,7 @@ where } Some(PeerKind::Gossipsub) => { // GossipSub v1.0 -- no peer exchange, the peer won't be able to parse it anyway - return ControlAction::Prune { + return Prune { topic_hash: topic_hash.clone(), peers: Vec::new(), backoff: None, @@ -1109,7 +1115,7 @@ where // update backoff self.backoffs.update_backoff(topic_hash, peer, backoff); - ControlAction::Prune { + Prune { topic_hash: topic_hash.clone(), peers, backoff: Some(backoff.as_secs()), @@ -1129,9 +1135,8 @@ where // Send a PRUNE control message tracing::debug!(%peer, "LEAVE: Sending PRUNE to peer"); let on_unsubscribe = true; - let control = - self.make_prune(topic_hash, &peer, self.config.do_px(), on_unsubscribe); - Self::control_pool_add(&mut self.control_pool, peer, control); + let prune = self.make_prune(topic_hash, &peer, self.config.do_px(), on_unsubscribe); + Self::control_pool_add(&mut self.control_pool, peer, ControlAction::Prune(prune)); // If the peer did not previously exist in any mesh, inform the handler peer_removed_from_mesh( @@ -1305,9 +1310,9 @@ where Self::control_pool_add( &mut self.control_pool, *peer_id, - ControlAction::IWant { + ControlAction::IWant(IWant { message_ids: iwant_ids_vec, - }, + }), ); } tracing::trace!(peer=%peer_id, "Completed IHAVE handling for peer"); @@ -1512,11 +1517,11 @@ where .expect("Peerid should exist") .clone(); - for action in to_prune_topics + for prune in to_prune_topics .iter() .map(|t| self.make_prune(t, peer_id, do_px, on_unsubscribe)) { - sender.control(action); + sender.control(ControlAction::Prune(prune)); } // Send the prune messages to the peer tracing::debug!( @@ -2016,11 +2021,8 @@ where 
.get_mut(propagation_source) .expect("Peerid should exist"); - for action in topics_to_graft - .into_iter() - .map(|topic_hash| ControlAction::Graft { topic_hash }) - { - sender.control(action); + for topic_hash in topics_to_graft.into_iter() { + sender.control(ControlAction::Graft(Graft { topic_hash })); } // Notify the application of the subscriptions @@ -2510,10 +2512,10 @@ where Self::control_pool_add( &mut self.control_pool, peer, - ControlAction::IHave { + ControlAction::IHave(IHave { topic_hash: topic_hash.clone(), message_ids: peer_message_ids, - }, + }), ); } } @@ -2546,8 +2548,10 @@ where &self.connected_peers, ); } - let control_msgs = topics.iter().map(|topic_hash| ControlAction::Graft { - topic_hash: topic_hash.clone(), + let grafts = topics.iter().map(|topic_hash| { + ControlAction::Graft(Graft { + topic_hash: topic_hash.clone(), + }) }); // If there are prunes associated with the same peer add them. @@ -2576,8 +2580,12 @@ where ) }); - for msg in control_msgs.chain(prunes) { - sender.control(msg); + for graft in grafts { + sender.control(graft); + } + + for prune in prunes { + sender.control(ControlAction::Prune(prune)); } } @@ -2597,7 +2605,7 @@ where .expect("Peerid should exist") .clone(); - sender.control(prune); + sender.control(ControlAction::Prune(prune)); // inform the handler peer_removed_from_mesh( @@ -3205,21 +3213,21 @@ where let mut prune_msgs = vec![]; for control_msg in rpc.control_msgs { match control_msg { - ControlAction::IHave { + ControlAction::IHave(IHave { topic_hash, message_ids, - } => { + }) => { ihave_msgs.push((topic_hash, message_ids)); } - ControlAction::IWant { message_ids } => { + ControlAction::IWant(IWant { message_ids }) => { self.handle_iwant(&propagation_source, message_ids) } - ControlAction::Graft { topic_hash } => graft_msgs.push(topic_hash), - ControlAction::Prune { + ControlAction::Graft(Graft { topic_hash }) => graft_msgs.push(topic_hash), + ControlAction::Prune(Prune { topic_hash, peers, backoff, - } => 
prune_msgs.push((topic_hash, peers, backoff)), + }) => prune_msgs.push((topic_hash, peers, backoff)), } } if !ihave_msgs.is_empty() { diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 8482240fe81..383ffa171b0 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -315,33 +315,39 @@ fn proto_to_message(rpc: &proto::RPC) -> Rpc { let ihave_msgs: Vec = rpc_control .ihave .into_iter() - .map(|ihave| ControlAction::IHave { - topic_hash: TopicHash::from_raw(ihave.topic_id.unwrap_or_default()), - message_ids: ihave - .message_ids - .into_iter() - .map(MessageId::from) - .collect::>(), + .map(|ihave| { + ControlAction::IHave(IHave { + topic_hash: TopicHash::from_raw(ihave.topic_id.unwrap_or_default()), + message_ids: ihave + .message_ids + .into_iter() + .map(MessageId::from) + .collect::>(), + }) }) .collect(); let iwant_msgs: Vec = rpc_control .iwant .into_iter() - .map(|iwant| ControlAction::IWant { - message_ids: iwant - .message_ids - .into_iter() - .map(MessageId::from) - .collect::>(), + .map(|iwant| { + ControlAction::IWant(IWant { + message_ids: iwant + .message_ids + .into_iter() + .map(MessageId::from) + .collect::>(), + }) }) .collect(); let graft_msgs: Vec = rpc_control .graft .into_iter() - .map(|graft| ControlAction::Graft { - topic_hash: TopicHash::from_raw(graft.topic_id.unwrap_or_default()), + .map(|graft| { + ControlAction::Graft(Graft { + topic_hash: TopicHash::from_raw(graft.topic_id.unwrap_or_default()), + }) }) .collect(); @@ -364,11 +370,11 @@ fn proto_to_message(rpc: &proto::RPC) -> Rpc { .collect::>(); let topic_hash = TopicHash::from_raw(prune.topic_id.unwrap_or_default()); - prune_msgs.push(ControlAction::Prune { + prune_msgs.push(ControlAction::Prune(Prune { topic_hash, peers, backoff: prune.backoff, - }); + })); } control_msgs.extend(ihave_msgs); @@ -548,7 +554,7 @@ fn test_join() { (_, controls): (&PeerId, &Vec), ) -> Vec { for c in 
controls.iter() { - if let ControlAction::Graft { topic_hash: _ } = c { + if let ControlAction::Graft(Graft { topic_hash: _ }) = c { collected_grafts.push(c.clone()) } } @@ -1153,7 +1159,7 @@ fn test_handle_ihave_subscribed_and_msg_not_cached() { // check that we sent an IWANT request for `unknown id` let iwant_exists = match gs.control_pool.get(&peers[7]) { Some(controls) => controls.iter().any(|c| match c { - ControlAction::IWant { message_ids } => message_ids + ControlAction::IWant(IWant { message_ids }) => message_ids .iter() .any(|m| *m == MessageId::new(b"unknown id")), _ => false, @@ -1467,7 +1473,7 @@ fn test_handle_graft_explicit_peer() { assert!( count_control_msgs(&gs, &queues, |peer_id, m| peer_id == peer && match m { - ControlAction::Prune { topic_hash, .. } => + ControlAction::Prune(Prune { topic_hash, .. }) => topic_hash == &topic_hashes[0] || topic_hash == &topic_hashes[1], _ => false, }) @@ -1848,11 +1854,11 @@ fn test_send_px_and_backoff_in_prune() { assert_eq!( count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[0] && match m { - ControlAction::Prune { + ControlAction::Prune(Prune { topic_hash, peers, backoff, - } => + }) => topic_hash == &topics[0] && peers.len() == config.prune_peers() && //all peers are different @@ -1896,11 +1902,11 @@ fn test_prune_backoffed_peer_on_graft() { assert_eq!( count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[0] && match m { - ControlAction::Prune { + ControlAction::Prune(Prune { topic_hash, peers, backoff, - } => + }) => topic_hash == &topics[0] && //no px in this case peers.is_empty() && @@ -2046,7 +2052,7 @@ fn test_unsubscribe_backoff() { assert_eq!( count_control_msgs(&gs, &queues, |_, m| match m { - ControlAction::Prune { backoff, .. } => backoff == &Some(1), + ControlAction::Prune(Prune { backoff, .. 
}) => backoff == &Some(1), _ => false, }), 1, @@ -2181,10 +2187,10 @@ fn test_gossip_to_at_least_gossip_lazy_peers() { //check that exactly config.gossip_lazy() many gossip messages were sent. assert_eq!( count_control_msgs(&gs, &queues, |_, action| match action { - ControlAction::IHave { + ControlAction::IHave(IHave { topic_hash, message_ids, - } => topic_hash == &topic_hashes[0] && message_ids.iter().any(|id| id == &msg_id), + }) => topic_hash == &topic_hashes[0] && message_ids.iter().any(|id| id == &msg_id), _ => false, }), config.gossip_lazy() @@ -2225,10 +2231,10 @@ fn test_gossip_to_at_most_gossip_factor_peers() { //check that exactly config.gossip_lazy() many gossip messages were sent. assert_eq!( count_control_msgs(&gs, &queues, |_, action| match action { - ControlAction::IHave { + ControlAction::IHave(IHave { topic_hash, message_ids, - } => topic_hash == &topic_hashes[0] && message_ids.iter().any(|id| id == &msg_id), + }) => topic_hash == &topic_hashes[0] && message_ids.iter().any(|id| id == &msg_id), _ => false, }), ((m - config.mesh_n_low()) as f64 * config.gossip_factor()) as usize @@ -2385,11 +2391,11 @@ fn test_prune_negative_scored_peers() { assert_eq!( count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[0] && match m { - ControlAction::Prune { + ControlAction::Prune(Prune { topic_hash, peers, backoff, - } => + }) => topic_hash == &topics[0] && //no px in this case peers.is_empty() && @@ -2519,11 +2525,11 @@ fn test_only_send_nonnegative_scoring_peers_in_px() { assert_eq!( count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[1] && match m { - ControlAction::Prune { + ControlAction::Prune(Prune { topic_hash, peers: px, .. 
- } => + }) => topic_hash == &topics[0] && px.len() == 1 && px[0].peer_id.as_ref().unwrap() == &peers[2], @@ -2591,10 +2597,10 @@ fn test_do_not_gossip_to_peers_below_gossip_threshold() { // Check that exactly one gossip messages got sent and it got sent to p2 assert_eq!( count_control_msgs(&gs, &queues, |peer, action| match action { - ControlAction::IHave { + ControlAction::IHave(IHave { topic_hash, message_ids, - } => { + }) => { if topic_hash == &topics[0] && message_ids.iter().any(|id| id == &msg_id) { assert_eq!(peer, &p2); true @@ -2755,7 +2761,7 @@ fn test_ihave_msg_from_peer_below_gossip_threshold_gets_ignored() { // check that we sent exactly one IWANT request to p2 assert_eq!( count_control_msgs(&gs, &queues, |peer, c| match c { - ControlAction::IWant { message_ids } => + ControlAction::IWant(IWant { message_ids }) => if message_ids.iter().any(|m| m == &msg_id) { assert_eq!(peer, &p2); true @@ -2964,10 +2970,10 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() { topic_hash: topics[0].clone(), }; - let control_action = ControlAction::IHave { + let control_action = ControlAction::IHave(IHave { topic_hash: topics[0].clone(), message_ids: vec![config.message_id(message2)], - }; + }); //clear events gs.events.clear(); @@ -2993,10 +2999,10 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() { ToSwarm::GenerateEvent(Event::Subscribed { .. 
}) )); - let control_action = ControlAction::IHave { + let control_action = ControlAction::IHave(IHave { topic_hash: topics[0].clone(), message_ids: vec![config.message_id(message4)], - }; + }); //receive from p2 gs.on_connection_handler_event( @@ -4451,7 +4457,7 @@ fn test_ignore_too_many_ihaves() { //we send iwant only for the first 10 messages assert_eq!( count_control_msgs(&gs, &queues, |p, action| p == &peer - && matches!(action, ControlAction::IWant { message_ids } if message_ids.len() == 1 && first_ten.contains(&message_ids[0]))), + && matches!(action, ControlAction::IWant(IWant { message_ids }) if message_ids.len() == 1 && first_ten.contains(&message_ids[0]))), 10, "exactly the first ten ihaves should be processed and one iwant for each created" ); @@ -4474,7 +4480,7 @@ fn test_ignore_too_many_ihaves() { //we sent iwant for all 20 messages assert_eq!( count_control_msgs(&gs, &queues, |p, action| p == &peer - && matches!(action, ControlAction::IWant { message_ids } if message_ids.len() == 1)), + && matches!(action, ControlAction::IWant(IWant { message_ids }) if message_ids.len() == 1)), 20, "all 20 should get sent" ); @@ -4524,7 +4530,7 @@ fn test_ignore_too_many_messages_in_ihave() { let mut sum = 0; assert_eq!( count_control_msgs(&gs, &queues, |p, action| match action { - ControlAction::IWant { message_ids } => + ControlAction::IWant(IWant { message_ids }) => p == &peer && { assert!(first_twelve.is_superset(&message_ids.iter().collect())); sum += message_ids.len(); @@ -4549,7 +4555,7 @@ fn test_ignore_too_many_messages_in_ihave() { let mut sum = 0; assert_eq!( count_control_msgs(&gs, &queues, |p, action| match action { - ControlAction::IWant { message_ids } => + ControlAction::IWant(IWant { message_ids }) => p == &peer && { sum += message_ids.len(); true @@ -4603,7 +4609,7 @@ fn test_limit_number_of_message_ids_inside_ihave() { assert_eq!( count_control_msgs(&gs, &queues, |p, action| match action { - ControlAction::IHave { message_ids, .. 
} => { + ControlAction::IHave(IHave { message_ids, .. }) => { if p == &p1 { ihaves1 = message_ids.iter().cloned().collect(); true @@ -4938,7 +4944,7 @@ fn test_dont_send_px_to_old_gossipsub_peers() { //check that prune does not contain px assert_eq!( count_control_msgs(&gs, &queues, |_, m| match m { - ControlAction::Prune { peers: px, .. } => !px.is_empty(), + ControlAction::Prune(Prune { peers: px, .. }) => !px.is_empty(), _ => false, }), 0, @@ -4976,7 +4982,7 @@ fn test_dont_send_floodsub_peers_in_px() { //check that px in prune message is empty assert_eq!( count_control_msgs(&gs, &queues, |_, m| match m { - ControlAction::Prune { peers: px, .. } => !px.is_empty(), + ControlAction::Prune(Prune { peers: px, .. }) => !px.is_empty(), _ => false, }), 0, diff --git a/protocols/gossipsub/src/protocol.rs b/protocols/gossipsub/src/protocol.rs index e9600a4d8d8..7f200ac81b4 100644 --- a/protocols/gossipsub/src/protocol.rs +++ b/protocols/gossipsub/src/protocol.rs @@ -23,7 +23,8 @@ use crate::handler::HandlerEvent; use crate::rpc_proto::proto; use crate::topic::TopicHash; use crate::types::{ - ControlAction, MessageId, PeerInfo, PeerKind, RawMessage, Rpc, Subscription, SubscriptionAction, + ControlAction, Graft, IHave, IWant, MessageId, PeerInfo, PeerKind, Prune, RawMessage, Rpc, + Subscription, SubscriptionAction, }; use crate::ValidationError; use asynchronous_codec::{Decoder, Encoder, Framed}; @@ -413,33 +414,39 @@ impl Decoder for GossipsubCodec { let ihave_msgs: Vec = rpc_control .ihave .into_iter() - .map(|ihave| ControlAction::IHave { - topic_hash: TopicHash::from_raw(ihave.topic_id.unwrap_or_default()), - message_ids: ihave - .message_ids - .into_iter() - .map(MessageId::from) - .collect::>(), + .map(|ihave| { + ControlAction::IHave(IHave { + topic_hash: TopicHash::from_raw(ihave.topic_id.unwrap_or_default()), + message_ids: ihave + .message_ids + .into_iter() + .map(MessageId::from) + .collect::>(), + }) }) .collect(); let iwant_msgs: Vec = rpc_control .iwant 
.into_iter() - .map(|iwant| ControlAction::IWant { - message_ids: iwant - .message_ids - .into_iter() - .map(MessageId::from) - .collect::>(), + .map(|iwant| { + ControlAction::IWant(IWant { + message_ids: iwant + .message_ids + .into_iter() + .map(MessageId::from) + .collect::>(), + }) }) .collect(); let graft_msgs: Vec = rpc_control .graft .into_iter() - .map(|graft| ControlAction::Graft { - topic_hash: TopicHash::from_raw(graft.topic_id.unwrap_or_default()), + .map(|graft| { + ControlAction::Graft(Graft { + topic_hash: TopicHash::from_raw(graft.topic_id.unwrap_or_default()), + }) }) .collect(); @@ -463,11 +470,11 @@ impl Decoder for GossipsubCodec { .collect::>(); let topic_hash = TopicHash::from_raw(prune.topic_id.unwrap_or_default()); - prune_msgs.push(ControlAction::Prune { + prune_msgs.push(ControlAction::Prune(Prune { topic_hash, peers, backoff: prune.backoff, - }); + })); } control_msgs.extend(ihave_msgs); diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs index f6438687960..d266daae8a5 100644 --- a/protocols/gossipsub/src/types.rs +++ b/protocols/gossipsub/src/types.rs @@ -203,8 +203,8 @@ pub enum SubscriptionAction { } #[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct PeerInfo { - pub peer_id: Option, +pub(crate) struct PeerInfo { + pub(crate) peer_id: Option, //TODO add this when RFC: Signed Address Records got added to the spec (see pull request // https://github.com/libp2p/specs/pull/217) //pub signed_peer_record: ?, @@ -214,31 +214,47 @@ pub struct PeerInfo { #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub enum ControlAction { /// Node broadcasts known messages per topic - IHave control message. - IHave { - /// The topic of the messages. - topic_hash: TopicHash, - /// A list of known message ids (peer_id + sequence _number) as a string. - message_ids: Vec, - }, + IHave(IHave), /// The node requests specific message ids (peer_id + sequence _number) - IWant control message. 
- IWant { - /// A list of known message ids (peer_id + sequence _number) as a string. - message_ids: Vec, - }, + IWant(IWant), /// The node has been added to the mesh - Graft control message. - Graft { - /// The mesh topic the peer should be added to. - topic_hash: TopicHash, - }, + Graft(Graft), /// The node has been removed from the mesh - Prune control message. - Prune { - /// The mesh topic the peer should be removed from. - topic_hash: TopicHash, - /// A list of peers to be proposed to the removed peer as peer exchange - peers: Vec, - /// The backoff time in seconds before we allow to reconnect - backoff: Option, - }, + Prune(Prune), +} + +/// Node broadcasts known messages per topic - IHave control message. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct IHave { + /// The topic of the messages. + pub(crate) topic_hash: TopicHash, + /// A list of known message ids (peer_id + sequence _number) as a string. + pub(crate) message_ids: Vec, +} + +/// The node requests specific message ids (peer_id + sequence _number) - IWant control message. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct IWant { + /// A list of known message ids (peer_id + sequence _number) as a string. + pub(crate) message_ids: Vec, +} + +/// The node has been added to the mesh - Graft control message. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct Graft { + /// The mesh topic the peer should be added to. + pub(crate) topic_hash: TopicHash, +} + +/// The node has been removed from the mesh - Prune control message. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct Prune { + /// The mesh topic the peer should be removed from. + pub(crate) topic_hash: TopicHash, + /// A list of peers to be proposed to the removed peer as peer exchange + pub(crate) peers: Vec, + /// The backoff time in seconds before we allow to reconnect + pub(crate) backoff: Option, } /// A Gossipsub RPC message sent. 
@@ -302,10 +318,10 @@ impl From for proto::RPC { }], control: None, }, - RpcOut::Control(ControlAction::IHave { + RpcOut::Control(ControlAction::IHave(IHave { topic_hash, message_ids, - }) => proto::RPC { + })) => proto::RPC { publish: Vec::new(), subscriptions: Vec::new(), control: Some(proto::ControlMessage { @@ -318,7 +334,7 @@ impl From for proto::RPC { prune: vec![], }), }, - RpcOut::Control(ControlAction::IWant { message_ids }) => proto::RPC { + RpcOut::Control(ControlAction::IWant(IWant { message_ids })) => proto::RPC { publish: Vec::new(), subscriptions: Vec::new(), control: Some(proto::ControlMessage { @@ -330,7 +346,7 @@ impl From for proto::RPC { prune: vec![], }), }, - RpcOut::Control(ControlAction::Graft { topic_hash }) => proto::RPC { + RpcOut::Control(ControlAction::Graft(Graft { topic_hash })) => proto::RPC { publish: Vec::new(), subscriptions: vec![], control: Some(proto::ControlMessage { @@ -342,11 +358,11 @@ impl From for proto::RPC { prune: vec![], }), }, - RpcOut::Control(ControlAction::Prune { + RpcOut::Control(ControlAction::Prune(Prune { topic_hash, peers, backoff, - }) => { + })) => { proto::RPC { publish: Vec::new(), subscriptions: vec![], @@ -434,33 +450,33 @@ impl From for proto::RPC { for action in rpc.control_msgs { match action { // collect all ihave messages - ControlAction::IHave { + ControlAction::IHave(IHave { topic_hash, message_ids, - } => { + }) => { let rpc_ihave = proto::ControlIHave { topic_id: Some(topic_hash.into_string()), message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(), }; control.ihave.push(rpc_ihave); } - ControlAction::IWant { message_ids } => { + ControlAction::IWant(IWant { message_ids }) => { let rpc_iwant = proto::ControlIWant { message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(), }; control.iwant.push(rpc_iwant); } - ControlAction::Graft { topic_hash } => { + ControlAction::Graft(Graft { topic_hash }) => { let rpc_graft = proto::ControlGraft { topic_id: 
Some(topic_hash.into_string()), }; control.graft.push(rpc_graft); } - ControlAction::Prune { + ControlAction::Prune(Prune { topic_hash, peers, backoff, - } => { + }) => { let rpc_prune = proto::ControlPrune { topic_id: Some(topic_hash.into_string()), peers: peers From 38316adf31bd1c6053f72928e841d24ce4c29064 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Tue, 5 Dec 2023 14:40:39 +0000 Subject: [PATCH 2/8] split control messages for different priorities --- protocols/gossipsub/src/behaviour.rs | 41 ++++---- protocols/gossipsub/src/behaviour/tests.rs | 113 +++++++++------------ protocols/gossipsub/src/types.rs | 62 +++++++++-- 3 files changed, 119 insertions(+), 97 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 95781daa370..88fdcd21a13 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -254,7 +254,7 @@ pub struct Behaviour { events: VecDeque>, /// Pools non-urgent control messages between heartbeats. - control_pool: HashMap>, + control_pool: HashMap>, /// Information used for publishing messages. 
publish_config: PublishConfig, @@ -1036,7 +1036,7 @@ where Self::control_pool_add( &mut self.control_pool, peer_id, - ControlAction::Graft(Graft { + RpcOut::Graft(Graft { topic_hash: topic_hash.clone(), }), ); @@ -1136,7 +1136,7 @@ where tracing::debug!(%peer, "LEAVE: Sending PRUNE to peer"); let on_unsubscribe = true; let prune = self.make_prune(topic_hash, &peer, self.config.do_px(), on_unsubscribe); - Self::control_pool_add(&mut self.control_pool, peer, ControlAction::Prune(prune)); + Self::control_pool_add(&mut self.control_pool, peer, RpcOut::Prune(prune)); // If the peer did not previously exist in any mesh, inform the handler peer_removed_from_mesh( @@ -1310,7 +1310,7 @@ where Self::control_pool_add( &mut self.control_pool, *peer_id, - ControlAction::IWant(IWant { + RpcOut::IWant(IWant { message_ids: iwant_ids_vec, }), ); @@ -1521,7 +1521,7 @@ where .iter() .map(|t| self.make_prune(t, peer_id, do_px, on_unsubscribe)) { - sender.control(ControlAction::Prune(prune)); + sender.prune(prune); } // Send the prune messages to the peer tracing::debug!( @@ -2022,7 +2022,7 @@ where .expect("Peerid should exist"); for topic_hash in topics_to_graft.into_iter() { - sender.control(ControlAction::Graft(Graft { topic_hash })); + sender.graft(Graft { topic_hash }); } // Notify the application of the subscriptions @@ -2512,7 +2512,7 @@ where Self::control_pool_add( &mut self.control_pool, peer, - ControlAction::IHave(IHave { + RpcOut::IHave(IHave { topic_hash: topic_hash.clone(), message_ids: peer_message_ids, }), @@ -2548,11 +2548,6 @@ where &self.connected_peers, ); } - let grafts = topics.iter().map(|topic_hash| { - ControlAction::Graft(Graft { - topic_hash: topic_hash.clone(), - }) - }); // If there are prunes associated with the same peer add them. // NOTE: In this case a peer has been added to a topic mesh, and removed from another. 
@@ -2580,12 +2575,14 @@ where ) }); - for graft in grafts { - sender.control(graft); + for topic_hash in topics { + sender.graft(Graft { + topic_hash: topic_hash.clone(), + }); } for prune in prunes { - sender.control(ControlAction::Prune(prune)); + sender.prune(prune); } } @@ -2605,7 +2602,7 @@ where .expect("Peerid should exist") .clone(); - sender.control(ControlAction::Prune(prune)); + sender.prune(prune); // inform the handler peer_removed_from_mesh( @@ -2786,9 +2783,9 @@ where // adds a control action to control_pool fn control_pool_add( - control_pool: &mut HashMap>, + control_pool: &mut HashMap>, peer: PeerId, - control: ControlAction, + control: RpcOut, ) { control_pool.entry(peer).or_default().push(control); } @@ -2802,7 +2799,13 @@ where .get_mut(&peer) .expect("Peerid should exist"); - sender.control(msg); + match msg { + RpcOut::IHave(ihave) => sender.ihave(ihave), + RpcOut::IWant(iwant) => sender.iwant(iwant), + RpcOut::Graft(graft) => sender.graft(graft), + RpcOut::Prune(prune) => sender.prune(prune), + _ => unreachable!(), + } } } diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 383ffa171b0..7ae9deb5b92 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -549,13 +549,13 @@ fn test_join() { "Should have added 6 nodes to the mesh" ); - fn collect_grafts( - mut collected_grafts: Vec, - (_, controls): (&PeerId, &Vec), - ) -> Vec { + fn collect_grafts<'a>( + mut collected_grafts: Vec<&'a RpcOut>, + (_, controls): (&'a PeerId, &'a Vec), + ) -> Vec<&'a RpcOut> { for c in controls.iter() { - if let ControlAction::Graft(Graft { topic_hash: _ }) = c { - collected_grafts.push(c.clone()) + if let RpcOut::Graft(Graft { topic_hash: _ }) = c { + collected_grafts.push(c) } } collected_grafts @@ -1159,7 +1159,7 @@ fn test_handle_ihave_subscribed_and_msg_not_cached() { // check that we sent an IWANT request for `unknown id` let iwant_exists = match 
gs.control_pool.get(&peers[7]) { Some(controls) => controls.iter().any(|c| match c { - ControlAction::IWant(IWant { message_ids }) => message_ids + RpcOut::IWant(IWant { message_ids }) => message_ids .iter() .any(|m| *m == MessageId::new(b"unknown id")), _ => false, @@ -1328,7 +1328,7 @@ fn test_handle_prune_peer_in_mesh() { fn count_control_msgs( gs: &Behaviour, queues: &HashMap, - mut filter: impl FnMut(&PeerId, &ControlAction) -> bool, + mut filter: impl FnMut(&PeerId, &RpcOut) -> bool, ) -> usize { gs.control_pool .iter() @@ -1338,13 +1338,13 @@ fn count_control_msgs( .iter() .fold(0, |mut collected_messages, (peer_id, c)| { while !c.priority.is_empty() || !c.non_priority.is_empty() { - if let Ok(RpcOut::Control(action)) = c.priority.try_recv() { - if filter(peer_id, &action) { + if let Ok(rpc) = c.priority.try_recv() { + if filter(peer_id, &rpc) { collected_messages += 1; } } - if let Ok(RpcOut::Control(action)) = c.non_priority.try_recv() { - if filter(peer_id, &action) { + if let Ok(rpc) = c.non_priority.try_recv() { + if filter(peer_id, &rpc) { collected_messages += 1; } } @@ -1355,11 +1355,11 @@ fn count_control_msgs( fn flush_events( gs: &mut Behaviour, - receiver_queues: &mut HashMap, + receiver_queues: &HashMap, ) { gs.control_pool.clear(); gs.events.clear(); - for c in receiver_queues.values_mut() { + for c in receiver_queues.values() { while !c.priority.is_empty() || !c.non_priority.is_empty() { let _ = c.priority.try_recv(); let _ = c.non_priority.try_recv(); @@ -1473,7 +1473,7 @@ fn test_handle_graft_explicit_peer() { assert!( count_control_msgs(&gs, &queues, |peer_id, m| peer_id == peer && match m { - ControlAction::Prune(Prune { topic_hash, .. }) => + RpcOut::Prune(Prune { topic_hash, .. 
}) => topic_hash == &topic_hashes[0] || topic_hash == &topic_hashes[1], _ => false, }) @@ -1501,7 +1501,7 @@ fn explicit_peers_not_added_to_mesh_on_receiving_subscription() { //assert that graft gets created to non-explicit peer assert!( count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[1] - && matches!(m, ControlAction::Graft { .. })) + && matches!(m, RpcOut::Graft { .. })) >= 1, "No graft message got created to non-explicit peer" ); @@ -1509,7 +1509,7 @@ fn explicit_peers_not_added_to_mesh_on_receiving_subscription() { //assert that no graft gets created to explicit peer assert_eq!( count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[0] - && matches!(m, ControlAction::Graft { .. })), + && matches!(m, RpcOut::Graft { .. })), 0, "A graft message got created to an explicit peer" ); @@ -1533,7 +1533,7 @@ fn do_not_graft_explicit_peer() { //assert that no graft gets created to explicit peer assert_eq!( count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &others[0] - && matches!(m, ControlAction::Graft { .. })), + && matches!(m, RpcOut::Graft { .. })), 0, "A graft message got created to an explicit peer" ); @@ -1607,7 +1607,7 @@ fn explicit_peers_not_added_to_mesh_on_subscribe() { //assert that graft gets created to non-explicit peer assert!( count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[1] - && matches!(m, ControlAction::Graft { .. })) + && matches!(m, RpcOut::Graft { .. })) > 0, "No graft message got created to non-explicit peer" ); @@ -1615,7 +1615,7 @@ fn explicit_peers_not_added_to_mesh_on_subscribe() { //assert that no graft gets created to explicit peer assert_eq!( count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[0] - && matches!(m, ControlAction::Graft { .. })), + && matches!(m, RpcOut::Graft { .. 
})), 0, "A graft message got created to an explicit peer" ); @@ -1656,7 +1656,7 @@ fn explicit_peers_not_added_to_mesh_from_fanout_on_subscribe() { //assert that graft gets created to non-explicit peer assert!( count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[1] - && matches!(m, ControlAction::Graft { .. })) + && matches!(m, RpcOut::Graft { .. })) >= 1, "No graft message got created to non-explicit peer" ); @@ -1664,7 +1664,7 @@ fn explicit_peers_not_added_to_mesh_from_fanout_on_subscribe() { //assert that no graft gets created to explicit peer assert_eq!( count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[0] - && matches!(m, ControlAction::Graft { .. })), + && matches!(m, RpcOut::Graft { .. })), 0, "A graft message got created to an explicit peer" ); @@ -1706,7 +1706,7 @@ fn no_gossip_gets_sent_to_explicit_peers() { .get(&peers[0]) .unwrap_or(&Vec::new()) .iter() - .filter(|m| matches!(m, ControlAction::IHave { .. })) + .filter(|m| matches!(m, RpcOut::IHave { .. })) .count(), 0, "Gossip got emitted to explicit peer" @@ -1854,7 +1854,7 @@ fn test_send_px_and_backoff_in_prune() { assert_eq!( count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[0] && match m { - ControlAction::Prune(Prune { + RpcOut::Prune(Prune { topic_hash, peers, backoff, @@ -1902,7 +1902,7 @@ fn test_prune_backoffed_peer_on_graft() { assert_eq!( count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[0] && match m { - ControlAction::Prune(Prune { + RpcOut::Prune(Prune { topic_hash, peers, backoff, @@ -1950,10 +1950,7 @@ fn test_do_not_graft_within_backoff_period() { //Check that no graft got created (we have backoff_slack = 1 therefore one more heartbeat // is needed). assert_eq!( - count_control_msgs(&gs, &queues, |_, m| matches!( - m, - ControlAction::Graft { .. } - )), + count_control_msgs(&gs, &queues, |_, m| matches!(m, RpcOut::Graft { .. 
})), 0, "Graft message created too early within backoff period" ); @@ -1964,10 +1961,7 @@ fn test_do_not_graft_within_backoff_period() { //check that graft got created assert!( - count_control_msgs(&gs, &queues, |_, m| matches!( - m, - ControlAction::Graft { .. } - )) > 0, + count_control_msgs(&gs, &queues, |_, m| matches!(m, RpcOut::Graft { .. })) > 0, "No graft message was created after backoff period" ); } @@ -2005,10 +1999,7 @@ fn test_do_not_graft_within_default_backoff_period_after_receiving_prune_without //Check that no graft got created (we have backoff_slack = 1 therefore one more heartbeat // is needed). assert_eq!( - count_control_msgs(&gs, &queues, |_, m| matches!( - m, - ControlAction::Graft { .. } - )), + count_control_msgs(&gs, &queues, |_, m| matches!(m, RpcOut::Graft { .. })), 0, "Graft message created too early within backoff period" ); @@ -2019,10 +2010,7 @@ fn test_do_not_graft_within_default_backoff_period_after_receiving_prune_without //check that graft got created assert!( - count_control_msgs(&gs, &queues, |_, m| matches!( - m, - ControlAction::Graft { .. } - )) > 0, + count_control_msgs(&gs, &queues, |_, m| matches!(m, RpcOut::Graft { .. })) > 0, "No graft message was created after backoff period" ); } @@ -2052,7 +2040,7 @@ fn test_unsubscribe_backoff() { assert_eq!( count_control_msgs(&gs, &queues, |_, m| match m { - ControlAction::Prune(Prune { backoff, .. }) => backoff == &Some(1), + RpcOut::Prune(Prune { backoff, .. }) => backoff == &Some(1), _ => false, }), 1, @@ -2076,10 +2064,7 @@ fn test_unsubscribe_backoff() { // Check that no graft got created (we have backoff_slack = 1 therefore one more heartbeat // is needed). assert_eq!( - count_control_msgs(&gs, &queues, |_, m| matches!( - m, - ControlAction::Graft { .. } - )), + count_control_msgs(&gs, &queues, |_, m| matches!(m, RpcOut::Graft { .. 
})), 0, "Graft message created too early within backoff period" ); @@ -2090,10 +2075,7 @@ fn test_unsubscribe_backoff() { // check that graft got created assert!( - count_control_msgs(&gs, &queues, |_, m| matches!( - m, - ControlAction::Graft { .. } - )) > 0, + count_control_msgs(&gs, &queues, |_, m| matches!(m, RpcOut::Graft { .. })) > 0, "No graft message was created after backoff period" ); } @@ -2187,7 +2169,7 @@ fn test_gossip_to_at_least_gossip_lazy_peers() { //check that exactly config.gossip_lazy() many gossip messages were sent. assert_eq!( count_control_msgs(&gs, &queues, |_, action| match action { - ControlAction::IHave(IHave { + RpcOut::IHave(IHave { topic_hash, message_ids, }) => topic_hash == &topic_hashes[0] && message_ids.iter().any(|id| id == &msg_id), @@ -2231,7 +2213,7 @@ fn test_gossip_to_at_most_gossip_factor_peers() { //check that exactly config.gossip_lazy() many gossip messages were sent. assert_eq!( count_control_msgs(&gs, &queues, |_, action| match action { - ControlAction::IHave(IHave { + RpcOut::IHave(IHave { topic_hash, message_ids, }) => topic_hash == &topic_hashes[0] && message_ids.iter().any(|id| id == &msg_id), @@ -2391,7 +2373,7 @@ fn test_prune_negative_scored_peers() { assert_eq!( count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[0] && match m { - ControlAction::Prune(Prune { + RpcOut::Prune(Prune { topic_hash, peers, backoff, @@ -2525,7 +2507,7 @@ fn test_only_send_nonnegative_scoring_peers_in_px() { assert_eq!( count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[1] && match m { - ControlAction::Prune(Prune { + RpcOut::Prune(Prune { topic_hash, peers: px, .. 
@@ -2597,7 +2579,7 @@ fn test_do_not_gossip_to_peers_below_gossip_threshold() { // Check that exactly one gossip messages got sent and it got sent to p2 assert_eq!( count_control_msgs(&gs, &queues, |peer, action| match action { - ControlAction::IHave(IHave { + RpcOut::IHave(IHave { topic_hash, message_ids, }) => { @@ -2761,7 +2743,7 @@ fn test_ihave_msg_from_peer_below_gossip_threshold_gets_ignored() { // check that we sent exactly one IWANT request to p2 assert_eq!( count_control_msgs(&gs, &queues, |peer, c| match c { - ControlAction::IWant(IWant { message_ids }) => + RpcOut::IWant(IWant { message_ids }) => if message_ids.iter().any(|m| m == &msg_id) { assert_eq!(peer, &p2); true @@ -4354,10 +4336,7 @@ fn test_ignore_graft_from_unknown_topic() { //assert that no prune got created assert_eq!( - count_control_msgs(&gs, &queues, |_, a| matches!( - a, - ControlAction::Prune { .. } - )), + count_control_msgs(&gs, &queues, |_, a| matches!(a, RpcOut::Prune { .. })), 0, "we should not prune after graft in unknown topic" ); @@ -4457,7 +4436,7 @@ fn test_ignore_too_many_ihaves() { //we send iwant only for the first 10 messages assert_eq!( count_control_msgs(&gs, &queues, |p, action| p == &peer - && matches!(action, ControlAction::IWant(IWant { message_ids }) if message_ids.len() == 1 && first_ten.contains(&message_ids[0]))), + && matches!(action, RpcOut::IWant(IWant { message_ids }) if message_ids.len() == 1 && first_ten.contains(&message_ids[0]))), 10, "exactly the first ten ihaves should be processed and one iwant for each created" ); @@ -4480,7 +4459,7 @@ fn test_ignore_too_many_ihaves() { //we sent iwant for all 20 messages assert_eq!( count_control_msgs(&gs, &queues, |p, action| p == &peer - && matches!(action, ControlAction::IWant(IWant { message_ids }) if message_ids.len() == 1)), + && matches!(action, RpcOut::IWant(IWant { message_ids }) if message_ids.len() == 1)), 20, "all 20 should get sent" ); @@ -4530,7 +4509,7 @@ fn test_ignore_too_many_messages_in_ihave() { 
let mut sum = 0; assert_eq!( count_control_msgs(&gs, &queues, |p, action| match action { - ControlAction::IWant(IWant { message_ids }) => + RpcOut::IWant(IWant { message_ids }) => p == &peer && { assert!(first_twelve.is_superset(&message_ids.iter().collect())); sum += message_ids.len(); @@ -4555,7 +4534,7 @@ fn test_ignore_too_many_messages_in_ihave() { let mut sum = 0; assert_eq!( count_control_msgs(&gs, &queues, |p, action| match action { - ControlAction::IWant(IWant { message_ids }) => + RpcOut::IWant(IWant { message_ids }) => p == &peer && { sum += message_ids.len(); true @@ -4609,7 +4588,7 @@ fn test_limit_number_of_message_ids_inside_ihave() { assert_eq!( count_control_msgs(&gs, &queues, |p, action| match action { - ControlAction::IHave(IHave { message_ids, .. }) => { + RpcOut::IHave(IHave { message_ids, .. }) => { if p == &p1 { ihaves1 = message_ids.iter().cloned().collect(); true @@ -4944,7 +4923,7 @@ fn test_dont_send_px_to_old_gossipsub_peers() { //check that prune does not contain px assert_eq!( count_control_msgs(&gs, &queues, |_, m| match m { - ControlAction::Prune(Prune { peers: px, .. }) => !px.is_empty(), + RpcOut::Prune(Prune { peers: px, .. }) => !px.is_empty(), _ => false, }), 0, @@ -4982,7 +4961,7 @@ fn test_dont_send_floodsub_peers_in_px() { //check that px in prune message is empty assert_eq!( count_control_msgs(&gs, &queues, |_, m| match m { - ControlAction::Prune(Prune { peers: px, .. }) => !px.is_empty(), + RpcOut::Prune(Prune { peers: px, .. }) => !px.is_empty(), _ => false, }), 0, diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs index d266daae8a5..edf619cab5e 100644 --- a/protocols/gossipsub/src/types.rs +++ b/protocols/gossipsub/src/types.rs @@ -270,8 +270,14 @@ pub enum RpcOut { Subscribe(TopicHash), /// Unsubscribe a topic. Unsubscribe(TopicHash), - /// List of Gossipsub control messages. - Control(ControlAction), + /// Send a GRAFT control message. + Graft(Graft), + /// Send a PRUNE control message. 
+ Prune(Prune), + /// Send a IHave control message. + IHave(IHave), + /// Send a IWant control message. + IWant(IWant), } impl RpcOut { @@ -318,10 +324,10 @@ impl From for proto::RPC { }], control: None, }, - RpcOut::Control(ControlAction::IHave(IHave { + RpcOut::IHave(IHave { topic_hash, message_ids, - })) => proto::RPC { + }) => proto::RPC { publish: Vec::new(), subscriptions: Vec::new(), control: Some(proto::ControlMessage { @@ -334,7 +340,7 @@ impl From for proto::RPC { prune: vec![], }), }, - RpcOut::Control(ControlAction::IWant(IWant { message_ids })) => proto::RPC { + RpcOut::IWant(IWant { message_ids }) => proto::RPC { publish: Vec::new(), subscriptions: Vec::new(), control: Some(proto::ControlMessage { @@ -346,7 +352,7 @@ impl From for proto::RPC { prune: vec![], }), }, - RpcOut::Control(ControlAction::Graft(Graft { topic_hash })) => proto::RPC { + RpcOut::Graft(Graft { topic_hash }) => proto::RPC { publish: Vec::new(), subscriptions: vec![], control: Some(proto::ControlMessage { @@ -358,11 +364,11 @@ impl From for proto::RPC { prune: vec![], }), }, - RpcOut::Control(ControlAction::Prune(Prune { + RpcOut::Prune(Prune { topic_hash, peers, backoff, - })) => { + }) => { proto::RPC { publish: Vec::new(), subscriptions: vec![], @@ -582,14 +588,48 @@ impl RpcSender { self.receiver.clone() } - /// Send a `RpcOut::Control` message to the `RpcReceiver` + /// Send a `RpcOut::Graft` message to the `RpcReceiver` /// this is high priority. - pub(crate) fn control(&mut self, control: ControlAction) { + pub(crate) fn graft(&mut self, graft: Graft) { self.priority - .try_send(RpcOut::Control(control)) + .try_send(RpcOut::Graft(graft)) .expect("Channel is unbounded and should always be open"); } + /// Send a `RpcOut::Prune` message to the `RpcReceiver` + /// this is high priority. 
+ pub(crate) fn prune(&mut self, prune: Prune) { + self.priority + .try_send(RpcOut::Prune(prune)) + .expect("Channel is unbounded and should always be open"); + } + + /// Send a `RpcOut::IHave` message to the `RpcReceiver` + /// this is low priority and if queue is full the message is dropped. + pub(crate) fn ihave(&mut self, ihave: IHave) { + if let Err(err) = self.non_priority.try_send(RpcOut::IHave(ihave)) { + let rpc = err.into_inner(); + tracing::trace!( + "IHAVE message {:?} to peer {} dropped, queue is full", + rpc, + self.peer_id + ); + } + } + + /// Send a `RpcOut::IWant` message to the `RpcReceiver` + /// this is low priority and if queue is full the message is dropped. + pub(crate) fn iwant(&mut self, iwant: IWant) { + if let Err(err) = self.non_priority.try_send(RpcOut::IWant(iwant)) { + let rpc = err.into_inner(); + tracing::trace!( + "IWANT message {:?} to peer {} dropped, queue is full", + rpc, + self.peer_id + ); + } + } + /// Send a `RpcOut::Subscribe` message to the `RpcReceiver` /// this is high priority. pub(crate) fn subscribe(&mut self, topic: TopicHash) { From f50c579d7b017e0d9dc41a0d3cff178e726898f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Tue, 5 Dec 2023 16:19:45 +0000 Subject: [PATCH 3/8] send GRAFT messages immediately when subscribing a topic, instead of doing it in the next heartbeat via control pool.
--- protocols/gossipsub/src/behaviour.rs | 15 +++++----- protocols/gossipsub/src/behaviour/tests.rs | 33 ++++++++++++---------- 2 files changed, 26 insertions(+), 22 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 88fdcd21a13..5835c30ec52 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -1033,13 +1033,14 @@ where if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.graft(&peer_id, topic_hash.clone()); } - Self::control_pool_add( - &mut self.control_pool, - peer_id, - RpcOut::Graft(Graft { - topic_hash: topic_hash.clone(), - }), - ); + let sender = self + .handler_send_queues + .get_mut(&peer_id) + .expect("Peerid should exist"); + + sender.graft(Graft { + topic_hash: topic_hash.clone(), + }); // If the peer did not previously exist in any mesh, inform the handler peer_added_to_mesh( diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 7ae9deb5b92..2a28a24be10 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -521,12 +521,15 @@ fn test_join() { .map(|t| Topic::new(t.clone())) .collect::>(); - let (mut gs, _, _receivers, topic_hashes) = inject_nodes1() + let (mut gs, _, mut receivers, topic_hashes) = inject_nodes1() .peer_no(20) .topics(topic_strings) .to_subscribe(true) .create_network(); + // Flush previous GRAFT messages. 
+ flush_events(&mut gs, &receivers); + // unsubscribe, then call join to invoke functionality assert!( gs.unsubscribe(&topics[0]).unwrap(), @@ -549,24 +552,20 @@ fn test_join() { "Should have added 6 nodes to the mesh" ); - fn collect_grafts<'a>( - mut collected_grafts: Vec<&'a RpcOut>, - (_, controls): (&'a PeerId, &'a Vec), - ) -> Vec<&'a RpcOut> { - for c in controls.iter() { - if let RpcOut::Graft(Graft { topic_hash: _ }) = c { - collected_grafts.push(c) + fn count_grafts(mut acc: usize, receiver: &RpcReceiver) -> usize { + while !receiver.priority.is_empty() || !receiver.non_priority.is_empty() { + if let Ok(RpcOut::Graft(_)) = receiver.priority.try_recv() { + acc += 1; } } - collected_grafts + acc } // there should be mesh_n GRAFT messages. - let graft_messages = gs.control_pool.iter().fold(vec![], collect_grafts); + let graft_messages = receivers.values().fold(0, count_grafts); assert_eq!( - graft_messages.len(), - 6, + graft_messages, 6, "There should be 6 grafts messages sent to peers" ); @@ -590,6 +589,10 @@ fn test_join() { ) .unwrap(); peers.push(peer); + let sender = RpcSender::new(random_peer, gs.config.connection_handler_queue_len()); + let receiver = sender.new_receiver(); + gs.handler_send_queues.insert(random_peer, sender); + receivers.insert(random_peer, receiver); gs.on_swarm_event(FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id: random_peer, @@ -625,10 +628,10 @@ fn test_join() { } // there should now be 12 graft messages to be sent - let graft_messages = gs.control_pool.iter().fold(vec![], collect_grafts); + let graft_messages = receivers.values().fold(graft_messages, count_grafts); - assert!( - graft_messages.len() == 12, + assert_eq!( + graft_messages, 12, "There should be 12 grafts messages sent to peers" ); } From cecf69eeb89ed903a7ce61dbcbd5986d54ba0291 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Tue, 5 Dec 2023 16:22:44 +0000 Subject: [PATCH 4/8] send PRUNE messages immediately when leaving a 
topic, instead of doing it in the next heartbeat via control pool. --- protocols/gossipsub/src/behaviour.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 5835c30ec52..5068f395f33 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -1137,7 +1137,12 @@ where tracing::debug!(%peer, "LEAVE: Sending PRUNE to peer"); let on_unsubscribe = true; let prune = self.make_prune(topic_hash, &peer, self.config.do_px(), on_unsubscribe); - Self::control_pool_add(&mut self.control_pool, peer, RpcOut::Prune(prune)); + let sender = self + .handler_send_queues + .get_mut(&peer) + .expect("Peerid should exist"); + + sender.prune(prune); // If the peer did not previously exist in any mesh, inform the handler peer_removed_from_mesh( From 97035bc2607a8a41196df61cb8b9921bff7961ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Tue, 5 Dec 2023 19:27:49 +0000 Subject: [PATCH 5/8] send IWANT messages immediately when handling IHAVE tests are changed due to how count_control_messages work. Previously when counting messages they were consumed from the source: control_pool and the NetworkBehaviour events list, with the new channels, to read a message one has to consume it, that's why test numbers were changed. 
--- protocols/gossipsub/src/behaviour.rs | 26 +++----- protocols/gossipsub/src/behaviour/tests.rs | 73 +++++++++++++--------- 2 files changed, 50 insertions(+), 49 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 5068f395f33..3148c9bf8ae 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -477,7 +477,6 @@ where peer_score: None, count_received_ihave: HashMap::new(), count_sent_iwant: HashMap::new(), - pending_iwant_msgs: HashSet::new(), connected_peers: HashMap::new(), published_message_ids: DuplicateCache::new(config.published_message_ids_cache_time()), config, @@ -1241,10 +1240,6 @@ where return false; } - if self.pending_iwant_msgs.contains(id) { - return false; - } - self.peer_score .as_ref() .map(|(_, _, _, promises)| !promises.contains(id)) @@ -1295,10 +1290,6 @@ where iwant_ids_vec.truncate(iask); *iasked += iask; - for message_id in &iwant_ids_vec { - // Add all messages to the pending list - self.pending_iwant_msgs.insert(message_id.clone()); - } if let Some((_, _, _, gossip_promises)) = &mut self.peer_score { gossip_promises.add_promise( @@ -1313,13 +1304,14 @@ where iwant_ids_vec ); - Self::control_pool_add( - &mut self.control_pool, - *peer_id, - RpcOut::IWant(IWant { - message_ids: iwant_ids_vec, - }), - ); + let sender = self + .handler_send_queues + .get_mut(peer_id) + .expect("Peerid should exist"); + + sender.iwant(IWant { + message_ids: iwant_ids_vec, + }); } tracing::trace!(peer=%peer_id, "Completed IHAVE handling for peer"); } @@ -2815,8 +2807,6 @@ where } } - // This clears all pending IWANT messages - self.pending_iwant_msgs.clear(); } fn on_connection_established( diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 2a28a24be10..9d2be223d03 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -1148,7 +1148,7 @@ fn 
test_handle_iwant_msg_not_cached() { #[test] // tests that an event is created when a peer shares that it has a message we want fn test_handle_ihave_subscribed_and_msg_not_cached() { - let (mut gs, peers, _, topic_hashes) = inject_nodes1() + let (mut gs, peers, receivers, topic_hashes) = inject_nodes1() .peer_no(20) .topics(vec![String::from("topic1")]) .to_subscribe(true) @@ -1160,15 +1160,19 @@ fn test_handle_ihave_subscribed_and_msg_not_cached() { ); // check that we sent an IWANT request for `unknown id` - let iwant_exists = match gs.control_pool.get(&peers[7]) { - Some(controls) => controls.iter().any(|c| match c { - RpcOut::IWant(IWant { message_ids }) => message_ids + let mut iwant_exists = false; + let receiver = receivers.get(&peers[7]).unwrap(); + while !receiver.non_priority.is_empty() { + if let Ok(RpcOut::IWant(IWant { message_ids })) = receiver.non_priority.try_recv() { + if message_ids .iter() - .any(|m| *m == MessageId::new(b"unknown id")), - _ => false, - }), - _ => false, - }; + .any(|m| *m == MessageId::new(b"unknown id")) + { + iwant_exists = true; + break; + } + } + } assert!( iwant_exists, @@ -2697,7 +2701,7 @@ fn test_ihave_msg_from_peer_below_gossip_threshold_gets_ignored() { ..PeerScoreThresholds::default() }; //build full mesh - let (mut gs, peers, queues, topics) = inject_nodes1() + let (mut gs, peers, mut queues, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2713,8 +2717,10 @@ fn test_ihave_msg_from_peer_below_gossip_threshold_gets_ignored() { } //add two additional peers that will not be part of the mesh - let (p1, _receiver1) = add_peer(&mut gs, &topics, false, false); - let (p2, _receiver2) = add_peer(&mut gs, &topics, false, false); + let (p1, receiver1) = add_peer(&mut gs, &topics, false, false); + queues.insert(p1, receiver1); + let (p2, receiver2) = add_peer(&mut gs, &topics, false, false); + queues.insert(p2, receiver2); //reduce score of p1 below 
peer_score_thresholds.gossip_threshold //note that penalties get squared so two penalties means a score of @@ -4400,7 +4406,7 @@ fn test_ignore_too_many_ihaves() { .build() .unwrap(); //build gossipsub with full mesh - let (mut gs, _, mut queues, topics) = inject_nodes1() + let (mut gs, _, mut receivers, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(false) @@ -4409,7 +4415,7 @@ fn test_ignore_too_many_ihaves() { //add another peer not in the mesh let (peer, receiver) = add_peer(&mut gs, &topics, false, false); - queues.insert(peer, receiver); + receivers.insert(peer, receiver); //peer has 20 messages let mut seq = 0; @@ -4438,7 +4444,7 @@ fn test_ignore_too_many_ihaves() { //we send iwant only for the first 10 messages assert_eq!( - count_control_msgs(&gs, &queues, |p, action| p == &peer + count_control_msgs(&gs, &receivers, |p, action| p == &peer && matches!(action, RpcOut::IWant(IWant { message_ids }) if message_ids.len() == 1 && first_ten.contains(&message_ids[0]))), 10, "exactly the first ten ihaves should be processed and one iwant for each created" @@ -4446,6 +4452,7 @@ fn test_ignore_too_many_ihaves() { //after a heartbeat everything is forgotten gs.heartbeat(); + for raw_message in messages[10..].iter() { // Transform the inbound message let message = &gs @@ -4459,11 +4466,11 @@ fn test_ignore_too_many_ihaves() { ); } - //we sent iwant for all 20 messages + //we sent iwant for all 10 messages assert_eq!( - count_control_msgs(&gs, &queues, |p, action| p == &peer + count_control_msgs(&gs, &receivers, |p, action| p == &peer && matches!(action, RpcOut::IWant(IWant { message_ids }) if message_ids.len() == 1)), - 20, + 10, "all 20 should get sent" ); } @@ -4511,13 +4518,14 @@ fn test_ignore_too_many_messages_in_ihave() { //we send iwant only for the first 10 messages let mut sum = 0; assert_eq!( - count_control_msgs(&gs, &queues, |p, action| match action { - RpcOut::IWant(IWant { message_ids }) => + 
count_control_msgs(&gs, &queues, |p, rpc| match rpc { + RpcOut::IWant(IWant { message_ids }) => { p == &peer && { assert!(first_twelve.is_superset(&message_ids.iter().collect())); sum += message_ids.len(); true - }, + } + } _ => false, }), 2, @@ -4533,20 +4541,23 @@ fn test_ignore_too_many_messages_in_ihave() { vec![(topics[0].clone(), message_ids[10..20].to_vec())], ); - //we sent 20 iwant messages + //we sent 10 iwant messages ids via a IWANT rpc. let mut sum = 0; assert_eq!( - count_control_msgs(&gs, &queues, |p, action| match action { - RpcOut::IWant(IWant { message_ids }) => - p == &peer && { - sum += message_ids.len(); - true - }, - _ => false, + count_control_msgs(&gs, &queues, |p, rpc| { + match rpc { + RpcOut::IWant(IWant { message_ids }) => { + p == &peer && { + sum += message_ids.len(); + true + } + } + _ => false, + } }), - 3 + 1 ); - assert_eq!(sum, 20, "exactly 20 iwants should get sent"); + assert_eq!(sum, 10, "exactly 20 iwants should get sent"); } #[test] From 756ccf724595416306ac29a973e9487840b97b62 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Tue, 5 Dec 2023 19:35:18 +0000 Subject: [PATCH 6/8] send IHAVE messages immediately when emitting gossip --- protocols/gossipsub/src/behaviour.rs | 19 +++++----- protocols/gossipsub/src/behaviour/tests.rs | 40 ++++++++++++---------- 2 files changed, 29 insertions(+), 30 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 3148c9bf8ae..4593dec912a 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -2507,14 +2507,14 @@ where } // send an IHAVE message - Self::control_pool_add( - &mut self.control_pool, - peer, - RpcOut::IHave(IHave { - topic_hash: topic_hash.clone(), - message_ids: peer_message_ids, - }), - ); + let sender = self + .handler_send_queues + .get_mut(&peer) + .expect("Peerid should exist"); + sender.ihave(IHave { + topic_hash: topic_hash.clone(), + message_ids: 
peer_message_ids, + }); } } } @@ -2799,9 +2799,6 @@ where match msg { RpcOut::IHave(ihave) => sender.ihave(ihave), - RpcOut::IWant(iwant) => sender.iwant(iwant), - RpcOut::Graft(graft) => sender.graft(graft), - RpcOut::Prune(prune) => sender.prune(prune), _ => unreachable!(), } } diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 9d2be223d03..12444351de0 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -1679,7 +1679,7 @@ fn explicit_peers_not_added_to_mesh_from_fanout_on_subscribe() { #[test] fn no_gossip_gets_sent_to_explicit_peers() { - let (mut gs, peers, _, topic_hashes) = inject_nodes1() + let (mut gs, peers, receivers, topic_hashes) = inject_nodes1() .peer_no(2) .topics(vec![String::from("topic1"), String::from("topic2")]) .to_subscribe(true) @@ -1708,16 +1708,14 @@ fn no_gossip_gets_sent_to_explicit_peers() { } //assert that no gossip gets sent to explicit peer - assert_eq!( - gs.control_pool - .get(&peers[0]) - .unwrap_or(&Vec::new()) - .iter() - .filter(|m| matches!(m, RpcOut::IHave { .. 
})) - .count(), - 0, - "Gossip got emitted to explicit peer" - ); + let receiver = receivers.get(&peers[0]).unwrap(); + let mut gossips = 0; + while !receiver.non_priority.is_empty() { + if let Ok(RpcOut::IHave(_)) = receiver.non_priority.try_recv() { + gossips += 1; + } + } + assert_eq!(gossips, 0, "Gossip got emitted to explicit peer"); } // Tests the mesh maintenance addition @@ -2538,7 +2536,7 @@ fn test_do_not_gossip_to_peers_below_gossip_threshold() { }; // Build full mesh - let (mut gs, peers, queues, topics) = inject_nodes1() + let (mut gs, peers, mut receivers, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2552,8 +2550,10 @@ fn test_do_not_gossip_to_peers_below_gossip_threshold() { } // Add two additional peers that will not be part of the mesh - let (p1, _receiver1) = add_peer(&mut gs, &topics, false, false); - let (p2, _receiver2) = add_peer(&mut gs, &topics, false, false); + let (p1, receiver1) = add_peer(&mut gs, &topics, false, false); + receivers.insert(p1, receiver1); + let (p2, receiver2) = add_peer(&mut gs, &topics, false, false); + receivers.insert(p2, receiver2); // Reduce score of p1 below peer_score_thresholds.gossip_threshold // note that penalties get squared so two penalties means a score of @@ -2585,7 +2585,7 @@ fn test_do_not_gossip_to_peers_below_gossip_threshold() { // Check that exactly one gossip messages got sent and it got sent to p2 assert_eq!( - count_control_msgs(&gs, &queues, |peer, action| match action { + count_control_msgs(&gs, &receivers, |peer, action| match action { RpcOut::IHave(IHave { topic_hash, message_ids, @@ -4568,7 +4568,7 @@ fn test_limit_number_of_message_ids_inside_ihave() { .build() .unwrap(); //build gossipsub with full mesh - let (mut gs, peers, queues, topics) = inject_nodes1() + let (mut gs, peers, mut receivers, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(false) @@ -4581,8 +4581,10 @@ fn 
test_limit_number_of_message_ids_inside_ihave() { } //add two other peers not in the mesh - let (p1, _) = add_peer(&mut gs, &topics, false, false); - let (p2, _) = add_peer(&mut gs, &topics, false, false); + let (p1, receiver1) = add_peer(&mut gs, &topics, false, false); + receivers.insert(p1, receiver1); + let (p2, receiver2) = add_peer(&mut gs, &topics, false, false); + receivers.insert(p2, receiver2); //receive 200 messages from another peer let mut seq = 0; @@ -4601,7 +4603,7 @@ fn test_limit_number_of_message_ids_inside_ihave() { let mut ihaves2 = HashSet::new(); assert_eq!( - count_control_msgs(&gs, &queues, |p, action| match action { + count_control_msgs(&gs, &receivers, |p, action| match action { RpcOut::IHave(IHave { message_ids, .. }) => { if p == &p1 { ihaves1 = message_ids.iter().cloned().collect(); From 8dd06399dbe3f072814b33295024473f4528d386 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Tue, 5 Dec 2023 22:53:46 +0000 Subject: [PATCH 7/8] remove no longer used control_pool --- protocols/gossipsub/src/behaviour.rs | 40 --------- protocols/gossipsub/src/behaviour/tests.rs | 96 ++++++++++------------ 2 files changed, 45 insertions(+), 91 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 4593dec912a..1e9e43f5872 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -253,9 +253,6 @@ pub struct Behaviour { /// Events that need to be yielded to the outside when polling. events: VecDeque>, - /// Pools non-urgent control messages between heartbeats. - control_pool: HashMap>, - /// Information used for publishing messages. publish_config: PublishConfig, @@ -323,10 +320,6 @@ pub struct Behaviour { /// Counts the number of `IWANT` that we sent the each peer since the last heartbeat. count_sent_iwant: HashMap, - /// Keeps track of IWANT messages that we are awaiting to send. 
- /// This is used to prevent sending duplicate IWANT messages for the same message. - pending_iwant_msgs: HashSet, - /// Short term cache for published message ids. This is used for penalizing peers sending /// our own messages back if the messages are anonymous or use a random author. published_message_ids: DuplicateCache, @@ -451,7 +444,6 @@ where Ok(Behaviour { metrics: metrics.map(|(registry, cfg)| Metrics::new(registry, cfg)), events: VecDeque::new(), - control_pool: HashMap::new(), publish_config: privacy.into(), duplicate_cache: DuplicateCache::new(config.duplicate_cache_time()), topic_peers: HashMap::new(), @@ -1290,7 +1282,6 @@ where iwant_ids_vec.truncate(iask); *iasked += iask; - if let Some((_, _, _, gossip_promises)) = &mut self.peer_score { gossip_promises.add_promise( *peer_id, @@ -2438,9 +2429,6 @@ where self.send_graft_prune(to_graft, to_prune, no_px); } - // piggyback pooled control messages - self.flush_control_pool(); - // shift the memcache self.mcache.shift(); @@ -2779,33 +2767,6 @@ where } } - // adds a control action to control_pool - fn control_pool_add( - control_pool: &mut HashMap>, - peer: PeerId, - control: RpcOut, - ) { - control_pool.entry(peer).or_default().push(control); - } - - /// Takes each control action mapping and turns it into a message - fn flush_control_pool(&mut self) { - for (peer, controls) in self.control_pool.drain().collect::>() { - for msg in controls { - let sender = self - .handler_send_queues - .get_mut(&peer) - .expect("Peerid should exist"); - - match msg { - RpcOut::IHave(ihave) => sender.ihave(ihave), - _ => unreachable!(), - } - } - } - - } - fn on_connection_established( &mut self, ConnectionEstablished { @@ -3446,7 +3407,6 @@ impl fmt::Debug for Behaviour( - gs: &Behaviour, +fn count_control_msgs( queues: &HashMap, mut filter: impl FnMut(&PeerId, &RpcOut) -> bool, ) -> usize { - gs.control_pool + queues .iter() - .map(|(peer_id, actions)| actions.iter().filter(|m| filter(peer_id, m)).count()) - .sum::() - 
+ queues - .iter() - .fold(0, |mut collected_messages, (peer_id, c)| { - while !c.priority.is_empty() || !c.non_priority.is_empty() { - if let Ok(rpc) = c.priority.try_recv() { - if filter(peer_id, &rpc) { - collected_messages += 1; - } + .fold(0, |mut collected_messages, (peer_id, c)| { + while !c.priority.is_empty() || !c.non_priority.is_empty() { + if let Ok(rpc) = c.priority.try_recv() { + if filter(peer_id, &rpc) { + collected_messages += 1; } - if let Ok(rpc) = c.non_priority.try_recv() { - if filter(peer_id, &rpc) { - collected_messages += 1; - } + } + if let Ok(rpc) = c.non_priority.try_recv() { + if filter(peer_id, &rpc) { + collected_messages += 1; } } - collected_messages - }) + } + collected_messages + }) } fn flush_events( gs: &mut Behaviour, receiver_queues: &HashMap, ) { - gs.control_pool.clear(); gs.events.clear(); for c in receiver_queues.values() { while !c.priority.is_empty() || !c.non_priority.is_empty() { @@ -1478,7 +1472,7 @@ fn test_handle_graft_explicit_peer() { //check prunes assert!( - count_control_msgs(&gs, &queues, |peer_id, m| peer_id == peer + count_control_msgs(&queues, |peer_id, m| peer_id == peer && match m { RpcOut::Prune(Prune { topic_hash, .. }) => topic_hash == &topic_hashes[0] || topic_hash == &topic_hashes[1], @@ -1507,7 +1501,7 @@ fn explicit_peers_not_added_to_mesh_on_receiving_subscription() { //assert that graft gets created to non-explicit peer assert!( - count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[1] + count_control_msgs(&queues, |peer_id, m| peer_id == &peers[1] && matches!(m, RpcOut::Graft { .. })) >= 1, "No graft message got created to non-explicit peer" @@ -1515,7 +1509,7 @@ fn explicit_peers_not_added_to_mesh_on_receiving_subscription() { //assert that no graft gets created to explicit peer assert_eq!( - count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[0] + count_control_msgs(&queues, |peer_id, m| peer_id == &peers[0] && matches!(m, RpcOut::Graft { .. 
})), 0, "A graft message got created to an explicit peer" @@ -1539,7 +1533,7 @@ fn do_not_graft_explicit_peer() { //assert that no graft gets created to explicit peer assert_eq!( - count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &others[0] + count_control_msgs(&queues, |peer_id, m| peer_id == &others[0] && matches!(m, RpcOut::Graft { .. })), 0, "A graft message got created to an explicit peer" @@ -1613,7 +1607,7 @@ fn explicit_peers_not_added_to_mesh_on_subscribe() { //assert that graft gets created to non-explicit peer assert!( - count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[1] + count_control_msgs(&queues, |peer_id, m| peer_id == &peers[1] && matches!(m, RpcOut::Graft { .. })) > 0, "No graft message got created to non-explicit peer" @@ -1621,7 +1615,7 @@ fn explicit_peers_not_added_to_mesh_on_subscribe() { //assert that no graft gets created to explicit peer assert_eq!( - count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[0] + count_control_msgs(&queues, |peer_id, m| peer_id == &peers[0] && matches!(m, RpcOut::Graft { .. })), 0, "A graft message got created to an explicit peer" @@ -1662,7 +1656,7 @@ fn explicit_peers_not_added_to_mesh_from_fanout_on_subscribe() { //assert that graft gets created to non-explicit peer assert!( - count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[1] + count_control_msgs(&queues, |peer_id, m| peer_id == &peers[1] && matches!(m, RpcOut::Graft { .. })) >= 1, "No graft message got created to non-explicit peer" @@ -1670,7 +1664,7 @@ fn explicit_peers_not_added_to_mesh_from_fanout_on_subscribe() { //assert that no graft gets created to explicit peer assert_eq!( - count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[0] + count_control_msgs(&queues, |peer_id, m| peer_id == &peers[0] && matches!(m, RpcOut::Graft { .. 
})), 0, "A graft message got created to an explicit peer" @@ -1857,7 +1851,7 @@ fn test_send_px_and_backoff_in_prune() { //check prune message assert_eq!( - count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[0] + count_control_msgs(&queues, |peer_id, m| peer_id == &peers[0] && match m { RpcOut::Prune(Prune { topic_hash, @@ -1905,7 +1899,7 @@ fn test_prune_backoffed_peer_on_graft() { //check prune message assert_eq!( - count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[0] + count_control_msgs(&queues, |peer_id, m| peer_id == &peers[0] && match m { RpcOut::Prune(Prune { topic_hash, @@ -1955,7 +1949,7 @@ fn test_do_not_graft_within_backoff_period() { //Check that no graft got created (we have backoff_slack = 1 therefore one more heartbeat // is needed). assert_eq!( - count_control_msgs(&gs, &queues, |_, m| matches!(m, RpcOut::Graft { .. })), + count_control_msgs(&queues, |_, m| matches!(m, RpcOut::Graft { .. })), 0, "Graft message created too early within backoff period" ); @@ -1966,7 +1960,7 @@ fn test_do_not_graft_within_backoff_period() { //check that graft got created assert!( - count_control_msgs(&gs, &queues, |_, m| matches!(m, RpcOut::Graft { .. })) > 0, + count_control_msgs(&queues, |_, m| matches!(m, RpcOut::Graft { .. })) > 0, "No graft message was created after backoff period" ); } @@ -2004,7 +1998,7 @@ fn test_do_not_graft_within_default_backoff_period_after_receiving_prune_without //Check that no graft got created (we have backoff_slack = 1 therefore one more heartbeat // is needed). assert_eq!( - count_control_msgs(&gs, &queues, |_, m| matches!(m, RpcOut::Graft { .. })), + count_control_msgs(&queues, |_, m| matches!(m, RpcOut::Graft { .. })), 0, "Graft message created too early within backoff period" ); @@ -2015,7 +2009,7 @@ fn test_do_not_graft_within_default_backoff_period_after_receiving_prune_without //check that graft got created assert!( - count_control_msgs(&gs, &queues, |_, m| matches!(m, RpcOut::Graft { .. 
})) > 0, + count_control_msgs(&queues, |_, m| matches!(m, RpcOut::Graft { .. })) > 0, "No graft message was created after backoff period" ); } @@ -2044,7 +2038,7 @@ fn test_unsubscribe_backoff() { let _ = gs.unsubscribe(&Topic::new(topic)); assert_eq!( - count_control_msgs(&gs, &queues, |_, m| match m { + count_control_msgs(&queues, |_, m| match m { RpcOut::Prune(Prune { backoff, .. }) => backoff == &Some(1), _ => false, }), @@ -2069,7 +2063,7 @@ fn test_unsubscribe_backoff() { // Check that no graft got created (we have backoff_slack = 1 therefore one more heartbeat // is needed). assert_eq!( - count_control_msgs(&gs, &queues, |_, m| matches!(m, RpcOut::Graft { .. })), + count_control_msgs(&queues, |_, m| matches!(m, RpcOut::Graft { .. })), 0, "Graft message created too early within backoff period" ); @@ -2080,7 +2074,7 @@ fn test_unsubscribe_backoff() { // check that graft got created assert!( - count_control_msgs(&gs, &queues, |_, m| matches!(m, RpcOut::Graft { .. })) > 0, + count_control_msgs(&queues, |_, m| matches!(m, RpcOut::Graft { .. })) > 0, "No graft message was created after backoff period" ); } @@ -2173,7 +2167,7 @@ fn test_gossip_to_at_least_gossip_lazy_peers() { //check that exactly config.gossip_lazy() many gossip messages were sent. assert_eq!( - count_control_msgs(&gs, &queues, |_, action| match action { + count_control_msgs(&queues, |_, action| match action { RpcOut::IHave(IHave { topic_hash, message_ids, @@ -2217,7 +2211,7 @@ fn test_gossip_to_at_most_gossip_factor_peers() { let msg_id = gs.config.message_id(message); //check that exactly config.gossip_lazy() many gossip messages were sent. 
assert_eq!( - count_control_msgs(&gs, &queues, |_, action| match action { + count_control_msgs(&queues, |_, action| match action { RpcOut::IHave(IHave { topic_hash, message_ids, @@ -2376,7 +2370,7 @@ fn test_prune_negative_scored_peers() { //check prune message assert_eq!( - count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[0] + count_control_msgs(&queues, |peer_id, m| peer_id == &peers[0] && match m { RpcOut::Prune(Prune { topic_hash, @@ -2510,7 +2504,7 @@ fn test_only_send_nonnegative_scoring_peers_in_px() { // Check that px in prune message only contains third peer assert_eq!( - count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[1] + count_control_msgs(&queues, |peer_id, m| peer_id == &peers[1] && match m { RpcOut::Prune(Prune { topic_hash, @@ -2585,7 +2579,7 @@ fn test_do_not_gossip_to_peers_below_gossip_threshold() { // Check that exactly one gossip messages got sent and it got sent to p2 assert_eq!( - count_control_msgs(&gs, &receivers, |peer, action| match action { + count_control_msgs(&receivers, |peer, action| match action { RpcOut::IHave(IHave { topic_hash, message_ids, @@ -2751,7 +2745,7 @@ fn test_ihave_msg_from_peer_below_gossip_threshold_gets_ignored() { // check that we sent exactly one IWANT request to p2 assert_eq!( - count_control_msgs(&gs, &queues, |peer, c| match c { + count_control_msgs(&queues, |peer, c| match c { RpcOut::IWant(IWant { message_ids }) => if message_ids.iter().any(|m| m == &msg_id) { assert_eq!(peer, &p2); @@ -4345,7 +4339,7 @@ fn test_ignore_graft_from_unknown_topic() { //assert that no prune got created assert_eq!( - count_control_msgs(&gs, &queues, |_, a| matches!(a, RpcOut::Prune { .. })), + count_control_msgs(&queues, |_, a| matches!(a, RpcOut::Prune { .. 
})), 0, "we should not prune after graft in unknown topic" ); @@ -4444,7 +4438,7 @@ fn test_ignore_too_many_ihaves() { //we send iwant only for the first 10 messages assert_eq!( - count_control_msgs(&gs, &receivers, |p, action| p == &peer + count_control_msgs(&receivers, |p, action| p == &peer && matches!(action, RpcOut::IWant(IWant { message_ids }) if message_ids.len() == 1 && first_ten.contains(&message_ids[0]))), 10, "exactly the first ten ihaves should be processed and one iwant for each created" @@ -4468,7 +4462,7 @@ fn test_ignore_too_many_ihaves() { //we sent iwant for all 10 messages assert_eq!( - count_control_msgs(&gs, &receivers, |p, action| p == &peer + count_control_msgs(&receivers, |p, action| p == &peer && matches!(action, RpcOut::IWant(IWant { message_ids }) if message_ids.len() == 1)), 10, "all 20 should get sent" @@ -4518,7 +4512,7 @@ fn test_ignore_too_many_messages_in_ihave() { //we send iwant only for the first 10 messages let mut sum = 0; assert_eq!( - count_control_msgs(&gs, &queues, |p, rpc| match rpc { + count_control_msgs(&queues, |p, rpc| match rpc { RpcOut::IWant(IWant { message_ids }) => { p == &peer && { assert!(first_twelve.is_superset(&message_ids.iter().collect())); @@ -4544,7 +4538,7 @@ fn test_ignore_too_many_messages_in_ihave() { //we sent 10 iwant messages ids via a IWANT rpc. let mut sum = 0; assert_eq!( - count_control_msgs(&gs, &queues, |p, rpc| { + count_control_msgs(&queues, |p, rpc| { match rpc { RpcOut::IWant(IWant { message_ids }) => { p == &peer && { @@ -4603,7 +4597,7 @@ fn test_limit_number_of_message_ids_inside_ihave() { let mut ihaves2 = HashSet::new(); assert_eq!( - count_control_msgs(&gs, &receivers, |p, action| match action { + count_control_msgs(&receivers, |p, action| match action { RpcOut::IHave(IHave { message_ids, .. 
}) => { if p == &p1 { ihaves1 = message_ids.iter().cloned().collect(); @@ -4938,7 +4932,7 @@ fn test_dont_send_px_to_old_gossipsub_peers() { //check that prune does not contain px assert_eq!( - count_control_msgs(&gs, &queues, |_, m| match m { + count_control_msgs(&queues, |_, m| match m { RpcOut::Prune(Prune { peers: px, .. }) => !px.is_empty(), _ => false, }), @@ -4976,7 +4970,7 @@ fn test_dont_send_floodsub_peers_in_px() { //check that px in prune message is empty assert_eq!( - count_control_msgs(&gs, &queues, |_, m| match m { + count_control_msgs(&queues, |_, m| match m { RpcOut::Prune(Prune { peers: px, .. }) => !px.is_empty(), _ => false, }), From 5837ea34e764669e08d676630ec02ea7008ac952 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Tue, 5 Dec 2023 22:57:50 +0000 Subject: [PATCH 8/8] clippy --- protocols/gossipsub/src/behaviour/tests.rs | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 400ea8fcb0c..89bcd14d15a 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -1404,7 +1404,7 @@ fn test_explicit_peer_reconnects() { .check_explicit_peers_ticks(2) .build() .unwrap(); - let (mut gs, others, mut queues, _) = inject_nodes1() + let (mut gs, others, queues, _) = inject_nodes1() .peer_no(1) .topics(Vec::new()) .to_subscribe(true) @@ -1416,7 +1416,7 @@ fn test_explicit_peer_reconnects() { //add peer as explicit peer gs.add_explicit_peer(peer); - flush_events(&mut gs, &mut queues); + flush_events(&mut gs, &queues); //disconnect peer disconnect_peer(&mut gs, peer); @@ -1875,7 +1875,7 @@ fn test_prune_backoffed_peer_on_graft() { let config: Config = Config::default(); //build mesh with enough peers for px - let (mut gs, peers, mut queues, topics) = inject_nodes1() + let (mut gs, peers, queues, topics) = inject_nodes1() .peer_no(config.prune_peers() + 1) 
.topics(vec!["test".into()]) .to_subscribe(true) @@ -1892,7 +1892,7 @@ fn test_prune_backoffed_peer_on_graft() { ); //ignore all messages until now - flush_events(&mut gs, &mut queues); + flush_events(&mut gs, &queues); //handle graft gs.handle_graft(&peers[0], vec![topics[0].clone()]); @@ -1924,7 +1924,7 @@ fn test_do_not_graft_within_backoff_period() { .build() .unwrap(); //only one peer => mesh too small and will try to regraft as early as possible - let (mut gs, peers, mut queues, topics) = inject_nodes1() + let (mut gs, peers, queues, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -1935,7 +1935,7 @@ fn test_do_not_graft_within_backoff_period() { gs.handle_prune(&peers[0], vec![(topics[0].clone(), Vec::new(), Some(1))]); //forget all events until now - flush_events(&mut gs, &mut queues); + flush_events(&mut gs, &queues); //call heartbeat gs.heartbeat(); @@ -1975,7 +1975,7 @@ fn test_do_not_graft_within_default_backoff_period_after_receiving_prune_without .build() .unwrap(); //only one peer => mesh too small and will try to regraft as early as possible - let (mut gs, peers, mut queues, topics) = inject_nodes1() + let (mut gs, peers, queues, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -1986,7 +1986,7 @@ fn test_do_not_graft_within_default_backoff_period_after_receiving_prune_without gs.handle_prune(&peers[0], vec![(topics[0].clone(), Vec::new(), None)]); //forget all events until now - flush_events(&mut gs, &mut queues); + flush_events(&mut gs, &queues); //call heartbeat gs.heartbeat(); @@ -2028,7 +2028,7 @@ fn test_unsubscribe_backoff() { let topic = String::from("test"); // only one peer => mesh too small and will try to regraft as early as possible - let (mut gs, _, mut queues, topics) = inject_nodes1() + let (mut gs, _, queues, topics) = inject_nodes1() .peer_no(1) .topics(vec![topic.clone()]) .to_subscribe(true) @@ -2049,7 +2049,7 @@ fn test_unsubscribe_backoff() { let _ 
= gs.subscribe(&Topic::new(topics[0].to_string())); // forget all events until now - flush_events(&mut gs, &mut queues); + flush_events(&mut gs, &queues); // call heartbeat gs.heartbeat(); @@ -4371,7 +4371,7 @@ fn test_ignore_too_many_iwants_from_same_peer_for_same_message() { gs.handle_received_message(m1, &PeerId::random()); //clear events - flush_events(&mut gs, &mut queues); + flush_events(&mut gs, &queues); //the first gossip_retransimission many iwants return the valid message, all others are // ignored.