Skip to content

Commit

Permalink
Merge pull request #500 from airbrake/thread-pool-class
Browse files Browse the repository at this point in the history
async_sender: factor out thread pool to a seprate class
  • Loading branch information
kyrylo authored Sep 26, 2019
2 parents d977196 + bac6f89 commit be051a5
Show file tree
Hide file tree
Showing 6 changed files with 335 additions and 200 deletions.
1 change: 1 addition & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ Style/SignalException:
Naming/PredicateName:
Exclude:
- 'lib/airbrake-ruby/async_sender.rb'
- 'lib/airbrake-ruby/thread_pool.rb'

Metrics/ClassLength:
Max: 120
Expand Down
1 change: 1 addition & 0 deletions lib/airbrake-ruby.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
require 'airbrake-ruby/config'
require 'airbrake-ruby/config/validator'
require 'airbrake-ruby/promise'
require 'airbrake-ruby/thread_pool'
require 'airbrake-ruby/sync_sender'
require 'airbrake-ruby/async_sender'
require 'airbrake-ruby/response'
Expand Down
100 changes: 15 additions & 85 deletions lib/airbrake-ruby/async_sender.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
module Airbrake
# Responsible for sending notices to Airbrake asynchronously. The class
# supports an unlimited number of worker threads and an unlimited queue size
# (both values are configurable).
# Responsible for sending notices to Airbrake asynchronously.
#
# @see SyncSender
# @api private
Expand All @@ -15,25 +13,15 @@ class AsyncSender
"and the following notice will not be delivered " \
"Error: %<type>s - %<message>s\nBacktrace: %<backtrace>s\n".freeze

# @return [ThreadGroup] the list of workers
# @note This is exposed for eaiser unit testing
# @since v4.0.0
attr_reader :workers

# @return [Array<[Airbrake::Notice,Airbrake::Promise]>] the list of unsent
# payload
# @note This is exposed for eaiser unit testing
# @since v4.0.0
attr_reader :unsent

def initialize
@config = Airbrake::Config.instance
@unsent = SizedQueue.new(Airbrake::Config.instance.queue_size)
@sender = SyncSender.new
@closed = false
@workers = ThreadGroup.new
@mutex = Mutex.new
@pid = nil

sender = SyncSender.new
@thread_pool = ThreadPool.new(
worker_size: @config.workers,
queue_size: @config.queue_size,
block: proc { |(notice, promise)| sender.send(notice, promise) }
)
end

# Asynchronously sends a notice to Airbrake.
Expand All @@ -42,101 +30,43 @@ def initialize
# library
# @return [Airbrake::Promise]
def send(notice, promise)
return will_not_deliver(notice, promise) if @unsent.size >= @unsent.max

@unsent << [notice, promise]
return will_not_deliver(notice, promise) unless @thread_pool << [notice, promise]
promise
end

# Closes the instance making it a no-op (it shut downs all worker
# threads). Before closing, waits on all unsent notices to be sent.
#
# @return [void]
# @raise [Airbrake::Error] when invoked more than one time
def close
threads = @mutex.synchronize do
raise Airbrake::Error, 'attempted to close already closed sender' if closed?

unless @unsent.empty?
msg = "#{LOG_LABEL} waiting to send #{@unsent.size} unsent notice(s)..."
logger.debug(msg + ' (Ctrl-C to abort)')
end

@config.workers.times { @unsent << [:stop, Airbrake::Promise.new] }
@closed = true
@workers.list.dup
end

threads.each(&:join)
logger.debug("#{LOG_LABEL} closed")
@thread_pool.close
end

# Checks whether the sender is closed and thus usable.
# @return [Boolean]
def closed?
@closed
@thread_pool.closed?
end

# Checks if an active sender has any workers. A sender doesn't have any
# workers only in two cases: when it was closed or when all workers
# crashed. An *active* sender doesn't have any workers only when something
# went wrong.
#
# Workers are expected to crash when you +fork+ the process the workers are
# living in. In this case we detect a +fork+ and try to revive them here.
#
# Another possible scenario that crashes workers is when you close the
# instance on +at_exit+, but some other +at_exit+ hook prevents the process
# from exiting.
#
# @return [Boolean] true if an instance wasn't closed, but has no workers
# @see https://goo.gl/oydz8h Example of at_exit that prevents exit
# @return [Boolean]
def has_workers?
@mutex.synchronize do
return false if @closed

