Emulate a long-running operation in the handle_fn, e.g.
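from time import sleep
from typing import Any
import kombu
from baseplate import RequestContext

def process_links(
    context: RequestContext,
    body: Any,
    message: kombu.Message,
):
    sleep(11)  # default server_timeout is 10 seconds, so this goes just over it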
Send a message to the queue.
Confirm that the ServerTimeout error is logged by the consumer.
Send another message to the queue.
Actual
The message won't be consumed.
Expected
The message is consumed and ServerTimeout error is logged once again.
One thing to note: I think that with your proposal the message still wouldn't be consumed, because an error requeues the message and, in your test code, the handler will always time out.
I think your proposal makes sense. In reality, a worker thread crashing should cause the entire queue consumer server to shut down (which should trigger a restart)... but I think that logic might have the same problem. 😅
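To illustrate the "worker crash should shut the server down" idea from the comment above, here is a minimal sketch using only the standard library. It is not Baseplate's actual shutdown logic: `FakeServerTimeout`, `run_with_shutdown_on_crash`, and `crashing_worker` are hypothetical names, and the sketch assumes Python 3.8+ for `threading.excepthook`.

```python
import threading


class FakeServerTimeout(BaseException):
    """Hypothetical stand-in for an exception that escapes a worker thread."""


def run_with_shutdown_on_crash(target):
    """Run target in a thread and set a shutdown event if it crashes."""
    shutdown = threading.Event()
    crashes = []

    def hook(args):
        # Invoked when an exception escapes a thread's run() method.
        crashes.append(args.exc_type)
        shutdown.set()

    previous_hook = threading.excepthook
    threading.excepthook = hook
    try:
        worker = threading.Thread(target=target)
        worker.start()
        worker.join(timeout=5)
    finally:
        # Restore the original hook so other threads are unaffected.
        threading.excepthook = previous_hook
    return shutdown.is_set(), crashes


def crashing_worker():
    # Simulates a handler whose timeout exception is never caught.
    raise FakeServerTimeout("simulated timeout")
```

Calling `run_with_shutdown_on_crash(crashing_worker)` sets the event even though the exception derives from `BaseException`, so a supervising loop could watch that event and stop the process.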
Repro steps
Set up the consumer via KombuQueueConsumerFactory.new(...) as in the docs: https://baseplate.readthedocs.io/en/stable/api/baseplate/frameworks/queue_consumer/kombu.html
Emulate a long-running operation in the handle_fn, e.g. the process_links handler shown above.
Actual
The message won't be consumed.
Expected
The message is consumed and ServerTimeout error is logged once again.
Details
ServerTimeout inherits from BaseException (https://github.com/reddit/baseplate.py/blob/master/baseplate/observers/timeout.py#L18), but MessageHandler only catches exceptions that inherit from Exception:
https://github.com/reddit/baseplate.py/blob/master/baseplate/frameworks/queue_consumer/kombu.py#L113
As a result, the ServerTimeout exception propagates all the way up and crashes the thread that the QueueConsumer is running in: https://github.com/reddit/baseplate.py/blob/master/baseplate/server/queue_consumer.py#L217
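The failure mode described above can be reproduced without Baseplate. The following is a minimal sketch, assuming a stand-in exception class `FakeServerTimeout` and a simplified `worker_loop` (both hypothetical, not Baseplate's actual code). Because `except Exception` does not match a `BaseException` subclass, the exception escapes the loop and the worker thread dies, leaving the second message unconsumed.

```python
import queue
import threading


class FakeServerTimeout(BaseException):
    """Hypothetical stand-in for ServerTimeout; derives from BaseException."""


def worker_loop(work_queue, handled):
    # Simplified stand-in for a consumer thread's message loop.
    while True:
        message = work_queue.get()
        if message is None:  # sentinel: stop the loop cleanly
            return
        try:
            raise FakeServerTimeout(f"timed out handling {message!r}")
        except Exception as exc:  # does NOT match BaseException subclasses
            handled.append((message, exc))


def run_demo():
    # Silence the default traceback output for the crashing thread.
    previous_hook = threading.excepthook
    threading.excepthook = lambda args: None
    try:
        work_queue = queue.Queue()
        handled = []
        worker = threading.Thread(target=worker_loop, args=(work_queue, handled))
        worker.start()
        work_queue.put("first")
        worker.join(timeout=5)  # returns once the exception has killed the thread
        work_queue.put("second")  # no thread is left to consume this message
        return worker.is_alive(), handled, work_queue.qsize()
    finally:
        threading.excepthook = previous_hook
```

`run_demo()` reports that the worker is no longer alive, that the `except Exception` branch never ran, and that one message is still sitting in the queue.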
Proposal
I propose handling ServerTimeout in MessageHandler like any other exception.
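One way the proposal could look, as a sketch rather than a patch against MessageHandler: the names `FakeServerTimeout`, `handle_message`, and `on_error` are hypothetical, and the real handler's requeue and logging behaviour is not reproduced here.

```python
class FakeServerTimeout(BaseException):
    """Hypothetical stand-in for ServerTimeout; derives from BaseException."""


def handle_message(handle_fn, message, on_error):
    """Run handle_fn and report failures instead of letting them escape.

    Returns True if the message was handled and False if handling failed.
    """
    try:
        handle_fn(message)
    except (Exception, FakeServerTimeout) as exc:
        # Treat a timeout like any other handler error: report it and let
        # the caller decide whether to requeue or reject the message.
        on_error(message, exc)
        return False
    return True


def demo():
    errors = []

    def slow_handler(message):
        # Simulates a handler that always exceeds the server timeout.
        raise FakeServerTimeout("simulated timeout")

    def record_error(message, exc):
        errors.append(message)

    results = [
        handle_message(slow_handler, m, record_error) for m in ("first", "second")
    ]
    return results, errors
```

Naming the timeout class explicitly, rather than catching `BaseException`, keeps `KeyboardInterrupt` and `SystemExit` propagating as usual. In `demo()`, both messages reach the handler and both failures are reported, so the loop survives the first timeout.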