Skip to content

Commit

Permalink
fix tests and cleanup code
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire committed Aug 5, 2016
1 parent b647de0 commit 04a61c2
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 49 deletions.
27 changes: 10 additions & 17 deletions client.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ var pull = require('pull-stream')
var net = require('net')

module.exports = function (port, address, cb) {
cb = cb || function () {}
port |= 0
var clientHandle = new TCP()
var connect = new TCPConnectWrap()
Expand All @@ -23,21 +24,13 @@ module.exports = function (port, address, cb) {
err = clientHandle.connect6(connect, address, port)
}

// stream = err ? Handle(clientHandle, function () {}) : pull.error(err)
// if (!err) return Handle(clientHandle, function () {})
// if (err) return cb(new Error('error connecting 2:' + err))

// so, I could actually return the client stream syncly.

if (err) {
console.log('ERROR', err)
err = new Error('connection failed: ' + err)
return {
source: pull.error(err),
sink: function () {
return function (read) { read(err, cb) }
}
}
}
return Handle(clientHandle, cb)
if (err) {
console.log('ERROR', err)
err = new Error('connection failed: ' + err)
return {
source: pull.error(err),
sink: function (read) { read(err, cb) }
}
}
return Handle(clientHandle, cb)
}
55 changes: 35 additions & 20 deletions handle.js
Original file line number Diff line number Diff line change
@@ -1,62 +1,77 @@
const ShutdownWrap = process.binding('stream_wrap').ShutdownWrap;
const WriteWrap = process.binding('stream_wrap').WriteWrap;
const LOW = 32*1024, HIGH = 64*1024
const ShutdownWrap = process.binding('stream_wrap').ShutdownWrap
const WriteWrap = process.binding('stream_wrap').WriteWrap
const LOW = 32 * 1024
const HIGH = 64 * 1024

function noop () {}

module.exports = function (handle, cb) {
var queue = [], buffered = 0, waiting = null, ended = null
var queue = []
var buffered = 0
var waiting = null
var ended = null

handle.onread = function (n, data) {
if(n <= 0) ended = true
if (n <= 0) ended = true

if(waiting) {
if (waiting) {
var cb = waiting
waiting = null
return cb(ended, data)
}

if(data) {
buffer.push(data)
if (data) {
queue.push(data)
buffered += data.length
if(buffered > HIGH) handle.readStop()
if (buffered > HIGH) handle.readStop()
}
}

function shutdown (cb) {
var end = new ShutdownWrap()
end.async = false
end.handle = handle
end.oncomplete = function (_,_,_, err) { cb(err) }
end.oncomplete = function (_, __, ___, err) {
cb(err)
}
handle.shutdown(end)
}

return {
source: function (abort, _cb) {
if(abort) shutdown(function (err) { _cb(err || abort); cb(err) })
if(queue.length) {
if (abort) {
shutdown(function (err) {
_cb(err || abort)
cb(err)
})
}

if (queue.length) {
var data = queue.shift()
buffered -= data.length
_cb(null, data)
} else if (ended) {
_cb(ended)
} else {
waiting = _cb
}
else if(ended) _cb(ended)
else waiting = _cb

if(!ended && buffered < LOW) handle.readStart()
if (!ended && buffered < LOW) handle.readStart()
},
sink: function (read) {
read(null, function next (err, data) {
if(err) shutdown(cb)
if (err) shutdown(cb)
else {
var write = new WriteWrap()
write.async = false //what does this mean?
write.async = false // what does this mean?
write.handle = handle
//this keep the buffer being GC'd till write is complete (i think)
// this keeps the buffer being GC'd till write is complete (i think)
write.buffer = data
write.oncomplete = function (status, handle, req, err) {
if(err) return read(err, cb)
if (err) return read(err, cb)
else read(null, next)
}
if(0 === handle.writeBuffer(write, data)) {
if (handle.writeBuffer(write, data) === 0) {
write.oncomplete = noop
read(null, next)
}
Expand Down
8 changes: 5 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "pull-net",
"description": "",
"version": "0.0.0",
"description": "A replacement for net using pull streams all the way",
"version": "0.1.0",
"homepage": "https://github.com/dominictarr/pull-net",
"repository": {
"type": "git",
Expand All @@ -13,10 +13,12 @@
},
"devDependencies": {
"pull-stream": "^3.4.0",
"standard": "^7.1.2",
"stream-to-pull-stream": "^1.6.8"
},
"scripts": {
"test": "set -e; for t in test/*.js; do node $t; done"
"test": "set -e; for t in test/*.js; do node $t; done",
"lint": "standard"
},
"author": "'Dominic Tarr' <[email protected]> (dominictarr.com)",
"license": "MIT"
Expand Down
8 changes: 4 additions & 4 deletions test/echo.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,19 @@ var toPull = require('stream-to-pull-stream')

var createServer = require('../server')

var server = createServer(1234, 'localhost', function (stream) {
var server = createServer(function (stream) {
console.log(stream)
pull(
stream.source,
pull.through(function (data) {
console.log('THROUGH', data)
},function (err) {
}, function (err) {
console.log('END', err)
}),
stream.sink)
})
}).listen(9090, '127.0.0.1')

var client = net.connect(1234, 'localhost')
var client = net.connect(9090, '127.0.0.1')
pull(
pull.values([new Buffer('HELLO THERE')]),
toPull.duplex(client),
Expand Down
9 changes: 4 additions & 5 deletions test/echo2js → test/echo2.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@ var server = createServer(function (stream) {
stream.source,
pull.through(function (data) {
console.log('THROUGH', data)
},function (err) {
}, function (err) {
console.log('END', err)
}),
stream.sink)
}).listen(9988, '127.0.0.1')

console.log('server', server)

//setTimeout(function () {
// setTimeout(function () {

var client = connect(9988, '127.0.0.1')
var client = connect(9988, '127.0.0.1')

//, function (err, stream) {
// if(err) throw err
Expand All @@ -33,7 +33,7 @@ var client = connect(9988, '127.0.0.1')
// })
// )
// })
//},100)
// },100)

pull(
pull.values([new Buffer('HELLO THERE')]),
Expand All @@ -43,4 +43,3 @@ pull(
server.close()
})
)

0 comments on commit 04a61c2

Please sign in to comment.