Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

notifier: respawn workers if they're all dead #26

Merged
merged 1 commit into from
Jan 14, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ Airbrake Ruby Changelog
([#25](https://github.com/airbrake/airbrake-ruby/pull/25))
* Made sure that generated notices always have a backtrace
([#21](https://github.com/airbrake/airbrake-ruby/pull/21))
* Made the asynchronous delivery mechanism more robust
([#26](https://github.com/airbrake/airbrake-ruby/pull/26))

### [v1.0.2][v1.0.2] (January 3, 2016)

Expand Down
25 changes: 19 additions & 6 deletions lib/airbrake-ruby/async_sender.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ def initialize(config)
@sender = SyncSender.new(config)
@closed = false
@workers = ThreadGroup.new

config.workers.times { @workers.add(spawn_worker) }
@workers.enclose
@pid = nil
end

##
Expand Down Expand Up @@ -67,18 +65,33 @@ def closed?
# went wrong.
#
# Workers are expected to crash when you +fork+ the process the workers are
# living in. Another possible scenario is when you close the instance on
# +at_exit+, but some other +at_exit+ hook prevents the process from
# exiting.
# living in. In this case we detect a +fork+ and try to revive them here.
#
# Another possible scenario that crashes workers is when you close the
# instance on +at_exit+, but some other +at_exit+ hook prevents the process
# from exiting.
#
# @return [Boolean] true if an instance wasn't closed, but has no workers
# @see https://goo.gl/oydz8h Example of at_exit that prevents exit
def has_workers?
return false if @closed

if @pid != Process.pid && @workers.list.empty?
@pid = Process.pid
spawn_workers
end

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you should not spawn any workers if notifier is closed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be enough, then?

diff --git a/lib/airbrake-ruby/async_sender.rb b/lib/airbrake-ruby/async_sender.rb
index 35b1aa4..dc3a71d 100644
--- a/lib/airbrake-ruby/async_sender.rb
+++ b/lib/airbrake-ruby/async_sender.rb
@@ -74,7 +74,7 @@ module Airbrake
     def has_workers?
       if @pid != Process.pid && @workers.list.empty?
         @pid = Process.pid
-        spawn_workers
+        spawn_workers unless closed?
       end

       !@closed && @workers.list.any?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not

return false if @closed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also works. My thinking was that in this case @pid won't be updated, but since the sender is closed it doesn't matter. Updated according to your suggestion.


!@closed && @workers.list.any?
end

private

def spawn_workers
@workers = ThreadGroup.new
@config.workers.times { @workers.add(spawn_worker) }
@workers.enclose
end

def spawn_worker
Thread.new do
while (notice = @unsent.pop) != :stop
Expand Down
16 changes: 7 additions & 9 deletions lib/airbrake-ruby/notifier.rb
Original file line number Diff line number Diff line change
Expand Up @@ -127,15 +127,13 @@ def send_notice(exception, params, sender = default_sender)
end

def default_sender
if @async_sender.has_workers?
@async_sender
else
@config.logger.warn(
"#{LOG_LABEL} falling back to sync delivery because there are no " \
"running async workers"
)
@sync_sender
end
return @async_sender if @async_sender.has_workers?

@config.logger.warn(
"#{LOG_LABEL} falling back to sync delivery because there are no " \
"running async workers"
)
@sync_sender
end

def clean_backtrace
Expand Down
125 changes: 71 additions & 54 deletions spec/async_sender_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,52 +3,23 @@
RSpec.describe Airbrake::AsyncSender do
before do
stub_request(:post, /.*/).to_return(status: 201, body: '{}')
@sender = described_class.new(Airbrake::Config.new)
@workers = @sender.instance_variable_get(:@workers)
end

describe "#new" do
context "workers_count parameter" do
let(:new_workers) { 5 }
let(:config) { Airbrake::Config.new(workers: new_workers) }

it "spawns alive threads in an enclosed ThreadGroup" do
expect(@workers).to be_a(ThreadGroup)
expect(@workers.list).to all(be_alive)
expect(@workers).to be_enclosed
end

it "controls the number of spawned threads" do
expect(@workers.list.size).to eq(1)

sender = described_class.new(config)
workers = sender.instance_variable_get(:@workers)

expect(workers.list.size).to eq(new_workers)
sender.close
end
end

context "queue" do
before do
@stdout = StringIO.new
end

let(:notices) { 1000 }

let(:config) do
Airbrake::Config.new(logger: Logger.new(@stdout), workers: 3, queue_size: 10)
end

it "limits the size of the queue, but still sends all notices" do
sender = described_class.new(config)

notices.times { |i| sender.send(i) }
sender.close
describe "#send" do
it "limits the size of the queue, but still sends all notices" do
stdout = StringIO.new
notices_count = 1000
config = Airbrake::Config.new(
logger: Logger.new(stdout), workers: 3, queue_size: 10
)
sender = described_class.new(config)
expect(sender).to have_workers

notices_count.times { |i| sender.send(i) }
sender.close

log = @stdout.string.split("\n")
expect(log.grep(/\*\*Airbrake: \{\}/).size).to eq(notices)
end
log = stdout.string.split("\n")
expect(log.grep(/\*\*Airbrake: \{\}/).size).to eq(notices_count)
end
end

Expand All @@ -57,14 +28,16 @@
@stderr = StringIO.new
config = Airbrake::Config.new(logger: Logger.new(@stderr))
@sender = described_class.new(config)
@workers = @sender.instance_variable_get(:@workers).list
expect(@sender).to have_workers
end

context "when there are no unsent notices" do
it "joins the spawned thread" do
expect(@workers).to all(be_alive)
workers = @sender.instance_variable_get(:@workers).list

expect(workers).to all(be_alive)
@sender.close
expect(@workers).to all(be_stop)
expect(workers).to all(be_stop)
end
end

Expand All @@ -91,31 +64,75 @@

context "when it was already closed" do
it "doesn't increase the unsent queue size" do
@sender.close
begin
@sender.close
rescue Airbrake::Error
nil
end

expect(@sender.instance_variable_get(:@unsent).size).to be_zero
end

it "raises error" do
@sender.close

expect(@sender).to be_closed
expect { @sender.close }.
to raise_error(Airbrake::Error, 'attempted to close already closed sender')
end
end

context "when workers were not spawned" do
it "correctly closes the notifier nevertheless" do
sender = described_class.new(Airbrake::Config.new)
sender.close

expect(sender).to be_closed
end
end
end

describe "#has_workers?" do
it "returns false when the sender is not closed, but has 0 workers" do
sender = described_class.new(Airbrake::Config.new)
expect(sender.has_workers?).to be_truthy
before do
@sender = described_class.new(Airbrake::Config.new)
expect(@sender).to have_workers
end

sender.instance_variable_get(:@workers).list.each(&:kill)
it "returns false when the sender is not closed, but has 0 workers" do
@sender.instance_variable_get(:@workers).list.each(&:kill)
sleep 1
expect(sender.has_workers?).to be_falsey
expect(@sender).not_to have_workers
end

it "returns false when the sender is closed" do
@sender.close
expect(@sender).not_to have_workers
end
end

describe "#spawn_workers" do
it "spawns alive threads in an enclosed ThreadGroup" do
sender = described_class.new(Airbrake::Config.new)
expect(sender.has_workers?).to be_truthy
expect(sender).to have_workers

workers = sender.instance_variable_get(:@workers)

expect(workers).to be_a(ThreadGroup)
expect(workers.list).to all(be_alive)
expect(workers).to be_enclosed

sender.close
end

it "spawns exactly config.workers workers" do
workers_count = 5
sender = described_class.new(Airbrake::Config.new(workers: workers_count))
expect(sender).to have_workers

workers = sender.instance_variable_get(:@workers)

expect(workers.list.size).to eq(workers_count)
sender.close
expect(sender.has_workers?).to be_falsey
end
end
end
57 changes: 35 additions & 22 deletions spec/notifier_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -375,18 +375,40 @@ def to_json(*)

it "falls back to synchronous delivery when the async sender is dead" do
out = StringIO.new
notifier = described_class.new(airbrake_params.merge(logger: Logger.new(out)))
async_sender = notifier.instance_variable_get(:@async_sender)

airbrake = described_class.new(airbrake_params.merge(logger: Logger.new(out)))
airbrake.
instance_variable_get(:@async_sender).
instance_variable_get(:@workers).
list.
each(&:kill)

expect(async_sender).to have_workers
async_sender.instance_variable_get(:@workers).list.each(&:kill)
sleep 1
expect(async_sender).not_to have_workers

expect(airbrake.notify('bingo')).to be_nil
notifier.notify('bango')
expect(out.string).to match(/falling back to sync delivery/)

notifier.close
end

it "respawns workers on fork()", skip: %w(jruby rbx).include?(RUBY_ENGINE) do
out = StringIO.new
notifier = described_class.new(airbrake_params.merge(logger: Logger.new(out)))

notifier.notify('bingo', bingo: 'bango')
sleep 1
expect(out.string).not_to match(/falling back to sync delivery/)
expect_a_request_with_body(/"bingo":"bango"/)

pid = fork do
expect(notifier.instance_variable_get(:@async_sender)).to have_workers
notifier.notify('bango', bongo: 'bish')
sleep 1
expect(out.string).not_to match(/falling back to sync delivery/)
expect_a_request_with_body(/"bingo":"bango"/)
end

Process.wait(pid)
notifier.close
expect(notifier.instance_variable_get(:@async_sender)).not_to have_workers
end
end

Expand Down Expand Up @@ -671,25 +693,16 @@ def to_json(*)
end

describe "#close" do
shared_examples 'close' do |method|
context "when using #notify on a closed notifier" do
it "raises error" do
@airbrake.close
expect { method.call(@airbrake) }.
notifier = described_class.new(airbrake_params)
notifier.close

expect { notifier.notify(AirbrakeTestError.new) }.
to raise_error(Airbrake::Error, /closed Airbrake instance/)
end
end

context "when using #notify" do
include_examples 'close', proc { |a| a.notify(AirbrakeTestError.new) }
end

context "when using #send_notice" do
include_examples 'close', proc { |a|
notice = a.build_notice(AirbrakeTestError.new)
a.send_notice(notice)
}
end

context "at program exit when it was closed manually" do
it "doesn't raise error", skip: RUBY_ENGINE == 'jruby' do
expect do
Expand Down