Skip to content

Commit

Permalink
*: Dial with handler and return handler on error and closed (#2191)
Browse files Browse the repository at this point in the history
Require `NetworkBehaviourAction::{DialPeer,DialAddress}` to contain a
`ProtocolsHandler`. This allows a behaviour to attach custom state to its
handler. The behaviour would no longer need to track this state separately
during connection establishment, thus reducing state required in a behaviour.
E.g. in the case of `libp2p-kad` the behaviour can include a `GetRecord` request
in its handler, or e.g. in the case of `libp2p-request-response` the behaviour
can include the first request in the handler.

Return `ProtocolsHandler` on connection error and close. This allows a behaviour
to extract its custom state previously included in the handler on connection
failure and connection closing. E.g. in the case of `libp2p-kad` the behaviour
could extract the attached `GetRecord` from the handler of the failed connection
and then start another connection attempt with a new handler with the same
`GetRecord` or bubble up an error to the user.

Co-authored-by: Thomas Eizinger <[email protected]>
  • Loading branch information
mxinden and thomaseizinger authored Aug 31, 2021
1 parent b55ee69 commit c161acf
Show file tree
Hide file tree
Showing 38 changed files with 1,110 additions and 476 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ libp2p-pnet = { version = "0.22.0", path = "transports/pnet", optional = true }
libp2p-relay = { version = "0.4.0", path = "protocols/relay", optional = true }
libp2p-request-response = { version = "0.13.0", path = "protocols/request-response", optional = true }
libp2p-swarm = { version = "0.31.0", path = "swarm" }
libp2p-swarm-derive = { version = "0.24.0", path = "swarm-derive" }
libp2p-swarm-derive = { version = "0.25.0", path = "swarm-derive" }
libp2p-uds = { version = "0.30.0", path = "transports/uds", optional = true }
libp2p-wasm-ext = { version = "0.30.0", path = "transports/wasm-ext", default-features = false, optional = true }
libp2p-yamux = { version = "0.34.0", path = "muxers/yamux", optional = true }
Expand Down
8 changes: 8 additions & 0 deletions core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,19 @@

- Remove `DisconnectedPeer::set_connected` and `Pool::add` (see [PR 2195]).

- Report `ConnectionLimit` error through `ConnectionError` and thus through
`NetworkEvent::ConnectionClosed` instead of previously through
`PendingConnectionError` and thus `NetworkEvent::{IncomingConnectionError,
DialError}` (see [PR 2191]).

- Report abortion of pending connection through `DialError`,
`UnknownPeerDialError` or `IncomingConnectionError` (see [PR 2191]).

[PR 2145]: https://github.com/libp2p/rust-libp2p/pull/2145
[PR 2142]: https://github.com/libp2p/rust-libp2p/pull/2142
[PR 2137]: https://github.com/libp2p/rust-libp2p/pull/2137
[PR 2183]: https://github.com/libp2p/rust-libp2p/pull/2183
[PR 2191]: https://github.com/libp2p/rust-libp2p/pull/2191
[PR 2195]: https://github.com/libp2p/rust-libp2p/pull/2195

# 0.29.0 [2021-07-12]
Expand Down
8 changes: 4 additions & 4 deletions core/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,10 @@ where
self.handler.inject_event(event);
}

