Skip to content

Commit

Permalink
lets do some version control yall
Browse files Browse the repository at this point in the history
  • Loading branch information
bpot committed Sep 4, 2010
0 parents commit 9862f29
Show file tree
Hide file tree
Showing 20 changed files with 932 additions and 0 deletions.
18 changes: 18 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
Copyright 2010 Bob Potter. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to
deal in the Software without restriction, including without limitation the
rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
sell copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
IN THE SOFTWARE.
69 changes: 69 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
node-gossip implements a gossip protocol w/failure detection, allowing you to create a fault-tolerant, self-managing cluster of node.js processes. Each server in the cluster has it's own set of key-value pairs which are propogated to the others peers in the cluster. The API allows you to make changes to the local state, listen for changes in state, listen for new peers and be notified when a peer appears to be dead or appears to have come back to life.

The library is currently in 'hey it seems to work for me' state, there are probably some bugs lurking around. The API will probably change and suggestions on how to improve it are very welcome.

Check out the the scripts in the simulations/ directory for some examples.

### Usage

var Gossiper = require('gossiper').Gossiper;
// Create a seed peer.
var seed = new Gossiper(9000, []);
seed.start();

// Create 20 new peers and point them at the seed (usually this would happen in 20 separate processes)
// To prevent having a single point of failure you would probably have multiple seeds
for(var i = 9001; i <= 9020;i++) {
var g = new Gossiper(i, ['127.0.0.1:9000']);
g.start();
g.on('update', function(peer, k, v) {
console.log("peer " + peer + " set " + k + " to " + v); // peer 127.0.0.1:9999 set somekey to somevalue
});
}

// Add another peer which updates it's state after 15 seconds
var updater = new Gossiper(9999, ['127.0.0.1:9000']);
updater.start();
setTimeout(function() {
updater.setLocalState('somekey', 'somevalue');
}, 15000);


### API

Gossiper methods:

allPeeers()
livePeers()
deadPeers()
peerValue(peer, key)
peerKeys(peer)
getLocalState(key)
setLocalSate(key, value)

Gossiper events:

on('update', function(peer_name, key, value) {})
on('new_peer', function(peer_name) {})
on('peer_alive', function(peer_name) {})
on('peer_failed', function(peer_name) {})

### Tests

