From cb32f69e005153fac2dc7cda33a6373435e430c2 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 9 Jul 2021 20:18:06 +0200 Subject: [PATCH] stream: cleanup async handling Cleanup async stream method handling. PR-URL: https://github.com/nodejs/node/pull/39329 Reviewed-By: Matteo Collina Reviewed-By: James M Snell --- lib/internal/streams/destroy.js | 130 +++++------------- lib/internal/streams/readable.js | 2 + lib/internal/streams/writable.js | 41 +++--- .../test-stream-construct-async-error.js | 3 + 4 files changed, 63 insertions(+), 113 deletions(-) diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index a2892c67a0fcfa..6d50b09f0948b4 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -69,12 +69,16 @@ function destroy(err, cb) { function _destroy(self, err, cb) { let called = false; - const result = self._destroy(err || null, (err) => { - const r = self._readableState; - const w = self._writableState; + function onDestroy(err) { + if (called) { + return; + } called = true; + const r = self._readableState; + const w = self._writableState; + checkError(err, w, r); if (w) { @@ -93,64 +97,24 @@ function _destroy(self, err, cb) { } else { process.nextTick(emitCloseNT, self); } - }); - if (result !== undefined && result !== null) { - try { + } + try { + const result = self._destroy(err || null, onDestroy); + if (result != null) { const then = result.then; if (typeof then === 'function') { then.call( result, function() { - if (called) - return; - - const r = self._readableState; - const w = self._writableState; - - if (w) { - w.closed = true; - } - if (r) { - r.closed = true; - } - - if (typeof cb === 'function') { - process.nextTick(cb); - } - - process.nextTick(emitCloseNT, self); + process.nextTick(onDestroy, null); }, function(err) { - const r = self._readableState; - const w = self._writableState; - err.stack; // eslint-disable-line no-unused-expressions - - called = true; - - if (w && !w.errored) { - w.errored = err; - } - if (r && !r.errored) { - r.errored = err; - } - - if (w) { - w.closed = true; - } - if (r) { - r.closed = true; - } - - if (typeof cb === 'function') { - process.nextTick(cb, err); - } - - process.nextTick(emitErrorCloseNT, self, err); + process.nextTick(onDestroy, err); }); } - } catch (err) { - process.nextTick(emitErrorNT, self, err); } + } catch (err) { + onDestroy(err); } } @@ -284,13 +248,19 @@ function construct(stream, cb) { } function constructNT(stream) { - const r = stream._readableState; - const w = stream._writableState; - // With duplex streams we use the writable side for state. - const s = w || r; - let called = false; - const result = stream._construct((err) => { + + function onConstruct(err) { + if (called) { + errorOrDestroy(stream, err ?? new ERR_MULTIPLE_CALLBACK()); + return; + } + called = true; + + const r = stream._readableState; + const w = stream._writableState; + const s = w || r; + if (r) { r.constructed = true; } @@ -298,12 +268,6 @@ function constructNT(stream) { w.constructed = true; } - if (called) { - err = new ERR_MULTIPLE_CALLBACK(); - } else { - called = true; - } - if (s.destroyed) { stream.emit(kDestroy, err); } else if (err) { @@ -311,47 +275,25 @@ function constructNT(stream) { } else { process.nextTick(emitConstructNT, stream); } - }); - if (result !== undefined && result !== null) { - try { + } + + try { + const result = stream._construct(onConstruct); + if (result != null) { const then = result.then; if (typeof then === 'function') { then.call( result, function() { - // If the callback was invoked, do nothing further. - if (called) - return; - if (r) { - r.constructed = true; - } - if (w) { - w.constructed = true; - } - if (s.destroyed) { - process.nextTick(() => stream.emit(kDestroy)); - } else { - process.nextTick(emitConstructNT, stream); - } + process.nextTick(onConstruct, null); }, function(err) { - if (r) { - r.constructed = true; - } - if (w) { - w.constructed = true; - } - called = true; - if (s.destroyed) { - process.nextTick(() => stream.emit(kDestroy, err)); - } else { - process.nextTick(errorOrDestroy, stream, err); - } + process.nextTick(onConstruct, err); }); } - } catch (err) { - process.nextTick(emitErrorNT, stream, err); } + } catch (err) { + onConstruct(err); } } diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 7f6876599cc7fc..d2d4f19ed3fa5c 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -479,8 +479,10 @@ Readable.prototype.read = function(n) { // If the length is currently zero, then we *need* a readable event. if (state.length === 0) state.needReadable = true; + // Call internal read method this._read(state.highWaterMark); + state.sync = false; // If _read pushed data synchronously, then `reading` will be false, // and we need to re-evaluate how much data we can return to the user. diff --git a/lib/internal/streams/writable.js b/lib/internal/streams/writable.js index 91d1230f1c7d9a..f41cc183f0939a 100644 --- a/lib/internal/streams/writable.js +++ b/lib/internal/streams/writable.js @@ -660,9 +660,15 @@ function needFinish(state) { } function callFinal(stream, state) { - state.sync = true; - state.pendingcb++; - const result = stream._final((err) => { + let called = false; + + function onFinish(err) { + if (called) { + errorOrDestroy(stream, err ?? ERR_MULTIPLE_CALLBACK()); + return; + } + called = true; + state.pendingcb--; if (err) { const onfinishCallbacks = state[kOnFinished].splice(0); @@ -679,33 +685,30 @@ function callFinal(stream, state) { state.pendingcb++; process.nextTick(finish, stream, state); } - }); - if (result !== undefined && result !== null) { - try { + } + + state.sync = true; + state.pendingcb++; + + try { + const result = stream._final(onFinish); + if (result != null) { const then = result.then; if (typeof then === 'function') { then.call( result, function() { - if (state.prefinished || !needFinish(state)) - return; - state.prefinish = true; - process.nextTick(() => stream.emit('prefinish')); - state.pendingcb++; - process.nextTick(finish, stream, state); + process.nextTick(onFinish, null); }, function(err) { - const onfinishCallbacks = state[kOnFinished].splice(0); - for (let i = 0; i < onfinishCallbacks.length; i++) { - process.nextTick(onfinishCallbacks[i], err); - } - process.nextTick(errorOrDestroy, stream, err, state.sync); + process.nextTick(onFinish, err); }); } - } catch (err) { - process.nextTick(errorOrDestroy, stream, err, state.sync); } + } catch (err) { + onFinish(stream, state, err); } + state.sync = false; } diff --git a/test/parallel/test-stream-construct-async-error.js b/test/parallel/test-stream-construct-async-error.js index 34e450c853a850..8101ec93fc0d74 100644 --- a/test/parallel/test-stream-construct-async-error.js +++ b/test/parallel/test-stream-construct-async-error.js @@ -98,6 +98,9 @@ const assert = require('assert'); const foo = new Foo(); foo.write('test', common.mustCall()); + foo.on('error', common.expectsError({ + code: 'ERR_MULTIPLE_CALLBACK' + })); } {