Skip to content

Commit

Permalink
feat(p2p): make gossipsub configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
fabricedesre authored Aug 22, 2022
1 parent ccb4528 commit 071fd1b
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 102 deletions.
17 changes: 12 additions & 5 deletions iroh-p2p/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub(crate) struct NodeBehaviour {
relay: Toggle<relay::v2::relay::Relay>,
relay_client: Toggle<relay::v2::client::Client>,
dcutr: Toggle<dcutr::behaviour::Behaviour>,
pub(crate) gossipsub: Gossipsub,
pub(crate) gossipsub: Toggle<Gossipsub>,
}

impl NodeBehaviour {
Expand Down Expand Up @@ -135,10 +135,17 @@ impl NodeBehaviour {
Identify::new(config)
};

let gossipsub_config = GossipsubConfig::default();
let message_authenticity = MessageAuthenticity::Signed(local_key.clone());
let gossipsub = Gossipsub::new(message_authenticity, gossipsub_config)
.map_err(|e| anyhow::anyhow!("{}", e))?;
let gossipsub = if config.gossipsub {
let gossipsub_config = GossipsubConfig::default();
let message_authenticity = MessageAuthenticity::Signed(local_key.clone());
Some(
Gossipsub::new(message_authenticity, gossipsub_config)
.map_err(|e| anyhow::anyhow!("{}", e))?,
)
} else {
None
}
.into();

Ok(NodeBehaviour {
ping: Ping::default(),
Expand Down
5 changes: 5 additions & 0 deletions iroh-p2p/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ pub struct Libp2pConfig {
pub relay_server: bool,
/// Relay client enabled.
pub relay_client: bool,
/// Gossipsub enabled.
pub gossipsub: bool,
pub max_conns_out: u32,
pub max_conns_in: u32,
pub max_conns_pending_out: u32,
Expand Down Expand Up @@ -114,6 +116,7 @@ impl Source for Libp2pConfig {
insert_into_config_map(&mut map, "mdns", self.mdns);
insert_into_config_map(&mut map, "relay_server", self.relay_server);
insert_into_config_map(&mut map, "relay_client", self.relay_client);
insert_into_config_map(&mut map, "gossipsub", self.gossipsub);
let peers: Vec<String> = self.bootstrap_peers.iter().map(|b| b.to_string()).collect();
insert_into_config_map(&mut map, "bootstrap_peers", peers);
insert_into_config_map(
Expand Down Expand Up @@ -155,6 +158,7 @@ impl Default for Libp2pConfig {
autonat: true,
relay_server: true,
relay_client: true,
gossipsub: true,
max_conns_pending_out: 256,
max_conns_pending_in: 256,
max_conns_in: 256,
Expand Down Expand Up @@ -293,6 +297,7 @@ mod tests {
"relay_client".to_string(),
Value::new(None, default.relay_client),
);
expect.insert("gossipsub".to_string(), Value::new(None, default.gossipsub));
expect.insert(
"bootstrap_peers".to_string(),
Value::new(None, bootstrap_peers),
Expand Down
165 changes: 68 additions & 97 deletions iroh-p2p/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -625,104 +625,75 @@ impl<KeyStorage: Storage> Node<KeyStorage> {
.send(())
.map_err(|_| anyhow!("sender dropped"))?;
}
RpcMessage::Gossipsub(g) => match g {
rpc::GossipsubMessage::AddExplicitPeer(response_channel, peer_id) => {
self.swarm
.behaviour_mut()
.gossipsub
.add_explicit_peer(&peer_id);
response_channel
.send(())
.map_err(|_| anyhow!("sender dropped"))?;
}
rpc::GossipsubMessage::AllMeshPeers(response_channel) => {
let peers = self
.swarm
.behaviour_mut()
.gossipsub
.all_mesh_peers()
.copied()
.collect();
response_channel
.send(peers)
.map_err(|_| anyhow!("sender dropped"))?;
}
rpc::GossipsubMessage::AllPeers(response_channel) => {
let all_peers = self
.swarm
.behaviour_mut()
.gossipsub
.all_peers()
.map(|(p, t)| (*p, t.into_iter().cloned().collect()))
.collect();
response_channel
.send(all_peers)
.map_err(|_| anyhow!("sender dropped"))?;
}
rpc::GossipsubMessage::MeshPeers(response_channel, topic_hash) => {
let peers = self
.swarm
.behaviour_mut()
.gossipsub
.mesh_peers(&topic_hash)
.copied()
.collect();
response_channel
.send(peers)
.map_err(|_| anyhow!("sender dropped"))?;
}
rpc::GossipsubMessage::Publish(response_channel, topic_hash, bytes) => {
let res = self
.swarm
.behaviour_mut()
.gossipsub
.publish(IdentTopic::new(topic_hash.into_string()), bytes.to_vec());
response_channel
.send(res)
.map_err(|_| anyhow!("sender dropped"))?;
}
rpc::GossipsubMessage::RemoveExplicitPeer(response_channel, peer_id) => {
self.swarm
.behaviour_mut()
.gossipsub
.remove_explicit_peer(&peer_id);
response_channel
.send(())
.map_err(|_| anyhow!("sender dropped"))?;
}
rpc::GossipsubMessage::Subscribe(response_channel, topic_hash) => {
let res = self
.swarm
.behaviour_mut()
.gossipsub
.subscribe(&IdentTopic::new(topic_hash.into_string()));
response_channel
.send(res)
.map_err(|_| anyhow!("sender dropped"))?;
}
rpc::GossipsubMessage::Topics(response_channel) => {
let topics = self
.swarm
.behaviour_mut()
.gossipsub
.topics()
.cloned()
.collect();
response_channel
.send(topics)
.map_err(|_| anyhow!("sender dropped"))?;
}
rpc::GossipsubMessage::Unsubscribe(response_channel, topic_hash) => {
let res = self
.swarm
.behaviour_mut()
.gossipsub
.unsubscribe(&IdentTopic::new(topic_hash.into_string()));
response_channel
.send(res)
.map_err(|_| anyhow!("sender dropped"))?;
RpcMessage::Gossipsub(g) => {
let gossipsub = match self.swarm.behaviour_mut().gossipsub.as_mut() {
Some(gossipsub) => gossipsub,
None => {
tracing::warn!("Unexpected gossipsub message");
return Ok(false);
}
};
match g {
rpc::GossipsubMessage::AddExplicitPeer(response_channel, peer_id) => {
gossipsub.add_explicit_peer(&peer_id);
response_channel
.send(())
.map_err(|_| anyhow!("sender dropped"))?;
}
rpc::GossipsubMessage::AllMeshPeers(response_channel) => {
let peers = gossipsub.all_mesh_peers().copied().collect();
response_channel
.send(peers)
.map_err(|_| anyhow!("sender dropped"))?;
}
rpc::GossipsubMessage::AllPeers(response_channel) => {
let all_peers = gossipsub
.all_peers()
.map(|(p, t)| (*p, t.into_iter().cloned().collect()))
.collect();
response_channel
.send(all_peers)
.map_err(|_| anyhow!("sender dropped"))?;
}
rpc::GossipsubMessage::MeshPeers(response_channel, topic_hash) => {
let peers = gossipsub.mesh_peers(&topic_hash).copied().collect();
response_channel
.send(peers)
.map_err(|_| anyhow!("sender dropped"))?;
}
rpc::GossipsubMessage::Publish(response_channel, topic_hash, bytes) => {
let res = gossipsub
.publish(IdentTopic::new(topic_hash.into_string()), bytes.to_vec());
response_channel
.send(res)
.map_err(|_| anyhow!("sender dropped"))?;
}
rpc::GossipsubMessage::RemoveExplicitPeer(response_channel, peer_id) => {
gossipsub.remove_explicit_peer(&peer_id);
response_channel
.send(())
.map_err(|_| anyhow!("sender dropped"))?;
}
rpc::GossipsubMessage::Subscribe(response_channel, topic_hash) => {
let res = gossipsub.subscribe(&IdentTopic::new(topic_hash.into_string()));
response_channel
.send(res)
.map_err(|_| anyhow!("sender dropped"))?;
}
rpc::GossipsubMessage::Topics(response_channel) => {
let topics = gossipsub.topics().cloned().collect();
response_channel
.send(topics)
.map_err(|_| anyhow!("sender dropped"))?;
}
rpc::GossipsubMessage::Unsubscribe(response_channel, topic_hash) => {
let res = gossipsub.unsubscribe(&IdentTopic::new(topic_hash.into_string()));
response_channel
.send(res)
.map_err(|_| anyhow!("sender dropped"))?;
}
}
},
}
RpcMessage::Shutdown => {
return Ok(true);
}
Expand Down

0 comments on commit 071fd1b

Please sign in to comment.