Skip to content

Commit

Permalink
Merge branch 'main' into expose-good-job-labels-in-dashboard
Browse files Browse the repository at this point in the history
  • Loading branch information
bensheldon authored Dec 27, 2024
2 parents 38a55f8 + 83ff5f7 commit 4a5139a
Show file tree
Hide file tree
Showing 16 changed files with 92 additions and 12 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,11 @@ jobs:
strategy:
fail-fast: false
matrix:
ruby: ["3.0", 3.1, 3.2, 3.3]
ruby: ["3.0", 3.1, 3.2, 3.3, 3.4]
gemfile: [rails_6.1, rails_7.0, rails_7.1, rails_7.2, rails_8.0, rails_head]
pg: [17]
include:
- ruby: 3.3
- ruby: 3.4
gemfile: rails_7.2
pg: 10
exclude:
Expand Down
20 changes: 20 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,25 @@
# Changelog

## [v4.6.0](https://github.com/bensheldon/good_job/tree/v4.6.0) (2024-12-12)

[Full Changelog](https://github.com/bensheldon/good_job/compare/v4.5.1...v4.6.0)

**Implemented enhancements:**

- Set job execution thread priority to `-3` when in async mode [\#1560](https://github.com/bensheldon/good_job/pull/1560) ([bensheldon](https://github.com/bensheldon))

**Closed issues:**

- Attaching metadata to jobs [\#1558](https://github.com/bensheldon/good_job/issues/1558)
- Lower Ruby Thread priority for jobs by default when running in Async mode [\#1554](https://github.com/bensheldon/good_job/issues/1554)
- NoMethodError: undefined method `\<' for nil \(process.rb:125 in stale?\) [\#1363](https://github.com/bensheldon/good_job/issues/1363)
- Install PgHero on the Demo app [\#1166](https://github.com/bensheldon/good_job/issues/1166)

**Merged pull requests:**

- Bump rails-html-sanitizer from 1.6.0 to 1.6.1 [\#1557](https://github.com/bensheldon/good_job/pull/1557) ([dependabot[bot]](https://github.com/apps/dependabot))
- Add PGHero to the demo app [\#1294](https://github.com/bensheldon/good_job/pull/1294) ([mec](https://github.com/mec))

## [v4.5.1](https://github.com/bensheldon/good_job/tree/v4.5.1) (2024-11-29)

[Full Changelog](https://github.com/bensheldon/good_job/compare/v4.5.0...v4.5.1)
Expand Down
2 changes: 1 addition & 1 deletion Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ gem 'pg', platforms: [:mri, :mingw, :x64_mingw]

rails_versions = {
"6.1" => { github: "rails/rails", branch: "6-1-stable" }, # https://github.com/bensheldon/good_job/issues/1280
"7.0" => "~> 7.0.1", # Ruby 3.1 requires Rails 7.0.1+
"7.0" => { github: "rails/rails", branch: "7-0-stable" }, # Ruby 3.4 requires bigdecimal which rails doesn't declare
"7.1" => "~> 7.1.0",
"7.2" => "~> 7.2.0",
"8.0" => "~> 8.0.0",
Expand Down
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
good_job (4.5.1)
good_job (4.6.0)
activejob (>= 6.1.0)
activerecord (>= 6.1.0)
concurrent-ruby (>= 1.3.1)
Expand Down
1 change: 1 addition & 0 deletions checksums/good_job-4.6.0.gem.sha256
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
a5287e96f2d756199107954a1621e2c1ab3fd6cc6f6a7021ae8cd5e9773d264a
1 change: 1 addition & 0 deletions checksums/good_job-4.6.0.gem.sha512
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
5057a9bf9bc45c8462e4ff90a14c13e6e15fedf553ba53ef3964de96fdf65061cbe57a269c359268d9f53f0ceafceaeba2a6c2623ff10fc98b8b0db5f93b31c4
1 change: 1 addition & 0 deletions lib/good_job/adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ def start_async
return unless execute_async?

@capsule.start
@capsule.lower_thread_priority = true if GoodJob.configuration.lower_thread_priority.in?([true, nil])
@_async_started = true
end

Expand Down
10 changes: 9 additions & 1 deletion lib/good_job/capsule.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def initialize(configuration: nil)

@shared_executor = GoodJob::SharedExecutor.new
@tracker = GoodJob::CapsuleTracker.new(executor: @shared_executor)
@lower_thread_priority = nil

self.class.instances << self
end
Expand All @@ -38,7 +39,9 @@ def start(force: false)

@notifier = GoodJob::Notifier.new(enable_listening: configuration.enable_listen_notify, capsule: self, executor: @shared_executor)
@poller = GoodJob::Poller.new(poll_interval: configuration.poll_interval)
@multi_scheduler = GoodJob::MultiScheduler.from_configuration(configuration, capsule: self, warm_cache_on_initialize: true)
@multi_scheduler = GoodJob::MultiScheduler.from_configuration(configuration, capsule: self, warm_cache_on_initialize: true).tap do |multischeduler|
multischeduler.lower_thread_priority = @lower_thread_priority unless @lower_thread_priority.nil?
end
@notifier.recipients.push([@multi_scheduler, :create_thread])
@poller.recipients.push(-> { @multi_scheduler.create_thread({ fanout: true }) })

Expand Down Expand Up @@ -110,6 +113,11 @@ def process_id
@tracker.process_id
end

def lower_thread_priority=(value)
@lower_thread_priority = value
@multi_scheduler&.lower_thread_priority = value
end

private

def configuration
Expand Down
8 changes: 8 additions & 0 deletions lib/good_job/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,14 @@ def in_webserver?
end || false
end

def lower_thread_priority
return options[:lower_thread_priority] unless options[:lower_thread_priority].nil?
return rails_config[:lower_thread_priority] unless rails_config[:lower_thread_priority].nil?
return ActiveModel::Type::Boolean.new.cast(env['GOOD_JOB_LOWER_THREAD_PRIORITY']) unless env['GOOD_JOB_LOWER_THREAD_PRIORITY'].nil?

nil
end

# Whether to take an advisory lock on the process record in the notifier reactor.
# @return [Boolean]
def advisory_lock_heartbeat
Expand Down
9 changes: 8 additions & 1 deletion lib/good_job/multi_scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ def self.from_configuration(configuration, capsule: GoodJob.capsule, warm_cache_
max_cache: configuration.max_cache,
warm_cache_on_initialize: warm_cache_on_initialize,
cleanup_interval_seconds: configuration.cleanup_interval_seconds,
cleanup_interval_jobs: configuration.cleanup_interval_jobs
cleanup_interval_jobs: configuration.cleanup_interval_jobs,
lower_thread_priority: configuration.lower_thread_priority
)
end

Expand Down Expand Up @@ -85,6 +86,12 @@ def create_thread(state = nil)
end
end

def lower_thread_priority=(value)
schedulers.each do |scheduler|
scheduler.lower_thread_priority = value
end
end

def stats
scheduler_stats = schedulers.map(&:stats)

Expand Down
13 changes: 12 additions & 1 deletion lib/good_job/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ class Scheduler
fallback_policy: :discard,
}.freeze

# In CRuby, this sets the thread quantum to ~12.5ms ( 100ms * 2^(-3) ).
LOW_THREAD_PRIORITY = -3

# @!attribute [r] instances
# @!scope class
# List of all instantiated Schedulers in the current process.
Expand All @@ -39,13 +42,18 @@ class Scheduler
# @return [String]
attr_reader :name

# Whether to lower the thread priority to a fixed value
# @return [Boolean]
attr_accessor :lower_thread_priority

# @param performer [GoodJob::JobPerformer]
# @param max_threads [Numeric, nil] number of seconds between polls for jobs
# @param max_cache [Numeric, nil] maximum number of scheduled jobs to cache in memory
# @param warm_cache_on_initialize [Boolean] whether to warm the cache immediately, or manually by calling +warm_cache+
# @param cleanup_interval_seconds [Numeric, nil] number of seconds between cleaning up job records
# @param cleanup_interval_jobs [Numeric, nil] number of executed jobs between cleaning up job records
def initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initialize: false, cleanup_interval_seconds: nil, cleanup_interval_jobs: nil)
# @param lower_thread_priority [Boolean] whether to lower the thread priority of execution threads
def initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initialize: false, cleanup_interval_seconds: nil, cleanup_interval_jobs: nil, lower_thread_priority: false)
raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next)

@performer = performer
Expand All @@ -62,6 +70,8 @@ def initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initia
@cleanup_tracker = CleanupTracker.new(cleanup_interval_seconds: cleanup_interval_seconds, cleanup_interval_jobs: cleanup_interval_jobs)
@executor_options[:name] = name

self.lower_thread_priority = lower_thread_priority

create_executor
warm_cache if warm_cache_on_initialize
self.class.instances << self
Expand Down Expand Up @@ -271,6 +281,7 @@ def create_task(delay = 0, fanout: false)
future = Concurrent::ScheduledTask.new(delay, args: [self, performer], executor: executor, timer_set: timer_set) do |thr_scheduler, thr_performer|
Thread.current.name = Thread.current.name.sub("-worker-", "-thread-") if Thread.current.name
Thread.current[:good_job_scheduler] = thr_scheduler
Thread.current.priority = -3 if thr_scheduler.lower_thread_priority

Rails.application.reloader.wrap do
thr_performer.next do |found|
Expand Down
2 changes: 1 addition & 1 deletion lib/good_job/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

module GoodJob
# GoodJob gem version.
VERSION = '4.5.1'
VERSION = '4.6.0'

# GoodJob version as Gem::Version object
GEM_VERSION = Gem::Version.new(VERSION)
Expand Down
7 changes: 6 additions & 1 deletion spec/app/jobs/example_job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,12 @@
execution = good_job.executions.last
expect(execution.error).to be_present
expect(execution.error_backtrace.count).to be > 100
expect(execution.filtered_error_backtrace).to eq(["app/jobs/example_job.rb:41:in `perform'"])

if RUBY_VERSION >= "3.4"
expect(execution.filtered_error_backtrace).to eq(["app/jobs/example_job.rb:41:in 'ExampleJob#perform'"])
else
expect(execution.filtered_error_backtrace).to eq(["app/jobs/example_job.rb:41:in `perform'"])
end
end
end

Expand Down
9 changes: 7 additions & 2 deletions spec/lib/good_job/active_job_extensions/concurrency_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,13 @@ def perform(name:)
expect(GoodJob::Job.where(concurrency_key: "Bob").count).to eq 1

expect(TestJob.logger.formatter).to have_received(:call).with("INFO", anything, anything, a_string_matching(/Aborted enqueue of TestJob \(Job ID: .*\) because the concurrency key 'Alice' has reached its enqueue limit of 2 jobs/)).exactly(:once)
expect(TestJob.logger.formatter).to have_received(:call).with("INFO", anything, anything, a_string_matching(/Enqueued TestJob \(Job ID: .*\) to \(default\) with arguments: {:name=>"Alice"}/)).exactly(:twice)
expect(TestJob.logger.formatter).to have_received(:call).with("INFO", anything, anything, a_string_matching(/Enqueued TestJob \(Job ID: .*\) to \(default\) with arguments: {:name=>"Bob"}/)).exactly(:once)
if RUBY_VERSION >= "3.4"
expect(TestJob.logger.formatter).to have_received(:call).with("INFO", anything, anything, a_string_matching(/Enqueued TestJob \(Job ID: .*\) to \(default\) with arguments: {name: "Alice"}/)).exactly(:twice)
expect(TestJob.logger.formatter).to have_received(:call).with("INFO", anything, anything, a_string_matching(/Enqueued TestJob \(Job ID: .*\) to \(default\) with arguments: {name: "Bob"}/)).exactly(:once)
else
expect(TestJob.logger.formatter).to have_received(:call).with("INFO", anything, anything, a_string_matching(/Enqueued TestJob \(Job ID: .*\) to \(default\) with arguments: {:name=>"Alice"}/)).exactly(:twice)
expect(TestJob.logger.formatter).to have_received(:call).with("INFO", anything, anything, a_string_matching(/Enqueued TestJob \(Job ID: .*\) to \(default\) with arguments: {:name=>"Bob"}/)).exactly(:once)
end
end

it 'excludes jobs that are already executing/locked' do
Expand Down
12 changes: 11 additions & 1 deletion spec/lib/good_job/adapter_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def perform(succeed: true)
allow(GoodJob::Job).to receive(:enqueue).and_return(good_job)
allow(GoodJob::Notifier).to receive(:notify)

capsule = instance_double(GoodJob::Capsule, start: nil, create_thread: nil)
capsule = instance_double(GoodJob::Capsule, start: nil, create_thread: nil, "lower_thread_priority=": nil)
allow(GoodJob).to receive(:capsule).and_return(capsule)
allow(capsule).to receive(:start)

Expand All @@ -99,6 +99,16 @@ def perform(succeed: true)
expect(capsule).to have_received(:create_thread)
expect(GoodJob::Notifier).to have_received(:notify).with({ queue_name: 'default' })
end

it 'lowers the thread priority of the capsule' do
capsule = instance_double(GoodJob::Capsule, start: nil, create_thread: nil, "lower_thread_priority=": nil)
allow(GoodJob).to receive(:capsule).and_return(capsule)
allow(capsule).to receive(:start)

described_class.new(execution_mode: :async_all)

expect(capsule).to have_received(:lower_thread_priority=).with(true)
end
end
end

Expand Down
3 changes: 3 additions & 0 deletions spec/rails_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
Warning.ignore(%r{/lib/selenium/.*URI::RFC3986_PARSER.escape is obsolete})
# https://github.com/teamcapybara/capybara/pull/2781
Warning.ignore(%r{/lib/capybara/.*URI::RFC3986_PARSER.make_regexp is obsolete})
# https://github.com/rails/rails/pull/54053
Warning.ignore(%r{the block passed to 'ActiveModel::Type::Value#serializable\?'})
Warning.ignore(%r{the block passed to 'ActiveModel::Attribute#value'})

require File.expand_path('../demo/config/environment', __dir__)

Expand Down

0 comments on commit 4a5139a

Please sign in to comment.