Skip to content

Commit

Permalink
Add NetworkBehaviour::inject_replaced (#914)
Browse files Browse the repository at this point in the history
* Add NetworkBehaviour::inject_replaced

* Address style

* Forgot to call set_disconnected

* Also add incoming addresses to kbuckets
  • Loading branch information
tomaka authored Feb 4, 2019
1 parent 7f66c4f commit c9b7e23
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 3 deletions.
9 changes: 7 additions & 2 deletions core/src/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,7 @@ where TBehaviour: NetworkBehaviour,
self.behaviour.inject_disconnected(&peer_id, endpoint);
},
Async::Ready(RawSwarmEvent::Replaced { peer_id, closed_endpoint, endpoint }) => {
self.behaviour.inject_disconnected(&peer_id, closed_endpoint);
self.behaviour.inject_connected(peer_id, endpoint);
self.behaviour.inject_replaced(peer_id, closed_endpoint, endpoint);
},
Async::Ready(RawSwarmEvent::IncomingConnection(incoming)) => {
let handler = self.behaviour.new_handler();
Expand Down Expand Up @@ -343,6 +342,12 @@ pub trait NetworkBehaviour {
/// endpoint is the one we used to be connected to.
fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint);

/// Indicates the behaviour that we replace the connection from the node with another.
fn inject_replaced(&mut self, peer_id: PeerId, closed_endpoint: ConnectedPoint, new_endpoint: ConnectedPoint) {
self.inject_disconnected(&peer_id, closed_endpoint);
self.inject_connected(peer_id, new_endpoint);
}

/// Indicates the behaviour that the node with the given peer id has generated an event for
/// us.
///
Expand Down
31 changes: 31 additions & 0 deletions misc/core-derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,32 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
})
};

// Build the list of statements to put in the body of `inject_replaced()`.
let inject_replaced_stmts = {
let num_fields = data_struct.fields.iter().filter(|f| !is_ignored(f)).count();
data_struct.fields.iter().enumerate().filter_map(move |(field_n, field)| {
if is_ignored(&field) {
return None;
}

Some(if field_n == num_fields - 1 {
match field.ident {
Some(ref i) => quote!{ self.#i.inject_replaced(peer_id, closed_endpoint, new_endpoint); },
None => quote!{ self.#field_n.inject_replaced(peer_id, closed_endpoint, new_endpoint); },
}
} else {
match field.ident {
Some(ref i) => quote!{
self.#i.inject_replaced(peer_id.clone(), closed_endpoint.clone(), new_endpoint.clone());
},
None => quote!{
self.#field_n.inject_replaced(peer_id.clone(), closed_endpoint.clone(), new_endpoint.clone());
},
}
})
})
};

// Build the list of statements to put in the body of `inject_dial_failure()`.
let inject_dial_failure_stmts = {
data_struct.fields.iter().enumerate().filter_map(move |(field_n, field)| {
Expand Down Expand Up @@ -367,6 +393,11 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
#(#inject_disconnected_stmts);*
}

#[inline]
fn inject_replaced(&mut self, peer_id: #peer_id, closed_endpoint: #connected_point, new_endpoint: #connected_point) {
#(#inject_replaced_stmts);*
}

#[inline]
fn inject_dial_failure(&mut self, peer_id: Option<&#peer_id>, addr: &#multiaddr, error: &dyn std::error::Error) {
#(#inject_dial_failure_stmts);*
Expand Down
32 changes: 31 additions & 1 deletion protocols/kad/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ where
.unwrap_or_else(Vec::new)
}

fn inject_connected(&mut self, id: PeerId, _: ConnectedPoint) {
fn inject_connected(&mut self, id: PeerId, endpoint: ConnectedPoint) {
if let Some(pos) = self.pending_rpcs.iter().position(|(p, _)| p == &id) {
let (_, rpc) = self.pending_rpcs.remove(pos);
self.queued_events.push(NetworkBehaviourAction::SendEvent {
Expand All @@ -309,6 +309,14 @@ where
_ => ()
}

if let ConnectedPoint::Dialer { address } = endpoint {
if let Some(list) = self.kbuckets.entry_mut(&id) {
if list.iter().all(|a| *a != address) {
list.push(address);
}
}
}

self.connected_peers.insert(id);
}

Expand All @@ -319,6 +327,28 @@ where
for (query, _, _) in self.active_queries.values_mut() {
query.inject_rpc_error(id);
}

self.kbuckets.set_disconnected(&id);
}

fn inject_replaced(&mut self, peer_id: PeerId, _: ConnectedPoint, new_endpoint: ConnectedPoint) {
// We need to re-send the active queries.
for (query_id, (query, _, _)) in self.active_queries.iter() {
if query.is_waiting(&peer_id) {
self.queued_events.push(NetworkBehaviourAction::SendEvent {
peer_id: peer_id.clone(),
event: query.target().to_rpc_request(*query_id),
});
}
}

if let ConnectedPoint::Dialer { address } = new_endpoint {
if let Some(list) = self.kbuckets.entry_mut(&peer_id) {
if list.iter().all(|a| *a != address) {
list.push(address);
}
}
}
}

fn inject_node_event(&mut self, source: PeerId, event: KademliaHandlerEvent<QueryId>) {
Expand Down
27 changes: 27 additions & 0 deletions protocols/kad/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,33 @@ impl QueryState {
}
}

/// Returns true if we are waiting for a query answer from that peer.
///
/// After `poll()` returned `SendRpc`, this function will return `true`.
pub fn is_waiting(&self, id: &PeerId) -> bool {
let state = self
.closest_peers
.iter()
.filter_map(
|(peer_id, state)| {
if peer_id == id {
Some(state)
} else {
None
}
},
)
.next();

match state {
Some(&QueryPeerState::InProgress(_)) => true,
Some(&QueryPeerState::NotContacted) => false,
Some(&QueryPeerState::Succeeded) => false,
Some(&QueryPeerState::Failed) => false,
None => false,
}
}

/// After `poll()` returned `SendRpc`, this function should be called if we were unable to
/// reach the peer, or if an error of some sort happened.
///
Expand Down

0 comments on commit c9b7e23

Please sign in to comment.