Skip to content

Commit

Permalink
feat(kad): report get_providers call event based
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire committed Sep 23, 2022
1 parent 757f79c commit 7dfafd9
Show file tree
Hide file tree
Showing 11 changed files with 421 additions and 275 deletions.
243 changes: 122 additions & 121 deletions examples/distributed-key-value-store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ use libp2p::{
swarm::SwarmEvent,
NetworkBehaviour, PeerId, Swarm,
};
use libp2p_kad::GetProvidersOk;
use std::error::Error;

#[async_std::main]
Expand Down Expand Up @@ -110,147 +111,147 @@ async fn main() -> Result<(), Box<dyn Error>> {
// Kick it off.
loop {
select! {
line = stdin.select_next_some() => handle_input_line(&mut swarm.behaviour_mut().kademlia, line.expect("Stdin not to close")),
event = swarm.select_next_some() => match event {
SwarmEvent::NewListenAddr { address, .. } => {
println!("Listening in {:?}", address);
},
SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(MdnsEvent::Discovered(list))) => {
for (peer_id, multiaddr) in list {
swarm.behaviour_mut().kademlia.add_address(&peer_id, multiaddr);
line = stdin.select_next_some() => handle_input_line(&mut swarm.behaviour_mut().kademlia, line.expect("Stdin not to close")),
event = swarm.select_next_some() => match event {
SwarmEvent::NewListenAddr { address, .. } => {
println!("Listening in {:?}", address);
},
SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(MdnsEvent::Discovered(list))) => {
for (peer_id, multiaddr) in list {
swarm.behaviour_mut().kademlia.add_address(&peer_id, multiaddr);
}
}
}
SwarmEvent::Behaviour(MyBehaviourEvent::Kademlia(KademliaEvent::OutboundQueryCompleted { result, ..})) => {
match result {
QueryResult::GetProviders(Ok(ok)) => {
for peer in ok.providers {
SwarmEvent::Behaviour(MyBehaviourEvent::Kademlia(KademliaEvent::OutboundQueryProgressed { result, ..})) => {
match result {
QueryResult::GetProviders(Ok(GetProvidersOk { key, providers, .. })) => {
for peer in providers {
println!(
"Peer {:?} provides key {:?}",
peer,
std::str::from_utf8(key.as_ref()).unwrap()
);
}
}
QueryResult::GetProviders(Err(err)) => {
eprintln!("Failed to get providers: {:?}", err);
}
QueryResult::GetRecord(Ok(ok)) => {
for PeerRecord {
record: Record { key, value, .. },
..
} in ok.records
{
println!(
"Got record {:?} {:?}",
std::str::from_utf8(key.as_ref()).unwrap(),
std::str::from_utf8(&value).unwrap(),
);
}
}
QueryResult::GetRecord(Err(err)) => {
eprintln!("Failed to get record: {:?}", err);
}
QueryResult::PutRecord(Ok(PutRecordOk { key })) => {
println!(
"Peer {:?} provides key {:?}",
peer,
std::str::from_utf8(ok.key.as_ref()).unwrap()
"Successfully put record {:?}",
std::str::from_utf8(key.as_ref()).unwrap()
);
}
}
QueryResult::GetProviders(Err(err)) => {
eprintln!("Failed to get providers: {:?}", err);
}
QueryResult::GetRecord(Ok(ok)) => {
for PeerRecord {
record: Record { key, value, .. },
..
} in ok.records
{
QueryResult::PutRecord(Err(err)) => {
eprintln!("Failed to put record: {:?}", err);
}
QueryResult::StartProviding(Ok(AddProviderOk { key })) => {
println!(
"Got record {:?} {:?}",
std::str::from_utf8(key.as_ref()).unwrap(),
std::str::from_utf8(&value).unwrap(),
"Successfully put provider record {:?}",
std::str::from_utf8(key.as_ref()).unwrap()
);
}
QueryResult::StartProviding(Err(err)) => {
eprintln!("Failed to put provider record: {:?}", err);
}
_ => {}
}
QueryResult::GetRecord(Err(err)) => {
eprintln!("Failed to get record: {:?}", err);
}
QueryResult::PutRecord(Ok(PutRecordOk { key })) => {
println!(
"Successfully put record {:?}",
std::str::from_utf8(key.as_ref()).unwrap()
);
}
QueryResult::PutRecord(Err(err)) => {
eprintln!("Failed to put record: {:?}", err);
}
QueryResult::StartProviding(Ok(AddProviderOk { key })) => {
println!(
"Successfully put provider record {:?}",
std::str::from_utf8(key.as_ref()).unwrap()
);
}
QueryResult::StartProviding(Err(err)) => {
eprintln!("Failed to put provider record: {:?}", err);
}
_ => {}
}
_ => {}
}
_ => {}
}
}
}
}

fn handle_input_line(kademlia: &mut Kademlia<MemoryStore>, line: String) {
let mut args = line.split(' ');
fn handle_input_line(kademlia: &mut Kademlia<MemoryStore>, line: String) {
let mut args = line.split(' ');

match args.next() {
Some("GET") => {
let key = {
match args.next() {
Some(key) => Key::new(&key),
None => {
eprintln!("Expected key");
return;
match args.next() {
Some("GET") => {
let key = {
match args.next() {
Some(key) => Key::new(&key),
None => {
eprintln!("Expected key");
return;
}
}
}
};
kademlia.get_record(key, Quorum::One);
}
Some("GET_PROVIDERS") => {
let key = {
match args.next() {
Some(key) => Key::new(&key),
None => {
eprintln!("Expected key");
return;
};
kademlia.get_record(key, Quorum::One);
}
Some("GET_PROVIDERS") => {
let key = {
match args.next() {
Some(key) => Key::new(&key),
None => {
eprintln!("Expected key");
return;
}
}
}
};
kademlia.get_providers(key);
}
Some("PUT") => {
let key = {
match args.next() {
Some(key) => Key::new(&key),
None => {
eprintln!("Expected key");
return;
};
kademlia.get_providers(key);
}
Some("PUT") => {
let key = {
match args.next() {
Some(key) => Key::new(&key),
None => {
eprintln!("Expected key");
return;
}
}
}
};
let value = {
match args.next() {
Some(value) => value.as_bytes().to_vec(),
None => {
eprintln!("Expected value");
return;
};
let value = {
match args.next() {
Some(value) => value.as_bytes().to_vec(),
None => {
eprintln!("Expected value");
return;
}
}
}
};
let record = Record {
key,
value,
publisher: None,
expires: None,
};
kademlia
.put_record(record, Quorum::One)
.expect("Failed to store record locally.");
}
Some("PUT_PROVIDER") => {
let key = {
match args.next() {
Some(key) => Key::new(&key),
None => {
eprintln!("Expected key");
return;
};
let record = Record {
key,
value,
publisher: None,
expires: None,
};
kademlia
.put_record(record, Quorum::One)
.expect("Failed to store record locally.");
}
Some("PUT_PROVIDER") => {
let key = {
match args.next() {
Some(key) => Key::new(&key),
None => {
eprintln!("Expected key");
return;
}
}
}
};
};

kademlia
.start_providing(key)
.expect("Failed to start providing key");
}
_ => {
eprintln!("expected GET, GET_PROVIDERS, PUT or PUT_PROVIDER");
kademlia
.start_providing(key)
.expect("Failed to start providing key");
}
_ => {
eprintln!("expected GET, GET_PROVIDERS, PUT or PUT_PROVIDER");
}
}
}
}
31 changes: 24 additions & 7 deletions examples/file-sharing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,12 +326,16 @@ mod network {

/// Find the providers for the given file on the DHT.
pub async fn get_providers(&mut self, file_name: String) -> HashSet<PeerId> {
let (sender, receiver) = oneshot::channel();
let (sender, mut receiver) = mpsc::channel(0);
self.sender
.send(Command::GetProviders { file_name, sender })
.await
.expect("Command receiver not to be dropped.");
receiver.await.expect("Sender not to be dropped.")
let mut out = HashSet::new();
while let Some(h) = receiver.next().await {
out.extend(h);
}
out
}

/// Request the content of the given file from the given peer.
Expand Down Expand Up @@ -371,7 +375,7 @@ mod network {
event_sender: mpsc::Sender<Event>,
pending_dial: HashMap<PeerId, oneshot::Sender<Result<(), Box<dyn Error + Send>>>>,
pending_start_providing: HashMap<QueryId, oneshot::Sender<()>>,
pending_get_providers: HashMap<QueryId, oneshot::Sender<HashSet<PeerId>>>,
pending_get_providers: HashMap<QueryId, mpsc::Sender<HashSet<PeerId>>>,
pending_request_file:
HashMap<RequestId, oneshot::Sender<Result<Vec<u8>, Box<dyn Error + Send>>>>,
}
Expand Down Expand Up @@ -415,7 +419,7 @@ mod network {
) {
match event {
SwarmEvent::Behaviour(ComposedEvent::Kademlia(
KademliaEvent::OutboundQueryCompleted {
KademliaEvent::OutboundQueryProgressed {
id,
result: QueryResult::StartProviding(_),
..
Expand All @@ -428,18 +432,31 @@ mod network {
let _ = sender.send(());
}
SwarmEvent::Behaviour(ComposedEvent::Kademlia(
KademliaEvent::OutboundQueryCompleted {
KademliaEvent::OutboundQueryProgressed {
id,
result: QueryResult::GetProviders(Ok(GetProvidersOk { providers, .. })),
..
},
)) => {
let _ = self
.pending_get_providers
.remove(&id)
.get_mut(&id)
.expect("Completed query to be previously pending.")
.send(providers);
}
SwarmEvent::Behaviour(ComposedEvent::Kademlia(
KademliaEvent::OutboundQueryProgressed {
id,
result: QueryResult::GetProviders(..),
..
},
)) => {
// Drop channel to signal query is complete.
let _ = self
.pending_get_providers
.remove(&id)
.expect("Completed query to be previously pending.");
}
SwarmEvent::Behaviour(ComposedEvent::Kademlia(_)) => {}
SwarmEvent::Behaviour(ComposedEvent::RequestResponse(
RequestResponseEvent::Message { message, .. },
Expand Down Expand Up @@ -626,7 +643,7 @@ mod network {
},
GetProviders {
file_name: String,
sender: oneshot::Sender<HashSet<PeerId>>,
sender: mpsc::Sender<HashSet<PeerId>>,
},
RequestFile {
file_name: String,
Expand Down
2 changes: 1 addition & 1 deletion examples/ipfs-kad.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
task::block_on(async move {
loop {
let event = swarm.select_next_some().await;
if let SwarmEvent::Behaviour(KademliaEvent::OutboundQueryCompleted {
if let SwarmEvent::Behaviour(KademliaEvent::OutboundQueryProgressed {
result: QueryResult::GetClosestPeers(result),
..
}) = event
Expand Down
10 changes: 6 additions & 4 deletions misc/metrics/src/kad.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use libp2p_kad::GetProvidersOk;
use prometheus_client::encoding::text::Encode;
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
Expand Down Expand Up @@ -162,7 +163,7 @@ impl Metrics {
impl super::Recorder<libp2p_kad::KademliaEvent> for Metrics {
fn record(&self, event: &libp2p_kad::KademliaEvent) {
match event {
libp2p_kad::KademliaEvent::OutboundQueryCompleted { result, stats, .. } => {
libp2p_kad::KademliaEvent::OutboundQueryProgressed { result, stats, .. } => {
self.query_result_num_requests
.get_or_create(&result.into())
.observe(stats.num_requests().into());
Expand Down Expand Up @@ -200,9 +201,10 @@ impl super::Recorder<libp2p_kad::KademliaEvent> for Metrics {
}
},
libp2p_kad::QueryResult::GetProviders(result) => match result {
Ok(ok) => self
.query_result_get_providers_ok
.observe(ok.providers.len() as f64),
Ok(GetProvidersOk { providers, .. }) => {
self.query_result_get_providers_ok
.observe(providers.len() as f64);
}
Err(error) => {
self.query_result_get_providers_error
.get_or_create(&error.into())
Expand Down
Loading

0 comments on commit 7dfafd9

Please sign in to comment.