From 43a4639757953807066cb36b2010dfcd51953863 Mon Sep 17 00:00:00 2001 From: Kasey Date: Thu, 20 Oct 2022 00:56:29 -0400 Subject: [PATCH] feat: implement lookup & adjust `connect` accordingly --- iroh-api/src/lib.rs | 4 +- iroh-api/src/p2p.rs | 40 ++++--- iroh-p2p/src/node.rs | 199 +++++++++++++++++++++++++++++---- iroh-p2p/src/rpc.rs | 111 ++++++++++++++++-- iroh-rpc-client/src/lib.rs | 2 +- iroh-rpc-client/src/network.rs | 86 ++++++++++++-- iroh-rpc-types/proto/p2p.proto | 41 +++++-- iroh-rpc-types/src/p2p.rs | 4 +- iroh/src/fixture.rs | 5 +- iroh/src/p2p.rs | 9 +- iroh/tests/cmd/lookup.trycmd | 7 +- 11 files changed, 433 insertions(+), 75 deletions(-) diff --git a/iroh-api/src/lib.rs b/iroh-api/src/lib.rs index d4d2db03bce..2acb63cf503 100644 --- a/iroh-api/src/lib.rs +++ b/iroh-api/src/lib.rs @@ -10,10 +10,10 @@ pub use crate::api_ext::ApiExt; #[cfg(feature = "testing")] pub use crate::p2p::MockP2p; pub use crate::p2p::P2p as P2pApi; -pub use crate::p2p::{Lookup, PeerIdOrAddr}; +pub use crate::p2p::PeerIdOrAddr; pub use bytes::Bytes; pub use cid::Cid; pub use iroh_resolver::resolver::Path as IpfsPath; -pub use iroh_rpc_client::{ServiceStatus, StatusRow, StatusTable}; +pub use iroh_rpc_client::{Lookup, ServiceStatus, StatusRow, StatusTable}; pub use libp2p::gossipsub::MessageId; pub use libp2p::{Multiaddr, PeerId}; diff --git a/iroh-api/src/p2p.rs b/iroh-api/src/p2p.rs index 71be20b7fac..15c0ab9b806 100644 --- a/iroh-api/src/p2p.rs +++ b/iroh-api/src/p2p.rs @@ -1,6 +1,6 @@ use anyhow::Result; use async_trait::async_trait; -use iroh_rpc_client::P2pClient; +use iroh_rpc_client::{Lookup, P2pClient}; use libp2p::{multiaddr::Protocol, Multiaddr, PeerId}; #[cfg(feature = "testing")] use mockall::automock; @@ -9,12 +9,6 @@ pub struct ClientP2p { client: P2pClient, } -pub struct Lookup { - pub peer_id: PeerId, - pub listen_addrs: Vec, - pub local_addrs: Vec, -} - #[derive(Debug, Clone)] pub enum PeerIdOrAddr { PeerId(PeerId), @@ -30,30 +24,44 @@ impl ClientP2p { #[cfg_attr(feature = "testing", automock)] #[async_trait] pub trait P2p: Sync { + async fn lookup_local(&self) -> Result; async fn lookup(&self, addr: &PeerIdOrAddr) -> Result; async fn connect(&self, addr: &PeerIdOrAddr) -> Result<()>; } #[async_trait] impl P2p for ClientP2p { - /// XXX really should be an API that intos a peer id, and then also accepts - /// an address, or two separate methods, one for peer id, one for address - async fn lookup(&self, _addr: &PeerIdOrAddr) -> Result { + async fn lookup_local(&self) -> Result { let (_, listen_addrs) = self.client.get_listening_addrs().await?; Ok(Lookup { peer_id: self.client.local_peer_id().await?, listen_addrs, - local_addrs: self.client.external_addresses().await?, + observed_addrs: self.client.external_addresses().await?, + protocol_version: String::new(), + agent_version: String::new(), + protocols: Default::default(), }) } - // TODO(ramfox): once lookup is implemented, use lookup to get appropriate multiaddr if no - // address is given - async fn connect(&self, addr: &PeerIdOrAddr) -> Result<()> { + async fn lookup(&self, addr: &PeerIdOrAddr) -> Result { match addr { - PeerIdOrAddr::PeerId(_) => { - anyhow::bail!("Connecting two nodes based soley on PeerId is not yet implemented") + PeerIdOrAddr::PeerId(peer_id) => self.client.lookup(*peer_id, None).await, + PeerIdOrAddr::Multiaddr(addr) => { + if let Some(Protocol::P2p(peer_id)) = + addr.iter().find(|p| matches!(*p, Protocol::P2p(_))) + { + let peer_id = PeerId::from_multihash(peer_id).map_err(|m| anyhow::anyhow!("Multiaddr contains invalid p2p multihash {:?}. Cannot derive a PeerId from this address.", m))?; + self.client.lookup(peer_id, Some(addr.clone())).await + } else { + anyhow::bail!("Mulitaddress must include the peer id"); + } } + } + } + + async fn connect(&self, addr: &PeerIdOrAddr) -> Result<()> { + match addr { + PeerIdOrAddr::PeerId(peer_id) => self.client.connect(*peer_id, vec![]).await, PeerIdOrAddr::Multiaddr(addr) => { if let Some(Protocol::P2p(peer_id)) = addr.iter().find(|p| matches!(*p, Protocol::P2p(_))) diff --git a/iroh-p2p/src/node.rs b/iroh-p2p/src/node.rs index 2f9f329f5aa..996b042a47c 100644 --- a/iroh-p2p/src/node.rs +++ b/iroh-p2p/src/node.rs @@ -11,14 +11,15 @@ use iroh_rpc_types::p2p::P2pServerAddr; use libp2p::core::Multiaddr; use libp2p::gossipsub::{GossipsubMessage, MessageId, TopicHash}; pub use libp2p::gossipsub::{IdentTopic, Topic}; -use libp2p::identify::IdentifyEvent; +use libp2p::identify::{IdentifyEvent, IdentifyInfo}; use libp2p::identity::Keypair; use libp2p::kad::kbucket::{Distance, NodeStatus}; -use libp2p::kad::BootstrapOk; use libp2p::kad::{ self, record::Key, GetProvidersError, GetProvidersOk, KademliaEvent, QueryId, QueryResult, }; +use libp2p::kad::{BootstrapOk, GetClosestPeersError, GetClosestPeersOk}; use libp2p::metrics::Recorder; +use libp2p::multiaddr::Protocol; use libp2p::ping::Result as PingResult; use libp2p::swarm::dial_opts::{DialOpts, PeerCondition}; use libp2p::swarm::{ConnectionHandler, IntoConnectionHandler, NetworkBehaviour, SwarmEvent}; @@ -68,7 +69,8 @@ pub struct Node { swarm: Swarm, net_receiver_in: Receiver, kad_queries: AHashMap, - dial_queries: AHashMap>>, + dial_queries: AHashMap>>>, + lookup_queries: AHashMap>>>, network_events: Vec>, #[allow(dead_code)] rpc_client: RpcClient, @@ -90,11 +92,16 @@ enum KadQueryChannel { channels: Vec, String>>>, limit: usize, }, + FindPeerOnDHT { + peer_id: PeerId, + channels: Vec>>, + }, } #[derive(Debug, Hash, PartialEq, Eq)] enum QueryKey { ProviderKey(Key), + FindPeerOnDHTKey(Vec), } pub(crate) const DEFAULT_PROVIDER_LIMIT: usize = 10; @@ -142,6 +149,7 @@ impl Node { net_receiver_in: network_receiver_in, kad_queries: Default::default(), dial_queries: Default::default(), + lookup_queries: Default::default(), network_events: Vec::new(), rpc_client, _keychain: keychain, @@ -364,7 +372,7 @@ impl Node { } => { if let Some(channels) = self.dial_queries.get_mut(&peer_id) { while let Some(channel) = channels.pop() { - channel.send(true).ok(); + channel.send(Ok(())).ok(); } } @@ -392,7 +400,9 @@ impl Node { if let Some(peer_id) = peer_id { if let Some(channels) = self.dial_queries.get_mut(&peer_id) { while let Some(channel) = channels.pop() { - channel.send(false).ok(); + channel + .send(Err(anyhow!("Error dialing peer {:?}: {}", peer_id, error))) + .ok(); } } } @@ -459,6 +469,7 @@ impl Node { } Event::Kademlia(e) => { libp2p_metrics().record(&e); + if let KademliaEvent::OutboundQueryProgressed { id, result, step, .. } = e @@ -543,6 +554,51 @@ impl Node { QueryResult::Bootstrap(Err(e)) => { warn!("kad bootstrap error: {:?}", e); } + QueryResult::GetClosestPeers(Ok(GetClosestPeersOk { key, peers })) => { + debug!("GetClosestPeers ok {:?}", key); + if let Some(KadQueryChannel::FindPeerOnDHT { + channels, peer_id, .. + }) = self.kad_queries.remove(&QueryKey::FindPeerOnDHTKey(key)) + { + if !peers.contains(&peer_id) { + tokio::task::spawn(async move { + for chan in channels.into_iter() { + chan.send(Err(anyhow!( + "Failed to find peer {:?} on the DHT", + peer_id + ))) + .ok(); + } + }); + } else { + tokio::task::spawn(async move { + for chan in channels.into_iter() { + chan.send(Ok(())).ok(); + } + }); + } + } + } + QueryResult::GetClosestPeers(Err(GetClosestPeersError::Timeout { + key, + .. + })) => { + debug!("GetClosestPeers Timeout: {:?}", key); + if let Some(KadQueryChannel::FindPeerOnDHT { + channels, peer_id, .. + }) = self.kad_queries.remove(&QueryKey::FindPeerOnDHTKey(key)) + { + tokio::task::spawn(async move { + for chan in channels.into_iter() { + chan.send(Err(anyhow!( + "Failed to find peer {:?} on the DHT: Timeout", + peer_id + ))) + .ok(); + } + }); + } + } other => { debug!("Libp2p => Unhandled Kademlia query result: {:?}", other) } @@ -579,7 +635,24 @@ impl Node { self.swarm .behaviour_mut() .peer_manager - .inject_identify_info(peer_id, info); + .inject_identify_info(peer_id, info.clone()); + + if let Some(channels) = self.lookup_queries.remove(&peer_id) { + for chan in channels { + chan.send(Ok(info.clone())).ok(); + } + } + } else if let IdentifyEvent::Error { peer_id, error } = *e { + if let Some(channels) = self.lookup_queries.remove(&peer_id) { + for chan in channels { + chan.send(Err(anyhow!( + "error upgrading connection to peer {:?}: {}", + peer_id, + error + ))) + .ok(); + } + } } } Event::Ping(e) => { @@ -682,9 +755,11 @@ impl Node { if let Some(kad) = self.swarm.behaviour_mut().kad.as_mut() { match self.kad_queries.entry(QueryKey::ProviderKey(key.clone())) { std::collections::hash_map::Entry::Occupied(mut entry) => { - let KadQueryChannel::GetProviders { channels, .. } = - entry.get_mut(); - channels.push(response_channel); + if let KadQueryChannel::GetProviders { channels, .. } = + entry.get_mut() + { + channels.push(response_channel); + } } std::collections::hash_map::Entry::Vacant(entry) => { let query_id = kad.get_providers(key); @@ -756,21 +831,61 @@ impl Node { .send(peer_addresses) .map_err(|_| anyhow!("Failed to get Libp2p peers"))?; } - RpcMessage::NetConnect(response_channel, peer_id, addresses) => { - let channels = self.dial_queries.entry(peer_id).or_default(); - channels.push(response_channel); + RpcMessage::NetConnect(response_channel, peer_id, addrs) => { + if self.swarm.is_connected(&peer_id) { + response_channel.send(Ok(())).ok(); + } else { + let channels = self.dial_queries.entry(peer_id).or_default(); + channels.push(response_channel); + + // when using DialOpts::peer_id, having the `P2p` protocol as part of the + // added addresses throws an error + // we can filter out that protocol before adding the addresses to the dial opts + let addrs = addrs + .iter() + .map(|a| { + a.iter() + .filter(|p| !matches!(*p, Protocol::P2p(_))) + .collect() + }) + .collect(); + let dial_opts = DialOpts::peer_id(peer_id) + .addresses(addrs) + .condition(libp2p::swarm::dial_opts::PeerCondition::Always) + .build(); + if let Err(e) = Swarm::dial(&mut self.swarm, dial_opts) { + warn!("invalid dial options: {:?}", e); + while let Some(channel) = channels.pop() { + channel + .send(Err(anyhow!("error dialing peer {:?}: {}", peer_id, e))) + .ok(); + } + } + } + } + RpcMessage::NetConnectByPeerId(response_channel, peer_id) => { + if self.swarm.is_connected(&peer_id) { + response_channel.send(Ok(())).ok(); + } else { + let channels = self.dial_queries.entry(peer_id).or_default(); + channels.push(response_channel); - let dial_opts = DialOpts::peer_id(peer_id) - .addresses(addresses) - .condition(libp2p::swarm::dial_opts::PeerCondition::Always) - .build(); - if let Err(e) = Swarm::dial(&mut self.swarm, dial_opts) { - warn!("invalid dial options: {:?}", e); - while let Some(channel) = channels.pop() { - channel.send(false).ok(); + let dial_opts = DialOpts::peer_id(peer_id) + .condition(libp2p::swarm::dial_opts::PeerCondition::Always) + .build(); + if let Err(e) = Swarm::dial(&mut self.swarm, dial_opts) { + while let Some(channel) = channels.pop() { + channel + .send(Err(anyhow!("error dialing peer {:?}: {}", peer_id, e))) + .ok(); + } } } } + RpcMessage::AddressesOfPeer(response_channel, peer_id) => { + let addrs = self.swarm.behaviour_mut().addresses_of_peer(&peer_id); + response_channel.send(addrs).ok(); + } RpcMessage::NetDisconnect(response_channel, _peer_id) => { warn!("NetDisconnect API not yet implemented"); // TODO: implement NetDisconnect @@ -847,6 +962,50 @@ impl Node { } } } + RpcMessage::ListenForIdentify(response_channel, peer_id) => { + let channels = self.lookup_queries.entry(peer_id).or_default(); + channels.push(response_channel); + } + RpcMessage::LookupPeerInfo(response_channel, peer_id) => { + if let Some(info) = self.swarm.behaviour().peer_manager.info_for_peer(&peer_id) { + response_channel.send(info.last_info.clone()).ok(); + } else { + response_channel.send(None).ok(); + } + } + RpcMessage::CancelListenForIdentify(response_channel, peer_id) => { + self.lookup_queries.remove(&peer_id); + response_channel.send(()).ok(); + } + RpcMessage::FindPeerOnDHT(response_channel, peer_id) => { + debug!("find closest peers for: {:?}", peer_id); + if let Some(kad) = self.swarm.behaviour_mut().kad.as_mut() { + match self + .kad_queries + .entry(QueryKey::FindPeerOnDHTKey(peer_id.to_bytes())) + { + std::collections::hash_map::Entry::Occupied(mut entry) => { + if let KadQueryChannel::FindPeerOnDHT { channels, .. } = entry.get_mut() + { + channels.push(response_channel); + } + } + std::collections::hash_map::Entry::Vacant(entry) => { + kad.get_closest_peers(peer_id); + entry.insert(KadQueryChannel::FindPeerOnDHT { + peer_id, + channels: vec![response_channel], + }); + } + } + } else { + tokio::task::spawn(async move { + response_channel + .send(Err(anyhow!("kademlia is not available"))) + .ok(); + }); + } + } RpcMessage::Shutdown => { return Ok(true); } diff --git a/iroh-p2p/src/rpc.rs b/iroh-p2p/src/rpc.rs index 4216b6608d8..26dd8ba1c41 100644 --- a/iroh-p2p/src/rpc.rs +++ b/iroh-p2p/src/rpc.rs @@ -10,6 +10,7 @@ use libp2p::gossipsub::{ error::{PublishError, SubscriptionError}, MessageId, TopicHash, }; +use libp2p::identify::IdentifyInfo; use libp2p::kad::record::Key; use libp2p::Multiaddr; use libp2p::PeerId; @@ -20,12 +21,12 @@ use tracing::trace; use async_trait::async_trait; use iroh_bitswap::Block; use iroh_rpc_types::p2p::{ - BitswapRequest, BitswapResponse, ConnectRequest, ConnectResponse, DisconnectRequest, + BitswapRequest, BitswapResponse, ConnectByPeerIdRequest, ConnectRequest, DisconnectRequest, GetListeningAddrsResponse, GetPeersResponse, GossipsubAllPeersResponse, GossipsubPeerAndTopics, GossipsubPeerIdMsg, GossipsubPeersResponse, GossipsubPublishRequest, GossipsubPublishResponse, GossipsubSubscribeResponse, GossipsubTopicHashMsg, GossipsubTopicsResponse, Key as ProviderKey, - Multiaddrs, NotifyNewBlocksBitswapRequest, P2p as RpcP2p, P2pServerAddr, PeerIdResponse, - Providers, StopSessionBitswapRequest, VersionResponse, + LookupRequest, Multiaddrs, NotifyNewBlocksBitswapRequest, P2p as RpcP2p, P2pServerAddr, + PeerIdResponse, PeerInfo, Providers, StopSessionBitswapRequest, VersionResponse, }; use super::node::DEFAULT_PROVIDER_LIMIT; @@ -252,15 +253,37 @@ impl RpcP2p for P2p { } #[tracing::instrument(skip(self, req))] - async fn peer_connect(&self, req: ConnectRequest) -> Result { + async fn peer_connect_by_peer_id(&self, req: ConnectByPeerIdRequest) -> Result<()> { + let peer_id = peer_id_from_bytes(req.peer_id)?; + let (s, r) = oneshot::channel(); + // ask the swarm if we already have address for this peer + let msg = RpcMessage::AddressesOfPeer(s, peer_id); + self.sender.send(msg).await?; + let res = r.await?; + if res.is_empty() { + // if we don't have the addr info for this peer, we need to try to + // find it on the dht + let (s, r) = oneshot::channel(); + let msg = RpcMessage::FindPeerOnDHT(s, peer_id); + self.sender.send(msg).await?; + r.await??; + } + // now we know we have found the peer on the dht, + // we can attempt to dial it + let (s, r) = oneshot::channel(); + let msg = RpcMessage::NetConnectByPeerId(s, peer_id); + self.sender.send(msg).await?; + r.await? + } + + #[tracing::instrument(skip(self, req))] + async fn peer_connect(&self, req: ConnectRequest) -> Result<()> { let peer_id = peer_id_from_bytes(req.peer_id)?; let addrs = addrs_from_bytes(req.addrs)?; let (s, r) = oneshot::channel(); let msg = RpcMessage::NetConnect(s, peer_id, addrs); self.sender.send(msg).await?; - - let success = r.await?; - Ok(ConnectResponse { success }) + r.await? } #[tracing::instrument(skip(self, req))] @@ -274,6 +297,56 @@ impl RpcP2p for P2p { Ok(ack) } + #[tracing::instrument(skip(self, req))] + async fn lookup(&self, req: LookupRequest) -> Result { + let (s, r) = oneshot::channel(); + let peer_id = peer_id_from_bytes(req.peer_id.clone())?; + + // check if we have already encountered this peer, and already + // that the peer info + let msg = RpcMessage::LookupPeerInfo(s, peer_id); + self.sender.send(msg).await?; + if let Some(info) = r.await? { + return Ok(peer_info_from_identify_info(info)); + } + + // listen for if any peer info for this peer gets sent to us + let (s, r) = oneshot::channel(); + let msg = RpcMessage::ListenForIdentify(s, peer_id); + self.sender.send(msg).await?; + + // once we connect to the peer, the idenitfy protocol + // will attempt to exchange peer info + let res = match req.addr { + Some(addr) => { + self.peer_connect(ConnectRequest { + peer_id: req.peer_id, + addrs: vec![addr], + }) + .await + } + None => { + self.peer_connect_by_peer_id(ConnectByPeerIdRequest { + peer_id: req.peer_id, + }) + .await + } + }; + + if let Err(e) = res { + let (s, r) = oneshot::channel(); + self.sender + .send(RpcMessage::CancelListenForIdentify(s, peer_id)) + .await?; + r.await?; + anyhow::bail!("Cannot get peer information: {}", e); + } + + let info = r.await??; + + Ok(peer_info_from_identify_info(info)) + } + #[tracing::instrument(skip(self, req))] async fn gossipsub_add_explicit_peer(&self, req: GossipsubPeerIdMsg) -> Result<()> { let (s, r) = oneshot::channel(); @@ -415,6 +488,22 @@ pub async fn new(addr: P2pServerAddr, sender: Sender) -> Result<()> iroh_rpc_types::p2p::serve(addr, p2p).await } +fn peer_info_from_identify_info(i: IdentifyInfo) -> PeerInfo { + let peer_id = i.public_key.to_peer_id(); + PeerInfo { + peer_id: peer_id.to_bytes(), + protocol_version: i.protocol_version, + agent_version: i.agent_version, + listen_addrs: i + .listen_addrs + .into_iter() + .map(|addr| addr.to_vec()) + .collect(), + protocols: i.protocols, + observed_addr: i.observed_addr.to_vec(), + } +} + fn peer_id_from_bytes(p: Vec) -> Result { PeerId::from_bytes(&p[..]).context("invalid peer_id") } @@ -462,9 +551,15 @@ pub enum RpcMessage { StopProviding(oneshot::Sender>, Key), NetListeningAddrs(oneshot::Sender<(PeerId, Vec)>), NetPeers(oneshot::Sender>>), - NetConnect(oneshot::Sender, PeerId, Vec), + NetConnectByPeerId(oneshot::Sender>, PeerId), + NetConnect(oneshot::Sender>, PeerId, Vec), NetDisconnect(oneshot::Sender<()>, PeerId), Gossipsub(GossipsubMessage), + FindPeerOnDHT(oneshot::Sender>, PeerId), + LookupPeerInfo(oneshot::Sender>, PeerId), + ListenForIdentify(oneshot::Sender>, PeerId), + CancelListenForIdentify(oneshot::Sender<()>, PeerId), + AddressesOfPeer(oneshot::Sender>, PeerId), Shutdown, } diff --git a/iroh-rpc-client/src/lib.rs b/iroh-rpc-client/src/lib.rs index 02a185063d4..0883c2e4dc9 100644 --- a/iroh-rpc-client/src/lib.rs +++ b/iroh-rpc-client/src/lib.rs @@ -11,7 +11,7 @@ mod store; pub use crate::client::Client; pub use crate::config::Config; -pub use crate::network::P2pClient; +pub use crate::network::{Lookup, P2pClient}; #[cfg(feature = "grpc")] pub use crate::status::{ServiceStatus, StatusRow, StatusTable}; pub use crate::store::StoreClient; diff --git a/iroh-rpc-client/src/network.rs b/iroh-rpc-client/src/network.rs index cc8e4a3476d..ca44ef4e147 100644 --- a/iroh-rpc-client/src/network.rs +++ b/iroh-rpc-client/src/network.rs @@ -1,16 +1,17 @@ use std::collections::{HashMap, HashSet}; +use std::fmt::Display; -use anyhow::{ensure, Context, Result}; +use anyhow::{Context, Result}; use bytes::Bytes; use cid::Cid; use futures::{Stream, StreamExt}; #[cfg(feature = "grpc")] use iroh_rpc_types::p2p::p2p_client::P2pClient as GrpcP2pClient; use iroh_rpc_types::p2p::{ - BitswapBlock, BitswapRequest, ConnectRequest, DisconnectRequest, GossipsubPeerAndTopics, - GossipsubPeerIdMsg, GossipsubPublishRequest, GossipsubTopicHashMsg, Key, - NotifyNewBlocksBitswapRequest, P2p, P2pClientAddr, P2pClientBackend, Providers, - StopSessionBitswapRequest, + BitswapBlock, BitswapRequest, ConnectByPeerIdRequest, ConnectRequest, DisconnectRequest, + GossipsubPeerAndTopics, GossipsubPeerIdMsg, GossipsubPublishRequest, GossipsubTopicHashMsg, + Key, LookupRequest, NotifyNewBlocksBitswapRequest, P2p, P2pClientAddr, P2pClientBackend, + PeerInfo, Providers, StopSessionBitswapRequest, }; use iroh_rpc_types::Addr; use libp2p::gossipsub::{MessageId, TopicHash}; @@ -153,13 +154,28 @@ impl P2pClient { #[tracing::instrument(skip(self))] pub async fn connect(&self, peer_id: PeerId, addrs: Vec) -> Result<()> { - let req = ConnectRequest { + if !addrs.is_empty() { + let req = ConnectRequest { + peer_id: peer_id.to_bytes(), + addrs: addrs.iter().map(|a| a.to_vec()).collect(), + }; + self.backend.peer_connect(req).await + } else { + let req = ConnectByPeerIdRequest { + peer_id: peer_id.to_bytes(), + }; + self.backend.peer_connect_by_peer_id(req).await + } + } + + #[tracing::instrument(skip(self))] + pub async fn lookup(&self, peer_id: PeerId, addr: Option) -> Result { + let req = LookupRequest { peer_id: peer_id.to_bytes(), - addrs: addrs.iter().map(|a| a.to_vec()).collect(), + addr: addr.map(|a| a.to_vec()), }; - let res = self.backend.peer_connect(req).await?; - ensure!(res.success, "dial failed"); - Ok(()) + let lookup = self.backend.lookup(req).await?; + lookup_from_peer_info(lookup) } #[tracing::instrument(skip(self))] @@ -257,6 +273,36 @@ impl P2pClient { } } +#[derive(Debug)] +pub struct Lookup { + pub peer_id: PeerId, + pub listen_addrs: Vec, + pub observed_addrs: Vec, + pub protocol_version: String, + pub agent_version: String, + pub protocols: Vec, +} + +impl Display for Lookup { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "peer id: {}\nlistening addresses: {:?}\nprotocols: {:?}\nprotocol version: {}\nobserved addresses: {:?}\n", self.peer_id, self.listen_addrs, self.protocols, self.protocol_version, self.observed_addrs) + } +} + +fn lookup_from_peer_info(p: PeerInfo) -> Result { + let peer_id = peer_id_from_bytes(p.peer_id)?; + let listen_addrs = addrs_from_bytes(p.listen_addrs)?; + let addr = addr_from_bytes(p.observed_addr)?; + Ok(Lookup { + peer_id, + protocol_version: p.protocol_version, + agent_version: p.agent_version, + listen_addrs, + protocols: p.protocols, + observed_addrs: vec![addr], + }) +} + fn peers_and_topics_from_bytes(pt: GossipsubPeerAndTopics) -> Result<(PeerId, Vec)> { let peer_id = peer_id_from_bytes(pt.peer_id)?; let topics = pt.topics.into_iter().map(TopicHash::from_raw).collect(); @@ -291,7 +337,7 @@ mod tests { use super::*; use async_trait::async_trait; use iroh_rpc_types::p2p::{ - p2p_server, BitswapResponse, ConnectResponse, GetListeningAddrsResponse, GetPeersResponse, + p2p_server, BitswapResponse, GetListeningAddrsResponse, GetPeersResponse, GossipsubAllPeersResponse, GossipsubPeersResponse, GossipsubPublishResponse, GossipsubSubscribeResponse, GossipsubTopicsResponse, Multiaddrs, PeerIdResponse, VersionResponse, @@ -429,12 +475,21 @@ mod tests { ) -> Result, tonic::Status> { todo!() } + async fn peer_connect( &self, _request: Request, - ) -> Result, tonic::Status> { + ) -> Result, tonic::Status> { + todo!() + } + + async fn peer_connect_by_peer_id( + &self, + _request: Request, + ) -> Result, tonic::Status> { todo!() } + async fn peer_disconnect( &self, _request: Request, @@ -449,6 +504,13 @@ mod tests { todo!() } + async fn lookup( + &self, + _request: Request, + ) -> Result, tonic::Status> { + todo!() + } + async fn gossipsub_add_explicit_peer( &self, _request: Request, diff --git a/iroh-rpc-types/proto/p2p.proto b/iroh-rpc-types/proto/p2p.proto index 429c1a4beca..1aa8542b900 100644 --- a/iroh-rpc-types/proto/p2p.proto +++ b/iroh-rpc-types/proto/p2p.proto @@ -16,9 +16,11 @@ service P2p { rpc StopProviding(Key) returns (google.protobuf.Empty) {} rpc GetListeningAddrs(google.protobuf.Empty) returns (GetListeningAddrsResponse) {} rpc GetPeers(google.protobuf.Empty) returns (GetPeersResponse) {} - rpc PeerConnect(ConnectRequest) returns (ConnectResponse) {} + rpc PeerConnect(ConnectRequest) returns (google.protobuf.Empty) {} + rpc PeerConnectByPeerId(ConnectByPeerIdRequest) returns (google.protobuf.Empty) {} rpc PeerDisconnect(DisconnectRequest) returns (google.protobuf.Empty) {} rpc Shutdown(google.protobuf.Empty) returns (google.protobuf.Empty) {} + rpc Lookup(LookupRequest) returns (PeerInfo) {} rpc GossipsubAddExplicitPeer(GossipsubPeerIdMsg) returns (google.protobuf.Empty) {} rpc GossipsubAllMeshPeers(google.protobuf.Empty) returns (GossipsubPeersResponse) {} @@ -95,15 +97,15 @@ message GetPeersResponse { map peers = 1; } -message ConnectRequest { - // Serialized peer id - bytes peer_id = 1; - // Serialized list of multiaddrs - repeated bytes addrs = 2; +message ConnectByPeerIdRequest { + bytes peer_id = 1; } -message ConnectResponse { - bool success = 1; +message ConnectRequest { + // Serialized PeerId + bytes peer_id = 1; + // Serialized multiaddr + repeated bytes addrs = 2; } message DisconnectRequest { @@ -111,12 +113,32 @@ message DisconnectRequest { bytes peer_id = 1; } +message LookupRequest { + // PeerId + bytes peer_id = 1; + // Serialized multiaddr + optional bytes addr = 2; +} + +message PeerInfo { + // PublicKey + bytes peer_id = 1; + // String + string protocol_version = 2; + // string + string agent_version = 3; + // vec of Multiaddrs + repeated bytes listen_addrs = 4; + // vec of Strings + repeated string protocols = 5; + // Multiaddr + bytes observed_addr = 6; +} message Multiaddrs { // Serialized list of multiaddrs repeated bytes addrs = 1; } - message GossipsubPeerIdMsg { // Serialized PeerId bytes peer_id = 1; @@ -162,4 +184,3 @@ message GossipsubTopicsResponse { repeated string topics = 1; } - diff --git a/iroh-rpc-types/src/p2p.rs b/iroh-rpc-types/src/p2p.rs index be4d14fe8f0..b0dd090eb9d 100644 --- a/iroh-rpc-types/src/p2p.rs +++ b/iroh-rpc-types/src/p2p.rs @@ -12,8 +12,10 @@ proxy!( notify_new_blocks_bitswap: NotifyNewBlocksBitswapRequest => () => (), get_listening_addrs: () => GetListeningAddrsResponse => GetListeningAddrsResponse, get_peers: () => GetPeersResponse => GetPeersResponse, - peer_connect: ConnectRequest => ConnectResponse => ConnectResponse, + peer_connect: ConnectRequest => () => (), + peer_connect_by_peer_id: ConnectByPeerIdRequest => () => (), peer_disconnect: DisconnectRequest => () => (), + lookup: LookupRequest => PeerInfo => PeerInfo, gossipsub_add_explicit_peer: GossipsubPeerIdMsg => () => (), gossipsub_all_mesh_peers: () => GossipsubPeersResponse => GossipsubPeersResponse, gossipsub_all_peers: () => GossipsubAllPeersResponse => GossipsubAllPeersResponse, diff --git a/iroh/src/fixture.rs b/iroh/src/fixture.rs index 256e47a00fb..3ca2e034480 100644 --- a/iroh/src/fixture.rs +++ b/iroh/src/fixture.rs @@ -23,7 +23,10 @@ fn fixture_lookup() -> MockApi { Ok(Lookup { peer_id, listen_addrs: vec![], - local_addrs: vec![], + observed_addrs: vec![], + agent_version: String::new(), + protocols: vec![], + protocol_version: String::new(), }) }); Ok(mock_p2p) diff --git a/iroh/src/p2p.rs b/iroh/src/p2p.rs index 7463c09714e..70520bcd9cd 100644 --- a/iroh/src/p2p.rs +++ b/iroh/src/p2p.rs @@ -27,7 +27,7 @@ pub enum P2pCommands { #[clap(after_help = doc::P2P_LOOKUP_LONG_DESCRIPTION)] Lookup { /// multiaddress or peer ID - addr: PeerIdOrAddrArg, + addr: Option, }, } @@ -56,8 +56,11 @@ pub async fn run_command(p2p: &impl P2pApi, cmd: &P2p) -> Result<()> { Err(e) => return Err(e), }, P2pCommands::Lookup { addr } => { - let lookup = p2p.lookup(&addr.0).await?; - println!("peer id: {}", lookup.peer_id); + let lookup = match addr { + Some(addr) => p2p.lookup(&addr.0).await?, + None => p2p.lookup_local().await?, + }; + println!("{}", lookup); } }; Ok(()) diff --git a/iroh/tests/cmd/lookup.trycmd b/iroh/tests/cmd/lookup.trycmd index b594499a89c..59e32a9b4c6 100644 --- a/iroh/tests/cmd/lookup.trycmd +++ b/iroh/tests/cmd/lookup.trycmd @@ -1,5 +1,10 @@ ``` $ iroh p2p lookup 1AXRDqR8jTkwzGqyu3qknicAC5X578zTMxhAi2brppK2bB peer id: 1AXRDqR8jTkwzGqyu3qknicAC5X578zTMxhAi2brppK2bB +listening addresses: [] +protocols: [] +protocol version: +observed addresses: [] -``` \ No newline at end of file + +```