Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#244] MQ: Add initial-backoff option to enqueue #245

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
*assert* true}

:dependencies
[[com.taoensso/encore "3.1.0"]
[com.taoensso/timbre "5.0.0"]
[[com.taoensso/encore "3.8.0"]
[com.taoensso/timbre "5.1.0"]
[com.taoensso/nippy "3.0.0"]
[org.apache.commons/commons-pool2 "2.8.1"]
[org.apache.commons/commons-pool2 "2.9.0"]
[commons-codec/commons-codec "1.15"]]

:plugins
Expand All @@ -38,7 +38,7 @@
[[org.clojure/data.json "1.0.0"]
[com.taoensso/faraday "1.9.0"]
[clj-aws-s3 "0.3.10"]
[ring/ring-core "1.8.1"]]}]}
[ring/ring-core "1.8.2"]]}]}

:test-paths ["test" "src"]

Expand Down
6 changes: 6 additions & 0 deletions src/taoensso/carmine/lua/mq/enqueue.lua
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ if state == nil then
redis.call('lpush', _:qk-mid-circle, 'end-of-circle');
end

-- Set the initial backoff if requested
local initial_backoff_ms = tonumber(_:initial-backoff-ms);
if (initial_backoff_ms ~= 0) then
redis.call('hset', _:qk-backoffs, _:mid, now + initial_backoff_ms);
end

redis.call('lpush', _:qk-mid-circle, _:mid);
return {_:mid};
else
Expand Down
29 changes: 15 additions & 14 deletions src/taoensso/carmine/message_queue.clj
Original file line number Diff line number Diff line change
Expand Up @@ -106,24 +106,25 @@
perform a de-duplication check. If unspecified, a
unique id will be auto-generated.
* allow-requeue? - When true, allow buffered escrow-requeue for a
message in the :locked or :done-with-backoff state."
;; TODO Option to enqueue something with an init backoff?
message in the :locked or :done-with-backoff state.
* initial-backoff-ms - Initial backoff in millis."
(let [script (enc/have (enc/slurp-resource "taoensso/carmine/lua/mq/enqueue.lua"))]
(fn [qname message & [unique-message-id allow-requeue?]]
(fn [qname message & [{:keys [unique-message-id allow-requeue? initial-backoff-ms]}]]
(car/parse
#(if (vector? %) (get % 0) {:carmine.mq/error (keyword %)})
(car/lua script
{:qk-messages (qkey qname :messages)
:qk-locks (qkey qname :locks)
:qk-backoffs (qkey qname :backoffs)
:qk-nattempts (qkey qname :nattempts)
:qk-mid-circle (qkey qname :mid-circle)
:qk-done (qkey qname :done)
:qk-requeue (qkey qname :requeue)}
{:now (enc/now-udt)
:mid (or unique-message-id (enc/uuid-str))
:mcontent (car/freeze message)
:allow-requeue? (if allow-requeue? "true" "false")})))))
{:qk-messages (qkey qname :messages)
:qk-locks (qkey qname :locks)
:qk-backoffs (qkey qname :backoffs)
:qk-nattempts (qkey qname :nattempts)
:qk-mid-circle (qkey qname :mid-circle)
:qk-done (qkey qname :done)
:qk-requeue (qkey qname :requeue)}
{:now (enc/now-udt)
:mid (or unique-message-id (enc/uuid-str))
:mcontent (car/freeze message)
:allow-requeue? (if allow-requeue? "true" "false")
:initial-backoff-ms (or initial-backoff-ms 0)})))))

