diff --git a/multidim-interop/rust/Cargo.lock b/multidim-interop/rust/Cargo.lock index f2fa5c41c..b0058048a 100644 --- a/multidim-interop/rust/Cargo.lock +++ b/multidim-interop/rust/Cargo.lock @@ -2819,6 +2819,12 @@ dependencies = [ "webpki 0.22.0", ] +[[package]] +name = "rustversion" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5583e89e108996506031660fe09baa5011b9dd0341b89029313006d1fb508d70" + [[package]] name = "rw-stream-sink" version = "0.3.0" @@ -3105,6 +3111,28 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" +[[package]] +name = "strum" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "063e6045c0e62079840579a7e47a355ae92f60eb74daaf156fb1e84ba164e63f" +dependencies = [ + "strum_macros", +] + +[[package]] +name = "strum_macros" +version = "0.24.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e385be0d24f186b4ce2f9982191e7101bb737312ad61c1f2f984f34bcf85d59" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "rustversion", + "syn", +] + [[package]] name = "stun" version = "0.4.4" @@ -3219,6 +3247,7 @@ dependencies = [ "log", "rand 0.8.5", "redis", + "strum", "tokio", ] diff --git a/multidim-interop/rust/Cargo.toml b/multidim-interop/rust/Cargo.toml index e436cf6e0..024b8c59d 100644 --- a/multidim-interop/rust/Cargo.toml +++ b/multidim-interop/rust/Cargo.toml @@ -15,4 +15,5 @@ tokio = { version = "1.24.1", features = ["full"] } libp2pv0500 = { package = "libp2p", version = "0.50.0", features = ["websocket", "webrtc", "quic", "mplex", "yamux", "tcp", "tokio", "ping", "noise", "tls", "dns", "rsa", "macros"] } rand = "0.8.5" +strum = { version = "0.24.1", features = ["derive"] } diff --git a/multidim-interop/rust/src/bin/testplan_0500.rs b/multidim-interop/rust/src/bin/testplan_0500.rs index 08ac63551..4cd39d397 100644 --- a/multidim-interop/rust/src/bin/testplan_0500.rs +++ b/multidim-interop/rust/src/bin/testplan_0500.rs @@ -11,44 +11,42 @@ use libp2p::core::upgrade::EitherUpgrade; use libp2p::swarm::{keep_alive, NetworkBehaviour, SwarmEvent}; use libp2p::websocket::WsConfig; use libp2p::{ - core, identity, mplex, noise, ping, webrtc, yamux, Multiaddr, PeerId, Swarm, Transport, + core, identity, mplex, noise, ping, webrtc, yamux, Multiaddr, PeerId, Swarm, Transport as _, }; use libp2pv0500 as libp2p; -use testplan::{run_ping, PingSwarm}; +use testplan::{run_ping, Muxer, PingSwarm, SecProtocol, Transport}; fn build_builder( builder: core::transport::upgrade::Builder, - secure_channel_param: &str, - muxer_param: &str, + secure_channel_param: SecProtocol, + muxer_param: Muxer, local_key: &identity::Keypair, ) -> Boxed<(libp2p::PeerId, StreamMuxerBox)> where - T: Transport + Send + Unpin + 'static, + T: libp2p::Transport + Send + Unpin + 'static, ::Error: Sync + Send + 'static, ::ListenerUpgrade: Send, ::Dial: Send, C: AsyncRead + AsyncWrite + Send + Unpin + 'static, { let mux_upgrade = match muxer_param { - "yamux" => EitherUpgrade::A(yamux::YamuxConfig::default()), - "mplex" => EitherUpgrade::B(mplex::MplexConfig::default()), - _ => panic!("Unsupported muxer"), + Muxer::Yamux => EitherUpgrade::A(yamux::YamuxConfig::default()), + Muxer::Mplex => EitherUpgrade::B(mplex::MplexConfig::default()), }; let timeout = Duration::from_secs(5); match secure_channel_param { - "noise" => builder + SecProtocol::Noise => builder .authenticate(noise::NoiseAuthenticated::xx(&local_key).unwrap()) .multiplex(mux_upgrade) .timeout(timeout) .boxed(), - "tls" => builder + SecProtocol::Tls => builder .authenticate(libp2p::tls::Config::new(&local_key).unwrap()) .multiplex(mux_upgrade) .timeout(timeout) .boxed(), - _ => panic!("Unsupported secure channel"), } } @@ -57,46 +55,60 @@ async fn main() -> Result<()> { let local_key = identity::Keypair::generate_ed25519(); let local_peer_id = PeerId::from(local_key.public()); - let transport_param = - env::var("transport").context("transport environment variable is not set")?; - let secure_channel_param = - env::var("security").context("security environment variable is not set")?; - let muxer_param = env::var("muxer").context("muxer environment variable is not set")?; + let transport_param: testplan::Transport = env::var("transport") + .context("transport environment variable is not set")? + .parse() + .context("unsupported transport")?; + + let secure_channel_param: testplan::SecProtocol = env::var("security") + .context("security environment variable is not set")? + .parse() + .context("unsupported secure channel")?; + + let muxer_param: Muxer = env::var("muxer") + .context("muxer environment variable is not set")? + .parse() + .context("unsupported muxer")?; let ip = env::var("ip").context("ip environment variable is not set")?; + + let is_dialer = env::var("is_dialer") + .unwrap_or("true".into()) + .parse::()?; + let redis_addr = env::var("REDIS_ADDR") .map(|addr| format!("redis://{addr}")) .unwrap_or("redis://redis:6379".into()); let client = redis::Client::open(redis_addr).context("Could not connect to redis")?; - let (boxed_transport, local_addr) = match transport_param.as_str() { - "quic-v1" => { + let (boxed_transport, local_addr) = match transport_param { + Transport::QuicV1 => { let builder = libp2p::quic::tokio::Transport::new(libp2p::quic::Config::new(&local_key)) .map(|(p, c), _| (p, StreamMuxerBox::new(c))); (builder.boxed(), format!("/ip4/{ip}/udp/0/quic-v1")) } - "tcp" => { + Transport::Tcp => { let builder = libp2p::tcp::tokio::Transport::new(libp2p::tcp::Config::new()) .upgrade(libp2p::core::upgrade::Version::V1Lazy); ( - build_builder(builder, &secure_channel_param, &muxer_param, &local_key), + build_builder(builder, secure_channel_param, muxer_param, &local_key), format!("/ip4/{ip}/tcp/0"), ) } - "ws" => { + Transport::Ws => { let builder = WsConfig::new(libp2p::tcp::tokio::Transport::new( libp2p::tcp::Config::new(), )) .upgrade(libp2p::core::upgrade::Version::V1Lazy); ( - build_builder(builder, &secure_channel_param, &muxer_param, &local_key), + build_builder(builder, secure_channel_param, muxer_param, &local_key), format!("/ip4/{ip}/tcp/0/ws"), ) } - "webrtc" => ( + Transport::Webrtc => ( webrtc::tokio::Transport::new( local_key, webrtc::tokio::Certificate::generate(&mut rand::thread_rng())?, @@ -105,7 +117,6 @@ async fn main() -> Result<()> { .boxed(), format!("/ip4/{ip}/udp/0/webrtc"), ), - _ => panic!("Unsupported"), }; let swarm = OrphanRuleWorkaround(Swarm::with_tokio_executor( @@ -119,7 +130,7 @@ async fn main() -> Result<()> { // Use peer id as a String so that `run_ping` does not depend on a specific libp2p version. let local_peer_id = local_peer_id.to_string(); - run_ping(client, swarm, &local_addr, &local_peer_id).await?; + run_ping(client, swarm, &local_addr, &local_peer_id, is_dialer).await?; Ok(()) } diff --git a/multidim-interop/rust/src/lib.rs b/multidim-interop/rust/src/lib.rs index 3579c141e..d70908b10 100644 --- a/multidim-interop/rust/src/lib.rs +++ b/multidim-interop/rust/src/lib.rs @@ -1,13 +1,40 @@ -use std::env; use std::time::Duration; use anyhow::{Context, Result}; use env_logger::Env; use log::info; use redis::{AsyncCommands, Client as Rclient}; +use strum::EnumString; const REDIS_TIMEOUT: usize = 10; +/// Supported transports by rust-libp2p. +#[derive(Clone, Debug, EnumString)] +#[strum(serialize_all = "kebab-case")] +pub enum Transport { + Tcp, + QuicV1, + Webrtc, + Ws, +} + +/// Supported stream multiplexers by rust-libp2p. +#[derive(Clone, Debug, EnumString)] +#[strum(serialize_all = "kebab-case")] +pub enum Muxer { + Mplex, + Yamux, +} + +/// Supported security protocols by rust-libp2p. +#[derive(Clone, Debug, EnumString)] +#[strum(serialize_all = "kebab-case")] +pub enum SecProtocol { + Noise, + Tls, +} + +/// PingSwarm allows us to abstract over libp2p versions for `run_ping`. #[async_trait::async_trait] pub trait PingSwarm: Sized + Send + 'static { async fn listen_on(&mut self, address: &str) -> Result; @@ -23,11 +50,15 @@ pub trait PingSwarm: Sized + Send + 'static { fn local_peer_id(&self) -> String; } +/// Run a ping interop test. Based on `is_dialer`, either dial the address +/// retrieved via `listenAddr` key over the redis connection. Or wait to be pinged and have +/// `dialerDone` key ready on the redis connection. pub async fn run_ping( client: Rclient, mut swarm: S, local_addr: &str, local_peer_id: &str, + is_dialer: bool, ) -> Result<()> where S: PingSwarm, @@ -37,10 +68,6 @@ where info!("Running ping test: {}", swarm.local_peer_id()); env_logger::Builder::from_env(Env::default().default_filter_or("info")).init(); - let is_dialer = env::var("is_dialer") - .unwrap_or("true".into()) - .parse::()?; - info!( "Test instance, listening for incoming connections on: {:?}.", local_addr