Skip to content

Commit

Permalink
upgraded redis
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Feb 6, 2016
1 parent d0116be commit 2fdc232
Show file tree
Hide file tree
Showing 9 changed files with 511 additions and 1 deletion.
1 change: 1 addition & 0 deletions .npmignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
node_modules
32 changes: 32 additions & 0 deletions a.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"use strict";

var Queue = require('./');

var videoQueue = Queue('video transcoding', 6379, '127.0.0.1');

videoQueue.process(function(job, done){
console.log('video job %d started.', job.jobId);
done();
});

videoQueue.on('completed', function(job) {
console.log('video job %d completed.', job.jobId);
});

videoQueue.add({video: 'http://example.com/video1.mov'});
videoQueue.add({video: 'http://example.com/video1.mov'});
videoQueue.add({video: 'http://example.com/video1.mov'});


/**
*
* Tasks
*
*/
queue.task('video', opts, function(input, next){

output = do_something_with_input(input);

next('postprocess', output);
});

50 changes: 50 additions & 0 deletions b.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 31 additions & 0 deletions lib/cluster-queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"use strict"

var
Queue = require('./queue'),
cluster = require('cluster');

var numWorkers = 8;
var queue = Queue("test concurrent queue", 6379, '127.0.0.1');

if(cluster.isMaster){
for (var i = 0; i < numWorkers; i++) {
cluster.fork();
}

cluster.on('online', function(worker) {
// Lets create a few jobs for every created worker
for(var i=0; i<500; i++){
queue.add({foo: 'bar'});
};
});

cluster.on('exit', function(worker, code, signal) {
console.log('worker ' + worker.process.pid + ' died');
});
}else{
queue.process(function(job, jobDone){
console.log("Job done by worker", cluster.worker.id, job.jobId);
jobDone();
});
}

125 changes: 125 additions & 0 deletions lib/message.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
"use strict";
var redis = require('redis');
var when = require('when');

/**
interface JobOptions
{
attempts: number;
}
*/

// queue: Queue, msgId: string, data: {}, opts: JobOptions
var Message = function Message(queue, msgId, data, opts){
this.queue = queue;
this.msgId = msgId;
this.data = data;
this.opts = opts;
this._progress = 0;
}

Message.create = function(queue, msgId, data, opts){
var deferred = when.defer();
var msg = new Message(queue, msgId, data, opts);
queue.client.HMSET(queue.toKey(msgId), msg.toData(), function(err){
if(err){
deferred.reject(err);
}else{
deferred.resolve(job);
}
});
return deferred.promise;
}

Message.fromId = function(queue, msgId){
var deferred = when.defer();
queue.client.HGETALL(queue.toKey(msgId), function(err, data){
if(data){
deferred.resolve(Message.fromData(queue, msgId, data));
}else{
deferred.reject(err);
}
});
return deferred.promise;
}

Message.prototype.toData = function(){
return {
name: this.name,
data: JSON.stringify(this.data || {}),
opts: JSON.stringify(this.opts || {}),
progress: this._progress
}
}

Message.prototype.progress = function(progress){
if(progress){
var deferred = when.defer();
var _this = this;
this.queue.client.hset(this.queue.toKey(this.msgId), 'progress', progress, function(err){
if(err){
deferred.reject(err);
}else{
deferred.resolve();
_this.queue.emit('progress', _this, progress);
}
});
return deferred.promise;
}else{
return this._progress;
}
}

Job.prototype.completed = function(){
return this._done('completed');
}

Job.prototype.failed = function(err){
return this._done('failed');
}

Job.prototype.isCompleted = function(){
return this._isDone('completed');
}

Job.prototype.isFailed = function(){
return this._isDone('failed');
}

Job.prototype._isDone = function(list){
var deferred = when.defer();
this.queue.client.SISMEMBER(this.queue.toKey(list), this.jobId, function(err, isMember){
if(err){
deferred.reject(err);
}else{
deferred.resolve(isMember === 1);
}
});
return deferred.promise;
}

Job.prototype._done = function(list){
var deferred = when.defer();
var queue = this.queue;
var activeList = queue.toKey('active');
var completedList = queue.toKey(list);

queue.client.multi()
.lrem(activeList, 0, this.jobId)
.sadd(completedList, this.jobId)
.exec(function(err){
!err && deferred.resolve();
err && deferred.reject(err);
});
return deferred.promise;
}

/**
*/
Job.fromData = function(queue, jobId, data){
var job = new Job(queue, jobId, data.name, JSON.parse(data.data), data.opts);
job._progress = parseInt(data.progress);
return job;
}

module.exports = Job;
57 changes: 57 additions & 0 deletions lib/scripts.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
* Includes all the scripts needed by the queue and jobs.
*
*
*/
/*eslint-env node */
/*global Promise:true */
'use strict';

var cache = {};

function execScript(client, hash, script){
var sha = cache[hash];
var args;
var cached = Promise.resolved(sha);
if(!sha){
cached = cacheScript(client, hash, script);
}
cached.then(function(sha){
args.shift(sha);
return client.evalshaAsync.apply(client, args).catch(function(err){
// if ERR is that script is missing we need to re-cache and test again.
// delete cache[hash];
// return execScript(client, hash, script)
})
});
}

function cacheScript(client, hash, script){
return client.scriptAsync('LOAD', script).then(function(sha){
cache[hash] = sha;
return sha;
})
}

var scripts = {
isJobInList: function(client, listKey, jobId){
var script = [
'local function item_in_list (list, item)',
' for _, v in pairs(list) do',
' if v == item then',
' return 1',
' end',
' end',
' return nil',
'end',
'local items = redis.call("LRANGE", KEYS[1], 0, -1)',
'return item_in_list(items, ARGV[1])'
].join('\n');

return scripts._execScript(client, 'isJobInList', 1, listKey, jobId).then(function(result){
return result === 1;
});
}
}

module.exports.scripts = scripts;
48 changes: 48 additions & 0 deletions lib/state.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/**
State machine.
Distributed state machine.
*/

var machine = Machine('signup', opts); // opts -> redis connection mostly, name of the machine.

machine.state('send mail', function(data, next){
// In this state we send an email with a confirmation link and exit the state
next('wait confirmation', data);
});

machine.state('wait confirmation'); // // In this state we do nothing we just wait for an external input

machine.state('confirm user', function(task){
return data;
})

machine.next('wait confirmation', data);

/**
queue('wait confirmation').add(data);
*/
machine.state('transcode video', function(data){
// transcode...
this.next('append moov');
}).catch(function(err){
this.next('delete tmp');
});

machine.state('append moov', input, function(data, next){
// Append MOOV etc.
this.next('delete tmp');
});

machine.next('delete temp', input, function(data, next){
// delete temp file
this.next('update user account');
});

machine.state('update user account', function(data, next){
// update database
});




2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
"bluebird": "^2.10.2",
"lodash": "^3.10.1",
"node-uuid": "^1.4.7",
"redis": "^0.12.1",
"redis": "^2.4.2",
"semver": "^4.2.0"
},
"devDependencies": {
Expand Down
Loading

0 comments on commit 2fdc232

Please sign in to comment.