Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
bensheldon authored Feb 21, 2024
2 parents 05aaed1 + 019b5b6 commit 09d2b4a
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 31 deletions.
2 changes: 1 addition & 1 deletion app/models/good_job/process.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ module GoodJob # :nodoc:
# ActiveRecord model that represents an GoodJob process (either async or CLI).
class Process < BaseRecord
include AdvisoryLockable
include AssignableConnection
include OverridableConnection

# Interval until the process record being updated
STALE_INTERVAL = 30.seconds
Expand Down
2 changes: 1 addition & 1 deletion lib/good_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
require "good_job/active_job_extensions/labels"
require "good_job/active_job_extensions/notify_options"

require "good_job/assignable_connection"
require "good_job/overridable_connection"
require "good_job/bulk"
require "good_job/callable"
require "good_job/capsule"
Expand Down
6 changes: 3 additions & 3 deletions lib/good_job/notifier/process_heartbeat.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ module ProcessHeartbeat

# Registers the current process.
def register_process
GoodJob::Process.with_connection(connection) do
GoodJob::Process.override_connection(connection) do
GoodJob::Process.cleanup
@process = GoodJob::Process.register
end
end

def refresh_process
Rails.application.executor.wrap do
GoodJob::Process.with_connection(connection) do
GoodJob::Process.override_connection(connection) do
GoodJob::Process.with_logger_silenced do
@process&.refresh_if_stale(cleanup: true)
end
Expand All @@ -32,7 +32,7 @@ def refresh_process

# Deregisters the current process.
def deregister_process
GoodJob::Process.with_connection(connection) do
GoodJob::Process.override_connection(connection) do
@process&.deregister
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,36 +3,29 @@
module GoodJob # :nodoc:
# Extends an ActiveRecord odel to override the connection and use
# an explicit connection that has been removed from the pool.
module AssignableConnection
module OverridableConnection
extend ActiveSupport::Concern

included do
thread_cattr_accessor :_connection
thread_cattr_accessor :_overridden_connection
end

class_methods do
# Assigns a connection to the model.
# @param conn [ActiveRecord::ConnectionAdapters::AbstractAdapter]
# @return [void]
def connection=(conn)
self._connection = conn
end

# Overrides the existing connection method to use the assigned connection
# @return [ActiveRecord::ConnectionAdapters::AbstractAdapter]
def connection
_connection || super
_overridden_connection || super
end

# Block interface to assign the connection, yield, then unassign the connection.
# @param conn [ActiveRecord::ConnectionAdapters::AbstractAdapter]
# @return [void]
def with_connection(conn)
original_conn = _connection
self.connection = conn
def override_connection(conn)
original_conn = _overridden_connection
self._overridden_connection = conn
yield
ensure
self._connection = original_conn
self._overridden_connection = original_conn
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion sorbet/tapioca/require.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
require "good_job/active_job_extensions/interrupt_errors"
require "good_job/active_job_extensions/notify_options"
require "good_job/adapter"
require "good_job/assignable_connection"
require "good_job/overridable_connection"
require "good_job/bulk"
require "good_job/capsule"
require "good_job/cleanup_tracker"
Expand Down
2 changes: 2 additions & 0 deletions spec/app/models/concerns/good_job/advisory_lockable_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,8 @@
done_event = Concurrent::Event.new

promise = rails_promise do
execution.class.connection # <= This is necessary to fixate the connection in the thread

execution.class.transaction do
execution.advisory_lock
locked_event.set
Expand Down
25 changes: 14 additions & 11 deletions spec/support/reset_good_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
THREAD_ERRORS = Concurrent::Array.new

ActiveSupport.on_load :active_record do
ActiveRecord::ConnectionAdapters::AbstractAdapter.set_callback :checkout, :before, lambda { |conn|
ActiveRecord::ConnectionAdapters::AbstractAdapter.set_callback :checkout, :after, lambda {
thread_name = Thread.current.name || Thread.current.object_id
conn.exec_query("SET application_name = #{conn.quote(thread_name)}", "Set application name")
sql = "SET application_name = #{quote(thread_name)}"

# Necessary because of https://github.com/rails/rails/pull/51083/files#r1496720821
@raw_connection ? @raw_connection.query(sql) : exec_query(sql, "Set application name")
}
end

Expand Down Expand Up @@ -127,14 +130,14 @@ def initialize_type_map(map = type_map)
end

class PgStatActivity < ActiveRecord::Base
include GoodJob::AssignableConnection
include GoodJob::OverridableConnection

self.table_name = 'pg_stat_activity'
self.primary_key = 'datid'
end

class PgLock < ActiveRecord::Base
include GoodJob::AssignableConnection
include GoodJob::OverridableConnection

self.table_name = 'pg_locks'
self.primary_key = 'objid'
Expand All @@ -147,31 +150,31 @@ class PgLock < ActiveRecord::Base
scope :others, -> { where('pid != pg_backend_pid()') }

def self.debug_own_locks(connection = ActiveRecord::Base.connection)
count = PgLock.with_connection(connection) do
count = PgLock.override_connection(connection) do
PgLock.current_database.advisory_lock.owns.count
end
return false if count.zero?

output = []
output << "There are #{count} advisory locks still open by the current database connection."
GoodJob::Execution.include(GoodJob::AssignableConnection)
GoodJob::Execution.with_connection(connection) do
GoodJob::Execution.include(GoodJob::OverridableConnection)
GoodJob::Execution.override_connection(connection) do
GoodJob::Execution.owns_advisory_locked.each.with_index do |execution, index|
output << "\nAdvisory locked GoodJob::Execution:" if index.zero?
output << " - Execution ID: #{execution.id} / Active Job ID: #{execution.active_job_id}"
end
end

GoodJob::BatchRecord.include(GoodJob::AssignableConnection)
GoodJob::BatchRecord.with_connection(connection) do
GoodJob::BatchRecord.include(GoodJob::OverridableConnection)
GoodJob::BatchRecord.override_connection(connection) do
GoodJob::BatchRecord.owns_advisory_locked.each.with_index do |batch, index|
output << "\nAdvisory locked GoodJob::Batch:" if index.zero?
output << " - BatchRecord ID: #{batch.id}"
end
end

GoodJob::Process.include(GoodJob::AssignableConnection)
GoodJob::Process.with_connection(connection) do
GoodJob::Process.include(GoodJob::OverridableConnection)
GoodJob::Process.override_connection(connection) do
GoodJob::Process.owns_advisory_locked.each.with_index do |process, index|
output << "\nAdvisory locked GoodJob::Process:" if index.zero?
output << " - Process ID: #{process.id}"
Expand Down

0 comments on commit 09d2b4a

Please sign in to comment.