Skip to content

Commit

Permalink
Add GoodJob.current_thread_running? and `GoodJob.current_thread_shu…
Browse files Browse the repository at this point in the history
…tting_down?` for graceful shutdowns
  • Loading branch information
bensheldon committed Feb 28, 2024
1 parent ab51290 commit 4ad1805
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 3 deletions.
21 changes: 18 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ For more of the story of GoodJob, read the [introductory blog post](https://isla
- [Exceptions](#exceptions)
- [Retries](#retries)
- [Action Mailer retries](#action-mailer-retries)
- [Interrupts](#interrupts)
- [Interrupts, graceful shutdown, and SIGKILL](#Interrupts-graceful-shutdown-and-SIGKILL)
- [Timeouts](#timeouts)
- [Optimize queues, threads, and processes](#optimize-queues-threads-and-processes)
- [Database connections](#database-connections)
Expand Down Expand Up @@ -937,9 +937,24 @@ end
Note, that `ActionMailer::MailDeliveryJob` is a default since Rails 6.0. Be sure that your app is using that class, as it
might also be configured to use (deprecated now) `ActionMailer::DeliveryJob`.
### Interrupts
### Interrupts, graceful shutdown, and SIGKILL
Jobs will be automatically retried if the process is interrupted while performing a job, for example as the result of a `SIGKILL` or power failure.
When GoodJob receives an interrupt (SIGINT, SIGTERM) or explicitly with `GoodJob.shutdown`, GoodJob will attempt to gracefully shut down, waiting for all jobs to finish before exiting based on the `shutdown_timeout` configuration.
To detect the start of a graceful shutdown from within a performing job, for example while looping/iterating over multiple items, you can call `GoodJob.current_thread_shutting_down?` or `GoodJob.current_thread_running?` from within the job. For example:
```ruby
def perform(lots_of_records)
lots_of_records.each do |record|
break if GoodJob.current_thread_shutting_down? # or `unless GoodJob.current_thread.running?`
# process record ...
end
end
````
Note that when running jobs in `:inline` execution mode, `GoodJob.current_thread_running?` will always be truthy and `GoodJob.current_thread_shutting_down?` will always be falsey.
Jobs will be automatically retried if the process is interrupted while performing a job and the job is unable to finish before the timeout or as the result of a `SIGKILL` or power failure.
If you need more control over interrupt-caused retries, include the `GoodJob::ActiveJobExtensions::InterruptErrors` extension in your job class. When an interrupted job is retried, the extension will raise a `GoodJob::InterruptError` exception within the job, which allows you to use Active Job's `retry_on` and `discard_on` to control the behavior of the job.
Expand Down
2 changes: 2 additions & 0 deletions lib/good_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,14 @@
require "good_job/scheduler"
require "good_job/shared_executor"
require "good_job/systemd_service"
require "good_job/thread_status"

# GoodJob is a multithreaded, Postgres-based, ActiveJob backend for Ruby on Rails.
#
# +GoodJob+ is the top-level namespace and exposes configuration attributes.
module GoodJob
include GoodJob::Dependencies
include GoodJob::ThreadStatus

# Default, null, blank value placeholder.
NONE = Module.new.freeze
Expand Down
2 changes: 2 additions & 0 deletions lib/good_job/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,8 @@ def create_executor
def create_task(delay = 0, fanout: false)
future = Concurrent::ScheduledTask.new(delay, args: [self, performer], executor: executor, timer_set: timer_set) do |thr_scheduler, thr_performer|
Thread.current.name = Thread.current.name.sub("-worker-", "-thread-") if Thread.current.name
Thread.current[:good_job_scheduler] = thr_scheduler

Rails.application.reloader.wrap do
thr_performer.next do |found|
thr_scheduler.create_thread({ fanout: fanout }) if found && fanout
Expand Down
25 changes: 25 additions & 0 deletions lib/good_job/thread_status.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
module GoodJob
# Provides methods for determining the status of the
# current job execution thread. This is useful for determining
# whether to continue processing a job or to shut down gracefully.
module ThreadStatus
extend ActiveSupport::Concern

class_methods do
# Whether the current job execution thread is in a running state.
# @return [Boolean]
def current_thread_running?
scheduler = Thread.current[:good_job_scheduler]
scheduler ? scheduler.running? : true
end

# Whether the current job execution thread is shutting down
# (the opposite of running).
# @return [Boolean]
def current_thread_shutting_down?
scheduler = Thread.current[:good_job_scheduler]
scheduler && !scheduler.running?
end
end
end
end
55 changes: 55 additions & 0 deletions spec/lib/good_job/thread_status_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# frozen_string_literal: true

require 'rails_helper'

RSpec.describe GoodJob::ThreadStatus do
describe ".current_thread_running?" do
context "when called outside of an execution context" do
it "returns true" do
expect(GoodJob.current_thread_running?).to eq(true)
end
end
end

describe ".current_thread_shutting_down?" do
context "when called outside of an execution context" do
it "returns nil" do
expect(GoodJob.current_thread_shutting_down?).to eq(nil)
end
end

end

context "when inside of an execution context" do
let(:capsule) { GoodJob::Capsule.new }
let(:adapter) { GoodJob::Adapter.new(execution_mode: :async_all, _capsule: capsule) }

before do
stub_const "JOB_RUNNING_EVENT", Concurrent::Event.new
stub_const "START_SHUTDOWN_EVENT", Concurrent::Event.new
stub_const "STATUSES", []
stub_const "TestJob", (Class.new(ActiveJob::Base) do def perform
STATUSES << GoodJob.current_thread_running?
STATUSES << GoodJob.current_thread_shutting_down?
JOB_RUNNING_EVENT.set
START_SHUTDOWN_EVENT.wait(5)
STATUSES << GoodJob.current_thread_running?
STATUSES << GoodJob.current_thread_shutting_down?
end
end)

TestJob.queue_adapter = adapter
end

it "returns proper values" do
TestJob.perform_later

JOB_RUNNING_EVENT.wait(5)
capsule.shutdown(timeout: nil) # don't wait for the shutdown to complete
START_SHUTDOWN_EVENT.set
capsule.shutdown

expect(STATUSES).to eq([true, false, false, true])
end
end
end

0 comments on commit 4ad1805

Please sign in to comment.