Skip to content

Commit

Permalink
[mq] [mod] [new] [#278] NB Carmine message queue v2
Browse files Browse the repository at this point in the history
Introduces a significant rewrite of major parts of Carmine's message queue.

The underlying architecture remains unchanged.
The API remains MOSTLY unchanged: a small number of users may be affected,
please see the changes below.

CHANGES

  - [BREAKING] `enqueue` return value has changed:
    It used to return <mid> or {:carmine.mq/error <message-status>}.
    It now always returns a map with possible keys: [mid action error].
    Please see docstring for more info.

  - Improved error messages and logging output.

NEW

  - Added admin utils: `queue-names`, `clear-all-queues`.
  - Added `enqueue` option: `:lock-ms` to support per-message lock times [#223].
  - Added `enqueue` option: `:can-update?` to support message updating.
  - Handler fn data now includes `:age-ms` to support integration with Tufte
    or other profiling tools.
  - `queue-status` util now includes a `:by-mid` {<mid> <message-status>} hash.
  - Worker object's string/pprint representation is now more useful.
  - Worker object can now be dereffed to get useful state and stats.
  - Improved docstrings and mq architecture documentation.
  - General improvements to implementation, debuggability, and tests.
  • Loading branch information
ptaoussanis committed Dec 12, 2022
1 parent 2baf355 commit 6220a0a
Show file tree
Hide file tree
Showing 6 changed files with 972 additions and 658 deletions.
4 changes: 2 additions & 2 deletions project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
*unchecked-math* false #_:warn-on-boxed}

:dependencies
[[com.taoensso/encore "3.41.0"]
[com.taoensso/timbre "6.0.2"]
[[com.taoensso/encore "3.43.0"]
[com.taoensso/timbre "6.0.4"]
[com.taoensso/nippy "3.2.0"]
[org.apache.commons/commons-pool2 "2.11.1"]
[commons-codec/commons-codec "1.15"]]
Expand Down
143 changes: 87 additions & 56 deletions src/taoensso/carmine/lua/mq/dequeue.lua
Original file line number Diff line number Diff line change
@@ -1,81 +1,112 @@
if redis.call('exists', _:qk-eoq-backoff) == 1 then
return 'eoq-backoff';
-- Return e/o {'sleep' ...}, {'skip' ...}, {'handle' ...}, {'unexpected' ...}

if (redis.call('exists', _:qk-eoq-backoff) == 1) then
return {'sleep', 'eoq-backoff', redis.call('pttl', _:qk-eoq-backoff)};
end

-- TODO Waiting for Lua brpoplpush support to get us long polling
local mid = redis.call('rpoplpush', _:qk-mid-circle, _:qk-mid-circle);
local now = tonumber(_:now);

if (not mid) or (mid == 'end-of-circle') then -- Uninit'd or eoq
if ((not mid) or (mid == 'end-of-circle')) then -- Uninit'd or eoq

-- Calculate eoq_backoff_ms
local ndry_runs = tonumber(redis.call('get', _:qk-ndry-runs) or 0);
local eoq_ms_tab = {_:eoq-ms0, _:eoq-ms1, _:eoq-ms2, _:eoq-ms3, _:eoq-ms4};
local ndry_runs = tonumber(redis.call('get', _:qk-ndry-runs)) or 0;
local eoq_ms_tab = {_:eoq-bo1, _:eoq-bo2, _:eoq-bo3, _:eoq-bo4, _:eoq-bo5};
local eoq_backoff_ms = tonumber(eoq_ms_tab[math.min(5, (ndry_runs + 1))]);

-- Set queue-wide polling backoff flag
redis.call('psetex', _:qk-eoq-backoff, eoq_backoff_ms, 'true');
redis.call('incr', _:qk-ndry-runs);

return 'eoq-backoff';
return {'sleep', 'end-of-circle', eoq_backoff_ms};
end

-- From msg_status.lua ---------------------------------------------------------
-- local mid = _:mid;
local now = tonumber(_:now);
local lock_exp = tonumber(redis.call('hget', _:qk-locks, mid)) or 0;
local backoff_exp = tonumber(redis.call('hget', _:qk-backoffs, mid)) or 0;
local state = nil;

if redis.call('hexists', _:qk-messages, mid) == 1 then
if redis.call('sismember', _:qk-done, mid) == 1 then
if (now < backoff_exp) then
if redis.call('sismember', _:qk-requeue, mid) == 1 then
state = 'done-with-requeue';
else
state = 'done-with-backoff';
end
else
state = 'done-awaiting-gc';
end
else
if (now < lock_exp) then
if redis.call('sismember', _:qk-requeue, mid) == 1 then
state = 'locked-with-requeue';
else
state = 'locked';
end
elseif (now < backoff_exp) then
state = 'queued-with-backoff';
else
state = 'queued';
end
end
end
local now = tonumber(_:now);

local status = nil; -- base status e/o nil, done, queued, locked
local is_bo = false; -- backoff flag for: done, queued
local is_rq = false; -- requeue flag for: done, locked
-- 8x cases: nil, done(bo/rq), queued(bo), locked(rq)
-- Describe with: {status, $bo, $rq} with $ prefixes e/o: _, +, -, *

if (redis.call('hexists', _:qk-messages, mid) == 1) then
local exp_lock = tonumber(redis.call('hget', _:qk-locks, mid)) or 0;
local exp_bo = tonumber(redis.call('hget', _:qk-backoffs, mid)) or 0;

is_bo = (now < exp_bo);
is_rq = (redis.call('sismember', _:qk-requeue, mid) == 1) or -- Deprecated
(redis.call('hexists', _:qk-messages-rq, mid) == 1);

-- return state;
if (redis.call('sismember', _:qk-done, mid) == 1) then status = 'done';
elseif (now < exp_lock) then status = 'locked';
else status = 'queued'; end
else
status = 'nx';
end
--------------------------------------------------------------------------------

if (state == 'locked') or
(state == 'locked-with-requeue') or
(state == 'queued-with-backoff') or
(state == 'done-with-backoff') then
return nil;
if (status == 'nx') then return {'skip', 'unexpected'};
elseif (status == 'locked') then return {'skip', 'locked'};
elseif ((status == 'queued') and is_bo) then return {'skip', 'queued-with-backoff'};
elseif ((status == 'done') and is_bo) then return {'skip', 'done-with-backoff'};
end

redis.call('set', _:qk-ndry-runs, 0); -- Doing useful work

if (state == 'done-awaiting-gc') then
redis.call('hdel', _:qk-messages, mid);
redis.call('hdel', _:qk-locks, mid);
redis.call('hdel', _:qk-backoffs, mid);
redis.call('hdel', _:qk-nattempts, mid);
redis.call('ltrim', _:qk-mid-circle, 1, -1);
redis.call('srem', _:qk-done, mid);
return nil;
if (status == 'done') then
if is_rq then
-- {done, -bo, +rq} -> requeue now
local mcontent =
redis.call('hget', _:qk-messages-rq, mid) or
redis.call('hget', _:qk-messages, mid); -- Deprecated (for qk-requeue)

local lock_ms =
tonumber(redis.call('hget', _:qk-lock-times-rq, mid)) or
tonumber(_:default-lock-ms);

redis.call('hset', _:qk-messages, mid, mcontent);
redis.call('hset', _:qk-udts, mid, now);

if lock_ms then
redis.call('hset', _:qk-lock-times, mid, lock_ms);
else
redis.call('hdel', _:qk-lock-times, mid);
end

redis.call('hdel', _:qk-messages-rq, mid);
redis.call('hdel', _:qk-lock-times-rq, mid);
redis.call('hdel', _:qk-nattempts, mid);
redis.call('srem', _:qk-done, mid);
redis.call('srem', _:qk-requeue, mid);
return {'skip', 'did-requeue'};
else
-- {done, -bo, -rq} -> full GC now
redis.call('hdel', _:qk-messages, mid);
redis.call('hdel', _:qk-messages-rq, mid);
redis.call('hdel', _:qk-lock-times, mid);
redis.call('hdel', _:qk-lock-times-rq, mid);
redis.call('hdel', _:qk-udts, mid);
redis.call('hdel', _:qk-locks, mid);
redis.call('hdel', _:qk-backoffs, mid);
redis.call('hdel', _:qk-nattempts, mid);
redis.call('srem', _:qk-done, mid);
redis.call('srem', _:qk-requeue, mid);
redis.call('ltrim', _:qk-mid-circle, 1, -1);
return {'skip', 'did-gc'};
end
elseif (status == 'queued') then
-- {queued, -bo, _rq} -> handle now
local lock_ms =
tonumber(redis.call('hget', _:qk-lock-times, mid)) or
tonumber(_:default-lock-ms);

redis.call('hset', _:qk-locks, mid, now + lock_ms); -- Acquire
local mcontent = redis.call('hget', _:qk-messages, mid);
local udt = redis.call('hget', _:qk-udts, mid);
local nattempts = redis.call('hincrby', _:qk-nattempts, mid, 1);

return {'handle', mid, mcontent, nattempts, lock_ms, tonumber(udt)};
end

redis.call('hset', _:qk-locks, mid, now + tonumber(_:lock-ms)); -- Acquire
local mcontent = redis.call('hget', _:qk-messages, mid);
local nattempts = redis.call('hincrby', _:qk-nattempts, mid, 1);
return {mid, mcontent, nattempts};
return {'unexpected'};
152 changes: 99 additions & 53 deletions src/taoensso/carmine/lua/mq/enqueue.lua
Original file line number Diff line number Diff line change
@@ -1,70 +1,116 @@
-- From msg_status.lua ---------------------------------------------------------
local mid = _:mid;
local now = tonumber(_:now);
local lock_exp = tonumber(redis.call('hget', _:qk-locks, mid)) or 0;
local backoff_exp = tonumber(redis.call('hget', _:qk-backoffs, mid)) or 0;
local state = nil;

if redis.call('hexists', _:qk-messages, mid) == 1 then
if redis.call('sismember', _:qk-done, mid) == 1 then
if (now < backoff_exp) then
if redis.call('sismember', _:qk-requeue, mid) == 1 then
state = 'done-with-requeue';
else
state = 'done-with-backoff';
end
else
state = 'done-awaiting-gc';
end
local mid = _:mid;
local now = tonumber(_:now);

local status = nil; -- base status e/o nil, done, queued, locked
local is_bo = false; -- backoff flag for: done, queued
local is_rq = false; -- requeue flag for: done, locked
-- 8x cases: nil, done(bo/rq), queued(bo), locked(rq)
-- Describe with: {status, $bo, $rq} with $ prefixes e/o: _, +, -, *

if (redis.call('hexists', _:qk-messages, mid) == 1) then
local exp_lock = tonumber(redis.call('hget', _:qk-locks, mid)) or 0;
local exp_bo = tonumber(redis.call('hget', _:qk-backoffs, mid)) or 0;

is_bo = (now < exp_bo);
is_rq = (redis.call('sismember', _:qk-requeue, mid) == 1) or -- Deprecated
(redis.call('hexists', _:qk-messages-rq, mid) == 1);

if (redis.call('sismember', _:qk-done, mid) == 1) then status = 'done';
elseif (now < exp_lock) then status = 'locked';
else status = 'queued'; end
else
status = 'nx';
end
--------------------------------------------------------------------------------
-- Return {action, error}

local reset_in_queue = function()
redis.call('hset', _:qk-messages, _:mid, _:mcnt);
redis.call('hsetnx', _:qk-udts, _:mid, now);

local lock_ms = tonumber(_:lock-ms);
if (lock_ms ~= -1) then
redis.call('hset', _:qk-lock-times, _:mid, lock_ms);
else
if (now < lock_exp) then
if redis.call('sismember', _:qk-requeue, mid) == 1 then
state = 'locked-with-requeue';
else
state = 'locked';
end
elseif (now < backoff_exp) then
state = 'queued-with-backoff';
else
state = 'queued';
end
redis.call('hdel', _:qk-lock-times, _:mid);
end
end

-- return state;
--------------------------------------------------------------------------------
local reset_in_requeue = function()
redis.call('hset', _:qk-messages-rq, _:mid, _:mcnt);

if (state == 'done-awaiting-gc') or
((state == 'done-with-backoff') and (_:allow-requeue? == 'true'))
then
redis.call('hdel', _:qk-nattempts, _:mid);
redis.call('srem', _:qk-done, _:mid);
return {_:mid};
local lock_ms = tonumber(_:lock-ms);
if (lock_ms ~= -1) then
redis.call('hset', _:qk-lock-times-rq, _:mid, lock_ms);
else
redis.call('hdel', _:qk-lock-times-rq, _:mid);
end
end

if (state == 'locked') and (_:allow-requeue? == 'true') and
(redis.call('sismember', _:qk-requeue, _:mid) ~= 1)
then
redis.call('sadd', _:qk-requeue, _:mid);
return {_:mid};
local can_upd = (_:can-upd? == '1');
local can_rq = (_:can-rq? == '1');

local ensure_update_in_requeue = function()
if is_rq then
if can_upd then
reset_in_requeue();
return {'updated'};
else
return {false, 'already-queued'};
end
else
reset_in_requeue();
return {'added'};
end
end

if state == nil then
redis.call('hset', _:qk-messages, _:mid, _:mcontent);
if (status == 'nx') then
-- {nil, _bo, _rq} -> add to queue

-- lpushnx end-of-circle marker to ensure an initialized mid-circle
if redis.call('exists', _:qk-mid-circle) ~= 1 then
redis.call('lpush', _:qk-mid-circle, 'end-of-circle');
-- Set the initial backoff if requested
local init_bo = tonumber(_:init-bo);
if (init_bo ~= 0) then
redis.call('hset', _:qk-backoffs, _:mid, now + init_bo);
end

-- Set the initial backoff if requested
local initial_backoff_ms = tonumber(_:initial-backoff-ms);
if (initial_backoff_ms ~= 0) then
redis.call('hset', _:qk-backoffs, _:mid, now + initial_backoff_ms);
-- Ensure that mid-circle is initialized
if redis.call('exists', _:qk-mid-circle) ~= 1 then
redis.call('lpush', _:qk-mid-circle, 'end-of-circle');
end

redis.call('lpush', _:qk-mid-circle, _:mid);
return {_:mid};
else
return state; -- Reject
reset_in_queue();
return {'added'};

elseif (status == 'queued') then
if can_upd then
-- {queued, *bo, _rq} -> update in queue
reset_in_queue();
return {'updated'};
else
return {false, 'already-queued'};
end
elseif (status == 'locked') then
if can_rq then
-- {locked, _bo, *rq} -> ensure/update in requeue
return ensure_update_in_requeue();
else
return {false, 'locked'};
end
elseif (status == 'done') then
if is_bo then
if can_rq then
-- {done, +bo, *rq} -> ensure/update in requeue
return ensure_update_in_requeue();
else
return {false, 'backoff'};
end
else
-- {done, -bo, *rq} -> ensure/update in requeue
-- (We're appropriating the requeue mechanism here)
return ensure_update_in_requeue();
end
end

return {false, 'unexpected'};
Loading

0 comments on commit 6220a0a

Please sign in to comment.