[core/swarm] Graceful shutdown for connections, networks and swarms. #1682

Closed · wants to merge 23 commits
Changes from 20 commits
130 changes: 104 additions & 26 deletions core/src/connection.rs
@@ -30,7 +30,7 @@
pub use error::{ConnectionError, PendingConnectionError};
pub use handler::{ConnectionHandler, ConnectionHandlerEvent, IntoConnectionHandler};
pub use listeners::{ListenerId, ListenersStream, ListenersEvent};
pub use manager::ConnectionId;
- pub use substream::{Substream, SubstreamEndpoint, Close};
+ pub use substream::{Substream, SubstreamEndpoint};
pub use pool::{EstablishedConnection, EstablishedConnectionIter, PendingConnection};

use crate::muxing::StreamMuxer;
@@ -194,10 +194,12 @@ where
TMuxer: StreamMuxer,
THandler: ConnectionHandler<Substream = Substream<TMuxer>>,
{
- /// Node that handles the muxing.
+ /// The substream multiplexer over the connection I/O stream.
muxing: substream::Muxing<TMuxer, THandler::OutboundOpenInfo>,
- /// Handler that processes substreams.
+ /// The connection handler for the substreams.
handler: THandler,
+ /// The operating state of the connection.
+ state: ConnectionState,
}

impl<TMuxer, THandler> fmt::Debug for Connection<TMuxer, THandler>
@@ -231,68 +233,122 @@ where
Connection {
muxing: Muxing::new(muxer),
handler,
+ state: ConnectionState::Open,
}
}

/// Returns a reference to the `ConnectionHandler`
pub fn handler(&self) -> &THandler {
&self.handler
}

- /// Returns a mutable reference to the `ConnectionHandler`
- pub fn handler_mut(&mut self) -> &mut THandler {
- &mut self.handler
+ /// Notifies the connection handler of an event.
+ ///
+ /// Returns `Ok` if the event was delivered to the handler, `Err`
+ /// if the connection is closing and the handler is already closed.
+ pub fn inject_event(&mut self, event: THandler::InEvent) -> Result<(), THandler::InEvent> {
+ match self.state {
+ ConnectionState::Open | ConnectionState::CloseHandler => {
+ self.handler.inject_event(event);
+ Ok(())
+ }
+ _ => Err(event)
+ }
}

- /// Notifies the connection handler of an event.
- pub fn inject_event(&mut self, event: THandler::InEvent) {
- self.handler.inject_event(event);
+ /// Begins a graceful shutdown of the connection.
+ ///
+ /// The connection must continue to be `poll()`ed to drive the
+ /// shutdown process to completion. Once connection shutdown is
+ /// complete, `poll()` returns `Ok(None)`.
+ pub fn start_close(&mut self) {
+ if self.state == ConnectionState::Open {
+ self.state = ConnectionState::CloseHandler;
+ }
}

- /// Begins an orderly shutdown of the connection, returning a
- /// `Future` that resolves when connection shutdown is complete.
- pub fn close(self) -> Close<TMuxer> {
- self.muxing.close().0
+ /// Whether the connection is open, i.e. neither closing nor already closed.
+ pub fn is_open(&self) -> bool {
+ self.state == ConnectionState::Open
}

/// Polls the connection for events produced by the associated handler
/// as a result of I/O activity on the substream multiplexer.
+ ///
+ /// > **Note**: A return value of `Ok(None)` signals successful
+ /// > connection shutdown, whereas an `Err` signals termination
+ /// > of the connection due to an error. In either case, the
+ /// > connection must be dropped; any further method calls
+ /// > result in unspecified behaviour.
pub fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
- -> Poll<Result<Event<THandler::OutEvent>, ConnectionError<THandler::Error>>>
+ -> Poll<Result<Option<Event<THandler::OutEvent>>, ConnectionError<THandler::Error>>>
{
loop {
+ if let ConnectionState::Closed = self.state { // (1)
+ return Poll::Ready(Ok(None))
+ }
+
+ if let ConnectionState::CloseMuxer = self.state { // (2)
+ match futures::ready!(self.muxing.poll_close(cx)) {
+ Ok(()) => {
+ self.state = ConnectionState::Closed;
+ return Poll::Ready(Ok(None))
+ }
+ Err(e) => {
+ return Poll::Ready(Err(ConnectionError::IO(e)))
+ }
+ }
+ }
+
+ // At this point the connection is either open or in the process
+ // of a graceful shutdown by the connection handler.
let mut io_pending = false;

// Perform I/O on the connection through the muxer, informing the handler
- // of new substreams.
+ // of new substreams or other muxer events.
match self.muxing.poll(cx) {
Poll::Pending => io_pending = true,
Poll::Ready(Ok(SubstreamEvent::InboundSubstream { substream })) => {
- self.handler.inject_substream(substream, SubstreamEndpoint::Listener)
+ // Drop new inbound substreams when closing. This is analogous
+ // to rejecting new connections.
+ if self.state == ConnectionState::Open {
+ self.handler.inject_substream(substream, SubstreamEndpoint::Listener)
+ } else {
+ log::trace!("Inbound substream dropped. Connection is closing.")
[Review thread]

Contributor: Note that dropping a substream can cause write errors on the remote side, e.g. sending of initial data could fail when the substream is reset.

Author: Yes, the current idea is to treat new inbound substreams analogously (as much as possible, anyway) to new connections during shutdown, that is, to refuse them at the earliest possible moment. Any failures relating to attempts at creating new substreams or connections towards the peer that is shutting down should ideally result in retries with a different peer, much like without the graceful shutdown.

Contributor: I am not sure the shutdown logic as currently implemented can really reduce error rates, because any error in one substream will often lead to a connection close, causing failures in the other substreams. For example, the request-response handler immediately closes the connection when any inbound or outbound upgrade fails other than via timeout or protocol mismatch. This means that if the remote closes the connection and my next outbound request over this connection fails, all my ongoing requests on this connection will fail too.

Author: That is true, unfortunately. It depends very much on how substream write errors are handled, in the specific case of libp2p-request-response by RequestResponseCodec::write_request, which is currently completely up to the user. As you say, most other protocols probably consider write errors on a substream fatal as well. The only somewhat clean solution that comes to mind leads back to giving the substream multiplexers a configurable inbound substream limit, similar to the connection limit of the Pool, that can be changed during shutdown (i.e. set to 0): a limit that does not affect existing substreams but exerts back-pressure in a way that lets the remote know it has reached the current substream limit on that connection, without breaking the connection itself.

+ }
}
Poll::Ready(Ok(SubstreamEvent::OutboundSubstream { user_data, substream })) => {
let endpoint = SubstreamEndpoint::Dialer(user_data);
self.handler.inject_substream(substream, endpoint)
}
Poll::Ready(Ok(SubstreamEvent::AddressChange(address))) => {
self.handler.inject_address_change(&address);
- return Poll::Ready(Ok(Event::AddressChange(address)));
+ return Poll::Ready(Ok(Some(Event::AddressChange(address))));
}
Poll::Ready(Err(err)) => return Poll::Ready(Err(ConnectionError::IO(err))),
}

// Poll the handler for new events.
- match self.handler.poll(cx) {
+ let poll = match &self.state {
+ ConnectionState::Open => self.handler.poll(cx).map_ok(Some),
+ ConnectionState::CloseHandler => self.handler.poll_close(cx).map_ok(
+ |event| event.map(ConnectionHandlerEvent::Custom)),
+ s => panic!("Unexpected closing state: {:?}", s) // s.a. (1),(2)
+ };
+
+ match poll {
Poll::Pending => {
if io_pending {
return Poll::Pending // Nothing to do
}
}
- Poll::Ready(Ok(ConnectionHandlerEvent::OutboundSubstreamRequest(user_data))) => {
+ Poll::Ready(Ok(Some(ConnectionHandlerEvent::OutboundSubstreamRequest(user_data)))) => {
self.muxing.open_substream(user_data);
}
- Poll::Ready(Ok(ConnectionHandlerEvent::Custom(event))) => {
- return Poll::Ready(Ok(Event::Handler(event)));
+ Poll::Ready(Ok(Some(ConnectionHandlerEvent::Custom(event)))) => {
+ return Poll::Ready(Ok(Some(Event::Handler(event))));
}
+ Poll::Ready(Ok(Some(ConnectionHandlerEvent::Close))) => {
+ self.start_close()
+ }
+ Poll::Ready(Ok(None)) => {
+ // The handler is done, we can now close the muxer (i.e. connection).
+ self.state = ConnectionState::CloseMuxer;
+ }
Poll::Ready(Err(err)) => return Poll::Ready(Err(ConnectionError::Handler(err))),
}
@@ -352,3 +408,25 @@ impl fmt::Display for ConnectionLimit {

/// A `ConnectionLimit` can represent an error if it has been exceeded.
impl Error for ConnectionLimit {}

+ /// The state of a [`Connection`] w.r.t. an active graceful close.
+ #[derive(Debug, PartialEq, Eq)]
+ enum ConnectionState {
+ /// The connection is open, accepting new inbound and outbound
+ /// substreams.
+ Open,
+ /// The connection is closing, rejecting new inbound substreams
+ /// and not permitting new outbound substreams while the
+ /// connection handler closes. [`ConnectionHandler::poll_close`]
+ /// is called until completion, which results in transitioning to
+ /// `CloseMuxer`.
+ CloseHandler,
+ /// The connection is closing, rejecting new inbound substreams
+ /// and not permitting new outbound substreams while the
+ /// muxer is closing the transport connection. [`substream::Muxing::poll_close`]
+ /// is called until completion, which results in transitioning
+ /// to `Closed`.
+ CloseMuxer,
+ /// The connection is closed.
+ Closed
+ }
38 changes: 37 additions & 1 deletion core/src/connection/handler.rs
@@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE.

use crate::{Multiaddr, PeerId};
- use std::{task::Context, task::Poll};
+ use std::task::{Context, Poll};
use super::{Connected, SubstreamEndpoint};

/// The interface of a connection handler.
@@ -66,6 +66,37 @@ pub trait ConnectionHandler {
/// Returning an error will close the connection to the remote.
fn poll(&mut self, cx: &mut Context<'_>)
-> Poll<Result<ConnectionHandlerEvent<Self::OutboundOpenInfo, Self::OutEvent>, Self::Error>>;

+ /// Polls the handler to make progress towards closing the connection.
+ ///
+ /// When a connection is actively closed, the handler can perform
+ /// a graceful shutdown of the connection by draining the I/O
+ /// activity, e.g. allowing in-flight requests to complete without
+ /// accepting new ones, possibly signaling the remote that it
+ /// should direct further requests elsewhere.
+ ///
+ /// The handler can also use the opportunity to flush any buffers
+ /// or clean up any other (asynchronous) resources before the
+ /// connection is ultimately dropped and closed on the transport
+ /// layer.
+ ///
+ /// While closing, new inbound substreams are rejected and the
+ /// handler is unable to request new outbound substreams as
+ /// per the return type of `poll_close`.
+ ///
+ /// The handler signals its readiness for the connection
+ /// to be closed by returning `Ready(Ok(None))`, which is the
+ /// default implementation. Hence, by default, connection
+ /// shutdown is not delayed and may result in ungraceful
+ /// interruption of ongoing I/O.
+ ///
+ /// > **Note**: Once `poll_close()` is invoked, the handler is no
+ /// > longer `poll()`ed.
+ fn poll_close(&mut self, _: &mut Context)
+ -> Poll<Result<Option<Self::OutEvent>, Self::Error>>
+ {
+ Poll::Ready(Ok(None))
+ }
}

/// Prototype for a `ConnectionHandler`.
@@ -99,6 +130,9 @@ pub enum ConnectionHandlerEvent<TOutboundOpenInfo, TCustom> {

/// Other event.
Custom(TCustom),

+ /// Initiate connection shutdown.
+ Close,
}

/// Event produced by a handler.
@@ -112,6 +146,7 @@ impl<TOutboundOpenInfo, TCustom> ConnectionHandlerEvent<TOutboundOpenInfo, TCustom> {
ConnectionHandlerEvent::OutboundSubstreamRequest(map(val))
},
ConnectionHandlerEvent::Custom(val) => ConnectionHandlerEvent::Custom(val),
+ ConnectionHandlerEvent::Close => ConnectionHandlerEvent::Close,
}
}

@@ -124,6 +159,7 @@ impl<TOutboundOpenInfo, TCustom> ConnectionHandlerEvent<TOutboundOpenInfo, TCustom> {
ConnectionHandlerEvent::OutboundSubstreamRequest(val)
},
ConnectionHandlerEvent::Custom(val) => ConnectionHandlerEvent::Custom(map(val)),
+ ConnectionHandlerEvent::Close => ConnectionHandlerEvent::Close,
}
}
}
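As an illustration of the intended use of the new hook, here is a hedged sketch of a handler that drains in-flight replies before allowing the connection to close. `MyHandler` and its `pending_replies` queue are hypothetical stand-ins, not part of this PR or of any existing libp2p handler; only the method shape mirrors `poll_close` above:

```rust
use std::task::{Context, Poll};
use futures::channel::mpsc;
use futures::StreamExt;

/// Hypothetical handler state: replies that are still in flight.
struct MyHandler {
    pending_replies: mpsc::Receiver<String>,
}

impl MyHandler {
    /// Sketch of a `ConnectionHandler::poll_close` implementation:
    /// keep surfacing in-flight replies as out-events; once the
    /// queue is drained, return `Ready(Ok(None))` so the connection
    /// can proceed to close the muxer.
    fn poll_close(&mut self, cx: &mut Context<'_>)
        -> Poll<Result<Option<String>, std::io::Error>>
    {
        match self.pending_replies.poll_next_unpin(cx) {
            // Still work in flight: emit it as an out-event.
            Poll::Ready(Some(reply)) => Poll::Ready(Ok(Some(reply))),
            // Everything drained: ready for the connection to close.
            Poll::Ready(None) => Poll::Ready(Ok(None)),
            // Waiting on I/O: shutdown is delayed until drained.
            Poll::Pending => Poll::Pending,
        }
    }
}
```

Returning `Ready(Ok(None))` immediately, which is the default implementation, preserves the old behaviour of closing without delay.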
62 changes: 41 additions & 21 deletions core/src/connection/manager.rs
@@ -196,18 +196,19 @@ pub enum Event<'a, I, O, H, TE, HE, C> {
handler: H
},

- /// An established connection has encountered an error.
- ConnectionError {
+ /// An established connection has been closed.
+ ConnectionClosed {
/// The connection ID.
///
- /// As a result of the error, the connection has been removed
- /// from the `Manager` and is being closed. Hence this ID will
- /// no longer resolve to a valid entry in the manager.
+ /// > **Note**: Closed connections are removed from the `Manager`.
+ /// > Hence this ID will no longer resolve to a valid entry in
+ /// > the manager.
id: ConnectionId,
- /// Information about the connection that encountered the error.
+ /// Information about the closed connection.
connected: Connected<C>,
- /// The error that occurred.
- error: ConnectionError<HE>,
+ /// The error that occurred, if any. If `None`, the connection
+ /// has been actively closed.
+ error: Option<ConnectionError<HE>>,
},

/// A connection has been established.
@@ -348,11 +349,11 @@ impl<I, O, H, TE, HE, C> Manager<I, O, H, TE, HE, C> {
/// Polls the manager for events relating to the managed connections.
pub fn poll<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<Event<'a, I, O, H, TE, HE, C>> {
// Advance the content of `local_spawns`.
- while let Poll::Ready(Some(_)) = Stream::poll_next(Pin::new(&mut self.local_spawns), cx) {}
+ while let Poll::Ready(Some(_)) = self.local_spawns.poll_next_unpin(cx) {}

// Poll for the first event for which the manager still has a registered task, if any.
let event = loop {
- match Stream::poll_next(Pin::new(&mut self.events_rx), cx) {
+ match self.events_rx.poll_next_unpin(cx) {
Poll::Ready(Some(event)) => {
if self.tasks.contains_key(event.id()) { // (1)
break event
@@ -397,19 +398,18 @@ impl<I, O, H, TE, HE, C> Manager<I, O, H, TE, HE, C> {
old_endpoint: old,
new_endpoint: new,
}
- },
- task::Event::Error { id, error } => {
+ }
+ task::Event::Closed { id, error } => {
let id = ConnectionId(id);
let task = task.remove();
match task.state {
TaskState::Established(connected) =>
- Event::ConnectionError { id, connected, error },
+ Event::ConnectionClosed { id, connected, error },
TaskState::Pending => unreachable!(
"`Event::Error` implies (2) occurred on that task and thus (3)."
"`Event::Closed` implies (2) occurred on that task and thus (3)."
),
}
}

})
} else {
unreachable!("By (1)")
@@ -455,10 +455,11 @@ impl<'a, I, C> EstablishedEntry<'a, I, C> {
/// > task _may not be notified_ if sending the event fails due to
/// > the connection handler not being ready at this time.
pub fn notify_handler(&mut self, event: I) -> Result<(), I> {
- let cmd = task::Command::NotifyHandler(event);
+ let cmd = task::Command::NotifyHandler(event); // (*)
self.task.get_mut().sender.try_send(cmd)
.map_err(|e| match e.into_inner() {
- task::Command::NotifyHandler(event) => event
+ task::Command::NotifyHandler(event) => event,
+ _ => panic!("Unexpected command. Expected `NotifyHandler`") // see (*)
})
}

@@ -472,6 +473,22 @@ impl<'a, I, C> EstablishedEntry<'a, I, C> {
self.task.get_mut().sender.poll_ready(cx).map_err(|_| ())
}

+ /// Sends a close command to the associated background task,
+ /// thus initiating a graceful active close of the connection.
+ ///
+ /// Has no effect if the connection is already closing.
+ ///
+ /// When the connection is ultimately closed, [`Event::ConnectionClosed`]
+ /// is emitted by [`Manager::poll`].
+ pub fn start_close(mut self) {
+ // Clone the sender so that we are guaranteed to have
+ // capacity for the close command (every sender gets a slot).
+ match self.task.get_mut().sender.clone().try_send(task::Command::Close) {
+ Ok(()) => {},
+ Err(e) => assert!(e.is_disconnected(), "No capacity for close command.")
+ }
+ }

/// Obtains information about the established connection.
pub fn connected(&self) -> &Connected<C> {
match &self.task.get().state {
@@ -480,16 +497,18 @@ impl<'a, I, C> EstablishedEntry<'a, I, C> {
}
}

- /// Closes the connection represented by this entry,
- /// returning the connection information.
- pub fn close(self) -> Connected<C> {
+ /// Instantly removes the entry from the manager, dropping
+ /// the command channel to the background task of the connection,
+ /// which will thus drop the connection asap without an orderly
+ /// close or emitting another event.
+ pub fn remove(self) -> Connected<C> {
match self.task.remove().state {
TaskState::Established(c) => c,
TaskState::Pending => unreachable!("By Entry::new()")
}
}

- /// Returns the connection id.
+ /// Returns the connection ID.
pub fn id(&self) -> ConnectionId {
ConnectionId(*self.task.key())
}
@@ -513,3 +532,4 @@ impl<'a, I, C> PendingEntry<'a, I, C> {
self.task.remove();
}
}
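The capacity comment in `start_close` above leans on a property of `futures::channel::mpsc`: a bounded channel's capacity is the buffer size plus the number of senders, so every sender, including a fresh clone, has one guaranteed slot. A minimal sketch of that assumption, independent of this PR:

```rust
use futures::channel::mpsc;

fn main() {
    // Bounded channel with a buffer of 0: effective capacity
    // still grows by one guaranteed slot per sender.
    let (tx, _rx) = mpsc::channel::<&'static str>(0);

    // Each clone may `try_send` one message even when the shared
    // buffer is full. This is the property `start_close` uses to
    // make room for `Command::Close` without displacing commands
    // queued by other senders.
    let mut a = tx.clone();
    let mut b = tx.clone();
    a.try_send("notify").expect("each sender has its own slot");
    b.try_send("close").expect("a fresh clone has its own slot");
}
```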
