Skip to content

Commit

Permalink
[doc] [mq] Misc docstring improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
ptaoussanis committed May 28, 2024
1 parent 46d57aa commit 165a4ef
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 77 deletions.
152 changes: 92 additions & 60 deletions src/taoensso/carmine/message_queue.clj
Original file line number Diff line number Diff line change
Expand Up @@ -298,15 +298,15 @@

(defn message-status
"Returns current message status, e/o:
nil - Not in queue or already GC'd
:queued - Awaiting handler
:queued-with-backoff - Awaiting handler, but skip until backoff expired
:locked - Currently with handler
:locked-with-requeue - Currently with handler, will requeue when done
:done-awaiting-gc - Finished handling, awaiting GC
:done-with-backoff - Finished handling, awaiting GC,
but skip until dedupe backoff expired
:done-with-requeue - Will requeue, but skip until dedupe backoff expired"
`nil` - Not in queue or already GC'd
`:queued` - Awaiting handler
`:queued-with-backoff` - Awaiting handler, but skip until backoff expired
`:locked` - Currently with handler
`:locked-with-requeue` - Currently with handler, will requeue when done
`:done-awaiting-gc` - Finished handling, awaiting GC
`:done-with-backoff` - Finished handling, awaiting GC,
but skip until dedupe backoff expired
`:done-with-requeue` - Will requeue, but skip until dedupe backoff expired"
[qname mid]
(car/parse ->message-status
(car/lua @lua-msg-status_
Expand All @@ -330,19 +330,20 @@
`error` e/o #{:already-queued :locked :backoff}.
Options:
:init-backoff-ms - Optional initial backoff in msecs.
:lock-ms - Optional lock time in msecs. When unspecified, the
worker's default lock time will be used.
`:init-backoff-ms` - Optional initial backoff in msecs.
`:lock-ms` - Optional lock time in msecs. When unspecified, the
worker's default lock time will be used.
:mid - Optional unique message id (e.g. message hash) to
identify a specific message for dedupe/update/requeue.
When unspecified, a random uuid will be used.
`:mid` - Optional unique message id (e.g. message hash) to
identify a specific message for dedupe/update/requeue.
When unspecified, a random uuid will be used.
:can-update? - When true, will update message content and/or lock-ms for
an mid still awaiting handling.
:can-requeue? - When true, will mark message with `:locked` or
`:done-with-backoff` status so that it will be
automatically requeued after garbage collection."
`:can-update?` - When true, will update message content and/or lock-ms for
an mid still awaiting handling.
`:can-requeue?` - When true, will mark message with `:locked` or
`:done-with-backoff` status so that it will be
automatically requeued after garbage collection."

{:arglists
'([qname message]
Expand Down Expand Up @@ -497,7 +498,7 @@
:a [qk-a qk-b]
:b [qk-b qk-a]))]

(try ; NB conn's read-timeout may be insufficient!
(try ; NB conn's read-timeout may be insufficient!
(wcar conn-opts (car/brpoplpush qk-src qk-dst secs-dbl))
(catch Throwable _ nil))))

Expand Down Expand Up @@ -607,8 +608,8 @@

(defprotocol IWorker
"Implementation detail."
(start [this])
(stop [this]))
(^:no-doc start [this])
(^:no-doc stop [this]))

