Shut down channels and tasks on PeerSet Drop #3078
Merged
5 commits
ac11728 Shut down channels and tasks on PeerSet Drop (teor2345)
d154313 Document all the PeerSet fields (teor2345)
4c2bf31 Close the peer set background task handle on shutdown (teor2345)
7871b11 Receive background tasks during shutdown (teor2345)
7760bf1 Tweak comments (teor2345)
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -108,37 +108,80 @@ pub struct CancelClientWork; | |||||||||
/// Otherwise, malicious peers could interfere with other peers' `PeerSet` state. | ||||||||||
pub struct PeerSet<D> | ||||||||||
where | ||||||||||
D: Discover<Key = SocketAddr>, | ||||||||||
D: Discover<Key = SocketAddr> + Unpin, | ||||||||||
D::Service: Service<Request, Response = Response> + Load, | ||||||||||
D::Error: Into<BoxError>, | ||||||||||
<D::Service as Service<Request>>::Error: Into<BoxError> + 'static, | ||||||||||
<D::Service as Service<Request>>::Future: Send + 'static, | ||||||||||
<D::Service as Load>::Metric: Debug, | ||||||||||
{ | ||||||||||
/// Provides new and deleted peer [`Change`]s to the peer set, | ||||||||||
/// via the [`Discover`] trait implementation. | ||||||||||
discover: D, | ||||||||||
|
||||||||||
/// Connected peers that are ready to receive requests from Zebra, | ||||||||||
/// or send requests to Zebra. | ||||||||||
ready_services: IndexMap<D::Key, D::Service>, | ||||||||||
|
||||||||||
/// A preselected index for a ready service. | ||||||||||
/// INVARIANT: If this is `Some(i)`, `i` must be a valid index for `ready_services`. | ||||||||||
/// This means that every change to `ready_services` must invalidate or correct it. | ||||||||||
preselected_p2c_index: Option<usize>, | ||||||||||
ready_services: IndexMap<D::Key, D::Service>, | ||||||||||
cancel_handles: HashMap<D::Key, oneshot::Sender<CancelClientWork>>, | ||||||||||
|
||||||||||
/// Stores gossiped inventory from connected peers. | ||||||||||
/// Used to route inventory requests to peers that are likely to have it. | ||||||||||
inventory_registry: InventoryRegistry, | ||||||||||
|
||||||||||
/// Connected peers that are handling a Zebra request, | ||||||||||
/// or Zebra is handling one of their requests. | ||||||||||
unready_services: FuturesUnordered<UnreadyService<D::Key, D::Service, Request>>, | ||||||||||
|
||||||||||
/// Channels used to cancel the request that an unready service is doing. | ||||||||||
cancel_handles: HashMap<D::Key, oneshot::Sender<CancelClientWork>>, | ||||||||||
|
||||||||||
/// A channel that asks the peer crawler task to connect to more peers. | ||||||||||
demand_signal: mpsc::Sender<MorePeers>, | ||||||||||
|
||||||||||
/// Channel for passing ownership of tokio JoinHandles from PeerSet's background tasks | ||||||||||
/// | ||||||||||
/// The join handles passed into the PeerSet are used to populate the `guards` member | ||||||||||
handle_rx: tokio::sync::oneshot::Receiver<Vec<JoinHandle<Result<(), BoxError>>>>, | ||||||||||
|
||||||||||
/// Unordered set of handles to background tasks associated with the `PeerSet` | ||||||||||
/// | ||||||||||
/// These guards are checked for errors as part of `poll_ready` which lets | ||||||||||
/// the `PeerSet` propagate errors from background tasks back to the user | ||||||||||
guards: futures::stream::FuturesUnordered<JoinHandle<Result<(), BoxError>>>, | ||||||||||
inventory_registry: InventoryRegistry, | ||||||||||
/// The last time we logged a message about the peer set size | ||||||||||
last_peer_log: Option<Instant>, | ||||||||||
|
||||||||||
/// A shared list of peer addresses. | ||||||||||
/// | ||||||||||
/// Used for logging diagnostics. | ||||||||||
address_book: Arc<std::sync::Mutex<AddressBook>>, | ||||||||||
|
||||||||||
/// The last time we logged a message about the peer set size | ||||||||||
last_peer_log: Option<Instant>, | ||||||||||
|
||||||||||
/// The configured limit for inbound and outbound connections. | ||||||||||
/// | ||||||||||
/// The peer set panics if this size is exceeded. | ||||||||||
/// If that happens, our connection limit code has a bug. | ||||||||||
peerset_total_connection_limit: usize, | ||||||||||
} | ||||||||||
|
||||||||||
impl<D> Drop for PeerSet<D> | ||||||||||
where | ||||||||||
D: Discover<Key = SocketAddr> + Unpin, | ||||||||||
D::Service: Service<Request, Response = Response> + Load, | ||||||||||
D::Error: Into<BoxError>, | ||||||||||
<D::Service as Service<Request>>::Error: Into<BoxError> + 'static, | ||||||||||
<D::Service as Service<Request>>::Future: Send + 'static, | ||||||||||
<D::Service as Load>::Metric: Debug, | ||||||||||
{ | ||||||||||
fn drop(&mut self) { | ||||||||||
self.shut_down_tasks_and_channels() | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
impl<D> PeerSet<D> | ||||||||||
where | ||||||||||
D: Discover<Key = SocketAddr> + Unpin, | ||||||||||
|
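The `Drop` implementation in this hunk delegates to the same shutdown method that the error path calls, so that method has to be safe to run more than once. The following is a minimal standard-library sketch of that pattern, not the Zebra implementation: `Owner`, `cancel_tx`, `worker`, and `shut_down` are hypothetical names, and a plain thread with a `std::sync::mpsc` channel stands in for the tokio tasks and `oneshot` cancel handles used by `PeerSet`.

```rust
use std::sync::mpsc::{self, Sender, TryRecvError};
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical stand-in for `PeerSet`: owns a cancel channel and a worker thread.
struct Owner {
    cancel_tx: Option<Sender<()>>,
    worker: Option<JoinHandle<u32>>,
}

impl Owner {
    fn new() -> Self {
        let (cancel_tx, cancel_rx) = mpsc::channel::<()>();
        let worker = thread::spawn(move || {
            let mut ticks: u32 = 0;
            loop {
                match cancel_rx.try_recv() {
                    // An explicit cancel message or a dropped sender both stop the worker.
                    Ok(()) | Err(TryRecvError::Disconnected) => return ticks,
                    // No cancel request yet: keep working.
                    Err(TryRecvError::Empty) => {
                        ticks += 1;
                        thread::sleep(Duration::from_millis(1));
                    }
                }
            }
        });

        Owner {
            cancel_tx: Some(cancel_tx),
            worker: Some(worker),
        }
    }

    /// Shuts down the worker. Returns `true` if this call released anything.
    ///
    /// Each resource is taken out of its `Option`, so a second call is a no-op.
    fn shut_down(&mut self) -> bool {
        let mut released = false;

        if let Some(tx) = self.cancel_tx.take() {
            // Ignore send errors: the worker may already have exited.
            let _ = tx.send(());
            released = true;
        }

        if let Some(handle) = self.worker.take() {
            // Ignore the worker's result: we only care that it has stopped.
            let _ = handle.join();
            released = true;
        }

        released
    }
}

impl Drop for Owner {
    fn drop(&mut self) {
        // Same shutdown path as an explicit call, which is why it must be idempotent.
        self.shut_down();
    }
}
```

In this sketch, calling `shut_down()` explicitly and then dropping the value runs the method twice, and the second run finds both `Option`s empty. The diff gets a similar effect by draining `cancel_handles` and replacing the service collections with empty ones.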
@@ -169,15 +212,22 @@ where | |||||||||
address_book: Arc<std::sync::Mutex<AddressBook>>, | ||||||||||
) -> Self { | ||||||||||
Self { | ||||||||||
// Ready peers | ||||||||||
discover, | ||||||||||
preselected_p2c_index: None, | ||||||||||
ready_services: IndexMap::new(), | ||||||||||
cancel_handles: HashMap::new(), | ||||||||||
preselected_p2c_index: None, | ||||||||||
inventory_registry: InventoryRegistry::new(inv_stream), | ||||||||||
|
||||||||||
// Unready peers | ||||||||||
unready_services: FuturesUnordered::new(), | ||||||||||
cancel_handles: HashMap::new(), | ||||||||||
demand_signal, | ||||||||||
guards: futures::stream::FuturesUnordered::new(), | ||||||||||
|
||||||||||
// Background tasks | ||||||||||
handle_rx, | ||||||||||
inventory_registry: InventoryRegistry::new(inv_stream), | ||||||||||
guards: futures::stream::FuturesUnordered::new(), | ||||||||||
|
||||||||||
// Metrics | ||||||||||
last_peer_log: None, | ||||||||||
address_book, | ||||||||||
peerset_total_connection_limit: config.peerset_total_connection_limit(), | ||||||||||
|
@@ -189,42 +239,107 @@ where | |||||||||
/// If any background task exits, shuts down all other background tasks, | ||||||||||
/// and returns an error. | ||||||||||
fn poll_background_errors(&mut self, cx: &mut Context) -> Result<(), BoxError> { | ||||||||||
if self.guards.is_empty() { | ||||||||||
match self.handle_rx.try_recv() { | ||||||||||
Ok(handles) => { | ||||||||||
for handle in handles { | ||||||||||
self.guards.push(handle); | ||||||||||
} | ||||||||||
} | ||||||||||
Err(TryRecvError::Closed) => unreachable!( | ||||||||||
"try_recv will never be called if the futures have already been received" | ||||||||||
), | ||||||||||
Err(TryRecvError::Empty) => return Ok(()), | ||||||||||
} | ||||||||||
if let Some(result) = self.receive_tasks_if_needed() { | ||||||||||
return result; | ||||||||||
} | ||||||||||
|
||||||||||
let exit_error = match Pin::new(&mut self.guards).poll_next(cx) { | ||||||||||
Poll::Pending => return Ok(()), | ||||||||||
match Pin::new(&mut self.guards).poll_next(cx) { | ||||||||||
// All background tasks are still running. | ||||||||||
Poll::Pending => Ok(()), | ||||||||||
|
||||||||||
Poll::Ready(Some(res)) => { | ||||||||||
info!( | ||||||||||
background_tasks = %self.guards.len(), | ||||||||||
"a peer set background task exited, shutting down other peer set tasks" | ||||||||||
); | ||||||||||
|
||||||||||
self.shut_down_tasks_and_channels(); | ||||||||||
|
||||||||||
// Flatten the join result and inner result, | ||||||||||
// then turn Ok() task exits into errors. | ||||||||||
res.map_err(Into::into) | ||||||||||
// TODO: replace with Result::flatten when it stabilises (#70142) | ||||||||||
.and_then(convert::identity) | ||||||||||
.and(Err("a peer set background task exited".into())) | ||||||||||
} | ||||||||||
Poll::Ready(None) => Err("all peer set background tasks have exited".into()), | ||||||||||
}; | ||||||||||
|
||||||||||
Poll::Ready(None) => { | ||||||||||
self.shut_down_tasks_and_channels(); | ||||||||||
Err("all peer set background tasks have exited".into()) | ||||||||||
} | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
/// Receive background tasks, if they've been sent on the channel, | ||||||||||
/// but not consumed yet. | ||||||||||
/// | ||||||||||
/// Returns a result representing the current task state, | ||||||||||
/// or `None` if the background tasks should be polled to check their state. | ||||||||||
fn receive_tasks_if_needed(&mut self) -> Option<Result<(), BoxError>> { | ||||||||||
if self.guards.is_empty() { | ||||||||||
match self.handle_rx.try_recv() { | ||||||||||
// The tasks haven't been sent yet. | ||||||||||
Err(TryRecvError::Empty) => Some(Ok(())), | ||||||||||
|
||||||||||
// The tasks have been sent, but not consumed. | ||||||||||
Ok(handles) => { | ||||||||||
// Currently, the peer set treats an empty background task set as an error. | ||||||||||
// | ||||||||||
// TODO: refactor `handle_rx` and `guards` into an enum | ||||||||||
// for the background task state: Waiting/Running/Shutdown. | ||||||||||
assert!( | ||||||||||
!handles.is_empty(), | ||||||||||
"the peer set requires at least one background task" | ||||||||||
); | ||||||||||
|
||||||||||
for handle in handles { | ||||||||||
self.guards.push(handle); | ||||||||||
} | ||||||||||
Review comment on lines +295 to +297: Optional suggestion to make it shorter.
||||||||||
|
||||||||||
None | ||||||||||
} | ||||||||||
|
||||||||||
// The tasks have been sent and consumed, but then they exited. | ||||||||||
// | ||||||||||
// Correctness: the peer set must receive at least one task. | ||||||||||
// | ||||||||||
// TODO: refactor `handle_rx` and `guards` into an enum | ||||||||||
// for the background task state: Waiting/Running/Shutdown. | ||||||||||
Err(TryRecvError::Closed) => { | ||||||||||
Some(Err("all peer set background tasks have exited".into())) | ||||||||||
} | ||||||||||
} | ||||||||||
} else { | ||||||||||
None | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
/// Shut down: | ||||||||||
/// - services by dropping the service lists | ||||||||||
/// - background tasks via their join handles or cancel handles | ||||||||||
/// - channels by closing the channel | ||||||||||
fn shut_down_tasks_and_channels(&mut self) { | ||||||||||
// Drop services and cancel their background tasks. | ||||||||||
self.preselected_p2c_index = None; | ||||||||||
self.ready_services = IndexMap::new(); | ||||||||||
|
||||||||||
for (_peer_key, handle) in self.cancel_handles.drain() { | ||||||||||
let _ = handle.send(CancelClientWork); | ||||||||||
} | ||||||||||
self.unready_services = FuturesUnordered::new(); | ||||||||||
|
||||||||||
// Close the MorePeers channel for all senders, | ||||||||||
// so we don't add more peers to a shut down peer set. | ||||||||||
self.demand_signal.close_channel(); | ||||||||||
|
||||||||||
// Shut down background tasks. | ||||||||||
self.handle_rx.close(); | ||||||||||
self.receive_tasks_if_needed(); | ||||||||||
for guard in self.guards.iter() { | ||||||||||
guard.abort(); | ||||||||||
} | ||||||||||
|
||||||||||
exit_error | ||||||||||
// TODO: implement graceful shutdown for InventoryRegistry (#1678) | ||||||||||
} | ||||||||||
|
||||||||||
fn poll_unready(&mut self, cx: &mut Context<'_>) { | ||||||||||
|
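The `map_err` / `and_then` / `and` chain in `poll_background_errors` is compact, so here is a small standard-library sketch of what it appears to do: flatten the join result and the task's own result into one `Result`, then turn a clean exit into an error as well. `JoinFailed` and `exit_to_error` are hypothetical names used only for illustration; in the diff the outer error comes from the tokio join handle.

```rust
use std::convert;
use std::error::Error;
use std::fmt;

type BoxError = Box<dyn Error + Send + Sync>;

/// Hypothetical stand-in for the outer (join) error of a background task.
#[derive(Debug)]
struct JoinFailed;

impl fmt::Display for JoinFailed {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "join failed")
    }
}

impl Error for JoinFailed {}

/// Converts any background task exit into an error.
///
/// - outer `Err`: the join error is returned, converted to `BoxError`
/// - inner `Err`: the task's own error is returned unchanged
/// - `Ok(Ok(()))`: a clean exit still becomes a generic "task exited" error
fn exit_to_error(res: Result<Result<(), BoxError>, JoinFailed>) -> Result<(), BoxError> {
    res.map_err(Into::into)
        // Flatten `Result<Result<(), E>, E>` into `Result<(), E>`.
        .and_then(convert::identity)
        // `and` keeps an existing error, and replaces `Ok` with the given `Err`.
        .and(Err("a background task exited".into()))
}
```

Because `Result::and` only substitutes its argument when the receiver is `Ok`, the more specific join or task error takes priority over the generic message.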
Nit: Not sure if this was on purpose, but maybe separate the second sentence into the details section?
Fixed in 7849ca6 in PR #3090.