Skip to content
This repository was archived by the owner on Feb 7, 2023. It is now read-only.

Commit

Permalink
Rewrite the handling of process stalled jobs to be atomic so it doesn…
Browse files Browse the repository at this point in the history
…'t double-process jobs. See OptimalBits#356 for more on how this can happen.

Additionally, this addresses long-standing issue where a job can be considered "stalled" even though it was just moved to active and before a lock could be obtained by the worker that moved it (OptimalBits#258), by waiting a grace period (with a default of LOCK_RENEW_TIME) before considering a job as possibly stalled. This gives the (real) worker time to acquire its lock after moving it to active.

Note that this includes a small API change: the 'stalled' event is now passed an array of events.
  • Loading branch information
bradvogel committed Oct 15, 2016
1 parent 7b7d04c commit 00e213a
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 40 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ A queue emits also some useful events:
// Job started
// You can use jobPromise.cancel() to abort this job.
})
.on('stalled', function(job){
// The job was considered stalled (i.e. its lock was not renewed in LOCK_RENEW_TIME).
.on('stalled', function(jobs){
// Array of jobs that were considered 'stalled' and re-enqueued (from 'active' to 'wait').
// Useful for debugging job workers that crash or pause the event loop.
})
.on('progress', function(job, progress){
Expand Down
43 changes: 17 additions & 26 deletions lib/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,6 @@ var Queue = function Queue(name, redisPort, redisHost, redisOptions){
// Bind these methods to avoid constant rebinding and/or creating closures
// in processJobs etc.
this.processStalledJobs = this.processStalledJobs.bind(this);
this.processStalledJob = this.processStalledJob.bind(this);
this.getNextJob = this.getNextJob.bind(this);
this.processJobs = this.processJobs.bind(this);
this.processJob = this.processJob.bind(this);
Expand Down Expand Up @@ -528,42 +527,34 @@ Queue.prototype.updateDelayTimer = function(newDelayedTimestamp){
};

/**
Process jobs that have been added to the active list but are not being
processed properly.
* Process jobs that have been added to the active list but are not being
* processed properly. This can happen due to a process crash in the middle
* of processing a job, leaving it in 'active' but without a job lock.
* Note that there is no way to know for _certain_ if a job is "stalled"
* (since the process that moved it to active might just be slow to get the
* lock on it), so this function takes a 'grace period' parameter to ignore
* jobs that were created in the recent X milliseconds.
*
* @param {Number?} grace Duration in milliseconds. Ignore jobs created since this many milliseconds ago.
* Defaults to LOCK_RENEW_TIME.
*/
Queue.prototype.processStalledJobs = function(){
Queue.prototype.processStalledJobs = function(grace){
var _this = this;

grace = grace || this.LOCK_RENEW_TIME;

if(this.closing){
return this.closing;
} else{
return this.client.lrangeAsync(this.toKey('active'), 0, -1).then(function(jobs){
return Promise.each(jobs, function(jobId) {
return Job.fromId(_this, jobId).then(_this.processStalledJob);
});
return scripts.processStalledJobs(this, grace).then(function(jobs){
_this.emit('stalled', jobs);
return null;
}).catch(function(err){
console.error(err);
});
}
};

Queue.prototype.processStalledJob = function(job){
var _this = this;
if(this.closing){
return this.closing;
}

if(!job){
return Promise.resolve();
}else{
return scripts.getStalledJob(this, job, _this.token).then(function(isStalled){
if(isStalled){
_this.distEmit('stalled', job.toJSON());
return _this.processJob(job, true);
}
});
}
};

Queue.prototype.processJobs = function(resolve, reject){
var _this = this;
var processJobs = this.processJobs.bind(this, resolve, reject);
Expand Down
34 changes: 22 additions & 12 deletions lib/scripts.js
Original file line number Diff line number Diff line change
Expand Up @@ -355,26 +355,36 @@ var scripts = {
},

/**
* Gets a stalled job by locking it and checking it is not already completed.
* Returns a "OK" if the job was locked and not in completed set.
* Iterates to queue and moves all jobs in the active queue that appear to be
* stalled back to the wait state to be reprocessed.
*/
getStalledJob: function(queue, job, token){
processStalledJobs: function(queue, grace){
var script = [
'if redis.call("sismember", KEYS[1], ARGV[1]) == 0 then',
' return redis.call("set", KEYS[2], ARGV[2], "PX", ARGV[3], "NX")',
'local activeJobs = redis.call("LRANGE", KEYS[1], 0, -1)',
'local stalled = {}',
'for _, job in ipairs(activeJobs) do',
' local jobKey = ARGV[2] .. job',
' if(redis.call("EXISTS", jobKey .. ":lock") == 0) then',
' local jobTS = redis.call("HGET", jobKey, "timestamp")',
' if(jobTS and jobTS < ARGV[1]) then',
' redis.call("LREM", KEYS[1], 0, job)',
' redis.call("RPUSH", KEYS[2], job)',
' table.insert(stalled, job)',
' end',
' end',
'end',
'return 0'].join('\n');
'return stalled'
].join('\n');

var args = [
queue.client,
'getStalledJob',
'processStalledJobs',
script,
2,
queue.toKey('completed'),
job.lockKey(),
job.jobId,
token,
queue.LOCK_RENEW_TIME
queue.toKey('active'),
queue.toKey('wait'),
Date.now() - grace,
queue.toKey('')
];

return execScript.apply(scripts, args);
Expand Down

0 comments on commit 00e213a

Please sign in to comment.