From b375b2ffd3b1141c56c6f61e4539ea0993f9ccfa Mon Sep 17 00:00:00 2001
From: João Oliveira
Date: Mon, 13 Nov 2023 00:44:27 +0000
Subject: [PATCH] address review

---
 protocols/gossipsub/src/behaviour.rs       | 219 +++++++--------
 protocols/gossipsub/src/behaviour/tests.rs |  42 +--
 protocols/gossipsub/src/types.rs           | 294 ++++++++++++++-------
 3 files changed, 319 insertions(+), 236 deletions(-)

diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs
index 0625666b937..3bdc2871baf 100644
--- a/protocols/gossipsub/src/behaviour.rs
+++ b/protocols/gossipsub/src/behaviour.rs
@@ -534,14 +534,10 @@ where
         }

         // send subscription request to all peers
-        let peer_list = self.peer_topics.keys().cloned().collect::<Vec<_>>();
-        if !peer_list.is_empty() {
+        for peer in self.peer_topics.keys().cloned().collect::<Vec<_>>() {
+            tracing::debug!(%peer, "Sending SUBSCRIBE to peer");
             let event = RpcOut::Subscribe(topic_hash.clone());
-
-            for peer in peer_list {
-                tracing::debug!(%peer, "Sending SUBSCRIBE to peer");
-                self.send_message(peer, event.clone());
-            }
+            self.send_message(peer, event);
         }

         // call JOIN(topic)
@@ -608,8 +604,6 @@ where
             return Err(PublishError::MessageTooLarge);
         }

-        let event = RpcOut::Publish(raw_message.clone());
-
         // Check if the message has been published before
         if self.duplicate_cache.contains(&msg_id) {
             // This message has already been seen. We don't re-publish messages that have already
@@ -638,10 +632,48 @@ where
                     .cloned(),
             );
         } else {
-            // Mesh peers
-            if let Some(mesh_peers) = self.mesh.get(&raw_message.topic) {
-                for peer_id in mesh_peers {
-                    recipient_peers.insert(*peer_id);
+            match self.mesh.get(&raw_message.topic) {
+                // Mesh peers
+                Some(mesh_peers) => {
+                    recipient_peers.extend(mesh_peers);
+                }
+                // Gossipsub peers
+                None => {
+                    tracing::debug!(topic=%topic_hash, "Topic not in the mesh");
+                    // If we have fanout peers add them to the map.
+                    if self.fanout.contains_key(&topic_hash) {
+                        for peer in self.fanout.get(&topic_hash).expect("Topic must exist") {
+                            recipient_peers.insert(*peer);
+                        }
+                    } else {
+                        // We have no fanout peers, select mesh_n of them and add them to the fanout
+                        let mesh_n = self.config.mesh_n();
+                        let new_peers = get_random_peers(
+                            &self.topic_peers,
+                            &self.connected_peers,
+                            &topic_hash,
+                            mesh_n,
+                            {
+                                |p| {
+                                    !self.explicit_peers.contains(p)
+                                        && !self
+                                            .score_below_threshold(p, |pst| {
+                                                pst.publish_threshold
+                                            })
+                                            .0
+                                }
+                            },
+                        );
+                        // Add the new peers to the fanout and recipient peers
+                        self.fanout.insert(topic_hash.clone(), new_peers.clone());
+                        for peer in new_peers {
+                            tracing::debug!(%peer, "Peer added to fanout");
+                            recipient_peers.insert(peer);
+                        }
+                    }
+                    // We are publishing to fanout peers - update the time we published
+                    self.fanout_last_pub
+                        .insert(topic_hash.clone(), Instant::now());
                 }
             }
@@ -662,43 +694,6 @@ where
                     recipient_peers.insert(*peer);
                 }
             }
-
-            // Gossipsub peers
-            if self.mesh.get(&topic_hash).is_none() {
-                tracing::debug!(topic=%topic_hash, "Topic not in the mesh");
-                // If we have fanout peers add them to the map.
-                if self.fanout.contains_key(&topic_hash) {
-                    for peer in self.fanout.get(&topic_hash).expect("Topic must exist") {
-                        recipient_peers.insert(*peer);
-                    }
-                } else {
-                    // We have no fanout peers, select mesh_n of them and add them to the fanout
-                    let mesh_n = self.config.mesh_n();
-                    let new_peers = get_random_peers(
-                        &self.topic_peers,
-                        &self.connected_peers,
-                        &topic_hash,
-                        mesh_n,
-                        {
-                            |p| {
-                                !self.explicit_peers.contains(p)
-                                    && !self
-                                        .score_below_threshold(p, |pst| pst.publish_threshold)
-                                        .0
-                            }
-                        },
-                    );
-                    // Add the new peers to the fanout and recipient peers
-                    self.fanout.insert(topic_hash.clone(), new_peers.clone());
-                    for peer in new_peers {
-                        tracing::debug!(%peer, "Peer added to fanout");
-                        recipient_peers.insert(peer);
-                    }
-                }
-                // We are publishing to fanout peers - update the time we published
-                self.fanout_last_pub
-                    .insert(topic_hash.clone(), Instant::now());
-            }
         }
     }
@@ -709,7 +704,7 @@ where
         // If the message isn't a duplicate and we have sent it to some peers add it to the
         // duplicate cache and memcache.
         self.duplicate_cache.insert(msg_id.clone());
-        self.mcache.put(&msg_id, raw_message);
+        self.mcache.put(&msg_id, raw_message.clone());

         // If the message is anonymous or has a random author add it to the published message ids
         // cache.
@@ -722,7 +717,7 @@ where
         // Send to peers we know are subscribed to the topic.
         for peer_id in recipient_peers.iter() {
             tracing::trace!(peer=%peer_id, "Sending message to peer");
-            self.send_message(*peer_id, event.clone());
+            self.send_message(*peer_id, RpcOut::Publish(raw_message.clone()));
         }

         tracing::debug!(message=%msg_id, "Published message");
@@ -1303,13 +1298,15 @@ where
         }

         tracing::debug!(peer=%peer_id, "Handling IWANT for peer");

-        // build a hashmap of available messages
-        let mut cached_messages = HashMap::new();
-
         for id in iwant_msgs {
-            // If we have it and the IHAVE count is not above the threshold, add it to the
-            // cached_messages mapping
-            if let Some((msg, count)) = self.mcache.get_with_iwant_counts(&id, peer_id) {
+            // If we have it and the IHAVE count is not above the threshold,
+            // forward the message.
+            if let Some((msg, count)) = self
+                .mcache
+                .get_with_iwant_counts(&id, peer_id)
+                .map(|(msg, count)| (msg.clone(), count))
+            {
                 if count > self.config.gossip_retransimission() {
                     tracing::debug!(
                         peer=%peer_id,
                         "IWANT: Peer has asked for message too many times; ignoring request"
                     );
                 } else {
-                    cached_messages.insert(id.clone(), msg.clone());
+                    tracing::debug!(peer=%peer_id, "IWANT: Sending cached messages to peer");
+                    self.send_message(*peer_id, RpcOut::Forward(msg));
                 }
             }
         }
-
-        // Forward cached messages.
-        for message in cached_messages.into_iter().map(|entry| entry.1) {
-            tracing::debug!(peer=%peer_id, "IWANT: Sending cached messages to peer");
-            self.send_message(*peer_id, RpcOut::Forward(message));
-        }
         tracing::debug!(peer=%peer_id, "Completed IWANT handling for peer");
     }
@@ -1476,16 +1468,18 @@ where
         if !to_prune_topics.is_empty() {
             // build the prune messages to send
             let on_unsubscribe = false;
-            let prune_messages = to_prune_topics
+            for action in to_prune_topics
                 .iter()
                 .map(|t| self.make_prune(t, peer_id, do_px, on_unsubscribe))
-                .collect();
+                .collect::<Vec<_>>()
+            {
+                self.send_message(*peer_id, RpcOut::Control(action));
+            }
             // Send the prune messages to the peer
             tracing::debug!(
                 peer=%peer_id,
                 "GRAFT: Not subscribed to topics - Sending PRUNE to peer"
             );
-            self.send_message(*peer_id, RpcOut::Control(prune_messages));
         }
         tracing::debug!(peer=%peer_id, "Completed GRAFT handling for peer");
     }
@@ -1977,16 +1971,12 @@ where
         // If we need to send grafts to peer, do so immediately, rather than waiting for the
         // heartbeat.
-        if !topics_to_graft.is_empty() {
-            self.send_message(
-                *propagation_source,
-                RpcOut::Control(
-                    topics_to_graft
-                        .into_iter()
-                        .map(|topic_hash| ControlAction::Graft { topic_hash })
-                        .collect(),
-                ),
-            )
+        for action in topics_to_graft
+            .into_iter()
+            .map(|topic_hash| ControlAction::Graft { topic_hash })
+            .collect::<Vec<_>>()
+        {
+            self.send_message(*propagation_source, RpcOut::Control(action))
         }

         // Notify the application of the subscriptions
@@ -2512,12 +2502,9 @@ where
                     &self.connected_peers,
                 );
             }
-            let mut control_msgs: Vec<ControlAction> = topics
-                .iter()
-                .map(|topic_hash| ControlAction::Graft {
-                    topic_hash: topic_hash.clone(),
-                })
-                .collect();
+            let control_msgs = topics.iter().map(|topic_hash| ControlAction::Graft {
+                topic_hash: topic_hash.clone(),
+            });

             // If there are prunes associated with the same peer add them.
             // NOTE: In this case a peer has been added to a topic mesh, and removed from another.
             // It therefore must be in at least one mesh and we do not need to inform the handler
             // of its removal from another.

             // The following prunes are not due to unsubscribing.
-            let on_unsubscribe = false;
-            if let Some(topics) = to_prune.remove(&peer) {
-                let mut prunes = topics
-                    .iter()
-                    .map(|topic_hash| {
-                        self.make_prune(
-                            topic_hash,
-                            &peer,
-                            self.config.do_px() && !no_px.contains(&peer),
-                            on_unsubscribe,
-                        )
-                    })
-                    .collect::<Vec<_>>();
-                control_msgs.append(&mut prunes);
-            }
+            let prunes = to_prune
+                .remove(&peer)
+                .into_iter()
+                .flatten()
+                .map(|topic_hash| {
+                    self.make_prune(
+                        &topic_hash,
+                        &peer,
+                        self.config.do_px() && !no_px.contains(&peer),
+                        false,
+                    )
+                });

             // send the control messages
-            self.send_message(peer, RpcOut::Control(control_msgs));
+            for msg in control_msgs.chain(prunes).collect::<Vec<_>>() {
+                self.send_message(peer, RpcOut::Control(msg));
+            }
         }

         // handle the remaining prunes
         // The following prunes are not due to unsubscribing.
-        let on_unsubscribe = false;
         for (peer, topics) in to_prune.iter() {
-            let mut remaining_prunes = Vec::new();
             for topic_hash in topics {
                 let prune = self.make_prune(
                     topic_hash,
                     peer,
                     self.config.do_px() && !no_px.contains(peer),
-                    on_unsubscribe,
+                    false,
                 );
-                remaining_prunes.push(prune);
+                self.send_message(*peer, RpcOut::Control(prune));
+
                 // inform the handler
                 peer_removed_from_mesh(
                     *peer,
                     topic_hash,
                     &self.mesh,
                     self.peer_topics.get(peer),
                     &mut self.events,
                     &self.connected_peers,
                 );
             }
-
-            self.send_message(*peer, RpcOut::Control(remaining_prunes))
         }
     }
@@ -2743,7 +2726,9 @@ where
     /// Takes each control action mapping and turns it into a message
     fn flush_control_pool(&mut self) {
         for (peer, controls) in self.control_pool.drain().collect::<Vec<_>>() {
-            self.send_message(peer, RpcOut::Control(controls));
+            for msg in controls {
+                self.send_message(peer, RpcOut::Control(msg));
+            }
         }

         // This clears all pending IWANT messages
@@ -2817,6 +2802,13 @@ where
             return; // Not our first connection to this peer, hence nothing to do.
         }

+        // Insert an empty set of the topics of this peer until known.
+        self.peer_topics.insert(peer_id, Default::default());
+
+        if let Some((peer_score, ..)) = &mut self.peer_score {
+            peer_score.add_peer(peer_id);
+        }
+
         // Ignore connections from blacklisted peers.
         if self.blacklisted_peers.contains(&peer_id) {
             tracing::debug!(peer=%peer_id, "Ignoring connection from blacklisted peer");
@@ -2828,13 +2820,6 @@ where
         for topic_hash in self.mesh.keys().cloned().collect::<Vec<_>>() {
             self.send_message(peer_id, RpcOut::Subscribe(topic_hash));
         }
-
-        // Insert an empty set of the topics of this peer until known.
-        self.peer_topics.insert(peer_id, Default::default());
-
-        if let Some((peer_score, ..)) = &mut self.peer_score {
-            peer_score.add_peer(peer_id);
-        }
     }

     fn on_connection_closed(
         &mut self,
@@ -3434,13 +3419,7 @@ mod local_test {
             1 => RpcOut::Unsubscribe(IdentTopic::new("TestTopic").hash()),
             2 => RpcOut::Publish(test_message()),
             3 => RpcOut::Forward(test_message()),
-            4 => {
-                let mut control = Vec::new();
-                for _ in 0..g.gen_range(0..10u8) {
-                    control.push(test_control());
-                }
-                RpcOut::Control(control)
-            }
+            4 => RpcOut::Control(test_control()),
             _ => panic!("outside range"),
         }
     }
diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs
index 48e5a48a9ee..96586901dd3 100644
--- a/protocols/gossipsub/src/behaviour/tests.rs
+++ b/protocols/gossipsub/src/behaviour/tests.rs
@@ -402,13 +402,16 @@ fn test_subscribe() {
     let subscriptions = gs
         .events
         .iter()
-        .fold(0, |collected_subscriptions, e| match e {
-            ToSwarm::NotifyHandler {
-                event: HandlerIn::Message(RpcOut::Subscribe(_)),
-                ..
-            } => collected_subscriptions + 1,
-            _ => collected_subscriptions,
-        });
+        .filter(|e| {
+            matches!(
+                e,
+                ToSwarm::NotifyHandler {
+                    event: HandlerIn::Message(RpcOut::Subscribe(_)),
+                    ..
+                }
+            )
+        })
+        .count();

     // we sent a subscribe to all known peers
     assert_eq!(subscriptions, 20);
@@ -783,12 +786,15 @@ fn test_inject_connected() {
             } => Some((peer_id, topic)),
             _ => None,
         })
-        .fold(HashMap::new(), |mut subs, (peer, sub)| {
-            let mut peer_subs = subs.remove(&peer).unwrap_or(vec![]);
-            peer_subs.push(sub.into_string());
-            subs.insert(peer, peer_subs);
-            subs
-        });
+        .fold(
+            HashMap::<PeerId, Vec<String>>::new(),
+            |mut subs, (peer, sub)| {
+                let mut peer_subs = subs.remove(&peer).unwrap_or_default();
+                peer_subs.push(sub.into_string());
+                subs.insert(peer, peer_subs);
+                subs
+            },
+        );

     // check that there are two subscriptions sent to each peer
     for peer_subs in subscriptions.values() {
@@ -1309,15 +1315,15 @@ fn count_control_msgs(
         .sum::<usize>()
         + gs.events
             .iter()
-            .map(|e| match e {
+            .filter(|e| match e {
                 ToSwarm::NotifyHandler {
                     peer_id,
-                    event: HandlerIn::Message(RpcOut::Control(actions)),
+                    event: HandlerIn::Message(RpcOut::Control(action)),
                     ..
-                } => actions.iter().filter(|m| filter(peer_id, m)).count(),
-                _ => 0,
+                } => filter(peer_id, action),
+                _ => false,
             })
-            .sum::<usize>()
+            .count()
 }

 fn flush_events<D: DataTransform, F: TopicSubscriptionFilter>(gs: &mut Behaviour<D, F>) {
diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs
index 8602a01e8f7..d1b92ff0ba8 100644
--- a/protocols/gossipsub/src/types.rs
+++ b/protocols/gossipsub/src/types.rs
@@ -233,33 +233,6 @@ pub enum ControlAction {
     },
 }

-/// An RPC received/sent.
-#[derive(Clone, PartialEq, Eq, Hash)]
-pub struct Rpc {
-    /// List of messages that were part of this RPC query.
-    pub messages: Vec<RawMessage>,
-    /// List of subscriptions.
-    pub subscriptions: Vec<Subscription>,
-    /// List of Gossipsub control messages.
-    pub control_msgs: Vec<ControlAction>,
-}
-
-impl fmt::Debug for Rpc {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        let mut b = f.debug_struct("GossipsubRpc");
-        if !self.messages.is_empty() {
-            b.field("messages", &self.messages);
-        }
-        if !self.subscriptions.is_empty() {
-            b.field("subscriptions", &self.subscriptions);
-        }
-        if !self.control_msgs.is_empty() {
-            b.field("control_msgs", &self.control_msgs);
-        }
-        b.finish()
-    }
-}
-
 /// A Gossipsub RPC message sent.
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub enum RpcOut {
     /// Publish a Gossipsub message on network.
     Publish(RawMessage),
     /// Forward a Gossipsub message on network.
     Forward(RawMessage),
     /// Subscribe a topic.
     Subscribe(TopicHash),
     /// Unsubscribe a topic.
     Unsubscribe(TopicHash),
     /// List of Gossipsub control messages.
-    Control(Vec<ControlAction>),
+    Control(ControlAction),
 }

 impl RpcOut {
@@ -313,82 +286,207 @@ impl From<RpcOut> for proto::RPC {
                 }],
                 control: None,
             },
-            RpcOut::Control(actions) => {
-                if actions.is_empty() {
-                    return proto::RPC {
-                        publish: Vec::new(),
-                        subscriptions: Vec::new(),
-                        control: None,
-                    };
-                }
-
-                let mut control = proto::ControlMessage {
-                    ihave: Vec::new(),
-                    iwant: Vec::new(),
-                    graft: Vec::new(),
-                    prune: Vec::new(),
-                };
-
-                for action in actions {
-                    match action {
-                        // collect all ihave messages
-                        ControlAction::IHave {
-                            topic_hash,
-                            message_ids,
-                        } => {
-                            let rpc_ihave = proto::ControlIHave {
-                                topic_id: Some(topic_hash.into_string()),
-                                message_ids: message_ids
-                                    .into_iter()
-                                    .map(|msg_id| msg_id.0)
-                                    .collect(),
-                            };
-                            control.ihave.push(rpc_ihave);
-                        }
-                        ControlAction::IWant { message_ids } => {
-                            let rpc_iwant = proto::ControlIWant {
-                                message_ids: message_ids
-                                    .into_iter()
-                                    .map(|msg_id| msg_id.0)
-                                    .collect(),
-                            };
-                            control.iwant.push(rpc_iwant);
-                        }
-                        ControlAction::Graft { topic_hash } => {
-                            let rpc_graft = proto::ControlGraft {
-                                topic_id: Some(topic_hash.into_string()),
-                            };
-                            control.graft.push(rpc_graft);
-                        }
-                        ControlAction::Prune {
-                            topic_hash,
-                            peers,
-                            backoff,
-                        } => {
-                            let rpc_prune = proto::ControlPrune {
-                                topic_id: Some(topic_hash.into_string()),
-                                peers: peers
-                                    .into_iter()
-                                    .map(|info| proto::PeerInfo {
-                                        peer_id: info.peer_id.map(|id| id.to_bytes()),
-                                        // TODO, see https://github.com/libp2p/specs/pull/217
-                                        signed_peer_record: None,
-                                    })
-                                    .collect(),
-                                backoff,
-                            };
-                            control.prune.push(rpc_prune);
-                        }
-                    }
-                }
-
-                proto::RPC {
-                    publish: Vec::new(),
-                    subscriptions: Vec::new(),
-                    control: Some(control),
+            RpcOut::Control(ControlAction::IHave {
+                topic_hash,
+                message_ids,
+            }) => proto::RPC {
+                publish: Vec::new(),
+                subscriptions: Vec::new(),
+                control: Some(proto::ControlMessage {
+                    ihave: vec![proto::ControlIHave {
+                        topic_id: Some(topic_hash.into_string()),
+                        message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
+                    }],
+                    iwant: vec![],
+                    graft: vec![],
+                    prune: vec![],
+                }),
+            },
+            RpcOut::Control(ControlAction::IWant { message_ids }) => proto::RPC {
+                publish: Vec::new(),
+                subscriptions: Vec::new(),
+                control: Some(proto::ControlMessage {
+                    ihave: vec![],
+                    iwant: vec![proto::ControlIWant {
+                        message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
+                    }],
+                    graft: vec![],
+                    prune: vec![],
+                }),
+            },
+            RpcOut::Control(ControlAction::Graft { topic_hash }) => proto::RPC {
+                publish: Vec::new(),
+                subscriptions: vec![],
+                control: Some(proto::ControlMessage {
+                    ihave: vec![],
+                    iwant: vec![],
+                    graft: vec![proto::ControlGraft {
+                        topic_id: Some(topic_hash.into_string()),
+                    }],
+                    prune: vec![],
+                }),
+            },
+            RpcOut::Control(ControlAction::Prune {
+                topic_hash,
+                peers,
+                backoff,
+            }) => {
+                proto::RPC {
+                    publish: Vec::new(),
+                    subscriptions: vec![],
+                    control: Some(proto::ControlMessage {
+                        ihave: vec![],
+                        iwant: vec![],
+                        graft: vec![],
+                        prune: vec![proto::ControlPrune {
+                            topic_id: Some(topic_hash.into_string()),
+                            peers: peers
+                                .into_iter()
+                                .map(|info| proto::PeerInfo {
+                                    peer_id: info.peer_id.map(|id| id.to_bytes()),
+                                    // TODO, see https://github.com/libp2p/specs/pull/217
+                                    signed_peer_record: None,
+                                })
+                                .collect(),
+                            backoff,
+                        }],
+                    }),
                 }
             }
         }
     }
 }
+
+/// An RPC received/sent.
+#[derive(Clone, PartialEq, Eq, Hash)]
+pub struct Rpc {
+    /// List of messages that were part of this RPC query.
+    pub messages: Vec<RawMessage>,
+    /// List of subscriptions.
+    pub subscriptions: Vec<Subscription>,
+    /// List of Gossipsub control messages.
+    pub control_msgs: Vec<ControlAction>,
+}
+
+impl Rpc {
+    /// Converts the GossipsubRPC into its protobuf format.
+    // A convenience function to avoid explicitly specifying types.
+    pub fn into_protobuf(self) -> proto::RPC {
+        self.into()
+    }
+}
+
+impl From<Rpc> for proto::RPC {
+    /// Converts the RPC into protobuf format.
+    fn from(rpc: Rpc) -> Self {
+        // Messages
+        let mut publish = Vec::new();
+
+        for message in rpc.messages.into_iter() {
+            let message = proto::Message {
+                from: message.source.map(|m| m.to_bytes()),
+                data: Some(message.data),
+                seqno: message.sequence_number.map(|s| s.to_be_bytes().to_vec()),
+                topic: TopicHash::into_string(message.topic),
+                signature: message.signature,
+                key: message.key,
+            };
+
+            publish.push(message);
+        }
+
+        // subscriptions
+        let subscriptions = rpc
+            .subscriptions
+            .into_iter()
+            .map(|sub| proto::SubOpts {
+                subscribe: Some(sub.action == SubscriptionAction::Subscribe),
+                topic_id: Some(sub.topic_hash.into_string()),
+            })
+            .collect::<Vec<_>>();
+
+        // control messages
+        let mut control = proto::ControlMessage {
+            ihave: Vec::new(),
+            iwant: Vec::new(),
+            graft: Vec::new(),
+            prune: Vec::new(),
+        };
+
+        let empty_control_msg = rpc.control_msgs.is_empty();
+
+        for action in rpc.control_msgs {
+            match action {
+                // collect all ihave messages
+                ControlAction::IHave {
+                    topic_hash,
+                    message_ids,
+                } => {
+                    let rpc_ihave = proto::ControlIHave {
+                        topic_id: Some(topic_hash.into_string()),
+                        message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
+                    };
+                    control.ihave.push(rpc_ihave);
+                }
+                ControlAction::IWant { message_ids } => {
+                    let rpc_iwant = proto::ControlIWant {
+                        message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
+                    };
+                    control.iwant.push(rpc_iwant);
+                }
+                ControlAction::Graft { topic_hash } => {
+                    let rpc_graft = proto::ControlGraft {
+                        topic_id: Some(topic_hash.into_string()),
+                    };
+                    control.graft.push(rpc_graft);
+                }
+                ControlAction::Prune {
+                    topic_hash,
+                    peers,
+                    backoff,
+                } => {
+                    let rpc_prune = proto::ControlPrune {
+                        topic_id: Some(topic_hash.into_string()),
+                        peers: peers
+                            .into_iter()
+                            .map(|info| proto::PeerInfo {
+                                peer_id: info.peer_id.map(|id| id.to_bytes()),
+                                // TODO, see https://github.com/libp2p/specs/pull/217
+                                signed_peer_record: None,
+                            })
+                            .collect(),
+                        backoff,
+                    };
+                    control.prune.push(rpc_prune);
+                }
+            }
+        }
+
+        proto::RPC {
+            subscriptions,
+            publish,
+            control: if empty_control_msg {
+                None
+            } else {
+                Some(control)
+            },
+        }
+    }
+}
+
+impl fmt::Debug for Rpc {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let mut b = f.debug_struct("GossipsubRpc");
+        if !self.messages.is_empty() {
+            b.field("messages", &self.messages);
+        }
+        if !self.subscriptions.is_empty() {
+            b.field("subscriptions", &self.subscriptions);
+        }
+        if !self.control_msgs.is_empty() {
+            b.field("control_msgs", &self.control_msgs);
+        }
+        b.finish()
+    }
+}
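
Reviewer note (illustrative, not part of the patch): with `RpcOut::Control` now
wrapping a single `ControlAction`, every control action is encoded as its own
`proto::RPC` carrying exactly one entry in the corresponding control list. A
minimal sketch of a test one could add next to the existing `local_test`
module; the test name is hypothetical, and it assumes the crate-internal
`RpcOut`, `ControlAction`, and `IdentTopic` items used throughout this diff:

    // Hypothetical test, not part of this patch: a single GRAFT becomes one
    // RPC whose `graft` list has exactly one entry, with every other control
    // list (and the publish/subscription lists) left empty.
    #[test]
    fn control_rpc_carries_exactly_one_action() {
        let topic_hash = IdentTopic::new("TestTopic").hash();
        let rpc = RpcOut::Control(ControlAction::Graft { topic_hash }).into_protobuf();

        let control = rpc.control.expect("control field should be set");
        assert_eq!(control.graft.len(), 1);
        assert!(control.ihave.is_empty());
        assert!(control.iwant.is_empty());
        assert!(control.prune.is_empty());
        assert!(rpc.publish.is_empty() && rpc.subscriptions.is_empty());
    }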