Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improve provider fetching #124

Merged
merged 2 commits into from
Jun 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ members = [

[patch.crates-io]
# TODO: switch to crates.io once 0.45 is released
libp2p = { git = "https://github.com/libp2p/rust-libp2p", branch = "master" }
libp2p = { git = "https://github.com/dignifiedquire/rust-libp2p", branch = "feat-kad-count" }
# libp2p = { path = "../rust-libp2p" }
20 changes: 17 additions & 3 deletions iroh-bitswap/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ use libp2p::core::connection::ConnectionId;
use libp2p::core::{ConnectedPoint, Multiaddr, PeerId};
use libp2p::swarm::handler::OneShotHandler;
use libp2p::swarm::{
IntoConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
DialError, IntoConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
};
use prometheus_client::registry::Registry;
use tracing::{debug, instrument, warn};
use tracing::{debug, instrument, trace, warn};

use crate::message::{BitswapMessage, Priority};
use crate::protocol::{BitswapProtocol, Upgrade};
Expand Down Expand Up @@ -206,6 +206,20 @@ impl NetworkBehaviour for Bitswap {
}
}

#[instrument(skip(self, _handler))]
fn inject_dial_failure(
&mut self,
peer_id: Option<PeerId>,
_handler: Self::ConnectionHandler,
_error: &DialError,
) {
trace!("failed to dial");
if let Some(ref peer_id) = peer_id {
self.sessions.dial_failure(peer_id);
self.queries.dial_failure(peer_id);
}
}

