Skip to content

Commit

Permalink
More consistent handling of options
Browse files Browse the repository at this point in the history
  • Loading branch information
benjie committed Nov 13, 2023
1 parent 26e0006 commit 3563980
Show file tree
Hide file tree
Showing 9 changed files with 167 additions and 106 deletions.
64 changes: 40 additions & 24 deletions __tests__/getTasks.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const options: WorkerSharedOptions = {};
describe("commonjs", () => {
test("gets tasks from folder", () =>
withPgClient(async (client) => {
const { tasks, release } = await getTasks(
const { tasks, release, compiledSharedOptions } = await getTasks(
options,
`${__dirname}/fixtures/tasks`,
);
Expand All @@ -19,10 +19,14 @@ Array [
"wouldyoulike_default",
]
`);
const helpers = makeJobHelpers(options, makeMockJob("would you like"), {
withPgClient: makeWithPgClientFromClient(client),
abortSignal: undefined,
});
const helpers = makeJobHelpers(
compiledSharedOptions,
makeMockJob("would you like"),
{
withPgClient: makeWithPgClientFromClient(client),
abortSignal: undefined,
},
);
expect(await tasks.wouldyoulike!(helpers.job.payload, helpers)).toEqual(
"some sausages",
);
Expand All @@ -34,7 +38,7 @@ Array [

test("get tasks from file (vanilla)", () =>
withPgClient(async (client) => {
const { tasks, release } = await getTasks(
const { tasks, release, compiledSharedOptions } = await getTasks(
options,
`${__dirname}/fixtures/tasksFile.js`,
);
Expand All @@ -46,10 +50,14 @@ Array [
]
`);

const helpers = makeJobHelpers(options, makeMockJob("task1"), {
withPgClient: makeWithPgClientFromClient(client),
abortSignal: undefined,
});
const helpers = makeJobHelpers(
compiledSharedOptions,
makeMockJob("task1"),
{
withPgClient: makeWithPgClientFromClient(client),
abortSignal: undefined,
},
);
expect(await tasks.task1!(helpers.job.payload, helpers)).toEqual("hi");
expect(await tasks.task2!(helpers.job.payload, helpers)).toEqual("hello");

Expand All @@ -58,7 +66,7 @@ Array [

test("get tasks from file (default)", () =>
withPgClient(async (client) => {
const { tasks, release } = await getTasks(
const { tasks, release, compiledSharedOptions } = await getTasks(
options,
`${__dirname}/fixtures/tasksFile_default.js`,
);
Expand All @@ -70,7 +78,7 @@ Array [
]
`);

const helpers = makeJobHelpers(options, makeMockJob("t1"), {
const helpers = makeJobHelpers(compiledSharedOptions, makeMockJob("t1"), {
withPgClient: makeWithPgClientFromClient(client),
abortSignal: undefined,
});
Expand All @@ -88,7 +96,7 @@ Array [
describe("esm", () => {
test("gets tasks from folder", () =>
withPgClient(async (client) => {
const { tasks, release } = await getTasks(
const { tasks, release, compiledSharedOptions } = await getTasks(
options,
`${__dirname}/fixtures-esm/tasks`,
);
Expand All @@ -99,10 +107,14 @@ Array [
"wouldyoulike_default",
]
`);
const helpers = makeJobHelpers(options, makeMockJob("would you like"), {
withPgClient: makeWithPgClientFromClient(client),
abortSignal: undefined,
});
const helpers = makeJobHelpers(
compiledSharedOptions,
makeMockJob("would you like"),
{
withPgClient: makeWithPgClientFromClient(client),
abortSignal: undefined,
},
);
expect(await tasks.wouldyoulike!(helpers.job.payload, helpers)).toEqual(
"some sausages",
);
Expand All @@ -114,7 +126,7 @@ Array [

test("get tasks from file (vanilla)", () =>
withPgClient(async (client) => {
const { tasks, release } = await getTasks(
const { tasks, release, compiledSharedOptions } = await getTasks(
options,
`${__dirname}/fixtures-esm/tasksFile.js`,
);
Expand All @@ -126,10 +138,14 @@ Array [
]
`);

const helpers = makeJobHelpers(options, makeMockJob("task1"), {
withPgClient: makeWithPgClientFromClient(client),
abortSignal: undefined,
});
const helpers = makeJobHelpers(
compiledSharedOptions,
makeMockJob("task1"),
{
withPgClient: makeWithPgClientFromClient(client),
abortSignal: undefined,
},
);
expect(await tasks.task1!(helpers.job.payload, helpers)).toEqual("hi");
expect(await tasks.task2!(helpers.job.payload, helpers)).toEqual("hello");

Expand All @@ -138,7 +154,7 @@ Array [

test("get tasks from file (default)", () =>
withPgClient(async (client) => {
const { tasks, release } = await getTasks(
const { tasks, release, compiledSharedOptions } = await getTasks(
options,
`${__dirname}/fixtures-esm/tasksFile_default.js`,
);
Expand All @@ -150,7 +166,7 @@ Array [
]
`);

const helpers = makeJobHelpers(options, makeMockJob("t1"), {
const helpers = makeJobHelpers(compiledSharedOptions, makeMockJob("t1"), {
withPgClient: makeWithPgClientFromClient(client),
abortSignal: undefined,
});
Expand Down
1 change: 1 addition & 0 deletions src/getTasks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ export default async function getTasks(
let released = false;
return {
tasks,
compiledSharedOptions,
release: () => {
if (released) {
return;
Expand Down
25 changes: 9 additions & 16 deletions src/helpers.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,14 @@
import { Pool, PoolClient } from "pg";

import {
AddJobFunction,
Job,
JobHelpers,
SharedOptions,
WithPgClient,
WorkerSharedOptions,
} from "./interfaces";
import { processSharedOptions } from "./lib";
import { AddJobFunction, Job, JobHelpers, WithPgClient } from "./interfaces";
import { CompiledSharedOptions, processSharedOptions } from "./lib";
import { Logger } from "./logger";

export function makeAddJob(
options: WorkerSharedOptions,
compiledSharedOptions: CompiledSharedOptions,
withPgClient: WithPgClient,
): AddJobFunction {
const { escapedWorkerSchema, useNodeTime } = processSharedOptions(options);
const { escapedWorkerSchema, useNodeTime } = compiledSharedOptions;
return (identifier, payload, spec = {}) => {
return withPgClient(async (pgClient) => {
const { rows } = await pgClient.query(
Expand Down Expand Up @@ -59,19 +52,19 @@ export function makeAddJob(
}

export function makeJobHelpers(
options: SharedOptions,
compiledSharedOptions: CompiledSharedOptions,
job: Job,
{
withPgClient,
logger: overrideLogger,
abortSignal,
logger: overrideLogger,
}: {
withPgClient: WithPgClient;
logger?: Logger;
abortSignal: AbortSignal | undefined;
logger?: Logger;
},
): JobHelpers {
const baseLogger = overrideLogger || processSharedOptions(options).logger;
const baseLogger = overrideLogger ?? compiledSharedOptions.logger;
const logger = baseLogger.scope({
label: "job",
taskIdentifier: job.task_identifier,
Expand All @@ -84,7 +77,7 @@ export function makeJobHelpers(
withPgClient,
query: (queryText, values) =>
withPgClient((pgClient) => pgClient.query(queryText, values)),
addJob: makeAddJob(options, withPgClient),
addJob: makeAddJob(compiledSharedOptions, withPgClient),

// TODO: add an API for giving workers more helpers
};
Expand Down
4 changes: 3 additions & 1 deletion src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import type {
QueryResultRow,
} from "pg";

import type { Release } from "./lib";
import type { CompiledSharedOptions, Release } from "./lib";
import type { Logger } from "./logger";
import type { Signal } from "./signals";

Expand Down Expand Up @@ -200,6 +200,8 @@ export type TaskList = {
export interface WatchedTaskList {
tasks: TaskList;
release: () => void;
/** @internal */
compiledSharedOptions: CompiledSharedOptions;
}

export interface WatchedCronItems {
Expand Down
52 changes: 34 additions & 18 deletions src/lib.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ export const BREAKING_MIGRATIONS = Object.entries(migrations)
.map(([migrationFile]) => parseInt(migrationFile.slice(0, 6), 10));

// NOTE: when you add things here, you may also want to add them to WorkerPluginContext
export interface CompiledSharedOptions {
export interface CompiledSharedOptions<
T extends SharedOptions = SharedOptions,
> {
version: string;
maxMigrationNumber: number;
breakingMigrationNumbers: number[];
Expand All @@ -46,7 +48,7 @@ export interface CompiledSharedOptions {
useNodeTime: boolean;
minResetLockedInterval: number;
maxResetLockedInterval: number;
options: SharedOptions;
options: T;
hooks: AsyncHooks<GraphileConfig.WorkerHooks>;
resolvedPreset?: GraphileConfig.ResolvedPreset;
gracefulShutdownAbortTimeout: number;
Expand All @@ -57,11 +59,13 @@ interface ProcessSharedOptionsSettings {
}

const _sharedOptionsCache = new WeakMap<SharedOptions, CompiledSharedOptions>();
export function processSharedOptions(
options: SharedOptions,
export function processSharedOptions<T extends SharedOptions>(
options: T,
{ scope }: ProcessSharedOptionsSettings = {},
): CompiledSharedOptions {
let compiled = _sharedOptionsCache.get(options);
): CompiledSharedOptions<T> {
let compiled = _sharedOptionsCache.get(options) as
| CompiledSharedOptions<T>
| undefined;
if (!compiled) {
const {
logger = defaultLogger,
Expand Down Expand Up @@ -137,30 +141,37 @@ export function processSharedOptions(
export type Releasers = Array<() => void | Promise<void>>;

export async function assertPool(
options: SharedOptions,
compiledSharedOptions: CompiledSharedOptions,
releasers: Releasers,
): Promise<Pool> {
const { logger } = processSharedOptions(options);
const { logger, options } = compiledSharedOptions;
const {
preset,
maxPoolSize = preset?.worker?.maxPoolSize ?? defaults.maxPoolSize,
} = options;
assert.ok(
!options.pgPool || !options.connectionString,
"Both `pgPool` and `connectionString` are set, at most one of these options should be provided",
);
let pgPool: Pool;
const connectionString = options.connectionString || process.env.DATABASE_URL;
const connectionString =
options.connectionString ||
options.preset?.worker?.connectionString ||
process.env.DATABASE_URL;
if (options.pgPool) {
pgPool = options.pgPool;
} else if (connectionString) {
pgPool = new Pool({
connectionString,
max: options.maxPoolSize,
max: maxPoolSize,
});
releasers.push(() => {
pgPool.end();
});
} else if (process.env.PGDATABASE) {
pgPool = new Pool({
/* Pool automatically pulls settings from envvars */
max: options.maxPoolSize,
max: maxPoolSize,
});
releasers.push(() => {
pgPool.end();
Expand Down Expand Up @@ -259,14 +270,19 @@ export const getUtilsAndReleasersFromOptions = async (
options: RunnerOptions,
settings: ProcessSharedOptionsSettings = {},
): Promise<CompiledOptions> => {
const shared = processSharedOptions(options, settings);
const { concurrency = defaults.concurrentJobs } = options;
const { hooks } = shared;
const compiledSharedOptions = processSharedOptions(options, settings);
const {
hooks,
options: {
preset,
concurrency = preset?.worker?.concurrentJobs ?? defaults.concurrentJobs,
},
} = compiledSharedOptions;
return withReleasers(async function getUtilsFromOptions(
releasers,
release,
): Promise<CompiledOptions> {
const pgPool: Pool = await assertPool(options, releasers);
const pgPool: Pool = await assertPool(compiledSharedOptions, releasers);
// @ts-ignore
const max = pgPool?.options?.max || 10;
if (max < concurrency) {
Expand All @@ -281,14 +297,14 @@ export const getUtilsAndReleasersFromOptions = async (
await withPgClient(async function migrateWithPgClient(client) {
const event = { client };
await hooks.process("premigrate", event);
await migrate(options, event.client);
await migrate(compiledSharedOptions, event.client);
await hooks.process("postmigrate", event);
});

const addJob = makeAddJob(options, withPgClient);
const addJob = makeAddJob(compiledSharedOptions, withPgClient);

return {
...shared,
...compiledSharedOptions,
pgPool,
withPgClient,
addJob,
Expand Down
Loading

0 comments on commit 3563980

Please sign in to comment.