Shut down channels and tasks on PeerSet Drop #3078

Merged
merged 5 commits into from
Nov 23, 2021
169 changes: 142 additions & 27 deletions zebra-network/src/peer_set/set.rs
@@ -108,37 +108,80 @@ pub struct CancelClientWork;
/// Otherwise, malicious peers could interfere with other peers' `PeerSet` state.
pub struct PeerSet<D>
where
D: Discover<Key = SocketAddr>,
D: Discover<Key = SocketAddr> + Unpin,
D::Service: Service<Request, Response = Response> + Load,
D::Error: Into<BoxError>,
<D::Service as Service<Request>>::Error: Into<BoxError> + 'static,
<D::Service as Service<Request>>::Future: Send + 'static,
<D::Service as Load>::Metric: Debug,
{
/// Provides new and deleted peer [`Change`]s to the peer set,
/// via the [`Discover`] trait implementation.
discover: D,

/// Connected peers that are ready to receive requests from Zebra,
/// or send requests to Zebra.
ready_services: IndexMap<D::Key, D::Service>,

/// A preselected index for a ready service.
/// INVARIANT: If this is `Some(i)`, `i` must be a valid index for `ready_services`.
/// This means that every change to `ready_services` must invalidate or correct it.
preselected_p2c_index: Option<usize>,
ready_services: IndexMap<D::Key, D::Service>,
cancel_handles: HashMap<D::Key, oneshot::Sender<CancelClientWork>>,

/// Stores gossiped inventory from connected peers.
/// Used to route inventory requests to peers that are likely to have it.
Comment on lines +131 to +132
Contributor

Nit: Not sure if this was on purpose, but maybe separate the second sentence into the details section?

Suggested change
/// Stores gossiped inventory from connected peers.
/// Used to route inventory requests to peers that are likely to have it.
/// Stores gossiped inventory from connected peers.
///
/// Used to route inventory requests to peers that are likely to have it.

Contributor Author

Fixed in 7849ca6 in PR #3090.

inventory_registry: InventoryRegistry,

/// Connected peers that are handling a Zebra request,
/// or Zebra is handling one of their requests.
unready_services: FuturesUnordered<UnreadyService<D::Key, D::Service, Request>>,

/// Channels used to cancel the request that an unready service is doing.
cancel_handles: HashMap<D::Key, oneshot::Sender<CancelClientWork>>,

/// A channel that asks the peer crawler task to connect to more peers.
demand_signal: mpsc::Sender<MorePeers>,

/// Channel for passing ownership of tokio JoinHandles from PeerSet's background tasks
///
/// The join handles passed into the PeerSet are used to populate the `guards` member
handle_rx: tokio::sync::oneshot::Receiver<Vec<JoinHandle<Result<(), BoxError>>>>,

/// Unordered set of handles to background tasks associated with the `PeerSet`
///
/// These guards are checked for errors as part of `poll_ready` which lets
/// the `PeerSet` propagate errors from background tasks back to the user
guards: futures::stream::FuturesUnordered<JoinHandle<Result<(), BoxError>>>,
inventory_registry: InventoryRegistry,
/// The last time we logged a message about the peer set size
last_peer_log: Option<Instant>,

/// A shared list of peer addresses.
///
/// Used for logging diagnostics.
address_book: Arc<std::sync::Mutex<AddressBook>>,

/// The last time we logged a message about the peer set size
last_peer_log: Option<Instant>,

/// The configured limit for inbound and outbound connections.
///
/// The peer set panics if this size is exceeded.
/// If that happens, our connection limit code has a bug.
peerset_total_connection_limit: usize,
}

impl<D> Drop for PeerSet<D>
where
D: Discover<Key = SocketAddr> + Unpin,
D::Service: Service<Request, Response = Response> + Load,
D::Error: Into<BoxError>,
<D::Service as Service<Request>>::Error: Into<BoxError> + 'static,
<D::Service as Service<Request>>::Future: Send + 'static,
<D::Service as Load>::Metric: Debug,
{
fn drop(&mut self) {
self.shut_down_tasks_and_channels()
}
}

impl<D> PeerSet<D>
where
D: Discover<Key = SocketAddr> + Unpin,
@@ -169,15 +212,22 @@ where
address_book: Arc<std::sync::Mutex<AddressBook>>,
) -> Self {
Self {
// Ready peers
discover,
preselected_p2c_index: None,
ready_services: IndexMap::new(),
cancel_handles: HashMap::new(),
preselected_p2c_index: None,
inventory_registry: InventoryRegistry::new(inv_stream),

// Unready peers
unready_services: FuturesUnordered::new(),
cancel_handles: HashMap::new(),
demand_signal,
guards: futures::stream::FuturesUnordered::new(),

// Background tasks
handle_rx,
inventory_registry: InventoryRegistry::new(inv_stream),
guards: futures::stream::FuturesUnordered::new(),

// Metrics
last_peer_log: None,
address_book,
peerset_total_connection_limit: config.peerset_total_connection_limit(),
@@ -189,42 +239,107 @@ where
/// If any background task exits, shuts down all other background tasks,
/// and returns an error.
fn poll_background_errors(&mut self, cx: &mut Context) -> Result<(), BoxError> {
if self.guards.is_empty() {
match self.handle_rx.try_recv() {
Ok(handles) => {
for handle in handles {
self.guards.push(handle);
}
}
Err(TryRecvError::Closed) => unreachable!(
"try_recv will never be called if the futures have already been received"
),
Err(TryRecvError::Empty) => return Ok(()),
}
if let Some(result) = self.receive_tasks_if_needed() {
return result;
}

let exit_error = match Pin::new(&mut self.guards).poll_next(cx) {
Poll::Pending => return Ok(()),
match Pin::new(&mut self.guards).poll_next(cx) {
// All background tasks are still running.
Poll::Pending => Ok(()),

Poll::Ready(Some(res)) => {
info!(
background_tasks = %self.guards.len(),
"a peer set background task exited, shutting down other peer set tasks"
);

self.shut_down_tasks_and_channels();

// Flatten the join result and inner result,
// then turn Ok() task exits into errors.
res.map_err(Into::into)
// TODO: replace with Result::flatten when it stabilises (#70142)
.and_then(convert::identity)
.and(Err("a peer set background task exited".into()))
}
Poll::Ready(None) => Err("all peer set background tasks have exited".into()),
};

Poll::Ready(None) => {
self.shut_down_tasks_and_channels();
Err("all peer set background tasks have exited".into())
}
}
}
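
Note on the `Poll::Ready(Some(res))` arm: the chain of `map_err`, `and_then(convert::identity)`, and `and(Err(..))` can be read in isolation as a function that flattens a nested result and then turns a clean exit into an error. The following is a minimal sketch using only the standard library. `JoinFailed` and `exit_to_error` are hypothetical names introduced for illustration; `JoinFailed` stands in for the join error of the real task handle.

```rust
use std::convert;
use std::error::Error;
use std::fmt;

type BoxError = Box<dyn Error + Send + Sync + 'static>;

/// Hypothetical stand-in for the error returned when a task cannot be joined.
#[derive(Debug)]
struct JoinFailed;

impl fmt::Display for JoinFailed {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "join failed")
    }
}

impl Error for JoinFailed {}

/// Hypothetical helper: converts any task exit into an error.
fn exit_to_error(res: Result<Result<(), BoxError>, JoinFailed>) -> Result<(), BoxError> {
    // Convert the outer error so both layers share one error type.
    res.map_err(Into::into)
        // Flatten `Result<Result<(), E>, E>` into `Result<(), E>`.
        .and_then(convert::identity)
        // Keep an existing error, but replace `Ok(())` with an error,
        // because a background task exiting at all is unexpected.
        .and(Err("a peer set background task exited".into()))
}
```

With this shape, an existing join error or inner task error is passed through unchanged, and only a successful exit is replaced by the generic message.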

/// Receive background tasks, if they've been sent on the channel,
/// but not consumed yet.
///
/// Returns a result representing the current task state,
/// or `None` if the background tasks should be polled to check their state.
fn receive_tasks_if_needed(&mut self) -> Option<Result<(), BoxError>> {
if self.guards.is_empty() {
match self.handle_rx.try_recv() {
// The tasks haven't been sent yet.
Err(TryRecvError::Empty) => Some(Ok(())),

// The tasks have been sent, but not consumed.
Ok(handles) => {
// Currently, the peer set treats an empty background task set as an error.
//
// TODO: refactor `handle_rx` and `guards` into an enum
// for the background task state: Waiting/Running/Shutdown.
assert!(
!handles.is_empty(),
"the peer set requires at least one background task"
);

for handle in handles {
self.guards.push(handle);
}
Comment on lines +295 to +297
Contributor

Optional suggestion to make it shorter:

Suggested change
for handle in handles {
self.guards.push(handle);
}
self.guards.extend(handles);

Contributor Author

Fixed in 7849ca6 in PR #3090.
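
Note on the suggestion above: it relies on the `guards` collection implementing the standard `Extend` trait. A minimal sketch of the equivalence, using a `Vec` as a stand-in for the real collection (the function names `push_each` and `extend_all` are hypothetical):

```rust
/// Adds items one at a time, as in the original loop.
fn push_each<T>(guards: &mut Vec<T>, handles: Vec<T>) {
    for handle in handles {
        guards.push(handle);
    }
}

/// Adds all items with one call, for any collection that implements `Extend`.
fn extend_all<C, T>(guards: &mut C, handles: Vec<T>)
where
    C: Extend<T>,
{
    guards.extend(handles);
}
```

Both functions leave a `Vec` with the same contents in the same order, so the shorter form is a readability change only under that assumption.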


None
}

// The tasks have been sent and consumed, but then they exited.
//
// Correctness: the peer set must receive at least one task.
//
// TODO: refactor `handle_rx` and `guards` into an enum
// for the background task state: Waiting/Running/Shutdown.
Err(TryRecvError::Closed) => {
Some(Err("all peer set background tasks have exited".into()))
}
}
} else {
None
}
}
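
Note on `receive_tasks_if_needed`: the three channel outcomes (not sent yet, sent but not consumed, closed) can be sketched with the standard library alone. This is an approximation under stated assumptions: `std::sync::mpsc` stands in for the oneshot channel, `TryRecvError::Disconnected` plays the role of `Closed`, and the `Tasks` struct with `u32` task ids is hypothetical. Unlike the code in this diff, the sketch does not assert that the received list is non-empty.

```rust
use std::sync::mpsc::{Receiver, TryRecvError};

/// Hypothetical stand-in for the peer set's background task fields.
struct Tasks {
    handle_rx: Receiver<Vec<u32>>,
    guards: Vec<u32>,
}

impl Tasks {
    /// Returns `Some(state)` when there are no tasks to poll,
    /// or `None` when `guards` holds tasks that should be polled.
    fn receive_tasks_if_needed(&mut self) -> Option<Result<(), String>> {
        if !self.guards.is_empty() {
            // Tasks were already received, so the caller should poll them.
            return None;
        }

        match self.handle_rx.try_recv() {
            // The tasks haven't been sent yet.
            Err(TryRecvError::Empty) => Some(Ok(())),

            // The tasks have been sent, but not consumed.
            Ok(handles) => {
                self.guards.extend(handles);
                None
            }

            // The sender is gone and nothing is left to receive.
            Err(TryRecvError::Disconnected) => {
                Some(Err("all background tasks have exited".to_string()))
            }
        }
    }
}
```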

/// Shut down:
/// - services by dropping the service lists
/// - background tasks via their join handles or cancel handles
/// - channels by closing the channel
fn shut_down_tasks_and_channels(&mut self) {
// Drop services and cancel their background tasks.
self.preselected_p2c_index = None;
self.ready_services = IndexMap::new();

for (_peer_key, handle) in self.cancel_handles.drain() {
let _ = handle.send(CancelClientWork);
}
self.unready_services = FuturesUnordered::new();

// Close the MorePeers channel for all senders,
// so we don't add more peers to a shut down peer set.
self.demand_signal.close_channel();

// Shut down background tasks.
self.handle_rx.close();
self.receive_tasks_if_needed();
for guard in self.guards.iter() {
guard.abort();
}

exit_error
// TODO: implement graceful shutdown for InventoryRegistry (#1678)
}
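
Note on the shutdown path: the pattern in this diff is one idempotent shutdown method that is called both from the error path and from `Drop`. The following is a minimal sketch of that pattern with standard library threads and channels. All names (`Workers`, `Cancel`, `spawn`, `shut_down`) are hypothetical stand-ins. Standard threads cannot be aborted, so the sketch joins each worker after cancelling it, whereas the code in this diff calls `abort()` on its task handles.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for `CancelClientWork`.
struct Cancel;

/// Hypothetical owner of worker threads, standing in for `PeerSet`.
struct Workers {
    cancel_handles: HashMap<u32, Sender<Cancel>>,
    guards: Vec<JoinHandle<()>>,
}

impl Workers {
    fn spawn(count: u32) -> Self {
        let mut cancel_handles = HashMap::new();
        let mut guards = Vec::new();

        for id in 0..count {
            let (tx, rx) = mpsc::channel::<Cancel>();
            // Each worker blocks until it is cancelled or its sender is dropped.
            guards.push(thread::spawn(move || {
                let _ = rx.recv();
            }));
            cancel_handles.insert(id, tx);
        }

        Workers { cancel_handles, guards }
    }

    /// Idempotent: safe to call from an error path and again from `drop`.
    fn shut_down(&mut self) {
        for (_id, handle) in self.cancel_handles.drain() {
            // The worker may have exited already, so ignore send errors.
            let _ = handle.send(Cancel);
        }
        for guard in self.guards.drain(..) {
            // A panicked worker is ignored here; shutdown continues regardless.
            let _ = guard.join();
        }
    }
}

impl Drop for Workers {
    fn drop(&mut self) {
        self.shut_down();
    }
}
```

Because `shut_down` drains both collections, a second call finds nothing left to do, which is what makes it safe to run again when the value is dropped.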

fn poll_unready(&mut self, cx: &mut Context<'_>) {