From 53f9ff6a97626f48e7d6dc3f5bf7cba904ab1070 Mon Sep 17 00:00:00 2001 From: Robert Luo Date: Fri, 30 Aug 2024 11:41:46 +0800 Subject: [PATCH] add `:commit-strategy` option --- deps.edn | 2 +- src/robertluo/waterfall.clj | 1 + src/robertluo/waterfall/core.clj | 20 +++++++++++++------- 3 files changed, 15 insertions(+), 8 deletions(-) diff --git a/deps.edn b/deps.edn index 5c03f3b..bf48db6 100644 --- a/deps.edn +++ b/deps.edn @@ -1,7 +1,7 @@ ;Configuration for Clojure tools.deps {:paths ["src"] - :deps {manifold/manifold {:mvn/version "0.4.3"} + :deps {manifold/manifold {:mvn/version "0.4.1"} org.apache.kafka/kafka-clients {:mvn/version "3.4.0" :scope "provided"} org.clojure/core.match {:mvn/version "1.0.1"}} :aliases {:dev diff --git a/src/robertluo/waterfall.clj b/src/robertluo/waterfall.clj index f19ba33..fdd191d 100644 --- a/src/robertluo/waterfall.clj +++ b/src/robertluo/waterfall.clj @@ -97,6 +97,7 @@ - optional `::consumer-config` can specify additional kafka consumer configuration. With additions: - `:position` either `:beginning` `:end`, none for commited position (default) - `:poll-duration` for how long the consumer poll returns, is a Duration value, default 10 seconds + - `:commit-strategy` one of `:sync`, `:async`, `:auto`, default `:sync` The returned map has different level of key-values let you use: - Highest level, no additional knowledge: diff --git a/src/robertluo/waterfall/core.clj b/src/robertluo/waterfall/core.clj index fceaf1e..1fd6c5b 100644 --- a/src/robertluo/waterfall/core.clj +++ b/src/robertluo/waterfall/core.clj @@ -59,6 +59,7 @@ Args: - consumer: Kafka Consumer instance. + - committer: Function to commit the offset. - out-sink: Manifold stream where consumed messages are put. Returns `cmd-self`, a function that accepts following commands: @@ -69,7 +70,7 @@ - [::poll duration]: Polls the consumer for messages. Unknown commands will raise an exception." - [^Consumer consumer out-sink] + [^Consumer consumer committer out-sink] (let [mailbox (ms/stream) cmd-self (fn [cmd] (ms/put! mailbox cmd)) ; function to post commands to self closing? (atom false) ; flag to indicate if the actor is closing @@ -96,7 +97,7 @@ (do (when (.paused consumer) (.resume consumer (.assignment consumer))) - (.commitSync consumer) + (committer consumer) (cmd-self [::poll duration])) ; resume and poll [::poll duration] (let [putting-all (fn [events] ; function to handle events and resume @@ -114,17 +115,22 @@ (defn consumer [nodes group-id topics - {:keys [poll-duration position] + {:keys [poll-duration position commit-strategy] :as conf - :or {poll-duration (Duration/ofSeconds 10)}}] - (let [conr (-> (dissoc conf :poll-duration :position) ;avoid kafka complaints + :or {poll-duration (Duration/ofSeconds 10) + commit-strategy :sync}}] + (let [committer (case commit-strategy + :sync (fn [consumer] (.commitSync consumer)) + :async (fn [consumer] (.commitAsync consumer)) + :auto (constantly nil)) + conr (-> (dissoc conf :poll-duration :position) ;avoid kafka complaints (merge {:bootstrap-servers nodes :group-id group-id - :enable-auto-commit false}) + :enable-auto-commit (= commit-strategy :auto)}) util/->config-map (KafkaConsumer. (ByteArrayDeserializer.) (ByteArrayDeserializer.))) out-sink (ms/stream) - actor (consumer-actor conr out-sink)] + actor (consumer-actor conr committer out-sink)] (actor [::subscribe topics]) (when position (actor [::seek position])) (ms/on-closed out-sink (fn [] @(actor [::close])))