Database-agnostic task scheduler.
In every project that I have undertaken, there was always a need to run a periodic task of some sorts, e.g. hard-delete after 14-day soft delete. Typically this problem is solved by using a CRON-like system that simply runs a task at a set interval. However, ad-hoc solutions are added as requirements evolve, e.g. the need to run a task at a different interval and concurrency depending on whether the last attempt at running the task was successful.
Planton is a database-agnostic task scheduler for these type of tasks that abstracts logic for handling concurrency and different task scheduling strategies.
import {
createPlanton,
} from 'planton';
/**
* @property activeTaskInstructions A list of active task instructions as retrieved using `getActiveTaskInstructions`.
* @property concurrency The current concurrency setting value.
* @property limit A limit derived based on the value of `concurrency` and the number of `activeTaskInstructions` (CONCURRENCY - ACTIVE TASK INSTRUCTIONS = LIMIT).
*/
type ScheduleConfiguration = {
readonly activeTaskInstructions: TaskInstruction[];
readonly concurrency: number;
readonly limit: number;
readonly taskName: string;
};
/**
*
*/
type Schedule = (configuration: ScheduleConfiguration) => Promise<TaskInstruction[]>;
/**
* Produces a number (time in milliseconds) representing how long Planton must wait before attempting `schedule` function.
*/
type CalculateDelay = (attemptNumber: number) => Promise<number> | number;
/**
* Produces a number indicating how many tasks can be scheduled at most.
*/
type CalculateLimit = (concurrency: number, activeTaskInstructions: TaskInstruction[]) => Promise<number> | number;
/**
* @property concurrency Together with `getActiveTaskInstructions`, the `concurrency` setting is used to generate `limit` value that is passed to task scheduler.
* @property name A unique name of the task. Used to identify task scheduler in errors and for tracking active task instructions (see `getActiveTaskInstructions`).
*/
type TaskInput = {
readonly calculateDelay?: CalculateDelay;
readonly calculateLimit?: CalculateLimit;
readonly concurrency?: number;
readonly name: string;
readonly schedule: Schedule;
};
/**
* @property getActiveTaskInstructions Returns list of tasks that are currently being executed. Used for concurrency control.
*/
type PlantonConfiguration = {
readonly getActiveTaskInstructions: (taskName: string) => Promise<TaskInstruction[]>;
readonly tasks: TaskInput[]
};
createPlanton(configuration: PlantonConfigurationInput): Planton;
Planton schedulers are functions that upon invocation produce an array of instructions.
Planton schedulers are invoked at an interval no greater than the instructions produced by delay
function.
Schedulers are registered when constructing a Planton instance (see Example Usage). New schedulers cannot be registered after Planton has been initiated.
planton.events
is an instance of an event emitter.
Planton emits 2 types of events:
Emitted for each result returned by the schedulers.
planton.events.on('task', (taskEvent: TaskEvent) => {
// {
// taskName: 'send_user_email',
// instruction: 1,
// };
console.log(taskEvent);
});
Emitted for errors that happen during scheduling.
planton.events.on('error', (errorEvent: ErrorEvent) => {
// {
// taskName: 'send_user_email',
// error: Error,
// };
console.log(errorEvent);
});
All Planton errors extend from PlantonError
.
Planton produces 2 types of errors:
Produced when Planton is initiated with non-unique task names.
Additional error properties:
duplicateTaskName
describes the offending task name.
Produced when Planton is initiated with invalid configuration, e.g. concurrency
value less than 1.
Additional error properties:
taskName
Produced when task scheduler produces a result that is not an array or members of array are not strings.
Additional error properties:
taskName
unexpectedTaskInstructions
describes the offending instructions.
Tip: Inspect logs for additional details.
Planton is only responsible for dispatching the job tasks, i.e. task execution is outside of the scope of this package.
Use one the popular message queue systems:
Planton uses roarr to produce logs.
To enable logging, define ROARR_LOG=true
environment variable.
import {
createPlanton,
} from 'planton';
const planton = createPlanton({
getActiveTaskInstructions: (taskName) => {
return pool.anyFirst(sql`
SELECT mte1.instruction
FROM maintenance_task mt1
INNER JOIN maintenance_task_execution mte1 ON mte1.maintenance_task_id = mt1.id
WHERE
mt1.nid = ${taskName} AND
mte1.ended_at IS NULL
`);
},
tasks: [
{
calculateDelay: (attemptNumber) => {
// `attemptNumber` indicates how many times `schedule` was called
// without producing new instructions.
if (attemptNumber === 0) {
return 100;
}
// Incrementally increase back-off time when there are no new instructions.
// In production, consider using something more advanced (e.g. https://www.npmjs.com/package/simple-backoff).
return Math.max(
attemptNumber * 1000,
60 * 1000,
);
},
// New task scheduling will be attempted only when there are less than 2
// active instructions for the current task.
concurrency: 2,
name: 'send_user_email',
schedule: ({activeTaskInstructions, limit}) => {
return pool.anyFirst(sql`
UPDATE user_account as ua2
SET email_sent_at = now()
WHERE
ua2.id = ANY(
SELECT ua1.id
FROM user_account ua1
WHERE
ua1.email_sent_at IS NULL AND
ua1.id != ALL(${sql.array(activeTaskInstructions, 'int4')})
LIMIT ${limit}
FOR UPDATE OF ua1 SKIP LOCKED
)
RETURNING ua2.id
`);
},
},
],
});
planton.events.on('task', (taskEvent: TaskEvent) => {
// {
// taskName: 'send_user_email',
// instruction: 1,
// };
console.log(taskEvent);
});
While Planton is 100% database agnostic, an example schema may help you to get started. Below is a PostgreSQL schema on which the usage example is based.
The key insights from the schema:
nid
(named identifier) is used to establishing relationship between task instructions in the codebase and the database.- Each task execution gets a record in
maintenance_task_execution
which records instructions passed by Planton. - There are a lot of supporting fields used to describe the state of the task execution. All of these fields are provided for example purposes only. It is up to the task executor to utilise them.
maintenance_task_execution
isUNLOGGED
for performance reasons.
CREATE TABLE public.maintenance_task (
id integer NOT NULL,
nid text NOT NULL,
maximum_execution_duration interval DEFAULT '00:05:00'::interval NOT NULL,
maximum_concurrent_execution_count integer DEFAULT 0 NOT NULL
);
CREATE UNLOGGED TABLE public.maintenance_task_execution (
id integer NOT NULL,
maintenance_task_id integer NOT NULL,
instruction text,
started_at timestamp with time zone NOT NULL,
ended_at timestamp with time zone,
hostname text,
state text NOT NULL,
error jsonb,
terminated_at timestamp with time zone,
terminated_reason text,
last_seen_at timestamp with time zone,
created_at timestamp with time zone DEFAULT now() NOT NULL,
CONSTRAINT maintenance_task_execution_check CHECK (((state = 'COMPLETED'::text) OR (ended_at IS NULL))),
CONSTRAINT maintenance_task_execution_check1 CHECK (((terminated_at IS NULL) OR (ended_at IS NOT NULL))),
CONSTRAINT maintenance_task_execution_state_check CHECK ((state = ANY (ARRAY['RUNNING'::text, 'COMPLETED'::text])))
);
CREATE INDEX maintenance_task_id_idx
ON public.maintenance_task_execution
USING btree (maintenance_task_id);
CREATE INDEX maintenance_task_execution_ended_at_maintenance_task_id_idx
ON public.maintenance_task_execution
USING btree (ended_at, maintenance_task_id);
CREATE INDEX maintenance_task_execution_maintenance_task_id_idx
ON public.maintenance_task_execution
USING btree (maintenance_task_id)
WHERE (ended_at IS NULL);
CREATE UNIQUE INDEX maintenance_task_name_idx
ON public.maintenance_task
USING btree (nid);
ALTER TABLE ONLY public.maintenance_task_execution
ADD CONSTRAINT maintenance_task_execution_maintenance_task_id_fkey
FOREIGN KEY (maintenance_task_id)
REFERENCES public.maintenance_task(id)
ON DELETE CASCADE;
Planton is only aware of scheduling instructions produced by delay
function. This (may) make Planton unsuitable for horizontal scaling, e.g. consider a scenario where you want to execute a function at most once every 5 minutes, and executing it more often would produce undesirable side-effects. For use cases such as these, you may want to consider the alternatives.
However, for many use cases the above limitation shouldn't be an issue. As illustrated in an example implementation, using row locking when polling tasks ensures that you do not schedule the same tasks, meanwhile limit
in combination with activeTaskInstructions
reenforce the exclusivity constraint and limit concurrency.
Planton is designed to solve the problem of concurrency and task scheduling by polling the underlying database to identify if tasks need & can be scheduled.
Direct alternatives to Planton:
- bree – uses workers to spawn sandboxed processes, and supports async/await, retries, throttling, concurrency, and graceful shutdown.
If you simply need to run tasks at a pre-defined time interval, then you may want to consider simpler alternatives such as the ones below.