Skip to content

Commit

Permalink
More consistency and fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
benjie committed Nov 13, 2023
1 parent 1aa1acb commit 5eeee8c
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 15 deletions.
6 changes: 4 additions & 2 deletions __tests__/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
WorkerPoolOptions,
WorkerUtils,
} from "../src/interfaces";
import { processSharedOptions } from "../src/lib";
import { _allWorkerPools } from "../src/main";
import { migrate } from "../src/migrate";

Expand Down Expand Up @@ -111,12 +112,13 @@ export async function reset(
await pgPoolOrClient.query(
`drop schema if exists ${ESCAPED_GRAPHILE_WORKER_SCHEMA} cascade;`,
);
const compiledSharedOptions = processSharedOptions(options);
if (isPoolClient(pgPoolOrClient)) {
await migrate(options, pgPoolOrClient);
await migrate(compiledSharedOptions, pgPoolOrClient);
} else {
const client = await pgPoolOrClient.connect();
try {
await migrate(options, client);
await migrate(compiledSharedOptions, client);
} finally {
client.release();
}
Expand Down
26 changes: 16 additions & 10 deletions __tests__/migrate.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { WorkerSharedOptions } from "../src";
import { migrations } from "../src/generated/sql";
import { processSharedOptions } from "../src/lib";
import { migrate } from "../src/migrate";
import {
ESCAPED_GRAPHILE_WORKER_SCHEMA,
Expand Down Expand Up @@ -29,7 +30,8 @@ test("migration installs schema; second migration does no harm", async () => {
expect(graphileWorkerNamespaceBeforeMigration).toBeFalsy();

// Perform migration
await migrate(options, pgClient);
const compiledSharedOptions = processSharedOptions(options);
await migrate(compiledSharedOptions, pgClient);

// Assert migrations table exists and has relevant entries
const { rows: migrationRows } = await pgClient.query(
Expand All @@ -50,9 +52,9 @@ test("migration installs schema; second migration does no harm", async () => {
}

// Assert that re-migrating causes no issues
await migrate(options, pgClient);
await migrate(options, pgClient);
await migrate(options, pgClient);
await migrate(compiledSharedOptions, pgClient);
await migrate(compiledSharedOptions, pgClient);
await migrate(compiledSharedOptions, pgClient);
{
const jobsRows = await getJobs(pgClient);
expect(jobsRows).toHaveLength(1);
Expand Down Expand Up @@ -83,8 +85,10 @@ insert into ${ESCAPED_GRAPHILE_WORKER_SCHEMA}.migrations (id) values (1);
// We need to use a fresh connection after dropping the schema because the SQL
// functions' plans get cached using the stale OIDs.
await withPgClient(async (pgClient) => {
const compiledSharedOptions = processSharedOptions(options);

// Perform migration
await migrate(options, pgClient);
await migrate(compiledSharedOptions, pgClient);

// Assert migrations table exists and has relevant entries
const { rows: migrationRows } = await pgClient.query(
Expand All @@ -109,9 +113,9 @@ insert into ${ESCAPED_GRAPHILE_WORKER_SCHEMA}.migrations (id) values (1);
}

// Assert that re-migrating causes no issues
await migrate(options, pgClient);
await migrate(options, pgClient);
await migrate(options, pgClient);
await migrate(compiledSharedOptions, pgClient);
await migrate(compiledSharedOptions, pgClient);
await migrate(compiledSharedOptions, pgClient);
{
const jobsRows = await getJobs(pgClient);
expect(jobsRows).toHaveLength(1);
Expand All @@ -138,16 +142,18 @@ test("aborts if database is more up to date than current worker", async () => {
);
expect(graphileWorkerNamespaceBeforeMigration).toBeFalsy();

const compiledSharedOptions = processSharedOptions(options);

// Perform migration
await migrate(options, pgClient);
await migrate(compiledSharedOptions, pgClient);

// Insert a more up to date migration
await pgClient.query(
`insert into ${ESCAPED_GRAPHILE_WORKER_SCHEMA}.migrations (id, ts, breaking) values (999999, '2023-10-19T10:31:00Z', true);`,
);

await expect(
migrate(options, pgClient),
migrate(compiledSharedOptions, pgClient),
).rejects.toThrowErrorMatchingInlineSnapshot(
`"Database is using Graphile Worker schema revision 999999 which includes breaking migration 999999, but the currently running worker only supports up to revision 18. It would be unsafe to continue; please ensure all versions of Graphile Worker are compatible."`,
);
Expand Down
3 changes: 3 additions & 0 deletions src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,9 @@ export interface RunOnceOptions extends SharedOptions {
* handle graceful shutdown of the worker if the process receives a signal.
*/
noHandleSignals?: boolean;

/** Single worker only! */
concurrency?: 1;
}

/**
Expand Down
9 changes: 6 additions & 3 deletions src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -497,10 +497,13 @@ export function _runTaskList(
},
): WorkerPool {
const {
options: { preset, noHandleSignals = false },
} = compiledSharedOptions;
preset,
noHandleSignals = false,
concurrency: baseConcurrency = preset?.worker?.concurrentJobs ??
defaults.concurrentJobs,
} = compiledSharedOptions.options;
const {
concurrency = preset?.worker?.concurrentJobs ?? defaults.concurrentJobs,
concurrency = baseConcurrency,
continuous,
autostart: rawAutostart = true,
onTerminate,
Expand Down
1 change: 1 addition & 0 deletions src/migrate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ async function runMigration(
}
}

/** @internal */
export async function migrate(
compiledSharedOptions: CompiledSharedOptions<WorkerSharedOptions>,
client: PoolClient,
Expand Down

0 comments on commit 5eeee8c

Please sign in to comment.