From a128534d564c77e386095a535efb8570aec5e0c0 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 4 Feb 2019 13:52:27 +0100 Subject: [PATCH 1/4] Add NetworkBehaviour::inject_replaced --- core/src/swarm.rs | 9 +++++++-- misc/core-derive/src/lib.rs | 27 +++++++++++++++++++++++++++ protocols/kad/src/behaviour.rs | 12 ++++++++++++ protocols/kad/src/query.rs | 27 +++++++++++++++++++++++++++ 4 files changed, 73 insertions(+), 2 deletions(-) diff --git a/core/src/swarm.rs b/core/src/swarm.rs index 010e3dc6274..14fc4e14db6 100644 --- a/core/src/swarm.rs +++ b/core/src/swarm.rs @@ -252,8 +252,7 @@ where TBehaviour: NetworkBehaviour, self.behaviour.inject_disconnected(&peer_id, endpoint); }, Async::Ready(RawSwarmEvent::Replaced { peer_id, closed_endpoint, endpoint }) => { - self.behaviour.inject_disconnected(&peer_id, closed_endpoint); - self.behaviour.inject_connected(peer_id, endpoint); + self.behaviour.inject_replaced(peer_id, closed_endpoint, endpoint); }, Async::Ready(RawSwarmEvent::IncomingConnection(incoming)) => { let handler = self.behaviour.new_handler(); @@ -333,6 +332,12 @@ pub trait NetworkBehaviour { /// endpoint is the one we used to be connected to. fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint); + /// Indicates the behaviour that we replace the connection from the node with another. + fn inject_replaced(&mut self, peer_id: PeerId, closed_endpoint: ConnectedPoint, new_endpoint: ConnectedPoint) { + self.inject_disconnected(&peer_id, closed_endpoint); + self.inject_connected(peer_id, new_endpoint); + } + /// Indicates the behaviour that the node with the given peer id has generated an event for /// us. /// diff --git a/misc/core-derive/src/lib.rs b/misc/core-derive/src/lib.rs index f63f927df3e..8c899b4decf 100644 --- a/misc/core-derive/src/lib.rs +++ b/misc/core-derive/src/lib.rs @@ -191,6 +191,28 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { }) }; + // Build the list of statements to put in the body of `inject_replaced()`. + let inject_replaced_stmts = { + let num_fields = data_struct.fields.iter().filter(|f| !is_ignored(f)).count(); + data_struct.fields.iter().enumerate().filter_map(move |(field_n, field)| { + if is_ignored(&field) { + return None; + } + + Some(if field_n == num_fields - 1 { + match field.ident { + Some(ref i) => quote!{ self.#i.inject_replaced(peer_id, closed_endpoint, new_endpoint); }, + None => quote!{ self.#field_n.inject_replaced(peer_id, closed_endpoint, new_endpoint); }, + } + } else { + match field.ident { + Some(ref i) => quote!{ self.#i.inject_replaced(peer_id.clone(), closed_endpoint.clone(), new_endpoint.clone()); }, + None => quote!{ self.#field_n.inject_replaced(peer_id.clone(), closed_endpoint.clone(), new_endpoint.clone()); }, + } + }) + }) + }; + // Build the list of statements to put in the body of `inject_dial_failure()`. let inject_dial_failure_stmts = { data_struct.fields.iter().enumerate().filter_map(move |(field_n, field)| { @@ -367,6 +389,11 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { #(#inject_disconnected_stmts);* } + #[inline] + fn inject_replaced(&mut self, peer_id: #peer_id, closed_endpoint: #connected_point, new_endpoint: #connected_point) { + #(#inject_replaced_stmts);* + } + #[inline] fn inject_dial_failure(&mut self, peer_id: Option<&#peer_id>, addr: &#multiaddr, error: &dyn std::error::Error) { #(#inject_dial_failure_stmts);* diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index 4755c93ac36..ad0fd4f69d0 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -321,6 +321,18 @@ where } } + fn inject_replaced(&mut self, peer_id: PeerId, _: ConnectedPoint, _: ConnectedPoint) { + // We need to re-send the active queries. + for (query_id, (query, _, _)) in self.active_queries.iter() { + if query.is_waiting(&peer_id) { + self.queued_events.push(NetworkBehaviourAction::SendEvent { + peer_id: peer_id.clone(), + event: query.target().to_rpc_request(*query_id), + }); + } + } + } + fn inject_node_event(&mut self, source: PeerId, event: KademliaHandlerEvent) { match event { KademliaHandlerEvent::FindNodeReq { key, request_id } => { diff --git a/protocols/kad/src/query.rs b/protocols/kad/src/query.rs index 88d5881b9db..888015df771 100644 --- a/protocols/kad/src/query.rs +++ b/protocols/kad/src/query.rs @@ -216,6 +216,33 @@ impl QueryState { } } + /// Returns true if we are waiting for a query answer from that peer. + /// + /// After `poll()` returned `SendRpc`, this function will return `true`. + pub fn is_waiting(&self, id: &PeerId) -> bool { + let state = self + .closest_peers + .iter() + .filter_map( + |(peer_id, state)| { + if peer_id == id { + Some(state) + } else { + None + } + }, + ) + .next(); + + match state { + Some(&QueryPeerState::InProgress(_)) => true, + Some(&QueryPeerState::NotContacted) => false, + Some(&QueryPeerState::Succeeded) => false, + Some(&QueryPeerState::Failed) => false, + None => false, + } + } + /// After `poll()` returned `SendRpc`, this function should be called if we were unable to /// reach the peer, or if an error of some sort happened. /// From 61328280a7a86020a8b4e5bbfe6db85766f2d732 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 4 Feb 2019 14:28:03 +0100 Subject: [PATCH 2/4] Address style --- misc/core-derive/src/lib.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/misc/core-derive/src/lib.rs b/misc/core-derive/src/lib.rs index 8c899b4decf..917587699cb 100644 --- a/misc/core-derive/src/lib.rs +++ b/misc/core-derive/src/lib.rs @@ -206,8 +206,12 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { } } else { match field.ident { - Some(ref i) => quote!{ self.#i.inject_replaced(peer_id.clone(), closed_endpoint.clone(), new_endpoint.clone()); }, - None => quote!{ self.#field_n.inject_replaced(peer_id.clone(), closed_endpoint.clone(), new_endpoint.clone()); }, + Some(ref i) => quote!{ + self.#i.inject_replaced(peer_id.clone(), closed_endpoint.clone(), new_endpoint.clone()); + }, + None => quote!{ + self.#field_n.inject_replaced(peer_id.clone(), closed_endpoint.clone(), new_endpoint.clone()); + }, } }) }) From f01f5eee7d81af4e99f68b0df03000b76216aed3 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 4 Feb 2019 14:47:06 +0100 Subject: [PATCH 3/4] Forgot to call set_disconnected --- protocols/kad/src/behaviour.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index ad0fd4f69d0..fb319aa9126 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -319,6 +319,8 @@ where for (query, _, _) in self.active_queries.values_mut() { query.inject_rpc_error(id); } + + self.kbuckets.set_disconnected(&id); } fn inject_replaced(&mut self, peer_id: PeerId, _: ConnectedPoint, _: ConnectedPoint) { From bb5175837fb91a46e01da195fb9562c3168fefae Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 4 Feb 2019 14:50:13 +0100 Subject: [PATCH 4/4] Also add incoming addresses to kbuckets --- protocols/kad/src/behaviour.rs | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index fb319aa9126..216bf993147 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -291,7 +291,7 @@ where .unwrap_or_else(Vec::new) } - fn inject_connected(&mut self, id: PeerId, _: ConnectedPoint) { + fn inject_connected(&mut self, id: PeerId, endpoint: ConnectedPoint) { if let Some(pos) = self.pending_rpcs.iter().position(|(p, _)| p == &id) { let (_, rpc) = self.pending_rpcs.remove(pos); self.queued_events.push(NetworkBehaviourAction::SendEvent { @@ -309,6 +309,14 @@ where _ => () } + if let ConnectedPoint::Dialer { address } = endpoint { + if let Some(list) = self.kbuckets.entry_mut(&id) { + if list.iter().all(|a| *a != address) { + list.push(address); + } + } + } + self.connected_peers.insert(id); } @@ -323,7 +331,7 @@ where self.kbuckets.set_disconnected(&id); } - fn inject_replaced(&mut self, peer_id: PeerId, _: ConnectedPoint, _: ConnectedPoint) { + fn inject_replaced(&mut self, peer_id: PeerId, _: ConnectedPoint, new_endpoint: ConnectedPoint) { // We need to re-send the active queries. for (query_id, (query, _, _)) in self.active_queries.iter() { if query.is_waiting(&peer_id) { @@ -333,6 +341,14 @@ where }); } } + + if let ConnectedPoint::Dialer { address } = new_endpoint { + if let Some(list) = self.kbuckets.entry_mut(&peer_id) { + if list.iter().all(|a| *a != address) { + list.push(address); + } + } + } } fn inject_node_event(&mut self, source: PeerId, event: KademliaHandlerEvent) {