Skip to content

Commit

Permalink
[multistream-select] Fix panic with V1Lazy and add integration tests.
Browse files Browse the repository at this point in the history
Fixes a panic when using the `V1Lazy` negotiation protocol,
a regression introduced in libp2p#1484.

Thereby adds integration tests for a transport upgrade with both
`V1` and `V1Lazy` to the `multistream-select` crate to prevent
future regressions.
  • Loading branch information
Roman S. Borschel committed Oct 5, 2020
1 parent 049d808 commit 9924059
Show file tree
Hide file tree
Showing 12 changed files with 192 additions and 78 deletions.
29 changes: 20 additions & 9 deletions core/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,13 @@
//! any desired protocols. The rest of the module defines combinators for
//! modifying a transport through composition with other transports or protocol upgrades.
use crate::ConnectedPoint;
use crate::{ConnectedPoint, ConnectionInfo, muxing::{StreamMuxer, StreamMuxerBox}};
use futures::prelude::*;
use multiaddr::Multiaddr;
use std::{error::Error, fmt};
use std::time::Duration;

pub mod and_then;
pub mod boxed;
pub mod choice;
pub mod dummy;
pub mod map;
Expand All @@ -41,8 +40,10 @@ pub mod memory;
pub mod timeout;
pub mod upgrade;

mod boxed;
mod optional;

pub use self::boxed::Boxed;
pub use self::choice::OrTransport;
pub use self::memory::MemoryTransport;
pub use self::optional::OptionalTransport;
Expand Down Expand Up @@ -128,14 +129,24 @@ pub trait Transport {
where
Self: Sized;

/// Turns the transport into an abstract boxed (i.e. heap-allocated) transport.
fn boxed(self) -> boxed::Boxed<Self::Output, Self::Error>
where Self: Sized + Clone + Send + Sync + 'static,
Self::Dial: Send + 'static,
Self::Listener: Send + 'static,
Self::ListenerUpgrade: Send + 'static,
/// Boxes an authenticated, multiplexed transport, including the
/// `StreamMuxer` and transport errors.
fn boxed<I, M>(self) -> boxed::Boxed<(I, StreamMuxerBox), std::io::Error>
where
Self: Transport<Output = (I, M)> + Sized + Clone + Send + Sync + 'static,
Self::Dial: Send + 'static,
Self::Listener: Send + 'static,
Self::ListenerUpgrade: Send + 'static,
Self::Error: Send + Sync,
I: ConnectionInfo,
M: StreamMuxer + Send + Sync + 'static,
M::Substream: Send + 'static,
M::OutboundSubstream: Send + 'static

{
boxed::boxed(self)
boxed::boxed(
self.map(|(i, m), _| (i, StreamMuxerBox::new(m)))
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)))
}

/// Applies a function on the connections created by the transport.
Expand Down
17 changes: 8 additions & 9 deletions core/src/transport/boxed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use multiaddr::Multiaddr;
use std::{error, fmt, pin::Pin, sync::Arc};

/// See the `Transport::boxed` method.
#[inline]
pub fn boxed<T>(transport: T) -> Boxed<T::Output, T::Error>
where
T: Transport + Clone + Send + Sync + 'static,
Expand All @@ -37,9 +36,14 @@ where
}
}

pub type Dial<O, E> = Pin<Box<dyn Future<Output = Result<O, E>> + Send>>;
pub type Listener<O, E> = Pin<Box<dyn Stream<Item = Result<ListenerEvent<ListenerUpgrade<O, E>, E>, E>> + Send>>;
pub type ListenerUpgrade<O, E> = Pin<Box<dyn Future<Output = Result<O, E>> + Send>>;
/// See the `Transport::boxed` method.
pub struct Boxed<O, E> {
inner: Arc<dyn Abstract<O, E> + Send + Sync>,
}

type Dial<O, E> = Pin<Box<dyn Future<Output = Result<O, E>> + Send>>;
type Listener<O, E> = Pin<Box<dyn Stream<Item = Result<ListenerEvent<ListenerUpgrade<O, E>, E>, E>> + Send>>;
type ListenerUpgrade<O, E> = Pin<Box<dyn Future<Output = Result<O, E>> + Send>>;

trait Abstract<O, E> {
fn listen_on(&self, addr: Multiaddr) -> Result<Listener<O, E>, TransportError<E>>;
Expand Down Expand Up @@ -68,11 +72,6 @@ where
}
}

/// See the `Transport::boxed` method.
pub struct Boxed<O, E> {
inner: Arc<dyn Abstract<O, E> + Send + Sync>,
}

