Skip to content

Commit

Permalink
feat: implement lookup & adjust connect accordingly
Browse files Browse the repository at this point in the history
  • Loading branch information
ramfox committed Oct 20, 2022
1 parent d06c463 commit 43a4639
Show file tree
Hide file tree
Showing 11 changed files with 433 additions and 75 deletions.
4 changes: 2 additions & 2 deletions iroh-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ pub use crate::api_ext::ApiExt;
#[cfg(feature = "testing")]
pub use crate::p2p::MockP2p;
pub use crate::p2p::P2p as P2pApi;
pub use crate::p2p::{Lookup, PeerIdOrAddr};
pub use crate::p2p::PeerIdOrAddr;
pub use bytes::Bytes;
pub use cid::Cid;
pub use iroh_resolver::resolver::Path as IpfsPath;
pub use iroh_rpc_client::{ServiceStatus, StatusRow, StatusTable};
pub use iroh_rpc_client::{Lookup, ServiceStatus, StatusRow, StatusTable};
pub use libp2p::gossipsub::MessageId;
pub use libp2p::{Multiaddr, PeerId};
40 changes: 24 additions & 16 deletions iroh-api/src/p2p.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::Result;
use async_trait::async_trait;
use iroh_rpc_client::P2pClient;
use iroh_rpc_client::{Lookup, P2pClient};
use libp2p::{multiaddr::Protocol, Multiaddr, PeerId};
#[cfg(feature = "testing")]
use mockall::automock;
Expand All @@ -9,12 +9,6 @@ pub struct ClientP2p {
client: P2pClient,
}

pub struct Lookup {
pub peer_id: PeerId,
pub listen_addrs: Vec<Multiaddr>,
pub local_addrs: Vec<Multiaddr>,
}

