From b647de01d1631d0096daf07d3ed6758d40346d11 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Fri, 5 Aug 2016 13:23:34 +0200 Subject: [PATCH 1/2] start moving things --- .gitignore | 1 + client.js | 54 +++++++++++++++++++++++++++++-------------------- handle.js | 1 - server.js | 57 ++++++++++++++++++++++++++++++++-------------------- test/echo.js | 1 - 5 files changed, 68 insertions(+), 46 deletions(-) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b512c09 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +node_modules \ No newline at end of file diff --git a/client.js b/client.js index c2c296e..877ef4b 100644 --- a/client.js +++ b/client.js @@ -1,33 +1,43 @@ var Handle = require('./handle') -var TCP = process.binding('tcp_wrap').TCP; -var TCPConnectWrap = process.binding('tcp_wrap').TCPConnectWrap; +var TCP = process.binding('tcp_wrap').TCP +var TCPConnectWrap = process.binding('tcp_wrap').TCPConnectWrap +var pull = require('pull-stream') +var net = require('net') module.exports = function (port, address, cb) { + port |= 0 var clientHandle = new TCP() - var connect = new TCPConnectWrap(), stream + var connect = new TCPConnectWrap() + var stream - connect.oncomplete = function (err) { - if(err) return cb(new Error('error connecting 1:'+err)) + connect.port = port + connect.address = address + connect.oncomplete = function afterConnect (err) { + if (err) return cb(new Error('error connecting 1:' + err)) cb && cb(null, stream) } - var err = clientHandle.connect(connect, address, port); - - stream = err ? Handle(clientHandle, function () {}) : error.duplex(err) - if(!err) return Handle(clientHandle, function () {}) - if(err) return cb(new Error('error connecting 2:'+err)) + var err + if (net.isIPv4) { + err = clientHandle.connect(connect, address, port) + } else { + err = clientHandle.connect6(connect, address, port) + } -//so, I could actually return the client stream syncly. + // stream = err ? Handle(clientHandle, function () {}) : pull.error(err) + // if (!err) return Handle(clientHandle, function () {}) + // if (err) return cb(new Error('error connecting 2:' + err)) + // so, I could actually return the client stream syncly. -// -// if(err) { -// console.log("ERROR", err) -// err = new Error('connection failed:'+err) -// return { -// source: Error(err), -// sink: function (read) {read(err, cb)} -// } -// } -// return Handle(clientHandle, cb) + if (err) { + console.log('ERROR', err) + err = new Error('connection failed: ' + err) + return { + source: pull.error(err), + sink: function () { + return function (read) { read(err, cb) } + } + } + } + return Handle(clientHandle, cb) } - diff --git a/handle.js b/handle.js index 152cc86..34e5e5d 100644 --- a/handle.js +++ b/handle.js @@ -65,4 +65,3 @@ module.exports = function (handle, cb) { } } } - diff --git a/server.js b/server.js index 0cb3d52..2cbf573 100644 --- a/server.js +++ b/server.js @@ -1,40 +1,53 @@ -var TCP = process.binding('tcp_wrap').TCP; +var TCP = process.binding('tcp_wrap').TCP +var net = require('net') var Handle = require('./handle') -function noop() {} +function noop () {} module.exports = function (onConnect) { - var server = new TCP(); + var server = new TCP() return { listen: function (port, addr, cb) { - var err = server.bind(addr, port) - if(err) throw Error('could not bind') //server.close(), cb && cb(err) + cb = cb || noop + var err + if (net.isIPv6(addr)) { + err = server.bind6(addr, port) + } else { + err = server.bind(addr, port) + } + + if (err) { + server.close() + cb(err) + return + } - //512 connections allowed in backlog + // 512 connections allowed in backlog server.listen(511) - server.onconnection = function(err, client) { - if (err) return console.error(new Error('error connected:'+err)) + server.onconnection = function (err, client) { + if (err) { + return console.error(new Error('error connected:' + err)) + } onConnect(Handle(client, noop)) } return server }, - close: function () { - server.close() + address: function () { + if (server && server.getsockname) { + var out = {} + server.getsockname(out) + return out + } else if (this._pipeName) { + return this._pipeName + } else { + return null + } + }, + close: function (cb) { + server.close(cb) return server } } } - - - - - - - - - - - - diff --git a/test/echo.js b/test/echo.js index 6841bbc..93b00e0 100644 --- a/test/echo.js +++ b/test/echo.js @@ -26,4 +26,3 @@ pull( server.close() }) ) - From 04a61c2a763a4d3288d8483a3b75d786d3d6e91e Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Fri, 5 Aug 2016 13:51:05 +0200 Subject: [PATCH 2/2] fix tests and cleanup code --- client.js | 27 +++++++------------ handle.js | 55 ++++++++++++++++++++++++-------------- package.json | 8 +++--- test/echo.js | 8 +++--- test/{echo2js => echo2.js} | 9 +++---- 5 files changed, 58 insertions(+), 49 deletions(-) rename test/{echo2js => echo2.js} (89%) diff --git a/client.js b/client.js index 877ef4b..6f71483 100644 --- a/client.js +++ b/client.js @@ -5,6 +5,7 @@ var pull = require('pull-stream') var net = require('net') module.exports = function (port, address, cb) { + cb = cb || function () {} port |= 0 var clientHandle = new TCP() var connect = new TCPConnectWrap() @@ -23,21 +24,13 @@ module.exports = function (port, address, cb) { err = clientHandle.connect6(connect, address, port) } - // stream = err ? Handle(clientHandle, function () {}) : pull.error(err) - // if (!err) return Handle(clientHandle, function () {}) - // if (err) return cb(new Error('error connecting 2:' + err)) - - // so, I could actually return the client stream syncly. - - if (err) { - console.log('ERROR', err) - err = new Error('connection failed: ' + err) - return { - source: pull.error(err), - sink: function () { - return function (read) { read(err, cb) } - } - } - } - return Handle(clientHandle, cb) + if (err) { + console.log('ERROR', err) + err = new Error('connection failed: ' + err) + return { + source: pull.error(err), + sink: function (read) { read(err, cb) } + } + } + return Handle(clientHandle, cb) } diff --git a/handle.js b/handle.js index 34e5e5d..211661f 100644 --- a/handle.js +++ b/handle.js @@ -1,24 +1,29 @@ -const ShutdownWrap = process.binding('stream_wrap').ShutdownWrap; -const WriteWrap = process.binding('stream_wrap').WriteWrap; -const LOW = 32*1024, HIGH = 64*1024 +const ShutdownWrap = process.binding('stream_wrap').ShutdownWrap +const WriteWrap = process.binding('stream_wrap').WriteWrap +const LOW = 32 * 1024 +const HIGH = 64 * 1024 function noop () {} module.exports = function (handle, cb) { - var queue = [], buffered = 0, waiting = null, ended = null + var queue = [] + var buffered = 0 + var waiting = null + var ended = null + handle.onread = function (n, data) { - if(n <= 0) ended = true + if (n <= 0) ended = true - if(waiting) { + if (waiting) { var cb = waiting waiting = null return cb(ended, data) } - if(data) { - buffer.push(data) + if (data) { + queue.push(data) buffered += data.length - if(buffered > HIGH) handle.readStop() + if (buffered > HIGH) handle.readStop() } } @@ -26,37 +31,47 @@ module.exports = function (handle, cb) { var end = new ShutdownWrap() end.async = false end.handle = handle - end.oncomplete = function (_,_,_, err) { cb(err) } + end.oncomplete = function (_, __, ___, err) { + cb(err) + } handle.shutdown(end) } return { source: function (abort, _cb) { - if(abort) shutdown(function (err) { _cb(err || abort); cb(err) }) - if(queue.length) { + if (abort) { + shutdown(function (err) { + _cb(err || abort) + cb(err) + }) + } + + if (queue.length) { var data = queue.shift() buffered -= data.length _cb(null, data) + } else if (ended) { + _cb(ended) + } else { + waiting = _cb } - else if(ended) _cb(ended) - else waiting = _cb - if(!ended && buffered < LOW) handle.readStart() + if (!ended && buffered < LOW) handle.readStart() }, sink: function (read) { read(null, function next (err, data) { - if(err) shutdown(cb) + if (err) shutdown(cb) else { var write = new WriteWrap() - write.async = false //what does this mean? + write.async = false // what does this mean? write.handle = handle - //this keep the buffer being GC'd till write is complete (i think) + // this keeps the buffer being GC'd till write is complete (i think) write.buffer = data write.oncomplete = function (status, handle, req, err) { - if(err) return read(err, cb) + if (err) return read(err, cb) else read(null, next) } - if(0 === handle.writeBuffer(write, data)) { + if (handle.writeBuffer(write, data) === 0) { write.oncomplete = noop read(null, next) } diff --git a/package.json b/package.json index 20e59d5..d969d1b 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "pull-net", - "description": "", - "version": "0.0.0", + "description": "A replacement for net using pull streams all the way", + "version": "0.1.0", "homepage": "https://github.com/dominictarr/pull-net", "repository": { "type": "git", @@ -13,10 +13,12 @@ }, "devDependencies": { "pull-stream": "^3.4.0", + "standard": "^7.1.2", "stream-to-pull-stream": "^1.6.8" }, "scripts": { - "test": "set -e; for t in test/*.js; do node $t; done" + "test": "set -e; for t in test/*.js; do node $t; done", + "lint": "standard" }, "author": "'Dominic Tarr' (dominictarr.com)", "license": "MIT" diff --git a/test/echo.js b/test/echo.js index 93b00e0..a2fc09f 100644 --- a/test/echo.js +++ b/test/echo.js @@ -5,19 +5,19 @@ var toPull = require('stream-to-pull-stream') var createServer = require('../server') -var server = createServer(1234, 'localhost', function (stream) { +var server = createServer(function (stream) { console.log(stream) pull( stream.source, pull.through(function (data) { console.log('THROUGH', data) - },function (err) { + }, function (err) { console.log('END', err) }), stream.sink) -}) +}).listen(9090, '127.0.0.1') -var client = net.connect(1234, 'localhost') +var client = net.connect(9090, '127.0.0.1') pull( pull.values([new Buffer('HELLO THERE')]), toPull.duplex(client), diff --git a/test/echo2js b/test/echo2.js similarity index 89% rename from test/echo2js rename to test/echo2.js index 37b6f08..9c568b2 100644 --- a/test/echo2js +++ b/test/echo2.js @@ -9,7 +9,7 @@ var server = createServer(function (stream) { stream.source, pull.through(function (data) { console.log('THROUGH', data) - },function (err) { + }, function (err) { console.log('END', err) }), stream.sink) @@ -17,9 +17,9 @@ var server = createServer(function (stream) { console.log('server', server) -//setTimeout(function () { +// setTimeout(function () { -var client = connect(9988, '127.0.0.1') +var client = connect(9988, '127.0.0.1') //, function (err, stream) { // if(err) throw err @@ -33,7 +33,7 @@ var client = connect(9988, '127.0.0.1') // }) // ) // }) -//},100) +// },100) pull( pull.values([new Buffer('HELLO THERE')]), @@ -43,4 +43,3 @@ pull( server.close() }) ) -