Skip to content

Commit

Permalink
transports/tcp: Update if-watch to v3.0.0 (#3101)
Browse files Browse the repository at this point in the history
Update to `if-watch` version 3.0.0 and pass through features, such that `libp2p-tcp/async-io` selects `if-watch/smol` and `libp2p-tcp/tokio` brings in `if-watch/tokio`.
The mDNS part is already done in #3096.
  • Loading branch information
rkuhn authored Nov 15, 2022
1 parent ca8bcd8 commit d8fe7bf
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 37 deletions.
3 changes: 3 additions & 0 deletions transports/tcp/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
# 0.38.0 [unreleased]

- Update to `if-watch` `v3.0.0` and pass through `tokio` and `async-io` features. See [PR 3101].

- Deprecate types with `Tcp` prefix (`GenTcpConfig`, `TcpTransport` and `TokioTcpTransport`) in favor of referencing them by module / crate. See [PR 2961].

- Remove `TcpListenStream` and `TcpListenerEvent` from public API. See [PR 2961].

- Update to `libp2p-core` `v0.38.0`.

[PR 3101]: https://github.com/libp2p/rust-libp2p/pull/3101
[PR 2961]: https://github.com/libp2p/rust-libp2p/pull/2961

# 0.37.0
Expand Down
14 changes: 7 additions & 7 deletions transports/tcp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,26 @@ keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"]

[dependencies]
async-io-crate = { package = "async-io", version = "1.2.0", optional = true }
async-io = { version = "1.2.0", optional = true }
futures = "0.3.8"
futures-timer = "3.0"
if-watch = "2.0.0"
if-watch = "3.0.0"
libc = "0.2.80"
libp2p-core = { version = "0.38.0", path = "../../core" }
log = "0.4.11"
socket2 = { version = "0.4.0", features = ["all"] }
tokio-crate = { package = "tokio", version = "1.19.0", default-features = false, features = ["net"], optional = true }
tokio = { version = "1.19.0", default-features = false, features = ["net"], optional = true }

[features]
tokio = ["tokio-crate"]
async-io = ["async-io-crate"]
tokio = ["dep:tokio", "if-watch/tokio"]
async-io = ["dep:async-io", "if-watch/smol"]

[dev-dependencies]
async-std = { version = "1.6.5", features = ["attributes"] }
tokio-crate = { package = "tokio", version = "1.0.1", default-features = false, features = ["full"] }
tokio = { version = "1.0.1", default-features = false, features = ["full"] }
env_logger = "0.9.0"

# Passing arguments to the docsrs builder in order to properly document cfg's.
# Passing arguments to the docsrs builder in order to properly document cfg's.
# More information: https://docs.rs/about/builds#cross-compiling
[package.metadata.docs.rs]
all-features = true
Expand Down
28 changes: 14 additions & 14 deletions transports/tcp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use futures::{
prelude::*,
};
use futures_timer::Delay;
use if_watch::{IfEvent, IfWatcher};
use if_watch::IfEvent;
use libp2p_core::{
address_translation,
multiaddr::{Multiaddr, Protocol},
Expand Down Expand Up @@ -385,7 +385,7 @@ where
return TcpListenStream::<T>::new(
id,
listener,
Some(IfWatcher::new()?),
Some(T::new_if_watcher()?),
self.port_reuse.clone(),
);
}
Expand Down Expand Up @@ -656,7 +656,7 @@ where
/// become or stop being available.
///
/// `None` if the socket is only listening on a single interface.
if_watcher: Option<IfWatcher>,
if_watcher: Option<T::IfWatcher>,
/// The port reuse configuration for outgoing connections.
///
/// If enabled, all IP addresses on which this listening stream
Expand All @@ -680,7 +680,7 @@ where
fn new(
listener_id: ListenerId,
listener: TcpListener,
if_watcher: Option<IfWatcher>,
if_watcher: Option<T::IfWatcher>,
port_reuse: PortReuse,
) -> io::Result<Self> {
let listen_addr = listener.local_addr()?;
Expand All @@ -706,7 +706,7 @@ where
fn disable_port_reuse(&mut self) {
match &self.if_watcher {
Some(if_watcher) => {
for ip_net in if_watcher.iter() {
for ip_net in T::addrs(if_watcher) {
self.port_reuse
.unregister(ip_net.addr(), self.listen_addr.port());
}
Expand Down Expand Up @@ -749,7 +749,7 @@ where
}

if let Some(if_watcher) = me.if_watcher.as_mut() {
while let Poll::Ready(event) = if_watcher.poll_if_event(cx) {
while let Poll::Ready(Some(event)) = if_watcher.poll_next_unpin(cx) {
match event {
Ok(IfEvent::Up(inet)) => {
let ip = inet.addr();
Expand Down Expand Up @@ -986,11 +986,11 @@ mod tests {
let (ready_tx, ready_rx) = mpsc::channel(1);
let listener = listener::<tokio::Tcp>(addr, ready_tx);
let dialer = dialer::<tokio::Tcp>(ready_rx);
let rt = tokio_crate::runtime::Builder::new_current_thread()
let rt = ::tokio::runtime::Builder::new_current_thread()
.enable_io()
.build()
.unwrap();
let tasks = tokio_crate::task::LocalSet::new();
let tasks = ::tokio::task::LocalSet::new();
let listener = tasks.spawn_local(listener);
tasks.block_on(&rt, dialer);
tasks.block_on(&rt, listener).unwrap();
Expand Down Expand Up @@ -1055,11 +1055,11 @@ mod tests {
let (ready_tx, ready_rx) = mpsc::channel(1);
let listener = listener::<tokio::Tcp>(addr, ready_tx);
let dialer = dialer::<tokio::Tcp>(ready_rx);
let rt = tokio_crate::runtime::Builder::new_current_thread()
let rt = ::tokio::runtime::Builder::new_current_thread()
.enable_io()
.build()
.unwrap();
let tasks = tokio_crate::task::LocalSet::new();
let tasks = ::tokio::task::LocalSet::new();
let listener = tasks.spawn_local(listener);
tasks.block_on(&rt, dialer);
tasks.block_on(&rt, listener).unwrap();
Expand Down Expand Up @@ -1162,11 +1162,11 @@ mod tests {
let (port_reuse_tx, port_reuse_rx) = oneshot::channel();
let listener = listener::<tokio::Tcp>(addr.clone(), ready_tx, port_reuse_rx);
let dialer = dialer::<tokio::Tcp>(addr, ready_rx, port_reuse_tx);
let rt = tokio_crate::runtime::Builder::new_current_thread()
let rt = ::tokio::runtime::Builder::new_current_thread()
.enable_io()
.build()
.unwrap();
let tasks = tokio_crate::task::LocalSet::new();
let tasks = ::tokio::task::LocalSet::new();
let listener = tasks.spawn_local(listener);
tasks.block_on(&rt, dialer);
tasks.block_on(&rt, listener).unwrap();
Expand Down Expand Up @@ -1220,7 +1220,7 @@ mod tests {
#[cfg(feature = "tokio")]
{
let listener = listen_twice::<tokio::Tcp>(addr);
let rt = tokio_crate::runtime::Builder::new_current_thread()
let rt = ::tokio::runtime::Builder::new_current_thread()
.enable_io()
.build()
.unwrap();
Expand Down Expand Up @@ -1253,7 +1253,7 @@ mod tests {

#[cfg(feature = "tokio")]
{
let rt = tokio_crate::runtime::Builder::new_current_thread()
let rt = ::tokio::runtime::Builder::new_current_thread()
.enable_io()
.build()
.unwrap();
Expand Down
10 changes: 10 additions & 0 deletions transports/tcp/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ pub mod tokio;

use futures::future::BoxFuture;
use futures::io::{AsyncRead, AsyncWrite};
use futures::Stream;
use if_watch::{IfEvent, IpNet};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::task::{Context, Poll};
use std::{fmt, io};
Expand All @@ -46,6 +48,14 @@ pub trait Provider: Clone + Send + 'static {
type Stream: AsyncRead + AsyncWrite + Send + Unpin + fmt::Debug;
/// The type of TCP listeners obtained from [`Provider::new_listener`].
type Listener: Send + Unpin;
/// The type of IfWatcher obtained from [`Provider::new_if_watcher`].
type IfWatcher: Stream<Item = io::Result<IfEvent>> + Send + Unpin;

/// Create a new IfWatcher responsible for detecting IP address changes.
fn new_if_watcher() -> io::Result<Self::IfWatcher>;

/// An iterator over all currently discovered addresses.
fn addrs(_: &Self::IfWatcher) -> Vec<IpNet>;

/// Creates a new listener wrapping the given [`TcpListener`] that
/// can be polled for incoming connections via [`Self::poll_accept()`].
Expand Down
11 changes: 10 additions & 1 deletion transports/tcp/src/provider/async_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

use super::{Incoming, Provider};

use async_io_crate::Async;
use async_io::Async;
use futures::future::{BoxFuture, FutureExt};
use std::io;
use std::net;
Expand Down Expand Up @@ -55,6 +55,15 @@ pub enum Tcp {}
impl Provider for Tcp {
type Stream = Async<net::TcpStream>;
type Listener = Async<net::TcpListener>;
type IfWatcher = if_watch::smol::IfWatcher;

fn new_if_watcher() -> io::Result<Self::IfWatcher> {
Self::IfWatcher::new()
}

fn addrs(if_watcher: &Self::IfWatcher) -> Vec<if_watch::IpNet> {
if_watcher.iter().copied().collect()
}

fn new_listener(l: net::TcpListener) -> io::Result<Self::Listener> {
Async::new(l)
Expand Down
38 changes: 23 additions & 15 deletions transports/tcp/src/provider/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ use std::task::{Context, Poll};
/// # use libp2p_core::Transport;
/// # use futures::future;
/// # use std::pin::Pin;
/// # use tokio_crate as tokio;
/// #
/// # #[tokio::main]
/// # async fn main() {
Expand All @@ -59,17 +58,26 @@ pub enum Tcp {}

impl Provider for Tcp {
type Stream = TcpStream;
type Listener = tokio_crate::net::TcpListener;
type Listener = tokio::net::TcpListener;
type IfWatcher = if_watch::tokio::IfWatcher;

fn new_if_watcher() -> io::Result<Self::IfWatcher> {
Self::IfWatcher::new()
}

fn addrs(if_watcher: &Self::IfWatcher) -> Vec<if_watch::IpNet> {
if_watcher.iter().copied().collect()
}

fn new_listener(l: net::TcpListener) -> io::Result<Self::Listener> {
tokio_crate::net::TcpListener::try_from(l)
tokio::net::TcpListener::try_from(l)
}

fn new_stream(s: net::TcpStream) -> BoxFuture<'static, io::Result<Self::Stream>> {
async move {
// Taken from [`tokio_crate::net::TcpStream::connect_mio`].
// Taken from [`tokio::net::TcpStream::connect_mio`].

let stream = tokio_crate::net::TcpStream::try_from(s)?;
let stream = tokio::net::TcpStream::try_from(s)?;

// Once we've connected, wait for the stream to be writable as
// that's when the actual connection has been initiated. Once we're
Expand Down Expand Up @@ -109,12 +117,12 @@ impl Provider for Tcp {
}
}

/// A [`tokio_crate::net::TcpStream`] that implements [`AsyncRead`] and [`AsyncWrite`].
/// A [`tokio::net::TcpStream`] that implements [`AsyncRead`] and [`AsyncWrite`].
#[derive(Debug)]
pub struct TcpStream(pub tokio_crate::net::TcpStream);
pub struct TcpStream(pub tokio::net::TcpStream);

impl From<TcpStream> for tokio_crate::net::TcpStream {
fn from(t: TcpStream) -> tokio_crate::net::TcpStream {
impl From<TcpStream> for tokio::net::TcpStream {
fn from(t: TcpStream) -> tokio::net::TcpStream {
t.0
}
}
Expand All @@ -125,8 +133,8 @@ impl AsyncRead for TcpStream {
cx: &mut Context,
buf: &mut [u8],
) -> Poll<Result<usize, io::Error>> {
let mut read_buf = tokio_crate::io::ReadBuf::new(buf);
futures::ready!(tokio_crate::io::AsyncRead::poll_read(
let mut read_buf = tokio::io::ReadBuf::new(buf);
futures::ready!(tokio::io::AsyncRead::poll_read(
Pin::new(&mut self.0),
cx,
&mut read_buf
Expand All @@ -141,22 +149,22 @@ impl AsyncWrite for TcpStream {
cx: &mut Context,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
tokio_crate::io::AsyncWrite::poll_write(Pin::new(&mut self.0), cx, buf)
tokio::io::AsyncWrite::poll_write(Pin::new(&mut self.0), cx, buf)
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
tokio_crate::io::AsyncWrite::poll_flush(Pin::new(&mut self.0), cx)
tokio::io::AsyncWrite::poll_flush(Pin::new(&mut self.0), cx)
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
tokio_crate::io::AsyncWrite::poll_shutdown(Pin::new(&mut self.0), cx)
tokio::io::AsyncWrite::poll_shutdown(Pin::new(&mut self.0), cx)
}

fn poll_write_vectored(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[io::IoSlice<'_>],
) -> Poll<io::Result<usize>> {
tokio_crate::io::AsyncWrite::poll_write_vectored(Pin::new(&mut self.0), cx, bufs)
tokio::io::AsyncWrite::poll_write_vectored(Pin::new(&mut self.0), cx, bufs)
}
}

0 comments on commit d8fe7bf

Please sign in to comment.