Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add :commit-strategy to consumer-options #31

Merged
merged 2 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/robertluo/waterfall.clj
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
- optional `::consumer-config` can specify additional kafka consumer configuration. With additions:
- `:position` either `:beginning` `:end`, none for commited position (default)
- `:poll-duration` for how long the consumer poll returns, is a Duration value, default 10 seconds
- `:commit-strategy` one of `:sync`, `:async`, `:auto`, default `:sync`

The returned map has different level of key-values let you use:
- Highest level, no additional knowledge:
Expand Down
20 changes: 13 additions & 7 deletions src/robertluo/waterfall/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@

Args:
- consumer: Kafka Consumer instance.
- committer: Function to commit the offset.
- out-sink: Manifold stream where consumed messages are put.

Returns `cmd-self`, a function that accepts following commands:
Expand All @@ -69,7 +70,7 @@
- [::poll duration]: Polls the consumer for messages.

Unknown commands will raise an exception."
[^Consumer consumer out-sink]
[^Consumer consumer committer out-sink]
(let [mailbox (ms/stream)
cmd-self (fn [cmd] (ms/put! mailbox cmd)) ; function to post commands to self
closing? (atom false) ; flag to indicate if the actor is closing
Expand All @@ -96,7 +97,7 @@
(do
(when (.paused consumer)
(.resume consumer (.assignment consumer)))
(.commitSync consumer)
(committer consumer)
(cmd-self [::poll duration])) ; resume and poll
[::poll duration]
(let [putting-all (fn [events] ; function to handle events and resume
Expand All @@ -114,17 +115,22 @@

(defn consumer
[nodes group-id topics
{:keys [poll-duration position]
{:keys [poll-duration position commit-strategy]
:as conf
:or {poll-duration (Duration/ofSeconds 10)}}]
(let [conr (-> (dissoc conf :poll-duration :position) ;avoid kafka complaints
:or {poll-duration (Duration/ofSeconds 10)
commit-strategy :sync}}]
(let [committer (case commit-strategy
:sync (fn [consumer] (.commitSync consumer))
:async (fn [consumer] (.commitAsync consumer))
:auto (constantly nil))
conr (-> (dissoc conf :poll-duration :position) ;avoid kafka complaints
(merge {:bootstrap-servers nodes
:group-id group-id
:enable-auto-commit false})
:enable-auto-commit (= commit-strategy :auto)})
util/->config-map
(KafkaConsumer. (ByteArrayDeserializer.) (ByteArrayDeserializer.)))
out-sink (ms/stream)
actor (consumer-actor conr out-sink)]
actor (consumer-actor conr committer out-sink)]
(actor [::subscribe topics])
(when position (actor [::seek position]))
(ms/on-closed out-sink (fn [] @(actor [::close])))
Expand Down
Loading