Skip to content

Commit

Permalink
Add thread based delayed workers (#3887)
Browse files Browse the repository at this point in the history
Co-authored-by: Johannes Haass <[email protected]>
  • Loading branch information
johha authored Sep 3, 2024
1 parent 39939a5 commit 8200f4c
Show file tree
Hide file tree
Showing 7 changed files with 296 additions and 13 deletions.
3 changes: 2 additions & 1 deletion lib/cloud_controller/config_schemas/base/api_schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,8 @@ class ApiSchema < VCAP::Config
optional(:app_usage_events_cleanup) => { timeout_in_seconds: Integer },
optional(:blobstore_delete) => { timeout_in_seconds: Integer },
optional(:diego_sync) => { timeout_in_seconds: Integer },
optional(:priorities) => Hash
optional(:priorities) => Hash,
optional(:number_of_worker_threads) => Integer
},

# perm settings no longer have any effect but are preserved here
Expand Down
3 changes: 2 additions & 1 deletion lib/cloud_controller/config_schemas/base/worker_schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ class WorkerSchema < VCAP::Config
optional(:app_usage_events_cleanup) => { timeout_in_seconds: Integer },
optional(:blobstore_delete) => { timeout_in_seconds: Integer },
optional(:diego_sync) => { timeout_in_seconds: Integer },
optional(:priorities) => Hash
optional(:priorities) => Hash,
optional(:number_of_worker_threads) => Integer
},

volume_services_enabled: bool,
Expand Down
5 changes: 4 additions & 1 deletion lib/delayed_job/delayed_worker.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
require 'delayed_job/threaded_worker'

