Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transports/tcp: Update if-watch to v3.0.0 #3101

Merged
merged 5 commits into from
Nov 15, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions transports/tcp/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
# 0.38.0 [unreleased]

- Update `if-watch` to version 3.0.0 and pass through `tokio` and `async-io` features. See [PR 3101].
rkuhn marked this conversation as resolved.
Show resolved Hide resolved

- Deprecate types with `Tcp` prefix (`GenTcpConfig`, `TcpTransport` and `TokioTcpTransport`) in favor of referencing them by module / crate. See [PR 2961].

- Remove `TcpListenStream` and `TcpListenerEvent` from public API. See [PR 2961].

- Update to `libp2p-core` `v0.38.0`.

[PR 3101]: https://github.com/libp2p/rust-libp2p/pull/3101
[PR 2961]: https://github.com/libp2p/rust-libp2p/pull/2961

# 0.37.0
Expand Down
14 changes: 7 additions & 7 deletions transports/tcp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,26 @@ keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"]

[dependencies]
async-io-crate = { package = "async-io", version = "1.2.0", optional = true }
async-io = { version = "1.2.0", optional = true }
futures = "0.3.8"
futures-timer = "3.0"
if-watch = "2.0.0"
if-watch = "3.0.0"
libc = "0.2.80"
libp2p-core = { version = "0.38.0", path = "../../core" }
log = "0.4.11"
socket2 = { version = "0.4.0", features = ["all"] }
tokio-crate = { package = "tokio", version = "1.19.0", default-features = false, features = ["net"], optional = true }
tokio = { version = "1.19.0", default-features = false, features = ["net"], optional = true }

[features]
tokio = ["tokio-crate"]
async-io = ["async-io-crate"]
tokio = ["dep:tokio", "if-watch/tokio"]
async-io = ["dep:async-io", "if-watch/smol"]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

switching to dep:tokio style as was done in other crates — let me know if this one was deliberately left unchanged


[dev-dependencies]
async-std = { version = "1.6.5", features = ["attributes"] }
tokio-crate = { package = "tokio", version = "1.0.1", default-features = false, features = ["full"] }
tokio = { version = "1.0.1", default-features = false, features = ["full"] }
env_logger = "0.9.0"

