Skip to content

Commit

Permalink
stream: add fast-path for readable streams
Browse files Browse the repository at this point in the history
  • Loading branch information
anonrig committed Nov 16, 2022
1 parent 7675749 commit 9f72045
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 1 deletion.
35 changes: 35 additions & 0 deletions benchmark/streams/readable-encoding.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
'use strict';

const common = require('../common');
const Readable = require('stream').Readable;

const BASE = 'hello world\n\n';

const bench = common.createBenchmark(main, {
encoding: ['utf-8', 'latin1'],
len: [256, 512, 1024 * 16],
op: ['unshift', 'push'],
n: [1e3]
});

function main({ n, encoding, len, op }) {
const b = BASE.repeat(len);
const s = new Readable({
objectMode: false,
});

bench.start();
switch (op) {
case 'unshift': {
for (let i = 0; i < n; i++)
s.unshift(b, encoding);
break;
}
case 'push': {
for (let i = 0; i < n; i++)
s.push(b, encoding);
break;
}
}
bench.end(n);
}
15 changes: 14 additions & 1 deletion lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ const EE = require('events');
const { Stream, prependListener } = require('internal/streams/legacy');
const { Buffer } = require('buffer');

const { TextEncoder } = require('internal/encoding');

const {
addAbortSignal,
} = require('internal/streams/add-abort-signal');
Expand Down Expand Up @@ -73,6 +75,9 @@ const kPaused = Symbol('kPaused');

const { StringDecoder } = require('string_decoder');
const from = require('internal/streams/from');
const { normalizeEncoding } = require('internal/util');
const { FastBuffer } = require('internal/buffer');
const encoder = new TextEncoder();

ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
ObjectSetPrototypeOf(Readable, Stream);
Expand Down Expand Up @@ -251,9 +256,17 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
if (addToFront && state.encoding) {
// When unshifting, if state.encoding is set, we have to save
// the string in the BufferList with the state encoding.

chunk = Buffer.from(chunk, encoding).toString(state.encoding);
} else {
chunk = Buffer.from(chunk, encoding);
const enc = normalizeEncoding(encoding);

if (enc === 'utf8') {
const uint8 = encoder.encode(chunk);
chunk = new FastBuffer(uint8, 0, uint8.length);
} else {
chunk = Buffer.from(chunk, encoding);
}
encoding = '';
}
}
Expand Down

0 comments on commit 9f72045

Please sign in to comment.