From 9f720450907091ee456947a63a94fe6dd426c1db Mon Sep 17 00:00:00 2001 From: Yagiz Nizipli Date: Wed, 16 Nov 2022 14:02:27 -0500 Subject: [PATCH] stream: add fast-path for readable streams --- benchmark/streams/readable-encoding.js | 35 ++++++++++++++++++++++++++ lib/internal/streams/readable.js | 15 ++++++++++- 2 files changed, 49 insertions(+), 1 deletion(-) create mode 100644 benchmark/streams/readable-encoding.js diff --git a/benchmark/streams/readable-encoding.js b/benchmark/streams/readable-encoding.js new file mode 100644 index 00000000000000..fe96efda3b5a7c --- /dev/null +++ b/benchmark/streams/readable-encoding.js @@ -0,0 +1,35 @@ +'use strict'; + +const common = require('../common'); +const Readable = require('stream').Readable; + +const BASE = 'hello world\n\n'; + +const bench = common.createBenchmark(main, { + encoding: ['utf-8', 'latin1'], + len: [256, 512, 1024 * 16], + op: ['unshift', 'push'], + n: [1e3] +}); + +function main({ n, encoding, len, op }) { + const b = BASE.repeat(len); + const s = new Readable({ + objectMode: false, + }); + + bench.start(); + switch (op) { + case 'unshift': { + for (let i = 0; i < n; i++) + s.unshift(b, encoding); + break; + } + case 'push': { + for (let i = 0; i < n; i++) + s.push(b, encoding); + break; + } + } + bench.end(n); +} diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index aad4c594501ba6..ece55df73cabad 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -42,6 +42,8 @@ const EE = require('events'); const { Stream, prependListener } = require('internal/streams/legacy'); const { Buffer } = require('buffer'); +const { TextEncoder } = require('internal/encoding'); + const { addAbortSignal, } = require('internal/streams/add-abort-signal'); @@ -73,6 +75,9 @@ const kPaused = Symbol('kPaused'); const { StringDecoder } = require('string_decoder'); const from = require('internal/streams/from'); +const { normalizeEncoding } = require('internal/util'); +const { FastBuffer } = require('internal/buffer'); +const encoder = new TextEncoder(); ObjectSetPrototypeOf(Readable.prototype, Stream.prototype); ObjectSetPrototypeOf(Readable, Stream); @@ -251,9 +256,17 @@ function readableAddChunk(stream, chunk, encoding, addToFront) { if (addToFront && state.encoding) { // When unshifting, if state.encoding is set, we have to save // the string in the BufferList with the state encoding. + chunk = Buffer.from(chunk, encoding).toString(state.encoding); } else { - chunk = Buffer.from(chunk, encoding); + const enc = normalizeEncoding(encoding); + + if (enc === 'utf8') { + const uint8 = encoder.encode(chunk); + chunk = new FastBuffer(uint8, 0, uint8.length); + } else { + chunk = Buffer.from(chunk, encoding); + } encoding = ''; } }