# Passing arguments to the docsrs builder in order to properly document cfg's.
# Passing arguments to the docsrs builder in order to properly document cfg's.
# More information: https://docs.rs/about/builds#cross-compiling
[package.metadata.docs.rs]
all-features = true
Expand Down
28 changes: 14 additions & 14 deletions transports/tcp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use futures::{
prelude::*,
};
use futures_timer::Delay;
use if_watch::{IfEvent, IfWatcher};
use if_watch::IfEvent;
use libp2p_core::{
address_translation,
multiaddr::{Multiaddr, Protocol},
Expand Down Expand Up @@ -385,7 +385,7 @@ where
return TcpListenStream::<T>::new(
id,
listener,
Some(IfWatcher::new()?),
Some(T::new_if_watcher()?),
self.port_reuse.clone(),
);
}
Expand Down Expand Up @@ -656,7 +656,7 @@ where
/// become or stop being available.
///
/// `None` if the socket is only listening on a single interface.
if_watcher: Option<IfWatcher>,
if_watcher: Option<T::IfWatcher>,
/// The port reuse configuration for outgoing connections.
///
/// If enabled, all IP addresses on which this listening stream
Expand All @@ -680,7 +680,7 @@ where
fn new(
listener_id: ListenerId,
listener: TcpListener,
if_watcher: Option<IfWatcher>,
if_watcher: Option<T::IfWatcher>,
port_reuse: PortReuse,
) -> io::Result<Self> {
let listen_addr = listener.local_addr()?;
Expand All @@ -706,7 +706,7 @@ where
fn disable_port_reuse(&mut self) {
match &self.if_watcher {
Some(if_watcher) => {
for ip_net in if_watcher.iter() {
for ip_net in T::addrs(if_watcher) {
self.port_reuse
.unregister(ip_net.addr(), self.listen_addr.port());
}
Expand Down Expand Up @@ -749,7 +749,7 @@ where
}

if let Some(if_watcher) = me.if_watcher.as_mut() {
while let Poll::Ready(event) = if_watcher.poll_if_event(cx) {
while let Poll::Ready(Some(event)) = if_watcher.poll_next_unpin(cx) {
match event {
Ok(IfEvent::Up(inet)) => {
let ip = inet.addr();
Expand Down Expand Up @@ -986,11 +986,11 @@ mod tests {
let (ready_tx, ready_rx) = mpsc::channel(1);
let listener = listener::<tokio::Tcp>(addr, ready_tx);
let dialer = dialer::<tokio::Tcp>(ready_rx);
let rt = tokio_crate::runtime::Builder::new_current_thread()
let rt = ::tokio::runtime::Builder::new_current_thread()
.enable_io()
.build()
.unwrap();
let tasks = tokio_crate::task::LocalSet::new();
let tasks = ::tokio::task::LocalSet::new();
let listener = tasks.spawn_local(listener);
tasks.block_on(&rt, dialer);
tasks.block_on(&rt, listener).unwrap();
Expand Down Expand Up @@ -1055,11 +1055,11 @@ mod tests {
let (ready_tx, ready_rx) = mpsc::channel(1);
let listener = listener::<tokio::Tcp>(addr, ready_tx);
let dialer = dialer::<tokio::Tcp>(ready_rx);
let rt = tokio_crate::runtime::Builder::new_current_thread()
let rt = ::tokio::runtime::Builder::new_current_thread()
.enable_io()
.build()
.unwrap();
let tasks = tokio_crate::task::LocalSet::new();
let tasks = ::tokio::task::LocalSet::new();
let listener = tasks.spawn_local(listener);
tasks.block_on(&rt, dialer);
tasks.block_on(&rt, listener).unwrap();
Expand Down Expand Up @@ -1162,11 +1162,11 @@ mod tests {
let (port_reuse_tx, port_reuse_rx) = oneshot::channel();
let listener = listener::<tokio::Tcp>(addr.clone(), ready_tx, port_reuse_rx);
let dialer = dialer::<tokio::Tcp>(addr, ready_rx, port_reuse_tx);
let rt = tokio_crate::runtime::Builder::new_current_thread()
let rt = ::tokio::runtime::Builder::new_current_thread()
.enable_io()
.build()
.unwrap();
let tasks = tokio_crate::task::LocalSet::new();
let tasks = ::tokio::task::LocalSet::new();
let listener = tasks.spawn_local(listener);
tasks.block_on(&rt, dialer);
tasks.block_on(&rt, listener).unwrap();
Expand Down Expand Up @@ -1220,7 +1220,7 @@ mod tests {
#[cfg(feature = "tokio")]
{
let listener = listen_twice::<tokio::Tcp>(addr);
let rt = tokio_crate::runtime::Builder::new_current_thread()
let rt = ::tokio::runtime::Builder::new_current_thread()
.enable_io()
.build()
.unwrap();
Expand Down Expand Up @@ -1253,7 +1253,7 @@ mod tests {

#[cfg(feature = "tokio")]
{
let rt = tokio_crate::runtime::Builder::new_current_thread()
let rt = ::tokio::runtime::Builder::new_current_thread()
.enable_io()
.build()
.unwrap();
Expand Down
10 changes: 10 additions & 0 deletions transports/tcp/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ pub mod tokio;

use futures::future::BoxFuture;
use futures::io::{AsyncRead, AsyncWrite};
use futures::Stream;
use if_watch::{IfEvent, IpNet};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::task::{Context, Poll};
use std::{fmt, io};
Expand All @@ -46,6 +48,14 @@ pub trait Provider: Clone + Send + 'static {
type Stream: AsyncRead + AsyncWrite + Send + Unpin + fmt::Debug;
/// The type of TCP listeners obtained from [`Provider::new_listener`].
type Listener: Send + Unpin;
/// The type of IfWatcher obtained from [`Provider::new_if_watcher`].
type IfWatcher: Stream<Item = io::Result<IfEvent>> + Send + Unpin;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
type IfWatcher: Stream<Item = io::Result<IfEvent>> + Send + Unpin;
type IfWatcher: Stream<Item = io::Result<IfEvent>> + IntoIterator<Item = IpNet> + Send + Unpin;

Is this legal Rust? My brain compiler is unsure but it could avoid the need for the addrs function I think.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is legal syntax, but neither IfWatcher implementation implements this trait, plus into_iter consumes self, so we wouldn’t be able to use this here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah damn!

The naming of iter suggested that we actually implement the Iterator trait but I guess the consuming nature of IntoIterator makes this unworkable anyway.

Thanks for checking!


/// Create a new IfWatcher responsible for detecting IP address changes.
fn new_if_watcher() -> io::Result<Self::IfWatcher>;

/// An iterator over all currently discovered addresses.
fn addrs(_: &Self::IfWatcher) -> Vec<IpNet>;

/// Creates a new listener wrapping the given [`TcpListener`] that
/// can be polled for incoming connections via [`Self::poll_accept()`].
Expand Down
11 changes: 10 additions & 1 deletion transports/tcp/src/provider/async_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

use super::{Incoming, Provider};

use async_io_crate::Async;
use async_io::Async;
use futures::future::{BoxFuture, FutureExt};
use std::io;
use std::net;
Expand Down Expand Up @@ -55,6 +55,15 @@ pub enum Tcp {}
impl Provider for Tcp {
type Stream = Async<net::TcpStream>;
type Listener = Async<net::TcpListener>;
type IfWatcher = if_watch::smol::IfWatcher;

fn new_if_watcher() -> io::Result<Self::IfWatcher> {
Self::IfWatcher::new()
}

fn addrs(if_watcher: &Self::IfWatcher) -> Vec<if_watch::IpNet> {
if_watcher.iter().copied().collect()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Returning the iterator would be possible using a HRTB, but given that this is only called in a rare error path I opted for the simpler approach.

}

fn new_listener(l: net::TcpListener) -> io::Result<Self::Listener> {
Async::new(l)
Expand Down
39 changes: 24 additions & 15 deletions transports/tcp/src/provider/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use std::task::{Context, Poll};
/// # use libp2p_core::Transport;
/// # use futures::future;
/// # use std::pin::Pin;
/// # use tokio_crate as tokio;
/// # use tokio as tokio;
rkuhn marked this conversation as resolved.
Show resolved Hide resolved
/// #
/// # #[tokio::main]
/// # async fn main() {
Expand All @@ -59,17 +59,26 @@ pub enum Tcp {}

impl Provider for Tcp {
type Stream = TcpStream;
type Listener = tokio_crate::net::TcpListener;
type Listener = tokio::net::TcpListener;
type IfWatcher = if_watch::tokio::IfWatcher;

fn new_if_watcher() -> io::Result<Self::IfWatcher> {
Self::IfWatcher::new()
}

fn addrs(if_watcher: &Self::IfWatcher) -> Vec<if_watch::IpNet> {
if_watcher.iter().copied().collect()
}

fn new_listener(l: net::TcpListener) -> io::Result<Self::Listener> {
tokio_crate::net::TcpListener::try_from(l)
tokio::net::TcpListener::try_from(l)
}

fn new_stream(s: net::TcpStream) -> BoxFuture<'static, io::Result<Self::Stream>> {
async move {
// Taken from [`tokio_crate::net::TcpStream::connect_mio`].
// Taken from [`tokio::net::TcpStream::connect_mio`].

let stream = tokio_crate::net::TcpStream::try_from(s)?;
let stream = tokio::net::TcpStream::try_from(s)?;

// Once we've connected, wait for the stream to be writable as
// that's when the actual connection has been initiated. Once we're
Expand Down Expand Up @@ -109,12 +118,12 @@ impl Provider for Tcp {
}
}

/// A [`tokio_crate::net::TcpStream`] that implements [`AsyncRead`] and [`AsyncWrite`].
/// A [`tokio::net::TcpStream`] that implements [`AsyncRead`] and [`AsyncWrite`].
#[derive(Debug)]
pub struct TcpStream(pub tokio_crate::net::TcpStream);
pub struct TcpStream(pub tokio::net::TcpStream);

impl From<TcpStream> for tokio_crate::net::TcpStream {
fn from(t: TcpStream) -> tokio_crate::net::TcpStream {
impl From<TcpStream> for tokio::net::TcpStream {
fn from(t: TcpStream) -> tokio::net::TcpStream {
t.0
}
}
Expand All @@ -125,8 +134,8 @@ impl AsyncRead for TcpStream {
cx: &mut Context,
buf: &mut [u8],
) -> Poll<Result<usize, io::Error>> {
let mut read_buf = tokio_crate::io::ReadBuf::new(buf);
futures::ready!(tokio_crate::io::AsyncRead::poll_read(
let mut read_buf = tokio::io::ReadBuf::new(buf);
futures::ready!(tokio::io::AsyncRead::poll_read(
Pin::new(&mut self.0),
cx,
&mut read_buf
Expand All @@ -141,22 +150,22 @@ impl AsyncWrite for TcpStream {
cx: &mut Context,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
tokio_crate::io::AsyncWrite::poll_write(Pin::new(&mut self.0), cx, buf)
tokio::io::AsyncWrite::poll_write(Pin::new(&mut self.0), cx, buf)
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
tokio_crate::io::AsyncWrite::poll_flush(Pin::new(&mut self.0), cx)
tokio::io::AsyncWrite::poll_flush(Pin::new(&mut self.0), cx)
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
tokio_crate::io::AsyncWrite::poll_shutdown(Pin::new(&mut self.0), cx)
tokio::io::AsyncWrite::poll_shutdown(Pin::new(&mut self.0), cx)
}

fn poll_write_vectored(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[io::IoSlice<'_>],
) -> Poll<io::Result<usize>> {
tokio_crate::io::AsyncWrite::poll_write_vectored(Pin::new(&mut self.0), cx, bufs)
tokio::io::AsyncWrite::poll_write_vectored(Pin::new(&mut self.0), cx, bufs)
}
}