diff --git a/.rubocop.yml b/.rubocop.yml
index 3967bafc..ab643f3d 100644
--- a/.rubocop.yml
+++ b/.rubocop.yml
@@ -39,6 +39,7 @@ Style/SignalException:
 Naming/PredicateName:
   Exclude:
     - 'lib/airbrake-ruby/async_sender.rb'
+    - 'lib/airbrake-ruby/thread_pool.rb'
 
 Metrics/ClassLength:
   Max: 120
diff --git a/lib/airbrake-ruby.rb b/lib/airbrake-ruby.rb
index 74c01337..83fa75ff 100644
--- a/lib/airbrake-ruby.rb
+++ b/lib/airbrake-ruby.rb
@@ -12,6 +12,7 @@
 require 'airbrake-ruby/config'
 require 'airbrake-ruby/config/validator'
 require 'airbrake-ruby/promise'
+require 'airbrake-ruby/thread_pool'
 require 'airbrake-ruby/sync_sender'
 require 'airbrake-ruby/async_sender'
 require 'airbrake-ruby/response'
diff --git a/lib/airbrake-ruby/async_sender.rb b/lib/airbrake-ruby/async_sender.rb
index 4319d27e..e24a8a9a 100644
--- a/lib/airbrake-ruby/async_sender.rb
+++ b/lib/airbrake-ruby/async_sender.rb
@@ -1,7 +1,5 @@
 module Airbrake
-  # Responsible for sending notices to Airbrake asynchronously. The class
-  # supports an unlimited number of worker threads and an unlimited queue size
-  # (both values are configurable).
+  # Responsible for sending notices to Airbrake asynchronously.
   #
   # @see SyncSender
   # @api private
@@ -15,25 +13,15 @@ class AsyncSender
       "and the following notice will not be delivered " \
       "Error: %s - %s\nBacktrace: %s\n".freeze
 
-    # @return [ThreadGroup] the list of workers
-    # @note This is exposed for eaiser unit testing
-    # @since v4.0.0
-    attr_reader :workers
-
-    # @return [Array<[Airbrake::Notice,Airbrake::Promise]>] the list of unsent
-    #   payload
-    # @note This is exposed for eaiser unit testing
-    # @since v4.0.0
-    attr_reader :unsent
-
     def initialize
       @config = Airbrake::Config.instance
-      @unsent = SizedQueue.new(Airbrake::Config.instance.queue_size)
-      @sender = SyncSender.new
-      @closed = false
-      @workers = ThreadGroup.new
-      @mutex = Mutex.new
-      @pid = nil
+
+      sender = SyncSender.new
+      @thread_pool = ThreadPool.new(
+        worker_size: @config.workers,
+        queue_size: @config.queue_size,
+        block: proc { |(notice, promise)| sender.send(notice, promise) }
+      )
     end
 
     # Asynchronously sends a notice to Airbrake.
@@ -42,85 +30,27 @@ def initialize
     #   library
     # @return [Airbrake::Promise]
     def send(notice, promise)
-      return will_not_deliver(notice, promise) if @unsent.size >= @unsent.max
-
-      @unsent << [notice, promise]
+      return will_not_deliver(notice, promise) unless @thread_pool << [notice, promise]
       promise
     end
 
-    # Closes the instance making it a no-op (it shut downs all worker
-    # threads). Before closing, waits on all unsent notices to be sent.
-    #
     # @return [void]
-    # @raise [Airbrake::Error] when invoked more than one time
     def close
-      threads = @mutex.synchronize do
-        raise Airbrake::Error, 'attempted to close already closed sender' if closed?
-
-        unless @unsent.empty?
-          msg = "#{LOG_LABEL} waiting to send #{@unsent.size} unsent notice(s)..."
-          logger.debug(msg + ' (Ctrl-C to abort)')
-        end
-
-        @config.workers.times { @unsent << [:stop, Airbrake::Promise.new] }
-        @closed = true
-        @workers.list.dup
-      end
-
-      threads.each(&:join)
-      logger.debug("#{LOG_LABEL} closed")
+      @thread_pool.close
     end
 
