Skip to content

Commit

Permalink
transports/tcp: impl Stream for GenTcpTransport
Browse files Browse the repository at this point in the history
  • Loading branch information
elenaf9 committed May 22, 2022
1 parent bfd5fb0 commit 90721b9
Showing 1 changed file with 50 additions and 17 deletions.
67 changes: 50 additions & 17 deletions transports/tcp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub type TokioTcpTransport = GenTcpTransport<tokio::Tcp>;
use futures::{
future::{self, BoxFuture, Ready},
prelude::*,
ready,
ready, stream::FusedStream,
};
use futures_timer::Delay;
use libp2p_core::{
Expand Down Expand Up @@ -293,9 +293,9 @@ impl GenTcpConfig {
/// following example:
///
/// ```no_run
/// # use futures::StreamExt;
/// # use libp2p_core::transport::{ListenerId, TransportEvent};
/// # use libp2p_core::{Multiaddr, Transport};
/// # use futures::{stream::StreamExt, future::poll_fn};
/// # use std::pin::Pin;
/// #[cfg(feature = "async-io")]
/// #[async_std::main]
Expand All @@ -307,7 +307,7 @@ impl GenTcpConfig {
///
/// let mut tcp1 = TcpTransport::new(GenTcpConfig::new().port_reuse(true));
/// tcp1.listen_on(ListenerId::new(1), listen_addr1.clone()).expect("listener");
/// match poll_fn(|cx| Pin::new(&mut tcp1).poll(cx)).await {
/// match tcp1.select_next_some().await {
/// TransportEvent::NewAddress { listen_addr, .. } => {
/// println!("Listening on {:?}", listen_addr);
/// let mut stream = tcp1.dial(listen_addr2.clone()).unwrap().await?;
Expand All @@ -318,7 +318,7 @@ impl GenTcpConfig {
///
/// let mut tcp2 = TcpTransport::new(GenTcpConfig::new().port_reuse(true));
/// tcp2.listen_on(ListenerId::new(1), listen_addr2).expect("listener");
/// match poll_fn(|cx| Pin::new(&mut tcp2).poll(cx)).await {
/// match tcp2.select_next_some().await {
/// TransportEvent::NewAddress { listen_addr, .. } => {
/// println!("Listening on {:?}", listen_addr);
/// let mut socket = tcp2.dial(listen_addr1).unwrap().await?;
Expand Down Expand Up @@ -550,6 +550,32 @@ where
}
}

impl<T> Stream for GenTcpTransport<T>
where
T: Provider + Send + 'static,
T::Listener: Unpin,
T::IfWatcher: Unpin,
T::Stream: Unpin,
{
type Item = TransportEvent<<Self as Transport>::ListenerUpgrade, <Self as Transport>::Error>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Transport::poll(self, cx).map(Some)
}
}
impl<T> FusedStream for GenTcpTransport<T>
where
T: Provider + Send + 'static,
T::Listener: Unpin,
T::IfWatcher: Unpin,
T::Stream: Unpin,
{
fn is_terminated(&self) -> bool {
false
}
}


#[derive(Debug)]
pub enum TcpTransportEvent<S> {
/// The transport is listening on a new additional [`Multiaddr`].
Expand Down Expand Up @@ -855,7 +881,7 @@ fn ip_to_multiaddr(ip: IpAddr, port: u16) -> Multiaddr {
#[cfg(test)]
mod tests {
use super::*;
use futures::{channel::mpsc, future::poll_fn};
use futures::channel::mpsc;

#[test]
fn multiaddr_to_tcp_conversion() {
Expand Down Expand Up @@ -914,7 +940,7 @@ mod tests {
let mut tcp = GenTcpTransport::<T>::new(GenTcpConfig::new());
tcp.listen_on(ListenerId::new(1), addr).unwrap();
loop {
match poll_fn(|cx| Pin::new(&mut tcp).poll(cx)).await {
match tcp.select_next_some().await {
TransportEvent::NewAddress { listen_addr, .. } => {
ready_tx.send(listen_addr).await.unwrap();
}
Expand Down Expand Up @@ -984,7 +1010,7 @@ mod tests {
tcp.listen_on(ListenerId::new(1), addr).unwrap();

loop {
match poll_fn(|cx| Pin::new(&mut tcp).poll(cx)).await {
match tcp.select_next_some().await {
TransportEvent::NewAddress { listen_addr, .. } => {
let mut iter = listen_addr.iter();
match iter.next().expect("ip address") {
Expand Down Expand Up @@ -1052,7 +1078,7 @@ mod tests {
let mut tcp = GenTcpTransport::<T>::new(GenTcpConfig::new());
tcp.listen_on(ListenerId::new(1), addr).unwrap();
loop {
match poll_fn(|cx| Pin::new(&mut tcp).poll(cx)).await {
match tcp.select_next_some().await {
TransportEvent::NewAddress { listen_addr, .. } => {
ready_tx.send(listen_addr).await.ok();
}
Expand All @@ -1073,7 +1099,7 @@ mod tests {
let dest_addr = ready_rx.next().await.unwrap();
let mut tcp = GenTcpTransport::<T>::new(GenTcpConfig::new().port_reuse(true));
tcp.listen_on(ListenerId::new(1), addr).unwrap();
match poll_fn(|cx| Pin::new(&mut tcp).poll(cx)).await {
match tcp.select_next_some().await {
TransportEvent::NewAddress { .. } => {
// Obtain a future socket through dialing
let mut socket = tcp.dial(dest_addr).unwrap().await.unwrap();
Expand Down Expand Up @@ -1122,16 +1148,22 @@ mod tests {
fn port_reuse_listening() {
env_logger::try_init().ok();

async fn listen_twice<T: Provider>(addr: Multiaddr) {
async fn listen_twice<T>(addr: Multiaddr)
where
T: Provider + Sized + Send + Sync + Unpin + 'static,
T::Listener: Sync,
T::IfWatcher: Sync,
T::Stream: Sync,
{
let mut tcp = GenTcpTransport::<T>::new(GenTcpConfig::new().port_reuse(true));
tcp.listen_on(ListenerId::new(1), addr).unwrap();
match poll_fn(|cx| Pin::new(&mut tcp).poll(cx)).await {
match tcp.select_next_some().await {
TransportEvent::NewAddress {
listen_addr: addr1, ..
} => {
// Listen on the same address a second time.
tcp.listen_on(ListenerId::new(1), addr1.clone()).unwrap();
match poll_fn(|cx| Pin::new(&mut tcp).poll(cx)).await {
match tcp.select_next_some().await {
TransportEvent::NewAddress {
listen_addr: addr2, ..
} => {
Expand Down Expand Up @@ -1170,13 +1202,14 @@ mod tests {
fn listen_port_0() {
env_logger::try_init().ok();

async fn listen<T: Provider>(addr: Multiaddr) -> Multiaddr {
async fn listen<T>(addr: Multiaddr) -> Multiaddr
where
T: Provider,
T::IfWatcher: Sync
{
let mut tcp = GenTcpTransport::<T>::new(GenTcpConfig::new());
tcp.listen_on(ListenerId::new(1), addr).unwrap();
poll_fn(|cx| Pin::new(&mut tcp).poll(cx))
.await
.into_new_address()
.expect("listen address")
tcp.select_next_some().await.into_new_address().expect("listen address")
}

fn test(addr: Multiaddr) {
Expand Down

0 comments on commit 90721b9

Please sign in to comment.