diff --git a/README.md b/README.md index cd37e2d2..4a4e6d45 100644 --- a/README.md +++ b/README.md @@ -453,6 +453,64 @@ module Maintenance end ``` +### Subscribing to instrumentation events + +If you are interested in actioning a specific task event, please refer to the [Using Task Callbacks](#using-task-callbacks) section below. However, if you want to subscribe to all events, irrespective of the task, you can use the following Active Support notifications: + +```ruby +enqueued.maintenance_tasks # This event is published when a task has been enqueued by the user. +succeeded.maintenance_tasks # This event is published when a task has finished without any errors. +cancelled.maintenance_tasks # This event is published when the user explicitly halts the execution of a task. +paused.maintenance_tasks # This event is published when a task is paused by the user in the middle of its run. +errored.maintenance_tasks # This event is published when the task's code produces an unhandled exception. +``` + +These notifications offer a way to monitor the lifecycle of maintenance tasks in your application. 
+ +Usage example: + +```ruby +ActiveSupport::Notifications.subscribe("succeeded.maintenance_tasks") do |*, payload| + task_name = payload[:task_name] + arguments = payload[:arguments] + metadata = payload[:metadata] + job_id = payload[:job_id] + run_id = payload[:run_id] + time_running = payload[:time_running] + started_at = payload[:started_at] + ended_at = payload[:ended_at] +rescue => e + Rails.logger.error(e) +end + +ActiveSupport::Notifications.subscribe("errored.maintenance_tasks") do |*, payload| + task_name = payload[:task_name] + error = payload[:error] + error_message = error[:message] + error_class = error[:class] + error_backtrace = error[:backtrace] +rescue => e + Rails.logger.error(e) +end + +# or + +class MaintenanceTasksInstrumenter < ActiveSupport::Subscriber + attach_to :maintenance_tasks + + def enqueued(event) + task_name = event.payload[:task_name] + arguments = event.payload[:arguments] + metadata = event.payload[:metadata] + + SlackNotifier.broadcast(SLACK_CHANNEL, + "Job #{task_name} was started by #{metadata[:user_email]} with arguments #{arguments.to_s.truncate(255)}") + rescue => e + Rails.logger.error(e) + end +end +``` + ### Using Task Callbacks The Task provides callbacks that hook into its life cycle. @@ -503,21 +561,6 @@ end If any of the other callbacks cause an exception, it will be handled by the error handler, and will cause the task to stop running. -Callback behaviour can be shared across all tasks using an initializer. 
- -```ruby -# config/initializer/maintenance_tasks.rb -Rails.autoloaders.main.on_load("MaintenanceTasks::Task") do - MaintenanceTasks::Task.class_eval do - after_start(:notify) - - private - - def notify; end - end -end -``` - ### Considerations when writing Tasks Maintenance Tasks relies on the queue adapter configured for your application to diff --git a/app/models/maintenance_tasks/run.rb b/app/models/maintenance_tasks/run.rb index e684fa13..44b6c2d4 100644 --- a/app/models/maintenance_tasks/run.rb +++ b/app/models/maintenance_tasks/run.rb @@ -39,6 +39,8 @@ class Run < ApplicationRecord enum status: STATUSES.to_h { |status| [status, status.to_s] } end + after_save :instrument_status_change + validate :task_name_belongs_to_a_valid_task, on: :create validate :csv_attachment_presence, on: :create validate :csv_content_type, on: :create @@ -452,6 +454,30 @@ def task private + def instrument_status_change + return unless status_previously_changed? || id_previously_changed? + return if running? || pausing? || cancelling? || interrupted? + + attr = { + run_id: id, + job_id: job_id, + task_name: task_name, + arguments: arguments, + metadata: metadata, + time_running: time_running, + started_at: started_at, + ended_at: ended_at, + } + + attr[:error] = { + message: error_message, + class: error_class, + backtrace: backtrace, + } if errored? + + ActiveSupport::Notifications.instrument("#{status}.maintenance_tasks", attr) + end + def run_task_callbacks(callback) task.run_callbacks(callback) rescue Task::NotFoundError diff --git a/test/jobs/maintenance_tasks/task_job_test.rb b/test/jobs/maintenance_tasks/task_job_test.rb index b13cf7dc..c4dcee7d 100644 --- a/test/jobs/maintenance_tasks/task_job_test.rb +++ b/test/jobs/maintenance_tasks/task_job_test.rb @@ -499,7 +499,9 @@ class << self JobIteration.stubs(interruption_adapter: -> { true }) # Simulate cancel happening after we've already checked @run.cancelling? 
- @run.expects(:cancelling?).twice.with do + @run.expects(:cancelling?).at_least(2).with do + next true if caller.any?(/`instrument_status_change'\z/) # avoid endless loop + Run.find(@run.id).cancel end.returns(false).then.returns(true) @@ -512,7 +514,9 @@ class << self JobIteration.stubs(interruption_adapter: -> { true }) # Simulate pause happening after we've already checked @run.pausing? - @run.expects(:pausing?).twice.with do + @run.expects(:pausing?).at_least(2).with do + next true if caller.any?(/`instrument_status_change'\z/) # avoid endless loop + Run.find(@run.id).pausing! end.returns(false).then.returns(true) diff --git a/test/models/maintenance_tasks/run_test.rb b/test/models/maintenance_tasks/run_test.rb index 6904d01e..7490ade9 100644 --- a/test/models/maintenance_tasks/run_test.rb +++ b/test/models/maintenance_tasks/run_test.rb @@ -3,6 +3,30 @@ require "test_helper" module MaintenanceTasks + class Notifier < ActiveSupport::Subscriber + NOTIFICATIONS = [ + :enqueued, + :succeeded, + :paused, + :cancelled, + :errored, + ].freeze + + attach_to :maintenance_tasks + + class << self + def payload + Thread.current[:payload] ||= {} + end + end + + NOTIFICATIONS.each do |notification| + define_method(notification) do |event| + self.class.payload.merge!("#{notification}.maintenance_tasks" => event.payload) + end + end + end + class RunTest < ActiveSupport::TestCase test "invalid if the task doesn't exist" do run = Run.new(task_name: "Maintenance::DoesNotExist") @@ -81,7 +105,9 @@ class RunTest < ActiveSupport::TestCase run = Run.create!(task_name: "Maintenance::CallbackTestTask", status: "running") run.status = :succeeded run.task.expects(:after_complete_callback) + run.persist_transition + assert_equal(expected_notification(run), Notifier.payload.fetch("succeeded.maintenance_tasks")) end test "#persist_transition calls the cancel callback" do @@ -92,6 +118,7 @@ class RunTest < ActiveSupport::TestCase run.status = :cancelled 
run.task.expects(:after_cancel_callback) run.persist_transition + assert_equal(expected_notification(run), Notifier.payload.fetch("cancelled.maintenance_tasks")) end test "#persist_transition calls the pause callback" do @@ -99,6 +126,7 @@ class RunTest < ActiveSupport::TestCase run.status = :paused run.task.expects(:after_pause_callback) run.persist_transition + assert_equal(expected_notification(run), Notifier.payload.fetch("paused.maintenance_tasks")) end test "#persist_transition with a race condition moves the run to the proper status and calls the right callback" do @@ -110,6 +138,7 @@ class RunTest < ActiveSupport::TestCase run.status = :interrupted run.persist_transition + assert_equal(expected_notification(run), Notifier.payload.fetch("cancelled.maintenance_tasks")) assert_predicate run.reload, :cancelled? end @@ -122,6 +151,7 @@ class RunTest < ActiveSupport::TestCase run.status = :succeeded run.persist_transition + assert_equal(expected_notification(run), Notifier.payload.fetch("succeeded.maintenance_tasks")) assert_predicate run.reload, :succeeded? end @@ -172,6 +202,13 @@ class RunTest < ActiveSupport::TestCase error.set_backtrace(["lib/foo.rb:42:in `bar'"]) run.task.expects(:after_error_callback) run.persist_error(error) + + payload = Notifier.payload.fetch("errored.maintenance_tasks") + error = payload.delete(:error) + assert_equal(expected_notification(run), payload) + assert_equal "ArgumentError", error[:class] + assert_equal "Something went wrong", error[:message] + assert_equal ["lib/foo.rb:42:in `bar'"], error[:backtrace] end test "#persist_error can handle error callback raising" do @@ -583,6 +620,12 @@ class RunTest < ActiveSupport::TestCase end end + test "#create defaults to the enqueued status" do + run = Run.create!(task_name: "Maintenance::CallbackTestTask") + assert_predicate(run, :enqueued?) 
+ assert_equal(expected_notification(run), Notifier.payload.fetch("enqueued.maintenance_tasks")) + end + test "#task returns Task instance for Run" do run = Run.new(task_name: "Maintenance::UpdatePostsTask") assert_kind_of Maintenance::UpdatePostsTask, run.task @@ -690,6 +733,19 @@ class RunTest < ActiveSupport::TestCase private + def expected_notification(run) + { + run_id: run.id, + job_id: run.job_id, + task_name: run.task_name, + arguments: run.arguments, + metadata: run.metadata, + time_running: run.time_running, + started_at: run.started_at, + ended_at: run.ended_at, + } + end + def count_uncached_queries(&block) count = 0