Add GoodJob.current_thread_running? and GoodJob.current_thread_shutting_down? for graceful shutdowns #1253

Merged
merged 1 commit into from
Mar 1, 2024
21 changes: 18 additions & 3 deletions README.md
@@ -57,7 +57,7 @@ For more of the story of GoodJob, read the [introductory blog post](https://isla
- [Exceptions](#exceptions)
- [Retries](#retries)
- [Action Mailer retries](#action-mailer-retries)
- [Interrupts](#interrupts)
- [Interrupts, graceful shutdown, and SIGKILL](#interrupts-graceful-shutdown-and-sigkill)
- [Timeouts](#timeouts)
- [Optimize queues, threads, and processes](#optimize-queues-threads-and-processes)
- [Database connections](#database-connections)
@@ -974,9 +974,24 @@ end
Note that `ActionMailer::MailDeliveryJob` has been the default since Rails 6.0. Be sure that your app is using that class, as it
might also be configured to use the now-deprecated `ActionMailer::DeliveryJob`.

### Interrupts
### Interrupts, graceful shutdown, and SIGKILL

Jobs will be automatically retried if the process is interrupted while performing a job, for example as the result of a `SIGKILL` or power failure.
When GoodJob receives an interrupt (`SIGINT`, `SIGTERM`) or is shut down explicitly with `GoodJob.shutdown`, it attempts to shut down gracefully, waiting up to the configured `shutdown_timeout` for running jobs to finish before exiting.

To detect the start of a graceful shutdown from within a performing job, for example while looping/iterating over multiple items, you can call `GoodJob.current_thread_shutting_down?` or `GoodJob.current_thread_running?` from within the job. For example:

```ruby
def perform(lots_of_records)
  lots_of_records.each do |record|
    break if GoodJob.current_thread_shutting_down? # or: break unless GoodJob.current_thread_running?
    # process record ...
  end
end
```
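
A variation on the loop above, sketched in plain Ruby, returns the unprocessed items so the caller can decide what to do with them. `process_until_shutdown` and `DEFAULT_STOP` are hypothetical names used only for illustration and are not part of GoodJob; the default check only consults `GoodJob.current_thread_shutting_down?` when GoodJob is loaded.

```ruby
# Hypothetical helper, not part of GoodJob: yields items one at a time until
# `stop` returns truthy, then returns the items that were not processed.
# The default `stop` asks GoodJob when it is loaded and never stops otherwise.
DEFAULT_STOP = -> { defined?(GoodJob) && GoodJob.current_thread_shutting_down? }

def process_until_shutdown(items, stop: DEFAULT_STOP)
  remaining = items.dup
  until remaining.empty?
    break if stop.call          # check before each item, as in the loop above
    yield remaining.shift       # hand the next item to the caller's block
  end
  remaining
end

# Usage with an injected stop condition that simulates a shutdown
# beginning after two items have been processed.
processed = []
leftover = process_until_shutdown([1, 2, 3, 4], stop: -> { processed.size >= 2 }) do |item|
  processed << item
end
```

Inside a real job, the leftover items could, for instance, be passed to a fresh job so that work resumes after the restart; whether that is appropriate depends on the job being safe to split this way.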

Note that when running jobs in `:inline` execution mode, `GoodJob.current_thread_running?` will always be truthy and `GoodJob.current_thread_shutting_down?` will always be falsey.
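
These fallback values follow from how the status is looked up in this change: both methods read a scheduler from a thread-local and treat a missing scheduler as "running", which is presumably what happens in `:inline` mode since no scheduler thread is involved. A minimal plain-Ruby sketch of that lookup, where `FakeScheduler`, the `:example_scheduler` key, and the helper names are hypothetical stand-ins rather than GoodJob internals:

```ruby
# Hypothetical stand-in for a scheduler; only `running?` matters here.
FakeScheduler = Struct.new(:running) do
  def running?
    running
  end
end

# Mirrors the lookup: no scheduler on the thread means "running".
def thread_running?
  scheduler = Thread.current[:example_scheduler]
  scheduler ? scheduler.running? : true
end

# Mirrors the lookup: no scheduler on the thread yields nil (falsey).
def thread_shutting_down?
  scheduler = Thread.current[:example_scheduler]
  scheduler && !scheduler.running?
end

# Returns [running?, shutting_down?] as seen from a new thread
# that has the given scheduler (or none) attached.
def statuses_in_thread(scheduler)
  Thread.new do
    Thread.current[:example_scheduler] = scheduler
    [thread_running?, thread_shutting_down?]
  end.value
end
```

Note that the "shutting down" check is falsey rather than strictly `false` when no scheduler is present, so it is safest to use it in a boolean context such as `break if ...` rather than comparing it to `false`.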

Jobs will be automatically retried if the process is interrupted while performing a job and the job cannot finish before the shutdown timeout, or if the process is terminated abruptly, for example by a `SIGKILL` or power failure.

If you need more control over interrupt-caused retries, include the `GoodJob::ActiveJobExtensions::InterruptErrors` extension in your job class. When an interrupted job is retried, the extension will raise a `GoodJob::InterruptError` exception within the job, which allows you to use Active Job's `retry_on` and `discard_on` to control the behavior of the job.

2 changes: 2 additions & 0 deletions lib/good_job.rb
@@ -40,12 +40,14 @@
require "good_job/scheduler"
require "good_job/shared_executor"
require "good_job/systemd_service"
require "good_job/thread_status"

# GoodJob is a multithreaded, Postgres-based, ActiveJob backend for Ruby on Rails.
#
# +GoodJob+ is the top-level namespace and exposes configuration attributes.
module GoodJob
include GoodJob::Dependencies
include GoodJob::ThreadStatus

# Default, null, blank value placeholder.
NONE = Module.new.freeze
2 changes: 1 addition & 1 deletion lib/good_job/current_thread.rb
@@ -78,7 +78,7 @@ module CurrentThread
# @return [void]
def self.reset(values = {})
ACCESSORS.each do |accessor|
send("#{accessor}=", values[accessor])
send(:"#{accessor}=", values[accessor])
end
end

6 changes: 3 additions & 3 deletions lib/good_job/notifier.rb
@@ -93,7 +93,7 @@ def connected?(timeout: nil)
if timeout.nil?
@connected.set?
else
@connected.wait(timeout == -1 ? nil : timeout)
@connected.wait(timeout&.negative? ? nil : timeout)
end
end

@@ -104,7 +104,7 @@ def listening?(timeout: nil)
if timeout.nil?
@listening.set?
else
@listening.wait(timeout == -1 ? nil : timeout)
@listening.wait(timeout&.negative? ? nil : timeout)
end
end

@@ -130,7 +130,7 @@ def shutdown(timeout: -1)
@listening.reset
@shutdown_event.set
else
@shutdown_event.wait(timeout == -1 ? nil : timeout) unless timeout.nil?
@shutdown_event.wait(timeout&.negative? ? nil : timeout) unless timeout.nil?
@connected.reset if @shutdown_event.set?
end
@shutdown_event.set?
2 changes: 2 additions & 0 deletions lib/good_job/scheduler.rb
@@ -270,6 +270,8 @@ def create_executor
def create_task(delay = 0, fanout: false)
future = Concurrent::ScheduledTask.new(delay, args: [self, performer], executor: executor, timer_set: timer_set) do |thr_scheduler, thr_performer|
Thread.current.name = Thread.current.name.sub("-worker-", "-thread-") if Thread.current.name
Thread.current[:good_job_scheduler] = thr_scheduler

Rails.application.reloader.wrap do
thr_performer.next do |found|
thr_scheduler.create_thread({ fanout: fanout }) if found && fanout
27 changes: 27 additions & 0 deletions lib/good_job/thread_status.rb
@@ -0,0 +1,27 @@
# frozen_string_literal: true

module GoodJob
  # Provides methods for determining the status of the
  # current job execution thread. This is useful for determining
  # whether to continue processing a job or to shut down gracefully.
  module ThreadStatus
    extend ActiveSupport::Concern

    class_methods do
      # Whether the current job execution thread is in a running state.
      # Returns true when called outside of a scheduler thread.
      # @return [Boolean]
      def current_thread_running?
        scheduler = Thread.current[:good_job_scheduler]
        scheduler ? scheduler.running? : true
      end

      # Whether the current job execution thread is shutting down
      # (the opposite of running).
      # Returns nil when called outside of a scheduler thread.
      # @return [Boolean, nil]
      def current_thread_shutting_down?
        scheduler = Thread.current[:good_job_scheduler]
        scheduler && !scheduler.running?
      end
    end
  end
end
3 changes: 3 additions & 0 deletions sorbet/rbi/todo.rbi
@@ -13,6 +13,7 @@ module ::ERROR_TRIGGERED; end
module ::ErrorJob; end
module ::ExpectedError; end
module ::JOB_PERFORMED; end
module ::JOB_RUNNING_EVENT; end
module ::JobError; end
module ::LATCH; end
module ::MemoryProfiler; end
@@ -21,6 +22,8 @@ module ::RESULTS; end
module ::RUN_JOBS; end
module ::RecursiveJob; end
module ::RetryableError; end
module ::START_SHUTDOWN_EVENT; end
module ::STATUSES; end
module ::SimpleJob; end
module ::THREAD_HAS_RUN; end
module ::THREAD_JOBS; end
8 changes: 4 additions & 4 deletions spec/lib/good_job/current_thread_spec.rb
@@ -18,25 +18,25 @@
].each do |accessor|
describe ".#{accessor}" do
it 'maintains value across threads' do
described_class.send "#{accessor}=", 'apple'
described_class.send :"#{accessor}=", 'apple'

Thread.new do
described_class.send "#{accessor}=", 'bear'
described_class.send :"#{accessor}=", 'bear'
end.join

expect(described_class.send(accessor)).to eq 'apple'
end

it 'maintains value across Rails reloader wrapper' do
Rails.application.reloader.wrap do
described_class.send "#{accessor}=", 'apple'
described_class.send :"#{accessor}=", 'apple'
end

expect(described_class.send(accessor)).to eq 'apple'
end

it 'is resettable' do
described_class.send "#{accessor}=", 'apple'
described_class.send :"#{accessor}=", 'apple'
described_class.reset
expect(described_class.send(accessor)).to be_nil
end
8 changes: 8 additions & 0 deletions spec/lib/good_job/notifier_spec.rb
@@ -147,6 +147,14 @@
notifier.shutdown
expect(notifier).to be_shutdown
end

it 'can be shut down asynchronously' do
notifier = described_class.new(executor: executor, enable_listening: true)
wait_until { expect(notifier).to be_listening }
notifier.shutdown(timeout: nil)
wait_until { expect(notifier).to be_shutdown }
notifier.shutdown
end
end

describe '#restart' do
59 changes: 59 additions & 0 deletions spec/lib/good_job/thread_status_spec.rb
@@ -0,0 +1,59 @@
# frozen_string_literal: true

require 'rails_helper'

RSpec.describe GoodJob::ThreadStatus do
  describe ".current_thread_running?" do
    context "when called outside of an execution context" do
      it "returns true" do
        expect(GoodJob.current_thread_running?).to eq(true)
      end
    end
  end

  describe ".current_thread_shutting_down?" do
    context "when called outside of an execution context" do
      it "returns nil" do
        expect(GoodJob.current_thread_shutting_down?).to eq(nil)
      end
    end
  end

  context "when inside of an execution context" do
    let(:capsule) { GoodJob::Capsule.new }
    let(:adapter) { GoodJob::Adapter.new(execution_mode: :async_all, _capsule: capsule) }

    before do
      stub_const "JOB_RUNNING_EVENT", Concurrent::Event.new
      stub_const "START_SHUTDOWN_EVENT", Concurrent::Event.new
      stub_const "STATUSES", []
      stub_const "TestJob", (Class.new(ActiveJob::Base) do
        def perform
          STATUSES << GoodJob.current_thread_running?
          STATUSES << GoodJob.current_thread_shutting_down?
          JOB_RUNNING_EVENT.set
          START_SHUTDOWN_EVENT.wait(5)
          STATUSES << GoodJob.current_thread_running?
          STATUSES << GoodJob.current_thread_shutting_down?
        end
      end)

      TestJob.queue_adapter = adapter
    end

    after do
      capsule.shutdown
    end

    it "returns proper values" do
      TestJob.perform_later

      JOB_RUNNING_EVENT.wait(5)
      capsule.shutdown(timeout: nil) # don't wait for the shutdown to complete
      START_SHUTDOWN_EVENT.set
      capsule.shutdown

      expect(STATUSES).to eq([true, false, false, true])
    end
  end
end