Skip to content

Commit

Permalink
Apply back-pressure to Node.js Readable
Browse files Browse the repository at this point in the history
  • Loading branch information
Borewit committed Feb 10, 2025
1 parent 7650186 commit 04f937c
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 1 deletion.
23 changes: 22 additions & 1 deletion lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ import {ReadableStream, type ReadableByteStreamController, type ReadableStreamBY
export function makeByteReadableStreamFromNodeReadable(nodeReadable: Readable): ReadableStream {
let leftoverChunk: Uint8Array | null = null; // Proper declaration
let isNodeStreamEnded = false;
/**
* Number of bytes in leftoverChunk, after which backpressure is applied
*/
const highWaterMark = 16 * 1024;

const processLeftover = (controller: ReadableByteStreamController) => {
const byobRequest = controller.byobRequest as ReadableStreamBYOBRequest | undefined;
Expand Down Expand Up @@ -49,6 +53,8 @@ export function makeByteReadableStreamFromNodeReadable(nodeReadable: Readable):
}
};

nodeReadable.pause(); // Start in pause mode

return new ReadableStream({
type: 'bytes',
start(controller: ReadableByteStreamController) {
Expand All @@ -57,6 +63,19 @@ export function makeByteReadableStreamFromNodeReadable(nodeReadable: Readable):
? new Uint8Array([...leftoverChunk, ...chunk])
: new Uint8Array(chunk);
processLeftover(controller);
if (!nodeReadable.isPaused()) {
if (controller.desiredSize === null) {
// BYOB Request backpressure
if (leftoverChunk && leftoverChunk.length > highWaterMark && !nodeReadable.isPaused()) {
nodeReadable.pause(); // Apply back pressure
}
} else {
// Default request backpressure
if (controller.desiredSize <= 0) {
nodeReadable.pause(); // Start in pause mode
}
}
}
});
nodeReadable.once('end', () => {
isNodeStreamEnded = true;
Expand All @@ -65,10 +84,12 @@ export function makeByteReadableStreamFromNodeReadable(nodeReadable: Readable):
nodeReadable.once('error', err => {
controller.error(err);
});
nodeReadable.resume();
},
pull(controller: ReadableByteStreamController) {
processLeftover(controller);
if (nodeReadable.isPaused()) {
nodeReadable.resume();
}
},
cancel(reason) {
nodeReadable.destroy(reason);
Expand Down
89 changes: 89 additions & 0 deletions test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import path from 'node:path';
import {describe, it} from "mocha";
import {assert} from "chai";
import {fileURLToPath} from "node:url";
import {PassThrough} from "node:stream";
import {makeByteReadableStreamFromFile, SourceStream} from "./util.js";

const filename = fileURLToPath(import.meta.url);
Expand Down Expand Up @@ -94,6 +95,49 @@ describe('ReadableStreamDefaultReader', () => {
nodeReadable.destroy();
}
});

it('should apply backpressure, when data is not consumed', async () => {
const nodePassThrough = new PassThrough();
try {
const webReadableStream = makeByteReadableStreamFromNodeReadable(nodePassThrough);
try {
const streamReader = webReadableStream.getReader();
try {
let result;
let bytesWritten = 0;
const data = new Uint8Array(256);
let pushMoreData;
assert.isTrue(nodePassThrough.isPaused(), 'Node Readable is paused');
do {
pushMoreData = nodePassThrough.push(data);
bytesWritten += data.length;
} while(pushMoreData && bytesWritten < 32 * 1024);
assert.isTrue(nodePassThrough.isPaused(), 'Node Readable is paused');

let bytesRead = 0;
do {
const {value, done} = await streamReader.read();
assert.isFalse(done, 'Read stream result');
bytesRead += value.length;
} while(bytesRead < bytesWritten);
const prom = streamReader.read(); // Read more than there is available
assert.isFalse(nodePassThrough.isPaused(), 'Node Readable is paused after reading all bytes written');
do {
pushMoreData = nodePassThrough.push(data);
bytesWritten += data.length;
} while(pushMoreData && bytesWritten < 32 * 1024);
await prom;
assert.isTrue(nodePassThrough.isPaused(), 'Node Readable is paused');
} finally {
streamReader.releaseLock();
}
} finally {
await webReadableStream.cancel();
}
} finally {
nodePassThrough.destroy();
}
});
});

describe('ReadableStreamBYOBReader', () => {
Expand Down Expand Up @@ -193,5 +237,50 @@ describe('ReadableStreamBYOBReader', () => {
nodeReadable.destroy();
}
});

it('should apply backpressure, when data is not consumed', async () => {
const nodePassThrough = new PassThrough();
try {
const webReadableStream = makeByteReadableStreamFromNodeReadable(nodePassThrough);
try {
const streamReader = webReadableStream.getReader({mode: 'byob'});
try {
let result;
let bytesWritten = 0;
const data = new Uint8Array(256);
let pushMoreData;
assert.isTrue(nodePassThrough.isPaused(), 'Node Readable is paused');
do {
pushMoreData = nodePassThrough.push(data);
bytesWritten += data.length;
} while(pushMoreData && bytesWritten < 32 * 1024);
assert.isTrue(nodePassThrough.isPaused(), 'Node Readable is paused');

let bytesRead = 0;
do {
const buf = new Uint8Array(256);
const {value, done} = await streamReader.read(buf);
assert.isFalse(done, 'Read stream result');
bytesRead += value.length;
} while(bytesRead < bytesWritten);
const buf = new Uint8Array(256);
const prom = streamReader.read(buf); // Read more than there is available
assert.isFalse(nodePassThrough.isPaused(), 'Node Readable is paused after reading all bytes written');
do {
pushMoreData = nodePassThrough.push(data);
bytesWritten += data.length;
} while(pushMoreData && bytesWritten < 32 * 1024);
await prom;
assert.isTrue(nodePassThrough.isPaused(), 'Node Readable is paused');
} finally {
streamReader.releaseLock();
}
} finally {
await webReadableStream.cancel();
}
} finally {
nodePassThrough.destroy();
}
});
});

0 comments on commit 04f937c

Please sign in to comment.