
channel#queue can return wrong queue after Timeout::Error #558

Closed
sbonebrake opened this issue Aug 3, 2018 · 8 comments
sbonebrake commented Aug 3, 2018

Reproduced with Bunny 2.11.0 and RabbitMQ 3.7.6

channel#queue will return the wrong queue if called after a Timeout::Error occurred during a previous #queue call.

We hit this issue where we started getting messages enqueued in the wrong queues. In our logs we found a Timeout::Error with this call stack:

gems/bunny-2.11.0/lib/bunny/concurrent/continuation_queue.rb:39:in `block in poll'
gems/bunny-2.11.0/lib/bunny/concurrent/continuation_queue.rb:30:in `synchronize'
gems/bunny-2.11.0/lib/bunny/concurrent/continuation_queue.rb:30:in `poll'
gems/bunny-2.11.0/lib/bunny/channel.rb:1780:in `wait_on_continuations'
gems/bunny-2.11.0/lib/bunny/channel.rb:1023:in `queue_declare'
gems/bunny-2.11.0/lib/bunny/queue.rb:372:in `declare!'
gems/bunny-2.11.0/lib/bunny/queue.rb:51:in `initialize'
gems/bunny-2.11.0/lib/bunny/channel.rb:427:in `new'
gems/bunny-2.11.0/lib/bunny/channel.rb:427:in `queue'

Our code does not dispose of the Session when this occurs. In this case, the next call to Bunny is another channel#queue for a different queue. During this second call, wait_on_continuations returns the DeclareOk from the first call. This causes the first queue to be incorrectly returned from channel#queue, although the channel's @queues will contain the correct mapping.

Order of events:

1. Initialize with Bunny.new(url) with all defaults
2. channel = connection.create_channel
3. channel.queue("Bob")
4. Wait 15 seconds (DEFAULT_CONTINUATION_TIMEOUT) for a Timeout::Error from ContinuationQueue#poll
5. App catches the Timeout::Error and logs it
6. The reader loop receives DeclareOk-Bob sometime after 15s and pushes it onto the ContinuationQueue's @q
7. channel.queue("Sally")
8. channel calls wait_on_continuations
9. The broker returns DeclareOk-Sally in less than a second, which is pushed onto the ContinuationQueue's @q
10. wait_on_continuations pops DeclareOk-Bob from @q
11. queue calls "@name = queue_declare_ok.queue", changing its own @name from "Sally" to "Bob"
12. channel calls register_queue with queue "Bob"
13. channel#queue returns queue "Bob"

Because of steps 11 and 12, subsequent calls to channel.queue("Bob") will be correct. Because of step 9, the first call for any new queue will return the previously declared queue. In our case we declare around 30 queues in a row, and the first publish to each of them ended up going to the wrong queue.
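The off-by-one behavior above can be modeled with a tiny FIFO standing in for Bunny's ContinuationQueue. Everything here (the class, its method names) is an illustrative sketch, not Bunny's actual implementation; it only shows how a late reply from a timed-out call gets handed to the next caller:

```ruby
require "timeout"

# Hypothetical minimal model of a continuation queue: a plain FIFO shared
# across calls, where poll raises Timeout::Error if nothing arrives in time.
class TinyContinuationQueue
  def initialize
    @q = []
  end

  def push(item)
    @q << item
  end

  def poll(timeout_s)
    deadline = Time.now + timeout_s
    sleep 0.01 while @q.empty? && Time.now < deadline
    raise Timeout::Error if @q.empty?
    @q.shift
  end
end

cq = TinyContinuationQueue.new

# Steps 3-5: declare "Bob"; no reply arrives in time, so poll times out
# and the app swallows the error, as in the report.
begin
  cq.poll(0.05)
rescue Timeout::Error
end

# Step 6: DeclareOk for Bob arrives late and is pushed anyway.
cq.push("DeclareOk-Bob")

# Steps 7-10: declare "Sally"; its reply also arrives, but the FIFO
# hands back the stale Bob reply first.
cq.push("DeclareOk-Sally")
stale = cq.poll(0.05)
puts stale  # → DeclareOk-Bob
```

Every declare after the timeout stays shifted by one in the same way, which matches the ~30 misrouted queues described above.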

A reproduction is available here: https://github.com/bmorton/bunny_repro
Debug Logs:

D, [2018-08-03T12:55:32.741182 #5445] DEBUG -- : Sent protocol preamble
D, [2018-08-03T12:55:32.745110 #5445] DEBUG -- : Sent connection.start-ok
D, [2018-08-03T12:55:32.746035 #5445] DEBUG -- : Heartbeat interval negotiation: client = server, server = 60, result = 60
I, [2018-08-03T12:55:32.746100 #5445]  INFO -- : Heartbeat interval used (in seconds): 60
D, [2018-08-03T12:55:32.746122 #5445] DEBUG -- : Will use socket read timeout of 132.0
D, [2018-08-03T12:55:32.746168 #5445] DEBUG -- : Initializing channel ID allocator with channel_max = 2047
D, [2018-08-03T12:55:32.746293 #5445] DEBUG -- : Sent connection.tune-ok with heartbeat interval = 60, frame_max = 131072, channel_max = 2047
D, [2018-08-03T12:55:32.746386 #5445] DEBUG -- : Sent connection.open with vhost = /
D, [2018-08-03T12:55:32.747292 #5445] DEBUG -- : Initializing heartbeat sender...
D, [2018-08-03T12:55:32.747572 #5445] DEBUG -- : Allocated channel id: 1
D, [2018-08-03T12:55:32.749232 #5445] DEBUG -- : Session#handle_frame on 1: #<AMQ::Protocol::Channel::OpenOk:0x00007fc761108468 @channel_id="">
I, [2018-08-03T12:55:32.749405 #5445]  INFO -- : Declaring Bob with ch.queue('Bob')...
W, [2018-08-03T12:55:47.752968 #5445]  WARN -- : Exception: Timeout::Error
W, [2018-08-03T12:55:47.753113 #5445]  WARN -- : /Users/scbone/.rvm/gems/ruby-2.3.5/gems/bunny-2.11.0/lib/bunny/concurrent/continuation_queue.rb:39:in `block in poll'
/Users/scbone/.rvm/gems/ruby-2.3.5/gems/bunny-2.11.0/lib/bunny/concurrent/continuation_queue.rb:30:in `synchronize'
/Users/scbone/.rvm/gems/ruby-2.3.5/gems/bunny-2.11.0/lib/bunny/concurrent/continuation_queue.rb:30:in `poll'
/Users/scbone/.rvm/gems/ruby-2.3.5/gems/bunny-2.11.0/lib/bunny/channel.rb:1780:in `wait_on_continuations'
/Users/scbone/.rvm/gems/ruby-2.3.5/gems/bunny-2.11.0/lib/bunny/channel.rb:1023:in `queue_declare'
/Users/scbone/.rvm/gems/ruby-2.3.5/gems/bunny-2.11.0/lib/bunny/queue.rb:372:in `declare!'
/Users/scbone/.rvm/gems/ruby-2.3.5/gems/bunny-2.11.0/lib/bunny/queue.rb:51:in `initialize'
/Users/scbone/.rvm/gems/ruby-2.3.5/gems/bunny-2.11.0/lib/bunny/channel.rb:427:in `new'
/Users/scbone/.rvm/gems/ruby-2.3.5/gems/bunny-2.11.0/lib/bunny/channel.rb:427:in `queue'
app.rb:24:in `block in <main>'
/Users/scbone/.rvm/gems/ruby-2.3.5/gems/toxiproxy-1.0.3/lib/toxiproxy/toxic_collection.rb:24:in `apply'
app.rb:22:in `<main>'
D, [2018-08-03T12:55:47.754481 #5445] DEBUG -- : Session#handle_frame on 1: #<AMQ::Protocol::Queue::DeclareOk:0x00007fc76183bfa0 @queue="Bob", @message_count=0, @consumer_count=0>
D, [2018-08-03T12:55:47.754587 #5445] DEBUG -- : Channel#handle_frame on channel 1: #<AMQ::Protocol::Queue::DeclareOk:0x00007fc76183bfa0 @queue="Bob", @message_count=0, @consumer_count=0>
I, [2018-08-03T12:55:47.754879 #5445]  INFO -- : Declaring Sally with ch.queue('Sally')...
I, [2018-08-03T12:55:47.755185 #5445]  INFO -- : Received Bob for ch.queue('Sally')
I, [2018-08-03T12:55:47.755227 #5445]  INFO -- : Declaring Rick with ch.queue('Rick'))...
D, [2018-08-03T12:55:47.757327 #5445] DEBUG -- : Session#handle_frame on 1: #<AMQ::Protocol::Queue::DeclareOk:0x00007fc761832cc0 @queue="Sally", @message_count=0, @consumer_count=0>
D, [2018-08-03T12:55:47.757403 #5445] DEBUG -- : Channel#handle_frame on channel 1: #<AMQ::Protocol::Queue::DeclareOk:0x00007fc761832cc0 @queue="Sally", @message_count=0, @consumer_count=0>
D, [2018-08-03T12:55:47.757494 #5445] DEBUG -- : Session#handle_frame on 1: #<AMQ::Protocol::Queue::DeclareOk:0x00007fc761831cd0 @queue="Rick", @message_count=0, @consumer_count=0>
D, [2018-08-03T12:55:47.757534 #5445] DEBUG -- : Channel#handle_frame on channel 1: #<AMQ::Protocol::Queue::DeclareOk:0x00007fc761831cd0 @queue="Rick", @message_count=0, @consumer_count=0>
I, [2018-08-03T12:55:47.757592 #5445]  INFO -- : Received Sally for ch.queue('Rick')
I, [2018-08-03T12:55:47.757679 #5445]  INFO -- : Declaring Bob again with ch.queue('Bob'))...
I, [2018-08-03T12:55:47.757710 #5445]  INFO -- : Received Bob for ch.queue('Bob')

The Bunny Changelog says automatic recovery isn't started on Timeout::Error because "It will be started in the network activity loop anyway". However, the reader loop will never see a Timeout::Error if continuation_timeout (default 15s) is less than read_timeout and the DeclareOk arrives after continuation_timeout but before read_timeout.
https://github.com/ruby-amqp/bunny/blob/master/ChangeLog.md#time-bound-continuations

There are several approaches to fixing this, but the simplest may be to register the queue from any unexpected DeclareOk and try again:

class Channel
  def queue(name = AMQ::Protocol::EMPTY_STRING, opts = {})
    q = nil
    loop do
      q = find_queue(name) || Bunny::Queue.new(self, name, opts)
      register_queue(q)
      # retry until the declared name matches what was asked for;
      # server-named queues pass an empty string, so accept any reply
      break if q.name == name || name == AMQ::Protocol::EMPTY_STRING
    end
    q
  end
end
@michaelklishin
Member

michaelklishin commented Aug 3, 2018

If there was a channel operation timeout we should clear all pending continuations/responses and register no queues. That's it. A PR would be welcome.

@michaelklishin
Member

What complicates things is that a response might arrive a moment after a timeout was registered by the channel. In that case the response should be thrown away, but it's not easy to know what the originating request was. For server-named queues, for example, the requested name is always an empty string and the response carries a generated name.

Clearing the continuation queue before starting a new continuation might be sufficient to avoid confusing behavior in that scenario.
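That mitigation can be sketched as follows. The `clear` method and the surrounding class are hypothetical, illustrating the idea rather than Bunny's real API:

```ruby
require "timeout"

# Hypothetical continuation queue that can be flushed before each new
# synchronous operation, so stale replies from timed-out calls are dropped.
class ContinuationQueueWithClear
  def initialize
    @q = []
  end

  def push(item)
    @q << item
  end

  # Drop anything left over from a previous (possibly timed-out) operation.
  def clear
    @q.clear
  end

  def poll
    raise Timeout::Error, "no continuation" if @q.empty?
    @q.shift
  end
end

cq = ContinuationQueueWithClear.new
cq.push("DeclareOk-Bob")   # stale reply left behind by a timed-out declare

cq.clear                   # clear before starting the next continuation
cq.push("DeclareOk-Sally") # fresh reply for the new declare
fresh = cq.poll
puts fresh  # → DeclareOk-Sally
```

The trade-off is a race: a stale reply that arrives after the clear but before the fresh one would still be misattributed, which is the complication described above.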

@sbonebrake
Author

Yeah, I feel like there are probably many scenarios where the reader loop may get a response to another message after a timeout. Rather than trying to ensure the continuation queues are always correct after various timeouts, I think a higher-level sanity check that the queue.name returned from channel#queue matches the requested name (when the name isn't empty, i.e. not server-named) would be enough. An exception from channel#queue is better than returning the wrong queue.
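A minimal sketch of that sanity check, using a stub channel to stand in for the buggy behavior (`checked_queue` and the stubs are hypothetical names, not part of Bunny):

```ruby
# Stand-ins for a channel whose queue() can return a stale reply, as in the bug.
StubQueue   = Struct.new(:name)
StubChannel = Struct.new(:reply_name) do
  def queue(_name)
    StubQueue.new(reply_name)
  end
end

# Verify the declared queue's name matches what was asked for, skipping
# server-named queues (whose requested name is the empty string).
def checked_queue(channel, name)
  q = channel.queue(name)
  if name != "" && q.name != name
    raise "queue mismatch: asked for #{name.inspect}, got #{q.name.inspect}"
  end
  q
end

good = checked_queue(StubChannel.new("Sally"), "Sally")
puts good.name  # → Sally

begin
  # A stale Bob reply surfaces for the Sally declare: raise instead of
  # silently returning the wrong queue.
  checked_queue(StubChannel.new("Bob"), "Sally")
rescue RuntimeError => e
  mismatch = e.message
end
```

Failing loudly here would have turned the silent misrouted publishes into an immediate, debuggable exception.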

@michaelklishin
Member

Sorry but I'm not interested in sanity checks and workarounds. Those can be introduced in your own code.

Bunny channels are not supposed to be shared between threads so it should be possible to clear continuations on timeouts.

@michaelklishin
Member

Curiously, the Java client now uses a variation of the suggested "sanity check" mechanism: all responses are checked for "compatibility" with the outstanding continuation, and incompatible ones are ignored. So maybe doing that is not such a bad idea after all.
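The compatibility filter can be sketched like this. The dummy frame classes mirror AMQ::Protocol reply methods in name only; the dispatch logic is an illustration of the idea, not either client's actual code:

```ruby
# Dummy frame classes standing in for AMQ::Protocol reply methods.
class QueueDeclareOk; end
class ExchangeDeclareOk; end

# Remembers which reply class the outstanding operation expects and
# ignores anything else, as the Java client's filter is described to do.
class FilteringContinuation
  def initialize(expected_class)
    @expected_class = expected_class
    @response = nil
  end

  # The reader loop hands every incoming reply frame here; frames
  # incompatible with the pending operation (stale replies) are dropped.
  def handle(frame)
    @response = frame if frame.is_a?(@expected_class)
  end

  attr_reader :response
end

cont = FilteringContinuation.new(QueueDeclareOk)
cont.handle(ExchangeDeclareOk.new) # stale reply from a timed-out operation: ignored
cont.handle(QueueDeclareOk.new)    # the reply this operation actually expects: kept
```

Note this only filters by type; two pending declares for different queue names would still be indistinguishable, which is why it mitigates rather than solves the problem.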

@michaelklishin
Member

@sbonebrake can you please give #566 a try? It seems to do something sensible in the provided repro scenario but with timeouts, delays and such it's not always obvious what that looks like :)

@olivierlacan

I just encountered this issue on 2.9 and upgraded to 2.12 to see if I could still reproduce it; sadly I can, in a setup with a shared connection and a shared channel.

So this new logic may be useful in some cases, but (understandably) does not protect you in a situation where for example you have:

  • A singleton class storing a Bunny connection and a single channel
  • Sidekiq workers using that channel to declare an exchange and queue
  • Those same workers (concurrently) attempting to publish & confirm more than one message

What happens in this case is similar to what @sbonebrake described, except it all starts with a NoMethodError: undefined method `queue' for #<AMQ::Protocol::Exchange::DeclareOk:0x00007fa8457b76c8> in Bunny::Queue#declare! in the second worker to attempt to publish a message using the shared connection & channel.

This occurs in multiple workers until at some point the continuation timeout is reached and they all start throwing Timeout::Errors, which is initially confusing.

The simplest solution for me was to stop sharing the channel across threads, as this is clearly recommended against in Bunny's concurrency documentation, but it does seem like something people may not realize unless they happen upon that article. It took me a few days to question why we were sharing the channel. I first thought it was a misguided attempt to be thrifty, but with 30 concurrent workers in production, channel usage can become fairly taxing on a RabbitMQ cluster, which some people definitely may want to avoid.
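One common pattern for "stop sharing the channel" is a thread-local channel cache over a single shared connection. The sketch below uses a counting stub instead of a real Bunny::Session, and the wrapper class name is made up; it only demonstrates the one-channel-per-thread shape:

```ruby
# Counting stub standing in for Bunny::Session; only create_channel matters here.
class StubConnection
  attr_reader :channels_created

  def initialize
    @channels_created = 0
    @lock = Mutex.new
  end

  def create_channel
    @lock.synchronize { @channels_created += 1 }
    Object.new # stand-in for a Bunny::Channel
  end
end

# Hypothetical wrapper: share the connection, but lazily open and cache
# one channel per thread so no channel is ever used from two threads.
class ChannelPerThread
  def initialize(connection)
    @connection = connection
  end

  def channel
    Thread.current[:bunny_channel] ||= @connection.create_channel
  end
end

conn  = StubConnection.new
chans = ChannelPerThread.new(conn)

threads = 3.times.map do
  Thread.new { [chans.channel, chans.channel] } # second call reuses the first
end
results = threads.map(&:value)
puts conn.channels_created  # → 3 (one channel per thread)
```

In a Sidekiq setup this keeps channel count bounded by worker concurrency while still avoiding the cross-thread interleaving described in this thread.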

I don't know if there's a simple way to detect that the channel is shared between more than a single thread and alert users that it's a bad idea, or if @michaelklishin you'd prefer to follow the example of the Java client and try to handle such out-of-order situations. I'd love to help with repro and implementation if I can.

@michaelklishin
Member

There is a fundamental problem with sharing a channel. Some protocol methods are synchronous (they expect a response), and a channel cannot have two or more pending operations (the Bunny API encourages method chaining) while still having a clear way of figuring out what the user would expect. Heck, most users have no idea how some concurrent scenarios should work; they want the maintainer to pull a magic rabbit (no pun intended) out of the hat.

What the Java client does, and this client now does for queue.declare, is drop responses. It DOES NOT solve the problem, it just replaces it with a less visible one. I don't expect a fundamental solution to emerge any time soon, since a lot of pretty smart people who contributed to clients with way better concurrency libraries available [compared to Ruby] still haven't arrived at a solution better than dropping responses.

Don't share channels between threads. If you don't read the docs and run into issues that have been documented for years, you can only blame yourself.
