Skip to content

Commit

Permalink
worker: flush stdout and stderr on exit
Browse files Browse the repository at this point in the history
Signed-off-by: Matteo Collina <[email protected]>
PR-URL: nodejs#56428
Reviewed-By: James M Snell <[email protected]>
Reviewed-By: Paolo Insogna <[email protected]>
  • Loading branch information
mcollina authored Jan 6, 2025
1 parent b736028 commit b0c65bb
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 4 deletions.
13 changes: 12 additions & 1 deletion lib/internal/bootstrap/switches/is_not_main_thread.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,22 @@ process.removeListener('removeListener', stopListeningIfSignal);

const {
createWorkerStdio,
kStdioWantsMoreDataCallback,
} = require('internal/worker/io');

let workerStdio;
function lazyWorkerStdio() {
return workerStdio ??= createWorkerStdio();
if (workerStdio === undefined) {
workerStdio = createWorkerStdio();
process.on('exit', flushSync);
}

return workerStdio;
}

function flushSync() {
workerStdio.stdout[kStdioWantsMoreDataCallback]();
workerStdio.stderr[kStdioWantsMoreDataCallback]();
}

function getStdout() { return lazyWorkerStdio().stdout; }
Expand Down
10 changes: 7 additions & 3 deletions lib/internal/worker/io.js
Original file line number Diff line number Diff line change
Expand Up @@ -292,9 +292,13 @@ class WritableWorkerStdio extends Writable {
chunks: ArrayPrototypeMap(chunks,
({ chunk, encoding }) => ({ chunk, encoding })),
});
ArrayPrototypePush(this[kWritableCallbacks], cb);
if (this[kPort][kWaitingStreams]++ === 0)
this[kPort].ref();
if (process._exiting) {
cb();
} else {
ArrayPrototypePush(this[kWritableCallbacks], cb);
if (this[kPort][kWaitingStreams]++ === 0)
this[kPort].ref();
}
}

_final(cb) {
Expand Down
24 changes: 24 additions & 0 deletions test/parallel/test-worker-stdio-flush-inflight.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const { Worker, isMainThread } = require('worker_threads');

if (isMainThread) {
const w = new Worker(__filename, { stdout: true });
const expected = 'hello world';

let data = '';
w.stdout.setEncoding('utf8');
w.stdout.on('data', (chunk) => {
data += chunk;
});

w.on('exit', common.mustCall(() => {
assert.strictEqual(data, expected);
}));
} else {
process.stdout.write('hello');
process.stdout.write(' ');
process.stdout.write('world');
process.exit(0);
}
25 changes: 25 additions & 0 deletions test/parallel/test-worker-stdio-flush.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const { Worker, isMainThread } = require('worker_threads');

if (isMainThread) {
const w = new Worker(__filename, { stdout: true });
const expected = 'hello world';

let data = '';
w.stdout.setEncoding('utf8');
w.stdout.on('data', (chunk) => {
data += chunk;
});

w.on('exit', common.mustCall(() => {
assert.strictEqual(data, expected);
}));
} else {
process.on('exit', () => {
process.stdout.write(' ');
process.stdout.write('world');
});
process.stdout.write('hello');
}

0 comments on commit b0c65bb

Please sign in to comment.