#[derive(Debug, Clone)]
pub enum PeerIdOrAddr {
PeerId(PeerId),
Expand All @@ -30,30 +24,44 @@ impl ClientP2p {
#[cfg_attr(feature = "testing", automock)]
#[async_trait]
pub trait P2p: Sync {
async fn lookup_local(&self) -> Result<Lookup>;
async fn lookup(&self, addr: &PeerIdOrAddr) -> Result<Lookup>;
async fn connect(&self, addr: &PeerIdOrAddr) -> Result<()>;
}

#[async_trait]
impl P2p for ClientP2p {
/// XXX really should be an API that intos a peer id, and then also accepts
/// an address, or two separate methods, one for peer id, one for address
async fn lookup(&self, _addr: &PeerIdOrAddr) -> Result<Lookup> {
async fn lookup_local(&self) -> Result<Lookup> {
let (_, listen_addrs) = self.client.get_listening_addrs().await?;
Ok(Lookup {
peer_id: self.client.local_peer_id().await?,
listen_addrs,
local_addrs: self.client.external_addresses().await?,
observed_addrs: self.client.external_addresses().await?,
protocol_version: String::new(),
agent_version: String::new(),
protocols: Default::default(),
})
}

// TODO(ramfox): once lookup is implemented, use lookup to get appropriate multiaddr if no
// address is given
async fn connect(&self, addr: &PeerIdOrAddr) -> Result<()> {
async fn lookup(&self, addr: &PeerIdOrAddr) -> Result<Lookup> {
match addr {
PeerIdOrAddr::PeerId(_) => {
anyhow::bail!("Connecting two nodes based soley on PeerId is not yet implemented")
PeerIdOrAddr::PeerId(peer_id) => self.client.lookup(*peer_id, None).await,
PeerIdOrAddr::Multiaddr(addr) => {
if let Some(Protocol::P2p(peer_id)) =
addr.iter().find(|p| matches!(*p, Protocol::P2p(_)))
{
let peer_id = PeerId::from_multihash(peer_id).map_err(|m| anyhow::anyhow!("Multiaddr contains invalid p2p multihash {:?}. Cannot derive a PeerId from this address.", m))?;
self.client.lookup(peer_id, Some(addr.clone())).await
} else {
anyhow::bail!("Mulitaddress must include the peer id");
}
}
}
}

async fn connect(&self, addr: &PeerIdOrAddr) -> Result<()> {
match addr {
PeerIdOrAddr::PeerId(peer_id) => self.client.connect(*peer_id, vec![]).await,
PeerIdOrAddr::Multiaddr(addr) => {
if let Some(Protocol::P2p(peer_id)) =
addr.iter().find(|p| matches!(*p, Protocol::P2p(_)))
Expand Down
199 changes: 179 additions & 20 deletions iroh-p2p/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ use iroh_rpc_types::p2p::P2pServerAddr;
use libp2p::core::Multiaddr;
use libp2p::gossipsub::{GossipsubMessage, MessageId, TopicHash};
pub use libp2p::gossipsub::{IdentTopic, Topic};
use libp2p::identify::IdentifyEvent;
use libp2p::identify::{IdentifyEvent, IdentifyInfo};
use libp2p::identity::Keypair;
use libp2p::kad::kbucket::{Distance, NodeStatus};
use libp2p::kad::BootstrapOk;
use libp2p::kad::{
self, record::Key, GetProvidersError, GetProvidersOk, KademliaEvent, QueryId, QueryResult,
};
use libp2p::kad::{BootstrapOk, GetClosestPeersError, GetClosestPeersOk};
use libp2p::metrics::Recorder;
use libp2p::multiaddr::Protocol;
use libp2p::ping::Result as PingResult;
use libp2p::swarm::dial_opts::{DialOpts, PeerCondition};
use libp2p::swarm::{ConnectionHandler, IntoConnectionHandler, NetworkBehaviour, SwarmEvent};
Expand Down Expand Up @@ -68,7 +69,8 @@ pub struct Node<KeyStorage: Storage> {
swarm: Swarm<NodeBehaviour>,
net_receiver_in: Receiver<RpcMessage>,
kad_queries: AHashMap<QueryKey, KadQueryChannel>,
dial_queries: AHashMap<PeerId, Vec<OneShotSender<bool>>>,
dial_queries: AHashMap<PeerId, Vec<OneShotSender<Result<()>>>>,
lookup_queries: AHashMap<PeerId, Vec<oneshot::Sender<Result<IdentifyInfo>>>>,
network_events: Vec<Sender<NetworkEvent>>,
#[allow(dead_code)]
rpc_client: RpcClient,
Expand All @@ -90,11 +92,16 @@ enum KadQueryChannel {
channels: Vec<Sender<Result<HashSet<PeerId>, String>>>,
limit: usize,
},
FindPeerOnDHT {
peer_id: PeerId,
channels: Vec<oneshot::Sender<Result<()>>>,
},
}

#[derive(Debug, Hash, PartialEq, Eq)]
enum QueryKey {
ProviderKey(Key),
FindPeerOnDHTKey(Vec<u8>),
}

pub(crate) const DEFAULT_PROVIDER_LIMIT: usize = 10;
Expand Down Expand Up @@ -142,6 +149,7 @@ impl<KeyStorage: Storage> Node<KeyStorage> {
net_receiver_in: network_receiver_in,
kad_queries: Default::default(),
dial_queries: Default::default(),
lookup_queries: Default::default(),
network_events: Vec::new(),
rpc_client,
_keychain: keychain,
Expand Down Expand Up @@ -364,7 +372,7 @@ impl<KeyStorage: Storage> Node<KeyStorage> {
} => {
if let Some(channels) = self.dial_queries.get_mut(&peer_id) {
while let Some(channel) = channels.pop() {
channel.send(true).ok();
channel.send(Ok(())).ok();
}
}

Expand Down Expand Up @@ -392,7 +400,9 @@ impl<KeyStorage: Storage> Node<KeyStorage> {
if let Some(peer_id) = peer_id {
if let Some(channels) = self.dial_queries.get_mut(&peer_id) {
while let Some(channel) = channels.pop() {
channel.send(false).ok();
channel
.send(Err(anyhow!("Error dialing peer {:?}: {}", peer_id, error)))
.ok();
}
}
}
Expand Down Expand Up @@ -459,6 +469,7 @@ impl<KeyStorage: Storage> Node<KeyStorage> {
}
Event::Kademlia(e) => {
libp2p_metrics().record(&e);

if let KademliaEvent::OutboundQueryProgressed {
id, result, step, ..
} = e
Expand Down Expand Up @@ -543,6 +554,51 @@ impl<KeyStorage: Storage> Node<KeyStorage> {
QueryResult::Bootstrap(Err(e)) => {
warn!("kad bootstrap error: {:?}", e);
}
QueryResult::GetClosestPeers(Ok(GetClosestPeersOk { key, peers })) => {
debug!("GetClosestPeers ok {:?}", key);
if let Some(KadQueryChannel::FindPeerOnDHT {
channels, peer_id, ..
}) = self.kad_queries.remove(&QueryKey::FindPeerOnDHTKey(key))
{
if !peers.contains(&peer_id) {
tokio::task::spawn(async move {
for chan in channels.into_iter() {
chan.send(Err(anyhow!(
"Failed to find peer {:?} on the DHT",
peer_id
)))
.ok();
}
});
} else {
tokio::task::spawn(async move {
for chan in channels.into_iter() {
chan.send(Ok(())).ok();
}
});
}
}
}
QueryResult::GetClosestPeers(Err(GetClosestPeersError::Timeout {
key,
..
})) => {
debug!("GetClosestPeers Timeout: {:?}", key);
if let Some(KadQueryChannel::FindPeerOnDHT {
channels, peer_id, ..
}) = self.kad_queries.remove(&QueryKey::FindPeerOnDHTKey(key))
{
tokio::task::spawn(async move {
for chan in channels.into_iter() {
chan.send(Err(anyhow!(
"Failed to find peer {:?} on the DHT: Timeout",
peer_id
)))
.ok();
}
});
}
}
other => {
debug!("Libp2p => Unhandled Kademlia query result: {:?}", other)
}
Expand Down Expand Up @@ -579,7 +635,24 @@ impl<KeyStorage: Storage> Node<KeyStorage> {
self.swarm
.behaviour_mut()
.peer_manager
.inject_identify_info(peer_id, info);
.inject_identify_info(peer_id, info.clone());

if let Some(channels) = self.lookup_queries.remove(&peer_id) {
for chan in channels {
chan.send(Ok(info.clone())).ok();
}
}
} else if let IdentifyEvent::Error { peer_id, error } = *e {
if let Some(channels) = self.lookup_queries.remove(&peer_id) {
for chan in channels {
chan.send(Err(anyhow!(
"error upgrading connection to peer {:?}: {}",
peer_id,
error
)))
.ok();
}
}
}
}
Event::Ping(e) => {
Expand Down Expand Up @@ -682,9 +755,11 @@ impl<KeyStorage: Storage> Node<KeyStorage> {
if let Some(kad) = self.swarm.behaviour_mut().kad.as_mut() {
match self.kad_queries.entry(QueryKey::ProviderKey(key.clone())) {
std::collections::hash_map::Entry::Occupied(mut entry) => {
let KadQueryChannel::GetProviders { channels, .. } =
entry.get_mut();
channels.push(response_channel);
if let KadQueryChannel::GetProviders { channels, .. } =
entry.get_mut()
{
channels.push(response_channel);
}
}
std::collections::hash_map::Entry::Vacant(entry) => {
let query_id = kad.get_providers(key);
Expand Down Expand Up @@ -756,21 +831,61 @@ impl<KeyStorage: Storage> Node<KeyStorage> {
.send(peer_addresses)
.map_err(|_| anyhow!("Failed to get Libp2p peers"))?;
}
RpcMessage::NetConnect(response_channel, peer_id, addresses) => {
let channels = self.dial_queries.entry(peer_id).or_default();
channels.push(response_channel);
RpcMessage::NetConnect(response_channel, peer_id, addrs) => {
if self.swarm.is_connected(&peer_id) {
response_channel.send(Ok(())).ok();
} else {
let channels = self.dial_queries.entry(peer_id).or_default();
channels.push(response_channel);

// when using DialOpts::peer_id, having the `P2p` protocol as part of the
// added addresses throws an error
// we can filter out that protocol before adding the addresses to the dial opts
let addrs = addrs
.iter()
.map(|a| {
a.iter()
.filter(|p| !matches!(*p, Protocol::P2p(_)))
.collect()
})
.collect();
let dial_opts = DialOpts::peer_id(peer_id)
.addresses(addrs)
.condition(libp2p::swarm::dial_opts::PeerCondition::Always)
.build();
if let Err(e) = Swarm::dial(&mut self.swarm, dial_opts) {
warn!("invalid dial options: {:?}", e);
while let Some(channel) = channels.pop() {
channel
.send(Err(anyhow!("error dialing peer {:?}: {}", peer_id, e)))
.ok();
}
}
}
}
RpcMessage::NetConnectByPeerId(response_channel, peer_id) => {
if self.swarm.is_connected(&peer_id) {
response_channel.send(Ok(())).ok();
} else {
let channels = self.dial_queries.entry(peer_id).or_default();
channels.push(response_channel);

let dial_opts = DialOpts::peer_id(peer_id)
.addresses(addresses)
.condition(libp2p::swarm::dial_opts::PeerCondition::Always)
.build();
if let Err(e) = Swarm::dial(&mut self.swarm, dial_opts) {
warn!("invalid dial options: {:?}", e);
while let Some(channel) = channels.pop() {
channel.send(false).ok();
let dial_opts = DialOpts::peer_id(peer_id)
.condition(libp2p::swarm::dial_opts::PeerCondition::Always)
.build();
if let Err(e) = Swarm::dial(&mut self.swarm, dial_opts) {
while let Some(channel) = channels.pop() {
channel
.send(Err(anyhow!("error dialing peer {:?}: {}", peer_id, e)))
.ok();
}
}
}
}
RpcMessage::AddressesOfPeer(response_channel, peer_id) => {
let addrs = self.swarm.behaviour_mut().addresses_of_peer(&peer_id);
response_channel.send(addrs).ok();
}
RpcMessage::NetDisconnect(response_channel, _peer_id) => {
warn!("NetDisconnect API not yet implemented"); // TODO: implement NetDisconnect

Expand Down Expand Up @@ -847,6 +962,50 @@ impl<KeyStorage: Storage> Node<KeyStorage> {
}
}
}
RpcMessage::ListenForIdentify(response_channel, peer_id) => {
let channels = self.lookup_queries.entry(peer_id).or_default();
channels.push(response_channel);
}
RpcMessage::LookupPeerInfo(response_channel, peer_id) => {
if let Some(info) = self.swarm.behaviour().peer_manager.info_for_peer(&peer_id) {
response_channel.send(info.last_info.clone()).ok();
} else {
response_channel.send(None).ok();
}
}
RpcMessage::CancelListenForIdentify(response_channel, peer_id) => {
self.lookup_queries.remove(&peer_id);
response_channel.send(()).ok();
}
RpcMessage::FindPeerOnDHT(response_channel, peer_id) => {
debug!("find closest peers for: {:?}", peer_id);
if let Some(kad) = self.swarm.behaviour_mut().kad.as_mut() {
match self
.kad_queries
.entry(QueryKey::FindPeerOnDHTKey(peer_id.to_bytes()))
{
std::collections::hash_map::Entry::Occupied(mut entry) => {
if let KadQueryChannel::FindPeerOnDHT { channels, .. } = entry.get_mut()
{
channels.push(response_channel);
}
}
std::collections::hash_map::Entry::Vacant(entry) => {
kad.get_closest_peers(peer_id);
entry.insert(KadQueryChannel::FindPeerOnDHT {
peer_id,
channels: vec![response_channel],
});
}
}
} else {
tokio::task::spawn(async move {
response_channel
.send(Err(anyhow!("kademlia is not available")))
.ok();
});
}
}
RpcMessage::Shutdown => {
return Ok(true);
}
Expand Down
Loading

0 comments on commit 43a4639

Please sign in to comment.