Skip to content

Commit

Permalink
transports/quic: upgrade to if-watch v2.0.0
Browse files Browse the repository at this point in the history
See corresponding change in tcp transport: libp2p#2813.
  • Loading branch information
elenaf9 committed Sep 9, 2022
1 parent 69caf98 commit fe3e09b
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 179 deletions.
2 changes: 1 addition & 1 deletion transports/quic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ license = "MIT"
async-global-executor = "2.0.2"
async-io = "1.6.0"
futures = "0.3.15"
if-watch = "1.0.0"
if-watch = "2.0.0"
libp2p-core = { version = "0.36.0", path = "../../core" }
parking_lot = "0.12.0"
quinn-proto = { version = "0.8.2", default-features = false, features = ["tls-rustls"] }
Expand Down
100 changes: 0 additions & 100 deletions transports/quic/src/in_addr.rs

This file was deleted.

1 change: 0 additions & 1 deletion transports/quic/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
mod connection;
mod endpoint;
mod error;
mod in_addr;
mod muxer;
mod tls;
mod upgrade;
Expand Down
153 changes: 76 additions & 77 deletions transports/quic/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@
use crate::connection::Connection;
use crate::endpoint::ToEndpoint;
use crate::Config;
use crate::{endpoint::Endpoint, in_addr::InAddr, muxer::QuicMuxer, upgrade::Upgrade};
use crate::{endpoint::Endpoint, muxer::QuicMuxer, upgrade::Upgrade};

use futures::channel::oneshot;
use futures::ready;
use futures::stream::StreamExt;
use futures::{channel::mpsc, prelude::*, stream::SelectAll};

use if_watch::IfEvent;
use if_watch::{IfEvent, IfWatcher};

