From d5dd5b4477001f46bf0806ae729d80df719f99bf Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Thu, 23 Jan 2025 09:36:52 +0100 Subject: [PATCH] worker: refactor stdio to improve performance Signed-off-by: Matteo Collina PR-URL: https://github.com/nodejs/node/pull/56630 Reviewed-By: Yagiz Nizipli Reviewed-By: Robert Nagy Reviewed-By: Paolo Insogna Reviewed-By: Luigi Pinca --- lib/internal/worker.js | 6 ++++-- lib/internal/worker/io.js | 35 ++++++++++++++++++++++------------- 2 files changed, 26 insertions(+), 15 deletions(-) diff --git a/lib/internal/worker.js b/lib/internal/worker.js index dcc9651802c4e4..e5a2cd06892c75 100644 --- a/lib/internal/worker.js +++ b/lib/internal/worker.js @@ -340,9 +340,11 @@ class Worker extends EventEmitter { { const { stream, chunks } = message; const readable = this[kParentSideStdio][stream]; - ArrayPrototypeForEach(chunks, ({ chunk, encoding }) => { + // This is a hot path, use a for(;;) loop + for (let i = 0; i < chunks.length; i++) { + const { chunk, encoding } = chunks[i]; readable.push(chunk, encoding); - }); + } return; } case messageTypes.STDIO_WANTS_MORE_DATA: diff --git a/lib/internal/worker/io.js b/lib/internal/worker/io.js index 2b28c6a2487b11..29c7914982b67a 100644 --- a/lib/internal/worker/io.js +++ b/lib/internal/worker/io.js @@ -1,9 +1,7 @@ 'use strict'; const { - ArrayPrototypeForEach, - ArrayPrototypeMap, - ArrayPrototypePush, + Array, FunctionPrototypeBind, FunctionPrototypeCall, ObjectAssign, @@ -77,7 +75,7 @@ const kOnMessage = Symbol('kOnMessage'); const kOnMessageError = Symbol('kOnMessageError'); const kPort = Symbol('kPort'); const kWaitingStreams = Symbol('kWaitingStreams'); -const kWritableCallbacks = Symbol('kWritableCallbacks'); +const kWritableCallback = Symbol('kWritableCallback'); const kStartedReading = Symbol('kStartedReading'); const kStdioWantsMoreDataCallback = Symbol('kStdioWantsMoreDataCallback'); const kCurrentlyReceivingPorts = @@ -282,20 +280,29 @@ class WritableWorkerStdio extends Writable { super({ decodeStrings: false }); this[kPort] = port; this[kName] = name; - this[kWritableCallbacks] = []; + this[kWritableCallback] = null; } _writev(chunks, cb) { + const toSend = new Array(chunks.length); + + // We avoid .map() because it's a hot path + for (let i = 0; i < chunks.length; i++) { + const { chunk, encoding } = chunks[i]; + toSend[i] = { chunk, encoding }; + } + this[kPort].postMessage({ type: messageTypes.STDIO_PAYLOAD, stream: this[kName], - chunks: ArrayPrototypeMap(chunks, - ({ chunk, encoding }) => ({ chunk, encoding })), + chunks: toSend, }); if (process._exiting) { cb(); } else { - ArrayPrototypePush(this[kWritableCallbacks], cb); + // Only one writev happens at any given time, + // so we can safely overwrite the callback. + this[kWritableCallback] = cb; if (this[kPort][kWaitingStreams]++ === 0) this[kPort].ref(); } @@ -311,11 +318,13 @@ class WritableWorkerStdio extends Writable { } [kStdioWantsMoreDataCallback]() { - const cbs = this[kWritableCallbacks]; - this[kWritableCallbacks] = []; - ArrayPrototypeForEach(cbs, (cb) => cb()); - if ((this[kPort][kWaitingStreams] -= cbs.length) === 0) - this[kPort].unref(); + const cb = this[kWritableCallback]; + if (cb) { + this[kWritableCallback] = null; + cb(); + if (--this[kPort][kWaitingStreams] === 0) + this[kPort].unref(); + } } }