Skip to content

Commit

Permalink
[mod] [BREAK] [#278] Merge work from message queue v2 branch
Browse files Browse the repository at this point in the history
This merge introduces a major rewrite of Carmine's message queue system
and includes **POTENTIAL BREAKING CHANGES**.

- If you do NOT use Carmine's message queue, you can safely ignore this.
- If you DO     use Carmine's message queue, please carefully read the
  details below.

1. `enqueue` return value has changed.

   This change is relevant to you iff you use the return value of
   `enqueue` calls (most users do not). Check your `enqueue` call sites
   to be sure.

   The fn previously returned `<mid>` (message id) on success, or
   `{:carmine.mq/error <message-status>}` on error.

   The fn now always returns a map with possible keys:
     [success? mid action error].

   See the updated `enqueue` docstring for details.

2. `queue-status` return value has changed.

   This change is relevant to you iff you use the `queue-status` util.

   The fn previously returned a detailed map of all queue content in
   O(queue-size).

   The fn now returns a small {:keys [nwaiting nlocked nbackoff ntotal]}
   map in O(1).

   If you want the detailed map of all queue content in O(queue-size),
   use the new `queue-content` util.

3. The definition of "queue-size" has changed.

   The old definition: total size of queue.
   The new definition: total size of queue, LESS mids that may be locked
                       or in backoff.

   I.e. the new definition now better represents the number of messages
   awaiting processing.

   Most users won't be affected by this change since the new definition
   better corresponds to how most users actually understood the term.

4. `clear-queues` has been deprecated.

   This utility is now called `queues-clear!!` to better match the
   rest of the API.

- Significantly improved latency (esp. worst-case latency) of handling new
  messages. Workers will now always prioritise handling of newly queued
  messages when available, and otherwise fall back to maintaining the mid
  circle.

- Decouple threads for handling and queue maintenance.
  Thread counts can now be individually customized.

- Worker end-of-queue backoff sleeps are now interrupted by new messages.
  Sleeping workers will awaken automatically when new messages arrive.

- Prioritize requeues (treat as "ready").
  Requeues no longer need to wait for a queue cycle to be reprocessed.

- Smart worker throttling.
  The `:throttle-ms` worker option can now be a function of the current
  queue size, enabling dynamic worker throttling.

  The default `:throttle-ms` value is now `:auto`, which uses such a
  function.

  See the updated `worker` docstring for details.

- Worker threads are now automatically desynchronized to reduce contention.

- Added `enqueue` option: `:lock-ms` to support per-message lock times [#223].
- Added `enqueue` option: `:can-update?` to support message updating.
- Handler fn data now includes `:worker`, `:queue-size`.
- Handler fn data now includes `:age-ms`.
  This enables easy integration with Tufte or other profiling tools.

- Added utils: `queue-size`, `queue-names`, `queues-clear!!`, `queues-clear-all!!!`.

- Worker object's string/pprint representation is now more useful.
- Worker object can now be dereffed to get useful state and stats.
- Workers can now be dereffed to get various diagnostic info.
  In particular, the new `:stats` key contains detailed statistics on
  queue size, queueing time, handling time, etc.

- Workers can now be invoked as fns to execute common actions.
  Actions include: `:start`, `:stop`, `:queue-size`, `:queue-status`.

- Various improvements to docstrings, error messages, and logging output
- Improved message queue state diagram
- General improvements to implementation, observability, and tests.
  • Loading branch information
ptaoussanis committed Jul 17, 2023
2 parents 37f0030 + 8df1002 commit 72da523
Show file tree
Hide file tree
Showing 9 changed files with 2,763 additions and 713 deletions.
Binary file removed message-states.jpg
Binary file not shown.
Binary file added mq-diagram.monopic
Binary file not shown.
1,436 changes: 1,436 additions & 0 deletions mq-diagram.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
1 change: 1 addition & 0 deletions project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
[[com.taoensso/encore "3.62.1"]
[com.taoensso/timbre "6.2.1"]
[com.taoensso/nippy "3.2.0"]
[com.taoensso/tufte "2.5.0"]
[org.apache.commons/commons-pool2 "2.11.1"]
[commons-codec/commons-codec "1.16.0"]]

Expand Down
174 changes: 112 additions & 62 deletions src/taoensso/carmine/lua/mq/dequeue.lua
Original file line number Diff line number Diff line change
@@ -1,81 +1,131 @@
if redis.call('exists', _:qk-eoq-backoff) == 1 then
return 'eoq-backoff';
end
-- Return e/o {'sleep' ...}, {'skip' ...}, {'handle' ...}, {'unexpected' ...}

-- TODO Waiting for Lua brpoplpush support to get us long polling
local mid = redis.call('rpoplpush', _:qk-mid-circle, _:qk-mid-circle);
local now = tonumber(_:now);
-- Prioritize mids from ready list
local mid_src = nil;
local mid = nil;
mid = redis.call('rpoplpush', _:qk-mids-ready, _:qk-mid-circle);
if mid then
mid_src = 'ready';
else
mid = redis.call('rpoplpush', _:qk-mid-circle, _:qk-mid-circle);
if mid then
mid_src = 'circle';
end
end

if (not mid) or (mid == 'end-of-circle') then -- Uninit'd or eoq
if ((not mid) or (mid == 'end-of-circle')) then -- Uninit'd or eoq

-- Calculate eoq_backoff_ms
local ndry_runs = tonumber(redis.call('get', _:qk-ndry-runs) or 0);
local eoq_ms_tab = {_:eoq-ms0, _:eoq-ms1, _:eoq-ms2, _:eoq-ms3, _:eoq-ms4};
local ndry_runs = tonumber(redis.call('get', _:qk-ndry-runs)) or 0;
local eoq_ms_tab = {_:eoq-bo1, _:eoq-bo2, _:eoq-bo3, _:eoq-bo4, _:eoq-bo5};
local eoq_backoff_ms = tonumber(eoq_ms_tab[math.min(5, (ndry_runs + 1))]);
redis.call('incr', _:qk-ndry-runs);

-- Set queue-wide polling backoff flag
redis.call('psetex', _:qk-eoq-backoff, eoq_backoff_ms, 'true');
redis.call('incr', _:qk-ndry-runs);
local isleep_on = nil;
if (redis.call('llen', _:qk-isleep-b) > 0) then isleep_on = 'b'; else isleep_on = 'a'; end

return 'eoq-backoff';
return {'sleep', 'end-of-circle', isleep_on, eoq_backoff_ms};
end

-- From msg_status.lua ---------------------------------------------------------
-- local mid = _:mid;
local now = tonumber(_:now);
local lock_exp = tonumber(redis.call('hget', _:qk-locks, mid)) or 0;
local backoff_exp = tonumber(redis.call('hget', _:qk-backoffs, mid)) or 0;
local state = nil;

if redis.call('hexists', _:qk-messages, mid) == 1 then
if redis.call('sismember', _:qk-done, mid) == 1 then
if (now < backoff_exp) then
if redis.call('sismember', _:qk-requeue, mid) == 1 then
state = 'done-with-requeue';
else
state = 'done-with-backoff';
end
else
state = 'done-awaiting-gc';
end
else
if (now < lock_exp) then
if redis.call('sismember', _:qk-requeue, mid) == 1 then
state = 'locked-with-requeue';
else
state = 'locked';
end
elseif (now < backoff_exp) then
state = 'queued-with-backoff';
else
state = 'queued';
end
end
end
local now = tonumber(_:now);

local status = nil; -- base status e/o nil, done, queued, locked
local is_bo = false; -- backoff flag for: done, queued
local is_rq = false; -- requeue flag for: done, locked
-- 8x cases: nil, done(bo/rq), queued(bo), locked(rq)
-- Describe with: {status, $bo, $rq} with $ prefixes e/o: _, +, -, *

-- return state;
if (redis.call('hexists', _:qk-messages, mid) == 1) then
local exp_lock = tonumber(redis.call('hget', _:qk-locks, mid)) or 0;
local exp_bo = tonumber(redis.call('hget', _:qk-backoffs, mid)) or 0;

is_bo = (now < exp_bo);
is_rq = (redis.call('sismember', _:qk-requeue, mid) == 1) or -- Deprecated
(redis.call('hexists', _:qk-messages-rq, mid) == 1);

if (redis.call('sismember', _:qk-done, mid) == 1) then status = 'done';
elseif (now < exp_lock) then status = 'locked';
else status = 'queued'; end
else
status = 'nx';
end
--------------------------------------------------------------------------------

if (state == 'locked') or
(state == 'locked-with-requeue') or
(state == 'queued-with-backoff') or
(state == 'done-with-backoff') then
return nil;
if (status == 'nx') then
if (mid_src == 'circle') then
redis.call('ltrim', _:qk-mid-circle, 1, -1); -- Remove from circle
return {'skip', 'did-trim'};
else
return {'skip', 'unexpected'};
end
elseif (status == 'locked') then return {'skip', 'locked'};
elseif ((status == 'queued') and is_bo) then return {'skip', 'queued-with-backoff'};
elseif ((status == 'done') and is_bo) then return {'skip', 'done-with-backoff'};
end

redis.call('set', _:qk-ndry-runs, 0); -- Doing useful work

if (state == 'done-awaiting-gc') then
redis.call('hdel', _:qk-messages, mid);
redis.call('hdel', _:qk-locks, mid);
redis.call('hdel', _:qk-backoffs, mid);
redis.call('hdel', _:qk-nattempts, mid);
redis.call('ltrim', _:qk-mid-circle, 1, -1);
redis.call('srem', _:qk-done, mid);
return nil;
if (status == 'done') then
if is_rq then
-- {done, -bo, +rq} -> requeue now
local mcontent =
redis.call('hget', _:qk-messages-rq, mid) or
redis.call('hget', _:qk-messages, mid); -- Deprecated (for qk-requeue)

local lock_ms =
tonumber(redis.call('hget', _:qk-lock-times-rq, mid)) or
tonumber(_:default-lock-ms);

redis.call('hset', _:qk-messages, mid, mcontent);
redis.call('hset', _:qk-udts, mid, now);

if lock_ms then
redis.call('hset', _:qk-lock-times, mid, lock_ms);
else
redis.call('hdel', _:qk-lock-times, mid);
end

redis.call('hdel', _:qk-messages-rq, mid);
redis.call('hdel', _:qk-lock-times-rq, mid);
redis.call('hdel', _:qk-nattempts, mid);
redis.call('srem', _:qk-done, mid);
redis.call('srem', _:qk-requeue, mid);

redis.call('lpush', _:qk-mids-ready, mid); -- -> Priority queue (>once okay)

return {'skip', 'did-requeue'};
else
-- {done, -bo, -rq} -> full GC now
redis.call('hdel', _:qk-messages, mid);
redis.call('hdel', _:qk-messages-rq, mid);
redis.call('hdel', _:qk-lock-times, mid);
redis.call('hdel', _:qk-lock-times-rq, mid);
redis.call('hdel', _:qk-udts, mid);
redis.call('hdel', _:qk-locks, mid);
redis.call('hdel', _:qk-backoffs, mid);
redis.call('hdel', _:qk-nattempts, mid);
redis.call('srem', _:qk-done, mid);
redis.call('srem', _:qk-requeue, mid);

if (mid_src == 'circle') then
redis.call('ltrim', _:qk-mid-circle, 1, -1); -- Remove from circle
end

return {'skip', 'did-gc'};
end
elseif (status == 'queued') then
-- {queued, -bo, _rq} -> handle now
local lock_ms =
tonumber(redis.call('hget', _:qk-lock-times, mid)) or
tonumber(_:default-lock-ms);

redis.call('hset', _:qk-locks, mid, now + lock_ms); -- Acquire
local mcontent = redis.call('hget', _:qk-messages, mid);
local udt = redis.call('hget', _:qk-udts, mid);
local nattempts = redis.call('hincrby', _:qk-nattempts, mid, 1);

return {'handle', mid, mcontent, nattempts, lock_ms, tonumber(udt)};
end

redis.call('hset', _:qk-locks, mid, now + tonumber(_:lock-ms)); -- Acquire
local mcontent = redis.call('hget', _:qk-messages, mid);
local nattempts = redis.call('hincrby', _:qk-nattempts, mid, 1);
return {mid, mcontent, nattempts};
return {'unexpected'};
163 changes: 111 additions & 52 deletions src/taoensso/carmine/lua/mq/enqueue.lua
Original file line number Diff line number Diff line change
@@ -1,70 +1,129 @@
-- From msg_status.lua ---------------------------------------------------------
local mid = _:mid;
local now = tonumber(_:now);
local lock_exp = tonumber(redis.call('hget', _:qk-locks, mid)) or 0;
local backoff_exp = tonumber(redis.call('hget', _:qk-backoffs, mid)) or 0;
local state = nil;

if redis.call('hexists', _:qk-messages, mid) == 1 then
if redis.call('sismember', _:qk-done, mid) == 1 then
if (now < backoff_exp) then
if redis.call('sismember', _:qk-requeue, mid) == 1 then
state = 'done-with-requeue';
else
state = 'done-with-backoff';
end
else
state = 'done-awaiting-gc';
end
local mid = _:mid;
local now = tonumber(_:now);

local status = nil; -- base status e/o nil, done, queued, locked
local is_bo = false; -- backoff flag for: done, queued
local is_rq = false; -- requeue flag for: done, locked
-- 8x cases: nil, done(bo/rq), queued(bo), locked(rq)
-- Describe with: {status, $bo, $rq} with $ prefixes e/o: _, +, -, *

if (redis.call('hexists', _:qk-messages, mid) == 1) then
local exp_lock = tonumber(redis.call('hget', _:qk-locks, mid)) or 0;
local exp_bo = tonumber(redis.call('hget', _:qk-backoffs, mid)) or 0;

is_bo = (now < exp_bo);
is_rq = (redis.call('sismember', _:qk-requeue, mid) == 1) or -- Deprecated
(redis.call('hexists', _:qk-messages-rq, mid) == 1);

if (redis.call('sismember', _:qk-done, mid) == 1) then status = 'done';
elseif (now < exp_lock) then status = 'locked';
else status = 'queued'; end
else
status = 'nx';
end
--------------------------------------------------------------------------------
-- Return {action, error}

local interrupt_sleep = function ()
if redis.call('rpoplpush', _:qk-isleep-a, _:qk-isleep-b) then
elseif redis.call('rpoplpush', _:qk-isleep-b, _:qk-isleep-a) then
else redis.call('lpush', _:qk-isleep-a, '_'); end -- Init
end

local reset_in_queue = function()
redis.call('hset', _:qk-messages, _:mid, _:mcnt);
redis.call('hsetnx', _:qk-udts, _:mid, now);

local lock_ms = tonumber(_:lock-ms);
if (lock_ms ~= -1) then
redis.call('hset', _:qk-lock-times, _:mid, lock_ms);
else
if (now < lock_exp) then
if redis.call('sismember', _:qk-requeue, mid) == 1 then
state = 'locked-with-requeue';
else
state = 'locked';
end
elseif (now < backoff_exp) then
state = 'queued-with-backoff';
else
state = 'queued';
end
redis.call('hdel', _:qk-lock-times, _:mid);
end
end

-- return state;
--------------------------------------------------------------------------------
local reset_in_requeue = function()
redis.call('hset', _:qk-messages-rq, _:mid, _:mcnt);

if (state == 'done-awaiting-gc') or
((state == 'done-with-backoff') and (_:allow-requeue? == 'true'))
then
redis.call('hdel', _:qk-nattempts, _:mid);
redis.call('srem', _:qk-done, _:mid);
return {_:mid};
local lock_ms = tonumber(_:lock-ms);
if (lock_ms ~= -1) then
redis.call('hset', _:qk-lock-times-rq, _:mid, lock_ms);
else
redis.call('hdel', _:qk-lock-times-rq, _:mid);
end
end

if (state == 'locked') and (_:allow-requeue? == 'true') and
(redis.call('sismember', _:qk-requeue, _:mid) ~= 1)
then
redis.call('sadd', _:qk-requeue, _:mid);
return {_:mid};
local can_upd = (_:can-upd? == '1');
local can_rq = (_:can-rq? == '1');

local ensure_update_in_requeue = function()
if is_rq then
if can_upd then
reset_in_requeue();
return {'updated'};
else
return {false, 'already-queued'};
end
else
reset_in_requeue();
return {'added'};
end
end

if state == nil then
redis.call('hset', _:qk-messages, _:mid, _:mcontent);
if (status == 'nx') then
-- {nil, _bo, _rq} -> add to queue

-- lpushnx end-of-circle marker to ensure an initialized mid-circle
-- Ensure that mid-circle is initialized
if redis.call('exists', _:qk-mid-circle) ~= 1 then
redis.call('lpush', _:qk-mid-circle, 'end-of-circle');
redis.call('lpush', _:qk-mid-circle, 'end-of-circle');
end

-- Set the initial backoff if requested
local initial_backoff_ms = tonumber(_:initial-backoff-ms);
if (initial_backoff_ms ~= 0) then
redis.call('hset', _:qk-backoffs, _:mid, now + initial_backoff_ms);
local init_bo = tonumber(_:init-bo);
if (init_bo ~= 0) then
redis.call('hset', _:qk-backoffs, _:mid, now + init_bo);
redis.call('lpush', _:qk-mid-circle, _:mid); -- -> Maintenance queue
else
redis.call('lpush', _:qk-mids-ready, _:mid); -- -> Priority queue
end

redis.call('lpush', _:qk-mid-circle, _:mid);
return {_:mid};
else
return state; -- Reject
reset_in_queue();
interrupt_sleep();
return {'added'};

elseif (status == 'queued') then
if can_upd then
-- {queued, *bo, _rq} -> update in queue
reset_in_queue();
return {'updated'};
else
return {false, 'already-queued'};
end
elseif (status == 'locked') then
if can_rq then
-- {locked, _bo, *rq} -> ensure/update in requeue
return ensure_update_in_requeue();
else
return {false, 'locked'};
end
elseif (status == 'done') then
if is_bo then
if can_rq then
-- {done, +bo, *rq} -> ensure/update in requeue
return ensure_update_in_requeue();
else
return {false, 'backoff'};
end
else
-- {done, -bo, *rq} -> ensure/update in requeue
-- (We're appropriating the requeue mechanism here)

redis.call('lpush', _:qk-mids-ready, _:mid); -- -> Priority queue

interrupt_sleep();
return ensure_update_in_requeue();
end
end

return {false, 'unexpected'};
Loading

0 comments on commit 72da523

Please sign in to comment.