From c7f099bd701708f51ee647278c80e52d143be24c Mon Sep 17 00:00:00 2001 From: Thomas Coratger Date: Tue, 25 Apr 2023 11:18:59 +0200 Subject: [PATCH 1/8] new type client::Connection with AsyncRead and AsyncWrite --- protocols/relay/src/priv_client.rs | 71 +++++++++++++--------- protocols/relay/src/priv_client/handler.rs | 13 ++-- protocols/relay/src/v2.rs | 2 +- 3 files changed, 49 insertions(+), 37 deletions(-) diff --git a/protocols/relay/src/priv_client.rs b/protocols/relay/src/priv_client.rs index 2a24607a724..48be56d52d4 100644 --- a/protocols/relay/src/priv_client.rs +++ b/protocols/relay/src/priv_client.rs @@ -45,7 +45,6 @@ use libp2p_swarm::{ }; use std::collections::{hash_map, HashMap, VecDeque}; use std::io::{Error, ErrorKind, IoSlice}; -use std::ops::DerefMut; use std::pin::Pin; use std::task::{Context, Poll}; use transport::Transport; @@ -387,10 +386,14 @@ impl NetworkBehaviour for Behaviour { } } -/// A [`NegotiatedSubstream`] acting as a [`Connection`]. -pub enum Connection { +pub struct Connection { + state: ConnectionState, +} + +/// A [`NegotiatedSubstream`] acting as a [`ConnectionState`]. +pub enum ConnectionState { InboundAccepting { - accept: BoxFuture<'static, Result>, + accept: BoxFuture<'static, Result>, }, Operational { read_buffer: Bytes, @@ -399,20 +402,20 @@ pub enum Connection { }, } -impl Unpin for Connection {} +impl Unpin for ConnectionState {} -impl Connection { +impl ConnectionState { pub(crate) fn new_inbound( circuit: inbound_stop::Circuit, drop_notifier: oneshot::Sender, ) -> Self { - Connection::InboundAccepting { + ConnectionState::InboundAccepting { accept: async { let (substream, read_buffer) = circuit .accept() .await .map_err(|e| Error::new(ErrorKind::Other, e))?; - Ok(Connection::Operational { + Ok(ConnectionState::Operational { read_buffer, substream, drop_notifier, @@ -427,7 +430,7 @@ impl Connection { read_buffer: Bytes, drop_notifier: oneshot::Sender, ) -> Self { - Connection::Operational { + ConnectionState::Operational { substream, read_buffer, drop_notifier, @@ -442,11 +445,13 @@ impl AsyncWrite for Connection { buf: &[u8], ) -> Poll> { loop { - match self.deref_mut() { - Connection::InboundAccepting { accept } => { - *self = ready!(accept.poll_unpin(cx))?; + match &mut self.state { + ConnectionState::InboundAccepting { accept } => { + *self = Connection { + state: ready!(accept.poll_unpin(cx))?, + }; } - Connection::Operational { substream, .. } => { + ConnectionState::Operational { substream, .. } => { return Pin::new(substream).poll_write(cx, buf); } } @@ -454,11 +459,13 @@ impl AsyncWrite for Connection { } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { loop { - match self.deref_mut() { - Connection::InboundAccepting { accept } => { - *self = ready!(accept.poll_unpin(cx))?; + match &mut self.state { + ConnectionState::InboundAccepting { accept } => { + *self = Connection { + state: ready!(accept.poll_unpin(cx))?, + }; } - Connection::Operational { substream, .. } => { + ConnectionState::Operational { substream, .. } => { return Pin::new(substream).poll_flush(cx); } } @@ -466,11 +473,13 @@ impl AsyncWrite for Connection { } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { loop { - match self.deref_mut() { - Connection::InboundAccepting { accept } => { - *self = ready!(accept.poll_unpin(cx))?; + match &mut self.state { + ConnectionState::InboundAccepting { accept } => { + *self = Connection { + state: ready!(accept.poll_unpin(cx))?, + }; } - Connection::Operational { substream, .. } => { + ConnectionState::Operational { substream, .. } => { return Pin::new(substream).poll_close(cx); } } @@ -483,11 +492,13 @@ impl AsyncWrite for Connection { bufs: &[IoSlice], ) -> Poll> { loop { - match self.deref_mut() { - Connection::InboundAccepting { accept } => { - *self = ready!(accept.poll_unpin(cx))?; + match &mut self.state { + ConnectionState::InboundAccepting { accept } => { + *self = Connection { + state: ready!(accept.poll_unpin(cx))?, + }; } - Connection::Operational { substream, .. } => { + ConnectionState::Operational { substream, .. } => { return Pin::new(substream).poll_write_vectored(cx, bufs); } } @@ -502,11 +513,13 @@ impl AsyncRead for Connection { buf: &mut [u8], ) -> Poll> { loop { - match self.deref_mut() { - Connection::InboundAccepting { accept } => { - *self = ready!(accept.poll_unpin(cx))?; + match &mut self.state { + ConnectionState::InboundAccepting { accept } => { + *self = Connection { + state: ready!(accept.poll_unpin(cx))?, + }; } - Connection::Operational { + ConnectionState::Operational { read_buffer, substream, .. diff --git a/protocols/relay/src/priv_client/handler.rs b/protocols/relay/src/priv_client/handler.rs index 504fab590f8..290b72e94af 100644 --- a/protocols/relay/src/priv_client/handler.rs +++ b/protocols/relay/src/priv_client/handler.rs @@ -189,10 +189,11 @@ impl Handler { let (tx, rx) = oneshot::channel(); self.alive_lend_out_substreams.push(rx); - let connection = super::Connection::new_inbound(inbound_circuit, tx); + let connection = super::ConnectionState::new_inbound(inbound_circuit, tx); pending_msgs.push_back(transport::ToListenerMsg::IncomingRelayedConnection { - stream: connection, + // stream: connection, + stream: super::Connection { state: connection }, src_peer_id, relay_peer_id: self.remote_peer_id, relay_addr: self.remote_addr.clone(), @@ -271,11 +272,9 @@ impl Handler { OutboundOpenInfo::Connect { send_back }, ) => { let (tx, rx) = oneshot::channel(); - match send_back.send(Ok(super::Connection::new_outbound( - substream, - read_buffer, - tx, - ))) { + match send_back.send(Ok(super::Connection { + state: super::ConnectionState::new_outbound(substream, read_buffer, tx), + })) { Ok(()) => { self.alive_lend_out_substreams.push(rx); self.queued_events.push_back(ConnectionHandlerEvent::Custom( diff --git a/protocols/relay/src/v2.rs b/protocols/relay/src/v2.rs index baaf148f2b5..23b7292116b 100644 --- a/protocols/relay/src/v2.rs +++ b/protocols/relay/src/v2.rs @@ -33,7 +33,7 @@ pub mod client { #[deprecated( since = "0.15.0", - note = "Use libp2p_relay::client::Connection instead." + note = "Use libp2p_relay::client::ConnectionState instead." )] pub type RelayedConnection = crate::client::Connection; From 9d132c635206657737c8b44b5334f3d4b63558fe Mon Sep 17 00:00:00 2001 From: Thomas Coratger Date: Tue, 25 Apr 2023 19:20:12 +0200 Subject: [PATCH 2/8] ConnectionState pub enum to private --- protocols/relay/src/priv_client.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/protocols/relay/src/priv_client.rs b/protocols/relay/src/priv_client.rs index 48be56d52d4..6faec21c885 100644 --- a/protocols/relay/src/priv_client.rs +++ b/protocols/relay/src/priv_client.rs @@ -391,7 +391,8 @@ pub struct Connection { } /// A [`NegotiatedSubstream`] acting as a [`ConnectionState`]. -pub enum ConnectionState { +#[allow(dead_code)] +enum ConnectionState { InboundAccepting { accept: BoxFuture<'static, Result>, }, From 89f12ad0caf87d1e458af158dbd218a9d7a58b99 Mon Sep 17 00:00:00 2001 From: Thomas Coratger Date: Tue, 25 Apr 2023 19:24:36 +0200 Subject: [PATCH 3/8] modify RelayedConnection deprecation message --- protocols/relay/src/v2.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocols/relay/src/v2.rs b/protocols/relay/src/v2.rs index 23b7292116b..baaf148f2b5 100644 --- a/protocols/relay/src/v2.rs +++ b/protocols/relay/src/v2.rs @@ -33,7 +33,7 @@ pub mod client { #[deprecated( since = "0.15.0", - note = "Use libp2p_relay::client::ConnectionState instead." + note = "Use libp2p_relay::client::Connection instead." )] pub type RelayedConnection = crate::client::Connection; From 6bdc9ac4652e3e272cf5c81b5f006884cada5b70 Mon Sep 17 00:00:00 2001 From: Thomas Coratger Date: Tue, 25 Apr 2023 19:25:47 +0200 Subject: [PATCH 4/8] remove doc private ConnectionState --- protocols/relay/src/priv_client.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/protocols/relay/src/priv_client.rs b/protocols/relay/src/priv_client.rs index 6faec21c885..132ae35e674 100644 --- a/protocols/relay/src/priv_client.rs +++ b/protocols/relay/src/priv_client.rs @@ -390,7 +390,6 @@ pub struct Connection { state: ConnectionState, } -/// A [`NegotiatedSubstream`] acting as a [`ConnectionState`]. #[allow(dead_code)] enum ConnectionState { InboundAccepting { From e7c79a9c894f97b56b326ce54f06943cb5371bbd Mon Sep 17 00:00:00 2001 From: Thomas Coratger <60488569+tcoratger@users.noreply.github.com> Date: Tue, 25 Apr 2023 19:26:34 +0200 Subject: [PATCH 5/8] Update protocols/relay/src/priv_client.rs Co-authored-by: Thomas Eizinger --- protocols/relay/src/priv_client.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/protocols/relay/src/priv_client.rs b/protocols/relay/src/priv_client.rs index 132ae35e674..8d81bc7b7ed 100644 --- a/protocols/relay/src/priv_client.rs +++ b/protocols/relay/src/priv_client.rs @@ -386,6 +386,9 @@ impl NetworkBehaviour for Behaviour { } } +/// Represents a connection to another peer via a relay. +/// +/// Internally, this uses a stream to the relay. pub struct Connection { state: ConnectionState, } From 0e8f81e912a4047f653fb59e43a6eb088c3d7dab Mon Sep 17 00:00:00 2001 From: Thomas Coratger Date: Tue, 25 Apr 2023 23:12:34 +0200 Subject: [PATCH 6/8] remove dead code allowance --- protocols/relay/src/priv_client.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/protocols/relay/src/priv_client.rs b/protocols/relay/src/priv_client.rs index 8d81bc7b7ed..5af56a1ac93 100644 --- a/protocols/relay/src/priv_client.rs +++ b/protocols/relay/src/priv_client.rs @@ -393,7 +393,6 @@ pub struct Connection { state: ConnectionState, } -#[allow(dead_code)] enum ConnectionState { InboundAccepting { accept: BoxFuture<'static, Result>, From 308db96595c4f30b50ba4c38bb17adadcead0150 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 26 Apr 2023 12:24:46 +0200 Subject: [PATCH 7/8] Document drop notifier pattern --- protocols/relay/src/priv_client.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/protocols/relay/src/priv_client.rs b/protocols/relay/src/priv_client.rs index 5af56a1ac93..9f250adf8d8 100644 --- a/protocols/relay/src/priv_client.rs +++ b/protocols/relay/src/priv_client.rs @@ -400,6 +400,11 @@ enum ConnectionState { Operational { read_buffer: Bytes, substream: NegotiatedSubstream, + /// "Drop notifier" pattern to signal to the transport that the connection has been dropped. + /// + /// This is flagged as "dead-code" by the compiler because we never read from it here. + /// However, it is actual use is to trigger the `Canceled` error in the `Transport` when this `Sender` is dropped. + #[allow(dead_code)] drop_notifier: oneshot::Sender, }, } From 3dbb583b52fd3e2fc7539919f63a19192e9c663d Mon Sep 17 00:00:00 2001 From: Thomas Coratger Date: Tue, 2 May 2023 11:30:24 +0200 Subject: [PATCH 8/8] add changelog entry --- protocols/relay/CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/protocols/relay/CHANGELOG.md b/protocols/relay/CHANGELOG.md index f718fc5adf5..4b3b9778439 100644 --- a/protocols/relay/CHANGELOG.md +++ b/protocols/relay/CHANGELOG.md @@ -3,7 +3,11 @@ - Raise MSRV to 1.65. See [PR 3715]. +- Hide internals of `Connection` and expose only `AsyncRead` and `AsyncWrite`. + See [PR 3829]. + [PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715 +[PR 3829]: https://github.com/libp2p/rust-libp2p/pull/3829 ## 0.15.2