diff --git a/src/taoensso/carmine.clj b/src/taoensso/carmine.clj index 98028d31..770ada76 100644 --- a/src/taoensso/carmine.clj +++ b/src/taoensso/carmine.clj @@ -419,9 +419,27 @@ java.io.Closeable (close [this] (close-listener this))) +(defn- get-keep-alive-fn + "Returns (fn send-keep-alive!?) which + records activity, returns true iff first activity recorded in last `msecs`." + ;; Much simpler to implement only for listeners than as a general conns feature + ;; (where hooking in to recording activity is non-trivial). + [msecs] + (let [msecs (long msecs) + last-activity_ (atom (System/currentTimeMillis))] + + (fn send-keep-alive!? [] + (let [now (System/currentTimeMillis) + last-activity (enc/reset-in! last-activity_ now)] + (> (- now (long last-activity)) msecs))))) + +(comment (def kaf (get-keep-alive-fn 2000)) (kaf)) + (defn -with-new-listener "Implementation detail. Returns new Listener." - [{:keys [conn-spec init-state handler-fn swapping-handler? body-fn error-fn]}] + [{:keys [conn-spec init-state handler-fn swapping-handler? body-fn error-fn + keep-alive-ms]}] + (let [state_ (atom init-state) handler-fn_ (atom handler-fn) @@ -430,6 +448,10 @@ (assoc (conns/conn-spec conn-spec) :listener? true)) + keep-alive-fn + (when-let [ms keep-alive-ms] + (get-keep-alive-fn ms)) + error-fn (fn [error-m] (when-let [ef error-fn] @@ -450,18 +472,19 @@ (error-fn {:error :connection-broken :throwable t}) (timbre/error "Listener connection broken")))) - f - (future-call ; Thread to long-poll for messages + msg-polling-future + (future-call (bound-fn [] (loop [] (when-not @broken?_ + (when-let [kaf keep-alive-fn] (kaf)) ; Record activity on conn (when-let [reply (try (protocol/get-unparsed-reply in {}) (catch java.net.SocketTimeoutException _ - (when-let [ex (conns/-conn-error conn)] + (if-let [ex (conns/-conn-error conn)] (break! ex))) (catch Exception ex @@ -480,12 +503,28 @@ (recur)))))] - (reset! future_ f) + (reset! future_ msg-polling-future) (protocol/with-context conn (body-fn) - (protocol/execute-requests (not :get-replies) nil)) + (protocol/execute-requests (not :get-replies) nil)) + + (when-let [kaf keep-alive-fn] + (let [f + (bound-fn [] + (loop [] + (when-not @broken?_ + (Thread/sleep (long keep-alive-ms)) + (when (kaf) ; Should send keep-alive now? + (when-let [ex (conns/-conn-error conn)] + (break! ex))) + + (recur))))] + + (doto (Thread. ^Runnable f) + (.setDaemon true) + (.start)))) - (Listener. conn handler-fn_ state_ f))) + (Listener. conn handler-fn_ state_ msg-polling-future))) (defmacro with-new-listener "Creates a persistent[1] connection to Redis server and a thread to listen for