Skip to content

Commit

Permalink
Use oneshot's instead for libp2p-relay
Browse files Browse the repository at this point in the history
  • Loading branch information
thomaseizinger committed Nov 15, 2023
1 parent 2850dc9 commit 172fea4
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 105 deletions.
193 changes: 90 additions & 103 deletions protocols/relay/src/priv_client/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,19 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::client::Connection;
use crate::priv_client::transport;
use crate::priv_client::transport::ToListenerMsg;
use crate::protocol::{self, inbound_stop, outbound_hop};
use crate::{priv_client, proto, HOP_PROTOCOL_NAME, STOP_PROTOCOL_NAME};
use futures::channel::mpsc::Sender;
use futures::channel::{mpsc, oneshot};
use futures::future::FutureExt;
use futures_timer::Delay;
use libp2p_core::multiaddr::Protocol;
use libp2p_core::upgrade::ReadyUpgrade;
use libp2p_core::Multiaddr;
use libp2p_identity::PeerId;
use libp2p_protocol_utils::InflightProtocolDataQueue;
use libp2p_swarm::handler::{ConnectionEvent, FullyNegotiatedInbound};
use libp2p_swarm::{
ConnectionHandler, ConnectionHandlerEvent, Stream, StreamProtocol, StreamUpgradeError,
Expand Down Expand Up @@ -104,12 +106,7 @@ pub struct Handler {
>,
>,

/// Manages associated data whilst we wait for outbound streams to be opened.
pending_streams: InflightProtocolDataQueue<
PendingRequest,
StreamProtocol,
Result<Stream, StreamUpgradeError<Void>>,
>,
pending_streams: VecDeque<oneshot::Sender<Result<Stream, StreamUpgradeError<Void>>>>,

inflight_reserve_requests: futures_bounded::FuturesTupleSet<
Result<outbound_hop::Reservation, outbound_hop::ReserveError>,
Expand Down Expand Up @@ -172,6 +169,62 @@ impl Handler {
)
}
}

fn make_new_reservation(&mut self, to_listener: Sender<ToListenerMsg>) {
let (sender, receiver) = oneshot::channel();

self.pending_streams.push_back(sender);
self.queued_events
.push_back(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(ReadyUpgrade::new(HOP_PROTOCOL_NAME), ()),
});
let result = self.inflight_reserve_requests.try_push(
async move {
let stream = receiver
.await
.map_err(|_| io::Error::from(io::ErrorKind::BrokenPipe))?
.map_err(into_reserve_error)?;

let reservation = outbound_hop::make_reservation(stream).await?;

Ok(reservation)
},
to_listener,
);

if result.is_err() {
tracing::warn!("Dropping in-flight reservation request because we are at capacity");
}
}

fn establish_new_circuit(
&mut self,
to_dial: oneshot::Sender<Result<Connection, outbound_hop::ConnectError>>,
dst_peer_id: PeerId,
) {
let (sender, receiver) = oneshot::channel();

self.pending_streams.push_back(sender);
self.queued_events
.push_back(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(ReadyUpgrade::new(HOP_PROTOCOL_NAME), ()),
});
let result = self.inflight_outbound_connect_requests.try_push(
async move {
let stream = receiver
.await
.map_err(|_| io::Error::from(io::ErrorKind::BrokenPipe))?
.map_err(into_connect_error)?;

outbound_hop::open_circuit(stream, dst_peer_id).await
},
to_dial,
);

if result.is_err() {
tracing::warn!("Dropping in-flight connect request because we are at capacity")
}
}
}

impl ConnectionHandler for Handler {
Expand All @@ -189,20 +242,13 @@ impl ConnectionHandler for Handler {
fn on_behaviour_event(&mut self, event: Self::FromBehaviour) {
match event {
In::Reserve { to_listener } => {
self.pending_streams
.enqueue_request(HOP_PROTOCOL_NAME, PendingRequest::Reserve { to_listener });
self.make_new_reservation(to_listener);
}
In::EstablishCircuit {
to_dial: send_back,
to_dial,
dst_peer_id,
} => {
self.pending_streams.enqueue_request(
HOP_PROTOCOL_NAME,
PendingRequest::Connect {
dst_peer_id,
to_dial: send_back,
},
);
self.establish_new_circuit(to_dial, dst_peer_id);
}
}
}
Expand Down Expand Up @@ -350,8 +396,7 @@ impl ConnectionHandler for Handler {
}

if let Poll::Ready(Some(to_listener)) = self.reservation.poll(cx) {
self.pending_streams
.enqueue_request(HOP_PROTOCOL_NAME, PendingRequest::Reserve { to_listener });
self.make_new_reservation(to_listener);
continue;
}

Expand All @@ -369,78 +414,6 @@ impl ConnectionHandler for Handler {
Poll::Pending => {}
}

if let Some(protocol) = self.pending_streams.next_request() {
return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(ReadyUpgrade::new(protocol), ()),
});
}

