Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add NetworkBehaviour::inject_replaced #914

Merged
merged 5 commits into from
Feb 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions core/src/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,7 @@ where TBehaviour: NetworkBehaviour,
self.behaviour.inject_disconnected(&peer_id, endpoint);
},
Async::Ready(RawSwarmEvent::Replaced { peer_id, closed_endpoint, endpoint }) => {
self.behaviour.inject_disconnected(&peer_id, closed_endpoint);
self.behaviour.inject_connected(peer_id, endpoint);
self.behaviour.inject_replaced(peer_id, closed_endpoint, endpoint);
},
Async::Ready(RawSwarmEvent::IncomingConnection(incoming)) => {
let handler = self.behaviour.new_handler();
Expand Down Expand Up @@ -343,6 +342,12 @@ pub trait NetworkBehaviour {
/// endpoint is the one we used to be connected to.
fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint);

/// Indicates the behaviour that we replace the connection from the node with another.
fn inject_replaced(&mut self, peer_id: PeerId, closed_endpoint: ConnectedPoint, new_endpoint: ConnectedPoint) {
self.inject_disconnected(&peer_id, closed_endpoint);
self.inject_connected(peer_id, new_endpoint);
}

/// Indicates the behaviour that the node with the given peer id has generated an event for
/// us.
///
Expand Down
31 changes: 31 additions & 0 deletions misc/core-derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,32 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
})
};

// Build the list of statements to put in the body of `inject_replaced()`.
let inject_replaced_stmts = {
let num_fields = data_struct.fields.iter().filter(|f| !is_ignored(f)).count();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are you doing this count and all the rather complex looking code below only to save three clones? Are these three structs so heavy that this warrants the extra looping and increased code complexity?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes! The plan is to ultimately remove this custom derive, which is why no effort has been put into making its code more readable.

data_struct.fields.iter().enumerate().filter_map(move |(field_n, field)| {
if is_ignored(&field) {
return None;
}

Some(if field_n == num_fields - 1 {
match field.ident {
Some(ref i) => quote!{ self.#i.inject_replaced(peer_id, closed_endpoint, new_endpoint); },
None => quote!{ self.#field_n.inject_replaced(peer_id, closed_endpoint, new_endpoint); },
}
} else {
match field.ident {
Some(ref i) => quote!{
self.#i.inject_replaced(peer_id.clone(), closed_endpoint.clone(), new_endpoint.clone());
},
None => quote!{
self.#field_n.inject_replaced(peer_id.clone(), closed_endpoint.clone(), new_endpoint.clone());
},
}
})
})
};

// Build the list of statements to put in the body of `inject_dial_failure()`.
let inject_dial_failure_stmts = {
data_struct.fields.iter().enumerate().filter_map(move |(field_n, field)| {
Expand Down Expand Up @@ -367,6 +393,11 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
#(#inject_disconnected_stmts);*
}

#[inline]
fn inject_replaced(&mut self, peer_id: #peer_id, closed_endpoint: #connected_point, new_endpoint: #connected_point) {
#(#inject_replaced_stmts);*
}

#[inline]
fn inject_dial_failure(&mut self, peer_id: Option<&#peer_id>, addr: &#multiaddr, error: &dyn std::error::Error) {
#(#inject_dial_failure_stmts);*
Expand Down
32 changes: 31 additions & 1 deletion protocols/kad/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ where
.unwrap_or_else(Vec::new)
}

fn inject_connected(&mut self, id: PeerId, _: ConnectedPoint) {
fn inject_connected(&mut self, id: PeerId, endpoint: ConnectedPoint) {
if let Some(pos) = self.pending_rpcs.iter().position(|(p, _)| p == &id) {
let (_, rpc) = self.pending_rpcs.remove(pos);
self.queued_events.push(NetworkBehaviourAction::SendEvent {
Expand All @@ -309,6 +309,14 @@ where
_ => ()
}

if let ConnectedPoint::Dialer { address } = endpoint {
if let Some(list) = self.kbuckets.entry_mut(&id) {
if list.iter().all(|a| *a != address) {
list.push(address);
}
}
}

self.connected_peers.insert(id);
}

Expand All @@ -319,6 +327,28 @@ where
for (query, _, _) in self.active_queries.values_mut() {
query.inject_rpc_error(id);
}

self.kbuckets.set_disconnected(&id);
}

fn inject_replaced(&mut self, peer_id: PeerId, _: ConnectedPoint, new_endpoint: ConnectedPoint) {
// We need to re-send the active queries.
for (query_id, (query, _, _)) in self.active_queries.iter() {
if query.is_waiting(&peer_id) {
self.queued_events.push(NetworkBehaviourAction::SendEvent {
peer_id: peer_id.clone(),
event: query.target().to_rpc_request(*query_id),
});
}
}

if let ConnectedPoint::Dialer { address } = new_endpoint {
if let Some(list) = self.kbuckets.entry_mut(&peer_id) {
if list.iter().all(|a| *a != address) {
list.push(address);
}
}
}
}

fn inject_node_event(&mut self, source: PeerId, event: KademliaHandlerEvent<QueryId>) {
Expand Down
27 changes: 27 additions & 0 deletions protocols/kad/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,33 @@ impl QueryState {
}
}

/// Returns true if we are waiting for a query answer from that peer.
///
/// After `poll()` returned `SendRpc`, this function will return `true`.
pub fn is_waiting(&self, id: &PeerId) -> bool {
let state = self
.closest_peers
.iter()
.filter_map(
|(peer_id, state)| {
if peer_id == id {
Some(state)
} else {
None
}
},
)
.next();

match state {
Some(&QueryPeerState::InProgress(_)) => true,
Some(&QueryPeerState::NotContacted) => false,
Some(&QueryPeerState::Succeeded) => false,
Some(&QueryPeerState::Failed) => false,
None => false,
}
}

/// After `poll()` returned `SendRpc`, this function should be called if we were unable to
/// reach the peer, or if an error of some sort happened.
///
Expand Down