Skip to content

Commit

Permalink
fix(job-scheduler): avoid duplicates when upserting in a quick sequence
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Dec 31, 2024
1 parent 0763cd6 commit 2c967b7
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 19 deletions.
32 changes: 18 additions & 14 deletions src/classes/job-scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,36 +81,38 @@ export class JobScheduler extends QueueBase {
return;
}

const prevMillis = opts.prevMillis || 0;
now = prevMillis < now ? now : prevMillis;

// Check if we have a start date for the repeatable job
const { startDate, immediately, ...filteredRepeatOpts } = repeatOpts;
let startMillis = now;
if (startDate) {
const startMillis = new Date(startDate).getTime();
now = startMillis > now ? startMillis : now;
startMillis = new Date(startDate).getTime();
startMillis = startMillis > now ? startMillis : now;
}

const prevMillis = opts.prevMillis || 0;
now = prevMillis < now ? now : prevMillis;

let nextMillis: number;
let newOffset = offset;
let newOffset = offset || 0;

if (every) {
const nextSlot = Math.floor(now / every) * every + every;
const prevSlot = Math.floor(startMillis / every) * every;
const nextSlot = prevSlot + every;
if (prevMillis || offset) {
nextMillis = nextSlot + (offset || 0);
nextMillis = nextSlot;
} else {
nextMillis = now;
newOffset = every - (nextSlot - now);
nextMillis = prevSlot;
newOffset = startMillis - prevSlot;

// newOffset should always be positive, but as an extra safety check
// newOffset should always be positive, but we do an extra safety check
newOffset = newOffset < 0 ? 0 : newOffset;
}
} else if (pattern) {
nextMillis = await this.repeatStrategy(now, repeatOpts, jobName);

if (nextMillis < now) {
nextMillis = now;
}
} else if (pattern) {
nextMillis = await this.repeatStrategy(now, repeatOpts, jobName);
}

const multi = (await this.client).multi();
Expand Down Expand Up @@ -163,6 +165,7 @@ export class JobScheduler extends QueueBase {
(<unknown>multi) as RedisClient,
jobName,
nextMillis,
newOffset,
jobSchedulerId,
{
...opts,
Expand Down Expand Up @@ -203,6 +206,7 @@ export class JobScheduler extends QueueBase {
client: RedisClient,
name: N,
nextMillis: number,
offset: number,
jobSchedulerId: string,
opts: JobsOptions,
data: T,
Expand All @@ -219,7 +223,7 @@ export class JobScheduler extends QueueBase {
});

const now = Date.now();
const delay = nextMillis - now;
const delay = nextMillis + offset - now;

const mergedOpts = {
...opts,
Expand Down
2 changes: 1 addition & 1 deletion src/commands/removeJobScheduler-3.lua
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@

--[[
Removes a repeatable job
Removes a job scheduler and its next scheduled job.
Input:
KEYS[1] job schedulers key
KEYS[2] delayed jobs key
Expand Down
59 changes: 55 additions & 4 deletions tests/test_job_scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,53 @@ describe('Job Scheduler', function () {
});
});

describe('when job schedulers are upserted in quick succession', function () {
let worker: Worker;
beforeEach(async function () {
const date = new Date('2017-02-07 9:24:00');
this.clock.setSystemTime(date);
worker = new Worker(
queueName,
async () => {
await this.clock.tickAsync(1);
},
{
connection,
prefix,
concurrency: 1,
},
);
await worker.waitUntilReady();
});
afterEach(async function () {
await worker.close();
});
it('should create only one job scheduler and one delayed job', async function () {
const jobSchedulerId = 'test';
await queue.upsertJobScheduler(jobSchedulerId, {
every: ONE_MINUTE * 5,
});
await this.clock.tickAsync(1);
await queue.upsertJobScheduler(jobSchedulerId, {
every: ONE_MINUTE * 5,
});

await queue.upsertJobScheduler(jobSchedulerId, {
every: ONE_MINUTE * 5,
});

await queue.upsertJobScheduler(jobSchedulerId, {
every: ONE_MINUTE * 5,
});

const repeatableJobs = await queue.getJobSchedulers();
expect(repeatableJobs.length).to.be.eql(1);
await this.clock.tickAsync(ONE_MINUTE);
const delayed = await queue.getDelayed();
expect(delayed).to.have.length(1);
});
});

describe('when clocks are slightly out of sync', function () {
it('should create only one delayed job', async function () {
const date = new Date('2017-02-07 9:24:00');
Expand Down Expand Up @@ -1684,13 +1731,17 @@ describe('Job Scheduler', function () {

await processingAfterFailing;

const failedCountAfterProcessing = await queue.getFailedCount();
expect(failedCountAfterProcessing).to.be.equal(0);

await worker.close();

const waitingCount = await queue.getWaitingCount();
const delayedCount2 = await queue.getDelayedCount();
expect(delayedCount2).to.be.equal(1);

const waitingCount = await queue.getWaitingCount();
expect(waitingCount).to.be.equal(0);
// Due to asynchronicities, the next job could be already in waiting state
// We just check that both are 1, as it should only exist 1 job in either waiting or delayed state
expect(waitingCount + delayedCount2).to.be.equal(1);
});

it('should not create a new delayed job if the failed job is retried with Job.retry()', async function () {
Expand Down Expand Up @@ -2250,7 +2301,7 @@ describe('Job Scheduler', function () {
delayStub.restore();
});

it('should repeat every 2 seconds with a startDate in the future', async function () {
it('should repeat every day with a startDate in the future', async function () {
this.timeout(10000);

// Set the initial system time
Expand Down

0 comments on commit 2c967b7

Please sign in to comment.