Skip to content

Commit

Permalink
initial
Browse files Browse the repository at this point in the history
  • Loading branch information
dominictarr committed May 31, 2016
0 parents commit bd23213
Show file tree
Hide file tree
Showing 9 changed files with 314 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
language: node_js
node_js:
- 0.6
- 0.8
22 changes: 22 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
Copyright (c) 2016 'Dominic Tarr'

Permission is hereby granted, free of charge,
to any person obtaining a copy of this software and
associated documentation files (the "Software"), to
deal in the Software without restriction, including
without limitation the rights to use, copy, modify,
merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom
the Software is furnished to do so,
subject to the following conditions:

The above copyright notice and this permission notice
shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR
ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
49 changes: 49 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# pull-net

pull-stream directly to node's libuv bindings.

echo server works, don't handle all the edge cases yet,
or nice error messages etc.

## example

``` js
var createServer = require('pull-net/server')

createServer(function (stream) {
pull(stream.source, stream.sink) //ECHO
}).listen(9999, '127.0.0.1')

var connect = require('pull-net/client')

var stream = connect(9999, '127.0.0.1')

pull(
pull.once(new Buffer('hello tcp')),
stream,
pull.collect(console.log)
)
```

## Questions

node does some things that turn out to be unnecessary,
like, take a callback for `server.listen`.

Maybe these cause problem when trying to use other stream types though,
(such as like unix pipes, which are also handled in
[node/lib/net.js](https://github.com/nodejs/node/blob/master/lib/net.js))

This is probably mainly to handle some errors... maybe those errors
could just throw?

Also, there are often client type connections which may error
before receiving data (at least in their context, such as authentication errors)
often, this can't be a sync error. So that would suggest an api
that was `connect(function (err, stream) {...})`

what if a server was a stream of clients? does that really help?

## License

MIT
33 changes: 33 additions & 0 deletions client.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
var Handle = require('./handle')
var TCP = process.binding('tcp_wrap').TCP;
var TCPConnectWrap = process.binding('tcp_wrap').TCPConnectWrap;

module.exports = function (port, address, cb) {
var clientHandle = new TCP()
var connect = new TCPConnectWrap(), stream

connect.oncomplete = function (err) {
if(err) return cb(new Error('error connecting 1:'+err))
cb && cb(null, stream)
}
var err = clientHandle.connect(connect, address, port);

stream = err ? Handle(clientHandle, function () {}) : error.duplex(err)
if(!err) return Handle(clientHandle, function () {})
if(err) return cb(new Error('error connecting 2:'+err))

//so, I could actually return the client stream syncly.


//
// if(err) {
// console.log("ERROR", err)
// err = new Error('connection failed:'+err)
// return {
// source: Error(err),
// sink: function (read) {read(err, cb)}
// }
// }
// return Handle(clientHandle, cb)
}

68 changes: 68 additions & 0 deletions handle.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
const ShutdownWrap = process.binding('stream_wrap').ShutdownWrap;
const WriteWrap = process.binding('stream_wrap').WriteWrap;
const LOW = 32*1024, HIGH = 64*1024

function noop () {}

module.exports = function (handle, cb) {
var queue = [], buffered = 0, waiting = null, ended = null
handle.onread = function (n, data) {
if(n <= 0) ended = true

if(waiting) {
var cb = waiting
waiting = null
return cb(ended, data)
}

if(data) {
buffer.push(data)
buffered += data.length
if(buffered > HIGH) handle.readStop()
}
}

function shutdown (cb) {
var end = new ShutdownWrap()
end.async = false
end.handle = handle
end.oncomplete = function (_,_,_, err) { cb(err) }
handle.shutdown(end)
}

return {
source: function (abort, _cb) {
if(abort) shutdown(function (err) { _cb(err || abort); cb(err) })
if(queue.length) {
var data = queue.shift()
buffered -= data.length
_cb(null, data)
}
else if(ended) _cb(ended)
else waiting = _cb

if(!ended && buffered < LOW) handle.readStart()
},
sink: function (read) {
read(null, function next (err, data) {
if(err) shutdown(cb)
else {
var write = new WriteWrap()
write.async = false //what does this mean?
write.handle = handle
//this keep the buffer being GC'd till write is complete (i think)
write.buffer = data
write.oncomplete = function (status, handle, req, err) {
if(err) return read(err, cb)
else read(null, next)
}
if(0 === handle.writeBuffer(write, data)) {
write.oncomplete = noop
read(null, next)
}
}
})
}
}
}

23 changes: 23 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"name": "pull-net",
"description": "",
"version": "0.0.0",
"homepage": "https://github.com/dominictarr/pull-net",
"repository": {
"type": "git",
"url": "git://github.com/dominictarr/pull-net.git"
},
"dependencies": {
"pull-pushable": "^2.0.0",
"pull-stream": "^3.4.0"
},
"devDependencies": {
"pull-stream": "^3.4.0",
"stream-to-pull-stream": "^1.6.8"
},
"scripts": {
"test": "set -e; for t in test/*.js; do node $t; done"
},
"author": "'Dominic Tarr' <[email protected]> (dominictarr.com)",
"license": "MIT"
}
40 changes: 40 additions & 0 deletions server.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
var TCP = process.binding('tcp_wrap').TCP;
var Handle = require('./handle')

function noop() {}

module.exports = function (onConnect) {
var server = new TCP();

return {
listen: function (port, addr, cb) {
var err = server.bind(addr, port)
if(err) throw Error('could not bind') //server.close(), cb && cb(err)

//512 connections allowed in backlog
server.listen(511)

server.onconnection = function(err, client) {
if (err) return console.error(new Error('error connected:'+err))
onConnect(Handle(client, noop))
}
return server
},
close: function () {
server.close()
return server
}
}
}












29 changes: 29 additions & 0 deletions test/echo.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
var pull = require('pull-stream')

var net = require('net')
var toPull = require('stream-to-pull-stream')

var createServer = require('../server')

var server = createServer(1234, 'localhost', function (stream) {
console.log(stream)
pull(
stream.source,
pull.through(function (data) {
console.log('THROUGH', data)
},function (err) {
console.log('END', err)
}),
stream.sink)
})

var client = net.connect(1234, 'localhost')
pull(
pull.values([new Buffer('HELLO THERE')]),
toPull.duplex(client),
pull.drain(console.log, function () {
console.log('END')
server.close()
})
)

46 changes: 46 additions & 0 deletions test/echo2js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
var pull = require('pull-stream')

var connect = require('../client')
var createServer = require('../server')

var server = createServer(function (stream) {
console.log(stream)
pull(
stream.source,
pull.through(function (data) {
console.log('THROUGH', data)
},function (err) {
console.log('END', err)
}),
stream.sink)
}).listen(9988, '127.0.0.1')

console.log('server', server)

//setTimeout(function () {

var client = connect(9988, '127.0.0.1')

//, function (err, stream) {
// if(err) throw err
// console.log(err, stream)
// pull(
// pull.values([new Buffer('HELLO THERE')]),
// stream,
// pull.drain(console.log, function () {
// console.log('END')
// server.close()
// })
// )
// })
//},100)

pull(
pull.values([new Buffer('HELLO THERE')]),
client,
pull.drain(console.log, function () {
console.log('END')
server.close()
})
)

0 comments on commit bd23213

Please sign in to comment.