From d5147e44fc62be021f749d696d26a1b735e810df Mon Sep 17 00:00:00 2001 From: Isaac Seymour Date: Thu, 4 Apr 2019 14:30:22 +0100 Subject: [PATCH] [#221] [Message queue] Block to wait for futures on worker stop (@isaacseymour) When running the `mq/stop`, deref all the worker-loop futures. This is helpful if, for example, the worker processing depends on other parts of a component system. At the moment I'm getting errors where the component system shuts down without being aware of the fact that there are still some worker threads running, meaning that the workers try to access closed resources (for example). --- src/taoensso/carmine/message_queue.clj | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/taoensso/carmine/message_queue.clj b/src/taoensso/carmine/message_queue.clj index 34204e03..be74ecf5 100644 --- a/src/taoensso/carmine/message_queue.clj +++ b/src/taoensso/carmine/message_queue.clj @@ -230,13 +230,15 @@ (start [this]) (stop [this])) -(defrecord Worker [conn-opts qname running?_ opts] +(defrecord Worker [conn-opts qname running?_ thread-futures_ opts] java.io.Closeable (close [this] (stop this)) IWorker (stop [_] (when (compare-and-set! running?_ true false) + (timbre/infof "Initiating shutdown of queue worker: %s" qname) + (run! deref @thread-futures_) (timbre/infof "Message queue worker stopped: %s" qname) true)) @@ -284,7 +286,8 @@ (Thread/sleep (exp-backoff (inc nerrors))) (recur (inc nerrors))))))))] - (dorun (repeatedly nthreads (fn [] (future (start-polling-loop!)))))) + (reset! thread-futures_ + (doall (repeatedly nthreads (fn [] (future (start-polling-loop!))))))) true))) @@ -333,7 +336,7 @@ eoq-backoff-ms exp-backoff auto-start true}}]] - (let [w (Worker. conn-opts qname (atom false) + (let [w (Worker. conn-opts qname (atom false) (atom []) {:handler handler :monitor monitor :lock-ms lock-ms