Skip to content

Commit

Permalink
feat: improve provider fetching
Browse files Browse the repository at this point in the history
Integrates libp2p/rust-libp2p#2712 to fetch providers more quickly. For widely available providers this reduces the fetch time considerably.
  • Loading branch information
dignifiedquire committed Jun 16, 2022
1 parent 652b7ff commit 38ce246
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 39 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ members = [

[patch.crates-io]
# TODO: switch to crates.io once 0.45 is released
libp2p = { git = "https://github.com/libp2p/rust-libp2p", branch = "master" }
libp2p = { git = "https://github.com/dignifiedquire/rust-libp2p", branch = "feat-kad-count" }
# libp2p = { path = "../rust-libp2p" }
19 changes: 16 additions & 3 deletions iroh-p2p/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,16 +93,18 @@ impl NodeBehaviour {
.into();

let kad = if config.kademlia {
let local_peer_id = local_key.public().to_peer_id();
let pub_key = local_key.public();

// TODO: persist to store
let store = MemoryStore::new(local_peer_id.to_owned());
let store = MemoryStore::new(pub_key.to_peer_id());

// TODO: make user configurable
let mut kad_config = KademliaConfig::default();
kad_config.set_parallelism(16usize.try_into().unwrap());
// TODO: potentially lower (this is per query)
kad_config.set_query_timeout(Duration::from_secs(5));

let mut kademlia = Kademlia::with_config(local_peer_id, store, kad_config);
let mut kademlia = Kademlia::with_config(pub_key.to_peer_id(), store, kad_config);
for multiaddr in &config.bootstrap_peers {
// TODO: move parsing into config
let mut addr = multiaddr.to_owned();
Expand All @@ -113,9 +115,12 @@ impl NodeBehaviour {
warn!("Could not parse bootstrap addr {}", multiaddr);
}
}

// Trigger initial bootstrap
if let Err(e) = kademlia.bootstrap() {
warn!("Kademlia bootstrap failed: {}", e);
}

Some(kademlia)
} else {
None
Expand Down Expand Up @@ -163,4 +168,12 @@ impl NodeBehaviour {
kad.add_address(peer, addr);
}
}

pub fn finish_query(&mut self, id: &libp2p::kad::QueryId) {
if let Some(kad) = self.kad.as_mut() {
if let Some(mut query) = kad.query_mut(id) {
query.finish();
}
}
}
}
19 changes: 12 additions & 7 deletions iroh-p2p/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use futures::channel::oneshot;
use libp2p::kad::record::Key;
use libp2p::Multiaddr;
use libp2p::PeerId;
use tokio::sync::mpsc;
use tonic::{transport::Server as TonicServer, Request, Response, Status};
use tracing::trace;

Expand Down Expand Up @@ -84,22 +85,26 @@ impl p2p_server::P2p for P2p {
iroh_metrics::req::set_trace_ctx(&request);
let req = request.into_inner();
trace!("received ProviderRequest: {:?}", req.key);
let (s, r) = oneshot::channel();
let (s, mut r) = mpsc::channel(1024);
let msg = RpcMessage::ProviderRequest {
key: req.key.into(),
response_channel: s,
};

self.sender
.send(msg)
.await
.map_err(|_| Status::internal("receiver dropped"))?;

let providers = r
.await
.map_err(|_| Status::internal("sender dropped"))?
.map_err(|e| Status::internal(format!("failed to retrieve provider: {:?}", e)))?;
// TODO: streaming response
let mut providers = Vec::new();
while let Some(provider) = r.recv().await {
match provider {
Ok(provider) => providers.push(provider.to_bytes()),
Err(e) => return Err(Status::internal(e)),
}
}

let providers = providers.into_iter().map(|p| p.to_bytes()).collect();
Ok(Response::new(Providers { providers }))
}

Expand Down Expand Up @@ -219,7 +224,7 @@ pub enum RpcMessage {
ProviderRequest {
// TODO: potentially change this to Cid, as that is the only key we use for providers
key: Key,
response_channel: oneshot::Sender<Result<HashSet<PeerId>, String>>,
response_channel: mpsc::Sender<Result<PeerId, String>>,
},
NetListeningAddrs(oneshot::Sender<(PeerId, Vec<Multiaddr>)>),
NetPeers(oneshot::Sender<HashMap<PeerId, Vec<Multiaddr>>>),
Expand Down
132 changes: 104 additions & 28 deletions iroh-p2p/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::num::NonZeroU8;
use std::time::Duration;

use ahash::AHashMap;
use anyhow::{anyhow, Context, Result};
use async_channel::{bounded as channel, Receiver};
use cid::Cid;
use futures::channel::oneshot::{self, Sender as OneShotSender};
use futures::channel::oneshot::Sender as OneShotSender;
use futures_util::stream::StreamExt;
use iroh_rpc_client::Client as RpcClient;
use libp2p::core::muxing::StreamMuxerBox;
Expand All @@ -16,7 +16,8 @@ pub use libp2p::gossipsub::{IdentTopic, Topic};
use libp2p::identify::{IdentifyEvent, IdentifyInfo};
use libp2p::identity::Keypair;
use libp2p::kad::{
self, record::Key, GetProvidersError, GetProvidersOk, KademliaEvent, QueryResult,
self, record::Key, GetProvidersError, GetProvidersOk, GetProvidersProgress, KademliaEvent,
QueryProgress, QueryResult,
};
use libp2p::metrics::{Metrics, Recorder};
use libp2p::multiaddr::Protocol;
Expand All @@ -28,7 +29,7 @@ use libp2p::swarm::{
use libp2p::yamux::WindowUpdateMode;
use libp2p::{core, mplex, noise, yamux, PeerId, Swarm, Transport};
use prometheus_client::registry::Registry;
use tokio::{select, time};
use tokio::{select, sync::mpsc, time};
use tracing::{debug, info, trace, warn};

use iroh_bitswap::{
Expand Down Expand Up @@ -62,14 +63,16 @@ pub struct Libp2pService {
}

enum QueryChannel {
GetProviders(Vec<oneshot::Sender<Result<HashSet<PeerId>, String>>>),
GetProviders(Vec<mpsc::Sender<Result<PeerId, String>>>),
}

#[derive(Debug, Hash, PartialEq, Eq)]
enum QueryKey {
ProviderKey(Key),
}

const PROVIDER_LIMIT: usize = 20;

impl Libp2pService {
pub async fn new(
config: Libp2pConfig,
Expand All @@ -86,7 +89,7 @@ impl Libp2pService {
.with_max_pending_outgoing(Some(30)) // TODO: configurable
.with_max_established_incoming(Some(config.target_peer_count))
.with_max_established_outgoing(Some(config.target_peer_count))
.with_max_established_per_peer(Some(5)); // TODO: configurable
.with_max_established_per_peer(Some(60)); // TODO: configurable

let node = NodeBehaviour::new(&net_keypair, &config, registry).await?;
let mut swarm = SwarmBuilder::new(transport, node, peer_id)
Expand Down Expand Up @@ -221,42 +224,56 @@ impl Libp2pService {
Event::Kademlia(e) => {
self.metrics.record(&e);
if let KademliaEvent::OutboundQueryCompleted { result, .. } = e {
debug!("kad: {:?}", result);
debug!("kad completed: {:?}", result);
match result {
QueryResult::GetProviders(Ok(GetProvidersOk {
providers, key, ..
})) => {
if let Some(QueryChannel::GetProviders(chans)) =
self.kad_queries.remove(&QueryKey::ProviderKey(key.clone()))
{
for chan in chans.into_iter() {
debug!("Sending providers for {:?}", key);
chan.send(Ok(providers.clone())).ok();
}
} else {
debug!("No listeners");
}
QueryResult::GetProviders(Ok(GetProvidersOk { key, .. })) => {
let _ = self.kad_queries.remove(&QueryKey::ProviderKey(key));
}

QueryResult::GetProviders(Err(err)) => {
let (key, providers) = match err {
GetProvidersError::Timeout { key, providers, .. } => {
(key, providers)
}
let key = match err {
GetProvidersError::Timeout { key, .. } => key,
};
debug!("GetProviders timeout {:?}", key);
if let Some(QueryChannel::GetProviders(chans)) =
self.kad_queries.remove(&QueryKey::ProviderKey(key.clone()))
self.kad_queries.remove(&QueryKey::ProviderKey(key))
{
for chan in chans.into_iter() {
debug!("Sending providers for {:?}", key);
chan.send(Ok(providers.clone())).ok();
chan.send(Err("Timeout".into())).await.ok();
}
}
}
other => {
debug!("Libp2p => Unhandled Kademlia query result: {:?}", other)
}
}
} else if let KademliaEvent::OutboundQueryProgressed {
id, result, count, ..
} = e
{
debug!("kad progressed: {:?}", result);
match result {
QueryProgress::GetProviders(GetProvidersProgress {
key, provider, ..
}) => {
if count >= PROVIDER_LIMIT {
debug!("finish provider query {}/{}", count, PROVIDER_LIMIT);
// Finish query if we have enough providers.
self.swarm.behaviour_mut().finish_query(&id);
}

if let Some(QueryChannel::GetProviders(chans)) = self
.kad_queries
.get_mut(&QueryKey::ProviderKey(key.clone()))
{
for chan in chans.iter_mut() {
chan.send(Ok(provider)).await.ok();
}
} else {
debug!("No listeners");
}
}
}
}
}
Event::Identify(e) => {
Expand Down Expand Up @@ -342,7 +359,10 @@ impl Libp2pService {
);
}
} else {
response_channel.send(Ok(Default::default())).ok();
response_channel
.send(Err("kademlia is not available".into()))
.await
.ok();
}
}
RpcMessage::NetListeningAddrs(response_channel) => {
Expand Down Expand Up @@ -434,3 +454,59 @@ pub async fn build_transport(local_key: Keypair) -> Boxed<(PeerId, StreamMuxerBo
.timeout(Duration::from_secs(20)) // TODO: configurable
.boxed()
}

#[cfg(test)]
mod tests {
use crate::metrics;

use super::*;
use anyhow::Result;
use libp2p::identity::ed25519;

#[tokio::test]
async fn test_fetch_providers() -> Result<()> {
let mut prom_registry = Registry::default();
let libp2p_metrics = Metrics::new(&mut prom_registry);
let net_keypair = {
let gen_keypair = ed25519::Keypair::generate();
Keypair::Ed25519(gen_keypair)
};

let mut network_config = Libp2pConfig::default();
network_config.metrics.debug = true;
let metrics_config = network_config.metrics.clone();

let mut p2p_service = Libp2pService::new(
network_config,
net_keypair,
&mut prom_registry,
libp2p_metrics,
)
.await?;

let metrics_handle = iroh_metrics::init_with_registry(
metrics::metrics_config_with_compile_time_info(metrics_config),
prom_registry,
)
.await
.expect("failed to initialize metrics");

let cfg = iroh_rpc_client::Config::default();
let p2p_task = tokio::task::spawn(async move {
p2p_service.run().await.unwrap();
});

{
let client = RpcClient::new(&cfg).await?;
let c = "QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR"
.parse()
.unwrap();
let providers = client.p2p.fetch_providers(&c).await?;
assert!(providers.len() >= PROVIDER_LIMIT);
}

p2p_task.abort();
metrics_handle.shutdown();
Ok(())
}
}

0 comments on commit 38ce246

Please sign in to comment.