-    # Checks whether the sender is closed and thus usable.
     # @return [Boolean]
     def closed?
-      @closed
+      @thread_pool.closed?
     end
 
-    # Checks if an active sender has any workers. A sender doesn't have any
-    # workers only in two cases: when it was closed or when all workers
-    # crashed. An *active* sender doesn't have any workers only when something
-    # went wrong.
-    #
-    # Workers are expected to crash when you +fork+ the process the workers are
-    # living in. In this case we detect a +fork+ and try to revive them here.
-    #
-    # Another possible scenario that crashes workers is when you close the
-    # instance on +at_exit+, but some other +at_exit+ hook prevents the process
-    # from exiting.
-    #
-    # @return [Boolean] true if an instance wasn't closed, but has no workers
-    # @see https://goo.gl/oydz8h Example of at_exit that prevents exit
+    # @return [Boolean]
     def has_workers?
-      @mutex.synchronize do
-        return false if @closed
-
-        if @pid != Process.pid && @workers.list.empty?
-          @pid = Process.pid
-          spawn_workers
-        end
-
-        !@closed && @workers.list.any?
-      end
+      @thread_pool.has_workers?
     end
 
     private
 
-    def spawn_workers
-      @workers = ThreadGroup.new
-      @config.workers.times { @workers.add(spawn_worker) }
-      @workers.enclose
-    end
-
-    def spawn_worker
-      Thread.new do
-        while (message = @unsent.pop)
-          break if message.first == :stop
-          @sender.send(*message)
-        end
-      end
-    end
-
     def will_not_deliver(notice, promise)
       error = notice[:errors].first
 
@@ -128,7 +58,7 @@ def will_not_deliver(notice, promise)
         format(
           WILL_NOT_DELIVER_MSG,
           log_label: LOG_LABEL,
-          capacity: @unsent.max,
+          capacity: @config.queue_size,
           type: error[:type],
           message: error[:message],
           backtrace: error[:backtrace].map do |line|
@@ -136,7 +66,7 @@ def will_not_deliver(notice, promise)
           end.join("\n")
         )
       )
-      promise.reject("AsyncSender has reached its capacity of #{@unsent.max}")
+      promise.reject("AsyncSender has reached its capacity of #{@config.queue_size}")
     end
   end
 end
