Skip to content

Commit

Permalink
transports/quic: adapt to transport trait changes
Browse files Browse the repository at this point in the history
Adapt to the transport changes of libp2p#2652.
Note: this is only a draft "to make it work", and not a proper
implementation. It does not support listening on multiple addresses.
The listening logic with multiple Endpoints will need to be supported for
the upstream implementation.
  • Loading branch information
elenaf9 committed Jul 10, 2022
1 parent 6b758e3 commit 5157ea1
Showing 1 changed file with 54 additions and 35 deletions.
89 changes: 54 additions & 35 deletions transports/quic/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use if_watch::IfEvent;

use libp2p_core::{
multiaddr::{Multiaddr, Protocol},
transport::{ListenerEvent, TransportError},
transport::{ListenerId, TransportError, TransportEvent},
PeerId, Transport,
};
use std::task::{Context, Poll};
Expand All @@ -52,18 +52,16 @@ pub use quinn_proto::{
#[derive(Debug, Clone)]
pub struct QuicTransport {
endpoint: Arc<Endpoint>,
/// The IP addresses of network interfaces on which the listening socket
/// is accepting connections.
///
/// If the listen socket listens on all interfaces, these may change over
/// time as interfaces become available or unavailable.
in_addr: InAddr,

listener: Option<(ListenerId, InAddr)>,
}

impl QuicTransport {
pub fn new(endpoint: Arc<Endpoint>) -> Self {
let in_addr = InAddr::new(endpoint.local_addr.ip());
Self { endpoint, in_addr }
Self {
endpoint,
listener: None
}
}
}

Expand All @@ -84,23 +82,27 @@ pub enum Error {
impl Transport for QuicTransport {
type Output = (PeerId, QuicMuxer);
type Error = Error;
// type Listener = Pin<
// Box<dyn Stream<Item = Result<ListenerEvent<Upgrade, Self::Error>, Self::Error>> + Send>,
// >;
type Listener = Self;
type ListenerUpgrade = Upgrade;
type Dial = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;

fn listen_on(
&mut self,
addr: Multiaddr,
) -> Result<Self::Listener, TransportError<Self::Error>> {
fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<Self::Error>> {
multiaddr_to_socketaddr(&addr)
.ok_or_else(|| TransportError::MultiaddrNotSupported(addr))?;
Ok(self.clone())
let listener = self.listener.get_or_insert((ListenerId::new(), InAddr::new(self.endpoint.local_addr.ip())));
Ok(listener.0)
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
fn remove_listener(&mut self, id: ListenerId) -> bool {
if let Some((listener_id, _)) = self.listener {
if id == listener_id {
self.listener = None;
return true
}
}
false
}

fn address_translation(&self, _server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
Some(observed.clone())
}

Expand Down Expand Up @@ -136,17 +138,21 @@ impl Transport for QuicTransport {
// https://github.com/libp2p/specs/blob/master/relay/DCUtR.md#the-protocol
self.dial(addr)
}
}

impl Stream for QuicTransport {
type Item = Result<ListenerEvent<Upgrade, Error>, Error>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
let me = Pin::into_inner(self);
// Poll for a next IfEvent
let (listener_id, in_addr) = match me.listener.as_mut() {
Some((id, in_addr)) => (*id, in_addr),
None => return Poll::Pending
};
let endpoint = me.endpoint.as_ref();

// Poll for a next IfEvent
match me.in_addr.poll_next_unpin(cx) {
match in_addr.poll_next_unpin(cx) {
Poll::Ready(mut item) => {
if let Some(item) = item.take() {
// Consume all events for up/down interface changes.
Expand All @@ -157,7 +163,10 @@ impl Stream for QuicTransport {
let socket_addr = SocketAddr::new(ip, endpoint.local_addr.port());
let ma = socketaddr_to_multiaddr(&socket_addr);
tracing::debug!("New listen address: {}", ma);
return Poll::Ready(Some(Ok(ListenerEvent::NewAddress(ma))));
return Poll::Ready(TransportEvent::NewAddress {
listener_id,
listen_addr: ma,
});
}
}
Ok(IfEvent::Down(inet)) => {
Expand All @@ -166,17 +175,21 @@ impl Stream for QuicTransport {
let socket_addr = SocketAddr::new(ip, endpoint.local_addr.port());
let ma = socketaddr_to_multiaddr(&socket_addr);
tracing::debug!("Expired listen address: {}", ma);
return Poll::Ready(Some(Ok(ListenerEvent::AddressExpired(ma))));
return Poll::Ready(TransportEvent::AddressExpired {
listener_id,
listen_addr: ma,
});
}
}
Err(err) => {
tracing::debug! {
"Failure polling interfaces: {:?}.",
err
};
return Poll::Ready(Some(Ok(ListenerEvent::Error(Error::IfWatcher(
err,
)))));
return Poll::Ready(TransportEvent::ListenerError {
listener_id,
error: Error::IfWatcher(err),
});
}
}
}
Expand All @@ -188,17 +201,23 @@ impl Stream for QuicTransport {

let connection = match endpoint.poll_incoming(cx) {
Poll::Ready(Some(connection)) => connection,
Poll::Ready(None) => return Poll::Ready(None),
Poll::Ready(None) => {
return Poll::Ready(TransportEvent::ListenerClosed {
listener_id,
reason: Ok(()),
})
}
Poll::Pending => return Poll::Pending,
};
let local_addr = socketaddr_to_multiaddr(&connection.local_addr());
let remote_addr = socketaddr_to_multiaddr(&connection.remote_addr());
let event = ListenerEvent::Upgrade {
let send_back_addr = socketaddr_to_multiaddr(&connection.remote_addr());
let event = TransportEvent::Incoming {
upgrade: Upgrade::from_connection(connection),
local_addr,
remote_addr,
send_back_addr,
listener_id,
};
Poll::Ready(Some(Ok(event)))
Poll::Ready(event)
}
}

Expand Down

0 comments on commit 5157ea1

Please sign in to comment.