From e741065ecbeec817af14d90633f9f6db1a2ab754 Mon Sep 17 00:00:00 2001 From: Diva M Date: Thu, 20 Jan 2022 15:52:24 -0500 Subject: [PATCH 01/10] add other_established to inject_connection_established --- protocols/autonat/src/behaviour.rs | 10 ++++++++-- protocols/gossipsub/src/behaviour.rs | 1 + protocols/gossipsub/src/behaviour/tests.rs | 6 +++++- protocols/identify/src/identify.rs | 1 + protocols/kad/src/behaviour.rs | 1 + protocols/kad/src/behaviour/test.rs | 2 +- protocols/relay/src/v1/behaviour.rs | 1 + protocols/relay/src/v2/client.rs | 1 + protocols/request-response/src/lib.rs | 1 + swarm-derive/src/lib.rs | 6 +++--- swarm/src/behaviour.rs | 1 + swarm/src/behaviour/either.rs | 21 +++++++++++++++------ swarm/src/behaviour/toggle.rs | 9 ++++++++- swarm/src/lib.rs | 16 +++++++++++----- swarm/src/test.rs | 4 +++- 15 files changed, 61 insertions(+), 20 deletions(-) diff --git a/protocols/autonat/src/behaviour.rs b/protocols/autonat/src/behaviour.rs index a1a8bb6ff21..c43a4f5d1db 100644 --- a/protocols/autonat/src/behaviour.rs +++ b/protocols/autonat/src/behaviour.rs @@ -303,9 +303,15 @@ impl NetworkBehaviour for Behaviour { conn: &ConnectionId, endpoint: &ConnectedPoint, failed_addresses: Option<&Vec>, + other_established: usize, ) { - self.inner - .inject_connection_established(peer, conn, endpoint, failed_addresses); + self.inner.inject_connection_established( + peer, + conn, + endpoint, + failed_addresses, + other_established, + ); let connections = self.connected.entry(*peer).or_default(); let addr = if endpoint.is_relayed() { None diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 34fe368ef40..b58395ab638 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -3183,6 +3183,7 @@ where connection_id: &ConnectionId, endpoint: &ConnectedPoint, _: Option<&Vec>, + other_established: usize, ) { // Check if the peer is an outbound peer if let ConnectedPoint::Dialer { .. } = endpoint { diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 1183f83b83c..d2515329c7c 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -185,7 +185,6 @@ mod tests { F: TopicSubscriptionFilter + Clone + Default + Send + 'static, { let peer = PeerId::random(); - //peers.push(peer.clone()); gs.inject_connection_established( &peer, &ConnectionId::new(0), @@ -201,6 +200,7 @@ mod tests { } }, None, + 0, // first connection ); as NetworkBehaviour>::inject_connected(gs, &peer); if let Some(kind) = kind { @@ -541,6 +541,7 @@ mod tests { role_override: Endpoint::Dialer, }, None, + 0, ); gs.inject_connected(&random_peer); @@ -4154,6 +4155,7 @@ mod tests { role_override: Endpoint::Dialer, }, None, + 0, ); } @@ -4174,6 +4176,7 @@ mod tests { role_override: Endpoint::Dialer, }, None, + 1, ); } @@ -4203,6 +4206,7 @@ mod tests { role_override: Endpoint::Dialer, }, None, + 2, ); //nothing changed diff --git a/protocols/identify/src/identify.rs b/protocols/identify/src/identify.rs index 2e644502014..b055afdd29d 100644 --- a/protocols/identify/src/identify.rs +++ b/protocols/identify/src/identify.rs @@ -226,6 +226,7 @@ impl NetworkBehaviour for Identify { conn: &ConnectionId, endpoint: &ConnectedPoint, failed_addresses: Option<&Vec>, + other_established: usize, ) { let addr = match endpoint { ConnectedPoint::Dialer { address, .. } => address.clone(), diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index 335e2eba8e1..7bb64ecd467 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -1815,6 +1815,7 @@ where _: &ConnectionId, _: &ConnectedPoint, errors: Option<&Vec>, + other_established: usize, ) { for addr in errors.map(|a| a.into_iter()).into_iter().flatten() { self.address_failed(*peer_id, addr); diff --git a/protocols/kad/src/behaviour/test.rs b/protocols/kad/src/behaviour/test.rs index 21eb76c1d25..f25b610bb89 100644 --- a/protocols/kad/src/behaviour/test.rs +++ b/protocols/kad/src/behaviour/test.rs @@ -1291,7 +1291,7 @@ fn network_behaviour_inject_address_change() { }; // Mimick a connection being established. - kademlia.inject_connection_established(&remote_peer_id, &connection_id, &endpoint, None); + kademlia.inject_connection_established(&remote_peer_id, &connection_id, &endpoint, None, 0); kademlia.inject_connected(&remote_peer_id); // At this point the remote is not yet known to support the diff --git a/protocols/relay/src/v1/behaviour.rs b/protocols/relay/src/v1/behaviour.rs index 953be32d6bd..860912cd821 100644 --- a/protocols/relay/src/v1/behaviour.rs +++ b/protocols/relay/src/v1/behaviour.rs @@ -205,6 +205,7 @@ impl NetworkBehaviour for Relay { connection_id: &ConnectionId, _: &ConnectedPoint, _: Option<&Vec>, + other_established: usize, ) { let is_first = self .connected_peers diff --git a/protocols/relay/src/v2/client.rs b/protocols/relay/src/v2/client.rs index e0459768879..902d7f389ef 100644 --- a/protocols/relay/src/v2/client.rs +++ b/protocols/relay/src/v2/client.rs @@ -129,6 +129,7 @@ impl NetworkBehaviour for Client { connection_id: &ConnectionId, endpoint: &ConnectedPoint, _failed_addresses: Option<&Vec>, + other_established: usize, ) { if !endpoint.is_relayed() { self.directly_connected_peers diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index af6b601eae0..d67293f12d6 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -629,6 +629,7 @@ where conn: &ConnectionId, endpoint: &ConnectedPoint, _errors: Option<&Vec>, + other_established: usize, ) { let address = match endpoint { ConnectedPoint::Dialer { address, .. } => Some(address.clone()), diff --git a/swarm-derive/src/lib.rs b/swarm-derive/src/lib.rs index e221a349f38..96b95a0a6ee 100644 --- a/swarm-derive/src/lib.rs +++ b/swarm-derive/src/lib.rs @@ -204,8 +204,8 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { return None; } Some(match field.ident { - Some(ref i) => quote!{ self.#i.inject_connection_established(peer_id, connection_id, endpoint, errors); }, - None => quote!{ self.#field_n.inject_connection_established(peer_id, connection_id, endpoint, errors); }, + Some(ref i) => quote!{ self.#i.inject_connection_established(peer_id, connection_id, endpoint, errors, other_established); }, + None => quote!{ self.#field_n.inject_connection_established(peer_id, connection_id, endpoint, errors, other_established); }, }) }) }; @@ -657,7 +657,7 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { #(#inject_disconnected_stmts);* } - fn inject_connection_established(&mut self, peer_id: &#peer_id, connection_id: &#connection_id, endpoint: &#connected_point, errors: #dial_errors) { + fn inject_connection_established(&mut self, peer_id: &#peer_id, connection_id: &#connection_id, endpoint: &#connected_point, errors: #dial_errors, other_established: usize) { #(#inject_connection_established_stmts);* } diff --git a/swarm/src/behaviour.rs b/swarm/src/behaviour.rs index 269bddf3bad..c2c16a5669b 100644 --- a/swarm/src/behaviour.rs +++ b/swarm/src/behaviour.rs @@ -214,6 +214,7 @@ pub trait NetworkBehaviour: Send + 'static { _connection_id: &ConnectionId, _endpoint: &ConnectedPoint, _failed_addresses: Option<&Vec>, + _other_established: usize, ) { } diff --git a/swarm/src/behaviour/either.rs b/swarm/src/behaviour/either.rs index 3dd6d28a3d2..d328c90b39e 100644 --- a/swarm/src/behaviour/either.rs +++ b/swarm/src/behaviour/either.rs @@ -73,14 +73,23 @@ where connection: &ConnectionId, endpoint: &ConnectedPoint, errors: Option<&Vec>, + other_established: usize, ) { match self { - Either::Left(a) => { - a.inject_connection_established(peer_id, connection, endpoint, errors) - } - Either::Right(b) => { - b.inject_connection_established(peer_id, connection, endpoint, errors) - } + Either::Left(a) => a.inject_connection_established( + peer_id, + connection, + endpoint, + errors, + other_established, + ), + Either::Right(b) => b.inject_connection_established( + peer_id, + connection, + endpoint, + errors, + other_established, + ), } } diff --git a/swarm/src/behaviour/toggle.rs b/swarm/src/behaviour/toggle.rs index b6af0e38237..8b09fa2bd85 100644 --- a/swarm/src/behaviour/toggle.rs +++ b/swarm/src/behaviour/toggle.rs @@ -104,9 +104,16 @@ where connection: &ConnectionId, endpoint: &ConnectedPoint, errors: Option<&Vec>, + other_established: usize, ) { if let Some(inner) = self.inner.as_mut() { - inner.inject_connection_established(peer_id, connection, endpoint, errors) + inner.inject_connection_established( + peer_id, + connection, + endpoint, + errors, + other_established, + ) } } diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 38ed1143d86..4323fa1d4a4 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -662,12 +662,20 @@ where u32::try_from(other_established_connection_ids.len() + 1).unwrap(), ) .expect("n + 1 is always non-zero; qed"); + let first_non_banned = other_established_connection_ids + .iter() + .all(|conn_id| this.banned_peer_connections.contains(conn_id)); + let non_banned_established = other_established_connection_ids + .into_iter() + .filter(|conn_id| !this.banned_peer_connections.contains(&conn_id)) + .count(); log::debug!( - "Connection established: {:?} {:?}; Total (peer): {}.", + "Connection established: {:?} {:?}; Total (peer): {}. Total non-banned (peer): {}", connection.peer_id(), connection.endpoint(), - num_established + num_established, + non_banned_established + 1, ); let endpoint = connection.endpoint().clone(); let failed_addresses = concurrent_dial_errors @@ -678,13 +686,11 @@ where &connection.id(), &endpoint, failed_addresses.as_ref(), + non_banned_established, ); // The peer is not banned, but there could be previous banned connections // if the peer was just unbanned. Check if this is the first non-banned // connection. - let first_non_banned = other_established_connection_ids - .into_iter() - .all(|conn_id| this.banned_peer_connections.contains(&conn_id)); if first_non_banned { this.behaviour.inject_connected(&peer_id); } diff --git a/swarm/src/test.rs b/swarm/src/test.rs index 22f7a51fa9b..29e366dd05a 100644 --- a/swarm/src/test.rs +++ b/swarm/src/test.rs @@ -235,10 +235,12 @@ where c: &ConnectionId, e: &ConnectedPoint, errors: Option<&Vec>, + other_established: usize, ) { self.inject_connection_established .push((p.clone(), c.clone(), e.clone())); - self.inner.inject_connection_established(p, c, e, errors); + self.inner + .inject_connection_established(p, c, e, errors, other_established); } fn inject_disconnected(&mut self, peer: &PeerId) { From 322b6f8952eb42fda99529e2aa6543a994f7838d Mon Sep 17 00:00:00 2001 From: Diva M Date: Thu, 20 Jan 2022 16:23:22 -0500 Subject: [PATCH 02/10] remove inject_connected --- protocols/autonat/src/behaviour.rs | 4 - protocols/floodsub/src/layer.rs | 15 ++- protocols/gossipsub/src/behaviour.rs | 103 ++++++++++---------- protocols/gossipsub/src/behaviour/tests.rs | 2 - protocols/identify/src/identify.rs | 2 +- protocols/kad/src/behaviour.rs | 39 ++++---- protocols/kad/src/behaviour/test.rs | 1 - protocols/relay/src/v1/behaviour.rs | 108 ++++++++++----------- protocols/relay/src/v2/client.rs | 2 +- protocols/request-response/src/lib.rs | 18 ++-- swarm-derive/src/lib.rs | 21 ---- swarm/src/behaviour.rs | 8 -- swarm/src/behaviour/either.rs | 7 -- swarm/src/behaviour/toggle.rs | 6 -- swarm/src/lib.rs | 9 -- swarm/src/test.rs | 11 --- 16 files changed, 144 insertions(+), 212 deletions(-) diff --git a/protocols/autonat/src/behaviour.rs b/protocols/autonat/src/behaviour.rs index c43a4f5d1db..ad5bd4a106f 100644 --- a/protocols/autonat/src/behaviour.rs +++ b/protocols/autonat/src/behaviour.rs @@ -467,10 +467,6 @@ impl NetworkBehaviour for Behaviour { self.inner.addresses_of_peer(peer) } - fn inject_connected(&mut self, peer: &PeerId) { - self.inner.inject_connected(peer) - } - fn inject_event( &mut self, peer_id: PeerId, diff --git a/protocols/floodsub/src/layer.rs b/protocols/floodsub/src/layer.rs index b2093c763d6..d8778017e1e 100644 --- a/protocols/floodsub/src/layer.rs +++ b/protocols/floodsub/src/layer.rs @@ -27,6 +27,7 @@ use crate::FloodsubConfig; use cuckoofilter::{CuckooError, CuckooFilter}; use fnv::FnvHashSet; use libp2p_core::{connection::ConnectionId, PeerId}; +use libp2p_core::{ConnectedPoint, Multiaddr}; use libp2p_swarm::{ dial_opts::{self, DialOpts}, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, OneShotHandler, PollParameters, @@ -287,7 +288,19 @@ impl NetworkBehaviour for Floodsub { Default::default() } - fn inject_connected(&mut self, id: &PeerId) { + fn inject_connection_established( + &mut self, + id: &PeerId, + _: &ConnectionId, + _: &ConnectedPoint, + _: Option<&Vec>, + other_established: usize, + ) { + if other_established > 0 { + // We only care about the first time a peer connects. + return; + } + // We need to send our subscriptions to the newly-connected node. if self.target_peers.contains(id) { for topic in self.subscribed_topics.iter().cloned() { diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index b58395ab638..2452465ee27 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -3055,48 +3055,6 @@ where ) } - fn inject_connected(&mut self, peer_id: &PeerId) { - // Ignore connections from blacklisted peers. - if self.blacklisted_peers.contains(peer_id) { - debug!("Ignoring connection from blacklisted peer: {}", peer_id); - } else { - debug!("New peer connected: {}", peer_id); - // We need to send our subscriptions to the newly-connected node. - let mut subscriptions = vec![]; - for topic_hash in self.mesh.keys() { - subscriptions.push(GossipsubSubscription { - topic_hash: topic_hash.clone(), - action: GossipsubSubscriptionAction::Subscribe, - }); - } - - if !subscriptions.is_empty() { - // send our subscriptions to the peer - if self - .send_message( - *peer_id, - GossipsubRpc { - messages: Vec::new(), - subscriptions, - control_msgs: Vec::new(), - } - .into_protobuf(), - ) - .is_err() - { - error!("Failed to send subscriptions, message too large"); - } - } - } - - // Insert an empty set of the topics of this peer until known. - self.peer_topics.insert(*peer_id, Default::default()); - - if let Some((peer_score, ..)) = &mut self.peer_score { - peer_score.add_peer(*peer_id); - } - } - fn inject_disconnected(&mut self, peer_id: &PeerId) { // remove from mesh, topic_peers, peer_topic and the fanout debug!("Peer disconnected: {}", peer_id); @@ -3185,18 +3143,13 @@ where _: Option<&Vec>, other_established: usize, ) { - // Check if the peer is an outbound peer - if let ConnectedPoint::Dialer { .. } = endpoint { - // Diverging from the go implementation we only want to consider a peer as outbound peer - // if its first connection is outbound. To check if this connection is the first we - // check if the peer isn't connected yet. This only works because the - // `inject_connection_established` event for the first connection gets called immediately - // before `inject_connected` gets called. - if !self.peer_topics.contains_key(peer_id) && !self.px_peers.contains(peer_id) { - // The first connection is outbound and it is not a peer from peer exchange => mark - // it as outbound peer - self.outbound_peers.insert(*peer_id); - } + // Diverging from the go implementation we only want to consider a peer as outbound peer + // if its first connection is outbound. + + if endpoint.is_dialer() && other_established == 0 && !self.px_peers.contains(peer_id) { + // The first connection is outbound and it is not a peer from peer exchange => mark + // it as outbound peer + self.outbound_peers.insert(*peer_id); } // Add the IP to the peer scoring system @@ -3225,6 +3178,48 @@ where }) .connections .push(*connection_id); + + if other_established == 0 { + // Ignore connections from blacklisted peers. + if self.blacklisted_peers.contains(peer_id) { + debug!("Ignoring connection from blacklisted peer: {}", peer_id); + } else { + debug!("New peer connected: {}", peer_id); + // We need to send our subscriptions to the newly-connected node. + let mut subscriptions = vec![]; + for topic_hash in self.mesh.keys() { + subscriptions.push(GossipsubSubscription { + topic_hash: topic_hash.clone(), + action: GossipsubSubscriptionAction::Subscribe, + }); + } + + if !subscriptions.is_empty() { + // send our subscriptions to the peer + if self + .send_message( + *peer_id, + GossipsubRpc { + messages: Vec::new(), + subscriptions, + control_msgs: Vec::new(), + } + .into_protobuf(), + ) + .is_err() + { + error!("Failed to send subscriptions, message too large"); + } + } + } + + // Insert an empty set of the topics of this peer until known. + self.peer_topics.insert(*peer_id, Default::default()); + + if let Some((peer_score, ..)) = &mut self.peer_score { + peer_score.add_peer(*peer_id); + } + } } fn inject_connection_closed( diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index d2515329c7c..64adb8424f4 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -202,7 +202,6 @@ mod tests { None, 0, // first connection ); - as NetworkBehaviour>::inject_connected(gs, &peer); if let Some(kind) = kind { gs.inject_event( peer.clone(), @@ -543,7 +542,6 @@ mod tests { None, 0, ); - gs.inject_connected(&random_peer); // add the new peer to the fanout let fanout_peers = gs.fanout.get_mut(&topic_hashes[1]).unwrap(); diff --git a/protocols/identify/src/identify.rs b/protocols/identify/src/identify.rs index b055afdd29d..02817c72a27 100644 --- a/protocols/identify/src/identify.rs +++ b/protocols/identify/src/identify.rs @@ -226,7 +226,7 @@ impl NetworkBehaviour for Identify { conn: &ConnectionId, endpoint: &ConnectedPoint, failed_addresses: Option<&Vec>, - other_established: usize, + _other_established: usize, ) { let addr = match endpoint { ConnectedPoint::Dialer { address, .. } => address.clone(), diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index 7bb64ecd467..290484864d5 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -1825,27 +1825,28 @@ where // remote supports the configured protocol name. Only once a connection // handler reports [`KademliaHandlerEvent::ProtocolConfirmed`] do we // update the local routing table. - } - fn inject_connected(&mut self, peer: &PeerId) { - // Queue events for sending pending RPCs to the connected peer. - // There can be only one pending RPC for a particular peer and query per definition. - for (peer_id, event) in self.queries.iter_mut().filter_map(|q| { - q.inner - .pending_rpcs - .iter() - .position(|(p, _)| p == peer) - .map(|p| q.inner.pending_rpcs.remove(p)) - }) { - self.queued_events - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id, - event, - handler: NotifyHandler::Any, - }); - } + // Peer's first connection. + if other_established == 0 { + // Queue events for sending pending RPCs to the connected peer. + // There can be only one pending RPC for a particular peer and query per definition. + for (peer_id, event) in self.queries.iter_mut().filter_map(|q| { + q.inner + .pending_rpcs + .iter() + .position(|(p, _)| p == peer_id) + .map(|p| q.inner.pending_rpcs.remove(p)) + }) { + self.queued_events + .push_back(NetworkBehaviourAction::NotifyHandler { + peer_id, + event, + handler: NotifyHandler::Any, + }); + } - self.connected_peers.insert(*peer); + self.connected_peers.insert(*peer_id); + } } fn inject_address_change( diff --git a/protocols/kad/src/behaviour/test.rs b/protocols/kad/src/behaviour/test.rs index f25b610bb89..163c91ab5b3 100644 --- a/protocols/kad/src/behaviour/test.rs +++ b/protocols/kad/src/behaviour/test.rs @@ -1292,7 +1292,6 @@ fn network_behaviour_inject_address_change() { // Mimick a connection being established. kademlia.inject_connection_established(&remote_peer_id, &connection_id, &endpoint, None, 0); - kademlia.inject_connected(&remote_peer_id); // At this point the remote is not yet known to support the // configured protocol name, so the peer is not yet in the diff --git a/protocols/relay/src/v1/behaviour.rs b/protocols/relay/src/v1/behaviour.rs index 860912cd821..1c47067aa29 100644 --- a/protocols/relay/src/v1/behaviour.rs +++ b/protocols/relay/src/v1/behaviour.rs @@ -239,69 +239,61 @@ impl NetworkBehaviour for Relay { }, ); } - } - - fn inject_connected(&mut self, peer_id: &PeerId) { - assert!( - self.connected_peers - .get(peer_id) - .map(|cs| !cs.is_empty()) - .unwrap_or(false), - "Expect to be connected to peer with at least one connection." - ); - if let Some(reqs) = self.outgoing_relay_reqs.dialing.remove(peer_id) { - for req in reqs { - let OutgoingDialingRelayReq { - request_id, - src_peer_id, - relay_addr: _, - dst_addr, - dst_peer_id, - send_back, - } = req; - self.outbox_to_swarm - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: *peer_id, - handler: NotifyHandler::Any, - event: RelayHandlerIn::OutgoingRelayReq { - src_peer_id, - request_id, - dst_peer_id, - dst_addr: dst_addr.clone(), - }, - }); + if other_established == 0 { + if let Some(reqs) = self.outgoing_relay_reqs.dialing.remove(peer) { + for req in reqs { + let OutgoingDialingRelayReq { + request_id, + src_peer_id, + relay_addr: _, + dst_addr, + dst_peer_id, + send_back, + } = req; + self.outbox_to_swarm + .push_back(NetworkBehaviourAction::NotifyHandler { + peer_id: *peer, + handler: NotifyHandler::Any, + event: RelayHandlerIn::OutgoingRelayReq { + src_peer_id, + request_id, + dst_peer_id, + dst_addr: dst_addr.clone(), + }, + }); - self.outgoing_relay_reqs - .upgrading - .insert(request_id, OutgoingUpgradingRelayReq { send_back }); + self.outgoing_relay_reqs + .upgrading + .insert(request_id, OutgoingUpgradingRelayReq { send_back }); + } } - } - // Ask the newly-opened connection to be used as destination if relevant. - if let Some(reqs) = self.incoming_relay_reqs.remove(peer_id) { - for req in reqs { - let IncomingRelayReq::DialingDst { - src_peer_id, - src_addr, - src_connection_id, - request_id, - incoming_relay_req, - } = req; - let event = RelayHandlerIn::OutgoingDstReq { - src_peer_id, - src_addr, - src_connection_id, - request_id, - incoming_relay_req, - }; + // Ask the newly-opened connection to be used as destination if relevant. + if let Some(reqs) = self.incoming_relay_reqs.remove(peer) { + for req in reqs { + let IncomingRelayReq::DialingDst { + src_peer_id, + src_addr, + src_connection_id, + request_id, + incoming_relay_req, + } = req; + let event = RelayHandlerIn::OutgoingDstReq { + src_peer_id, + src_addr, + src_connection_id, + request_id, + incoming_relay_req, + }; - self.outbox_to_swarm - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: *peer_id, - handler: NotifyHandler::Any, - event, - }); + self.outbox_to_swarm + .push_back(NetworkBehaviourAction::NotifyHandler { + peer_id: *peer, + handler: NotifyHandler::Any, + event, + }); + } } } } diff --git a/protocols/relay/src/v2/client.rs b/protocols/relay/src/v2/client.rs index 902d7f389ef..eb4645e5df1 100644 --- a/protocols/relay/src/v2/client.rs +++ b/protocols/relay/src/v2/client.rs @@ -129,7 +129,7 @@ impl NetworkBehaviour for Client { connection_id: &ConnectionId, endpoint: &ConnectedPoint, _failed_addresses: Option<&Vec>, - other_established: usize, + _other_established: usize, ) { if !endpoint.is_relayed() { self.directly_connected_peers diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index d67293f12d6..e1460f48121 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -614,15 +614,6 @@ where connection.address = new_address; } - fn inject_connected(&mut self, peer: &PeerId) { - if let Some(pending) = self.pending_outbound_requests.remove(peer) { - for request in pending { - let request = self.try_send_request(peer, request); - assert!(request.is_none()); - } - } - } - fn inject_connection_established( &mut self, peer: &PeerId, @@ -639,6 +630,15 @@ where .entry(*peer) .or_default() .push(Connection::new(*conn, address)); + + if other_established == 0 { + if let Some(pending) = self.pending_outbound_requests.remove(peer) { + for request in pending { + let request = self.try_send_request(peer, request); + assert!(request.is_none()); + } + } + } } fn inject_connection_closed( diff --git a/swarm-derive/src/lib.rs b/swarm-derive/src/lib.rs index 96b95a0a6ee..ffaafc6e821 100644 --- a/swarm-derive/src/lib.rs +++ b/swarm-derive/src/lib.rs @@ -163,23 +163,6 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { }) }; - // Build the list of statements to put in the body of `inject_connected()`. - let inject_connected_stmts = { - data_struct - .fields - .iter() - .enumerate() - .filter_map(move |(field_n, field)| { - if is_ignored(field) { - return None; - } - Some(match field.ident { - Some(ref i) => quote! { self.#i.inject_connected(peer_id); }, - None => quote! { self.#field_n.inject_connected(peer_id); }, - }) - }) - }; - // Build the list of statements to put in the body of `inject_disconnected()`. let inject_disconnected_stmts = { data_struct @@ -649,10 +632,6 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { out } - fn inject_connected(&mut self, peer_id: &#peer_id) { - #(#inject_connected_stmts);* - } - fn inject_disconnected(&mut self, peer_id: &#peer_id) { #(#inject_disconnected_stmts);* } diff --git a/swarm/src/behaviour.rs b/swarm/src/behaviour.rs index c2c16a5669b..5bcc9f2b343 100644 --- a/swarm/src/behaviour.rs +++ b/swarm/src/behaviour.rs @@ -190,14 +190,6 @@ pub trait NetworkBehaviour: Send + 'static { vec![] } - /// Indicate to the behaviour that we connected to the node with the given peer id. - /// - /// This node now has a handler (as spawned by `new_handler`) running in the background. - /// - /// This method is only called when the first connection to the peer is established, preceded by - /// [`inject_connection_established`](NetworkBehaviour::inject_connection_established). - fn inject_connected(&mut self, _: &PeerId) {} - /// Indicates to the behaviour that we disconnected from the node with the given peer id. /// /// There is no handler running anymore for this node. Any event that has been sent to it may diff --git a/swarm/src/behaviour/either.rs b/swarm/src/behaviour/either.rs index d328c90b39e..17fe097b3bc 100644 --- a/swarm/src/behaviour/either.rs +++ b/swarm/src/behaviour/either.rs @@ -53,13 +53,6 @@ where } } - fn inject_connected(&mut self, peer_id: &PeerId) { - match self { - Either::Left(a) => a.inject_connected(peer_id), - Either::Right(b) => b.inject_connected(peer_id), - }; - } - fn inject_disconnected(&mut self, peer_id: &PeerId) { match self { Either::Left(a) => a.inject_disconnected(peer_id), diff --git a/swarm/src/behaviour/toggle.rs b/swarm/src/behaviour/toggle.rs index 8b09fa2bd85..af348f153e5 100644 --- a/swarm/src/behaviour/toggle.rs +++ b/swarm/src/behaviour/toggle.rs @@ -86,12 +86,6 @@ where .unwrap_or_else(Vec::new) } - fn inject_connected(&mut self, peer_id: &PeerId) { - if let Some(inner) = self.inner.as_mut() { - inner.inject_connected(peer_id) - } - } - fn inject_disconnected(&mut self, peer_id: &PeerId) { if let Some(inner) = self.inner.as_mut() { inner.inject_disconnected(peer_id) diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 4323fa1d4a4..c81bf829fef 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -662,9 +662,6 @@ where u32::try_from(other_established_connection_ids.len() + 1).unwrap(), ) .expect("n + 1 is always non-zero; qed"); - let first_non_banned = other_established_connection_ids - .iter() - .all(|conn_id| this.banned_peer_connections.contains(conn_id)); let non_banned_established = other_established_connection_ids .into_iter() .filter(|conn_id| !this.banned_peer_connections.contains(&conn_id)) @@ -688,12 +685,6 @@ where failed_addresses.as_ref(), non_banned_established, ); - // The peer is not banned, but there could be previous banned connections - // if the peer was just unbanned. Check if this is the first non-banned - // connection. - if first_non_banned { - this.behaviour.inject_connected(&peer_id); - } return Poll::Ready(SwarmEvent::ConnectionEstablished { peer_id, num_established, diff --git a/swarm/src/test.rs b/swarm/src/test.rs index 29e366dd05a..dc4b8d95727 100644 --- a/swarm/src/test.rs +++ b/swarm/src/test.rs @@ -218,17 +218,6 @@ where self.inner.addresses_of_peer(p) } - fn inject_connected(&mut self, peer: &PeerId) { - assert!( - self.inject_connection_established - .iter() - .any(|(peer_id, _, _)| peer_id == peer), - "`inject_connected` is called after at least one `inject_connection_established`." - ); - self.inject_connected.push(peer.clone()); - self.inner.inject_connected(peer); - } - fn inject_connection_established( &mut self, p: &PeerId, From 7e9524b4cde47909fe7b09e40cd7d4a9837ce45b Mon Sep 17 00:00:00 2001 From: Diva M Date: Thu, 20 Jan 2022 17:11:38 -0500 Subject: [PATCH 03/10] fix tests --- swarm/src/lib.rs | 2 +- swarm/src/test.rs | 56 +++++++++++++++++++++++++++++++++++++++-------- 2 files changed, 48 insertions(+), 10 deletions(-) diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index c81bf829fef..f19a4f53355 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -1864,7 +1864,7 @@ mod tests { s.behaviour.inject_connection_established.len(), num_connections ); - assert_eq!(s.behaviour.inject_connected.len(), 1); + s.behaviour.assert_connected(num_connections, 1); } if [&swarm1, &swarm2] .iter() diff --git a/swarm/src/test.rs b/swarm/src/test.rs index dc4b8d95727..f3c6d572495 100644 --- a/swarm/src/test.rs +++ b/swarm/src/test.rs @@ -99,9 +99,8 @@ where inner: TInner, pub addresses_of_peer: Vec, - pub inject_connected: Vec, pub inject_disconnected: Vec, - pub inject_connection_established: Vec<(PeerId, ConnectionId, ConnectedPoint)>, + pub inject_connection_established: Vec<(PeerId, ConnectionId, ConnectedPoint, usize)>, pub inject_connection_closed: Vec<(PeerId, ConnectionId, ConnectedPoint)>, pub inject_event: Vec<( PeerId, @@ -127,7 +126,6 @@ where Self { inner, addresses_of_peer: Vec::new(), - inject_connected: Vec::new(), inject_disconnected: Vec::new(), inject_connection_established: Vec::new(), inject_connection_closed: Vec::new(), @@ -147,7 +145,6 @@ where #[allow(dead_code)] pub fn reset(&mut self) { self.addresses_of_peer = Vec::new(); - self.inject_connected = Vec::new(); self.inject_disconnected = Vec::new(); self.inject_connection_established = Vec::new(); self.inject_connection_closed = Vec::new(); @@ -192,7 +189,15 @@ where expected_connections: usize, ) -> bool { if self.inject_connection_established.len() == expected_established_connections { - assert_eq!(self.inject_connected.len(), expected_connections); + assert_eq!( + self.inject_connection_established + .iter() + .filter(|(.., reported_aditional_connections)| { + *reported_aditional_connections == 0 + }) + .count(), + expected_connections + ); return true; } @@ -226,8 +231,39 @@ where errors: Option<&Vec>, other_established: usize, ) { - self.inject_connection_established - .push((p.clone(), c.clone(), e.clone())); + let mut other_peer_connections = self + .inject_connection_established + .iter() + .rev() // take last to first + .filter_map(|(peer, .., other_established)| { + if p == peer { + Some(other_established) + } else { + None + } + }) + .take(other_established); + + // We are informed that there are `other_established` additional connections. Ensure that the + // number of previous connections is consistent with this + if let Some(&prev) = other_peer_connections.next() { + if prev < other_established { + assert_eq!( + prev, + other_established - 1, + "Inconsistent connection reporting" + ) + } + assert_eq!(other_peer_connections.count(), other_established - 1); + } else { + assert_eq!(other_established, 0) + } + self.inject_connection_established.push(( + p.clone(), + c.clone(), + e.clone(), + other_established, + )); self.inner .inject_connection_established(p, c, e, errors, other_established); } @@ -252,7 +288,9 @@ where ) { let connection = (p.clone(), c.clone(), e.clone()); assert!( - self.inject_connection_established.contains(&connection), + self.inject_connection_established + .iter() + .any(|(peer, conn_id, endpoint, _)| (peer, conn_id, endpoint) == (p, c, e)), "`inject_connection_closed` is called only for connections for \ which `inject_connection_established` was called first." ); @@ -269,7 +307,7 @@ where assert!( self.inject_connection_established .iter() - .any(|(peer_id, conn_id, _)| *peer_id == p && c == *conn_id), + .any(|(peer_id, conn_id, ..)| *peer_id == p && c == *conn_id), "`inject_event` is called for reported connections." ); assert!( From 1bbcdd0d50f14e91c3a63fa56d10a112b1678212 Mon Sep 17 00:00:00 2001 From: Diva M Date: Fri, 21 Jan 2022 11:39:22 -0500 Subject: [PATCH 04/10] add remaining_established to inject_connection_closed --- protocols/autonat/src/behaviour.rs | 3 +- protocols/gossipsub/src/behaviour.rs | 1 + protocols/identify/src/identify.rs | 1 + protocols/relay/src/v1/behaviour.rs | 1 + protocols/relay/src/v2/client.rs | 1 + protocols/relay/src/v2/relay.rs | 1 + protocols/request-response/src/lib.rs | 1 + swarm/src/behaviour.rs | 1 + swarm/src/behaviour/either.rs | 22 +++++++++---- swarm/src/behaviour/toggle.rs | 9 +++++- swarm/src/lib.rs | 16 ++++++---- swarm/src/test.rs | 45 ++++++++++++++++++++------- 12 files changed, 77 insertions(+), 25 deletions(-) diff --git a/protocols/autonat/src/behaviour.rs b/protocols/autonat/src/behaviour.rs index ad5bd4a106f..db59e132acb 100644 --- a/protocols/autonat/src/behaviour.rs +++ b/protocols/autonat/src/behaviour.rs @@ -348,9 +348,10 @@ impl NetworkBehaviour for Behaviour { conn: &ConnectionId, endpoint: &ConnectedPoint, handler: ::Handler, + remaining_established: usize, ) { self.inner - .inject_connection_closed(peer, conn, endpoint, handler); + .inject_connection_closed(peer, conn, endpoint, handler, remaining_established); let connections = self.connected.get_mut(peer).expect("Peer is connected."); connections.remove(conn); } diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 2452465ee27..e0d8ea413ac 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -3228,6 +3228,7 @@ where connection_id: &ConnectionId, endpoint: &ConnectedPoint, _: ::Handler, + remaining_established: usize, ) { // Remove IP from peer scoring system if let Some((peer_score, ..)) = &mut self.peer_score { diff --git a/protocols/identify/src/identify.rs b/protocols/identify/src/identify.rs index 02817c72a27..7c5a476c9ca 100644 --- a/protocols/identify/src/identify.rs +++ b/protocols/identify/src/identify.rs @@ -255,6 +255,7 @@ impl NetworkBehaviour for Identify { conn: &ConnectionId, _: &ConnectedPoint, _: ::Handler, + remaining_established: usize, ) { if let Some(addrs) = self.connected.get_mut(peer_id) { addrs.remove(conn); diff --git a/protocols/relay/src/v1/behaviour.rs b/protocols/relay/src/v1/behaviour.rs index 1c47067aa29..72955f2c5c8 100644 --- a/protocols/relay/src/v1/behaviour.rs +++ b/protocols/relay/src/v1/behaviour.rs @@ -353,6 +353,7 @@ impl NetworkBehaviour for Relay { connection: &ConnectionId, _: &ConnectedPoint, _: ::Handler, + remaining_established: usize, ) { // Remove connection from the set of connections for the given peer. In case the set is // empty it will be removed in `inject_disconnected`. diff --git a/protocols/relay/src/v2/client.rs b/protocols/relay/src/v2/client.rs index eb4645e5df1..71f73855926 100644 --- a/protocols/relay/src/v2/client.rs +++ b/protocols/relay/src/v2/client.rs @@ -145,6 +145,7 @@ impl NetworkBehaviour for Client { connection_id: &ConnectionId, endpoint: &ConnectedPoint, _handler: Either, + remaining_established: usize, ) { if !endpoint.is_relayed() { match self.directly_connected_peers.entry(*peer_id) { diff --git a/protocols/relay/src/v2/relay.rs b/protocols/relay/src/v2/relay.rs index eae734f3c3e..a06febfcfd2 100644 --- a/protocols/relay/src/v2/relay.rs +++ b/protocols/relay/src/v2/relay.rs @@ -213,6 +213,7 @@ impl NetworkBehaviour for Relay { connection: &ConnectionId, _: &ConnectedPoint, _handler: Either, + remaining_established: usize, ) { if let Some(connections) = self.reservations.get_mut(peer) { connections.remove(&connection); diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index e1460f48121..619da8d188b 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -647,6 +647,7 @@ where conn: &ConnectionId, _: &ConnectedPoint, _: ::Handler, + remaining_established: usize, ) { let connections = self .connected diff --git a/swarm/src/behaviour.rs b/swarm/src/behaviour.rs index 5bcc9f2b343..3a1150c8c6a 100644 --- a/swarm/src/behaviour.rs +++ b/swarm/src/behaviour.rs @@ -221,6 +221,7 @@ pub trait NetworkBehaviour: Send + 'static { _: &ConnectionId, _: &ConnectedPoint, _: ::Handler, + remaining_established: usize, ) { } diff --git a/swarm/src/behaviour/either.rs b/swarm/src/behaviour/either.rs index 17fe097b3bc..0e823002685 100644 --- a/swarm/src/behaviour/either.rs +++ b/swarm/src/behaviour/either.rs @@ -92,14 +92,24 @@ where connection: &ConnectionId, endpoint: &ConnectedPoint, handler: ::Handler, + remaining_established: usize, ) { match (self, handler) { - (Either::Left(behaviour), Either::Left(handler)) => { - behaviour.inject_connection_closed(peer_id, connection, endpoint, handler) - } - (Either::Right(behaviour), Either::Right(handler)) => { - behaviour.inject_connection_closed(peer_id, connection, endpoint, handler) - } + (Either::Left(behaviour), Either::Left(handler)) => behaviour.inject_connection_closed( + peer_id, + connection, + endpoint, + handler, + remaining_established, + ), + (Either::Right(behaviour), Either::Right(handler)) => behaviour + .inject_connection_closed( + peer_id, + connection, + endpoint, + handler, + remaining_established, + ), _ => unreachable!(), } } diff --git a/swarm/src/behaviour/toggle.rs b/swarm/src/behaviour/toggle.rs index af348f153e5..0fc5ea4a52b 100644 --- a/swarm/src/behaviour/toggle.rs +++ b/swarm/src/behaviour/toggle.rs @@ -117,10 +117,17 @@ where connection: &ConnectionId, endpoint: &ConnectedPoint, handler: ::Handler, + remaining_established: usize, ) { if let Some(inner) = self.inner.as_mut() { if let Some(handler) = handler.inner { - inner.inject_connection_closed(peer_id, connection, endpoint, handler) + inner.inject_connection_closed( + peer_id, + connection, + endpoint, + handler, + remaining_established, + ) } } } diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index f19a4f53355..54aeecfd2ff 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -720,19 +720,23 @@ where u32::try_from(remaining_established_connection_ids.len()).unwrap(); let conn_was_reported = !this.banned_peer_connections.remove(&id); if conn_was_reported { + // This connection was reported as open to the behaviour. Check if this is + // the last non-banned connection for the peer. + let last_non_banned = remaining_established_connection_ids + .iter() + .all(|conn_id| this.banned_peer_connections.contains(conn_id)); + let remaining_non_banned = remaining_established_connection_ids + .into_iter() + .filter(|conn_id| !this.banned_peer_connections.contains(&conn_id)) + .count(); this.behaviour.inject_connection_closed( &peer_id, &id, &endpoint, handler.into_protocols_handler(), + remaining_non_banned, ); - // This connection was reported as open to the behaviour. Check if this is - // the last non-banned connection for the peer. - let last_non_banned = remaining_established_connection_ids - .into_iter() - .all(|conn_id| this.banned_peer_connections.contains(&conn_id)); - if last_non_banned { this.behaviour.inject_disconnected(&peer_id) } diff --git a/swarm/src/test.rs b/swarm/src/test.rs index f3c6d572495..be40686d869 100644 --- a/swarm/src/test.rs +++ b/swarm/src/test.rs @@ -101,7 +101,7 @@ where pub addresses_of_peer: Vec, pub inject_disconnected: Vec, pub inject_connection_established: Vec<(PeerId, ConnectionId, ConnectedPoint, usize)>, - pub inject_connection_closed: Vec<(PeerId, ConnectionId, ConnectedPoint)>, + pub inject_connection_closed: Vec<(PeerId, ConnectionId, ConnectedPoint, usize)>, pub inject_event: Vec<( PeerId, ConnectionId, @@ -269,12 +269,6 @@ where } fn inject_disconnected(&mut self, peer: &PeerId) { - assert!( - self.inject_connection_closed - .iter() - .any(|(peer_id, _, _)| peer_id == peer), - "`inject_disconnected` is called after at least one `inject_connection_closed`." - ); self.inject_disconnected.push(*peer); self.inner.inject_disconnected(peer); } @@ -285,8 +279,35 @@ where c: &ConnectionId, e: &ConnectedPoint, handler: ::Handler, + remaining_established: usize, ) { - let connection = (p.clone(), c.clone(), e.clone()); + let mut other_closed_connections = self + .inject_connection_established + .iter() + .rev() // take last to first + .filter_map(|(peer, .., remaining_established)| { + if p == peer { + Some(remaining_established) + } else { + None + } + }) + .take(remaining_established); + + // We are informed that there are `other_established` additional connections. Ensure that the + // number of previous connections is consistent with this + if let Some(&prev) = other_closed_connections.next() { + if prev < remaining_established { + assert_eq!( + prev, + remaining_established - 1, + "Inconsistent closed connection reporting" + ) + } + assert_eq!(other_closed_connections.count(), remaining_established - 1); + } else { + assert_eq!(remaining_established, 0) + } assert!( self.inject_connection_established .iter() @@ -294,8 +315,10 @@ where "`inject_connection_closed` is called only for connections for \ which `inject_connection_established` was called first." ); - self.inject_connection_closed.push(connection); - self.inner.inject_connection_closed(p, c, e, handler); + self.inject_connection_closed + .push((*p, *c, e.clone(), remaining_established)); + self.inner + .inject_connection_closed(p, c, e, handler, remaining_established); } fn inject_event( @@ -314,7 +337,7 @@ where !self .inject_connection_closed .iter() - .any(|(peer_id, conn_id, _)| *peer_id == p && c == *conn_id), + .any(|(peer_id, conn_id, ..)| *peer_id == p && c == *conn_id), "`inject_event` is never called for closed connections." ); From 077813c5a161e3c6757a7334c6b3fc41c5e255af Mon Sep 17 00:00:00 2001 From: Diva M Date: Fri, 21 Jan 2022 14:12:00 -0500 Subject: [PATCH 05/10] remove inject_disconnected --- protocols/autonat/src/behaviour.rs | 13 +- protocols/floodsub/src/layer.rs | 14 +- protocols/gossipsub/src/behaviour.rs | 205 ++++++++++----------- protocols/gossipsub/src/behaviour/tests.rs | 30 ++- protocols/identify/src/identify.rs | 10 +- protocols/kad/src/behaviour.rs | 19 +- protocols/mdns/src/behaviour.rs | 13 +- protocols/relay/src/v1/behaviour.rs | 44 ++--- protocols/relay/src/v2/client.rs | 2 +- protocols/relay/src/v2/relay.rs | 3 +- protocols/request-response/src/lib.rs | 5 +- swarm/src/behaviour.rs | 11 +- swarm/src/behaviour/either.rs | 7 - swarm/src/behaviour/toggle.rs | 6 - swarm/src/lib.rs | 22 +-- swarm/src/test.rs | 16 +- 16 files changed, 221 insertions(+), 199 deletions(-) diff --git a/protocols/autonat/src/behaviour.rs b/protocols/autonat/src/behaviour.rs index db59e132acb..b2e13b11dde 100644 --- a/protocols/autonat/src/behaviour.rs +++ b/protocols/autonat/src/behaviour.rs @@ -352,8 +352,12 @@ impl NetworkBehaviour for Behaviour { ) { self.inner .inject_connection_closed(peer, conn, endpoint, handler, remaining_established); - let connections = self.connected.get_mut(peer).expect("Peer is connected."); - connections.remove(conn); + if remaining_established == 0 { + self.connected.remove(peer); + } else { + let connections = self.connected.get_mut(peer).expect("Peer is connected."); + connections.remove(conn); + } } fn inject_dial_failure( @@ -369,11 +373,6 @@ impl NetworkBehaviour for Behaviour { } } - fn inject_disconnected(&mut self, peer: &PeerId) { - self.inner.inject_disconnected(peer); - self.connected.remove(peer); - } - fn inject_address_change( &mut self, peer: &PeerId, diff --git a/protocols/floodsub/src/layer.rs b/protocols/floodsub/src/layer.rs index d8778017e1e..6f22e804012 100644 --- a/protocols/floodsub/src/layer.rs +++ b/protocols/floodsub/src/layer.rs @@ -322,7 +322,19 @@ impl NetworkBehaviour for Floodsub { self.connected_peers.insert(*id, SmallVec::new()); } - fn inject_disconnected(&mut self, id: &PeerId) { + fn inject_connection_closed( + &mut self, + id: &PeerId, + _: &ConnectionId, + _: &ConnectedPoint, + _: Self::ProtocolsHandler, + remaining_established: usize, + ) { + if remaining_established > 0 { + // we only care about peer disconnections + return; + } + let was_in = self.connected_peers.remove(id); debug_assert!(was_in.is_some()); diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index e0d8ea413ac..09006007a9a 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -3055,86 +3055,6 @@ where ) } - fn inject_disconnected(&mut self, peer_id: &PeerId) { - // remove from mesh, topic_peers, peer_topic and the fanout - debug!("Peer disconnected: {}", peer_id); - { - let topics = match self.peer_topics.get(peer_id) { - Some(topics) => (topics), - None => { - debug_assert!( - self.blacklisted_peers.contains(peer_id), - "Disconnected node not in connected list" - ); - return; - } - }; - - // remove peer from all mappings - for topic in topics { - // check the mesh for the topic - if let Some(mesh_peers) = self.mesh.get_mut(topic) { - // check if the peer is in the mesh and remove it - if mesh_peers.remove(peer_id) { - if let Some(m) = self.metrics.as_mut() { - m.peers_removed(topic, Churn::Dc, 1); - m.set_mesh_peers(topic, mesh_peers.len()); - } - }; - } - - // remove from topic_peers - if let Some(peer_list) = self.topic_peers.get_mut(topic) { - if !peer_list.remove(peer_id) { - // debugging purposes - warn!( - "Disconnected node: {} not in topic_peers peer list", - peer_id - ); - } - if let Some(m) = self.metrics.as_mut() { - m.set_topic_peers(topic, peer_list.len()) - } - } else { - warn!( - "Disconnected node: {} with topic: {:?} not in topic_peers", - &peer_id, &topic - ); - } - - // remove from fanout - self.fanout - .get_mut(topic) - .map(|peers| peers.remove(peer_id)); - } - } - - // Forget px and outbound status for this peer - self.px_peers.remove(peer_id); - self.outbound_peers.remove(peer_id); - - // Remove peer from peer_topics and connected_peers - // NOTE: It is possible the peer has already been removed from all mappings if it does not - // support the protocol. - self.peer_topics.remove(peer_id); - - // If metrics are enabled, register the disconnection of a peer based on its protocol. - if let Some(metrics) = self.metrics.as_mut() { - let peer_kind = &self - .connected_peers - .get(peer_id) - .expect("Connected peer must be registered") - .kind; - metrics.peer_protocol_disconnected(peer_kind.clone()); - } - - self.connected_peers.remove(peer_id); - - if let Some((peer_score, ..)) = &mut self.peer_score { - peer_score.remove_peer(peer_id); - } - } - fn inject_connection_established( &mut self, peer_id: &PeerId, @@ -3243,35 +3163,114 @@ where } } - // Remove the connection from the list - // If there are no connections left, inject_disconnected will remove the mapping entirely. - if let Some(connections) = self.connected_peers.get_mut(peer_id) { - let index = connections - .connections - .iter() - .position(|v| v == connection_id) - .expect("Previously established connection to peer must be present"); - connections.connections.remove(index); + if remaining_established != 0 { + // Remove the connection from the list + if let Some(connections) = self.connected_peers.get_mut(peer_id) { + let index = connections + .connections + .iter() + .position(|v| v == connection_id) + .expect("Previously established connection to peer must be present"); + connections.connections.remove(index); + + // If there are more connections and this peer is in a mesh, inform the first connection + // handler. + if !connections.connections.is_empty() { + if let Some(topics) = self.peer_topics.get(peer_id) { + for topic in topics { + if let Some(mesh_peers) = self.mesh.get(topic) { + if mesh_peers.contains(peer_id) { + self.events + .push_back(NetworkBehaviourAction::NotifyHandler { + peer_id: *peer_id, + event: Arc::new(GossipsubHandlerIn::JoinedMesh), + handler: NotifyHandler::One(connections.connections[0]), + }); + break; + } + } + } + } + } + } + } else { + // remove from mesh, topic_peers, peer_topic and the fanout + debug!("Peer disconnected: {}", peer_id); + { + let topics = match self.peer_topics.get(peer_id) { + Some(topics) => (topics), + None => { + debug_assert!( + self.blacklisted_peers.contains(peer_id), + "Disconnected node not in connected list" + ); + return; + } + }; - // If there are more connections and this peer is in a mesh, inform the first connection - // handler. - if !connections.connections.is_empty() { - if let Some(topics) = self.peer_topics.get(peer_id) { - for topic in topics { - if let Some(mesh_peers) = self.mesh.get(topic) { - if mesh_peers.contains(peer_id) { - self.events - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: *peer_id, - event: Arc::new(GossipsubHandlerIn::JoinedMesh), - handler: NotifyHandler::One(connections.connections[0]), - }); - break; + // remove peer from all mappings + for topic in topics { + // check the mesh for the topic + if let Some(mesh_peers) = self.mesh.get_mut(topic) { + // check if the peer is in the mesh and remove it + if mesh_peers.remove(peer_id) { + if let Some(m) = self.metrics.as_mut() { + m.peers_removed(topic, Churn::Dc, 1); + m.set_mesh_peers(topic, mesh_peers.len()); } + }; + } + + // remove from topic_peers + if let Some(peer_list) = self.topic_peers.get_mut(topic) { + if !peer_list.remove(peer_id) { + // debugging purposes + warn!( + "Disconnected node: {} not in topic_peers peer list", + peer_id + ); } + if let Some(m) = self.metrics.as_mut() { + m.set_topic_peers(topic, peer_list.len()) + } + } else { + warn!( + "Disconnected node: {} with topic: {:?} not in topic_peers", + &peer_id, &topic + ); } + + // remove from fanout + self.fanout + .get_mut(topic) + .map(|peers| peers.remove(peer_id)); } } + + // Forget px and outbound status for this peer + self.px_peers.remove(peer_id); + self.outbound_peers.remove(peer_id); + + // Remove peer from peer_topics and connected_peers + // NOTE: It is possible the peer has already been removed from all mappings if it does not + // support the protocol. + self.peer_topics.remove(peer_id); + + // If metrics are enabled, register the disconnection of a peer based on its protocol. + if let Some(metrics) = self.metrics.as_mut() { + let peer_kind = &self + .connected_peers + .get(peer_id) + .expect("Connected peer must be registered") + .kind; + metrics.peer_protocol_disconnected(peer_kind.clone()); + } + + self.connected_peers.remove(peer_id); + + if let Some((peer_score, ..)) = &mut self.peer_score { + peer_score.remove_peer(peer_id); + } } } diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 64adb8424f4..e7ff3935830 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -228,6 +228,32 @@ mod tests { peer } + fn disconnect_peer(gs: &mut Gossipsub, peer_id: &PeerId) + where + D: DataTransform + Default + Clone + Send + 'static, + F: TopicSubscriptionFilter + Clone + Default + Send + 'static, + { + if let Some(peer_connections) = gs.connected_peers.get(peer_id) { + let fake_endpoint = ConnectedPoint::Dialer { + address: Multiaddr::empty(), + role_override: Endpoint::Dialer, + }; // this is not relevant + // peer_connections.connections should never be empty. + let mut active_connections = peer_connections.connections.len() - 1; + for conn_id in peer_connections.connections.clone() { + let handler = gs.new_handler(); + gs.inject_connection_closed( + peer_id, + &conn_id, + &fake_endpoint, + handler, + active_connections, + ); + active_connections = active_connections.saturating_sub(1); + } + } + } + // Converts a protobuf message into a gossipsub message for reading the Gossipsub event queue. fn proto_to_message(rpc: &crate::rpc_proto::Rpc) -> GossipsubRpc { // Store valid messages. @@ -1382,7 +1408,7 @@ mod tests { flush_events(&mut gs); //disconnect peer - gs.inject_disconnected(peer); + disconnect_peer(&mut gs, peer); gs.heartbeat(); @@ -5335,7 +5361,7 @@ mod tests { gs.handle_graft(&peers[0], subscribe_topic_hash); // The node disconnects - gs.inject_disconnected(&peers[0]); + disconnect_peer(&mut gs, &peers[0]); // We unsubscribe from the topic. let _ = gs.unsubscribe(&Topic::new(topic)); diff --git a/protocols/identify/src/identify.rs b/protocols/identify/src/identify.rs index 7c5a476c9ca..45530404a66 100644 --- a/protocols/identify/src/identify.rs +++ b/protocols/identify/src/identify.rs @@ -257,7 +257,10 @@ impl NetworkBehaviour for Identify { _: ::Handler, remaining_established: usize, ) { - if let Some(addrs) = self.connected.get_mut(peer_id) { + if remaining_established == 0 { + self.connected.remove(peer_id); + self.pending_push.remove(peer_id); + } else if let Some(addrs) = self.connected.get_mut(peer_id) { addrs.remove(conn); } } @@ -283,11 +286,6 @@ impl NetworkBehaviour for Identify { } } - fn inject_disconnected(&mut self, peer_id: &PeerId) { - self.connected.remove(peer_id); - self.pending_push.remove(peer_id); - } - fn inject_new_listen_addr(&mut self, _id: ListenerId, _addr: &Multiaddr) { if self.config.push_listen_addr_updates { self.pending_push.extend(self.connected.keys()); diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index 290484864d5..c6d63ff69fa 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -1949,12 +1949,21 @@ where } } - fn inject_disconnected(&mut self, id: &PeerId) { - for query in self.queries.iter_mut() { - query.on_failure(id); + fn inject_connection_closed( + &mut self, + id: &PeerId, + _: &ConnectionId, + _: &ConnectedPoint, + _: ::Handler, + remaining_established: usize, + ) { + if remaining_established == 0 { + for query in self.queries.iter_mut() { + query.on_failure(id); + } + self.connection_updated(*id, None, NodeStatus::Disconnected); + self.connected_peers.remove(id); } - self.connection_updated(*id, None, NodeStatus::Disconnected); - self.connected_peers.remove(id); } fn inject_event( diff --git a/protocols/mdns/src/behaviour.rs b/protocols/mdns/src/behaviour.rs index fdc26839c11..5e58f8bc66d 100644 --- a/protocols/mdns/src/behaviour.rs +++ b/protocols/mdns/src/behaviour.rs @@ -127,8 +127,17 @@ impl NetworkBehaviour for Mdns { } } - fn inject_disconnected(&mut self, peer: &PeerId) { - self.expire_node(peer); + fn inject_connection_closed( + &mut self, + peer: &PeerId, + _: &libp2p_core::connection::ConnectionId, + _: &libp2p_core::ConnectedPoint, + _: Self::ProtocolsHandler, + remaining_established: usize, + ) { + if remaining_established == 0 { + self.expire_node(peer); + } } fn poll( diff --git a/protocols/relay/src/v1/behaviour.rs b/protocols/relay/src/v1/behaviour.rs index 72955f2c5c8..aaddb0bbe61 100644 --- a/protocols/relay/src/v1/behaviour.rs +++ b/protocols/relay/src/v1/behaviour.rs @@ -408,6 +408,28 @@ impl NetworkBehaviour for Relay { } } } + + if remaining_established == 0 { + self.connected_peers.remove(peer); + + if let Some(reqs) = self.incoming_relay_reqs.remove(peer) { + for req in reqs { + let IncomingRelayReq::DialingDst { + src_peer_id, + incoming_relay_req, + .. + } = req; + self.outbox_to_swarm + .push_back(NetworkBehaviourAction::NotifyHandler { + peer_id: src_peer_id, + handler: NotifyHandler::Any, + event: RelayHandlerIn::DenyIncomingRelayReq( + incoming_relay_req.deny(circuit_relay::Status::HopCantDialDst), + ), + }) + } + } + } } fn inject_listener_error(&mut self, _id: ListenerId, _err: &(dyn std::error::Error + 'static)) { @@ -415,28 +437,6 @@ impl NetworkBehaviour for Relay { fn inject_listener_closed(&mut self, _id: ListenerId, _reason: Result<(), &std::io::Error>) {} - fn inject_disconnected(&mut self, id: &PeerId) { - self.connected_peers.remove(id); - - if let Some(reqs) = self.incoming_relay_reqs.remove(id) { - for req in reqs { - let IncomingRelayReq::DialingDst { - src_peer_id, - incoming_relay_req, - .. - } = req; - self.outbox_to_swarm - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: src_peer_id, - handler: NotifyHandler::Any, - event: RelayHandlerIn::DenyIncomingRelayReq( - incoming_relay_req.deny(circuit_relay::Status::HopCantDialDst), - ), - }) - } - } - } - fn inject_event( &mut self, event_source: PeerId, diff --git a/protocols/relay/src/v2/client.rs b/protocols/relay/src/v2/client.rs index 71f73855926..fa4447df830 100644 --- a/protocols/relay/src/v2/client.rs +++ b/protocols/relay/src/v2/client.rs @@ -145,7 +145,7 @@ impl NetworkBehaviour for Client { connection_id: &ConnectionId, endpoint: &ConnectedPoint, _handler: Either, - remaining_established: usize, + _remaining_established: usize, ) { if !endpoint.is_relayed() { match self.directly_connected_peers.entry(*peer_id) { diff --git a/protocols/relay/src/v2/relay.rs b/protocols/relay/src/v2/relay.rs index a06febfcfd2..241d4555b20 100644 --- a/protocols/relay/src/v2/relay.rs +++ b/protocols/relay/src/v2/relay.rs @@ -213,9 +213,10 @@ impl NetworkBehaviour for Relay { connection: &ConnectionId, _: &ConnectedPoint, _handler: Either, - remaining_established: usize, + _remaining_established: usize, ) { if let Some(connections) = self.reservations.get_mut(peer) { + // TODO: it looks like `peer` is never removed from `self.reservations`? connections.remove(&connection); } diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index 619da8d188b..27041b153c5 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -660,6 +660,7 @@ where .map(|p: usize| connections.remove(p)) .expect("Expected connection to be established before closing."); + debug_assert_eq!(connections.is_empty(), remaining_established == 0); if connections.is_empty() { self.connected.remove(peer_id); } @@ -687,10 +688,6 @@ where } } - fn inject_disconnected(&mut self, peer: &PeerId) { - self.connected.remove(peer); - } - fn inject_dial_failure( &mut self, peer: Option, diff --git a/swarm/src/behaviour.rs b/swarm/src/behaviour.rs index 3a1150c8c6a..48ccf1faf84 100644 --- a/swarm/src/behaviour.rs +++ b/swarm/src/behaviour.rs @@ -190,15 +190,6 @@ pub trait NetworkBehaviour: Send + 'static { vec![] } - /// Indicates to the behaviour that we disconnected from the node with the given peer id. - /// - /// There is no handler running anymore for this node. Any event that has been sent to it may - /// or may not have been processed by the handler. - /// - /// This method is only called when the last established connection to the peer is closed, - /// preceded by [`inject_connection_closed`](NetworkBehaviour::inject_connection_closed). - fn inject_disconnected(&mut self, _: &PeerId) {} - /// Informs the behaviour about a newly established connection to a peer. fn inject_connection_established( &mut self, @@ -221,7 +212,7 @@ pub trait NetworkBehaviour: Send + 'static { _: &ConnectionId, _: &ConnectedPoint, _: ::Handler, - remaining_established: usize, + _remaining_established: usize, ) { } diff --git a/swarm/src/behaviour/either.rs b/swarm/src/behaviour/either.rs index 0e823002685..c48b2beaaef 100644 --- a/swarm/src/behaviour/either.rs +++ b/swarm/src/behaviour/either.rs @@ -53,13 +53,6 @@ where } } - fn inject_disconnected(&mut self, peer_id: &PeerId) { - match self { - Either::Left(a) => a.inject_disconnected(peer_id), - Either::Right(b) => b.inject_disconnected(peer_id), - } - } - fn inject_connection_established( &mut self, peer_id: &PeerId, diff --git a/swarm/src/behaviour/toggle.rs b/swarm/src/behaviour/toggle.rs index 0fc5ea4a52b..7aeee3cde4a 100644 --- a/swarm/src/behaviour/toggle.rs +++ b/swarm/src/behaviour/toggle.rs @@ -86,12 +86,6 @@ where .unwrap_or_else(Vec::new) } - fn inject_disconnected(&mut self, peer_id: &PeerId) { - if let Some(inner) = self.inner.as_mut() { - inner.inject_disconnected(peer_id) - } - } - fn inject_connection_established( &mut self, peer_id: &PeerId, diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 54aeecfd2ff..533e15e6a24 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -720,11 +720,6 @@ where u32::try_from(remaining_established_connection_ids.len()).unwrap(); let conn_was_reported = !this.banned_peer_connections.remove(&id); if conn_was_reported { - // This connection was reported as open to the behaviour. Check if this is - // the last non-banned connection for the peer. - let last_non_banned = remaining_established_connection_ids - .iter() - .all(|conn_id| this.banned_peer_connections.contains(conn_id)); let remaining_non_banned = remaining_established_connection_ids .into_iter() .filter(|conn_id| !this.banned_peer_connections.contains(&conn_id)) @@ -736,10 +731,6 @@ where handler.into_protocols_handler(), remaining_non_banned, ); - - if last_non_banned { - this.behaviour.inject_disconnected(&peer_id) - } } return Poll::Ready(SwarmEvent::ConnectionClosed { peer_id, @@ -1535,9 +1526,10 @@ mod tests { [swarm1, swarm2] .iter() .all(|s| s.behaviour.inject_connection_closed.len() == num_connections) - && [swarm1, swarm2] - .iter() - .all(|s| s.behaviour.inject_disconnected.len() == 1) + && [swarm1, swarm2].iter().all(|s| { + let (.., last_remaining) = s.behaviour.inject_connection_closed.last().unwrap(); + *last_remaining == 0 + }) } /// Establishes multiple connections between two peers, @@ -1863,7 +1855,11 @@ mod tests { } State::Disconnecting => { for s in &[&swarm1, &swarm2] { - assert_eq!(s.behaviour.inject_disconnected.len(), 0); + assert!(s + .behaviour + .inject_connection_closed + .iter() + .all(|(.., remaining_conns)| *remaining_conns > 0)); assert_eq!( s.behaviour.inject_connection_established.len(), num_connections diff --git a/swarm/src/test.rs b/swarm/src/test.rs index be40686d869..25f6b47ac23 100644 --- a/swarm/src/test.rs +++ b/swarm/src/test.rs @@ -99,7 +99,6 @@ where inner: TInner, pub addresses_of_peer: Vec, - pub inject_disconnected: Vec, pub inject_connection_established: Vec<(PeerId, ConnectionId, ConnectedPoint, usize)>, pub inject_connection_closed: Vec<(PeerId, ConnectionId, ConnectedPoint, usize)>, pub inject_event: Vec<( @@ -126,7 +125,6 @@ where Self { inner, addresses_of_peer: Vec::new(), - inject_disconnected: Vec::new(), inject_connection_established: Vec::new(), inject_connection_closed: Vec::new(), inject_event: Vec::new(), @@ -145,7 +143,6 @@ where #[allow(dead_code)] pub fn reset(&mut self) { self.addresses_of_peer = Vec::new(); - self.inject_disconnected = Vec::new(); self.inject_connection_established = Vec::new(); self.inject_connection_closed = Vec::new(); self.inject_event = Vec::new(); @@ -172,7 +169,13 @@ where expected_disconnections: usize, ) -> bool { if self.inject_connection_closed.len() == expected_closed_connections { - assert_eq!(self.inject_disconnected.len(), expected_disconnections); + assert_eq!( + self.inject_connection_closed + .iter() + .filter(|(.., remaining_established)| { *remaining_established == 0 }) + .count(), + expected_disconnections + ); return true; } @@ -268,11 +271,6 @@ where .inject_connection_established(p, c, e, errors, other_established); } - fn inject_disconnected(&mut self, peer: &PeerId) { - self.inject_disconnected.push(*peer); - self.inner.inject_disconnected(peer); - } - fn inject_connection_closed( &mut self, p: &PeerId, From e110e71dae6948c976615137c26e59d217364e7b Mon Sep 17 00:00:00 2001 From: Diva M Date: Fri, 21 Jan 2022 14:12:51 -0500 Subject: [PATCH 06/10] remove inject_disconnected from swarm-derive --- swarm-derive/src/lib.rs | 27 +++------------------------ 1 file changed, 3 insertions(+), 24 deletions(-) diff --git a/swarm-derive/src/lib.rs b/swarm-derive/src/lib.rs index ffaafc6e821..8cb35c9cfe9 100644 --- a/swarm-derive/src/lib.rs +++ b/swarm-derive/src/lib.rs @@ -163,23 +163,6 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { }) }; - // Build the list of statements to put in the body of `inject_disconnected()`. - let inject_disconnected_stmts = { - data_struct - .fields - .iter() - .enumerate() - .filter_map(move |(field_n, field)| { - if is_ignored(field) { - return None; - } - Some(match field.ident { - Some(ref i) => quote! { self.#i.inject_disconnected(peer_id); }, - None => quote! { self.#field_n.inject_disconnected(peer_id); }, - }) - }) - }; - // Build the list of statements to put in the body of `inject_connection_established()`. let inject_connection_established_stmts = { data_struct.fields.iter().enumerate().filter_map(move |(field_n, field)| { @@ -226,8 +209,8 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { } }; let inject = match field.ident { - Some(ref i) => quote!{ self.#i.inject_connection_closed(peer_id, connection_id, endpoint, handler) }, - None => quote!{ self.#enum_n.inject_connection_closed(peer_id, connection_id, endpoint, handler) }, + Some(ref i) => quote!{ self.#i.inject_connection_closed(peer_id, connection_id, endpoint, handler, remaining_established) }, + None => quote!{ self.#enum_n.inject_connection_closed(peer_id, connection_id, endpoint, handler, remaining_established) }, }; quote! { @@ -632,10 +615,6 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { out } - fn inject_disconnected(&mut self, peer_id: &#peer_id) { - #(#inject_disconnected_stmts);* - } - fn inject_connection_established(&mut self, peer_id: &#peer_id, connection_id: &#connection_id, endpoint: &#connected_point, errors: #dial_errors, other_established: usize) { #(#inject_connection_established_stmts);* } @@ -644,7 +623,7 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { #(#inject_address_change_stmts);* } - fn inject_connection_closed(&mut self, peer_id: &#peer_id, connection_id: &#connection_id, endpoint: &#connected_point, handlers: ::Handler) { + fn inject_connection_closed(&mut self, peer_id: &#peer_id, connection_id: &#connection_id, endpoint: &#connected_point, handlers: ::Handler, remaining_established: usize) { #(#inject_connection_closed_stmts);* } From 2d47bd344fe5127d666752d5e37eca8adb5a3612 Mon Sep 17 00:00:00 2001 From: Diva M Date: Mon, 31 Jan 2022 10:48:04 -0500 Subject: [PATCH 07/10] more fashionable sub 1 --- protocols/gossipsub/src/behaviour/tests.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index e7ff3935830..b4760b7208b 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -239,9 +239,10 @@ mod tests { role_override: Endpoint::Dialer, }; // this is not relevant // peer_connections.connections should never be empty. - let mut active_connections = peer_connections.connections.len() - 1; + let mut active_connections = peer_connections.connections.len(); for conn_id in peer_connections.connections.clone() { let handler = gs.new_handler(); + active_connections = active_connections.checked_sub(1).unwrap(); gs.inject_connection_closed( peer_id, &conn_id, @@ -249,7 +250,6 @@ mod tests { handler, active_connections, ); - active_connections = active_connections.saturating_sub(1); } } } From bc69a580625755e8189fa510d39723b56db2f1c3 Mon Sep 17 00:00:00 2001 From: Diva M Date: Mon, 7 Feb 2022 15:51:57 -0500 Subject: [PATCH 08/10] CHANGELOG entries --- protocols/autonat/CHANGELOG.md | 6 +++++- protocols/floodsub/CHANGELOG.md | 4 ++++ protocols/gossipsub/CHANGELOG.md | 3 +++ protocols/identify/CHANGELOG.md | 4 ++++ protocols/kad/CHANGELOG.md | 4 ++++ protocols/mdns/CHANGELOG.md | 4 ++++ protocols/ping/CHANGELOG.md | 4 ++++ protocols/relay/CHANGELOG.md | 4 ++++ protocols/rendezvous/CHANGELOG.md | 4 ++++ protocols/request-response/CHANGELOG.md | 4 ++++ 10 files changed, 40 insertions(+), 1 deletion(-) diff --git a/protocols/autonat/CHANGELOG.md b/protocols/autonat/CHANGELOG.md index c93d77b8c06..5df9d722fa0 100644 --- a/protocols/autonat/CHANGELOG.md +++ b/protocols/autonat/CHANGELOG.md @@ -6,6 +6,10 @@ - Update to `libp2p-request-response` `v0.16.0`. +- Merge NetworkBehaviour's inject_\* paired methods (see PR 2445). + +[PR 2445]: https://github.com/libp2p/rust-libp2p/pull/2445 + # 0.1.0 [2022-01-27] -- Initial release. \ No newline at end of file +- Initial release. diff --git a/protocols/floodsub/CHANGELOG.md b/protocols/floodsub/CHANGELOG.md index d1ec2814973..631fbafb37a 100644 --- a/protocols/floodsub/CHANGELOG.md +++ b/protocols/floodsub/CHANGELOG.md @@ -4,6 +4,10 @@ - Update to `libp2p-swarm` `v0.34.0`. +- Merge NetworkBehaviour's inject_\* paired methods (see PR 2445). + +[PR 2445]: https://github.com/libp2p/rust-libp2p/pull/2445 + # 0.33.0 [2022-01-27] - Update dependencies. diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index 36bb23bd4bc..857697c42db 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -8,8 +8,11 @@ - Emit gossip of all non empty topics (see [PR 2481]). +- Merge NetworkBehaviour's inject_\* paired methods (see PR 2445). + [PR 2442]: https://github.com/libp2p/rust-libp2p/pull/2442 [PR 2481]: https://github.com/libp2p/rust-libp2p/pull/2481 +[PR 2445]: https://github.com/libp2p/rust-libp2p/pull/2445 # 0.35.0 [2022-01-27] diff --git a/protocols/identify/CHANGELOG.md b/protocols/identify/CHANGELOG.md index f4bb78075b5..05206ea78a5 100644 --- a/protocols/identify/CHANGELOG.md +++ b/protocols/identify/CHANGELOG.md @@ -4,6 +4,10 @@ - Update to `libp2p-swarm` `v0.34.0`. +- Merge NetworkBehaviour's inject_\* paired methods (see PR 2445). + +[PR 2445]: https://github.com/libp2p/rust-libp2p/pull/2445 + # 0.33.0 [2022-01-27] - Update dependencies. diff --git a/protocols/kad/CHANGELOG.md b/protocols/kad/CHANGELOG.md index d8d2c3c246d..f1c7272652f 100644 --- a/protocols/kad/CHANGELOG.md +++ b/protocols/kad/CHANGELOG.md @@ -4,6 +4,10 @@ - Update to `libp2p-swarm` `v0.34.0`. +- Merge NetworkBehaviour's inject_\* paired methods (see PR 2445). + +[PR 2445]: https://github.com/libp2p/rust-libp2p/pull/2445 + # 0.34.0 [2022-01-27] - Update dependencies. diff --git a/protocols/mdns/CHANGELOG.md b/protocols/mdns/CHANGELOG.md index 278dc9abff6..9e5276615d7 100644 --- a/protocols/mdns/CHANGELOG.md +++ b/protocols/mdns/CHANGELOG.md @@ -4,6 +4,10 @@ - Update to `libp2p-swarm` `v0.34.0`. +- Merge NetworkBehaviour's inject_\* paired methods (see PR 2445). + +[PR 2445]: https://github.com/libp2p/rust-libp2p/pull/2445 + # 0.34.0 [2022-01-27] - Update dependencies. diff --git a/protocols/ping/CHANGELOG.md b/protocols/ping/CHANGELOG.md index 9ff7196e21a..9d9341d5607 100644 --- a/protocols/ping/CHANGELOG.md +++ b/protocols/ping/CHANGELOG.md @@ -4,6 +4,10 @@ - Update to `libp2p-swarm` `v0.34.0`. +- Merge NetworkBehaviour's inject_\* paired methods (see PR 2445). + +[PR 2445]: https://github.com/libp2p/rust-libp2p/pull/2445 + # 0.33.0 [2022-01-27] - Update dependencies. diff --git a/protocols/relay/CHANGELOG.md b/protocols/relay/CHANGELOG.md index 7449def4a06..644bb1c1cb0 100644 --- a/protocols/relay/CHANGELOG.md +++ b/protocols/relay/CHANGELOG.md @@ -4,6 +4,10 @@ - Update to `libp2p-swarm` `v0.34.0`. +- Merge NetworkBehaviour's inject_\* paired methods (see PR 2445). + +[PR 2445]: https://github.com/libp2p/rust-libp2p/pull/2445 + # 0.6.1 [2022-02-02] - Remove empty peer entries in `reservations` `HashMap`. See [PR 2464]. diff --git a/protocols/rendezvous/CHANGELOG.md b/protocols/rendezvous/CHANGELOG.md index 35c198b09cc..7aaad13dc03 100644 --- a/protocols/rendezvous/CHANGELOG.md +++ b/protocols/rendezvous/CHANGELOG.md @@ -4,6 +4,10 @@ - Update to `libp2p-swarm` `v0.34.0`. +- Merge NetworkBehaviour's inject_\* paired methods (see PR 2445). + +[PR 2445]: https://github.com/libp2p/rust-libp2p/pull/2445 + # 0.3.0 [2022-01-27] - Update dependencies. diff --git a/protocols/request-response/CHANGELOG.md b/protocols/request-response/CHANGELOG.md index 7758a1f6450..e8ab11c3eea 100644 --- a/protocols/request-response/CHANGELOG.md +++ b/protocols/request-response/CHANGELOG.md @@ -4,6 +4,10 @@ - Update to `libp2p-swarm` `v0.34.0`. +- Merge NetworkBehaviour's inject_\* paired methods (see PR 2445). + +[PR 2445]: https://github.com/libp2p/rust-libp2p/pull/2445 + # 0.15.0 [2022-01-27] - Update dependencies. From 2b69060b89d8de4489561c9e53ca4d7ca125ef21 Mon Sep 17 00:00:00 2001 From: Diva M Date: Wed, 9 Feb 2022 09:31:02 -0500 Subject: [PATCH 09/10] add missing CHANGELOG entry --- swarm/CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index e938160684a..a5714678e5e 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -28,6 +28,8 @@ `DialOpts`. This option is needed for NAT and firewall hole punching. See [PR 2363]. +- Merge NetworkBehaviour's inject_\* paired methods (see PR 2445). + [PR 2339]: https://github.com/libp2p/rust-libp2p/pull/2339 [PR 2350]: https://github.com/libp2p/rust-libp2p/pull/2350 [PR 2362]: https://github.com/libp2p/rust-libp2p/pull/2362 @@ -37,6 +39,7 @@ [PR 2404]: https://github.com/libp2p/rust-libp2p/pull/2404 [PR 2428]: https://github.com/libp2p/rust-libp2p/pull/2428 [PR 2363]: https://github.com/libp2p/rust-libp2p/pull/2363 +[PR 2445]: https://github.com/libp2p/rust-libp2p/pull/2445 # 0.32.0 [2021-11-16] From 2073544fd3f8dc88b43279f469da18fd2b7e3235 Mon Sep 17 00:00:00 2001 From: Diva M Date: Wed, 9 Feb 2022 09:45:23 -0500 Subject: [PATCH 10/10] integrate dcutr --- protocols/dcutr/src/behaviour.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/protocols/dcutr/src/behaviour.rs b/protocols/dcutr/src/behaviour.rs index fe8bbe42b2f..f1966abf45a 100644 --- a/protocols/dcutr/src/behaviour.rs +++ b/protocols/dcutr/src/behaviour.rs @@ -100,6 +100,7 @@ impl NetworkBehaviour for Behaviour { connection_id: &ConnectionId, connected_point: &ConnectedPoint, _failed_addresses: Option<&Vec>, + _other_established: usize, ) { if connected_point.is_relayed() { if connected_point.is_listener() && !self.direct_connections.contains_key(peer_id) { @@ -181,16 +182,13 @@ impl NetworkBehaviour for Behaviour { } } - fn inject_disconnected(&mut self, peer_id: &PeerId) { - assert!(!self.direct_connections.contains_key(peer_id)); - } - fn inject_connection_closed( &mut self, peer_id: &PeerId, connection_id: &ConnectionId, connected_point: &ConnectedPoint, _handler: <::ProtocolsHandler as IntoProtocolsHandler>::Handler, + _remaining_established: usize, ) { if !connected_point.is_relayed() { let connections = self