Skip to content

Commit

Permalink
add task timeoutMS option
Browse files Browse the repository at this point in the history
  • Loading branch information
vkarpov15 committed Dec 2, 2024
1 parent 639bdb8 commit f1799b7
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 5 deletions.
32 changes: 27 additions & 5 deletions src/taskSchema.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ const taskSchema = new mongoose.Schema({
repeatAfterMS: {
type: Number
},
timeoutMS: {
type: Number
},
previousTaskId: {
type: mongoose.ObjectId
},
Expand Down Expand Up @@ -155,9 +158,21 @@ taskSchema.statics.execute = async function(task) {
}

try {
const result = await Promise.resolve(
this._handlers.get(task.name).call(task, task.params, task)
);
let result = null;
if (typeof task.timeoutMS === 'number') {
result = await Promise.race([
Promise.resolve(
this._handlers.get(task.name).call(task, task.params, task)
),
new Promise((_, reject) => {
setTimeout(() => reject(new Error(`Task timed out after ${task.timeoutMS} ms`)), task.timeoutMS);
})
]);
} else {
result = await Promise.resolve(
this._handlers.get(task.name).call(task, task.params, task)
);
}
task.status = 'succeeded';
task.result = result;
await task.save();
Expand Down Expand Up @@ -191,12 +206,19 @@ taskSchema.statics.execute = async function(task) {
return task;
};

taskSchema.statics.schedule = async function schedule(name, scheduledAt, params, repeatAfterMS) {
taskSchema.statics.schedule = async function schedule(name, scheduledAt, params, optionsOrRepeat) {
let repeatAfterMS = null;
let options = optionsOrRepeat;
if (typeof optionsOrRepeat === 'number') {
repeatAfterMS = optionsOrRepeat;
options = {};
}
return this.create({
name,
scheduledAt,
params,
repeatAfterMS
repeatAfterMS,
...options
});
};

Expand Down
21 changes: 21 additions & 0 deletions test/task.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -218,4 +218,25 @@ describe('Task', function() {
assert.equal(task.status, 'failed');
assert.equal(task.error.message, 'Sample error message');
});

it('handles task timeouts', async function() {
let resolve;
let reject;
const p = new Promise((_resolve, _reject) => {
resolve = _resolve;
reject = _reject;
});
Task.registerHandler('getQuestion', async () => {
await new Promise(resolve => setTimeout(resolve, 10000));
});

let task = await Task.schedule('getQuestion', time.now().valueOf() + 100000, null, { timeoutMS: 50 });

task = await Task.execute(task);

task = await Task.findById(task._id);
assert.ok(task);
assert.equal(task.status, 'failed');
assert.equal(task.error.message, 'Task timed out after 50 ms');
});
});

0 comments on commit f1799b7

Please sign in to comment.