Skip to content

Commit

Permalink
refactor(noise): implement tests using futures_ringbuf
Browse files Browse the repository at this point in the history
Instead of creating a connection via a TCP transport, we can use the `futures_ringbuf` crate and run the noise encryption on an in-memory transport. This removes the dependency on the `libp2p_core::upgrade::apply` function and takes less code to implement.

Related #3748.

Pull-Request: #3773.
  • Loading branch information
thomaseizinger authored May 2, 2023
1 parent c728824 commit 4ca8885
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 119 deletions.
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions transports/noise/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,8 @@ snow = { version = "0.9.2", features = ["ring-resolver"], default-features = fal
snow = { version = "0.9.2", features = ["default-resolver"], default-features = false }

[dev-dependencies]
async-io = "1.13.0"
env_logger = "0.10.0"
libp2p-tcp = { workspace = true, features = ["async-io"] }
futures_ringbuf = "0.3.1"
quickcheck = { workspace = true }

# Passing arguments to the docsrs builder in order to properly document cfg's.
Expand Down
6 changes: 3 additions & 3 deletions transports/noise/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@
//! Example:
//!
//! ```
//! use libp2p_core::{identity, Transport, upgrade};
//! use libp2p_tcp::TcpTransport;
//! use libp2p_core::{Transport, upgrade, transport::MemoryTransport};
//! use libp2p_noise as noise;
//! use libp2p_identity as identity;
//!
//! # fn main() {
//! let id_keys = identity::Keypair::generate_ed25519();
//! let noise = noise::Config::new(&id_keys).unwrap();
//! let builder = TcpTransport::default().upgrade(upgrade::Version::V1).authenticate(noise);
//! let builder = MemoryTransport::default().upgrade(upgrade::Version::V1).authenticate(noise);
//! // let transport = builder.multiplex(...);
//! # }
//! ```
Expand Down
166 changes: 54 additions & 112 deletions transports/noise/tests/smoke.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,22 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use async_io::Async;
use futures::prelude::*;
use libp2p_core::transport::Transport;
use libp2p_core::upgrade::Negotiated;
use libp2p_core::{transport, upgrade};
use libp2p_core::transport::{MemoryTransport, Transport};
use libp2p_core::{upgrade, InboundUpgrade, OutboundUpgrade};
use libp2p_identity as identity;
use libp2p_identity::PeerId;
use libp2p_noise as noise;
use libp2p_tcp as tcp;
use log::info;
use quickcheck::*;
use std::{convert::TryInto, io, net::TcpStream};
use std::{convert::TryInto, io};

#[allow(dead_code)]
fn core_upgrade_compat() {
// Tests API compaibility with the libp2p-core upgrade API,
// i.e. if it compiles, the "test" is considered a success.
let id_keys = identity::Keypair::generate_ed25519();
let noise = noise::Config::new(&id_keys).unwrap();
let _ = tcp::async_io::Transport::default()
let _ = MemoryTransport::default()
.upgrade(upgrade::Version::V1)
.authenticate(noise);
}
Expand All @@ -50,119 +46,65 @@ fn xx() {
let server_id = identity::Keypair::generate_ed25519();
let client_id = identity::Keypair::generate_ed25519();

let server_id_public = server_id.public();
let client_id_public = client_id.public();

let server_transport = tcp::async_io::Transport::default()
.and_then(move |output, endpoint| {
upgrade::apply(
output,
noise::Config::new(&server_id).unwrap(),
endpoint,
upgrade::Version::V1,
)
})
.map(move |out, _| {
assert_eq!(out.0, client_id_public.to_peer_id());

out
})
.boxed();

let client_transport = tcp::async_io::Transport::default()
.and_then(move |output, endpoint| {
upgrade::apply(
output,
noise::Config::new(&client_id).unwrap(),
endpoint,
upgrade::Version::V1,
)
})
.map(move |out, _| {
assert_eq!(out.0, server_id_public.to_peer_id());
let (client, server) = futures_ringbuf::Endpoint::pair(100, 100);

futures::executor::block_on(async move {
let (
(reported_client_id, mut client_session),
(reported_server_id, mut server_session),
) = futures::future::try_join(
noise::Config::new(&server_id)
.unwrap()
.upgrade_inbound(server, b""),
noise::Config::new(&client_id)
.unwrap()
.upgrade_outbound(client, b""),
)
.await
.unwrap();

out
})
.boxed();
assert_eq!(reported_client_id, client_id.public().to_peer_id());
assert_eq!(reported_server_id, server_id.public().to_peer_id());

let client_fut = async {
for m in &messages {
let n = (m.0.len() as u64).to_be_bytes();
client_session.write_all(&n[..]).await.expect("len written");
client_session.write_all(&m.0).await.expect("no error")
}
client_session.flush().await.expect("no error");
};

let server_fut = async {
for m in &messages {
let len = {
let mut n = [0; 8];
match server_session.read_exact(&mut n).await {
Ok(()) => u64::from_be_bytes(n),
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => 0,
Err(e) => panic!("error reading len: {e}"),
}
};
info!("server: reading message ({} bytes)", len);
let mut server_buffer = vec![0; len.try_into().unwrap()];
server_session
.read_exact(&mut server_buffer)
.await
.expect("no error");
assert_eq!(server_buffer, m.0)
}
};

futures::future::join(client_fut, server_fut).await;
});

run(server_transport, client_transport, messages);
true
}
QuickCheck::new()
.max_tests(30)
.quickcheck(prop as fn(Vec<Message>) -> bool)
}

type Output = (PeerId, noise::Output<Negotiated<Async<TcpStream>>>);

fn run<I>(mut server: transport::Boxed<Output>, mut client: transport::Boxed<Output>, messages: I)
where
I: IntoIterator<Item = Message> + Clone,
{
futures::executor::block_on(async {
server
.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
.unwrap();

let server_address = server
.next()
.await
.expect("some event")
.into_new_address()
.expect("listen address");

let outbound_msgs = messages.clone();
let client_fut = async {
let mut client_session = client
.dial(server_address.clone())
.unwrap()
.await
.map(|(_, session)| session)
.expect("no error");

for m in outbound_msgs {
let n = (m.0.len() as u64).to_be_bytes();
client_session.write_all(&n[..]).await.expect("len written");
client_session.write_all(&m.0).await.expect("no error")
}
client_session.flush().await.expect("no error");
};

let server_fut = async {
let mut server_session = server
.next()
.await
.expect("some event")
.into_incoming()
.expect("listener upgrade")
.0
.await
.map(|(_, session)| session)
.expect("no error");

for m in messages {
let len = {
let mut n = [0; 8];
match server_session.read_exact(&mut n).await {
Ok(()) => u64::from_be_bytes(n),
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => 0,
Err(e) => panic!("error reading len: {e}"),
}
};
info!("server: reading message ({} bytes)", len);
let mut server_buffer = vec![0; len.try_into().unwrap()];
server_session
.read_exact(&mut server_buffer)
.await
.expect("no error");
assert_eq!(server_buffer, m.0)
}
};

futures::future::join(server_fut, client_fut).await;
})
}

#[derive(Debug, Clone, PartialEq, Eq)]
struct Message(Vec<u8>);

Expand Down

0 comments on commit 4ca8885

Please sign in to comment.