Route retried / reprocessed jobs to the 'paused' list while the queue is paused.

Each Lua script touched here receives two extra keys ('meta-paused' and
'paused'). When the 'meta-paused' key exists the job id is pushed onto the
'paused' list, otherwise onto 'wait' as before. lib/scripts.js passes the
extra keys, and two tests cover retryJobs and retryJob on a paused queue.

Review notes:
  * retryJob-5.lua calls redis.call directly for the 'meta-paused' check: the
    visible parts of that script define no local `rcall` alias, and every
    other call in it uses redis.call.
  * Indentation was reconstructed from a whitespace-collapsed copy of this
    patch; confirm against the target files if the context does not match.

diff --git a/lib/commands/reprocessJob-4.lua b/lib/commands/reprocessJob-4.lua
deleted file mode 100644
index 4861c40b0..000000000
--- a/lib/commands/reprocessJob-4.lua
+++ /dev/null
@@ -1,42 +0,0 @@
---[[
-  Attempts to reprocess a job
-
-  Input:
-    KEYS[1] job key
-    KEYS[2] job lock key
-    KEYS[3] job state
-    KEYS[4] wait key
-
-    ARGV[1] job.id,
-    ARGV[2] (job.opts.lifo ? 'R' : 'L') + 'PUSH'
-    ARGV[3] token
-    ARGV[4] timestamp
-
-  Output:
-    1 means the operation was a success
-    0 means the job does not exist
-    -1 means the job is currently locked and can't be retried.
-    -2 means the job was not found in the expected set.
-
-
-]]
-if (redis.call("EXISTS", KEYS[1]) == 1) then
-  if (redis.call("EXISTS", KEYS[2]) == 0) then
-    redis.call("HDEL", KEYS[1], "finishedOn", "processedOn", "failedReason")
-    redis.call("HSET", KEYS[1], "retriedOn", ARGV[4])
-
-    if (redis.call("ZREM", KEYS[3], ARGV[1]) == 1) then
-      redis.call(ARGV[2], KEYS[4], ARGV[1])
-
-      -- Emit waiting event (wait..ing@token)
-      redis.call("PUBLISH", KEYS[4] .. "ing@" .. ARGV[3], ARGV[1])
-      return 1
-    else
-      return -2
-    end
-  else
-    return -1
-  end
-else
-  return 0
-end
diff --git a/lib/commands/reprocessJob-6.lua b/lib/commands/reprocessJob-6.lua
new file mode 100644
index 000000000..0eaed54a4
--- /dev/null
+++ b/lib/commands/reprocessJob-6.lua
@@ -0,0 +1,52 @@
+--[[
+  Attempts to reprocess a job
+
+  Input:
+    KEYS[1] job key
+    KEYS[2] job lock key
+    KEYS[3] job state
+    KEYS[4] wait key
+    KEYS[5] meta-paused
+    KEYS[6] paused key
+
+    ARGV[1] job.id,
+    ARGV[2] (job.opts.lifo ? 'R' : 'L') + 'PUSH'
+    ARGV[3] token
+    ARGV[4] timestamp
+
+  Output:
+    1 means the operation was a success
+    0 means the job does not exist
+    -1 means the job is currently locked and can't be retried.
+    -2 means the job was not found in the expected set.
+
+
+]]
+local rcall = redis.call;
+if (rcall("EXISTS", KEYS[1]) == 1) then
+  if (rcall("EXISTS", KEYS[2]) == 0) then
+    rcall("HDEL", KEYS[1], "finishedOn", "processedOn", "failedReason")
+    rcall("HSET", KEYS[1], "retriedOn", ARGV[4])
+
+    if (rcall("ZREM", KEYS[3], ARGV[1]) == 1) then
+      local target
+      if rcall("EXISTS", KEYS[5]) ~= 1 then
+        target = KEYS[4]
+      else
+        target = KEYS[6]
+      end
+
+      rcall(ARGV[2], target, ARGV[1])
+
+      -- Emit waiting event (wait..ing@token)
+      rcall("PUBLISH", KEYS[4] .. "ing@" .. ARGV[3], ARGV[1])
+      return 1
+    else
+      return -2
+    end
+  else
+    return -1
+  end
+else
+  return 0
+end
diff --git a/lib/commands/retryJob-3.lua b/lib/commands/retryJob-5.lua
similarity index 76%
rename from lib/commands/retryJob-3.lua
rename to lib/commands/retryJob-5.lua
index 0a1d13528..c6c834cdd 100644
--- a/lib/commands/retryJob-3.lua
+++ b/lib/commands/retryJob-5.lua
@@ -5,6 +5,8 @@
       KEYS[1] 'active',
       KEYS[2] 'wait'
       KEYS[3] jobId
