Skip to content

Commit

Permalink
Use a custom error message for migration 11 error (#401)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjie authored Nov 22, 2023
2 parents 1f6d9bd + a883130 commit a6c3215
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 7 deletions.
62 changes: 61 additions & 1 deletion __tests__/migrate.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { WorkerSharedOptions } from "../src";
import { migrations } from "../src/generated/sql";
import { processSharedOptions } from "../src/lib";
import { migrate } from "../src/migrate";
import { installSchema, migrate, runMigration } from "../src/migrate";
import {
ESCAPED_GRAPHILE_WORKER_SCHEMA,
getJobs,
Expand Down Expand Up @@ -160,6 +160,66 @@ test("aborts if database is more up to date than current worker", async () => {
});
});

test("throws helpful error message in migration 11", async () => {
await withPgClient(async (pgClient) => {
await pgClient.query(
`drop schema if exists ${ESCAPED_GRAPHILE_WORKER_SCHEMA} cascade;`,
);
});
// We need to use a fresh connection after dropping the schema because the SQL
// functions' plans get cached using the stale OIDs.
await withPgClient(async (pgClient) => {
// Assert DB is empty
const {
rows: [graphileWorkerNamespaceBeforeMigration],
} = await pgClient.query(
`select * from pg_catalog.pg_namespace where nspname = $1`,
[GRAPHILE_WORKER_SCHEMA],
);
expect(graphileWorkerNamespaceBeforeMigration).toBeFalsy();

const compiledSharedOptions = processSharedOptions(options);

// Manually run the first 10 migrations
const event = {
client: pgClient,
postgresVersion: 120000, // TODO: use the actual postgres version
scratchpad: Object.create(null),
};
await installSchema(compiledSharedOptions, event);
const migrationFiles = Object.keys(
migrations,
) as (keyof typeof migrations)[];
for (const migrationFile of migrationFiles) {
const migrationNumber = parseInt(migrationFile.slice(0, 6), 10);
if (migrationNumber > 10) {
break;
}
await runMigration(
compiledSharedOptions,
event,
migrationFile,
migrationNumber,
);
}

// Lock a job
await pgClient.query(
`select ${compiledSharedOptions.escapedWorkerSchema}.add_job('lock_me', '{}');`,
);
await pgClient.query(
`update ${compiledSharedOptions.escapedWorkerSchema}.jobs set locked_at = now(), locked_by = 'test_runner';`,
);

// Perform migration
const promise = migrate(compiledSharedOptions, pgClient);

await expect(promise).rejects.toThrowErrorMatchingInlineSnapshot(
`"There are locked jobs present; migration 11 cannot complete. Please ensure all workers are shut down cleanly and all locked jobs and queues are unlocked before attempting this migration. To achieve this with minimal downtime, please consider using Worker Pro: https://worker.graphile.org/docs/pro/migration"`,
);
});
});

afterAll(() => {
process.exitCode = 0;
});
7 changes: 7 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,13 @@ declare global {
*/
postmigrate(event: GraphileWorker.MigrateEvent): PromiseOrDirect<void>;

/**
* Called if an error occurs during migration.
*/
migrationError(
event: GraphileWorker.MigrateEvent & { error: Error },
): PromiseOrDirect<void>;

/**
* Used to build a given `taskIdentifier`'s handler given a list of files,
* if possible.
Expand Down
20 changes: 14 additions & 6 deletions src/migrate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ async function fetchAndCheckPostgresVersion(client: PoolClient) {
return checkPostgresVersion(row.server_version_num);
}

async function installSchema(
/** @internal */
export async function installSchema(
compiledSharedOptions: CompiledSharedOptions<WorkerSharedOptions>,
event: GraphileWorker.MigrateEvent,
) {
Expand All @@ -49,13 +50,14 @@ async function installSchema(
await hooks.process("postbootstrap", event);
}

async function runMigration(
/** @internal */
export async function runMigration(
compiledSharedOptions: CompiledSharedOptions<WorkerSharedOptions>,
event: GraphileWorker.MigrateEvent,
migrationFile: keyof typeof migrations,
migrationNumber: number,
) {
const { escapedWorkerSchema, logger } = compiledSharedOptions;
const { escapedWorkerSchema, logger, hooks } = compiledSharedOptions;
const rawText = migrations[migrationFile];
const text = rawText.replace(
/:GRAPHILE_WORKER_SCHEMA\b/g,
Expand Down Expand Up @@ -84,16 +86,22 @@ async function runMigration(
JSON.stringify({ migrationNumber, breaking }),
]);
await event.client.query("commit");
} catch (e) {
} catch (error) {
await event.client.query("rollback");
if (!migrationInsertComplete && e.code === "23505") {
await hooks.process("migrationError", { ...event, error });
if (!migrationInsertComplete && error.code === "23505") {
// Someone else did this migration! Success!
logger.debug(
`Some other worker has performed migration ${migrationFile}; continuing.`,
);
return;
}
throw e;
if (error.code === "22012" && migrationNumber === 11) {
throw new Error(
`There are locked jobs present; migration 11 cannot complete. Please ensure all workers are shut down cleanly and all locked jobs and queues are unlocked before attempting this migration. To achieve this with minimal downtime, please consider using Worker Pro: https://worker.graphile.org/docs/pro/migration`,
);
}
throw error;
}
}

Expand Down

0 comments on commit a6c3215

Please sign in to comment.