/// Begins an orderly shutdown of the connection, returning a
/// `Future` that resolves when connection shutdown is complete.
pub fn close(self) -> Close<TMuxer> {
self.muxing.close().0
/// Begins an orderly shutdown of the connection, returning the connection
/// handler and a `Future` that resolves when connection shutdown is complete.
pub fn close(self) -> (THandler, Close<TMuxer>) {
(self.handler, self.muxing.close().0)
}

/// Polls the connection for events produced by the associated handler
Expand Down
21 changes: 13 additions & 8 deletions core/src/connection/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ pub enum ConnectionError<THandlerErr> {
// TODO: Eventually this should also be a custom error?
IO(io::Error),

/// The connection was dropped because the connection limit
/// for a peer has been reached.
ConnectionLimit(ConnectionLimit),

/// The connection handler produced an error.
Handler(THandlerErr),
}
Expand All @@ -41,6 +45,9 @@ where
match self {
ConnectionError::IO(err) => write!(f, "Connection error: I/O error: {}", err),
ConnectionError::Handler(err) => write!(f, "Connection error: Handler error: {}", err),
ConnectionError::ConnectionLimit(l) => {
write!(f, "Connection error: Connection limit: {}.", l)
}
}
}
}
Expand All @@ -53,6 +60,7 @@ where
match self {
ConnectionError::IO(err) => Some(err),
ConnectionError::Handler(err) => Some(err),
ConnectionError::ConnectionLimit(..) => None,
}
}
}
Expand All @@ -63,14 +71,13 @@ pub enum PendingConnectionError<TTransErr> {
/// An error occurred while negotiating the transport protocol(s).
Transport(TransportError<TTransErr>),

/// Pending connection attempt has been aborted.
Aborted,

/// The peer identity obtained on the connection did not
/// match the one that was expected or is otherwise invalid.
InvalidPeerId,

/// The connection was dropped because the connection limit
/// for a peer has been reached.
ConnectionLimit(ConnectionLimit),

/// An I/O error occurred on the connection.
// TODO: Eventually this should also be a custom error?
IO(io::Error),
Expand All @@ -83,15 +90,13 @@ where
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
PendingConnectionError::IO(err) => write!(f, "Pending connection: I/O error: {}", err),
PendingConnectionError::Aborted => write!(f, "Pending connection: Aborted."),
PendingConnectionError::Transport(err) => {
write!(f, "Pending connection: Transport error: {}", err)
}
PendingConnectionError::InvalidPeerId => {
write!(f, "Pending connection: Invalid peer ID.")
}
PendingConnectionError::ConnectionLimit(l) => {
write!(f, "Connection error: Connection limit: {}.", l)
}
}
}
}
Expand All @@ -105,7 +110,7 @@ where
PendingConnectionError::IO(err) => Some(err),
PendingConnectionError::Transport(err) => Some(err),
PendingConnectionError::InvalidPeerId => None,
PendingConnectionError::ConnectionLimit(..) => None,
PendingConnectionError::Aborted => None,
}
}
}
23 changes: 7 additions & 16 deletions core/src/connection/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

use super::{
handler::{THandlerError, THandlerInEvent, THandlerOutEvent},
Connected, ConnectedPoint, ConnectionError, ConnectionHandler, IntoConnectionHandler,
PendingConnectionError, Substream,
Connected, ConnectedPoint, ConnectionError, ConnectionHandler, ConnectionLimit,
IntoConnectionHandler, PendingConnectionError, Substream,
};
use crate::{muxing::StreamMuxer, Executor};
use fnv::FnvHashMap;
Expand Down Expand Up @@ -192,6 +192,7 @@ pub enum Event<'a, H: IntoConnectionHandler, TE> {
/// The error that occurred, if any. If `None`, the connection
/// has been actively closed.
error: Option<ConnectionError<THandlerError<H>>>,
handler: H::Handler,
},

