Skip to content

Commit

Permalink
streams: add stream.pipe
Browse files Browse the repository at this point in the history
pipe is similar to pipeline however it supports stream composition.
  • Loading branch information
ronag committed Jun 14, 2021
1 parent 4e17ffc commit 46ddf18
Showing 1 changed file with 56 additions and 0 deletions.
56 changes: 56 additions & 0 deletions lib/internal/streams/pipe.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
'use strict';

const pipeline = require('internal/streams/pipeline');
const Duplex = require('internal/streams/duplex');

module.exports = function pipe(...streams) {
let cb;
let ret;

const r = pipeline(streams, function(err) {
if (cb) {
cb(err);
} else {
ret.destroy(err);
}
});
const w = streams[0];

ret = new Duplex({
writable: !!w?.writable,
readable: !!r?.readable,
objectMode: streams[0].readableObjectMode,
highWaterMark: 1
});

if (ret.writable) {
ret._write = function(chunk, encoding, callback) {
w.write(chunk, encoding, callback);
};

ret._final = function(chunk, encoding, callback) {
w.end(chunk, encoding, callback);
};
}

if (ret.readable) {
ret._read = function() {
r.resume();
};

r
.on('data', function(buf) {
if (!ret.push(buf)) {
this.pause();
}
})
.on('end', function() {
ret.push(null);
});
}

ret._destroy = function(err, callback) {
cb = callback;
streams[0].destroy(err);
};
}

0 comments on commit 46ddf18

Please sign in to comment.