Skip to content

Commit

Permalink
Expect Job#scheduled_at to always be present; remove nil checks (#1585
Browse files Browse the repository at this point in the history
)
  • Loading branch information
bensheldon authored Jan 26, 2025
1 parent 2477ef0 commit 8ec59c5
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 18 deletions.
6 changes: 3 additions & 3 deletions app/models/concerns/good_job/filterable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ module Filterable
# Display records after this ID for keyset pagination
# @return [ActiveRecord::Relation]
scope :display_all, (lambda do |after_scheduled_at: nil, after_id: nil|
query = order(Arel.sql('COALESCE(scheduled_at, created_at) DESC, id DESC'))
query = order(Arel.sql('scheduled_at DESC, id DESC'))
if after_scheduled_at.present? && after_id.present?
query = query.where Arel::Nodes::Grouping.new([coalesce_scheduled_at_created_at, arel_table["id"]]).lt(Arel::Nodes::Grouping.new([bind_value('coalesce', after_scheduled_at, ActiveRecord::Type::DateTime), bind_value('id', after_id, ActiveRecord::ConnectionAdapters::PostgreSQL::OID::Uuid)]))
query = query.where Arel::Nodes::Grouping.new([arel_table["scheduled_at"], arel_table["id"]]).lt(Arel::Nodes::Grouping.new([bind_value('scheduled_at', after_scheduled_at, ActiveRecord::Type::DateTime), bind_value('id', after_id, ActiveRecord::ConnectionAdapters::PostgreSQL::OID::Uuid)]))
elsif after_scheduled_at.present?
query = query.where coalesce_scheduled_at_created_at.lt(bind_value('coalesce', after_scheduled_at, ActiveRecord::Type::DateTime))
query = query.where arel_table["scheduled_at"].lt(bind_value('scheduled_at', after_scheduled_at, ActiveRecord::Type::DateTime))
end
query
end)
Expand Down
23 changes: 10 additions & 13 deletions app/models/good_job/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ class Job < BaseRecord
scope :finished_before, ->(timestamp) { where(arel_table['finished_at'].lteq(bind_value('finished_at', timestamp, ActiveRecord::Type::DateTime))) }

# First execution will run in the future
scope :scheduled, -> { where(finished_at: nil).where(coalesce_scheduled_at_created_at.gt(bind_value('coalesce', Time.current, ActiveRecord::Type::DateTime))).where(params_execution_count.lt(2)) }
scope :scheduled, -> { where(finished_at: nil).where(arel_table['scheduled_at'].gt(bind_value('scheduled_at', Time.current, ActiveRecord::Type::DateTime))).where(params_execution_count.lt(2)) }
# Execution errored, will run in the future
scope :retried, -> { where(finished_at: nil).where(coalesce_scheduled_at_created_at.gt(bind_value('coalesce', Time.current, ActiveRecord::Type::DateTime))).where(params_execution_count.gt(1)) }
scope :retried, -> { where(finished_at: nil).where(arel_table['scheduled_at'].gt(bind_value('scheduled_at', Time.current, ActiveRecord::Type::DateTime))).where(params_execution_count.gt(1)) }
# Immediate/Scheduled time to run has passed, waiting for an available thread run
scope :queued, -> { where(performed_at: nil, finished_at: nil).where(coalesce_scheduled_at_created_at.lteq(bind_value('coalesce', Time.current, ActiveRecord::Type::DateTime))) }
scope :queued, -> { where(performed_at: nil, finished_at: nil).where(arel_table['scheduled_at'].lteq(bind_value('scheduled_at', Time.current, ActiveRecord::Type::DateTime))) }
# Advisory locked and executing
scope :running, -> { where.not(performed_at: nil).where(finished_at: nil) }
# Finished executing (succeeded or discarded)
Expand Down Expand Up @@ -94,7 +94,7 @@ class Job < BaseRecord
# @!method only_scheduled
# @!scope class
# @return [ActiveRecord::Relation]
scope :only_scheduled, -> { where(arel_table['scheduled_at'].lteq(bind_value('scheduled_at', DateTime.current, ActiveRecord::Type::DateTime))).or(where(scheduled_at: nil)) }
scope :only_scheduled, -> { where(arel_table['scheduled_at'].lteq(bind_value('scheduled_at', DateTime.current, ActiveRecord::Type::DateTime))) }

# Exclude jobs that are paused via queue_name or job_class.
# Only applies when enable_pauses configuration is true.
Expand Down Expand Up @@ -166,7 +166,7 @@ class Job < BaseRecord
# @!method schedule_ordered
# @!scope class
# @return [ActiveRecord::Relation]
scope :schedule_ordered, -> { order(coalesce_scheduled_at_created_at.asc) }
scope :schedule_ordered, -> { order(scheduled_at: :asc) }

# Get Jobs on queues that match the given queue string.
# @!method queue_string(string)
Expand Down Expand Up @@ -249,10 +249,6 @@ def params_execution_count
Arel.sql('integer')
)
end

def coalesce_scheduled_at_created_at
arel_table.coalesce(arel_table['scheduled_at'], arel_table['created_at'])
end
end

def self.build_for_enqueue(active_job, scheduled_at: nil)
Expand Down Expand Up @@ -344,12 +340,13 @@ def self.next_scheduled_at(after: nil, limit: 100, now_limit: nil)

after ||= Time.current
after_bind = bind_value('scheduled_at', after, ActiveRecord::Type::DateTime)
after_query = query.where(arel_table['scheduled_at'].gt(after_bind)).or query.where(scheduled_at: nil).where(arel_table['created_at'].gt(after_bind))
after_at = after_query.limit(limit).pluck(:scheduled_at, :created_at).map { |timestamps| timestamps.compact.first }
after_query = query.where(arel_table['scheduled_at'].gt(after_bind))
after_at = after_query.limit(limit).pluck(:scheduled_at)

if now_limit&.positive?
now_query = query.where(arel_table['scheduled_at'].lt(bind_value('scheduled_at', Time.current, ActiveRecord::Type::DateTime))).or query.where(scheduled_at: nil)
now_at = now_query.limit(now_limit).pluck(:scheduled_at, :created_at).map { |timestamps| timestamps.compact.first }
now_bind = bind_value('scheduled_at', Time.current, ActiveRecord::Type::DateTime)
now_query = query.where(arel_table['scheduled_at'].lt(now_bind))
now_at = now_query.limit(now_limit).pluck(:scheduled_at)
end

Array(now_at) + after_at
Expand Down
4 changes: 2 additions & 2 deletions spec/app/models/good_job/job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ def job_params

context "with multiple jobs and ordered queues" do
def job_params
{ active_job_id: SecureRandom.uuid, queue_name: "default", priority: 0, serialized_params: { job_class: "TestJob" } }
{ active_job_id: SecureRandom.uuid, scheduled_at: Time.current, queue_name: "default", priority: 0, serialized_params: { job_class: "TestJob" } }
end

let(:parsed_queues) { { include: %w{one two}, ordered_queues: true } }
Expand Down Expand Up @@ -1067,7 +1067,7 @@ def job_params
describe '.schedule_ordered' do
it 'orders by scheduled or created (oldest first)' do
query = described_class.schedule_ordered
expect(query.to_sql).to include('ORDER BY COALESCE')
expect(query.to_sql).to include('ORDER BY')
end
end

Expand Down

0 comments on commit 8ec59c5

Please sign in to comment.