Skip to content

Commit

Permalink
protocols/gossipsub: Fix inconsistency in mesh peer tracking (libp2p#…
Browse files Browse the repository at this point in the history
…2189)

Co-authored-by: Age Manning <[email protected]>
  • Loading branch information
2 people authored and ethDreamer committed Aug 11, 2021
1 parent 074c478 commit 0fffb85
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 40 deletions.
10 changes: 10 additions & 0 deletions protocols/gossipsub/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
# 0.33.0 [unreleased]

- Improve internal peer tracking.
[PR 2175](https://github.com/libp2p/rust-libp2p/pull/2175)

- Update dependencies.

- Allow `message_id_fn`s to accept closures that capture variables.
[PR 2103](https://github.com/libp2p/rust-libp2p/pull/2103)

# 0.32.0 [2021-07-12]

- Update dependencies.
Expand Down
84 changes: 44 additions & 40 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1339,6 +1339,15 @@ where

let mut do_px = self.config.do_px();

// For each topic, if a peer has grafted us, then we necessarily must be in their mesh
// and they must be subscribed to the topic. Ensure we have recorded the mapping.
for topic in &topics {
self.peer_topics
.entry(*peer_id)
.or_default()
.insert(topic.clone());
}

// we don't GRAFT to/from explicit peers; complain loudly if this happens
if self.explicit_peers.contains(peer_id) {
warn!("GRAFT: ignoring request from direct peer {}", peer_id);
Expand Down Expand Up @@ -1381,7 +1390,7 @@ where
peer_score.add_penalty(peer_id, 1);
}
}
//no PX
// no PX
do_px = false;

to_prune_topics.insert(topic_hash.clone());
Expand Down Expand Up @@ -2979,34 +2988,33 @@ where
// Ignore connections from blacklisted peers.
if self.blacklisted_peers.contains(peer_id) {
debug!("Ignoring connection from blacklisted peer: {}", peer_id);
return;
}

debug!("New peer connected: {}", peer_id);
// We need to send our subscriptions to the newly-connected node.
let mut subscriptions = vec![];
for topic_hash in self.mesh.keys() {
subscriptions.push(GossipsubSubscription {
topic_hash: topic_hash.clone(),
action: GossipsubSubscriptionAction::Subscribe,
});
}
} else {
debug!("New peer connected: {}", peer_id);
// We need to send our subscriptions to the newly-connected node.
let mut subscriptions = vec![];
for topic_hash in self.mesh.keys() {
subscriptions.push(GossipsubSubscription {
topic_hash: topic_hash.clone(),
action: GossipsubSubscriptionAction::Subscribe,
});
}

if !subscriptions.is_empty() {
// send our subscriptions to the peer
if self
.send_message(
*peer_id,
GossipsubRpc {
messages: Vec::new(),
subscriptions,
control_msgs: Vec::new(),
}
.into_protobuf(),
)
.is_err()
{
error!("Failed to send subscriptions, message too large");
if !subscriptions.is_empty() {
// send our subscriptions to the peer
if self
.send_message(
*peer_id,
GossipsubRpc {
messages: Vec::new(),
subscriptions,
control_msgs: Vec::new(),
}
.into_protobuf(),
)
.is_err()
{
error!("Failed to send subscriptions, message too large");
}
}
}

Expand All @@ -3025,9 +3033,10 @@ where
let topics = match self.peer_topics.get(peer_id) {
Some(topics) => (topics),
None => {
if !self.blacklisted_peers.contains(peer_id) {
debug!("Disconnected node, not in connected nodes");
}
debug_assert!(
self.blacklisted_peers.contains(peer_id),
"Disconnected node not in connected list"
);
return;
}
};
Expand Down Expand Up @@ -3065,12 +3074,12 @@ where
.get_mut(&topic)
.map(|peers| peers.remove(peer_id));
}

//forget px and outbound status for this peer
self.px_peers.remove(peer_id);
self.outbound_peers.remove(peer_id);
}

// Forget px and outbound status for this peer
self.px_peers.remove(peer_id);
self.outbound_peers.remove(peer_id);

// Remove peer from peer_topics and connected_peers
// NOTE: It is possible the peer has already been removed from all mappings if it does not
// support the protocol.
Expand All @@ -3088,11 +3097,6 @@ where
connection_id: &ConnectionId,
endpoint: &ConnectedPoint,
) {
// Ignore connections from blacklisted peers.
if self.blacklisted_peers.contains(peer_id) {
return;
}

// Check if the peer is an outbound peer
if let ConnectedPoint::Dialer { .. } = endpoint {
// Diverging from the go implementation we only want to consider a peer as outbound peer
Expand Down
32 changes: 32 additions & 0 deletions protocols/gossipsub/src/behaviour/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5228,4 +5228,36 @@ mod tests {
//nobody got penalized
assert!(gs1.peer_score.as_ref().unwrap().0.score(&p2) >= original_score);
}

#[test]
/// Test nodes that send grafts without subscriptions.
fn test_graft_without_subscribe() {
// The node should:
// - Create an empty vector in mesh[topic]
// - Send subscription request to all peers
// - run JOIN(topic)

let topic = String::from("test_subscribe");
let subscribe_topic = vec![topic.clone()];
let subscribe_topic_hash = vec![Topic::new(topic.clone()).hash()];
let (mut gs, peers, topic_hashes) = inject_nodes1()
.peer_no(1)
.topics(subscribe_topic)
.to_subscribe(false)
.create_network();

assert!(
gs.mesh.get(&topic_hashes[0]).is_some(),
"Subscribe should add a new entry to the mesh[topic] hashmap"
);

// The node sends a graft for the subscribe topic.
gs.handle_graft(&peers[0], subscribe_topic_hash);

// The node disconnects
gs.inject_disconnected(&peers[0]);

// We unsubscribe from the topic.
let _ = gs.unsubscribe(&Topic::new(topic));
}
}

0 comments on commit 0fffb85

Please sign in to comment.