From 4195a0523696a9c0b8cdf72d87bc02be2063760e Mon Sep 17 00:00:00 2001 From: Peter Taoussanis Date: Sun, 4 Dec 2022 15:02:11 +0100 Subject: [PATCH] [mod] [new] [mq] [#278] NB rewrite message queue Introduces a significant rewrite of major parts of Carmine's message queue. The underlying architecture remains unchanged. The API remains MOSTLY unchanged: a small number of users may be affected, please see the changes below. CHANGES - [BREAKING] `enqueue` return value has changed: It used to return or {:carmine.mq/error }. It now always returns a map with possible keys: [mid action error]. Please see docstring for more info. - Improved error messages and logging output. NEW - Added admin utils: `queue-names`, `clear-all-queues`. - Added `enqueue` option: `:lock-ms` to support per-message lock times [#223]. - Added `enqueue` option: `:can-update?` to support message updating. - Handler fn data now includes `:age-ms` to support integration with Tufte or other profiling tools. - `queue-status` util now includes a `:by-mid` { } hash. - Worker object's string/pprint representation is now more useful. - Worker object can now be dereffed to get useful state and stats. - Worker threads are now automatically desynchronized to reduce contention. - Improved docstrings and mq architecture documentation. - General improvements to implementation, debuggability, and tests. --- src/taoensso/carmine/lua/mq/dequeue.lua | 143 +-- src/taoensso/carmine/lua/mq/enqueue.lua | 152 ++-- src/taoensso/carmine/lua/mq/msg-status.lua | 53 +- src/taoensso/carmine/message_queue.clj | 844 ++++++++++++------ test/taoensso/carmine/tests/message_queue.clj | 463 +++++----- 5 files changed, 1002 insertions(+), 653 deletions(-) diff --git a/src/taoensso/carmine/lua/mq/dequeue.lua b/src/taoensso/carmine/lua/mq/dequeue.lua index d3eeab2e..9fc930df 100644 --- a/src/taoensso/carmine/lua/mq/dequeue.lua +++ b/src/taoensso/carmine/lua/mq/dequeue.lua @@ -1,81 +1,112 @@ -if redis.call('exists', _:qk-eoq-backoff) == 1 then - return 'eoq-backoff'; +-- Return e/o {'sleep' ...}, {'skip' ...}, {'handle' ...}, {'unexpected' ...} + +if (redis.call('exists', _:qk-eoq-backoff) == 1) then + return {'sleep', 'eoq-backoff', redis.call('pttl', _:qk-eoq-backoff)}; end --- TODO Waiting for Lua brpoplpush support to get us long polling local mid = redis.call('rpoplpush', _:qk-mid-circle, _:qk-mid-circle); local now = tonumber(_:now); -if (not mid) or (mid == 'end-of-circle') then -- Uninit'd or eoq +if ((not mid) or (mid == 'end-of-circle')) then -- Uninit'd or eoq -- Calculate eoq_backoff_ms - local ndry_runs = tonumber(redis.call('get', _:qk-ndry-runs) or 0); - local eoq_ms_tab = {_:eoq-ms0, _:eoq-ms1, _:eoq-ms2, _:eoq-ms3, _:eoq-ms4}; + local ndry_runs = tonumber(redis.call('get', _:qk-ndry-runs)) or 0; + local eoq_ms_tab = {_:eoq-bo1, _:eoq-bo2, _:eoq-bo3, _:eoq-bo4, _:eoq-bo5}; local eoq_backoff_ms = tonumber(eoq_ms_tab[math.min(5, (ndry_runs + 1))]); -- Set queue-wide polling backoff flag redis.call('psetex', _:qk-eoq-backoff, eoq_backoff_ms, 'true'); redis.call('incr', _:qk-ndry-runs); - return 'eoq-backoff'; + return {'sleep', 'end-of-circle', eoq_backoff_ms}; end -- From msg_status.lua --------------------------------------------------------- --- local mid = _:mid; -local now = tonumber(_:now); -local lock_exp = tonumber(redis.call('hget', _:qk-locks, mid)) or 0; -local backoff_exp = tonumber(redis.call('hget', _:qk-backoffs, mid)) or 0; -local state = nil; - -if redis.call('hexists', _:qk-messages, mid) == 1 then - if redis.call('sismember', _:qk-done, mid) == 1 then - if (now < backoff_exp) then - if redis.call('sismember', _:qk-requeue, mid) == 1 then - state = 'done-with-requeue'; - else - state = 'done-with-backoff'; - end - else - state = 'done-awaiting-gc'; - end - else - if (now < lock_exp) then - if redis.call('sismember', _:qk-requeue, mid) == 1 then - state = 'locked-with-requeue'; - else - state = 'locked'; - end - elseif (now < backoff_exp) then - state = 'queued-with-backoff'; - else - state = 'queued'; - end - end -end +local now = tonumber(_:now); + +local status = nil; -- base status e/o nil, done, queued, locked +local is_bo = false; -- backoff flag for: done, queued +local is_rq = false; -- requeue flag for: done, locked +-- 8x cases: nil, done(bo/rq), queued(bo), locked(rq) +-- Describe with: {status, $bo, $rq} with $ prefixes e/o: _, +, -, * + +if (redis.call('hexists', _:qk-messages, mid) == 1) then + local exp_lock = tonumber(redis.call('hget', _:qk-locks, mid)) or 0; + local exp_bo = tonumber(redis.call('hget', _:qk-backoffs, mid)) or 0; + + is_bo = (now < exp_bo); + is_rq = (redis.call('sismember', _:qk-requeue, mid) == 1) or -- Deprecated + (redis.call('hexists', _:qk-messages-rq, mid) == 1); --- return state; + if (redis.call('sismember', _:qk-done, mid) == 1) then status = 'done'; + elseif (now < exp_lock) then status = 'locked'; + else status = 'queued'; end +else + status = 'nx'; +end -------------------------------------------------------------------------------- -if (state == 'locked') or - (state == 'locked-with-requeue') or - (state == 'queued-with-backoff') or - (state == 'done-with-backoff') then - return nil; +if (status == 'nx') then return {'skip', 'unexpected'}; +elseif (status == 'locked') then return {'skip', 'locked'}; +elseif ((status == 'queued') and is_bo) then return {'skip', 'queued-with-backoff'}; +elseif ((status == 'done') and is_bo) then return {'skip', 'done-with-backoff'}; end redis.call('set', _:qk-ndry-runs, 0); -- Doing useful work -if (state == 'done-awaiting-gc') then - redis.call('hdel', _:qk-messages, mid); - redis.call('hdel', _:qk-locks, mid); - redis.call('hdel', _:qk-backoffs, mid); - redis.call('hdel', _:qk-nattempts, mid); - redis.call('ltrim', _:qk-mid-circle, 1, -1); - redis.call('srem', _:qk-done, mid); - return nil; +if (status == 'done') then + if is_rq then + -- {done, -bo, +rq} -> requeue now + local mcontent = + redis.call('hget', _:qk-messages-rq, mid) or + redis.call('hget', _:qk-messages, mid); -- Deprecated (for qk-requeue) + + local lock_ms = + tonumber(redis.call('hget', _:qk-lock-times-rq, mid)) or + tonumber(_:default-lock-ms); + + redis.call('hset', _:qk-messages, mid, mcontent); + redis.call('hset', _:qk-udts, mid, now); + + if lock_ms then + redis.call('hset', _:qk-lock-times, mid, lock_ms); + else + redis.call('hdel', _:qk-lock-times, mid); + end + + redis.call('hdel', _:qk-messages-rq, mid); + redis.call('hdel', _:qk-lock-times-rq, mid); + redis.call('hdel', _:qk-nattempts, mid); + redis.call('srem', _:qk-done, mid); + redis.call('srem', _:qk-requeue, mid); + return {'skip', 'did-requeue'}; + else + -- {done, -bo, -rq} -> full GC now + redis.call('hdel', _:qk-messages, mid); + redis.call('hdel', _:qk-messages-rq, mid); + redis.call('hdel', _:qk-lock-times, mid); + redis.call('hdel', _:qk-lock-times-rq, mid); + redis.call('hdel', _:qk-udts, mid); + redis.call('hdel', _:qk-locks, mid); + redis.call('hdel', _:qk-backoffs, mid); + redis.call('hdel', _:qk-nattempts, mid); + redis.call('srem', _:qk-done, mid); + redis.call('srem', _:qk-requeue, mid); + redis.call('ltrim', _:qk-mid-circle, 1, -1); + return {'skip', 'did-gc'}; + end +elseif (status == 'queued') then + -- {queued, -bo, _rq} -> handle now + local lock_ms = + tonumber(redis.call('hget', _:qk-lock-times, mid)) or + tonumber(_:default-lock-ms); + + redis.call('hset', _:qk-locks, mid, now + lock_ms); -- Acquire + local mcontent = redis.call('hget', _:qk-messages, mid); + local udt = redis.call('hget', _:qk-udts, mid); + local nattempts = redis.call('hincrby', _:qk-nattempts, mid, 1); + + return {'handle', mid, mcontent, nattempts, lock_ms, tonumber(udt)}; end -redis.call('hset', _:qk-locks, mid, now + tonumber(_:lock-ms)); -- Acquire -local mcontent = redis.call('hget', _:qk-messages, mid); -local nattempts = redis.call('hincrby', _:qk-nattempts, mid, 1); -return {mid, mcontent, nattempts}; +return {'unexpected'}; diff --git a/src/taoensso/carmine/lua/mq/enqueue.lua b/src/taoensso/carmine/lua/mq/enqueue.lua index 998411ef..5e6af69f 100644 --- a/src/taoensso/carmine/lua/mq/enqueue.lua +++ b/src/taoensso/carmine/lua/mq/enqueue.lua @@ -1,70 +1,116 @@ -- From msg_status.lua --------------------------------------------------------- -local mid = _:mid; -local now = tonumber(_:now); -local lock_exp = tonumber(redis.call('hget', _:qk-locks, mid)) or 0; -local backoff_exp = tonumber(redis.call('hget', _:qk-backoffs, mid)) or 0; -local state = nil; - -if redis.call('hexists', _:qk-messages, mid) == 1 then - if redis.call('sismember', _:qk-done, mid) == 1 then - if (now < backoff_exp) then - if redis.call('sismember', _:qk-requeue, mid) == 1 then - state = 'done-with-requeue'; - else - state = 'done-with-backoff'; - end - else - state = 'done-awaiting-gc'; - end +local mid = _:mid; +local now = tonumber(_:now); + +local status = nil; -- base status e/o nil, done, queued, locked +local is_bo = false; -- backoff flag for: done, queued +local is_rq = false; -- requeue flag for: done, locked +-- 8x cases: nil, done(bo/rq), queued(bo), locked(rq) +-- Describe with: {status, $bo, $rq} with $ prefixes e/o: _, +, -, * + +if (redis.call('hexists', _:qk-messages, mid) == 1) then + local exp_lock = tonumber(redis.call('hget', _:qk-locks, mid)) or 0; + local exp_bo = tonumber(redis.call('hget', _:qk-backoffs, mid)) or 0; + + is_bo = (now < exp_bo); + is_rq = (redis.call('sismember', _:qk-requeue, mid) == 1) or -- Deprecated + (redis.call('hexists', _:qk-messages-rq, mid) == 1); + + if (redis.call('sismember', _:qk-done, mid) == 1) then status = 'done'; + elseif (now < exp_lock) then status = 'locked'; + else status = 'queued'; end +else + status = 'nx'; +end +-------------------------------------------------------------------------------- +-- Return {action, error} + +local reset_in_queue = function() + redis.call('hset', _:qk-messages, _:mid, _:mcnt); + redis.call('hsetnx', _:qk-udts, _:mid, now); + + local lock_ms = tonumber(_:lock-ms); + if (lock_ms ~= -1) then + redis.call('hset', _:qk-lock-times, _:mid, lock_ms); else - if (now < lock_exp) then - if redis.call('sismember', _:qk-requeue, mid) == 1 then - state = 'locked-with-requeue'; - else - state = 'locked'; - end - elseif (now < backoff_exp) then - state = 'queued-with-backoff'; - else - state = 'queued'; - end + redis.call('hdel', _:qk-lock-times, _:mid); end end --- return state; --------------------------------------------------------------------------------- +local reset_in_requeue = function() + redis.call('hset', _:qk-messages-rq, _:mid, _:mcnt); -if (state == 'done-awaiting-gc') or - ((state == 'done-with-backoff') and (_:allow-requeue? == 'true')) -then - redis.call('hdel', _:qk-nattempts, _:mid); - redis.call('srem', _:qk-done, _:mid); - return {_:mid}; + local lock_ms = tonumber(_:lock-ms); + if (lock_ms ~= -1) then + redis.call('hset', _:qk-lock-times-rq, _:mid, lock_ms); + else + redis.call('hdel', _:qk-lock-times-rq, _:mid); + end end -if (state == 'locked') and (_:allow-requeue? == 'true') and - (redis.call('sismember', _:qk-requeue, _:mid) ~= 1) -then - redis.call('sadd', _:qk-requeue, _:mid); - return {_:mid}; +local can_upd = (_:can-upd? == '1'); +local can_rq = (_:can-rq? == '1'); + +local ensure_update_in_requeue = function() + if is_rq then + if can_upd then + reset_in_requeue(); + return {'updated'}; + else + return {false, 'already-queued'}; + end + else + reset_in_requeue(); + return {'added'}; + end end -if state == nil then - redis.call('hset', _:qk-messages, _:mid, _:mcontent); +if (status == 'nx') then + -- {nil, _bo, _rq} -> add to queue - -- lpushnx end-of-circle marker to ensure an initialized mid-circle - if redis.call('exists', _:qk-mid-circle) ~= 1 then - redis.call('lpush', _:qk-mid-circle, 'end-of-circle'); + -- Set the initial backoff if requested + local init_bo = tonumber(_:init-bo); + if (init_bo ~= 0) then + redis.call('hset', _:qk-backoffs, _:mid, now + init_bo); end - -- Set the initial backoff if requested - local initial_backoff_ms = tonumber(_:initial-backoff-ms); - if (initial_backoff_ms ~= 0) then - redis.call('hset', _:qk-backoffs, _:mid, now + initial_backoff_ms); + -- Ensure that mid-circle is initialized + if redis.call('exists', _:qk-mid-circle) ~= 1 then + redis.call('lpush', _:qk-mid-circle, 'end-of-circle'); end redis.call('lpush', _:qk-mid-circle, _:mid); - return {_:mid}; -else - return state; -- Reject + reset_in_queue(); + return {'added'}; + +elseif (status == 'queued') then + if can_upd then + -- {queued, *bo, _rq} -> update in queue + reset_in_queue(); + return {'updated'}; + else + return {false, 'already-queued'}; + end +elseif (status == 'locked') then + if can_rq then + -- {locked, _bo, *rq} -> ensure/update in requeue + return ensure_update_in_requeue(); + else + return {false, 'locked'}; + end +elseif (status == 'done') then + if is_bo then + if can_rq then + -- {done, +bo, *rq} -> ensure/update in requeue + return ensure_update_in_requeue(); + else + return {false, 'backoff'}; + end + else + -- {done, -bo, *rq} -> ensure/update in requeue + -- (We're appropriating the requeue mechanism here) + return ensure_update_in_requeue(); + end end + +return {false, 'unexpected'}; diff --git a/src/taoensso/carmine/lua/mq/msg-status.lua b/src/taoensso/carmine/lua/mq/msg-status.lua index 383d6c6e..1208d1ce 100644 --- a/src/taoensso/carmine/lua/mq/msg-status.lua +++ b/src/taoensso/carmine/lua/mq/msg-status.lua @@ -1,35 +1,26 @@ --- Careful, logic here's subtle! See state diagram for assistance. +-- Careful! Logic here is subtle, see mq-diagram.svg for assistance. +local mid = _:mid; +local now = tonumber(_:now); -local mid = _:mid; -local now = tonumber(_:now); -local lock_exp = tonumber(redis.call('hget', _:qk-locks, mid)) or 0; -local backoff_exp = tonumber(redis.call('hget', _:qk-backoffs, mid)) or 0; -local state = nil; +local status = nil; -- base status e/o nil, done, queued, locked +local is_bo = false; -- backoff flag for: done, queued +local is_rq = false; -- requeue flag for: done, locked +-- 8x cases: nil, done(bo/rq), queued(bo), locked(rq) +-- Describe with: {status, $bo, $rq} with $ prefixes e/o: _, +, -, * -if redis.call('hexists', _:qk-messages, mid) == 1 then - if redis.call('sismember', _:qk-done, mid) == 1 then - if (now < backoff_exp) then - if redis.call('sismember', _:qk-requeue, mid) == 1 then - state = 'done-with-requeue'; - else - state = 'done-with-backoff'; - end - else - state = 'done-awaiting-gc'; - end - else - if (now < lock_exp) then - if redis.call('sismember', _:qk-requeue, mid) == 1 then - state = 'locked-with-requeue'; - else - state = 'locked'; - end - elseif (now < backoff_exp) then - state = 'queued-with-backoff'; - else - state = 'queued'; - end - end +if (redis.call('hexists', _:qk-messages, mid) == 1) then + local exp_lock = tonumber(redis.call('hget', _:qk-locks, mid)) or 0; + local exp_bo = tonumber(redis.call('hget', _:qk-backoffs, mid)) or 0; + + is_bo = (now < exp_bo); + is_rq = (redis.call('sismember', _:qk-requeue, mid) == 1) or -- Deprecated + (redis.call('hexists', _:qk-messages-rq, mid) == 1); + + if (redis.call('sismember', _:qk-done, mid) == 1) then status = 'done'; + elseif (now < exp_lock) then status = 'locked'; + else status = 'queued'; end +else + status = 'nx'; end -return state; +return {status, is_bo, is_rq}; diff --git a/src/taoensso/carmine/message_queue.clj b/src/taoensso/carmine/message_queue.clj index db69d2d9..2da3be95 100644 --- a/src/taoensso/carmine/message_queue.clj +++ b/src/taoensso/carmine/message_queue.clj @@ -1,30 +1,50 @@ (ns taoensso.carmine.message-queue - "Carmine-backed Clojure message queue. All heavy lifting by Redis. - Message circle architecture used here is simple, reliable, and has - reasonable throughput but at best mediocre latency. - - Redis keys: - * carmine:mq::messages - hash, {mid mcontent}. - * carmine:mq::locks - hash, {mid lock-expiry-time}. - * carmine:mq::backoffs - hash, {mid backoff-expiry-time}. - * carmine:mq::nattempts - hash, {mid attempt-count}. - * carmine:mq::mid-circle - list, rotating list of mids (next on right). - * carmine:mq::done - set, awaiting gc, requeue, etc. - * carmine:mq::requeue - set, for `allow-requeue?` option. - * carmine:mq::eoq-backoff? - ttl flag, used for queue-wide - (every-worker) polling backoff. - * carmine:mq::ndry-runs - int, number of times worker(s) have - burnt through queue w/o work to do. - - Ref. http://antirez.com/post/250 for basic implementation details" + "Carmine-backed Clojure message queue, v2. + All heavy lifting by Redis. + + Uses a message circle architecture that is simple, reliable, and + has reasonable throughput but only moderate latency [1]. + + See `mq-diagram.svg` in repo for diagram of architecture, + Ref. http://antirez.com/post/250 for initial inspiration. + + Message status e/o: + :nil - Not in queue or already GC'd + :queued - Awaiting handler + :queued-with-backoff - Awaiting handler, but skip until backoff expired + :locked - Currently with handler + :locked-with-requeue - Currently with handler, will requeue when done + :done-awaiting-gc - Finished handling, awaiting GC + :done-with-backoff - Finished handling, awaiting GC, + but skip until dedupe backoff expired + :done-with-requeue - Will requeue, but skip until dedupe backoff expired + + Redis keys (all prefixed with `carmine:mq::`): + * messages - hash: {mid mcontent} ; Message content + * messages-rq - hash: {mid mcontent} ; '' for requeues + * lock-times - hash: {mid lock-ms} ; Optional mid-specific lock duration + * lock-times-rq - hash: {mid lock-ms} ; '' for requeues + * udts - hash: {mid udt-first-enqueued} + * locks - hash: {mid lock-expiry-time} ; Active locks + * backoffs - hash: {mid backoff-expiry-time} ; Active backoffs + * nattempts - hash: {mid attempt-count} + * done - mid set: awaiting gc, etc. + * requeue - mid set: awaiting requeue ; Deprecated + + * mid-circle - list: rotating list of mids (next on right) + * ndry-runs - int: num times worker(s) have lapped queue w/o work to do + * eoq-backoff? - ttl flag: for global (every-worker) polling backoff" + {:author "Peter Taoussanis (@ptaoussanis)"} - (:require [clojure.string :as str] - [taoensso.encore :as enc] - [taoensso.timbre :as timbre] - [taoensso.carmine :as car :refer (wcar)])) + (:require + [clojure.string :as str] + [taoensso.encore :as enc] + [taoensso.timbre :as timbre] + [taoensso.carmine :as car :refer [wcar]])) -;; TODO Consider adding a size-1 blocking list for await/notify style -;; backoff? Would allow us to eliminate non-maintenance polling. +;;;; TODO/later +;; - New docs + examples in v4 Wiki. +;; - Support cbs? (Could decouple Timbre) ;;;; Utils @@ -34,282 +54,530 @@ (def ^:private qkey (enc/fmemoize (partial car/key :carmine :mq))) -(comment (qkey :foo)) +(comment (enc/qb 1e6 (qkey :foo))) ;;;; Admin -(defn clear-queues [conn-opts & qnames] - (when (seq qnames) - (wcar conn-opts - (enc/run! - (fn [qname] - (let [qk (partial qkey qname)] - (car/del - (qk :messages) - (qk :locks) - (qk :backoffs) - (qk :nattempts) - (qk :mid-circle) - (qk :done) - (qk :requeue) - (qk :eoq-backoff?) - (qk :ndry-runs)))) - qnames)))) - -(defn- kvs->map [kvs] (enc/reduce-kvs assoc {} kvs)) - -(defn queue-status [conn-opts qname] - (let [qk (partial qkey qname)] - (zipmap [:last-mid :next-mid :messages :locks :backoffs :nattempts - :mid-circle :done :requeue :eoq-backoff? :ndry-runs] - (wcar conn-opts - (car/lindex (qk :mid-circle) 0) - (car/lindex (qk :mid-circle) -1) - (car/parse kvs->map - (car/hgetall (qk :messages)) - (car/hgetall (qk :locks)) - (car/hgetall (qk :backoffs)) - (car/hgetall (qk :nattempts))) - (car/lrange (qk :mid-circle) 0 -1) - (->> (car/smembers (qk :done)) (car/parse set)) - (->> (car/smembers (qk :requeue)) (car/parse set)) - (->> (car/get (qk :eoq-backoff?)) (car/parse-bool)) ; Give TTL? - (->> (car/get (qk :ndry-runs)) (car/parse-int)))))) +(defn queue-names + "Returns a non-empty set of existing queue names, or nil." + ([conn-opts ] (queue-names conn-opts "*")) + ([conn-opts pattern] + (when-let [qks (not-empty (car/scan-keys conn-opts (qkey pattern)))] + (let [qk-prefix-len (inc (count (qkey)))] ; "carmine:mq:", etc. + (into #{} (map #(enc/get-substr-by-idx % qk-prefix-len)) qks))))) + +(comment (queue-names {})) + +(defn clear-queues + "Deletes ALL content for the Carmine message queues with given names." + {:arglists '([conn-opts qnames])} + [conn-opts & more] + (let [qnames ; Back compatibility + (when-let [[x1] more] + (if (coll? x1) ; Common case (new API) + x1 + more))] + + (when (seq qnames) + (wcar conn-opts + (enc/run! + (fn [qname] + (when qname + (let [qk (partial qkey qname)] + (car/del + (qk :messages) + (qk :messages-rq) + (qk :lock-times) + (qk :lock-times-rq) + (qk :udts) + (qk :locks) + (qk :backoffs) + (qk :nattempts) + (qk :done) + (qk :requeue) + (qk :mid-circle) + (qk :ndry-runs) + (qk :eoq-backoff?))))) + qnames))))) + +(defn clear-all-queues + "Deletes ALL content for ALL Carmine message queues and returns a + non-empty vector of the queue names that were cleared, or nil." + [conn-opts] + (when-let [qnames (queue-names conn-opts "*")] + (clear-queues conn-opts qnames) + (do qnames))) + +(defn- kvs->map [kvs] + (if (empty? kvs) + {} + (persistent! (enc/reduce-kvs assoc! (transient {}) kvs)))) + +(defn- ->message-status + "[ ] -> " + ([?base-status-str backoff? requeue?] + (case ?base-status-str + "nx" nil + "queued" (if backoff? :queued-with-backoff :queued) + "locked" (if requeue? :locked-with-requeue :locked) + "done" + (enc/cond + requeue? :done-with-requeue + backoff? :done-with-backoff + :else :done-awaiting-gc) + (do :unknown))) + + ([reply] + (if (vector? reply) + (let [[?bss bo? rq?] reply] (->message-status ?bss bo? rq?)) + :unknown))) + +(comment (->message-status ["queued" "1" nil])) +(comment (wcar {} (message-status "qname" "mid1"))) + +(defn queue-status + "Returns a detailed status map for given named queue. + Expensive, O(n-items-in-queue) - avoid use in production." + ([conn-opts qname ] (queue-status conn-opts qname nil)) + ([conn-opts qname opts] + (let [now (enc/now-udt) + qk (partial qkey qname) + {:keys [incl-legacy-data?] + :or {incl-legacy-data? true}} opts + + m + (zipmap + [:messages :messages-rq + :lock-times :lock-times-rq + :udts :locks :backoffs :nattempts + :done :requeue + :mid-circle :eoq-backoff? :ndry-runs] + + (wcar conn-opts + (car/parse kvs->map + (car/hgetall (qk :messages)) + (car/hgetall (qk :messages-rq)) + (car/hgetall (qk :lock-times)) + (car/hgetall (qk :lock-times-rq)) + (car/hgetall (qk :udts)) + (car/hgetall (qk :locks)) + (car/hgetall (qk :backoffs)) + (car/hgetall (qk :nattempts))) + + (->> (car/smembers (qk :done)) (car/parse set)) + (->> (car/smembers (qk :requeue)) (car/parse set)) + + (do (car/lrange (qk :mid-circle) 0 -1)) + (->> (car/pttl (qk :eoq-backoff?)) (car/parse enc/as-?pos-int)) ; ?ttl + (->> (car/get (qk :ndry-runs)) (car/parse-int)))) + + {:keys [messages messages-rq + lock-times lock-times-rq + udts locks backoffs nattempts + done requeue + mid-circle]} m] + + (assoc + (if incl-legacy-data? + (do m) + (select-keys m [:mid-circle :eoq-backoff? :ndry-runs])) + + :last-mid (first mid-circle) + :next-mid (peek mid-circle) + :by-mid ; { {:keys [message status ...]}} + (persistent! + (reduce-kv + (fn [m mid mcontent] + (assoc! m mid + (let [exp-lock (enc/as-int (get locks mid 0)) + exp-backoff (enc/as-int (get backoffs mid 0)) + + locked? (< now exp-lock) + backoff? (< now exp-backoff) + done? (contains? done mid) + requeue? + (or + (contains? messages-rq mid) + (contains? requeue mid)) + + backoff-ms (when backoff? (- exp-backoff now)) + age-ms (when-let [udt (get udts mid)] + (- now (enc/as-int udt))) + + base-status + (enc/cond + done? "done" + locked? "locked" + :else "queued")] + + (enc/assoc-some + {:message mcontent + :status (->message-status base-status backoff? requeue?) + :nattempts (get nattempts mid 0)} + :backoff-ms backoff-ms + :age-ms age-ms)))) + + (transient messages) + (do messages))))))) ;;;; Implementation -(def message-status +(do ; Lua scripts + (def lua-msg-status_ (delay (enc/have (enc/slurp-resource "taoensso/carmine/lua/mq/msg-status.lua")))) + (def lua-enqueue_ (delay (enc/have (enc/slurp-resource "taoensso/carmine/lua/mq/enqueue.lua")))) + (def lua-dequeue_ (delay (enc/have (enc/slurp-resource "taoensso/carmine/lua/mq/dequeue.lua"))))) + +(defn message-status "Returns current message status, e/o: - :queued - Awaiting handler. - :queued-with-backoff - Awaiting rehandling. - :locked - Currently with handler. - :locked-with-requeue - Currently with handler, will requeue on success. - :done-awaiting-gc - Finished handling, awaiting GC. - :done-with-backoff - Finished handling, awaiting dedupe timeout. - nil - Already GC'd or invalid message id." - (let [script (enc/have (enc/slurp-resource "taoensso/carmine/lua/mq/msg-status.lua"))] - (fn [qname mid] - (car/parse-keyword - (car/lua script - {:qk-messages (qkey qname :messages) - :qk-locks (qkey qname :locks) - :qk-backoffs (qkey qname :backoffs) - :qk-done (qkey qname :done) - :qk-requeue (qkey qname :requeue)} - {:now (enc/now-udt) - :mid mid}))))) - -(let [script_ (delay (enc/have (enc/slurp-resource "taoensso/carmine/lua/mq/enqueue.lua")))] - (defn enqueue - "Pushes given message (any Clojure datatype) to named queue and returns unique - message id or {:carmine.mq/error }. Options: - * unique-message-id - Specify an explicit message id (e.g. message hash) to - perform a de-duplication check. If unspecified, a - unique id will be auto-generated. - * allow-requeue? - When true, allow buffered escrow-requeue for a - message in the :locked or :done-with-backoff state. - * initial-backoff-ms - Initial backoff in millis." - - ;; Note some gymnastics here for backwards-compatible API change: - ;; [a b & [c d]] -> [a b ?{:keys [c d]}] - {:arglists - '([qname message] - [qname message {:keys [unique-message-id allow-requeue? initial-backoff-ms]}])} - - [qname message & more] - (let [{:keys [unique-message-id allow-requeue? initial-backoff-ms]} - (when-let [[m1 m2] more] - (if (map? m1) - m1 ; Common case (new API) - {:unique-message-id m1 - :allow-requeue? m2}))] - - (car/parse - #(if (vector? %) (get % 0) {:carmine.mq/error (keyword %)}) - (car/lua @script_ - {:qk-messages (qkey qname :messages) - :qk-locks (qkey qname :locks) - :qk-backoffs (qkey qname :backoffs) - :qk-nattempts (qkey qname :nattempts) - :qk-mid-circle (qkey qname :mid-circle) - :qk-done (qkey qname :done) - :qk-requeue (qkey qname :requeue)} - {:now (enc/now-udt) - :mid (or unique-message-id (enc/uuid-str)) - :mcontent (car/freeze message) - :allow-requeue? (if allow-requeue? "true" "false") - :initial-backoff-ms (or initial-backoff-ms 0)}))))) - -(def dequeue - "IMPLEMENTATION DETAIL: Use `worker` instead. - Rotates queue's mid-circle and processes next mid. Returns: - nil - If msg GC'd, locked, or set to backoff. - \"eoq-backoff\" - If circle uninitialized or end-of-circle marker reached. - [ ] - If message should be (re)handled now." - (let [script (enc/have (enc/slurp-resource "taoensso/carmine/lua/mq/dequeue.lua"))] - (fn [qname & [{:keys [lock-ms eoq-backoff-ms] - :or {lock-ms (enc/ms :mins 60) eoq-backoff-ms exp-backoff}}]] - (let [;; Precomp 5 backoffs so that `dequeue` can init the backoff - ;; atomically. This is hackish, but a decent tradeoff. - eoq-backoff-ms-vec - (cond - (fn? eoq-backoff-ms) (mapv eoq-backoff-ms (range 5)) - (integer? eoq-backoff-ms) (mapv (constantly eoq-backoff-ms) (range 5)) - :else (throw (ex-info (str "Bad eoq-backoff-ms: " eoq-backoff-ms) - {:eoq-backoff-ms eoq-backoff-ms})))] - - (car/lua script - {:qk-messages (qkey qname :messages) - :qk-locks (qkey qname :locks) - :qk-backoffs (qkey qname :backoffs) - :qk-nattempts (qkey qname :nattempts) - :qk-mid-circle (qkey qname :mid-circle) - :qk-done (qkey qname :done) - :qk-requeue (qkey qname :requeue) - :qk-eoq-backoff (qkey qname :eoq-backoff?) - :qk-ndry-runs (qkey qname :ndry-runs)} - {:now (enc/now-udt) - :lock-ms lock-ms - :eoq-ms0 (nth eoq-backoff-ms-vec 0) - :eoq-ms1 (nth eoq-backoff-ms-vec 1) - :eoq-ms2 (nth eoq-backoff-ms-vec 2) - :eoq-ms3 (nth eoq-backoff-ms-vec 3) - :eoq-ms4 (nth eoq-backoff-ms-vec 4)}))))) + nil - Not in queue or already GC'd + :queued - Awaiting handler + :queued-with-backoff - Awaiting handler, but skip until backoff expired + :locked - Currently with handler + :locked-with-requeue - Currently with handler, will requeue when done + :done-awaiting-gc - Finished handling, awaiting GC + :done-with-backoff - Finished handling, awaiting GC, + but skip until dedupe backoff expired + :done-with-requeue - Will requeue, but skip until dedupe backoff expired" + [qname mid] + (car/parse ->message-status + (car/lua @lua-msg-status_ + {:qk-messages (qkey qname :messages) + :qk-messages-rq (qkey qname :messages-rq) + :qk-locks (qkey qname :locks) + :qk-backoffs (qkey qname :backoffs) + :qk-done (qkey qname :done) + :qk-requeue (qkey qname :requeue)} + {:now (enc/now-udt) + :mid mid}))) + +(defn enqueue + "Pushes given message (any Clojure data type) to named queue and returns + - {:keys [mid action]} on success ; action e/o #{:added :updated} + - {:keys [error]} on error ; error e/o #{:already-queued :locked :backoff} + + Options: + :init-backoff-ms - Optional initial backoff in msecs. + :lock-ms - Optional lock time in msecs. When unspecified, the + worker's default lock time will be used. + + :mid - Optional unique message id (e.g. message hash) to + identify a specific message for dedupe/update/requeue. + When unspecified, a random uuid will be used. + + :can-update? - When true, will update message content and/or lock-ms for + an mid still awaiting handling. + :can-requeue? - When true, will mark message with `:locked` or + `:done-with-backoff` status so that it will be + automatically requeued after garbage collection." + + {:arglists + '([qname message] + [qname message {:keys [init-backoff-ms lock-ms + mid can-update? can-requeue?]}])} + + [qname message & more] + (let [opts ; Back compatibility: [a b & [c d]] -> [a b ?{:keys [c d]}] + (when-let [[x1 x2] more] + (if (map? x1) ; Common case (new API) + x1 + {:mid x1 + :can-requeue? x2})) + + parse-fn + (fn [mid reply] + (enc/cond + :let [[action error] (when (vector? reply) reply)] + error {:error (keyword error)} + action {:action (keyword action), :mid mid} + :else {:error :unknown})) + + {:keys [init-backoff-ms lock-ms + mid can-update? can-requeue?]} + opts + + ;;; Back compatibility + mid (or mid (get opts :unique-message-id)) + init-backoff-ms (or init-backoff-ms (get opts :initial-backoff-ms)) + can-requeue? (or can-requeue? (get opts :allow-requeue?)) + + mid (or mid (enc/uuid-str))] + + (car/parse + (partial parse-fn mid) + (car/lua @lua-enqueue_ + {:qk-messages (qkey qname :messages) + :qk-messages-rq (qkey qname :messages-rq) + :qk-lock-times (qkey qname :lock-times) + :qk-lock-times-rq (qkey qname :lock-times-rq) + :qk-udts (qkey qname :udts) + :qk-locks (qkey qname :locks) + :qk-backoffs (qkey qname :backoffs) + :qk-nattempts (qkey qname :nattempts) + :qk-done (qkey qname :done) + :qk-requeue (qkey qname :requeue) + :qk-mid-circle (qkey qname :mid-circle)} + + {:now (enc/now-udt) + :mid mid + :mcnt (car/freeze message) + :foof "1" + :can-upd? (if can-update? "1" "0") + :can-rq? (if can-requeue? "1" "0") + :init-bo (or init-backoff-ms 0) + :lock-ms (or lock-ms -1)})))) + +(defn- dequeue + "Rotates queue's mid-circle and processes next mid. + Returns: + - [\"skip\" ] ; Worker thread should skip + - [\"sleep\" ] ; Worker thread should sleep + - [\"handle\" ] ; Worker thread should handle" + [qname + {:keys [default-lock-ms eoq-backoff-ms] + :or {default-lock-ms (enc/ms :mins 60) + eoq-backoff-ms exp-backoff}}] + + (let [;; Precompute 5 backoffs so that `dequeue.lua` can init the backoff atomically + [bo1 bo2 bo3 bo4 bo5] + (cond + (fn? eoq-backoff-ms) (mapv eoq-backoff-ms (range 5)) + (integer? eoq-backoff-ms) (repeat 5 eoq-backoff-ms) + :else + (throw + (ex-info + (str "[Carmine/mq] Unexpected `eoq-backoff-ms` arg: " eoq-backoff-ms) + {:arg {:value eoq-backoff-ms :type (type eoq-backoff-ms)}})))] + + (car/lua @lua-dequeue_ + {:qk-messages (qkey qname :messages) + :qk-messages-rq (qkey qname :messages-rq) + :qk-lock-times (qkey qname :lock-times) + :qk-lock-times-rq (qkey qname :lock-times-rq) + :qk-udts (qkey qname :udts) + :qk-locks (qkey qname :locks) + :qk-backoffs (qkey qname :backoffs) + :qk-nattempts (qkey qname :nattempts) + :qk-done (qkey qname :done) + :qk-requeue (qkey qname :requeue) + :qk-mid-circle (qkey qname :mid-circle) + :qk-eoq-backoff (qkey qname :eoq-backoff?) + :qk-ndry-runs (qkey qname :ndry-runs)} + + {:now (enc/now-udt) + :default-lock-ms default-lock-ms + :eoq-bo1 bo1 + :eoq-bo2 bo2 + :eoq-bo3 bo3 + :eoq-bo4 bo4 + :eoq-bo5 bo5}))) (comment (clear-queues {} :q1) (queue-status {} :q1) (wcar {} (enqueue :q1 :msg1 :mid1)) (wcar {} (message-status :q1 :mid1)) - (wcar {} (dequeue :q1)) + (wcar {} (dequeue :q1 {})) ;;(mapv exp-backoff (range 5)) (wcar {} (car/pttl (qkey :q1 :eoq-backoff?)))) -(defn handle1 "Implementation detail!" - [conn-opts qname handler [mid mcontent attempt :as poll-reply]] - (when (and poll-reply (not= poll-reply "eoq-backoff")) - (let [qk (partial qkey qname) - done - (fn [status mid & [backoff-ms]] - ;; TODO Switch to Lua script - (car/atomic conn-opts 100 - (car/watch (qk :requeue)) - (let [requeue? - (car/with-replies (->> (car/sismember (qk :requeue) mid) - (car/parse-bool))) - status (if (and (= status :success) requeue?) - :requeue status)] - (car/multi) - (when backoff-ms ; Retry or dedupe backoff, depending on type - (car/hset (qk :backoffs) mid (+ (enc/now-udt) backoff-ms))) - - (car/hdel (qk :locks) mid) - (case status - (:success :error) (car/sadd (qk :done) mid) - :requeue (do (car/srem (qk :requeue) mid) - (car/hdel (qk :nattempts) mid)) - nil)))) - - error - (fn [mid poll-reply ?throwable] - (done :error mid) - (timbre/errorf - (ex-info ":error handler response" - {:qname qname - :mid mid - :attempt attempt - :mcontent mcontent} - ?throwable) - "Error handling %s queue message: %s" - qname mid)) +(defn- inc-stat! + ([stats_ k1 ] (when stats_ (swap! stats_ (fn [m] (enc/update-in m [k1] (fn [?n] (inc (long (or ?n 0))))))))) + ([stats_ k1 k2] (when stats_ (swap! stats_ (fn [m] (enc/update-in m [k1 k2] (fn [?n] (inc (long (or ?n 0)))))))))) + +(comment (inc-stat! (atom {}) :k1)) + +(defn- thread-desync-ms + "Returns ms ± 20%" + [ms] + (let [r (+ 0.8 (* 0.4 ^double (rand)))] + (int (* r (long ms))))) + +(comment (repeatedly 5 #(thread-desync-ms 500))) + +(defn- handle1 + [conn-opts qname handler poll-reply stats_] + (enc/cond + :let [[kind] (when (vector? poll-reply) poll-reply)] + + (= kind "skip") + (let [[_kind reason] poll-reply] + #_(inc-stat! stats_ (keyword "skip" reason)) ; Noisy + [:skipped reason]) + + (= kind "handle") + (let [[_kind mid mcontent attempt lock-ms udt] poll-reply + qk (partial qkey qname) + + age-ms + (when-let [udt (enc/as-?udt udt)] + (- (enc/now-udt) ^long udt)) + + result + (try + (handler + {:qname qname :mid mid + :message mcontent :attempt attempt + :lock-ms lock-ms :age-ms age-ms}) + (catch Throwable t + {:status :error :throwable t})) {:keys [status throwable backoff-ms]} - (let [result - (try (handler {:qname qname :mid mid - :message mcontent :attempt attempt}) - (catch Throwable t - {:status :error :throwable t}))] + (when (map? result) result) + + fin + (fn [mid status done? backoff-ms] + (let [done? (case status (:success :error) true false)] - (when (map? result) result))] + (do (inc-stat! stats_ (keyword "handler" (name status)))) + (when backoff-ms (inc-stat! stats_ :handler/backoff)) + + ;; Don't need atomicity here, simple pipeline sufficient + (wcar conn-opts + (when backoff-ms ; Possible done/retry backoff + (car/hset (qk :backoffs) mid + (+ (enc/now-udt) (long backoff-ms)))) + + (when done? (car/sadd (qk :done) mid)) + (do (car/hdel (qk :locks) mid)))))] (case status - :success (done status mid backoff-ms) - :retry (done status mid backoff-ms) - :error (error mid poll-reply throwable) + :success (fin mid :success true backoff-ms) + :retry (fin mid :retry false backoff-ms) + :error (do - (done :success mid) ; For backwards-comp with old API - (timbre/warn "Invalid handler status" - {:qname qname :status status :mid mid}))) + (fin mid :error true nil) + (timbre/error + (ex-info "[Carmine/mq] Handler returned `:error` status" + {:qname qname, :mid mid, :attempt attempt, :message mcontent} + throwable) + "[Carmine/mq] Handler returned `:error` status" + {:qname qname, :mid mid, :backoff-ms backoff-ms})) - status))) + (do + (fin mid :success true nil) ; For backwards-comp with old API + (timbre/warn "[Carmine/mq] Handler returned unexpected status" + {:qname qname, :mid mid, :attempt attempt, :message mcontent, + :handler-result {:value result :type (type result)} + :handler-status {:value status :type (type status)}}))) + [:handled status]) + + (= kind "sleep") + (let [[_kind reason ttl-ms] poll-reply + ttl-ms (thread-desync-ms (long ttl-ms))] + + (inc-stat! stats_ (keyword "poll" reason)) + (Thread/sleep (int ttl-ms)) + [:slept ttl-ms]) + + :else + (do + (inc-stat! stats_ :poll/unexpected) + (throw + (ex-info "[Carmine/mq] Unexpected poll reply" + {:reply {:value poll-reply :type (type poll-reply)}}))))) ;;;; Workers (defprotocol IWorker + "Implementation detail." (start [this]) (stop [this])) -(defrecord Worker [conn-opts qname running?_ thread-futures_ opts] +(deftype CarmineMessageQueueWorker + [qname opts conn-opts running?_ thread-futures_ stats_] java.io.Closeable (close [this] (stop this)) + Object + (toString [this] ; "CarmineMessageQueueWorker[nthreads=1, running]" + (str "CarmineMessageQueueWorker[" + "nthreads=" (count @thread-futures_) ", " + (if @running?_ "running" "shut down") "]")) + + clojure.lang.IDeref + (deref [this] + {:qname qname + :nthreads (count @thread-futures_) + :running? @running?_ + :conn-opts conn-opts + :opts opts + :stats @stats_ + :queue-status_ + (delay + (queue-status conn-opts qname + {:incl-legacy-data? false}))}) IWorker (stop [_] (when (compare-and-set! running?_ true false) - (timbre/infof "Initiating shutdown of queue worker: %s" qname) + (timbre/info "[Carmine/mq] Queue worker shutting down" {:qname qname}) (run! deref @thread-futures_) - (timbre/infof "Message queue worker stopped: %s" qname) + (timbre/info "[Carmine/mq] Queue worker has shut down" {:qname qname}) true)) (start [_] (when (compare-and-set! running?_ false true) - (timbre/infof "Message queue worker starting: %s" qname) + (timbre/info "[Carmine/mq] Queue worker starting" {:qname qname}) (let [{:keys [handler monitor nthreads throttle-ms]} opts qk (partial qkey qname) + throttle-ms + (when (and throttle-ms (> (long throttle-ms) 0)) + throttle-ms) + start-polling-loop! - (fn [] - (loop [nerrors 0] + (fn [^long thread-idx] + (when (> thread-idx 0) + (Thread/sleep (int (thread-desync-ms (or throttle-ms 100))))) + + (loop [nloops 0, nconsecutive-errors 0] (when @running?_ - (let [?error + (let [ex (try - (let [[poll-reply ndruns mid-circle-size :as -resp] + (let [[poll-reply ndry-runs mid-circle-size :as -resp] (wcar conn-opts (dequeue qname opts) (car/get (qk :ndry-runs)) (car/llen (qk :mid-circle)))] - (when-let [t (some #(when (instance? Throwable %) %) - -resp)] + (when-let [t (enc/rfirst #(instance? Throwable %) -resp)] (throw t)) (when monitor - (monitor {:mid-circle-size mid-circle-size - :ndry-runs (or ndruns 0) - :poll-reply poll-reply})) - - (if (= poll-reply "eoq-backoff") - (Thread/sleep - (max (wcar conn-opts (car/pttl (qk :eoq-backoff?))) - 10)) - (handle1 conn-opts qname handler poll-reply)) - - (when (and throttle-ms (> throttle-ms 0)) - (Thread/sleep throttle-ms))) - nil ; Successful worker loop + (monitor + {:mid-circle-size mid-circle-size + :ndry-runs (or ndry-runs 0) + :poll-reply poll-reply})) + + (handle1 conn-opts qname handler poll-reply stats_) + nil ; Successful loop + ) (catch Throwable t t))] - (if-not ?error - (recur 0) - (let [t ?error] - (timbre/errorf t "Worker error! Will backoff & retry.") - (Thread/sleep (exp-backoff (inc nerrors))) - (recur (inc nerrors))))))))] + (if ex + (let [nce (inc nconsecutive-errors) + backoff-ms (exp-backoff (min 12 nce) {:factor (or throttle-ms 200)})] + (timbre/error ex "[Carmine/mq] Worker error, will backoff & retry." + {:qname qname, :backoff-ms backoff-ms, :nconsecutive-errors nce}) + + (Thread/sleep (int backoff-ms)) + (recur (inc nloops) nce)) + + (do + (when throttle-ms (Thread/sleep (int throttle-ms))) + (recur (inc nloops) nconsecutive-errors)))))))] (reset! thread-futures_ - (doall (repeatedly nthreads (fn [] (future (start-polling-loop!))))))) + (enc/reduce-n + (fn [v idx] (conj v (future (start-polling-loop! idx)))) + [] nthreads))) true))) +(let [ns *ns*] + (defmethod print-method CarmineMessageQueueWorker + [x ^java.io.Writer w] (.write w (str "#" ns "." x)))) + +(defn worker? [x] (instance? CarmineMessageQueueWorker x)) + (defn monitor-fn "Returns a worker monitor fn that warns when queue's mid-circle exceeds the prescribed size. A backoff timeout can be provided to rate-limit this @@ -317,66 +585,76 @@ [qname max-circle-size warn-backoff-ms] (let [udt-last-warning_ (atom 0)] (fn [{:keys [mid-circle-size]}] - (when (> mid-circle-size max-circle-size) + (when (> (long mid-circle-size) (long max-circle-size)) (let [instant (enc/now-udt) - udt-last-warning @udt-last-warning_] - (when (> (- instant udt-last-warning) (or warn-backoff-ms 0)) + udt-last-warning (long @udt-last-warning_)] + (when (> (- instant udt-last-warning) (long (or warn-backoff-ms 0))) (when (compare-and-set! udt-last-warning_ udt-last-warning instant) - (timbre/warnf "Message queue size warning: %s (mid-circle-size: %s)" - qname mid-circle-size)))))))) + (timbre/warn "[Carmine/mq] Message queue monitor-fn size warning" + {:qname qname, :circle-size {:max max-circle-size, :current mid-circle-size}})))))))) (defn worker - "Returns a threaded worker to poll for and handle messages `enqueue`'d to - named queue. Options: - :handler - (fn [{:keys [qname mid message attempt]}]) that throws an ex - or returns {:status <#{:success :error :retry}> - :throwable - :backoff-ms ms (n<=5) will be used. - Sleep synchronized for all queue workers. - :nthreads - Number of synchronized worker threads to use. - :throttle-ms - Thread sleep period between each poll." - - [conn-opts qname & - [{:keys [handler monitor lock-ms eoq-backoff-ms nthreads - throttle-ms auto-start] :as opts - :or {handler (fn [args] (timbre/infof "%s" args) {:status :success}) + "Returns a stateful threaded CarmineMessageQueueWorker to handle messages + added to named queue with `enqueue`. + + Options: + :handler - (fn [{:keys [qname mid message attempt]}]) that throws + or returns {:status <#{:success :error :retry}> + :throwable + :backoff-ms msecs for n<=5. + Sleep synchronized for all queue workers. + :nthreads - Number of worker threads to use. + :throttle-ms - Thread sleep period between each poll." + + ([conn-opts qname] (worker conn-opts qname nil)) + ([conn-opts qname + {:keys [handler monitor lock-ms eoq-backoff-ms nthreads + throttle-ms auto-start] :as worker-opts + :or {handler (fn [m] (timbre/info m) {:status :success}) monitor (monitor-fn qname 1000 (enc/ms :hours 6)) - lock-ms (enc/ms :hours 1) + lock-ms (enc/ms :mins 60) nthreads 1 throttle-ms 200 eoq-backoff-ms exp-backoff - auto-start true}}]] + auto-start true}}] + + (let [worker-opts + (conj (or worker-opts {}) + {:handler handler + :monitor monitor + :default-lock-ms lock-ms + :eoq-backoff-ms eoq-backoff-ms + :nthreads nthreads + :throttle-ms throttle-ms}) - (let [w (->Worker conn-opts qname (atom false) (atom []) - {:handler handler - :monitor monitor - :lock-ms lock-ms - :eoq-backoff-ms eoq-backoff-ms - :nthreads nthreads - :throttle-ms throttle-ms}) + w + (CarmineMessageQueueWorker. + qname worker-opts conn-opts (atom false) (atom []) (atom {})) - ;; For backwards-compatibility with old API: - auto-start (if-let [e (find opts :auto-start?)] (val e) auto-start)] + ;; Back compatibility + auto-start (get worker-opts :auto-start? auto-start)] - (when auto-start - (if (integer? auto-start) - (future (Thread/sleep auto-start) (start w)) - (start w))) + (when auto-start + (if (integer? auto-start) ; Undocumented + (future (Thread/sleep (int auto-start)) (start w)) + (do (start w)))) - w)) + w))) ;;;; Deprecated (enc/deprecated - (defn make-dequeue-worker "DEPRECATED: Use `worker` instead." + (defn ^:deprecated make-dequeue-worker + "DEPRECATED: Use `worker` instead." [pool spec & {:keys [handler-fn handler-ttl-msecs backoff-msecs throttle-msecs auto-start?]}] (worker {:pool pool :spec spec} diff --git a/test/taoensso/carmine/tests/message_queue.clj b/test/taoensso/carmine/tests/message_queue.clj index 3003341c..705f07bb 100644 --- a/test/taoensso/carmine/tests/message_queue.clj +++ b/test/taoensso/carmine/tests/message_queue.clj @@ -1,6 +1,7 @@ (ns taoensso.carmine.tests.message-queue (:require [clojure.test :as test :refer [deftest testing is]] + [taoensso.encore :as enc] [taoensso.carmine :as car :refer [wcar]] [taoensso.carmine.message-queue :as mq])) @@ -8,6 +9,21 @@ (remove-ns 'taoensso.carmine.tests.message-queue) (test/run-tests 'taoensso.carmine.tests.message-queue)) +;;;; Utils, etc. + +(defn subvec? [v sub] + (enc/reduce-indexed + (fn [acc idx in] + (if (= in (get v idx ::nx)) + acc + (reduced false))) + true + sub)) + +(comment + [(subvec? [:a :b :c] [:a :b]) + (subvec? [:a :b] [:a :b :c])]) + ;;;; Config, etc. (def conn-opts {}) @@ -18,306 +34,293 @@ (defn tq-status [] (mq/queue-status conn-opts tq)) (defn test-fixture [f] (f) (clear-tq!)) -(test/use-fixtures :once test-fixture) +(test/use-fixtures :once test-fixture) ; Just for final teardown -(let [default-opts {:eoq-backoff-ms 100}] - (defn- dequeue* [qname & [opts]] - (mq/dequeue qname (conj default-opts opts)))) +(def ^:const default-lock-ms (enc/ms :mins 60)) +(def ^:const eoq-backoff-ms 100) + +(do + (def handle1 #'mq/handle1) + (def enqueue mq/enqueue) + (def msg-status mq/message-status) + + (let [default-opts {:eoq-backoff-ms eoq-backoff-ms}] + (defn- dequeue [qname & [opts]] + (#'mq/dequeue qname (conj default-opts opts))))) (defn sleep [n] - (let [n (case n :eoq 500 n)] + (let [n (int (case n :eoq (* 2.5 eoq-backoff-ms) n))] (Thread/sleep n) (str "slept " n "msecs"))) ;;;; -(deftest tests-01 - (testing "Basic enqueue & dequeue" - (clear-tq!) - [(is (= (wcar* (dequeue* tq)) "eoq-backoff")) - (is (:eoq-backoff? (tq-status))) - - (sleep :eoq) - (is (not (:eoq-backoff? (tq-status)))) - - (is (= (wcar* (mq/enqueue tq :msg1 {:unique-message-id :mid1})) "mid1")) - (is (= (wcar* (mq/enqueue tq :msg1 {:unique-message-id :mid1})) - {:carmine.mq/error :queued})) ; Dupe - - (is (= :queued (wcar* (mq/message-status tq :mid1)))) - (let [{:keys [messages mid-circle]} (tq-status)] - [(is (= messages {"mid1" :msg1})) - (is (= mid-circle ["mid1" "end-of-circle"]))]) - - (is (= (wcar* (dequeue* tq)) "eoq-backoff")) ; End of circle - (sleep :eoq) - (is (= (peek (:mid-circle (tq-status))) "mid1")) - (is (= (wcar* (dequeue* tq)) ["mid1" :msg1 1])) +(defn throw! [] (throw (Exception.))) +(defn handle-end-of-circle [] + (let [reply (wcar* (dequeue tq))] + (every? identity + [(is (= reply ["sleep" "end-of-circle" eoq-backoff-ms])) + (is (subvec? (handle1 conn-opts tq (fn hf [_] (throw!)) reply nil) + [:slept #_eoq-backoff-ms])) + (sleep :eoq)]))) - (is (= (wcar* (mq/message-status tq :mid1)) :locked)) - (is (= (wcar* (dequeue* tq)) "eoq-backoff")) ; End of circle - (sleep :eoq) - (is (= (wcar* (dequeue* tq)) nil)) ; Locked mid1 - ])) +;;;; -(deftest tests-02 - (testing "Handling: success" +(deftest basics + (testing "Basic enqueue & dequeue" (clear-tq!) - [(is (= (wcar* (mq/enqueue tq :msg1 {:unique-message-id :mid1})) "mid1")) - ;; (is (= (wcar* (dequeue* tq)) "eoq-backoff)) - - ;; Handler will *not* run against eoq-backoff/nil reply: - (let [reply (wcar* (dequeue* tq))] - [(is (= reply "eoq-backoff")) - (is (= (mq/handle1 conn-opts tq (fn handler [reply]) reply) nil))]) - + [(is (= (wcar* (dequeue tq)) ["sleep" "end-of-circle" eoq-backoff-ms])) + (is (subvec? (wcar* (dequeue tq)) ["sleep" "eoq-backoff" #_msecs])) (sleep :eoq) - (let [reply (wcar* (dequeue* tq)) - p_ (promise)] + (is (= (wcar* (enqueue tq :msg1a {:mid :mid1})) {:action :added, :mid :mid1})) + (is (= (wcar* (enqueue tq :msg1b {:mid :mid1})) {:error :already-queued}) "Dupe mid") + (is (= (wcar* (enqueue tq :msg1b {:mid :mid1 :can-update? true})) {:action :updated, :mid :mid1})) - [(is (= reply ["mid1" :msg1 1])) - (is (= (mq/handle1 conn-opts tq - (fn handler [reply] (deliver p_ reply) {:status :success}) - reply) - :success)) - (is (= (deref p_ 0 ::timeout) - {:qname :carmine-test-queue :mid "mid1" - :message :msg1, :attempt 1}))]) + (is (= (wcar* (msg-status tq :mid1)) :queued)) + (is (enc/submap? (tq-status) + {:messages {"mid1" :msg1b} + :mid-circle ["mid1" "end-of-circle"]})) - (is (= (wcar* (mq/message-status tq :mid1)) :done-awaiting-gc )) - (is (= (wcar* (dequeue* tq)) "eoq-backoff")) + (is (= (wcar* (dequeue tq)) ["sleep" "end-of-circle" eoq-backoff-ms])) (sleep :eoq) - (is (= (wcar* (dequeue* tq)) nil)) ; Will gc - (is (= (wcar* (mq/message-status tq :mid1)) nil))])) + (is (subvec? (wcar* (dequeue tq)) ["handle" "mid1" :msg1b 1 default-lock-ms #_udt])) + (is (= (wcar* (msg-status tq :mid1)) :locked)) + (is (= (wcar* (dequeue tq)) ["sleep" "end-of-circle" eoq-backoff-ms]))])) -(deftest tests-03 - (testing "Handling: handler crash" +(deftest init-backoff + (testing "Enqueue with initial backoff" (clear-tq!) - [(is (= (wcar* (mq/enqueue tq :msg1 {:unique-message-id :mid1})) "mid1")) - (is (= (wcar* (dequeue* tq)) "eoq-backoff")) - (sleep :eoq) - - ;; Simulate bad handler - (is (= (wcar* (dequeue* tq {:lock-ms 1000})) ["mid1" :msg1 1])) + [(is (= (wcar* (dequeue tq)) ["sleep" "end-of-circle" eoq-backoff-ms])) + (is (= (wcar* (enqueue tq :msg1 {:mid :mid1 :init-backoff-ms 500})) {:action :added, :mid :mid1})) + (is (= (wcar* (enqueue tq :msg2 {:mid :mid2 :init-backoff-ms 100})) {:action :added, :mid :mid2})) - (is (= (wcar* (mq/message-status tq :mid1)) :locked)) - (is (= (wcar* (dequeue* tq)) "eoq-backoff")) + (is (enc/submap? (tq-status) + {:messages {"mid1" :msg1, "mid2" :msg2} + :mid-circle ["mid2" "mid1" "end-of-circle"]})) - (sleep 1500) ; Wait for lock to expire - (is (= (wcar* (dequeue* tq)) ["mid1" :msg1 2]))])) + ;; Dupes before the backoff expired + (is (= (wcar* (enqueue tq :msg1 {:mid :mid1})) {:error :already-queued})) + (is (= (wcar* (enqueue tq :msg2 {:mid :mid2})) {:error :already-queued})) -(deftest tests-04 - (testing "Handling: retry with backoff" - (clear-tq!) - [(is (= (wcar* (mq/enqueue tq :msg1 {:unique-message-id :mid1})) "mid1")) - (is (= (wcar* (dequeue* tq)) "eoq-backoff")) - (sleep :eoq) + ;; Both should be queued with backoff before the backoff expires + (is (= (wcar* (msg-status tq :mid1)) :queued-with-backoff)) + (is (= (wcar* (msg-status tq :mid2)) :queued-with-backoff)) - (let [reply (wcar* (dequeue* tq)) - p_ (promise)] + (sleep 150) ; > 2nd msg + (is (= (wcar* (msg-status tq :mid1)) :queued-with-backoff)) + (is (= (wcar* (msg-status tq :mid2)) :queued)) - [(is (= reply ["mid1" :msg1 1])) - (is (= (mq/handle1 conn-opts tq - (fn handler [reply] (deliver p_ reply) {:status :retry :backoff-ms 2000}) - reply) - :retry)) + (sleep 750) ; > 1st msg + (is (= (wcar* (msg-status tq :mid1)) :queued)) + (is (= (wcar* (msg-status tq :mid2)) :queued)) - (is (= (deref p_ 0 ::timeout) - {:qname :carmine-test-queue :mid "mid1" - :message :msg1, :attempt 1}))]) + ;; Dupes after backoff expired + (is (= (wcar* (enqueue tq :msg1 {:mid :mid1})) {:error :already-queued})) + (is (= (wcar* (enqueue tq :msg2 {:mid :mid2})) {:error :already-queued})) - (is (= (wcar* (mq/message-status tq :mid1)) :queued-with-backoff)) - (is (= (wcar* (dequeue* tq)) "eoq-backoff")) - (sleep :eoq) - (is (= (wcar* (dequeue* tq)) nil)) ; < handler backoff - (is (= (wcar* (dequeue* tq)) "eoq-backoff")) + (handle-end-of-circle) - (sleep 2500) ; > handler backoff - (is (= (wcar* (dequeue* tq)) ["mid1" :msg1 2]))])) + (is (subvec? (wcar* (dequeue tq)) ["handle" "mid1" :msg1 1 default-lock-ms #_udt])) + (is (= (wcar* (msg-status tq :mid1)) :locked)) -(deftest tests-05 - (testing "Handling: success with backoff (dedupe)" - (clear-tq!) - [(is (= (wcar* (mq/enqueue tq :msg1 {:unique-message-id :mid1})) "mid1")) - (is (= (wcar* (dequeue* tq)) "eoq-backoff")) - (sleep :eoq) + (is (subvec? (wcar* (dequeue tq)) ["handle" "mid2" :msg2 1 default-lock-ms #_udt])) + (is (= (wcar* (msg-status tq :mid2)) :locked))])) - (let [reply (wcar* (dequeue* tq)) - p_ (promise)] +(defn test-handler + "Returns [ ]" + ([ hf] (test-handler false hf)) + ([async? hf] + (let [poll-reply (wcar* (dequeue tq)) + handler-arg_ (promise) + handle1 + (fn [] + (handle1 conn-opts tq + (fn [m] (deliver handler-arg_ m) (hf m)) poll-reply nil)) - [(is (= reply ["mid1" :msg1 1])) - (is (= (mq/handle1 conn-opts tq - (fn handler [reply] (deliver p_ reply) {:status :success :backoff-ms 2000}) - reply) - :success)) + handle1-result + (if async? + (future-call handle1) + (do (handle1)))] - (= (deref p_ 0 ::timeout) - {:qname :carmine-test-queue :mid "mid1" - :message :msg1, :attempt 1})]) + [poll-reply (deref handler-arg_ 5000 :timeout) handle1-result]))) - (is (= (wcar* (mq/message-status tq :mid1)) :done-with-backoff)) - (is (= (wcar* (dequeue* tq)) "eoq-backoff")) - (sleep :eoq) +(deftest handlers + [(testing "Handler => success" + (clear-tq!) + [(is (= (wcar* (enqueue tq :msg1 {:mid :mid1})) {:action :added, :mid :mid1})) + (handle-end-of-circle) + + (let [[pr ha hr] (test-handler (fn [_m] {:status :success}))] + [(is (subvec? pr ["handle" "mid1" :msg1 1 default-lock-ms #_udt])) + (is (enc/submap? ha + {:qname :carmine-test-queue, :mid "mid1", :message :msg1, + :attempt 1, :lock-ms default-lock-ms})) + (is (= hr [:handled :success]))]) + + (is (= (wcar* (msg-status tq :mid1)) :done-awaiting-gc)) + (handle-end-of-circle) + (is (= (wcar* (dequeue tq)) ["skip" "did-gc"])) + (is (= (wcar* (msg-status tq :mid1)) nil))]) + + (testing "Handler => throws" + (clear-tq!) + [(is (= (wcar* (enqueue tq :msg1 {:mid :mid1})) {:action :added, :mid :mid1})) + (handle-end-of-circle) - (is (= (wcar* (dequeue* tq)) nil)) ; Will gc - (is (= (wcar* (mq/message-status tq :mid1)) :done-with-backoff )) ; < handler backoff + (let [[pr ha hr] (test-handler (fn [_m] (throw!)))] + [(is (subvec? pr ["handle" "mid1" :msg1 1 default-lock-ms #_udt])) + (is (= hr [:handled :error]))]) - (is (= (wcar* (mq/enqueue tq :msg1 {:unique-message-id :mid1})) - {:carmine.mq/error :done-with-backoff})) ; Dupe + (is (= (wcar* (msg-status tq :mid1)) :done-awaiting-gc )) + (handle-end-of-circle) + (is (= (wcar* (dequeue tq)) ["skip" "did-gc"])) + (is (= (wcar* (msg-status tq :mid1)) nil))]) - (sleep 2500) ; > handler backoff + (testing "Handler => success with backoff (dedupe)" + (clear-tq!) + [(is (= (wcar* (enqueue tq :msg1 {:mid :mid1})) {:action :added, :mid :mid1})) + (handle-end-of-circle) - (is (= (wcar* (mq/enqueue tq :msg1 {:unique-message-id :mid1})) "mid1"))])) + (let [[pr ha hr] (test-handler (fn [_m] {:status :success :backoff-ms 2000}))] + [(is (subvec? pr ["handle" "mid1" :msg1 1 default-lock-ms #_udt])) + (is (= hr [:handled :success]))]) -(deftest tests-06 - (testing "Handling: enqueue while :locked" - (clear-tq!) - [(is (= (wcar* (mq/enqueue tq :msg1 {:unique-message-id :mid1})) "mid1")) - (is (= (wcar* (dequeue* tq)) "eoq-backoff")) - (sleep :eoq) + (is (= (wcar* (msg-status tq :mid1)) :done-with-backoff)) + (handle-end-of-circle) + (is (= (wcar* (dequeue tq)) ["skip" "done-with-backoff"])) - (let [reply (wcar* (dequeue* tq)) - p_ (promise)] + (sleep 2500) ; > handler backoff + (is (= (wcar* (msg-status tq :mid1)) :done-awaiting-gc)) + (handle-end-of-circle) - [(is (= reply ["mid1" :msg1 1])) - (future - (mq/handle1 conn-opts tq - (fn handler [_reply] (deliver p_ :handler-running) - (sleep 2000) {:status :success}) ; Hold lock - reply)) + (is (= (wcar* (dequeue tq)) ["skip" "did-gc"]))]) - (is (= (deref p_ 1000 ::timeout) :handler-running)) - (is (= (wcar* (mq/message-status tq :mid1)) :locked))]) + (testing "Handler => retry with backoff" + (clear-tq!) + [(is (= (wcar* (enqueue tq :msg1 {:mid :mid1})) {:action :added, :mid :mid1})) + (handle-end-of-circle) - (is (= (wcar* (mq/enqueue tq :msg1 {:unique-message-id :mid1})) {:carmine.mq/error :locked})) - (is (= (wcar* (mq/enqueue tq :msg1 {:unique-message-id :mid1 :allow-requeue? true})) "mid1")) - (is (= (wcar* (mq/enqueue tq :msg1-requeued {:unique-message-id :mid1 :allow-requeue? true})) - {:carmine.mq/error :locked-with-requeue})) + (let [[pr ha hr] (test-handler (fn [_m] {:status :retry :backoff-ms 2000}))] + [(is (subvec? pr ["handle" "mid1" :msg1 1 default-lock-ms #_udt])) + (is (= hr [:handled :retry]))]) - (sleep 2500) ; > handler lock + (is (= (wcar* (msg-status tq :mid1)) :queued-with-backoff)) + (handle-end-of-circle) + (is (= (wcar* (dequeue tq)) ["skip" "queued-with-backoff"])) - (is (= (wcar* (mq/message-status tq :mid1)) :queued)) ; Not :done-awaiting-gc - (is (= (wcar* (dequeue* tq)) "eoq-backoff")) - (sleep :eoq) - (is (= (wcar* (dequeue* tq)) ["mid1" :msg1 1]))])) + (sleep 2500) ; > handler backoff + (is (= (wcar* (msg-status tq :mid1)) :queued)) + (handle-end-of-circle) -(deftest tests-07 - (testing "Handling: enqueue while :done-with-backoff" - (clear-tq!) - [(is (= (wcar* (mq/enqueue tq :msg1 {:unique-message-id :mid1})) "mid1")) - (is (= (wcar* (dequeue* tq)) "eoq-backoff")) - (sleep :eoq) + (is (subvec? (wcar* (dequeue tq)) ["handle" "mid1" :msg1 2 default-lock-ms #_udt]))]) - (let [reply (wcar* (dequeue* tq))] + (testing "Handler => lock timeout" - [(is (= reply ["mid1" :msg1 1])) - (is (= (mq/handle1 conn-opts tq - (fn handler [_reply] {:status :success :backoff-ms 2000}) - reply) - :success)) - (is (= (wcar* (mq/message-status tq :mid1)) :done-with-backoff))]) + (testing "Default lock time" + (clear-tq!) + [(is (= (wcar* (enqueue tq :msg1 {:mid :mid1})) {:action :added, :mid :mid1})) + (handle-end-of-circle) - (is (= (wcar* (mq/enqueue tq :msg1 {:unique-message-id :mid1})) {:carmine.mq/error :done-with-backoff})) - (is (= (wcar* (mq/enqueue tq :msg1-requeued {:unique-message-id :mid1 :allow-requeue? true})) "mid1")) + ;; Simulate bad handler + (is (subvec? (wcar* (dequeue tq {:default-lock-ms 1000})) ["handle" "mid1" :msg1 1 1000 #_udt])) - (sleep 2500) ; > handler backoff - (is (= (wcar* (mq/message-status tq :mid1)) :queued)) ; Not :done-awaiting-gc + (is (= (wcar* (msg-status tq :mid1)) :locked)) + (handle-end-of-circle) - (is (= (wcar* (dequeue* tq)) "eoq-backoff")) - (sleep :eoq) - (is (= (wcar* (dequeue* tq)) ["mid1" :msg1 1]))])) + (sleep 1500) ; Wait for lock to expire + (is (subvec? (wcar* (dequeue tq {:default-lock-ms 1000})) ["handle" "mid1" :msg1 2 1000 #_udt]))]) -(deftest tests-08 - (testing "Enqueue/dequeue with initial backoff" - (clear-tq!) - [(is (= (wcar* (dequeue* tq)) "eoq-backoff")) - (is (= (wcar* (mq/enqueue tq :msg1 {:unique-message-id :mid1 :initial-backoff-ms 500})) "mid1")) - (is (= (wcar* (mq/enqueue tq :msg2 {:unique-message-id :mid2 :initial-backoff-ms 100})) "mid2")) + (testing "Custom lock time" + (clear-tq!) + [(is (= (wcar* (enqueue tq :msg1 {:mid :mid1 :lock-ms 2000})) {:action :added, :mid :mid1})) + (handle-end-of-circle) - (let [{:keys [messages mid-circle]} (tq-status)] - [(is (= messages {"mid1" :msg1 "mid2" :msg2})) - (is (= mid-circle ["mid2" "mid1" "end-of-circle"]))]) + ;; Simulate bad handler + (is (subvec? (wcar* (dequeue tq {:default-lock-ms 500})) ["handle" "mid1" :msg1 1 2000 #_udt])) - ;; Dupes before the backoff expired - (is (= (wcar* (mq/enqueue tq :msg1 {:unique-message-id :mid1})) {:carmine.mq/error :queued-with-backoff})) - (is (= (wcar* (mq/enqueue tq :msg2 {:unique-message-id :mid2})) {:carmine.mq/error :queued-with-backoff})) + (is (= (wcar* (msg-status tq :mid1)) :locked)) + (handle-end-of-circle) - ;; Both should be queued with backoff before the backoff expires - (is (= (wcar* (mq/message-status tq :mid1)) :queued-with-backoff)) - (is (= (wcar* (mq/message-status tq :mid2)) :queued-with-backoff)) + (sleep 2500) ; Wait for lock to expire + (is (subvec? (wcar* (dequeue tq {:default-lock-ms 500})) ["handle" "mid1" :msg1 2 2000 #_udt]))]))]) - (sleep 150) ; > 2nd msg - (is (= (wcar* (mq/message-status tq :mid1)) :queued-with-backoff)) - (is (= (wcar* (mq/message-status tq :mid2)) :queued)) +(deftest requeue + [(testing "Enqueue while :locked" + (clear-tq!) + [(is (= (wcar* (enqueue tq :msg1a {:mid :mid1})) {:action :added, :mid :mid1})) + (handle-end-of-circle) - (sleep 750) ; > 1st msg - (is (= (wcar* (mq/message-status tq :mid1)) :queued)) - (is (= (wcar* (mq/message-status tq :mid2)) :queued)) + (do (test-handler :async (fn [_m] (Thread/sleep 2000) {:status :success})) :async-handler-running) - ;; Dupes after backoff expired - (is (= (wcar* (mq/enqueue tq :msg1 {:unique-message-id :mid1})) {:carmine.mq/error :queued})) - (is (= (wcar* (mq/enqueue tq :msg2 {:unique-message-id :mid2})) {:carmine.mq/error :queued})) + (is (= (wcar* (msg-status tq :mid1)) :locked)) + (is (= (wcar* (enqueue tq :msg1b {:mid :mid1})) {:error :locked})) - (is (= (wcar* (dequeue* tq)) "eoq-backoff")) - (sleep :eoq) + (is (= (wcar* (enqueue tq :msg1c {:mid :mid1, :can-requeue? true})) {:action :added, :mid :mid1})) + (is (= (wcar* (enqueue tq :msg1d {:mid :mid1, :can-requeue? true})) {:error :already-queued})) + (is (= (wcar* (enqueue tq :msg1e {:mid :mid1, :can-requeue? true, + :can-update? true, :lock-ms 500})) {:action :updated, :mid :mid1})) - (is (= (wcar* (dequeue* tq)) ["mid1" :msg1 1])) - (is (= (wcar* (mq/message-status tq :mid1)) :locked)) + (is (= (wcar* (msg-status tq :mid1)) :locked-with-requeue)) + (sleep 2500) ; > handler lock + (is (= (wcar* (msg-status tq :mid1)) :done-with-requeue) "Not :done-awaiting-gc") + (handle-end-of-circle) - (is (= (wcar* (dequeue* tq)) ["mid2" :msg2 1])) + (is (= (wcar* (dequeue tq)) ["skip" "did-requeue"])) + (handle-end-of-circle) - (is (= (wcar* (mq/message-status tq :mid2)) :locked)) - (is (= (wcar* (dequeue* tq)) "eoq-backoff")) - (sleep :eoq) - (is (= (wcar* (dequeue* tq)) nil))])) + (is (subvec? (wcar* (dequeue tq)) ["handle" "mid1" :msg1e 1 500 #_udt]))]) -(deftest test-message-queue-with-initial-backoff - [(testing "Message status changes over time" + (testing "Enqueue while :done-with-backoff" (clear-tq!) - [(is (= (wcar* (dequeue* tq)) "eoq-backoff")) - (sleep :eoq) + [(is (= (wcar* (enqueue tq :msg1a {:mid :mid1})) {:action :added, :mid :mid1})) + (handle-end-of-circle) - (is (= (wcar* (mq/enqueue tq :msg1 {:unique-message-id :mid1 :initial-backoff-ms 500})) "mid1")) - (is (= (wcar* (mq/enqueue tq :msg2 {:unique-message-id :mid2 :initial-backoff-ms 100})) "mid2")) + (do (test-handler (fn [_m] {:status :success :backoff-ms 2000})) :ran-handler) - (is (= (wcar* (mq/message-status tq :mid1)) :queued-with-backoff)) - (is (= (wcar* (mq/message-status tq :mid2)) :queued-with-backoff)) + (is (= (wcar* (msg-status tq :mid1)) :done-with-backoff)) + (is (= (wcar* (enqueue tq :msg1b {:mid :mid1})) {:error :backoff})) + (is (= (wcar* (enqueue tq :msg1c {:mid :mid1, :can-requeue? true, + :lock-ms 500})) {:action :added, :mid :mid1})) + (is (= (wcar* (msg-status tq :mid1)) :done-with-requeue)) - (sleep 150) ; > 2nd msg - (is (= (wcar* (mq/message-status tq :mid1)) :queued-with-backoff)) - (is (= (wcar* (mq/message-status tq :mid2)) :queued)) + (handle-end-of-circle) + (sleep 2500) ; > handler backoff - (sleep 750) ; > 1st msg - (is (= (wcar* (mq/message-status tq :mid1)) :queued)) - (is (= (wcar* (mq/message-status tq :mid2)) :queued))]) + (is (= (wcar* (dequeue tq)) ["skip" "did-requeue"])) + (handle-end-of-circle) - (testing "Errors when we enqueue with same ids" - (clear-tq!) - [(is (= (wcar* (dequeue* tq)) "eoq-backoff")) - (sleep :eoq) + (is (subvec? (wcar* (dequeue tq)) ["handle" "mid1" :msg1c 1 500 #_udt]))])]) - (is (= (wcar* (mq/enqueue tq :msg1 {:unique-message-id :mid1 :initial-backoff-ms 500})) "mid1")) - (is (= (wcar* (mq/enqueue tq :msg2 {:unique-message-id :mid2 :initial-backoff-ms 100})) "mid2")) - (is (= (wcar* (mq/message-status tq :mid1)) :queued-with-backoff)) - (is (= (wcar* (mq/message-status tq :mid2)) :queued-with-backoff)) +(deftest workers + (testing "Basic worker functionality" + (clear-tq!) + (let [msgs_ (atom []) + handler-fn + (fn [{:keys [mid message] :as in}] + (swap! msgs_ conj message) + {:status :success}) - (is (= (wcar* (mq/enqueue tq :msg1 {:unique-message-id :mid1})) {:carmine.mq/error :queued-with-backoff})) - (is (= (wcar* (mq/enqueue tq :msg2 {:unique-message-id :mid2})) {:carmine.mq/error :queued-with-backoff} ))]) + queue-status (fn [] (mq/queue-status conn-opts tq {:incl-legacy-data? false}))] - (testing "Errors change over time" - (clear-tq!) - [(is (= (wcar* (dequeue* tq)) "eoq-backoff")) - (sleep :eoq) + (with-open [^java.io.Closeable worker + (mq/worker conn-opts tq + {:auto-start false, + :handler handler-fn + :throttle-ms 10 + :eoq-backoff-ms 10})] + + [(is (enc/submap? (wcar* (enqueue tq :msg1 {:mid :mid1})) {:action :added})) + (is (enc/submap? (wcar* (enqueue tq :msg2 {:mid :mid2})) {:action :added})) + + (is (= (:mid-circle (queue-status)) ["mid2" "mid1" "end-of-circle"])) - (is (= (wcar* (mq/enqueue tq :msg1 {:unique-message-id :mid1 :initial-backoff-ms 500})) "mid1")) - (is (= (wcar* (mq/enqueue tq :msg2 {:unique-message-id :mid2 :initial-backoff-ms 100})) "mid2")) - (is (= (wcar* (mq/message-status tq :mid1)) :queued-with-backoff)) - (is (= (wcar* (mq/message-status tq :mid2)) :queued-with-backoff)) + (is (mq/start worker)) + (is (:running? @worker)) - (sleep 150) ; > 2nd msg - (is (= (wcar* (mq/message-status tq :mid2)) :queued)) + (sleep 1000) + (is (= @msgs_ [:msg1 :msg2])) + (is (= (:mid-circle (queue-status)) ["end-of-circle"])) - (sleep 750) ; > 1st msg - (is (= (wcar* (mq/message-status tq :mid1)) :queued))])]) + (is (mq/stop worker))]))))