From 0fffb85a3c358018adb86ff5c6235217635cf5b4 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 11 Aug 2021 17:37:10 +0200 Subject: [PATCH] protocols/gossipsub: Fix inconsistency in mesh peer tracking (#2189) Co-authored-by: Age Manning --- protocols/gossipsub/CHANGELOG.md | 10 +++ protocols/gossipsub/src/behaviour.rs | 84 +++++++++++----------- protocols/gossipsub/src/behaviour/tests.rs | 32 +++++++++ 3 files changed, 86 insertions(+), 40 deletions(-) diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index 0f2a4d6e40a..69e0f4f3694 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -1,3 +1,13 @@ +# 0.33.0 [unreleased] + +- Improve internal peer tracking. + [PR 2175](https://github.com/libp2p/rust-libp2p/pull/2175) + +- Update dependencies. + +- Allow `message_id_fn`s to accept closures that capture variables. + [PR 2103](https://github.com/libp2p/rust-libp2p/pull/2103) + # 0.32.0 [2021-07-12] - Update dependencies. diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 7a0b705fffd..340838ee423 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -1339,6 +1339,15 @@ where let mut do_px = self.config.do_px(); + // For each topic, if a peer has grafted us, then we necessarily must be in their mesh + // and they must be subscribed to the topic. Ensure we have recorded the mapping. + for topic in &topics { + self.peer_topics + .entry(*peer_id) + .or_default() + .insert(topic.clone()); + } + // we don't GRAFT to/from explicit peers; complain loudly if this happens if self.explicit_peers.contains(peer_id) { warn!("GRAFT: ignoring request from direct peer {}", peer_id); @@ -1381,7 +1390,7 @@ where peer_score.add_penalty(peer_id, 1); } } - //no PX + // no PX do_px = false; to_prune_topics.insert(topic_hash.clone()); @@ -2979,34 +2988,33 @@ where // Ignore connections from blacklisted peers. if self.blacklisted_peers.contains(peer_id) { debug!("Ignoring connection from blacklisted peer: {}", peer_id); - return; - } - - debug!("New peer connected: {}", peer_id); - // We need to send our subscriptions to the newly-connected node. - let mut subscriptions = vec![]; - for topic_hash in self.mesh.keys() { - subscriptions.push(GossipsubSubscription { - topic_hash: topic_hash.clone(), - action: GossipsubSubscriptionAction::Subscribe, - }); - } + } else { + debug!("New peer connected: {}", peer_id); + // We need to send our subscriptions to the newly-connected node. + let mut subscriptions = vec![]; + for topic_hash in self.mesh.keys() { + subscriptions.push(GossipsubSubscription { + topic_hash: topic_hash.clone(), + action: GossipsubSubscriptionAction::Subscribe, + }); + } - if !subscriptions.is_empty() { - // send our subscriptions to the peer - if self - .send_message( - *peer_id, - GossipsubRpc { - messages: Vec::new(), - subscriptions, - control_msgs: Vec::new(), - } - .into_protobuf(), - ) - .is_err() - { - error!("Failed to send subscriptions, message too large"); + if !subscriptions.is_empty() { + // send our subscriptions to the peer + if self + .send_message( + *peer_id, + GossipsubRpc { + messages: Vec::new(), + subscriptions, + control_msgs: Vec::new(), + } + .into_protobuf(), + ) + .is_err() + { + error!("Failed to send subscriptions, message too large"); + } } } @@ -3025,9 +3033,10 @@ where let topics = match self.peer_topics.get(peer_id) { Some(topics) => (topics), None => { - if !self.blacklisted_peers.contains(peer_id) { - debug!("Disconnected node, not in connected nodes"); - } + debug_assert!( + self.blacklisted_peers.contains(peer_id), + "Disconnected node not in connected list" + ); return; } }; @@ -3065,12 +3074,12 @@ where .get_mut(&topic) .map(|peers| peers.remove(peer_id)); } - - //forget px and outbound status for this peer - self.px_peers.remove(peer_id); - self.outbound_peers.remove(peer_id); } + // Forget px and outbound status for this peer + self.px_peers.remove(peer_id); + self.outbound_peers.remove(peer_id); + // Remove peer from peer_topics and connected_peers // NOTE: It is possible the peer has already been removed from all mappings if it does not // support the protocol. @@ -3088,11 +3097,6 @@ where connection_id: &ConnectionId, endpoint: &ConnectedPoint, ) { - // Ignore connections from blacklisted peers. - if self.blacklisted_peers.contains(peer_id) { - return; - } - // Check if the peer is an outbound peer if let ConnectedPoint::Dialer { .. } = endpoint { // Diverging from the go implementation we only want to consider a peer as outbound peer diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 81b2267fdbb..463452b0cb6 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -5228,4 +5228,36 @@ mod tests { //nobody got penalized assert!(gs1.peer_score.as_ref().unwrap().0.score(&p2) >= original_score); } + + #[test] + /// Test nodes that send grafts without subscriptions. + fn test_graft_without_subscribe() { + // The node should: + // - Create an empty vector in mesh[topic] + // - Send subscription request to all peers + // - run JOIN(topic) + + let topic = String::from("test_subscribe"); + let subscribe_topic = vec![topic.clone()]; + let subscribe_topic_hash = vec![Topic::new(topic.clone()).hash()]; + let (mut gs, peers, topic_hashes) = inject_nodes1() + .peer_no(1) + .topics(subscribe_topic) + .to_subscribe(false) + .create_network(); + + assert!( + gs.mesh.get(&topic_hashes[0]).is_some(), + "Subscribe should add a new entry to the mesh[topic] hashmap" + ); + + // The node sends a graft for the subscribe topic. + gs.handle_graft(&peers[0], subscribe_topic_hash); + + // The node disconnects + gs.inject_disconnected(&peers[0]); + + // We unsubscribe from the topic. + let _ = gs.unsubscribe(&Topic::new(topic)); + } }