+      KEYS[4] 'meta-paused'
+      KEYS[5] 'paused'
 
       ARGV[1] pushCmd
       ARGV[2] jobId
@@ -30,7 +32,15 @@ if redis.call("EXISTS", KEYS[3]) == 1 then
   end
 
   redis.call("LREM", KEYS[1], 0, ARGV[2])
-  redis.call(ARGV[1], KEYS[2], ARGV[2])
+
+  local target
+  if redis.call("EXISTS", KEYS[4]) ~= 1 then
+    target = KEYS[2]
+  else
+    target = KEYS[5]
+  end
+
+  redis.call(ARGV[1], target, ARGV[2])
 
   return 0
 else
diff --git a/lib/commands/retryJobs-3.lua b/lib/commands/retryJobs-3.lua
deleted file mode 100644
index c2f8190b5..000000000
--- a/lib/commands/retryJobs-3.lua
+++ /dev/null
@@ -1,57 +0,0 @@
---[[
-  Attempts to retry all failed jobs
-
-  Input:
-    KEYS[1] base key
-    KEYS[2] failed state key
-    KEYS[3] wait state key
-
-    ARGV[1] count
-
-  Output:
-    1 means the operation is not completed
-    0 means the operation is completed
-]]
-local baseKey = KEYS[1]
-local maxCount = tonumber(ARGV[1])
-
-local rcall = redis.call;
-
-local function batches(n, batchSize)
-  local i = 0
-
-  return function()
-    local from = i * batchSize + 1
-    i = i + 1
-    if (from <= n) then
-      local to = math.min(from + batchSize - 1, n)
-      return from, to
-    end
-  end
-end
-
-local function getZSetItems(keyName, max)
-  return rcall('ZRANGE', keyName, 0, max - 1)
-end
-
-local jobs = getZSetItems(KEYS[2], maxCount)
-
-if (#jobs > 0) then
-  for i, key in ipairs(jobs) do
-    local jobKey = baseKey .. key
-    rcall("HDEL", jobKey, "finishedOn", "processedOn", "failedReason")
-  end
-
-  for from, to in batches(#jobs, 7000) do
-    rcall("ZREM", KEYS[2], unpack(jobs, from, to))
-    rcall("LPUSH", KEYS[3], unpack(jobs, from, to))
-  end
-end
-
-maxCount = maxCount - #jobs
-
-if(maxCount <= 0) then
-  return 1
-end
-
-return 0
diff --git a/lib/commands/retryJobs-5.lua b/lib/commands/retryJobs-5.lua
new file mode 100644
index 000000000..75f7d5ba9
--- /dev/null
+++ b/lib/commands/retryJobs-5.lua
@@ -0,0 +1,64 @@
+--[[
+  Attempts to retry all failed jobs
+
+  Input:
+    KEYS[1] base key
+    KEYS[2] failed state key
+    KEYS[3] wait state key
+    KEYS[4] 'meta-paused'
+    KEYS[5] 'paused'
+
+    ARGV[1] count
+
+  Output:
+    1 means the operation is not completed
+    0 means the operation is completed
+]]
+local baseKey = KEYS[1]
+local maxCount = tonumber(ARGV[1])
+
+local rcall = redis.call;
+
+local function batches(n, batchSize)
+  local i = 0
+
+  return function()
+    local from = i * batchSize + 1
+    i = i + 1
+    if (from <= n) then
+      local to = math.min(from + batchSize - 1, n)
+      return from, to
+    end
+  end
+end
+
+local function getZSetItems(keyName, max)
+  return rcall('ZRANGE', keyName, 0, max - 1)
+end
+
+local jobs = getZSetItems(KEYS[2], maxCount)
+
+if (#jobs > 0) then
+  for i, key in ipairs(jobs) do
+    local jobKey = baseKey .. key
+    rcall("HDEL", jobKey, "finishedOn", "processedOn", "failedReason")
+  end
+
+  local target
+  if rcall("EXISTS", KEYS[4]) ~= 1 then
+    target = KEYS[3]
+  else
+    target = KEYS[5]
+  end
+
+  for from, to in batches(#jobs, 7000) do
+    rcall("ZREM", KEYS[2], unpack(jobs, from, to))
+    rcall("LPUSH", target, unpack(jobs, from, to))
+  end
+end
+
+maxCount = maxCount - #jobs
+
+if (maxCount <= 0) then return 1 end
+
+return 0
diff --git a/lib/scripts.js b/lib/scripts.js
index ba0b58675..6e116a81c 100644
--- a/lib/scripts.js
+++ b/lib/scripts.js
@@ -115,7 +115,13 @@ const scripts = {
   },
 
   retryJobsArgs(queue, count) {
-    const keys = [queue.toKey(''), queue.toKey('failed'), queue.toKey('wait')];
+    const keys = [
+      queue.toKey(''),
+      queue.toKey('failed'),
+      queue.toKey('wait'),
+      queue.toKey('meta-paused'),
+      queue.toKey('paused')
+    ];
 
     const args = [count];
 
@@ -455,9 +461,12 @@ const scripts = {
     const queue = job.queue;
     const jobId = job.id;
 
-    const keys = _.map(['active', 'wait', jobId], name => {
-      return queue.toKey(name);
-    });
+    const keys = _.map(
+      ['active', 'wait', jobId, 'meta-paused', 'paused'],
+      name => {
+        return queue.toKey(name);
+      }
+    );
 
     const pushCmd = (job.opts.lifo ? 'R' : 'L') + 'PUSH';
 
@@ -485,7 +494,9 @@ const scripts = {
       queue.toKey(job.id),
       queue.toKey(job.id) + ':lock',
       queue.toKey(options.state),
-      queue.toKey('wait')
+      queue.toKey('wait'),
+      queue.toKey('meta-paused'),
+      queue.toKey('paused')
     ];
 
     const args = [
diff --git a/test/test_queue.js b/test/test_queue.js
index a4edf845a..b7693262c 100644
--- a/test/test_queue.js
+++ b/test/test_queue.js
@@ -665,6 +665,65 @@ describe('Queue', () => {
       const CompletedCount = await queue.getJobCounts('completed');
       expect(CompletedCount.completed).to.be.equal(jobCount);
     });
+
+    it('should move to pause all failed jobs if the queue is paused', async () => {
+      const jobCount = 8;
+
+      let fail = true;
+      queue.process(async () => {
+        await delay(10);
+        if (fail) {
+          throw new Error('failed');
+        }
+      });
+
+      let order = 0;
+      const failing = new Promise(resolve => {
+        queue.on('failed', job => {
+          expect(order).to.be.eql(job.data.idx);
+          if (order === jobCount - 1) {
+            resolve();
+          }
+          order++;
+        });
+      });
+
+      for (const index of Array.from(Array(jobCount).keys())) {
+        await queue.add({ idx: index });
+      }
+
+      await failing;
+
+      const failedCount = await queue.getJobCounts('failed');
+      expect(failedCount.failed).to.be.equal(jobCount);
+
+      order = 0;
+      const completing = new Promise(resolve => {
+        queue.on('completed', job => {
+          expect(order).to.be.eql(job.data.idx);
+          if (order === jobCount - 1) {
+            resolve();
+          }
+          order++;
+        });
+      });
+
+      fail = false;
+
+      await queue.pause();
+
+      await queue.retryJobs({ count: 2 });
+
+      const pausedJobs = await queue.getJobs(['paused']);
+      expect(pausedJobs).to.have.lengthOf(jobCount);
+
+      await queue.resume();
+
+      await completing;
+
+      const CompletedCount = await queue.getJobCounts('completed');
+      expect(CompletedCount.completed).to.be.equal(jobCount);
+    });
   });
 
   it('should keep specified number of jobs after completed with removeOnComplete', async () => {
@@ -1885,6 +1944,56 @@ describe('Queue', () => {
       });
     });
 
+    it('retry a job that fails on a paused queue moves the job to paused', async () => {
+      let called = 0;
+      let failedOnce = false;
+      const notEvenErr = new Error('Not even!');
+
+      const retryQueue = utils.buildQueue('retry-test-queue');
+
+      const job = await retryQueue.add({ foo: 'bar' });
+      expect(job.id).to.be.ok;
+      expect(job.data.foo).to.be.eql('bar');
+
+      retryQueue.process((job, jobDone) => {
+        called++;
+        if (called % 2 !== 0) {
+          throw notEvenErr;
+        }
+        jobDone();
+      });
+
+      const failed = new Promise(resolve => {
+        retryQueue.once('failed', async (job, err) => {
+          expect(job).to.be.ok;
+          expect(job.data.foo).to.be.eql('bar');
+          expect(err).to.be.eql(notEvenErr);
+          failedOnce = true;
+          resolve();
+        });
+      });
+
+      await failed;
+
+      await retryQueue.pause();
+
+      await retryQueue.retryJob(job);
+
+      const pausedJobs = await retryQueue.getJobs(['paused']);
+      expect(pausedJobs).to.have.length(1);
+
+      await retryQueue.resume();
+
+      const completed = new Promise(resolve => {
+        retryQueue.once('completed', () => {
+          expect(failedOnce).to.be.eql(true);
+          retryQueue.close().then(resolve);
+        });
+      });
+
+      await completed;
+    });
+
     it('retry a job that fails using job retry method', done => {
       let called = 0;
       let failedOnce = false;