Skip to content

Commit

Permalink
fixed #125
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Aug 29, 2016
1 parent d55ad1c commit 7b3859a
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 6 deletions.
82 changes: 76 additions & 6 deletions lib/job.js
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,7 @@ Job.prototype.moveToCompleted = function(returnValue, token){

Job.prototype.moveToFailed = function(err){
var _this = this;
this.stacktrace.push(err.stack);
return this._saveAttempt().then(function() {
return this._saveAttempt(err).then(function() {
// Check if an automatic retry should be performed
if(_this.attemptsMade < _this.attempts){
// Check if backoff is needed
Expand Down Expand Up @@ -321,6 +320,72 @@ Job.prototype.remove = function(token){
});
};

/**
* Returns a promise the resolves when the job has been finished.
* TODO: Add a watchdog to check if the job has finished periodically.
* since pubsub does not give any guarantees.
*/
Job.prototype.finished = function(){
var _this = this;

function status(resolve, reject){
return _this.isCompleted().then(function(completed){
if(!completed){
return _this.isFailed().then(function(failed){
if(failed){
return Job.fromId(_this.queue, _this.jobId, 'failedReason').then(function(data){
reject(Error(data.failedReason));
return true;
});
}
});
}
resolve();
return true;
});
}

return new Promise(function(resolve, reject){
status(resolve, reject).then(function(finished){
if(!finished){
function onCompleted(job){
if(job.jobId === _this.jobId){
resolve();
}
removeListeners();
}

function onFailed(job, err){
if(job.jobId === _this.jobId){
reject(err);
}
removeListeners();
}

function removeListeners(){
_this.queue.removeListener('completed', onCompleted);
_this.queue.removeListener('failed', onFailed);
}

_this.queue.on('completed', onCompleted);
_this.queue.on('failed', onFailed);

//
// Watchdog
//
var interval = setInterval(function(){
status(resolve, reject).then(function(finished){
if(finished){
removeListeners();
clearInterval(interval );
}
})
}, 5000);
};
});
});
}

// -----------------------------------------------------------------------------
// Private methods
// -----------------------------------------------------------------------------
Expand Down Expand Up @@ -407,7 +472,7 @@ Job.prototype._retryAtOnce = function(){
});
};

Job.prototype._saveAttempt = function(){
Job.prototype._saveAttempt = function(err){
if(isNaN(this.attemptsMade)){
this.attemptsMade = 1;
}else{
Expand All @@ -416,9 +481,12 @@ Job.prototype._saveAttempt = function(){
var params = {
attemptsMade: this.attemptsMade
};
if(this.stacktrace){
params.stacktrace = JSON.stringify(this.stacktrace);
}

this.stacktrace.push(err.stack);
params.stacktrace = JSON.stringify(this.stacktrace);

params.failedReason = err.message;

return this.queue.client.hmsetAsync(this.queue.toKey(this.jobId), params);
};

Expand All @@ -430,6 +498,8 @@ Job.fromData = function(queue, jobId, data){
job._progress = parseInt(data.progress);
job.delay = parseInt(data.delay);
job.timestamp = parseInt(data.timestamp);

job.failedReason = data.failedReason;
job.attempts = parseInt(data.attempts);
if(isNaN(job.attempts)) {
job.attempts = 1; // Default to 1 try for legacy jobs
Expand Down
59 changes: 59 additions & 0 deletions test/test_job.js
Original file line number Diff line number Diff line change
Expand Up @@ -446,4 +446,63 @@ describe('Job', function(){
});
});

describe('.finished', function() {
it('should resolve when the job has been completed', function(done){
queue.process(function () {
return Promise.resolve();
});
queue.add({ foo: 'bar' }).then(function(job){
return job.finished();
}).then(function(){
done();
}, done);
});

it('should reject when the job has been completed', function(done){
queue.process(function () {
return Promise.reject(Error('test error'));
});
queue.add({ foo: 'bar' }).then(function(job){
return job.finished();
}).then(function(){
done(Error('should have been rejected'));
}, function(err){
expect(err.message).equal('test error');
done();
});
});

it('should resolve directly if already processed', function(done){
queue.process(function () {
return Promise.resolve();
});
queue.add({ foo: 'bar' }).then(function(job){
return Promise.delay(1500).then(function(){
return job.finished();
})
}).then(function(){
done();
}, done);
});

it('should reject directly if already processed', function(done){
queue.process(function () {
return Promise.reject(Error('test error'));
});
queue.add({ foo: 'bar' }).then(function(job){
return Promise.delay(1500).then(function(){
return job.finished();
});
}).then(function(){
done(Error('should have been rejected'));
}, function(err){
expect(err.message).equal('test error');
done();
});
});

it.skip('should resolve using the watchdog if pubsub was lost');
it.skip('should reject using the watchdog if pubsub was lost');

});
});

0 comments on commit 7b3859a

Please sign in to comment.