#[instrument(skip(self))]
fn inject_event(&mut self, peer_id: PeerId, connection: ConnectionId, message: HandlerEvent) {
match message {
Expand Down Expand Up @@ -254,7 +268,7 @@ impl NetworkBehaviour for Bitswap {
.push_back(NetworkBehaviourAction::GenerateEvent(event));
}

// TODO: cancle Query::Send
// TODO: cancel Query::Send

// Propagate Cancel Events
for cid in message.wantlist().cancels() {
Expand Down
4 changes: 4 additions & 0 deletions iroh-bitswap/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ impl QueryManager {
}
}

pub fn dial_failure(&mut self, peer_id: &PeerId) {
self.disconnected(peer_id);
}

fn next_finished_query(&mut self) -> Option<(QueryId, Query)> {
let mut next_query = None;
for (query_id, query) in &self.queries {
Expand Down
28 changes: 23 additions & 5 deletions iroh-bitswap/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use libp2p::{
},
PeerId,
};
use tracing::{debug, trace};
use tracing::trace;

use crate::{behaviour::BitswapHandler, query::QueryManager, BitswapEvent};

Expand Down Expand Up @@ -73,6 +73,12 @@ impl SessionManager {
self.sessions.remove(peer_id);
}

pub fn dial_failure(&mut self, peer_id: &PeerId) {
if let Some(session) = self.sessions.get_mut(peer_id) {
session.state = State::Disconnected;
}
}

pub fn create_session(&mut self, peer_id: &PeerId) {
let session = self.sessions.entry(*peer_id).or_insert(Session {
state: State::New,
Expand All @@ -99,8 +105,13 @@ impl SessionManager {
queries: &mut QueryManager,
) -> Option<NetworkBehaviourAction<BitswapEvent, BitswapHandler>> {
// cleanup disconnects
self.sessions
.retain(|_, s| !matches!(s.state, State::Disconnected));
self.sessions.retain(|_id, s| {
if matches!(s.state, State::Disconnected) {
return false;
}

true
});

// limit parallel dials
let skip_dialing =
Expand All @@ -117,7 +128,7 @@ impl SessionManager {
// no dialing this round
continue;
}
trace!("Dialing {}", peer_id);
trace!("dialing {}", peer_id);
let handler = Default::default();
session.state = State::Dialing(Instant::now());

Expand All @@ -134,13 +145,20 @@ impl SessionManager {
State::Dialing(start) => {
// check for dial timeouts
if start.elapsed() >= self.config.dial_timeout {
debug!("dialing {}: timed out", peer_id);
trace!("dialing {}: timed out", peer_id);
queries.disconnected(peer_id);
session.state = State::Disconnected;
}
}
State::Connected => {
if let Some(event) = queries.poll_peer(peer_id) {
if let NetworkBehaviourAction::GenerateEvent(
BitswapEvent::OutboundQueryCompleted { .. },
) = event
{
session.query_count -= 1;
}

return Some(event);
}
}
Expand Down
21 changes: 17 additions & 4 deletions iroh-p2p/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,16 +93,18 @@ impl NodeBehaviour {
.into();

let kad = if config.kademlia {
let local_peer_id = local_key.public().to_peer_id();
let pub_key = local_key.public();

// TODO: persist to store
let store = MemoryStore::new(local_peer_id.to_owned());
let store = MemoryStore::new(pub_key.to_peer_id());

// TODO: make user configurable
let mut kad_config = KademliaConfig::default();
kad_config.set_parallelism(16usize.try_into().unwrap());
// TODO: potentially lower (this is per query)
kad_config.set_query_timeout(Duration::from_secs(5));
kad_config.set_query_timeout(Duration::from_secs(60));

let mut kademlia = Kademlia::with_config(local_peer_id, store, kad_config);
let mut kademlia = Kademlia::with_config(pub_key.to_peer_id(), store, kad_config);
for multiaddr in &config.bootstrap_peers {
// TODO: move parsing into config
let mut addr = multiaddr.to_owned();
Expand All @@ -113,9 +115,12 @@ impl NodeBehaviour {
warn!("Could not parse bootstrap addr {}", multiaddr);
}
}

// Trigger initial bootstrap
if let Err(e) = kademlia.bootstrap() {
warn!("Kademlia bootstrap failed: {}", e);
}

Some(kademlia)
} else {
None
Expand Down Expand Up @@ -163,4 +168,12 @@ impl NodeBehaviour {
kad.add_address(peer, addr);
}
}

pub fn finish_query(&mut self, id: &libp2p::kad::QueryId) {
if let Some(kad) = self.kad.as_mut() {
if let Some(mut query) = kad.query_mut(id) {
query.finish();
}
}
}
}
2 changes: 1 addition & 1 deletion iroh-p2p/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl Default for Libp2pConfig {
bootstrap_peers,
mdns: false,
kademlia: true,
target_peer_count: 75,
target_peer_count: 256,
rpc_addr: "0.0.0.0:4401".parse().unwrap(),
rpc_client: RpcClientConfig::default(),
metrics: MetricsConfig::default(),
Expand Down
30 changes: 21 additions & 9 deletions iroh-p2p/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ use futures::channel::oneshot;
use libp2p::kad::record::Key;
use libp2p::Multiaddr;
use libp2p::PeerId;
use tokio::sync::mpsc;
use tonic::{transport::Server as TonicServer, Request, Response, Status};
use tracing::trace;
use tracing::{trace, warn};

use iroh_bitswap::{Block, QueryError};
use iroh_rpc_types::p2p::p2p_server;
Expand Down Expand Up @@ -84,22 +85,33 @@ impl p2p_server::P2p for P2p {
iroh_metrics::req::set_trace_ctx(&request);
let req = request.into_inner();
trace!("received ProviderRequest: {:?}", req.key);
let (s, r) = oneshot::channel();
let (s, mut r) = mpsc::channel(1024);
let msg = RpcMessage::ProviderRequest {
key: req.key.into(),
key: req.key.clone().into(),
response_channel: s,
};

self.sender
.send(msg)
.await
.map_err(|_| Status::internal("receiver dropped"))?;

let providers = r
.await
.map_err(|_| Status::internal("sender dropped"))?
.map_err(|e| Status::internal(format!("failed to retrieve provider: {:?}", e)))?;
// TODO: streaming response
let mut providers = Vec::new();
while let Some(provider) = r.recv().await {
match provider {
Ok(provider) => providers.push(provider.to_bytes()),
Err(e) => {
if providers.is_empty() {
return Err(Status::internal(e));
} else {
warn!("error fetching providers for key {:?}: {:?}", req.key, e);
break;
}
}
}
}

let providers = providers.into_iter().map(|p| p.to_bytes()).collect();
Ok(Response::new(Providers { providers }))
}

Expand Down Expand Up @@ -219,7 +231,7 @@ pub enum RpcMessage {
ProviderRequest {
// TODO: potentially change this to Cid, as that is the only key we use for providers
key: Key,
response_channel: oneshot::Sender<Result<HashSet<PeerId>, String>>,
response_channel: mpsc::Sender<Result<PeerId, String>>,
},
NetListeningAddrs(oneshot::Sender<(PeerId, Vec<Multiaddr>)>),
NetPeers(oneshot::Sender<HashMap<PeerId, Vec<Multiaddr>>>),
Expand Down
Loading