Skip to content

Commit

Permalink
Apply review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
johha committed Sep 3, 2024
1 parent d916965 commit 496be60
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 36 deletions.
8 changes: 3 additions & 5 deletions lib/delayed_job/threaded_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,8 @@ def start
Thread.current[:thread_name] = "thread:#{thread_index + 1}"
threaded_start
rescue Exception => e # rubocop:disable Lint/RescueException
@mutex.synchronize do
say "Unexpected error: #{e.message}\n#{e.backtrace.join("\n")}", 'error'
@unexpected_error = true
end
say "Unexpected error: #{e.message}\n#{e.backtrace.join("\n")}", 'error'
@mutex.synchronize { @unexpected_error = true }
stop
end
@mutex.synchronize do
Expand All @@ -53,7 +51,7 @@ def start
def name
base_name = super
thread_name = Thread.current[:thread_name]
thread_name ? "#{base_name} #{thread_name}" : base_name
thread_name.nil? ? base_name : "#{base_name} #{thread_name}"
end

def stop
Expand Down
48 changes: 17 additions & 31 deletions spec/unit/lib/delayed_job/threaded_worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,11 @@

RSpec.describe ThreadedWorker do
let(:num_threads) { 2 }
let(:grace_period_seconds) { 5 }
let(:grace_period_seconds) { 2 }
let(:worker) { ThreadedWorker.new(num_threads, {}, grace_period_seconds) }
let(:worker_name) { 'instance_name' }

before do
allow(worker).to receive(:say)
allow(worker).to receive_messages(work_off: [5, 2], sleep: nil)

worker.name = worker_name
end
before { worker.name = worker_name }

describe '#initialize' do
it 'sets up the thread count' do
Expand All @@ -33,7 +28,6 @@
describe '#start' do
before do
allow(worker).to receive(:threaded_start)
allow(worker.instance_variable_get(:@mutex)).to receive(:synchronize).and_call_original
end

it 'sets up signal traps for all signals' do
Expand All @@ -46,14 +40,13 @@
it 'starts the specified number of threads' do
expect(worker).to receive(:threaded_start).exactly(num_threads).times

expect(worker.instance_variable_get(:@threads).length).to eq(0)
worker.start

expect(worker.instance_variable_get(:@threads).length).to eq(2)
expect(worker.instance_variable_get(:@mutex)).to have_received(:synchronize).twice
expect(worker.instance_variable_get(:@threads).length).to eq(num_threads)
end

it 'logs the start and shutdown messages' do
expect(worker).to receive(:say).with('Starting threaded delayed worker with 2 threads')
expect(worker).to receive(:say).with("Starting threaded delayed worker with #{num_threads} threads")
worker.start
end

Expand All @@ -77,6 +70,11 @@
allow(Thread.current).to receive(:[]).with(:thread_name).and_return('some-thread-name')
expect(worker.name).to eq('instance_name some-thread-name')
end

it 'returns the instance name if thread name is not set' do
allow(Thread.current).to receive(:[]).with(:thread_name).and_return(nil)
expect(worker.name).to eq(worker_name)
end
end

describe '#stop' do
Expand All @@ -90,56 +88,44 @@

it 'sets the exit flag in the parent worker' do
worker.stop
sleep 0.1 until worker.instance_variable_get(:@exit)
sleep 0.1 until worker.instance_variable_defined?(:@exit)
expect(worker.instance_variable_get(:@exit)).to be true
end

it 'allows threads to finish their work without being killed prematurely' do
allow(worker).to receive(:threaded_start) do
5.times do
break if worker.instance_variable_get(:@exit)

sleep 0.5
end
5.times { sleep grace_period_seconds / 2 until worker.instance_variable_get(:@exit) == true }
end

worker_thread = Thread.new { worker.start }
sleep(0.5)
expect(worker.instance_variable_get(:@threads).all?(&:alive?)).to be true
sleep 0.1 until worker.instance_variable_get(:@threads).length == num_threads && worker.instance_variable_get(:@threads).all?(&:alive?)
worker.instance_variable_get(:@threads).each { |t| allow(t).to receive(:kill).and_call_original }

Thread.new { worker.stop }.join
worker.instance_variable_get(:@threads).each(&:join)
expect(worker.instance_variable_get(:@threads).all?(&:alive?)).to be false
worker_thread.join
worker.instance_variable_get(:@threads).each { |t| expect(t).not_to have_received(:kill) }
end

it 'kills threads that exceed the grace period during shutdown' do
worker = ThreadedWorker.new(num_threads, {}, 3)
allow(worker).to receive(:threaded_start) do
10.times do
break if worker.instance_variable_get(:@exit)

sleep 4
end
10.times { sleep grace_period_seconds * 2 until worker.instance_variable_get(:@exit) == true }
end

worker_thread = Thread.new { worker.start }
sleep(0.5)
expect(worker.instance_variable_get(:@threads).all?(&:alive?)).to be true
sleep 0.1 until worker.instance_variable_get(:@threads).length == num_threads && worker.instance_variable_get(:@threads).all?(&:alive?)
worker.instance_variable_get(:@threads).each { |t| allow(t).to receive(:kill).and_call_original }

Thread.new { worker.stop }.join
worker.instance_variable_get(:@threads).each(&:join)
expect(worker.instance_variable_get(:@threads).all?(&:alive?)).to be false
worker_thread.join
expect(worker.instance_variable_get(:@threads)).to all(have_received(:kill))
end
end

describe '#threaded_start' do
before do
allow(worker).to receive(:work_off).and_return([5, 2])
allow(worker).to receive(:sleep)
allow(worker).to receive(:stop?).and_return(false, true)
allow(worker).to receive(:reload!).and_call_original
end
Expand Down

0 comments on commit 496be60

Please sign in to comment.