Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Making some improvements #1

Merged
merged 2 commits into from
Aug 5, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
node_modules
51 changes: 27 additions & 24 deletions client.js
Original file line number Diff line number Diff line change
@@ -1,33 +1,36 @@
var Handle = require('./handle')
var TCP = process.binding('tcp_wrap').TCP;
var TCPConnectWrap = process.binding('tcp_wrap').TCPConnectWrap;
var TCP = process.binding('tcp_wrap').TCP
var TCPConnectWrap = process.binding('tcp_wrap').TCPConnectWrap
var pull = require('pull-stream')
var net = require('net')

module.exports = function (port, address, cb) {
cb = cb || function () {}
port |= 0
var clientHandle = new TCP()
var connect = new TCPConnectWrap(), stream
var connect = new TCPConnectWrap()
var stream

connect.oncomplete = function (err) {
if(err) return cb(new Error('error connecting 1:'+err))
connect.port = port
connect.address = address
connect.oncomplete = function afterConnect (err) {
if (err) return cb(new Error('error connecting 1:' + err))
cb && cb(null, stream)
}
var err = clientHandle.connect(connect, address, port);

stream = err ? Handle(clientHandle, function () {}) : error.duplex(err)
if(!err) return Handle(clientHandle, function () {})
if(err) return cb(new Error('error connecting 2:'+err))

//so, I could actually return the client stream syncly.

var err
if (net.isIPv4) {
err = clientHandle.connect(connect, address, port)
} else {
err = clientHandle.connect6(connect, address, port)
}

//
// if(err) {
// console.log("ERROR", err)
// err = new Error('connection failed:'+err)
// return {
// source: Error(err),
// sink: function (read) {read(err, cb)}
// }
// }
// return Handle(clientHandle, cb)
if (err) {
console.log('ERROR', err)
err = new Error('connection failed: ' + err)
return {
source: pull.error(err),
sink: function (read) { read(err, cb) }
}
}
return Handle(clientHandle, cb)
}

56 changes: 35 additions & 21 deletions handle.js
Original file line number Diff line number Diff line change
@@ -1,62 +1,77 @@
const ShutdownWrap = process.binding('stream_wrap').ShutdownWrap;
const WriteWrap = process.binding('stream_wrap').WriteWrap;
const LOW = 32*1024, HIGH = 64*1024
const ShutdownWrap = process.binding('stream_wrap').ShutdownWrap
const WriteWrap = process.binding('stream_wrap').WriteWrap
const LOW = 32 * 1024
const HIGH = 64 * 1024

function noop () {}

module.exports = function (handle, cb) {
var queue = [], buffered = 0, waiting = null, ended = null
var queue = []
var buffered = 0
var waiting = null
var ended = null

handle.onread = function (n, data) {
if(n <= 0) ended = true
if (n <= 0) ended = true

if(waiting) {
if (waiting) {
var cb = waiting
waiting = null
return cb(ended, data)
}

if(data) {
buffer.push(data)
if (data) {
queue.push(data)
buffered += data.length
if(buffered > HIGH) handle.readStop()
if (buffered > HIGH) handle.readStop()
}
}

function shutdown (cb) {
var end = new ShutdownWrap()
end.async = false
end.handle = handle
end.oncomplete = function (_,_,_, err) { cb(err) }
end.oncomplete = function (_, __, ___, err) {
cb(err)
}
handle.shutdown(end)
}

return {
source: function (abort, _cb) {
if(abort) shutdown(function (err) { _cb(err || abort); cb(err) })
if(queue.length) {
if (abort) {
shutdown(function (err) {
_cb(err || abort)
cb(err)
})
}

if (queue.length) {
var data = queue.shift()
buffered -= data.length
_cb(null, data)
} else if (ended) {
_cb(ended)
} else {
waiting = _cb
}
else if(ended) _cb(ended)
else waiting = _cb

if(!ended && buffered < LOW) handle.readStart()
if (!ended && buffered < LOW) handle.readStart()
},
sink: function (read) {
read(null, function next (err, data) {
if(err) shutdown(cb)
if (err) shutdown(cb)
else {
var write = new WriteWrap()
write.async = false //what does this mean?
write.async = false // what does this mean?
write.handle = handle
//this keep the buffer being GC'd till write is complete (i think)
// this keeps the buffer being GC'd till write is complete (i think)
write.buffer = data
write.oncomplete = function (status, handle, req, err) {
if(err) return read(err, cb)
if (err) return read(err, cb)
else read(null, next)
}
if(0 === handle.writeBuffer(write, data)) {
if (handle.writeBuffer(write, data) === 0) {
write.oncomplete = noop
read(null, next)
}
Expand All @@ -65,4 +80,3 @@ module.exports = function (handle, cb) {
}
}
}

8 changes: 5 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "pull-net",
"description": "",
"version": "0.0.0",
"description": "A replacement for net using pull streams all the way",
"version": "0.1.0",
"homepage": "https://github.com/dominictarr/pull-net",
"repository": {
"type": "git",
Expand All @@ -13,10 +13,12 @@
},
"devDependencies": {
"pull-stream": "^3.4.0",
"standard": "^7.1.2",
"stream-to-pull-stream": "^1.6.8"
},
"scripts": {
"test": "set -e; for t in test/*.js; do node $t; done"
"test": "set -e; for t in test/*.js; do node $t; done",
"lint": "standard"
},
"author": "'Dominic Tarr' <[email protected]> (dominictarr.com)",
"license": "MIT"
Expand Down
57 changes: 35 additions & 22 deletions server.js
Original file line number Diff line number Diff line change
@@ -1,40 +1,53 @@
var TCP = process.binding('tcp_wrap').TCP;
var TCP = process.binding('tcp_wrap').TCP
var net = require('net')
var Handle = require('./handle')

function noop() {}
function noop () {}

module.exports = function (onConnect) {
var server = new TCP();
var server = new TCP()

return {
listen: function (port, addr, cb) {
var err = server.bind(addr, port)
if(err) throw Error('could not bind') //server.close(), cb && cb(err)
cb = cb || noop
var err
if (net.isIPv6(addr)) {
err = server.bind6(addr, port)
} else {
err = server.bind(addr, port)
}

if (err) {
server.close()
cb(err)
return
}

//512 connections allowed in backlog
// 512 connections allowed in backlog
server.listen(511)

server.onconnection = function(err, client) {
if (err) return console.error(new Error('error connected:'+err))
server.onconnection = function (err, client) {
if (err) {
return console.error(new Error('error connected:' + err))
}
onConnect(Handle(client, noop))
}
return server
},
close: function () {
server.close()
address: function () {
if (server && server.getsockname) {
var out = {}
server.getsockname(out)
return out
} else if (this._pipeName) {
return this._pipeName
} else {
return null
}
},
close: function (cb) {
server.close(cb)
return server
}
}
}












9 changes: 4 additions & 5 deletions test/echo.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,19 @@ var toPull = require('stream-to-pull-stream')

var createServer = require('../server')

var server = createServer(1234, 'localhost', function (stream) {
var server = createServer(function (stream) {
console.log(stream)
pull(
stream.source,
pull.through(function (data) {
console.log('THROUGH', data)
},function (err) {
}, function (err) {
console.log('END', err)
}),
stream.sink)
})
}).listen(9090, '127.0.0.1')

var client = net.connect(1234, 'localhost')
var client = net.connect(9090, '127.0.0.1')
pull(
pull.values([new Buffer('HELLO THERE')]),
toPull.duplex(client),
Expand All @@ -26,4 +26,3 @@ pull(
server.close()
})
)

9 changes: 4 additions & 5 deletions test/echo2js → test/echo2.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@ var server = createServer(function (stream) {
stream.source,
pull.through(function (data) {
console.log('THROUGH', data)
},function (err) {
}, function (err) {
console.log('END', err)
}),
stream.sink)
}).listen(9988, '127.0.0.1')

console.log('server', server)

//setTimeout(function () {
// setTimeout(function () {

var client = connect(9988, '127.0.0.1')
var client = connect(9988, '127.0.0.1')

//, function (err, stream) {
// if(err) throw err
Expand All @@ -33,7 +33,7 @@ var client = connect(9988, '127.0.0.1')
// })
// )
// })
//},100)
// },100)

pull(
pull.values([new Buffer('HELLO THERE')]),
Expand All @@ -43,4 +43,3 @@ pull(
server.close()
})
)