Skip to content

Commit

Permalink
feat(p2p): refresh kad buckets & rebootstrap regularly
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire committed Jun 30, 2022
1 parent 8e292b5 commit 339ed13
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 6 deletions.
7 changes: 7 additions & 0 deletions iroh-p2p/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,4 +187,11 @@ impl NodeBehaviour {
}
}
}

/// Kicks off a Kademlia bootstrap on the wrapped `kad` behaviour.
///
/// When `self.kad` yields no behaviour (`as_mut()` returns `None`) this is a
/// no-op that returns `Ok(())`.
///
/// # Errors
///
/// Propagates whatever error `bootstrap()` reports.
pub fn kad_bootstrap(&mut self) -> Result<()> {
    match self.kad.as_mut() {
        Some(kad) => {
            // The value returned on success is not needed here; only failure matters.
            kad.bootstrap()?;
            Ok(())
        }
        None => Ok(()),
    }
}
}
77 changes: 72 additions & 5 deletions iroh-p2p/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@ use libp2p::core::Multiaddr;
pub use libp2p::gossipsub::{IdentTopic, Topic};
use libp2p::identify::{IdentifyEvent, IdentifyInfo};
use libp2p::identity::Keypair;
use libp2p::kad::kbucket::{Distance, NodeStatus};
use libp2p::kad::BootstrapOk;
use libp2p::kad::{
self, record::Key, GetProvidersError, GetProvidersOk, GetProvidersProgress, KademliaEvent,
QueryProgress, QueryResult,
};
use libp2p::metrics::{Metrics, Recorder};
use libp2p::multiaddr::Protocol;
use libp2p::multihash::Multihash;
use libp2p::swarm::dial_opts::{DialOpts, PeerCondition};
use libp2p::swarm::{ConnectionHandler, IntoConnectionHandler, NetworkBehaviour, SwarmEvent};
use libp2p::{PeerId, Swarm};
use prometheus_client::registry::Registry;
Expand Down Expand Up @@ -54,6 +57,7 @@ pub struct Node<KeyStorage: Storage> {
metrics: Metrics,
rpc_client: RpcClient,
_keychain: Keychain<KeyStorage>,
kad_last_range: Option<(Distance, Distance)>,
}

