From 006a63caaf58a762d1830b41c046f2b7e2c6535e Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 21 Jan 2019 15:19:21 +0100 Subject: [PATCH 1/7] Embed the topology in the NetworkBehaviour --- core/src/lib.rs | 1 - core/src/swarm.rs | 169 +++++++++-------------------- core/src/topology/mod.rs | 151 -------------------------- examples/chat.rs | 24 ++-- examples/ipfs-kad.rs | 27 ++--- misc/core-derive/src/lib.rs | 50 +++++---- misc/core-derive/tests/test.rs | 2 +- misc/mdns/Cargo.toml | 1 + misc/mdns/src/behaviour.rs | 81 ++++++++++---- misc/mdns/src/dns.rs | 2 +- misc/mdns/src/lib.rs | 2 +- misc/mdns/src/service.rs | 11 +- protocols/floodsub/src/layer.rs | 10 +- protocols/identify/src/identify.rs | 25 ++--- protocols/identify/src/lib.rs | 2 - protocols/identify/src/topology.rs | 43 -------- protocols/kad/src/behaviour.rs | 116 +++++++++++--------- protocols/kad/src/kbucket.rs | 41 +++++-- protocols/kad/src/lib.rs | 2 - protocols/kad/src/topology.rs | 87 --------------- protocols/ping/src/lib.rs | 10 +- 21 files changed, 304 insertions(+), 553 deletions(-) delete mode 100644 core/src/topology/mod.rs delete mode 100644 protocols/identify/src/topology.rs delete mode 100644 protocols/kad/src/topology.rs diff --git a/core/src/lib.rs b/core/src/lib.rs index c0a8160c685..1fd0b81ce0d 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -77,7 +77,6 @@ pub mod muxing; pub mod nodes; pub mod protocols_handler; pub mod swarm; -pub mod topology; pub mod transport; pub mod upgrade; diff --git a/core/src/swarm.rs b/core/src/swarm.rs index 857dc279c0e..54c4238c949 100644 --- a/core/src/swarm.rs +++ b/core/src/swarm.rs @@ -42,7 +42,7 @@ //! use crate::{ - Transport, Multiaddr, PublicKey, PeerId, InboundUpgrade, OutboundUpgrade, UpgradeInfo, ProtocolName, + Transport, Multiaddr, PeerId, InboundUpgrade, OutboundUpgrade, UpgradeInfo, ProtocolName, muxing::StreamMuxer, nodes::{ handled_node::NodeHandler, @@ -50,9 +50,7 @@ use crate::{ raw_swarm::{RawSwarm, RawSwarmEvent} }, protocols_handler::{NodeHandlerWrapperBuilder, NodeHandlerWrapper, IntoProtocolsHandler, ProtocolsHandler}, - topology::Topology, transport::TransportError, - topology::DisconnectReason, }; use futures::prelude::*; use smallvec::SmallVec; @@ -61,26 +59,22 @@ use std::{fmt, io, ops::{Deref, DerefMut}}; pub use crate::nodes::raw_swarm::ConnectedPoint; /// Contains the state of the network, plus the way it should behave. -pub struct Swarm +pub struct Swarm where TTransport: Transport, - TBehaviour: NetworkBehaviour, + TBehaviour: NetworkBehaviour, { raw_swarm: RawSwarm< TTransport, - <<>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent, - <<>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent, + <<::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent, + <<::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent, NodeHandlerWrapperBuilder, - <<>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::Error, + <<::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::Error, >, /// Handles which nodes to connect to and how to handle the events sent back by the protocol /// handlers. behaviour: TBehaviour, - /// Holds the topology of the network. In other words all the nodes that we think exist, even - /// if we're not connected to them. - topology: TTopology, - /// List of protocols that the behaviour says it supports. 
supported_protocols: SmallVec<[Vec; 16]>, @@ -88,9 +82,9 @@ where TTransport: Transport, listened_addrs: SmallVec<[Multiaddr; 8]>, } -impl Deref for Swarm +impl Deref for Swarm where TTransport: Transport, - TBehaviour: NetworkBehaviour, + TBehaviour: NetworkBehaviour, { type Target = TBehaviour; @@ -100,9 +94,9 @@ where TTransport: Transport, } } -impl DerefMut for Swarm +impl DerefMut for Swarm where TTransport: Transport, - TBehaviour: NetworkBehaviour, + TBehaviour: NetworkBehaviour, { #[inline] fn deref_mut(&mut self) -> &mut Self::Target { @@ -110,8 +104,8 @@ where TTransport: Transport, } } -impl Swarm -where TBehaviour: NetworkBehaviour, +impl Swarm +where TBehaviour: NetworkBehaviour, TMuxer: StreamMuxer + Send + Sync + 'static, ::OutboundSubstream: Send + 'static, ::Substream: Send + 'static, @@ -139,29 +133,12 @@ where TBehaviour: NetworkBehaviour, <<::Handler as ProtocolsHandler>::OutboundProtocol as OutboundUpgrade>>::Future: Send + 'static, <<::Handler as ProtocolsHandler>::OutboundProtocol as OutboundUpgrade>>::Error: fmt::Debug + Send + 'static, ::Handler> as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary - TTopology: Topology, { /// Builds a new `Swarm`. #[inline] - pub fn new(transport: TTransport, mut behaviour: TBehaviour, topology: TTopology) -> Self { - let supported_protocols = behaviour - .new_handler() - .into_handler(topology.local_peer_id()) - .listen_protocol() - .protocol_info() - .into_iter() - .map(|info| info.protocol_name().to_vec()) - .collect(); - - let raw_swarm = RawSwarm::new(transport, topology.local_peer_id().clone()); - - Swarm { - raw_swarm, - behaviour, - topology, - supported_protocols, - listened_addrs: SmallVec::new(), - } + pub fn new(transport: TTransport, behaviour: TBehaviour, local_peer_id: PeerId) -> Self { + SwarmBuilder::new(transport, behaviour, local_peer_id) + .build() } /// Returns the transport passed when building this object. @@ -198,7 +175,7 @@ where TBehaviour: NetworkBehaviour, /// peer. #[inline] pub fn dial(me: &mut Self, peer_id: PeerId) { - let addrs = me.topology.addresses_of_peer(&peer_id); + let addrs = me.behaviour.addresses_of_peer(&peer_id); let handler = me.behaviour.new_handler().into_node_handler_builder(); if let Some(peer) = me.raw_swarm.peer(peer_id).as_not_connected() { let _ = peer.connect_iter(addrs, handler); @@ -216,22 +193,10 @@ where TBehaviour: NetworkBehaviour, pub fn local_peer_id(me: &Self) -> &PeerId { &me.raw_swarm.local_peer_id() } - - /// Returns the topology of the swarm. - #[inline] - pub fn topology(me: &Self) -> &TTopology { - &me.topology - } - - /// Returns the topology of the swarm. 
- #[inline] - pub fn topology_mut(me: &mut Self) -> &mut TTopology { - &mut me.topology - } } -impl Stream for Swarm -where TBehaviour: NetworkBehaviour, +impl Stream for Swarm +where TBehaviour: NetworkBehaviour, TMuxer: StreamMuxer + Send + Sync + 'static, ::OutboundSubstream: Send + 'static, ::Substream: Send + 'static, @@ -259,7 +224,6 @@ where TBehaviour: NetworkBehaviour, <<::Handler as ProtocolsHandler>::OutboundProtocol as UpgradeInfo>::InfoIter: Send + 'static, <<<::Handler as ProtocolsHandler>::OutboundProtocol as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send + 'static, ::Handler> as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary - TTopology: Topology, { type Item = TBehaviour::OutEvent; type Error = io::Error; @@ -275,20 +239,15 @@ where TBehaviour: NetworkBehaviour, self.behaviour.inject_node_event(peer_id, event); }, Async::Ready(RawSwarmEvent::Connected { peer_id, endpoint }) => { - self.topology.set_connected(&peer_id, &endpoint); self.behaviour.inject_connected(peer_id, endpoint); }, Async::Ready(RawSwarmEvent::NodeClosed { peer_id, endpoint }) => { - self.topology.set_disconnected(&peer_id, &endpoint, DisconnectReason::Graceful); self.behaviour.inject_disconnected(&peer_id, endpoint); }, Async::Ready(RawSwarmEvent::NodeError { peer_id, endpoint, .. }) => { - self.topology.set_disconnected(&peer_id, &endpoint, DisconnectReason::Error); self.behaviour.inject_disconnected(&peer_id, endpoint); }, Async::Ready(RawSwarmEvent::Replaced { peer_id, closed_endpoint, endpoint }) => { - self.topology.set_disconnected(&peer_id, &closed_endpoint, DisconnectReason::Replaced); - self.topology.set_connected(&peer_id, &endpoint); self.behaviour.inject_disconnected(&peer_id, closed_endpoint); self.behaviour.inject_connected(peer_id, endpoint); }, @@ -298,18 +257,14 @@ where TBehaviour: NetworkBehaviour, }, Async::Ready(RawSwarmEvent::ListenerClosed { .. }) => {}, Async::Ready(RawSwarmEvent::IncomingConnectionError { .. }) => {}, - Async::Ready(RawSwarmEvent::DialError { multiaddr, .. }) => { - self.topology.set_unreachable(&multiaddr); - }, - Async::Ready(RawSwarmEvent::UnknownPeerDialError { multiaddr, .. }) => { - self.topology.set_unreachable(&multiaddr); - }, + Async::Ready(RawSwarmEvent::DialError { .. }) => {}, + Async::Ready(RawSwarmEvent::UnknownPeerDialError { .. }) => {}, } let behaviour_poll = { let transport = self.raw_swarm.transport(); let mut parameters = PollParameters { - topology: &mut self.topology, + local_peer_id: &mut self.raw_swarm.local_peer_id(), supported_protocols: &self.supported_protocols, listened_addrs: &self.listened_addrs, nat_traversal: &move |a, b| transport.nat_traversal(a, b), @@ -335,7 +290,7 @@ where TBehaviour: NetworkBehaviour, } }, Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) => { - self.topology.add_local_external_addrs(self.raw_swarm.nat_traversal(&address)); + // TODO: self.topology.add_local_external_addrs(self.raw_swarm.nat_traversal(&address)); }, } } @@ -346,7 +301,7 @@ where TBehaviour: NetworkBehaviour, /// /// This trait has been designed to be composable. Multiple implementations can be combined into /// one that handles all the behaviours at once. -pub trait NetworkBehaviour { +pub trait NetworkBehaviour { /// Handler for all the protocols the network supports. type ProtocolsHandler: IntoProtocolsHandler; /// Event generated by the swarm. @@ -355,6 +310,10 @@ pub trait NetworkBehaviour { /// Builds a new `ProtocolsHandler`. 
     fn new_handler(&mut self) -> Self::ProtocolsHandler;
 
+    /// Addresses that this behaviour is aware of for this specific peer, and that may allow
+    /// reaching the peer.
+    fn addresses_of_peer(&self, peer_id: &PeerId) -> Vec<Multiaddr>;
+
     /// Indicates the behaviour that we connected to the node with the given peer id through the
     /// given endpoint.
     fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint);
@@ -376,7 +335,7 @@ pub trait NetworkBehaviour<TTopology> {
     /// Polls for things that swarm should do.
     ///
     /// This API mimics the API of the `Stream` trait.
-    fn poll(&mut self, topology: &mut PollParameters<TTopology>) -> Async<NetworkBehaviourAction<<<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent, Self::OutEvent>>;
+    fn poll(&mut self, topology: &mut PollParameters) -> Async<NetworkBehaviourAction<<<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent, Self::OutEvent>>;
 }
 
 /// Used when deriving `NetworkBehaviour`. When deriving `NetworkBehaviour`, must be implemented
@@ -390,20 +349,14 @@ pub trait NetworkBehaviourEventProcess<TEvent> {
 
 /// Parameters passed to `poll()`, that the `NetworkBehaviour` has access to.
 // TODO: #[derive(Debug)]
-pub struct PollParameters<'a, TTopology: 'a> {
-    topology: &'a mut TTopology,
+pub struct PollParameters<'a> {
+    local_peer_id: &'a PeerId,
     supported_protocols: &'a [Vec<u8>],
     listened_addrs: &'a [Multiaddr],
     nat_traversal: &'a dyn Fn(&Multiaddr, &Multiaddr) -> Option<Multiaddr>,
 }
 
-impl<'a, TTopology> PollParameters<'a, TTopology> {
-    /// Returns a reference to the topology of the network.
-    #[inline]
-    pub fn topology(&mut self) -> &mut TTopology {
-        &mut self.topology
-    }
-
+impl<'a> PollParameters<'a> {
     /// Returns the list of protocols the behaviour supports when a remote negotiates a protocol on
     /// an inbound substream.
     ///
@@ -423,27 +376,16 @@ impl<'a, TTopology> PollParameters<'a, TTopology> {
 
     /// Returns the list of the addresses nodes can use to reach us.
     #[inline]
-    pub fn external_addresses<'b>(&'b mut self) -> impl ExactSizeIterator<Item = Multiaddr> + 'b
-        where TTopology: Topology
-    {
-        let local_peer_id = self.topology.local_peer_id().clone();
-        self.topology.addresses_of_peer(&local_peer_id).into_iter()
-    }
-
-    /// Returns the public key of the local node.
-    #[inline]
-    pub fn local_public_key(&self) -> &PublicKey
-        where TTopology: Topology
-    {
-        self.topology.local_public_key()
+    pub fn external_addresses<'b>(&'b mut self) -> impl ExactSizeIterator<Item = Multiaddr> + 'b {
+        /*let local_peer_id = self.topology.local_peer_id().clone();
+        self.topology.addresses_of_peer(&self.local_peer_id).into_iter()*/
+        std::iter::empty() // TODO: ?
     }
 
     /// Returns the peer id of the local node.
     #[inline]
-    pub fn local_peer_id(&self) -> &PeerId
-        where TTopology: Topology
-    {
-        self.topology.local_peer_id()
+    pub fn local_peer_id(&self) -> &PeerId {
+        self.local_peer_id
     }
 
     /// Calls the `nat_traversal` method on the underlying transport of the `Swarm`.
@@ -493,18 +435,15 @@ pub enum NetworkBehaviourAction { }, } -pub struct SwarmBuilder -where TTransport: Transport, - TBehaviour: NetworkBehaviour -{ +pub struct SwarmBuilder { incoming_limit: Option, - topology: TTopology, + local_peer_id: PeerId, transport: TTransport, behaviour: TBehaviour, } -impl SwarmBuilder -where TBehaviour: NetworkBehaviour, +impl SwarmBuilder +where TBehaviour: NetworkBehaviour, TMuxer: StreamMuxer + Send + Sync + 'static, ::OutboundSubstream: Send + 'static, ::Substream: Send + 'static, @@ -532,43 +471,37 @@ where TBehaviour: NetworkBehaviour, <<::Handler as ProtocolsHandler>::OutboundProtocol as OutboundUpgrade>>::Future: Send + 'static, <<::Handler as ProtocolsHandler>::OutboundProtocol as OutboundUpgrade>>::Error: fmt::Debug + Send + 'static, ::Handler> as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary - TTopology: Topology, { - pub fn new(transport: TTransport, behaviour: TBehaviour, - topology:TTopology) -> Self { + pub fn new(transport: TTransport, behaviour: TBehaviour, local_peer_id: PeerId) -> Self { SwarmBuilder { incoming_limit: None, - transport: transport, - topology: topology, - behaviour: behaviour, + local_peer_id, + transport, + behaviour, } } - pub fn incoming_limit(mut self, incoming_limit: Option) -> Self - { + pub fn incoming_limit(mut self, incoming_limit: Option) -> Self { self.incoming_limit = incoming_limit; self } - pub fn build(mut self) -> - Swarm - { + pub fn build(mut self) -> Swarm { let supported_protocols = self.behaviour .new_handler() - .into_handler(self.topology.local_peer_id()) + .into_handler(&self.local_peer_id) .listen_protocol() .protocol_info() .into_iter() .map(|info| info.protocol_name().to_vec()) .collect(); - let raw_swarm = RawSwarm::new_with_incoming_limit(self.transport, - self.topology.local_peer_id().clone(), - self.incoming_limit); + + let raw_swarm = RawSwarm::new_with_incoming_limit(self.transport, self.local_peer_id, self.incoming_limit); + Swarm { raw_swarm, behaviour: self.behaviour, - topology: self.topology, supported_protocols, listened_addrs: SmallVec::new(), } diff --git a/core/src/topology/mod.rs b/core/src/topology/mod.rs deleted file mode 100644 index d88bb0fbdae..00000000000 --- a/core/src/topology/mod.rs +++ /dev/null @@ -1,151 +0,0 @@ -// Copyright 2018 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -//! A *network topology* is a collection of nodes that are part of the network or that we think -//! 
are part of the network. In other words, it is essentially a container whose layout is -//! optimized for certain operations. -//! -//! In libp2p, a *topology* is any struct that implements at least the `Topology` trait. In order -//! to build a `Swarm`, you have to give to it ownership of a type that implements this trait. -//! -//! In order to use some protocols defined outside of `libp2p-core` (such as Kademlia) in your -//! `Swarm`, you will have to implement additional traits on your topology. -//! -//! While the `MemoryTopology` is provided as a ready-to-go topology that is suitable for quick -//! prototyping, it shouldn't be used in an actual high-performance production software. - -use std::collections::HashMap; -use crate::{swarm::ConnectedPoint, Multiaddr, PeerId, PublicKey}; - -/// Storage for the network topology. -/// -/// The topology should also store information about the local node, including its public key, its -/// `PeerId`, and the addresses it's advertising. -pub trait Topology { - /// Returns the addresses to try use to reach the given peer. - /// - /// > **Note**: Keep in mind that `peer` can be the local node. - fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec; - - /// Returns the `PeerId` of the local node. - fn local_peer_id(&self) -> &PeerId; - - /// Returns the public key of the local node. - fn local_public_key(&self) -> &PublicKey; - - /// Adds an address that other nodes can use to connect to our local node. - /// - /// > **Note**: Should later be returned when calling `addresses_of_peer()` with the `PeerId` - /// > of the local node. - fn add_local_external_addrs(&mut self, addrs: TIter) - where TIter: Iterator; - - /// Indicates to the topology that we have successfully connected to the given address with the - /// given `PeerId`. - fn set_connected(&mut self, _peer_id: &PeerId, _addr: &ConnectedPoint) {} - - /// Indicates to the topology that we have been disconnected from the given address with the - /// given `PeerId`. - fn set_disconnected(&mut self, _peer_id: &PeerId, _addr: &ConnectedPoint, _reason: DisconnectReason) {} - - /// Indicates to the topology that we have failed to reach the given address. - fn set_unreachable(&mut self, _addr: &Multiaddr) {} -} - -/// Reason why the peer has been disconnected. -#[derive(Debug, Copy, Clone)] -pub enum DisconnectReason { - Error, - Graceful, - Replaced, -} - -/// Topology of the network stored in memory. -pub struct MemoryTopology { - list: HashMap>, - local_peer_id: PeerId, - local_public_key: PublicKey, -} - -impl MemoryTopology { - /// Creates an empty topology. - #[inline] - pub fn empty(pubkey: PublicKey) -> MemoryTopology { - let local_peer_id = pubkey.clone().into_peer_id(); - - MemoryTopology { - list: Default::default(), - local_peer_id, - local_public_key: pubkey, - } - } - - /// Returns true if the topology is empty. - #[inline] - pub fn is_empty(&self) -> bool { - self.list.is_empty() - } - - /// Adds an address to the topology. - #[inline] - pub fn add_address(&mut self, peer: PeerId, addr: Multiaddr) { - let addrs = self.list.entry(peer).or_insert_with(|| Vec::new()); - if addrs.iter().all(|a| a != &addr) { - addrs.push(addr); - } - } - - /// Returns a list of all the known peers in the topology. - #[inline] - pub fn peers(&self) -> impl Iterator { - self.list.keys() - } - - /// Returns an iterator to all the entries in the topology. 
-    #[inline]
-    pub fn iter(&self) -> impl Iterator<Item = (&PeerId, &Multiaddr)> {
-        self.list.iter().flat_map(|(p, l)| l.iter().map(move |ma| (p, ma)))
-    }
-}
-
-impl Topology for MemoryTopology {
-    fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec<Multiaddr> {
-        self.list.get(peer).map(|v| v.clone()).unwrap_or(Vec::new())
-    }
-
-    fn add_local_external_addrs<TIter>(&mut self, addrs: TIter)
-        where TIter: Iterator<Item = Multiaddr>
-    {
-        for addr in addrs {
-            let id = self.local_peer_id.clone();
-            self.add_address(id, addr);
-        }
-    }
-
-    #[inline]
-    fn local_peer_id(&self) -> &PeerId {
-        &self.local_peer_id
-    }
-
-    #[inline]
-    fn local_public_key(&self) -> &PublicKey {
-        &self.local_public_key
-    }
-}
diff --git a/examples/chat.rs b/examples/chat.rs
index 8ce4485891f..46081fa203f 100644
--- a/examples/chat.rs
+++ b/examples/chat.rs
@@ -20,7 +20,7 @@
 //! A basic chat application demonstrating libp2p and the mDNS and floodsub protocols.
 //!
-//! Using two terminal windows, start two instances. If you local network allows mDNS,
+//! Using two terminal windows, start two instances. If your local network allows mDNS,
 //! they will automatically connect. Type a message in either terminal and hit return: the
 //! message is sent and printed in the other terminal. Close with Ctrl-c.
 //!
@@ -47,7 +47,7 @@
 //! cargo run --example chat -- /ip4/127.0.0.1/tcp/24915
 //! ```
 //!
-//! The two nodes then connect.
+//! The two nodes then connect.
 
 extern crate env_logger;
 extern crate futures;
@@ -67,8 +67,8 @@ fn main() {
 
     // Create a random PeerId
     let local_key = secio::SecioKeyPair::ed25519_generated().unwrap();
-    let local_pub_key = local_key.to_public_key();
-    println!("Local peer id: {:?}", local_pub_key.clone().into_peer_id());
+    let local_peer_id = local_key.to_peer_id();
+    println!("Local peer id: {:?}", local_peer_id);
 
     // Set up an encrypted DNS-enabled TCP Transport over the Mplex and Yamux protocols
     let transport = libp2p::build_development_transport(local_key);
@@ -84,9 +84,13 @@ fn main() {
         mdns: libp2p::mdns::Mdns,
     }
 
-    impl libp2p::core::swarm::NetworkBehaviourEventProcess<void::Void> for MyBehaviour {
-        fn inject_event(&mut self, _ev: void::Void) {
-            void::unreachable(_ev)
+    impl libp2p::core::swarm::NetworkBehaviourEventProcess<libp2p::mdns::MdnsEvent> for MyBehaviour {
+        fn inject_event(&mut self, event: libp2p::mdns::MdnsEvent) {
+            match event {
+                libp2p::mdns::MdnsEvent::Discovered(list) => {
+                    // TODO:
+                },
+            }
         }
     }
 
@@ -102,12 +106,12 @@ fn main() {
     // Create a Swarm to manage peers and events
     let mut swarm = {
         let mut behaviour = MyBehaviour {
-            floodsub: libp2p::floodsub::Floodsub::new(local_pub_key.clone().into_peer_id()),
+            floodsub: libp2p::floodsub::Floodsub::new(local_peer_id.clone()),
             mdns: libp2p::mdns::Mdns::new().expect("Failed to create mDNS service"),
         };
         behaviour.floodsub.subscribe(floodsub_topic.clone());
-        libp2p::Swarm::new(transport, behaviour, libp2p::core::topology::MemoryTopology::empty(local_pub_key))
+        libp2p::Swarm::new(transport, behaviour, local_peer_id)
     };
 
     // Listen on all interfaces and whatever port the OS assigns
@@ -145,7 +149,7 @@ fn main() {
     loop {
         match swarm.poll().expect("Error while polling swarm") {
             Async::Ready(Some(_)) => {
-
+
             },
             Async::Ready(None) | Async::NotReady => break,
         }
diff --git a/examples/ipfs-kad.rs b/examples/ipfs-kad.rs
index 74be104506e..32a30816a56 100644
--- a/examples/ipfs-kad.rs
+++ b/examples/ipfs-kad.rs
@@ -37,23 +37,11 @@ use libp2p::{
 
 fn main() {
     // Create a random key for ourselves.
let local_key = secio::SecioKeyPair::ed25519_generated().unwrap(); - let local_pub_key = local_key.to_public_key(); + let local_peer_id = local_key.to_peer_id(); // Set up a an encrypted DNS-enabled TCP Transport over the Mplex protocol let transport = libp2p::build_development_transport(local_key); - // Create the topology of the network with the IPFS bootstrap nodes. - let mut topology = libp2p::core::topology::MemoryTopology::empty(local_pub_key.clone()); - topology.add_address("QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ".parse().unwrap(), "/ip4/104.131.131.82/tcp/4001".parse().unwrap()); - topology.add_address("QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM".parse().unwrap(), "/ip4/104.236.179.241/tcp/4001".parse().unwrap()); - topology.add_address("QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64".parse().unwrap(), "/ip4/104.236.76.40/tcp/4001".parse().unwrap()); - topology.add_address("QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu".parse().unwrap(), "/ip4/128.199.219.111/tcp/4001".parse().unwrap()); - topology.add_address("QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd".parse().unwrap(), "/ip4/178.62.158.247/tcp/4001".parse().unwrap()); - topology.add_address("QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu".parse().unwrap(), "/ip6/2400:6180:0:d0::151:6001/tcp/4001".parse().unwrap()); - topology.add_address("QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM".parse().unwrap(), "/ip6/2604:a880:1:20::203:d001/tcp/4001".parse().unwrap()); - topology.add_address("QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64".parse().unwrap(), "/ip6/2604:a880:800:10::4a:5001/tcp/4001".parse().unwrap()); - topology.add_address("QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd".parse().unwrap(), "/ip6/2a03:b0c0:0:1010::23:1001/tcp/4001".parse().unwrap()); - // Create a swarm to manage peers and events. let mut swarm = { // Create a Kademlia behaviour. @@ -61,8 +49,17 @@ fn main() { // to insert our local node in the DHT. However here we use `without_init` because this // example is very ephemeral and we don't want to pollute the DHT. In a real world // application, you want to use `new` instead. 
- let mut behaviour = libp2p::kad::Kademlia::without_init(local_pub_key.into_peer_id()); - libp2p::core::Swarm::new(transport, behaviour, topology) + let mut behaviour = libp2p::kad::Kademlia::without_init(local_peer_id.clone()); + behaviour.add_address(&"QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ".parse().unwrap(), "/ip4/104.131.131.82/tcp/4001".parse().unwrap()); + behaviour.add_address(&"QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM".parse().unwrap(), "/ip4/104.236.179.241/tcp/4001".parse().unwrap()); + behaviour.add_address(&"QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64".parse().unwrap(), "/ip4/104.236.76.40/tcp/4001".parse().unwrap()); + behaviour.add_address(&"QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu".parse().unwrap(), "/ip4/128.199.219.111/tcp/4001".parse().unwrap()); + behaviour.add_address(&"QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd".parse().unwrap(), "/ip4/178.62.158.247/tcp/4001".parse().unwrap()); + behaviour.add_address(&"QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu".parse().unwrap(), "/ip6/2400:6180:0:d0::151:6001/tcp/4001".parse().unwrap()); + behaviour.add_address(&"QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM".parse().unwrap(), "/ip6/2604:a880:1:20::203:d001/tcp/4001".parse().unwrap()); + behaviour.add_address(&"QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64".parse().unwrap(), "/ip6/2604:a880:800:10::4a:5001/tcp/4001".parse().unwrap()); + behaviour.add_address(&"QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd".parse().unwrap(), "/ip6/2a03:b0c0:0:1010::23:1001/tcp/4001".parse().unwrap()); + libp2p::core::Swarm::new(transport, behaviour, local_peer_id) }; // Order Kademlia to search for a peer. diff --git a/misc/core-derive/src/lib.rs b/misc/core-derive/src/lib.rs index 79a551a5331..dce3e681a69 100644 --- a/misc/core-derive/src/lib.rs +++ b/misc/core-derive/src/lib.rs @@ -49,6 +49,7 @@ fn build(ast: &DeriveInput) -> TokenStream { fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { let name = &ast.ident; let (_, ty_generics, where_clause) = ast.generics.split_for_impl(); + let multiaddr = quote!{::libp2p::core::Multiaddr}; let trait_to_impl = quote!{::libp2p::core::swarm::NetworkBehaviour}; let net_behv_event_proc = quote!{::libp2p::core::swarm::NetworkBehaviourEventProcess}; let either_ident = quote!{::libp2p::core::either::EitherOutput}; @@ -70,25 +71,14 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { quote!{#n} }; - // Name of the type parameter that represents the topology. - let topology_generic = { - let mut n = "TTopology".to_string(); - // Avoid collisions. - while ast.generics.type_params().any(|tp| tp.ident.to_string() == n) { - n.push('1'); - } - let n = Ident::new(&n, name.span()); - quote!{#n} - }; - - let poll_parameters = quote!{::libp2p::core::swarm::PollParameters<#topology_generic>}; + let poll_parameters = quote!{::libp2p::core::swarm::PollParameters}; // Build the generics. let impl_generics = { let tp = ast.generics.type_params(); let lf = ast.generics.lifetimes(); let cst = ast.generics.const_params(); - quote!{<#(#lf,)* #(#tp,)* #(#cst,)* #topology_generic, #substream_generic>} + quote!{<#(#lf,)* #(#tp,)* #(#cst,)* #substream_generic>} }; // Build the `where ...` clause of the trait implementation. 
@@ -98,12 +88,12 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { .flat_map(|field| { let ty = &field.ty; vec![ - quote!{#ty: #trait_to_impl<#topology_generic>}, - quote!{Self: #net_behv_event_proc<<#ty as #trait_to_impl<#topology_generic>>::OutEvent>}, - quote!{<<#ty as #trait_to_impl<#topology_generic>>::ProtocolsHandler as #into_protocols_handler>::Handler: #protocols_handler}, + quote!{#ty: #trait_to_impl}, + quote!{Self: #net_behv_event_proc<<#ty as #trait_to_impl>::OutEvent>}, + quote!{<<#ty as #trait_to_impl>::ProtocolsHandler as #into_protocols_handler>::Handler: #protocols_handler}, // Note: this bound is required because of https://github.com/rust-lang/rust/issues/55697 - quote!{<<<#ty as #trait_to_impl<#topology_generic>>::ProtocolsHandler as #into_protocols_handler>::Handler as #protocols_handler>::InboundProtocol: ::libp2p::core::InboundUpgrade<#substream_generic>}, - quote!{<<<#ty as #trait_to_impl<#topology_generic>>::ProtocolsHandler as #into_protocols_handler>::Handler as #protocols_handler>::OutboundProtocol: ::libp2p::core::OutboundUpgrade<#substream_generic>}, + quote!{<<<#ty as #trait_to_impl>::ProtocolsHandler as #into_protocols_handler>::Handler as #protocols_handler>::InboundProtocol: ::libp2p::core::InboundUpgrade<#substream_generic>}, + quote!{<<<#ty as #trait_to_impl>::ProtocolsHandler as #into_protocols_handler>::Handler as #protocols_handler>::OutboundProtocol: ::libp2p::core::OutboundUpgrade<#substream_generic>}, ] }) .collect::>(); @@ -143,6 +133,20 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { out }; + // Build the list of statements to put in the body of `addresses_of_peer()`. + let addresses_of_peer_stmts = { + data_struct.fields.iter().enumerate().filter_map(move |(field_n, field)| { + if is_ignored(&field) { + return None; + } + + Some(match field.ident { + Some(ref i) => quote!{ out.extend(self.#i.addresses_of_peer(peer_id)); }, + None => quote!{ out.extend(self.#field_n.addresses_of_peer(peer_id)); }, + }) + }) + }; + // Build the list of statements to put in the body of `inject_connected()`. let inject_connected_stmts = { let num_fields = data_struct.fields.iter().filter(|f| !is_ignored(f)).count(); @@ -216,7 +220,7 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { continue; } let ty = &field.ty; - let field_info = quote!{ <#ty as #trait_to_impl<#topology_generic>>::ProtocolsHandler }; + let field_info = quote!{ <#ty as #trait_to_impl>::ProtocolsHandler }; match ph_ty { Some(ev) => ph_ty = Some(quote!{ #into_proto_select_ident<#ev, #field_info> }), ref mut ev @ None => *ev = Some(field_info), @@ -321,7 +325,7 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { // Now the magic happens. 
let final_quote = quote!{ - impl #impl_generics #trait_to_impl<#topology_generic> for #name #ty_generics + impl #impl_generics #trait_to_impl for #name #ty_generics #where_clause { type ProtocolsHandler = #protocols_handler_ty; @@ -333,6 +337,12 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { #new_handler } + fn addresses_of_peer(&self, peer_id: &#peer_id) -> Vec<#multiaddr> { + let mut out = Vec::new(); + #(#addresses_of_peer_stmts);* + out + } + #[inline] fn inject_connected(&mut self, peer_id: #peer_id, endpoint: #connected_point) { #(#inject_connected_stmts);* diff --git a/misc/core-derive/tests/test.rs b/misc/core-derive/tests/test.rs index 98073e94285..5f83a2e7f1c 100644 --- a/misc/core-derive/tests/test.rs +++ b/misc/core-derive/tests/test.rs @@ -24,7 +24,7 @@ extern crate void; /// Small utility to check that a type implements `NetworkBehaviour`. #[allow(dead_code)] -fn require_net_behaviour>() {} +fn require_net_behaviour() {} // TODO: doesn't compile /*#[test] diff --git a/misc/mdns/Cargo.toml b/misc/mdns/Cargo.toml index d021ee4d4f2..c84fb4e1b62 100644 --- a/misc/mdns/Cargo.toml +++ b/misc/mdns/Cargo.toml @@ -1,5 +1,6 @@ [package] name = "libp2p-mdns" +edition = "2018" version = "0.2.0" description = "Implementation of the libp2p mDNS discovery method" authors = ["Parity Technologies "] diff --git a/misc/mdns/src/behaviour.rs b/misc/mdns/src/behaviour.rs index e1e94841f55..2a524901814 100644 --- a/misc/mdns/src/behaviour.rs +++ b/misc/mdns/src/behaviour.rs @@ -22,11 +22,10 @@ use crate::service::{MdnsService, MdnsPacket}; use futures::prelude::*; use libp2p_core::protocols_handler::{DummyProtocolsHandler, ProtocolsHandler}; use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; -use libp2p_core::{Multiaddr, PeerId, multiaddr::Protocol, topology::MemoryTopology, topology::Topology}; +use libp2p_core::{Multiaddr, PeerId, multiaddr::Protocol}; use smallvec::SmallVec; -use std::{fmt, io, iter, marker::PhantomData, time::Duration}; +use std::{cmp, fmt, io, iter, marker::PhantomData, time::Duration, time::Instant}; use tokio_io::{AsyncRead, AsyncWrite}; -use void::{self, Void}; /// A `NetworkBehaviour` for mDNS. Automatically discovers peers on the local network and adds /// them to the topology. @@ -34,6 +33,12 @@ pub struct Mdns { /// The inner service. service: MdnsService, + /// List of nodes that we have discovered, the address, and when their TTL expires. + /// + /// Each combination of `PeerId` and `Multiaddr` can only appear once, but the same `PeerId` + /// can appear multiple times. + discovered_nodes: SmallVec<[(PeerId, Multiaddr, Instant); 8]>, + /// If `Some`, then we automatically connect to nodes we discover and this is the list of nodes /// to connect to. Drained in `poll()`. /// If `None`, then we don't automatically connect. @@ -48,39 +53,49 @@ impl Mdns { pub fn new() -> io::Result> { Ok(Mdns { service: MdnsService::new()?, + discovered_nodes: SmallVec::new(), to_connect_to: Some(SmallVec::new()), marker: PhantomData, }) } } -/// Trait that must be implemented on the network topology for it to be usable with `Mdns`. -pub trait MdnsTopology: Topology { - /// Adds an address discovered by mDNS. - /// - /// Will never be called with the local peer ID. - fn add_mdns_discovered_address(&mut self, peer: PeerId, addr: Multiaddr); +/// Event that can be produced by the `Mdns` behaviour. +#[derive(Debug)] +pub enum MdnsEvent { + /// Discovered nodes through mDNS. 
+    Discovered(Vec<MdnsDiscovered>),
 }
 
-impl MdnsTopology for MemoryTopology {
-    #[inline]
-    fn add_mdns_discovered_address(&mut self, peer: PeerId, addr: Multiaddr) {
-        self.add_address(peer, addr)
-    }
+/// One node discovered by mDNS.
+#[derive(Debug)]
+pub struct MdnsDiscovered {
+    /// Id of the peer that has been discovered.
+    pub peer_id: PeerId,
+    /// Addresses of the peer that has been discovered.
+    pub addresses: Vec<Multiaddr>,
 }
 
-impl<TSubstream, TTopology> NetworkBehaviour<TTopology> for Mdns<TSubstream>
+impl<TSubstream> NetworkBehaviour for Mdns<TSubstream>
 where
     TSubstream: AsyncRead + AsyncWrite,
-    TTopology: MdnsTopology,
 {
     type ProtocolsHandler = DummyProtocolsHandler<TSubstream>;
-    type OutEvent = Void;
+    type OutEvent = MdnsEvent;
 
     fn new_handler(&mut self) -> Self::ProtocolsHandler {
         DummyProtocolsHandler::default()
     }
 
+    fn addresses_of_peer(&self, peer_id: &PeerId) -> Vec<Multiaddr> {
+        let now = Instant::now();
+        self.discovered_nodes
+            .iter()
+            .filter(move |(p, _, expires)| p == peer_id && *expires > now)
+            .map(|(_, addr, _)| addr.clone())
+            .collect()
+    }
+
     fn inject_connected(&mut self, _: PeerId, _: ConnectedPoint) {}
 
     fn inject_disconnected(&mut self, _: &PeerId, _: ConnectedPoint) {}
@@ -95,13 +110,19 @@ where
 
     fn poll(
         &mut self,
-        params: &mut PollParameters<TTopology>,
+        params: &mut PollParameters,
     ) -> Async<
         NetworkBehaviourAction<
            <Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
            Self::OutEvent,
        >,
    > {
+        // Remove expired peers.
+        {
+            let now = Instant::now();
+            self.discovered_nodes.retain(move |(_, _, expires)| *expires > now);
+        }
+
         loop {
             if let Some(ref mut to_connect_to) = self.to_connect_to {
                 if !to_connect_to.is_empty() {
@@ -134,23 +155,43 @@ where
                     .chain(iter::once(obs_port))
                     .collect();
 
+                    let mut discovered = Vec::new();
                     for peer in response.discovered_peers() {
                         if peer.id() == params.local_peer_id() {
                             continue;
                         }
 
+                        let new_expiration = Instant::now() + peer.ttl();
+
+                        let mut addrs = Vec::new();
                         for addr in peer.addresses() {
                             if let Some(new_addr) = params.nat_traversal(&addr, &observed) {
-                                params.topology().add_mdns_discovered_address(peer.id().clone(), new_addr);
+                                addrs.push(new_addr);
                             }
+                            addrs.push(addr);
+                        }
 
-                            params.topology().add_mdns_discovered_address(peer.id().clone(), addr);
+                        for addr in &addrs {
+                            if let Some((_, _, cur_expires)) = self.discovered_nodes.iter_mut()
+                                .find(|(p, a, _)| p == peer.id() && a == addr)
+                            {
+                                *cur_expires = cmp::max(*cur_expires, new_expiration);
+                            } else {
+                                self.discovered_nodes.push((peer.id().clone(), addr.clone(), new_expiration));
+                            }
                         }
 
+                        discovered.push(MdnsDiscovered {
+                            peer_id: peer.id().clone(),
+                            addresses: addrs,
+                        });
+
                         if let Some(ref mut to_connect_to) = self.to_connect_to {
                             to_connect_to.push(peer.id().clone());
                         }
                     }
+
+                    return Async::Ready(NetworkBehaviourAction::GenerateEvent(MdnsEvent::Discovered(discovered)));
                 },
                 MdnsPacket::ServiceDiscovery(disc) => {
                     disc.respond(Duration::from_secs(5 * 60));
diff --git a/misc/mdns/src/dns.rs b/misc/mdns/src/dns.rs
index 77d52d01104..d7642b2e2c2 100644
--- a/misc/mdns/src/dns.rs
+++ b/misc/mdns/src/dns.rs
@@ -21,11 +21,11 @@
 //! Contains methods that handle the DNS encoding and decoding capabilities not available in the
 //! `dns_parser` library.
 
+use crate::{META_QUERY_SERVICE, SERVICE_NAME};
 use data_encoding;
 use libp2p_core::{Multiaddr, PeerId};
 use rand;
 use std::{borrow::Cow, cmp, error, fmt, str, time::Duration};
-use {META_QUERY_SERVICE, SERVICE_NAME};
 
 /// Decodes a `<character-string>` (as defined by RFC1035) into a `Vec` of ASCII characters.
 // TODO: better error type?
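The mDNS behaviour now owns its view of the network: discoveries are kept alongside their TTL and surfaced as `MdnsEvent::Discovered` instead of being written into a shared topology. A minimal sketch of consuming that event from a derived behaviour; `MyBehaviour` is a hypothetical user type (in the spirit of the chat example above), not part of this patch:

    use libp2p_core::swarm::NetworkBehaviourEventProcess;
    use libp2p_mdns::{MdnsDiscovered, MdnsEvent};

    // Hypothetical: any `#[derive(NetworkBehaviour)]` struct containing an
    // `Mdns` field would have its mDNS events dispatched to an impl like this.
    impl NetworkBehaviourEventProcess<MdnsEvent> for MyBehaviour {
        fn inject_event(&mut self, event: MdnsEvent) {
            match event {
                MdnsEvent::Discovered(list) => {
                    for MdnsDiscovered { peer_id, addresses } in list {
                        // The same addresses are served by `Mdns::addresses_of_peer`
                        // until their mDNS TTL expires.
                        println!("discovered {:?} at {:?}", peer_id, addresses);
                    }
                }
            }
        }
    }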
diff --git a/misc/mdns/src/lib.rs b/misc/mdns/src/lib.rs index 7f5fd59d873..c79939b4628 100644 --- a/misc/mdns/src/lib.rs +++ b/misc/mdns/src/lib.rs @@ -52,7 +52,7 @@ const SERVICE_NAME: &'static [u8] = b"_p2p._udp.local"; /// Hardcoded name of the service used for DNS-SD. const META_QUERY_SERVICE: &'static [u8] = b"_services._dns-sd._udp.local"; -pub use self::behaviour::{Mdns, MdnsTopology}; +pub use self::behaviour::{Mdns, MdnsEvent}; pub use self::service::MdnsService; mod behaviour; diff --git a/misc/mdns/src/service.rs b/misc/mdns/src/service.rs index b0f8b475932..da87706664b 100644 --- a/misc/mdns/src/service.rs +++ b/misc/mdns/src/service.rs @@ -48,7 +48,7 @@ pub use dns::MdnsResponseError; /// the local network. /// /// # Usage -/// +/// /// In order to use mDNS to discover peers on the local network, use the `MdnsService`. This is /// done by creating a `MdnsService` then polling it in the same way as you would poll a stream. /// @@ -451,6 +451,7 @@ impl<'a> MdnsResponse<'a> { packet, record_value, peer_id, + ttl: record.ttl, }) }) } @@ -478,6 +479,8 @@ pub struct MdnsPeer<'a> { record_value: String, /// Id of the peer. peer_id: PeerId, + /// TTL of the record in seconds. + ttl: u32, } impl<'a> MdnsPeer<'a> { @@ -487,6 +490,12 @@ impl<'a> MdnsPeer<'a> { &self.peer_id } + /// Returns the requested time-to-live for the record. + #[inline] + pub fn ttl(&self) -> Duration { + Duration::from_secs(u64::from(self.ttl)) + } + /// Returns the list of addresses the peer says it is listening on. /// /// Filters out invalid addresses. diff --git a/protocols/floodsub/src/layer.rs b/protocols/floodsub/src/layer.rs index c1ac0a18f73..35f8800e1eb 100644 --- a/protocols/floodsub/src/layer.rs +++ b/protocols/floodsub/src/layer.rs @@ -23,7 +23,7 @@ use crate::topic::{Topic, TopicHash}; use cuckoofilter::CuckooFilter; use futures::prelude::*; use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; -use libp2p_core::{protocols_handler::ProtocolsHandler, protocols_handler::OneShotHandler, PeerId}; +use libp2p_core::{protocols_handler::ProtocolsHandler, protocols_handler::OneShotHandler, Multiaddr, PeerId}; use rand; use smallvec::SmallVec; use std::{collections::VecDeque, iter, marker::PhantomData}; @@ -171,7 +171,7 @@ impl Floodsub { } } -impl NetworkBehaviour for Floodsub +impl NetworkBehaviour for Floodsub where TSubstream: AsyncRead + AsyncWrite, { @@ -182,6 +182,10 @@ where Default::default() } + fn addresses_of_peer(&self, peer_id: &PeerId) -> Vec { + Vec::new() // TODO: no + } + fn inject_connected(&mut self, id: PeerId, _: ConnectedPoint) { // We need to send our subscriptions to the newly-connected node. 
for topic in self.subscribed_topics.iter() { @@ -290,7 +294,7 @@ where fn poll( &mut self, - _: &mut PollParameters, + _: &mut PollParameters, ) -> Async< NetworkBehaviourAction< ::InEvent, diff --git a/protocols/identify/src/identify.rs b/protocols/identify/src/identify.rs index 3573979cf48..3fcfbfa3d0a 100644 --- a/protocols/identify/src/identify.rs +++ b/protocols/identify/src/identify.rs @@ -21,11 +21,10 @@ use crate::listen_handler::IdentifyListenHandler; use crate::periodic_id_handler::{PeriodicIdHandler, PeriodicIdHandlerEvent}; use crate::protocol::{IdentifyInfo, IdentifySender, IdentifySenderFuture}; -use crate::topology::IdentifyTopology; use futures::prelude::*; use libp2p_core::protocols_handler::{ProtocolsHandler, ProtocolsHandlerSelect, ProtocolsHandlerUpgrErr}; use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; -use libp2p_core::{Multiaddr, PeerId, either::EitherOutput}; +use libp2p_core::{Multiaddr, PeerId, PublicKey, either::EitherOutput}; use smallvec::SmallVec; use std::{collections::HashMap, collections::VecDeque, io}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -38,6 +37,8 @@ pub struct Identify { protocol_version: String, /// Agent version to send back to remotes. agent_version: String, + /// The public key of the local node. To report on the wire. + local_public_key: PublicKey, /// For each peer we're connected to, the observed address to send back to it. observed_addresses: HashMap, /// List of senders to answer, with the observed multiaddr. @@ -50,10 +51,11 @@ pub struct Identify { impl Identify { /// Creates a `Identify`. - pub fn new(protocol_version: String, agent_version: String) -> Self { + pub fn new(protocol_version: String, agent_version: String, local_public_key: PublicKey) -> Self { Identify { protocol_version, agent_version, + local_public_key, observed_addresses: HashMap::new(), to_answer: SmallVec::new(), futures: SmallVec::new(), @@ -62,10 +64,9 @@ impl Identify { } } -impl NetworkBehaviour for Identify +impl NetworkBehaviour for Identify where TSubstream: AsyncRead + AsyncWrite, - TTopology: IdentifyTopology, { type ProtocolsHandler = ProtocolsHandlerSelect, PeriodicIdHandler>; type OutEvent = IdentifyEvent; @@ -74,6 +75,10 @@ where IdentifyListenHandler::new().select(PeriodicIdHandler::new()) } + fn addresses_of_peer(&self, _: &PeerId) -> Vec { + Vec::new() + } + fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) { let observed = match endpoint { ConnectedPoint::Dialer { address } => address, @@ -124,7 +129,7 @@ where fn poll( &mut self, - params: &mut PollParameters, + params: &mut PollParameters, ) -> Async< NetworkBehaviourAction< ::InEvent, @@ -132,12 +137,6 @@ where >, > { if let Some(event) = self.events.pop_front() { - // We intercept identified events in order to insert the addresses in the topology. - if let NetworkBehaviourAction::GenerateEvent(IdentifyEvent::Identified { ref peer_id, ref info, .. 
}) = event { - let iter = info.listen_addrs.iter().cloned(); - params.topology().add_identify_discovered_addrs(peer_id, iter); - } - return Async::Ready(event); } @@ -150,7 +149,7 @@ where .collect(); let send_back_info = IdentifyInfo { - public_key: params.local_public_key().clone(), + public_key: self.local_public_key.clone(), protocol_version: self.protocol_version.clone(), agent_version: self.agent_version.clone(), listen_addrs: params.listened_addresses().cloned().collect(), diff --git a/protocols/identify/src/lib.rs b/protocols/identify/src/lib.rs index 9e7633b9829..2dd42c88a95 100644 --- a/protocols/identify/src/lib.rs +++ b/protocols/identify/src/lib.rs @@ -83,7 +83,6 @@ extern crate void; pub use self::identify::{Identify, IdentifyEvent}; pub use self::id_transport::IdentifyTransport; -pub use self::topology::IdentifyTopology; pub mod listen_handler; pub mod periodic_id_handler; @@ -92,4 +91,3 @@ pub mod protocol; mod identify; mod id_transport; mod structs_proto; -mod topology; diff --git a/protocols/identify/src/topology.rs b/protocols/identify/src/topology.rs deleted file mode 100644 index 6215dab01fb..00000000000 --- a/protocols/identify/src/topology.rs +++ /dev/null @@ -1,43 +0,0 @@ -// Copyright 2018 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use libp2p_core::{Multiaddr, PeerId}; -use libp2p_core::topology::{MemoryTopology, Topology}; - -/// Trait required on the topology for the identify system to store addresses. -pub trait IdentifyTopology: Topology { - /// Adds to the topology an address discovered through identification. - /// - /// > **Note**: Will never be called with the local peer ID. - fn add_identify_discovered_addrs(&mut self, peer: &PeerId, addr: TIter) - where - TIter: Iterator; -} - -impl IdentifyTopology for MemoryTopology { - fn add_identify_discovered_addrs(&mut self, peer: &PeerId, addr: TIter) - where - TIter: Iterator, - { - for addr in addr { - self.add_address(peer.clone(), addr); - } - } -} diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index 6d4771445f7..074cf7a96c4 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -19,13 +19,13 @@ // DEALINGS IN THE SOFTWARE. 
use crate::handler::{KademliaHandler, KademliaHandlerEvent, KademliaHandlerIn, KademliaRequestId}; +use crate::kbucket::KBucketsTable; use crate::protocol::{KadConnectionType, KadPeer}; use crate::query::{QueryConfig, QueryState, QueryStatePollOut, QueryTarget}; -use crate::topology::KademliaTopology; use fnv::{FnvHashMap, FnvHashSet}; use futures::{prelude::*, stream}; use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; -use libp2p_core::{protocols_handler::ProtocolsHandler, topology::Topology, Multiaddr, PeerId}; +use libp2p_core::{protocols_handler::ProtocolsHandler, Multiaddr, PeerId}; use multihash::Multihash; use rand; use smallvec::SmallVec; @@ -35,8 +35,8 @@ use tokio_timer::Interval; /// Network behaviour that handles Kademlia. pub struct Kademlia { - /// Peer ID of the local node. - local_peer_id: PeerId, + /// Storage for the nodes. Contains the known multiaddresses for this node. + kbuckets: KBucketsTable>, /// All the iterative queries we are currently performing, with their ID. The last parameter /// is the list of accumulated providers for `GET_PROVIDERS` queries. @@ -80,9 +80,6 @@ pub struct Kademlia { /// Events to return when polling. queued_events: SmallVec<[NetworkBehaviourAction, KademliaOut>; 32]>, - /// List of addresses to add to the topology as soon as we are in `poll()`. - add_to_topology: SmallVec<[(PeerId, Multiaddr, KadConnectionType); 32]>, - /// List of providers to add to the topology as soon as we are in `poll()`. add_provider: SmallVec<[(Multihash, PeerId); 32]>, @@ -121,12 +118,17 @@ impl Kademlia { Self::new_inner(local_peer_id, false) } + /// Adds a known address for the given `PeerId`. + pub fn add_address(&mut self, peer_id: &PeerId, address: Multiaddr) { + unimplemented!() // TODO: + } + /// Inner implementation of the constructors. fn new_inner(local_peer_id: PeerId, initialize: bool) -> Self { let parallelism = 3; let mut behaviour = Kademlia { - local_peer_id: local_peer_id.clone(), + kbuckets: KBucketsTable::new(local_peer_id, Duration::from_secs(60)), // TODO: constant queued_events: SmallVec::new(), queries_to_starts: SmallVec::new(), active_queries: Default::default(), @@ -139,7 +141,6 @@ impl Kademlia { parallelism, num_results: 20, rpc_timeout: Duration::from_secs(8), - add_to_topology: SmallVec::new(), add_provider: SmallVec::new(), marker: PhantomData, }; @@ -148,7 +149,7 @@ impl Kademlia { // As part of the initialization process, we start one `FIND_NODE` for each bit of the // possible range of peer IDs. for n in 0..256 { - let peer_id = match gen_random_id(&local_peer_id, n) { + let peer_id = match gen_random_id(behaviour.kbuckets.my_id(), n) { Ok(p) => p, Err(()) => continue, }; @@ -163,26 +164,24 @@ impl Kademlia { /// Builds a `KadPeer` structure corresponding to the local node. fn build_local_kad_peer(&self, local_addrs: impl IntoIterator) -> KadPeer { KadPeer { - node_id: self.local_peer_id.clone(), + node_id: self.kbuckets.my_id().clone(), multiaddrs: local_addrs.into_iter().collect(), connection_ty: KadConnectionType::Connected, } } /// Builds the answer to a request. 
- fn build_result(&self, query: QueryTarget, request_id: KademliaRequestId, parameters: &mut PollParameters) + fn build_result(&mut self, query: QueryTarget, request_id: KademliaRequestId, parameters: &mut PollParameters) -> KademliaHandlerIn - where TTopology: KademliaTopology { let local_kad_peer = self.build_local_kad_peer(parameters.external_addresses()); match query { QueryTarget::FindPeer(key) => { - let topology = parameters.topology(); - // TODO: insert local_kad_peer somewhere? - let closer_peers = topology - .closest_peers(key.as_ref(), self.num_results) - .map(|peer_id| build_kad_peer(peer_id, topology, &self.connected_peers)) + let closer_peers = self.kbuckets + .find_closest_with_self(key.as_ref()) + .take(self.num_results) + .map(|peer_id| build_kad_peer(peer_id, &self.connected_peers)) .collect(); KademliaHandlerIn::FindNodeRes { @@ -191,24 +190,24 @@ impl Kademlia { } }, QueryTarget::GetProviders(key) => { - let topology = parameters.topology(); // TODO: insert local_kad_peer somewhere? - let closer_peers = topology - .closest_peers(&key, self.num_results) - .map(|peer_id| build_kad_peer(peer_id, topology, &self.connected_peers)) + let closer_peers = self.kbuckets + .find_closest_with_self(&key) + .take(self.num_results) + .map(|peer_id| build_kad_peer(peer_id, &self.connected_peers)) .collect(); let local_node_is_providing = self.providing_keys.iter().any(|k| k.as_ref() == &key); - let provider_peers = topology + let provider_peers = Vec::new();/* TODO: topology .get_providers(&key) - .map(|peer_id| build_kad_peer(peer_id, topology, &self.connected_peers)) + .map(|peer_id| build_kad_peer(peer_id, &self.connected_peers)) .chain(if local_node_is_providing { Some(local_kad_peer) } else { None }.into_iter()) - .collect(); + .collect();*/ KademliaHandlerIn::GetProvidersRes { closer_peers, @@ -271,10 +270,9 @@ impl Kademlia { } } -impl NetworkBehaviour for Kademlia +impl NetworkBehaviour for Kademlia where TSubstream: AsyncRead + AsyncWrite, - TTopology: KademliaTopology, { type ProtocolsHandler = KademliaHandler; type OutEvent = KademliaOut; @@ -283,6 +281,10 @@ where KademliaHandler::dial_and_listen() } + fn addresses_of_peer(&self, peer_id: &PeerId) -> Vec { + Vec::new() // TODO: wrong + } + fn inject_connected(&mut self, id: PeerId, _: ConnectedPoint) { if let Some(pos) = self.pending_rpcs.iter().position(|(p, _)| p == &id) { let (_, rpc) = self.pending_rpcs.remove(pos); @@ -317,10 +319,11 @@ where // It is possible that we obtain a response for a query that has finished, which is // why we may not find an entry in `self.active_queries`. 
for peer in closer_peers.iter() { - for addr in peer.multiaddrs.iter() { - self.add_to_topology - .push((peer.node_id.clone(), addr.clone(), peer.connection_ty)); - } + self.queued_events.push(NetworkBehaviourAction::GenerateEvent(KademliaOut::Discovered { + peer_id: peer.node_id.clone(), + addresses: peer.multiaddrs.clone(), + ty: peer.connection_ty, + })); } if let Some((query, _, _)) = self.active_queries.get_mut(&user_data) { query.inject_rpc_result(&source, closer_peers.into_iter().map(|kp| kp.node_id)) @@ -336,10 +339,11 @@ where user_data, } => { for peer in closer_peers.iter().chain(provider_peers.iter()) { - for addr in peer.multiaddrs.iter() { - self.add_to_topology - .push((peer.node_id.clone(), addr.clone(), peer.connection_ty)); - } + self.queued_events.push(NetworkBehaviourAction::GenerateEvent(KademliaOut::Discovered { + peer_id: peer.node_id.clone(), + addresses: peer.multiaddrs.clone(), + ty: peer.connection_ty, + })); } // It is possible that we obtain a response for a query that has finished, which is // why we may not find an entry in `self.active_queries`. @@ -358,10 +362,11 @@ where } } KademliaHandlerEvent::AddProvider { key, provider_peer } => { - for addr in provider_peer.multiaddrs.iter() { - self.add_to_topology - .push((provider_peer.node_id.clone(), addr.clone(), provider_peer.connection_ty)); - } + self.queued_events.push(NetworkBehaviourAction::GenerateEvent(KademliaOut::Discovered { + peer_id: provider_peer.node_id.clone(), + addresses: provider_peer.multiaddrs.clone(), + ty: provider_peer.connection_ty, + })); self.add_provider.push((key, provider_peer.node_id)); return; } @@ -370,7 +375,7 @@ where fn poll( &mut self, - parameters: &mut PollParameters, + parameters: &mut PollParameters, ) -> Async< NetworkBehaviourAction< ::InEvent, @@ -378,13 +383,10 @@ where >, > { // Flush the changes to the topology that we want to make. - for (peer_id, addr, connection_ty) in self.add_to_topology.drain() { - parameters.topology().add_kad_discovered_address(peer_id, addr, connection_ty); - } - self.add_to_topology.shrink_to_fit(); - for (key, provider) in self.add_provider.drain() { + // TODO: + /*for (key, provider) in self.add_provider.drain() { parameters.topology().add_provider(key, provider); - } + }*/ self.add_provider.shrink_to_fit(); // Handle `refresh_add_providers`. @@ -402,9 +404,9 @@ where // Start queries that are waiting to start. for (query_id, query_target, query_purpose) in self.queries_to_starts.drain() { - let known_closest_peers = parameters - .topology() - .closest_peers(query_target.as_hash(), self.num_results); + let known_closest_peers = self.kbuckets + .find_closest(query_target.as_hash()) + .take(self.num_results); self.active_queries.insert( query_id, ( @@ -526,6 +528,16 @@ where /// Output event of the `Kademlia` behaviour. #[derive(Debug, Clone)] pub enum KademliaOut { + /// We have discovered a node. + Discovered { + /// Id of the node that was discovered. + peer_id: PeerId, + /// Addresses of the node. + addresses: Vec, + /// How the reporter is connected to the reported. + ty: KadConnectionType, + }, + /// Result of a `FIND_NODE` iterative query. FindNodeResult { /// The key that we looked for in the query. @@ -581,10 +593,8 @@ fn gen_random_id(my_id: &PeerId, bucket_num: usize) -> Result { /// Builds a `KadPeer` struct corresponding to the given `PeerId`. /// /// > **Note**: This is just a convenience function that doesn't do anything note-worthy. 
-fn build_kad_peer<TTopology>(peer_id: PeerId, topology: &mut TTopology, connected_peers: &FnvHashSet<PeerId>) -> KadPeer
-where TTopology: Topology
-{
-    let multiaddrs = topology.addresses_of_peer(&peer_id);
+fn build_kad_peer(peer_id: PeerId, connected_peers: &FnvHashSet<PeerId>) -> KadPeer {
+    let multiaddrs = Vec::new(); // TODO: topology.addresses_of_peer(&peer_id);
 
     // TODO: implement the other possibilities correctly
     let connection_ty = if connected_peers.contains(&peer_id) {
diff --git a/protocols/kad/src/kbucket.rs b/protocols/kad/src/kbucket.rs
index dc1818faef0..5f6b850456e 100644
--- a/protocols/kad/src/kbucket.rs
+++ b/protocols/kad/src/kbucket.rs
@@ -29,6 +29,7 @@
 use arrayvec::ArrayVec;
 use bigint::U512;
+use libp2p_core::PeerId;
 use multihash::Multihash;
 use std::mem;
 use std::slice::IterMut as SliceIterMut;
@@ -90,9 +91,9 @@ impl KBucket {
 }
 
 /// Trait that must be implemented on types that can be used as an identifier in a k-bucket.
-pub trait KBucketsPeerId: Eq + Clone {
+pub trait KBucketsPeerId<TOther = Self>: PartialEq + Clone {
     /// Computes the XOR of this value and another one. The lower the closer.
-    fn distance_with(&self, other: &Self) -> u32;
+    fn distance_with(&self, other: &TOther) -> u32;
 
     /// Returns the number of bits that are necessary to store the distance between peer IDs.
     /// Used for pre-allocations.
@@ -101,6 +102,30 @@ pub trait KBucketsPeerId: Eq + Clone {
     fn max_distance() -> usize;
 }
 
+impl KBucketsPeerId for PeerId {
+    #[inline]
+    fn distance_with(&self, other: &Self) -> u32 {
+        Multihash::distance_with(self.as_ref(), other.as_ref())
+    }
+
+    #[inline]
+    fn max_distance() -> usize {
+        <Multihash as KBucketsPeerId>::max_distance()
+    }
+}
+
+impl KBucketsPeerId<Multihash> for PeerId {
+    #[inline]
+    fn distance_with(&self, other: &Multihash) -> u32 {
+        Multihash::distance_with(self.as_ref(), other)
+    }
+
+    #[inline]
+    fn max_distance() -> usize {
+        <Multihash as KBucketsPeerId>::max_distance()
+    }
+}
+
 impl KBucketsPeerId for Multihash {
     #[inline]
     fn distance_with(&self, other: &Self) -> u32 {
@@ -161,9 +186,9 @@ where
     }
 
     /// Finds the `num` nodes closest to `id`, ordered by distance.
-    pub fn find_closest(&mut self, id: &Id) -> VecIntoIter<Id>
+    pub fn find_closest<TOther>(&mut self, id: &TOther) -> VecIntoIter<Id>
     where
-        Id: Clone,
+        Id: Clone + KBucketsPeerId<TOther>,
     {
         // TODO: optimize
         let mut out = Vec::new();
@@ -181,15 +206,15 @@ where
     }
 
     /// Same as `find_closest`, but includes the local peer as well.
-    pub fn find_closest_with_self(&mut self, id: &Id) -> VecIntoIter<Id>
+    pub fn find_closest_with_self<TOther>(&mut self, id: &TOther) -> VecIntoIter<Id>
     where
-        Id: Clone,
+        Id: Clone + KBucketsPeerId<TOther>,
     {
         // TODO: optimize
-        let mut intermediate: Vec<_> = self.find_closest(&id).collect();
+        let mut intermediate: Vec<_> = self.find_closest(id).collect();
         if let Some(pos) = intermediate
             .iter()
-            .position(|e| e.distance_with(&id) >= self.my_id.distance_with(&id))
+            .position(|e| e.distance_with(id) >= self.my_id.distance_with(id))
         {
             if intermediate[pos] != self.my_id {
                 intermediate.insert(pos, self.my_id.clone());
diff --git a/protocols/kad/src/lib.rs b/protocols/kad/src/lib.rs
index 163a929606c..2590d04c67d 100644
--- a/protocols/kad/src/lib.rs
+++ b/protocols/kad/src/lib.rs
@@ -85,7 +85,6 @@ extern crate tokio;
 pub use self::behaviour::{Kademlia, KademliaOut};
 pub use self::kbucket::KBucketsPeerId;
 pub use self::protocol::KadConnectionType;
-pub use self::topology::KademliaTopology;

 pub mod handler;
 pub mod kbucket;
@@ -94,4 +93,3 @@ pub mod protocol;
 mod behaviour;
 mod protobuf_structs;
 mod query;
-mod topology;
diff --git a/protocols/kad/src/topology.rs b/protocols/kad/src/topology.rs
deleted file mode 100644
index 90fe47acd92..00000000000
--- a/protocols/kad/src/topology.rs
+++ /dev/null
@@ -1,87 +0,0 @@
-// Copyright 2018 Parity Technologies (UK) Ltd.
-//
-// Permission is hereby granted, free of charge, to any person obtaining a
-// copy of this software and associated documentation files (the "Software"),
-// to deal in the Software without restriction, including without limitation
-// the rights to use, copy, modify, merge, publish, distribute, sublicense,
-// and/or sell copies of the Software, and to permit persons to whom the
-// Software is furnished to do so, subject to the following conditions:
-//
-// The above copyright notice and this permission notice shall be included in
-// all copies or substantial portions of the Software.
-//
-// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
-// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
-// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
-// DEALINGS IN THE SOFTWARE.
-
-use crate::kbucket::KBucketsPeerId;
-use crate::protocol::KadConnectionType;
-use libp2p_core::{Multiaddr, PeerId, topology::MemoryTopology, topology::Topology};
-use multihash::Multihash;
-use std::vec;
-
-/// Trait allowing retrieval of information necessary for the Kademlia system to work.
-pub trait KademliaTopology: Topology {
-    /// Iterator returned by `closest_peers`.
-    type ClosestPeersIter: Iterator<Item = PeerId>;
-
-    /// Iterator returned by `get_providers`.
-    type GetProvidersIter: Iterator<Item = PeerId>;
-
-    /// Adds an address discovered through Kademlia to the topology.
-    ///
-    /// > **Note**: Keep in mind that `peer` can be the local peer.
-    fn add_kad_discovered_address(&mut self, peer: PeerId, addr: Multiaddr,
-                                  connection_ty: KadConnectionType);
-
-    /// Returns the known peers closest by XOR distance to the `target`.
-    ///
-    /// The `max` parameter is the maximum number of results that we are going to use. If more
-    /// than `max` elements are returned, they will be ignored.
-    ///
-    /// > **Note**: The results should include the local node.
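
To make the XOR metric concrete, here is a toy model (an illustration, not the crate's implementation, which operates on 512-bit multihashes): the distance is the number of bits needed to represent `a XOR b`, and the table maps it to a bucket index with a `checked_sub(1)` so that the local peer, at distance 0, has no bucket.

fn toy_distance(a: u64, b: u64) -> u32 {
    64 - (a ^ b).leading_zeros()
}

fn toy_bucket_num(my_id: u64, other: u64) -> Option<usize> {
    (toy_distance(my_id, other) as usize).checked_sub(1)
}

fn main() {
    assert_eq!(toy_bucket_num(0b1010, 0b1010), None);    // the local peer itself
    assert_eq!(toy_bucket_num(0b1010, 0b1011), Some(0)); // differs in the lowest bit
    assert_eq!(toy_bucket_num(0b1010, 0b0010), Some(3)); // differs in bit 3
}
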
- fn closest_peers(&mut self, target: &Multihash, max: usize) -> Self::ClosestPeersIter; - - /// Registers the given peer as provider of the resource with the given ID. - /// - /// > **Note**: There is no `remove_provider` method. Implementations must include a - /// > time-to-live system so that entries disappear after a while. - // TODO: specify the TTL? it has to match the timeout in the behaviour somehow, but this could - // also be handled by the user - fn add_provider(&mut self, key: Multihash, peer_id: PeerId); - - /// Returns the list of providers that have been registered with `add_provider`. - /// - /// If the local node is a provider for `key`, our local peer ID should also be returned. - fn get_providers(&mut self, key: &Multihash) -> Self::GetProvidersIter; -} - -// TODO: stupid idea to implement on `MemoryTopology` -impl KademliaTopology for MemoryTopology { - type ClosestPeersIter = vec::IntoIter; - type GetProvidersIter = vec::IntoIter; - - fn add_kad_discovered_address(&mut self, peer: PeerId, addr: Multiaddr, _: KadConnectionType) { - if &peer != self.local_peer_id() { - self.add_address(peer, addr) - } - } - - fn closest_peers(&mut self, target: &Multihash, _: usize) -> Self::ClosestPeersIter { - let mut list = self.peers().cloned().collect::>(); - list.sort_by(|a, b| target.distance_with(b.as_ref()).cmp(&target.distance_with(a.as_ref()))); - list.into_iter() - } - - fn add_provider(&mut self, _: Multihash, _: PeerId) { - unimplemented!() - } - - fn get_providers(&mut self, _: &Multihash) -> Self::GetProvidersIter { - unimplemented!() - } -} diff --git a/protocols/ping/src/lib.rs b/protocols/ping/src/lib.rs index b0e0f2f6ef2..f23ab0c2c22 100644 --- a/protocols/ping/src/lib.rs +++ b/protocols/ping/src/lib.rs @@ -38,7 +38,7 @@ pub mod protocol; use futures::prelude::*; use libp2p_core::either::EitherOutput; use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; -use libp2p_core::{protocols_handler::ProtocolsHandler, protocols_handler::ProtocolsHandlerSelect, PeerId}; +use libp2p_core::{protocols_handler::ProtocolsHandler, protocols_handler::ProtocolsHandlerSelect, Multiaddr, PeerId}; use std::{marker::PhantomData, time::Duration}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -81,7 +81,7 @@ impl Default for Ping { } } -impl NetworkBehaviour for Ping +impl NetworkBehaviour for Ping where TSubstream: AsyncRead + AsyncWrite, { @@ -93,6 +93,10 @@ where .select(dial_handler::PeriodicPingHandler::new()) } + fn addresses_of_peer(&self, peer_id: &PeerId) -> Vec { + Vec::new() + } + fn inject_connected(&mut self, _: PeerId, _: ConnectedPoint) {} fn inject_disconnected(&mut self, _: &PeerId, _: ConnectedPoint) {} @@ -112,7 +116,7 @@ where fn poll( &mut self, - _: &mut PollParameters, + _: &mut PollParameters, ) -> Async< NetworkBehaviourAction< ::InEvent, From f12e5742c6e4a074ba54c5f1e923a5ac52de4d91 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 22 Jan 2019 10:22:02 +0100 Subject: [PATCH 2/7] Put topologies inside of Floodsub and Kad --- examples/chat.rs | 4 +- examples/ipfs-kad.rs | 5 +- misc/mdns/Cargo.toml | 1 + misc/mdns/src/behaviour.rs | 146 ++++++++++++++----- protocols/floodsub/src/layer.rs | 22 ++- protocols/kad/src/behaviour.rs | 160 ++++++++++++++------- protocols/kad/src/kbucket.rs | 241 +++++++++++++++++++++++++------- 7 files changed, 433 insertions(+), 146 deletions(-) diff --git a/examples/chat.rs b/examples/chat.rs index 46081fa203f..a72fba7c6b4 100644 --- a/examples/chat.rs +++ b/examples/chat.rs @@ -88,7 +88,9 @@ 
fn main() { fn inject_event(&mut self, event: libp2p::mdns::MdnsEvent) { match event { libp2p::mdns::MdnsEvent::Discovered(list) => { - // TODO: + for peer in list { + self.floodsub.add_node_to_partial_view(peer.peer_id.clone()); + } }, } } diff --git a/examples/ipfs-kad.rs b/examples/ipfs-kad.rs index 32a30816a56..b822e1e6e2d 100644 --- a/examples/ipfs-kad.rs +++ b/examples/ipfs-kad.rs @@ -75,10 +75,11 @@ fn main() { tokio::run(futures::future::poll_fn(move || -> Result<_, ()> { loop { match swarm.poll().expect("Error while polling swarm") { - Async::Ready(Some(event)) => { - println!("Result: {:#?}", event); + Async::Ready(Some(ev @ libp2p::kad::KademliaOut::FindNodeResult { .. })) => { + println!("Result: {:#?}", ev); return Ok(Async::Ready(())); }, + Async::Ready(Some(_)) => (), Async::Ready(None) | Async::NotReady => break, } } diff --git a/misc/mdns/Cargo.toml b/misc/mdns/Cargo.toml index c84fb4e1b62..01badf2124a 100644 --- a/misc/mdns/Cargo.toml +++ b/misc/mdns/Cargo.toml @@ -14,6 +14,7 @@ data-encoding = "2.0" dns-parser = "0.8" futures = "0.1" libp2p-core = { version = "0.2.0", path = "../../core" } +log = "0.4" multiaddr = { package = "parity-multiaddr", version = "0.1.0", path = "../multiaddr" } net2 = "0.2" rand = "0.6" diff --git a/misc/mdns/src/behaviour.rs b/misc/mdns/src/behaviour.rs index 2a524901814..428d1c1311b 100644 --- a/misc/mdns/src/behaviour.rs +++ b/misc/mdns/src/behaviour.rs @@ -20,12 +20,14 @@ use crate::service::{MdnsService, MdnsPacket}; use futures::prelude::*; +use log::warn; use libp2p_core::protocols_handler::{DummyProtocolsHandler, ProtocolsHandler}; use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; use libp2p_core::{Multiaddr, PeerId, multiaddr::Protocol}; use smallvec::SmallVec; use std::{cmp, fmt, io, iter, marker::PhantomData, time::Duration, time::Instant}; use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_timer::Delay; /// A `NetworkBehaviour` for mDNS. Automatically discovers peers on the local network and adds /// them to the topology. @@ -39,10 +41,10 @@ pub struct Mdns { /// can appear multiple times. discovered_nodes: SmallVec<[(PeerId, Multiaddr, Instant); 8]>, - /// If `Some`, then we automatically connect to nodes we discover and this is the list of nodes - /// to connect to. Drained in `poll()`. - /// If `None`, then we don't automatically connect. - to_connect_to: Option>, + /// Future that fires when the TTL at least one node in `discovered_nodes` expires. + /// + /// `None` if `discovered_nodes` is empty. + closest_expiration: Option, /// Marker to pin the generic. marker: PhantomData, @@ -54,7 +56,7 @@ impl Mdns { Ok(Mdns { service: MdnsService::new()?, discovered_nodes: SmallVec::new(), - to_connect_to: Some(SmallVec::new()), + closest_expiration: None, marker: PhantomData, }) } @@ -64,16 +66,71 @@ impl Mdns { #[derive(Debug)] pub enum MdnsEvent { /// Discovered nodes through mDNS. - Discovered(Vec), + Discovered(DiscoveredAddrsIter), + + /// The given combinations of `PeerId` and `Multiaddr` have expired. + /// + /// Each discovered record has a time-to-live. When this TTL expires and the address hasn't + /// been refreshed, we remove it from the list emit it as an `Expired` event. + Expired(ExpiredAddrsIter), } -/// One node discovered by mDNS. -#[derive(Debug)] -pub struct MdnsDiscovered { - /// Id of the peer that has been discovered. - pub peer_id: PeerId, - /// Addresses of the peer that has been discovered. 
- pub addresses: Vec, +/// Iterator that produces the list of addresses that have been discovered. +pub struct DiscoveredAddrsIter { + inner: smallvec::IntoIter<[(PeerId, Multiaddr); 4]> +} + +impl Iterator for DiscoveredAddrsIter { + type Item = (PeerId, Multiaddr); + + #[inline] + fn next(&mut self) -> Option { + self.inner.next() + } + + #[inline] + fn size_hint(&self) -> (usize, Option) { + self.inner.size_hint() + } +} + +impl ExactSizeIterator for DiscoveredAddrsIter { +} + +impl fmt::Debug for DiscoveredAddrsIter { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("DiscoveredAddrsIter") + .finish() + } +} + +/// Iterator that produces the list of addresses that have expired. +pub struct ExpiredAddrsIter { + inner: smallvec::IntoIter<[(PeerId, Multiaddr); 4]> +} + +impl Iterator for ExpiredAddrsIter { + type Item = (PeerId, Multiaddr); + + #[inline] + fn next(&mut self) -> Option { + self.inner.next() + } + + #[inline] + fn size_hint(&self) -> (usize, Option) { + self.inner.size_hint() + } +} + +impl ExactSizeIterator for ExpiredAddrsIter { +} + +impl fmt::Debug for ExpiredAddrsIter { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("ExpiredAddrsIter") + .finish() + } } impl NetworkBehaviour for Mdns @@ -118,21 +175,31 @@ where >, > { // Remove expired peers. - { - let now = Instant::now(); - self.discovered_nodes.retain(move |(_, _, expires)| *expires > now); - } + if let Some(ref mut closest_expiration) = self.closest_expiration { + match closest_expiration.poll() { + Ok(Async::Ready(())) => { + let now = Instant::now(); + let mut expired = SmallVec::<[(PeerId, Multiaddr); 4]>::new(); + while let Some(pos) = self.discovered_nodes.iter().position(|(_, _, exp)| *exp < now) { + let (peer_id, addr, _) = self.discovered_nodes.remove(pos); + expired.push((peer_id, addr)); + } + + if !expired.is_empty() { + let event = MdnsEvent::Expired(ExpiredAddrsIter { + inner: expired.into_iter(), + }); - loop { - if let Some(ref mut to_connect_to) = self.to_connect_to { - if !to_connect_to.is_empty() { - let peer_id = to_connect_to.remove(0); - return Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }); - } else { - to_connect_to.shrink_to_fit(); - } + return Async::Ready(NetworkBehaviourAction::GenerateEvent(event)); + } + }, + Ok(Async::NotReady) => (), + Err(err) => warn!("tokio timer has errored: {:?}", err), } + } + // Polling the mDNS service, and obtain the list of nodes discovered this round. 
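
For downstream code, the two variants are consumed symmetrically. A sketch of a hypothetical embedding behaviour (`self.known` is an invented address book, not an API of this crate):

fn inject_event(&mut self, event: MdnsEvent) {
    match event {
        // Both iterators yield (PeerId, Multiaddr) pairs.
        MdnsEvent::Discovered(list) => {
            for (peer_id, addr) in list {
                self.known.insert((peer_id, addr));
            }
        }
        MdnsEvent::Expired(list) => {
            for (peer_id, addr) in list {
                self.known.remove(&(peer_id, addr));
            }
        }
    }
}
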
+ let discovered = loop { let event = match self.service.poll() { Async::Ready(ev) => ev, Async::NotReady => return Async::NotReady, @@ -155,7 +222,7 @@ where .chain(iter::once(obs_port)) .collect(); - let mut discovered = Vec::new(); + let mut discovered: SmallVec<[_; 4]> = SmallVec::new(); for peer in response.discovered_peers() { if peer.id() == params.local_peer_id() { continue; @@ -171,33 +238,36 @@ where addrs.push(addr); } - for addr in &addrs { + for addr in addrs { if let Some((_, _, cur_expires)) = self.discovered_nodes.iter_mut() - .find(|(p, a, _)| p == peer.id() && a == addr) + .find(|(p, a, _)| p == peer.id() && *a == addr) { *cur_expires = cmp::max(*cur_expires, new_expiration); } else { self.discovered_nodes.push((peer.id().clone(), addr.clone(), new_expiration)); } - } - - discovered.push(MdnsDiscovered { - peer_id: peer.id().clone(), - addresses: addrs, - }); - if let Some(ref mut to_connect_to) = self.to_connect_to { - to_connect_to.push(peer.id().clone()); + discovered.push((peer.id().clone(), addr)); } } - return Async::Ready(NetworkBehaviourAction::GenerateEvent(MdnsEvent::Discovered(discovered))); + break discovered; }, MdnsPacket::ServiceDiscovery(disc) => { disc.respond(Duration::from_secs(5 * 60)); }, } - } + }; + + // As the final step, we need to refresh `closest_expiration`. + self.closest_expiration = self.discovered_nodes.iter() + .fold(None, |exp, &(_, _, elem_exp)| { + Some(exp.map(|exp| cmp::min(exp, elem_exp)).unwrap_or(elem_exp)) + }) + .map(Delay::new); + Async::Ready(NetworkBehaviourAction::GenerateEvent(MdnsEvent::Discovered(DiscoveredAddrsIter { + inner: discovered.into_iter(), + }))) } } diff --git a/protocols/floodsub/src/layer.rs b/protocols/floodsub/src/layer.rs index 35f8800e1eb..e905f5ce6d1 100644 --- a/protocols/floodsub/src/layer.rs +++ b/protocols/floodsub/src/layer.rs @@ -21,6 +21,7 @@ use crate::protocol::{FloodsubConfig, FloodsubMessage, FloodsubRpc, FloodsubSubscription, FloodsubSubscriptionAction}; use crate::topic::{Topic, TopicHash}; use cuckoofilter::CuckooFilter; +use fnv::FnvHashSet; use futures::prelude::*; use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; use libp2p_core::{protocols_handler::ProtocolsHandler, protocols_handler::OneShotHandler, Multiaddr, PeerId}; @@ -39,6 +40,10 @@ pub struct Floodsub { /// Peer id of the local node. Used for the source of the messages that we publish. local_peer_id: PeerId, + /// List of peers to send messages to. + // TODO: unused + target_peers: FnvHashSet, + /// List of peers the network is connected to, and the topics that they're subscribed to. // TODO: filter out peers that don't support floodsub, so that we avoid hammering them with // opened substreams @@ -62,12 +67,25 @@ impl Floodsub { Floodsub { events: VecDeque::new(), local_peer_id, + target_peers: FnvHashSet::default(), connected_peers: HashMap::new(), subscribed_topics: SmallVec::new(), received: CuckooFilter::new(), marker: PhantomData, } } + + /// Add a node to the list of nodes to propagate messages to. + #[inline] + pub fn add_node_to_partial_view(&mut self, peer_id: PeerId) { + self.target_peers.insert(peer_id); + } + + /// Remove a node from the list of nodes to propagate messages to. 
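
A short sketch of how the partial view is meant to be driven from the outside, typically by a discovery mechanism such as mDNS (the two function names here are placeholders):

fn on_peer_discovered<TSubstream>(floodsub: &mut Floodsub<TSubstream>, peer: PeerId) {
    // Floodsub will start propagating messages to this peer.
    floodsub.add_node_to_partial_view(peer);
}

fn on_peer_expired<TSubstream>(floodsub: &mut Floodsub<TSubstream>, peer: &PeerId) {
    // ... and stops doing so here.
    floodsub.remove_node_from_partial_view(peer);
}
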
+ #[inline] + pub fn remove_node_from_partial_view(&mut self, peer_id: &PeerId) { + self.target_peers.remove(&peer_id); + } } impl Floodsub { @@ -182,8 +200,8 @@ where Default::default() } - fn addresses_of_peer(&self, peer_id: &PeerId) -> Vec { - Vec::new() // TODO: no + fn addresses_of_peer(&self, _: &PeerId) -> Vec { + Vec::new() } fn inject_connected(&mut self, id: PeerId, _: ConnectedPoint) { diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index 074cf7a96c4..efcba74d6e0 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use crate::handler::{KademliaHandler, KademliaHandlerEvent, KademliaHandlerIn, KademliaRequestId}; -use crate::kbucket::KBucketsTable; +use crate::kbucket::{KBucketsTable, Update}; use crate::protocol::{KadConnectionType, KadPeer}; use crate::query::{QueryConfig, QueryState, QueryStatePollOut, QueryTarget}; use fnv::{FnvHashMap, FnvHashSet}; @@ -28,7 +28,7 @@ use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourActio use libp2p_core::{protocols_handler::ProtocolsHandler, Multiaddr, PeerId}; use multihash::Multihash; use rand; -use smallvec::SmallVec; +use smallvec::{smallvec, SmallVec}; use std::{cmp::Ordering, marker::PhantomData, time::Duration, time::Instant}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_timer::Interval; @@ -36,6 +36,7 @@ use tokio_timer::Interval; /// Network behaviour that handles Kademlia. pub struct Kademlia { /// Storage for the nodes. Contains the known multiaddresses for this node. + // TODO: consider storing whether we're connected to a node in the kbuckets table kbuckets: KBucketsTable>, /// All the iterative queries we are currently performing, with their ID. The last parameter @@ -58,11 +59,15 @@ pub struct Kademlia { /// Requests received by a remote that we should fulfill as soon as possible. remote_requests: SmallVec<[(PeerId, KademliaRequestId, QueryTarget); 4]>, - /// List of multihashes that we're providing. + /// List of values and peers that are providing them. /// - /// Note that we use a `PeerId` so that we know that it uses SHA-256. The question as to how to - /// handle more hashes should eventually be resolved. - providing_keys: SmallVec<[PeerId; 8]>, + /// Our local peer ID can be in this container. + // TODO: Note that in reality the value is a SHA-256 of the actual value (https://github.com/libp2p/rust-libp2p/issues/694) + values_providers: FnvHashMap>, + + /// List of values that we are providing ourselves. Must be kept in sync with + /// `values_providers`. + providing_keys: FnvHashSet, /// Interval to send `ADD_PROVIDER` messages to everyone. refresh_add_providers: stream::Fuse, @@ -120,7 +125,13 @@ impl Kademlia { /// Adds a known address for the given `PeerId`. pub fn add_address(&mut self, peer_id: &PeerId, address: Multiaddr) { - unimplemented!() // TODO: + match self.kbuckets.update(peer_id.clone()) { + Update::Add(add) => add.add(smallvec![address]), + Update::AddPending(add) => add.add(smallvec![address]), + Update::Refreshed(refresh) => refresh.into_mut().push(address), + Update::Discarded => (), + Update::FailSelfUpdate => (), + } } /// Inner implementation of the constructors. 
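
As a usage sketch (assuming the public `new` constructor that wraps the inner implementation below, and the `find_node` entry point used by the `ipfs-kad` example; the peer ID and address are placeholders), bootstrapping now goes through `add_address` instead of a shared topology:

// Seed the routing table explicitly, then start an iterative query.
let mut kad = Kademlia::new(local_peer_id.clone());
let bootnode: PeerId = "QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ".parse().unwrap();
kad.add_address(&bootnode, "/ip4/104.131.131.82/tcp/4001".parse().unwrap());
kad.find_node(bootnode.clone());
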
@@ -136,7 +147,8 @@ impl Kademlia { pending_rpcs: SmallVec::with_capacity(parallelism), next_query_id: QueryId(0), remote_requests: SmallVec::new(), - providing_keys: SmallVec::new(), + values_providers: FnvHashMap::default(), + providing_keys: FnvHashSet::default(), refresh_add_providers: Interval::new_interval(Duration::from_secs(60)).fuse(), // TODO: constant parallelism, num_results: 20, @@ -161,27 +173,16 @@ impl Kademlia { behaviour } - /// Builds a `KadPeer` structure corresponding to the local node. - fn build_local_kad_peer(&self, local_addrs: impl IntoIterator) -> KadPeer { - KadPeer { - node_id: self.kbuckets.my_id().clone(), - multiaddrs: local_addrs.into_iter().collect(), - connection_ty: KadConnectionType::Connected, - } - } - /// Builds the answer to a request. fn build_result(&mut self, query: QueryTarget, request_id: KademliaRequestId, parameters: &mut PollParameters) -> KademliaHandlerIn { - let local_kad_peer = self.build_local_kad_peer(parameters.external_addresses()); - match query { QueryTarget::FindPeer(key) => { let closer_peers = self.kbuckets .find_closest_with_self(key.as_ref()) .take(self.num_results) - .map(|peer_id| build_kad_peer(peer_id, &self.connected_peers)) + .map(|peer_id| build_kad_peer(peer_id, parameters, &self.kbuckets, &self.connected_peers)) .collect(); KademliaHandlerIn::FindNodeRes { @@ -190,24 +191,18 @@ impl Kademlia { } }, QueryTarget::GetProviders(key) => { - // TODO: insert local_kad_peer somewhere? let closer_peers = self.kbuckets .find_closest_with_self(&key) .take(self.num_results) - .map(|peer_id| build_kad_peer(peer_id, &self.connected_peers)) + .map(|peer_id| build_kad_peer(peer_id, parameters, &self.kbuckets, &self.connected_peers)) .collect(); - let local_node_is_providing = self.providing_keys.iter().any(|k| k.as_ref() == &key); - - let provider_peers = Vec::new();/* TODO: topology - .get_providers(&key) - .map(|peer_id| build_kad_peer(peer_id, &self.connected_peers)) - .chain(if local_node_is_providing { - Some(local_kad_peer) - } else { - None - }.into_iter()) - .collect();*/ + let provider_peers = self.values_providers + .get(&key) + .into_iter() + .flat_map(|peers| peers) + .map(|peer_id| build_kad_peer(peer_id.clone(), parameters, &self.kbuckets, &self.connected_peers)) + .collect(); KademliaHandlerIn::GetProvidersRes { closer_peers, @@ -244,8 +239,11 @@ impl Kademlia { /// The actual meaning of *providing* the value of a key is not defined, and is specific to /// the value whose key is the hash. pub fn add_providing(&mut self, key: PeerId) { - if !self.providing_keys.iter().any(|k| k == &key) { - self.providing_keys.push(key); + self.providing_keys.insert(key.as_ref().clone()); + let providers = self.values_providers.entry(key.into()).or_insert_with(Default::default); + let my_id = self.kbuckets.my_id(); + if !providers.iter().any(|k| k == my_id) { + providers.push(my_id.clone()); } // Trigger the next refresh now. @@ -257,14 +255,22 @@ impl Kademlia { /// There doesn't exist any "remove provider" message to broadcast on the network, therefore we /// will still be registered as a provider in the DHT for as long as the timeout doesn't expire. 
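
The two containers keep a simple invariant: every key in `providing_keys` also lists our own peer ID in its `values_providers` entry. A self-contained toy model of what `add_providing` boils down to (plain `std` types stand in for the real `Multihash` and `PeerId`):

use std::collections::{HashMap, HashSet};

fn main() {
    let my_id = "QmLocal".to_string();
    let mut providing_keys: HashSet<String> = HashSet::new();
    let mut values_providers: HashMap<String, Vec<String>> = HashMap::new();

    // What `add_providing(key)` does, in miniature:
    let key = "QmSomeKey".to_string();
    providing_keys.insert(key.clone());
    let providers = values_providers.entry(key).or_insert_with(Vec::new);
    if !providers.contains(&my_id) {
        providers.push(my_id.clone());
    }

    // The invariant that `remove_providing` has to undo:
    assert!(providing_keys.iter().all(|k| values_providers.contains_key(k)));
}
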
pub fn remove_providing(&mut self, key: &Multihash) { - if let Some(position) = self.providing_keys.iter().position(|k| k.as_ref() == key) { - self.providing_keys.remove(position); + self.providing_keys.remove(key); + + let providers = match self.values_providers.get_mut(key) { + Some(p) => p, + None => return, + }; + + if let Some(position) = providers.iter().position(|k| k.as_ref() == key) { + providers.remove(position); + providers.shrink_to_fit(); } } /// Internal function that starts a query. fn start_query(&mut self, target: QueryTarget, purpose: QueryPurpose) { - let query_id = self.next_query_id.clone(); + let query_id = self.next_query_id; self.next_query_id.0 += 1; self.queries_to_starts.push((query_id, target, purpose)); } @@ -282,7 +288,10 @@ where } fn addresses_of_peer(&self, peer_id: &PeerId) -> Vec { - Vec::new() // TODO: wrong + self.kbuckets + .get(peer_id) + .map(|l| l.iter().cloned().collect::>()) + .unwrap_or_else(Vec::new) } fn inject_connected(&mut self, id: PeerId, _: ConnectedPoint) { @@ -319,6 +328,14 @@ where // It is possible that we obtain a response for a query that has finished, which is // why we may not find an entry in `self.active_queries`. for peer in closer_peers.iter() { + match self.kbuckets.update(peer.node_id.clone()) { + Update::Add(add) => add.add(peer.multiaddrs.iter().cloned().collect()), + Update::AddPending(add) => add.add(peer.multiaddrs.iter().cloned().collect()), + Update::Refreshed(refresh) => refresh.into_mut().extend(peer.multiaddrs.iter().cloned()), + Update::Discarded => (), + Update::FailSelfUpdate => (), + } + self.queued_events.push(NetworkBehaviourAction::GenerateEvent(KademliaOut::Discovered { peer_id: peer.node_id.clone(), addresses: peer.multiaddrs.clone(), @@ -339,12 +356,21 @@ where user_data, } => { for peer in closer_peers.iter().chain(provider_peers.iter()) { + match self.kbuckets.update(peer.node_id.clone()) { + Update::Add(add) => add.add(peer.multiaddrs.iter().cloned().collect()), + Update::AddPending(add) => add.add(peer.multiaddrs.iter().cloned().collect()), + Update::Refreshed(refresh) => refresh.into_mut().extend(peer.multiaddrs.iter().cloned()), + Update::Discarded => (), + Update::FailSelfUpdate => (), + } + self.queued_events.push(NetworkBehaviourAction::GenerateEvent(KademliaOut::Discovered { peer_id: peer.node_id.clone(), addresses: peer.multiaddrs.clone(), ty: peer.connection_ty, })); } + // It is possible that we obtain a response for a query that has finished, which is // why we may not find an entry in `self.active_queries`. if let Some((query, _, providers)) = self.active_queries.get_mut(&user_data) { @@ -362,6 +388,13 @@ where } } KademliaHandlerEvent::AddProvider { key, provider_peer } => { + match self.kbuckets.update(provider_peer.node_id.clone()) { + Update::Add(add) => add.add(provider_peer.multiaddrs.iter().cloned().collect()), + Update::AddPending(add) => add.add(provider_peer.multiaddrs.iter().cloned().collect()), + Update::Refreshed(refresh) => refresh.into_mut().extend(provider_peer.multiaddrs.iter().cloned()), + Update::Discarded => (), + Update::FailSelfUpdate => (), + } self.queued_events.push(NetworkBehaviourAction::GenerateEvent(KademliaOut::Discovered { peer_id: provider_peer.node_id.clone(), addresses: provider_peer.multiaddrs.clone(), @@ -383,10 +416,16 @@ where >, > { // Flush the changes to the topology that we want to make. 
- // TODO: - /*for (key, provider) in self.add_provider.drain() { - parameters.topology().add_provider(key, provider); - }*/ + for (key, provider) in self.add_provider.drain() { + // Don't add ourselves to the providers. + if provider == *self.kbuckets.my_id() { + continue; + } + let providers = self.values_providers.entry(key).or_insert_with(Default::default); + if !providers.iter().any(|k| k == &provider) { + providers.push(provider); + } + } self.add_provider.shrink_to_fit(); // Handle `refresh_add_providers`. @@ -394,8 +433,11 @@ where Ok(Async::NotReady) => {}, Ok(Async::Ready(Some(_))) => { for provided in self.providing_keys.clone().into_iter() { - let purpose = QueryPurpose::AddProvider(provided.as_ref().clone()); - self.start_query(QueryTarget::FindPeer(provided), purpose); + let purpose = QueryPurpose::AddProvider(provided.clone()); + // TODO: messy because of the PeerId/Multihash division + if let Ok(key_as_peer) = PeerId::from_multihash(provided) { + self.start_query(QueryTarget::FindPeer(key_as_peer), purpose); + } } }, // Ignore errors. @@ -510,7 +552,7 @@ where peer_id: closest, event: KademliaHandlerIn::AddProvider { key: key.clone(), - provider_peer: self.build_local_kad_peer(parameters.external_addresses()), + provider_peer: build_kad_peer(parameters.local_peer_id().clone(), parameters, &self.kbuckets, &self.connected_peers), }, }; @@ -591,13 +633,33 @@ fn gen_random_id(my_id: &PeerId, bucket_num: usize) -> Result { } /// Builds a `KadPeer` struct corresponding to the given `PeerId`. +/// The `PeerId` can be the same as the local one. /// /// > **Note**: This is just a convenience function that doesn't do anything note-worthy. -fn build_kad_peer(peer_id: PeerId, connected_peers: &FnvHashSet) -> KadPeer { - let multiaddrs = Vec::new();// TODO: topology.addresses_of_peer(&peer_id); +fn build_kad_peer( + peer_id: PeerId, + parameters: &mut PollParameters, + kbuckets: &KBucketsTable>, + connected_peers: &FnvHashSet +) -> KadPeer { + let is_self = peer_id == *parameters.local_peer_id(); + + let multiaddrs = if is_self { + let mut addrs = parameters + .listened_addresses() + .cloned() + .collect::>(); + addrs.extend(parameters.external_addresses()); + addrs + } else { + kbuckets + .get(&peer_id) + .map(|addrs| addrs.iter().cloned().collect()) + .unwrap_or_else(Vec::new) + }; // TODO: implement the other possibilities correctly - let connection_ty = if connected_peers.contains(&peer_id) { + let connection_ty = if is_self || connected_peers.contains(&peer_id) { KadConnectionType::Connected } else { KadConnectionType::NotConnected diff --git a/protocols/kad/src/kbucket.rs b/protocols/kad/src/kbucket.rs index 5f6b850456e..0056e38e1c8 100644 --- a/protocols/kad/src/kbucket.rs +++ b/protocols/kad/src/kbucket.rs @@ -31,7 +31,6 @@ use arrayvec::ArrayVec; use bigint::U512; use libp2p_core::PeerId; use multihash::Multihash; -use std::mem; use std::slice::IterMut as SliceIterMut; use std::time::{Duration, Instant}; use std::vec::IntoIter as VecIntoIter; @@ -185,6 +184,42 @@ where &self.my_id } + /// Returns the value associated to a node, if any is present. + /// + /// Does **not** include pending nodes. + pub fn get(&self, id: &Id) -> Option<&Val> { + let table = match self.bucket_num(&id) { + Some(n) => &self.tables[n], + None => return None, + }; + + for elem in &table.nodes { + if elem.id == *id { + return Some(&elem.value); + } + } + + None + } + + /// Returns the value associated to a node, if any is present. + /// + /// Does **not** include pending nodes. 
+ pub fn get_mut(&mut self, id: &Id) -> Option<&mut Val> { + let table = match self.bucket_num(&id) { + Some(n) => &mut self.tables[n], + None => return None, + }; + + for elem in &mut table.nodes { + if elem.id == *id { + return Some(&mut elem.value); + } + } + + None + } + /// Finds the `num` nodes closest to `id`, ordered by distance. pub fn find_closest(&mut self, id: &TOther) -> VecIntoIter where @@ -227,72 +262,162 @@ where /// Marks the node as "most recent" in its bucket and modifies the value associated to it. /// This function should be called whenever we receive a communication from a node. - pub fn update(&mut self, id: Id, value: Val) -> UpdateOutcome { + pub fn update(&mut self, id: Id) -> Update { let table = match self.bucket_num(&id) { Some(n) => &mut self.tables[n], - None => return UpdateOutcome::FailSelfUpdate, + None => return Update::FailSelfUpdate, }; table.flush(self.ping_timeout); if let Some(pos) = table.nodes.iter().position(|n| n.id == id) { // Node is already in the bucket. - let mut existing = table.nodes.remove(pos); - let old_val = mem::replace(&mut existing.value, value); - if pos == 0 { - // If it's the first node of the bucket that we update, then we drop the node that - // was waiting for a ping. - table.nodes.truncate(MAX_NODES_PER_BUCKET - 1); - table.pending_node = None; + if pos != table.nodes.len() - 1 { + let existing = table.nodes.remove(pos); + if pos == 0 { + // If it's the oldest node of the bucket that we update, then we drop the node that + // was waiting for a ping. + table.nodes.truncate(MAX_NODES_PER_BUCKET - 1); + table.pending_node = None; + } + table.nodes.push(existing); } - table.nodes.push(existing); table.last_update = Instant::now(); - UpdateOutcome::Refreshed(old_val) + Update::Refreshed(UpdateRefresh { + value: &mut table.nodes.last_mut() + .expect("nodes is not empty since the value is in it; QED").value, + }) + } else if table.nodes.len() < MAX_NODES_PER_BUCKET { // Node not yet in the bucket, but there's plenty of space. - table.nodes.push(Node { - id: id, - value: value, - }); - table.last_update = Instant::now(); - UpdateOutcome::Added + Update::Add(UpdateAdd { + table, + id, + }) + } else { // Not enough space to put the node, but we can add it to the end as "pending". We // then need to tell the caller that we want it to ping the node at the top of the // list. if table.pending_node.is_none() { - table.pending_node = Some(( - Node { - id: id, - value: value, - }, - Instant::now(), - )); - UpdateOutcome::NeedPing(table.nodes[0].id.clone()) + let to_ping = table.nodes[0].id.clone(); + Update::AddPending(UpdateAddPending { + table, + id, + to_ping, + }) } else { - UpdateOutcome::Discarded + Update::Discarded } } } } /// Return value of the `update()` method. -#[derive(Debug, Copy, Clone, PartialEq, Eq)] +#[derive(Debug)] #[must_use] -pub enum UpdateOutcome { - /// The node has been added to the bucket. - Added, +pub enum Update<'a, Id, Val> { + /// The node can be added to the bucket. + Add(UpdateAdd<'a, Id, Val>), /// The node was already in the bucket and has been refreshed. - Refreshed(Val), - /// The node wasn't added. Instead we need to ping the node passed as parameter, and call + Refreshed(UpdateRefresh<'a, Val>), + /// The node can be added as pending. We need to ping the node passed as parameter, and call /// `update` if it responds. - NeedPing(Id), + AddPending(UpdateAddPending<'a, Id, Val>), /// The node wasn't added at all because a node was already pending. Discarded, /// Tried to update the local peer ID. 
This is an invalid operation. FailSelfUpdate, } +impl<'a, Id, Val> Update<'a, Id, Val> { + /// Writes the given value in the table. + #[inline] + pub fn write(self, value: Val) { + match self { + Update::Add(add) => add.add(value), + Update::Refreshed(refresh) => *refresh.into_mut() = value, + Update::AddPending(add) => add.add(value), + Update::Discarded => (), + Update::FailSelfUpdate => (), + } + } +} + +/// The node can be added to the bucket. +#[derive(Debug)] +#[must_use] +pub struct UpdateAdd<'a, Id, Val> { + /// Where to insert the value. + table: &'a mut KBucket, + /// The ID to insert. + id: Id, +} + +impl<'a, Id, Val> UpdateAdd<'a, Id, Val> { + /// Insert the node in the kbuckets with the given value. + pub fn add(self, value: Val) { + self.table.last_update = Instant::now(); + self.table.nodes.push(Node { + id: self.id, + value, + }); + } +} + +/// The node is already in the kbucket. +#[derive(Debug)] +pub struct UpdateRefresh<'a, Val> { + /// The value in the k-buckets. + value: &'a mut Val, +} + +impl<'a, Val> UpdateRefresh<'a, Val> { + /// Returns a mutable reference to the value. + #[inline] + pub fn get_mut(&mut self) -> &mut Val { + &mut self.value + } + + /// Returns a mutable reference to the value. + #[inline] + pub fn into_mut(self) -> &'a mut Val { + self.value + } +} + +/// The node can be added as pending. We need to ping the node passed as parameter, and call +/// `update` if it responds. +#[derive(Debug)] +#[must_use] +pub struct UpdateAddPending<'a, Id, Val> { + /// Where to insert the value. + table: &'a mut KBucket, + /// The ID to insert. + id: Id, + /// Id of the node to ping. + to_ping: Id, +} + +impl<'a, Id, Val> UpdateAddPending<'a, Id, Val> { + /// The node we heard from the latest, and that we should ping. + #[inline] + pub fn node_to_ping(&self) -> &Id { + &self.to_ping + } + + /// Insert the node in the kbuckets with the given value. + pub fn add(self, value: Val) { + self.table.pending_node = Some(( + Node { + id: self.id, + value, + }, + Instant::now(), + )); + } +} + /// Iterator giving access to a bucket. 
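
Taken together, every insertion is now a two-step decision: `update` reports what is possible, and the caller commits a value. A condensed sketch of a caller, mirroring what the updated tests below do (assuming a `KBucketsTable<Multihash, ()>`):

match table.update(id) {
    Update::Add(add) => add.add(()),
    Update::AddPending(add) => {
        // The caller must ping this node and call `update` again if it answers.
        let _to_ping = add.node_to_ping().clone();
        add.add(());
    }
    Update::Refreshed(mut refreshed) => *refreshed.get_mut() = (),
    Update::Discarded | Update::FailSelfUpdate => {}
}
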
pub struct BucketsIter<'a, Id: 'a, Val: 'a>(SliceIterMut<'a, KBucket>, Duration); @@ -348,8 +473,7 @@ impl<'a, Id: 'a, Val: 'a> Bucket<'a, Id, Val> { mod tests { extern crate rand; use self::rand::random; - use crate::kbucket::{KBucketsPeerId, KBucketsTable}; - use crate::kbucket::{UpdateOutcome, MAX_NODES_PER_BUCKET}; + use crate::kbucket::{KBucketsPeerId, KBucketsTable, Update, MAX_NODES_PER_BUCKET}; use multihash::{Multihash, Hash}; use std::thread; use std::time::Duration; @@ -360,7 +484,7 @@ mod tests { let other_id = Multihash::random(Hash::SHA2256); let mut table = KBucketsTable::new(my_id, Duration::from_secs(5)); - let _ = table.update(other_id.clone(), ()); + let _ = table.update(other_id.clone()).write(()); let res = table.find_closest(&other_id).collect::>(); assert_eq!(res.len(), 1); @@ -371,9 +495,9 @@ mod tests { fn update_local_id_fails() { let my_id = Multihash::random(Hash::SHA2256); - let mut table = KBucketsTable::new(my_id.clone(), Duration::from_secs(5)); - match table.update(my_id, ()) { - UpdateOutcome::FailSelfUpdate => (), + let mut table = KBucketsTable::<_, ()>::new(my_id.clone(), Duration::from_secs(5)); + match table.update(my_id) { + Update::FailSelfUpdate => (), _ => panic!(), } } @@ -397,7 +521,7 @@ mod tests { thread::sleep(Duration::from_secs(2)); for &(ref id, _) in &other_ids { - let _ = table.update(id.clone(), ()); + let _ = table.update(id.clone()).write(()); } let after_update = table.buckets().map(|b| b.last_update()).collect::>(); @@ -431,7 +555,10 @@ mod tests { let mut table = KBucketsTable::new(my_id.clone(), Duration::from_secs(1)); for (num, id) in fill_ids.drain(..MAX_NODES_PER_BUCKET).enumerate() { - assert_eq!(table.update(id, ()), UpdateOutcome::Added); + match table.update(id) { + Update::Add(add) => add.add(()), + _ => panic!() + } assert_eq!(table.buckets().nth(255).unwrap().num_entries(), num + 1); } @@ -440,27 +567,33 @@ mod tests { MAX_NODES_PER_BUCKET ); assert!(!table.buckets().nth(255).unwrap().has_pending()); - assert_eq!( - table.update(fill_ids.remove(0), ()), - UpdateOutcome::NeedPing(first_node) - ); + match table.update(fill_ids.remove(0)) { + Update::AddPending(add) => { + assert_eq!(*add.node_to_ping(), first_node); + add.add(()); + }, + _ => () + } assert_eq!( table.buckets().nth(255).unwrap().num_entries(), MAX_NODES_PER_BUCKET ); assert!(table.buckets().nth(255).unwrap().has_pending()); - assert_eq!( - table.update(fill_ids.remove(0), ()), - UpdateOutcome::Discarded - ); + match table.update(fill_ids.remove(0)) { + Update::Discarded => (), + _ => () + } thread::sleep(Duration::from_secs(2)); assert!(!table.buckets().nth(255).unwrap().has_pending()); - assert_eq!( - table.update(fill_ids.remove(0), ()), - UpdateOutcome::NeedPing(second_node) - ); + match table.update(fill_ids.remove(0)) { + Update::AddPending(add) => { + assert_eq!(*add.node_to_ping(), second_node); + add.add(()); + }, + _ => panic!() + } } #[test] From 94e208620e7c38cbd08b477aab6ebe1257eb0b8a Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 24 Jan 2019 11:58:36 +0100 Subject: [PATCH 3/7] Fix core tests --- core/src/swarm.rs | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/core/src/swarm.rs b/core/src/swarm.rs index 54c4238c949..05e1105c2d9 100644 --- a/core/src/swarm.rs +++ b/core/src/swarm.rs @@ -510,14 +510,13 @@ where TBehaviour: NetworkBehaviour, #[cfg(test)] mod tests { - use crate::nodes::raw_swarm::RawSwarm; use crate::peer_id::PeerId; use crate::protocols_handler::{DummyProtocolsHandler, 
ProtocolsHandler}; use crate::public_key::PublicKey; use crate::tests::dummy_transport::DummyTransport; - use crate::topology::MemoryTopology; use futures::prelude::*; + use multiaddr::Multiaddr; use rand::random; use smallvec::SmallVec; use std::marker::PhantomData; @@ -533,7 +532,7 @@ mod tests { trait TSubstream: AsyncRead + AsyncWrite {} - impl NetworkBehaviour + impl NetworkBehaviour for DummyBehaviour where TSubstream: AsyncRead + AsyncWrite { @@ -544,6 +543,10 @@ mod tests { DummyProtocolsHandler::default() } + fn addresses_of_peer(&self, _: &PeerId) -> Vec { + Vec::new() + } + fn inject_connected(&mut self, _: PeerId, _: ConnectedPoint) {} fn inject_disconnected(&mut self, _: &PeerId, _: ConnectedPoint) {} @@ -551,7 +554,7 @@ mod tests { fn inject_node_event(&mut self, _: PeerId, _: ::OutEvent) {} - fn poll(&mut self, _:&mut PollParameters) -> + fn poll(&mut self, _: &mut PollParameters) -> Async::InEvent, Self::OutEvent>> { @@ -571,10 +574,9 @@ mod tests { fn test_build_swarm() { let id = get_random_id(); let transport = DummyTransport::new(); - let topology = MemoryTopology::empty(id); let behaviour = DummyBehaviour{marker: PhantomData}; let swarm = SwarmBuilder::new(transport, behaviour, - topology).incoming_limit(Some(4)).build(); + id.into_peer_id()).incoming_limit(Some(4)).build(); assert_eq!(swarm.raw_swarm.incoming_limit(), Some(4)); } @@ -582,9 +584,8 @@ mod tests { fn test_build_swarm_with_max_listeners_none() { let id = get_random_id(); let transport = DummyTransport::new(); - let topology = MemoryTopology::empty(id); let behaviour = DummyBehaviour{marker: PhantomData}; - let swarm = SwarmBuilder::new(transport, behaviour, topology) + let swarm = SwarmBuilder::new(transport, behaviour, id.into_peer_id()) .build(); assert!(swarm.raw_swarm.incoming_limit().is_none()) From 6372fb5c74c286ec6020c6118c478c03fa82a0ac Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 24 Jan 2019 12:03:08 +0100 Subject: [PATCH 4/7] Fix chat example --- examples/chat.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/examples/chat.rs b/examples/chat.rs index a72fba7c6b4..407888eabe2 100644 --- a/examples/chat.rs +++ b/examples/chat.rs @@ -88,10 +88,16 @@ fn main() { fn inject_event(&mut self, event: libp2p::mdns::MdnsEvent) { match event { libp2p::mdns::MdnsEvent::Discovered(list) => { - for peer in list { - self.floodsub.add_node_to_partial_view(peer.peer_id.clone()); + for (peer, _) in list { + self.floodsub.add_node_to_partial_view(peer); } }, + libp2p::mdns::MdnsEvent::Expired(list) => { + for (peer, _) in list { + // TODO: wrong if multiple addresses per peer + self.floodsub.remove_node_from_partial_view(&peer); + } + } } } } From 5272d21666b7fa10eb674aaecea365f4151e6497 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 24 Jan 2019 17:10:29 +0100 Subject: [PATCH 5/7] More work --- examples/chat.rs | 5 +- misc/mdns/src/behaviour.rs | 5 + protocols/floodsub/src/layer.rs | 56 ++++- protocols/kad/src/behaviour.rs | 44 ++-- protocols/kad/src/kbucket.rs | 427 ++++++++++++++++---------------- 5 files changed, 280 insertions(+), 257 deletions(-) diff --git a/examples/chat.rs b/examples/chat.rs index 407888eabe2..a3e173d3982 100644 --- a/examples/chat.rs +++ b/examples/chat.rs @@ -94,8 +94,9 @@ fn main() { }, libp2p::mdns::MdnsEvent::Expired(list) => { for (peer, _) in list { - // TODO: wrong if multiple addresses per peer - self.floodsub.remove_node_from_partial_view(&peer); + if !self.mdns.has_node(&peer) { + 
self.floodsub.remove_node_from_partial_view(&peer); + } } } } diff --git a/misc/mdns/src/behaviour.rs b/misc/mdns/src/behaviour.rs index 428d1c1311b..9aec1f9717d 100644 --- a/misc/mdns/src/behaviour.rs +++ b/misc/mdns/src/behaviour.rs @@ -60,6 +60,11 @@ impl Mdns { marker: PhantomData, }) } + + /// Returns true if the given `PeerId` is in the list of nodes discovered through mDNS. + pub fn has_node(&self, peer_id: &PeerId) -> bool { + self.discovered_nodes.iter().any(|(p, _, _)| p == peer_id) + } } /// Event that can be produced by the `Mdns` behaviour. diff --git a/protocols/floodsub/src/layer.rs b/protocols/floodsub/src/layer.rs index e905f5ce6d1..a44a7fc2830 100644 --- a/protocols/floodsub/src/layer.rs +++ b/protocols/floodsub/src/layer.rs @@ -78,7 +78,25 @@ impl Floodsub { /// Add a node to the list of nodes to propagate messages to. #[inline] pub fn add_node_to_partial_view(&mut self, peer_id: PeerId) { - self.target_peers.insert(peer_id); + // Send our topics to this node if we're already connected to it. + if self.connected_peers.contains_key(&peer_id) { + for topic in self.subscribed_topics.iter() { + self.events.push_back(NetworkBehaviourAction::SendEvent { + peer_id: peer_id.clone(), + event: FloodsubRpc { + messages: Vec::new(), + subscriptions: vec![FloodsubSubscription { + topic: topic.hash().clone(), + action: FloodsubSubscriptionAction::Subscribe, + }], + }, + }); + } + } + + if self.target_peers.insert(peer_id.clone()) { + self.events.push_back(NetworkBehaviourAction::DialPeer { peer_id }); + } } /// Remove a node from the list of nodes to propagate messages to. @@ -206,17 +224,19 @@ where fn inject_connected(&mut self, id: PeerId, _: ConnectedPoint) { // We need to send our subscriptions to the newly-connected node. - for topic in self.subscribed_topics.iter() { - self.events.push_back(NetworkBehaviourAction::SendEvent { - peer_id: id.clone(), - event: FloodsubRpc { - messages: Vec::new(), - subscriptions: vec![FloodsubSubscription { - topic: topic.hash().clone(), - action: FloodsubSubscriptionAction::Subscribe, - }], - }, - }); + if self.target_peers.contains(&id) { + for topic in self.subscribed_topics.iter() { + self.events.push_back(NetworkBehaviourAction::SendEvent { + peer_id: id.clone(), + event: FloodsubRpc { + messages: Vec::new(), + subscriptions: vec![FloodsubSubscription { + topic: topic.hash().clone(), + action: FloodsubSubscriptionAction::Subscribe, + }], + }, + }); + } } self.connected_peers.insert(id.clone(), SmallVec::new()); @@ -225,6 +245,12 @@ where fn inject_disconnected(&mut self, id: &PeerId, _: ConnectedPoint) { let was_in = self.connected_peers.remove(id); debug_assert!(was_in.is_some()); + + // We can be disconnected by the remote in case of inactivity for example, so we always + // try to reconnect. 
+ if self.target_peers.contains(id) { + self.events.push_back(NetworkBehaviourAction::DialPeer { peer_id: id.clone() }); + } } fn inject_node_event( @@ -319,6 +345,12 @@ where Self::OutEvent, >, > { + for peer in &self.target_peers { + if !self.connected_peers.contains_key(peer) { + + } + } + if let Some(event) = self.events.pop_front() { return Async::Ready(event); } diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index efcba74d6e0..40c1e29f639 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -28,7 +28,7 @@ use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourActio use libp2p_core::{protocols_handler::ProtocolsHandler, Multiaddr, PeerId}; use multihash::Multihash; use rand; -use smallvec::{smallvec, SmallVec}; +use smallvec::SmallVec; use std::{cmp::Ordering, marker::PhantomData, time::Duration, time::Instant}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_timer::Interval; @@ -36,7 +36,6 @@ use tokio_timer::Interval; /// Network behaviour that handles Kademlia. pub struct Kademlia { /// Storage for the nodes. Contains the known multiaddresses for this node. - // TODO: consider storing whether we're connected to a node in the kbuckets table kbuckets: KBucketsTable>, /// All the iterative queries we are currently performing, with their ID. The last parameter @@ -125,12 +124,8 @@ impl Kademlia { /// Adds a known address for the given `PeerId`. pub fn add_address(&mut self, peer_id: &PeerId, address: Multiaddr) { - match self.kbuckets.update(peer_id.clone()) { - Update::Add(add) => add.add(smallvec![address]), - Update::AddPending(add) => add.add(smallvec![address]), - Update::Refreshed(refresh) => refresh.into_mut().push(address), - Update::Discarded => (), - Update::FailSelfUpdate => (), + if let Some(list) = self.kbuckets.entry_mut(peer_id) { + list.push(address); } } @@ -303,6 +298,15 @@ where }); } + match self.kbuckets.set_connected(&id) { + Update::Pending(to_ping) => { + self.queued_events.push(NetworkBehaviourAction::DialPeer { + peer_id: to_ping.clone(), + }) + }, + _ => () + } + self.connected_peers.insert(id); } @@ -328,12 +332,8 @@ where // It is possible that we obtain a response for a query that has finished, which is // why we may not find an entry in `self.active_queries`. 
for peer in closer_peers.iter() { - match self.kbuckets.update(peer.node_id.clone()) { - Update::Add(add) => add.add(peer.multiaddrs.iter().cloned().collect()), - Update::AddPending(add) => add.add(peer.multiaddrs.iter().cloned().collect()), - Update::Refreshed(refresh) => refresh.into_mut().extend(peer.multiaddrs.iter().cloned()), - Update::Discarded => (), - Update::FailSelfUpdate => (), + if let Some(entry) = self.kbuckets.entry_mut(&peer.node_id) { + entry.extend(peer.multiaddrs.iter().cloned()); } self.queued_events.push(NetworkBehaviourAction::GenerateEvent(KademliaOut::Discovered { @@ -356,12 +356,8 @@ where user_data, } => { for peer in closer_peers.iter().chain(provider_peers.iter()) { - match self.kbuckets.update(peer.node_id.clone()) { - Update::Add(add) => add.add(peer.multiaddrs.iter().cloned().collect()), - Update::AddPending(add) => add.add(peer.multiaddrs.iter().cloned().collect()), - Update::Refreshed(refresh) => refresh.into_mut().extend(peer.multiaddrs.iter().cloned()), - Update::Discarded => (), - Update::FailSelfUpdate => (), + if let Some(entry) = self.kbuckets.entry_mut(&peer.node_id) { + entry.extend(peer.multiaddrs.iter().cloned()); } self.queued_events.push(NetworkBehaviourAction::GenerateEvent(KademliaOut::Discovered { @@ -388,12 +384,8 @@ where } } KademliaHandlerEvent::AddProvider { key, provider_peer } => { - match self.kbuckets.update(provider_peer.node_id.clone()) { - Update::Add(add) => add.add(provider_peer.multiaddrs.iter().cloned().collect()), - Update::AddPending(add) => add.add(provider_peer.multiaddrs.iter().cloned().collect()), - Update::Refreshed(refresh) => refresh.into_mut().extend(provider_peer.multiaddrs.iter().cloned()), - Update::Discarded => (), - Update::FailSelfUpdate => (), + if let Some(entry) = self.kbuckets.entry_mut(&provider_peer.node_id) { + entry.extend(provider_peer.multiaddrs.iter().cloned()); } self.queued_events.push(NetworkBehaviourAction::GenerateEvent(KademliaOut::Discovered { peer_id: provider_peer.node_id.clone(), diff --git a/protocols/kad/src/kbucket.rs b/protocols/kad/src/kbucket.rs index 0056e38e1c8..edae09de723 100644 --- a/protocols/kad/src/kbucket.rs +++ b/protocols/kad/src/kbucket.rs @@ -40,40 +40,46 @@ pub const MAX_NODES_PER_BUCKET: usize = 20; /// Table of k-buckets. #[derive(Debug, Clone)] -pub struct KBucketsTable { +pub struct KBucketsTable { /// Peer ID of the local node. - my_id: Id, - /// The actual tables that store peers or values. - tables: Vec>, - // The timeout when pinging the first node after which we consider it unresponsive. - ping_timeout: Duration, + my_id: TPeerId, + /// The actual tables that store peers or values. + tables: Vec>, + /// The timeout when trying to reach the first node after which we consider it unresponsive. + unresponsive_timeout: Duration, } /// An individual table that stores peers or values. #[derive(Debug, Clone)] -struct KBucket { - /// Nodes are always ordered from oldest to newest. - /// Note that we will very often move elements to the end of this. No benchmarking has been - /// performed, but it is very likely that a `ArrayVec` is the most performant data structure. - nodes: ArrayVec<[Node; MAX_NODES_PER_BUCKET]>, +struct KBucket { + /// Nodes are always ordered from oldest to newest. The nodes we are connected to are always + /// all on top of the nodes we are not connected to. + nodes: ArrayVec<[Node; MAX_NODES_PER_BUCKET]>, + + /// Index in `nodes` over which all nodes are connected. Must always be <= to the length + /// of `nodes`. 
+ first_connected_pos: usize, /// Node received when the bucket was full. Will be added to the list if the first node doesn't - /// respond in time to our ping. The second element is the time when the pending node was added. - /// If it is too old we drop the first node and add the pending node to the - /// end of the list. - pending_node: Option<(Node, Instant)>, + /// respond in time to our reach attempt. The second element is the time when the pending node + /// was added. If it is too old we drop the first node and add the pending node to the end of + /// the list. + pending_node: Option<(Node, Instant)>, /// Last time this bucket was updated. - last_update: Instant, + latest_update: Instant, } +/// A single node in a k-bucket. #[derive(Debug, Clone)] -struct Node { - id: Id, - value: Val, +struct Node { + /// Id of the node. + id: TPeerId, + /// Value associated to it. + value: TVal, } -impl KBucket { +impl KBucket { /// Puts the kbucket into a coherent state. /// If a node is pending and the timeout has expired, removes the first element of `nodes` /// and puts the node back in `pending_node`. @@ -142,22 +148,23 @@ impl KBucketsPeerId for Multihash { } } -impl KBucketsTable +impl KBucketsTable where - Id: KBucketsPeerId, + TPeerId: KBucketsPeerId, { /// Builds a new routing table. - pub fn new(my_id: Id, ping_timeout: Duration) -> Self { + pub fn new(my_id: TPeerId, unresponsive_timeout: Duration) -> Self { KBucketsTable { - my_id: my_id, - tables: (0..Id::max_distance()) + my_id, + tables: (0..TPeerId::max_distance()) .map(|_| KBucket { nodes: ArrayVec::new(), + first_connected_pos: 0, pending_node: None, - last_update: Instant::now(), + latest_update: Instant::now(), }) .collect(), - ping_timeout: ping_timeout, + unresponsive_timeout, } } @@ -165,7 +172,7 @@ where // // Returns `None` if out of range, which happens if `id` is the same as the local peer id. #[inline] - fn bucket_num(&self, id: &Id) -> Option { + fn bucket_num(&self, id: &TPeerId) -> Option { (self.my_id.distance_with(id) as usize).checked_sub(1) } @@ -174,20 +181,20 @@ where /// Ordered by proximity to the local node. Closest bucket (with max. one node in it) comes /// first. #[inline] - pub fn buckets(&mut self) -> BucketsIter { - BucketsIter(self.tables.iter_mut(), self.ping_timeout) + pub fn buckets(&mut self) -> BucketsIter { + BucketsIter(self.tables.iter_mut(), self.unresponsive_timeout) } /// Returns the ID of the local node. #[inline] - pub fn my_id(&self) -> &Id { + pub fn my_id(&self) -> &TPeerId { &self.my_id } /// Returns the value associated to a node, if any is present. /// /// Does **not** include pending nodes. - pub fn get(&self, id: &Id) -> Option<&Val> { + pub fn get(&self, id: &TPeerId) -> Option<&TVal> { let table = match self.bucket_num(&id) { Some(n) => &self.tables[n], None => return None, @@ -205,12 +212,14 @@ where /// Returns the value associated to a node, if any is present. /// /// Does **not** include pending nodes. - pub fn get_mut(&mut self, id: &Id) -> Option<&mut Val> { + pub fn get_mut(&mut self, id: &TPeerId) -> Option<&mut TVal> { let table = match self.bucket_num(&id) { Some(n) => &mut self.tables[n], None => return None, }; + table.flush(self.unresponsive_timeout); + for elem in &mut table.nodes { if elem.id == *id { return Some(&mut elem.value); @@ -220,16 +229,141 @@ where None } + /// Returns the value associated to a node if any is present. Otherwise, tries to add the + /// node to the table in a disconnected state and return its value. 
Returns `None` if `id` is + /// the local peer, or if the table is full. + pub fn entry_mut(&mut self, id: &TPeerId) -> Option<&mut TVal> + where + TVal: Default, + { + if let Some((bucket, entry)) = self.entry_mut_inner(id) { + Some(&mut self.tables[bucket].nodes[entry].value) + } else { + None + } + } + + /// Apparently non-lexical lifetimes still aren't working properly in some situations, so we + /// delegate `entry_mut` to this method that returns an index within `self.tables` and the + /// node index within that table. + fn entry_mut_inner(&mut self, id: &TPeerId) -> Option<(usize, usize)> + where + TVal: Default, + { + let (bucket_num, table) = match self.bucket_num(&id) { + Some(n) => (n, &mut self.tables[n]), + None => return None, + }; + + table.flush(self.unresponsive_timeout); + + if let Some(pos) = table.nodes.iter().position(|elem| elem.id == *id) { + return Some((bucket_num, pos)); + } + + if !table.nodes.is_full() { + table.nodes.insert(table.first_connected_pos, Node { + id: id.clone(), + value: Default::default(), + }); + table.first_connected_pos += 1; + table.latest_update = Instant::now(); + return Some((bucket_num, table.first_connected_pos - 1)); + } + + None + } + + /// Reports that we are connected to the given node. + /// + /// This inserts the node in the k-buckets, if possible. If it is already in a k-bucket, puts + /// it above the disconnected nodes. If it is not already in a k-bucket, then the value will + /// be built with the `Default` trait. + pub fn set_connected(&mut self, id: &TPeerId) -> Update + where + TVal: Default, + { + let table = match self.bucket_num(&id) { + Some(n) => &mut self.tables[n], + None => return Update::FailSelfUpdate, + }; + + table.flush(self.unresponsive_timeout); + + if let Some(pos) = table.nodes.iter().position(|elem| elem.id == *id) { + // Node is already in the table; move it over `first_connected_pos` if necessary. + // We do a `saturating_sub(1)`, because if `first_connected_pos` is 0 then + // `pos < first_connected_pos` can never be true anyway. + if pos < table.first_connected_pos.saturating_sub(1) { + let elem = table.nodes.remove(pos); + table.first_connected_pos -= 1; + table.nodes.insert(table.first_connected_pos, elem); + } + table.latest_update = Instant::now(); + Update::Updated + + } else if !table.nodes.is_full() { + // Node is not in the table yet, but there's plenty of space for it. + table.nodes.insert(table.first_connected_pos, Node { + id: id.clone(), + value: Default::default(), + }); + table.latest_update = Instant::now(); + Update::Added + + } else if table.first_connected_pos > 0 && table.pending_node.is_none() { + // Node is not in the table yet, but there could be room for it if we drop the first + // element. However we first add the node to add to `pending_node` and try to reconnect + // to the oldest node. + let pending_node = Node { + id: id.clone(), + value: Default::default(), + }; + table.pending_node = Some((pending_node, Instant::now())); + Update::Pending(&table.nodes[0].id) + + } else { + debug_assert!(table.first_connected_pos == 0 || table.pending_node.is_some()); + Update::Discarded + } + } + + /// Reports that we are now disconnected from the given node. + /// + /// This does *not* remove the node from the k-buckets, but moves it underneath the nodes we + /// are still connected to. 
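
To visualise the invariant that `set_connected` and `set_disconnected` maintain (an illustration, not code from the patch):

// Nodes are ordered oldest to newest; everything below the boundary is
// disconnected, everything at or above it is connected:
//
//   nodes:                [ d0, d1, c0, c1, c2 ]
//   first_connected_pos:            ^-- == 2
//
// Connecting a disconnected node moves it to the boundary and decrements the
// index; disconnecting a connected node moves it to the boundary and
// increments the index.
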
+    /// Reports that we are now disconnected from the given node.
+    ///
+    /// This does *not* remove the node from the k-buckets, but moves it underneath the nodes we
+    /// are still connected to.
+    pub fn set_disconnected(&mut self, id: &TPeerId) {
+        let table = match self.bucket_num(&id) {
+            Some(n) => &mut self.tables[n],
+            None => return,
+        };
+
+        table.flush(self.unresponsive_timeout);
+
+        let pos = match table.nodes.iter().position(|elem| elem.id == *id) {
+            Some(pos) => pos,
+            None => return,
+        };
+
+        if pos > table.first_connected_pos {
+            let elem = table.nodes.remove(pos);
+            table.nodes.insert(table.first_connected_pos, elem);
+            table.first_connected_pos += 1;
+        } else if pos == table.first_connected_pos {
+            table.first_connected_pos += 1;
+        }
+    }
+
     /// Finds the `num` nodes closest to `id`, ordered by distance.
-    pub fn find_closest<TOther>(&mut self, id: &TOther) -> VecIntoIter<Id>
+    pub fn find_closest<TOther>(&mut self, id: &TOther) -> VecIntoIter<TPeerId>
     where
-        Id: Clone + KBucketsPeerId<TOther>,
+        TPeerId: Clone + KBucketsPeerId<TOther>,
     {
         // TODO: optimize
         let mut out = Vec::new();
         for table in self.tables.iter_mut() {
-            table.flush(self.ping_timeout);
-            if table.last_update.elapsed() > self.ping_timeout {
+            table.flush(self.unresponsive_timeout);
+            if table.latest_update.elapsed() > self.unresponsive_timeout {
                 continue; // ignore bucket with expired nodes
             }
             for node in table.nodes.iter() {
@@ -241,9 +375,9 @@ where
     }
 
     /// Same as `find_closest`, but includes the local peer as well.
-    pub fn find_closest_with_self<TOther>(&mut self, id: &TOther) -> VecIntoIter<Id>
+    pub fn find_closest_with_self<TOther>(&mut self, id: &TOther) -> VecIntoIter<TPeerId>
     where
-        Id: Clone + KBucketsPeerId<TOther>,
+        TPeerId: Clone + KBucketsPeerId<TOther>,
     {
         // TODO: optimize
         let mut intermediate: Vec<_> = self.find_closest(id).collect();
@@ -259,170 +393,29 @@ where
         }
         intermediate.into_iter()
     }
-
-    /// Marks the node as "most recent" in its bucket and modifies the value associated to it.
-    /// This function should be called whenever we receive a communication from a node.
-    pub fn update(&mut self, id: Id) -> Update<Id, Val> {
-        let table = match self.bucket_num(&id) {
-            Some(n) => &mut self.tables[n],
-            None => return Update::FailSelfUpdate,
-        };
-
-        table.flush(self.ping_timeout);
-
-        if let Some(pos) = table.nodes.iter().position(|n| n.id == id) {
-            // Node is already in the bucket.
-            if pos != table.nodes.len() - 1 {
-                let existing = table.nodes.remove(pos);
-                if pos == 0 {
-                    // If it's the oldest node of the bucket that we update, then we drop the node that
-                    // was waiting for a ping.
-                    table.nodes.truncate(MAX_NODES_PER_BUCKET - 1);
-                    table.pending_node = None;
-                }
-                table.nodes.push(existing);
-            }
-            table.last_update = Instant::now();
-            Update::Refreshed(UpdateRefresh {
-                value: &mut table.nodes.last_mut()
-                    .expect("nodes is not empty since the value is in it; QED").value,
-            })
-
-        } else if table.nodes.len() < MAX_NODES_PER_BUCKET {
-            // Node not yet in the bucket, but there's plenty of space.
-            Update::Add(UpdateAdd {
-                table,
-                id,
-            })
-
-        } else {
-            // Not enough space to put the node, but we can add it to the end as "pending". We
-            // then need to tell the caller that we want it to ping the node at the top of the
-            // list.
-            if table.pending_node.is_none() {
-                let to_ping = table.nodes[0].id.clone();
-                Update::AddPending(UpdateAddPending {
-                    table,
-                    id,
-                    to_ping,
-                })
-            } else {
-                Update::Discarded
-            }
-        }
-    }
 }
 
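Note that `find_closest` skips any bucket whose `latest_update` is older than the timeout, so a lookup only ever yields peers from recently refreshed buckets. For illustration, with the same assumptions as above:

    use libp2p_kad::kbucket::KBucketsTable; // hypothetical path
    use multihash::Multihash;

    /// Returns up to `k` peers closest to `target`, closest first.
    fn closest_peers(
        table: &mut KBucketsTable<Multihash, ()>,
        target: &Multihash,
        k: usize,
    ) -> Vec<Multihash> {
        table.find_closest(target).take(k).collect()
    }
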
-/// Return value of the `update()` method.
+/// Return value of the `set_connected()` method.
 #[derive(Debug)]
 #[must_use]
-pub enum Update<'a, Id, Val> {
-    /// The node can be added to the bucket.
-    Add(UpdateAdd<'a, Id, Val>),
-    /// The node was already in the bucket and has been refreshed.
-    Refreshed(UpdateRefresh<'a, Val>),
-    /// The node can be added as pending. We need to ping the node passed as parameter, and call
-    /// `update` if it responds.
-    AddPending(UpdateAddPending<'a, Id, Val>),
+pub enum Update<'a, TPeerId> {
+    /// The node has been added to the bucket.
+    Added,
+    /// The node was already in the bucket and has been updated.
+    Updated,
+    /// The node has been added as pending. We need to try to reach the node passed as parameter.
+    Pending(&'a TPeerId),
     /// The node wasn't added at all because a node was already pending.
     Discarded,
     /// Tried to update the local peer ID. This is an invalid operation.
     FailSelfUpdate,
 }
 
-impl<'a, Id, Val> Update<'a, Id, Val> {
-    /// Writes the given value in the table.
-    #[inline]
-    pub fn write(self, value: Val) {
-        match self {
-            Update::Add(add) => add.add(value),
-            Update::Refreshed(refresh) => *refresh.into_mut() = value,
-            Update::AddPending(add) => add.add(value),
-            Update::Discarded => (),
-            Update::FailSelfUpdate => (),
-        }
-    }
-}
-
-/// The node can be added to the bucket.
-#[derive(Debug)]
-#[must_use]
-pub struct UpdateAdd<'a, Id, Val> {
-    /// Where to insert the value.
-    table: &'a mut KBucket<Id, Val>,
-    /// The ID to insert.
-    id: Id,
-}
-
-impl<'a, Id, Val> UpdateAdd<'a, Id, Val> {
-    /// Insert the node in the kbuckets with the given value.
-    pub fn add(self, value: Val) {
-        self.table.last_update = Instant::now();
-        self.table.nodes.push(Node {
-            id: self.id,
-            value,
-        });
-    }
-}
-
-/// The node is already in the kbucket.
-#[derive(Debug)]
-pub struct UpdateRefresh<'a, Val> {
-    /// The value in the k-buckets.
-    value: &'a mut Val,
-}
-
-impl<'a, Val> UpdateRefresh<'a, Val> {
-    /// Returns a mutable reference to the value.
-    #[inline]
-    pub fn get_mut(&mut self) -> &mut Val {
-        &mut self.value
-    }
-
-    /// Returns a mutable reference to the value.
-    #[inline]
-    pub fn into_mut(self) -> &'a mut Val {
-        self.value
-    }
-}
-
-/// The node can be added as pending. We need to ping the node passed as parameter, and call
-/// `update` if it responds.
-#[derive(Debug)]
-#[must_use]
-pub struct UpdateAddPending<'a, Id, Val> {
-    /// Where to insert the value.
-    table: &'a mut KBucket<Id, Val>,
-    /// The ID to insert.
-    id: Id,
-    /// Id of the node to ping.
-    to_ping: Id,
-}
-
-impl<'a, Id, Val> UpdateAddPending<'a, Id, Val> {
-    /// The node we heard from the latest, and that we should ping.
-    #[inline]
-    pub fn node_to_ping(&self) -> &Id {
-        &self.to_ping
-    }
-
-    /// Insert the node in the kbuckets with the given value.
-    pub fn add(self, value: Val) {
-        self.table.pending_node = Some((
-            Node {
-                id: self.id,
-                value,
-            },
-            Instant::now(),
-        ));
-    }
-}
-
 /// Iterator giving access to a bucket.
-pub struct BucketsIter<'a, Id: 'a, Val: 'a>(SliceIterMut<'a, KBucket<Id, Val>>, Duration);
+pub struct BucketsIter<'a, TPeerId: 'a, TVal: 'a>(SliceIterMut<'a, KBucket<TPeerId, TVal>>, Duration);
 
-impl<'a, Id: 'a, Val: 'a> Iterator for BucketsIter<'a, Id, Val> {
-    type Item = Bucket<'a, Id, Val>;
+impl<'a, TPeerId: 'a, TVal: 'a> Iterator for BucketsIter<'a, TPeerId, TVal> {
+    type Item = Bucket<'a, TPeerId, TVal>;
 
     #[inline]
     fn next(&mut self) -> Option<Self::Item> {
@@ -438,12 +431,12 @@ impl<'a, Id: 'a, Val: 'a> Iterator for BucketsIter<'a, Id, Val> {
     }
 }
 
-impl<'a, Id: 'a, Val: 'a> ExactSizeIterator for BucketsIter<'a, Id, Val> {}
+impl<'a, TPeerId: 'a, TVal: 'a> ExactSizeIterator for BucketsIter<'a, TPeerId, TVal> {}
 
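`buckets()` walks the table closest-bucket-first and flushes each bucket as it is visited. A diagnostic sketch, same assumptions as above:

    use libp2p_kad::kbucket::KBucketsTable; // hypothetical path
    use multihash::Multihash;

    fn dump_table(table: &mut KBucketsTable<Multihash, ()>) {
        for (i, bucket) in table.buckets().enumerate() {
            // `num_entries` does not count a pending node until `flush`
            // has promoted it into the bucket.
            println!(
                "bucket {}: {} entries, pending: {}, refreshed: {:?}",
                i,
                bucket.num_entries(),
                bucket.has_pending(),
                bucket.latest_update(),
            );
        }
    }
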
 /// Access to a bucket.
-pub struct Bucket<'a, Id: 'a, Val: 'a>(&'a mut KBucket<Id, Val>);
+pub struct Bucket<'a, TPeerId: 'a, TVal: 'a>(&'a mut KBucket<TPeerId, TVal>);
 
-impl<'a, Id: 'a, Val: 'a> Bucket<'a, Id, Val> {
+impl<'a, TPeerId: 'a, TVal: 'a> Bucket<'a, TPeerId, TVal> {
     /// Returns the number of entries in that bucket.
     ///
     /// > **Note**: Keep in mind that this operation can be racy. If `update()` is called on the
@@ -464,8 +457,8 @@
     ///
     /// If the bucket is empty, this returns the time when the whole table was created.
     #[inline]
-    pub fn last_update(&self) -> Instant {
-        self.0.last_update.clone()
+    pub fn latest_update(&self) -> Instant {
+        self.0.latest_update
     }
 }
 
@@ -483,8 +476,8 @@ mod tests {
         let my_id = Multihash::random(Hash::SHA2256);
         let other_id = Multihash::random(Hash::SHA2256);
 
-        let mut table = KBucketsTable::new(my_id, Duration::from_secs(5));
-        let _ = table.update(other_id.clone()).write(());
+        let mut table = KBucketsTable::<_, ()>::new(my_id, Duration::from_secs(5));
+        table.entry_mut(&other_id);
 
         let res = table.find_closest(&other_id).collect::<Vec<_>>();
         assert_eq!(res.len(), 1);
@@ -496,7 +489,8 @@ mod tests {
         let my_id = Multihash::random(Hash::SHA2256);
 
         let mut table = KBucketsTable::<_, ()>::new(my_id.clone(), Duration::from_secs(5));
-        match table.update(my_id) {
+        assert!(table.entry_mut(&my_id).is_none());
+        match table.set_connected(&my_id) {
             Update::FailSelfUpdate => (),
             _ => panic!(),
         }
@@ -516,15 +510,15 @@ mod tests {
             })
            .collect::<Vec<_>>();
 
-        let mut table = KBucketsTable::new(my_id, Duration::from_secs(5));
-        let before_update = table.buckets().map(|b| b.last_update()).collect::<Vec<_>>();
+        let mut table = KBucketsTable::<_, ()>::new(my_id, Duration::from_secs(5));
+        let before_update = table.buckets().map(|b| b.latest_update()).collect::<Vec<_>>();
 
         thread::sleep(Duration::from_secs(2));
 
         for &(ref id, _) in &other_ids {
-            let _ = table.update(id.clone()).write(());
+            table.entry_mut(&id);
         }
 
-        let after_update = table.buckets().map(|b| b.last_update()).collect::<Vec<_>>();
+        let after_update = table.buckets().map(|b| b.latest_update()).collect::<Vec<_>>();
 
         for (offset, (bef, aft)) in before_update.iter().zip(after_update.iter()).enumerate() {
             if other_ids.iter().any(|&(_, bucket)| bucket == offset) {
@@ -552,13 +546,14 @@ mod tests {
         let first_node = fill_ids[0].clone();
         let second_node = fill_ids[1].clone();
 
-        let mut table = KBucketsTable::new(my_id.clone(), Duration::from_secs(1));
+        let mut table = KBucketsTable::<_, ()>::new(my_id.clone(), Duration::from_secs(1));
 
         for (num, id) in fill_ids.drain(..MAX_NODES_PER_BUCKET).enumerate() {
-            match table.update(id) {
-                Update::Add(add) => add.add(()),
+            match table.set_connected(&id) {
+                Update::Added => (),
                 _ => panic!()
             }
+            table.set_disconnected(&id);
             assert_eq!(table.buckets().nth(255).unwrap().num_entries(), num + 1);
         }
 
         assert_eq!(
             table.buckets().nth(255).unwrap().num_entries(),
             MAX_NODES_PER_BUCKET
         );
         assert!(!table.buckets().nth(255).unwrap().has_pending());
-        match table.update(fill_ids.remove(0)) {
-            Update::AddPending(add) => {
-                assert_eq!(*add.node_to_ping(), first_node);
-                add.add(());
+        match table.set_connected(&fill_ids.remove(0)) {
+            Update::Pending(to_ping) => {
+                assert_eq!(*to_ping, first_node);
             },
-            _ => ()
+            _ => panic!()
         }
 
         assert_eq!(
             table.buckets().nth(255).unwrap().num_entries(),
             MAX_NODES_PER_BUCKET
         );
         assert!(table.buckets().nth(255).unwrap().has_pending());
-        match table.update(fill_ids.remove(0)) {
+        match table.set_connected(&fill_ids.remove(0)) {
             Update::Discarded => (),
-            _ => ()
+            _ => panic!()
         }
 
         thread::sleep(Duration::from_secs(2));
 
         assert!(!table.buckets().nth(255).unwrap().has_pending());
-        match table.update(fill_ids.remove(0)) {
-            Update::AddPending(add) => {
-                assert_eq!(*add.node_to_ping(), second_node);
-                add.add(());
+        match table.set_connected(&fill_ids.remove(0)) {
+            Update::Pending(to_ping) => {
+                assert_eq!(*to_ping, second_node);
             },
             _ => panic!()
         }
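Condensed, the `full_kbucket` test above exercises the pending mechanism like this (with `my_id`, `fill_ids`, `first_node` and `MAX_NODES_PER_BUCKET` set up exactly as in the test; the `matches!` macro is used here for brevity and is not part of the original):

    let mut table = KBucketsTable::<_, ()>::new(my_id, Duration::from_secs(1));

    // Fill bucket 255 with nodes that are immediately marked disconnected, so
    // `first_connected_pos` stays above 0 and eviction remains possible.
    for id in fill_ids.drain(..MAX_NODES_PER_BUCKET) {
        assert!(matches!(table.set_connected(&id), Update::Added));
        table.set_disconnected(&id);
    }

    // The bucket is now full: a newcomer is only parked as pending, and the
    // caller is told to try to reach the oldest disconnected node (index 0).
    match table.set_connected(&fill_ids.remove(0)) {
        Update::Pending(oldest) => assert_eq!(*oldest, first_node),
        other => panic!("unexpected update: {:?}", other),
    }
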
From 403abb5afe39be120b290dc371045b0aa850a224 Mon Sep 17 00:00:00 2001
From: Pierre Krieger
Date: Fri, 25 Jan 2019 11:38:56 +0100
Subject: [PATCH 6/7] Some cleanup

---
 protocols/floodsub/src/layer.rs | 7 -------
 1 file changed, 7 deletions(-)

diff --git a/protocols/floodsub/src/layer.rs b/protocols/floodsub/src/layer.rs
index a44a7fc2830..349f99a8fe8 100644
--- a/protocols/floodsub/src/layer.rs
+++ b/protocols/floodsub/src/layer.rs
@@ -41,7 +41,6 @@ pub struct Floodsub<TSubstream> {
     local_peer_id: PeerId,
 
     /// List of peers to send messages to.
-    // TODO: unused
     target_peers: FnvHashSet<PeerId>,
 
     /// List of peers the network is connected to, and the topics that they're subscribed to.
@@ -345,12 +344,6 @@ where
             Self::OutEvent,
         >,
     > {
-        for peer in &self.target_peers {
-            if !self.connected_peers.contains_key(peer) {
-
-            }
-        }
-
         if let Some(event) = self.events.pop_front() {
             return Async::Ready(event);
         }

From c32649c9cd164c86591564c65ed1bd80df2de798 Mon Sep 17 00:00:00 2001
From: Pierre Krieger
Date: Fri, 25 Jan 2019 11:41:32 +0100
Subject: [PATCH 7/7] Restore external addresses system

---
 core/src/swarm.rs | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/core/src/swarm.rs b/core/src/swarm.rs
index 05e1105c2d9..0e0c5b45274 100644
--- a/core/src/swarm.rs
+++ b/core/src/swarm.rs
@@ -80,6 +80,10 @@ where TTransport: Transport,
 
     /// List of multiaddresses we're listening on.
     listened_addrs: SmallVec<[Multiaddr; 8]>,
+
+    /// List of multiaddresses we're listening on, after accounting for external IP addresses and
+    /// similar mechanisms.
+    external_addrs: SmallVec<[Multiaddr; 8]>,
 }
 
 impl<TTransport, TBehaviour> Deref for Swarm<TTransport, TBehaviour>
@@ -267,6 +271,7 @@ where TBehaviour: NetworkBehaviour,
                 local_peer_id: &mut self.raw_swarm.local_peer_id(),
                 supported_protocols: &self.supported_protocols,
                 listened_addrs: &self.listened_addrs,
+                external_addrs: &self.external_addrs,
                 nat_traversal: &move |a, b| transport.nat_traversal(a, b),
             };
             self.behaviour.poll(&mut parameters)
@@ -290,7 +295,9 @@ where TBehaviour: NetworkBehaviour,
                 }
             },
             Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) => {
-                // TODO: self.topology.add_local_external_addrs(self.raw_swarm.nat_traversal(&address));
+                for addr in self.raw_swarm.nat_traversal(&address) {
+                    self.external_addrs.push(addr);
+                }
             },
         }
     }
@@ -353,6 +360,7 @@ pub struct PollParameters<'a: 'a> {
     local_peer_id: &'a PeerId,
     supported_protocols: &'a [Vec<u8>],
     listened_addrs: &'a [Multiaddr],
+    external_addrs: &'a [Multiaddr],
     nat_traversal: &'a dyn Fn(&Multiaddr, &Multiaddr) -> Option<Multiaddr>,
 }
@@ -375,11 +383,10 @@ impl<'a> PollParameters<'a> {
     }
 
     /// Returns the list of the addresses nodes can use to reach us.
+    // TODO: should return references
     #[inline]
     pub fn external_addresses<'b>(&'b mut self) -> impl ExactSizeIterator<Item = Multiaddr> + 'b {
-        /*let local_peer_id = self.topology.local_peer_id().clone();
-        self.topology.addresses_of_peer(&self.local_peer_id).into_iter()*/
-        std::iter::empty() // TODO: ?
+        self.external_addrs.iter().cloned()
     }
 
     /// Returns the peer id of the local node.
@@ -504,6 +511,7 @@ where TBehaviour: NetworkBehaviour,
             behaviour: self.behaviour,
             supported_protocols,
             listened_addrs: SmallVec::new(),
+            external_addrs: SmallVec::new(),
         }
     }
 }
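With this last patch, addresses learned from `ReportObservedAddr` become visible to behaviours again through `PollParameters::external_addresses()`. As a sketch of what an implementation of `NetworkBehaviour::poll` can now do with them (the helper function and the import path are assumptions, not part of the patch):

    use libp2p_core::{swarm::PollParameters, Multiaddr}; // hypothetical path

    /// Called from inside a `NetworkBehaviour::poll` implementation.
    fn log_reachable_addresses(params: &mut PollParameters) {
        // Addresses observed by remotes, already run through the transport's
        // NAT traversal before being stored in `external_addrs`.
        let external: Vec<Multiaddr> = params.external_addresses().collect();
        println!("{:?} reachable at {:?}", params.local_peer_id(), external);
    }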