diff --git a/validator-network/src/network_impl.rs b/validator-network/src/network_impl.rs index 286db4a8d7..ce7a4414df 100644 --- a/validator-network/src/network_impl.rs +++ b/validator-network/src/network_impl.rs @@ -231,9 +231,13 @@ where RequestError::InboundRequest(InboundRequestError::NoReceiver) => {} // In all other cases, clear the Peer ID cache for this validator _ => { - if let Some(validator_key) = - self.validator_keys.read().get(usize::from(validator_id)) - { + // Make sure to drop the `self.validator_keys` lock after this line. + let validator_key = self + .validator_keys + .read() + .get(usize::from(validator_id)) + .cloned(); + if let Some(validator_key) = validator_key { // Clear the peer ID cache only if the error happened for the same Peer ID that we have cached let mut validator_peer_id_cache = self.validator_peer_id_cache.write(); if let Some(cache_entry) = @@ -293,18 +297,36 @@ where /// ordered, such that the k-th entry is the validator with ID k. async fn set_validators(&self, validator_keys: Vec) { log::trace!(?validator_keys, "Setting validators for ValidatorNetwork"); - // Create new peer ID cache, but keep validators that are still active. - let mut validator_peer_id_cache = self.validator_peer_id_cache.write(); - let mut keep_cached = BTreeMap::new(); - for validator_key in &validator_keys { - if let Some(peer_id) = validator_peer_id_cache.remove(validator_key.compressed()) { - keep_cached.insert(validator_key.compressed().clone(), peer_id); - } - } + // Put the `validator_keys` into the same order as the + // `self.validator_peer_id_cache` so that we can simultaneously iterate + // over them. Note that `LazyPublicKey::cmp` forwards to + // `CompressedPublicKey::cmp`. + let mut sorted_validator_keys: Vec<_> = validator_keys.iter().collect(); + sorted_validator_keys.sort_unstable(); + let mut sorted_validator_keys = sorted_validator_keys.into_iter(); + let mut cur_key = sorted_validator_keys.next(); + + // Drop peer ID cache, but keep validators that are still active and + // validators who are currently being resolved. + self.validator_peer_id_cache + .write() + .retain(|key, cache_state| { + // If a lookup is in progress, the lookup thread expects to be + // able to put the result into the cache map. + // + // It'll get cleaned up on the next validator change. + if let CacheState::InProgress(..) = cache_state { + return true; + } + // Move `cur_key` until we're greater or equal to `key`. + while cur_key.map(|k| k.compressed() < key).unwrap_or(false) { + cur_key = sorted_validator_keys.next(); + } + Some(key) == cur_key.map(LazyPublicKey::compressed) + }); *self.validator_keys.write() = validator_keys; - *validator_peer_id_cache = keep_cached; } async fn send_to(&self, validator_id: u16, msg: M) -> Result<(), Self::Error> {