diff --git a/lib/airbrake-ruby/thread_pool.rb b/lib/airbrake-ruby/thread_pool.rb
new file mode 100644
index 00000000..493076d1
--- /dev/null
+++ b/lib/airbrake-ruby/thread_pool.rb
@@ -0,0 +1,128 @@
+module Airbrake
+  # ThreadPool implements a simple thread pool that can configure the number of
+  # worker threads and the size of the queue to process.
+  #
+  # @example
+  #   # Initialize a new thread pool with 5 workers and a queue size of 100. Set
+  #   # the block to be run concurrently.
+  #   thread_pool = ThreadPool.new(
+  #     worker_size: 5,
+  #     queue_size: 100,
+  #     block: proc { |message| print "ECHO: #{message}..."}
+  #   )
+  #
+  #   # Send work.
+  #   10.times { |i| thread_pool << i }
+  #   #=> ECHO: 0...ECHO: 1...ECHO: 2...
+  #
+  # @api private
+  # @since v4.6.1
+  class ThreadPool
+    include Loggable
+
+    # @return [ThreadGroup] the list of workers
+    # @note This is exposed for easier unit testing
+    attr_reader :workers
+
+    def initialize(worker_size:, queue_size:, block:)
+      @worker_size = worker_size
+      @queue_size = queue_size
+      @block = block
+
+      @queue = SizedQueue.new(queue_size)
+      @workers = ThreadGroup.new
+      @mutex = Mutex.new
+      @pid = nil
+      @closed = false
+
+      has_workers?
+    end
+
+    # Adds a new message to the thread pool. Rejects messages if the queue is at
+    # its capacity.
+    #
+    # @param [Object] message The message that gets passed to the block
+    # @return [Boolean] true if the message was successfully sent to the pool,
+    #   false if the queue is full
+    def <<(message)
+      return false if backlog >= @queue_size
+      @queue << message
+      true
+    end
+
+    # @return [Integer] how big the queue is at the moment
+    def backlog
+      @queue.size
+    end
+
+    # Checks if a thread pool has any workers. A thread pool doesn't have any
+    # workers only in two cases: when it was closed or when all workers
+    # crashed. An *active* thread pool doesn't have any workers only when
+    # something went wrong.
+    #
+    # Workers are expected to crash when you +fork+ the process the workers are
+    # living in. In this case we detect a +fork+ and try to revive them here.
+    #
+    # Another possible scenario that crashes workers is when you close the
+    # instance on +at_exit+, but some other +at_exit+ hook prevents the process
+    # from exiting.
+    #
+    # @return [Boolean] true if an instance wasn't closed, but has no workers
+    # @see https://goo.gl/oydz8h Example of at_exit that prevents exit
+    def has_workers?
+      @mutex.synchronize do
+        return false if @closed
+
+        if @pid != Process.pid && @workers.list.empty?
+          @pid = Process.pid
+          spawn_workers
+        end
+
+        !@closed && @workers.list.any?
+      end
+    end
+
+    # Closes the thread pool making it a no-op (it shuts down all worker
+    # threads). Before closing, waits on all unprocessed tasks to be processed.
+    #
+    # @return [void]
+    # @raise [Airbrake::Error] when invoked more than one time
+    def close
+      threads = @mutex.synchronize do
+        raise Airbrake::Error, 'this thread pool is closed already' if @closed
+
+        unless @queue.empty?
+          msg = "#{LOG_LABEL} waiting to process #{@queue.size} task(s)..."
+          logger.debug(msg + ' (Ctrl-C to abort)')
+        end
+
+        @worker_size.times { @queue << :stop }
+        @closed = true
+        @workers.list.dup
+      end
+
+      threads.each(&:join)
+      logger.debug("#{LOG_LABEL} thread pool closed")
+    end
+
+    def closed?
+      @closed
+    end
+
+    def spawn_workers
+      @worker_size.times { @workers.add(spawn_worker) }
+      @workers.enclose
+    end
+
+    private
+
+    def spawn_worker
+      Thread.new do
+        while (message = @queue.pop)
+          break if message == :stop
+          @block.call(message)
+        end
+      end
+    end
+  end
+end
diff --git a/spec/async_sender_spec.rb b/spec/async_sender_spec.rb
index d39dceb1..8a32caf3 100644
--- a/spec/async_sender_spec.rb
+++ b/spec/async_sender_spec.rb
@@ -8,148 +8,65 @@
     Airbrake::Config.instance = Airbrake::Config.new(
       project_id: '1',
       workers: 3,
-      queue_size: queue_size
+      queue_size: 10
     )
-
-    allow(Airbrake::Loggable.instance).to receive(:debug)
-    expect(subject).to have_workers
   end
 
   describe "#send" do
-    it "sends payload to Airbrake" do
-      2.times do
-        subject.send(notice, Airbrake::Promise.new)
-      end
-      subject.close
-
-      expect(a_request(:post, endpoint)).to have_been_made.twice
-    end
-
-    context "when the queue is full" do
-      before do
-        allow(subject.unsent).to receive(:size).and_return(queue_size)
-      end
-
-      it "discards payload" do
-        200.times do
-          subject.send(notice, Airbrake::Promise.new)
-        end
+    context "when sender has the capacity to send" do
+      it "sends notices to Airbrake" do
+        2.times { subject.send(notice, Airbrake::Promise.new) }
         subject.close
 
-        expect(a_request(:post, endpoint)).not_to have_been_made
+        expect(a_request(:post, endpoint)).to have_been_made.twice
      end
 
-      it "logs discarded payload" do
-        expect(Airbrake::Loggable.instance).to receive(:error).with(
-          /reached its capacity/
-        ).exactly(15).times
-
-        15.times do
-          subject.send(notice, Airbrake::Promise.new)
-        end
-        subject.close
-      end
-
-      it "returns a rejected promise" do
+      it "returns a resolved promise" do
         promise = Airbrake::Promise.new
