Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(streams): LimitedTransformStream & LimitedBytesTransformStream #2007

Merged
merged 3 commits into from
Mar 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions streams/buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,3 +166,69 @@ export class Buffer {
this.#reslice(m);
}
}

/** A TransformStream that will only read & enqueue `size` amount of bytes.
* This operation is chunk based and not BYOB based,
* and as such will read more than needed.
*
* if options.error is set, then instead of terminating the stream,
* an error will be thrown.
*
* ```ts
* import { LimitedBytesTransformStream } from "./buffer.ts";
* const res = await fetch("https://example.com");
* const parts = res.body!
* .pipeThrough(new LimitedBytesTransformStream(512 * 1024));
* ```
*/
export class LimitedBytesTransformStream
extends TransformStream<Uint8Array, Uint8Array> {
#read = 0;
constructor(size: number, options: { error?: boolean } = {}) {
super({
transform: (chunk, controller) => {
if ((this.#read + chunk.byteLength) > size) {
if (options.error) {
throw new RangeError(`Exceeded byte size limit of '${size}'`);
} else {
controller.terminate();
}
} else {
this.#read += chunk.byteLength;
controller.enqueue(chunk);
}
},
});
}
}

/** A TransformStream that will only read & enqueue `size` amount of chunks.
*
* if options.error is set, then instead of terminating the stream,
* an error will be thrown.
*
* ```ts
* import { LimitedTransformStream } from "./buffer.ts";
* const res = await fetch("https://example.com");
* const parts = res.body!.pipeThrough(new LimitedTransformStream(50));
* ```
*/
export class LimitedTransformStream<T> extends TransformStream<T, T> {
#read = 0;
constructor(size: number, options: { error?: boolean } = {}) {
super({
transform: (chunk, controller) => {
if ((this.#read + 1) > size) {
if (options.error) {
throw new RangeError(`Exceeded chunk limit of '${size}'`);
} else {
controller.terminate();
}
} else {
this.#read++;
controller.enqueue(chunk);
}
},
});
}
}
104 changes: 98 additions & 6 deletions streams/buffer_test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.

import { assert, assertEquals } from "../testing/asserts.ts";
import { Buffer } from "./buffer.ts";
import { assert, assertEquals, assertRejects } from "../testing/asserts.ts";
import {
Buffer,
LimitedBytesTransformStream,
LimitedTransformStream,
} from "./buffer.ts";

Deno.test("Buffer Write & Read", async function () {
Deno.test("[streams] Buffer Write & Read", async function () {
const buf = new Buffer();
const writer = buf.writable.getWriter();
const reader = buf.readable.getReader({ mode: "byob" });
Expand All @@ -13,26 +17,114 @@ Deno.test("Buffer Write & Read", async function () {
assertEquals(read.value, data);
});

Deno.test("Buffer Read empty", async function () {
Deno.test("[streams] Buffer Read empty", async function () {
const buf = new Buffer();
const reader = buf.readable.getReader({ mode: "byob" });
const read = await reader.read(new Uint8Array(5));
assert(read.done);
assertEquals(read.value!.byteLength, 0);
});

Deno.test("Buffer Write & get bytes", async function () {
Deno.test("[streams] Buffer Write & get bytes", async function () {
const buf = new Buffer();
const writer = buf.writable.getWriter();
const data = new Uint8Array([4, 21, 45, 19]);
await writer.write(data);
assertEquals(buf.bytes(), data);
});

Deno.test("Buffer truncate", async function () {
Deno.test("[streams] Buffer truncate", async function () {
const buf = new Buffer();
const writer = buf.writable.getWriter();
await writer.write(new Uint8Array([4, 21, 45, 19]));
buf.truncate(3);
assertEquals(buf.bytes(), new Uint8Array([4, 21, 45]));
});

Deno.test("[streams] LimitedBytesTransformStream", async function () {
const r = new ReadableStream({
start(controller) {
controller.enqueue(new Uint8Array([1, 2, 3]));
controller.enqueue(new Uint8Array([4, 5, 6]));
controller.enqueue(new Uint8Array([7, 8, 9]));
controller.enqueue(new Uint8Array([10, 11, 12]));
controller.enqueue(new Uint8Array([13, 14, 15]));
controller.enqueue(new Uint8Array([16, 17, 18]));
controller.close();
},
});

const chunks = [];
for await (const chunk of r.pipeThrough(new LimitedBytesTransformStream(7))) {
chunks.push(chunk);
}
assertEquals(chunks.length, 2);
});

Deno.test("[streams] LimitedBytesTransformStream error", async function () {
const r = new ReadableStream({
start(controller) {
controller.enqueue(new Uint8Array([1, 2, 3]));
controller.enqueue(new Uint8Array([4, 5, 6]));
controller.enqueue(new Uint8Array([7, 8, 9]));
controller.enqueue(new Uint8Array([10, 11, 12]));
controller.enqueue(new Uint8Array([13, 14, 15]));
controller.enqueue(new Uint8Array([16, 17, 18]));
controller.close();
},
});

await assertRejects(async () => {
for await (
const _chunk of r.pipeThrough(
new LimitedBytesTransformStream(7, { error: true }),
)
) {
// needed to read
}
}, RangeError);
});

Deno.test("[streams] LimitedTransformStream", async function () {
const r = new ReadableStream({
start(controller) {
controller.enqueue("foo");
controller.enqueue("foo");
controller.enqueue("foo");
controller.enqueue("foo");
controller.enqueue("foo");
controller.enqueue("foo");
controller.close();
},
});

const chunks = [];
for await (const chunk of r.pipeThrough(new LimitedTransformStream(3))) {
chunks.push(chunk);
}
assertEquals(chunks.length, 3);
});

Deno.test("[streams] LimitedTransformStream error", async function () {
const r = new ReadableStream({
start(controller) {
controller.enqueue("foo");
controller.enqueue("foo");
controller.enqueue("foo");
controller.enqueue("foo");
controller.enqueue("foo");
controller.enqueue("foo");
controller.close();
},
});

await assertRejects(async () => {
for await (
const _chunk of r.pipeThrough(
new LimitedTransformStream(3, { error: true }),
)
) {
// needed to read
}
}, RangeError);
});
1 change: 1 addition & 0 deletions streams/mod.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.

export * from "./buffer.ts";
export * from "./conversion.ts";
export * from "./delimiter.ts";
export * from "./merge.ts";