diff --git a/Cargo.toml b/Cargo.toml
index 4298ede054..51c73bd127 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -18,4 +18,5 @@ members = [
 
 [patch.crates-io]
 # TODO: switch to crates.io once 0.45 is released
-libp2p = { git = "https://github.com/libp2p/rust-libp2p", branch = "master" }
+libp2p = { git = "https://github.com/dignifiedquire/rust-libp2p", branch = "feat-kad-count" }
+# libp2p = { path = "../rust-libp2p" }
diff --git a/iroh-p2p/src/behaviour.rs b/iroh-p2p/src/behaviour.rs
index 6562fefc6b..c4a92202c0 100644
--- a/iroh-p2p/src/behaviour.rs
+++ b/iroh-p2p/src/behaviour.rs
@@ -93,16 +93,18 @@ impl NodeBehaviour {
             .into();
 
         let kad = if config.kademlia {
-            let local_peer_id = local_key.public().to_peer_id();
+            let pub_key = local_key.public();
+
             // TODO: persist to store
-            let store = MemoryStore::new(local_peer_id.to_owned());
+            let store = MemoryStore::new(pub_key.to_peer_id());
+
             // TODO: make user configurable
             let mut kad_config = KademliaConfig::default();
             kad_config.set_parallelism(16usize.try_into().unwrap());
             // TODO: potentially lower (this is per query)
             kad_config.set_query_timeout(Duration::from_secs(5));
 
-            let mut kademlia = Kademlia::with_config(local_peer_id, store, kad_config);
+            let mut kademlia = Kademlia::with_config(pub_key.to_peer_id(), store, kad_config);
             for multiaddr in &config.bootstrap_peers {
                 // TODO: move parsing into config
                 let mut addr = multiaddr.to_owned();
@@ -113,9 +115,12 @@ impl NodeBehaviour {
                     warn!("Could not parse bootstrap addr {}", multiaddr);
                 }
             }
+
+            // Trigger initial bootstrap
            if let Err(e) = kademlia.bootstrap() {
                 warn!("Kademlia bootstrap failed: {}", e);
             }
+
             Some(kademlia)
         } else {
             None
@@ -163,4 +168,12 @@ impl NodeBehaviour {
             kad.add_address(peer, addr);
         }
     }
+
+    pub fn finish_query(&mut self, id: &libp2p::kad::QueryId) {
+        if let Some(kad) = self.kad.as_mut() {
+            if let Some(mut query) = kad.query_mut(id) {
+                query.finish();
+            }
+        }
+    }
 }
diff --git a/iroh-p2p/src/rpc.rs b/iroh-p2p/src/rpc.rs
index ad47655805..b4432a9e4d 100644
--- a/iroh-p2p/src/rpc.rs
+++ b/iroh-p2p/src/rpc.rs
@@ -9,6 +9,7 @@ use futures::channel::oneshot;
 use libp2p::kad::record::Key;
 use libp2p::Multiaddr;
 use libp2p::PeerId;
+use tokio::sync::mpsc;
 use tonic::{transport::Server as TonicServer, Request, Response, Status};
 use tracing::trace;
 
@@ -84,22 +85,26 @@ impl p2p_server::P2p for P2p {
         iroh_metrics::req::set_trace_ctx(&request);
         let req = request.into_inner();
         trace!("received ProviderRequest: {:?}", req.key);
-        let (s, r) = oneshot::channel();
+        let (s, mut r) = mpsc::channel(1024);
         let msg = RpcMessage::ProviderRequest {
             key: req.key.into(),
             response_channel: s,
         };
+
         self.sender
             .send(msg)
             .await
             .map_err(|_| Status::internal("receiver dropped"))?;
 
-        let providers = r
-            .await
-            .map_err(|_| Status::internal("sender dropped"))?
-            .map_err(|e| Status::internal(format!("failed to retrieve provider: {:?}", e)))?;
+        // TODO: streaming response
+        let mut providers = Vec::new();
+        while let Some(provider) = r.recv().await {
+            match provider {
+                Ok(provider) => providers.push(provider.to_bytes()),
+                Err(e) => return Err(Status::internal(e)),
+            }
+        }
 
-        let providers = providers.into_iter().map(|p| p.to_bytes()).collect();
         Ok(Response::new(Providers { providers }))
     }
 
@@ -219,7 +224,7 @@ pub enum RpcMessage {
     ProviderRequest {
         // TODO: potentially change this to Cid, as that is the only key we use for providers
         key: Key,
-        response_channel: oneshot::Sender<Result<HashSet<PeerId>, String>>,
+        response_channel: mpsc::Sender<Result<PeerId, String>>,
     },
     NetListeningAddrs(oneshot::Sender<(PeerId, Vec<Multiaddr>)>),
     NetPeers(oneshot::Sender<HashMap<PeerId, Vec<Multiaddr>>>),
diff --git a/iroh-p2p/src/service.rs b/iroh-p2p/src/service.rs
index a782f57674..9051efedb3 100644
--- a/iroh-p2p/src/service.rs
+++ b/iroh-p2p/src/service.rs
@@ -1,4 +1,4 @@
-use std::collections::{HashMap, HashSet};
+use std::collections::HashMap;
 use std::num::NonZeroU8;
 use std::time::Duration;
 
@@ -6,7 +6,7 @@ use ahash::AHashMap;
 use anyhow::{anyhow, Context, Result};
 use async_channel::{bounded as channel, Receiver};
 use cid::Cid;
-use futures::channel::oneshot::{self, Sender as OneShotSender};
+use futures::channel::oneshot::Sender as OneShotSender;
 use futures_util::stream::StreamExt;
 use iroh_rpc_client::Client as RpcClient;
 use libp2p::core::muxing::StreamMuxerBox;
@@ -16,7 +16,8 @@ pub use libp2p::gossipsub::{IdentTopic, Topic};
 use libp2p::identify::{IdentifyEvent, IdentifyInfo};
 use libp2p::identity::Keypair;
 use libp2p::kad::{
-    self, record::Key, GetProvidersError, GetProvidersOk, KademliaEvent, QueryResult,
+    self, record::Key, GetProvidersError, GetProvidersOk, GetProvidersProgress, KademliaEvent,
+    QueryProgress, QueryResult,
 };
 use libp2p::metrics::{Metrics, Recorder};
 use libp2p::multiaddr::Protocol;
@@ -28,7 +29,7 @@ use libp2p::swarm::{
 use libp2p::yamux::WindowUpdateMode;
 use libp2p::{core, mplex, noise, yamux, PeerId, Swarm, Transport};
 use prometheus_client::registry::Registry;
-use tokio::{select, time};
+use tokio::{select, sync::mpsc, time};
 use tracing::{debug, info, trace, warn};
 
 use iroh_bitswap::{
@@ -62,7 +63,7 @@ pub struct Libp2pService {
 }
 
 enum QueryChannel {
-    GetProviders(Vec<OneShotSender<Result<HashSet<PeerId>, String>>>),
+    GetProviders(Vec<mpsc::Sender<Result<PeerId, String>>>),
 }
 
 #[derive(Debug, Hash, PartialEq, Eq)]
@@ -70,6 +71,8 @@ enum QueryKey {
     ProviderKey(Key),
 }
 
+const PROVIDER_LIMIT: usize = 20;
+
 impl Libp2pService {
     pub async fn new(
         config: Libp2pConfig,
@@ -86,7 +89,7 @@ impl Libp2pService {
             .with_max_pending_outgoing(Some(30)) // TODO: configurable
             .with_max_established_incoming(Some(config.target_peer_count))
             .with_max_established_outgoing(Some(config.target_peer_count))
-            .with_max_established_per_peer(Some(5)); // TODO: configurable
+            .with_max_established_per_peer(Some(60)); // TODO: configurable
 
         let node = NodeBehaviour::new(&net_keypair, &config, registry).await?;
         let mut swarm = SwarmBuilder::new(transport, node, peer_id)
@@ -221,35 +224,22 @@ impl Libp2pService {
             Event::Kademlia(e) => {
                 self.metrics.record(&e);
                 if let KademliaEvent::OutboundQueryCompleted { result, .. } = e {
-                    debug!("kad: {:?}", result);
+                    debug!("kad completed: {:?}", result);
                     match result {
-                        QueryResult::GetProviders(Ok(GetProvidersOk {
-                            providers, key, ..
-                        })) => {
-                            if let Some(QueryChannel::GetProviders(chans)) =
-                                self.kad_queries.remove(&QueryKey::ProviderKey(key.clone()))
-                            {
-                                for chan in chans.into_iter() {
-                                    debug!("Sending providers for {:?}", key);
-                                    chan.send(Ok(providers.clone())).ok();
-                                }
-                            } else {
-                                debug!("No listeners");
-                            }
+                        QueryResult::GetProviders(Ok(GetProvidersOk { key, .. })) => {
+                            let _ = self.kad_queries.remove(&QueryKey::ProviderKey(key));
                         }
+
                         QueryResult::GetProviders(Err(err)) => {
-                            let (key, providers) = match err {
-                                GetProvidersError::Timeout { key, providers, .. } => {
-                                    (key, providers)
-                                }
+                            let key = match err {
+                                GetProvidersError::Timeout { key, .. } => key,
                             };
                             debug!("GetProviders timeout {:?}", key);
                             if let Some(QueryChannel::GetProviders(chans)) =
-                                self.kad_queries.remove(&QueryKey::ProviderKey(key.clone()))
+                                self.kad_queries.remove(&QueryKey::ProviderKey(key))
                             {
                                 for chan in chans.into_iter() {
-                                    debug!("Sending providers for {:?}", key);
-                                    chan.send(Ok(providers.clone())).ok();
+                                    chan.send(Err("Timeout".into())).await.ok();
                                 }
                             }
                         }
@@ -257,6 +247,33 @@
                             debug!("Libp2p => Unhandled Kademlia query result: {:?}", other)
                         }
                     }
+                } else if let KademliaEvent::OutboundQueryProgressed {
+                    id, result, count, ..
+                } = e
+                {
+                    debug!("kad progressed: {:?}", result);
+                    match result {
+                        QueryProgress::GetProviders(GetProvidersProgress {
+                            key, provider, ..
+                        }) => {
+                            if count >= PROVIDER_LIMIT {
+                                debug!("finish provider query {}/{}", count, PROVIDER_LIMIT);
+                                // Finish query if we have enough providers.
+                                self.swarm.behaviour_mut().finish_query(&id);
+                            }
+
+                            if let Some(QueryChannel::GetProviders(chans)) = self
+                                .kad_queries
+                                .get_mut(&QueryKey::ProviderKey(key.clone()))
+                            {
+                                for chan in chans.iter_mut() {
+                                    chan.send(Ok(provider)).await.ok();
+                                }
+                            } else {
+                                debug!("No listeners");
+                            }
+                        }
+                    }
                 }
             }
             Event::Identify(e) => {
@@ -342,7 +359,10 @@ impl Libp2pService {
                     );
                 }
             } else {
-                response_channel.send(Ok(Default::default())).ok();
+                response_channel
+                    .send(Err("kademlia is not available".into()))
+                    .await
+                    .ok();
             }
         }
         RpcMessage::NetListeningAddrs(response_channel) => {
@@ -434,3 +454,59 @@ pub async fn build_transport(local_key: Keypair) -> Boxed<(PeerId, StreamMuxerBo
         .timeout(Duration::from_secs(20)) // TODO: configurable
         .boxed()
 }
+
+#[cfg(test)]
+mod tests {
+    use crate::metrics;
+
+    use super::*;
+    use anyhow::Result;
+    use libp2p::identity::ed25519;
+
+    #[tokio::test]
+    async fn test_fetch_providers() -> Result<()> {
+        let mut prom_registry = Registry::default();
+        let libp2p_metrics = Metrics::new(&mut prom_registry);
+        let net_keypair = {
+            let gen_keypair = ed25519::Keypair::generate();
+            Keypair::Ed25519(gen_keypair)
+        };
+
+        let mut network_config = Libp2pConfig::default();
+        network_config.metrics.debug = true;
+        let metrics_config = network_config.metrics.clone();
+
+        let mut p2p_service = Libp2pService::new(
+            network_config,
+            net_keypair,
+            &mut prom_registry,
+            libp2p_metrics,
+        )
+        .await?;
+
+        let metrics_handle = iroh_metrics::init_with_registry(
+            metrics::metrics_config_with_compile_time_info(metrics_config),
+            prom_registry,
+        )
+        .await
+        .expect("failed to initialize metrics");
+
+        let cfg = iroh_rpc_client::Config::default();
+        let p2p_task = tokio::task::spawn(async move {
+            p2p_service.run().await.unwrap();
+        });
+
+        {
+            let client = RpcClient::new(&cfg).await?;
+            let c = "QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR"
+                .parse()
+                .unwrap();
+            let providers = client.p2p.fetch_providers(&c).await?;
+            assert!(providers.len() >= PROVIDER_LIMIT);
+        }
+
+        p2p_task.abort();
+        metrics_handle.shutdown();
+        Ok(())
+    }
+}
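
Note: the core pattern in this patch is replacing a oneshot "wait for the full provider set" response with a bounded mpsc channel that streams each provider as the Kademlia query reports progress, and cutting the query off once PROVIDER_LIMIT results have arrived. The following is a minimal, self-contained sketch of that pattern in plain tokio, not code from the iroh repository; the producer `fake_provider_lookup` and the numeric peer ids stand in for the real Kademlia query and PeerId values.

use tokio::sync::mpsc;

const PROVIDER_LIMIT: usize = 20;

// Stands in for the Kademlia query: emits one result per "provider" found.
async fn fake_provider_lookup(tx: mpsc::Sender<Result<u64, String>>) {
    for peer in 0..100u64 {
        // Stop early once the receiver hangs up (consumer hit its limit).
        if tx.send(Ok(peer)).await.is_err() {
            break;
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(1024);
    tokio::spawn(fake_provider_lookup(tx));

    let mut providers = Vec::new();
    while let Some(res) = rx.recv().await {
        match res {
            Ok(peer) => providers.push(peer),
            Err(err) => {
                eprintln!("lookup failed: {err}");
                break;
            }
        }
        if providers.len() >= PROVIDER_LIMIT {
            // Enough results collected; stop consuming.
            break;
        }
    }

    // Dropping the receiver tells the producer to stop, which plays the
    // role of finish_query() on the real Kademlia query above.
    drop(rx);

    assert!(providers.len() >= PROVIDER_LIMIT);
    println!("got {} providers", providers.len());
}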