if @pid != Process.pid && @workers.list.empty?
@pid = Process.pid
spawn_workers
end

!@closed && @workers.list.any?
end
@thread_pool.has_workers?
end

private

def spawn_workers
@workers = ThreadGroup.new
@config.workers.times { @workers.add(spawn_worker) }
@workers.enclose
end

def spawn_worker
Thread.new do
while (message = @unsent.pop)
break if message.first == :stop
@sender.send(*message)
end
end
end

def will_not_deliver(notice, promise)
error = notice[:errors].first

logger.error(
format(
WILL_NOT_DELIVER_MSG,
log_label: LOG_LABEL,
capacity: @unsent.max,
capacity: @config.queue_size,
type: error[:type],
message: error[:message],
backtrace: error[:backtrace].map do |line|
"#{line[:file]}:#{line[:line]} in `#{line[:function]}'"
end.join("\n")
)
)
promise.reject("AsyncSender has reached its capacity of #{@unsent.max}")
promise.reject("AsyncSender has reached its capacity of #{@config.queue_size}")
end
end
end
128 changes: 128 additions & 0 deletions lib/airbrake-ruby/thread_pool.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
module Airbrake
# ThreadPool implements a simple thread pool that can configure the number of
# worker threads and the size of the queue to process.
#
# @example
# # Initialize a new thread pool with 5 workers and a queue size of 100. Set
# # the block to be run concurrently.
# thread_pool = ThreadPool.new(
# worker_size: 5,
# queue_size: 100,
# block: proc { |message| print "ECHO: #{message}..."}
# )
#
# # Send work.
# 10.times { |i| thread_pool << i }
# #=> ECHO: 0...ECHO: 1...ECHO: 2...
#
# @api private
# @since v4.6.1
class ThreadPool
include Loggable

# @return [ThreadGroup] the list of workers
# @note This is exposed for eaiser unit testing
attr_reader :workers

def initialize(worker_size:, queue_size:, block:)
@worker_size = worker_size
@queue_size = queue_size
@block = block

@queue = SizedQueue.new(queue_size)
@workers = ThreadGroup.new
@mutex = Mutex.new
@pid = nil
@closed = false

has_workers?
end

# Adds a new message to the thread pool. Rejects messages if the queue is at
# its capacity.
#
# @param [Object] message The message that gets passed to the block
# @return [Boolean] true if the message was successfully sent to the pool,
# false if the queue is full
def <<(message)
return false if backlog >= @queue_size
@queue << message
true
end

# @return [Integer] how big the queue is at the moment
def backlog
@queue.size
end

# Checks if a thread pool has any workers. A thread pool doesn't have any
# workers only in two cases: when it was closed or when all workers
# crashed. An *active* thread pool doesn't have any workers only when
# something went wrong.
#
# Workers are expected to crash when you +fork+ the process the workers are
# living in. In this case we detect a +fork+ and try to revive them here.
#
# Another possible scenario that crashes workers is when you close the
# instance on +at_exit+, but some other +at_exit+ hook prevents the process
# from exiting.
#
# @return [Boolean] true if an instance wasn't closed, but has no workers
# @see https://goo.gl/oydz8h Example of at_exit that prevents exit
def has_workers?
@mutex.synchronize do
return false if @closed

if @pid != Process.pid && @workers.list.empty?
@pid = Process.pid
spawn_workers
end

!@closed && @workers.list.any?
end
end

# Closes the thread pool making it a no-op (it shut downs all worker
# threads). Before closing, waits on all unprocessed tasks to be processed.
#
# @return [void]
# @raise [Airbrake::Error] when invoked more than one time
def close
threads = @mutex.synchronize do
raise Airbrake::Error, 'this thread pool is closed already' if @closed

unless @queue.empty?
msg = "#{LOG_LABEL} waiting to process #{@queue.size} task(s)..."
logger.debug(msg + ' (Ctrl-C to abort)')
end

@worker_size.times { @queue << :stop }
@closed = true
@workers.list.dup
end

threads.each(&:join)
logger.debug("#{LOG_LABEL} thread pool closed")
end

def closed?
@closed
end

def spawn_workers
@worker_size.times { @workers.add(spawn_worker) }
@workers.enclose
end

private

def spawn_worker
Thread.new do
while (message = @queue.pop)
break if message == :stop
@block.call(message)
end
end
end
end
end
Loading

0 comments on commit be051a5

Please sign in to comment.