Add `GoodJob.current_thread_running?` and `GoodJob.current_thread_shutting_down?` for graceful shutdowns
bensheldon committed Feb 29, 2024
1 parent ab51290 commit b5648f8
Showing 10 changed files with 127 additions and 11 deletions.
21 changes: 18 additions & 3 deletions README.md
@@ -56,7 +56,7 @@ For more of the story of GoodJob, read the [introductory blog post](https://isla
 - [Exceptions](#exceptions)
 - [Retries](#retries)
 - [Action Mailer retries](#action-mailer-retries)
-- [Interrupts](#interrupts)
+- [Interrupts, graceful shutdown, and SIGKILL](#interrupts-graceful-shutdown-and-sigkill)
 - [Timeouts](#timeouts)
 - [Optimize queues, threads, and processes](#optimize-queues-threads-and-processes)
 - [Database connections](#database-connections)
@@ -937,9 +937,24 @@ end
 Note, that `ActionMailer::MailDeliveryJob` is a default since Rails 6.0. Be sure that your app is using that class, as it
 might also be configured to use (deprecated now) `ActionMailer::DeliveryJob`.
-### Interrupts
+### Interrupts, graceful shutdown, and SIGKILL
-Jobs will be automatically retried if the process is interrupted while performing a job, for example as the result of a `SIGKILL` or power failure.
+When GoodJob receives an interrupt (`SIGINT`, `SIGTERM`), or when `GoodJob.shutdown` is called explicitly, it attempts to shut down gracefully: it waits for running jobs to finish, up to the `shutdown_timeout` configuration, before exiting.
+To detect the start of a graceful shutdown from within a performing job, for example while iterating over many items, call `GoodJob.current_thread_shutting_down?` or `GoodJob.current_thread_running?` from within the job. For example:
+```ruby
+def perform(lots_of_records)
+  lots_of_records.each do |record|
+    break if GoodJob.current_thread_shutting_down? # or `unless GoodJob.current_thread_running?`
+    # process record ...
+  end
+end
+```
+Note that when running jobs in `:inline` execution mode, `GoodJob.current_thread_running?` will always be truthy and `GoodJob.current_thread_shutting_down?` will always be falsey.
+Jobs are automatically retried if the process is interrupted while performing a job, either because the job could not finish before the shutdown timeout or because of a `SIGKILL` or power failure.
 If you need more control over interrupt-caused retries, include the `GoodJob::ActiveJobExtensions::InterruptErrors` extension in your job class. When an interrupted job is retried, the extension will raise a `GoodJob::InterruptError` exception within the job, which allows you to use Active Job's `retry_on` and `discard_on` to control the behavior of the job.
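The README loop stops at the first shutdown check and leaves the remaining records unprocessed. A minimal sketch of one way to keep track of that remainder follows. It assumes the shutdown check is injected as a lambda so the snippet runs without GoodJob; `process_until_shutdown` and its `shutting_down:` parameter are hypothetical names, not part of GoodJob. Inside a real job the lambda would presumably wrap `GoodJob.current_thread_shutting_down?`.

```ruby
# Hypothetical helper: processes records until the injected shutdown check
# turns truthy. Returns [processed_results, remaining_records] so the caller
# can decide what to do with the unprocessed remainder.
def process_until_shutdown(records, shutting_down:)
  processed = []
  records.each_with_index do |record, index|
    # Check before each item, mirroring the `break if ...` line in the README.
    return [processed, records.drop(index)] if shutting_down.call
    processed << yield(record)
  end
  [processed, []]
end

# Simulate a shutdown that begins after three records have been handled.
handled = 0
processed, remaining = process_until_shutdown((1..5).to_a, shutting_down: -> { handled >= 3 }) do |record|
  handled += 1
  record * 10
end

puts "processed=#{processed.inspect} remaining=#{remaining.inspect}"
```

Returning the remainder rather than breaking silently is a design choice for this sketch only; whether to re-enqueue, log, or drop the remaining records depends on the job.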
2 changes: 2 additions & 0 deletions lib/good_job.rb
@@ -40,12 +40,14 @@
 require "good_job/scheduler"
 require "good_job/shared_executor"
 require "good_job/systemd_service"
+require "good_job/thread_status"

 # GoodJob is a multithreaded, Postgres-based, ActiveJob backend for Ruby on Rails.
 #
 # +GoodJob+ is the top-level namespace and exposes configuration attributes.
 module GoodJob
   include GoodJob::Dependencies
+  include GoodJob::ThreadStatus

   # Default, null, blank value placeholder.
   NONE = Module.new.freeze
2 changes: 1 addition & 1 deletion lib/good_job/current_thread.rb
@@ -78,7 +78,7 @@ module CurrentThread
     # @return [void]
     def self.reset(values = {})
       ACCESSORS.each do |accessor|
-        send("#{accessor}=", values[accessor])
+        send(:"#{accessor}=", values[accessor])
       end
     end

6 changes: 3 additions & 3 deletions lib/good_job/notifier.rb
@@ -93,7 +93,7 @@ def connected?(timeout: nil)
       if timeout.nil?
         @connected.set?
       else
-        @connected.wait(timeout == -1 ? nil : timeout)
+        @connected.wait(timeout&.negative? ? nil : timeout)
       end
     end

@@ -104,7 +104,7 @@ def listening?(timeout: nil)
       if timeout.nil?
         @listening.set?
       else
-        @listening.wait(timeout == -1 ? nil : timeout)
+        @listening.wait(timeout&.negative? ? nil : timeout)
       end
     end

@@ -130,7 +130,7 @@ def shutdown(timeout: -1)
         @listening.reset
         @shutdown_event.set
       else
-        @shutdown_event.wait(timeout == -1 ? nil : timeout) unless timeout.nil?
+        @shutdown_event.wait(timeout&.negative? ? nil : timeout) unless timeout.nil?
         @connected.reset if @shutdown_event.set?
       end
       @shutdown_event.set?
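The notifier change swaps `timeout == -1` for `timeout&.negative?`, so any negative timeout, not only `-1`, is mapped to `nil` before being handed to `wait` (understood here to mean waiting without a limit). The sketch below isolates that expression in a hypothetical helper, `normalize_wait_timeout`, which does not exist in GoodJob, to show how each kind of input is treated.

```ruby
# Hypothetical helper wrapping the expression introduced in the notifier diff.
# `&.` keeps a nil timeout from raising NoMethodError; nil is returned as-is.
def normalize_wait_timeout(timeout)
  timeout&.negative? ? nil : timeout
end

# Negative values collapse to nil; zero and positive values pass through.
[-1, -5, 0, 2.5, nil].each do |timeout|
  puts "#{timeout.inspect} -> #{normalize_wait_timeout(timeout).inspect}"
end
```

Note that in the diff the `nil` case is handled before this expression is reached (`if timeout.nil?` or `unless timeout.nil?`), so the safe-navigation operator acts as a guard rather than a separate code path.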
2 changes: 2 additions & 0 deletions lib/good_job/scheduler.rb
@@ -268,6 +268,8 @@ def create_executor
     def create_task(delay = 0, fanout: false)
       future = Concurrent::ScheduledTask.new(delay, args: [self, performer], executor: executor, timer_set: timer_set) do |thr_scheduler, thr_performer|
         Thread.current.name = Thread.current.name.sub("-worker-", "-thread-") if Thread.current.name
+        Thread.current[:good_job_scheduler] = thr_scheduler
+
         Rails.application.reloader.wrap do
           thr_performer.next do |found|
             thr_scheduler.create_thread({ fanout: fanout }) if found && fanout
27 changes: 27 additions & 0 deletions lib/good_job/thread_status.rb
@@ -0,0 +1,27 @@
+# frozen_string_literal: true
+
+module GoodJob
+  # Provides methods for determining the status of the
+  # current job execution thread. This is useful for determining
+  # whether to continue processing a job or to shut down gracefully.
+  module ThreadStatus
+    extend ActiveSupport::Concern
+
+    class_methods do
+      # Whether the current job execution thread is in a running state.
+      # @return [Boolean]
+      def current_thread_running?
+        scheduler = Thread.current[:good_job_scheduler]
+        scheduler ? scheduler.running? : true
+      end
+
+      # Whether the current job execution thread is shutting down
+      # (the opposite of running).
+      # @return [Boolean]
+      def current_thread_shutting_down?
+        scheduler = Thread.current[:good_job_scheduler]
+        scheduler && !scheduler.running?
+      end
+    end
+  end
+end
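The two predicates depend only on whatever object the scheduler stored under `Thread.current[:good_job_scheduler]`. The following is a simplified, stdlib-only sketch of that lookup, assuming a stand-in `FakeScheduler` and a plain module `ThreadStatusSketch` (both hypothetical; the real module uses `ActiveSupport::Concern` and a real `GoodJob::Scheduler`). It shows why the "no scheduler" case yields `true` and `nil` rather than `true` and `false`.

```ruby
# Hypothetical stand-in for GoodJob::Scheduler; only `running?` matters here.
FakeScheduler = Struct.new(:running) do
  def running?
    running
  end
end

# Simplified copy of the lookup logic from the diff, without ActiveSupport.
module ThreadStatusSketch
  def self.current_thread_running?
    scheduler = Thread.current[:good_job_scheduler]
    scheduler ? scheduler.running? : true
  end

  def self.current_thread_shutting_down?
    scheduler = Thread.current[:good_job_scheduler]
    # With no scheduler registered, `nil && ...` short-circuits to nil.
    scheduler && !scheduler.running?
  end
end

# A fresh thread with no scheduler registered (comparable to inline execution).
outside = Thread.new do
  [ThreadStatusSketch.current_thread_running?, ThreadStatusSketch.current_thread_shutting_down?]
end.value

# A thread that registers a scheduler, as the scheduler's task block does.
inside = Thread.new do
  scheduler = FakeScheduler.new(true)
  Thread.current[:good_job_scheduler] = scheduler
  before = [ThreadStatusSketch.current_thread_running?, ThreadStatusSketch.current_thread_shutting_down?]
  scheduler.running = false # simulate the start of a shutdown
  after = [ThreadStatusSketch.current_thread_running?, ThreadStatusSketch.current_thread_shutting_down?]
  [before, after]
end.value

puts "outside=#{outside.inspect} inside=#{inside.inspect}"
```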
3 changes: 3 additions & 0 deletions sorbet/rbi/todo.rbi
@@ -13,6 +13,7 @@ module ::ERROR_TRIGGERED; end
 module ::ErrorJob; end
 module ::ExpectedError; end
 module ::JOB_PERFORMED; end
+module ::JOB_RUNNING_EVENT; end
 module ::JobError; end
 module ::LATCH; end
 module ::MemoryProfiler; end
@@ -21,6 +22,8 @@ module ::RESULTS; end
 module ::RUN_JOBS; end
 module ::RecursiveJob; end
 module ::RetryableError; end
+module ::START_SHUTDOWN_EVENT; end
+module ::STATUSES; end
 module ::SimpleJob; end
 module ::THREAD_HAS_RUN; end
 module ::THREAD_JOBS; end
8 changes: 4 additions & 4 deletions spec/lib/good_job/current_thread_spec.rb
@@ -18,25 +18,25 @@
   ].each do |accessor|
     describe ".#{accessor}" do
       it 'maintains value across threads' do
-        described_class.send "#{accessor}=", 'apple'
+        described_class.send :"#{accessor}=", 'apple'

         Thread.new do
-          described_class.send "#{accessor}=", 'bear'
+          described_class.send :"#{accessor}=", 'bear'
         end.join

         expect(described_class.send(accessor)).to eq 'apple'
       end

       it 'maintains value across Rails reloader wrapper' do
         Rails.application.reloader.wrap do
-          described_class.send "#{accessor}=", 'apple'
+          described_class.send :"#{accessor}=", 'apple'
         end

         expect(described_class.send(accessor)).to eq 'apple'
       end

       it 'is resettable' do
-        described_class.send "#{accessor}=", 'apple'
+        described_class.send :"#{accessor}=", 'apple'
         described_class.reset
         expect(described_class.send(accessor)).to be_nil
       end
8 changes: 8 additions & 0 deletions spec/lib/good_job/notifier_spec.rb
@@ -147,6 +147,14 @@
       notifier.shutdown
       expect(notifier).to be_shutdown
     end
+
+    it 'can be shut down asynchronously' do
+      notifier = described_class.new(executor: executor, enable_listening: true)
+      wait_until { expect(notifier).to be_listening }
+      notifier.shutdown(timeout: nil)
+      wait_until { expect(notifier).to be_shutdown }
+      notifier.shutdown
+    end
   end

   describe '#restart' do
59 changes: 59 additions & 0 deletions spec/lib/good_job/thread_status_spec.rb
@@ -0,0 +1,59 @@
+# frozen_string_literal: true
+
+require 'rails_helper'
+
+RSpec.describe GoodJob::ThreadStatus do
+  describe ".current_thread_running?" do
+    context "when called outside of an execution context" do
+      it "returns true" do
+        expect(GoodJob.current_thread_running?).to eq(true)
+      end
+    end
+  end
+
+  describe ".current_thread_shutting_down?" do
+    context "when called outside of an execution context" do
+      it "returns nil" do
+        expect(GoodJob.current_thread_shutting_down?).to eq(nil)
+      end
+    end
+  end
+
+  context "when inside of an execution context" do
+    let(:capsule) { GoodJob::Capsule.new }
+    let(:adapter) { GoodJob::Adapter.new(execution_mode: :async_all, _capsule: capsule) }
+
+    before do
+      stub_const "JOB_RUNNING_EVENT", Concurrent::Event.new
+      stub_const "START_SHUTDOWN_EVENT", Concurrent::Event.new
+      stub_const "STATUSES", []
+      stub_const "TestJob", (Class.new(ActiveJob::Base) do
+        def perform
+          STATUSES << GoodJob.current_thread_running?
+          STATUSES << GoodJob.current_thread_shutting_down?
+          JOB_RUNNING_EVENT.set
+          START_SHUTDOWN_EVENT.wait(5)
+          STATUSES << GoodJob.current_thread_running?
+          STATUSES << GoodJob.current_thread_shutting_down?
+        end
+      end)
+
+      TestJob.queue_adapter = adapter
+    end
+
+    after do
+      capsule.shutdown
+    end
+
+    it "returns proper values" do
+      TestJob.perform_later
+
+      JOB_RUNNING_EVENT.wait(5)
+      capsule.shutdown(timeout: nil) # don't wait for the shutdown to complete
+      START_SHUTDOWN_EVENT.set
+      capsule.shutdown
+
+      expect(STATUSES).to eq([true, false, false, true])
+    end
+  end
+end
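The last spec relies on a two-step handshake: the job signals that it is running, the test starts a non-blocking shutdown, and only then is the job released to sample its status a second time. Below is a stdlib-only sketch of that ordering, assuming `Queue` objects in place of `Concurrent::Event` and a hypothetical `HandshakeScheduler` with a `begin_shutdown` method in place of `capsule.shutdown(timeout: nil)`. None of these names come from GoodJob.

```ruby
# Hypothetical stand-in for a scheduler whose shutdown can be started
# without waiting for running work to finish.
class HandshakeScheduler
  def initialize
    @running = true
  end

  def running?
    @running
  end

  def begin_shutdown
    @running = false
  end
end

# Reproduces the ordering used by the spec and returns the sampled statuses.
def observe_statuses
  scheduler = HandshakeScheduler.new
  job_running = Queue.new     # plays the role of JOB_RUNNING_EVENT
  start_shutdown = Queue.new  # plays the role of START_SHUTDOWN_EVENT
  statuses = []

  worker = Thread.new do
    statuses << scheduler.running? << !scheduler.running?
    job_running << :set  # tell the main thread the "job" has started
    start_shutdown.pop   # block until the shutdown has been initiated
    statuses << scheduler.running? << !scheduler.running?
  end

  job_running.pop          # wait for the job to take its first sample
  scheduler.begin_shutdown # start shutting down without waiting
  start_shutdown << :set   # release the job to take its second sample
  worker.join
  statuses
end

puts observe_statuses.inspect
```

Without the first wait, the shutdown could begin before the job samples its initial status, and the first pair of values would no longer be deterministic.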