Skip to content

Commit

Permalink
Merge branch 'jbrady42-handler_queue_config'
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelklishin committed Mar 1, 2019
2 parents e8161e8 + 2ca56c8 commit 05dbd9c
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 16 deletions.
14 changes: 2 additions & 12 deletions examples/max_retry_handler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,7 @@
#
class MaxRetryWorker
include Sneakers::Worker
from_queue 'downloads',
WORKER_OPTIONS.merge({
:arguments => {
:'x-dead-letter-exchange' => 'downloads-retry'
},
})
from_queue 'downloads', WORKER_OPTIONS

def work(msg)
logger.info("MaxRetryWorker rejecting msg: #{msg.inspect}")
Expand All @@ -52,12 +47,7 @@ def work(msg)
# see the message once.
class SucceedingWorker
include Sneakers::Worker
from_queue 'uploads',
WORKER_OPTIONS.merge({
:arguments => {
:'x-dead-letter-exchange' => 'uploads-retry'
},
})
from_queue 'uploads', WORKER_OPTIONS

def work(msg)
logger.info("SucceedingWorker succeeding on msg: #{msg.inspect}")
Expand Down
5 changes: 5 additions & 0 deletions lib/sneakers/handlers/maxretry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ def initialize(channel, queue, opts)

end

def self.configure_queue(name, opts)
retry_name = opts.fetch(:retry_exchange, "#{name}-retry")
opts.merge(arguments: { "x-dead-letter-exchange": retry_name })
end

def acknowledge(hdr, props, msg)
@channel.acknowledge(hdr.delivery_tag, false)
end
Expand Down
10 changes: 6 additions & 4 deletions lib/sneakers/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,12 @@ def subscribe(worker)
routing_key = @opts[:routing_key] || @name
routing_keys = [*routing_key]

# TODO: get the arguments from the handler? Retry handler wants this so you
# don't have to line up the queue's dead letter argument with the exchange
# you'll create for retry.
handler_klass = worker.opts[:handler] || Sneakers::CONFIG.fetch(:handler)
# Configure options if needed
if handler_klass.respond_to?(:configure_queue)
@opts[:queue_options] = handler_klass.configure_queue(@name, @opts[:queue_options])
end

queue = @channel.queue(@name, @opts[:queue_options])

if exchange_name.length > 0
Expand All @@ -50,7 +53,6 @@ def subscribe(worker)
# has the same configuration as the worker. Also pass along the exchange and
# queue in case the handler requires access to them (for things like binding
# retry queues, etc).
handler_klass = worker.opts[:handler] || Sneakers::CONFIG.fetch(:handler)
handler = handler_klass.new(@channel, queue, worker.opts)

@consumer = queue.subscribe(block: false, manual_ack: @opts[:ack]) do | delivery_info, metadata, msg |
Expand Down

0 comments on commit 05dbd9c

Please sign in to comment.