Skip to content

Commit

Permalink
feat(io/streams): Add readerFromIterable()
Browse files Browse the repository at this point in the history
  • Loading branch information
nayeemrmn committed Feb 26, 2021
1 parent f285c79 commit f2aa9ff
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 0 deletions.
47 changes: 47 additions & 0 deletions io/streams.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,52 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.

/** Create a `Deno.Reader` from an iterable of `Uint8Array`s.
*
* // Server-sent events: Send runtime metrics to the client every second.
* request.respond({
* headers: new Headers({ "Content-Type": "text/event-stream" }),
* body: readerFromIterable((async function* () {
* while (true) {
* await new Promise((r) => setTimeout(r, 1000));
* const message = `data: ${JSON.stringify(Deno.metrics())}\n\n`;
* yield new TextEncoder().encode(message);
* }
* })()),
* });
*/
export function readerFromIterable(
iterable: Iterable<Uint8Array> | AsyncIterable<Uint8Array>,
): Deno.Reader {
const iterator: Iterator<Uint8Array> | AsyncIterator<Uint8Array> =
(iterable as AsyncIterable<Uint8Array>)[Symbol.asyncIterator]?.() ??
(iterable as Iterable<Uint8Array>)[Symbol.iterator]?.();
const buffer: Deno.Buffer = new Deno.Buffer();
return {
async read(p: Uint8Array): Promise<number | null> {
if (buffer.length == 0) {
const result = await iterator.next();
if (result.done) {
return null;
} else {
if (result.value.byteLength <= p.byteLength) {
p.set(result.value);
return result.value.byteLength;
}
p.set(result.value.subarray(0, p.byteLength));
await Deno.writeAll(buffer, result.value.subarray(p.byteLength));
return p.byteLength;
}
} else {
const n = await buffer.read(p);
if (n == null) {
return this.read(p);
}
return n;
}
},
};
}

/** Create a `Writer` from a `WritableStreamDefaultReader`. */
export function writerFromStreamWriter(
streamWriter: WritableStreamDefaultWriter<Uint8Array>,
Expand Down
22 changes: 22 additions & 0 deletions io/streams_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import { assert, assertEquals } from "../testing/asserts.ts";
import {
readableStreamFromIterable,
readerFromIterable,
readerFromStreamReader,
writableStreamFromWriter,
writerFromStreamWriter,
Expand All @@ -15,6 +16,27 @@ function repeat(c: string, bytes: number): Uint8Array {
return ui8;
}

Deno.test("[io] readerFromIterable()", async function () {
const reader = readerFromIterable((function* () {
const encoder = new TextEncoder();
for (const string of ["hello", "deno", "foo"]) {
yield encoder.encode(string);
}
})());

const readStrings = [];
const decoder = new TextDecoder();
const p = new Uint8Array(4);
while (true) {
const n = await reader.read(p);
if (n == null) {
break;
}
readStrings.push(decoder.decode(p.slice(0, n)));
}
assertEquals(readStrings, ["hell", "o", "deno", "foo"]);
});

Deno.test("[io] writerFromStreamWriter()", async function () {
const written: string[] = [];
const chunks: string[] = ["hello", "deno", "land"];
Expand Down

0 comments on commit f2aa9ff

Please sign in to comment.