Skip to content
This repository has been archived by the owner on Mar 29, 2022. It is now read-only.

Commit

Permalink
Merge pull request #62 from stitchfix/gregg/multiple-routing-keys
Browse files Browse the repository at this point in the history
Add support for multiple routing keys
  • Loading branch information
greggroth authored Dec 18, 2017
2 parents 428dae8 + 6bf59be commit 919bf1d
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 2 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ It requires some environment variables to work:

* `HANDLER_KLASS` (required) refers to the class you have to write in your app (equivalent to a `job` in Resque)
* `QUEUE_NAME` (required) we must use named queues - see below
* `ROUTING_KEY` (optional) defaults to `#.#` (all messages)
* `ROUTING_KEY` (optional) comma separated list of routing keys (e.g. `foo.bar.*,foo.baz.*`). defaults to `#.#` (all messages)
* `PREFETCH` (optional) sets a [prefetch value](http://rubybunny.info/articles/queues.html#qos__prefetching_messages) for the subscriber

You'll also need to bring the Rake task into your app. For Rails, you'll need to edit the top-level `Rakefile`:
Expand Down
2 changes: 1 addition & 1 deletion lib/pwwka/receiver.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def self.subscribe(handler_klass, queue_name, routing_key: "#.#", block: true, p
def topic_queue
@topic_queue ||= begin
queue = channel.queue(queue_name, durable: true, arguments: {})
queue.bind(topic_exchange, routing_key: routing_key)
routing_key.split(',').each { |k| queue.bind(topic_exchange, routing_key: k) }
queue
end
end
Expand Down
8 changes: 8 additions & 0 deletions spec/integration/send_and_receive_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

[AllReceiver , "all_receiver_pwwkatesting" , "#"] ,
[FooReceiver , "foo_receiver_pwwkatesting" , "pwwka.testing.foo"] ,
[MultiRoutingReceived , "multi_routing_receiver_pwwkatesting" , "pwwka.testing.bar,pwwka.testing.foo"] ,
[OtherFooReceiver , "other_foo_receiver_pwwkatesting" , "pwwka.testing.foo"] ,
[Pwwka::QueueResqueJobHandler , "queue_resque_job_handler_pwwkatesting" , "#" ] ,

Expand All @@ -27,6 +28,7 @@
end
AllReceiver.reset!
FooReceiver.reset!
MultiRoutingReceived.reset!
OtherFooReceiver.reset!
clear_queue(:delayed)
clear_queue(MyTestJob)
Expand All @@ -46,6 +48,7 @@

expect(AllReceiver.messages_received.size).to eq(1)
expect(FooReceiver.messages_received.size).to eq(1)
expect(MultiRoutingReceived.messages_received.size).to eq(1)
expect(OtherFooReceiver.messages_received.size).to eq(1)
@testing_setup.queues.each do |queue|
expect(queue.message_count).to eq(0)
Expand All @@ -58,6 +61,7 @@

expect(AllReceiver.messages_received.size).to eq(1)
expect(FooReceiver.messages_received.size).to eq(0)
expect(MultiRoutingReceived.messages_received.size).to eq(1)
expect(OtherFooReceiver.messages_received.size).to eq(0)
@testing_setup.queues.each do |queue|
expect(queue.message_count).to eq(0)
Expand Down Expand Up @@ -200,11 +204,13 @@

expect(AllReceiver.messages_received.size).to eq(0)
expect(FooReceiver.messages_received.size).to eq(0)
expect(MultiRoutingReceived.messages_received.size).to eq(0)
expect(OtherFooReceiver.messages_received.size).to eq(0)

allow_receivers_to_process_queues(5_000)
expect(AllReceiver.messages_received.size).to eq(1)
expect(FooReceiver.messages_received.size).to eq(1)
expect(MultiRoutingReceived.messages_received.size).to eq(1)
expect(OtherFooReceiver.messages_received.size).to eq(1)
end

Expand Down Expand Up @@ -241,4 +247,6 @@ class FooReceiver < AllReceiver
end
class OtherFooReceiver < AllReceiver
end
class MultiRoutingReceived < AllReceiver
end
end

0 comments on commit 919bf1d

Please sign in to comment.