use libp2p_core::{
multiaddr::{Multiaddr, Protocol},
Expand Down Expand Up @@ -253,17 +254,13 @@ struct Listener {
/// Channel where new connections are being sent.
new_connections_rx: mpsc::Receiver<Connection>,

/// The IP addresses of network interfaces on which the listening socket
/// is accepting connections.
///
/// If the listen socket listens on all interfaces, these may change over
/// time as interfaces become available or unavailable.
in_addr: InAddr,
if_watcher: Option<IfWatcher>,

/// Set to `Some` if this [`Listener`] should close.
/// Optionally contains a [`TransportEvent::ListenerClosed`] that should be
/// reported before the listener's stream is terminated.
report_closed: Option<Option<<Self as Stream>::Item>>,
/// Whether the listener was closed and the stream should terminate.
is_closed: bool,

/// Pending event to reported.
pending_event: Option<<Self as Stream>::Item>,

pending_dials: VecDeque<ToEndpoint>,

Expand All @@ -276,14 +273,29 @@ impl Listener {
socket_addr: SocketAddr,
config: Config,
) -> Result<Self, Error> {
let in_addr = InAddr::new(socket_addr.ip());
let (endpoint, new_connections_rx) = Endpoint::new_bidirectional(config, socket_addr)?;

let if_watcher;
let pending_event;
if socket_addr.ip().is_unspecified() {
if_watcher = Some(IfWatcher::new()?);
pending_event = None;
} else {
if_watcher = None;
let ma = socketaddr_to_multiaddr(&endpoint.socket_addr);
pending_event = Some(TransportEvent::NewAddress {
listener_id,
listen_addr: ma,
})
}

Ok(Listener {
endpoint,
listener_id,
new_connections_rx,
in_addr,
report_closed: None,
if_watcher,
is_closed: false,
pending_event,
pending_dials: VecDeque::new(),
waker: None,
})
Expand All @@ -292,68 +304,54 @@ impl Listener {
/// Report the listener as closed in a [`TransportEvent::ListenerClosed`] and
/// terminate the stream.
fn close(&mut self, reason: Result<(), Error>) {
match self.report_closed {
Some(_) => tracing::debug!("Listener was already closed."),
None => {
// Report the listener event as closed.
let _ = self
.report_closed
.insert(Some(TransportEvent::ListenerClosed {
listener_id: self.listener_id,
reason,
}));
}
if self.is_closed {
return;
}
self.pending_event = Some(TransportEvent::ListenerClosed {
listener_id: self.listener_id,
reason,
});
self.is_closed = true;
}

/// Poll for a next If Event.
fn poll_if_addr(&mut self, cx: &mut Context<'_>) -> Option<<Self as Stream>::Item> {
fn poll_if_addr(&mut self, cx: &mut Context<'_>) -> Poll<<Self as Stream>::Item> {
let if_watcher = match self.if_watcher.as_mut() {
Some(iw) => iw,
None => return Poll::Pending,
};
loop {
match self.in_addr.poll_next_unpin(cx) {
Poll::Ready(mut item) => {
if let Some(item) = item.take() {
// Consume all events for up/down interface changes.
match item {
Ok(IfEvent::Up(inet)) => {
let ip = inet.addr();
if self.endpoint.socket_addr.is_ipv4() == ip.is_ipv4() {
let socket_addr =
SocketAddr::new(ip, self.endpoint.socket_addr.port());
let ma = socketaddr_to_multiaddr(&socket_addr);
tracing::debug!("New listen address: {}", ma);
return Some(TransportEvent::NewAddress {
listener_id: self.listener_id,
listen_addr: ma,
});
}
}
Ok(IfEvent::Down(inet)) => {
let ip = inet.addr();
if self.endpoint.socket_addr.is_ipv4() == ip.is_ipv4() {
let socket_addr =
SocketAddr::new(ip, self.endpoint.socket_addr.port());
let ma = socketaddr_to_multiaddr(&socket_addr);
tracing::debug!("Expired listen address: {}", ma);
return Some(TransportEvent::AddressExpired {
listener_id: self.listener_id,
listen_addr: ma,
});
}
}
Err(err) => {
tracing::debug! {
"Failure polling interfaces: {:?}.",
err
};
return Some(TransportEvent::ListenerError {
listener_id: self.listener_id,
error: err.into(),
});
}
}
match ready!(if_watcher.poll_if_event(cx)) {
Ok(IfEvent::Up(inet)) => {
let ip = inet.addr();
if self.endpoint.socket_addr.is_ipv4() == ip.is_ipv4() {
let socket_addr = SocketAddr::new(ip, self.endpoint.socket_addr.port());
let ma = socketaddr_to_multiaddr(&socket_addr);
tracing::debug!("New listen address: {}", ma);
return Poll::Ready(TransportEvent::NewAddress {
listener_id: self.listener_id,
listen_addr: ma,
});
}
}
Poll::Pending => return None,
Ok(IfEvent::Down(inet)) => {
let ip = inet.addr();
if self.endpoint.socket_addr.is_ipv4() == ip.is_ipv4() {
let socket_addr = SocketAddr::new(ip, self.endpoint.socket_addr.port());
let ma = socketaddr_to_multiaddr(&socket_addr);
tracing::debug!("Expired listen address: {}", ma);
return Poll::Ready(TransportEvent::AddressExpired {
listener_id: self.listener_id,
listen_addr: ma,
});
}
}
Err(err) => {
return Poll::Ready(TransportEvent::ListenerError {
listener_id: self.listener_id,
error: err.into(),
})
}
}
}
}
Expand All @@ -363,15 +361,16 @@ impl Stream for Listener {
type Item = TransportEvent<<QuicTransport as Transport>::ListenerUpgrade, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
if let Some(closed) = self.report_closed.as_mut() {
// Listener was closed.
// Report the transport event if there is one. On the next iteration, return
// `Poll::Ready(None)` to terminate the stream.
return Poll::Ready(closed.take());
}
if let Some(event) = self.poll_if_addr(cx) {
if let Some(event) = self.pending_event.take() {
return Poll::Ready(Some(event));
}
if self.is_closed {
return Poll::Ready(None);
}
match self.poll_if_addr(cx) {
Poll::Ready(event) => return Poll::Ready(Some(event)),
Poll::Pending => {}
}
if !self.pending_dials.is_empty() {
match self.endpoint.to_endpoint.poll_ready_unpin(cx) {
Poll::Ready(Ok(_)) => {
Expand Down

0 comments on commit fe3e09b

Please sign in to comment.