Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(swarm): keep Connections on the same Task #2536

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 27 additions & 34 deletions swarm/src/connection/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,11 @@ use fnv::FnvHashMap;
use futures::prelude::*;
use futures::{
channel::{mpsc, oneshot},
future::{poll_fn, BoxFuture, Either},
ready,
future::{BoxFuture, Either},
stream::FuturesUnordered,
};
use libp2p_core::connection::{ConnectionId, Endpoint, PendingPoint};
use libp2p_core::muxing::{StreamMuxer, StreamMuxerBox};
use libp2p_core::muxing::StreamMuxerBox;
use std::{
collections::{hash_map, HashMap},
convert::TryFrom as _,
Expand Down Expand Up @@ -98,10 +97,12 @@ where

/// Sender distributed to pending tasks for reporting events back
/// to the pool.
pending_connection_events_tx: mpsc::Sender<task::PendingConnectionEvent<TTrans>>,
pending_connection_events_tx:
mpsc::Sender<task::PendingConnectionEvent<TTrans, THandler::Handler>>,

/// Receiver for events reported from pending tasks.
pending_connection_events_rx: mpsc::Receiver<task::PendingConnectionEvent<TTrans>>,
pending_connection_events_rx:
mpsc::Receiver<task::PendingConnectionEvent<TTrans, THandler::Handler>>,

/// Sender distributed to established tasks for reporting events back
/// to the pool.
Expand Down Expand Up @@ -485,7 +486,7 @@ where
dial_concurrency_factor_override: Option<NonZeroU8>,
) -> Result<ConnectionId, (ConnectionLimit, THandler)>
where
TTrans: Clone + Send,
TTrans: Transport<Output = (PeerId, StreamMuxerBox)> + Clone + Send,
TTrans::Dial: Send + 'static,
{
if let Err(limit) = self.counters.check_max_pending_outgoing() {
Expand Down Expand Up @@ -541,6 +542,7 @@ where
info: IncomingInfo<'_>,
) -> Result<ConnectionId, (ConnectionLimit, THandler)>
where
TTrans: Transport<Output = (PeerId, StreamMuxerBox)>,
TFut: Future<Output = Result<TTrans::Output, TTrans::Error>> + Send + 'static,
{
let endpoint = info.to_connected_point();
Expand Down Expand Up @@ -673,7 +675,9 @@ where
match event {
task::PendingConnectionEvent::ConnectionEstablished {
id,
output: (obtained_peer_id, muxer),
// output: (obtained_peer_id, muxer),
obtained_peer_id,
response,
outgoing,
} => {
let PendingConnectionInfo {
Expand Down Expand Up @@ -759,20 +763,8 @@ where
});

if let Err(error) = error {
self.spawn(
poll_fn(move |cx| {
if let Err(e) = ready!(muxer.close(cx)) {
log::debug!(
"Failed to close connection {:?} to peer {}: {:?}",
id,
obtained_peer_id,
e
);
}
Poll::Ready(())
})
.boxed(),
);
// send message to PendingConnection
let _ = response.send(task::PendingCommand::Close);

match endpoint {
ConnectedPoint::Dialer { .. } => {
Expand Down Expand Up @@ -815,21 +807,22 @@ where
},
);

let connection = super::Connection::new(
muxer,
handler.into_handler(&obtained_peer_id, &endpoint),
self.substream_upgrade_protocol_override,
);
self.spawn(
task::new_for_established_connection(
// Send message to upgrade pending connection to upgrade to a full connection
let cmd = task::PendingCommand::Upgrade {
handler: handler.into_handler(&obtained_peer_id, &endpoint),
substream_upgrade_protocol_override: self
.substream_upgrade_protocol_override,
command_receiver,
events: self.established_connection_events_tx.clone(),
};
if response.send(cmd).is_err() {
// TODO: what else do we want to do if the task is gone?
log::debug!(
"Failed to upgrade connection {:?} to peer {}: Task is gone",
id,
obtained_peer_id,
connection,
command_receiver,
self.established_connection_events_tx.clone(),
)
.boxed(),
);
);
}

match self.get(id) {
Some(PoolConnection::Established(connection)) => {
Expand Down
123 changes: 107 additions & 16 deletions swarm/src/connection/pool/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
use super::concurrent_dial::ConcurrentDial;
use crate::{
connection::{
self, ConnectionError, PendingInboundConnectionError, PendingOutboundConnectionError,
self, Connection, ConnectionError, PendingInboundConnectionError,
PendingOutboundConnectionError,
},
transport::{Transport, TransportError},
ConnectionHandler, Multiaddr, PeerId,
Expand All @@ -34,7 +35,7 @@ use futures::{
future::{poll_fn, Either, Future},
SinkExt, StreamExt,
};
use libp2p_core::connection::ConnectionId;
use libp2p_core::{connection::ConnectionId, muxing::StreamMuxerBox, upgrade, StreamMuxer};
use std::pin::Pin;
use void::Void;

Expand All @@ -48,14 +49,30 @@ pub enum Command<T> {
Close,
}

/// Commands that can be sent to a task driving a pending connection.
#[derive(Debug)]
pub enum PendingConnectionEvent<TTrans>
pub enum PendingCommand<THandler: ConnectionHandler> {
/// Upgrade from pending to established connection.
Upgrade {
handler: THandler,
substream_upgrade_protocol_override: Option<upgrade::Version>,
command_receiver: mpsc::Receiver<Command<THandler::InEvent>>,
events: mpsc::Sender<EstablishedConnectionEvent<THandler>>,
},
/// Close the connection, due to an error, and terminate the task.
Close,
}

#[derive(Debug)]
pub enum PendingConnectionEvent<TTrans, THandler>
where
TTrans: Transport,
THandler: ConnectionHandler,
{
ConnectionEstablished {
id: ConnectionId,
output: TTrans::Output,
obtained_peer_id: PeerId,
response: oneshot::Sender<PendingCommand<THandler>>,
/// [`Some`] when the new connection is an outgoing connection.
/// Addresses are dialed in parallel. Contains the addresses and errors
/// of dial attempts that failed before the one successful dial.
Expand Down Expand Up @@ -97,13 +114,14 @@ pub enum EstablishedConnectionEvent<THandler: ConnectionHandler> {
},
}

pub async fn new_for_pending_outgoing_connection<TTrans>(
pub async fn new_for_pending_outgoing_connection<TTrans, THandler>(
connection_id: ConnectionId,
dial: ConcurrentDial<TTrans>,
abort_receiver: oneshot::Receiver<Void>,
mut events: mpsc::Sender<PendingConnectionEvent<TTrans>>,
mut events: mpsc::Sender<PendingConnectionEvent<TTrans, THandler>>,
) where
TTrans: Transport,
TTrans: Transport<Output = (PeerId, StreamMuxerBox)>,
THandler: ConnectionHandler,
{
match futures::future::select(abort_receiver, Box::pin(dial)).await {
Either::Left((Err(oneshot::Canceled), _)) => {
Expand All @@ -115,14 +133,50 @@ pub async fn new_for_pending_outgoing_connection<TTrans>(
.await;
}
Either::Left((Ok(v), _)) => void::unreachable(v),
Either::Right((Ok((address, output, errors)), _)) => {
Either::Right((Ok((address, (obtained_peer_id, muxer), errors)), _)) => {
let (response, receiver) = oneshot::channel();
let _ = events
.send(PendingConnectionEvent::ConnectionEstablished {
id: connection_id,
output,
obtained_peer_id,
response,
outgoing: Some((address, errors)),
})
.await;

match receiver.await {
Ok(PendingCommand::Upgrade {
handler,
substream_upgrade_protocol_override,
command_receiver,
events,
}) => {
// Upgrade to Connection
let connection =
Connection::new(muxer, handler, substream_upgrade_protocol_override);
new_for_established_connection(
connection_id,
obtained_peer_id,
connection,
command_receiver,
events,
)
.await
}
Ok(PendingCommand::Close) => {
if let Err(e) = poll_fn(move |cx| muxer.close(cx)).await {
log::debug!(
"Failed to close connection {:?} to peer {}: {:?}",
connection_id,
obtained_peer_id,
e
);
}
}
Err(_) => {
// Shutting down, nothing we can do about this.
}
}
}
Either::Right((Err(e), _)) => {
let _ = events
Expand All @@ -135,14 +189,15 @@ pub async fn new_for_pending_outgoing_connection<TTrans>(
}
}

pub async fn new_for_pending_incoming_connection<TFut, TTrans>(
pub async fn new_for_pending_incoming_connection<TFut, TTrans, THandler>(
connection_id: ConnectionId,
future: TFut,
abort_receiver: oneshot::Receiver<Void>,
mut events: mpsc::Sender<PendingConnectionEvent<TTrans>>,
mut events: mpsc::Sender<PendingConnectionEvent<TTrans, THandler>>,
) where
TTrans: Transport,
TTrans: Transport<Output = (PeerId, StreamMuxerBox)>,
TFut: Future<Output = Result<TTrans::Output, TTrans::Error>> + Send + 'static,
THandler: ConnectionHandler,
{
match futures::future::select(abort_receiver, Box::pin(future)).await {
Either::Left((Err(oneshot::Canceled), _)) => {
Expand All @@ -154,14 +209,50 @@ pub async fn new_for_pending_incoming_connection<TFut, TTrans>(
.await;
}
Either::Left((Ok(v), _)) => void::unreachable(v),
Either::Right((Ok(output), _)) => {
Either::Right((Ok((obtained_peer_id, muxer)), _)) => {
let (response, receiver) = oneshot::channel();
let _ = events
.send(PendingConnectionEvent::ConnectionEstablished {
id: connection_id,
output,
obtained_peer_id,
response,
outgoing: None,
})
.await;

match receiver.await {
Ok(PendingCommand::Upgrade {
handler,
substream_upgrade_protocol_override,
command_receiver,
events,
}) => {
// Upgrade to Connection
let connection =
Connection::new(muxer, handler, substream_upgrade_protocol_override);
new_for_established_connection(
connection_id,
obtained_peer_id,
connection,
command_receiver,
events,
)
.await
}
Ok(PendingCommand::Close) => {
if let Err(e) = poll_fn(move |cx| muxer.close(cx)).await {
log::debug!(
"Failed to close connection {:?} to peer {}: {:?}",
connection_id,
obtained_peer_id,
e
);
}
}
Err(_) => {
// Shutting down, nothing we can do about this.
}
}
}
Either::Right((Err(e), _)) => {
let _ = events
Expand All @@ -176,10 +267,10 @@ pub async fn new_for_pending_incoming_connection<TFut, TTrans>(
}
}

pub async fn new_for_established_connection<THandler>(
async fn new_for_established_connection<THandler>(
connection_id: ConnectionId,
peer_id: PeerId,
mut connection: crate::connection::Connection<THandler>,
mut connection: Connection<THandler>,
mut command_receiver: mpsc::Receiver<Command<THandler::InEvent>>,
mut events: mpsc::Sender<EstablishedConnectionEvent<THandler>>,
) where
Expand Down