Skip to content

Commit

Permalink
stream: fix pipe deadlock when starting with needDrain
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Dec 18, 2020
1 parent 743ee9d commit 0af4317
Showing 1 changed file with 28 additions and 24 deletions.
52 changes: 28 additions & 24 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -716,35 +716,39 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
ondrain();
}

function pause() {
// If the user unpiped during `dest.write()`, it is possible
// to get stuck in a permanently paused state if that write
// also returned false.
// => Check whether `dest` is still a piping destination.
if (!cleanedUp) {
if (state.pipes.length === 1 && state.pipes[0] === dest) {
debug('false write response, pause', 0);
state.awaitDrainWriters = dest;
state.multiAwaitDrain = false;
} else if (state.pipes.length > 1 && state.pipes.includes(dest)) {
debug('false write response, pause', state.awaitDrainWriters.size);
state.awaitDrainWriters.add(dest);
}
src.pause();
}
if (!ondrain) {
// When the dest drains, it reduces the awaitDrain counter
// on the source. This would be more elegant with a .once()
// handler in flow(), but adding and removing repeatedly is
// too slow.
ondrain = pipeOnDrain(src, dest);
dest.on('drain', ondrain);
}
}

src.on('data', ondata);
function ondata(chunk) {
debug('ondata');
const ret = dest.write(chunk);
debug('dest.write', ret);
if (ret === false) {
// If the user unpiped during `dest.write()`, it is possible
// to get stuck in a permanently paused state if that write
// also returned false.
// => Check whether `dest` is still a piping destination.
if (!cleanedUp) {
if (state.pipes.length === 1 && state.pipes[0] === dest) {
debug('false write response, pause', 0);
state.awaitDrainWriters = dest;
state.multiAwaitDrain = false;
} else if (state.pipes.length > 1 && state.pipes.includes(dest)) {
debug('false write response, pause', state.awaitDrainWriters.size);
state.awaitDrainWriters.add(dest);
}
src.pause();
}
if (!ondrain) {
// When the dest drains, it reduces the awaitDrain counter
// on the source. This would be more elegant with a .once()
// handler in flow(), but adding and removing repeatedly is
// too slow.
ondrain = pipeOnDrain(src, dest);
dest.on('drain', ondrain);
}
pause();
}
}

Expand Down Expand Up @@ -793,7 +797,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {

if (dest.writableNeedDrain === true) {
if (state.flowing) {
src.pause();
pause();
}
} else if (!state.flowing) {
debug('pipe resume');
Expand Down

0 comments on commit 0af4317

Please sign in to comment.