(deftype CarmineMessageQueueWorker
[qname worker-opts conn-opts running?_ future-pool worker-futures_
Expand Down Expand Up @@ -772,7 +773,7 @@
(defmethod print-method CarmineMessageQueueWorker
[x ^java.io.Writer w] (.write w (str "#" ns "." x))))

(defn worker? [x] (instance? CarmineMessageQueueWorker x))
(defn ^:no-doc worker? [x] (instance? CarmineMessageQueueWorker x))

(defn monitor-fn
"Returns a worker monitor fn that warns when queue exceeds the prescribed
Expand Down Expand Up @@ -802,54 +803,85 @@
(comment (default-throttle-ms-fn 0))

(defn worker
"Returns a stateful threaded CarmineMessageQueueWorker to handle messages
"Returns a stateful threaded `CarmineMessageQueueWorker` to handle messages
added to named queue with `enqueue`.
API:
- (deref <worker>) => Status map, {:keys [running? nthreads stats ...]}.
- (deref <worker>) => Detailed worker status map (see \"Debugging\" below).
- (<worker> :start) => Same as calling (start <worker>).
- (<worker> :stop) => Same as calling (stop <worker>).
- (<worker> :queue-size) => Same as calling `queue-size` for given qname.
- (<worker> :queue-status) => Same as calling `queue-status` for given qname.
- (<worker> :queue-content) => Same as calling `queue-content` for given qname.
Debugging:
To debug a worker, deref it to see detailed status map with keys:
`:qname` - Worker's queue name (string)
`:opts` - Worker's options map
`:conn-opts` - Worker's connection options map
`:running?` - Is the worker currently running? (true/false)
`:nthreads` - {:keys [worker handler]}
`:stats`
`:queue-size` - {:keys [last min max mean mad var p50 p90 ...]}
`:queueing-time-ms` - {:keys [last min max mean mad var p50 p90 ...]}
`:handling-time-ns` - {:keys [last min max mean mad var p50 p90 ...]}
`:counts`
`:handler/success` - Number of handler calls with `:success` status
`:handler/error` - Number of handler calls with `:error` status
`:handler/retry` - Number of handler calls with `:retry` status
`:handler/backoff` - Number of handler calls encountering an mid in backoff
...
See also the `:monitor` option below, and utils:
`queue-names`, `queue-size`, `queue-content`, `message-status`, etc.
Options:
:handler - (fn [{:keys [qname mid message attempt]}]) that throws
or returns {:status <#{:success :error :retry}>
:throwable <Throwable>
:backoff-ms <retry-or-dedupe-backoff-ms}.
:monitor - (fn [{:keys [queue-size ndry-runs poll-reply]}])
called on each worker loop iteration. Useful for queue
monitoring/logging. See also `monitor-fn`.
:lock-ms - Default time that handler may keep a message before handler
considered fatally stalled and message is re-queued. Must be
sufficiently high to prevent double handling. Can be
overridden on a per-message basis via `enqueue`.
:throttle-ms - Thread sleep period between each poll.
Can be a (fn [queue-size]) -> ?sleep-msecs,
or :auto (to use `default-throttle-ms-fn`).
:eoq-backoff-ms - Max msecs to sleep thread each time end of queue is reached.
Can be a (fn [ndry-runs]) -> msecs for n<=5.
Sleep may be interrupted when new messages are enqueued.
If present, connection read timeout should be >= max msecs.
:nthreads-worker - Number of threads to monitor and maintain queue.
:nthreads-handler - Number of threads to handle queue messages with handler fn."
`:handler`
(fn [{:keys [qname mid message attempt]}]) called for each worker message.
Should throw or return a map with possible keys:
`:monitor`
(fn [{:keys [queue-size ndry-runs poll-reply]}]) called on each worker
worker loop iteration. Useful for queue monitoring/logging.
See also `monitor-fn`.
`:lock-ms` (default 60 minutes)
Default time (in milliseconds) that handler may keep a message before handler
is considered fatally stalled and message is re-queued. Must be large enough
to prevent double handling!
Can be overridden on a per-message basis via `enqueue`.
`:throttle-ms` (default `:auto`)
Thread sleep period (in milliseconds) between each poll.
Can be a (fn [queue-size]) -> ?sleep-msecs,
or `:auto` (to use `default-throttle-ms-fn`).
`:eoq-backoff-ms` (default `exp-backoff` fn)
Max time (in milliseconds) to sleep thread each time end of queue is reached.
Can be a (fn [ndry-runs]) -> msecs for n<=5.
Sleep may be interrupted when new messages are enqueued.
If present, connection read timeout should be >= max msecs!
`:nthreads-worker` - Number of threads to monitor and maintain queue.
`:nthreads-handler` - Number of threads to handle queue messages with handler fn."

([conn-opts qname] (worker conn-opts qname nil))
([conn-opts qname
{:keys [handler monitor lock-ms eoq-backoff-ms throttle-ms auto-start
nthreads-worker nthreads-handler] :as worker-opts
:or {handler (fn [m] (timbre/info m) {:status :success})
monitor (monitor-fn qname 1000 (enc/ms :hours 6))
lock-ms (enc/ms :mins 60)
nthreads-worker 1
nthreads-handler 1
throttle-ms :auto #_200
eoq-backoff-ms exp-backoff
auto-start true}}]

:or
{handler (fn [m] (timbre/info m) {:status :success})
monitor (monitor-fn qname 1000 (enc/ms :hours 6))
lock-ms (enc/ms :mins 60)
nthreads-worker 1
nthreads-handler 1
throttle-ms :auto #_200
eoq-backoff-ms exp-backoff
auto-start true}}]

(let [nthreads (get worker-opts :nthreads 1) ; Back compatibility
nthreads-worker (if (contains? worker-opts :nthreads-worker) nthreads-worker nthreads)
Expand Down Expand Up @@ -895,10 +927,10 @@
;;;; Deprecated

(enc/deprecated
(enc/defalias clear-queues queues-clear!! {:deprecated "v3.3.0-RC1 (2023-07-18)"})
(enc/defalias ^:no-doc clear-queues queues-clear!! {:deprecated "v3.3.0-RC1 (2023-07-18)"})

(defn ^:deprecated make-dequeue-worker
"DEPRECATED: Use `worker` instead."
(defn ^:no-doc ^:deprecated make-dequeue-worker
"Prefer `worker`."
[pool spec & {:keys [handler-fn handler-ttl-msecs backoff-msecs throttle-msecs
auto-start?]}]
(worker {:pool pool :spec spec}
Expand Down
60 changes: 43 additions & 17 deletions wiki/3-Message-queue.md
Original file line number Diff line number Diff line change
@@ -1,31 +1,57 @@
Carmine includes a simple **distributed message queue** originally inspired by a [post](http://oldblog.antirez.com/post/250) by Redis's original author Salvatore Sanfilippo.

> **Note**: [Carmine 3.3](../releases/tag/v3.3.0-RC1) is introducing major improvements to Carmine's message queue. I plan to update and expand this documentation before the final 3.3 release. - [Peter](https://www.taoensso.com)
# API

See linked docstrings below for features and usage:

| Name | Description |
| :------------------------------------------------------------------------------------------------------------------------------ | ------------------------------------------------------------------------ |
| [`worker`](https://cljdoc.org/d/com.taoensso/carmine/CURRENT/api/taoensso.carmine.message-queue#worker) | Returns a worker for named queue. Deref worker for detailed status info! |
| [`enqueue`](https://cljdoc.org/d/com.taoensso/carmine/CURRENT/api/taoensso.carmine.message-queue#enqueue) | Enqueues given message for processing by active worker/s. |
| [`set-min-log-level!`](https://cljdoc.org/d/com.taoensso/carmine/CURRENT/api/taoensso.carmine.message-queue#set-min-log-level!) | Sets minimum log level for message queue logs. |

# Example

# Example usage

```clojure
(:require [taoensso.carmine.message-queue :as car-mq]) ; Add to `ns` macro
(def my-conn-opts {:pool {<opts>} :spec {<opts>}})

(def my-worker
(car-mq/worker {:pool {<opts>} :spec {<opts>}} "my-queue"
{:handler (fn [{:keys [message attempt]}]
(println "Received" message)
{:status :success})}))
(car-mq/worker my-conn-opts "my-queue"
{:handler
(fn [{:keys [message attempt]}]
(try
(println "Received" message)
{:status :success}
(catch Throwable _
(println "Handler error!")
{:status :retry})))}))

(wcar* (car-mq/enqueue "my-queue" "my message!"))
%> Received my message!

(car-mq/stop my-worker)
@my-worker =>
{:qname "my-queue"
:opts <opts-map>
:conn-opts <my-conn-opts>
:running? true
:nthreads {:worker 1, :handler 1}
:stats
{:queue-size {:last 1332, :max 1352, :p90 1323, ...}
:queueing-time-ms {:last 203, :max 4774, :p90 300, ...}
:handling-time-ms {:last 11, :max 879, :p90 43, ...}
:counts
{:handler/success 5892
:handler/retry 808
:handler/error 2
:handler/backoff 2034
:sleep/end-of-circle 350}}}
```

The following guarantees are provided:

- Messages are persistent (durable) as per Redis config.
- Each message will be handled once and only once.
- Handling is fault-tolerant: a message cannot be lost due to handler crash.
- Message de-duplication can be requested on an ad hoc (per message) basis.
# Guarantees

In these cases, the same message cannot ever be entered into the queue more than once simultaneously or within a (per message) specifiable post-handling backoff period.
The following guarantees are provided:

See the [message queue API](https://taoensso.github.io/carmine/taoensso.carmine.message-queue.html) for more info.
- Messages are **persistent** (durable as per Redis config).
- Each message will be handled **once and only once**.
- Handling is **fault-tolerant** (messages won't be lost due to crashed handlers).
- Messages support optional per-message **de-duplication**, preventing the same message from being simultaneously queued more than once within a specifiable backoff period.

0 comments on commit 165a4ef

Please sign in to comment.