Skip to content

Commit

Permalink
stream: add support for captureRejection option
Browse files Browse the repository at this point in the history
PR-URL: #27867
Reviewed-By: Benjamin Gruenbaum <[email protected]>
Reviewed-By: James M Snell <[email protected]>
Reviewed-By: Jeremiah Senkpiel <[email protected]>
Reviewed-By: Anna Henningsen <[email protected]>
Reviewed-By: Michaël Zasso <[email protected]>
  • Loading branch information
mcollina authored and targos committed Jan 14, 2020
1 parent 4265d57 commit fa544c8
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 4 deletions.
10 changes: 9 additions & 1 deletion lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ function Readable(options) {
this._destroy = options.destroy;
}

Stream.call(this);
Stream.call(this, options);
}

ObjectDefineProperty(Readable.prototype, 'destroyed', {
Expand Down Expand Up @@ -223,6 +223,14 @@ Readable.prototype._destroy = function(err, cb) {
cb(err);
};

Readable.prototype[EE.captureRejectionSymbol] = function(err) {
// TODO(mcollina): remove the destroyed if once errorEmitted lands in
// Readable.
if (!this.destroyed) {
this.destroy(err);
}
};

// Manually shove something into the read() buffer.
// This returns true if the highWaterMark has not been hit yet,
// similar to how Writable.write() returns true if you should
Expand Down
7 changes: 6 additions & 1 deletion lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ module.exports = Writable;
Writable.WritableState = WritableState;

const internalUtil = require('internal/util');
const EE = require('events');
const Stream = require('stream');
const { Buffer } = require('buffer');
const destroyImpl = require('internal/streams/destroy');
Expand Down Expand Up @@ -250,7 +251,7 @@ function Writable(options) {
this._final = options.final;
}

Stream.call(this);
Stream.call(this, options);
}

// Otherwise people can pipe Writable streams, which is just wrong.
Expand Down Expand Up @@ -782,3 +783,7 @@ Writable.prototype._undestroy = destroyImpl.undestroy;
Writable.prototype._destroy = function(err, cb) {
cb(err);
};

Writable.prototype[EE.captureRejectionSymbol] = function(err) {
this.destroy(err);
};
4 changes: 2 additions & 2 deletions lib/internal/streams/legacy.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ const {

const EE = require('events');

function Stream() {
EE.call(this);
function Stream(opts) {
EE.call(this, opts);
}
ObjectSetPrototypeOf(Stream.prototype, EE.prototype);
ObjectSetPrototypeOf(Stream, EE);
Expand Down
52 changes: 52 additions & 0 deletions test/parallel/test-stream-catch-rejections.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
'use strict';

const common = require('../common');
const stream = require('stream');
const assert = require('assert');

{
const r = new stream.Readable({
captureRejections: true,
read() {
this.push('hello');
this.push('world');
this.push(null);
}
});

const err = new Error('kaboom');

r.on('error', common.mustCall((_err) => {
assert.strictEqual(err, _err);
assert.strictEqual(r.destroyed, true);
}));

r.on('data', async () => {
throw err;
});
}

{
const w = new stream.Writable({
captureRejections: true,
highWaterMark: 1,
write(chunk, enc, cb) {
cb();
}
});

const err = new Error('kaboom');

w.write('hello', () => {
w.write('world');
});

w.on('error', common.mustCall((_err) => {
assert.strictEqual(err, _err);
assert.strictEqual(w.destroyed, true);
}));

w.on('drain', common.mustCall(async () => {
throw err;
}, 2));
}

0 comments on commit fa544c8

Please sign in to comment.