Commit
[mod] [BREAK] [#278] Merge work from message queue v2 branch
This merge introduces a major rewrite of Carmine's message queue system and includes **POTENTIAL BREAKING CHANGES**.

- If you do NOT use Carmine's message queue, you can safely ignore this.
- If you DO use Carmine's message queue, please carefully read the details below.

=== POTENTIAL BREAKING CHANGES ===

1. `enqueue` return value has changed

   This change is relevant to you iff you use the return value of `enqueue` calls (most users do not). Check your `enqueue` call sites to be sure.

   The fn previously returned `<mid>` (message id) on success, or `{:carmine.mq/error <message-status>}` on error.

   The fn now always returns a map with possible keys: [success? mid action error]. See the updated `enqueue` docstring for details.

2. `queue-status` return value has changed

   This change is relevant to you iff you use the `queue-status` util.

   The fn previously returned a detailed map of all queue content in O(queue-size).

   The fn now returns a small {:keys [nwaiting nlocked nbackoff ntotal]} map in O(1). If you want the detailed map of all queue content in O(queue-size), use the new `queue-content` util.

3. The definition of "queue-size" has changed

   The old definition: total size of queue.
   The new definition: total size of queue, LESS mids that may be locked or in backoff.

   I.e. the new definition now better represents the number of messages awaiting processing. Most users won't be affected by this change since the new definition better corresponds to how most users actually understood the term.

4. `clear-queues` has been deprecated

   This utility is now called `queues-clear!!` to better match the rest of the API.

=== Performance improvements ===

- Significantly improved latency (esp. worst-case latency) of handling new messages. Workers will now always prioritise handling of newly queued messages when available, and otherwise fall back to maintaining the mid circle.
- Decouple threads for handling and queue maintenance. Thread counts can now be individually customized.
- Worker end-of-queue backoff sleeps are now interrupted by new messages. Sleeping workers will awaken automatically when new messages arrive.
- Prioritize requeues (treat as "ready"). Requeues no longer need to wait for a queue cycle to be reprocessed.
- Smart worker throttling. The `:throttle-ms` worker option can now be a function of the current queue size, enabling dynamic worker throttling. The default `:throttle-ms` value is now `:auto`, which uses such a function. See the updated `worker` docstring for details.
- Worker threads are now automatically desynchronized to reduce contention.

=== New stuff ===

- Added `enqueue` option: `:lock-ms` to support per-message lock times [#223].
- Added `enqueue` option: `:can-update?` to support message updating.
- Handler fn data now includes `:worker`, `:queue-size`.
- Handler fn data now includes `:age-ms`. This enables easy integration with Tufte or other profiling tools.
- Added utils: `queue-size`, `queue-names`, `queues-clear!!`, `queues-clear-all!!!`.
- Worker object's string/pprint representation is now more useful.
- Worker object can now be dereffed to get useful state and stats.
- Workers can now be dereffed to get various diagnostic info. In particular, the new `:stats` key contains detailed statistics on queue size, queueing time, handling time, etc.
- Workers can now be invoked as fns to execute common actions. Actions include: `:start`, `:stop`, `:queue-size`, `:queue-status`.
- Various improvements to docstrings, error messages, and logging output.
- Improved message queue state diagram.
- General improvements to implementation, observability, and tests.
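
The changed "queue-size" definition (breaking change 3) can be illustrated with a small Python model. This is only a sketch, not Carmine's implementation: the `QueueStatus` class and both function names are hypothetical, the field names mirror the keys of the new `queue-status` map, and the sample counts are made up for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class QueueStatus:
    """Hypothetical stand-in for the new {:keys [nwaiting nlocked nbackoff ntotal]} map."""
    nwaiting: int
    nlocked: int
    nbackoff: int
    ntotal: int


def old_queue_size(status: QueueStatus) -> int:
    # Old definition: total size of queue.
    return status.ntotal


def new_queue_size(status: QueueStatus) -> int:
    # New definition: total size of queue, LESS mids that may be
    # locked or in backoff (per the commit message).
    return status.ntotal - status.nlocked - status.nbackoff


# Illustrative numbers only.
status = QueueStatus(nwaiting=7, nlocked=2, nbackoff=1, ntotal=10)
print(old_queue_size(status), new_queue_size(status))
```

With these sample counts the old definition counts every mid, while the new one counts only messages that a worker could pick up right away.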
Showing 9 changed files with 2,763 additions and 713 deletions.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,81 +1,131 @@ | ||
if redis.call('exists', _:qk-eoq-backoff) == 1 then | ||
return 'eoq-backoff'; | ||
end | ||
-- Return e/o {'sleep' ...}, {'skip' ...}, {'handle' ...}, {'unexpected' ...} | ||
|
||
-- TODO Waiting for Lua brpoplpush support to get us long polling | ||
local mid = redis.call('rpoplpush', _:qk-mid-circle, _:qk-mid-circle); | ||
local now = tonumber(_:now); | ||
-- Prioritize mids from ready list | ||
local mid_src = nil; | ||
local mid = nil; | ||
mid = redis.call('rpoplpush', _:qk-mids-ready, _:qk-mid-circle); | ||
if mid then | ||
mid_src = 'ready'; | ||
else | ||
mid = redis.call('rpoplpush', _:qk-mid-circle, _:qk-mid-circle); | ||
if mid then | ||
mid_src = 'circle'; | ||
end | ||
end | ||
|
||
if (not mid) or (mid == 'end-of-circle') then -- Uninit'd or eoq | ||
if ((not mid) or (mid == 'end-of-circle')) then -- Uninit'd or eoq | ||
|
||
-- Calculate eoq_backoff_ms | ||
local ndry_runs = tonumber(redis.call('get', _:qk-ndry-runs) or 0); | ||
local eoq_ms_tab = {_:eoq-ms0, _:eoq-ms1, _:eoq-ms2, _:eoq-ms3, _:eoq-ms4}; | ||
local ndry_runs = tonumber(redis.call('get', _:qk-ndry-runs)) or 0; | ||
local eoq_ms_tab = {_:eoq-bo1, _:eoq-bo2, _:eoq-bo3, _:eoq-bo4, _:eoq-bo5}; | ||
local eoq_backoff_ms = tonumber(eoq_ms_tab[math.min(5, (ndry_runs + 1))]); | ||
redis.call('incr', _:qk-ndry-runs); | ||
|
||
-- Set queue-wide polling backoff flag | ||
redis.call('psetex', _:qk-eoq-backoff, eoq_backoff_ms, 'true'); | ||
redis.call('incr', _:qk-ndry-runs); | ||
local isleep_on = nil; | ||
if (redis.call('llen', _:qk-isleep-b) > 0) then isleep_on = 'b'; else isleep_on = 'a'; end | ||
|
||
return 'eoq-backoff'; | ||
return {'sleep', 'end-of-circle', isleep_on, eoq_backoff_ms}; | ||
end | ||
|
||
-- From msg_status.lua --------------------------------------------------------- | ||
-- local mid = _:mid; | ||
local now = tonumber(_:now); | ||
local lock_exp = tonumber(redis.call('hget', _:qk-locks, mid)) or 0; | ||
local backoff_exp = tonumber(redis.call('hget', _:qk-backoffs, mid)) or 0; | ||
local state = nil; | ||
|
||
if redis.call('hexists', _:qk-messages, mid) == 1 then | ||
if redis.call('sismember', _:qk-done, mid) == 1 then | ||
if (now < backoff_exp) then | ||
if redis.call('sismember', _:qk-requeue, mid) == 1 then | ||
state = 'done-with-requeue'; | ||
else | ||
state = 'done-with-backoff'; | ||
end | ||
else | ||
state = 'done-awaiting-gc'; | ||
end | ||
else | ||
if (now < lock_exp) then | ||
if redis.call('sismember', _:qk-requeue, mid) == 1 then | ||
state = 'locked-with-requeue'; | ||
else | ||
state = 'locked'; | ||
end | ||
elseif (now < backoff_exp) then | ||
state = 'queued-with-backoff'; | ||
else | ||
state = 'queued'; | ||
end | ||
end | ||
end | ||
local now = tonumber(_:now); | ||
|
||
local status = nil; -- base status e/o nil, done, queued, locked | ||
local is_bo = false; -- backoff flag for: done, queued | ||
local is_rq = false; -- requeue flag for: done, locked | ||
-- 8x cases: nil, done(bo/rq), queued(bo), locked(rq) | ||
-- Describe with: {status, $bo, $rq} with $ prefixes e/o: _, +, -, * | ||
|
||
-- return state; | ||
if (redis.call('hexists', _:qk-messages, mid) == 1) then | ||
local exp_lock = tonumber(redis.call('hget', _:qk-locks, mid)) or 0; | ||
local exp_bo = tonumber(redis.call('hget', _:qk-backoffs, mid)) or 0; | ||
|
||
is_bo = (now < exp_bo); | ||
is_rq = (redis.call('sismember', _:qk-requeue, mid) == 1) or -- Deprecated | ||
(redis.call('hexists', _:qk-messages-rq, mid) == 1); | ||
|
||
if (redis.call('sismember', _:qk-done, mid) == 1) then status = 'done'; | ||
elseif (now < exp_lock) then status = 'locked'; | ||
else status = 'queued'; end | ||
else | ||
status = 'nx'; | ||
end | ||
-------------------------------------------------------------------------------- | ||
|
||
if (state == 'locked') or | ||
(state == 'locked-with-requeue') or | ||
(state == 'queued-with-backoff') or | ||
(state == 'done-with-backoff') then | ||
return nil; | ||
if (status == 'nx') then | ||
if (mid_src == 'circle') then | ||
redis.call('ltrim', _:qk-mid-circle, 1, -1); -- Remove from circle | ||
return {'skip', 'did-trim'}; | ||
else | ||
return {'skip', 'unexpected'}; | ||
end | ||
elseif (status == 'locked') then return {'skip', 'locked'}; | ||
elseif ((status == 'queued') and is_bo) then return {'skip', 'queued-with-backoff'}; | ||
elseif ((status == 'done') and is_bo) then return {'skip', 'done-with-backoff'}; | ||
end | ||
|
||
redis.call('set', _:qk-ndry-runs, 0); -- Doing useful work | ||
|
||
if (state == 'done-awaiting-gc') then | ||
redis.call('hdel', _:qk-messages, mid); | ||
redis.call('hdel', _:qk-locks, mid); | ||
redis.call('hdel', _:qk-backoffs, mid); | ||
redis.call('hdel', _:qk-nattempts, mid); | ||
redis.call('ltrim', _:qk-mid-circle, 1, -1); | ||
redis.call('srem', _:qk-done, mid); | ||
return nil; | ||
if (status == 'done') then | ||
if is_rq then | ||
-- {done, -bo, +rq} -> requeue now | ||
local mcontent = | ||
redis.call('hget', _:qk-messages-rq, mid) or | ||
redis.call('hget', _:qk-messages, mid); -- Deprecated (for qk-requeue) | ||
|
||
local lock_ms = | ||
tonumber(redis.call('hget', _:qk-lock-times-rq, mid)) or | ||
tonumber(_:default-lock-ms); | ||
|
||
redis.call('hset', _:qk-messages, mid, mcontent); | ||
redis.call('hset', _:qk-udts, mid, now); | ||
|
||
if lock_ms then | ||
redis.call('hset', _:qk-lock-times, mid, lock_ms); | ||
else | ||
redis.call('hdel', _:qk-lock-times, mid); | ||
end | ||
|
||
redis.call('hdel', _:qk-messages-rq, mid); | ||
redis.call('hdel', _:qk-lock-times-rq, mid); | ||
redis.call('hdel', _:qk-nattempts, mid); | ||
redis.call('srem', _:qk-done, mid); | ||
redis.call('srem', _:qk-requeue, mid); | ||
|
||
redis.call('lpush', _:qk-mids-ready, mid); -- -> Priority queue (>once okay) | ||
|
||
return {'skip', 'did-requeue'}; | ||
else | ||
-- {done, -bo, -rq} -> full GC now | ||
redis.call('hdel', _:qk-messages, mid); | ||
redis.call('hdel', _:qk-messages-rq, mid); | ||
redis.call('hdel', _:qk-lock-times, mid); | ||
redis.call('hdel', _:qk-lock-times-rq, mid); | ||
redis.call('hdel', _:qk-udts, mid); | ||
redis.call('hdel', _:qk-locks, mid); | ||
redis.call('hdel', _:qk-backoffs, mid); | ||
redis.call('hdel', _:qk-nattempts, mid); | ||
redis.call('srem', _:qk-done, mid); | ||
redis.call('srem', _:qk-requeue, mid); | ||
|
||
if (mid_src == 'circle') then | ||
redis.call('ltrim', _:qk-mid-circle, 1, -1); -- Remove from circle | ||
end | ||
|
||
return {'skip', 'did-gc'}; | ||
end | ||
elseif (status == 'queued') then | ||
-- {queued, -bo, _rq} -> handle now | ||
local lock_ms = | ||
tonumber(redis.call('hget', _:qk-lock-times, mid)) or | ||
tonumber(_:default-lock-ms); | ||
|
||
redis.call('hset', _:qk-locks, mid, now + lock_ms); -- Acquire | ||
local mcontent = redis.call('hget', _:qk-messages, mid); | ||
local udt = redis.call('hget', _:qk-udts, mid); | ||
local nattempts = redis.call('hincrby', _:qk-nattempts, mid, 1); | ||
|
||
return {'handle', mid, mcontent, nattempts, lock_ms, tonumber(udt)}; | ||
end | ||
|
||
redis.call('hset', _:qk-locks, mid, now + tonumber(_:lock-ms)); -- Acquire | ||
local mcontent = redis.call('hget', _:qk-messages, mid); | ||
local nattempts = redis.call('hincrby', _:qk-nattempts, mid, 1); | ||
return {mid, mcontent, nattempts}; | ||
return {'unexpected'}; |
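
The decision logic in the new version of this dequeue script can be modelled outside Redis. The Python below is a rough sketch under stated assumptions, not the script itself: it takes the already-computed `status`, `is_bo`, `is_rq` and `mid_src` values as plain arguments, performs no Redis writes, and returns only the action tags (the real `'handle'` reply also carries the mid, content, attempt count, lock time and udt). All function names and the sample backoff table are hypothetical.

```python
from typing import Sequence, Tuple


def eoq_backoff_ms(ndry_runs: int, table: Sequence[int]) -> int:
    """Mirror of eoq_ms_tab[math.min(5, ndry_runs + 1)] (Lua tables are 1-based)."""
    idx = min(len(table), ndry_runs + 1)
    return table[idx - 1]


def msg_status(exists: bool, done: bool, now: int, exp_lock: int,
               exp_bo: int, in_requeue: bool) -> Tuple[str, bool, bool]:
    """Return (status, is_bo, is_rq) as the inlined msg_status section does."""
    if not exists:
        return ("nx", False, False)
    is_bo = now < exp_bo
    if done:
        status = "done"
    elif now < exp_lock:
        status = "locked"
    else:
        status = "queued"
    return (status, is_bo, in_requeue)


def dequeue_action(status: str, is_bo: bool, is_rq: bool,
                   mid_src: str) -> Tuple[str, ...]:
    """Map a message's state to the action tag the script would return."""
    if status == "nx":
        # Unknown mid: trimmed from the circle, otherwise flagged as unexpected.
        return ("skip", "did-trim") if mid_src == "circle" else ("skip", "unexpected")
    if status == "locked":
        return ("skip", "locked")
    if status == "queued" and is_bo:
        return ("skip", "queued-with-backoff")
    if status == "done" and is_bo:
        return ("skip", "done-with-backoff")
    if status == "done":
        # Backoff has expired: either requeue now or garbage-collect.
        return ("skip", "did-requeue") if is_rq else ("skip", "did-gc")
    if status == "queued":
        return ("handle",)
    return ("unexpected",)


# Hypothetical backoff table; the real values come from the _:eoq-bo1.._:eoq-bo5 args.
SAMPLE_TABLE = (10, 50, 100, 500, 1000)
print(eoq_backoff_ms(0, SAMPLE_TABLE), eoq_backoff_ms(9, SAMPLE_TABLE))
print(dequeue_action(*msg_status(True, False, now=100, exp_lock=0, exp_bo=0,
                                 in_requeue=False), mid_src="ready"))
```

Writing the branches as a pure function makes it easy to check that every combination of status and flags maps to exactly one action, which is harder to see in the interleaved diff.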
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,70 +1,129 @@ | ||
-- From msg_status.lua --------------------------------------------------------- | ||
local mid = _:mid; | ||
local now = tonumber(_:now); | ||
local lock_exp = tonumber(redis.call('hget', _:qk-locks, mid)) or 0; | ||
local backoff_exp = tonumber(redis.call('hget', _:qk-backoffs, mid)) or 0; | ||
local state = nil; | ||
|
||
if redis.call('hexists', _:qk-messages, mid) == 1 then | ||
if redis.call('sismember', _:qk-done, mid) == 1 then | ||
if (now < backoff_exp) then | ||
if redis.call('sismember', _:qk-requeue, mid) == 1 then | ||
state = 'done-with-requeue'; | ||
else | ||
state = 'done-with-backoff'; | ||
end | ||
else | ||
state = 'done-awaiting-gc'; | ||
end | ||
local mid = _:mid; | ||
local now = tonumber(_:now); | ||
|
||
local status = nil; -- base status e/o nil, done, queued, locked | ||
local is_bo = false; -- backoff flag for: done, queued | ||
local is_rq = false; -- requeue flag for: done, locked | ||
-- 8x cases: nil, done(bo/rq), queued(bo), locked(rq) | ||
-- Describe with: {status, $bo, $rq} with $ prefixes e/o: _, +, -, * | ||
|
||
if (redis.call('hexists', _:qk-messages, mid) == 1) then | ||
local exp_lock = tonumber(redis.call('hget', _:qk-locks, mid)) or 0; | ||
local exp_bo = tonumber(redis.call('hget', _:qk-backoffs, mid)) or 0; | ||
|
||
is_bo = (now < exp_bo); | ||
is_rq = (redis.call('sismember', _:qk-requeue, mid) == 1) or -- Deprecated | ||
(redis.call('hexists', _:qk-messages-rq, mid) == 1); | ||
|
||
if (redis.call('sismember', _:qk-done, mid) == 1) then status = 'done'; | ||
elseif (now < exp_lock) then status = 'locked'; | ||
else status = 'queued'; end | ||
else | ||
status = 'nx'; | ||
end | ||
-------------------------------------------------------------------------------- | ||
-- Return {action, error} | ||
|
||
local interrupt_sleep = function () | ||
if redis.call('rpoplpush', _:qk-isleep-a, _:qk-isleep-b) then | ||
elseif redis.call('rpoplpush', _:qk-isleep-b, _:qk-isleep-a) then | ||
else redis.call('lpush', _:qk-isleep-a, '_'); end -- Init | ||
end | ||
|
||
local reset_in_queue = function() | ||
redis.call('hset', _:qk-messages, _:mid, _:mcnt); | ||
redis.call('hsetnx', _:qk-udts, _:mid, now); | ||
|
||
local lock_ms = tonumber(_:lock-ms); | ||
if (lock_ms ~= -1) then | ||
redis.call('hset', _:qk-lock-times, _:mid, lock_ms); | ||
else | ||
if (now < lock_exp) then | ||
if redis.call('sismember', _:qk-requeue, mid) == 1 then | ||
state = 'locked-with-requeue'; | ||
else | ||
state = 'locked'; | ||
end | ||
elseif (now < backoff_exp) then | ||
state = 'queued-with-backoff'; | ||
else | ||
state = 'queued'; | ||
end | ||
redis.call('hdel', _:qk-lock-times, _:mid); | ||
end | ||
end | ||
|
||
-- return state; | ||
-------------------------------------------------------------------------------- | ||
local reset_in_requeue = function() | ||
redis.call('hset', _:qk-messages-rq, _:mid, _:mcnt); | ||
|
||
if (state == 'done-awaiting-gc') or | ||
((state == 'done-with-backoff') and (_:allow-requeue? == 'true')) | ||
then | ||
redis.call('hdel', _:qk-nattempts, _:mid); | ||
redis.call('srem', _:qk-done, _:mid); | ||
return {_:mid}; | ||
local lock_ms = tonumber(_:lock-ms); | ||
if (lock_ms ~= -1) then | ||
redis.call('hset', _:qk-lock-times-rq, _:mid, lock_ms); | ||
else | ||
redis.call('hdel', _:qk-lock-times-rq, _:mid); | ||
end | ||
end | ||
|
||
if (state == 'locked') and (_:allow-requeue? == 'true') and | ||
(redis.call('sismember', _:qk-requeue, _:mid) ~= 1) | ||
then | ||
redis.call('sadd', _:qk-requeue, _:mid); | ||
return {_:mid}; | ||
local can_upd = (_:can-upd? == '1'); | ||
local can_rq = (_:can-rq? == '1'); | ||
|
||
local ensure_update_in_requeue = function() | ||
if is_rq then | ||
if can_upd then | ||
reset_in_requeue(); | ||
return {'updated'}; | ||
else | ||
return {false, 'already-queued'}; | ||
end | ||
else | ||
reset_in_requeue(); | ||
return {'added'}; | ||
end | ||
end | ||
|
||
if state == nil then | ||
redis.call('hset', _:qk-messages, _:mid, _:mcontent); | ||
if (status == 'nx') then | ||
-- {nil, _bo, _rq} -> add to queue | ||
|
||
-- lpushnx end-of-circle marker to ensure an initialized mid-circle | ||
-- Ensure that mid-circle is initialized | ||
if redis.call('exists', _:qk-mid-circle) ~= 1 then | ||
redis.call('lpush', _:qk-mid-circle, 'end-of-circle'); | ||
redis.call('lpush', _:qk-mid-circle, 'end-of-circle'); | ||
end | ||
|
||
-- Set the initial backoff if requested | ||
local initial_backoff_ms = tonumber(_:initial-backoff-ms); | ||
if (initial_backoff_ms ~= 0) then | ||
redis.call('hset', _:qk-backoffs, _:mid, now + initial_backoff_ms); | ||
local init_bo = tonumber(_:init-bo); | ||
if (init_bo ~= 0) then | ||
redis.call('hset', _:qk-backoffs, _:mid, now + init_bo); | ||
redis.call('lpush', _:qk-mid-circle, _:mid); -- -> Maintenance queue | ||
else | ||
redis.call('lpush', _:qk-mids-ready, _:mid); -- -> Priority queue | ||
end | ||
|
||
redis.call('lpush', _:qk-mid-circle, _:mid); | ||
return {_:mid}; | ||
else | ||
return state; -- Reject | ||
reset_in_queue(); | ||
interrupt_sleep(); | ||
return {'added'}; | ||
|
||
elseif (status == 'queued') then | ||
if can_upd then | ||
-- {queued, *bo, _rq} -> update in queue | ||
reset_in_queue(); | ||
return {'updated'}; | ||
else | ||
return {false, 'already-queued'}; | ||
end | ||
elseif (status == 'locked') then | ||
if can_rq then | ||
-- {locked, _bo, *rq} -> ensure/update in requeue | ||
return ensure_update_in_requeue(); | ||
else | ||
return {false, 'locked'}; | ||
end | ||
elseif (status == 'done') then | ||
if is_bo then | ||
if can_rq then | ||
-- {done, +bo, *rq} -> ensure/update in requeue | ||
return ensure_update_in_requeue(); | ||
else | ||
return {false, 'backoff'}; | ||
end | ||
else | ||
-- {done, -bo, *rq} -> ensure/update in requeue | ||
-- (We're appropriating the requeue mechanism here) | ||
|
||
redis.call('lpush', _:qk-mids-ready, _:mid); -- -> Priority queue | ||
|
||
interrupt_sleep(); | ||
return ensure_update_in_requeue(); | ||
end | ||
end | ||
|
||
return {false, 'unexpected'}; |
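
As a reading aid for the new enqueue script, the Python below sketches its decision table and the two-list `interrupt_sleep` toggle. It is a model under assumptions rather than the script: the lists stand in for the `_:qk-isleep-a` and `_:qk-isleep-b` Redis lists, results are returned as `(action, error)` tuples in place of the Lua reply tables, no message data is written, and all Python names are hypothetical.

```python
from typing import List, Optional, Tuple

Result = Tuple[object, Optional[str]]  # (action or False, error or None)


def interrupt_sleep(a: List[str], b: List[str]) -> None:
    """Move the single token between lists a and b, creating it on first use.

    Mirrors rpoplpush a->b, else rpoplpush b->a, else lpush a '_'.
    """
    if a:
        b.insert(0, a.pop())
    elif b:
        a.insert(0, b.pop())
    else:
        a.insert(0, "_")


def ensure_update_in_requeue(is_rq: bool, can_upd: bool) -> Result:
    if is_rq:
        return ("updated", None) if can_upd else (False, "already-queued")
    return ("added", None)


def enqueue_action(status: str, is_bo: bool, is_rq: bool,
                   can_upd: bool, can_rq: bool) -> Result:
    """Map a message's current state and the caller's options to a result."""
    if status == "nx":
        return ("added", None)  # New message: added to the queue
    if status == "queued":
        return ("updated", None) if can_upd else (False, "already-queued")
    if status == "locked":
        if can_rq:
            return ensure_update_in_requeue(is_rq, can_upd)
        return (False, "locked")
    if status == "done":
        if is_bo and not can_rq:
            return (False, "backoff")
        # Done with requeue allowed, or done with expired backoff.
        return ensure_update_in_requeue(is_rq, can_upd)
    return (False, "unexpected")


# Toggle demonstration: the token alternates between the two lists.
list_a: List[str] = []
list_b: List[str] = []
interrupt_sleep(list_a, list_b)   # init: token lands in a
interrupt_sleep(list_a, list_b)   # token moves a -> b
print(list_a, list_b)
print(enqueue_action("locked", False, False, can_upd=False, can_rq=True))
```

Each call to `interrupt_sleep` changes which list holds the token, which is presumably how a new message signals sleeping workers; how workers wait on these lists is outside the diff shown here.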