Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
prdn committed Apr 1, 2016
1 parent 547d5e2 commit 3b6655e
Show file tree
Hide file tree
Showing 5 changed files with 233 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
node_modules
*.swp
*.swo
24 changes: 24 additions & 0 deletions examples/client1.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
'use strict'

var _ = require('lodash')
var Base = require('grenache-nodejs-base')
var Client = require('./../lib/Client')

var gc = new Base.Link({
grape: 'ws://127.0.0.1:30001'
})
gc.start()

var tc = new Client(gc, {})
var service = tc.listen('req', 'tcp://127.0.0.1:5000')

setInterval(function() {
tc.grape.announce('test', service.port, {}, () => {
console.log('announced')
})
}, 1000)

service.socket.on('message', (rid, type, payload, reply) => {
//console.log('here', rid, type, payload)
service.socket.send([rid, type, 'world'])
})
31 changes: 31 additions & 0 deletions examples/client2.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
'use strict'

var _ = require('lodash')
var Base = require('grenache-nodejs-base')
var Client = require('./../lib/Client')

var gc = new Base.Link({
grape: 'ws://127.0.0.1:30002'
})
gc.start()

var tc = new Client(gc, {})

var cnt = 10000
var reps = 0

setTimeout(() => {
gc.lookup('test', { timeout: 1000 }, (err, data) => {
var d1 = new Date()
for (var i = 0; i < cnt; i++) {
tc.request(data[0], 'test', 'hello', (err, data) => {
//console.log("here", err, data)
if (++reps === cnt) {
var d2 = new Date()
console.log(d2 - d1)
}
})
}
console.log(err, data)
})
}, 2000)
125 changes: 125 additions & 0 deletions lib/Client.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
'use strict'

var uuid = require('uuid')
var zmq = require('zmq')
var url = require('url')
var _ = require('lodash')
var Base = require('grenache-nodejs-base')

class Client extends Base.Client {

constructor(grape, conf) {
super(grape, conf)
}

enhance(socket) {
socket.identity = new Buffer(uuid.v4())
socket.setsockopt('linger', 1)
}

listen(type, dest) {
var socket
var needListener = false

switch (type) {
case 'req':
socket = zmq.socket('router')
needListener = true
break
case 'pub':
socket = zmq.socket('pub')
break
}

this.enhance(socket)

if (needListener) {
socket.on('message', (client, rid, type, data) => {
this.emit(
'request', rid.toString(), type.toString(), data.toString(), {
end: (res) => {
this.socket.send([client, rid, res])
}
}
)
})
}

socket.bindSync(dest)

var bindInfo = socket._zmq.getsockopt(zmq.ZMQ_LAST_ENDPOINT)
bindInfo = url.parse(bindInfo)

socket._sDest = dest
socket._sDir = 'server'
socket._sType = type
socket._sPort = +bindInfo.port

var target = this._targets[dest] = {
socket: socket,
port: socket._sPort,
connected: true
}

return this._targets[dest]
}

_connect(type, dest, cb) {
var socket

switch (type) {
case 'req':
socket = zmq.socket('dealer')
break
case 'sub':
socket = zmq.socket('sub')
break
}

this.enhance(socket)

socket.on('message', (rid, data) => {
//console.log("Transport.onMessage", rid, data)
this.handleReply(rid.toString(), data.toString())
})

socket.connect(dest)
socket._sDest = dest
socket._sDir = 'client'
socket._sType = type

var target = this._targets[dest] = {
socket: socket,
connected: true,
_queue: [cb]
}

setImmediate(() => {
_.each(target._queue, cb => {
cb()
})
target._queue = []
})
}

_request(dest, type, payload, cb) {
var target = this._targets[dest]
var req = this.req(type, payload, {}, cb)
target.socket.send([req.rid, type, payload])
}

publish(dest, chan, payload) {
var target = this._targets[dest]
target.socket.send(chan + ' ' + payload)
}

stop() {
_.each(this._targets, socket => {
socket.close()
})

super.stop()
}
}

module.exports = Client
50 changes: 50 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
{
"name": "grenache-nodejs-zmq",
"version": "0.0.1",
"private": false,
"description": "Granache Grape",
"author": "prdn <[email protected]> (https://bitfinex.com/)",
"keywords": [
"grenache",
"kademlia",
"nodejs",
"micro-services"
],
"dependencies": {
"async": "~1.5.2",
"debug": "~2.2.0",
"grenache-nodejs-base": "git+https://github.com/bitfinexcom/grenache-nodejs-base.git",
"lodash": "~4.6.1",
"uuid": "~2.0.1",
"underscore.string": "~3.2.3",
"zmq": "~2.14.0"
},
"engine": {
"node": ">=0.10"
},
"main": "index.js",
"directories": {
"example": "examples",
"test": "test"
},
"devDependencies": {
"chai": "~3.2.0",
"mocha": "~2.2.5"
},
"scripts": {
"test": "make test",
"lint": "eslint --config .eslintrc --fix index.js lib/ test/"
},
"license": "MIT",
"repository": {
"type": "git",
"url": "https://github.com/bitfinexcom/grenache-nodejs-zmq.git"
},
"bugs": {
"url": "https://github.com/bitfinexcom/grenache-nodejs-zmq/issues"
},
"homepage": "https://github.com/bitfinexcom/grenache-nodejs-zmq",
"optionalDependencies": {
"eslint": "~1.6.0"
}
}

0 comments on commit 3b6655e

Please sign in to comment.