Skip to content

Commit

Permalink
swarm/src/behaviour: Deprecate NetworkBehaviourEventProcess (#2784)
Browse files Browse the repository at this point in the history
In preparation for #2751.
  • Loading branch information
mxinden authored Aug 16, 2022
1 parent 0e5a25d commit 878c49f
Show file tree
Hide file tree
Showing 9 changed files with 267 additions and 164 deletions.
80 changes: 45 additions & 35 deletions examples/chat-tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use libp2p::{
mdns::{Mdns, MdnsEvent},
mplex,
noise,
swarm::{dial_opts::DialOpts, NetworkBehaviourEventProcess, SwarmBuilder, SwarmEvent},
swarm::{SwarmBuilder, SwarmEvent},
// `TokioTcpTransport` is available through the `tcp-tokio` feature.
tcp::TokioTcpTransport,
Multiaddr,
Expand Down Expand Up @@ -82,47 +82,29 @@ async fn main() -> Result<(), Box<dyn Error>> {
// Create a Floodsub topic
let floodsub_topic = floodsub::Topic::new("chat");

// We create a custom network behaviour that combines floodsub and mDNS.
// The derive generates a delegating `NetworkBehaviour` impl which in turn
// requires the implementations of `NetworkBehaviourEventProcess` for
// the events of each behaviour.
// We create a custom behaviour that combines floodsub and mDNS.
// The derive generates a delegating `NetworkBehaviour` impl.
#[derive(NetworkBehaviour)]
#[behaviour(event_process = true)]
#[behaviour(out_event = "MyBehaviourEvent")]
struct MyBehaviour {
floodsub: Floodsub,
mdns: Mdns,
}

impl NetworkBehaviourEventProcess<FloodsubEvent> for MyBehaviour {
// Called when `floodsub` produces an event.
fn inject_event(&mut self, message: FloodsubEvent) {
if let FloodsubEvent::Message(message) = message {
println!(
"Received: '{:?}' from {:?}",
String::from_utf8_lossy(&message.data),
message.source
);
}
enum MyBehaviourEvent {
Floodsub(FloodsubEvent),
Mdns(MdnsEvent),
}

impl From<FloodsubEvent> for MyBehaviourEvent {
fn from(event: FloodsubEvent) -> Self {
MyBehaviourEvent::Floodsub(event)
}
}

impl NetworkBehaviourEventProcess<MdnsEvent> for MyBehaviour {
// Called when `mdns` produces an event.
fn inject_event(&mut self, event: MdnsEvent) {
match event {
MdnsEvent::Discovered(list) => {
for (peer, _) in list {
self.floodsub.add_node_to_partial_view(peer);
}
}
MdnsEvent::Expired(list) => {
for (peer, _) in list {
if !self.mdns.has_node(&peer) {
self.floodsub.remove_node_from_partial_view(&peer);
}
}
}
}
impl From<MdnsEvent> for MyBehaviourEvent {
fn from(event: MdnsEvent) -> Self {
MyBehaviourEvent::Mdns(event)
}
}

Expand Down Expand Up @@ -166,8 +148,36 @@ async fn main() -> Result<(), Box<dyn Error>> {
swarm.behaviour_mut().floodsub.publish(floodsub_topic.clone(), line.as_bytes());
}
event = swarm.select_next_some() => {
if let SwarmEvent::NewListenAddr { address, .. } = event {
println!("Listening on {:?}", address);
match event {
SwarmEvent::NewListenAddr { address, .. } => {
println!("Listening on {:?}", address);
}
SwarmEvent::Behaviour(MyBehaviourEvent::Floodsub(event)) => {
if let FloodsubEvent::Message(message) = event {
println!(
"Received: '{:?}' from {:?}",
String::from_utf8_lossy(&message.data),
message.source
);
}
}
SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(event)) => {
match event {
MdnsEvent::Discovered(list) => {
for (peer, _) in list {
swarm.behaviour_mut().floodsub.add_node_to_partial_view(peer);
}
}
MdnsEvent::Expired(list) => {
for (peer, _) in list {
if !swarm.behaviour().mdns.has_node(&peer) {
swarm.behaviour_mut().floodsub.remove_node_from_partial_view(&peer);
}
}
}
}
}
_ => {}
}
}
}
Expand Down
4 changes: 1 addition & 3 deletions examples/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
let floodsub_topic = floodsub::Topic::new("chat");

// We create a custom network behaviour that combines floodsub and mDNS.
// In the future, we want to improve libp2p to make this easier to do.
// Use the derive to generate delegating NetworkBehaviour impl and require the
// NetworkBehaviourEventProcess implementations below.
// Use the derive to generate delegating NetworkBehaviour impl.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent")]
struct MyBehaviour {
Expand Down
94 changes: 49 additions & 45 deletions examples/distributed-key-value-store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use libp2p::kad::{
use libp2p::{
development_transport, identity,
mdns::{Mdns, MdnsConfig, MdnsEvent},
swarm::{NetworkBehaviourEventProcess, SwarmEvent},
swarm::SwarmEvent,
NetworkBehaviour, PeerId, Swarm,
};
use std::error::Error;
Expand All @@ -68,28 +68,60 @@ async fn main() -> Result<(), Box<dyn Error>> {

// We create a custom network behaviour that combines Kademlia and mDNS.
#[derive(NetworkBehaviour)]
#[behaviour(event_process = true)]
#[behaviour(out_event = "MyBehaviourEvent")]
struct MyBehaviour {
kademlia: Kademlia<MemoryStore>,
mdns: Mdns,
}

impl NetworkBehaviourEventProcess<MdnsEvent> for MyBehaviour {
// Called when `mdns` produces an event.
fn inject_event(&mut self, event: MdnsEvent) {
if let MdnsEvent::Discovered(list) = event {
for (peer_id, multiaddr) in list {
self.kademlia.add_address(&peer_id, multiaddr);
}
}
enum MyBehaviourEvent {
Kademlia(KademliaEvent),
Mdns(MdnsEvent),
}

impl From<KademliaEvent> for MyBehaviourEvent {
fn from(event: KademliaEvent) -> Self {
MyBehaviourEvent::Kademlia(event)
}
}

impl From<MdnsEvent> for MyBehaviourEvent {
fn from(event: MdnsEvent) -> Self {
MyBehaviourEvent::Mdns(event)
}
}

impl NetworkBehaviourEventProcess<KademliaEvent> for MyBehaviour {
// Called when `kademlia` produces an event.
fn inject_event(&mut self, message: KademliaEvent) {
match message {
KademliaEvent::OutboundQueryCompleted { result, .. } => match result {
// Create a swarm to manage peers and events.
let mut swarm = {
// Create a Kademlia behaviour.
let store = MemoryStore::new(local_peer_id);
let kademlia = Kademlia::new(local_peer_id, store);
let mdns = task::block_on(Mdns::new(MdnsConfig::default()))?;
let behaviour = MyBehaviour { kademlia, mdns };
Swarm::new(transport, behaviour, local_peer_id)
};

// Read full lines from stdin
let mut stdin = io::BufReader::new(io::stdin()).lines().fuse();

// Listen on all interfaces and whatever port the OS assigns.
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;

// Kick it off.
loop {
select! {
line = stdin.select_next_some() => handle_input_line(&mut swarm.behaviour_mut().kademlia, line.expect("Stdin not to close")),
event = swarm.select_next_some() => match event {
SwarmEvent::NewListenAddr { address, .. } => {
println!("Listening in {:?}", address);
},
SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(MdnsEvent::Discovered(list))) => {
for (peer_id, multiaddr) in list {
swarm.behaviour_mut().kademlia.add_address(&peer_id, multiaddr);
}
}
SwarmEvent::Behaviour(MyBehaviourEvent::Kademlia(KademliaEvent::OutboundQueryCompleted { result, ..})) => {
match result {
QueryResult::GetProviders(Ok(ok)) => {
for peer in ok.providers {
println!(
Expand Down Expand Up @@ -137,38 +169,10 @@ async fn main() -> Result<(), Box<dyn Error>> {
eprintln!("Failed to put provider record: {:?}", err);
}
_ => {}
},
_ => {}
}
}
_ => {}
}
}

// Create a swarm to manage peers and events.
let mut swarm = {
// Create a Kademlia behaviour.
let store = MemoryStore::new(local_peer_id);
let kademlia = Kademlia::new(local_peer_id, store);
let mdns = task::block_on(Mdns::new(MdnsConfig::default()))?;
let behaviour = MyBehaviour { kademlia, mdns };
Swarm::new(transport, behaviour, local_peer_id)
};

// Read full lines from stdin
let mut stdin = io::BufReader::new(io::stdin()).lines().fuse();

// Listen on all interfaces and whatever port the OS assigns.
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;

// Kick it off.
loop {
select! {
line = stdin.select_next_some() => handle_input_line(&mut swarm.behaviour_mut().kademlia, line.expect("Stdin not to close")),
event = swarm.select_next_some() => match event {
SwarmEvent::NewListenAddr { address, .. } => {
println!("Listening in {:?}", address);
},
_ => {}
}
}
}
}
Expand Down
Loading

0 comments on commit 878c49f

Please sign in to comment.