Skip to content

Commit

Permalink
feat(p2p): dial MDNS peers
Browse files Browse the repository at this point in the history
* feat: upgrade to latest rust-libp2p
* feat(p2p): dial discovered mdns peers
  • Loading branch information
dignifiedquire authored Nov 21, 2022
1 parent 22e3a42 commit 5bee5b7
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 28 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ members = [
resolver = "2"

[patch.crates-io]
libp2p = { git = "https://github.com/dignifiedquire/rust-libp2p", branch = "iroh-0-50" }
# libp2p = { path = "../rust-libp2p" }
libp2p = { git = "https://github.com/dignifiedquire/rust-libp2p", branch = "iroh-0-50-1" }
#libp2p = { path = "../rust-libp2p" }

[profile.optimized-release]
inherits = 'release'
Expand Down
19 changes: 5 additions & 14 deletions iroh-bitswap/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -642,8 +642,8 @@ mod tests {
use libp2p::core::transport::upgrade::Version;
use libp2p::core::transport::Boxed;
use libp2p::identity::Keypair;
use libp2p::swarm::{SwarmBuilder, SwarmEvent};
use libp2p::tcp::{GenTcpConfig, TokioTcpTransport};
use libp2p::swarm::SwarmEvent;
use libp2p::tcp::{tokio::Transport as TcpTransport, Config as TcpConfig};
use libp2p::yamux::YamuxConfig;
use libp2p::{noise, PeerId, Swarm, Transport};
use tokio::sync::{mpsc, RwLock};
Expand Down Expand Up @@ -689,7 +689,7 @@ mod tests {
};

let peer_id = local_key.public().to_peer_id();
let transport = TokioTcpTransport::new(GenTcpConfig::default().nodelay(true))
let transport = TcpTransport::new(TcpConfig::default().nodelay(true))
.upgrade(Version::V1)
.authenticate(auth_config)
.multiplex(YamuxConfig::default())
Expand Down Expand Up @@ -779,12 +779,7 @@ mod tests {
let (peer1_id, trans) = mk_transport();
let store1 = TestStore::default();
let bs1 = Bitswap::new(peer1_id, store1.clone(), Config::default()).await;
let mut swarm1 = SwarmBuilder::new(trans, bs1, peer1_id)
.executor(Box::new(|fut| {
tokio::task::spawn(fut);
}))
.build();

let mut swarm1 = Swarm::with_tokio_executor(trans, bs1, peer1_id);
let blocks = (0..N).map(|_| create_random_block_v1()).collect::<Vec<_>>();

for block in &blocks {
Expand Down Expand Up @@ -817,11 +812,7 @@ mod tests {
let store2 = TestStore::default();
let bs2 = Bitswap::new(peer2_id, store2.clone(), Config::default()).await;

let mut swarm2 = SwarmBuilder::new(trans, bs2, peer2_id)
.executor(Box::new(|fut| {
tokio::task::spawn(fut);
}))
.build();
let mut swarm2 = Swarm::with_tokio_executor(trans, bs2, peer2_id);

let swarm2_bs = swarm2.behaviour().clone();
let peer2 = tokio::task::spawn(async move {
Expand Down
1 change: 1 addition & 0 deletions iroh-p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ features = [
"autonat",
"rsa",
"tokio",
"macros",
]

[dev-dependencies]
Expand Down
6 changes: 3 additions & 3 deletions iroh-p2p/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ use libp2p::gossipsub::{self, MessageAuthenticity};
use libp2p::identify;
use libp2p::kad::store::{MemoryStore, MemoryStoreConfig};
use libp2p::kad::{Kademlia, KademliaConfig};
use libp2p::mdns::TokioMdns as Mdns;
use libp2p::mdns::tokio::Behaviour as Mdns;
use libp2p::multiaddr::Protocol;
use libp2p::ping::Behaviour as Ping;
use libp2p::relay;
use libp2p::swarm::behaviour::toggle::Toggle;
use libp2p::NetworkBehaviour;
use libp2p::swarm::NetworkBehaviour;
use libp2p::{autonat, dcutr};
use tracing::{info, warn};

Expand All @@ -29,7 +29,7 @@ mod peer_manager;

/// Libp2p behaviour for the node.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "Event", event_process = false)]
#[behaviour(out_event = "Event")]
pub(crate) struct NodeBehaviour {
ping: Ping,
identify: identify::Behaviour,
Expand Down
2 changes: 1 addition & 1 deletion iroh-p2p/src/behaviour/event.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use iroh_bitswap::BitswapEvent;
use libp2p::{
autonat, dcutr, gossipsub::GossipsubEvent, identify::Event as IdentifyEvent,
kad::KademliaEvent, mdns::MdnsEvent, ping::Event as PingEvent, relay,
kad::KademliaEvent, mdns::Event as MdnsEvent, ping::Event as PingEvent, relay,
};

use super::peer_manager::PeerManagerEvent;
Expand Down
20 changes: 20 additions & 0 deletions iroh-p2p/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use libp2p::kad::{
self, BootstrapOk, GetClosestPeersError, GetClosestPeersOk, GetProvidersOk, KademliaEvent,
QueryId, QueryResult,
};
use libp2p::mdns;
use libp2p::metrics::Recorder;
use libp2p::multiaddr::Protocol;
use libp2p::ping::Result as PingResult;
Expand Down Expand Up @@ -670,6 +671,25 @@ impl<KeyStorage: Storage> Node<KeyStorage> {
));
}
}
Event::Mdns(e) => match e {
mdns::Event::Discovered(peers) => {
for (peer_id, addr) in peers {
let is_connected = self.swarm.is_connected(&peer_id);
debug!(
"mdns: discovered {} at {} (connected: {:?})",
peer_id, addr, is_connected
);
if !is_connected {
let dial_opts =
DialOpts::peer_id(peer_id).addresses(vec![addr]).build();
if let Err(e) = Swarm::dial(&mut self.swarm, dial_opts) {
warn!("invalid dial options: {:?}", e);
}
}
}
}
mdns::Event::Expired(_) => {}
},
_ => {
// TODO: check all important events are handled
}
Expand Down
20 changes: 12 additions & 8 deletions iroh-p2p/src/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use libp2p::{
dns,
identity::Keypair,
mplex, noise,
swarm::{ConnectionLimits, SwarmBuilder},
swarm::{ConnectionLimits, Executor, SwarmBuilder},
yamux::{self, WindowUpdateMode},
PeerId, Swarm, Transport,
};
Expand All @@ -28,10 +28,10 @@ async fn build_transport(
) {
// TODO: make transports configurable

let tcp_config = libp2p::tcp::GenTcpConfig::default().port_reuse(true);
let transport = libp2p::tcp::TokioTcpTransport::new(tcp_config.clone());
let tcp_config = libp2p::tcp::Config::default().port_reuse(true);
let transport = libp2p::tcp::tokio::Transport::new(tcp_config.clone());
let transport =
libp2p::websocket::WsConfig::new(libp2p::tcp::TokioTcpTransport::new(tcp_config))
libp2p::websocket::WsConfig::new(libp2p::tcp::tokio::Transport::new(tcp_config))
.or_transport(transport);

// TODO: configurable
Expand Down Expand Up @@ -104,15 +104,19 @@ pub(crate) async fn build_swarm(
.with_max_established_incoming(Some(config.max_conns_in))
.with_max_established_outgoing(Some(config.max_conns_out))
.with_max_established_per_peer(Some(config.max_conns_per_peer));
let swarm = SwarmBuilder::new(transport, behaviour, peer_id)
let swarm = SwarmBuilder::with_executor(transport, behaviour, peer_id, Tokio)
.connection_limits(limits)
.notify_handler_buffer_size(config.notify_handler_buffer_size.try_into()?)
.connection_event_buffer_size(config.connection_event_buffer_size)
.dial_concurrency_factor(config.dial_concurrency_factor.try_into().unwrap())
.executor(Box::new(|fut| {
tokio::task::spawn(fut);
}))
.build();

Ok(swarm)
}

struct Tokio;
impl Executor for Tokio {
fn exec(&self, fut: std::pin::Pin<Box<dyn futures::Future<Output = ()> + Send>>) {
tokio::task::spawn(fut);
}
}

0 comments on commit 5bee5b7

Please sign in to comment.