diff --git a/README.md b/README.md index 26e86423b..1ff479b44 100644 --- a/README.md +++ b/README.md @@ -57,7 +57,7 @@ For more of the story of GoodJob, read the [introductory blog post](https://isla - [Exceptions](#exceptions) - [Retries](#retries) - [Action Mailer retries](#action-mailer-retries) - - [Interrupts](#interrupts) + - [Interrupts, graceful shutdown, and SIGKILL](#Interrupts-graceful-shutdown-and-SIGKILL) - [Timeouts](#timeouts) - [Optimize queues, threads, and processes](#optimize-queues-threads-and-processes) - [Database connections](#database-connections) @@ -974,9 +974,24 @@ end Note, that `ActionMailer::MailDeliveryJob` is a default since Rails 6.0. Be sure that your app is using that class, as it might also be configured to use (deprecated now) `ActionMailer::DeliveryJob`. -### Interrupts +### Interrupts, graceful shutdown, and SIGKILL -Jobs will be automatically retried if the process is interrupted while performing a job, for example as the result of a `SIGKILL` or power failure. +When GoodJob receives an interrupt (SIGINT, SIGTERM) or explicitly with `GoodJob.shutdown`, GoodJob will attempt to gracefully shut down, waiting for all jobs to finish before exiting based on the `shutdown_timeout` configuration. + +To detect the start of a graceful shutdown from within a performing job, for example while looping/iterating over multiple items, you can call `GoodJob.current_thread_shutting_down?` or `GoodJob.current_thread_running?` from within the job. For example: + +```ruby +def perform(lots_of_records) + lots_of_records.each do |record| + break if GoodJob.current_thread_shutting_down? # or `unless GoodJob.current_thread.running?` + # process record ... + end +end +```` + +Note that when running jobs in `:inline` execution mode, `GoodJob.current_thread_running?` will always be truthy and `GoodJob.current_thread_shutting_down?` will always be falsey. + +Jobs will be automatically retried if the process is interrupted while performing a job and the job is unable to finish before the timeout or as the result of a `SIGKILL` or power failure. If you need more control over interrupt-caused retries, include the `GoodJob::ActiveJobExtensions::InterruptErrors` extension in your job class. When an interrupted job is retried, the extension will raise a `GoodJob::InterruptError` exception within the job, which allows you to use Active Job's `retry_on` and `discard_on` to control the behavior of the job. diff --git a/lib/good_job.rb b/lib/good_job.rb index e81607c52..75609a63d 100644 --- a/lib/good_job.rb +++ b/lib/good_job.rb @@ -40,12 +40,14 @@ require "good_job/scheduler" require "good_job/shared_executor" require "good_job/systemd_service" +require "good_job/thread_status" # GoodJob is a multithreaded, Postgres-based, ActiveJob backend for Ruby on Rails. # # +GoodJob+ is the top-level namespace and exposes configuration attributes. module GoodJob include GoodJob::Dependencies + include GoodJob::ThreadStatus # Default, null, blank value placeholder. NONE = Module.new.freeze diff --git a/lib/good_job/current_thread.rb b/lib/good_job/current_thread.rb index 018db7592..9078d40b0 100644 --- a/lib/good_job/current_thread.rb +++ b/lib/good_job/current_thread.rb @@ -78,7 +78,7 @@ module CurrentThread # @return [void] def self.reset(values = {}) ACCESSORS.each do |accessor| - send("#{accessor}=", values[accessor]) + send(:"#{accessor}=", values[accessor]) end end diff --git a/lib/good_job/notifier.rb b/lib/good_job/notifier.rb index 4a35ff34a..ec16360cf 100644 --- a/lib/good_job/notifier.rb +++ b/lib/good_job/notifier.rb @@ -93,7 +93,7 @@ def connected?(timeout: nil) if timeout.nil? @connected.set? else - @connected.wait(timeout == -1 ? nil : timeout) + @connected.wait(timeout&.negative? ? nil : timeout) end end @@ -104,7 +104,7 @@ def listening?(timeout: nil) if timeout.nil? @listening.set? else - @listening.wait(timeout == -1 ? nil : timeout) + @listening.wait(timeout&.negative? ? nil : timeout) end end @@ -130,7 +130,7 @@ def shutdown(timeout: -1) @listening.reset @shutdown_event.set else - @shutdown_event.wait(timeout == -1 ? nil : timeout) unless timeout.nil? + @shutdown_event.wait(timeout&.negative? ? nil : timeout) unless timeout.nil? @connected.reset if @shutdown_event.set? end @shutdown_event.set? diff --git a/lib/good_job/scheduler.rb b/lib/good_job/scheduler.rb index 7b409130f..9af6b1ca3 100644 --- a/lib/good_job/scheduler.rb +++ b/lib/good_job/scheduler.rb @@ -270,6 +270,8 @@ def create_executor def create_task(delay = 0, fanout: false) future = Concurrent::ScheduledTask.new(delay, args: [self, performer], executor: executor, timer_set: timer_set) do |thr_scheduler, thr_performer| Thread.current.name = Thread.current.name.sub("-worker-", "-thread-") if Thread.current.name + Thread.current[:good_job_scheduler] = thr_scheduler + Rails.application.reloader.wrap do thr_performer.next do |found| thr_scheduler.create_thread({ fanout: fanout }) if found && fanout diff --git a/lib/good_job/thread_status.rb b/lib/good_job/thread_status.rb new file mode 100644 index 000000000..ea9eb838c --- /dev/null +++ b/lib/good_job/thread_status.rb @@ -0,0 +1,27 @@ +# frozen_string_literal: true + +module GoodJob + # Provides methods for determining the status of the + # current job execution thread. This is useful for determining + # whether to continue processing a job or to shut down gracefully. + module ThreadStatus + extend ActiveSupport::Concern + + class_methods do + # Whether the current job execution thread is in a running state. + # @return [Boolean] + def current_thread_running? + scheduler = Thread.current[:good_job_scheduler] + scheduler ? scheduler.running? : true + end + + # Whether the current job execution thread is shutting down + # (the opposite of running). + # @return [Boolean] + def current_thread_shutting_down? + scheduler = Thread.current[:good_job_scheduler] + scheduler && !scheduler.running? + end + end + end +end diff --git a/sorbet/rbi/todo.rbi b/sorbet/rbi/todo.rbi index b21a37d30..1f008f767 100644 --- a/sorbet/rbi/todo.rbi +++ b/sorbet/rbi/todo.rbi @@ -13,6 +13,7 @@ module ::ERROR_TRIGGERED; end module ::ErrorJob; end module ::ExpectedError; end module ::JOB_PERFORMED; end +module ::JOB_RUNNING_EVENT; end module ::JobError; end module ::LATCH; end module ::MemoryProfiler; end @@ -21,6 +22,8 @@ module ::RESULTS; end module ::RUN_JOBS; end module ::RecursiveJob; end module ::RetryableError; end +module ::START_SHUTDOWN_EVENT; end +module ::STATUSES; end module ::SimpleJob; end module ::THREAD_HAS_RUN; end module ::THREAD_JOBS; end diff --git a/spec/lib/good_job/current_thread_spec.rb b/spec/lib/good_job/current_thread_spec.rb index b316d33fc..00fd5a19e 100644 --- a/spec/lib/good_job/current_thread_spec.rb +++ b/spec/lib/good_job/current_thread_spec.rb @@ -18,10 +18,10 @@ ].each do |accessor| describe ".#{accessor}" do it 'maintains value across threads' do - described_class.send "#{accessor}=", 'apple' + described_class.send :"#{accessor}=", 'apple' Thread.new do - described_class.send "#{accessor}=", 'bear' + described_class.send :"#{accessor}=", 'bear' end.join expect(described_class.send(accessor)).to eq 'apple' @@ -29,14 +29,14 @@ it 'maintains value across Rails reloader wrapper' do Rails.application.reloader.wrap do - described_class.send "#{accessor}=", 'apple' + described_class.send :"#{accessor}=", 'apple' end expect(described_class.send(accessor)).to eq 'apple' end it 'is resettable' do - described_class.send "#{accessor}=", 'apple' + described_class.send :"#{accessor}=", 'apple' described_class.reset expect(described_class.send(accessor)).to be_nil end diff --git a/spec/lib/good_job/notifier_spec.rb b/spec/lib/good_job/notifier_spec.rb index 2f2bef2e7..649d573f8 100644 --- a/spec/lib/good_job/notifier_spec.rb +++ b/spec/lib/good_job/notifier_spec.rb @@ -147,6 +147,14 @@ notifier.shutdown expect(notifier).to be_shutdown end + + it 'can be shut down asynchronously' do + notifier = described_class.new(executor: executor, enable_listening: true) + wait_until { expect(notifier).to be_listening } + notifier.shutdown(timeout: nil) + wait_until { expect(notifier).to be_shutdown } + notifier.shutdown + end end describe '#restart' do diff --git a/spec/lib/good_job/thread_status_spec.rb b/spec/lib/good_job/thread_status_spec.rb new file mode 100644 index 000000000..6b4b724a0 --- /dev/null +++ b/spec/lib/good_job/thread_status_spec.rb @@ -0,0 +1,59 @@ +# frozen_string_literal: true + +require 'rails_helper' + +RSpec.describe GoodJob::ThreadStatus do + describe ".current_thread_running?" do + context "when called outside of an execution context" do + it "returns true" do + expect(GoodJob.current_thread_running?).to eq(true) + end + end + end + + describe ".current_thread_shutting_down?" do + context "when called outside of an execution context" do + it "returns nil" do + expect(GoodJob.current_thread_shutting_down?).to eq(nil) + end + end + end + + context "when inside of an execution context" do + let(:capsule) { GoodJob::Capsule.new } + let(:adapter) { GoodJob::Adapter.new(execution_mode: :async_all, _capsule: capsule) } + + before do + stub_const "JOB_RUNNING_EVENT", Concurrent::Event.new + stub_const "START_SHUTDOWN_EVENT", Concurrent::Event.new + stub_const "STATUSES", [] + stub_const "TestJob", (Class.new(ActiveJob::Base) do + def perform + STATUSES << GoodJob.current_thread_running? + STATUSES << GoodJob.current_thread_shutting_down? + JOB_RUNNING_EVENT.set + START_SHUTDOWN_EVENT.wait(5) + STATUSES << GoodJob.current_thread_running? + STATUSES << GoodJob.current_thread_shutting_down? + end + end) + + TestJob.queue_adapter = adapter + end + + after do + capsule.shutdown + end + + it "returns proper values" do + TestJob.perform_later + + JOB_RUNNING_EVENT.wait(5) + capsule.shutdown(timeout: nil) # don't wait for the shutdown to complete + START_SHUTDOWN_EVENT.set + capsule.shutdown + + expect(STATUSES).to eq([true, false, false, true]) + end + end +end