diff --git a/Cargo.lock b/Cargo.lock index f476cf6ab05..db8068b8f89 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4394,7 +4394,6 @@ dependencies = [ "chrono", "futures", "hex", - "indexmap", "lazy_static", "metrics", "pin-project 1.0.7", diff --git a/zebra-network/Cargo.toml b/zebra-network/Cargo.toml index 3e2cdef6577..aa244682cae 100644 --- a/zebra-network/Cargo.toml +++ b/zebra-network/Cargo.toml @@ -13,9 +13,6 @@ byteorder = "1.4" bytes = "1.1.0" chrono = "0.4" hex = "0.4" -# indexmap has rayon support for parallel iteration, -# which we don't use, so disable it to drop the dependencies. -indexmap = { version = "1.7", default-features = false } lazy_static = "1.4.0" pin-project = "1.0.7" rand = "0.8" diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs index f7187b5a2ab..e8191727d92 100644 --- a/zebra-network/src/peer_set/set.rs +++ b/zebra-network/src/peer_set/set.rs @@ -2,8 +2,8 @@ //! //! # Implementation //! -//! The [`PeerSet`] implementation is adapted from the one in the [Tower Balance][tower-balance] crate, and as -//! described in that crate's documentation, it +//! The [`PeerSet`] implementation is adapted from the one in the [Tower Balance][tower-balance] crate. +//! As described in that crate's documentation, it: //! //! > Distributes requests across inner services using the [Power of Two Choices][p2c]. //! > @@ -16,11 +16,11 @@ //! > > The maximum load variance between any two servers is bound by `ln(ln(n))` where //! > > `n` is the number of servers in the cluster. //! -//! This should work well for many network requests, but not all of them: some -//! requests, e.g., a request for some particular inventory item, can only be -//! made to a subset of connected peers, e.g., the ones that have recently -//! advertised that inventory hash, and other requests require specialized logic -//! (e.g., transaction diffusion). +//! The Power of Two Choices should work well for many network requests, but not all of them. +//! 
Some requests should only be made to a subset of connected peers. +//! For example, a request for a particular inventory item +//! should be made to a peer that has recently advertised that inventory hash. +//! Other requests require broadcasts, such as transaction diffusion. //! //! Implementing this specialized routing logic inside the `PeerSet` -- so that //! it continues to abstract away "the rest of the network" into one endpoint -- @@ -43,7 +43,7 @@ //! [tower-balance]: https://crates.io/crates/tower-balance use std::{ - collections::HashMap, + collections::{HashMap, HashSet}, convert, fmt::Debug, future::Future, @@ -61,7 +61,6 @@ use futures::{ prelude::*, stream::FuturesUnordered, }; -use indexmap::IndexMap; use tokio::{ sync::{broadcast, oneshot::error::TryRecvError}, task::JoinHandle, @@ -121,14 +120,20 @@ where /// Connected peers that are ready to receive requests from Zebra, /// or send requests to Zebra. - ready_services: IndexMap, + ready_services: HashMap, - /// A preselected index for a ready service. - /// INVARIANT: If this is `Some(i)`, `i` must be a valid index for `ready_services`. - /// This means that every change to `ready_services` must invalidate or correct it. - preselected_p2c_index: Option, + /// A preselected ready service. + /// + /// # Correctness + /// + /// If this is `Some(addr)`, `addr` must be a key for a peer in `ready_services`. + /// If that peer is removed from `ready_services`, we must set the preselected peer to `None`. + /// + /// This is handled by [`PeerSet::take_ready_service`] and [`PeerSet::route_all`]. + preselected_p2c_peer: Option, - /// Stores gossiped inventory from connected peers. + /// Stores gossiped inventory hashes from connected peers. + /// /// Used to route inventory requests to peers that are likely to have it. 
inventory_registry: InventoryRegistry, @@ -214,8 +219,8 @@ where Self { // Ready peers discover, - ready_services: IndexMap::new(), - preselected_p2c_index: None, + ready_services: HashMap::new(), + preselected_p2c_peer: None, inventory_registry: InventoryRegistry::new(inv_stream), // Unready peers @@ -292,9 +297,7 @@ where "the peer set requires at least one background task" ); - for handle in handles { - self.guards.push(handle); - } + self.guards.extend(handles); None } @@ -320,8 +323,8 @@ where /// - channels by closing the channel fn shut_down_tasks_and_channels(&mut self) { // Drop services and cancel their background tasks. - self.preselected_p2c_index = None; - self.ready_services = IndexMap::new(); + self.preselected_p2c_peer = None; + self.ready_services = HashMap::new(); for (_peer_key, handle) in self.cancel_handles.drain() { let _ = handle.send(CancelClientWork); @@ -342,34 +345,51 @@ where // TODO: implement graceful shutdown for InventoryRegistry (#1678) } + /// Check busy peer services for request completion or errors. + /// + /// Move newly ready services to the ready list, and drop failed services. fn poll_unready(&mut self, cx: &mut Context<'_>) { loop { match Pin::new(&mut self.unready_services).poll_next(cx) { + // No unready service changes, or empty unready services Poll::Pending | Poll::Ready(None) => return, + + // Unready -> Ready Poll::Ready(Some(Ok((key, svc)))) => { trace!(?key, "service became ready"); - let _cancel = self.cancel_handles.remove(&key); - assert!(_cancel.is_some(), "missing cancel handle"); + let cancel = self.cancel_handles.remove(&key); + assert!(cancel.is_some(), "missing cancel handle"); self.ready_services.insert(key, svc); } + + // Unready -> Canceled Poll::Ready(Some(Err((key, UnreadyError::Canceled)))) => { - trace!(?key, "service was canceled"); - // This debug assert is invalid because we can have a - // service be canceled due us connecting to the same service - // twice. 
- // - // assert!(!self.cancel_handles.contains_key(&key)) + // A service can be canceled because we've connected to the same service twice. + // In that case, there is a cancel handle for the peer address, + // but it belongs to the service for the newer connection. + trace!( + ?key, + duplicate_connection = self.cancel_handles.contains_key(&key), + "service was canceled, dropping service" + ); } + + // Unready -> Errored Poll::Ready(Some(Err((key, UnreadyError::Inner(e))))) => { let error = e.into(); - debug!(%error, "service failed while unready, dropped"); - let _cancel = self.cancel_handles.remove(&key); - assert!(_cancel.is_some(), "missing cancel handle"); + debug!(%error, "service failed while unready, dropping service"); + + let cancel = self.cancel_handles.remove(&key); + assert!(cancel.is_some(), "missing cancel handle"); } } } } + /// Checks for newly inserted or removed services. + /// + /// Puts inserted services in the unready list. + /// Drops removed services, after cancelling any pending requests. fn poll_discover(&mut self, cx: &mut Context<'_>) -> Poll> { use futures::ready; loop { @@ -390,38 +410,42 @@ where } } - /// Takes a ready service by key, preserving `preselected_p2c_index` if possible. - fn take_ready_service(&mut self, key: &D::Key) -> Option<(D::Key, D::Service)> { - if let Some((i, key, svc)) = self.ready_services.swap_remove_full(key) { - // swap_remove perturbs the position of the last element of - // ready_services, so we may have invalidated self.next_idx, in - // which case we need to fix it. Specifically, swap_remove swaps the - // position of the removee and the last element, then drops the - // removee from the end, so we compare the active and removed indices: - // - // We just removed one element, so this was the index of the last element. 
- let last_idx = self.ready_services.len(); - self.preselected_p2c_index = match self.preselected_p2c_index { - None => None, // No active index - Some(j) if j == i => None, // We removed j - Some(j) if j == last_idx => Some(i), // We swapped i and j - Some(j) => Some(j), // We swapped an unrelated service. - }; - // No Heisenservices: they must be ready or unready. - assert!(!self.cancel_handles.contains_key(&key)); - Some((key, svc)) + /// Takes a ready service by key, invalidating `preselected_p2c_peer` if needed. + fn take_ready_service(&mut self, key: &D::Key) -> Option { + if let Some(svc) = self.ready_services.remove(key) { + if Some(*key) == self.preselected_p2c_peer { + self.preselected_p2c_peer = None; + } + + assert!( + !self.cancel_handles.contains_key(key), + "cancel handles are only used for unready service work" + ); + + Some(svc) } else { None } } + /// Remove the service corresponding to `key` from the peer set. + /// + /// Drops the service, cancelling any pending request or response to that peer. + /// If the peer does not exist, does nothing. fn remove(&mut self, key: &D::Key) { - if self.take_ready_service(key).is_some() { + if let Some(ready_service) = self.take_ready_service(key) { + // A ready service has no work to cancel, so just drop it. + std::mem::drop(ready_service); } else if let Some(handle) = self.cancel_handles.remove(key) { + // Cancel the work, implicitly dropping the cancel handle. + // The service future returns a `Canceled` error, + // making `poll_unready` drop the service. let _ = handle.send(CancelClientWork); } } + /// Adds a busy service to the unready list, + /// and adds a cancel handle for the service's current request. fn push_unready(&mut self, key: D::Key, svc: D::Service) { let (tx, rx) = oneshot::channel(); self.cancel_handles.insert(key, tx); @@ -433,49 +457,83 @@ where }); } - /// Performs P2C on inner services to select a ready service. 
- fn preselect_p2c_index(&mut self) -> Option { - match self.ready_services.len() { + /// Performs P2C on `self.ready_services` to randomly select a less-loaded ready service. + fn preselect_p2c_peer(&self) -> Option { + self.select_p2c_peer_from_list(self.ready_services.keys().copied().collect()) + } + + /// Performs P2C on `ready_service_list` to randomly select a less-loaded ready service. + fn select_p2c_peer_from_list(&self, ready_service_list: HashSet) -> Option { + match ready_service_list.len() { 0 => None, - 1 => Some(0), + 1 => Some( + ready_service_list + .into_iter() + .next() + .expect("just checked there is one service"), + ), len => { + // If there are only 2 peers, randomise their order. + // Otherwise, choose 2 random peers in a random order. let (a, b) = { let idxs = rand::seq::index::sample(&mut rand::thread_rng(), len, 2); - (idxs.index(0), idxs.index(1)) + let a = idxs.index(0); + let b = idxs.index(1); + + let a = *ready_service_list + .iter() + .nth(a) + .expect("sample returns valid indexes"); + let b = *ready_service_list + .iter() + .nth(b) + .expect("sample returns valid indexes"); + + (a, b) }; - let a_load = self.query_load(a); - let b_load = self.query_load(b); + let a_load = self.query_load(&a).expect("supplied services are ready"); + let b_load = self.query_load(&b).expect("supplied services are ready"); let selected = if a_load <= b_load { a } else { b }; - trace!(a.idx = a, a.load = ?a_load, b.idx = b, b.load = ?b_load, selected, "selected service by p2c"); + trace!( + a.key = ?a, + a.load = ?a_load, + b.key = ?b, + b.load = ?b_load, + selected = ?selected, + ?len, + "selected service by p2c" + ); Some(selected) } } } - /// Accesses a ready endpoint by index and returns its current load. - fn query_load(&self, index: usize) -> ::Metric { - let (_, svc) = self.ready_services.get_index(index).expect("invalid index"); - svc.load() + /// Accesses a ready endpoint by `key` and returns its current load. 
+ /// + /// Returns `None` if the service is not in the ready service list. + fn query_load(&self, key: &D::Key) -> Option<::Metric> { + let svc = self.ready_services.get(key); + svc.map(|svc| svc.load()) } /// Routes a request using P2C load-balancing. fn route_p2c(&mut self, req: Request) -> >::Future { - let index = self - .preselected_p2c_index - .take() - .expect("ready service must have valid preselected index"); + let preselected_key = self + .preselected_p2c_peer + .expect("ready peer service must have a preselected peer"); + + tracing::trace!(?preselected_key, "routing based on p2c"); - let (key, mut svc) = self - .ready_services - .swap_remove_index(index) - .expect("preselected index must be valid"); + let mut svc = self + .take_ready_service(&preselected_key) + .expect("ready peer set must have preselected a ready peer"); let fut = svc.call(req); - self.push_unready(key, svc); + self.push_unready(preselected_key, svc); fut.map_err(Into::into).boxed() } @@ -486,32 +544,44 @@ where req: Request, hash: InventoryHash, ) -> >::Future { - let peer = self + let inventory_peer_list = self .inventory_registry .peers(&hash) - .find(|&key| self.ready_services.contains_key(key)) - .cloned(); + .filter(|&key| self.ready_services.contains_key(key)) + .copied() + .collect(); + + // # Security + // + // Choose a random, less-loaded peer with the inventory. + // + // If we chose the first peer in HashMap order, + // peers would be able to influence our choice by switching addresses. + // But we need the choice to be random, + // so that a peer can't provide all our inventory responses. 
+ let peer = self.select_p2c_peer_from_list(inventory_peer_list); match peer.and_then(|key| self.take_ready_service(&key)) { - Some((key, mut svc)) => { - tracing::debug!(?hash, ?key, "routing based on inventory"); + Some(mut svc) => { + let peer = peer.expect("just checked peer is Some"); + tracing::trace!(?hash, ?peer, "routing based on inventory"); let fut = svc.call(req); - self.push_unready(key, svc); + self.push_unready(peer, svc); fut.map_err(Into::into).boxed() } None => { - tracing::debug!(?hash, "no ready peer for inventory, falling back to p2c"); + tracing::trace!(?hash, "no ready peer for inventory, falling back to p2c"); self.route_p2c(req) } } } - // Routes a request to all ready peers, ignoring return values. + /// Routes a request to all ready peers, ignoring return values. fn route_all(&mut self, req: Request) -> >::Future { // This is not needless: otherwise, we'd hold a &mut reference to self.ready_services, // blocking us from passing &mut self to push_unready. let ready_services = std::mem::take(&mut self.ready_services); - self.preselected_p2c_index = None; // All services are now unready. + self.preselected_p2c_peer = None; // All services are now unready. let futs = FuturesUnordered::new(); for (key, mut svc) in ready_services { @@ -524,12 +594,14 @@ where tracing::debug!( ok.len = results.iter().filter(|r| r.is_ok()).count(), err.len = results.iter().filter(|r| r.is_err()).count(), + "sent peer request broadcast" ); Ok(Response::Nil) } .boxed() } + /// Logs the peer set size. fn log_peer_set_size(&mut self) { let ready_services_len = self.ready_services.len(); let unready_services_len = self.unready_services.len(); @@ -575,6 +647,11 @@ where } } + /// Updates the peer set metrics. + /// + /// # Panics + /// + /// If the peer set size exceeds the connection limit. 
fn update_metrics(&self) { let num_ready = self.ready_services.len(); let num_unready = self.unready_services.len(); @@ -612,7 +689,8 @@ where fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.poll_background_errors(cx)?; - // Process peer discovery updates. + + // Update peer statuses let _ = self.poll_discover(cx)?; self.inventory_registry.poll_inventory(cx)?; self.poll_unready(cx); @@ -624,37 +702,35 @@ where // Re-check that the pre-selected service is ready, in case // something has happened since (e.g., it failed, peer closed // connection, ...) - if let Some(index) = self.preselected_p2c_index { - let (key, service) = self - .ready_services - .get_index_mut(index) - .expect("preselected index must be valid"); - trace!(preselected_index = index, ?key); + if let Some(key) = self.preselected_p2c_peer { + trace!(preselected_key = ?key); + let mut service = self + .take_ready_service(&key) + .expect("preselected peer must be in the ready list"); match service.poll_ready(cx) { - Poll::Ready(Ok(())) => return Poll::Ready(Ok(())), + Poll::Ready(Ok(())) => { + trace!("preselected service is still ready, keeping it selected"); + self.preselected_p2c_peer = Some(key); + self.ready_services.insert(key, service); + return Poll::Ready(Ok(())); + } Poll::Pending => { - trace!("preselected service is no longer ready"); - let (key, service) = self - .ready_services - .swap_remove_index(index) - .expect("preselected index must be valid"); + trace!("preselected service is no longer ready, moving to unready list"); self.push_unready(key, service); } Poll::Ready(Err(e)) => { let error = e.into(); trace!(%error, "preselected service failed, dropping it"); - self.ready_services - .swap_remove_index(index) - .expect("preselected index must be valid"); + std::mem::drop(service); } } } - trace!("preselected service was not ready, reselecting"); - self.preselected_p2c_index = self.preselect_p2c_index(); + trace!("preselected service was not ready, preselecting another 
ready service"); + self.preselected_p2c_peer = self.preselect_p2c_peer(); self.update_metrics(); - if self.preselected_p2c_index.is_none() { + if self.preselected_p2c_peer.is_none() { // CORRECTNESS // // If the channel is full, drop the demand signal rather than waiting.