From 339ed13789352eb0abf56e18ea1e23593647e9c7 Mon Sep 17 00:00:00 2001
From: dignifiedquire
Date: Tue, 21 Jun 2022 23:13:28 +0200
Subject: [PATCH] feat(p2p): refresh kad buckets & rebootstrap regularly

---
Review notes. They sit between the three-dash marker and the first diff
header, so they are not part of the commit message.

What the patch does:
 * behaviour.rs: adds NodeBehaviour::kad_bootstrap(). It calls
   kad.bootstrap() when self.kad is Some, propagates that call's error via
   `?`, and returns Ok(()) when Kademlia is not configured.
 * node.rs, Node::run: the single 15 s interval is replaced by two timers.
   NICE_INTERVAL (6 s) logs the connected peer count and then awaits
   dht_nice_tick(). BOOTSTRAP_INTERVAL (5 * 60 s) calls kad_bootstrap() and
   logs a warning if it returns an error.
 * node.rs, Node::dht_nice_tick: iterates kad.kbuckets(), skips the bucket
   whose range equals self.kad_last_range, builds DialOpts for an entry
   whose status is NodeStatus::Disconnected, dials it, and stores that
   bucket's range in self.kad_last_range.
 * node.rs, Kademlia events: drops the generic "kad completed" debug line
   and adds explicit arms that log QueryResult::Bootstrap Ok / Err results.
 * swarm.rs: raises with_max_pending_incoming from Some(10) to Some(30).

NOTE(review): in dht_nice_tick the `break` only leaves the inner
`for entry in kbucket.iter()` loop. The outer `for kbucket` loop keeps
running, so `to_dial` is overwritten by every later bucket that also holds a
disconnected entry. The peer that gets dialed therefore comes from the LAST
such bucket, while the comment says "find the first disconnected node". If
the first match was intended, label the outer loop and break out of it.
Confirm intent.

NOTE(review): kad_last_range remembers a single range, so only one bucket is
skipped per tick. It looks like consecutive ticks can keep alternating
between the same two buckets instead of rotating through all of them.
Confirm whether a full rotation was intended.

NOTE(review): both timers are created with time::interval. Presumably this
is tokio's interval, whose first tick completes immediately; if so, a
bootstrap and a nice tick are issued right at startup. Confirm.

NOTE(review): dial_opts.get_peer_id().unwrap() relies on the options having
been built with DialOpts::peer_id just above; keep the two in sync.

NOTE(review): line breaks and indentation of this patch were reconstructed
following rustfmt conventions. In particular the nesting depth of the
Kademlia hunks (-199 and -218) is a guess. Verify against the tree, or apply
with whitespace-tolerant options.

 iroh-p2p/src/behaviour.rs |  7 ++++
 iroh-p2p/src/node.rs      | 77 ++++++++++++++++++++++++++++++++++++---
 iroh-p2p/src/swarm.rs     |  2 +-
 3 files changed, 80 insertions(+), 6 deletions(-)

diff --git a/iroh-p2p/src/behaviour.rs b/iroh-p2p/src/behaviour.rs
index 79d52d448c..f5d583eaf7 100644
--- a/iroh-p2p/src/behaviour.rs
+++ b/iroh-p2p/src/behaviour.rs
@@ -187,4 +187,11 @@ impl NodeBehaviour {
             }
         }
     }
+
+    pub fn kad_bootstrap(&mut self) -> Result<()> {
+        if let Some(kad) = self.kad.as_mut() {
+            kad.bootstrap()?;
+        }
+        Ok(())
+    }
 }
diff --git a/iroh-p2p/src/node.rs b/iroh-p2p/src/node.rs
index 084a5f1fab..fec57eea7e 100644
--- a/iroh-p2p/src/node.rs
+++ b/iroh-p2p/src/node.rs
@@ -12,6 +12,8 @@ use libp2p::core::Multiaddr;
 pub use libp2p::gossipsub::{IdentTopic, Topic};
 use libp2p::identify::{IdentifyEvent, IdentifyInfo};
 use libp2p::identity::Keypair;