-        200.times { subject.send(notice, promise) }
-        expect(promise.value).to eq(
-          'error' => "AsyncSender has reached its capacity of #{queue_size}"
-        )
-      end
-    end
-  end
-
-  describe "#close" do
-    context "when there are no unsent notices" do
-      it "joins the spawned thread" do
-        workers = subject.workers.list
-        expect(workers).to all(be_alive)
-
+        subject.send(notice, promise)
         subject.close
-        expect(workers).to all(be_stop)
+
+        expect(promise).to be_resolved
       end
     end
 
-    context "when there are some unsent notices" do
-      it "logs how many notices are left to send" do
-        expect(Airbrake::Loggable.instance).to receive(:debug).with(
-          /waiting to send \d+ unsent notice\(s\)/
+    context "when sender has exceeded the capacity to send" do
+      before do
+        Airbrake::Config.instance = Airbrake::Config.new(
+          project_id: '1',
+          workers: 0,
+          queue_size: 1
         )
-        expect(Airbrake::Loggable.instance).to receive(:debug).with(/closed/)
-
-        300.times { subject.send(notice, Airbrake::Promise.new) }
-        subject.close
       end
 
-      it "waits until the unsent notices queue is empty" do
+      it "doesn't send the exceeded notices to Airbrake" do
+        15.times { subject.send(notice, Airbrake::Promise.new) }
         subject.close
-        expect(subject.unsent.size).to be_zero
-      end
-    end
 
-    context "when it was already closed" do
-      it "doesn't increase the unsent queue size" do
-        begin
-          subject.close
-        rescue Airbrake::Error
-          nil
-        end
-
-        expect(subject.unsent.size).to be_zero
+        expect(a_request(:post, endpoint)).not_to have_been_made
      end
 
-      it "raises error" do
+      it "returns a rejected promise" do
+        promise = nil
+        15.times do
+          promise = subject.send(notice, Airbrake::Promise.new)
+        end
         subject.close
-        expect(subject).to be_closed
-        expect { subject.close }.to raise_error(
-          Airbrake::Error, 'attempted to close already closed sender'
+
+        expect(promise).to be_rejected
+        expect(promise.value).to eq(
+          'error' => "AsyncSender has reached its capacity of 1"
         )
       end
-    end
-
-    context "when workers were not spawned" do
-      it "correctly closes the notifier nevertheless" do
-        subject.close
-        expect(subject).to be_closed
-      end
-    end
-  end
 
+      it "logs discarded notice" do
+        expect(Airbrake::Loggable.instance).to receive(:error).with(
+          /reached its capacity/
+        ).at_least(:once)
+
-  describe "#has_workers?" do
-    it "returns false when the sender is not closed, but has 0 workers" do
-      subject.workers.list.each do |worker|
-        worker.kill.join
+        15.times { subject.send(notice, Airbrake::Promise.new) }
+        subject.close
      end
-      expect(subject).not_to have_workers
-    end
-
-    it "returns false when the sender is closed" do
-      subject.close
-      expect(subject).not_to have_workers
-    end
-
-    it "respawns workers on fork()", skip: %w[jruby rbx].include?(RUBY_ENGINE) do
-      pid = fork { expect(subject).to have_workers }
-      Process.wait(pid)
-      subject.close
-      expect(subject).not_to have_workers
-    end
-  end
-
-  describe "#spawn_workers" do
-    it "spawns alive threads in an enclosed ThreadGroup" do
-      expect(subject.workers).to be_a(ThreadGroup)
-      expect(subject.workers.list).to all(be_alive)
-      expect(subject.workers).to be_enclosed
-
-      subject.close
-    end
-
-    it "spawns exactly config.workers workers" do
-      expect(subject.workers.list.size).to eq(Airbrake::Config.instance.workers)
-      subject.close
     end
   end
 end
diff --git a/spec/thread_pool_spec.rb b/spec/thread_pool_spec.rb
new file mode 100644
index 00000000..2a821571
--- /dev/null
+++ b/spec/thread_pool_spec.rb
@@ -0,0 +1,158 @@
+RSpec.describe Airbrake::ThreadPool do
+  let(:tasks) { [] }
+  let(:worker_size) { 1 }
+  let(:queue_size) { 2 }
+
+  subject do
+    described_class.new(
+      worker_size: worker_size,
+      queue_size: queue_size,
+      block: proc { |message| tasks << message }
+    )
+  end
+
+  describe "#<<" do
+    it "returns true" do
+      retval = subject << 1
+      subject.close
+      expect(retval).to eq(true)
+    end
+
+    it "performs work in background" do
+      subject << 2
+      subject << 1
+      subject.close
+
+      expect(tasks).to eq([2, 1])
+    end
+
+    context "when the queue is full" do
+      before do
+        allow(subject).to receive(:backlog).and_return(queue_size)
+      end
+
+      subject do
+        described_class.new(
+          worker_size: 1,
+          queue_size: 1,
+          block: proc { |message| tasks << message }
+        )
+      end
+
+      it "returns false" do
+        retval = subject << 1
+        subject.close
+        expect(retval).to eq(false)
+      end
+
+      it "discards tasks" do
+        200.times { subject << 1 }
+        subject.close
+
+        expect(tasks.size).to be_zero
+      end
+    end
+  end
+
+  describe "#backlog" do
+    let(:worker_size) { 0 }
+
+    it "returns the size of the queue" do
+      subject << 1
+      expect(subject.backlog).to eq(1)
+    end
+  end
+
+  describe "#has_workers?" do
+    it "returns false when the thread pool is not closed, but has 0 workers" do
+      subject.workers.list.each do |worker|
+        worker.kill.join
+      end
+      expect(subject).not_to have_workers
+    end
+
+    it "returns false when the thread pool is closed" do
+      subject.close
+      expect(subject).not_to have_workers
+    end
+
+    it "respawns workers on fork()", skip: %w[jruby].include?(RUBY_ENGINE) do
+      pid = fork { expect(subject).to have_workers }
+      Process.wait(pid)
+      subject.close
+      expect(subject).not_to have_workers
+    end
+  end
+
+  describe "#close" do
+    context "when there's no work to do" do
+      it "joins the spawned thread" do
+        workers = subject.workers.list
+        expect(workers).to all(be_alive)
+
+        subject.close
+        expect(workers).to all(be_stop)
+      end
+    end
+
+    context "when there's some work to do" do
+      it "logs how many tasks are left to process" do
+        thread_pool = described_class.new(
+          worker_size: 0, queue_size: 2, block: proc {}
+        )
+
+        expect(Airbrake::Loggable.instance).to receive(:debug).with(
+          /waiting to process \d+ task\(s\)/
+        )
+        expect(Airbrake::Loggable.instance).to receive(:debug).with(/closed/)
+
+        2.times { thread_pool << 1 }
+        thread_pool.close
+      end
+
+      it "waits until the queue gets empty" do
+        thread_pool = described_class.new(
+          worker_size: 1, queue_size: 2, block: proc {}
+        )
+
+        10.times { subject << 1 }
+        thread_pool.close
+        expect(thread_pool.backlog).to be_zero
+      end
+    end
+
+    context "when it was already closed" do
+      it "doesn't increase the queue size" do
+        begin
+          subject.close
+        rescue Airbrake::Error
+          nil
+        end
+
+        expect(subject.backlog).to be_zero
+      end
+
+      it "raises error" do
+        subject.close
+        expect { subject.close }.to raise_error(
+          Airbrake::Error, 'this thread pool is closed already'
+        )
+      end
+    end
+  end
+
+  describe "#spawn_workers" do
+    it "spawns alive threads in an enclosed ThreadGroup" do
+      expect(subject.workers).to be_a(ThreadGroup)
+      expect(subject.workers.list).to all(be_alive)
+      expect(subject.workers).to be_enclosed
+
+      subject.close
+    end
+
+    it "spawns exactly `worker_size` workers" do
+      expect(subject.workers.list.size).to eq(worker_size)
+      subject.close
+    end
+  end
+end