Skip to content

Commit

Permalink
notifier: respawn workers if they're all dead
Browse files Browse the repository at this point in the history
Potentially fixes airbrake/airbrake#463
(falling back to sync delivery because there are no running async
workers)

I noticed I can reliably reproduce this message when I invoke the Rails
console and manually send a notice via `Airbrake.notify`. It happens
because by default Rails launches [Spring][1], which preloads the app
by using the `fork()` system call. When that happens, only the thread that
called `fork()` survives in the child process, so our worker threads die.

The fix is simply to respawn workers. It works reliably with the Rails
console and I assume it will fix the above mentioned issue.

[1]: https://github.com/rails/spring/
  • Loading branch information
kyrylo committed Jan 13, 2016
1 parent e6e73db commit 116f9cb
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 92 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ Airbrake Ruby Changelog
([#25](https://github.com/airbrake/airbrake-ruby/pull/25))
* Made sure that generated notices always have a backtrace
([#21](https://github.com/airbrake/airbrake-ruby/pull/21))
* Made the asynchronous delivery mechanism more robust
([#26](https://github.com/airbrake/airbrake-ruby/pull/26))

### [v1.0.2][v1.0.2] (January 3, 2016)

Expand Down
23 changes: 19 additions & 4 deletions lib/airbrake-ruby/async_sender.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,7 @@ def initialize(config)
@unsent = SizedQueue.new(config.queue_size)
@sender = SyncSender.new(config)
@closed = false
@workers = ThreadGroup.new

config.workers.times { @workers.add(spawn_worker) }
@workers.enclose
@workers = nil
end

##
Expand All @@ -41,6 +38,11 @@ def close
raise Airbrake::Error, 'attempted to close already closed sender'
end

unless @workers
@closed = true
return
end

unless @unsent.empty?
msg = "#{LOG_LABEL} waiting to send #{@unsent.size} unsent notice(s)..."
@config.logger.debug(msg + ' (Ctrl-C to abort)')
Expand Down Expand Up @@ -74,9 +76,22 @@ def closed?
# @return [Boolean] true if the instance isn't closed and its thread group
#   lists at least one worker; false if it is closed, if workers were never
#   spawned, or if the group lists no workers
# @see https://goo.gl/oydz8h Example of at_exit that prevents exit
def has_workers?
  # The worker group stays nil until workers are spawned.
  return false if @closed || !@workers

  @workers.list.any?
end

##
# Replaces the current worker group with a brand new ThreadGroup, adds one
# worker thread per configured worker (the +workers+ config option) and then
# encloses the group.
#
# @since v1.0.3
# @return [void]
def spawn_workers
  @workers = ThreadGroup.new
  @config.workers.times do
    thread = spawn_worker
    @workers.add(thread)
  end
  @workers.enclose
end

private

def spawn_worker
Expand Down
24 changes: 15 additions & 9 deletions lib/airbrake-ruby/notifier.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ def initialize(user_config)
@filter_chain = FilterChain.new(@config)
@async_sender = AsyncSender.new(@config)
@sync_sender = SyncSender.new(@config)
@pid = nil
@workers_spawned = false
end

##
Expand All @@ -42,6 +44,12 @@ def initialize(user_config)
##
# @macro see_public_api_method
def notify(exception, params = {})
  # Async workers are spawned lazily and don't survive fork(), so they are
  # (re)spawned on the first call and whenever the process id has changed.
  # A closed async sender is left alone: spawning threads for it would only
  # leak them, since closing an already closed sender raises an error.
  workers_stale = !@workers_spawned || @pid != Process.pid
  if workers_stale && !@async_sender.closed?
    @pid = Process.pid
    @async_sender.spawn_workers
    @workers_spawned = true
  end

  send_notice(exception, params)
  # Always nil, regardless of what the delivery returned.
  nil
end
Expand Down Expand Up @@ -127,15 +135,13 @@ def send_notice(exception, params, sender = default_sender)
end

def default_sender
if @async_sender.has_workers?
@async_sender
else
@config.logger.warn(
"#{LOG_LABEL} falling back to sync delivery because there are no " \
"running async workers"
)
@sync_sender
end
return @async_sender if @async_sender.has_workers?

@config.logger.warn(
"#{LOG_LABEL} falling back to sync delivery because there are no " \
"running async workers"
)
@sync_sender
end

def clean_backtrace
Expand Down
132 changes: 74 additions & 58 deletions spec/async_sender_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,68 +3,23 @@
RSpec.describe Airbrake::AsyncSender do
before do
stub_request(:post, /.*/).to_return(status: 201, body: '{}')
@sender = described_class.new(Airbrake::Config.new)
@workers = @sender.instance_variable_get(:@workers)
end

describe "#new" do
context "workers_count parameter" do
let(:new_workers) { 5 }
let(:config) { Airbrake::Config.new(workers: new_workers) }

it "spawns alive threads in an enclosed ThreadGroup" do
expect(@workers).to be_a(ThreadGroup)
expect(@workers.list).to all(be_alive)
expect(@workers).to be_enclosed
end

it "controls the number of spawned threads" do
expect(@workers.list.size).to eq(1)

sender = described_class.new(config)
workers = sender.instance_variable_get(:@workers)

expect(workers.list.size).to eq(new_workers)
sender.close
end
end

context "queue" do
before do
@stdout = StringIO.new
end

let(:notices) { 1000 }

let(:config) do
Airbrake::Config.new(logger: Logger.new(@stdout), workers: 3, queue_size: 10)
end

it "limits the size of the queue, but still sends all notices" do
sender = described_class.new(config)

notices.times { |i| sender.send(i) }
sender.close

log = @stdout.string.split("\n")
expect(log.grep(/\*\*Airbrake: \{\}/).size).to eq(notices)
end
end
end

describe "#close" do
before do
@stderr = StringIO.new
config = Airbrake::Config.new(logger: Logger.new(@stderr))
@sender = described_class.new(config)
@workers = @sender.instance_variable_get(:@workers).list
@sender.spawn_workers
end

context "when there are no unsent notices" do
it "joins the spawned thread" do
expect(@workers).to all(be_alive)
workers = @sender.instance_variable_get(:@workers).list

expect(workers).to all(be_alive)
@sender.close
expect(@workers).to all(be_stop)
expect(workers).to all(be_stop)
end
end

Expand All @@ -91,31 +46,92 @@

context "when it was already closed" do
it "doesn't increase the unsent queue size" do
@sender.close
begin
@sender.close
rescue Airbrake::Error
nil
end

expect(@sender.instance_variable_get(:@unsent).size).to be_zero
end

it "raises error" do
@sender.close

expect(@sender).to be_closed
expect { @sender.close }.
to raise_error(Airbrake::Error, 'attempted to close already closed sender')
end
end

# Covers closing a sender on which spawn_workers was never called.
context "when workers were not spawned" do
it "correctly closes the notifier nevertheless" do
# A fresh sender: no spawn_workers call is made before closing it.
sender = described_class.new(Airbrake::Config.new)
sender.close

expect(sender).to be_closed
end
end
end

describe "#has_workers?" do
it "returns false when the sender is not closed, but has 0 workers" do
sender = described_class.new(Airbrake::Config.new)
expect(sender.has_workers?).to be_truthy
before do
@sender = described_class.new(Airbrake::Config.new)
@sender.spawn_workers
expect(@sender.has_workers?).to be_truthy
end

sender.instance_variable_get(:@workers).list.each(&:kill)
it "returns false when the sender is not closed, but has 0 workers" do
@sender.instance_variable_get(:@workers).list.each(&:kill)
sleep 1
expect(sender.has_workers?).to be_falsey
expect(@sender.has_workers?).to be_falsey
end

it "returns false when the sender is closed" do
@sender.close
expect(@sender.has_workers?).to be_falsey
end
end

describe "#spawn_workers" do
it "spawns alive threads in an enclosed ThreadGroup" do
sender = described_class.new(Airbrake::Config.new)
expect(sender.has_workers?).to be_truthy
sender.spawn_workers

workers = sender.instance_variable_get(:@workers)

expect(workers).to be_a(ThreadGroup)
expect(workers.list).to all(be_alive)
expect(workers).to be_enclosed

sender.close
expect(sender.has_workers?).to be_falsey
end

it "spawns exactly config.workers workers" do
workers_count = 5
sender = described_class.new(Airbrake::Config.new(workers: workers_count))
sender.spawn_workers

workers = sender.instance_variable_get(:@workers)

expect(workers.list.size).to eq(workers_count)
sender.close
end

it "limits the size of the queue, but still sends all notices" do
stdout = StringIO.new
notices_count = 1000
config = Airbrake::Config.new(
logger: Logger.new(stdout), workers: 3, queue_size: 10
)
sender = described_class.new(config)
sender.spawn_workers

notices_count.times { |i| sender.send(i) }
sender.close

log = stdout.string.split("\n")
expect(log.grep(/\*\*Airbrake: \{\}/).size).to eq(notices_count)
end
end
end
68 changes: 47 additions & 21 deletions spec/notifier_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -375,18 +375,53 @@ def to_json(*)

it "falls back to synchronous delivery when the async sender is dead" do
out = StringIO.new
notifier = described_class.new(airbrake_params.merge(logger: Logger.new(out)))
async_sender = notifier.instance_variable_get(:@async_sender)

airbrake = described_class.new(airbrake_params.merge(logger: Logger.new(out)))
airbrake.
instance_variable_get(:@async_sender).
instance_variable_get(:@workers).
list.
each(&:kill)
notifier.notify('bingo')
expect(async_sender.has_workers?).to be_truthy

async_sender.instance_variable_get(:@workers).list.each(&:kill)
sleep 1
expect(async_sender.has_workers?).to be_falsey

expect(airbrake.notify('bingo')).to be_nil
notifier.notify('bango')
expect(out.string).to match(/falling back to sync delivery/)

notifier.close
end

# Checks that a new notifier has no async workers until #notify is called
# for the first time, and that this first call does not log the sync
# fallback warning.
it "spawns workers on first invocation" do
out = StringIO.new
notifier = described_class.new(airbrake_params.merge(logger: Logger.new(out)))
async_sender = notifier.instance_variable_get(:@async_sender)

# Nothing has been spawned before the first notification.
expect(async_sender.has_workers?).to be_falsey
notifier.notify('bingo')
sleep 1
expect(async_sender.has_workers?).to be_truthy

# The logger writes to +out+, so the warning would show up in this string.
expect(out.string).not_to match(/falling back to sync delivery/)
notifier.close
end

# Checks that a forked child, which starts out without workers, gets new
# ones on its first #notify call. Skipped when RUBY_ENGINE is 'jruby'.
# NOTE(review): the expectations inside the fork block run in the child
# process, and the parent only waits for the child without inspecting its
# exit status. Confirm whether a failure there can actually fail this example.
it "respawns workers on fork()", skip: RUBY_ENGINE == 'jruby' do
notifier = described_class.new(airbrake_params)
async_sender = notifier.instance_variable_get(:@async_sender)

# Make sure the parent process has workers before forking.
notifier.notify('bingo')
sleep 1
expect(async_sender.has_workers?).to be_truthy

pid = fork do
# In the child no workers are reported until #notify is called again.
expect(async_sender.has_workers?).to be_falsey
notifier.notify('bango')
sleep 1
expect(async_sender.has_workers?).to be_truthy
end

# Wait for the child to finish before closing the parent's notifier.
Process.wait(pid)
notifier.close
end
end

Expand Down Expand Up @@ -671,25 +706,16 @@ def to_json(*)
end

describe "#close" do
shared_examples 'close' do |method|
context "when using #notify on a closed notifier" do
it "raises error" do
@airbrake.close
expect { method.call(@airbrake) }.
notifier = described_class.new(airbrake_params)
notifier.close

expect { notifier.notify(AirbrakeTestError.new) }.
to raise_error(Airbrake::Error, /closed Airbrake instance/)
end
end

context "when using #notify" do
include_examples 'close', proc { |a| a.notify(AirbrakeTestError.new) }
end

context "when using #send_notice" do
include_examples 'close', proc { |a|
notice = a.build_notice(AirbrakeTestError.new)
a.send_notice(notice)
}
end

context "at program exit when it was closed manually" do
it "doesn't raise error", skip: RUBY_ENGINE == 'jruby' do
expect do
Expand Down

0 comments on commit 116f9cb

Please sign in to comment.