Skip to content

Commit

Permalink
Reduce calls to processSharedOptions by passing pre-processed values …
Browse files Browse the repository at this point in the history
…more consistently (#390)
  • Loading branch information
benjie authored Nov 14, 2023
2 parents 8ed836c + 5eeee8c commit 50752a2
Show file tree
Hide file tree
Showing 14 changed files with 233 additions and 149 deletions.
64 changes: 40 additions & 24 deletions __tests__/getTasks.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const options: WorkerSharedOptions = {};
describe("commonjs", () => {
test("gets tasks from folder", () =>
withPgClient(async (client) => {
const { tasks, release } = await getTasks(
const { tasks, release, compiledSharedOptions } = await getTasks(
options,
`${__dirname}/fixtures/tasks`,
);
Expand All @@ -19,10 +19,14 @@ Array [
"wouldyoulike_default",
]
`);
const helpers = makeJobHelpers(options, makeMockJob("would you like"), {
withPgClient: makeWithPgClientFromClient(client),
abortSignal: undefined,
});
const helpers = makeJobHelpers(
compiledSharedOptions,
makeMockJob("would you like"),
{
withPgClient: makeWithPgClientFromClient(client),
abortSignal: undefined,
},
);
expect(await tasks.wouldyoulike!(helpers.job.payload, helpers)).toEqual(
"some sausages",
);
Expand All @@ -34,7 +38,7 @@ Array [

test("get tasks from file (vanilla)", () =>
withPgClient(async (client) => {
const { tasks, release } = await getTasks(
const { tasks, release, compiledSharedOptions } = await getTasks(
options,
`${__dirname}/fixtures/tasksFile.js`,
);
Expand All @@ -46,10 +50,14 @@ Array [
]
`);

const helpers = makeJobHelpers(options, makeMockJob("task1"), {
withPgClient: makeWithPgClientFromClient(client),
abortSignal: undefined,
});
const helpers = makeJobHelpers(
compiledSharedOptions,
makeMockJob("task1"),
{
withPgClient: makeWithPgClientFromClient(client),
abortSignal: undefined,
},
);
expect(await tasks.task1!(helpers.job.payload, helpers)).toEqual("hi");
expect(await tasks.task2!(helpers.job.payload, helpers)).toEqual("hello");

Expand All @@ -58,7 +66,7 @@ Array [

test("get tasks from file (default)", () =>
withPgClient(async (client) => {
const { tasks, release } = await getTasks(
const { tasks, release, compiledSharedOptions } = await getTasks(
options,
`${__dirname}/fixtures/tasksFile_default.js`,
);
Expand All @@ -70,7 +78,7 @@ Array [
]
`);

const helpers = makeJobHelpers(options, makeMockJob("t1"), {
const helpers = makeJobHelpers(compiledSharedOptions, makeMockJob("t1"), {
withPgClient: makeWithPgClientFromClient(client),
abortSignal: undefined,
});
Expand All @@ -88,7 +96,7 @@ Array [
describe("esm", () => {
test("gets tasks from folder", () =>
withPgClient(async (client) => {
const { tasks, release } = await getTasks(
const { tasks, release, compiledSharedOptions } = await getTasks(
options,
`${__dirname}/fixtures-esm/tasks`,
);
Expand All @@ -99,10 +107,14 @@ Array [
"wouldyoulike_default",
]
`);
const helpers = makeJobHelpers(options, makeMockJob("would you like"), {
withPgClient: makeWithPgClientFromClient(client),
abortSignal: undefined,
});
const helpers = makeJobHelpers(
compiledSharedOptions,
makeMockJob("would you like"),
{
withPgClient: makeWithPgClientFromClient(client),
abortSignal: undefined,
},
);
expect(await tasks.wouldyoulike!(helpers.job.payload, helpers)).toEqual(
"some sausages",
);
Expand All @@ -114,7 +126,7 @@ Array [

test("get tasks from file (vanilla)", () =>
withPgClient(async (client) => {
const { tasks, release } = await getTasks(
const { tasks, release, compiledSharedOptions } = await getTasks(
options,
`${__dirname}/fixtures-esm/tasksFile.js`,
);
Expand All @@ -126,10 +138,14 @@ Array [
]
`);

const helpers = makeJobHelpers(options, makeMockJob("task1"), {
withPgClient: makeWithPgClientFromClient(client),
abortSignal: undefined,
});
const helpers = makeJobHelpers(
compiledSharedOptions,
makeMockJob("task1"),
{
withPgClient: makeWithPgClientFromClient(client),
abortSignal: undefined,
},
);
expect(await tasks.task1!(helpers.job.payload, helpers)).toEqual("hi");
expect(await tasks.task2!(helpers.job.payload, helpers)).toEqual("hello");

Expand All @@ -138,7 +154,7 @@ Array [

test("get tasks from file (default)", () =>
withPgClient(async (client) => {
const { tasks, release } = await getTasks(
const { tasks, release, compiledSharedOptions } = await getTasks(
options,
`${__dirname}/fixtures-esm/tasksFile_default.js`,
);
Expand All @@ -150,7 +166,7 @@ Array [
]
`);

const helpers = makeJobHelpers(options, makeMockJob("t1"), {
const helpers = makeJobHelpers(compiledSharedOptions, makeMockJob("t1"), {
withPgClient: makeWithPgClientFromClient(client),
abortSignal: undefined,
});
Expand Down
6 changes: 4 additions & 2 deletions __tests__/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
WorkerPoolOptions,
WorkerUtils,
} from "../src/interfaces";
import { processSharedOptions } from "../src/lib";
import { _allWorkerPools } from "../src/main";
import { migrate } from "../src/migrate";

Expand Down Expand Up @@ -111,12 +112,13 @@ export async function reset(
await pgPoolOrClient.query(
`drop schema if exists ${ESCAPED_GRAPHILE_WORKER_SCHEMA} cascade;`,
);
const compiledSharedOptions = processSharedOptions(options);
if (isPoolClient(pgPoolOrClient)) {
await migrate(options, pgPoolOrClient);
await migrate(compiledSharedOptions, pgPoolOrClient);
} else {
const client = await pgPoolOrClient.connect();
try {
await migrate(options, client);
await migrate(compiledSharedOptions, client);
} finally {
client.release();
}
Expand Down
26 changes: 16 additions & 10 deletions __tests__/migrate.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { WorkerSharedOptions } from "../src";
import { migrations } from "../src/generated/sql";
import { processSharedOptions } from "../src/lib";
import { migrate } from "../src/migrate";
import {
ESCAPED_GRAPHILE_WORKER_SCHEMA,
Expand Down Expand Up @@ -29,7 +30,8 @@ test("migration installs schema; second migration does no harm", async () => {
expect(graphileWorkerNamespaceBeforeMigration).toBeFalsy();

// Perform migration
await migrate(options, pgClient);
const compiledSharedOptions = processSharedOptions(options);
await migrate(compiledSharedOptions, pgClient);

// Assert migrations table exists and has relevant entries
const { rows: migrationRows } = await pgClient.query(
Expand All @@ -50,9 +52,9 @@ test("migration installs schema; second migration does no harm", async () => {
}

// Assert that re-migrating causes no issues
await migrate(options, pgClient);
await migrate(options, pgClient);
await migrate(options, pgClient);
await migrate(compiledSharedOptions, pgClient);
await migrate(compiledSharedOptions, pgClient);
await migrate(compiledSharedOptions, pgClient);
{
const jobsRows = await getJobs(pgClient);
expect(jobsRows).toHaveLength(1);
Expand Down Expand Up @@ -83,8 +85,10 @@ insert into ${ESCAPED_GRAPHILE_WORKER_SCHEMA}.migrations (id) values (1);
// We need to use a fresh connection after dropping the schema because the SQL
// functions' plans get cached using the stale OIDs.
await withPgClient(async (pgClient) => {
const compiledSharedOptions = processSharedOptions(options);

// Perform migration
await migrate(options, pgClient);
await migrate(compiledSharedOptions, pgClient);

// Assert migrations table exists and has relevant entries
const { rows: migrationRows } = await pgClient.query(
Expand All @@ -109,9 +113,9 @@ insert into ${ESCAPED_GRAPHILE_WORKER_SCHEMA}.migrations (id) values (1);
}

// Assert that re-migrating causes no issues
await migrate(options, pgClient);
await migrate(options, pgClient);
await migrate(options, pgClient);
await migrate(compiledSharedOptions, pgClient);
await migrate(compiledSharedOptions, pgClient);
await migrate(compiledSharedOptions, pgClient);
{
const jobsRows = await getJobs(pgClient);
expect(jobsRows).toHaveLength(1);
Expand All @@ -138,16 +142,18 @@ test("aborts if database is more up to date than current worker", async () => {
);
expect(graphileWorkerNamespaceBeforeMigration).toBeFalsy();

const compiledSharedOptions = processSharedOptions(options);

// Perform migration
await migrate(options, pgClient);
await migrate(compiledSharedOptions, pgClient);

// Insert a more up to date migration
await pgClient.query(
`insert into ${ESCAPED_GRAPHILE_WORKER_SCHEMA}.migrations (id, ts, breaking) values (999999, '2023-10-19T10:31:00Z', true);`,
);

await expect(
migrate(options, pgClient),
migrate(compiledSharedOptions, pgClient),
).rejects.toThrowErrorMatchingInlineSnapshot(
`"Database is using Graphile Worker schema revision 999999 which includes breaking migration 999999, but the currently running worker only supports up to revision 18. It would be unsafe to continue; please ensure all versions of Graphile Worker are compatible."`,
);
Expand Down
19 changes: 15 additions & 4 deletions src/cron.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import {
TimestampDigest,
WorkerEvents,
} from "./interfaces";
import { processSharedOptions, Releasers } from "./lib";
import { CompiledOptions, processSharedOptions, Releasers } from "./lib";

interface CronRequirements {
pgPool: Pool;
Expand Down Expand Up @@ -494,11 +494,19 @@ export const runCron = (
};
};

/** @internal */
export async function getParsedCronItemsFromOptions(
options: RunnerOptions,
compiledOptions: CompiledOptions,
releasers: Releasers,
): Promise<Array<ParsedCronItem>> {
const { crontabFile, parsedCronItems, crontab } = options;
const {
options: {
preset,
crontabFile = preset?.worker?.crontabFile,
parsedCronItems,
crontab,
},
} = compiledOptions;

if (!crontabFile && !parsedCronItems && !crontab) {
return [];
Expand All @@ -521,7 +529,10 @@ export async function getParsedCronItemsFromOptions(
"`crontabFile` and `parsedCronItems` must not be set at the same time.",
);

const watchedCronItems = await getCronItems(options, crontabFile);
const watchedCronItems = await getCronItems(
compiledOptions.options,
crontabFile,
);
releasers.push(() => watchedCronItems.release());
return watchedCronItems.items;
} else {
Expand Down
1 change: 1 addition & 0 deletions src/getTasks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ export default async function getTasks(
let released = false;
return {
tasks,
compiledSharedOptions,
release: () => {
if (released) {
return;
Expand Down
25 changes: 9 additions & 16 deletions src/helpers.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,14 @@
import { Pool, PoolClient } from "pg";

import {
AddJobFunction,
Job,
JobHelpers,
SharedOptions,
WithPgClient,
WorkerSharedOptions,
} from "./interfaces";
import { processSharedOptions } from "./lib";
import { AddJobFunction, Job, JobHelpers, WithPgClient } from "./interfaces";
import { CompiledSharedOptions } from "./lib";
import { Logger } from "./logger";

export function makeAddJob(
options: WorkerSharedOptions,
compiledSharedOptions: CompiledSharedOptions,
withPgClient: WithPgClient,
): AddJobFunction {
const { escapedWorkerSchema, useNodeTime } = processSharedOptions(options);
const { escapedWorkerSchema, useNodeTime } = compiledSharedOptions;
return (identifier, payload, spec = {}) => {
return withPgClient(async (pgClient) => {
const { rows } = await pgClient.query(
Expand Down Expand Up @@ -59,19 +52,19 @@ export function makeAddJob(
}

export function makeJobHelpers(
options: SharedOptions,
compiledSharedOptions: CompiledSharedOptions,
job: Job,
{
withPgClient,
logger: overrideLogger,
abortSignal,
logger: overrideLogger,
}: {
withPgClient: WithPgClient;
logger?: Logger;
abortSignal: AbortSignal | undefined;
logger?: Logger;
},
): JobHelpers {
const baseLogger = overrideLogger || processSharedOptions(options).logger;
const baseLogger = overrideLogger ?? compiledSharedOptions.logger;
const logger = baseLogger.scope({
label: "job",
taskIdentifier: job.task_identifier,
Expand All @@ -84,7 +77,7 @@ export function makeJobHelpers(
withPgClient,
query: (queryText, values) =>
withPgClient((pgClient) => pgClient.query(queryText, values)),
addJob: makeAddJob(options, withPgClient),
addJob: makeAddJob(compiledSharedOptions, withPgClient),

// TODO: add an API for giving workers more helpers
};
Expand Down
2 changes: 1 addition & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ declare global {
/**
* Called when Graphile Worker starts up.
*/
init(): PromiseOrDirect<void>;
init(): void;

/**
* Called before migrating the DB.
Expand Down
7 changes: 6 additions & 1 deletion src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import type {
QueryResultRow,
} from "pg";

import type { Release } from "./lib";
import type { CompiledSharedOptions, Release } from "./lib";
import type { Logger } from "./logger";
import type { Signal } from "./signals";

Expand Down Expand Up @@ -200,6 +200,8 @@ export type TaskList = {
export interface WatchedTaskList {
tasks: TaskList;
release: () => void;
/** @internal */
compiledSharedOptions: CompiledSharedOptions;
}

export interface WatchedCronItems {
Expand Down Expand Up @@ -628,6 +630,9 @@ export interface RunOnceOptions extends SharedOptions {
* handle graceful shutdown of the worker if the process receives a signal.
*/
noHandleSignals?: boolean;

/** Single worker only! */
concurrency?: 1;
}

/**
Expand Down
Loading

0 comments on commit 50752a2

Please sign in to comment.