match self.pending_streams.next_completed() {
Some((Ok(stream), PendingRequest::Reserve { to_listener })) => {
if self
.inflight_reserve_requests
.try_push(outbound_hop::make_reservation(stream), to_listener)
.is_err()
{
tracing::warn!("Dropping outbound stream because we are at capacity")
}
continue;
}
Some((Err(error), PendingRequest::Reserve { mut to_listener })) => {
let error = match error {
StreamUpgradeError::Timeout => {
outbound_hop::ReserveError::Io(io::ErrorKind::TimedOut.into())
}
StreamUpgradeError::Apply(never) => void::unreachable(never),
StreamUpgradeError::NegotiationFailed => {
outbound_hop::ReserveError::Unsupported
}
StreamUpgradeError::Io(e) => outbound_hop::ReserveError::Io(e),
};

if let Err(e) =
to_listener.try_send(transport::ToListenerMsg::Reservation(Err(error)))
{
tracing::debug!("Unable to send error to listener: {}", e.into_send_error())
}
self.reservation.failed();
continue;
}
Some((
Ok(stream),
PendingRequest::Connect {
to_dial,
dst_peer_id,
},
)) => {
if self
.inflight_outbound_connect_requests
.try_push(outbound_hop::open_circuit(stream, dst_peer_id), to_dial)
.is_err()
{
tracing::warn!("Dropping outbound stream because we are at capacity")
}
continue;
}

Some((Err(error), PendingRequest::Connect { to_dial, .. })) => {
let error = match error {
StreamUpgradeError::Timeout => {
outbound_hop::ConnectError::Io(io::ErrorKind::TimedOut.into())
}
StreamUpgradeError::NegotiationFailed => {
outbound_hop::ConnectError::Unsupported
}
StreamUpgradeError::Io(e) => outbound_hop::ConnectError::Io(e),
StreamUpgradeError::Apply(v) => void::unreachable(v),
};

let _ = to_dial.send(Err(error));
continue;
}
None => {}
}

return Poll::Pending;
}
}
Expand Down Expand Up @@ -468,11 +441,15 @@ impl ConnectionHandler for Handler {
}
}
ConnectionEvent::FullyNegotiatedOutbound(ev) => {
self.pending_streams.submit_response(Ok(ev.protocol));
if let Some(next) = self.pending_streams.pop_front() {
let _ = next.send(Ok(ev.protocol));
}
}
ConnectionEvent::ListenUpgradeError(ev) => void::unreachable(ev.error),
ConnectionEvent::DialUpgradeError(ev) => {
self.pending_streams.submit_response(Err(ev.error));
if let Some(next) = self.pending_streams.pop_front() {
let _ = next.send(Err(ev.error));
}
}
_ => {}
}
Expand Down Expand Up @@ -601,14 +578,24 @@ impl Reservation {
}
}

pub(crate) enum PendingRequest {
Reserve {
/// A channel into the [`Transport`](priv_client::Transport).
to_listener: mpsc::Sender<transport::ToListenerMsg>,
},
Connect {
dst_peer_id: PeerId,
/// A channel into the future returned by [`Transport::dial`](libp2p_core::Transport::dial).
to_dial: oneshot::Sender<Result<priv_client::Connection, outbound_hop::ConnectError>>,
},
fn into_reserve_error(e: StreamUpgradeError<Void>) -> outbound_hop::ReserveError {
match e {
StreamUpgradeError::Timeout => {
outbound_hop::ReserveError::Io(io::ErrorKind::TimedOut.into())
}
StreamUpgradeError::Apply(never) => void::unreachable(never),
StreamUpgradeError::NegotiationFailed => outbound_hop::ReserveError::Unsupported,
StreamUpgradeError::Io(e) => outbound_hop::ReserveError::Io(e),
}
}

fn into_connect_error(e: StreamUpgradeError<Void>) -> outbound_hop::ConnectError {
match e {
StreamUpgradeError::Timeout => {
outbound_hop::ConnectError::Io(io::ErrorKind::TimedOut.into())
}
StreamUpgradeError::Apply(never) => void::unreachable(never),
StreamUpgradeError::NegotiationFailed => outbound_hop::ConnectError::Unsupported,
StreamUpgradeError::Io(e) => outbound_hop::ConnectError::Io(e),
}
}
4 changes: 2 additions & 2 deletions protocols/relay/src/protocol/outbound_hop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub enum ConnectError {
#[error("Remote does not support the `{HOP_PROTOCOL_NAME}` protocol")]
Unsupported,
#[error("IO error")]
Io(#[source] io::Error),
Io(#[from] io::Error),
#[error("Protocol error")]
Protocol(#[from] ProtocolViolation),
}
Expand All @@ -61,7 +61,7 @@ pub enum ReserveError {
#[error("Remote does not support the `{HOP_PROTOCOL_NAME}` protocol")]
Unsupported,
#[error("IO error")]
Io(#[source] io::Error),
Io(#[from] io::Error),
#[error("Protocol error")]
Protocol(#[from] ProtocolViolation),
}
Expand Down

0 comments on commit 172fea4

Please sign in to comment.