Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

child_process: only stop readable side of stream passed to process #27373

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions lib/internal/child_process.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ let freeParser;
let HTTPParser;

const MAX_HANDLE_RETRANSMISSIONS = 3;
const kIsUsedAsStdio = Symbol('kIsUsedAsStdio');

// This object contain function to convert TCP objects to native handle objects
// and back again.
Expand Down Expand Up @@ -278,8 +279,14 @@ function flushStdio(subprocess) {

for (var i = 0; i < stdio.length; i++) {
const stream = stdio[i];
if (!stream || !stream.readable || stream._readableState.readableListening)
// TODO(addaleax): This doesn't necessarily account for all the ways in
// which data can be read from a stream, e.g. being consumed on the
// native layer directly as a StreamBase.
if (!stream || !stream.readable ||
stream._readableState.readableListening ||
stream[kIsUsedAsStdio]) {
continue;
}
stream.resume();
}
}
Expand Down Expand Up @@ -384,12 +391,16 @@ ChildProcess.prototype.spawn = function(options) {
continue;
}

// The stream is already cloned and piped, thus close it.
// The stream is already cloned and piped, thus stop its readable side,
// otherwise we might attempt to read from the stream when at the same time
// the child process does.
if (stream.type === 'wrap') {
stream.handle.close();
if (stream._stdio && stream._stdio instanceof EventEmitter) {
stream._stdio.emit('close');
}
stream.handle.reading = false;
stream.handle.readStop();
stream._stdio.pause();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of this, we could also use stream._stdio.push(null) and disable reading permanently, instead of until it’s explicitly started again. Any thoughts on that? Generally, the use case of “let another process read from this pipe, once that process is done we’ll continue reading ourselves” is not absurd or anything – shell scripts do that a lot.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I never saw it myself but I can understand why just pausing the stream might be useful for piping logs or similar if the process crashes. If there's no technical drawback it sounds strictly better in term of enabled use cases, compared to closing it for good.

A few questions to better understand how it works:

  • What does readStop do, particularly on writable streams? Is it just a noop?
  • Maybe related, what does pause exactly do? Keep the fd open but stop polling it for data for readable streams, and buffer what's written into it for writable streams?
  • Since you call both pause and readStop, how is it meant to be resumed? Is it just resume(), or is there another function that should be called to counteract readStop?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I never saw it myself but I can understand why just pausing the stream might be useful for piping logs or similar if the process crashes. If there's no technical drawback it sounds strictly better in term of enabled use cases, compared to closing it for good.

@arcanis Okay, thanks! I don’t think there are any technical drawbacks to this.

A few questions to better understand how it works:

  • What does readStop do, particularly on writable streams? Is it just a noop?

It stops libuv from listening for readable events on the fd until readStart() is called again. In partiuclar, no read() system call will occur during that time.

  • Maybe related, what does pause exactly do? Keep the fd open but stop polling it for data for readable streams, and buffer what's written into it for writable streams?

pause() makes the stream stop emitting 'data' events. According to the documentation, it does not stop emitting readable events, but I guess in that case it’s up to the user to take care of not calling .read() anyway.

  • Since you call both pause and readStop, how is it meant to be resumed? Is it just resume(), or is there another function that should be called to counteract readStop?

Thanks for pointing this out – yes, one thing more is necessary here: Adding stream.handle.reading = false;, so that ._read() will call readStart() again:

node/lib/net.js

Line 522 in eac8f50

} else if (!this._handle.reading) {
, as well as resetting the stream state so that _read() actually gets called once we resume the stream. (None of these things are signs of a great streams implementation… we should probably get rid of handle.reading altogether, but that’s not really a concern for this PR.)

(Both calling .resume() explicitly and using .on('data') to start reading again should work.)

I’ve added those things, and also added a test file for this scenario.

Copy link
Contributor

@arcanis arcanis Apr 23, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a lot of flags, eheh! 😄

Btw, why isn't readStop enough? If the libuv doesn't listen for readable events anymore, I guess that the data events won't be triggered, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@arcanis The issue that I’m seeing is, that at the time that this code is executed, there might be an existing 'data' listener or similar that reads data from the stream. When the readable stream buffer drains, the streams implementation calls _read(), and for sockets that leads to a handle.readStart() call, undoing the effects of the handle.readStop() call here.

I mean, sure, it’s somewhat questionable to just ignore existing listeners that read data from the stream, but at this point I feel like that’s a reasonable solution?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, that makes sense, thanks for the explanation!

stream._stdio.readableFlowing = false;
stream._stdio._readableState.reading = false;
stream._stdio[kIsUsedAsStdio] = true;
continue;
}