class CloudController::DelayedWorker
def initialize(options)
@queue_options = {
Expand All @@ -21,7 +23,8 @@ def start_working
Delayed::Worker.max_attempts = 3
Delayed::Worker.max_run_time = config.get(:jobs, :global, :timeout_in_seconds) + 1
Delayed::Worker.logger = logger
worker = Delayed::Worker.new(@queue_options)
num_worker_threads = config.get(:jobs, :number_of_worker_threads)
worker = num_worker_threads.nil? ? Delayed::Worker.new(@queue_options) : ThreadedWorker.new(num_worker_threads, @queue_options)
worker.name = @queue_options[:worker_name]
worker.start
end
Expand Down
100 changes: 100 additions & 0 deletions lib/delayed_job/threaded_worker.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
class ThreadedWorker < Delayed::Worker
def initialize(num_threads, options={}, grace_period_seconds=30)
super(options)
@num_threads = num_threads
@threads = []
@unexpected_error = false
@mutex = Mutex.new
@grace_period_seconds = grace_period_seconds
end

def start
# add quit trap as in QuitTrap monkey patch
trap('QUIT') do
Thread.new { say 'Exiting...' }
stop
end

trap('TERM') do
Thread.new { say 'Exiting...' }
stop
raise SignalException.new('TERM') if self.class.raise_signal_exceptions
end

trap('INT') do
Thread.new { say 'Exiting...' }
stop
raise SignalException.new('INT') if self.class.raise_signal_exceptions && self.class.raise_signal_exceptions != :term
end

say "Starting threaded delayed worker with #{@num_threads} threads"

@num_threads.times do |thread_index|
thread = Thread.new do
Thread.current[:thread_name] = "thread:#{thread_index + 1}"
threaded_start
rescue Exception => e # rubocop:disable Lint/RescueException
say "Unexpected error: #{e.message}\n#{e.backtrace.join("\n")}", 'error'
@mutex.synchronize { @unexpected_error = true }
stop
end
@mutex.synchronize do
@threads << thread
end
end

@threads.each(&:join)
ensure
raise 'Unexpected error occurred in one of the worker threads' if @unexpected_error
end

def name
base_name = super
thread_name = Thread.current[:thread_name]
thread_name.nil? ? base_name : "#{base_name} #{thread_name}"
end

def stop
Thread.new do
say 'Shutting down worker threads gracefully...'
super

@threads.each do |t|
Thread.new do
t.join(@grace_period_seconds)
if t.alive?
say "Killing thread '#{t[:thread_name]}'"
t.kill
end
end
end.each(&:join) # Ensure all join threads complete
end
end

def threaded_start
self.class.lifecycle.run_callbacks(:execute, self) do
loop do
self.class.lifecycle.run_callbacks(:loop, self) do
@realtime = Benchmark.realtime do
@result = work_off
end
end

count = @result[0] + @result[1]

if count.zero?
if self.class.exit_on_complete
say 'No more jobs available. Exiting'
break
elsif !stop?
sleep(self.class.sleep_delay)
reload!
end
else
say sprintf("#{count} jobs processed at %.4f j/s, %d failed", count / @realtime, @result.last)
end
break if stop?
end
end
end
end
3 changes: 2 additions & 1 deletion spec/unit/lib/delayed_job/delayed_job_spec.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
RSpec.describe 'delayed_job' do
describe 'version' do
it 'is not updated' do
expect(Gem.loaded_specs['delayed_job'].version).to eq('4.1.11'), 'revisit monkey patch in lib/delayed_job/quit_trap.rb'
expect(Gem.loaded_specs['delayed_job'].version).to eq('4.1.11'),
'revisit monkey patch in lib/delayed_job/quit_trap.rb + review the changes related to lib/delayed_job/threaded_worker.rb'
end
end
end
35 changes: 26 additions & 9 deletions spec/unit/lib/delayed_job/delayed_worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@
RSpec.describe CloudController::DelayedWorker do
let(:options) { { queues: 'default', name: 'test_worker' } }
let(:environment) { instance_double(BackgroundJobEnvironment, setup_environment: nil) }
let(:worker) { instance_double(Delayed::Worker, start: nil) }
let(:delayed_worker) { instance_double(Delayed::Worker, start: nil) }
let(:threaded_worker) { instance_double(ThreadedWorker, start: nil) }

before do
allow(RakeConfig).to receive(:config).and_return(TestConfig.config_instance)
allow(BackgroundJobEnvironment).to receive(:new).with(anything).and_return(environment)
allow(Delayed::Worker).to receive(:new).and_return(worker)
allow(worker).to receive(:name=).with(anything)
allow(Delayed::Worker).to receive(:new).and_return(delayed_worker)
allow(delayed_worker).to receive(:name=).with(anything)
allow(ThreadedWorker).to receive(:new).and_return(threaded_worker)
allow(threaded_worker).to receive(:name=).with(anything)
end

describe '#initialize' do
Expand All @@ -28,22 +31,36 @@
end

describe '#start_working' do
let(:worker_instance) { CloudController::DelayedWorker.new(options) }
let(:cc_delayed_worker) { CloudController::DelayedWorker.new(options) }

it 'sets up the environment and starts the worker' do
expect(environment).to receive(:setup_environment).with(anything)
expect(worker).to receive(:name=).with('test_worker')
expect(worker).to receive(:start)
expect(environment).to receive(:setup_environment).with(nil)
expect(Delayed::Worker).to receive(:new).with(anything).and_return(delayed_worker)
expect(delayed_worker).to receive(:name=).with('test_worker')
expect(delayed_worker).to receive(:start)

worker_instance.start_working
cc_delayed_worker.start_working
end

it 'configures Delayed::Worker settings' do
worker_instance.start_working
cc_delayed_worker.start_working

expect(Delayed::Worker.destroy_failed_jobs).to be false
expect(Delayed::Worker.max_attempts).to eq(3)
expect(Delayed::Worker.max_run_time).to eq(14_401)
end

context 'when the number of threads is specified' do
before { TestConfig.config[:jobs].merge!(number_of_worker_threads: 7) }

it 'creates a ThreadedWorker with the specified number of threads' do
expect(environment).to receive(:setup_environment).with(nil)
expect(ThreadedWorker).to receive(:new).with(7, anything).and_return(threaded_worker)
expect(threaded_worker).to receive(:name=).with('test_worker')
expect(threaded_worker).to receive(:start)

cc_delayed_worker.start_working
end
end
end
end
160 changes: 160 additions & 0 deletions spec/unit/lib/delayed_job/threaded_worker_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
require 'spec_helper'
require 'delayed_job'
require 'delayed_job/threaded_worker'

RSpec.describe ThreadedWorker do
let(:num_threads) { 2 }
let(:grace_period_seconds) { 2 }
let(:worker) { ThreadedWorker.new(num_threads, {}, grace_period_seconds) }
let(:worker_name) { 'instance_name' }

before { worker.name = worker_name }

describe '#initialize' do
it 'sets up the thread count' do
expect(worker.instance_variable_get(:@num_threads)).to eq(num_threads)
end

it 'sets up the grace period' do
expect(worker.instance_variable_get(:@grace_period_seconds)).to eq(grace_period_seconds)
end

it 'sets up the grace period to 30 seconds by default' do
worker = ThreadedWorker.new(num_threads)
expect(worker.instance_variable_get(:@grace_period_seconds)).to eq(30)
end
end

describe '#start' do
before do
allow(worker).to receive(:threaded_start)
end

it 'sets up signal traps for all signals' do
expect(worker).to receive(:trap).with('TERM')
expect(worker).to receive(:trap).with('INT')
expect(worker).to receive(:trap).with('QUIT')
worker.start
end

it 'starts the specified number of threads' do
expect(worker).to receive(:threaded_start).exactly(num_threads).times

expect(worker.instance_variable_get(:@threads).length).to eq(0)
worker.start
expect(worker.instance_variable_get(:@threads).length).to eq(num_threads)
end

it 'logs the start and shutdown messages' do
expect(worker).to receive(:say).with("Starting threaded delayed worker with #{num_threads} threads")
worker.start
end

it 'sets the thread_name variable for each thread' do
worker.start
worker.instance_variable_get(:@threads).each_with_index do |thread, index|
expect(thread[:thread_name]).to eq("thread:#{index + 1}")
end
end

it 'logs the error and stops the worker when an unexpected error occurs' do
allow(worker).to receive(:threaded_start).and_raise(StandardError.new('test error'))
allow(worker).to receive(:stop)
expect { worker.start }.to raise_error('Unexpected error occurred in one of the worker threads')
expect(worker.instance_variable_get(:@unexpected_error)).to be true
end
end

describe '#name' do
it 'returns the instance name if thread name is set' do
allow(Thread.current).to receive(:[]).with(:thread_name).and_return('some-thread-name')
expect(worker.name).to eq('instance_name some-thread-name')
end

it 'returns the instance name if thread name is not set' do
allow(Thread.current).to receive(:[]).with(:thread_name).and_return(nil)
expect(worker.name).to eq(worker_name)
end
end

describe '#stop' do
it 'logs the shutdown message' do
queue = Queue.new
allow(worker).to(receive(:say)) { |message| queue.push(message) }

worker.stop
expect(queue.pop).to eq('Shutting down worker threads gracefully...')
end

it 'sets the exit flag in the parent worker' do
worker.stop
sleep 0.1 until worker.instance_variable_defined?(:@exit)
expect(worker.instance_variable_get(:@exit)).to be true
end

it 'allows threads to finish their work without being killed prematurely' do
allow(worker).to receive(:threaded_start) do
sleep grace_period_seconds / 2 until worker.instance_variable_get(:@exit) == true
end

worker_thread = Thread.new { worker.start }
sleep 0.1 until worker.instance_variable_get(:@threads).length == num_threads && worker.instance_variable_get(:@threads).all?(&:alive?)
worker.instance_variable_get(:@threads).each { |t| allow(t).to receive(:kill).and_call_original }

Thread.new { worker.stop }.join
worker_thread.join
worker.instance_variable_get(:@threads).each { |t| expect(t).not_to have_received(:kill) }
end

it 'kills threads that exceed the grace period during shutdown' do
allow(worker).to receive(:threaded_start) do
sleep grace_period_seconds * 2 until worker.instance_variable_get(:@exit) == true
end

worker_thread = Thread.new { worker.start }
sleep 0.1 until worker.instance_variable_get(:@threads).length == num_threads && worker.instance_variable_get(:@threads).all?(&:alive?)
worker.instance_variable_get(:@threads).each { |t| allow(t).to receive(:kill).and_call_original }

Thread.new { worker.stop }.join
worker_thread.join
expect(worker.instance_variable_get(:@threads)).to all(have_received(:kill))
end
end

describe '#threaded_start' do
before do
allow(worker).to receive(:work_off).and_return([5, 2])
allow(worker).to receive(:sleep)
allow(worker).to receive(:stop?).and_return(false, true)
allow(worker).to receive(:reload!).and_call_original
end

it 'runs the work_off loop twice' do
worker.threaded_start
expect(worker).to have_received(:work_off).twice
end

it 'logs the number of jobs processed' do
expect(worker).to receive(:say).with(%r{7 jobs processed at \d+\.\d+ j/s, 2 failed}).twice
worker.threaded_start
end

it 'reloads the worker if stop is not set' do
allow(worker).to receive(:work_off).and_return([0, 0])
worker.threaded_start
expect(worker).to have_received(:reload!).once
end

context 'when exit_on_complete is set' do
before do
allow(worker.class).to receive(:exit_on_complete).and_return(true)
allow(worker).to receive(:work_off).and_return([0, 0])
end

it 'exits the worker when no more jobs are available' do
expect(worker).to receive(:say).with('No more jobs available. Exiting')
worker.threaded_start
end
end
end
end

0 comments on commit 8200f4c

Please sign in to comment.