Skip to content

Commit

Permalink
Implement unique_across_jobs/queues for concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
Earlopain committed Nov 7, 2023
1 parent 9729c3a commit d006a22
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 1 deletion.
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,15 @@ class MyJob < ApplicationJob
# Can be an Integer or Lambda/Proc that is invoked in the context of the job
perform_limit: 1,
# If true, the job class name will be part of this jobs concurrency key
# This allows two distinct job classes to set the same key (like a model id)
# without manually specifying a unique prefix for each job
unique_across_jobs: false
# If true, the job queue name will be part of this jobs concurrency key
# MyJob.perform_later("Alice") and MyJob.set(queue: "other").perform_later("Alice") would be considered distinct
unique_across_queues: false
# Note: Under heavy load, the total number of jobs may exceed the
# sum of `enqueue_limit` and `perform_limit` because of race conditions
# caused by imperfectly disjunctive states. If you need to constrain
Expand Down
9 changes: 8 additions & 1 deletion lib/good_job/active_job_extensions/concurrency.rb
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ def deserialize(job_data)

class_methods do
def good_job_control_concurrency_with(config)
config[:unique_across_jobs] = false if config[:unique_across_jobs].nil?
config[:unique_across_queues] = false if config[:unique_across_queues].nil?
self.good_job_concurrency_config = config
end
end
Expand All @@ -102,7 +104,12 @@ def _good_job_concurrency_key
key = instance_exec(&key) if key.respond_to?(:call)
raise TypeError, "Concurrency key must be a String; was a #{key.class}" unless VALID_TYPES.any? { |type| key.is_a?(type) }

key
parts = []
parts << self.class.name if self.class.good_job_concurrency_config[:unique_across_jobs]
parts << queue_name if self.class.good_job_concurrency_config[:unique_across_queues]
parts << key

parts.join("-")
end

private
Expand Down
44 changes: 44 additions & 0 deletions spec/lib/good_job/active_job_extensions/concurrency_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,50 @@ def perform
end
end

describe 'unique_across_jobs: and unique_across_queues:' do
before do
stub_const 'TestJob', (Class.new(ActiveJob::Base) do
include GoodJob::ActiveJobExtensions::Concurrency
def perform(arg)
end
end)
end

it 'includes the job name in the concurrency key if unique_across_jobs is true' do
TestJob.good_job_control_concurrency_with(
unique_across_jobs: true,
key: -> { arguments.first }
)
job = TestJob.perform_later("Alice")

expect(job.good_job_concurrency_key).to eq("TestJob-Alice")
end

it 'includes the job name in the concurrency key if unique_across_queues is true' do
TestJob.good_job_control_concurrency_with(
unique_across_queues: true,
key: -> { arguments.first }
)

first_job = TestJob.perform_later("Alice")
expect(first_job.good_job_concurrency_key).to eq("default-Alice")

second_job = TestJob.set(queue: :other_queue).perform_later("Bob")
expect(second_job.good_job_concurrency_key).to eq("other_queue-Bob")
end

it 'includes the job name and queue in the concurrency key if both are true' do
TestJob.good_job_control_concurrency_with(
unique_across_jobs: true,
unique_across_queues: true,
key: -> { arguments.first }
)

job = TestJob.perform_later("Alice")
expect(job.good_job_concurrency_key).to eq("TestJob-default-Alice")
end
end

describe '#perform_later' do
before do
stub_const 'TestJob', (Class.new(ActiveJob::Base) do
Expand Down

0 comments on commit d006a22

Please sign in to comment.