diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs index 0b614637e11..f7187b5a2ab 100644 --- a/zebra-network/src/peer_set/set.rs +++ b/zebra-network/src/peer_set/set.rs @@ -108,37 +108,80 @@ pub struct CancelClientWork; /// Otherwise, malicious peers could interfere with other peers' `PeerSet` state. pub struct PeerSet where - D: Discover, + D: Discover + Unpin, + D::Service: Service + Load, + D::Error: Into, + >::Error: Into + 'static, + >::Future: Send + 'static, + ::Metric: Debug, { + /// Provides new and deleted peer [`Change`]s to the peer set, + /// via the [`Discover`] trait implementation. discover: D, + + /// Connected peers that are ready to receive requests from Zebra, + /// or send requests to Zebra. + ready_services: IndexMap, + /// A preselected index for a ready service. /// INVARIANT: If this is `Some(i)`, `i` must be a valid index for `ready_services`. /// This means that every change to `ready_services` must invalidate or correct it. preselected_p2c_index: Option, - ready_services: IndexMap, - cancel_handles: HashMap>, + + /// Stores gossiped inventory from connected peers. + /// Used to route inventory requests to peers that are likely to have it. + inventory_registry: InventoryRegistry, + + /// Connected peers that are handling a Zebra request, + /// or Zebra is handling one of their requests. unready_services: FuturesUnordered>, + + /// Channels used to cancel the request that an unready service is doing. + cancel_handles: HashMap>, + + /// A channel that asks the peer crawler task to connect to more peers. demand_signal: mpsc::Sender, + /// Channel for passing ownership of tokio JoinHandles from PeerSet's background tasks /// /// The join handles passed into the PeerSet are used populate the `guards` member handle_rx: tokio::sync::oneshot::Receiver>>>, + /// Unordered set of handles to background tasks associated with the `PeerSet` /// /// These guards are checked for errors as part of `poll_ready` which lets /// the `PeerSet` propagate errors from background tasks back to the user guards: futures::stream::FuturesUnordered>>, - inventory_registry: InventoryRegistry, - /// The last time we logged a message about the peer set size - last_peer_log: Option, + /// A shared list of peer addresses. /// /// Used for logging diagnostics. address_book: Arc>, + + /// The last time we logged a message about the peer set size + last_peer_log: Option, + /// The configured limit for inbound and outbound connections. + /// + /// The peer set panics if this size is exceeded. + /// If that happens, our connection limit code has a bug. peerset_total_connection_limit: usize, } +impl Drop for PeerSet +where + D: Discover + Unpin, + D::Service: Service + Load, + D::Error: Into, + >::Error: Into + 'static, + >::Future: Send + 'static, + ::Metric: Debug, +{ + fn drop(&mut self) { + self.shut_down_tasks_and_channels() + } +} + impl PeerSet where D: Discover + Unpin, @@ -169,15 +212,22 @@ where address_book: Arc>, ) -> Self { Self { + // Ready peers discover, - preselected_p2c_index: None, ready_services: IndexMap::new(), - cancel_handles: HashMap::new(), + preselected_p2c_index: None, + inventory_registry: InventoryRegistry::new(inv_stream), + + // Unready peers unready_services: FuturesUnordered::new(), + cancel_handles: HashMap::new(), demand_signal, - guards: futures::stream::FuturesUnordered::new(), + + // Background tasks handle_rx, - inventory_registry: InventoryRegistry::new(inv_stream), + guards: futures::stream::FuturesUnordered::new(), + + // Metrics last_peer_log: None, address_book, peerset_total_connection_limit: config.peerset_total_connection_limit(), @@ -189,42 +239,107 @@ where /// If any background task exits, shuts down all other background tasks, /// and returns an error. fn poll_background_errors(&mut self, cx: &mut Context) -> Result<(), BoxError> { - if self.guards.is_empty() { - match self.handle_rx.try_recv() { - Ok(handles) => { - for handle in handles { - self.guards.push(handle); - } - } - Err(TryRecvError::Closed) => unreachable!( - "try_recv will never be called if the futures have already been received" - ), - Err(TryRecvError::Empty) => return Ok(()), - } + if let Some(result) = self.receive_tasks_if_needed() { + return result; } - let exit_error = match Pin::new(&mut self.guards).poll_next(cx) { - Poll::Pending => return Ok(()), + match Pin::new(&mut self.guards).poll_next(cx) { + // All background tasks are still running. + Poll::Pending => Ok(()), + Poll::Ready(Some(res)) => { info!( background_tasks = %self.guards.len(), "a peer set background task exited, shutting down other peer set tasks" ); + self.shut_down_tasks_and_channels(); + // Flatten the join result and inner result, // then turn Ok() task exits into errors. res.map_err(Into::into) + // TODO: replace with Result::flatten when it stabilises (#70142) .and_then(convert::identity) .and(Err("a peer set background task exited".into())) } - Poll::Ready(None) => Err("all peer set background tasks have exited".into()), - }; + Poll::Ready(None) => { + self.shut_down_tasks_and_channels(); + Err("all peer set background tasks have exited".into()) + } + } + } + + /// Receive background tasks, if they've been sent on the channel, + /// but not consumed yet. + /// + /// Returns a result representing the current task state, + /// or `None` if the background tasks should be polled to check their state. + fn receive_tasks_if_needed(&mut self) -> Option> { + if self.guards.is_empty() { + match self.handle_rx.try_recv() { + // The tasks haven't been sent yet. + Err(TryRecvError::Empty) => Some(Ok(())), + + // The tasks have been sent, but not consumed. + Ok(handles) => { + // Currently, the peer set treats an empty background task set as an error. + // + // TODO: refactor `handle_rx` and `guards` into an enum + // for the background task state: Waiting/Running/Shutdown. + assert!( + !handles.is_empty(), + "the peer set requires at least one background task" + ); + + for handle in handles { + self.guards.push(handle); + } + + None + } + + // The tasks have been sent and consumed, but then they exited. + // + // Correctness: the peer set must receive at least one task. + // + // TODO: refactor `handle_rx` and `guards` into an enum + // for the background task state: Waiting/Running/Shutdown. + Err(TryRecvError::Closed) => { + Some(Err("all peer set background tasks have exited".into())) + } + } + } else { + None + } + } + + /// Shut down: + /// - services by dropping the service lists + /// - background tasks via their join handles or cancel handles + /// - channels by closing the channel + fn shut_down_tasks_and_channels(&mut self) { + // Drop services and cancel their background tasks. + self.preselected_p2c_index = None; + self.ready_services = IndexMap::new(); + + for (_peer_key, handle) in self.cancel_handles.drain() { + let _ = handle.send(CancelClientWork); + } + self.unready_services = FuturesUnordered::new(); + + // Close the MorePeers channel for all senders, + // so we don't add more peers to a shut down peer set. + self.demand_signal.close_channel(); + + // Shut down background tasks. + self.handle_rx.close(); + self.receive_tasks_if_needed(); for guard in self.guards.iter() { guard.abort(); } - exit_error + // TODO: implement graceful shutdown for InventoryRegistry (#1678) } fn poll_unready(&mut self, cx: &mut Context<'_>) {