Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use web-sys for raw types #22

Merged
merged 22 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## Unreleased

* Stop calling `byobRequest.respond(0)` on cancel ([#16](https://github.com/MattiasBuelens/wasm-streams/pull/16))
* ⚠ **Breaking change:** The system modules (`readable::sys`, `writable::sys` and `transform::sys`) now re-export directly from [the `web-sys` crate](https://docs.rs/web-sys/latest/web_sys/). This should make it easier to use `from_raw()`, `as_raw()` and `into_raw()`. ([#22](https://github.com/MattiasBuelens/wasm-streams/pull/22/))

## v0.3.0 (2022-10-16)

Expand Down
37 changes: 29 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,45 @@ exclude = [
crate-type = ["cdylib", "rlib"]

[dependencies]
js-sys = "^0.3.47"
wasm-bindgen = "0.2.73"
wasm-bindgen-futures = "^0.4.20"
futures-util = { version = "0.3.21", features = ["io", "sink"] }
js-sys = "^0.3.64"
wasm-bindgen = "0.2.87"
wasm-bindgen-futures = "^0.4.37"
futures-util = { version = "^0.3.28", features = ["io", "sink"] }

[dependencies.web-sys]
version = "^0.3.47"
version = "^0.3.64"
features = [
"AbortSignal",
"QueuingStrategy",
"ReadableStream",
"ReadableStreamType",
"ReadableWritablePair",
"ReadableStreamByobReader",
"ReadableStreamReaderMode",
"ReadableStreamReadResult",
"ReadableStreamByobRequest",
"ReadableStreamDefaultReader",
"ReadableByteStreamController",
"ReadableStreamGetReaderOptions",
"ReadableStreamDefaultController",
"StreamPipeOptions",
"TransformStream",
"TransformStreamDefaultController",
"Transformer",
"UnderlyingSink",
"UnderlyingSource",
"WritableStream",
"WritableStreamDefaultController",
"WritableStreamDefaultWriter",
]

[dev-dependencies]
wasm-bindgen-test = "0.3.20"
wasm-bindgen-test = "0.3.37"
tokio = { version = "^1", features = ["macros", "rt"] }
pin-project = "^1.0.6"
pin-project = "^1.1.3"

[dev-dependencies.web-sys]
version = "^0.3.47"
version = "^0.3.64"
features = [
"console",
"AbortSignal",
Expand Down
2 changes: 1 addition & 1 deletion examples/fetch_as_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

// Get the response's body as a JS ReadableStream
let raw_body = resp.body().unwrap_throw();
let body = ReadableStream::from_raw(raw_body.dyn_into().unwrap_throw());
let body = ReadableStream::from_raw(raw_body);

// Convert the JS ReadableStream to a Rust stream
let mut stream = body.into_stream();
Expand Down
6 changes: 3 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
//! [`WritableStream`](crate::WritableStream) and [`TransformStream`](crate::TransformStream).
//! It also supports converting from and into [`Stream`]s and [`Sink`]s from the [futures] crate.
//!
//! [`Stream`]: https://docs.rs/futures/0.3.18/futures/stream/trait.Stream.html
//! [`Sink`]: https://docs.rs/futures/0.3.18/futures/sink/trait.Sink.html
//! [futures]: https://docs.rs/futures/0.3.18/futures/index.html
//! [`Stream`]: https://docs.rs/futures/0.3.28/futures/stream/trait.Stream.html
//! [`Sink`]: https://docs.rs/futures/0.3.28/futures/sink/trait.Sink.html
//! [futures]: https://docs.rs/futures/0.3.28/futures/index.html

pub use readable::ReadableStream;
pub use transform::TransformStream;
Expand Down
37 changes: 27 additions & 10 deletions src/queuing_strategy/mod.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,38 @@
use js_sys::Object;
use wasm_bindgen::prelude::*;

#[wasm_bindgen]
#[derive(Debug)]
pub(crate) struct QueuingStrategy {
high_water_mark: f64,
extern "C" {
#[wasm_bindgen(extends = Object, js_name = QueuingStrategy)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) type QueuingStrategy;

#[wasm_bindgen(method, getter, js_name = highWaterMark)]
pub fn high_water_mark(this: &QueuingStrategy) -> f64;

#[wasm_bindgen(method, setter, js_name = highWaterMark)]
pub fn set_high_water_mark(this: &QueuingStrategy, value: f64);
}

impl QueuingStrategy {
pub fn new(high_water_mark: f64) -> Self {
Self { high_water_mark }
let strategy = Object::new().unchecked_into::<QueuingStrategy>();
strategy.set_high_water_mark(high_water_mark);
strategy
}
}

#[wasm_bindgen]
impl QueuingStrategy {
#[wasm_bindgen(getter, js_name = highWaterMark)]
pub fn high_water_mark(&self) -> f64 {
self.high_water_mark
#[cfg(web_sys_unstable_apis)]
pub fn from_raw(raw: web_sys::QueuingStrategy) -> Self {
raw.unchecked_into()
}

#[cfg(web_sys_unstable_apis)]
pub fn as_raw(&self) -> &web_sys::QueuingStrategy {
self.unchecked_ref()
}

#[cfg(web_sys_unstable_apis)]
pub fn into_raw(self) -> web_sys::QueuingStrategy {
self.unchecked_into()
}
}
50 changes: 27 additions & 23 deletions src/readable/byob_reader.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::marker::PhantomData;

use js_sys::Uint8Array;
use wasm_bindgen::{throw_val, JsCast, JsValue};
use js_sys::{Object, Uint8Array};
use wasm_bindgen::{JsCast, JsValue};
use wasm_bindgen_futures::JsFuture;

use crate::util::{checked_cast_to_usize, clamp_to_u32, promise_to_void_future};
Expand All @@ -23,9 +23,14 @@ pub struct ReadableStreamBYOBReader<'stream> {
impl<'stream> ReadableStreamBYOBReader<'stream> {
pub(crate) fn new(stream: &mut ReadableStream) -> Result<Self, js_sys::Error> {
Ok(Self {
raw: stream.as_raw().get_reader_with_options(
sys::ReadableStreamGetReaderOptions::new(sys::ReadableStreamReaderMode::BYOB),
)?,
raw: stream
.as_raw()
.unchecked_ref::<sys::ReadableStreamExt>()
.try_get_reader_with_options(
sys::ReadableStreamGetReaderOptions::new()
.mode(sys::ReadableStreamReaderMode::Byob),
)?
.unchecked_into(),
_stream: PhantomData,
})
}
Expand Down Expand Up @@ -109,20 +114,18 @@ impl<'stream> ReadableStreamBYOBReader<'stream> {
let buffer_len = buffer.byte_length();
// Limit view to destination slice's length.
let dst_len = clamp_to_u32(dst.len());
let view = buffer
.subarray(0, dst_len)
.unchecked_into::<sys::ArrayBufferView>();
let view = buffer.subarray(0, dst_len).unchecked_into::<Object>();
// Read into view. This transfers `buffer.buffer()`.
let promise = self.as_raw().read(&view);
let js_value = JsFuture::from(promise).await?;
let result = sys::ReadableStreamBYOBReadResult::from(js_value);
let filled_view = match result.value() {
Some(view) => view,
None => {
// No new view was returned. The stream must have been canceled.
assert!(result.is_done());
return Ok((0, None));
}
let promise = self.as_raw().read_with_array_buffer_view(&view);
let js_result = JsFuture::from(promise).await?;
let result = sys::ReadableStreamReadResult::from(js_result);
let js_value = result.value();
let filled_view = if js_value.is_undefined() {
// No new view was returned. The stream must have been canceled.
assert!(result.is_done());
return Ok((0, None));
} else {
js_value.unchecked_into::<Uint8Array>()
};
let filled_len = checked_cast_to_usize(filled_view.byte_length());
debug_assert!(filled_len <= dst.len());
Expand Down Expand Up @@ -157,9 +160,7 @@ impl<'stream> ReadableStreamBYOBReader<'stream> {
}

fn release_lock_mut(&mut self) {
self.as_raw()
.release_lock()
.unwrap_or_else(|error| throw_val(error.into()))
self.as_raw().release_lock()
}

/// Try to [release](https://streams.spec.whatwg.org/#release-a-lock) this reader's lock on the
Expand All @@ -175,7 +176,10 @@ impl<'stream> ReadableStreamBYOBReader<'stream> {
/// return an error and leave the reader locked to the stream.
#[inline]
pub fn try_release_lock(self) -> Result<(), (js_sys::Error, Self)> {
self.as_raw().release_lock().map_err(|error| (error, self))
self.as_raw()
.unchecked_ref::<sys::ReadableStreamReaderExt>()
.try_release_lock()
.map_err(|err| (err, self))
}

/// Converts this `ReadableStreamBYOBReader` into an [`AsyncRead`].
Expand All @@ -185,7 +189,7 @@ impl<'stream> ReadableStreamBYOBReader<'stream> {
/// still usable. This allows reading only a few bytes from the `AsyncRead`, while still
/// allowing another reader to read the remaining bytes later on.
///
/// [`AsyncRead`]: https://docs.rs/futures/0.3.18/futures/io/trait.AsyncRead.html
/// [`AsyncRead`]: https://docs.rs/futures/0.3.28/futures/io/trait.AsyncRead.html
#[inline]
pub fn into_async_read(self) -> IntoAsyncRead<'stream> {
IntoAsyncRead::new(self, false)
Expand Down
24 changes: 15 additions & 9 deletions src/readable/default_reader.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::marker::PhantomData;

use wasm_bindgen::{throw_val, JsValue};
use wasm_bindgen::JsCast;
use wasm_bindgen::JsValue;
use wasm_bindgen_futures::JsFuture;

use crate::util::promise_to_void_future;
Expand All @@ -22,7 +23,11 @@ pub struct ReadableStreamDefaultReader<'stream> {
impl<'stream> ReadableStreamDefaultReader<'stream> {
pub(crate) fn new(stream: &mut ReadableStream) -> Result<Self, js_sys::Error> {
Ok(Self {
raw: stream.as_raw().get_reader()?,
raw: stream
.as_raw()
.unchecked_ref::<sys::ReadableStreamExt>()
.try_get_reader()?
.unchecked_into(),
_stream: PhantomData,
})
}
Expand Down Expand Up @@ -65,8 +70,8 @@ impl<'stream> ReadableStreamDefaultReader<'stream> {
/// * If the stream encounters an `error`, this returns `Err(error)`.
pub async fn read(&mut self) -> Result<Option<JsValue>, JsValue> {
let promise = self.as_raw().read();
let js_value = JsFuture::from(promise).await?;
let result = sys::ReadableStreamDefaultReadResult::from(js_value);
let js_result = JsFuture::from(promise).await?;
let result = sys::ReadableStreamReadResult::from(js_result);
if result.is_done() {
Ok(None)
} else {
Expand All @@ -91,9 +96,7 @@ impl<'stream> ReadableStreamDefaultReader<'stream> {
}

fn release_lock_mut(&mut self) {
self.as_raw()
.release_lock()
.unwrap_or_else(|error| throw_val(error.into()))
self.as_raw().release_lock()
}

/// Try to [release](https://streams.spec.whatwg.org/#release-a-lock) this reader's lock on the
Expand All @@ -109,7 +112,10 @@ impl<'stream> ReadableStreamDefaultReader<'stream> {
/// return an error and leave the reader locked to the stream.
#[inline]
pub fn try_release_lock(self) -> Result<(), (js_sys::Error, Self)> {
self.as_raw().release_lock().map_err(|error| (error, self))
self.as_raw()
.unchecked_ref::<sys::ReadableStreamReaderExt>()
.try_release_lock()
.map_err(|err| (err, self))
}

/// Converts this `ReadableStreamDefaultReader` into a [`Stream`].
Expand All @@ -119,7 +125,7 @@ impl<'stream> ReadableStreamDefaultReader<'stream> {
/// usable. This allows reading only a few chunks from the `Stream`, while still allowing
/// another reader to read the remaining chunks later on.
///
/// [`Stream`]: https://docs.rs/futures/0.3.18/futures/stream/trait.Stream.html
/// [`Stream`]: https://docs.rs/futures/0.3.28/futures/stream/trait.Stream.html
#[inline]
pub fn into_stream(self) -> IntoStream<'stream> {
IntoStream::new(self, false)
Expand Down
17 changes: 8 additions & 9 deletions src/readable/into_async_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ use core::task::{Context, Poll};
use futures_util::io::{AsyncRead, Error};
use futures_util::ready;
use futures_util::FutureExt;
use js_sys::Uint8Array;
use js_sys::{Object, Uint8Array};
use wasm_bindgen::prelude::*;
use wasm_bindgen::JsCast;
use wasm_bindgen_futures::JsFuture;

use crate::util::{checked_cast_to_usize, clamp_to_u32, js_to_io_error};

use super::sys::{ArrayBufferView, ReadableStreamBYOBReadResult};
use super::sys::ReadableStreamReadResult;
use super::ReadableStreamBYOBReader;

/// An [`AsyncRead`] for the [`into_async_read`](super::ReadableStream::into_async_read) method.
Expand All @@ -20,7 +20,7 @@ use super::ReadableStreamBYOBReader;
/// When this `AsyncRead` is dropped, it also drops its reader which in turn
/// [releases its lock](https://streams.spec.whatwg.org/#release-a-lock).
///
/// [`AsyncRead`]: https://docs.rs/futures/0.3.18/futures/io/trait.AsyncRead.html
/// [`AsyncRead`]: https://docs.rs/futures/0.3.28/futures/io/trait.AsyncRead.html
#[must_use = "readers do nothing unless polled"]
#[derive(Debug)]
pub struct IntoAsyncRead<'reader> {
Expand Down Expand Up @@ -84,13 +84,12 @@ impl<'reader> AsyncRead for IntoAsyncRead<'reader> {
_ => Uint8Array::new_with_length(buf_len),
};
// Limit to output buffer size
let buffer = buffer
.subarray(0, buf_len)
.unchecked_into::<ArrayBufferView>();
let buffer = buffer.subarray(0, buf_len).unchecked_into::<Object>();
match &self.reader {
Some(reader) => {
// Read into internal buffer and store its future
let fut = JsFuture::from(reader.as_raw().read(&buffer));
let fut =
JsFuture::from(reader.as_raw().read_with_array_buffer_view(&buffer));
self.fut.insert(fut)
}
None => {
Expand All @@ -108,14 +107,14 @@ impl<'reader> AsyncRead for IntoAsyncRead<'reader> {
// Read completed
Poll::Ready(match js_result {
Ok(js_value) => {
let result = ReadableStreamBYOBReadResult::from(js_value);
let result = ReadableStreamReadResult::from(js_value);
if result.is_done() {
// End of stream
self.discard_reader();
Ok(0)
} else {
// Cannot be canceled, so view must exist
let filled_view = result.value().unwrap_throw();
let filled_view = result.value().unchecked_into::<Uint8Array>();
// Copy bytes to output buffer
let filled_len = checked_cast_to_usize(filled_view.byte_length());
debug_assert!(filled_len <= buf.len());
Expand Down
2 changes: 1 addition & 1 deletion src/readable/into_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use super::ReadableStreamDefaultReader;
/// When this `Stream` is dropped, it also drops its reader which in turn
/// [releases its lock](https://streams.spec.whatwg.org/#release-a-lock).
///
/// [`Stream`]: https://docs.rs/futures/0.3.18/futures/stream/trait.Stream.html
/// [`Stream`]: https://docs.rs/futures/0.3.28/futures/stream/trait.Stream.html
#[must_use = "streams do nothing unless polled"]
#[derive(Debug)]
pub struct IntoStream<'reader> {
Expand Down
6 changes: 3 additions & 3 deletions src/readable/into_underlying_byte_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl Inner {
// We set autoAllocateChunkSize, so there should always be a BYOB request.
let request = controller.byob_request().unwrap_throw();
// Resize the buffer to fit the BYOB request.
let request_view = request.view().unwrap_throw();
let request_view = request.view().unwrap_throw().unchecked_into::<Uint8Array>();
let request_len = clamp_to_usize(request_view.byte_length());
if self.buffer.len() < request_len {
self.buffer.resize(request_len, 0);
Expand All @@ -114,7 +114,7 @@ impl Inner {
// The stream has closed, drop it.
self.discard();
controller.close()?;
request.respond(0)?;
request.respond_with_u32(0)?;
}
Ok(bytes_read) => {
// Copy read bytes from buffer to BYOB request view
Expand All @@ -127,7 +127,7 @@ impl Inner {
);
dest.copy_from(&self.buffer[0..bytes_read]);
// Respond to BYOB request
request.respond(bytes_read_u32)?;
request.respond_with_u32(bytes_read_u32)?;
}
Err(err) => {
// The stream encountered an error, drop it.
Expand Down
2 changes: 1 addition & 1 deletion src/readable/into_underlying_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl Inner {
// after the stream has closed or encountered an error.
let stream = self.stream.as_mut().unwrap_throw();
match stream.try_next().await {
Ok(Some(chunk)) => controller.enqueue(&chunk)?,
Ok(Some(chunk)) => controller.enqueue_with_chunk(&chunk)?,
Ok(None) => {
// The stream has closed, drop it.
self.stream = None;
Expand Down
Loading