impl<O, E> fmt::Debug for Boxed<O, E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "BoxedTransport")
Expand Down
17 changes: 2 additions & 15 deletions core/tests/network_dial_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,11 @@ use libp2p_core::{
use libp2p_noise as noise;
use rand::Rng;
use rand::seq::SliceRandom;
use std::{io, error::Error, fmt, task::Poll};
use std::{io, task::Poll};
use util::TestHandler;

type TestNetwork = Network<TestTransport, (), (), TestHandler>;
type TestTransport = transport::boxed::Boxed<(PeerId, StreamMuxerBox), BoxError>;

#[derive(Debug)]
struct BoxError(Box<dyn Error + Send + 'static>);

impl Error for BoxError {}

impl fmt::Display for BoxError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Transport error: {}", self.0)
}
}
type TestTransport = transport::Boxed<(PeerId, StreamMuxerBox), io::Error>;

fn new_network(cfg: NetworkConfig) -> TestNetwork {
let local_key = identity::Keypair::generate_ed25519();
Expand All @@ -61,13 +50,11 @@ fn new_network(cfg: NetworkConfig) -> TestNetwork {
.upgrade(upgrade::Version::V1)
.authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
.multiplex(libp2p_mplex::MplexConfig::new())
.map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer)))
.and_then(|(peer, mplex), _| {
// Gracefully close the connection to allow protocol
// negotiation to complete.
util::CloseMuxer::new(mplex).map_ok(move |mplex| (peer, mplex))
})
.map_err(|e| BoxError(Box::new(e)))
.boxed();
TestNetwork::new(transport, local_public_key.into(), cfg)
}
Expand Down
4 changes: 4 additions & 0 deletions misc/multistream-select/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ unsigned-varint = "0.5"

