From 071fd1bad42c8d3228a478a92af74bf321195dd8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabrice=20Desr=C3=A9?= Date: Mon, 22 Aug 2022 06:50:50 -0700 Subject: [PATCH] feat(p2p): make gossipsub configurable --- iroh-p2p/src/behaviour.rs | 17 ++-- iroh-p2p/src/config.rs | 5 ++ iroh-p2p/src/node.rs | 165 ++++++++++++++++---------------------- 3 files changed, 85 insertions(+), 102 deletions(-) diff --git a/iroh-p2p/src/behaviour.rs b/iroh-p2p/src/behaviour.rs index c6d8b10a06..b8a3034c2a 100644 --- a/iroh-p2p/src/behaviour.rs +++ b/iroh-p2p/src/behaviour.rs @@ -40,7 +40,7 @@ pub(crate) struct NodeBehaviour { relay: Toggle, relay_client: Toggle, dcutr: Toggle, - pub(crate) gossipsub: Gossipsub, + pub(crate) gossipsub: Toggle, } impl NodeBehaviour { @@ -135,10 +135,17 @@ impl NodeBehaviour { Identify::new(config) }; - let gossipsub_config = GossipsubConfig::default(); - let message_authenticity = MessageAuthenticity::Signed(local_key.clone()); - let gossipsub = Gossipsub::new(message_authenticity, gossipsub_config) - .map_err(|e| anyhow::anyhow!("{}", e))?; + let gossipsub = if config.gossipsub { + let gossipsub_config = GossipsubConfig::default(); + let message_authenticity = MessageAuthenticity::Signed(local_key.clone()); + Some( + Gossipsub::new(message_authenticity, gossipsub_config) + .map_err(|e| anyhow::anyhow!("{}", e))?, + ) + } else { + None + } + .into(); Ok(NodeBehaviour { ping: Ping::default(), diff --git a/iroh-p2p/src/config.rs b/iroh-p2p/src/config.rs index 325a5f6c92..6560293401 100644 --- a/iroh-p2p/src/config.rs +++ b/iroh-p2p/src/config.rs @@ -48,6 +48,8 @@ pub struct Libp2pConfig { pub relay_server: bool, /// Relay client enabled. pub relay_client: bool, + /// Gossipsub enabled. + pub gossipsub: bool, pub max_conns_out: u32, pub max_conns_in: u32, pub max_conns_pending_out: u32, @@ -114,6 +116,7 @@ impl Source for Libp2pConfig { insert_into_config_map(&mut map, "mdns", self.mdns); insert_into_config_map(&mut map, "relay_server", self.relay_server); insert_into_config_map(&mut map, "relay_client", self.relay_client); + insert_into_config_map(&mut map, "gossipsub", self.gossipsub); let peers: Vec = self.bootstrap_peers.iter().map(|b| b.to_string()).collect(); insert_into_config_map(&mut map, "bootstrap_peers", peers); insert_into_config_map( @@ -155,6 +158,7 @@ impl Default for Libp2pConfig { autonat: true, relay_server: true, relay_client: true, + gossipsub: true, max_conns_pending_out: 256, max_conns_pending_in: 256, max_conns_in: 256, @@ -293,6 +297,7 @@ mod tests { "relay_client".to_string(), Value::new(None, default.relay_client), ); + expect.insert("gossipsub".to_string(), Value::new(None, default.gossipsub)); expect.insert( "bootstrap_peers".to_string(), Value::new(None, bootstrap_peers), diff --git a/iroh-p2p/src/node.rs b/iroh-p2p/src/node.rs index 3f654fa46a..8e17f1562f 100644 --- a/iroh-p2p/src/node.rs +++ b/iroh-p2p/src/node.rs @@ -625,104 +625,75 @@ impl Node { .send(()) .map_err(|_| anyhow!("sender dropped"))?; } - RpcMessage::Gossipsub(g) => match g { - rpc::GossipsubMessage::AddExplicitPeer(response_channel, peer_id) => { - self.swarm - .behaviour_mut() - .gossipsub - .add_explicit_peer(&peer_id); - response_channel - .send(()) - .map_err(|_| anyhow!("sender dropped"))?; - } - rpc::GossipsubMessage::AllMeshPeers(response_channel) => { - let peers = self - .swarm - .behaviour_mut() - .gossipsub - .all_mesh_peers() - .copied() - .collect(); - response_channel - .send(peers) - .map_err(|_| anyhow!("sender dropped"))?; - } - rpc::GossipsubMessage::AllPeers(response_channel) => { - let all_peers = self - .swarm - .behaviour_mut() - .gossipsub - .all_peers() - .map(|(p, t)| (*p, t.into_iter().cloned().collect())) - .collect(); - response_channel - .send(all_peers) - .map_err(|_| anyhow!("sender dropped"))?; - } - rpc::GossipsubMessage::MeshPeers(response_channel, topic_hash) => { - let peers = self - .swarm - .behaviour_mut() - .gossipsub - .mesh_peers(&topic_hash) - .copied() - .collect(); - response_channel - .send(peers) - .map_err(|_| anyhow!("sender dropped"))?; - } - rpc::GossipsubMessage::Publish(response_channel, topic_hash, bytes) => { - let res = self - .swarm - .behaviour_mut() - .gossipsub - .publish(IdentTopic::new(topic_hash.into_string()), bytes.to_vec()); - response_channel - .send(res) - .map_err(|_| anyhow!("sender dropped"))?; - } - rpc::GossipsubMessage::RemoveExplicitPeer(response_channel, peer_id) => { - self.swarm - .behaviour_mut() - .gossipsub - .remove_explicit_peer(&peer_id); - response_channel - .send(()) - .map_err(|_| anyhow!("sender dropped"))?; - } - rpc::GossipsubMessage::Subscribe(response_channel, topic_hash) => { - let res = self - .swarm - .behaviour_mut() - .gossipsub - .subscribe(&IdentTopic::new(topic_hash.into_string())); - response_channel - .send(res) - .map_err(|_| anyhow!("sender dropped"))?; - } - rpc::GossipsubMessage::Topics(response_channel) => { - let topics = self - .swarm - .behaviour_mut() - .gossipsub - .topics() - .cloned() - .collect(); - response_channel - .send(topics) - .map_err(|_| anyhow!("sender dropped"))?; - } - rpc::GossipsubMessage::Unsubscribe(response_channel, topic_hash) => { - let res = self - .swarm - .behaviour_mut() - .gossipsub - .unsubscribe(&IdentTopic::new(topic_hash.into_string())); - response_channel - .send(res) - .map_err(|_| anyhow!("sender dropped"))?; + RpcMessage::Gossipsub(g) => { + let gossipsub = match self.swarm.behaviour_mut().gossipsub.as_mut() { + Some(gossipsub) => gossipsub, + None => { + tracing::warn!("Unexpected gossipsub message"); + return Ok(false); + } + }; + match g { + rpc::GossipsubMessage::AddExplicitPeer(response_channel, peer_id) => { + gossipsub.add_explicit_peer(&peer_id); + response_channel + .send(()) + .map_err(|_| anyhow!("sender dropped"))?; + } + rpc::GossipsubMessage::AllMeshPeers(response_channel) => { + let peers = gossipsub.all_mesh_peers().copied().collect(); + response_channel + .send(peers) + .map_err(|_| anyhow!("sender dropped"))?; + } + rpc::GossipsubMessage::AllPeers(response_channel) => { + let all_peers = gossipsub + .all_peers() + .map(|(p, t)| (*p, t.into_iter().cloned().collect())) + .collect(); + response_channel + .send(all_peers) + .map_err(|_| anyhow!("sender dropped"))?; + } + rpc::GossipsubMessage::MeshPeers(response_channel, topic_hash) => { + let peers = gossipsub.mesh_peers(&topic_hash).copied().collect(); + response_channel + .send(peers) + .map_err(|_| anyhow!("sender dropped"))?; + } + rpc::GossipsubMessage::Publish(response_channel, topic_hash, bytes) => { + let res = gossipsub + .publish(IdentTopic::new(topic_hash.into_string()), bytes.to_vec()); + response_channel + .send(res) + .map_err(|_| anyhow!("sender dropped"))?; + } + rpc::GossipsubMessage::RemoveExplicitPeer(response_channel, peer_id) => { + gossipsub.remove_explicit_peer(&peer_id); + response_channel + .send(()) + .map_err(|_| anyhow!("sender dropped"))?; + } + rpc::GossipsubMessage::Subscribe(response_channel, topic_hash) => { + let res = gossipsub.subscribe(&IdentTopic::new(topic_hash.into_string())); + response_channel + .send(res) + .map_err(|_| anyhow!("sender dropped"))?; + } + rpc::GossipsubMessage::Topics(response_channel) => { + let topics = gossipsub.topics().cloned().collect(); + response_channel + .send(topics) + .map_err(|_| anyhow!("sender dropped"))?; + } + rpc::GossipsubMessage::Unsubscribe(response_channel, topic_hash) => { + let res = gossipsub.unsubscribe(&IdentTopic::new(topic_hash.into_string())); + response_channel + .send(res) + .map_err(|_| anyhow!("sender dropped"))?; + } } - }, + } RpcMessage::Shutdown => { return Ok(true); }