From ff88d3f2fe76b6198b7436b10a5e29b4ca3fd2f4 Mon Sep 17 00:00:00 2001 From: Kyrylo Silin Date: Wed, 13 Jan 2016 14:45:38 +0200 Subject: [PATCH] notifier: respawn workers if they're all dead Potentially fixes https://github.com/airbrake/airbrake/issues/463 (falling back to sync delivery because there are no running async workers) I noticed I can reliably reproduce this message when I invoke the Rails console and manually send a notice via `Airbrake.notify`. It happens because by default Rails launches [Spring][1], which preloads the app by using the `fork()` system call. And when that happens, as we know, our worker threads die. The fix is simply to respawn workers. It works reliably with the Rails console and I assume it will fix the above mentioned issue. [1]: https://github.com/rails/spring/ --- CHANGELOG.md | 2 + lib/airbrake-ruby/async_sender.rb | 25 ++++-- lib/airbrake-ruby/notifier.rb | 16 ++-- spec/async_sender_spec.rb | 125 +++++++++++++++++------------- spec/notifier_spec.rb | 57 ++++++++------ 5 files changed, 134 insertions(+), 91 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 31e40fc6..924c217e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ Airbrake Ruby Changelog ([#25](https://github.com/airbrake/airbrake-ruby/pull/25)) * Made sure that generated notices always have a backtrace ([#21](https://github.com/airbrake/airbrake-ruby/pull/21)) +* Made the asynchronous delivery mechanism more robust + ([#26](https://github.com/airbrake/airbrake-ruby/pull/26)) ### [v1.0.2][v1.0.2] (January 3, 2016) diff --git a/lib/airbrake-ruby/async_sender.rb b/lib/airbrake-ruby/async_sender.rb index c79d2047..bd1a0d8f 100644 --- a/lib/airbrake-ruby/async_sender.rb +++ b/lib/airbrake-ruby/async_sender.rb @@ -14,9 +14,7 @@ def initialize(config) @sender = SyncSender.new(config) @closed = false @workers = ThreadGroup.new - - config.workers.times { @workers.add(spawn_worker) } - @workers.enclose + @pid = nil end ## @@ -67,18 +65,33 @@ def closed? # went wrong. # # Workers are expected to crash when you +fork+ the process the workers are - # living in. Another possible scenario is when you close the instance on - # +at_exit+, but some other +at_exit+ hook prevents the process from - # exiting. + # living in. In this case we detect a +fork+ and try to revive them here. + # + # Another possible scenario that crashes workers is when you close the + # instance on +at_exit+, but some other +at_exit+ hook prevents the process + # from exiting. # # @return [Boolean] true if an instance wasn't closed, but has no workers # @see https://goo.gl/oydz8h Example of at_exit that prevents exit def has_workers? + return false if @closed + + if @pid != Process.pid && @workers.list.empty? + @pid = Process.pid + spawn_workers + end + !@closed && @workers.list.any? end private + def spawn_workers + @workers = ThreadGroup.new + @config.workers.times { @workers.add(spawn_worker) } + @workers.enclose + end + def spawn_worker Thread.new do while (notice = @unsent.pop) != :stop diff --git a/lib/airbrake-ruby/notifier.rb b/lib/airbrake-ruby/notifier.rb index a1244fd2..87701aa6 100644 --- a/lib/airbrake-ruby/notifier.rb +++ b/lib/airbrake-ruby/notifier.rb @@ -127,15 +127,13 @@ def send_notice(exception, params, sender = default_sender) end def default_sender - if @async_sender.has_workers? - @async_sender - else - @config.logger.warn( - "#{LOG_LABEL} falling back to sync delivery because there are no " \ - "running async workers" - ) - @sync_sender - end + return @async_sender if @async_sender.has_workers? + + @config.logger.warn( + "#{LOG_LABEL} falling back to sync delivery because there are no " \ + "running async workers" + ) + @sync_sender end def clean_backtrace diff --git a/spec/async_sender_spec.rb b/spec/async_sender_spec.rb index f4087902..a00a6f54 100644 --- a/spec/async_sender_spec.rb +++ b/spec/async_sender_spec.rb @@ -3,52 +3,23 @@ RSpec.describe Airbrake::AsyncSender do before do stub_request(:post, /.*/).to_return(status: 201, body: '{}') - @sender = described_class.new(Airbrake::Config.new) - @workers = @sender.instance_variable_get(:@workers) end - describe "#new" do - context "workers_count parameter" do - let(:new_workers) { 5 } - let(:config) { Airbrake::Config.new(workers: new_workers) } - - it "spawns alive threads in an enclosed ThreadGroup" do - expect(@workers).to be_a(ThreadGroup) - expect(@workers.list).to all(be_alive) - expect(@workers).to be_enclosed - end - - it "controls the number of spawned threads" do - expect(@workers.list.size).to eq(1) - - sender = described_class.new(config) - workers = sender.instance_variable_get(:@workers) - - expect(workers.list.size).to eq(new_workers) - sender.close - end - end - - context "queue" do - before do - @stdout = StringIO.new - end - - let(:notices) { 1000 } - - let(:config) do - Airbrake::Config.new(logger: Logger.new(@stdout), workers: 3, queue_size: 10) - end - - it "limits the size of the queue, but still sends all notices" do - sender = described_class.new(config) - - notices.times { |i| sender.send(i) } - sender.close + describe "#send" do + it "limits the size of the queue, but still sends all notices" do + stdout = StringIO.new + notices_count = 1000 + config = Airbrake::Config.new( + logger: Logger.new(stdout), workers: 3, queue_size: 10 + ) + sender = described_class.new(config) + expect(sender).to have_workers + + notices_count.times { |i| sender.send(i) } + sender.close - log = @stdout.string.split("\n") - expect(log.grep(/\*\*Airbrake: \{\}/).size).to eq(notices) - end + log = stdout.string.split("\n") + expect(log.grep(/\*\*Airbrake: \{\}/).size).to eq(notices_count) end end @@ -57,14 +28,16 @@ @stderr = StringIO.new config = Airbrake::Config.new(logger: Logger.new(@stderr)) @sender = described_class.new(config) - @workers = @sender.instance_variable_get(:@workers).list + expect(@sender).to have_workers end context "when there are no unsent notices" do it "joins the spawned thread" do - expect(@workers).to all(be_alive) + workers = @sender.instance_variable_get(:@workers).list + + expect(workers).to all(be_alive) @sender.close - expect(@workers).to all(be_stop) + expect(workers).to all(be_stop) end end @@ -91,31 +64,75 @@ context "when it was already closed" do it "doesn't increase the unsent queue size" do - @sender.close + begin + @sender.close + rescue Airbrake::Error + nil + end + expect(@sender.instance_variable_get(:@unsent).size).to be_zero + end + it "raises error" do + @sender.close + + expect(@sender).to be_closed expect { @sender.close }. to raise_error(Airbrake::Error, 'attempted to close already closed sender') end end + + context "when workers were not spawned" do + it "correctly closes the notifier nevertheless" do + sender = described_class.new(Airbrake::Config.new) + sender.close + + expect(sender).to be_closed + end + end end describe "#has_workers?" do - it "returns false when the sender is not closed, but has 0 workers" do - sender = described_class.new(Airbrake::Config.new) - expect(sender.has_workers?).to be_truthy + before do + @sender = described_class.new(Airbrake::Config.new) + expect(@sender).to have_workers + end - sender.instance_variable_get(:@workers).list.each(&:kill) + it "returns false when the sender is not closed, but has 0 workers" do + @sender.instance_variable_get(:@workers).list.each(&:kill) sleep 1 - expect(sender.has_workers?).to be_falsey + expect(@sender).not_to have_workers end it "returns false when the sender is closed" do + @sender.close + expect(@sender).not_to have_workers + end + end + + describe "#spawn_workers" do + it "spawns alive threads in an enclosed ThreadGroup" do sender = described_class.new(Airbrake::Config.new) - expect(sender.has_workers?).to be_truthy + expect(sender).to have_workers + + workers = sender.instance_variable_get(:@workers) + + expect(workers).to be_a(ThreadGroup) + expect(workers.list).to all(be_alive) + expect(workers).to be_enclosed + + sender.close + end + + it "spawns exactly config.workers workers" do + workers_count = 5 + sender = described_class.new(Airbrake::Config.new(workers: workers_count)) + expect(sender).to have_workers + + workers = sender.instance_variable_get(:@workers) + expect(workers.list.size).to eq(workers_count) sender.close - expect(sender.has_workers?).to be_falsey end end end diff --git a/spec/notifier_spec.rb b/spec/notifier_spec.rb index f6cc4ab9..9d6d9ac2 100644 --- a/spec/notifier_spec.rb +++ b/spec/notifier_spec.rb @@ -375,18 +375,40 @@ def to_json(*) it "falls back to synchronous delivery when the async sender is dead" do out = StringIO.new + notifier = described_class.new(airbrake_params.merge(logger: Logger.new(out))) + async_sender = notifier.instance_variable_get(:@async_sender) - airbrake = described_class.new(airbrake_params.merge(logger: Logger.new(out))) - airbrake. - instance_variable_get(:@async_sender). - instance_variable_get(:@workers). - list. - each(&:kill) - + expect(async_sender).to have_workers + async_sender.instance_variable_get(:@workers).list.each(&:kill) sleep 1 + expect(async_sender).not_to have_workers - expect(airbrake.notify('bingo')).to be_nil + notifier.notify('bango') expect(out.string).to match(/falling back to sync delivery/) + + notifier.close + end + + it "respawns workers on fork()", skip: %w(jruby rbx).include?(RUBY_ENGINE) do + out = StringIO.new + notifier = described_class.new(airbrake_params.merge(logger: Logger.new(out))) + + notifier.notify('bingo', bingo: 'bango') + sleep 1 + expect(out.string).not_to match(/falling back to sync delivery/) + expect_a_request_with_body(/"bingo":"bango"/) + + pid = fork do + expect(notifier.instance_variable_get(:@async_sender)).to have_workers + notifier.notify('bango', bongo: 'bish') + sleep 1 + expect(out.string).not_to match(/falling back to sync delivery/) + expect_a_request_with_body(/"bingo":"bango"/) + end + + Process.wait(pid) + notifier.close + expect(notifier.instance_variable_get(:@async_sender)).not_to have_workers end end @@ -671,25 +693,16 @@ def to_json(*) end describe "#close" do - shared_examples 'close' do |method| + context "when using #notify on a closed notifier" do it "raises error" do - @airbrake.close - expect { method.call(@airbrake) }. + notifier = described_class.new(airbrake_params) + notifier.close + + expect { notifier.notify(AirbrakeTestError.new) }. to raise_error(Airbrake::Error, /closed Airbrake instance/) end end - context "when using #notify" do - include_examples 'close', proc { |a| a.notify(AirbrakeTestError.new) } - end - - context "when using #send_notice" do - include_examples 'close', proc { |a| - notice = a.build_notice(AirbrakeTestError.new) - a.send_notice(notice) - } - end - context "at program exit when it was closed manually" do it "doesn't raise error", skip: RUBY_ENGINE == 'jruby' do expect do