Skip to content

Commit

Permalink
[#207] Listeners: support optional keep-alive (@aravindbaskaran)
Browse files Browse the repository at this point in the history
  • Loading branch information
ptaoussanis committed Sep 16, 2020
1 parent fa7e72e commit 4431fe3
Showing 1 changed file with 46 additions and 7 deletions.
53 changes: 46 additions & 7 deletions src/taoensso/carmine.clj
Original file line number Diff line number Diff line change
Expand Up @@ -419,9 +419,27 @@
java.io.Closeable
(close [this] (close-listener this)))

(defn- get-keep-alive-fn
"Returns (fn send-keep-alive!?) which
records activity, returns true iff first activity recorded in last `msecs`."
;; Much simpler to implement only for listeners than as a general conns feature
;; (where hooking in to recording activity is non-trivial).
[msecs]
(let [msecs (long msecs)
last-activity_ (atom (System/currentTimeMillis))]

(fn send-keep-alive!? []
(let [now (System/currentTimeMillis)
last-activity (enc/reset-in! last-activity_ now)]
(> (- now (long last-activity)) msecs)))))

(comment (def kaf (get-keep-alive-fn 2000)) (kaf))

(defn -with-new-listener
"Implementation detail. Returns new Listener."
[{:keys [conn-spec init-state handler-fn swapping-handler? body-fn error-fn]}]
[{:keys [conn-spec init-state handler-fn swapping-handler? body-fn error-fn
keep-alive-ms]}]

(let [state_ (atom init-state)
handler-fn_ (atom handler-fn)

Expand All @@ -430,6 +448,10 @@
(assoc (conns/conn-spec conn-spec)
:listener? true))

keep-alive-fn
(when-let [ms keep-alive-ms]
(get-keep-alive-fn ms))

error-fn
(fn [error-m]
(when-let [ef error-fn]
Expand All @@ -450,18 +472,19 @@
(error-fn {:error :connection-broken :throwable t})
(timbre/error "Listener connection broken"))))

f
(future-call ; Thread to long-poll for messages
msg-polling-future
(future-call
(bound-fn []
(loop []
(when-not @broken?_

(when-let [kaf keep-alive-fn] (kaf)) ; Record activity on conn
(when-let [reply
(try
(protocol/get-unparsed-reply in {})

(catch java.net.SocketTimeoutException _
(when-let [ex (conns/-conn-error conn)]
(if-let [ex (conns/-conn-error conn)]
(break! ex)))

(catch Exception ex
Expand All @@ -480,12 +503,28 @@

(recur)))))]

(reset! future_ f)
(reset! future_ msg-polling-future)

(protocol/with-context conn (body-fn)
(protocol/execute-requests (not :get-replies) nil))
(protocol/execute-requests (not :get-replies) nil))

(when-let [kaf keep-alive-fn]
(let [f
(bound-fn []
(loop []
(when-not @broken?_
(Thread/sleep (long keep-alive-ms))
(when (kaf) ; Should send keep-alive now?
(when-let [ex (conns/-conn-error conn)]
(break! ex)))

(recur))))]

(doto (Thread. ^Runnable f)
(.setDaemon true)
(.start))))

(Listener. conn handler-fn_ state_ f)))
(Listener. conn handler-fn_ state_ msg-polling-future)))

(defmacro with-new-listener
"Creates a persistent[1] connection to Redis server and a thread to listen for
Expand Down

0 comments on commit 4431fe3

Please sign in to comment.