diff --git a/io/streams.ts b/io/streams.ts index 5c329eb486be..da3460e81bbc 100644 --- a/io/streams.ts +++ b/io/streams.ts @@ -1,5 +1,52 @@ // Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. +/** Create a `Deno.Reader` from an iterable of `Uint8Array`s. + * + * // Server-sent events: Send runtime metrics to the client every second. + * request.respond({ + * headers: new Headers({ "Content-Type": "text/event-stream" }), + * body: readerFromIterable((async function* () { + * while (true) { + * await new Promise((r) => setTimeout(r, 1000)); + * const message = `data: ${JSON.stringify(Deno.metrics())}\n\n`; + * yield new TextEncoder().encode(message); + * } + * })()), + * }); + */ +export function readerFromIterable( + iterable: Iterable | AsyncIterable, +): Deno.Reader { + const iterator: Iterator | AsyncIterator = + (iterable as AsyncIterable)[Symbol.asyncIterator]?.() ?? + (iterable as Iterable)[Symbol.iterator]?.(); + const buffer: Deno.Buffer = new Deno.Buffer(); + return { + async read(p: Uint8Array): Promise { + if (buffer.length == 0) { + const result = await iterator.next(); + if (result.done) { + return null; + } else { + if (result.value.byteLength <= p.byteLength) { + p.set(result.value); + return result.value.byteLength; + } + p.set(result.value.subarray(0, p.byteLength)); + await Deno.writeAll(buffer, result.value.subarray(p.byteLength)); + return p.byteLength; + } + } else { + const n = await buffer.read(p); + if (n == null) { + return this.read(p); + } + return n; + } + }, + }; +} + /** Create a `Writer` from a `WritableStreamDefaultReader`. */ export function writerFromStreamWriter( streamWriter: WritableStreamDefaultWriter, diff --git a/io/streams_test.ts b/io/streams_test.ts index f46dbdafb64c..061682dace4a 100644 --- a/io/streams_test.ts +++ b/io/streams_test.ts @@ -3,6 +3,7 @@ import { assert, assertEquals } from "../testing/asserts.ts"; import { readableStreamFromIterable, + readerFromIterable, readerFromStreamReader, writableStreamFromWriter, writerFromStreamWriter, @@ -15,6 +16,27 @@ function repeat(c: string, bytes: number): Uint8Array { return ui8; } +Deno.test("[io] readerFromIterable()", async function () { + const reader = readerFromIterable((function* () { + const encoder = new TextEncoder(); + for (const string of ["hello", "deno", "foo"]) { + yield encoder.encode(string); + } + })()); + + const readStrings = []; + const decoder = new TextDecoder(); + const p = new Uint8Array(4); + while (true) { + const n = await reader.read(p); + if (n == null) { + break; + } + readStrings.push(decoder.decode(p.slice(0, n))); + } + assertEquals(readStrings, ["hell", "o", "deno", "foo"]); +}); + Deno.test("[io] writerFromStreamWriter()", async function () { const written: string[] = []; const chunks: string[] = ["hello", "deno", "land"];