(def dequeue
"IMPLEMENTATION DETAIL: Use `worker` instead.
Expand Down
102 changes: 86 additions & 16 deletions test/taoensso/carmine/tests/message_queue.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
(ns taoensso.carmine.tests.message-queue
(:require
[clojure.test :as test :refer [is deftest]]
[clojure.test :as test :refer [is deftest testing]]
[taoensso.carmine :as car :refer [wcar]]
[taoensso.carmine.message-queue :as mq]))

Expand All @@ -25,7 +25,7 @@
(deftest tests-1 ; Basic enqueuing & dequeuing
(is (do (println (str "Running message queue tests")) true))
(is (= "eoq-backoff" (do (clear-tq) (wcar* (dequeue* tq)))))
(is (= "mid1" (wcar* (mq/enqueue tq :msg1 :mid1))))
(is (= "mid1" (wcar* (mq/enqueue tq :msg1 {:unique-message-id :mid1}))))
(is
(let [status (tq-status)
{:keys [messages mid-circle]} status]
Expand All @@ -35,7 +35,7 @@
(is (= :queued (wcar* (mq/message-status tq :mid1))))

;; Dupe
(is (= {:carmine.mq/error :queued} (wcar* (mq/enqueue tq :msg1 :mid1))))
(is (= {:carmine.mq/error :queued} (wcar* (mq/enqueue tq :msg1 {:unique-message-id :mid1}))))

(is (= "eoq-backoff" (wcar* (dequeue* tq))))
(is (= ["mid1" :msg1 1] (wcar* (dequeue* tq)))) ; New msg
Expand All @@ -45,7 +45,7 @@
)

(deftest tests-2 ; Handling: success
(is (= "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 :mid1)))))
(is (= "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 {:unique-message-id :mid1})))))
;; (is (= "eoq-backoff" (wcar* (dequeue* tq))))

;; Handler will *not* run against eoq-backoff/nil reply:
Expand All @@ -62,7 +62,7 @@
(is (= nil (wcar* (mq/message-status tq :mid1)))))

(deftest tests-3 ; Handling: handler crash
(is (= "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 :mid1)))))
(is (= "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 {:unique-message-id :mid1})))))
(is (= "eoq-backoff" (wcar* (dequeue* tq))))

;; Simulates bad handler
Expand All @@ -74,7 +74,7 @@
(wcar* (dequeue* tq))))))

(deftest tests-4 ; Handling: retry with backoff
(is (= "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 :mid1)))))
(is (= "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 {:unique-message-id :mid1})))))
(is (= "eoq-backoff" (wcar* (dequeue* tq))))
(is (= {:qname :carmine-test-queue :mid "mid1" :message :msg1, :attempt 1}
(let [p (promise)]
Expand All @@ -90,7 +90,7 @@
(wcar* (dequeue* tq))))))

(deftest tests-5 ; Handling: success with backoff (dedupe)
(is (= "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 :mid1)))))
(is (= "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 {:unique-message-id :mid1})))))
(is (= "eoq-backoff" (wcar* (dequeue* tq))))
(is (= {:qname :carmine-test-queue :mid "mid1" :message :msg1, :attempt 1}
(let [p (promise)]
Expand All @@ -103,12 +103,12 @@
(is (= nil (wcar* (dequeue* tq)))) ; Will gc
(is (= :done-with-backoff (wcar* (mq/message-status tq :mid1)))) ; Backoff (< 3s)
(is (= {:carmine.mq/error :done-with-backoff}
(wcar* (mq/enqueue tq :msg1 :mid1)))) ; Dupe
(wcar* (mq/enqueue tq :msg1 {:unique-message-id :mid1})))) ; Dupe
(is (= "mid1" (do (Thread/sleep 3000) ; Wait for backoff to expire
(wcar* (mq/enqueue tq :msg1 :mid1))))))
(wcar* (mq/enqueue tq :msg1 {:unique-message-id :mid1}))))))

(deftest test-6 ; Handling: enqueue while :locked
(is (= "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 :mid1)))))
(is (= "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 {:unique-message-id :mid1})))))
(is (= "eoq-backoff" (wcar* (dequeue* tq))))
(is (= :locked
(do (future
Expand All @@ -118,29 +118,99 @@
(wcar* (dequeue* tq))))
(Thread/sleep 50)
(wcar* (mq/message-status tq :mid1)))))
(is (= {:carmine.mq/error :locked} (wcar* (mq/enqueue tq :msg1 :mid1))))
(is (= "mid1" (wcar* (mq/enqueue tq :msg1 :mid1 :allow-requeue))))
(is (= {:carmine.mq/error :locked} (wcar* (mq/enqueue tq :msg1 {:unique-message-id :mid1}))))
(is (= "mid1" (wcar* (mq/enqueue tq :msg1 {:unique-message-id :mid1 :allow-requeue? true}))))
(is (= {:carmine.mq/error :locked-with-requeue}
(wcar* (mq/enqueue tq :msg1-requeued :mid1 :allow-requeue))))
(wcar* (mq/enqueue tq :msg1-requeued {:unique-message-id :mid1 :allow-requeue? true}))))
(is (= :queued ; cmp :done-awaiting-gc
(do (Thread/sleep 3500) ; Wait for handler to complete (extra time for future!)
(wcar* (mq/message-status tq :mid1)))))
(is (= "eoq-backoff" (wcar* (dequeue* tq))))
(is (= ["mid1" :msg1 1] (wcar* (dequeue* tq)))))

(deftest test-7 ; Handling: enqueue while :done-with-backoff
(is (= "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 :mid1)))))
(is (= "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 {:unique-message-id :mid1})))))
(is (= "eoq-backoff" (wcar* (dequeue* tq))))
(is (= :done-with-backoff
(do (mq/handle1 conn-opts tq
(fn [_] {:status :success :backoff-ms 3000})
(wcar* (dequeue* tq)))
(Thread/sleep 20)
(wcar* (mq/message-status tq :mid1)))))
(is (= {:carmine.mq/error :done-with-backoff} (wcar* (mq/enqueue tq :msg1 :mid1))))
(is (= "mid1" (wcar* (mq/enqueue tq :msg1-requeued :mid1 :allow-requeue))))
(is (= {:carmine.mq/error :done-with-backoff} (wcar* (mq/enqueue tq :msg1 {:unique-message-id :mid1}))))
(is (= "mid1" (wcar* (mq/enqueue tq :msg1-requeued {:unique-message-id :mid1 :allow-requeue? true}))))
(is (= :queued ; cmp :done-awaiting-gc
(do (Thread/sleep 3000) ; Wait for backoff to expire
(wcar* (mq/message-status tq :mid1)))))
(is (= "eoq-backoff" (wcar* (dequeue* tq))))
(is (= ["mid1" :msg1 1] (wcar* (dequeue* tq)))))

(deftest test-8 ; Enqueue/dequeue with initial backoff
(is (= "eoq-backoff" (do (clear-tq) (wcar* (dequeue* tq)))))
(is (= "mid1" (wcar* (mq/enqueue tq :msg1 {:unique-message-id :mid1 :initial-backoff-ms 500}))))
(is (= "mid2" (wcar* (mq/enqueue tq :msg2 {:unique-message-id :mid2 :initial-backoff-ms 100}))))
(is
(let [status (tq-status)
{:keys [messages mid-circle]} status]
(and
(= messages {"mid1" :msg1 "mid2" :msg2})
(= mid-circle ["mid2" "mid1" "end-of-circle"]))))
;; Dupes before the backoff expired
(is (= {:carmine.mq/error :queued-with-backoff} (wcar* (mq/enqueue tq :msg1 {:unique-message-id :mid1}))))
(is (= {:carmine.mq/error :queued-with-backoff} (wcar* (mq/enqueue tq :msg2 {:unique-message-id :mid2}))))
;; Both should be queued with backoff before the backoff expires
(is (= :queued-with-backoff (wcar* (mq/message-status tq :mid1))))
(is (= :queued-with-backoff (wcar* (mq/message-status tq :mid2))))
;; Move time past second message
(is (do (Thread/sleep 150) true))
(is (= :queued-with-backoff (wcar* (mq/message-status tq :mid1))))
(is (= :queued (wcar* (mq/message-status tq :mid2))))
;; Move time past first message
(is (do (Thread/sleep 750) true))
(is (= :queued (wcar* (mq/message-status tq :mid1))))
(is (= :queued (wcar* (mq/message-status tq :mid2))))
;; Dupes after the backoff expired
(is (= {:carmine.mq/error :queued} (wcar* (mq/enqueue tq :msg1 {:unique-message-id :mid1}))))
(is (= {:carmine.mq/error :queued} (wcar* (mq/enqueue tq :msg2 {:unique-message-id :mid2}))))
;; TODO Is the order of retrieval actually predictable?
st3fan marked this conversation as resolved.
Show resolved Hide resolved
(is (= "eoq-backoff" (wcar* (dequeue* tq))))
(is (= ["mid1" :msg1 1] (wcar* (dequeue* tq))))
(is (= :locked (wcar* (mq/message-status tq :mid1))))
(is (= ["mid2" :msg2 1] (wcar* (dequeue* tq))))
(is (= :locked (wcar* (mq/message-status tq :mid2))))
(is (= "eoq-backoff" (wcar* (dequeue* tq))))
(is (= nil (wcar* (dequeue* tq))))
)

(deftest test-message-queue-with-initial-backoff
(testing "Message status changes over time"
;; Setup
(is (= "eoq-backoff" (do (clear-tq) (wcar* (dequeue* tq)))))
(is (= "mid1" (wcar* (mq/enqueue tq :msg1 {:unique-message-id :mid1 :initial-backoff-ms 500}))))
(is (= "mid2" (wcar* (mq/enqueue tq :msg2 {:unique-message-id :mid2 :initial-backoff-ms 100}))))
(is (= :queued-with-backoff (wcar* (mq/message-status tq :mid1))))
(is (= :queued-with-backoff (wcar* (mq/message-status tq :mid2))))
(is (do (Thread/sleep 150) true)) ;; Move time past second message
(is (= :queued-with-backoff (wcar* (mq/message-status tq :mid1))))
(is (= :queued (wcar* (mq/message-status tq :mid2))))
(is (do (Thread/sleep 750) true)) ;; Move time past first message
(is (= :queued (wcar* (mq/message-status tq :mid1))))
(is (= :queued (wcar* (mq/message-status tq :mid2)))))
(testing "Errors when we enqueue with same ids"
(is (= "eoq-backoff" (do (clear-tq) (wcar* (dequeue* tq)))))
(is (= "mid1" (wcar* (mq/enqueue tq :msg1 {:unique-message-id :mid1 :initial-backoff-ms 500}))))
(is (= "mid2" (wcar* (mq/enqueue tq :msg2 {:unique-message-id :mid2 :initial-backoff-ms 100}))))
(is (= :queued-with-backoff (wcar* (mq/message-status tq :mid1))))
(is (= :queued-with-backoff (wcar* (mq/message-status tq :mid2))))
(is (= {:carmine.mq/error :queued-with-backoff} (wcar* (mq/enqueue tq :msg1 {:unique-message-id :mid1}))))
(is (= {:carmine.mq/error :queued-with-backoff} (wcar* (mq/enqueue tq :msg2 {:unique-message-id :mid2})))))
(testing "Errors change over time"
(is (= "eoq-backoff" (do (clear-tq) (wcar* (dequeue* tq)))))
(is (= "mid1" (wcar* (mq/enqueue tq :msg1 {:unique-message-id :mid1 :initial-backoff-ms 500}))))
(is (= "mid2" (wcar* (mq/enqueue tq :msg2 {:unique-message-id :mid2 :initial-backoff-ms 100}))))
(is (= :queued-with-backoff (wcar* (mq/message-status tq :mid1))))
(is (= :queued-with-backoff (wcar* (mq/message-status tq :mid2))))
(is (do (Thread/sleep 150) true)) ;; Move time past second message
(is (= :queued (wcar* (mq/message-status tq :mid2))))
(is (do (Thread/sleep 750) true)) ;; Move time past first message
(is (= :queued (wcar* (mq/message-status tq :mid1))))))