Skip to content

Commit

Permalink
[#221] [Message queue] Block to wait for futures on worker stop (@isa…
Browse files Browse the repository at this point in the history
…acseymour)

When running the `mq/stop`, deref all the worker-loop futures.

This is helpful if, for example, the worker processing depends on other
parts of a component system. At the moment I'm getting errors where the
component system shuts down without being aware of the fact that there
are still some worker threads running, meaning that the workers try to
access closed resources (for example).
  • Loading branch information
isaacseymour authored and ptaoussanis committed May 11, 2019
1 parent 9b727cd commit d5147e4
Showing 1 changed file with 6 additions and 3 deletions.
9 changes: 6 additions & 3 deletions src/taoensso/carmine/message_queue.clj
Original file line number Diff line number Diff line change
Expand Up @@ -230,13 +230,15 @@
(start [this])
(stop [this]))

(defrecord Worker [conn-opts qname running?_ opts]
(defrecord Worker [conn-opts qname running?_ thread-futures_ opts]

java.io.Closeable (close [this] (stop this))

IWorker
(stop [_]
(when (compare-and-set! running?_ true false)
(timbre/infof "Initiating shutdown of queue worker: %s" qname)
(run! deref @thread-futures_)
(timbre/infof "Message queue worker stopped: %s" qname)
true))

Expand Down Expand Up @@ -284,7 +286,8 @@
(Thread/sleep (exp-backoff (inc nerrors)))
(recur (inc nerrors))))))))]

(dorun (repeatedly nthreads (fn [] (future (start-polling-loop!))))))
(reset! thread-futures_
(doall (repeatedly nthreads (fn [] (future (start-polling-loop!)))))))

true)))

Expand Down Expand Up @@ -333,7 +336,7 @@
eoq-backoff-ms exp-backoff
auto-start true}}]]

(let [w (Worker. conn-opts qname (atom false)
(let [w (Worker. conn-opts qname (atom false) (atom [])
{:handler handler
:monitor monitor
:lock-ms lock-ms
Expand Down

0 comments on commit d5147e4

Please sign in to comment.