/// A connection has been established.
Expand Down Expand Up @@ -350,14 +351,15 @@ impl<H: IntoConnectionHandler, TE> Manager<H, TE> {
new_endpoint: new,
}
}
task::Event::Closed { id, error } => {
task::Event::Closed { id, error, handler } => {
let id = ConnectionId(id);
let task = task.remove();
match task.state {
TaskState::Established(connected) => Event::ConnectionClosed {
id,
connected,
error,
handler,
},
TaskState::Pending => unreachable!(
"`Event::Closed` implies (2) occurred on that task and thus (3)."
Expand Down Expand Up @@ -437,15 +439,15 @@ impl<'a, I> EstablishedEntry<'a, I> {
///
/// When the connection is ultimately closed, [`Event::ConnectionClosed`]
/// is emitted by [`Manager::poll`].
pub fn start_close(mut self) {
pub fn start_close(mut self, error: Option<ConnectionLimit>) {
// Clone the sender so that we are guaranteed to have
// capacity for the close command (every sender gets a slot).
match self
.task
.get_mut()
.sender
.clone()
.try_send(task::Command::Close)
.try_send(task::Command::Close(error))
{
Ok(()) => {}
Err(e) => assert!(e.is_disconnected(), "No capacity for close command."),
Expand All @@ -460,17 +462,6 @@ impl<'a, I> EstablishedEntry<'a, I> {
}
}

/// Instantly removes the entry from the manager, dropping
/// the command channel to the background task of the connection,
/// which will thus drop the connection asap without an orderly
/// close or emitting another event.
pub fn remove(self) -> Connected {
match self.task.remove().state {
TaskState::Established(c) => c,
TaskState::Pending => unreachable!("By Entry::new()"),
}
}

/// Returns the connection ID.
pub fn id(&self) -> ConnectionId {
ConnectionId(*self.task.key())
Expand Down
55 changes: 43 additions & 12 deletions core/src/connection/manager/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use crate::{
connection::{
self,
handler::{THandlerError, THandlerInEvent, THandlerOutEvent},
Close, Connected, Connection, ConnectionError, ConnectionHandler, IntoConnectionHandler,
PendingConnectionError, Substream,
Close, Connected, Connection, ConnectionError, ConnectionHandler, ConnectionLimit,
IntoConnectionHandler, PendingConnectionError, Substream,
},
muxing::StreamMuxer,
Multiaddr,
Expand All @@ -43,7 +43,7 @@ pub enum Command<T> {
NotifyHandler(T),
/// Gracefully close the connection (active close) before
/// terminating the task.
Close,
Close(Option<ConnectionLimit>),
}

/// Events that a task can emit to its manager.
Expand Down Expand Up @@ -71,6 +71,7 @@ pub enum Event<H: IntoConnectionHandler, TE> {
Closed {
id: TaskId,
error: Option<ConnectionError<THandlerError<H>>>,
handler: H::Handler,
},
}

Expand Down Expand Up @@ -159,7 +160,11 @@ where
},

/// The connection is closing (active close).
Closing(Close<M>),
Closing {
closing_muxer: Close<M>,
handler: H::Handler,
error: Option<ConnectionLimit>,
},

/// The task is terminating with a final event for the `Manager`.
Terminating(Event<H, E>),
Expand Down Expand Up @@ -204,7 +209,16 @@ where
Poll::Pending => {}
Poll::Ready(None) => {
// The manager has dropped the task; abort.
return Poll::Ready(());
// Don't accept any further commands and terminate the
// task with a final event.
this.commands.get_mut().close();
let event = Event::Failed {
id,
handler,
error: PendingConnectionError::Aborted,
};
this.state = State::Terminating(event);
continue 'poll;
}
Poll::Ready(Some(_)) => {
panic!("Task received command while the connection is pending.")
Expand Down Expand Up @@ -243,15 +257,20 @@ where
Poll::Ready(Some(Command::NotifyHandler(event))) => {
connection.inject_event(event)
}
Poll::Ready(Some(Command::Close)) => {
Poll::Ready(Some(Command::Close(error))) => {
// Don't accept any further commands.
this.commands.get_mut().close();
// Discard the event, if any, and start a graceful close.
this.state = State::Closing(connection.close());
let (handler, closing_muxer) = connection.close();
this.state = State::Closing {
handler,
closing_muxer,
error,
};
continue 'poll;
}
Poll::Ready(None) => {
// The manager has dropped the task or disappeared; abort.
// The manager has disappeared; abort.
return Poll::Ready(());
}
}
Expand Down Expand Up @@ -306,36 +325,48 @@ where
Poll::Ready(Err(error)) => {
// Don't accept any further commands.
this.commands.get_mut().close();
let (handler, _closing_muxer) = connection.close();
// Terminate the task with the error, dropping the connection.
let event = Event::Closed {
id,
error: Some(error),
handler,
};
this.state = State::Terminating(event);
}
}
}
}

State::Closing(mut closing) => {
State::Closing {
handler,
error,
mut closing_muxer,
} => {
// Try to gracefully close the connection.
match closing.poll_unpin(cx) {
match closing_muxer.poll_unpin(cx) {
Poll::Ready(Ok(())) => {
let event = Event::Closed {
id: this.id,
error: None,
error: error.map(|limit| ConnectionError::ConnectionLimit(limit)),
handler,
};
this.state = State::Terminating(event);
}
Poll::Ready(Err(e)) => {
let event = Event::Closed {
id: this.id,
error: Some(ConnectionError::IO(e)),
handler,
};
this.state = State::Terminating(event);
}
Poll::Pending => {
this.state = State::Closing(closing);
this.state = State::Closing {
handler,
error,
closing_muxer,
};
return Poll::Pending;
}
}
Expand Down
Loading

0 comments on commit c161acf

Please sign in to comment.