Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
Extracted update_bucket_record from update_node
Browse files Browse the repository at this point in the history
  • Loading branch information
Kirill Pimenov committed Feb 12, 2019
1 parent be94743 commit eef7838
Showing 1 changed file with 44 additions and 24 deletions.
68 changes: 44 additions & 24 deletions util/network-devp2p/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ enum NodeValidity {
UnknownNode
}

enum BucketError {
Ourselves,
NotInTheBucket{node_entry: NodeEntry, bucket_distance: usize},
}

struct PingRequest {
// Time when the request was sent
sent_at: Instant,
Expand Down Expand Up @@ -224,41 +229,51 @@ impl<'a> Discovery<'a> {
}
}

fn update_node(&mut self, e: NodeEntry) -> Option<TableUpdates> {
trace!(target: "discovery", "Inserting {:?}", &e);
fn update_bucket_record(&mut self, e: NodeEntry) -> Result<(), BucketError> {
let id_hash = keccak(e.id);
let dist = match Discovery::distance(&self.id_hash, &id_hash) {
Some(dist) => dist,
None => {
debug!(target: "discovery", "Attempted to update own entry: {:?}", e);
return None;
return Err(BucketError::Ourselves);
}
};
let bucket = &mut self.node_buckets[dist];
bucket.nodes.iter_mut().find(|n| n.address.id == e.id)
.map_or(Err(BucketError::NotInTheBucket{node_entry: e.clone(), bucket_distance: dist}.into()), |entry| {
entry.address = e;
entry.last_seen = Instant::now();
entry.backoff_until = Instant::now();
entry.fail_count = 0;
Ok(())
})
}

let mut added_map = HashMap::new();
let ping = {
let bucket = &mut self.node_buckets[dist];
let updated = if let Some(node) = bucket.nodes.iter_mut().find(|n| n.address.id == e.id) {
node.address = e.clone();
node.last_seen = Instant::now();
node.backoff_until = Instant::now();
node.fail_count = 0;
true
} else { false };
fn update_node(&mut self, e: NodeEntry) -> Option<TableUpdates> {
trace!(target: "discovery", "Inserting {:?}", &e);

match self.update_bucket_record(e) {
Ok(()) => None,
Err(BucketError::Ourselves) => None,
Err(BucketError::NotInTheBucket{node_entry, bucket_distance}) => Some((node_entry, bucket_distance))
}.map(|(node_entry, bucket_distance)| {
trace!(target: "discovery", "Adding a new node {:?} into our bucket {}", &node_entry, bucket_distance);

if !updated {
added_map.insert(e.id, e.clone());
bucket.nodes.push_front(BucketEntry::new(e));
let mut added_map = HashMap::with_capacity(1);
added_map.insert(node_entry.id, node_entry.clone());

let node_to_ping = {
let bucket = &mut self.node_buckets[bucket_distance];
bucket.nodes.push_front(BucketEntry::new(node_entry));
if bucket.nodes.len() > BUCKET_SIZE {
select_bucket_ping(bucket.nodes.iter())
} else { None }
} else { None }
};
if let Some(node) = ping {
self.try_ping(node, PingReason::Default);
}
Some(TableUpdates { added: added_map, removed: HashSet::new() })
} else {
None
}
};
node_to_ping.map(|node| self.try_ping(node, PingReason::Default));
TableUpdates{added: added_map, removed: HashSet::new()}
})
}

/// Starts the discovery process at round 0
Expand Down Expand Up @@ -571,8 +586,13 @@ impl<'a> Discovery<'a> {
// but `on_packet` happens synchronously, so doing the full TCP handshake ceremony here is a bad idea.
// So instead we just LRU-caching most recently seen nodes to avoid unnecessary pinging
match validity {
NodeValidity::ValidNode(NodeCategory::Bucket) | NodeValidity::ExpiredNode(NodeCategory::Bucket) => { self.update_node(node); },
NodeValidity::ValidNode(NodeCategory::Bucket) | NodeValidity::ExpiredNode(NodeCategory::Bucket) => {
trace!(target: "discovery", "Updating node {:?} in our Kad buckets", &node);
self.update_bucket_record(node).map_err(|error| {
debug!(target: "discovery", "Error occured when pricessing ping from a bucket node: {}", error);
})},
NodeValidity::UnknownNode | NodeValidity::ExpiredNode(NodeCategory::Observed) | NodeValidity::ValidNode(NodeCategory::Observed)=> {
trace!(target: "discovery", "Updating node {:?} in the list of other_observed_nodes", &node);
self.other_observed_nodes.insert(node.id, (node.endpoint, Instant::now()));
},
NodeValidity::Ourselves => (),
Expand Down

0 comments on commit eef7838

Please sign in to comment.