Skip to content

Commit

Permalink
feat: add error events
Browse files Browse the repository at this point in the history
  • Loading branch information
gajus committed Nov 26, 2020
1 parent 64fa52f commit b910bfa
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 6 deletions.
40 changes: 38 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,12 @@ const planton = createPlanton({
],
});

planton.events.on('task', (task) => {
planton.events.on('task', (taskEvent: TaskEvent) => {
// {
// taskName: 'send_user_email',
// instruction: 1,
// };
console.log(task);
console.log(taskEvent);
});

```
Expand Down Expand Up @@ -158,3 +158,39 @@ Use one the popular message queue systems:

* [RabbitMQ](https://www.rabbitmq.com/)
* [BullMQ](https://github.com/taskforcesh/bullmq)

## Handling events

`planton.events` is an instance of an event emitter.

Planton emits 2 types of events:

### `task`

Emitted for each result returned by the schedulers.

```js
planton.events.on('task', (taskEvent: TaskEvent) => {
// {
// taskName: 'send_user_email',
// instruction: 1,
// };
console.log(taskEvent);
});

```

### `error`

Emitter for errors that happen during scheduling.

```js
planton.events.on('error', (errorEvent: ErrorEvent) => {
// {
// taskName: 'send_user_email',
// error: Error,
// };
console.log(errorEvent);
});

```
25 changes: 21 additions & 4 deletions src/factories/createPlanton.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ type TaskEvent = {
readonly instruction: string;
};

type ErrorEvent = {
readonly taskName: string;
readonly error: Error;
};

type ScheduleConfigurationInput = {
readonly activeTaskInstructions: TaskInstruction[];
readonly limit: number;
Expand Down Expand Up @@ -51,6 +56,7 @@ type InternalTask = {
};

type EventMap = {
error: ErrorEvent,
task: TaskEvent,
};

Expand Down Expand Up @@ -103,10 +109,21 @@ const createPlanton = (configuration: PlantonConfigurationInput): Planton => {
break;
}

const taskInstructions = await inputTask.schedule({
activeTaskInstructions,
limit: concurrency - activeTaskInstructions.length,
});
let taskInstructions: TaskInstruction[];

try {
taskInstructions = await inputTask.schedule({
activeTaskInstructions,
limit: concurrency - activeTaskInstructions.length,
});
} catch (error) {
events.emit('error', {
error,
taskName: task.name || '',
});

taskInstructions = [];
}

if (taskInstructions.length > 0) {
task.attemptNumber = 0;
Expand Down
40 changes: 40 additions & 0 deletions test/planton/factories/createPlanton.ts
Original file line number Diff line number Diff line change
Expand Up @@ -272,3 +272,43 @@ test('terminate waits for scheduling to complete', async (t) => {

t.true(Date.now() - startTermination >= 500);
});

test('unhandled scheduler errors trigger error event', async (t) => {
const eventHandler = sinon.spy();

const error = new Error('foo');

const planton = createPlanton({
getActiveTaskInstructions: () => {
return [];
},
tasks: [
{
delay: () => {
return 50;
},
name: 'foo',
schedule: async () => {
throw error;

return [];
},
},
],
});

planton.events.on('error', eventHandler);

await delay(140);

// Ensures that we do not stop calling scheduler after the first error.
// Ensures that even after error we use the same delay.
t.is(eventHandler.callCount, 3);

t.deepEqual(eventHandler.firstCall.firstArg, {
error,
taskName: 'foo',
});

t.true(true);
});

0 comments on commit b910bfa

Please sign in to comment.