diff --git a/src/taoensso/carmine/message_queue.clj b/src/taoensso/carmine/message_queue.clj index c4f9ad8b..d0b3d85d 100644 --- a/src/taoensso/carmine/message_queue.clj +++ b/src/taoensso/carmine/message_queue.clj @@ -298,15 +298,15 @@ (defn message-status "Returns current message status, e/o: - nil - Not in queue or already GC'd - :queued - Awaiting handler - :queued-with-backoff - Awaiting handler, but skip until backoff expired - :locked - Currently with handler - :locked-with-requeue - Currently with handler, will requeue when done - :done-awaiting-gc - Finished handling, awaiting GC - :done-with-backoff - Finished handling, awaiting GC, - but skip until dedupe backoff expired - :done-with-requeue - Will requeue, but skip until dedupe backoff expired" + `nil` - Not in queue or already GC'd + `:queued` - Awaiting handler + `:queued-with-backoff` - Awaiting handler, but skip until backoff expired + `:locked` - Currently with handler + `:locked-with-requeue` - Currently with handler, will requeue when done + `:done-awaiting-gc` - Finished handling, awaiting GC + `:done-with-backoff` - Finished handling, awaiting GC, + but skip until dedupe backoff expired + `:done-with-requeue` - Will requeue, but skip until dedupe backoff expired" [qname mid] (car/parse ->message-status (car/lua @lua-msg-status_ @@ -330,19 +330,20 @@ `error` e/o #{:already-queued :locked :backoff}. Options: - :init-backoff-ms - Optional initial backoff in msecs. - :lock-ms - Optional lock time in msecs. When unspecified, the - worker's default lock time will be used. + `:init-backoff-ms` - Optional initial backoff in msecs. + `:lock-ms` - Optional lock time in msecs. When unspecified, the + worker's default lock time will be used. - :mid - Optional unique message id (e.g. message hash) to - identify a specific message for dedupe/update/requeue. - When unspecified, a random uuid will be used. + `:mid` - Optional unique message id (e.g. message hash) to + identify a specific message for dedupe/update/requeue. + When unspecified, a random uuid will be used. - :can-update? - When true, will update message content and/or lock-ms for - an mid still awaiting handling. - :can-requeue? - When true, will mark message with `:locked` or - `:done-with-backoff` status so that it will be - automatically requeued after garbage collection." + `:can-update?` - When true, will update message content and/or lock-ms for + an mid still awaiting handling. + + `:can-requeue?` - When true, will mark message with `:locked` or + `:done-with-backoff` status so that it will be + automatically requeued after garbage collection." {:arglists '([qname message] @@ -497,7 +498,7 @@ :a [qk-a qk-b] :b [qk-b qk-a]))] - (try ; NB conn's read-timeout may be insufficient! + (try ; NB conn's read-timeout may be insufficient! (wcar conn-opts (car/brpoplpush qk-src qk-dst secs-dbl)) (catch Throwable _ nil)))) @@ -607,8 +608,8 @@ (defprotocol IWorker "Implementation detail." - (start [this]) - (stop [this])) + (^:no-doc start [this]) + (^:no-doc stop [this])) (deftype CarmineMessageQueueWorker [qname worker-opts conn-opts running?_ future-pool worker-futures_ @@ -772,7 +773,7 @@ (defmethod print-method CarmineMessageQueueWorker [x ^java.io.Writer w] (.write w (str "#" ns "." x)))) -(defn worker? [x] (instance? CarmineMessageQueueWorker x)) +(defn ^:no-doc worker? [x] (instance? CarmineMessageQueueWorker x)) (defn monitor-fn "Returns a worker monitor fn that warns when queue exceeds the prescribed @@ -802,54 +803,85 @@ (comment (default-throttle-ms-fn 0)) (defn worker - "Returns a stateful threaded CarmineMessageQueueWorker to handle messages + "Returns a stateful threaded `CarmineMessageQueueWorker` to handle messages added to named queue with `enqueue`. API: - - (deref ) => Status map, {:keys [running? nthreads stats ...]}. + - (deref ) => Detailed worker status map (see \"Debugging\" below). - ( :start) => Same as calling (start ). - ( :stop) => Same as calling (stop ). - ( :queue-size) => Same as calling `queue-size` for given qname. - ( :queue-status) => Same as calling `queue-status` for given qname. - ( :queue-content) => Same as calling `queue-content` for given qname. + Debugging: + + To debug a worker, deref it to see detailed status map with keys: + `:qname` - Worker's queue name (string) + `:opts` - Worker's options map + `:conn-opts` - Worker's connection options map + `:running?` - Is the worker currently running? (true/false) + `:nthreads` - {:keys [worker handler]} + `:stats` + `:queue-size` - {:keys [last min max mean mad var p50 p90 ...]} + `:queueing-time-ms` - {:keys [last min max mean mad var p50 p90 ...]} + `:handling-time-ns` - {:keys [last min max mean mad var p50 p90 ...]} + `:counts` + `:handler/success` - Number of handler calls with `:success` status + `:handler/error` - Number of handler calls with `:error` status + `:handler/retry` - Number of handler calls with `:retry` status + `:handler/backoff` - Number of handler calls encountering an mid in backoff + ... + + See also the `:monitor` option below, and utils: + `queue-names`, `queue-size`, `queue-content`, `message-status`, etc. + Options: - :handler - (fn [{:keys [qname mid message attempt]}]) that throws - or returns {:status <#{:success :error :retry}> - :throwable - :backoff-ms ?sleep-msecs, - or :auto (to use `default-throttle-ms-fn`). - - :eoq-backoff-ms - Max msecs to sleep thread each time end of queue is reached. - Can be a (fn [ndry-runs]) -> msecs for n<=5. - Sleep may be interrupted when new messages are enqueued. - If present, connection read timeout should be >= max msecs. - - :nthreads-worker - Number of threads to monitor and maintain queue. - :nthreads-handler - Number of threads to handle queue messages with handler fn." + + `:handler` + (fn [{:keys [qname mid message attempt]}]) called for each worker message. + Should throw or return a map with possible keys: + + `:monitor` + (fn [{:keys [queue-size ndry-runs poll-reply]}]) called on each worker + worker loop iteration. Useful for queue monitoring/logging. + See also `monitor-fn`. + + `:lock-ms` (default 60 minutes) + Default time (in milliseconds) that handler may keep a message before handler + is considered fatally stalled and message is re-queued. Must be large enough + to prevent double handling! + + Can be overridden on a per-message basis via `enqueue`. + + `:throttle-ms` (default `:auto`) + Thread sleep period (in milliseconds) between each poll. + Can be a (fn [queue-size]) -> ?sleep-msecs, + or `:auto` (to use `default-throttle-ms-fn`). + + `:eoq-backoff-ms` (default `exp-backoff` fn) + Max time (in milliseconds) to sleep thread each time end of queue is reached. + Can be a (fn [ndry-runs]) -> msecs for n<=5. + Sleep may be interrupted when new messages are enqueued. + If present, connection read timeout should be >= max msecs! + + `:nthreads-worker` - Number of threads to monitor and maintain queue. + `:nthreads-handler` - Number of threads to handle queue messages with handler fn." ([conn-opts qname] (worker conn-opts qname nil)) ([conn-opts qname {:keys [handler monitor lock-ms eoq-backoff-ms throttle-ms auto-start nthreads-worker nthreads-handler] :as worker-opts - :or {handler (fn [m] (timbre/info m) {:status :success}) - monitor (monitor-fn qname 1000 (enc/ms :hours 6)) - lock-ms (enc/ms :mins 60) - nthreads-worker 1 - nthreads-handler 1 - throttle-ms :auto #_200 - eoq-backoff-ms exp-backoff - auto-start true}}] + + :or + {handler (fn [m] (timbre/info m) {:status :success}) + monitor (monitor-fn qname 1000 (enc/ms :hours 6)) + lock-ms (enc/ms :mins 60) + nthreads-worker 1 + nthreads-handler 1 + throttle-ms :auto #_200 + eoq-backoff-ms exp-backoff + auto-start true}}] (let [nthreads (get worker-opts :nthreads 1) ; Back compatibility nthreads-worker (if (contains? worker-opts :nthreads-worker) nthreads-worker nthreads) @@ -895,10 +927,10 @@ ;;;; Deprecated (enc/deprecated - (enc/defalias clear-queues queues-clear!! {:deprecated "v3.3.0-RC1 (2023-07-18)"}) + (enc/defalias ^:no-doc clear-queues queues-clear!! {:deprecated "v3.3.0-RC1 (2023-07-18)"}) - (defn ^:deprecated make-dequeue-worker - "DEPRECATED: Use `worker` instead." + (defn ^:no-doc ^:deprecated make-dequeue-worker + "Prefer `worker`." [pool spec & {:keys [handler-fn handler-ttl-msecs backoff-msecs throttle-msecs auto-start?]}] (worker {:pool pool :spec spec} diff --git a/wiki/3-Message-queue.md b/wiki/3-Message-queue.md index 137bdec0..36e7e1f6 100644 --- a/wiki/3-Message-queue.md +++ b/wiki/3-Message-queue.md @@ -1,31 +1,57 @@ Carmine includes a simple **distributed message queue** originally inspired by a [post](http://oldblog.antirez.com/post/250) by Redis's original author Salvatore Sanfilippo. -> **Note**: [Carmine 3.3](../releases/tag/v3.3.0-RC1) is introducing major improvements to Carmine's message queue. I plan to update and expand this documentation before the final 3.3 release. - [Peter](https://www.taoensso.com) +# API + +See linked docstrings below for features and usage: + +| Name | Description | +| :------------------------------------------------------------------------------------------------------------------------------ | ------------------------------------------------------------------------ | +| [`worker`](https://cljdoc.org/d/com.taoensso/carmine/CURRENT/api/taoensso.carmine.message-queue#worker) | Returns a worker for named queue. Deref worker for detailed status info! | +| [`enqueue`](https://cljdoc.org/d/com.taoensso/carmine/CURRENT/api/taoensso.carmine.message-queue#enqueue) | Enqueues given message for processing by active worker/s. | +| [`set-min-log-level!`](https://cljdoc.org/d/com.taoensso/carmine/CURRENT/api/taoensso.carmine.message-queue#set-min-log-level!) | Sets minimum log level for message queue logs. | + +# Example -# Example usage ```clojure -(:require [taoensso.carmine.message-queue :as car-mq]) ; Add to `ns` macro +(def my-conn-opts {:pool {} :spec {}}) (def my-worker - (car-mq/worker {:pool {} :spec {}} "my-queue" - {:handler (fn [{:keys [message attempt]}] - (println "Received" message) - {:status :success})})) + (car-mq/worker my-conn-opts "my-queue" + {:handler + (fn [{:keys [message attempt]}] + (try + (println "Received" message) + {:status :success} + (catch Throwable _ + (println "Handler error!") + {:status :retry})))})) (wcar* (car-mq/enqueue "my-queue" "my message!")) -%> Received my message! -(car-mq/stop my-worker) +@my-worker => +{:qname "my-queue" + :opts + :conn-opts + :running? true + :nthreads {:worker 1, :handler 1} + :stats + {:queue-size {:last 1332, :max 1352, :p90 1323, ...} + :queueing-time-ms {:last 203, :max 4774, :p90 300, ...} + :handling-time-ms {:last 11, :max 879, :p90 43, ...} + :counts + {:handler/success 5892 + :handler/retry 808 + :handler/error 2 + :handler/backoff 2034 + :sleep/end-of-circle 350}}} ``` -The following guarantees are provided: - -- Messages are persistent (durable) as per Redis config. -- Each message will be handled once and only once. -- Handling is fault-tolerant: a message cannot be lost due to handler crash. -- Message de-duplication can be requested on an ad hoc (per message) basis. +# Guarantees - In these cases, the same message cannot ever be entered into the queue more than once simultaneously or within a (per message) specifiable post-handling backoff period. +The following guarantees are provided: -See the [message queue API](https://taoensso.github.io/carmine/taoensso.carmine.message-queue.html) for more info. \ No newline at end of file +- Messages are **persistent** (durable as per Redis config). +- Each message will be handled **once and only once**. +- Handling is **fault-tolerant** (messages won't be lost due to crashed handlers). +- Messages support optional per-message **de-duplication**, preventing the same message from being simultaneously queued more than once within a specifiable backoff period. \ No newline at end of file