Skip to content

Commit

Permalink
feat(fetch): accept async iterables for body (#26882)
Browse files Browse the repository at this point in the history
Reland of #24623, but with a fix for `String` objects.

Co-authored-by: crowlkats <[email protected]>
  • Loading branch information
lucacasonato and crowlKats authored Nov 15, 2024
1 parent 3f26310 commit b8cf259
Show file tree
Hide file tree
Showing 10 changed files with 251 additions and 51 deletions.
14 changes: 14 additions & 0 deletions ext/fetch/22_body.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { core, primordials } from "ext:core/mod.js";
const {
isAnyArrayBuffer,
isArrayBuffer,
isStringObject,
} = core;
const {
ArrayBufferIsView,
Expand Down Expand Up @@ -466,6 +467,8 @@ function extractBody(object) {
if (object.locked || isReadableStreamDisturbed(object)) {
throw new TypeError("ReadableStream is locked or disturbed");
}
} else if (object[webidl.AsyncIterable] === webidl.AsyncIterable) {
stream = ReadableStream.from(object.open());
}
if (typeof source === "string") {
// WARNING: this deviates from spec (expects length to be set)
Expand All @@ -483,6 +486,9 @@ function extractBody(object) {
return { body, contentType };
}

webidl.converters["async iterable<Uint8Array>"] = webidl
.createAsyncIterableConverter(webidl.converters.Uint8Array);

webidl.converters["BodyInit_DOMString"] = (V, prefix, context, opts) => {
// Union for (ReadableStream or Blob or ArrayBufferView or ArrayBuffer or FormData or URLSearchParams or USVString)
if (ObjectPrototypeIsPrototypeOf(ReadableStreamPrototype, V)) {
Expand All @@ -501,6 +507,14 @@ webidl.converters["BodyInit_DOMString"] = (V, prefix, context, opts) => {
if (ArrayBufferIsView(V)) {
return webidl.converters["ArrayBufferView"](V, prefix, context, opts);
}
if (webidl.isAsyncIterable(V) && !isStringObject(V)) {
return webidl.converters["async iterable<Uint8Array>"](
V,
prefix,
context,
opts,
);
}
}
// BodyInit conversion is passed to extractBody(), which calls core.encode().
// core.encode() will UTF-8 encode strings with replacement, being equivalent to the USV normalization.
Expand Down
2 changes: 2 additions & 0 deletions ext/fetch/lib.deno_fetch.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ type BodyInit =
| FormData
| URLSearchParams
| ReadableStream<Uint8Array>
| Iterable<Uint8Array>
| AsyncIterable<Uint8Array>
| string;
/** @category Fetch */
type RequestDestination =
Expand Down
63 changes: 15 additions & 48 deletions ext/web/06_streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ const {
String,
Symbol,
SymbolAsyncIterator,
SymbolIterator,
SymbolFor,
TypeError,
TypedArrayPrototypeGetBuffer,
Expand Down Expand Up @@ -5084,34 +5083,6 @@ function initializeCountSizeFunction(globalObject) {
WeakMapPrototypeSet(countSizeFunctionWeakMap, globalObject, size);
}

// Ref: https://tc39.es/ecma262/#sec-getiterator
function getAsyncOrSyncIterator(obj) {
let iterator;
if (obj[SymbolAsyncIterator] != null) {
iterator = obj[SymbolAsyncIterator]();
if (!isObject(iterator)) {
throw new TypeError(
"[Symbol.asyncIterator] returned a non-object value",
);
}
} else if (obj[SymbolIterator] != null) {
iterator = obj[SymbolIterator]();
if (!isObject(iterator)) {
throw new TypeError("[Symbol.iterator] returned a non-object value");
}
} else {
throw new TypeError("No iterator found");
}
if (typeof iterator.next !== "function") {
throw new TypeError("iterator.next is not a function");
}
return iterator;
}

function isObject(x) {
return (typeof x === "object" && x != null) || typeof x === "function";
}

const _resourceBacking = Symbol("[[resourceBacking]]");
// This distinction exists to prevent unrefable streams being used in
// regular fast streams that are unaware of refability
Expand Down Expand Up @@ -5197,21 +5168,22 @@ class ReadableStream {
}

static from(asyncIterable) {
const prefix = "Failed to execute 'ReadableStream.from'";
webidl.requiredArguments(
arguments.length,
1,
"Failed to execute 'ReadableStream.from'",
prefix,
);
asyncIterable = webidl.converters.any(asyncIterable);

const iterator = getAsyncOrSyncIterator(asyncIterable);
asyncIterable = webidl.converters["async iterable<any>"](
asyncIterable,
prefix,
"Argument 1",
);
const iter = asyncIterable.open();

const stream = createReadableStream(noop, async () => {
// deno-lint-ignore prefer-primordials
const res = await iterator.next();
if (!isObject(res)) {
throw new TypeError("iterator.next value is not an object");
}
const res = await iter.next();
if (res.done) {
readableStreamDefaultControllerClose(stream[_controller]);
} else {
Expand All @@ -5221,17 +5193,8 @@ class ReadableStream {
);
}
}, async (reason) => {
if (iterator.return == null) {
return undefined;
} else {
// deno-lint-ignore prefer-primordials
const res = await iterator.return(reason);
if (!isObject(res)) {
throw new TypeError("iterator.return value is not an object");
} else {
return undefined;
}
}
// deno-lint-ignore prefer-primordials
await iter.return(reason);
}, 0);
return stream;
}
Expand Down Expand Up @@ -6892,6 +6855,10 @@ webidl.converters.StreamPipeOptions = webidl
{ key: "signal", converter: webidl.converters.AbortSignal },
]);

webidl.converters["async iterable<any>"] = webidl.createAsyncIterableConverter(
webidl.converters.any,
);

internals.resourceForReadableStream = resourceForReadableStream;

export {
Expand Down
126 changes: 126 additions & 0 deletions ext/webidl/00_webidl.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const {
Float32Array,
Float64Array,
FunctionPrototypeBind,
FunctionPrototypeCall,
Int16Array,
Int32Array,
Int8Array,
Expand Down Expand Up @@ -77,6 +78,7 @@ const {
StringPrototypeToWellFormed,
Symbol,
SymbolIterator,
SymbolAsyncIterator,
SymbolToStringTag,
TypedArrayPrototypeGetBuffer,
TypedArrayPrototypeGetSymbolToStringTag,
Expand Down Expand Up @@ -920,6 +922,127 @@ function createSequenceConverter(converter) {
};
}

function isAsyncIterable(obj) {
if (obj[SymbolAsyncIterator] === undefined) {
if (obj[SymbolIterator] === undefined) {
return false;
}
}

return true;
}

const AsyncIterable = Symbol("[[asyncIterable]]");

function createAsyncIterableConverter(converter) {
return function (
V,
prefix = undefined,
context = undefined,
opts = { __proto__: null },
) {
if (type(V) !== "Object") {
throw makeException(
TypeError,
"can not be converted to async iterable.",
prefix,
context,
);
}

let isAsync = true;
let method = V[SymbolAsyncIterator];
if (method === undefined) {
method = V[SymbolIterator];

if (method === undefined) {
throw makeException(
TypeError,
"is not iterable.",
prefix,
context,
);
}

isAsync = false;
}

return {
value: V,
[AsyncIterable]: AsyncIterable,
open(context) {
const iter = FunctionPrototypeCall(method, V);
if (type(iter) !== "Object") {
throw new TypeError(
`${context} could not be iterated because iterator method did not return object, but ${
type(iter)
}.`,
);
}

let asyncIterator = iter;

if (!isAsync) {
asyncIterator = {
// deno-lint-ignore require-await
async next() {
// deno-lint-ignore prefer-primordials
return iter.next();
},
};
}

return {
async next() {
// deno-lint-ignore prefer-primordials
const iterResult = await asyncIterator.next();
if (type(iterResult) !== "Object") {
throw TypeError(
`${context} failed to iterate next value because the next() method did not return an object, but ${
type(iterResult)
}.`,
);
}

if (iterResult.done) {
return { done: true };
}

const iterValue = converter(
iterResult.value,
`${context} failed to iterate next value`,
`The value returned from the next() method`,
opts,
);

return { done: false, value: iterValue };
},
async return(reason) {
if (asyncIterator.return === undefined) {
return undefined;
}

// deno-lint-ignore prefer-primordials
const returnPromiseResult = await asyncIterator.return(reason);
if (type(returnPromiseResult) !== "Object") {
throw TypeError(
`${context} failed to close iterator because the return() method did not return an object, but ${
type(returnPromiseResult)
}.`,
);
}

return undefined;
},
[SymbolAsyncIterator]() {
return this;
},
};
},
};
};
}

function createRecordConverter(keyConverter, valueConverter) {
return (V, prefix, context, opts) => {
if (type(V) !== "Object") {
Expand Down Expand Up @@ -1302,9 +1425,11 @@ function setlike(obj, objPrototype, readonly) {

export {
assertBranded,
AsyncIterable,
brand,
configureInterface,
converters,
createAsyncIterableConverter,
createBranded,
createDictionaryConverter,
createEnumConverter,
Expand All @@ -1315,6 +1440,7 @@ export {
createSequenceConverter,
illegalConstructor,
invokeCallbackFunction,
isAsyncIterable,
makeException,
mixinPairIterable,
requiredArguments,
Expand Down
26 changes: 26 additions & 0 deletions ext/webidl/internal.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,27 @@ declare module "ext:deno_webidl/00_webidl.js" {
opts?: any,
) => T[];

/**
* Create a converter that converts an async iterable of the inner type.
*/
function createAsyncIterableConverter<V, T>(
converter: (
v: V,
prefix?: string,
context?: string,
opts?: any,
) => T,
): (
v: any,
prefix?: string,
context?: string,
opts?: any,
) => ConvertedAsyncIterable<V, T>;

interface ConvertedAsyncIterable<V, T> extends AsyncIterableIterator<T> {
value: V;
}

/**
* Create a converter that converts a Promise of the inner type.
*/
Expand Down Expand Up @@ -559,4 +580,9 @@ declare module "ext:deno_webidl/00_webidl.js" {
| "Symbol"
| "BigInt"
| "Object";

/**
* Check whether a value is an async iterable.
*/
function isAsyncIterable(v: any): boolean;
}
1 change: 1 addition & 0 deletions tests/integration/node_unit_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ util::unit_test_factory!(
dgram_test,
domain_test,
fs_test,
fetch_test,
http_test,
http2_test,
inspector_test,
Expand Down
27 changes: 27 additions & 0 deletions tests/unit/fetch_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2119,3 +2119,30 @@ Deno.test(
await server;
},
);

Deno.test("fetch async iterable", async () => {
const iterable = (async function* () {
yield new Uint8Array([1, 2, 3, 4, 5]);
yield new Uint8Array([6, 7, 8, 9, 10]);
})();
const res = new Response(iterable);
const actual = await res.bytes();
const expected = new Uint8Array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
assertEquals(actual, expected);
});

Deno.test("fetch iterable", async () => {
const iterable = (function* () {
yield new Uint8Array([1, 2, 3, 4, 5]);
yield new Uint8Array([6, 7, 8, 9, 10]);
})();
const res = new Response(iterable);
const actual = await res.bytes();
const expected = new Uint8Array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
assertEquals(actual, expected);
});

Deno.test("fetch string object", async () => {
const res = new Response(Object("hello"));
assertEquals(await res.text(), "hello");
});
Loading

0 comments on commit b8cf259

Please sign in to comment.