[dev-dependencies]
async-std = "1.6.2"
env_logger = "*"
libp2p-core = { path = "../../core" }
libp2p-mplex = { path = "../../muxers/mplex" }
libp2p-plaintext = { path = "../../protocols/plaintext" }
quickcheck = "0.9.0"
rand = "0.7.2"
rw-stream-sink = "0.2.1"
1 change: 1 addition & 0 deletions misc/multistream-select/src/negotiated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ impl<TInner> Negotiated<TInner> {

if let Message::Header(v) = &msg {
if *v == version {
*this.state = State::Expecting { io, protocol, version };
continue
}
}
Expand Down
136 changes: 136 additions & 0 deletions misc/multistream-select/tests/transport.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use libp2p_core::{
connection::{ConnectionHandler, ConnectionHandlerEvent, Substream, SubstreamEndpoint},
identity,
muxing::StreamMuxerBox,
upgrade,
multiaddr::Protocol,
Multiaddr,
Network,
network::{NetworkEvent, NetworkConfig},
PeerId,
Transport,
transport::{self, MemoryTransport}
};
use libp2p_mplex::MplexConfig;
use libp2p_plaintext::PlainText2Config;
use futures::{channel::oneshot, ready, prelude::*};
use rand::random;
use std::{io, task::{Context, Poll}};

type TestTransport = transport::Boxed<(PeerId, StreamMuxerBox), io::Error>;
type TestNetwork = Network<TestTransport, (), (), TestHandler>;

fn mk_transport(up: upgrade::Version) -> (PeerId, TestTransport) {
let keys = identity::Keypair::generate_ed25519();
let id = keys.public().into_peer_id();
(id, MemoryTransport::default()
.upgrade(up)
.authenticate(PlainText2Config { local_public_key: keys.public() })
.multiplex(MplexConfig::default())
.boxed())
}

/// Tests the transport upgrade process with all supported
/// upgrade protocol versions.
#[test]
fn transport_upgrade() {
let _ = env_logger::try_init();

fn run(up: upgrade::Version) {
let (dialer_id, dialer_transport) = mk_transport(up);
let (listener_id, listener_transport) = mk_transport(up);

let listen_addr = Multiaddr::from(Protocol::Memory(random::<u64>()));

let mut dialer = TestNetwork::new(dialer_transport, dialer_id, NetworkConfig::default());
let mut listener = TestNetwork::new(listener_transport, listener_id, NetworkConfig::default());

listener.listen_on(listen_addr).unwrap();
let (addr_sender, addr_receiver) = oneshot::channel();

let client = async move {
let addr = addr_receiver.await.unwrap();
dialer.dial(&addr, TestHandler()).unwrap();
futures::future::poll_fn(move |cx| {
loop {
match ready!(dialer.poll(cx)) {
NetworkEvent::ConnectionEstablished { .. } => {
return Poll::Ready(())
}
_ => {}
}
}
}).await
};

let mut addr_sender = Some(addr_sender);
let server = futures::future::poll_fn(move |cx| {
loop {
match ready!(listener.poll(cx)) {
NetworkEvent::NewListenerAddress { listen_addr, .. } => {
addr_sender.take().unwrap().send(listen_addr).unwrap();
}
NetworkEvent::IncomingConnection { connection, .. } => {
listener.accept(connection, TestHandler()).unwrap();
}
NetworkEvent::ConnectionEstablished { .. } => {
return Poll::Ready(())
}
_ => {}
}
}
});

async_std::task::block_on(future::select(Box::pin(server), Box::pin(client)));
}

run(upgrade::Version::V1);
run(upgrade::Version::V1Lazy);
}

struct TestHandler();

impl ConnectionHandler for TestHandler {
type InEvent = ();
type OutEvent = ();
type Error = io::Error;
type Substream = Substream<StreamMuxerBox>;
type OutboundOpenInfo = ();

fn inject_substream(&mut self, _: Self::Substream, _: SubstreamEndpoint<Self::OutboundOpenInfo>)
{}

fn inject_event(&mut self, _: Self::InEvent)
{}

fn inject_address_change(&mut self, _: &Multiaddr)
{}

fn poll(&mut self, _: &mut Context<'_>)
-> Poll<Result<ConnectionHandlerEvent<Self::OutboundOpenInfo, Self::OutEvent>, Self::Error>>
{
Poll::Pending
// Poll::Ready(Ok(ConnectionHandlerEvent::Custom(())))
}
}

8 changes: 2 additions & 6 deletions protocols/gossipsub/tests/smoke.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,13 @@ use log::debug;
use quickcheck::{QuickCheck, TestResult};
use rand::{random, seq::SliceRandom, SeedableRng};
use std::{
io::Error,
pin::Pin,
task::{Context, Poll},
time::Duration,
};

use libp2p_core::{
identity, multiaddr::Protocol, muxing::StreamMuxerBox, transport::MemoryTransport, upgrade,
identity, multiaddr::Protocol, transport::MemoryTransport, upgrade,
Multiaddr, Transport,
};
use libp2p_gossipsub::{
Expand Down Expand Up @@ -151,10 +150,7 @@ fn build_node() -> (Multiaddr, Swarm<Gossipsub>) {
.authenticate(PlainText2Config {
local_public_key: public_key.clone(),
})
.multiplex(yamux::Config::default())
.map(|(p, m), _| (p, StreamMuxerBox::new(m)))
.map_err(|e| -> Error { panic!("Failed to create transport: {:?}", e) })
.boxed();
.multiplex(yamux::Config::default());

let peer_id = public_key.clone().into_peer_id();

Expand Down
8 changes: 2 additions & 6 deletions protocols/kad/src/behaviour/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,14 @@ use libp2p_core::{
identity,
transport::MemoryTransport,
multiaddr::{Protocol, Multiaddr, multiaddr},
muxing::StreamMuxerBox,
upgrade
};
use libp2p_noise as noise;
use libp2p_swarm::Swarm;
use libp2p_yamux as yamux;
use quickcheck::*;
use rand::{Rng, random, thread_rng, rngs::StdRng, SeedableRng};
use std::{collections::{HashSet, HashMap}, time::Duration, io, num::NonZeroUsize, u64};
use std::{collections::{HashSet, HashMap}, time::Duration, num::NonZeroUsize, u64};
use multihash::{wrap, Code, Multihash};

type TestSwarm = Swarm<Kademlia<MemoryStore>>;
Expand All @@ -62,10 +61,7 @@ fn build_node_with_config(cfg: KademliaConfig) -> (Multiaddr, TestSwarm) {
let transport = MemoryTransport::default()
.upgrade(upgrade::Version::V1)
.authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
.multiplex(yamux::Config::default())
.map(|(p, m), _| (p, StreamMuxerBox::new(m)))
.map_err(|e| -> io::Error { panic!("Failed to create transport: {:?}", e); })
.boxed();
.multiplex(yamux::Config::default());

let local_id = local_public_key.clone().into_peer_id();
let store = MemoryStore::new(local_id.clone());
Expand Down
13 changes: 4 additions & 9 deletions protocols/ping/tests/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use libp2p_core::{
PeerId,
identity,
muxing::StreamMuxerBox,
transport::{Transport, boxed::Boxed},
transport::{self, Transport},
upgrade
};
use libp2p_mplex as mplex;
Expand Down Expand Up @@ -196,16 +196,15 @@ fn max_failures() {

fn mk_transport(muxer: MuxerChoice) -> (
PeerId,
Boxed<
transport::Boxed<
(PeerId, StreamMuxerBox),
io::Error
>
) {
let id_keys = identity::Keypair::generate_ed25519();
let peer_id = id_keys.public().into_peer_id();
let noise_keys = noise::Keypair::<noise::X25519Spec>::new().into_authentic(&id_keys).unwrap();

let transport = TcpConfig::new()
(peer_id, TcpConfig::new()
.nodelay(true)
.upgrade(upgrade::Version::V1)
.authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
Expand All @@ -215,11 +214,7 @@ fn mk_transport(muxer: MuxerChoice) -> (
MuxerChoice::Mplex =>
upgrade::EitherUpgrade::B(mplex::MplexConfig::default()),
})
.map(|(peer, muxer), _| (peer, StreamMuxerBox::new(muxer)))
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
.boxed();

(peer_id, transport)
.boxed())
}

#[derive(Debug, Copy, Clone)]
Expand Down
Loading

0 comments on commit 9924059

Please sign in to comment.