From e2e615e87e52773274619d0167716a766c16ac64 Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Sun, 5 Jun 2016 13:28:11 +0200 Subject: [PATCH] stream: reset awaitDrain after manual .resume() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reset the `readableState.awaitDrain` counter after manual calls to `.resume()`. What might happen otherwise is that a slow consumer at the end of the pipe could end up stalling the piping in the following scenario: 1. The writable stream indicates that its buffer is full. 2. This leads the readable stream to `pause()` and increase its `awaitDrain` counter, which will be decreased by the writable’s next `drain` event. 3. Something calls `.resume()` manually. 4. The readable continues to pipe to the writable, but once again the writable stream indicates that the buffer is full. 5. The `awaitDrain` counter is thus increased again, but since it has now been increased twice for a single piping destination, the next `drain` event will not be able to reset `awaitDrain` to zero. 6. The pipe is stalled and no data is passed along anymore. The solution in this commit is to reset the `awaitDrain` counter to zero when `resume()` is called. Fixes: https://github.com/nodejs/node/issues/7159 PR-URL: https://github.com/nodejs/node/pull/7160 Reviewed-By: Matteo Collina Reviewed-By: James M Snell --- lib/_stream_readable.js | 1 + ...t-stream-pipe-await-drain-manual-resume.js | 54 +++++++++++++++++++ 2 files changed, 55 insertions(+) create mode 100644 test/parallel/test-stream-pipe-await-drain-manual-resume.js diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index afe3b3baf065c2..dde1eb500a782d 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -740,6 +740,7 @@ function resume_(stream, state) { } state.resumeScheduled = false; + state.awaitDrain = 0; stream.emit('resume'); flow(stream); if (state.flowing && !state.reading) diff --git a/test/parallel/test-stream-pipe-await-drain-manual-resume.js b/test/parallel/test-stream-pipe-await-drain-manual-resume.js new file mode 100644 index 00000000000000..33540b47b3d5e3 --- /dev/null +++ b/test/parallel/test-stream-pipe-await-drain-manual-resume.js @@ -0,0 +1,54 @@ +'use strict'; +const common = require('../common'); +const stream = require('stream'); + +// A consumer stream with a very low highWaterMark, which starts in a state +// where it buffers the chunk it receives rather than indicating that they +// have been consumed. +const writable = new stream.Writable({ + highWaterMark: 5 +}); + +let isCurrentlyBufferingWrites = true; +const queue = []; + +writable._write = (chunk, encoding, cb) => { + if (isCurrentlyBufferingWrites) + queue.push({chunk, cb}); + else + cb(); +}; + +const readable = new stream.Readable({ + read() {} +}); + +readable.pipe(writable); + +readable.once('pause', common.mustCall(() => { + // First pause, resume manually. The next write() to writable will still + // return false, because chunks are still being buffered, so it will increase + // the awaitDrain counter again. + process.nextTick(common.mustCall(() => { + readable.resume(); + })); + + readable.once('pause', common.mustCall(() => { + // Second pause, handle all chunks from now on. Once all callbacks that + // are currently queued up are handled, the awaitDrain drain counter should + // fall back to 0 and all chunks that are pending on the readable side + // should be flushed. + isCurrentlyBufferingWrites = false; + for (const queued of queue) + queued.cb(); + })); +})); + +readable.push(Buffer.alloc(100)); // Fill the writable HWM, first 'pause'. +readable.push(Buffer.alloc(100)); // Second 'pause'. +readable.push(Buffer.alloc(100)); // Should get through to the writable. +readable.push(null); + +writable.on('finish', common.mustCall(() => { + // Everything okay, all chunks were written. +}));