Skip to content

Commit

Permalink
Merge pull request #26 from airbrake/respawn-workers
Browse files Browse the repository at this point in the history
notifier: respawn workers if they're all dead
  • Loading branch information
kyrylo committed Jan 14, 2016
2 parents e6e73db + ff88d3f commit e1d70b3
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 91 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ Airbrake Ruby Changelog
([#25](https://github.com/airbrake/airbrake-ruby/pull/25))
* Made sure that generated notices always have a backtrace
([#21](https://github.com/airbrake/airbrake-ruby/pull/21))
* Made the asynchronous delivery mechanism more robust
([#26](https://github.com/airbrake/airbrake-ruby/pull/26))

### [v1.0.2][v1.0.2] (January 3, 2016)

Expand Down
25 changes: 19 additions & 6 deletions lib/airbrake-ruby/async_sender.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ def initialize(config)
@sender = SyncSender.new(config)
@closed = false
@workers = ThreadGroup.new

config.workers.times { @workers.add(spawn_worker) }
@workers.enclose
@pid = nil
end

##
Expand Down Expand Up @@ -67,18 +65,33 @@ def closed?
# went wrong.
#
# Workers are expected to crash when you +fork+ the process the workers are
# living in. Another possible scenario is when you close the instance on
# +at_exit+, but some other +at_exit+ hook prevents the process from
# exiting.
# living in. In this case we detect a +fork+ and try to revive them here.
#
# Another possible scenario that crashes workers is when you close the
# instance on +at_exit+, but some other +at_exit+ hook prevents the process
# from exiting.
#
# @return [Boolean] true if an instance wasn't closed, but has no workers
# @see https://goo.gl/oydz8h Example of at_exit that prevents exit
def has_workers?
return false if @closed

if @pid != Process.pid && @workers.list.empty?
@pid = Process.pid
spawn_workers
end

!@closed && @workers.list.any?
end

private

def spawn_workers
@workers = ThreadGroup.new
@config.workers.times { @workers.add(spawn_worker) }
@workers.enclose
end

def spawn_worker
Thread.new do
while (notice = @unsent.pop) != :stop
Expand Down
16 changes: 7 additions & 9 deletions lib/airbrake-ruby/notifier.rb
Original file line number Diff line number Diff line change
Expand Up @@ -127,15 +127,13 @@ def send_notice(exception, params, sender = default_sender)
end

def default_sender
if @async_sender.has_workers?
@async_sender
else
@config.logger.warn(
"#{LOG_LABEL} falling back to sync delivery because there are no " \
"running async workers"
)
@sync_sender
end
return @async_sender if @async_sender.has_workers?

@config.logger.warn(
"#{LOG_LABEL} falling back to sync delivery because there are no " \
"running async workers"
)
@sync_sender
end

def clean_backtrace
Expand Down
125 changes: 71 additions & 54 deletions spec/async_sender_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,52 +3,23 @@
RSpec.describe Airbrake::AsyncSender do
before do
stub_request(:post, /.*/).to_return(status: 201, body: '{}')
@sender = described_class.new(Airbrake::Config.new)
@workers = @sender.instance_variable_get(:@workers)
end

describe "#new" do
context "workers_count parameter" do
let(:new_workers) { 5 }
let(:config) { Airbrake::Config.new(workers: new_workers) }

it "spawns alive threads in an enclosed ThreadGroup" do
expect(@workers).to be_a(ThreadGroup)
expect(@workers.list).to all(be_alive)
expect(@workers).to be_enclosed
end

it "controls the number of spawned threads" do
expect(@workers.list.size).to eq(1)

sender = described_class.new(config)
workers = sender.instance_variable_get(:@workers)

expect(workers.list.size).to eq(new_workers)
sender.close
end
end

context "queue" do
before do
@stdout = StringIO.new
end

let(:notices) { 1000 }

let(:config) do
Airbrake::Config.new(logger: Logger.new(@stdout), workers: 3, queue_size: 10)
end

it "limits the size of the queue, but still sends all notices" do
sender = described_class.new(config)

notices.times { |i| sender.send(i) }
sender.close
describe "#send" do
it "limits the size of the queue, but still sends all notices" do
stdout = StringIO.new
notices_count = 1000
config = Airbrake::Config.new(
logger: Logger.new(stdout), workers: 3, queue_size: 10
)
sender = described_class.new(config)
expect(sender).to have_workers

notices_count.times { |i| sender.send(i) }
sender.close

log = @stdout.string.split("\n")
expect(log.grep(/\*\*Airbrake: \{\}/).size).to eq(notices)
end
log = stdout.string.split("\n")
expect(log.grep(/\*\*Airbrake: \{\}/).size).to eq(notices_count)
end
end

Expand All @@ -57,14 +28,16 @@
@stderr = StringIO.new
config = Airbrake::Config.new(logger: Logger.new(@stderr))
@sender = described_class.new(config)
@workers = @sender.instance_variable_get(:@workers).list
expect(@sender).to have_workers
end

context "when there are no unsent notices" do
it "joins the spawned thread" do
expect(@workers).to all(be_alive)
workers = @sender.instance_variable_get(:@workers).list

expect(workers).to all(be_alive)
@sender.close
expect(@workers).to all(be_stop)
expect(workers).to all(be_stop)
end
end

Expand All @@ -91,31 +64,75 @@

context "when it was already closed" do
it "doesn't increase the unsent queue size" do
@sender.close
begin
@sender.close
rescue Airbrake::Error
nil
end

expect(@sender.instance_variable_get(:@unsent).size).to be_zero
end

it "raises error" do
@sender.close

expect(@sender).to be_closed
expect { @sender.close }.
to raise_error(Airbrake::Error, 'attempted to close already closed sender')
end
end

context "when workers were not spawned" do
it "correctly closes the notifier nevertheless" do
sender = described_class.new(Airbrake::Config.new)
sender.close

expect(sender).to be_closed
end
end
end

describe "#has_workers?" do
it "returns false when the sender is not closed, but has 0 workers" do
sender = described_class.new(Airbrake::Config.new)
expect(sender.has_workers?).to be_truthy
before do
@sender = described_class.new(Airbrake::Config.new)
expect(@sender).to have_workers
end

sender.instance_variable_get(:@workers).list.each(&:kill)
it "returns false when the sender is not closed, but has 0 workers" do
@sender.instance_variable_get(:@workers).list.each(&:kill)
sleep 1
expect(sender.has_workers?).to be_falsey
expect(@sender).not_to have_workers
end

it "returns false when the sender is closed" do
@sender.close
expect(@sender).not_to have_workers
end
end

describe "#spawn_workers" do
it "spawns alive threads in an enclosed ThreadGroup" do
sender = described_class.new(Airbrake::Config.new)
expect(sender.has_workers?).to be_truthy
expect(sender).to have_workers

workers = sender.instance_variable_get(:@workers)

expect(workers).to be_a(ThreadGroup)
expect(workers.list).to all(be_alive)
expect(workers).to be_enclosed

sender.close
end

it "spawns exactly config.workers workers" do
workers_count = 5
sender = described_class.new(Airbrake::Config.new(workers: workers_count))
expect(sender).to have_workers

workers = sender.instance_variable_get(:@workers)

expect(workers.list.size).to eq(workers_count)
sender.close
expect(sender.has_workers?).to be_falsey
end
end
end
57 changes: 35 additions & 22 deletions spec/notifier_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -375,18 +375,40 @@ def to_json(*)

it "falls back to synchronous delivery when the async sender is dead" do
out = StringIO.new
notifier = described_class.new(airbrake_params.merge(logger: Logger.new(out)))
async_sender = notifier.instance_variable_get(:@async_sender)

airbrake = described_class.new(airbrake_params.merge(logger: Logger.new(out)))
airbrake.
instance_variable_get(:@async_sender).
instance_variable_get(:@workers).
list.
each(&:kill)

expect(async_sender).to have_workers
async_sender.instance_variable_get(:@workers).list.each(&:kill)
sleep 1
expect(async_sender).not_to have_workers

expect(airbrake.notify('bingo')).to be_nil
notifier.notify('bango')
expect(out.string).to match(/falling back to sync delivery/)

notifier.close
end

it "respawns workers on fork()", skip: %w(jruby rbx).include?(RUBY_ENGINE) do
out = StringIO.new
notifier = described_class.new(airbrake_params.merge(logger: Logger.new(out)))

notifier.notify('bingo', bingo: 'bango')
sleep 1
expect(out.string).not_to match(/falling back to sync delivery/)
expect_a_request_with_body(/"bingo":"bango"/)

pid = fork do
expect(notifier.instance_variable_get(:@async_sender)).to have_workers
notifier.notify('bango', bongo: 'bish')
sleep 1
expect(out.string).not_to match(/falling back to sync delivery/)
expect_a_request_with_body(/"bingo":"bango"/)
end

Process.wait(pid)
notifier.close
expect(notifier.instance_variable_get(:@async_sender)).not_to have_workers
end
end

Expand Down Expand Up @@ -671,25 +693,16 @@ def to_json(*)
end

describe "#close" do
shared_examples 'close' do |method|
context "when using #notify on a closed notifier" do
it "raises error" do
@airbrake.close
expect { method.call(@airbrake) }.
notifier = described_class.new(airbrake_params)
notifier.close

expect { notifier.notify(AirbrakeTestError.new) }.
to raise_error(Airbrake::Error, /closed Airbrake instance/)
end
end

context "when using #notify" do
include_examples 'close', proc { |a| a.notify(AirbrakeTestError.new) }
end

context "when using #send_notice" do
include_examples 'close', proc { |a|
notice = a.build_notice(AirbrakeTestError.new)
a.send_notice(notice)
}
end

context "at program exit when it was closed manually" do
it "doesn't raise error", skip: RUBY_ENGINE == 'jruby' do
expect do
Expand Down

0 comments on commit e1d70b3

Please sign in to comment.