diff --git a/transport/src/bootnode.rs b/transport/src/bootnode.rs index 0aecea1..819b354 100644 --- a/transport/src/bootnode.rs +++ b/transport/src/bootnode.rs @@ -69,7 +69,7 @@ async fn main() -> anyhow::Result<()> { kademlia: kad::Behaviour::with_config( local_peer_id, MemoryStore::new(local_peer_id), - Default::default(), + Default::default(), // Same here, this is the default IPFS network. ), relay: relay::Behaviour::new(local_peer_id, Default::default()), gossipsub: gossipsub::Behaviour::new( @@ -104,7 +104,7 @@ async fn main() -> anyhow::Result<()> { swarm.add_external_address(public_addr); } - swarm.behaviour_mut().kademlia.set_mode(Some(Mode::Server)); + swarm.behaviour_mut().kademlia.set_mode(Some(Mode::Server)); // This is unnecessary if you are setting an external address. // Connect to other boot nodes for BootNode { peer_id, address } in @@ -113,15 +113,23 @@ async fn main() -> anyhow::Result<()> { log::info!("Connecting to boot node {peer_id} at {address}"); swarm.behaviour_mut().autonat.add_server(peer_id, Some(address.clone())); swarm.behaviour_mut().kademlia.add_address(&peer_id, address.clone()); + + // By default, the `Swarm` will consult all behaviours for addresses for a peer. + // If you specify `.addresses` here, it won't. + // Unless you want to be specific about _just_ using that one address, I'd leave it off and just specify the peer ID. swarm.dial(DialOpts::peer_id(peer_id).addresses(vec![address]).build())?; } + // As mentioned elsewhere, bootstrap is now included automatically. if swarm.behaviour_mut().kademlia.bootstrap().is_err() { log::warn!("No peers connected. Cannot bootstrap kademlia.") } let mut sigint = signal(SignalKind::interrupt())?; let mut sigterm = signal(SignalKind::terminate())?; + + // Have you looked at the implementation? :) + // This statically returns `false` so you can also just use a `loop`! while !swarm.is_terminated() { let event = tokio::select! { event = swarm.select_next_some() => event, diff --git a/transport/src/task_manager.rs b/transport/src/task_manager.rs index 759433b..2e7fe3b 100644 --- a/transport/src/task_manager.rs +++ b/transport/src/task_manager.rs @@ -14,7 +14,7 @@ pub const DEFAULT_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(1); pub struct TaskManager { shutdown_timeout: Duration, cancel_token: CancellationToken, - tasks: Vec>, + tasks: Vec>, // This can technically grow unbounded. } impl Default for TaskManager { diff --git a/transport/src/transport.rs b/transport/src/transport.rs index 5186c4a..97b7639 100644 --- a/transport/src/transport.rs +++ b/transport/src/transport.rs @@ -86,9 +86,22 @@ where kademlia: kad::Behaviour, relay: RelayClient, dcutr: dcutr::Behaviour, + // I am not sure I fully understand your use of the `request_response` behaviour. + // You seem to be ignoring the association of requests and responses because all responses get dumped into a single stream. + // If all your messages are just events (i.e. don't have a response), I would suggest to don't send responses at all. + // Instead, have both sides just send each other "requests". + // That should simplify your event-handling. + // + // You may also want to look into `libp2p-stream` if you need generic stream-handling and not messages. + // + // Another thing to consider is that currently, you aren't really making use of the protocol-based multiplexing capabilities. + // I am not sure where the requirement for just sending messages comes but it might be worthwhile to consider to send them over multiple protocols. request: request_response::Behaviour>, gossipsub: gossipsub::Behaviour, ping: ping::Behaviour, + + // AutoNATv1 is somewhat broken because it may accidentially hole-punch for you and thus report an invalid NAT status. + // v2 is in the works. Progress is a bit slow because there were some disruptions but I am confident it will ship eventually. autonat: autonat::Behaviour, } @@ -197,6 +210,7 @@ impl Default for P2PTransportBuilder { } } +// More of a nit but I'd expect the functions of a "builder" to return `Self` so it can be chained. impl P2PTransportBuilder { pub fn new() -> Self { let keypair = Keypair::generate_ed25519(); @@ -295,7 +309,7 @@ impl P2PTransportBuilder { kademlia: kad::Behaviour::with_config( local_peer_id, MemoryStore::new(local_peer_id), - Default::default(), + Default::default(), // With the default config, you are running on the IPFS DHT. You probably want to change this to your own protocol string. ), dcutr: dcutr::Behaviour::new(local_peer_id), request: request_response::Behaviour::new( @@ -420,6 +434,10 @@ impl P2PTransportBuilder { for addr in self.listen_addrs { swarm.listen_on(addr)?; } + + // Why block on waiting for listening? + // In general, `Swarm` will always do many things concurrently. + // I would not recommend to interact with `Swarm` in a linear fashion. Self::wait_for_listening( &mut swarm, #[cfg(feature = "metrics")] @@ -438,8 +456,18 @@ impl P2PTransportBuilder { log::info!("Connecting to boot node {peer_id} at {address}"); swarm.behaviour_mut().autonat.add_server(peer_id, Some(address.clone())); swarm.behaviour_mut().kademlia.add_address(&peer_id, address.clone()); - swarm.dial(DialOpts::peer_id(peer_id).addresses(vec![address]).build())?; + let opts = DialOpts::peer_id(peer_id).addresses(vec![address]).build(); + + let _ = opts.connection_id(); + + swarm.dial(opts)?; } + + // You don't really need to wait for the connection before you proceed. + // But if you really wanted to, you should identify it by `ConnectionId`. + // You can grab the connection it from the `DialOpts`, see above. + // Then, wait for an connection established or failed event with this connection ID. + Self::wait_for_first_connection( &mut swarm, #[cfg(feature = "metrics")] @@ -450,9 +478,18 @@ impl P2PTransportBuilder { // Connect to relay and listen for relayed connections if self.relay { + // Why only connect to a single relay? + // Reservations are pretty cheap (only a single file-descriptor for the open connection) + // + // You may also be interested in https://github.com/libp2p/rust-libp2p/issues/4651. let addr = relay_addr.ok_or(Error::NoRelay)?; log::info!("Connecting to relay {addr}"); + + // By issuing a `listen_on` with `/p2p-circuit`, we will automatically dial the relay if we don't have a connection to it yet. + // So you don't need to dial the relay here, calling `listen_on` is enough. swarm.dial(addr.clone())?; + + // Why wait for identify? Self::wait_for_identify( &mut swarm, #[cfg(feature = "metrics")] @@ -604,6 +641,7 @@ impl P2PTransportHandle { &self, peer_id: PeerId, ) -> impl Future> { + // I've had good experienes with for this sort of stuff. let dial_sender = self.dial_sender.clone(); let (tx, rx) = oneshot::channel(); let result_sender = DialResultSender(tx); @@ -619,18 +657,33 @@ impl P2PTransportHandle { } } +// Overall, what you are doing here is good practise: +// 1. An eventloop that coordinates the swarm and incoming commands +// 2. A handle for interacting with that from other places. +// +// As I mentioned elsewhere, I'd look into pushing some of the logic here into `NetworkBehaviour`s. +// It is not super urgent but it will make long-term maintenance easier because it is easier to test to means you can likely remove some of the state from this eventloop. +// Building your logic based on reacting to events also makes clean-ups to avoid memory-leaks. struct P2PTransport { inbound_msg_sender: mpsc::Sender>, outbound_msg_receiver: mpsc::Receiver>, subscription_receiver: mpsc::Receiver, dial_receiver: DialReceiver, + + // I'd suggest renaming this, it is a bit confusing with `ongoing_dials`. pending_dials: HashMap>, + + // I'd suggest unifying the state related to one connection into a struct / enum and keying that under `ConnectionId`. + // That way, it is much easier to correctly clean-up state upon a failed connection because you don't need to check several hash maps. ongoing_dials: HashMap, + ongoing_queries: BiHashMap, - pending_messages: HashMap>, + pending_messages: HashMap>, // I'd recommend a timeout on how long you are willing to buffer messages. Otherwise this can be a memory-leak (and bad UX for the original sender). + + // Some of this state may be easier to deal with if you create your own `NetworkBehaviour` and wrap `gossipsub` with it. subscribed_topics: HashMap, // hash -> (topic, allow_unordered) sequence_numbers: HashMap<(TopicHash, PeerId), u64>, // FIXME: Potential memory leak - active_connections: HashMap>, + active_connections: HashMap>, // HashMap would be the better data structure here. swarm: Swarm>, bootstrap: bool, #[cfg(feature = "metrics")] @@ -670,13 +723,29 @@ impl P2PTransport { log::info!("P2PTransport starting"); let mut bootstrap_timer = IntervalStream::new(interval(BOOTSTRAP_INTERVAL)).fuse(); loop { + // Personally, I am not a fan of `tokio::select` because it: + // a) forces an additional syntax + // b) has very nuanced behaviour in regards to cancellation + // + // It isn't actually very difficult to write your own `poll`-based eventloop. + // You can just do what `futures::future::select` does: Call various poll APIs in a row, + // always hopping to the next one when the first one returns `Pending`. + // That way, you are enforcing syntactically that nothing `.await`s + // in the eventloop (see comment below). + // Blocking the eventloop will stall your application and should be avoided. tokio::select! { _ = cancel_token.cancelled() => break, + + // There is now a built-in functionality to automatically bootstrap: + // https://github.com/libp2p/rust-libp2p/commit/6a916174b0d3c16d2cfcebe5b168690d41484d10 _ = bootstrap_timer.select_next_some() => { if !self.bootstrap() { break } }, + // I would advise to _not_ block this event loop, i.e. don't call `.await` in here. + // Quickly scanning the code suggests that it is actually unnecessary. + // You may want to activate the following clippy lint: https://rust-lang.github.io/rust-clippy/master/#/unused_async event = self.swarm.select_next_some() => self.handle_swarm_event(event).await.unwrap_or_else(|e| { log::error!("Error handling swarm event: {e}") }), @@ -716,6 +785,10 @@ impl P2PTransport { } fn peer_addrs(&mut self, peer_id: PeerId) -> Vec { + // You aren't really supposed to call any APIs of `NetworkBehaviour` yourself. + // Some `NetworkBehaviour`s for example will change their state as part of these. + // There is no need to collect these addresses yourself. + // If you issue a dial to a certain `PeerId`, all known addresses will automatically be tried. self.swarm .behaviour_mut() .handle_pending_outbound_connection( @@ -758,6 +831,7 @@ impl P2PTransport { } e.insert((topic.to_string(), allow_unordered)); } + #[cfg(feature = "metrics")] SUBSCRIBED_TOPICS.inc(); } @@ -975,6 +1049,8 @@ impl P2PTransport { identify::Event::Received { peer_id, info } => (peer_id, info.listen_addrs), _ => return Ok(()), }; + // If you are keen for some contributions, this functionality would be reasonably easy to do + // directly in kademlia itself. See https://github.com/libp2p/rust-libp2p/issues/5313. let kademlia = &mut self.swarm.behaviour_mut().kademlia; listen_addrs.into_iter().filter(addr_is_reachable).for_each(|addr| { kademlia.add_address(&peer_id, addr); @@ -1013,6 +1089,22 @@ impl P2PTransport { }; // Query reached the peer that was looked for. Send all pending messages. + // Instead of keeping this state out here, I would suggest writing your own `NetworkBehaviour` that + // wraps `libp2p-request-response`. + // You can then implement a function that buffers a message (with a timeout?) and only sends it + // once you have a connection to that peer. This is quite easy to detect by listening for the + // various events that a `NetworkBehaviour` gets called with. + // + // This way, the functionality of sending a message to a peer is de-coupled from the kademlia queries + // and it will be sent, regardless of how the connection is established which should be more resilient. + // You can also easily unit test that using https://docs.rs/libp2p-swarm-test/latest/libp2p_swarm_test/. + // + // If you don't want to just dial every peer that you discover, you can still keep a list of "peers we want to dial" within that new behaviour. + // You can then query this list in here as part of handling the kademlia event and only dial the peers you are interested in. + // + // In general, I'd recommend to attempt writing your networking logic as `NetworkBehaviour`. + // It will modularize your codebase and allow you to unit-test them. + // A lot of logic can be expressed by just reacting to the various events that are given to each `NetworkBehaviour`. if peers.contains(&peer_id) { self.ongoing_queries.remove_by_right(&query_id); #[cfg(feature = "metrics")] @@ -1020,6 +1112,7 @@ impl P2PTransport { self.peer_found(peer_id); } // Query finished and the peer wasn't found. Drop all pending messages and dial requests. + // I think the below can be completely dropped in favor of a timeout within above said behaviour. else if finished { self.ongoing_queries.remove_by_right(&query_id); #[cfg(feature = "metrics")] @@ -1042,7 +1135,7 @@ impl P2PTransport { fn peer_found(&mut self, peer_id: PeerId) { log::debug!("Peer found: {peer_id}"); self.pending_dials.remove(&peer_id).into_iter().flatten().for_each(|rs| { - self.dial_peer(peer_id, rs); + self.dial_peer(peer_id, rs); // Attempting to send a message using `request-response` will already attempt to the dial the peer. #[cfg(feature = "metrics")] PENDING_DIALS.dec(); }); @@ -1076,6 +1169,10 @@ impl P2PTransport { self.pending_dials.entry(peer_id).or_default().push(result_sender); #[cfg(feature = "metrics")] PENDING_DIALS.inc(); + // I would suggest tying this sort of logic to events returned from either the `Swarm` or a behaviour. + // For example, `libp2p-request-response` has a `DialFailure` variant that you could hook into. + // https://docs.rs/libp2p-request-response/latest/libp2p_request_response/enum.Event.html#variant.OutboundFailure + // If you wanted to specifically react to `NoAddresses` then that could also be something you can propose changing upstream. self.lookup_peer(peer_id); } Err(e) => { @@ -1117,6 +1214,8 @@ impl P2PTransport { let peer_conns = self.active_connections.entry(peer_id).or_default(); peer_conns.push_front(connection_id); + + // Is there a reason you cannot use https://docs.rs/libp2p-connection-limits/latest/libp2p_connection_limits/struct.ConnectionLimits.html#method.with_max_established_per_peer? if peer_conns.len() > MAX_CONNS_PER_PEER as usize { log::debug!("Connection limit reached for {peer_id}"); let conn_to_close = peer_conns.back().expect("not empty"); diff --git a/transport/src/util.rs b/transport/src/util.rs index e35c245..7082087 100644 --- a/transport/src/util.rs +++ b/transport/src/util.rs @@ -24,7 +24,7 @@ pub async fn get_keypair(path: Option) -> anyhow::Result { Err(_) => { log::info!("Generating new key and saving into {}", path.display()); let keypair = ed25519::Keypair::generate(); - tokio::fs::write(&path, keypair.to_bytes()).await?; + tokio::fs::write(&path, keypair.to_bytes()).await?; // `Keypair` has a function "to_protobuf_encoding" that uses a defined encoding to write the key. You could use that here instead of depending on the bytes representation of `ed25519`. Ok(keypair.into()) } } @@ -35,7 +35,7 @@ pub fn addr_is_reachable(addr: &Multiaddr) -> bool { Some(Protocol::Ip4(addr)) => { !(addr.is_loopback() || addr.is_link_local()) // We need to allow private addresses for testing in local environment - &&(!addr.is_private() || std::env::var("PRIVATE_NETWORK").is_ok()) + &&(!addr.is_private() || std::env::var("PRIVATE_NETWORK").is_ok()) // Side-effects like these are best passed as parameters. } Some(Protocol::Ip6(addr)) => !addr.is_loopback(), Some(Protocol::Dns(_)) => true,