enum QueryChannel {
Expand All @@ -67,6 +71,9 @@ enum QueryKey {

const PROVIDER_LIMIT: usize = 20;

/// Period of the timer in `run` that logs the connected peer count and calls `dht_nice_tick`.
const NICE_INTERVAL: Duration = Duration::from_secs(6);
/// Period of the timer in `run` that calls `kad_bootstrap` on the behaviour (5 minutes).
const BOOTSTRAP_INTERVAL: Duration = Duration::from_secs(5 * 60);

impl<KeyStorage: Storage> Node<KeyStorage> {
pub async fn new(
config: Libp2pConfig,
Expand Down Expand Up @@ -97,15 +104,15 @@ impl<KeyStorage: Storage> Node<KeyStorage> {
metrics,
rpc_client,
_keychain: keychain,
kad_last_range: None,
})
}

/// Starts the libp2p service networking stack. This Future resolves when shutdown occurs.
pub async fn run(&mut self) -> anyhow::Result<()> {
info!("Local Peer ID: {}", self.swarm.local_peer_id());
let mut interval = time::interval(Duration::from_secs(15)); // TODO: configurable

// TODO: add kad random queries if necessary
let mut nice_interval = time::interval(NICE_INTERVAL);
let mut bootstrap_interval = time::interval(BOOTSTRAP_INTERVAL);

loop {
select! {
Expand All @@ -119,14 +126,63 @@ impl<KeyStorage: Storage> Node<KeyStorage> {
warn!("rpc: {:?}", err);
}
}
_interval_event = interval.tick() => {
_interval_event = nice_interval.tick() => {
// Print peer count on an interval.
info!("Peers connected: {:?}", self.swarm.connected_peers().count());

self.dht_nice_tick().await;
}
_interval_event = bootstrap_interval.tick() => {
if let Err(e) = self.swarm.behaviour_mut().kad_bootstrap() {
warn!("kad bootstrap failed: {:?}", e);
}
}
}
}
}

/// Dials one disconnected peer taken from the Kademlia routing table.
///
/// Every k-bucket is visited except the one whose range equals
/// `kad_last_range` (the bucket used by the previous dial attempt). In each
/// visited bucket the first entry with status `NodeStatus::Disconnected`
/// becomes the candidate, replacing any candidate found in an earlier bucket.
/// The surviving candidate is dialed with the addresses stored in its entry,
/// and its bucket range is remembered in `kad_last_range` whether or not the
/// dial call succeeds (a failure is only logged).
///
/// Does nothing when no Kademlia behaviour is present or no bucket holds a
/// disconnected entry.
async fn dht_nice_tick(&mut self) {
    let mut to_dial = None;
    if let Some(kad) = self.swarm.behaviour_mut().kad.as_mut() {
        for kbucket in kad.kbuckets() {
            let range = kbucket.range();
            if self.kad_last_range == Some(range) {
                continue;
            }

            // Find the first disconnected node of this bucket.
            // NOTE(review): the scan keeps going over the remaining buckets, so
            // the candidate from the *last* matching bucket is the one dialed.
            // Confirm whether stopping at the first matching bucket was intended.
            let disconnected = kbucket
                .iter()
                .find(|entry| entry.status == NodeStatus::Disconnected);

            if let Some(entry) = disconnected {
                let peer_id = *entry.node.key.preimage();
                let dial_opts = DialOpts::peer_id(peer_id)
                    .condition(PeerCondition::Disconnected)
                    .addresses(entry.node.value.clone().into_vec())
                    .extend_addresses_through_behaviour()
                    .build();
                // Keep the peer id next to the options so logging does not have
                // to unwrap it back out of `DialOpts`.
                to_dial = Some((peer_id, dial_opts, range));
            }
        }
    }

    if let Some((peer_id, dial_opts, range)) = to_dial {
        debug!(
            "checking node {:?} in bucket range ({:?})",
            peer_id, range
        );

        if let Err(e) = self.swarm.dial(dial_opts) {
            warn!("failed to dial: {:?}", e);
        }
        self.kad_last_range = Some(range);
    }
}

async fn handle_swarm_event(
&mut self,
event: SwarmEvent<
Expand Down Expand Up @@ -199,7 +255,6 @@ impl<KeyStorage: Storage> Node<KeyStorage> {
Event::Kademlia(e) => {
self.metrics.record(&e);
if let KademliaEvent::OutboundQueryCompleted { result, .. } = e {
debug!("kad completed: {:?}", result);
match result {
QueryResult::GetProviders(Ok(GetProvidersOk { key, .. })) => {
let _ = self.kad_queries.remove(&QueryKey::ProviderKey(key));
Expand All @@ -218,6 +273,18 @@ impl<KeyStorage: Storage> Node<KeyStorage> {
}
}
}
QueryResult::Bootstrap(Ok(BootstrapOk {
peer,
num_remaining,
})) => {
debug!(
"kad bootstrap done {:?}, remaining: {}",
peer, num_remaining
);
}
QueryResult::Bootstrap(Err(e)) => {
warn!("kad bootstrap error: {:?}", e);
}
other => {
debug!("Libp2p => Unhandled Kademlia query result: {:?}", other)
}
Expand Down
2 changes: 1 addition & 1 deletion iroh-p2p/src/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ pub(crate) async fn build_swarm(
let behaviour = NodeBehaviour::new(keypair, config, registry, relay_client).await?;

let limits = ConnectionLimits::default()
.with_max_pending_incoming(Some(10)) // TODO: configurable
.with_max_pending_incoming(Some(30)) // TODO: configurable
.with_max_pending_outgoing(Some(30)) // TODO: configurable
.with_max_established_incoming(Some(config.target_peer_count))
.with_max_established_outgoing(Some(config.target_peer_count))
Expand Down

0 comments on commit 339ed13

Please sign in to comment.