From f2f669a0a010b65c4d6d5a7a757e324d2b38014f Mon Sep 17 00:00:00 2001 From: driftluo Date: Thu, 17 Oct 2024 17:25:17 +0800 Subject: [PATCH 1/3] feat: optimizing wasm experience --- rust-toolchain | 2 +- secio/Cargo.toml | 4 +- secio/src/codec/hmac_compat/mod.rs | 8 +-- secio/src/crypto/mod.rs | 8 +-- secio/src/dh_compat/mod.rs | 8 +-- secio/src/error.rs | 2 +- secio/src/lib.rs | 6 +-- secio/src/sha256_compat/mod.rs | 8 +-- secio/src/support.rs | 8 +-- tentacle/Cargo.toml | 14 +++--- tentacle/src/builder.rs | 4 +- tentacle/src/lib.rs | 4 +- tentacle/src/runtime/async_runtime.rs | 6 +-- tentacle/src/runtime/generic_timer.rs | 10 ++++ tentacle/src/runtime/mod.rs | 18 +++---- tentacle/src/runtime/tokio_runtime.rs | 8 ++- tentacle/src/service.rs | 30 +++++------ tentacle/src/service/config.rs | 6 +-- tentacle/src/service/future_task.rs | 3 -- tentacle/src/service/helper.rs | 8 +-- tentacle/src/traits.rs | 30 ++++------- tentacle/src/transports/browser.rs | 9 ++-- tentacle/src/transports/memory.rs | 6 +-- tentacle/src/transports/mod.rs | 18 +++---- tentacle/src/transports/tls.rs | 4 +- tentacle/src/utils.rs | 2 +- tentacle/tests/test_tls_dial.rs | 70 +++++++++++++------------- tentacle/tests/test_tls_reconnect.rs | 71 +++++++++++++-------------- yamux/Cargo.toml | 14 ++++-- yamux/src/session.rs | 15 +++--- 30 files changed, 202 insertions(+), 202 deletions(-) diff --git a/rust-toolchain b/rust-toolchain index 7c7053aa..dbd41264 100644 --- a/rust-toolchain +++ b/rust-toolchain @@ -1 +1 @@ -1.75.0 +1.81.0 diff --git a/secio/Cargo.toml b/secio/Cargo.toml index ab20856c..a9e8b6a0 100644 --- a/secio/Cargo.toml +++ b/secio/Cargo.toml @@ -34,10 +34,10 @@ rand = "0.8" openssl = "0.10.25" openssl-sys = "0.9" -[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +[target.'cfg(not(target_family = "wasm"))'.dependencies] ring = "0.17" -[target.'cfg(target_arch = "wasm32")'.dependencies] +[target.'cfg(target_family = "wasm")'.dependencies] rand_core = { version = "0.6" } getrandom = { version = "0.2", features = ["js"] } sha2 = "0.10.0" diff --git a/secio/src/codec/hmac_compat/mod.rs b/secio/src/codec/hmac_compat/mod.rs index f3164184..9774e299 100644 --- a/secio/src/codec/hmac_compat/mod.rs +++ b/secio/src/codec/hmac_compat/mod.rs @@ -1,17 +1,17 @@ #[cfg(unix)] mod openssl_impl; -#[cfg(not(target_arch = "wasm32"))] +#[cfg(not(target_family = "wasm"))] #[cfg(any(test, not(unix)))] mod ring_impl; -#[cfg(any(target_arch = "wasm32", test))] +#[cfg(any(target_family = "wasm", test))] mod wasm_compat; #[cfg(unix)] pub use openssl_impl::*; -#[cfg(not(target_arch = "wasm32"))] +#[cfg(not(target_family = "wasm"))] #[cfg(not(unix))] pub use ring_impl::*; -#[cfg(target_arch = "wasm32")] +#[cfg(target_family = "wasm")] pub use wasm_compat::*; #[cfg(test)] diff --git a/secio/src/crypto/mod.rs b/secio/src/crypto/mod.rs index ce8b26ad..b8d5c26f 100644 --- a/secio/src/crypto/mod.rs +++ b/secio/src/crypto/mod.rs @@ -5,10 +5,10 @@ use bytes::BytesMut; pub mod cipher; #[cfg(unix)] mod openssl_impl; -#[cfg(not(target_arch = "wasm32"))] +#[cfg(not(target_family = "wasm"))] #[cfg(any(not(ossl110), test, not(unix)))] mod ring_impl; -#[cfg(any(target_arch = "wasm32", test))] +#[cfg(any(target_family = "wasm", test))] mod wasm_compat; /// Variant cipher which contains all possible stream ciphers @@ -66,7 +66,7 @@ pub fn new_stream(t: cipher::CipherType, key: &[u8], mode: CryptoMode) -> BoxStr /// Generate a specific Cipher with key and initialize vector #[doc(hidden)] -#[cfg(not(target_arch = "wasm32"))] +#[cfg(not(target_family = "wasm"))] #[cfg(not(unix))] pub fn new_stream(t: cipher::CipherType, key: &[u8], mode: CryptoMode) -> BoxStreamCipher { Box::new(ring_impl::RingAeadCipher::new(t, key, mode)) @@ -74,7 +74,7 @@ pub fn new_stream(t: cipher::CipherType, key: &[u8], mode: CryptoMode) -> BoxStr /// Generate a specific Cipher with key and initialize vector #[doc(hidden)] -#[cfg(target_arch = "wasm32")] +#[cfg(target_family = "wasm")] pub fn new_stream(t: cipher::CipherType, key: &[u8], _mode: CryptoMode) -> BoxStreamCipher { Box::new(wasm_compat::WasmCrypt::new(t, key)) } diff --git a/secio/src/dh_compat/mod.rs b/secio/src/dh_compat/mod.rs index ce86889b..43fad226 100644 --- a/secio/src/dh_compat/mod.rs +++ b/secio/src/dh_compat/mod.rs @@ -1,12 +1,12 @@ #[cfg(unix)] mod openssl_impl; -#[cfg(not(target_arch = "wasm32"))] +#[cfg(not(target_family = "wasm"))] #[cfg(any(test, not(unix), not(ossl110)))] mod ring_impl; -#[cfg(any(target_arch = "wasm32", test))] +#[cfg(any(target_family = "wasm", test))] mod wasm_compat; -#[cfg(target_arch = "wasm32")] +#[cfg(target_family = "wasm")] pub use wasm_compat::*; /// Possible key agreement algorithms. @@ -19,7 +19,7 @@ pub enum KeyAgreement { #[cfg(all(ossl110, unix))] pub use openssl_impl::*; -#[cfg(not(target_arch = "wasm32"))] +#[cfg(not(target_family = "wasm"))] #[cfg(not(unix))] pub use ring_impl::*; #[cfg(all(not(ossl110), unix))] diff --git a/secio/src/error.rs b/secio/src/error.rs index f74c0eb1..460d6122 100644 --- a/secio/src/error.rs +++ b/secio/src/error.rs @@ -92,7 +92,7 @@ impl From for SecioError { } } -#[cfg(not(target_arch = "wasm32"))] +#[cfg(not(target_family = "wasm"))] impl From for SecioError { fn from(_err: ring::error::Unspecified) -> SecioError { SecioError::CryptoError diff --git a/secio/src/lib.rs b/secio/src/lib.rs index a2d5db1d..d00c4118 100644 --- a/secio/src/lib.rs +++ b/secio/src/lib.rs @@ -110,11 +110,7 @@ impl Digest { } /// KeyProvider on ecdh procedure -#[cfg_attr(all(target_arch = "wasm32", feature = "async-trait"), async_trait::async_trait(?Send))] -#[cfg_attr( - all(not(target_arch = "wasm32"), feature = "async-trait"), - async_trait::async_trait -)] +#[cfg_attr(feature = "async-trait", async_trait::async_trait)] pub trait KeyProvider: std::clone::Clone + Send + Sync + 'static { /// Error type Error: Into; diff --git a/secio/src/sha256_compat/mod.rs b/secio/src/sha256_compat/mod.rs index c1ec49ea..94639476 100644 --- a/secio/src/sha256_compat/mod.rs +++ b/secio/src/sha256_compat/mod.rs @@ -1,18 +1,18 @@ #[cfg(unix)] mod openssl_impl; -#[cfg(not(target_arch = "wasm32"))] +#[cfg(not(target_family = "wasm"))] #[cfg(any(test, not(unix)))] mod ring_impl; -#[cfg(any(target_arch = "wasm32", test))] +#[cfg(any(target_family = "wasm", test))] mod wasm_compat; #[cfg(unix)] pub use openssl_impl::*; -#[cfg(not(target_arch = "wasm32"))] +#[cfg(not(target_family = "wasm"))] #[cfg(not(unix))] pub use ring_impl::*; -#[cfg(target_arch = "wasm32")] +#[cfg(target_family = "wasm")] pub use wasm_compat::*; #[cfg(test)] diff --git a/secio/src/support.rs b/secio/src/support.rs index d60ce8a5..f037132e 100644 --- a/secio/src/support.rs +++ b/secio/src/support.rs @@ -19,13 +19,13 @@ const CHACHA20_POLY1305: &str = "CHACHA20_POLY1305"; const SHA_256: &str = "SHA256"; const SHA_512: &str = "SHA512"; -#[cfg(not(target_arch = "wasm32"))] +#[cfg(not(target_family = "wasm"))] pub(crate) const DEFAULT_AGREEMENTS_PROPOSITION: &str = "P-256,P-384,X25519"; -#[cfg(target_arch = "wasm32")] +#[cfg(target_family = "wasm")] pub(crate) const DEFAULT_AGREEMENTS_PROPOSITION: &str = "X25519"; -#[cfg(not(target_arch = "wasm32"))] +#[cfg(not(target_family = "wasm"))] pub(crate) const DEFAULT_CIPHERS_PROPOSITION: &str = "AES-128-GCM,AES-256-GCM,CHACHA20_POLY1305"; -#[cfg(target_arch = "wasm32")] +#[cfg(target_family = "wasm")] pub(crate) const DEFAULT_CIPHERS_PROPOSITION: &str = "CHACHA20_POLY1305"; pub(crate) const DEFAULT_DIGESTS_PROPOSITION: &str = "SHA256,SHA512"; diff --git a/tentacle/Cargo.toml b/tentacle/Cargo.toml index 9b52b840..5d7fda85 100644 --- a/tentacle/Cargo.toml +++ b/tentacle/Cargo.toml @@ -27,11 +27,10 @@ async-trait = "0.1" log = "0.4" bytes = "1.0.0" thiserror = "1.0" -once_cell = "1.0" nohash-hasher = "0.2" parking_lot = { version = "0.12", optional = true } -tokio-tungstenite = { version = "0.21", optional = true } +tokio-tungstenite = { version = "0.24", optional = true } futures-timer = { version = "3.0.2", optional = true } async-std = { version = "1", features = ["unstable"], optional = true } async-io = { version = "1", optional = true } @@ -43,14 +42,14 @@ molecule = "0.8.0" igd = { version = "0.12", optional = true } #tls -tokio-rustls = { version = "0.24.0", optional = true } +tokio-rustls = { version = "0.26.0", optional = true } -[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +[target.'cfg(not(target_family = "wasm"))'.dependencies] # rand 0.8 not support wasm32 rand = "0.8" socket2 = { version = "0.5.0", features = ["all"] } -[target.'cfg(target_arch = "wasm32")'.dependencies] +[target.'cfg(target_family = "wasm")'.dependencies] js-sys = "0.3" wasm-bindgen = "0.2" wasm-bindgen-futures = "0.4" @@ -69,12 +68,11 @@ winapi = { version = "0.3.7", features = [ [dev-dependencies] env_logger = "0.6.0" crossbeam-channel = "0.5" -systemstat = "0.1.3" +systemstat = "0.2" futures-test = "0.3.5" -rustls-pemfile = "1" [target.'cfg(unix)'.dev-dependencies] -nix = { version = "0.24.1", default-features = false, features = ["signal"] } +nix = { version = "0.29", default-features = false, features = ["signal"] } [features] default = ["tokio-runtime", "tokio-timer"] diff --git a/tentacle/src/builder.rs b/tentacle/src/builder.rs index 087e8ef8..7be24c65 100644 --- a/tentacle/src/builder.rs +++ b/tentacle/src/builder.rs @@ -148,7 +148,7 @@ where /// then an attempt is made to register the local listener port into the mapping so that it can /// receive the access request of the external network, and if the external ip of the route is not the public network, /// Then do nothing - #[cfg(all(not(target_arch = "wasm32"), feature = "upnp"))] + #[cfg(all(not(target_family = "wasm"), feature = "upnp"))] #[cfg_attr(docsrs, doc(cfg(feature = "upnp")))] pub fn upnp(mut self, enable: bool) -> Self { self.config.upnp = enable; @@ -214,7 +214,7 @@ where /// ## Note /// /// User use `listen(2)` or `connect(2)` on this closure will cause abnormal behavior - #[cfg(not(target_arch = "wasm32"))] + #[cfg(not(target_family = "wasm"))] pub fn tcp_config(mut self, f: F) -> Self where F: Fn(TcpSocket) -> Result + Send + Sync + 'static, diff --git a/tentacle/src/lib.rs b/tentacle/src/lib.rs index 88164d98..635fed45 100644 --- a/tentacle/src/lib.rs +++ b/tentacle/src/lib.rs @@ -103,7 +103,7 @@ #![deny(missing_docs)] #![cfg_attr( - target_arch = "wasm32", + target_family = "wasm", allow(dead_code, unused_variables, unused_imports) )] #![cfg_attr(docsrs, feature(doc_cfg))] @@ -151,7 +151,7 @@ mod channel; #[allow(missing_docs)] pub mod runtime; -#[cfg(all(not(target_arch = "wasm32"), feature = "upnp"))] +#[cfg(all(not(target_family = "wasm"), feature = "upnp"))] pub(crate) mod upnp; use std::{fmt, ops::AddAssign}; diff --git a/tentacle/src/runtime/async_runtime.rs b/tentacle/src/runtime/async_runtime.rs index a6a4fb6f..6c70351b 100644 --- a/tentacle/src/runtime/async_runtime.rs +++ b/tentacle/src/runtime/async_runtime.rs @@ -1,4 +1,4 @@ -#[cfg(not(target_arch = "wasm32"))] +#[cfg(not(target_family = "wasm"))] pub use async_std::task::{spawn, spawn_blocking, yield_now, JoinHandle}; pub fn block_in_place(f: F) -> R @@ -8,10 +8,10 @@ where f() } -#[cfg(not(target_arch = "wasm32"))] +#[cfg(not(target_family = "wasm"))] pub use os::*; -#[cfg(not(target_arch = "wasm32"))] +#[cfg(not(target_family = "wasm"))] mod os { use crate::{ runtime::CompatStream2, diff --git a/tentacle/src/runtime/generic_timer.rs b/tentacle/src/runtime/generic_timer.rs index 0533e1ce..1eecac59 100644 --- a/tentacle/src/runtime/generic_timer.rs +++ b/tentacle/src/runtime/generic_timer.rs @@ -20,6 +20,8 @@ impl Interval { period, } } + + pub fn set_missed_tick_behavior(&mut self, _behavior: MissedTickBehavior) {} } impl Stream for Interval { @@ -37,6 +39,14 @@ impl Stream for Interval { } } +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] +pub enum MissedTickBehavior { + #[default] + Burst, + Delay, + Skip, +} + pub fn interval(period: Duration) -> Interval { assert!(period > Duration::new(0, 0), "`period` must be non-zero."); diff --git a/tentacle/src/runtime/mod.rs b/tentacle/src/runtime/mod.rs index 8f3bca80..8348ac30 100644 --- a/tentacle/src/runtime/mod.rs +++ b/tentacle/src/runtime/mod.rs @@ -9,31 +9,31 @@ //! this module contains async timer, compat async read/write between futures and tokio, spawn on any runtime //! -#[cfg(all(not(target_arch = "wasm32"), feature = "async-runtime"))] +#[cfg(all(not(target_family = "wasm"), feature = "async-runtime"))] mod async_runtime; #[cfg(any( feature = "generic-timer", - all(target_arch = "wasm32", feature = "wasm-timer") + all(target_family = "wasm", feature = "wasm-timer") ))] mod generic_timer; -#[cfg(all(not(target_arch = "wasm32"), feature = "tokio-runtime"))] +#[cfg(all(not(target_family = "wasm"), feature = "tokio-runtime"))] mod tokio_runtime; -#[cfg(target_arch = "wasm32")] +#[cfg(target_family = "wasm")] mod wasm_runtime; -#[cfg(all(not(target_arch = "wasm32"), feature = "async-runtime"))] +#[cfg(all(not(target_family = "wasm"), feature = "async-runtime"))] pub use async_runtime::*; #[cfg(any( feature = "generic-timer", - all(target_arch = "wasm32", feature = "wasm-timer") + all(target_family = "wasm", feature = "wasm-timer") ))] pub use generic_timer::*; -#[cfg(all(not(target_arch = "wasm32"), feature = "tokio-runtime"))] +#[cfg(all(not(target_family = "wasm"), feature = "tokio-runtime"))] pub use tokio_runtime::*; -#[cfg(target_arch = "wasm32")] +#[cfg(target_family = "wasm")] pub use wasm_runtime::*; -#[cfg(all(not(target_arch = "wasm32"), feature = "tokio-runtime"))] +#[cfg(all(not(target_family = "wasm"), feature = "tokio-runtime"))] pub use tokio::io::{split, ReadHalf, WriteHalf}; #[cfg(not(feature = "tokio-runtime"))] diff --git a/tentacle/src/runtime/tokio_runtime.rs b/tentacle/src/runtime/tokio_runtime.rs index 63c5f7c0..01efbd6b 100644 --- a/tentacle/src/runtime/tokio_runtime.rs +++ b/tentacle/src/runtime/tokio_runtime.rs @@ -16,7 +16,7 @@ use tokio::net::TcpSocket as TokioTcp; #[cfg(feature = "tokio-timer")] pub use time::{interval, Interval}; #[cfg(feature = "tokio-timer")] -pub use tokio::time::{sleep as delay_for, timeout, Sleep as Delay, Timeout}; +pub use tokio::time::{sleep as delay_for, timeout, MissedTickBehavior, Sleep as Delay, Timeout}; #[cfg(feature = "tokio-timer")] mod time { @@ -26,7 +26,7 @@ mod time { task::{Context, Poll}, time::Duration, }; - use tokio::time::{interval as inner_interval, Interval as Inner}; + use tokio::time::{interval as inner_interval, Interval as Inner, MissedTickBehavior}; pub struct Interval(Inner); @@ -34,6 +34,10 @@ mod time { pub fn new(period: Duration) -> Self { Self(inner_interval(period)) } + + pub fn set_missed_tick_behavior(&mut self, behavior: MissedTickBehavior) { + self.0.set_missed_tick_behavior(behavior); + } } impl Stream for Interval { diff --git a/tentacle/src/service.rs b/tentacle/src/service.rs index 35ab88d8..e3aed33c 100644 --- a/tentacle/src/service.rs +++ b/tentacle/src/service.rs @@ -12,7 +12,7 @@ use std::{ }; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; -#[cfg(not(target_arch = "wasm32"))] +#[cfg(not(target_family = "wasm"))] use crate::service::helper::Listener; use crate::{ buffer::Buffer, @@ -69,7 +69,7 @@ struct InnerService { listens: HashSet, - #[cfg(all(not(target_arch = "wasm32"), feature = "upnp"))] + #[cfg(all(not(target_family = "wasm"), feature = "upnp"))] igd_client: Option, dial_protocols: HashMap, @@ -142,7 +142,7 @@ where let (user_handle_sender, user_handle_receiver) = mpsc::channel(config.session_config.channel_size); let shutdown = Arc::new(AtomicBool::new(false)); - #[cfg(all(not(target_arch = "wasm32"), feature = "upnp"))] + #[cfg(all(not(target_family = "wasm"), feature = "upnp"))] let igd_client = if config.upnp { crate::upnp::IgdClient::new() } else { @@ -168,10 +168,10 @@ where future_task_sender, handshake_type, multi_transport: { - #[cfg(target_arch = "wasm32")] + #[cfg(target_family = "wasm")] let transport = MultiTransport::new(config.timeout); #[allow(clippy::let_and_return)] - #[cfg(not(target_arch = "wasm32"))] + #[cfg(not(target_family = "wasm"))] let transport = MultiTransport::new(config.timeout, config.tcp_config.clone()); #[cfg(feature = "tls")] let transport = transport.tls_config(config.tls_config.clone()); @@ -181,7 +181,7 @@ where service_proto_handles: HashMap::default(), session_proto_handles: HashMap::default(), listens: HashSet::new(), - #[cfg(all(not(target_arch = "wasm32"), feature = "upnp"))] + #[cfg(all(not(target_family = "wasm"), feature = "upnp"))] igd_client, dial_protocols: HashMap::default(), state: State::new(forever), @@ -205,10 +205,10 @@ where let inner = self.inner_service.as_mut().unwrap(); let listen_future = inner.multi_transport.clone().listen(address.clone())?; - #[cfg(target_arch = "wasm32")] + #[cfg(target_family = "wasm")] unreachable!(); - #[cfg(not(target_arch = "wasm32"))] + #[cfg(not(target_family = "wasm"))] match listen_future.await { Ok((addr, incoming)) => { let listen_address = addr.clone(); @@ -288,7 +288,7 @@ impl InnerService where K: KeyProvider, { - #[cfg(not(target_arch = "wasm32"))] + #[cfg(not(all(target_family = "wasm", target_os = "unknown")))] fn spawn_listener(&mut self, incoming: MultiIncoming, listen_address: Multiaddr) { let listener = Listener { inner: incoming, @@ -314,7 +314,7 @@ where fn listen_inner(&mut self, address: Multiaddr) -> Result<()> { let listen_future = self.multi_transport.clone().listen(address.clone())?; - #[cfg(not(target_arch = "wasm32"))] + #[cfg(not(target_family = "wasm"))] { let mut sender = self.session_event_sender.clone(); let task = async move { @@ -825,7 +825,7 @@ where } /// When listen update, call here - #[cfg(not(target_arch = "wasm32"))] + #[cfg(not(target_family = "wasm"))] #[inline] async fn try_update_listens(&mut self) { #[cfg(feature = "upnp")] @@ -969,7 +969,7 @@ where })) .await; } - #[cfg(not(target_arch = "wasm32"))] + #[cfg(not(target_family = "wasm"))] SessionEvent::ListenError { address, error } => { let _ignore = self .handle_sender @@ -1016,7 +1016,7 @@ where .await; } } - #[cfg(not(target_arch = "wasm32"))] + #[cfg(not(target_family = "wasm"))] SessionEvent::ListenStart { listen_address, incoming, @@ -1186,7 +1186,7 @@ where let _ignore = self.handle_sender.send_all(&mut events).await; // clear upnp register - #[cfg(all(not(target_arch = "wasm32"), feature = "upnp"))] + #[cfg(all(not(target_family = "wasm"), feature = "upnp"))] if let Some(client) = self.igd_client.as_mut() { client.clear() }; @@ -1237,7 +1237,7 @@ where } poll_fn(crate::runtime::poll_proceed).await; - #[cfg(not(target_arch = "wasm32"))] + #[cfg(not(target_family = "wasm"))] self.try_update_listens().await; tokio::select! { Some(event) = self.session_event_receiver.next() => { diff --git a/tentacle/src/service/config.rs b/tentacle/src/service/config.rs index 8897d8cb..3e568d64 100644 --- a/tentacle/src/service/config.rs +++ b/tentacle/src/service/config.rs @@ -25,7 +25,7 @@ pub(crate) struct ServiceConfig { pub session_config: SessionConfig, pub max_frame_length: usize, pub keep_buffer: bool, - #[cfg(all(not(target_arch = "wasm32"), feature = "upnp"))] + #[cfg(all(not(target_family = "wasm"), feature = "upnp"))] pub upnp: bool, pub max_connection_number: usize, pub tcp_config: TcpConfig, @@ -40,7 +40,7 @@ impl Default for ServiceConfig { session_config: SessionConfig::default(), max_frame_length: 1024 * 1024 * 8, keep_buffer: false, - #[cfg(all(not(target_arch = "wasm32"), feature = "upnp"))] + #[cfg(all(not(target_family = "wasm"), feature = "upnp"))] upnp: false, max_connection_number: 65535, tcp_config: Default::default(), @@ -122,7 +122,7 @@ impl Default for TcpConfig { /// inbound connections. The caller is able to set socket option and explicitly /// bind the socket with a socket address. pub struct TcpSocket { - #[cfg(not(target_arch = "wasm32"))] + #[cfg(not(target_family = "wasm"))] pub(crate) inner: socket2::Socket, } diff --git a/tentacle/src/service/future_task.rs b/tentacle/src/service/future_task.rs index c367377a..456884b9 100644 --- a/tentacle/src/service/future_task.rs +++ b/tentacle/src/service/future_task.rs @@ -15,10 +15,7 @@ use std::{ }; pub(crate) type FutureTaskId = u64; -#[cfg(not(target_arch = "wasm32"))] pub(crate) type BoxedFutureTask = Pin + 'static + Send>>; -#[cfg(target_arch = "wasm32")] -pub(crate) type BoxedFutureTask = Pin + 'static>>; /// A future task manager pub(crate) struct FutureTaskManager { diff --git a/tentacle/src/service/helper.rs b/tentacle/src/service/helper.rs index 1829fbc2..e55b6f67 100644 --- a/tentacle/src/service/helper.rs +++ b/tentacle/src/service/helper.rs @@ -154,7 +154,7 @@ where } } -#[cfg(not(target_arch = "wasm32"))] +#[cfg(not(target_family = "wasm"))] pub struct Listener { pub(crate) inner: MultiIncoming, pub(crate) handshake_type: HandshakeType, @@ -165,7 +165,7 @@ pub struct Listener { pub(crate) future_task_sender: mpsc::Sender, } -#[cfg(not(target_arch = "wasm32"))] +#[cfg(not(target_family = "wasm"))] impl Listener where K: KeyProvider, @@ -221,10 +221,10 @@ where } } -#[cfg(not(target_arch = "wasm32"))] +#[cfg(not(target_family = "wasm"))] impl Unpin for Listener {} -#[cfg(not(target_arch = "wasm32"))] +#[cfg(not(target_family = "wasm"))] impl Stream for Listener where K: KeyProvider, diff --git a/tentacle/src/traits.rs b/tentacle/src/traits.rs index d39a5ae2..c6fecb4f 100644 --- a/tentacle/src/traits.rs +++ b/tentacle/src/traits.rs @@ -22,8 +22,7 @@ use crate::{ /// Mainly handle some Service-level errors thrown at runtime, such as listening errors. /// /// At the same time, the session establishment and disconnection messages will also be perceived here. -#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] -#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] +#[async_trait::async_trait] pub trait ServiceHandle: Send { /// Handling runtime errors async fn handle_error(&mut self, _control: &mut ServiceContext, _error: ServiceError) {} @@ -52,8 +51,7 @@ pub trait ServiceHandle: Send { /// The opening and closing of the protocol will create and clean up the handle exclusive /// to the session, but the service handle will remain in the state until the service is closed. /// -#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] -#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] +#[async_trait::async_trait] pub trait ServiceProtocol: Send { /// This function is called when the service start. /// @@ -77,8 +75,7 @@ pub trait ServiceProtocol: Send { } /// Session level protocol handle -#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] -#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] +#[async_trait::async_trait] pub trait SessionProtocol: Send { /// Called when opening protocol async fn connected(&mut self, _context: ProtocolContextMutRef<'_>, _version: &str) {} @@ -97,8 +94,7 @@ pub trait SessionProtocol: Send { } } -#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] -#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] +#[async_trait::async_trait] impl ServiceHandle for Box { async fn handle_error(&mut self, control: &mut ServiceContext, error: ServiceError) { (**self).handle_error(control, error).await @@ -109,8 +105,7 @@ impl ServiceHandle for Box { } } -#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] -#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] +#[async_trait::async_trait] impl ServiceHandle for Box { async fn handle_error(&mut self, control: &mut ServiceContext, error: ServiceError) { (**self).handle_error(control, error).await @@ -121,12 +116,10 @@ impl ServiceHandle for Box { } } -#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] -#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] +#[async_trait::async_trait] impl ServiceHandle for () {} -#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] -#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] +#[async_trait::async_trait] impl ServiceProtocol for Box { async fn init(&mut self, context: &mut ProtocolContext) { (**self).init(context).await @@ -154,8 +147,7 @@ impl ServiceProtocol for Box { } } -#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] -#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] +#[async_trait::async_trait] impl ServiceProtocol for Box { async fn init(&mut self, context: &mut ProtocolContext) { (**self).init(context).await @@ -183,8 +175,7 @@ impl ServiceProtocol for Box { async fn connected(&mut self, context: ProtocolContextMutRef<'_>, version: &str) { (**self).connected(context, version).await @@ -208,8 +199,7 @@ impl SessionProtocol for Box { } } -#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] -#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] +#[async_trait::async_trait] impl SessionProtocol for Box { async fn connected(&mut self, context: ProtocolContextMutRef<'_>, version: &str) { (**self).connected(context, version).await diff --git a/tentacle/src/transports/browser.rs b/tentacle/src/transports/browser.rs index ab410dc0..d2db9a47 100644 --- a/tentacle/src/transports/browser.rs +++ b/tentacle/src/transports/browser.rs @@ -111,7 +111,7 @@ impl BrowserTransport { } pub type BrowserDialFuture = - TransportFuture>>>>; + TransportFuture> + Send>>>; impl Transport for BrowserTransport { type ListenFuture = (); @@ -125,8 +125,11 @@ impl Transport for BrowserTransport { if !matches!(find_type(&address), TransportType::Ws) { return Err(TransportErrorKind::NotSupported(address)); } - let dial = connect(address, self.timeout); - Ok(TransportFuture::new(Box::pin(dial))) + let dial = crate::runtime::spawn(connect(address, self.timeout)); + + Ok(TransportFuture::new(Box::pin(async { + dial.await.expect("oneshot channel panic") + }))) } } diff --git a/tentacle/src/transports/memory.rs b/tentacle/src/transports/memory.rs index bfcf7545..b339d778 100644 --- a/tentacle/src/transports/memory.rs +++ b/tentacle/src/transports/memory.rs @@ -11,19 +11,19 @@ use futures::{ stream::{FusedStream, Stream, StreamExt}, SinkExt, }; -use once_cell::sync::Lazy; use std::{ collections::{hash_map::Entry, HashMap}, future::Future, io, num::NonZeroU64, pin::Pin, + sync::LazyLock, task::{Context, Poll}, }; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; -static MEMORY_HUB: Lazy>>> = - Lazy::new(|| Mutex::new(HashMap::default())); +static MEMORY_HUB: LazyLock>>> = + LazyLock::new(|| Mutex::new(HashMap::default())); async fn bind(address: Multiaddr) -> Result<(Multiaddr, MemoryListener)> { match parse_memory_port(&address) { diff --git a/tentacle/src/transports/mod.rs b/tentacle/src/transports/mod.rs index bbc2f74a..c70f4e78 100644 --- a/tentacle/src/transports/mod.rs +++ b/tentacle/src/transports/mod.rs @@ -10,20 +10,20 @@ use std::{ task::{Context, Poll}, }; -#[cfg(target_arch = "wasm32")] +#[cfg(target_family = "wasm")] mod browser; -#[cfg(not(target_arch = "wasm32"))] +#[cfg(not(target_family = "wasm"))] mod memory; -#[cfg(not(target_arch = "wasm32"))] +#[cfg(not(target_family = "wasm"))] mod tcp; -#[cfg(all(feature = "tls", not(target_arch = "wasm32")))] +#[cfg(all(feature = "tls", not(target_family = "wasm")))] mod tls; -#[cfg(all(feature = "ws", not(target_arch = "wasm32")))] +#[cfg(all(feature = "ws", not(target_family = "wasm")))] mod ws; -#[cfg(target_arch = "wasm32")] +#[cfg(target_family = "wasm")] pub use on_browser::*; -#[cfg(not(target_arch = "wasm32"))] +#[cfg(not(target_family = "wasm"))] pub use os::*; type Result = std::result::Result; @@ -93,7 +93,7 @@ pub fn find_type(addr: &Multiaddr) -> TransportType { .unwrap_or(TransportType::Tcp) } -#[cfg(not(target_arch = "wasm32"))] +#[cfg(not(target_family = "wasm"))] mod os { use super::*; @@ -467,7 +467,7 @@ mod os { } } -#[cfg(target_arch = "wasm32")] +#[cfg(target_family = "wasm")] mod on_browser { use super::*; diff --git a/tentacle/src/transports/tls.rs b/tentacle/src/transports/tls.rs index 5b344c79..69d34daf 100644 --- a/tentacle/src/transports/tls.rs +++ b/tentacle/src/transports/tls.rs @@ -22,7 +22,7 @@ use crate::{ use futures::channel::mpsc::{channel, Receiver, Sender}; use std::sync::Arc; use tokio::io; -use tokio_rustls::rustls::{ServerConfig, ServerName}; +use tokio_rustls::rustls::{pki_types::ServerName, ServerConfig}; use tokio_rustls::{TlsAcceptor, TlsConnector}; pub type TlsStream = Box; @@ -71,7 +71,7 @@ async fn connect( Some(socket_address) => { let stream = tcp_dial(socket_address, tcp_config, timeout).await?; - let domain_name = ServerName::try_from(domain_name.as_str()) + let domain_name = ServerName::try_from(domain_name) .map_err(|_| TransportErrorKind::TlsError("invalid dnsname".to_string()))?; let connector = TlsConnector::from(tls_client_config); Ok(( diff --git a/tentacle/src/utils.rs b/tentacle/src/utils.rs index f6dfc30f..26eb05ff 100644 --- a/tentacle/src/utils.rs +++ b/tentacle/src/utils.rs @@ -8,7 +8,7 @@ use std::{ }; /// This module create a `DnsResolver` future task to DNS resolver -#[cfg(not(target_arch = "wasm32"))] +#[cfg(not(target_family = "wasm"))] pub mod dns; /// Check if the ip address is reachable. diff --git a/tentacle/tests/test_tls_dial.rs b/tentacle/tests/test_tls_dial.rs index be6917d3..81092e40 100644 --- a/tentacle/tests/test_tls_dial.rs +++ b/tentacle/tests/test_tls_dial.rs @@ -1,6 +1,5 @@ #![cfg(feature = "tls")] use futures::channel; -use rustls_pemfile::{certs, pkcs8_private_keys, rsa_private_keys}; use std::io::BufReader; use std::str::FromStr; use std::sync::Arc; @@ -19,11 +18,15 @@ use tentacle::{ traits::{ServiceHandle, ServiceProtocol}, ProtocolId, SessionId, }; -use tokio_rustls::rustls::server::AllowAnyAuthenticatedClient; +use tokio_rustls::rustls::server::WebPkiClientVerifier; use tokio_rustls::rustls::version::{TLS12, TLS13}; use tokio_rustls::rustls::{ - Certificate, ClientConfig, PrivateKey, RootCertStore, ServerConfig, SupportedCipherSuite, - SupportedProtocolVersion, ALL_CIPHER_SUITES, + crypto::aws_lc_rs::default_provider, + crypto::aws_lc_rs::ALL_CIPHER_SUITES, + pki_types::{ + pem::PemObject, CertificateDer, PrivateKeyDer, PrivatePkcs1KeyDer, PrivatePkcs8KeyDer, + }, + ClientConfig, RootCertStore, ServerConfig, SupportedCipherSuite, SupportedProtocolVersion, }; pub fn create(meta: ProtocolMeta, shandle: F, cert_path: String) -> Service @@ -220,45 +223,43 @@ fn lookup_versions(versions: &[String]) -> Vec<&'static SupportedProtocolVersion out } -fn load_certs(filename: &str) -> Vec { +fn load_certs(filename: &str) -> Vec> { let certfile = fs::File::open(filename).expect("cannot open certificate file"); let mut reader = BufReader::new(certfile); - certs(&mut reader) + CertificateDer::pem_reader_iter(&mut reader) + .collect::, _>>() .unwrap() - .iter() - .map(|v| Certificate(v.clone())) - .collect() } -fn load_private_key(filename: &str) -> PrivateKey { +fn load_private_key(filename: &str) -> PrivateKeyDer<'static> { let keyfile = fs::File::open(filename).expect("cannot open private key file"); let mut reader = BufReader::new(keyfile); - let rsa_keys = rsa_private_keys(&mut reader).expect("file contains invalid rsa private key"); + let mut rsa_keys = PrivatePkcs1KeyDer::pem_reader_iter(&mut reader); + + let rsa_keys_peek = rsa_keys.next(); - if !rsa_keys.is_empty() { - return PrivateKey(rsa_keys[0].clone()); + if rsa_keys_peek.is_some() { + return PrivateKeyDer::Pkcs1(rsa_keys_peek.unwrap().unwrap().clone_key()); } let keyfile = fs::File::open(filename).expect("cannot open private key file"); let mut reader = BufReader::new(keyfile); - let pkcs8_keys = - pkcs8_private_keys(&mut reader).expect("file contains invalid pkcs8 private key"); + let mut pkcs8_keys = PrivatePkcs8KeyDer::pem_reader_iter(&mut reader); + let pkcs8_keys_peek = pkcs8_keys.next(); - assert!(!pkcs8_keys.is_empty()); - PrivateKey(pkcs8_keys[0].clone()) + assert!(pkcs8_keys_peek.is_some()); + PrivateKeyDer::Pkcs8(pkcs8_keys_peek.unwrap().unwrap().clone_key()) } /// Build a `ServerConfig` from our NetConfig pub fn make_server_config(config: &NetConfig) -> ServerConfig { - let server_config = ServerConfig::builder(); + let mut cryp = default_provider(); - let server_config = if config.cypher_suits.is_some() { - server_config.with_cipher_suites(&lookup_suites(config.cypher_suits.as_ref().unwrap())) - } else { - server_config.with_safe_default_cipher_suites() + if config.cypher_suits.is_some() { + cryp.cipher_suites = lookup_suites(config.cypher_suits.as_ref().unwrap()) }; - let server_config = server_config.with_safe_default_kx_groups(); + let server_config = ServerConfig::builder_with_provider(Arc::new(cryp)); let server_config = if config.protocols.is_some() { server_config @@ -272,9 +273,11 @@ pub fn make_server_config(config: &NetConfig) -> ServerConfig { let mut client_auth_roots = RootCertStore::empty(); for cacert in &cacerts { - client_auth_roots.add(cacert).unwrap(); + client_auth_roots.add(cacert.clone()).unwrap(); } - let client_auth = Arc::new(AllowAnyAuthenticatedClient::new(client_auth_roots)); + let client_auth = WebPkiClientVerifier::builder(client_auth_roots.into()) + .build() + .unwrap(); let server_config = server_config.with_client_cert_verifier(client_auth); @@ -302,15 +305,13 @@ pub fn make_server_config(config: &NetConfig) -> ServerConfig { /// Build a `ClientConfig` from our NetConfig pub fn make_client_config(config: &NetConfig) -> ClientConfig { - let client_config = ClientConfig::builder(); + let mut cryp = default_provider(); - let client_config = if config.cypher_suits.is_some() { - client_config.with_cipher_suites(&lookup_suites(config.cypher_suits.as_ref().unwrap())) - } else { - client_config.with_safe_default_cipher_suites() + if config.cypher_suits.is_some() { + cryp.cipher_suites = lookup_suites(config.cypher_suits.as_ref().unwrap()); }; - let client_config = client_config.with_safe_default_kx_groups(); + let client_config = ClientConfig::builder_with_provider(Arc::new(cryp)); let client_config = if config.protocols.is_some() { client_config @@ -322,11 +323,8 @@ pub fn make_client_config(config: &NetConfig) -> ClientConfig { let cafile = config.ca_cert.as_ref().unwrap(); - let certfile = fs::File::open(cafile).expect("Cannot open CA file"); - let mut reader = BufReader::new(certfile); - let mut client_root_cert_store = RootCertStore::empty(); - client_root_cert_store.add_parsable_certificates(&certs(&mut reader).unwrap()); + client_root_cert_store.add_parsable_certificates(load_certs(cafile)); let client_config = client_config.with_root_certificates(client_root_cert_store); @@ -351,7 +349,7 @@ pub fn make_client_config(config: &NetConfig) -> ClientConfig { certs.extend(cacerts); } - client_config.with_single_cert(certs, privkey).unwrap() + client_config.with_client_auth_cert(certs, privkey).unwrap() } else { client_config.with_no_client_auth() } diff --git a/tentacle/tests/test_tls_reconnect.rs b/tentacle/tests/test_tls_reconnect.rs index d695ef65..cbced41e 100644 --- a/tentacle/tests/test_tls_reconnect.rs +++ b/tentacle/tests/test_tls_reconnect.rs @@ -1,6 +1,5 @@ #![cfg(feature = "tls")] use crossbeam_channel::Receiver; -use rustls_pemfile::{certs, pkcs8_private_keys, rsa_private_keys}; use std::io::BufReader; use std::str::FromStr; use std::sync::Arc; @@ -18,11 +17,15 @@ use tentacle::{ traits::{ServiceHandle, ServiceProtocol}, ProtocolId, }; -use tokio_rustls::rustls::server::AllowAnyAuthenticatedClient; +use tokio_rustls::rustls::server::WebPkiClientVerifier; use tokio_rustls::rustls::version::{TLS12, TLS13}; use tokio_rustls::rustls::{ - Certificate, ClientConfig, PrivateKey, RootCertStore, ServerConfig, SupportedCipherSuite, - SupportedProtocolVersion, ALL_CIPHER_SUITES, + crypto::aws_lc_rs::default_provider, + crypto::aws_lc_rs::ALL_CIPHER_SUITES, + pki_types::{ + pem::PemObject, CertificateDer, PrivateKeyDer, PrivatePkcs1KeyDer, PrivatePkcs8KeyDer, + }, + ClientConfig, RootCertStore, ServerConfig, SupportedCipherSuite, SupportedProtocolVersion, }; pub fn create(meta: ProtocolMeta, shandle: F, cert_path: String) -> Service @@ -163,46 +166,43 @@ fn lookup_versions(versions: &[String]) -> Vec<&'static SupportedProtocolVersion out } -fn load_certs(filename: &str) -> Vec { +fn load_certs(filename: &str) -> Vec> { let certfile = fs::File::open(filename).expect("cannot open certificate file"); let mut reader = BufReader::new(certfile); - certs(&mut reader) + CertificateDer::pem_reader_iter(&mut reader) + .collect::, _>>() .unwrap() - .iter() - .map(|v| Certificate(v.clone())) - .collect() } -fn load_private_key(filename: &str) -> PrivateKey { +fn load_private_key(filename: &str) -> PrivateKeyDer<'static> { let keyfile = fs::File::open(filename).expect("cannot open private key file"); let mut reader = BufReader::new(keyfile); - let rsa_keys = rsa_private_keys(&mut reader).expect("file contains invalid rsa private key"); + let mut rsa_keys = PrivatePkcs1KeyDer::pem_reader_iter(&mut reader); - if !rsa_keys.is_empty() { - return PrivateKey(rsa_keys[0].clone()); + let rsa_keys_peek = rsa_keys.next(); + + if rsa_keys_peek.is_some() { + return PrivateKeyDer::Pkcs1(rsa_keys_peek.unwrap().unwrap().clone_key()); } let keyfile = fs::File::open(filename).expect("cannot open private key file"); let mut reader = BufReader::new(keyfile); - let pkcs8_keys = - pkcs8_private_keys(&mut reader).expect("file contains invalid pkcs8 private key"); + let mut pkcs8_keys = PrivatePkcs8KeyDer::pem_reader_iter(&mut reader); + let pkcs8_keys_peek = pkcs8_keys.next(); - assert!(!pkcs8_keys.is_empty()); - PrivateKey(pkcs8_keys[0].clone()) + assert!(pkcs8_keys_peek.is_some()); + PrivateKeyDer::Pkcs8(pkcs8_keys_peek.unwrap().unwrap().clone_key()) } /// Build a `ServerConfig` from our NetConfig pub fn make_server_config(config: &NetConfig) -> ServerConfig { - let server_config = ServerConfig::builder(); + let mut cryp = default_provider(); - let server_config = if config.cypher_suits.is_some() { - server_config.with_cipher_suites(&lookup_suites(config.cypher_suits.as_ref().unwrap())) - } else { - server_config.with_safe_default_cipher_suites() + if config.cypher_suits.is_some() { + cryp.cipher_suites = lookup_suites(config.cypher_suits.as_ref().unwrap()) }; - let server_config = server_config.with_safe_default_kx_groups(); - + let server_config = ServerConfig::builder_with_provider(Arc::new(cryp)); let server_config = if config.protocols.is_some() { server_config .with_protocol_versions(lookup_versions(config.protocols.as_ref().unwrap()).as_slice()) @@ -215,9 +215,11 @@ pub fn make_server_config(config: &NetConfig) -> ServerConfig { let mut client_auth_roots = RootCertStore::empty(); for cacert in &cacerts { - client_auth_roots.add(cacert).unwrap(); + client_auth_roots.add(cacert.clone()).unwrap(); } - let client_auth = Arc::new(AllowAnyAuthenticatedClient::new(client_auth_roots)); + let client_auth = WebPkiClientVerifier::builder(client_auth_roots.into()) + .build() + .unwrap(); let server_config = server_config.with_client_cert_verifier(client_auth); @@ -245,15 +247,13 @@ pub fn make_server_config(config: &NetConfig) -> ServerConfig { /// Build a `ClientConfig` from our NetConfig pub fn make_client_config(config: &NetConfig) -> ClientConfig { - let client_config = ClientConfig::builder(); + let mut cryp = default_provider(); - let client_config = if config.cypher_suits.is_some() { - client_config.with_cipher_suites(&lookup_suites(config.cypher_suits.as_ref().unwrap())) - } else { - client_config.with_safe_default_cipher_suites() + if config.cypher_suits.is_some() { + cryp.cipher_suites = lookup_suites(config.cypher_suits.as_ref().unwrap()); }; - let client_config = client_config.with_safe_default_kx_groups(); + let client_config = ClientConfig::builder_with_provider(Arc::new(cryp)); let client_config = if config.protocols.is_some() { client_config @@ -265,11 +265,8 @@ pub fn make_client_config(config: &NetConfig) -> ClientConfig { let cafile = config.ca_cert.as_ref().unwrap(); - let certfile = fs::File::open(cafile).expect("Cannot open CA file"); - let mut reader = BufReader::new(certfile); - let mut client_root_cert_store = RootCertStore::empty(); - client_root_cert_store.add_parsable_certificates(&certs(&mut reader).unwrap()); + client_root_cert_store.add_parsable_certificates(load_certs(cafile)); let client_config = client_config.with_root_certificates(client_root_cert_store); @@ -294,7 +291,7 @@ pub fn make_client_config(config: &NetConfig) -> ClientConfig { certs.extend(cacerts); } - client_config.with_single_cert(certs, privkey).unwrap() + client_config.with_client_auth_cert(certs, privkey).unwrap() } else { client_config.with_no_client_auth() } diff --git a/yamux/Cargo.toml b/yamux/Cargo.toml index 05776d1b..b0de65ac 100644 --- a/yamux/Cargo.toml +++ b/yamux/Cargo.toml @@ -4,7 +4,10 @@ version = "0.3.8" license = "MIT" repository = "https://github.com/nervosnetwork/tentacle" description = "Rust implementation of Yamux" -authors = ["Linfeng Qian ", "Nervos Core Dev "] +authors = [ + "Linfeng Qian ", + "Nervos Core Dev ", +] edition = "2021" [dependencies] @@ -21,8 +24,13 @@ futures-timer = { version = "3.0.2", optional = true } env_logger = "0.6" rand = "0.8" bytesize = "1" -tokio = { version = "1.0.0", features = ["time", "net", "io-util", "rt", "rt-multi-thread"] } -once_cell = "1.8.0" +tokio = { version = "1.0.0", features = [ + "time", + "net", + "io-util", + "rt", + "rt-multi-thread", +] } [features] default = ["tokio-timer"] diff --git a/yamux/src/session.rs b/yamux/src/session.rs index 95c3c8a6..2f24a35d 100644 --- a/yamux/src/session.rs +++ b/yamux/src/session.rs @@ -1,6 +1,6 @@ //! The session, can open and manage substreams -#[cfg(not(target_arch = "wasm32"))] +#[cfg(not(target_family = "wasm"))] use std::time::Instant; use std::{ collections::{hash_map::Entry, BTreeMap, HashMap, HashSet, VecDeque}, @@ -9,7 +9,7 @@ use std::{ task::{Context, Poll}, time::Duration, }; -#[cfg(target_arch = "wasm32")] +#[cfg(target_family = "wasm")] use timer::Instant; use futures::{ @@ -39,7 +39,7 @@ const TIMEOUT: Duration = Duration::from_secs(30); /// But we can simulate it with `futures-timer`. /// So, I implemented a global time dependent on `futures-timer`, /// Because in the browser environment, it is always single-threaded, so feel free to be unsafe -#[cfg(target_arch = "wasm32")] +#[cfg(target_family = "wasm")] static mut TIME: Instant = Instant::from_u64(0); /// The session @@ -731,7 +731,7 @@ mod timer { } } - #[cfg(target_arch = "wasm32")] + #[cfg(target_family = "wasm")] pub use wasm_mock::Instant; #[cfg(feature = "generic-timer")] @@ -766,7 +766,7 @@ mod timer { Poll::Ready(_) => { let dur = self.period; self.delay.reset(dur); - #[cfg(target_arch = "wasm32")] + #[cfg(target_family = "wasm")] unsafe { super::super::TIME += dur; } @@ -784,7 +784,7 @@ mod timer { } } - #[cfg(target_arch = "wasm32")] + #[cfg(target_family = "wasm")] #[allow(dead_code)] mod wasm_mock { use std::cmp::{Eq, Ord, Ordering, PartialEq, PartialOrd}; @@ -881,8 +881,7 @@ mod timer { #[cfg(test)] pub(crate) fn rt() -> &'static tokio::runtime::Runtime { - static RT: once_cell::sync::OnceCell = - once_cell::sync::OnceCell::new(); + static RT: std::sync::OnceLock = std::sync::OnceLock::new(); RT.get_or_init(|| tokio::runtime::Runtime::new().unwrap()) } From 6f44967b8ea54aeeae0b2a3592f4a4ae75087542 Mon Sep 17 00:00:00 2001 From: driftluo Date: Fri, 18 Oct 2024 10:37:27 +0800 Subject: [PATCH 2/3] feat: optimizing yamux wasm feature --- .github/workflows/ci.yaml | 7 +-- Makefile | 3 ++ secio/Cargo.toml | 3 +- secio/src/codec/secure_stream.rs | 3 +- tentacle/src/service.rs | 2 +- yamux/Cargo.toml | 3 ++ yamux/src/session.rs | 74 +++++++++++++++++++++----------- 7 files changed, 62 insertions(+), 33 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index a6eaa7c7..8c071c96 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -1,10 +1,10 @@ name: Github Action on: - pull_request: # trigger on pull requests + pull_request: # trigger on pull requests push: branches: - - master # trigger on push to master + - master # trigger on push to master jobs: test: @@ -12,7 +12,7 @@ jobs: runs-on: ${{ matrix.os }} strategy: matrix: - build: [ linux, macos, windows ] + build: [linux, macos, windows] include: - build: linux os: ubuntu-latest @@ -63,6 +63,7 @@ jobs: run: | echo "stable" > rust-toolchain rustup target add wasm32-unknown-unknown + rustup target add wasm32-wasip1 make features-check fuzz_test: diff --git a/Makefile b/Makefile index decfcc64..d7c4a8ab 100644 --- a/Makefile +++ b/Makefile @@ -47,6 +47,9 @@ features-check: # required wasm32-unknown-unknown target $(Change_Work_Path) && cargo build --features wasm-timer,unstable --no-default-features --target=wasm32-unknown-unknown $(Change_Work_Path) && cargo build --features wasm-timer,unstable,secio-async-trait --no-default-features --target=wasm32-unknown-unknown + # required wasm32-wasip1 target + $(Change_Work_Path) && cargo build --features wasm-timer,unstable --no-default-features --target=wasm32-wasip1 + $(Change_Work_Path) && cargo build --features wasm-timer,unstable,secio-async-trait --no-default-features --target=wasm32-wasip1 bench_p2p: cd bench && cargo run --release diff --git a/secio/Cargo.toml b/secio/Cargo.toml index a9e8b6a0..bc9c5751 100644 --- a/secio/Cargo.toml +++ b/secio/Cargo.toml @@ -27,7 +27,7 @@ molecule = "0.8.0" unsigned-varint = "0.8" bs58 = "0.5.0" -secp256k1 = "0.29" +secp256k1 = "0.30" rand = "0.8" [target.'cfg(unix)'.dependencies] @@ -60,7 +60,6 @@ hmac = "0.12.0" x25519-dalek = "2" chacha20poly1305 = "0.10" rand_core = { version = "0.6" } -once_cell = "1.8.0" proptest = "1" [[bench]] diff --git a/secio/src/codec/secure_stream.rs b/secio/src/codec/secure_stream.rs index cd771a1b..65149c03 100644 --- a/secio/src/codec/secure_stream.rs +++ b/secio/src/codec/secure_stream.rs @@ -241,8 +241,7 @@ mod tests { use tokio_util::codec::{length_delimited::LengthDelimitedCodec, Framed}; fn rt() -> &'static tokio::runtime::Runtime { - static RT: once_cell::sync::OnceCell = - once_cell::sync::OnceCell::new(); + static RT: std::sync::OnceLock = std::sync::OnceLock::new(); RT.get_or_init(|| tokio::runtime::Runtime::new().unwrap()) } diff --git a/tentacle/src/service.rs b/tentacle/src/service.rs index e3aed33c..43587a68 100644 --- a/tentacle/src/service.rs +++ b/tentacle/src/service.rs @@ -288,7 +288,7 @@ impl InnerService where K: KeyProvider, { - #[cfg(not(all(target_family = "wasm", target_os = "unknown")))] + #[cfg(not(target_family = "wasm"))] fn spawn_listener(&mut self, incoming: MultiIncoming, listen_address: Multiaddr) { let listener = Listener { inner: incoming, diff --git a/yamux/Cargo.toml b/yamux/Cargo.toml index b0de65ac..c9a74cb1 100644 --- a/yamux/Cargo.toml +++ b/yamux/Cargo.toml @@ -20,6 +20,9 @@ nohash-hasher = "0.2" futures-timer = { version = "3.0.2", optional = true } +[target.'cfg(all(target_family = "wasm", target_os = "unknown"))'.dependencies] +web-time = "1.1.0" + [dev-dependencies] env_logger = "0.6" rand = "0.8" diff --git a/yamux/src/session.rs b/yamux/src/session.rs index 2f24a35d..f289f131 100644 --- a/yamux/src/session.rs +++ b/yamux/src/session.rs @@ -9,8 +9,12 @@ use std::{ task::{Context, Poll}, time::Duration, }; -#[cfg(target_family = "wasm")] + +#[cfg(all(target_family = "wasm", not(target_os = "unknown")))] use timer::Instant; +/// wasm-unknown-unkown brower doesn't support time get, must use browser timer instead +#[cfg(all(target_family = "wasm", target_os = "unknown"))] +use web_time::Instant; use futures::{ channel::mpsc::{channel, unbounded, Receiver, Sender, UnboundedReceiver, UnboundedSender}, @@ -35,13 +39,6 @@ use timer::{interval, Interval}; const BUF_SHRINK_THRESHOLD: usize = u8::MAX as usize; const TIMEOUT: Duration = Duration::from_secs(30); -/// wasm doesn't support time get, must use browser timer instead -/// But we can simulate it with `futures-timer`. -/// So, I implemented a global time dependent on `futures-timer`, -/// Because in the browser environment, it is always single-threaded, so feel free to be unsafe -#[cfg(target_family = "wasm")] -static mut TIME: Instant = Instant::from_u64(0); - /// The session pub struct Session { // Framed low level raw stream @@ -109,6 +106,10 @@ pub struct Session { control_receiver: Receiver, keepalive: Option, + /// wasi use time mock to recording time changes + /// yamux's timeout statistics are session independent + #[cfg(all(target_family = "wasm", not(target_os = "unknown")))] + time_mock: std::sync::Arc, } /// Session type, client or server @@ -149,8 +150,18 @@ where raw_stream, FrameCodec::default().max_frame_size(config.max_stream_window_size), ); + #[cfg(all(target_family = "wasm", not(target_os = "unknown")))] + let time_mock = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)); let keepalive = if config.enable_keepalive { - Some(interval(config.keepalive_interval)) + #[cfg(not(all(target_family = "wasm", not(target_os = "unknown"))))] + let interval = interval(config.keepalive_interval); + + #[cfg(all(target_family = "wasm", not(target_os = "unknown")))] + let mut interval = interval(config.keepalive_interval); + #[cfg(all(target_family = "wasm", not(target_os = "unknown")))] + interval.mock_instant(time_mock.clone()); + + Some(interval) } else { None }; @@ -174,6 +185,8 @@ where control_sender, control_receiver, keepalive, + #[cfg(all(target_family = "wasm", not(target_os = "unknown")))] + time_mock, } } @@ -276,7 +289,7 @@ where if self .pings .iter() - .any(|(_id, time)| Instant::now().saturating_duration_since(*time) > TIMEOUT) + .any(|(_id, time)| ping_at.saturating_duration_since(*time) > TIMEOUT) { return Err(io::ErrorKind::TimedOut.into()); } @@ -636,7 +649,13 @@ where // Assume that remote peer has gone away and this session should be closed. self.remote_go_away = true; } else { - self.keep_alive(cx, Instant::now())?; + #[cfg(not(all(target_family = "wasm", not(target_os = "unknown"))))] + let now = Instant::now(); + #[cfg(all(target_family = "wasm", not(target_os = "unknown")))] + let now = Instant::from_u64( + self.time_mock.load(std::sync::atomic::Ordering::Acquire) as u64, + ); + self.keep_alive(cx, now)?; } } Poll::Ready(None) => { @@ -731,7 +750,7 @@ mod timer { } } - #[cfg(target_family = "wasm")] + #[cfg(all(target_family = "wasm", not(target_os = "unknown")))] pub use wasm_mock::Instant; #[cfg(feature = "generic-timer")] @@ -747,6 +766,8 @@ mod timer { pub struct Interval { delay: Delay, period: Duration, + #[cfg(all(target_family = "wasm", not(target_os = "unknown")))] + mock_instant: std::sync::Arc, } impl Interval { @@ -754,8 +775,18 @@ mod timer { Self { delay: Delay::new(period), period, + #[cfg(all(target_family = "wasm", not(target_os = "unknown")))] + mock_instant: Default::default(), } } + + #[cfg(all(target_family = "wasm", not(target_os = "unknown")))] + pub fn mock_instant( + &mut self, + mock_instant: std::sync::Arc, + ) { + self.mock_instant = mock_instant; + } } impl Stream for Interval { @@ -766,10 +797,11 @@ mod timer { Poll::Ready(_) => { let dur = self.period; self.delay.reset(dur); - #[cfg(target_family = "wasm")] - unsafe { - super::super::TIME += dur; - } + #[cfg(all(target_family = "wasm", not(target_os = "unknown")))] + self.mock_instant.fetch_add( + dur.as_millis() as usize, + std::sync::atomic::Ordering::AcqRel, + ); Poll::Ready(Some(())) } Poll::Pending => Poll::Pending, @@ -784,7 +816,7 @@ mod timer { } } - #[cfg(target_family = "wasm")] + #[cfg(all(target_family = "wasm", not(target_os = "unknown")))] #[allow(dead_code)] mod wasm_mock { use std::cmp::{Eq, Ord, Ordering, PartialEq, PartialOrd}; @@ -824,10 +856,6 @@ mod timer { Instant { inner: val } } - pub fn now() -> Instant { - unsafe { super::super::TIME } - } - pub fn duration_since(&self, earlier: Instant) -> Duration { *self - earlier } @@ -835,10 +863,6 @@ mod timer { pub fn saturating_duration_since(&self, earlier: Instant) -> Duration { *self - earlier } - - pub fn elapsed(&self) -> Duration { - Instant::now() - *self - } } impl Add for Instant { From e1a46f4d2ad421a74e9995190fca902a7878c3a3 Mon Sep 17 00:00:00 2001 From: driftluo Date: Fri, 18 Oct 2024 16:59:01 +0800 Subject: [PATCH 3/3] chore: upgrade molecule gen-type --- .github/CODEOWNERS | 2 +- Makefile | 2 +- secio/src/handshake/handshake_mol.rs | 267 +++++++++++++----- tentacle/Cargo.toml | 2 +- .../protocol_select/protocol_select_mol.rs | 126 ++++++--- tentacle/src/upnp/mod.rs | 12 +- 6 files changed, 299 insertions(+), 112 deletions(-) diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index d57cdad3..017bf4d3 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -1 +1 @@ -* @nervosnetwork/ckb-dev @driftluo @TheWaWaR +* @nervosnetwork/ckb-dev @driftluo @eval-exec diff --git a/Makefile b/Makefile index d7c4a8ab..8e892aa7 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ MOLC := moleculec -MOLC_VERSION := 0.7.0 +MOLC_VERSION := 0.8.0 MOL_FILES := \ tentacle/src/protocol_select/protocol_select.mol \ diff --git a/secio/src/handshake/handshake_mol.rs b/secio/src/handshake/handshake_mol.rs index a1a2f906..cff379ea 100644 --- a/secio/src/handshake/handshake_mol.rs +++ b/secio/src/handshake/handshake_mol.rs @@ -1,4 +1,4 @@ -// Generated by Molecule 0.7.0 +// Generated by Molecule 0.8.0 use molecule::prelude::*; #[derive(Clone)] @@ -26,14 +26,15 @@ impl ::core::fmt::Display for Secp256k1 { } impl ::core::default::Default for Secp256k1 { fn default() -> Self { - let v: Vec = vec![0, 0, 0, 0]; - Secp256k1::new_unchecked(v.into()) + let v = molecule::bytes::Bytes::from_static(&Self::DEFAULT_VALUE); + Secp256k1::new_unchecked(v) } } impl Secp256k1 { + const DEFAULT_VALUE: [u8; 4] = [0, 0, 0, 0]; pub const ITEM_SIZE: usize = 1; pub fn total_size(&self) -> usize { - molecule::NUMBER_SIZE * (self.item_count() + 1) + molecule::NUMBER_SIZE + Self::ITEM_SIZE * self.item_count() } pub fn item_count(&self) -> usize { molecule::unpack_number(self.as_slice()) as usize @@ -114,7 +115,7 @@ impl<'r> ::core::fmt::Display for Secp256k1Reader<'r> { impl<'r> Secp256k1Reader<'r> { pub const ITEM_SIZE: usize = 1; pub fn total_size(&self) -> usize { - molecule::NUMBER_SIZE * (self.item_count() + 1) + molecule::NUMBER_SIZE + Self::ITEM_SIZE * self.item_count() } pub fn item_count(&self) -> usize { molecule::unpack_number(self.as_slice()) as usize @@ -173,7 +174,7 @@ impl<'r> molecule::prelude::Reader<'r> for Secp256k1Reader<'r> { Ok(()) } } -#[derive(Debug, Default)] +#[derive(Clone, Debug, Default)] pub struct Secp256k1Builder(pub(crate) Vec); impl Secp256k1Builder { pub const ITEM_SIZE: usize = 1; @@ -181,16 +182,25 @@ impl Secp256k1Builder { self.0 = v; self } - pub fn push(mut self, v: Byte) -> Self { - self.0.push(v); + pub fn push(mut self, v: T) -> Self + where + T: ::core::convert::Into, + { + self.0.push(v.into()); self } pub fn extend>(mut self, iter: T) -> Self { - for elem in iter { - self.0.push(elem); - } + self.0.extend(iter); self } + pub fn replace(&mut self, index: usize, v: T) -> Option + where + T: ::core::convert::Into, + { + self.0 + .get_mut(index) + .map(|item| ::core::mem::replace(item, v.into())) + } } impl molecule::prelude::Builder for Secp256k1Builder { type Entity = Secp256k1; @@ -198,7 +208,7 @@ impl molecule::prelude::Builder for Secp256k1Builder { fn expected_length(&self) -> usize { molecule::NUMBER_SIZE + Self::ITEM_SIZE * self.0.len() } - fn write(&self, writer: &mut W) -> ::molecule::io::Result<()> { + fn write(&self, writer: &mut W) -> molecule::io::Result<()> { writer.write_all(&molecule::pack_number(self.0.len() as molecule::Number))?; for inner in &self.0[..] { writer.write_all(inner.as_slice())?; @@ -238,6 +248,30 @@ impl ::core::iter::IntoIterator for Secp256k1 { Secp256k1Iterator(self, 0, len) } } +impl ::core::iter::FromIterator for Secp256k1 { + fn from_iter>(iter: T) -> Self { + Self::new_builder().extend(iter).build() + } +} +impl From> for Secp256k1 { + fn from(v: Vec) -> Self { + Self::new_builder().set(v).build() + } +} +impl ::core::iter::FromIterator for Secp256k1 { + fn from_iter>(iter: T) -> Self { + Self::new_builder() + .extend(iter.into_iter().map(Into::into)) + .build() + } +} +impl From> for Secp256k1 { + fn from(v: Vec) -> Self { + Self::new_builder() + .set(v.into_iter().map(Into::into).collect()) + .build() + } +} #[derive(Clone)] pub struct Bytes(molecule::bytes::Bytes); impl ::core::fmt::LowerHex for Bytes { @@ -263,14 +297,15 @@ impl ::core::fmt::Display for Bytes { } impl ::core::default::Default for Bytes { fn default() -> Self { - let v: Vec = vec![0, 0, 0, 0]; - Bytes::new_unchecked(v.into()) + let v = molecule::bytes::Bytes::from_static(&Self::DEFAULT_VALUE); + Bytes::new_unchecked(v) } } impl Bytes { + const DEFAULT_VALUE: [u8; 4] = [0, 0, 0, 0]; pub const ITEM_SIZE: usize = 1; pub fn total_size(&self) -> usize { - molecule::NUMBER_SIZE * (self.item_count() + 1) + molecule::NUMBER_SIZE + Self::ITEM_SIZE * self.item_count() } pub fn item_count(&self) -> usize { molecule::unpack_number(self.as_slice()) as usize @@ -351,7 +386,7 @@ impl<'r> ::core::fmt::Display for BytesReader<'r> { impl<'r> BytesReader<'r> { pub const ITEM_SIZE: usize = 1; pub fn total_size(&self) -> usize { - molecule::NUMBER_SIZE * (self.item_count() + 1) + molecule::NUMBER_SIZE + Self::ITEM_SIZE * self.item_count() } pub fn item_count(&self) -> usize { molecule::unpack_number(self.as_slice()) as usize @@ -410,7 +445,7 @@ impl<'r> molecule::prelude::Reader<'r> for BytesReader<'r> { Ok(()) } } -#[derive(Debug, Default)] +#[derive(Clone, Debug, Default)] pub struct BytesBuilder(pub(crate) Vec); impl BytesBuilder { pub const ITEM_SIZE: usize = 1; @@ -418,16 +453,25 @@ impl BytesBuilder { self.0 = v; self } - pub fn push(mut self, v: Byte) -> Self { - self.0.push(v); + pub fn push(mut self, v: T) -> Self + where + T: ::core::convert::Into, + { + self.0.push(v.into()); self } pub fn extend>(mut self, iter: T) -> Self { - for elem in iter { - self.0.push(elem); - } + self.0.extend(iter); self } + pub fn replace(&mut self, index: usize, v: T) -> Option + where + T: ::core::convert::Into, + { + self.0 + .get_mut(index) + .map(|item| ::core::mem::replace(item, v.into())) + } } impl molecule::prelude::Builder for BytesBuilder { type Entity = Bytes; @@ -435,7 +479,7 @@ impl molecule::prelude::Builder for BytesBuilder { fn expected_length(&self) -> usize { molecule::NUMBER_SIZE + Self::ITEM_SIZE * self.0.len() } - fn write(&self, writer: &mut W) -> ::molecule::io::Result<()> { + fn write(&self, writer: &mut W) -> molecule::io::Result<()> { writer.write_all(&molecule::pack_number(self.0.len() as molecule::Number))?; for inner in &self.0[..] { writer.write_all(inner.as_slice())?; @@ -475,6 +519,30 @@ impl ::core::iter::IntoIterator for Bytes { BytesIterator(self, 0, len) } } +impl ::core::iter::FromIterator for Bytes { + fn from_iter>(iter: T) -> Self { + Self::new_builder().extend(iter).build() + } +} +impl From> for Bytes { + fn from(v: Vec) -> Self { + Self::new_builder().set(v).build() + } +} +impl ::core::iter::FromIterator for Bytes { + fn from_iter>(iter: T) -> Self { + Self::new_builder() + .extend(iter.into_iter().map(Into::into)) + .build() + } +} +impl From> for Bytes { + fn from(v: Vec) -> Self { + Self::new_builder() + .set(v.into_iter().map(Into::into).collect()) + .build() + } +} #[derive(Clone)] pub struct String(molecule::bytes::Bytes); impl ::core::fmt::LowerHex for String { @@ -500,14 +568,15 @@ impl ::core::fmt::Display for String { } impl ::core::default::Default for String { fn default() -> Self { - let v: Vec = vec![0, 0, 0, 0]; - String::new_unchecked(v.into()) + let v = molecule::bytes::Bytes::from_static(&Self::DEFAULT_VALUE); + String::new_unchecked(v) } } impl String { + const DEFAULT_VALUE: [u8; 4] = [0, 0, 0, 0]; pub const ITEM_SIZE: usize = 1; pub fn total_size(&self) -> usize { - molecule::NUMBER_SIZE * (self.item_count() + 1) + molecule::NUMBER_SIZE + Self::ITEM_SIZE * self.item_count() } pub fn item_count(&self) -> usize { molecule::unpack_number(self.as_slice()) as usize @@ -588,7 +657,7 @@ impl<'r> ::core::fmt::Display for StringReader<'r> { impl<'r> StringReader<'r> { pub const ITEM_SIZE: usize = 1; pub fn total_size(&self) -> usize { - molecule::NUMBER_SIZE * (self.item_count() + 1) + molecule::NUMBER_SIZE + Self::ITEM_SIZE * self.item_count() } pub fn item_count(&self) -> usize { molecule::unpack_number(self.as_slice()) as usize @@ -647,7 +716,7 @@ impl<'r> molecule::prelude::Reader<'r> for StringReader<'r> { Ok(()) } } -#[derive(Debug, Default)] +#[derive(Clone, Debug, Default)] pub struct StringBuilder(pub(crate) Vec); impl StringBuilder { pub const ITEM_SIZE: usize = 1; @@ -655,16 +724,25 @@ impl StringBuilder { self.0 = v; self } - pub fn push(mut self, v: Byte) -> Self { - self.0.push(v); + pub fn push(mut self, v: T) -> Self + where + T: ::core::convert::Into, + { + self.0.push(v.into()); self } pub fn extend>(mut self, iter: T) -> Self { - for elem in iter { - self.0.push(elem); - } + self.0.extend(iter); self } + pub fn replace(&mut self, index: usize, v: T) -> Option + where + T: ::core::convert::Into, + { + self.0 + .get_mut(index) + .map(|item| ::core::mem::replace(item, v.into())) + } } impl molecule::prelude::Builder for StringBuilder { type Entity = String; @@ -672,7 +750,7 @@ impl molecule::prelude::Builder for StringBuilder { fn expected_length(&self) -> usize { molecule::NUMBER_SIZE + Self::ITEM_SIZE * self.0.len() } - fn write(&self, writer: &mut W) -> ::molecule::io::Result<()> { + fn write(&self, writer: &mut W) -> molecule::io::Result<()> { writer.write_all(&molecule::pack_number(self.0.len() as molecule::Number))?; for inner in &self.0[..] { writer.write_all(inner.as_slice())?; @@ -712,6 +790,30 @@ impl ::core::iter::IntoIterator for String { StringIterator(self, 0, len) } } +impl ::core::iter::FromIterator for String { + fn from_iter>(iter: T) -> Self { + Self::new_builder().extend(iter).build() + } +} +impl From> for String { + fn from(v: Vec) -> Self { + Self::new_builder().set(v).build() + } +} +impl ::core::iter::FromIterator for String { + fn from_iter>(iter: T) -> Self { + Self::new_builder() + .extend(iter.into_iter().map(Into::into)) + .build() + } +} +impl From> for String { + fn from(v: Vec) -> Self { + Self::new_builder() + .set(v.into_iter().map(Into::into).collect()) + .build() + } +} #[derive(Clone)] pub struct PublicKey(molecule::bytes::Bytes); impl ::core::fmt::LowerHex for PublicKey { @@ -737,11 +839,12 @@ impl ::core::fmt::Display for PublicKey { } impl ::core::default::Default for PublicKey { fn default() -> Self { - let v: Vec = vec![0, 0, 0, 0, 0, 0, 0, 0]; - PublicKey::new_unchecked(v.into()) + let v = molecule::bytes::Bytes::from_static(&Self::DEFAULT_VALUE); + PublicKey::new_unchecked(v) } } impl PublicKey { + const DEFAULT_VALUE: [u8; 8] = [0, 0, 0, 0, 0, 0, 0, 0]; pub const ITEMS_COUNT: usize = 1; pub fn item_id(&self) -> molecule::Number { molecule::unpack_number(self.as_slice()) @@ -845,7 +948,7 @@ impl<'r> molecule::prelude::Reader<'r> for PublicKeyReader<'r> { Ok(()) } } -#[derive(Debug, Default)] +#[derive(Clone, Debug, Default)] pub struct PublicKeyBuilder(pub(crate) PublicKeyUnion); impl PublicKeyBuilder { pub const ITEMS_COUNT: usize = 1; @@ -863,7 +966,7 @@ impl molecule::prelude::Builder for PublicKeyBuilder { fn expected_length(&self) -> usize { molecule::NUMBER_SIZE + self.0.as_slice().len() } - fn write(&self, writer: &mut W) -> ::molecule::io::Result<()> { + fn write(&self, writer: &mut W) -> molecule::io::Result<()> { writer.write_all(&molecule::pack_number(self.0.item_id()))?; writer.write_all(self.0.as_slice()) } @@ -975,6 +1078,11 @@ impl<'r> PublicKeyUnionReader<'r> { } } } +impl From for PublicKey { + fn from(value: Secp256k1) -> Self { + Self::new_builder().set(value).build() + } +} #[derive(Clone)] pub struct Propose(molecule::bytes::Bytes); impl ::core::fmt::LowerHex for Propose { @@ -1008,14 +1116,15 @@ impl ::core::fmt::Display for Propose { } impl ::core::default::Default for Propose { fn default() -> Self { - let v: Vec = vec![ - 44, 0, 0, 0, 24, 0, 0, 0, 28, 0, 0, 0, 32, 0, 0, 0, 36, 0, 0, 0, 40, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - ]; - Propose::new_unchecked(v.into()) + let v = molecule::bytes::Bytes::from_static(&Self::DEFAULT_VALUE); + Propose::new_unchecked(v) } } impl Propose { + const DEFAULT_VALUE: [u8; 44] = [ + 44, 0, 0, 0, 24, 0, 0, 0, 28, 0, 0, 0, 32, 0, 0, 0, 36, 0, 0, 0, 40, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + ]; pub const FIELD_COUNT: usize = 5; pub fn total_size(&self) -> usize { molecule::unpack_number(self.as_slice()) as usize @@ -1207,9 +1316,6 @@ impl<'r> molecule::prelude::Reader<'r> for ProposeReader<'r> { if slice_len != total_size { return ve!(Self, TotalSizeNotMatch, total_size, slice_len); } - if slice_len == molecule::NUMBER_SIZE && Self::FIELD_COUNT == 0 { - return Ok(()); - } if slice_len < molecule::NUMBER_SIZE * 2 { return ve!(Self, HeaderIsBroken, molecule::NUMBER_SIZE * 2, slice_len); } @@ -1242,7 +1348,7 @@ impl<'r> molecule::prelude::Reader<'r> for ProposeReader<'r> { Ok(()) } } -#[derive(Debug, Default)] +#[derive(Clone, Debug, Default)] pub struct ProposeBuilder { pub(crate) rand: Bytes, pub(crate) pubkey: Bytes, @@ -1252,24 +1358,39 @@ pub struct ProposeBuilder { } impl ProposeBuilder { pub const FIELD_COUNT: usize = 5; - pub fn rand(mut self, v: Bytes) -> Self { - self.rand = v; + pub fn rand(mut self, v: T) -> Self + where + T: ::core::convert::Into, + { + self.rand = v.into(); self } - pub fn pubkey(mut self, v: Bytes) -> Self { - self.pubkey = v; + pub fn pubkey(mut self, v: T) -> Self + where + T: ::core::convert::Into, + { + self.pubkey = v.into(); self } - pub fn exchanges(mut self, v: String) -> Self { - self.exchanges = v; + pub fn exchanges(mut self, v: T) -> Self + where + T: ::core::convert::Into, + { + self.exchanges = v.into(); self } - pub fn ciphers(mut self, v: String) -> Self { - self.ciphers = v; + pub fn ciphers(mut self, v: T) -> Self + where + T: ::core::convert::Into, + { + self.ciphers = v.into(); self } - pub fn hashes(mut self, v: String) -> Self { - self.hashes = v; + pub fn hashes(mut self, v: T) -> Self + where + T: ::core::convert::Into, + { + self.hashes = v.into(); self } } @@ -1284,7 +1405,7 @@ impl molecule::prelude::Builder for ProposeBuilder { + self.ciphers.as_slice().len() + self.hashes.as_slice().len() } - fn write(&self, writer: &mut W) -> ::molecule::io::Result<()> { + fn write(&self, writer: &mut W) -> molecule::io::Result<()> { let mut total_size = molecule::NUMBER_SIZE * (Self::FIELD_COUNT + 1); let mut offsets = Vec::with_capacity(Self::FIELD_COUNT); offsets.push(total_size); @@ -1345,13 +1466,14 @@ impl ::core::fmt::Display for Exchange { } impl ::core::default::Default for Exchange { fn default() -> Self { - let v: Vec = vec![ - 20, 0, 0, 0, 12, 0, 0, 0, 16, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - ]; - Exchange::new_unchecked(v.into()) + let v = molecule::bytes::Bytes::from_static(&Self::DEFAULT_VALUE); + Exchange::new_unchecked(v) } } impl Exchange { + const DEFAULT_VALUE: [u8; 20] = [ + 20, 0, 0, 0, 12, 0, 0, 0, 16, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + ]; pub const FIELD_COUNT: usize = 2; pub fn total_size(&self) -> usize { molecule::unpack_number(self.as_slice()) as usize @@ -1501,9 +1623,6 @@ impl<'r> molecule::prelude::Reader<'r> for ExchangeReader<'r> { if slice_len != total_size { return ve!(Self, TotalSizeNotMatch, total_size, slice_len); } - if slice_len == molecule::NUMBER_SIZE && Self::FIELD_COUNT == 0 { - return Ok(()); - } if slice_len < molecule::NUMBER_SIZE * 2 { return ve!(Self, HeaderIsBroken, molecule::NUMBER_SIZE * 2, slice_len); } @@ -1533,19 +1652,25 @@ impl<'r> molecule::prelude::Reader<'r> for ExchangeReader<'r> { Ok(()) } } -#[derive(Debug, Default)] +#[derive(Clone, Debug, Default)] pub struct ExchangeBuilder { pub(crate) epubkey: Bytes, pub(crate) signature: Bytes, } impl ExchangeBuilder { pub const FIELD_COUNT: usize = 2; - pub fn epubkey(mut self, v: Bytes) -> Self { - self.epubkey = v; + pub fn epubkey(mut self, v: T) -> Self + where + T: ::core::convert::Into, + { + self.epubkey = v.into(); self } - pub fn signature(mut self, v: Bytes) -> Self { - self.signature = v; + pub fn signature(mut self, v: T) -> Self + where + T: ::core::convert::Into, + { + self.signature = v.into(); self } } @@ -1557,7 +1682,7 @@ impl molecule::prelude::Builder for ExchangeBuilder { + self.epubkey.as_slice().len() + self.signature.as_slice().len() } - fn write(&self, writer: &mut W) -> ::molecule::io::Result<()> { + fn write(&self, writer: &mut W) -> molecule::io::Result<()> { let mut total_size = molecule::NUMBER_SIZE * (Self::FIELD_COUNT + 1); let mut offsets = Vec::with_capacity(Self::FIELD_COUNT); offsets.push(total_size); diff --git a/tentacle/Cargo.toml b/tentacle/Cargo.toml index 5d7fda85..e54d6c66 100644 --- a/tentacle/Cargo.toml +++ b/tentacle/Cargo.toml @@ -39,7 +39,7 @@ multiaddr = { path = "../multiaddr", package = "tentacle-multiaddr", version = " molecule = "0.8.0" # upnp -igd = { version = "0.12", optional = true } +igd = { version = "0.15", optional = true, package = "igd-next" } #tls tokio-rustls = { version = "0.26.0", optional = true } diff --git a/tentacle/src/protocol_select/protocol_select_mol.rs b/tentacle/src/protocol_select/protocol_select_mol.rs index 79065e8a..b1068dbc 100644 --- a/tentacle/src/protocol_select/protocol_select_mol.rs +++ b/tentacle/src/protocol_select/protocol_select_mol.rs @@ -1,4 +1,4 @@ -// Generated by Molecule 0.7.0 +// Generated by Molecule 0.8.0 use molecule::prelude::*; #[derive(Clone)] @@ -26,14 +26,15 @@ impl ::core::fmt::Display for String { } impl ::core::default::Default for String { fn default() -> Self { - let v: Vec = vec![0, 0, 0, 0]; - String::new_unchecked(v.into()) + let v = molecule::bytes::Bytes::from_static(&Self::DEFAULT_VALUE); + String::new_unchecked(v) } } impl String { + const DEFAULT_VALUE: [u8; 4] = [0, 0, 0, 0]; pub const ITEM_SIZE: usize = 1; pub fn total_size(&self) -> usize { - molecule::NUMBER_SIZE * (self.item_count() + 1) + molecule::NUMBER_SIZE + Self::ITEM_SIZE * self.item_count() } pub fn item_count(&self) -> usize { molecule::unpack_number(self.as_slice()) as usize @@ -114,7 +115,7 @@ impl<'r> ::core::fmt::Display for StringReader<'r> { impl<'r> StringReader<'r> { pub const ITEM_SIZE: usize = 1; pub fn total_size(&self) -> usize { - molecule::NUMBER_SIZE * (self.item_count() + 1) + molecule::NUMBER_SIZE + Self::ITEM_SIZE * self.item_count() } pub fn item_count(&self) -> usize { molecule::unpack_number(self.as_slice()) as usize @@ -173,7 +174,7 @@ impl<'r> molecule::prelude::Reader<'r> for StringReader<'r> { Ok(()) } } -#[derive(Debug, Default)] +#[derive(Clone, Debug, Default)] pub struct StringBuilder(pub(crate) Vec); impl StringBuilder { pub const ITEM_SIZE: usize = 1; @@ -181,16 +182,25 @@ impl StringBuilder { self.0 = v; self } - pub fn push(mut self, v: Byte) -> Self { - self.0.push(v); + pub fn push(mut self, v: T) -> Self + where + T: ::core::convert::Into, + { + self.0.push(v.into()); self } pub fn extend>(mut self, iter: T) -> Self { - for elem in iter { - self.0.push(elem); - } + self.0.extend(iter); self } + pub fn replace(&mut self, index: usize, v: T) -> Option + where + T: ::core::convert::Into, + { + self.0 + .get_mut(index) + .map(|item| ::core::mem::replace(item, v.into())) + } } impl molecule::prelude::Builder for StringBuilder { type Entity = String; @@ -198,7 +208,7 @@ impl molecule::prelude::Builder for StringBuilder { fn expected_length(&self) -> usize { molecule::NUMBER_SIZE + Self::ITEM_SIZE * self.0.len() } - fn write(&self, writer: &mut W) -> ::molecule::io::Result<()> { + fn write(&self, writer: &mut W) -> molecule::io::Result<()> { writer.write_all(&molecule::pack_number(self.0.len() as molecule::Number))?; for inner in &self.0[..] { writer.write_all(inner.as_slice())?; @@ -238,6 +248,30 @@ impl ::core::iter::IntoIterator for String { StringIterator(self, 0, len) } } +impl ::core::iter::FromIterator for String { + fn from_iter>(iter: T) -> Self { + Self::new_builder().extend(iter).build() + } +} +impl From> for String { + fn from(v: Vec) -> Self { + Self::new_builder().set(v).build() + } +} +impl ::core::iter::FromIterator for String { + fn from_iter>(iter: T) -> Self { + Self::new_builder() + .extend(iter.into_iter().map(Into::into)) + .build() + } +} +impl From> for String { + fn from(v: Vec) -> Self { + Self::new_builder() + .set(v.into_iter().map(Into::into).collect()) + .build() + } +} #[derive(Clone)] pub struct StringVec(molecule::bytes::Bytes); impl ::core::fmt::LowerHex for StringVec { @@ -269,11 +303,12 @@ impl ::core::fmt::Display for StringVec { } impl ::core::default::Default for StringVec { fn default() -> Self { - let v: Vec = vec![4, 0, 0, 0]; - StringVec::new_unchecked(v.into()) + let v = molecule::bytes::Bytes::from_static(&Self::DEFAULT_VALUE); + StringVec::new_unchecked(v) } } impl StringVec { + const DEFAULT_VALUE: [u8; 4] = [4, 0, 0, 0]; pub fn total_size(&self) -> usize { molecule::unpack_number(self.as_slice()) as usize } @@ -460,23 +495,32 @@ impl<'r> molecule::prelude::Reader<'r> for StringVecReader<'r> { Ok(()) } } -#[derive(Debug, Default)] +#[derive(Clone, Debug, Default)] pub struct StringVecBuilder(pub(crate) Vec); impl StringVecBuilder { pub fn set(mut self, v: Vec) -> Self { self.0 = v; self } - pub fn push(mut self, v: String) -> Self { - self.0.push(v); + pub fn push(mut self, v: T) -> Self + where + T: ::core::convert::Into, + { + self.0.push(v.into()); self } pub fn extend>(mut self, iter: T) -> Self { - for elem in iter { - self.0.push(elem); - } + self.0.extend(iter); self } + pub fn replace(&mut self, index: usize, v: T) -> Option + where + T: ::core::convert::Into, + { + self.0 + .get_mut(index) + .map(|item| ::core::mem::replace(item, v.into())) + } } impl molecule::prelude::Builder for StringVecBuilder { type Entity = StringVec; @@ -489,7 +533,7 @@ impl molecule::prelude::Builder for StringVecBuilder { .map(|inner| inner.as_slice().len()) .sum::() } - fn write(&self, writer: &mut W) -> ::molecule::io::Result<()> { + fn write(&self, writer: &mut W) -> molecule::io::Result<()> { let item_count = self.0.len(); if item_count == 0 { writer.write_all(&molecule::pack_number( @@ -572,6 +616,16 @@ impl<'t: 'r, 'r> ::core::iter::ExactSizeIterator for StringVecReaderIterator<'t, self.2 - self.1 } } +impl ::core::iter::FromIterator for StringVec { + fn from_iter>(iter: T) -> Self { + Self::new_builder().extend(iter).build() + } +} +impl From> for StringVec { + fn from(v: Vec) -> Self { + Self::new_builder().set(v).build() + } +} #[derive(Clone)] pub struct ProtocolInfo(molecule::bytes::Bytes); impl ::core::fmt::LowerHex for ProtocolInfo { @@ -602,13 +656,14 @@ impl ::core::fmt::Display for ProtocolInfo { } impl ::core::default::Default for ProtocolInfo { fn default() -> Self { - let v: Vec = vec![ - 20, 0, 0, 0, 12, 0, 0, 0, 16, 0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, - ]; - ProtocolInfo::new_unchecked(v.into()) + let v = molecule::bytes::Bytes::from_static(&Self::DEFAULT_VALUE); + ProtocolInfo::new_unchecked(v) } } impl ProtocolInfo { + const DEFAULT_VALUE: [u8; 20] = [ + 20, 0, 0, 0, 12, 0, 0, 0, 16, 0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, + ]; pub const FIELD_COUNT: usize = 2; pub fn total_size(&self) -> usize { molecule::unpack_number(self.as_slice()) as usize @@ -758,9 +813,6 @@ impl<'r> molecule::prelude::Reader<'r> for ProtocolInfoReader<'r> { if slice_len != total_size { return ve!(Self, TotalSizeNotMatch, total_size, slice_len); } - if slice_len == molecule::NUMBER_SIZE && Self::FIELD_COUNT == 0 { - return Ok(()); - } if slice_len < molecule::NUMBER_SIZE * 2 { return ve!(Self, HeaderIsBroken, molecule::NUMBER_SIZE * 2, slice_len); } @@ -790,19 +842,25 @@ impl<'r> molecule::prelude::Reader<'r> for ProtocolInfoReader<'r> { Ok(()) } } -#[derive(Debug, Default)] +#[derive(Clone, Debug, Default)] pub struct ProtocolInfoBuilder { pub(crate) name: String, pub(crate) support_versions: StringVec, } impl ProtocolInfoBuilder { pub const FIELD_COUNT: usize = 2; - pub fn name(mut self, v: String) -> Self { - self.name = v; + pub fn name(mut self, v: T) -> Self + where + T: ::core::convert::Into, + { + self.name = v.into(); self } - pub fn support_versions(mut self, v: StringVec) -> Self { - self.support_versions = v; + pub fn support_versions(mut self, v: T) -> Self + where + T: ::core::convert::Into, + { + self.support_versions = v.into(); self } } @@ -814,7 +872,7 @@ impl molecule::prelude::Builder for ProtocolInfoBuilder { + self.name.as_slice().len() + self.support_versions.as_slice().len() } - fn write(&self, writer: &mut W) -> ::molecule::io::Result<()> { + fn write(&self, writer: &mut W) -> molecule::io::Result<()> { let mut total_size = molecule::NUMBER_SIZE * (Self::FIELD_COUNT + 1); let mut offsets = Vec::with_capacity(Self::FIELD_COUNT); offsets.push(total_size); diff --git a/tentacle/src/upnp/mod.rs b/tentacle/src/upnp/mod.rs index aba08a00..abcf20dd 100644 --- a/tentacle/src/upnp/mod.rs +++ b/tentacle/src/upnp/mod.rs @@ -48,7 +48,7 @@ impl IgdClient { } Ok(gateway) => { // if gateway address is public, don't need upnp, disable it - if is_reachable((*gateway.addr.ip()).into()) { + if is_reachable((gateway.addr.ip()).into()) { return None; } @@ -73,7 +73,11 @@ impl IgdClient { let state = get_local_net_state().ok().and_then(|networks| { networks.into_iter().find(|network| { - in_same_subnet(network.address, *gateway.addr.ip(), network.net_mask) + if let std::net::IpAddr::V4(ip) = gateway.addr.ip() { + in_same_subnet(network.address, ip, network.net_mask) + } else { + false + } }) })?; @@ -106,7 +110,7 @@ impl IgdClient { match self.gateway.add_port( igd::PortMappingProtocol::TCP, addr.port(), - SocketAddrV4::new(self.state.address, addr.port()), + SocketAddrV4::new(self.state.address, addr.port()).into(), 0, // forever "p2p", ) { @@ -152,7 +156,7 @@ impl IgdClient { let _ignore = self.gateway.add_port( igd::PortMappingProtocol::TCP, addr.port(), - SocketAddrV4::new(self.state.address, addr.port()), + SocketAddrV4::new(self.state.address, addr.port()).into(), 60, // 60s "p2p", );