From 7dfafd90359c90c8bd07e317a3df72e1f3774811 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Thu, 16 Jun 2022 20:12:48 +0200 Subject: [PATCH] feat(kad): report get_providers call event based --- examples/distributed-key-value-store.rs | 243 ++++++++--------- examples/file-sharing.rs | 31 ++- examples/ipfs-kad.rs | 2 +- misc/metrics/src/kad.rs | 10 +- protocols/kad/src/behaviour.rs | 252 +++++++++++------- protocols/kad/src/behaviour/test.rs | 112 ++++---- protocols/kad/src/lib.rs | 2 +- protocols/kad/src/query.rs | 12 + protocols/kad/src/query/peers/closest.rs | 13 + .../kad/src/query/peers/closest/disjoint.rs | 9 + protocols/kad/src/query/peers/fixed.rs | 10 + 11 files changed, 421 insertions(+), 275 deletions(-) diff --git a/examples/distributed-key-value-store.rs b/examples/distributed-key-value-store.rs index 7fef717cf780..1c5d162e1d90 100644 --- a/examples/distributed-key-value-store.rs +++ b/examples/distributed-key-value-store.rs @@ -53,6 +53,7 @@ use libp2p::{ swarm::SwarmEvent, NetworkBehaviour, PeerId, Swarm, }; +use libp2p_kad::GetProvidersOk; use std::error::Error; #[async_std::main] @@ -110,147 +111,147 @@ async fn main() -> Result<(), Box> { // Kick it off. loop { select! { - line = stdin.select_next_some() => handle_input_line(&mut swarm.behaviour_mut().kademlia, line.expect("Stdin not to close")), - event = swarm.select_next_some() => match event { - SwarmEvent::NewListenAddr { address, .. } => { - println!("Listening in {:?}", address); - }, - SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(MdnsEvent::Discovered(list))) => { - for (peer_id, multiaddr) in list { - swarm.behaviour_mut().kademlia.add_address(&peer_id, multiaddr); + line = stdin.select_next_some() => handle_input_line(&mut swarm.behaviour_mut().kademlia, line.expect("Stdin not to close")), + event = swarm.select_next_some() => match event { + SwarmEvent::NewListenAddr { address, .. } => { + println!("Listening in {:?}", address); + }, + SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(MdnsEvent::Discovered(list))) => { + for (peer_id, multiaddr) in list { + swarm.behaviour_mut().kademlia.add_address(&peer_id, multiaddr); + } } - } - SwarmEvent::Behaviour(MyBehaviourEvent::Kademlia(KademliaEvent::OutboundQueryCompleted { result, ..})) => { - match result { - QueryResult::GetProviders(Ok(ok)) => { - for peer in ok.providers { + SwarmEvent::Behaviour(MyBehaviourEvent::Kademlia(KademliaEvent::OutboundQueryProgressed { result, ..})) => { + match result { + QueryResult::GetProviders(Ok(GetProvidersOk { key, providers, .. })) => { + for peer in providers { + println!( + "Peer {:?} provides key {:?}", + peer, + std::str::from_utf8(key.as_ref()).unwrap() + ); + } + } + QueryResult::GetProviders(Err(err)) => { + eprintln!("Failed to get providers: {:?}", err); + } + QueryResult::GetRecord(Ok(ok)) => { + for PeerRecord { + record: Record { key, value, .. }, + .. + } in ok.records + { + println!( + "Got record {:?} {:?}", + std::str::from_utf8(key.as_ref()).unwrap(), + std::str::from_utf8(&value).unwrap(), + ); + } + } + QueryResult::GetRecord(Err(err)) => { + eprintln!("Failed to get record: {:?}", err); + } + QueryResult::PutRecord(Ok(PutRecordOk { key })) => { println!( - "Peer {:?} provides key {:?}", - peer, - std::str::from_utf8(ok.key.as_ref()).unwrap() + "Successfully put record {:?}", + std::str::from_utf8(key.as_ref()).unwrap() ); } - } - QueryResult::GetProviders(Err(err)) => { - eprintln!("Failed to get providers: {:?}", err); - } - QueryResult::GetRecord(Ok(ok)) => { - for PeerRecord { - record: Record { key, value, .. }, - .. - } in ok.records - { + QueryResult::PutRecord(Err(err)) => { + eprintln!("Failed to put record: {:?}", err); + } + QueryResult::StartProviding(Ok(AddProviderOk { key })) => { println!( - "Got record {:?} {:?}", - std::str::from_utf8(key.as_ref()).unwrap(), - std::str::from_utf8(&value).unwrap(), + "Successfully put provider record {:?}", + std::str::from_utf8(key.as_ref()).unwrap() ); } + QueryResult::StartProviding(Err(err)) => { + eprintln!("Failed to put provider record: {:?}", err); + } + _ => {} } - QueryResult::GetRecord(Err(err)) => { - eprintln!("Failed to get record: {:?}", err); - } - QueryResult::PutRecord(Ok(PutRecordOk { key })) => { - println!( - "Successfully put record {:?}", - std::str::from_utf8(key.as_ref()).unwrap() - ); - } - QueryResult::PutRecord(Err(err)) => { - eprintln!("Failed to put record: {:?}", err); - } - QueryResult::StartProviding(Ok(AddProviderOk { key })) => { - println!( - "Successfully put provider record {:?}", - std::str::from_utf8(key.as_ref()).unwrap() - ); - } - QueryResult::StartProviding(Err(err)) => { - eprintln!("Failed to put provider record: {:?}", err); - } - _ => {} } + _ => {} } - _ => {} - } } } -} -fn handle_input_line(kademlia: &mut Kademlia, line: String) { - let mut args = line.split(' '); + fn handle_input_line(kademlia: &mut Kademlia, line: String) { + let mut args = line.split(' '); - match args.next() { - Some("GET") => { - let key = { - match args.next() { - Some(key) => Key::new(&key), - None => { - eprintln!("Expected key"); - return; + match args.next() { + Some("GET") => { + let key = { + match args.next() { + Some(key) => Key::new(&key), + None => { + eprintln!("Expected key"); + return; + } } - } - }; - kademlia.get_record(key, Quorum::One); - } - Some("GET_PROVIDERS") => { - let key = { - match args.next() { - Some(key) => Key::new(&key), - None => { - eprintln!("Expected key"); - return; + }; + kademlia.get_record(key, Quorum::One); + } + Some("GET_PROVIDERS") => { + let key = { + match args.next() { + Some(key) => Key::new(&key), + None => { + eprintln!("Expected key"); + return; + } } - } - }; - kademlia.get_providers(key); - } - Some("PUT") => { - let key = { - match args.next() { - Some(key) => Key::new(&key), - None => { - eprintln!("Expected key"); - return; + }; + kademlia.get_providers(key); + } + Some("PUT") => { + let key = { + match args.next() { + Some(key) => Key::new(&key), + None => { + eprintln!("Expected key"); + return; + } } - } - }; - let value = { - match args.next() { - Some(value) => value.as_bytes().to_vec(), - None => { - eprintln!("Expected value"); - return; + }; + let value = { + match args.next() { + Some(value) => value.as_bytes().to_vec(), + None => { + eprintln!("Expected value"); + return; + } } - } - }; - let record = Record { - key, - value, - publisher: None, - expires: None, - }; - kademlia - .put_record(record, Quorum::One) - .expect("Failed to store record locally."); - } - Some("PUT_PROVIDER") => { - let key = { - match args.next() { - Some(key) => Key::new(&key), - None => { - eprintln!("Expected key"); - return; + }; + let record = Record { + key, + value, + publisher: None, + expires: None, + }; + kademlia + .put_record(record, Quorum::One) + .expect("Failed to store record locally."); + } + Some("PUT_PROVIDER") => { + let key = { + match args.next() { + Some(key) => Key::new(&key), + None => { + eprintln!("Expected key"); + return; + } } - } - }; + }; - kademlia - .start_providing(key) - .expect("Failed to start providing key"); - } - _ => { - eprintln!("expected GET, GET_PROVIDERS, PUT or PUT_PROVIDER"); + kademlia + .start_providing(key) + .expect("Failed to start providing key"); + } + _ => { + eprintln!("expected GET, GET_PROVIDERS, PUT or PUT_PROVIDER"); + } } } } diff --git a/examples/file-sharing.rs b/examples/file-sharing.rs index 21fa45d54fcd..dba097563507 100644 --- a/examples/file-sharing.rs +++ b/examples/file-sharing.rs @@ -326,12 +326,16 @@ mod network { /// Find the providers for the given file on the DHT. pub async fn get_providers(&mut self, file_name: String) -> HashSet { - let (sender, receiver) = oneshot::channel(); + let (sender, mut receiver) = mpsc::channel(0); self.sender .send(Command::GetProviders { file_name, sender }) .await .expect("Command receiver not to be dropped."); - receiver.await.expect("Sender not to be dropped.") + let mut out = HashSet::new(); + while let Some(h) = receiver.next().await { + out.extend(h); + } + out } /// Request the content of the given file from the given peer. @@ -371,7 +375,7 @@ mod network { event_sender: mpsc::Sender, pending_dial: HashMap>>>, pending_start_providing: HashMap>, - pending_get_providers: HashMap>>, + pending_get_providers: HashMap>>, pending_request_file: HashMap, Box>>>, } @@ -415,7 +419,7 @@ mod network { ) { match event { SwarmEvent::Behaviour(ComposedEvent::Kademlia( - KademliaEvent::OutboundQueryCompleted { + KademliaEvent::OutboundQueryProgressed { id, result: QueryResult::StartProviding(_), .. @@ -428,7 +432,7 @@ mod network { let _ = sender.send(()); } SwarmEvent::Behaviour(ComposedEvent::Kademlia( - KademliaEvent::OutboundQueryCompleted { + KademliaEvent::OutboundQueryProgressed { id, result: QueryResult::GetProviders(Ok(GetProvidersOk { providers, .. })), .. @@ -436,10 +440,23 @@ mod network { )) => { let _ = self .pending_get_providers - .remove(&id) + .get_mut(&id) .expect("Completed query to be previously pending.") .send(providers); } + SwarmEvent::Behaviour(ComposedEvent::Kademlia( + KademliaEvent::OutboundQueryProgressed { + id, + result: QueryResult::GetProviders(..), + .. + }, + )) => { + // Drop channel to signal query is complete. + let _ = self + .pending_get_providers + .remove(&id) + .expect("Completed query to be previously pending."); + } SwarmEvent::Behaviour(ComposedEvent::Kademlia(_)) => {} SwarmEvent::Behaviour(ComposedEvent::RequestResponse( RequestResponseEvent::Message { message, .. }, @@ -626,7 +643,7 @@ mod network { }, GetProviders { file_name: String, - sender: oneshot::Sender>, + sender: mpsc::Sender>, }, RequestFile { file_name: String, diff --git a/examples/ipfs-kad.rs b/examples/ipfs-kad.rs index a36ed97737bb..ef1ffa7fe198 100644 --- a/examples/ipfs-kad.rs +++ b/examples/ipfs-kad.rs @@ -85,7 +85,7 @@ async fn main() -> Result<(), Box> { task::block_on(async move { loop { let event = swarm.select_next_some().await; - if let SwarmEvent::Behaviour(KademliaEvent::OutboundQueryCompleted { + if let SwarmEvent::Behaviour(KademliaEvent::OutboundQueryProgressed { result: QueryResult::GetClosestPeers(result), .. }) = event diff --git a/misc/metrics/src/kad.rs b/misc/metrics/src/kad.rs index 5e5a1056060a..f76d6e1d2fe6 100644 --- a/misc/metrics/src/kad.rs +++ b/misc/metrics/src/kad.rs @@ -18,6 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use libp2p_kad::GetProvidersOk; use prometheus_client::encoding::text::Encode; use prometheus_client::metrics::counter::Counter; use prometheus_client::metrics::family::Family; @@ -162,7 +163,7 @@ impl Metrics { impl super::Recorder for Metrics { fn record(&self, event: &libp2p_kad::KademliaEvent) { match event { - libp2p_kad::KademliaEvent::OutboundQueryCompleted { result, stats, .. } => { + libp2p_kad::KademliaEvent::OutboundQueryProgressed { result, stats, .. } => { self.query_result_num_requests .get_or_create(&result.into()) .observe(stats.num_requests().into()); @@ -200,9 +201,10 @@ impl super::Recorder for Metrics { } }, libp2p_kad::QueryResult::GetProviders(result) => match result { - Ok(ok) => self - .query_result_get_providers_ok - .observe(ok.providers.len() as f64), + Ok(GetProvidersOk { providers, .. }) => { + self.query_result_get_providers_ok + .observe(providers.len() as f64); + } Err(error) => { self.query_result_get_providers_error .get_or_create(&error.into()) diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index 6e84c4812262..46280e783ffd 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -921,28 +921,48 @@ where /// /// The result of this operation is delivered in a /// reported via [`KademliaEvent::OutboundQueryCompleted{QueryResult::GetProviders}`]. - pub fn get_providers(&mut self, key: record::Key, limit: ProviderLimit) -> QueryId { - let providers = self + pub fn get_providers(&mut self, key: record::Key) -> QueryId { + let providers: HashSet<_> = self .store .providers(&key) .into_iter() .filter(|p| !p.is_expired(Instant::now())) - .map(|p| p.provider); - - let providers = match limit { - ProviderLimit::None => providers.collect(), - ProviderLimit::N(limit) => providers.take(limit.into()).collect(), - }; + .map(|p| p.provider) + .collect(); let info = QueryInfo::GetProviders { key: key.clone(), - providers, - limit, + count: 1, + providers_found: providers.len(), }; - let target = kbucket::Key::new(key); + + let target = kbucket::Key::new(key.clone()); let peers = self.kbuckets.closest_keys(&target); let inner = QueryInner::new(info); - self.queries.add_iter_closest(target.clone(), peers, inner) + let id = self.queries.add_iter_closest(target.clone(), peers, inner); + + // No queries were actually done for the results yet. + let stats = QueryStats::empty(); + + self.queued_events + .push_back(NetworkBehaviourAction::GenerateEvent( + KademliaEvent::OutboundQueryProgressed { + id, + result: QueryResult::GetProviders(Ok(GetProvidersOk { + key, + providers_so_far: providers.len(), + providers, + closest_peers: Default::default(), + })), + step: ProgressStep { + count: 1, + last: false, + }, + stats, + }, + )); + + id } /// Processes discovered peers from a successful request in an iterative `Query`. @@ -1246,38 +1266,56 @@ where .continue_iter_closest(query_id, target.clone(), peers, inner); } - Some(KademliaEvent::OutboundQueryCompleted { + Some(KademliaEvent::OutboundQueryProgressed { id: query_id, stats: result.stats, result: QueryResult::Bootstrap(Ok(BootstrapOk { peer, num_remaining, })), + step: ProgressStep { + count: 1, + last: true, + }, }) } - QueryInfo::GetClosestPeers { key, .. } => Some(KademliaEvent::OutboundQueryCompleted { - id: query_id, - stats: result.stats, - result: QueryResult::GetClosestPeers(Ok(GetClosestPeersOk { - key, - peers: result.peers.collect(), - })), - }), + QueryInfo::GetClosestPeers { key, .. } => { + Some(KademliaEvent::OutboundQueryProgressed { + id: query_id, + stats: result.stats, + result: QueryResult::GetClosestPeers(Ok(GetClosestPeersOk { + key, + peers: result.peers.collect(), + })), + step: ProgressStep { + count: 1, + last: true, + }, + }) + } QueryInfo::GetProviders { key, - providers, - limit: _, - } => Some(KademliaEvent::OutboundQueryCompleted { - id: query_id, - stats: result.stats, - result: QueryResult::GetProviders(Ok(GetProvidersOk { - key, - providers, - closest_peers: result.peers.collect(), - })), - }), + count, + providers_found, + .. + } => { + Some(KademliaEvent::OutboundQueryProgressed { + id: query_id, + stats: result.stats, + result: QueryResult::GetProviders(Ok(GetProvidersOk { + key, + providers: Default::default(), + providers_so_far: providers_found, + closest_peers: result.peers.collect(), + })), + step: ProgressStep { + count, // FIXME: count? + last: true, + }, + }) + } QueryInfo::AddProvider { context, @@ -1308,15 +1346,17 @@ where .. }, } => match context { - AddProviderContext::Publish => Some(KademliaEvent::OutboundQueryCompleted { + AddProviderContext::Publish => Some(KademliaEvent::OutboundQueryProgressed { id: query_id, stats: get_closest_peers_stats.merge(result.stats), result: QueryResult::StartProviding(Ok(AddProviderOk { key })), + step: ProgressStep::single(), }), - AddProviderContext::Republish => Some(KademliaEvent::OutboundQueryCompleted { + AddProviderContext::Republish => Some(KademliaEvent::OutboundQueryProgressed { id: query_id, stats: get_closest_peers_stats.merge(result.stats), result: QueryResult::RepublishProvider(Ok(AddProviderOk { key })), + step: ProgressStep::single(), }), }, @@ -1363,10 +1403,11 @@ where quorum, }) }; - Some(KademliaEvent::OutboundQueryCompleted { + Some(KademliaEvent::OutboundQueryProgressed { id: query_id, stats: result.stats, result: QueryResult::GetRecord(results), + step: ProgressStep::single(), }) } @@ -1413,16 +1454,18 @@ where }; match context { PutRecordContext::Publish | PutRecordContext::Custom => { - Some(KademliaEvent::OutboundQueryCompleted { + Some(KademliaEvent::OutboundQueryProgressed { id: query_id, stats: get_closest_peers_stats.merge(result.stats), result: QueryResult::PutRecord(mk_result(record.key)), + step: ProgressStep::single(), }) } - PutRecordContext::Republish => Some(KademliaEvent::OutboundQueryCompleted { + PutRecordContext::Republish => Some(KademliaEvent::OutboundQueryProgressed { id: query_id, stats: get_closest_peers_stats.merge(result.stats), result: QueryResult::RepublishRecord(mk_result(record.key)), + step: ProgressStep::single(), }), PutRecordContext::Replicate => { debug!("Record replicated: {:?}", record.key); @@ -1463,36 +1506,40 @@ where } } - Some(KademliaEvent::OutboundQueryCompleted { + Some(KademliaEvent::OutboundQueryProgressed { id: query_id, stats: result.stats, result: QueryResult::Bootstrap(Err(BootstrapError::Timeout { peer, num_remaining, })), + step: ProgressStep::single(), }) } QueryInfo::AddProvider { context, key, .. } => Some(match context { - AddProviderContext::Publish => KademliaEvent::OutboundQueryCompleted { + AddProviderContext::Publish => KademliaEvent::OutboundQueryProgressed { id: query_id, stats: result.stats, result: QueryResult::StartProviding(Err(AddProviderError::Timeout { key })), + step: ProgressStep::single(), }, - AddProviderContext::Republish => KademliaEvent::OutboundQueryCompleted { + AddProviderContext::Republish => KademliaEvent::OutboundQueryProgressed { id: query_id, stats: result.stats, result: QueryResult::RepublishProvider(Err(AddProviderError::Timeout { key })), + step: ProgressStep::single(), }, }), - QueryInfo::GetClosestPeers { key } => Some(KademliaEvent::OutboundQueryCompleted { + QueryInfo::GetClosestPeers { key } => Some(KademliaEvent::OutboundQueryProgressed { id: query_id, stats: result.stats, result: QueryResult::GetClosestPeers(Err(GetClosestPeersError::Timeout { key, peers: result.peers.collect(), })), + step: ProgressStep::single(), }), QueryInfo::PutRecord { @@ -1511,16 +1558,18 @@ where }); match context { PutRecordContext::Publish | PutRecordContext::Custom => { - Some(KademliaEvent::OutboundQueryCompleted { + Some(KademliaEvent::OutboundQueryProgressed { id: query_id, stats: result.stats, result: QueryResult::PutRecord(err), + step: ProgressStep::single(), }) } - PutRecordContext::Republish => Some(KademliaEvent::OutboundQueryCompleted { + PutRecordContext::Republish => Some(KademliaEvent::OutboundQueryProgressed { id: query_id, stats: result.stats, result: QueryResult::RepublishRecord(err), + step: ProgressStep::single(), }), PutRecordContext::Replicate => match phase { PutRecordPhase::GetClosestPeers => { @@ -1552,7 +1601,7 @@ where records, quorum, .. - } => Some(KademliaEvent::OutboundQueryCompleted { + } => Some(KademliaEvent::OutboundQueryProgressed { id: query_id, stats: result.stats, result: QueryResult::GetRecord(Err(GetRecordError::Timeout { @@ -1560,20 +1609,17 @@ where records, quorum, })), + step: ProgressStep::single(), }), - QueryInfo::GetProviders { - key, - providers, - limit: _, - } => Some(KademliaEvent::OutboundQueryCompleted { + QueryInfo::GetProviders { key, .. } => Some(KademliaEvent::OutboundQueryProgressed { id: query_id, stats: result.stats, result: QueryResult::GetProviders(Err(GetProvidersError::Timeout { key, - providers, closest_peers: result.peers.collect(), })), + step: ProgressStep::single(), }), } } @@ -2068,10 +2114,36 @@ where let peers = closer_peers.iter().chain(provider_peers.iter()); self.discovered(&user_data, &source, peers); if let Some(query) = self.queries.get_mut(&user_data) { - if let QueryInfo::GetProviders { providers, .. } = &mut query.inner.info { - for peer in provider_peers { - providers.insert(peer.node_id); - } + let stats = query.stats().clone(); + let closest_peers: Vec<_> = query.as_intermediary_result().collect(); + if let QueryInfo::GetProviders { + ref key, + ref mut providers_found, + ref mut count, + .. + } = query.inner.info + { + *providers_found += provider_peers.len(); + *count += 1; + let providers = provider_peers.iter().map(|p| p.node_id).collect(); + + self.queued_events + .push_back(NetworkBehaviourAction::GenerateEvent( + KademliaEvent::OutboundQueryProgressed { + id: user_data, + result: QueryResult::GetProviders(Ok(GetProvidersOk { + key: key.clone(), + providers, + providers_so_far: *providers_found, + closest_peers, + })), + step: ProgressStep { + count: *count, + last: false, + }, + stats, + }, + )); } } } @@ -2343,30 +2415,6 @@ where query.on_success(&peer_id, vec![]) } - if let QueryInfo::GetProviders { - key: _, - providers, - limit, - } = &query.inner.info - { - match limit { - ProviderLimit::None => { - // No limit, so wait for enough peers to respond. - } - ProviderLimit::N(n) => { - // Check if we have enough providers. - if usize::from(*n) <= providers.len() { - debug!( - "found enough providers {}/{}, finishing", - providers.len(), - n - ); - query.finish(); - } - } - } - } - if self.connected_peers.contains(&peer_id) { self.queued_events .push_back(NetworkBehaviourAction::NotifyHandler { @@ -2397,15 +2445,6 @@ where } } -/// Specifies the number of provider records fetched. -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum ProviderLimit { - /// No limit on the number of records. - None, - /// Finishes the query as soon as this many records have been found. - N(NonZeroUsize), -} - /// A quorum w.r.t. the configured replication factor specifies the minimum /// number of distinct nodes that must be successfully contacted in order /// for a query to succeed. @@ -2454,14 +2493,16 @@ pub enum KademliaEvent { // is made of multiple requests across multiple remote peers. InboundRequest { request: InboundRequest }, - /// An outbound query has produced a result. - OutboundQueryCompleted { + /// An outbound query has made progress. + OutboundQueryProgressed { /// The ID of the query that finished. id: QueryId, - /// The result of the query. + /// The intermediate result of the query. result: QueryResult, /// Execution statistics from the query. stats: QueryStats, + /// Indicates which event this is, if therer are multiple responses for a single query. + step: ProgressStep, }, /// The routing table has been updated with a new peer and / or @@ -2515,6 +2556,25 @@ pub enum KademliaEvent { PendingRoutablePeer { peer: PeerId, address: Multiaddr }, } +/// Information about progress events. +#[derive(Debug, Clone)] +pub struct ProgressStep { + /// The index into the event + pub count: usize, + /// Is this the final event? + pub last: bool, +} + +impl ProgressStep { + /// Generates an index for the case only a single event is emitted. + pub(crate) fn single() -> Self { + Self { + count: 1, + last: true, + } + } +} + /// Information about a received and handled inbound request. #[derive(Debug, Clone)] pub enum InboundRequest { @@ -2747,7 +2807,10 @@ pub type GetProvidersResult = Result; #[derive(Debug, Clone)] pub struct GetProvidersOk { pub key: record::Key, + /// The new set of providers discovered. pub providers: HashSet, + /// How many providers have been discovered so var. + pub providers_so_far: usize, pub closest_peers: Vec, } @@ -2757,7 +2820,6 @@ pub enum GetProvidersError { #[error("the request timed out")] Timeout { key: record::Key, - providers: HashSet, closest_peers: Vec, }, } @@ -2903,10 +2965,10 @@ pub enum QueryInfo { GetProviders { /// The key for which to search for providers. key: record::Key, - /// The found providers. - providers: HashSet, - /// The limit of how many providers to find, - limit: ProviderLimit, + /// The number of providers found so far. + providers_found: usize, + /// Current index of events. + count: usize, }, /// A (repeated) query initiated by [`Kademlia::start_providing`]. diff --git a/protocols/kad/src/behaviour/test.rs b/protocols/kad/src/behaviour/test.rs index 74b60e149319..c6996c3285dd 100644 --- a/protocols/kad/src/behaviour/test.rs +++ b/protocols/kad/src/behaviour/test.rs @@ -188,7 +188,7 @@ fn bootstrap() { loop { match swarm.poll_next_unpin(ctx) { Poll::Ready(Some(SwarmEvent::Behaviour( - KademliaEvent::OutboundQueryCompleted { + KademliaEvent::OutboundQueryProgressed { id, result: QueryResult::Bootstrap(Ok(ok)), .. @@ -277,7 +277,7 @@ fn query_iter() { loop { match swarm.poll_next_unpin(ctx) { Poll::Ready(Some(SwarmEvent::Behaviour( - KademliaEvent::OutboundQueryCompleted { + KademliaEvent::OutboundQueryProgressed { id, result: QueryResult::GetClosestPeers(Ok(ok)), .. @@ -336,7 +336,7 @@ fn unresponsive_not_returned_direct() { loop { match swarm.poll_next_unpin(ctx) { Poll::Ready(Some(SwarmEvent::Behaviour( - KademliaEvent::OutboundQueryCompleted { + KademliaEvent::OutboundQueryProgressed { result: QueryResult::GetClosestPeers(Ok(ok)), .. }, @@ -396,7 +396,7 @@ fn unresponsive_not_returned_indirect() { loop { match swarm.poll_next_unpin(ctx) { Poll::Ready(Some(SwarmEvent::Behaviour( - KademliaEvent::OutboundQueryCompleted { + KademliaEvent::OutboundQueryProgressed { result: QueryResult::GetClosestPeers(Ok(ok)), .. }, @@ -453,7 +453,7 @@ fn get_record_not_found() { loop { match swarm.poll_next_unpin(ctx) { Poll::Ready(Some(SwarmEvent::Behaviour( - KademliaEvent::OutboundQueryCompleted { + KademliaEvent::OutboundQueryProgressed { id, result: QueryResult::GetRecord(Err(e)), .. @@ -572,17 +572,19 @@ fn put_record() { loop { match swarm.poll_next_unpin(ctx) { Poll::Ready(Some(SwarmEvent::Behaviour( - KademliaEvent::OutboundQueryCompleted { + KademliaEvent::OutboundQueryProgressed { id, result: QueryResult::PutRecord(res), stats, + step: index, }, ))) | Poll::Ready(Some(SwarmEvent::Behaviour( - KademliaEvent::OutboundQueryCompleted { + KademliaEvent::OutboundQueryProgressed { id, result: QueryResult::RepublishRecord(res), stats, + step: index, }, ))) => { assert!(qids.is_empty() || qids.remove(&id)); @@ -590,6 +592,8 @@ fn put_record() { assert!(stats.num_successes() >= replication_factor.get() as u32); assert!(stats.num_requests() >= stats.num_successes()); assert_eq!(stats.num_failures(), 0); + assert_eq!(index.count, 1); + assert!(index.last); match res { Err(e) => panic!("{:?}", e), Ok(ok) => { @@ -766,7 +770,7 @@ fn get_record() { loop { match swarm.poll_next_unpin(ctx) { Poll::Ready(Some(SwarmEvent::Behaviour( - KademliaEvent::OutboundQueryCompleted { + KademliaEvent::OutboundQueryProgressed { id, result: QueryResult::GetRecord(Ok(GetRecordOk { @@ -824,7 +828,7 @@ fn get_record_many() { loop { match swarm.poll_next_unpin(ctx) { Poll::Ready(Some(SwarmEvent::Behaviour( - KademliaEvent::OutboundQueryCompleted { + KademliaEvent::OutboundQueryProgressed { id, result: QueryResult::GetRecord(Ok(GetRecordOk { records, .. })), .. @@ -911,14 +915,14 @@ fn add_provider() { loop { match swarm.poll_next_unpin(ctx) { Poll::Ready(Some(SwarmEvent::Behaviour( - KademliaEvent::OutboundQueryCompleted { + KademliaEvent::OutboundQueryProgressed { id, result: QueryResult::StartProviding(res), .. }, ))) | Poll::Ready(Some(SwarmEvent::Behaviour( - KademliaEvent::OutboundQueryCompleted { + KademliaEvent::OutboundQueryProgressed { id, result: QueryResult::RepublishProvider(res), .. @@ -1049,7 +1053,7 @@ fn exceed_jobs_max_queries() { loop { if let Poll::Ready(Some(e)) = swarm.poll_next_unpin(ctx) { match e { - SwarmEvent::Behaviour(KademliaEvent::OutboundQueryCompleted { + SwarmEvent::Behaviour(KademliaEvent::OutboundQueryProgressed { result: QueryResult::GetClosestPeers(Ok(r)), .. }) => break assert!(r.peers.is_empty()), @@ -1129,7 +1133,7 @@ fn disjoint_query_does_not_finish_before_all_paths_did() { loop { match swarm.poll_next_unpin(ctx) { Poll::Ready(Some(SwarmEvent::Behaviour( - KademliaEvent::OutboundQueryCompleted { + KademliaEvent::OutboundQueryProgressed { result: QueryResult::GetRecord(result), .. }, @@ -1184,7 +1188,7 @@ fn disjoint_query_does_not_finish_before_all_paths_did() { loop { match swarm.poll_next_unpin(ctx) { Poll::Ready(Some(SwarmEvent::Behaviour( - KademliaEvent::OutboundQueryCompleted { + KademliaEvent::OutboundQueryProgressed { result: QueryResult::GetRecord(result), .. }, @@ -1341,7 +1345,7 @@ fn get_providers_single() { block_on(async { match single_swarm.next().await.unwrap() { - SwarmEvent::Behaviour(KademliaEvent::OutboundQueryCompleted { + SwarmEvent::Behaviour(KademliaEvent::OutboundQueryProgressed { result: QueryResult::StartProviding(Ok(_)), .. }) => {} @@ -1350,30 +1354,30 @@ fn get_providers_single() { } }); - let query_id = single_swarm - .behaviour_mut() - .get_providers(key.clone(), ProviderLimit::None); + let query_id = single_swarm.behaviour_mut().get_providers(key.clone()); + let mut found_key = None; block_on(async { - match single_swarm.next().await.unwrap() { - SwarmEvent::Behaviour(KademliaEvent::OutboundQueryCompleted { - id, - result: - QueryResult::GetProviders(Ok(GetProvidersOk { - key: found_key, - providers, - .. - })), - .. - }) if id == query_id => { - assert_eq!(key, found_key); - assert_eq!( - single_swarm.local_peer_id(), - providers.iter().next().unwrap() - ); + loop { + match single_swarm.next().await.unwrap() { + SwarmEvent::Behaviour(KademliaEvent::OutboundQueryProgressed { + id, + result: QueryResult::GetProviders(Ok(GetProvidersOk { key, providers, .. })), + step: index, + .. + }) if id == query_id => { + if index.last { + assert_eq!(key, found_key.unwrap()); + break; + } else { + found_key = Some(key); + assert_eq!(providers.len(), 1); + assert!(providers.contains(single_swarm.local_peer_id())); + } + } + SwarmEvent::Behaviour(e) => panic!("Unexpected event: {:?}", e), + _ => {} } - SwarmEvent::Behaviour(e) => panic!("Unexpected event: {:?}", e), - _ => {} } }); } @@ -1408,16 +1412,16 @@ fn get_providers_limit() { } // Query with expecting a single provider. - let query_id = swarms[0] - .behaviour_mut() - .get_providers(key.clone(), ProviderLimit::N(N.try_into().unwrap())); + let query_id = swarms[0].behaviour_mut().get_providers(key.clone()); + + let mut all_providers: Vec = vec![]; block_on(poll_fn(move |ctx| { for (i, swarm) in swarms.iter_mut().enumerate() { loop { match swarm.poll_next_unpin(ctx) { Poll::Ready(Some(SwarmEvent::Behaviour( - KademliaEvent::OutboundQueryCompleted { + KademliaEvent::OutboundQueryProgressed { id, result: QueryResult::GetProviders(Ok(GetProvidersOk { @@ -1425,15 +1429,31 @@ fn get_providers_limit() { providers, .. })), + step: index, .. }, ))) if i == 0 && id == query_id => { - // There are a total of 2 providers. - assert_eq!(providers.len(), std::cmp::min(N, 2)); - assert_eq!(key, found_key); - // Providers should be either 2 or 3 - assert_ne!(swarm.local_peer_id(), providers.iter().next().unwrap()); - return Poll::Ready(()); + if index.last { + assert_eq!(key, found_key); + assert!(providers.is_empty()); + assert_eq!(all_providers.len(), N); + return Poll::Ready(()); + } else { + // There are a total of 2 providers. + assert_eq!(key, found_key); + for provider in &providers { + // Providers should be either 2 or 3 + assert_ne!(swarm.local_peer_id(), provider); + } + all_providers.extend(providers.clone()); + + // If we have all providers, finish. + if all_providers.len() == N { + swarm.behaviour_mut().query_mut(&id).unwrap().finish(); + } + + return Poll::Ready(()); + } } Poll::Ready(..) => {} Poll::Pending => break, diff --git a/protocols/kad/src/lib.rs b/protocols/kad/src/lib.rs index 048d78f7b67d..de6f8159e0ba 100644 --- a/protocols/kad/src/lib.rs +++ b/protocols/kad/src/lib.rs @@ -65,7 +65,7 @@ pub use behaviour::{ }; pub use behaviour::{ Kademlia, KademliaBucketInserts, KademliaCaching, KademliaConfig, KademliaEvent, - KademliaStoreInserts, ProviderLimit, Quorum, + KademliaStoreInserts, Quorum, }; pub use protocol::KadConnectionType; pub use query::QueryId; diff --git a/protocols/kad/src/query.rs b/protocols/kad/src/query.rs index 708c758464f1..0727a95d7e66 100644 --- a/protocols/kad/src/query.rs +++ b/protocols/kad/src/query.rs @@ -419,6 +419,18 @@ impl Query { stats: self.stats, } } + + pub fn as_intermediary_result(&self) -> impl Iterator + '_ { + match self.peer_iter { + QueryPeerIter::Closest(ref iter) => { + Either::Left(Either::Left(iter.as_intermediary_result())) + } + QueryPeerIter::ClosestDisjoint(ref iter) => { + Either::Left(Either::Right(iter.as_intermediary_result())) + } + QueryPeerIter::Fixed(ref iter) => Either::Right(iter.as_intermediary_result()), + } + } } /// The result of a `Query`. diff --git a/protocols/kad/src/query/peers/closest.rs b/protocols/kad/src/query/peers/closest.rs index 87d452e8e404..efe7b735f134 100644 --- a/protocols/kad/src/query/peers/closest.rs +++ b/protocols/kad/src/query/peers/closest.rs @@ -365,6 +365,19 @@ impl ClosestPeersIter { self.state == State::Finished } + pub fn as_intermediary_result(&self) -> impl Iterator + '_ { + self.closest_peers + .iter() + .filter_map(|(_, peer)| { + if let PeerState::Succeeded = peer.state { + Some(peer.key.clone().into_preimage()) + } else { + None + } + }) + .take(self.config.num_results.get()) + } + /// Consumes the iterator, returning the closest peers. pub fn into_result(self) -> impl Iterator { self.closest_peers diff --git a/protocols/kad/src/query/peers/closest/disjoint.rs b/protocols/kad/src/query/peers/closest/disjoint.rs index df7ab70bedce..3c1120c47362 100644 --- a/protocols/kad/src/query/peers/closest/disjoint.rs +++ b/protocols/kad/src/query/peers/closest/disjoint.rs @@ -331,6 +331,15 @@ impl ClosestDisjointPeersIter { ResultIter::new(self.target, result_per_path).map(Key::into_preimage) } + + pub fn as_intermediary_result(&self) -> impl Iterator + '_ { + let result_per_path = self + .iters + .iter() + .map(|iter| iter.clone().into_result().map(Key::from)); + + ResultIter::new(self.target.clone(), result_per_path).map(Key::into_preimage) + } } /// Index into the [`ClosestDisjointPeersIter`] `iters` vector. diff --git a/protocols/kad/src/query/peers/fixed.rs b/protocols/kad/src/query/peers/fixed.rs index 022a3de6f9f7..7afa08935c0c 100644 --- a/protocols/kad/src/query/peers/fixed.rs +++ b/protocols/kad/src/query/peers/fixed.rs @@ -170,6 +170,16 @@ impl FixedPeersIter { } }) } + + pub fn as_intermediary_result(&self) -> impl Iterator + '_ { + self.peers.iter().filter_map(|(p, s)| { + if let PeerState::Succeeded = s { + Some(*p) + } else { + None + } + }) + } } #[cfg(test)]