diff --git a/CHANGELOG.md b/CHANGELOG.md index 1cf9f7c9559..cde9b2e6ba0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,10 +31,11 @@ - [`libp2p-pnet` CHANGELOG](transports/pnet/CHANGELOG.md) - [`libp2p-quic` CHANGELOG](transports/quic/CHANGELOG.md) - [`libp2p-tcp` CHANGELOG](transports/tcp/CHANGELOG.md) +- [`libp2p-tls` CHANGELOG](transports/tls/CHANGELOG.md) - [`libp2p-uds` CHANGELOG](transports/uds/CHANGELOG.md) - [`libp2p-wasm-ext` CHANGELOG](transports/wasm-ext/CHANGELOG.md) - [`libp2p-websocket` CHANGELOG](transports/websocket/CHANGELOG.md) -- [`libp2p-tls` CHANGELOG](transports/tls/CHANGELOG.md) +- [`libp2p-websys-websocket` CHANGELOG](transports/websys-websocket/CHANGELOG.md) ## Multiplexers diff --git a/Cargo.lock b/Cargo.lock index 978a40d6140..e61f436fe84 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1666,7 +1666,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" dependencies = [ "gloo-timers", - "send_wrapper", + "send_wrapper 0.4.0", ] [[package]] @@ -2315,6 +2315,7 @@ dependencies = [ "libp2p-wasm-ext", "libp2p-webrtc", "libp2p-websocket", + "libp2p-websys-websocket", "libp2p-yamux", "multiaddr", "pin-project", @@ -3076,6 +3077,23 @@ dependencies = [ "webpki-roots 0.23.0", ] +[[package]] +name = "libp2p-websys-websocket" +version = "0.1.0" +dependencies = [ + "futures", + "js-sys", + "libp2p-core", + "libp2p-identity", + "libp2p-noise", + "libp2p-yamux", + "parking_lot 0.12.1", + "send_wrapper 0.6.0", + "thiserror", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "libp2p-yamux" version = "0.44.0" @@ -4484,6 +4502,12 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f638d531eccd6e23b980caf34876660d38e265409d8e99b397ab71eb3612fad0" +[[package]] +name = "send_wrapper" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd0b0ec5f1c1ca621c432a25813d8d60c88abe6d3e08a3eb9cf37d97a0fe3d73" + [[package]] name = "serde" version = "1.0.160" @@ -5349,9 +5373,9 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.60" +version = "0.3.61" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bcda906d8be16e728fd5adc5b729afad4e444e106ab28cd1c7256e54fa61510f" +checksum = "e33b99f4b23ba3eec1a53ac264e35a755f00e966e0065077d6027c0f575b0b97" dependencies = [ "js-sys", "wasm-bindgen", diff --git a/Cargo.toml b/Cargo.toml index 8400238368d..254f842664f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,6 +53,7 @@ members = [ "transports/wasm-ext", "transports/webrtc", "transports/websocket", + "transports/websys-websocket", ] resolver = "2" diff --git a/libp2p/CHANGELOG.md b/libp2p/CHANGELOG.md index 5581fe7921d..ff302134a00 100644 --- a/libp2p/CHANGELOG.md +++ b/libp2p/CHANGELOG.md @@ -8,10 +8,15 @@ ## 0.51.3 - Deprecate the `mplex` feature. -The recommended baseline stream multiplexer is `yamux`. -See [PR 3689]. + The recommended baseline stream multiplexer is `yamux`. + See [PR 3689]. + +- Introduce `libp2p::websys_websocket` module behind `websys-websocket` feature flag. + This supersedes the existing `libp2p::wasm_ext` module which is now deprecated. + See [PR 3679]. [PR 3689]: https://github.com/libp2p/rust-libp2p/pull/3689 +[PR 3679]: https://github.com/libp2p/rust-libp2p/pull/3679 ## 0.51.2 diff --git a/libp2p/Cargo.toml b/libp2p/Cargo.toml index b4fa79d6343..d1d756a3c50 100644 --- a/libp2p/Cargo.toml +++ b/libp2p/Cargo.toml @@ -46,6 +46,7 @@ full = [ "wasm-bindgen", "wasm-ext", "wasm-ext-websocket", + "websys-websocket", "webrtc", "websocket", "yamux", @@ -85,6 +86,7 @@ uds = ["dep:libp2p-uds"] wasm-bindgen = ["futures-timer/wasm-bindgen", "instant/wasm-bindgen", "getrandom/js", "libp2p-swarm/wasm-bindgen"] wasm-ext = ["dep:libp2p-wasm-ext"] wasm-ext-websocket = ["wasm-ext", "libp2p-wasm-ext?/websocket"] +websys-websocket = ["dep:libp2p-websys-websocket"] webrtc = ["dep:libp2p-webrtc", "libp2p-webrtc?/pem"] websocket = ["dep:libp2p-websocket"] yamux = ["dep:libp2p-yamux"] @@ -116,8 +118,8 @@ libp2p-rendezvous = { workspace = true, optional = true } libp2p-request-response = { workspace = true, optional = true } libp2p-swarm = { workspace = true } libp2p-wasm-ext = { workspace = true, optional = true } +libp2p-websys-websocket = { version = "0.1.0", path = "../transports/websys-websocket", optional = true } libp2p-yamux = { workspace = true, optional = true } - multiaddr = { version = "0.17.0" } pin-project = "1.0.0" diff --git a/libp2p/src/lib.rs b/libp2p/src/lib.rs index fad08b0128c..5f0e852a5da 100644 --- a/libp2p/src/lib.rs +++ b/libp2p/src/lib.rs @@ -141,8 +141,16 @@ pub use libp2p_tls as tls; #[doc(inline)] pub use libp2p_uds as uds; #[cfg(feature = "wasm-ext")] +#[deprecated( + note = "`wasm-ext` is deprecated and will be removed in favor of `libp2p-websys-websocket`." +)] +pub mod wasm_ext { + #[doc(inline)] + pub use libp2p_wasm_ext::*; +} +#[cfg(feature = "websys-websocket")] #[doc(inline)] -pub use libp2p_wasm_ext as wasm_ext; +pub use libp2p_websys_websocket as websys_websocket; #[cfg(feature = "webrtc")] #[cfg_attr(docsrs, doc(cfg(feature = "webrtc")))] #[cfg(not(target_arch = "wasm32"))] diff --git a/transports/websys-websocket/CHANGELOG.md b/transports/websys-websocket/CHANGELOG.md new file mode 100644 index 00000000000..cc8907fb086 --- /dev/null +++ b/transports/websys-websocket/CHANGELOG.md @@ -0,0 +1,3 @@ +# 0.1.0 - unreleased + +- Add Websys Websocket transport. diff --git a/transports/websys-websocket/Cargo.toml b/transports/websys-websocket/Cargo.toml new file mode 100644 index 00000000000..d85ca341aae --- /dev/null +++ b/transports/websys-websocket/Cargo.toml @@ -0,0 +1,33 @@ +[package] +name = "libp2p-websys-websocket" +edition = "2021" +rust-version = "1.60.0" +description = "libp2p websocket transports for WASM browser runtime." +version = "0.1.0" +authors = ["Vince Vasta "] +license = "MIT" +repository = "https://github.com/libp2p/rust-libp2p" +keywords = ["peer-to-peer", "libp2p", "networking"] +categories = ["network-programming", "asynchronous"] + +[dependencies] +futures = "0.3.26" +js-sys = "0.3.61" +libp2p-core = { version = "0.39", path = "../../core" } +parking_lot = "0.12.1" +send_wrapper = "0.6.0" +thiserror = "1.0.38" +wasm-bindgen = "0.2.84" +web-sys = { version = "0.3.61", features = ["BinaryType", "CloseEvent", "MessageEvent", "WebSocket"] } + +[dev-dependencies] +libp2p-identity = { path = "../../identity" } +libp2p-noise = { path = "../noise" } +libp2p-yamux = { path = "../../muxers/yamux" } + +# Passing arguments to the docsrs builder in order to properly document cfg's. +# More information: https://docs.rs/about/builds#cross-compiling +[package.metadata.docs.rs] +all-features = true +rustdoc-args = ["--cfg", "docsrs"] +rustc-args = ["--cfg", "docsrs"] diff --git a/transports/websys-websocket/src/lib.rs b/transports/websys-websocket/src/lib.rs new file mode 100644 index 00000000000..dbe45dcc52a --- /dev/null +++ b/transports/websys-websocket/src/lib.rs @@ -0,0 +1,403 @@ +// Copyright (C) 2023 Vince Vasta +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +//! Libp2p websocket transports built on [web-sys](https://rustwasm.github.io/wasm-bindgen/web-sys/index.html). +use futures::{future::Ready, io, prelude::*}; +use libp2p_core::{ + multiaddr::{Multiaddr, Protocol}, + transport::{ListenerId, TransportError, TransportEvent}, +}; +use parking_lot::Mutex; +use send_wrapper::SendWrapper; +use wasm_bindgen::{prelude::*, JsCast}; +use web_sys::{MessageEvent, WebSocket}; + +use std::{ + pin::Pin, + sync::Arc, + task::Poll, + task::{Context, Waker}, +}; + +/// A Websocket transport that can be used in a wasm environment. +/// +/// ## Example +/// +/// To create an authenticated transport instance with Noise protocol and Yamux: +/// +/// ``` +/// # use libp2p_core::{upgrade::Version, Transport}; +/// # use libp2p_identity::Keypair; +/// # use libp2p_yamux::YamuxConfig; +/// # use libp2p_noise::NoiseAuthenticated; +/// let local_key = Keypair::generate_ed25519(); +/// let transport = libp2p_websys_websocket::Transport::default() +/// .upgrade(Version::V1) +/// .authenticate(NoiseAuthenticated::xx(&local_key).unwrap()) +/// .multiplex(YamuxConfig::default()) +/// .boxed(); +/// ``` +/// +#[derive(Default)] +pub struct Transport { + _private: (), +} + +impl libp2p_core::Transport for Transport { + type Output = Connection; + type Error = Error; + type ListenerUpgrade = Ready>; + type Dial = Pin> + Send>>; + + fn listen_on(&mut self, addr: Multiaddr) -> Result> { + Err(TransportError::MultiaddrNotSupported(addr)) + } + + fn remove_listener(&mut self, _id: ListenerId) -> bool { + false + } + + fn dial(&mut self, addr: Multiaddr) -> Result> { + let url = extract_websocket_url(&addr) + .ok_or_else(|| TransportError::MultiaddrNotSupported(addr))?; + + Ok(async move { + let socket = match WebSocket::new(&url) { + Ok(ws) => ws, + Err(_) => return Err(Error::invalid_websocket_url(&url)), + }; + + Ok(Connection::new(socket)) + } + .boxed()) + } + + fn dial_as_listener( + &mut self, + addr: Multiaddr, + ) -> Result> { + Err(TransportError::MultiaddrNotSupported(addr)) + } + + fn poll( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> std::task::Poll> { + Poll::Pending + } + + fn address_translation(&self, _listen: &Multiaddr, _observed: &Multiaddr) -> Option { + None + } +} + +// Try to convert Multiaddr to a Websocket url. +fn extract_websocket_url(addr: &Multiaddr) -> Option { + let mut protocols = addr.iter(); + let host_port = match (protocols.next(), protocols.next()) { + (Some(Protocol::Ip4(ip)), Some(Protocol::Tcp(port))) => { + format!("{ip}:{port}") + } + (Some(Protocol::Ip6(ip)), Some(Protocol::Tcp(port))) => { + format!("[{ip}]:{port}") + } + (Some(Protocol::Dns(h)), Some(Protocol::Tcp(port))) + | (Some(Protocol::Dns4(h)), Some(Protocol::Tcp(port))) + | (Some(Protocol::Dns6(h)), Some(Protocol::Tcp(port))) + | (Some(Protocol::Dnsaddr(h)), Some(Protocol::Tcp(port))) => { + format!("{}:{}", &h, port) + } + _ => return None, + }; + + let (scheme, wspath) = match protocols.next() { + Some(Protocol::Ws(path)) => ("ws", path.into_owned()), + Some(Protocol::Wss(path)) => ("wss", path.into_owned()), + _ => return None, + }; + + Some(format!("{scheme}://{host_port}{wspath}")) +} + +#[derive(thiserror::Error, Debug)] +#[error("{msg}")] +pub struct Error { + msg: String, +} + +impl Error { + fn invalid_websocket_url(url: &str) -> Self { + Self { + msg: format!("Invalid websocket url: {url}"), + } + } +} + +/// A Websocket connection created by the [`Transport`]. +pub struct Connection { + shared: Arc>, +} + +struct Shared { + state: State, + read_waker: Option, + write_waker: Option, + socket: SendWrapper, + closures: Option>, +} + +enum State { + Connecting, + Open { buffer: Vec }, + Closing, + Closed, + Error, +} + +impl Shared { + fn wake_read_write(&self) { + self.wake_read(); + self.wake_write() + } + + fn wake_write(&self) { + if let Some(waker) = &self.write_waker { + waker.wake_by_ref(); + } + } + + fn wake_read(&self) { + if let Some(waker) = &self.read_waker { + waker.wake_by_ref(); + } + } +} + +type Closures = ( + Closure, + Closure, + Closure, + Closure, +); + +impl Connection { + fn new(socket: WebSocket) -> Self { + socket.set_binary_type(web_sys::BinaryType::Arraybuffer); + + let shared = Arc::new(Mutex::new(Shared { + state: State::Connecting, + read_waker: None, + write_waker: None, + socket: SendWrapper::new(socket.clone()), + closures: None, + })); + + let open_callback = Closure::::new({ + let weak_shared = Arc::downgrade(&shared); + move || { + if let Some(shared) = weak_shared.upgrade() { + let mut locked = shared.lock(); + locked.state = State::Open { + buffer: Vec::default(), + }; + locked.wake_read_write(); + } + } + }); + socket.set_onopen(Some(open_callback.as_ref().unchecked_ref())); + + let message_callback = Closure::::new({ + let weak_shared = Arc::downgrade(&shared); + move |e: MessageEvent| { + let buf = match e.data().dyn_into::() { + Ok(buf) => buf, + _ => { + debug_assert!(false, "Unexpected data format {:?}", e.data()); + return; + } + }; + + let shared = match weak_shared.upgrade() { + Some(shared) => shared, + None => return, + }; + + let mut locked = shared.lock(); + let bytes = js_sys::Uint8Array::new(&buf).to_vec(); + + if let State::Open { buffer } = &mut locked.state { + buffer.extend(bytes.into_iter()); + locked.wake_read(); + } + } + }); + socket.set_onmessage(Some(message_callback.as_ref().unchecked_ref())); + + let error_callback = Closure::::new({ + let weak_shared = Arc::downgrade(&shared); + move |_| { + // The error event for error callback doesn't give any information and + // generates error on the browser console we just signal it to the + // stream. + if let Some(shared) = weak_shared.upgrade() { + let mut locked = shared.lock(); + locked.state = State::Error; + locked.wake_read_write(); + } + } + }); + socket.set_onerror(Some(error_callback.as_ref().unchecked_ref())); + + let close_callback = Closure::::new({ + let weak_shared = Arc::downgrade(&shared); + move |_| { + if let Some(shared) = weak_shared.upgrade() { + let mut locked = shared.lock(); + locked.state = State::Closed; + locked.wake_write(); + } + } + }); + socket.set_onclose(Some(close_callback.as_ref().unchecked_ref())); + + // Manage closures memory. + let closures = SendWrapper::new(( + open_callback, + message_callback, + error_callback, + close_callback, + )); + + shared.lock().closures = Some(closures); + + Self { shared } + } +} + +impl AsyncRead for Connection { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + let mut shared = self.shared.lock(); + + let buffer = match &mut shared.state { + State::Connecting => { + shared.read_waker = Some(cx.waker().clone()); + return Poll::Pending; + } + State::Open { buffer } if buffer.is_empty() => { + shared.read_waker = Some(cx.waker().clone()); + return Poll::Pending; + } + State::Open { buffer } => buffer, + State::Closed | State::Closing => { + return Poll::Ready(Ok(0)); + } + State::Error => { + return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "Socket error"))); + } + }; + + let n = buffer.len().min(buf.len()); + + let remaining_buffer = buffer.split_off(n); + buf.copy_from_slice(buffer); + buffer.clear(); + *buffer = remaining_buffer; + + Poll::Ready(Ok(n)) + } +} + +impl AsyncWrite for Connection { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let mut shared = self.shared.lock(); + + match &shared.state { + State::Connecting => { + shared.write_waker = Some(cx.waker().clone()); + return Poll::Pending; + } + State::Closed | State::Closing => { + return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())); + } + State::Error => { + return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "Socket error"))); + } + State::Open { .. } => {} + } + + shared.socket.send_with_u8_array(buf).map_err(|e| { + io::Error::new( + io::ErrorKind::Other, + format!( + "Failed to write data: {}", + e.as_string().unwrap_or_default() + ), + ) + })?; + + Poll::Ready(Ok(buf.len())) + } + + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut shared = self.shared.lock(); + + match &shared.state { + State::Open { .. } | State::Connecting => { + let _ = shared.socket.close(); + shared.state = State::Closing; + + shared.write_waker = Some(cx.waker().clone()); + Poll::Pending + } + State::Closing => { + shared.write_waker = Some(cx.waker().clone()); + Poll::Pending + } + State::Closed => Poll::Ready(Ok(())), + State::Error => Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "Socket error"))), + } + } +} + +impl Drop for Connection { + fn drop(&mut self) { + const GO_AWAY_STATUS_CODE: u16 = 1001; // See https://www.rfc-editor.org/rfc/rfc6455.html#section-7.4.1. + + let shared = self.shared.lock(); + + if let State::Connecting | State::Open { .. } = shared.state { + let _ = shared + .socket + .close_with_code_and_reason(GO_AWAY_STATUS_CODE, "connection dropped"); + } + } +}