expresso -I lib test/*

### TODO

* test edge cases
* Cluster name -- dont allow peers to accidentally join the wrong cluster
The scuttlebutt paper mentions a couple things we don't current do:
* congestion throttling
* make digests only be random subsets

### Acknowledgements

Both the gossip protocol and the failure detection algorithms are based off of academic papers and Cassandra's (http://www.cassandra.org/) implementation of those papers. This library is highly indebted to both.

* ["Efficient reconciliation and flow control for anti-entropy protocols"](http://www.cs.cornell.edu/home/rvr/papers/flowgossip.pdf)
* ["The Phi accrual failure detector"](http://vsedach.googlepages.com/HDY04.pdf)
Binary file added lib/.gossiper.js.swp
Binary file not shown.
Binary file added lib/.peer_state.js.swp
Binary file not shown.
34 changes: 34 additions & 0 deletions lib/accrual_failure_detector.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
var AccrualFailureDetector = exports.AccrualFailureDetector = function() {
this.last_time = undefined;
this.intervals = [];
}

AccrualFailureDetector.prototype.add = function(arrival_time) {
if(this.last_time == undefined) {
var i = 750;
} else {
var i = arrival_time - this.last_time;
}

this.last_time = arrival_time;
this.intervals.push(i);
if(this.intervals.length > 1000) {
this.intervals.shift();
}
};

AccrualFailureDetector.prototype.phi = function(current_time) {
var current_interval = current_time - this.last_time;
var exp = -1 * current_interval / this.interval_mean();

var p = Math.pow(Math.E, exp);
return -1 * (Math.log(p) / Math.log(10));
};

AccrualFailureDetector.prototype.interval_mean = function(current_time) {
sum = 0;
for(i in this.intervals) {
sum += this.intervals[i];
}
return sum / this.intervals.length;
};
228 changes: 228 additions & 0 deletions lib/gossiper.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
var PeerState = require('peer_state').PeerState,
Scuttle = require('scuttle').Scuttle,
EventEmitter = require('events').EventEmitter,
PeerState = require('peer_state').PeerState,
net = require('net'),
sys = require('sys'),
child_process = require('child_process'),
dns = require('dns'),
msgpack = require('msgpack');

var Gossiper = exports.Gossiper = function(port, seeds, ip_to_bind) {
EventEmitter.call(this);
this.peers = {};
this.ip_to_bind = ip_to_bind;
this.port = port;
this.my_state = new PeerState();
this.scuttle = new Scuttle(this.peers, this.my_state);

for(var i in seeds) {
var s = seeds[i];
this.peers[s] = new PeerState(s);
}
}

sys.inherits(Gossiper, EventEmitter);

Gossiper.prototype.start = function(msg) {
var self = this;

// Create Server
this.server = net.createServer(function (net_stream) {
var mp_stream = new msgpack.Stream(net_stream);
mp_stream.on('msg', function(msg) { self.handleMessage(net_stream, mp_stream, msg) });
});

// Bind to ip/port
if(this.ip_to_bind) {
this.peer_name = [this.address, this.port.toString()].join(':');
this.peers[this.peer_name] = this.my_state;
this.server.listen(this.port, this.ip_to_bind);
} else {
child_process.exec('hostname', function(error, stdout, stderr) {
var l = stdout.length;
var hostname = stdout.slice(0, l - 1);
dns.lookup(hostname, 4, function(err,address, family) {
self.peer_name = [address, self.port.toString()].join(':');
self.peers[self.peer_name] = self.my_state;
self.server.listen(self.port, address);
});
});
}

this.heartBeatTimer = setInterval(function() { self.my_state.beatHeart() }, 1000 );
this.gossipTimer = setInterval(function() { self.gossip() }, 1000);
}

Gossiper.prototype.stop = function() {
this.server.close();
clearInterval(this.heartBeatTimer);
clearInterval(this.gossipTimer);
}


// The method of choosing whice peer(s) to gossip to is borrowed from Cassandra.
// They seemed to have worked out all of the edge cases
// http://wiki.apache.org/cassandra/ArchitectureGossip
Gossiper.prototype.gossip = function() {
// Find a live peer to gossip to
if(this.livePeers() > 0) {
var live_peer = this.chooseRandom(this.livePeers());
this.gossipToPeer(live_peer);
}

// Possilby gossip to a dead peer
var prob = this.deadPeers().length / (this.livePeers().length + 1)
if(Math.random() < prob) {
var dead_peer = this.chooseRandom(this.deadPeers());
this.gossipToPeer(dead_peer);
}

// Gossip to seed under certain conditions
if(live_peer && !this.seeds[live_peer] && this.livePeers().length < this.seeds.length) {
if(Math.random() < (this.seeds / this.allPeers.size())) {
this.gossipToPeer(chooseRandom(this.peers));
}
}

// Check health of peers
for(var i in this.peers) {
var peer = this.peers[i];
if(peer != this.my_state) {
peer.isSuspect();
}
}
}

Gossiper.prototype.chooseRandom = function(peers) {
// Choose random peer to gossip to
var i = Math.floor(Math.random()*1000000) % peers.length;
return peers[i];
}

Gossiper.prototype.gossipToPeer = function(peer) {
var a = peer.split(":");
var gosipee = new net.createConnection(a[1], a[0]);
var self = this;
gosipee.on('connect', function(net_stream) {
var mp_stream = new msgpack.Stream(gosipee);
mp_stream.on('msg', function(msg) { self.handleMessage(gosipee, mp_stream, msg) });
mp_stream.send(self.requestMessage());
});
gosipee.on('error', function(exception) {
// console.log(self.peer_name + " received " + sys.inspect(exception));
});
}

Gossiper.REQUEST = 0;
Gossiper.FIRST_RESPONSE = 1;
Gossiper.SECOND_RESPONSE = 2;

Gossiper.prototype.handleMessage = function(net_stream, mp_stream, msg) {
switch(msg.type) {
case Gossiper.REQUEST:
mp_stream.send(this.firstResponseMessage(msg.digest));
break;
case Gossiper.FIRST_RESPONSE:
this.scuttle.updateKnownState(msg.updates);
mp_stream.send(this.secondResponseMessage(msg.request_digest));
net_stream.end();
break;
case Gossiper.SECOND_RESPONSE:
this.scuttle.updateKnownState(msg.updates);
net_stream.end();
break;
default:
// shit went bad
break;
}
}

// MESSSAGES


Gossiper.prototype.handleNewPeers = function(new_peers) {
var self = this;
for(var i in new_peers) {
var peer_name = new_peers[i];
this.peers[peer_name] = new PeerState(peer_name);
this.emit('new_peer', peer_name);

var peer = this.peers[peer_name];
this.listenToPeer(peer);
}
}

Gossiper.prototype.listenToPeer = function(peer) {
var self = this;
peer.on('update', function(k,v) {
self.emit('update', peer.name, k, v);
});
peer.on('peer_alive', function() {
self.emit('peer_alive', peer.name);
});
peer.on('peer_failed', function() {
self.emit('peer_failed', peer.name);
});
}

Gossiper.prototype.requestMessage = function() {
var m = {
'type' : Gossiper.REQUEST,
'digest' : this.scuttle.digest(),
};
return m;
};

Gossiper.prototype.firstResponseMessage = function(peer_digest) {
var sc = this.scuttle.scuttle(peer_digest)
this.handleNewPeers(sc.new_peers);
var m = {
'type' : Gossiper.FIRST_RESPONSE,
'request_digest' : sc.requests,
'updates' : sc.deltas
};
return m;
};

Gossiper.prototype.secondResponseMessage = function(requests) {
var m = {
'type' : Gossiper.SECOND_RESPONSE,
'updates' : this.scuttle.fetchDeltas(requests)
};
return m;
};

Gossiper.prototype.setLocalState = function(k, v) {
this.my_state.updateLocal(k,v);
}

Gossiper.prototype.getLocalState = function(k) {
return this.my_state.getValue(k);
}

Gossiper.prototype.peerKeys = function(peer) {
return this.peers[peer].getKeys();
}

Gossiper.prototype.peerValue = function(peer, k) {
return this.peers[peer].getValue(k);
}

Gossiper.prototype.allPeers = function() {
var keys = [];
for(var k in this.peers) { keys.push(k) };
return keys;
}

Gossiper.prototype.livePeers = function() {
var keys = [];
for(var k in this.peers) { if(k.alive) { keys.push(k)} };
return keys;
}

Gossiper.prototype.deadPeers = function() {
var keys = [];
for(var k in this.peers) { if(!k.alive) { keys.push(k) } };
return keys;
}
Loading

0 comments on commit 9862f29

Please sign in to comment.