Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

When queuing multiple jobs at once, nudge more than one worker #389

Merged
merged 6 commits into from
Nov 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ imported.
Logging: changed format of task completion/failure logs to include attempts/max
attempts and to reduce duplicate parentheses.

Replaces job announcement trigger with calls directly in `add_job` / `add_jobs`
to reduce queuing overhead.

Fixes bug where queuing 100 jobs in a single statement would only nudge a single
inactive worker. Now as many workers as necessary and available will be nudged.

### v0.15.1

Fixes issues with graceful worker shutdowns:
Expand Down
6 changes: 3 additions & 3 deletions __tests__/migrate.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ test("migration installs schema; second migration does no harm", async () => {
const { rows: migrationRows } = await pgClient.query(
`select * from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}.migrations`,
);
expect(migrationRows).toHaveLength(17);
expect(migrationRows).toHaveLength(18);
const migration = migrationRows[0];
expect(migration.id).toEqual(1);

Expand Down Expand Up @@ -90,7 +90,7 @@ insert into ${ESCAPED_GRAPHILE_WORKER_SCHEMA}.migrations (id) values (1);
const { rows: migrationRows } = await pgClient.query(
`select * from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}.migrations`,
);
expect(migrationRows.length).toBeGreaterThanOrEqual(17);
expect(migrationRows.length).toBeGreaterThanOrEqual(18);
const migration2 = migrationRows[1];
expect(migration2.id).toEqual(2);
expect(migration2.breaking).toEqual(false);
Expand Down Expand Up @@ -149,7 +149,7 @@ test("aborts if database is more up to date than current worker", async () => {
await expect(
migrate(options, pgClient),
).rejects.toThrowErrorMatchingInlineSnapshot(
`"Database is using Graphile Worker schema revision 999999 which includes breaking migration 999999, but the currently running worker only supports up to revision 17. It would be unsafe to continue; please ensure all versions of Graphile Worker are compatible."`,
`"Database is using Graphile Worker schema revision 999999 which includes breaking migration 999999, but the currently running worker only supports up to revision 18. It would be unsafe to continue; please ensure all versions of Graphile Worker are compatible."`,
);
});
});
Expand Down
15 changes: 6 additions & 9 deletions __tests__/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ begin
updated_at = now()
returning *
into v_job;
if v_job.revision = 0 then
perform pg_notify('jobs:insert', '{"count":1}');
end if;
return v_job;
else
raise exception 'Invalid job_key_mode value, expected ''replace'', ''preserve_run_at'' or ''unsafe_dedupe''.' using errcode = 'GWBKM';
Expand Down Expand Up @@ -140,6 +143,8 @@ begin
where spec.job_key is not null
and jobs.key = spec.job_key
and is_available is not true;
-- WARNING: this count is not 100% accurate; 'on conflict' clause will cause it to be an overestimate
perform pg_notify('jobs:insert', '{"count":' || array_length(specs, 1)::text || '}');
-- TODO: is there a risk that a conflict could occur depending on the
-- isolation level?
return query insert into "graphile_worker"._private_jobs as jobs (
Expand Down Expand Up @@ -249,6 +254,7 @@ begin
)
returning * into v_job;
if not (v_job is null) then
perform pg_notify('jobs:insert', '{"count":-1}');
return v_job;
end if;
-- Otherwise prevent job from retrying, and clear the key
Expand Down Expand Up @@ -280,14 +286,6 @@ CREATE FUNCTION graphile_worker.reschedule_jobs(job_ids bigint[], run_at timesta
)
returning *;
$$;
-- Trigger function: sends a notification with an empty payload on the
-- 'jobs:insert' channel, then returns the NEW record unchanged.
CREATE FUNCTION graphile_worker.tg_jobs__after_insert() RETURNS trigger
LANGUAGE plpgsql
AS $$
begin
perform pg_notify('jobs:insert', '');
return new;
end;
$$;
CREATE TABLE graphile_worker._private_job_queues (
id integer NOT NULL,
queue_name text NOT NULL,
Expand Down Expand Up @@ -372,7 +370,6 @@ ALTER TABLE ONLY graphile_worker._private_tasks
ADD CONSTRAINT tasks_pkey PRIMARY KEY (id);
CREATE INDEX jobs_main_index ON graphile_worker._private_jobs USING btree (priority, run_at) INCLUDE (id, task_id, job_queue_id) WHERE (is_available = true);
CREATE INDEX jobs_no_queue_index ON graphile_worker._private_jobs USING btree (priority, run_at) INCLUDE (id, task_id) WHERE ((is_available = true) AND (job_queue_id IS NULL));
CREATE TRIGGER _900_after_insert AFTER INSERT ON graphile_worker._private_jobs FOR EACH STATEMENT EXECUTE PROCEDURE graphile_worker.tg_jobs__after_insert();
ALTER TABLE graphile_worker._private_job_queues ENABLE ROW LEVEL SECURITY;
ALTER TABLE graphile_worker._private_jobs ENABLE ROW LEVEL SECURITY;
ALTER TABLE graphile_worker._private_known_crontabs ENABLE ROW LEVEL SECURITY;
Expand Down
5 changes: 4 additions & 1 deletion perfTest/run.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ process.env.NO_LOG_SUCCESS = "1";

// if connection string not provided, assume postgres is available locally
process.env.PERF_DATABASE_URL = `${
process.env.TEST_CONNECTION_STRING || "graphile_worker_perftest"
process.env.TEST_CONNECTION_STRING || "postgres:///graphile_worker_perftest"
}`;

const env = {
Expand All @@ -38,6 +38,9 @@ const execOptions = {
};

async function main() {
console.log("Building");
execSync("yarn prepack", execOptions);

console.log("Dropping and recreating the test database");
execSync("node ./recreateDb.js", execOptions);

Expand Down
206 changes: 206 additions & 0 deletions sql/000018.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
-- Remove the insert trigger on the jobs table together with its trigger
-- function. The functions (re)defined in this migration call pg_notify on the
-- 'jobs:insert' channel themselves.
-- The trigger is dropped first because it depends on the function.
DROP TRIGGER _900_after_insert ON :GRAPHILE_WORKER_SCHEMA._private_jobs;
DROP FUNCTION :GRAPHILE_WORKER_SCHEMA.tg_jobs__after_insert;

-- add_job: queue a single job and return the resulting row.
--
-- Parameters:
--   identifier   - task identifier
--   payload      - JSON payload for the job
--   queue_name   - optional queue name
--   run_at       - optional time at which the job should run
--   max_attempts - optional attempt limit (cast to smallint)
--   job_key      - optional key identifying the job
--   priority     - optional priority (cast to smallint)
--   flags        - optional list of flags
--   job_key_mode - 'replace' (default), 'preserve_run_at' or 'unsafe_dedupe'
--
-- Behaviour:
--   * When job_key is null, job_key_mode is null, or the mode is 'replace' /
--     'preserve_run_at', the work is delegated to add_jobs() with a
--     one-element spec array.
--   * When job_key is set and the mode is 'unsafe_dedupe', the job is inserted
--     directly; on a key conflict only revision/updated_at are bumped.
--   * Any other mode (with a non-null job_key) raises an exception with
--     errcode 'GWBKM'.
--
-- Returns: the inserted (or conflicting) row from _private_jobs.
CREATE OR REPLACE FUNCTION :GRAPHILE_WORKER_SCHEMA.add_job(identifier text, payload json DEFAULT NULL::json, queue_name text DEFAULT NULL::text, run_at timestamp with time zone DEFAULT NULL::timestamp with time zone, max_attempts integer DEFAULT NULL::integer, job_key text DEFAULT NULL::text, priority integer DEFAULT NULL::integer, flags text[] DEFAULT NULL::text[], job_key_mode text DEFAULT 'replace'::text) RETURNS :GRAPHILE_WORKER_SCHEMA._private_jobs
    LANGUAGE plpgsql
    AS $$
declare
  v_job :GRAPHILE_WORKER_SCHEMA._private_jobs;
begin
  if (job_key is null or job_key_mode is null or job_key_mode in ('replace', 'preserve_run_at')) then
    -- Common path: delegate to add_jobs with a single spec.
    select * into v_job
    from :GRAPHILE_WORKER_SCHEMA.add_jobs(
      ARRAY[(
        identifier,
        payload,
        queue_name,
        run_at,
        max_attempts::smallint,
        job_key,
        priority::smallint,
        flags
      ):::GRAPHILE_WORKER_SCHEMA.job_spec],
      (job_key_mode = 'preserve_run_at')
    )
    limit 1;
    return v_job;
  elsif job_key_mode = 'unsafe_dedupe' then
    -- Ensure all the tasks exist
    insert into :GRAPHILE_WORKER_SCHEMA._private_tasks as tasks (identifier)
    values (add_job.identifier)
    on conflict do nothing;
    -- Ensure all the queues exist
    if add_job.queue_name is not null then
      insert into :GRAPHILE_WORKER_SCHEMA._private_job_queues as job_queues (queue_name)
      values (add_job.queue_name)
      on conflict do nothing;
    end if;
    -- Insert job, but if one already exists then do nothing, even if the
    -- existing job has already started (and thus represents an out-of-date
    -- world state). This is dangerous because it means that whatever state
    -- change triggered this add_job may not be acted upon (since it happened
    -- after the existing job started executing, but no further job is being
    -- scheduled), but it is useful in very rare circumstances for
    -- de-duplication. If in doubt, DO NOT USE THIS.
    insert into :GRAPHILE_WORKER_SCHEMA._private_jobs as jobs (
      job_queue_id,
      task_id,
      payload,
      run_at,
      max_attempts,
      key,
      priority,
      flags
    )
      select
        job_queues.id,
        tasks.id,
        coalesce(add_job.payload, '{}'::json),
        coalesce(add_job.run_at, now()),
        coalesce(add_job.max_attempts::smallint, 25::smallint),
        add_job.job_key,
        coalesce(add_job.priority::smallint, 0::smallint),
        (
          select jsonb_object_agg(flag, true)
          from unnest(add_job.flags) as item(flag)
        )
      from :GRAPHILE_WORKER_SCHEMA._private_tasks as tasks
      left join :GRAPHILE_WORKER_SCHEMA._private_job_queues as job_queues
      on job_queues.queue_name = add_job.queue_name
      where tasks.identifier = add_job.identifier
    on conflict (key)
      -- Bump the updated_at so that there's something to return
      do update set
        revision = jobs.revision + 1,
        updated_at = now()
      returning *
      into v_job;
    -- The conflict branch increments revision, so the notification below is
    -- only sent when revision is still 0.
    if v_job.revision = 0 then
      -- PostgreSQL delivers only one notification per transaction for each
      -- distinct (channel, payload) pair, so a constant payload would collapse
      -- many add_job calls in one transaction into a single count of 1. The
      -- random "r" value keeps every payload unique so each is delivered.
      perform pg_notify('jobs:insert', '{"r":' || random()::text || ',"count":1}');
    end if;
    return v_job;
  else
    raise exception 'Invalid job_key_mode value, expected ''replace'', ''preserve_run_at'' or ''unsafe_dedupe''.' using errcode = 'GWBKM';
  end if;
end;
$$;

-- add_jobs: queue a batch of jobs in one statement and return the resulting
-- rows.
--
-- Parameters:
--   specs                  - array of job_spec values, one per job to queue
--   job_key_preserve_run_at - when true, a keyed job that already exists and
--                            has attempts = 0 keeps its existing run_at
--
-- Steps:
--   1. Ensure every referenced task and queue exists.
--   2. Detach the key from (and exhaust the attempts of) any existing job
--      with a matching key where is_available is not true, so that a fresh
--      job is created instead of modifying it.
--   3. Announce the new jobs on the 'jobs:insert' channel.
--   4. Insert the jobs; on a key conflict with an unlocked job, replace that
--      job's details and reset its error/retry state.
--
-- Returns: the inserted/updated rows from _private_jobs.
CREATE OR REPLACE FUNCTION :GRAPHILE_WORKER_SCHEMA.add_jobs(specs :GRAPHILE_WORKER_SCHEMA.job_spec[], job_key_preserve_run_at boolean DEFAULT false) RETURNS SETOF :GRAPHILE_WORKER_SCHEMA._private_jobs
    LANGUAGE plpgsql
    AS $$
declare
  -- array_length() yields NULL for an empty or NULL array; normalise to 0 so
  -- the notification payload below can never become NULL.
  v_spec_count integer := coalesce(array_length(specs, 1), 0);
begin
  -- Ensure all the tasks exist
  insert into :GRAPHILE_WORKER_SCHEMA._private_tasks as tasks (identifier)
  select distinct spec.identifier
  from unnest(specs) spec
  on conflict do nothing;
  -- Ensure all the queues exist
  insert into :GRAPHILE_WORKER_SCHEMA._private_job_queues as job_queues (queue_name)
  select distinct spec.queue_name
  from unnest(specs) spec
  where spec.queue_name is not null
  on conflict do nothing;
  -- Ensure any locked jobs have their key cleared - in the case of locked
  -- existing job create a new job instead as it must have already started
  -- executing (i.e. its world state is out of date, and the fact add_job
  -- has been called again implies there's new information that needs to be
  -- acted upon).
  update :GRAPHILE_WORKER_SCHEMA._private_jobs as jobs
  set
    key = null,
    attempts = jobs.max_attempts,
    updated_at = now()
  from unnest(specs) spec
  where spec.job_key is not null
  and jobs.key = spec.job_key
  and is_available is not true;

  -- WARNING: this count is not 100% accurate; 'on conflict' clause will cause it to be an overestimate
  -- Nothing is announced when there are no specs, since no job can be queued.
  -- PostgreSQL delivers only one notification per transaction for each
  -- distinct (channel, payload) pair, so the random "r" value is included to
  -- keep payloads unique; otherwise several equally-sized batches queued in
  -- one transaction would be announced only once.
  if v_spec_count > 0 then
    perform pg_notify('jobs:insert', '{"r":' || random()::text || ',"count":' || v_spec_count::text || '}');
  end if;

  -- TODO: is there a risk that a conflict could occur depending on the
  -- isolation level?
  return query insert into :GRAPHILE_WORKER_SCHEMA._private_jobs as jobs (
    job_queue_id,
    task_id,
    payload,
    run_at,
    max_attempts,
    key,
    priority,
    flags
  )
    select
      job_queues.id,
      tasks.id,
      coalesce(spec.payload, '{}'::json),
      coalesce(spec.run_at, now()),
      coalesce(spec.max_attempts, 25),
      spec.job_key,
      coalesce(spec.priority, 0),
      (
        select jsonb_object_agg(flag, true)
        from unnest(spec.flags) as item(flag)
      )
    from unnest(specs) spec
    inner join :GRAPHILE_WORKER_SCHEMA._private_tasks as tasks
    on tasks.identifier = spec.identifier
    left join :GRAPHILE_WORKER_SCHEMA._private_job_queues as job_queues
    on job_queues.queue_name = spec.queue_name
  on conflict (key) do update set
    job_queue_id = excluded.job_queue_id,
    task_id = excluded.task_id,
    payload =
      case
      -- When both the existing and the new payload are JSON arrays they are
      -- concatenated; otherwise the new payload replaces the old one.
      when json_typeof(jobs.payload) = 'array' and json_typeof(excluded.payload) = 'array' then
        (jobs.payload::jsonb || excluded.payload::jsonb)::json
      else
        excluded.payload
      end,
    max_attempts = excluded.max_attempts,
    run_at = (case
      when job_key_preserve_run_at is true and jobs.attempts = 0 then jobs.run_at
      else excluded.run_at
    end),
    priority = excluded.priority,
    revision = jobs.revision + 1,
    flags = excluded.flags,
    -- always reset error/retry state
    attempts = 0,
    last_error = null,
    updated_at = now()
  where jobs.locked_at is null
  returning *;
end;
$$;

-- remove_job: remove the job identified by job_key.
--
-- Parameters:
--   job_key - key of the job to remove. The function is STRICT, so a NULL
--             job_key returns NULL without executing the body.
--
-- Behaviour:
--   * If the job is not locked (or its lock is more than 4 hours old) it is
--     deleted, a notification with count -1 is sent on 'jobs:insert', and the
--     deleted row is returned.
--   * Otherwise the job is left in place but its key is cleared and its
--     attempts are set to max_attempts so that it will not be retried; the
--     updated row is returned.
--
-- Returns: the deleted or updated row from _private_jobs (all fields NULL
-- when no job has this key).
CREATE OR REPLACE FUNCTION :GRAPHILE_WORKER_SCHEMA.remove_job(job_key text) RETURNS :GRAPHILE_WORKER_SCHEMA._private_jobs
    LANGUAGE plpgsql STRICT
    AS $$
declare
  v_job :GRAPHILE_WORKER_SCHEMA._private_jobs;
begin
  -- Delete job if not locked
  delete from :GRAPHILE_WORKER_SCHEMA._private_jobs as jobs
    where key = job_key
    and (
      locked_at is null
    or
      locked_at < NOW() - interval '4 hours'
    )
  returning * into v_job;
  if not (v_job is null) then
    -- PostgreSQL delivers only one notification per transaction for each
    -- distinct (channel, payload) pair; the random "r" value keeps payloads
    -- unique so that several removals in one transaction are each announced.
    perform pg_notify('jobs:insert', '{"r":' || random()::text || ',"count":-1}');
    return v_job;
  end if;
  -- Otherwise prevent job from retrying, and clear the key
  update :GRAPHILE_WORKER_SCHEMA._private_jobs as jobs
  set
    key = null,
    attempts = jobs.max_attempts,
    updated_at = now()
  where key = job_key
  returning * into v_job;
  return v_job;
end;
$$;
Loading