+use libp2p::kad::kbucket::{Distance, NodeStatus};
+use libp2p::kad::BootstrapOk;
 use libp2p::kad::{
     self, record::Key, GetProvidersError, GetProvidersOk, GetProvidersProgress, KademliaEvent,
     QueryProgress, QueryResult,
@@ -19,6 +21,7 @@ use libp2p::kad::{
 use libp2p::metrics::{Metrics, Recorder};
 use libp2p::multiaddr::Protocol;
 use libp2p::multihash::Multihash;
+use libp2p::swarm::dial_opts::{DialOpts, PeerCondition};
 use libp2p::swarm::{ConnectionHandler, IntoConnectionHandler, NetworkBehaviour, SwarmEvent};
 use libp2p::{PeerId, Swarm};
 use prometheus_client::registry::Registry;
@@ -54,6 +57,7 @@ pub struct Node {
     metrics: Metrics,
     rpc_client: RpcClient,
     _keychain: Keychain,
+    kad_last_range: Option<(Distance, Distance)>,
 }
 
 enum QueryChannel {
@@ -67,6 +71,9 @@ enum QueryKey {
 
 const PROVIDER_LIMIT: usize = 20;
 
+const NICE_INTERVAL: Duration = Duration::from_secs(6);
+const BOOTSTRAP_INTERVAL: Duration = Duration::from_secs(5 * 60);
+
 impl Node {
     pub async fn new(
         config: Libp2pConfig,
@@ -97,15 +104,15 @@ impl Node {
             metrics,
             rpc_client,
             _keychain: keychain,
+            kad_last_range: None,
         })
     }
 
     /// Starts the libp2p service networking stack. This Future resolves when shutdown occurs.
     pub async fn run(&mut self) -> anyhow::Result<()> {
         info!("Local Peer ID: {}", self.swarm.local_peer_id());
-        let mut interval = time::interval(Duration::from_secs(15)); // TODO: configurable
-
-        // TODO: add kad random queries if necessary
+        let mut nice_interval = time::interval(NICE_INTERVAL);
+        let mut bootstrap_interval = time::interval(BOOTSTRAP_INTERVAL);
 
         loop {
             select! {
@@ -119,14 +126,63 @@ impl Node {
                         warn!("rpc: {:?}", err);
                     }
                 }
-                _interval_event = interval.tick() => {
+                _interval_event = nice_interval.tick() => {
                     // Print peer count on an interval.
                     info!("Peers connected: {:?}", self.swarm.connected_peers().count());
+
+                    self.dht_nice_tick().await;
+                }
+                _interval_event = bootstrap_interval.tick() => {
+                    if let Err(e) = self.swarm.behaviour_mut().kad_bootstrap() {
+                        warn!("kad bootstrap failed: {:?}", e);
+                    }
                 }
             }
         }
     }
 
+    /// Check the next node in the DHT.
+    async fn dht_nice_tick(&mut self) {
+        let mut to_dial = None;
+        if let Some(kad) = self.swarm.behaviour_mut().kad.as_mut() {
+            for kbucket in kad.kbuckets() {
+                if let Some(range) = self.kad_last_range {
+                    if kbucket.range() == range {
+                        continue;
+                    }
+                }
+
+                // find the first disconnected node
+                for entry in kbucket.iter() {
+                    if entry.status == NodeStatus::Disconnected {
+                        let peer_id = entry.node.key.preimage();
+
+                        let dial_opts = DialOpts::peer_id(*peer_id)
+                            .condition(PeerCondition::Disconnected)
+                            .addresses(entry.node.value.clone().into_vec())
+                            .extend_addresses_through_behaviour()
+                            .build();
+                        to_dial = Some((dial_opts, kbucket.range()));
+                        break;
+                    }
+                }
+            }
+        }
+
+        if let Some((dial_opts, range)) = to_dial {
+            debug!(
+                "checking node {:?} in bucket range ({:?})",
+                dial_opts.get_peer_id().unwrap(),
+                range
+            );
+
+            if let Err(e) = self.swarm.dial(dial_opts) {
+                warn!("failed to dial: {:?}", e);
+            }
+            self.kad_last_range = Some(range);
+        }
+    }
+
     async fn handle_swarm_event(
         &mut self,
         event: SwarmEvent<
@@ -199,7 +255,6 @@ impl Node {
             Event::Kademlia(e) => {
                 self.metrics.record(&e);
                 if let KademliaEvent::OutboundQueryCompleted { result, .. } = e {
-                    debug!("kad completed: {:?}", result);
                     match result {
                         QueryResult::GetProviders(Ok(GetProvidersOk { key, .. })) => {
                             let _ = self.kad_queries.remove(&QueryKey::ProviderKey(key));
@@ -218,6 +273,18 @@ impl Node {
                                 }
                             }
                         }
+                        QueryResult::Bootstrap(Ok(BootstrapOk {
+                            peer,
+                            num_remaining,
+                        })) => {
+                            debug!(
+                                "kad bootstrap done {:?}, remaining: {}",
+                                peer, num_remaining
+                            );
+                        }
+                        QueryResult::Bootstrap(Err(e)) => {
+                            warn!("kad bootstrap error: {:?}", e);
+                        }
                         other => {
                             debug!("Libp2p => Unhandled Kademlia query result: {:?}", other)
                         }
diff --git a/iroh-p2p/src/swarm.rs b/iroh-p2p/src/swarm.rs
index ea4da03d6c..0882ff8080 100644
--- a/iroh-p2p/src/swarm.rs
+++ b/iroh-p2p/src/swarm.rs
@@ -93,7 +93,7 @@ pub(crate) async fn build_swarm(
     let behaviour = NodeBehaviour::new(keypair, config, registry, relay_client).await?;
 
     let limits = ConnectionLimits::default()
-        .with_max_pending_incoming(Some(10)) // TODO: configurable
+        .with_max_pending_incoming(Some(30)) // TODO: configurable
         .with_max_pending_outgoing(Some(30)) // TODO: configurable
         .with_max_established_incoming(Some(config.target_peer_count))
         .with_max_established_outgoing(Some(config.target_peer_count))