Expand Down
4 changes: 4 additions & 0 deletions test/parallel/test-child-process-pipe-dataflow.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ const MB = KB * KB;
grep = spawn('grep', ['x'], { stdio: [cat.stdout, 'pipe', 'pipe'] });
wc = spawn('wc', ['-c'], { stdio: [grep.stdout, 'pipe', 'pipe'] });

// Extra checks: We never try to start reading data ourselves.
cat.stdout._handle.readStart = common.mustNotCall();
grep.stdout._handle.readStart = common.mustNotCall();

[cat, grep, wc].forEach((child, index) => {
child.stderr.on('data', (d) => {
// Don't want to assert here, as we might miss error code info.
Expand Down
6 changes: 3 additions & 3 deletions test/parallel/test-child-process-server-close.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ const tmpdir = require('../common/tmpdir');
tmpdir.refresh();

const server = net.createServer((conn) => {
conn.on('close', common.mustCall());

spawn(process.execPath, ['-v'], {
stdio: ['ignore', conn, 'ignore']
}).on('close', common.mustCall());
}).on('close', common.mustCall(() => {
conn.end();
}));
}).listen(common.PIPE, () => {
const client = net.connect(common.PIPE, common.mustCall());
client.on('data', () => {
Expand Down
30 changes: 30 additions & 0 deletions test/parallel/test-child-process-stdio-merge-stdouts-into-cat.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const { spawn } = require('child_process');

// Regression test for https://github.com/nodejs/node/issues/27097.
// Check that (cat [p1] ; cat [p2]) | cat [p3] works.

const p3 = spawn('cat', { stdio: ['pipe', 'pipe', 'inherit'] });
const p1 = spawn('cat', { stdio: ['pipe', p3.stdin, 'inherit'] });
const p2 = spawn('cat', { stdio: ['pipe', p3.stdin, 'inherit'] });
p3.stdout.setEncoding('utf8');

// Write three different chunks:
// - 'hello' from this process to p1 to p3 back to us
// - 'world' from this process to p2 to p3 back to us
// - 'foobar' from this process to p3 back to us
// Do so sequentially in order to avoid race conditions.
p1.stdin.end('hello\n');
p3.stdout.once('data', common.mustCall((chunk) => {
assert.strictEqual(chunk, 'hello\n');
p2.stdin.end('world\n');
p3.stdout.once('data', common.mustCall((chunk) => {
assert.strictEqual(chunk, 'world\n');
p3.stdin.end('foobar\n');
p3.stdout.once('data', common.mustCall((chunk) => {
assert.strictEqual(chunk, 'foobar\n');
}));
}));
}));
27 changes: 27 additions & 0 deletions test/parallel/test-child-process-stdio-reuse-readable-stdio.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const { spawn } = require('child_process');

// Check that, once a child process has ended, it’s safe to read from a pipe
// that the child had used as input.
// We simulate that using cat | (head -n1; ...)

const p1 = spawn('cat', { stdio: ['pipe', 'pipe', 'inherit'] });
const p2 = spawn('head', ['-n1'], { stdio: [p1.stdout, 'pipe', 'inherit'] });

// First, write the line that gets passed through p2, making 'head' exit.
p1.stdin.write('hello\n');
p2.stdout.setEncoding('utf8');
p2.stdout.on('data', common.mustCall((chunk) => {
assert.strictEqual(chunk, 'hello\n');
}));
p2.on('exit', common.mustCall(() => {
// We can now use cat’s output, because 'head' is no longer reading from it.
p1.stdin.end('world\n');
p1.stdout.setEncoding('utf8');
p1.stdout.on('data', common.mustCall((chunk) => {
assert.strictEqual(chunk, 'world\n');
}));
p1.stdout.resume();
}));