diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 50814ccd..0f519fb7 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -39,6 +39,10 @@ Read more: output via logging (thanks @wineTGH). - Fix race condition when multiple workers attempt to initialise the database at the same time +- `helpers.abortSignal` is no longer typed as `| undefined`. It is still + experimental! +- `helpers.abortPromise` added; will reject when `abortSignal` aborts (useful + for `Promise.race()`) ## v0.16.6 diff --git a/__tests__/getTasks.test.ts b/__tests__/getTasks.test.ts index 8dc7be73..db8d5498 100644 --- a/__tests__/getTasks.test.ts +++ b/__tests__/getTasks.test.ts @@ -10,6 +10,12 @@ import { makeMockJob, withPgClient } from "./helpers"; const options: WorkerSharedOptions = {}; +const neverAbortController = new AbortController(); +const abortSignal = neverAbortController.signal; +const abortPromise = new Promise((_, reject) => { + abortSignal.addEventListener("abort", reject); +}); + describe("commonjs", () => { test("gets tasks from folder", () => withPgClient(async (client) => { @@ -32,7 +38,8 @@ Array [ withPgClient: makeEnhancedWithPgClient( makeWithPgClientFromClient(client), ), - abortSignal: undefined, + abortSignal, + abortPromise, }, ); expect(await tasks.wouldyoulike!(helpers.job.payload, helpers)).toEqual( @@ -68,7 +75,8 @@ Array [ withPgClient: makeEnhancedWithPgClient( makeWithPgClientFromClient(client), ), - abortSignal: undefined, + abortSignal, + abortPromise, }, ); expect(await tasks.task1!(helpers.job.payload, helpers)).toEqual("hi"); @@ -98,7 +106,8 @@ Array [ withPgClient: makeEnhancedWithPgClient( makeWithPgClientFromClient(client), ), - abortSignal: undefined, + abortSignal, + abortPromise, }, ); expect(await tasks.task1!(helpers.job.payload, helpers)).toEqual("hi"); @@ -127,7 +136,8 @@ Array [ withPgClient: makeEnhancedWithPgClient( makeWithPgClientFromClient(client), ), - abortSignal: undefined, + abortSignal, + abortPromise, }); expect(await tasks.t1!(helpers.job.payload, helpers)).toEqual( "come with me", @@ -157,7 +167,8 @@ Array [ withPgClient: makeEnhancedWithPgClient( makeWithPgClientFromClient(client), ), - abortSignal: undefined, + abortSignal, + abortPromise, }); expect(await tasks.t1!(helpers.job.payload, helpers)).toEqual( "come with me, TS", @@ -191,7 +202,8 @@ Array [ withPgClient: makeEnhancedWithPgClient( makeWithPgClientFromClient(client), ), - abortSignal: undefined, + abortSignal, + abortPromise, }, ); expect(await tasks.wouldyoulike!(helpers.job.payload, helpers)).toEqual( @@ -224,7 +236,8 @@ Array [ withPgClient: makeEnhancedWithPgClient( makeWithPgClientFromClient(client), ), - abortSignal: undefined, + abortSignal, + abortPromise, }, ); expect(await tasks.task1!(helpers.job.payload, helpers)).toEqual("hi"); @@ -251,7 +264,8 @@ Array [ withPgClient: makeEnhancedWithPgClient( makeWithPgClientFromClient(client), ), - abortSignal: undefined, + abortSignal, + abortPromise, }); expect(await tasks.t1!(helpers.job.payload, helpers)).toEqual( "come with me", diff --git a/__tests__/helpers.ts b/__tests__/helpers.ts index 966e5a2e..6a69e50b 100644 --- a/__tests__/helpers.ts +++ b/__tests__/helpers.ts @@ -101,14 +101,16 @@ export async function withPgPool( cb: (pool: pg.Pool) => Promise, ): Promise { const { TEST_CONNECTION_STRING } = databaseDetails!; - const pool = new pg.Pool({ + const pgPool = new pg.Pool({ connectionString: TEST_CONNECTION_STRING, max: 100, }); + pgPool.on("error", () => {}); + pgPool.on("connect", () => {}); try { - return await cb(pool); + return await cb(pgPool); } finally { - pool.end(); + pgPool.end(); } } @@ -298,14 +300,22 @@ export function makeMockJob(taskIdentifier: string): Job { export async function makeSelectionOfJobs( utils: WorkerUtils, - pgClient: pg.PoolClient, + pgClient: pg.PoolClient | pg.Pool, ) { const future = new Date(Date.now() + 60 * 60 * 1000); - const failedJob: DbJob = await utils.addJob("job3", { a: 1, runAt: future }); - const regularJob1 = await utils.addJob("job3", { a: 2, runAt: future }); - const lockedJob: DbJob = await utils.addJob("job3", { a: 3, runAt: future }); - const regularJob2 = await utils.addJob("job3", { a: 4, runAt: future }); - const untouchedJob = await utils.addJob("job3", { a: 5, runAt: future }); + const failedJob: DbJob = await utils.addJob( + "job3", + { a: 1 }, + { runAt: future }, + ); + const regularJob1 = await utils.addJob("job3", { a: 2 }, { runAt: future }); + const lockedJob: DbJob = await utils.addJob( + "job3", + { a: 3 }, + { runAt: future }, + ); + const regularJob2 = await utils.addJob("job3", { a: 4 }, { runAt: future }); + const untouchedJob = await utils.addJob("job3", { a: 5 }, { runAt: future }); const { rows: [lockedJobUpdate], } = await pgClient.query( diff --git a/__tests__/main.runTaskList.test.ts b/__tests__/main.runTaskList.test.ts index 211f0ca9..cf32d567 100644 --- a/__tests__/main.runTaskList.test.ts +++ b/__tests__/main.runTaskList.test.ts @@ -2,11 +2,12 @@ import { Pool } from "pg"; import deferred, { Deferred } from "../src/deferred"; -import { Task, TaskList, WorkerSharedOptions } from "../src/interfaces"; +import { Job, Task, TaskList, WorkerSharedOptions } from "../src/interfaces"; import { runTaskList } from "../src/main"; import { ESCAPED_GRAPHILE_WORKER_SCHEMA, expectJobCount, + getJobs, reset, sleep, sleepUntil, @@ -100,3 +101,35 @@ test("doesn't bail on deprecated `debug` function", () => } } })); + +test("gracefulShutdown", async () => + withPgPool(async (pgPool) => { + let jobStarted = false; + const tasks: TaskList = { + job1(payload, helpers) { + jobStarted = true; + return Promise.race([sleep(100000, true), helpers.abortPromise]); + }, + }; + const workerPool = runTaskList( + { concurrency: 3, gracefulShutdownAbortTimeout: 20, useNodeTime: true }, + tasks, + pgPool, + ); + await addJob(pgPool); + await sleepUntil(() => jobStarted); + await workerPool.gracefulShutdown(); + await workerPool.promise; + let jobs: Job[] = []; + for (let attempts = 0; attempts < 10; attempts++) { + jobs = await getJobs(pgPool); + if (jobs[0]?.last_error) { + break; + } else { + await sleep(25 * attempts); + } + } + expect(jobs).toHaveLength(1); + const [job] = jobs; + expect(job.last_error).toBeTruthy(); + })); diff --git a/__tests__/runner.runOnce.test.ts b/__tests__/runner.runOnce.test.ts index 89a0246e..089c0587 100644 --- a/__tests__/runner.runOnce.test.ts +++ b/__tests__/runner.runOnce.test.ts @@ -1,10 +1,20 @@ import { Pool } from "pg"; import { makeWorkerPresetWorkerOptions } from "../src/config"; -import { RunnerOptions } from "../src/interfaces"; +import { Job, RunnerOptions, WorkerUtils } from "../src/interfaces"; +import { _allWorkerPools } from "../src/main"; import { WorkerPreset } from "../src/preset"; import { runOnce } from "../src/runner"; -import { databaseDetails, withPgPool } from "./helpers"; +import { makeWorkerUtils } from "../src/workerUtils"; +import { + databaseDetails, + getJobs, + makeSelectionOfJobs, + reset, + sleep, + sleepUntil, + withPgPool, +} from "./helpers"; delete process.env.DATABASE_URL; delete process.env.PGDATABASE; @@ -83,10 +93,13 @@ test("at least a connectionString, a pgPool, the DATABASE_URL or PGDATABASE envv }); test("connectionString and a pgPool cannot provided a the same time", async () => { + const pgPool = new Pool(); + pgPool.on("error", () => {}); + pgPool.on("connect", () => {}); const options: RunnerOptions = { taskList: { task: () => {} }, connectionString: databaseDetails!.TEST_CONNECTION_STRING, - pgPool: new Pool(), + pgPool, }; await runOnceErrorAssertion( options, @@ -141,3 +154,92 @@ test("providing just a pgPool is possible", async () => expect.assertions(0); await runOnce(options); })); + +let utils: WorkerUtils | null = null; +afterEach(async () => { + await utils?.release(); + utils = null; +}); + +test("runs all available tasks and then exits", async () => + withPgPool(async (pgPool) => { + const options: RunnerOptions = { + taskList: { job1: () => {}, job2: () => {}, job3: () => {} }, + pgPool: pgPool, + useNodeTime: true, + }; + utils = await makeWorkerUtils(options); + await utils.addJob("job1", { id: "PRE_SELECTION_1" }); + await utils.addJob("job2", { id: "PRE_SELECTION_2" }); + await utils.addJob("job3", { id: "PRE_SELECTION_3" }); + const unavailableJobs = Object.values( + await makeSelectionOfJobs(utils, pgPool), + ); + await utils.addJob("job1", { id: "POST_SELECTION_1" }); + await utils.addJob("job2", { id: "POST_SELECTION_2" }); + await utils.addJob("job3", { id: "POST_SELECTION_3" }); + { + const jobs = await getJobs(pgPool); + expect(jobs).toHaveLength(unavailableJobs.length + 6); + } + await runOnce(options); + { + const unavailableJobIds = unavailableJobs.map((j) => j.id); + let jobs!: Job[]; + for (let attempts = 0; attempts < 10; attempts++) { + jobs = await getJobs(pgPool); + if (jobs.length === unavailableJobs.length) { + break; + } else { + await sleep(attempts * 50); + } + } + expect(jobs).toHaveLength(unavailableJobs.length); + expect( + jobs.filter((j) => !unavailableJobIds.includes(j.id)), + ).toHaveLength(0); + } + })); + +test("gracefulShutdown", async () => + withPgPool(async (pgPool) => { + let jobStarted = false; + const options: RunnerOptions = { + taskList: { + job1(payload, helpers) { + jobStarted = true; + return Promise.race([sleep(100000, true), helpers.abortPromise]); + }, + }, + pgPool, + preset: { + worker: { + gracefulShutdownAbortTimeout: 20, + useNodeTime: true, + }, + }, + }; + await reset(pgPool, options); + utils = await makeWorkerUtils(options); + await utils.addJob("job1", { id: "test sleep" }); + expect(_allWorkerPools).toHaveLength(0); + const promise = runOnce(options); + await sleepUntil(() => _allWorkerPools.length === 1); + expect(_allWorkerPools).toHaveLength(1); + const pool = _allWorkerPools[0]; + await sleepUntil(() => jobStarted); + await pool.gracefulShutdown(); + await promise; + let jobs: Job[] = []; + for (let attempts = 0; attempts < 10; attempts++) { + jobs = await getJobs(pgPool); + if (jobs[0]?.last_error) { + break; + } else { + await sleep(25 * attempts); + } + } + expect(jobs).toHaveLength(1); + const [job] = jobs; + expect(job.last_error).toBeTruthy(); + })); diff --git a/package.json b/package.json index 919325d7..eb432422 100644 --- a/package.json +++ b/package.json @@ -14,7 +14,7 @@ "prettier:check": "prettier --cache --ignore-path .eslintignore --check '**/*.{js,jsx,ts,tsx,graphql,md,json}'", "test": "yarn prepack && yarn depcheck && yarn test:setupdb && yarn test:only", "test:setupdb": "./scripts/setup_template_db.sh", - "test:only": "node --experimental-vm-modules node_modules/.bin/jest", + "test:only": "NO_LOG_SUCCESS=1 node --experimental-vm-modules node_modules/.bin/jest", "depcheck": "depcheck --ignores='graphile-worker,faktory-worker,@google-cloud/tasks,bullmq,jest-environment-node,@docusaurus/*,@fortawesome/*,@mdx-js/*,@types/jest,clsx,eslint_d,graphile,juice,postcss-nested,prism-react-renderer,react,react-dom,svgo,ts-node,@types/debug,tslib'", "db:dump": "./scripts/dump_db", "perfTest": "cd perfTest && node ./run.js", @@ -84,7 +84,7 @@ "eslint_d": "^13.0.0", "graphile": "^5.0.0-beta.16", "jest": "^26.0.0", - "jest-time-helpers": "0.1.0", + "jest-time-helpers": "0.1.1", "juice": "5.2.0", "pg-connection-string": "^2.6.2", "postcss-nested": "^6.0.1", diff --git a/src/helpers.ts b/src/helpers.ts index 00880182..c2f547c2 100644 --- a/src/helpers.ts +++ b/src/helpers.ts @@ -225,10 +225,12 @@ export function makeJobHelpers( { withPgClient, abortSignal, + abortPromise, logger: overrideLogger, }: { withPgClient: EnhancedWithPgClient; - abortSignal: AbortSignal | undefined; + abortSignal: AbortSignal; + abortPromise: Promise; logger?: Logger; }, ): JobHelpers { @@ -240,6 +242,7 @@ export function makeJobHelpers( }); const helpers: JobHelpers = { abortSignal, + abortPromise, job, getQueueName(queueId = job.job_queue_id) { return getQueueName(compiledSharedOptions, withPgClient, queueId); diff --git a/src/interfaces.ts b/src/interfaces.ts index a2434c68..4b67592f 100644 --- a/src/interfaces.ts +++ b/src/interfaces.ts @@ -185,7 +185,14 @@ export interface JobHelpers extends Helpers { * * @experimental */ - abortSignal?: AbortSignal; + abortSignal: AbortSignal; + + /** + * A promise that rejects when the AbortSignal aborts. + * + * @experimental + */ + abortPromise: Promise; } export type CleanupTask = @@ -509,8 +516,10 @@ export interface WorkerPool { gracefulShutdown: (message?: string) => Promise; forcefulShutdown: (message: string) => Promise; promise: Promise; - /** @experimental */ + /** Fires 'abort' when all running jobs should stop because worker is shutting down. @experimental */ abortSignal: AbortSignal; + /** Rejects when the abortSignal aborts. @experimental */ + abortPromise: Promise; /** @internal */ _shuttingDown: boolean; /** @internal */ @@ -730,7 +739,7 @@ export interface WorkerOptions extends WorkerSharedOptions { */ workerId?: string; - abortSignal?: AbortSignal; + abortSignal: AbortSignal; workerPool: WorkerPool; diff --git a/src/main.ts b/src/main.ts index a1b8e343..5b29a0b8 100644 --- a/src/main.ts +++ b/src/main.ts @@ -585,6 +585,13 @@ export function _runTaskList( const abortController = new AbortController(); const abortSignal = abortController.signal; + const abortPromise = new Promise((_resolve, reject) => { + abortSignal.addEventListener("abort", () => { + reject(abortSignal.reason); + }); + }); + // Make sure Node doesn't get upset about unhandled rejection + abortPromise.then(null, () => /* noop */ void 0); // This is a representation of us that can be interacted with externally const workerPool: WorkerPool = { @@ -598,7 +605,8 @@ export function _runTaskList( return concurrency === 1 ? this._workers[0] ?? null : null; }, abortSignal, - release: async () => { + abortPromise, + release() { logger.error( "DEPRECATED: You are calling `workerPool.release()`; please use `workerPool.gracefulShutdown()` instead.", ); @@ -848,6 +856,7 @@ export function _runTaskList( withPgClient, continuous, abortSignal, + abortPromise, workerPool, autostart, workerId, diff --git a/src/worker.ts b/src/worker.ts index 7377d4e4..ecb5043e 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -24,6 +24,7 @@ export function makeNewWorker( withPgClient: EnhancedWithPgClient; continuous: boolean; abortSignal: AbortSignal; + abortPromise: Promise; workerPool: WorkerPool; autostart?: boolean; workerId?: string; @@ -35,6 +36,7 @@ export function makeNewWorker( withPgClient, continuous, abortSignal, + abortPromise, workerPool, autostart = true, workerId = `worker-${randomBytes(9).toString("hex")}`, @@ -244,6 +246,7 @@ export function makeNewWorker( withPgClient, logger, abortSignal, + abortPromise, }); result = await task(job.payload, helpers); } catch (error) { diff --git a/website/docs/tasks.md b/website/docs/tasks.md index f0e46d6b..644d883c 100644 --- a/website/docs/tasks.md +++ b/website/docs/tasks.md @@ -36,9 +36,13 @@ Each task function is passed two arguments: shouldn't need this - `getQueueName()` — get the name of the queue the job is in (may or may not return a promise - recommend you always `await` it) - - `abortSignal` — could be an `AbortSignal` or `undefined`; if set, use - this to abort your task early on graceful shutdown (can be passed to a - number of asynchronous Node.js methods) + - `abortSignal` — could be an `AbortSignal`, or `undefined` if not + supported by this release of worker; if set, use this to abort your task + early on graceful shutdown (can be passed to a number of asynchronous + Node.js methods) + - `abortPromise` — if present, a promise that will reject when + `abortSignal` aborts; convenient for exiting your task when the abortSignal + fires: `Promise.race([abortPromise, doYourThing()])` - `withPgClient` — a helper to use to get a database client - `query(sql, values)` — a convenience wrapper for `withPgClient(pgClient => pgClient.query(sql, values))` diff --git a/yarn.lock b/yarn.lock index 3223ab37..8ea65ee4 100644 --- a/yarn.lock +++ b/yarn.lock @@ -7714,10 +7714,10 @@ jest-snapshot@^26.6.2: pretty-format "^26.6.2" semver "^7.3.2" -jest-time-helpers@0.1.0: - version "0.1.0" - resolved "https://registry.yarnpkg.com/jest-time-helpers/-/jest-time-helpers-0.1.0.tgz#0d28164b4109035ce5010bfc5375b78700bfe793" - integrity sha512-rj3g5CPey4t1n/6HCEWL5epe7b33bPKurGOFmdDG7A6PoO6jtPfaCWNtRangFUCX7Z9aPr7D6nQfga635j1Fuw== +jest-time-helpers@0.1.1: + version "0.1.1" + resolved "https://registry.yarnpkg.com/jest-time-helpers/-/jest-time-helpers-0.1.1.tgz#93a6ee318ccd50a27f27898460f62d987f431bc9" + integrity sha512-FJdTB98OTD16HYe0Y+98FLYPlfWAcU0lj8OZ4AUpKkWSbqX3lL4aLjYZaH9gvQG4KcG7/cpBFwV9MhGQ71MD4Q== jest-util@^26.1.0, jest-util@^26.6.2: version "26.6.2"