From 6f44967b8ea54aeeae0b2a3592f4a4ae75087542 Mon Sep 17 00:00:00 2001 From: driftluo Date: Fri, 18 Oct 2024 10:37:27 +0800 Subject: [PATCH] feat: optimizing yamux wasm feature --- .github/workflows/ci.yaml | 7 +-- Makefile | 3 ++ secio/Cargo.toml | 3 +- secio/src/codec/secure_stream.rs | 3 +- tentacle/src/service.rs | 2 +- yamux/Cargo.toml | 3 ++ yamux/src/session.rs | 74 +++++++++++++++++++++----------- 7 files changed, 62 insertions(+), 33 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index a6eaa7c7..8c071c96 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -1,10 +1,10 @@ name: Github Action on: - pull_request: # trigger on pull requests + pull_request: # trigger on pull requests push: branches: - - master # trigger on push to master + - master # trigger on push to master jobs: test: @@ -12,7 +12,7 @@ jobs: runs-on: ${{ matrix.os }} strategy: matrix: - build: [ linux, macos, windows ] + build: [linux, macos, windows] include: - build: linux os: ubuntu-latest @@ -63,6 +63,7 @@ jobs: run: | echo "stable" > rust-toolchain rustup target add wasm32-unknown-unknown + rustup target add wasm32-wasip1 make features-check fuzz_test: diff --git a/Makefile b/Makefile index decfcc64..d7c4a8ab 100644 --- a/Makefile +++ b/Makefile @@ -47,6 +47,9 @@ features-check: # required wasm32-unknown-unknown target $(Change_Work_Path) && cargo build --features wasm-timer,unstable --no-default-features --target=wasm32-unknown-unknown $(Change_Work_Path) && cargo build --features wasm-timer,unstable,secio-async-trait --no-default-features --target=wasm32-unknown-unknown + # required wasm32-wasip1 target + $(Change_Work_Path) && cargo build --features wasm-timer,unstable --no-default-features --target=wasm32-wasip1 + $(Change_Work_Path) && cargo build --features wasm-timer,unstable,secio-async-trait --no-default-features --target=wasm32-wasip1 bench_p2p: cd bench && cargo run --release diff --git a/secio/Cargo.toml b/secio/Cargo.toml index a9e8b6a0..bc9c5751 100644 --- a/secio/Cargo.toml +++ b/secio/Cargo.toml @@ -27,7 +27,7 @@ molecule = "0.8.0" unsigned-varint = "0.8" bs58 = "0.5.0" -secp256k1 = "0.29" +secp256k1 = "0.30" rand = "0.8" [target.'cfg(unix)'.dependencies] @@ -60,7 +60,6 @@ hmac = "0.12.0" x25519-dalek = "2" chacha20poly1305 = "0.10" rand_core = { version = "0.6" } -once_cell = "1.8.0" proptest = "1" [[bench]] diff --git a/secio/src/codec/secure_stream.rs b/secio/src/codec/secure_stream.rs index cd771a1b..65149c03 100644 --- a/secio/src/codec/secure_stream.rs +++ b/secio/src/codec/secure_stream.rs @@ -241,8 +241,7 @@ mod tests { use tokio_util::codec::{length_delimited::LengthDelimitedCodec, Framed}; fn rt() -> &'static tokio::runtime::Runtime { - static RT: once_cell::sync::OnceCell = - once_cell::sync::OnceCell::new(); + static RT: std::sync::OnceLock = std::sync::OnceLock::new(); RT.get_or_init(|| tokio::runtime::Runtime::new().unwrap()) } diff --git a/tentacle/src/service.rs b/tentacle/src/service.rs index e3aed33c..43587a68 100644 --- a/tentacle/src/service.rs +++ b/tentacle/src/service.rs @@ -288,7 +288,7 @@ impl InnerService where K: KeyProvider, { - #[cfg(not(all(target_family = "wasm", target_os = "unknown")))] + #[cfg(not(target_family = "wasm"))] fn spawn_listener(&mut self, incoming: MultiIncoming, listen_address: Multiaddr) { let listener = Listener { inner: incoming, diff --git a/yamux/Cargo.toml b/yamux/Cargo.toml index b0de65ac..c9a74cb1 100644 --- a/yamux/Cargo.toml +++ b/yamux/Cargo.toml @@ -20,6 +20,9 @@ nohash-hasher = "0.2" futures-timer = { version = "3.0.2", optional = true } +[target.'cfg(all(target_family = "wasm", target_os = "unknown"))'.dependencies] +web-time = "1.1.0" + [dev-dependencies] env_logger = "0.6" rand = "0.8" diff --git a/yamux/src/session.rs b/yamux/src/session.rs index 2f24a35d..f289f131 100644 --- a/yamux/src/session.rs +++ b/yamux/src/session.rs @@ -9,8 +9,12 @@ use std::{ task::{Context, Poll}, time::Duration, }; -#[cfg(target_family = "wasm")] + +#[cfg(all(target_family = "wasm", not(target_os = "unknown")))] use timer::Instant; +/// wasm-unknown-unkown brower doesn't support time get, must use browser timer instead +#[cfg(all(target_family = "wasm", target_os = "unknown"))] +use web_time::Instant; use futures::{ channel::mpsc::{channel, unbounded, Receiver, Sender, UnboundedReceiver, UnboundedSender}, @@ -35,13 +39,6 @@ use timer::{interval, Interval}; const BUF_SHRINK_THRESHOLD: usize = u8::MAX as usize; const TIMEOUT: Duration = Duration::from_secs(30); -/// wasm doesn't support time get, must use browser timer instead -/// But we can simulate it with `futures-timer`. -/// So, I implemented a global time dependent on `futures-timer`, -/// Because in the browser environment, it is always single-threaded, so feel free to be unsafe -#[cfg(target_family = "wasm")] -static mut TIME: Instant = Instant::from_u64(0); - /// The session pub struct Session { // Framed low level raw stream @@ -109,6 +106,10 @@ pub struct Session { control_receiver: Receiver, keepalive: Option, + /// wasi use time mock to recording time changes + /// yamux's timeout statistics are session independent + #[cfg(all(target_family = "wasm", not(target_os = "unknown")))] + time_mock: std::sync::Arc, } /// Session type, client or server @@ -149,8 +150,18 @@ where raw_stream, FrameCodec::default().max_frame_size(config.max_stream_window_size), ); + #[cfg(all(target_family = "wasm", not(target_os = "unknown")))] + let time_mock = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)); let keepalive = if config.enable_keepalive { - Some(interval(config.keepalive_interval)) + #[cfg(not(all(target_family = "wasm", not(target_os = "unknown"))))] + let interval = interval(config.keepalive_interval); + + #[cfg(all(target_family = "wasm", not(target_os = "unknown")))] + let mut interval = interval(config.keepalive_interval); + #[cfg(all(target_family = "wasm", not(target_os = "unknown")))] + interval.mock_instant(time_mock.clone()); + + Some(interval) } else { None }; @@ -174,6 +185,8 @@ where control_sender, control_receiver, keepalive, + #[cfg(all(target_family = "wasm", not(target_os = "unknown")))] + time_mock, } } @@ -276,7 +289,7 @@ where if self .pings .iter() - .any(|(_id, time)| Instant::now().saturating_duration_since(*time) > TIMEOUT) + .any(|(_id, time)| ping_at.saturating_duration_since(*time) > TIMEOUT) { return Err(io::ErrorKind::TimedOut.into()); } @@ -636,7 +649,13 @@ where // Assume that remote peer has gone away and this session should be closed. self.remote_go_away = true; } else { - self.keep_alive(cx, Instant::now())?; + #[cfg(not(all(target_family = "wasm", not(target_os = "unknown"))))] + let now = Instant::now(); + #[cfg(all(target_family = "wasm", not(target_os = "unknown")))] + let now = Instant::from_u64( + self.time_mock.load(std::sync::atomic::Ordering::Acquire) as u64, + ); + self.keep_alive(cx, now)?; } } Poll::Ready(None) => { @@ -731,7 +750,7 @@ mod timer { } } - #[cfg(target_family = "wasm")] + #[cfg(all(target_family = "wasm", not(target_os = "unknown")))] pub use wasm_mock::Instant; #[cfg(feature = "generic-timer")] @@ -747,6 +766,8 @@ mod timer { pub struct Interval { delay: Delay, period: Duration, + #[cfg(all(target_family = "wasm", not(target_os = "unknown")))] + mock_instant: std::sync::Arc, } impl Interval { @@ -754,8 +775,18 @@ mod timer { Self { delay: Delay::new(period), period, + #[cfg(all(target_family = "wasm", not(target_os = "unknown")))] + mock_instant: Default::default(), } } + + #[cfg(all(target_family = "wasm", not(target_os = "unknown")))] + pub fn mock_instant( + &mut self, + mock_instant: std::sync::Arc, + ) { + self.mock_instant = mock_instant; + } } impl Stream for Interval { @@ -766,10 +797,11 @@ mod timer { Poll::Ready(_) => { let dur = self.period; self.delay.reset(dur); - #[cfg(target_family = "wasm")] - unsafe { - super::super::TIME += dur; - } + #[cfg(all(target_family = "wasm", not(target_os = "unknown")))] + self.mock_instant.fetch_add( + dur.as_millis() as usize, + std::sync::atomic::Ordering::AcqRel, + ); Poll::Ready(Some(())) } Poll::Pending => Poll::Pending, @@ -784,7 +816,7 @@ mod timer { } } - #[cfg(target_family = "wasm")] + #[cfg(all(target_family = "wasm", not(target_os = "unknown")))] #[allow(dead_code)] mod wasm_mock { use std::cmp::{Eq, Ord, Ordering, PartialEq, PartialOrd}; @@ -824,10 +856,6 @@ mod timer { Instant { inner: val } } - pub fn now() -> Instant { - unsafe { super::super::TIME } - } - pub fn duration_since(&self, earlier: Instant) -> Duration { *self - earlier } @@ -835,10 +863,6 @@ mod timer { pub fn saturating_duration_since(&self, earlier: Instant) -> Duration { *self - earlier } - - pub fn elapsed(&self) -> Duration { - Instant::now() - *self - } } impl Add for Instant {