This is a Clojure adaptation of an example from the Alpakka Kafka documentation.
- First we will import required namespaces:
;; NOTE(review): the namespace names behind the `actor` and `s` aliases were lost
;; in this copy of the document; `fr33m0nk.akka.actor` and `fr33m0nk.akka.stream`
;; are reconstructed from the `fr33m0nk.akka.restart-source` namespace required
;; further below — confirm against the library.
(require
  '[clojure.string :as str] ;; the examples below call str/upper-case
  '[fr33m0nk.akka.actor :as actor]
  '[fr33m0nk.akka.stream :as s]
  '[fr33m0nk.alpakka-kafka.committer :as committer]
  '[fr33m0nk.alpakka-kafka.consumer :as consumer]
  '[fr33m0nk.utils :as utils])
;; The settings maps below construct these Kafka (de)serializers directly
(import '[org.apache.kafka.common.serialization StringDeserializer StringSerializer])
- Now we will create a stream topology
- This topology consumes messages from Kafka using `consumer/->committable-source`
- `s/map-async` executes the mapping function with 2 messages being processed in parallel
  - Processing of messages inside `s/map-async` may not be in order; however, the output of `s/map-async` is always in order
  - If processing order is needed, use `s/map` instead
- Then we commit offsets to Kafka via `committer/sink`
- Finally, we run the stream with our actor-system using `s/run`
(defn test-stream
  "Builds a stream from `consumer/->committable-source`, prints the key and
  value of every message, commits offsets through `committer/sink` and runs the
  stream on `actor-system`.

  `s/map-async` is given a parallelism of 2 and two functions: the first does
  the processing and returns the message, the second (then-fn) turns that
  message into its committable offset.

  Returns whatever `s/run` returns; the sink is materialized with
  `consumer/create-draining-control`."
  [actor-system consumer-settings committer-settings consumer-topics]
  (-> (consumer/->committable-source consumer-settings consumer-topics)
      (s/map-async 2
                   (fn [message]
                     ;; locals are not named `key` to avoid shadowing clojure.core/key
                     (let [msg-key (consumer/key message)
                           msg-value (consumer/value message)]
                       ;; Do business processing
                       (println "Key is " msg-key)
                       (println "Value is " msg-value))
                     ;; Return message so that then-fn receives it
                     message)
                   ;; then-fn returns committable offset for offset committing
                   (fn [message]
                     (consumer/committable-offset message)))
      (s/to-mat (committer/sink committer-settings) consumer/create-draining-control)
      (s/run actor-system)))
- Let's now run the stream and see it in action
;; Actor system on which the stream is run
(def actor-system
  (actor/->actor-system "test-actor-system"))

;; Committer settings, configured with a batch size of 1
(def committer-settings
  (committer/committer-settings actor-system {:batch-size 1}))

;; Consumer settings: broker address, consumer group and String deserializers
;; for both key and value
(def consumer-settings
  (consumer/consumer-settings
    actor-system
    {:group-id           "a-test-consumer"
     :bootstrap-servers  "localhost:9092"
     :key-deserializer   (StringDeserializer.)
     :value-deserializer (StringDeserializer.)}))

;; Run the stream against the "testing_stuff" topic and hold on to the returned
;; consumer-control so the stream can be shut down later
(def consumer-control
  (test-stream actor-system
               consumer-settings
               committer-settings
               ["testing_stuff"]))
- Stream in action 😃
;; Shut the stream down using the consumer-control var.
;; drain-and-shutdown receives three arguments: the control, a zero-argument
;; callback wrapped with utils/->fn0, and the actor system's dispatcher.
;; The leading @ dereferences the value the call returns.
@(consumer/drain-and-shutdown consumer-control
                              (utils/->fn0 (fn [] ::done))
                              (actor/get-dispatcher actor-system))
- Let's shutdown our actor-system as well
;; Terminate the actor system and dereference the value returned by actor/terminate
(deref (actor/terminate actor-system))
- Let's require following namespace
(require '[fr33m0nk.alpakka-kafka.producer :as producer])
- We will create a new stream topology
- This topology consumes messages using `consumer/->committable-source`
- `s/map-async` executes the mapping function with 2 messages being processed in parallel
- Then we will publish messages to another topic and commit offsets to Kafka via `producer/committable-sink`
- Finally, we run the stream with our actor-system using `s/run`
(defn test-stream-with-producer
  "Consumes from `consumer-topics`, upper-cases each message value and wraps one
  producer record for `producer-topic`, together with the message's committable
  offset, in a single-producer-message envelope. The envelopes go to
  `producer/committable-sink`.

  The mapping step uses `s/map-async` with a parallelism of 2. The stream is run
  on `actor-system` and the result of `s/run` is returned."
  [actor-system consumer-settings committer-settings producer-settings consumer-topics producer-topic]
  (letfn [(->envelope [message]
            (let [_key   (consumer/key message) ;; read but unused: the key is null
                  text   (consumer/value message)
                  offset (consumer/committable-offset message)
                  record (producer/->producer-record producer-topic (str/upper-case text))]
              (producer/single-producer-message-envelope offset record)))]
    (-> (consumer/->committable-source consumer-settings consumer-topics)
        (s/map-async 2 ->envelope)
        (s/to-mat (producer/committable-sink producer-settings committer-settings)
                  consumer/create-draining-control)
        (s/run actor-system))))
- Let's create required dependencies
;; Actor system on which the stream is run
(def actor-system
  (actor/->actor-system "test-actor-system"))

;; Committer settings, configured with a batch size of 1
(def committer-settings
  (committer/committer-settings actor-system {:batch-size 1}))

;; Consumer settings: broker address, consumer group and String deserializers
(def consumer-settings
  (consumer/consumer-settings
    actor-system
    {:group-id           "a-test-consumer"
     :bootstrap-servers  "localhost:9092"
     :key-deserializer   (StringDeserializer.)
     :value-deserializer (StringDeserializer.)}))

;; Producer settings: broker address and String serializers
(def producer-settings
  (producer/producer-settings
    actor-system
    {:bootstrap-servers "localhost:9092"
     :key-serializer    (StringSerializer.)
     :value-serializer  (StringSerializer.)}))
- Let's run the stream and see it in action
;; Run the stream: read from "testing_stuff", publish to "output-topic".
;; The returned value is kept so the stream can be shut down later.
(def consumer-control
  (test-stream-with-producer
    actor-system
    consumer-settings
    committer-settings
    producer-settings
    ["testing_stuff"]
    "output-topic"))
- Streams in action 😃
- Let's shutdown the stream now
;; Shut the stream down using the consumer-control var.
;; drain-and-shutdown receives three arguments: the control, a zero-argument
;; callback wrapped with utils/->fn0, and the actor system's dispatcher.
;; The leading @ dereferences the value the call returns.
@(consumer/drain-and-shutdown consumer-control
                              (utils/->fn0 (fn [] ::done))
                              (actor/get-dispatcher actor-system))
- Let's shutdown our actor-system as well
;; Terminate the actor system and dereference the value returned by actor/terminate
(deref (actor/terminate actor-system))
- We will create a new stream topology
- This topology consumes messages using `consumer/->committable-source`
- `s/map-async` executes the mapping function with 2 messages being processed in parallel and produces multiple producer records
- These producer-records are wrapped in `producer/multi-producer-message-envelope`, which ensures offset commits only happen when all the producer-records are published
- Then we will publish messages to another topic and commit offsets to Kafka via `producer/committable-sink`
- Finally, we run the stream with our actor-system using `s/run`
(defn test-stream-with-producing-multiple-messages
  "Consumes from `consumer-topics` and, for every message, builds 5 producer
  records for `producer-topic`, each carrying the upper-cased message value.
  The records and the message's committable offset are wrapped in a
  multi-producer-message envelope and sent to `producer/committable-sink`.

  The mapping step uses `s/map-async` with a parallelism of 2. The stream is run
  on `actor-system` and the result of `s/run` is returned."
  [actor-system consumer-settings committer-settings producer-settings consumer-topics producer-topic]
  (letfn [(->record [text]
            (producer/->producer-record producer-topic (str/upper-case text)))
          (->envelope [message]
            (let [_key    (consumer/key message) ;; read but unused: the key is null
                  text    (consumer/value message)
                  offset  (consumer/committable-offset message)
                  records (mapv ->record (repeat 5 text))]
              (producer/multi-producer-message-envelope offset records)))]
    (-> (consumer/->committable-source consumer-settings consumer-topics)
        (s/map-async 2 ->envelope)
        (s/to-mat (producer/committable-sink producer-settings committer-settings)
                  consumer/create-draining-control)
        (s/run actor-system))))
- Let's create required dependencies
;; Actor system on which the stream is run
(def actor-system
  (actor/->actor-system "test-actor-system"))

;; Committer settings, configured with a batch size of 1
(def committer-settings
  (committer/committer-settings actor-system {:batch-size 1}))

;; Consumer settings: broker address, consumer group and String deserializers
(def consumer-settings
  (consumer/consumer-settings
    actor-system
    {:group-id           "a-test-consumer"
     :bootstrap-servers  "localhost:9092"
     :key-deserializer   (StringDeserializer.)
     :value-deserializer (StringDeserializer.)}))

;; Producer settings: broker address and String serializers
(def producer-settings
  (producer/producer-settings
    actor-system
    {:bootstrap-servers "localhost:9092"
     :key-serializer    (StringSerializer.)
     :value-serializer  (StringSerializer.)}))
- Let's run the stream and see it in action
;; Run the stream: read from "testing_stuff", publish to "output-topic".
;; The returned value is kept so the stream can be shut down later.
(def consumer-control
  (test-stream-with-producing-multiple-messages
    actor-system
    consumer-settings
    committer-settings
    producer-settings
    ["testing_stuff"]
    "output-topic"))
- Streams in action 😃
- Let's shutdown the stream now
;; Shut the stream down using the consumer-control var.
;; drain-and-shutdown receives three arguments: the control, a zero-argument
;; callback wrapped with utils/->fn0, and the actor system's dispatcher.
;; The leading @ dereferences the value the call returns.
@(consumer/drain-and-shutdown consumer-control
                              (utils/->fn0 (fn [] ::done))
                              (actor/get-dispatcher actor-system))
- Let's shutdown our actor-system as well
;; Terminate the actor system and dereference the value returned by actor/terminate
(deref (actor/terminate actor-system))
- We will create a new stream topology
- This topology consumes messages using `consumer/->committable-source`
- `s/map-async` executes the mapping function with 2 messages being processed in parallel and produces multiple producer records
- These producer-records are wrapped in `producer/multi-producer-message-envelope`, which ensures offset commits only happen when all the producer-records are published
- Then we will publish messages to another topic via `producer/flexi-flow` and commit offsets to Kafka via `committer/sink`
- Finally, we run the stream with our actor-system using `s/run`
(defn test-stream-producing-multiple-messages-with-at-least-once-semantics
  "Consumes from `consumer-topics` and, for every message, builds 3 producer
  records for `producer-topic`, each carrying the upper-cased message value,
  wrapped with the message's committable offset in a multi-producer-message
  envelope.

  The envelopes flow through `producer/flexi-flow`, are then mapped with
  `producer/producer-message-passthrough`, and finally reach `committer/sink`.
  The mapping step uses `s/map-async` with a parallelism of 2. The stream is run
  on `actor-system` and the result of `s/run` is returned."
  [actor-system consumer-settings committer-settings producer-settings consumer-topics producer-topic]
  (letfn [(->record [text]
            (producer/->producer-record producer-topic (str/upper-case text)))
          (->envelope [message]
            (let [_key    (consumer/key message) ;; read but unused: the key is null
                  text    (consumer/value message)
                  offset  (consumer/committable-offset message)
                  records (mapv ->record (repeat 3 text))]
              (producer/multi-producer-message-envelope offset records)))]
    (-> (consumer/->committable-source consumer-settings consumer-topics)
        (s/map-async 2 ->envelope)
        (s/via (producer/flexi-flow producer-settings))
        (s/map producer/producer-message-passthrough)
        (s/to-mat (committer/sink committer-settings)
                  consumer/create-draining-control)
        (s/run actor-system))))
- Let's create required dependencies
;; Actor system on which the stream is run
(def actor-system
  (actor/->actor-system "test-actor-system"))

;; Committer settings, configured with a batch size of 1
(def committer-settings
  (committer/committer-settings actor-system {:batch-size 1}))

;; Consumer settings: broker address, consumer group and String deserializers
(def consumer-settings
  (consumer/consumer-settings
    actor-system
    {:group-id           "a-test-consumer"
     :bootstrap-servers  "localhost:9092"
     :key-deserializer   (StringDeserializer.)
     :value-deserializer (StringDeserializer.)}))

;; Producer settings: broker address and String serializers
(def producer-settings
  (producer/producer-settings
    actor-system
    {:bootstrap-servers "localhost:9092"
     :key-serializer    (StringSerializer.)
     :value-serializer  (StringSerializer.)}))
- Let's run the stream and see it in action
;; Run the stream: read from "testing_stuff", publish to "output-topic".
;; The returned value is kept so the stream can be shut down later.
(def consumer-control
  (test-stream-producing-multiple-messages-with-at-least-once-semantics
    actor-system
    consumer-settings
    committer-settings
    producer-settings
    ["testing_stuff"]
    "output-topic"))
- Streams in action 😃
- Let's shutdown the stream now
;; Shut the stream down using the consumer-control var.
;; drain-and-shutdown receives three arguments: the control, a zero-argument
;; callback wrapped with utils/->fn0, and the actor system's dispatcher.
;; The leading @ dereferences the value the call returns.
@(consumer/drain-and-shutdown consumer-control
                              (utils/->fn0 (fn [] ::done))
                              (actor/get-dispatcher actor-system))
- Let's shutdown our actor-system as well
;; Terminate the actor system and dereference the value returned by actor/terminate
(deref (actor/terminate actor-system))
- We will add a restart-counter to keep restart-source's action count and require restart-source namespace
;; Counts the simulated failures: the stream's mapping function increments this
;; atom each time it throws, and stops throwing once the count reaches 3
(def restart-counter (atom 0))
(require '[fr33m0nk.akka.restart-source :as restart])
- Due to limitation with the design of restart-source, consumer-control is not directly accessible. In order to gain access, we will use an atom
- Reference
;; Starts as nil; the stream stores its consumer control here (through
;; s/map-materialized-value) so it can be passed to drain-and-shutdown later
(def consumer-control-reference (atom nil))
- We will create a new stream topology such that
- This topology fails 3 times and throws exception. After throwing 3 times it proceeds with following
- This topology consumes messages using `consumer/->committable-source`
- `s/map` executes the mapping function (note that this example uses `s/map` rather than `s/map-async`) and produces multiple producer records
- These producer-records are wrapped in `producer/multi-producer-message-envelope`, which ensures offset commits only happen when all the producer-records are published
- Then we will publish messages to another topic via `producer/flexi-flow` and commit offsets to Kafka via `committer/flow`
- Finally, we run the RestartSource with our actor-system using `s/run-with` and `s/ignoring-sink`
(defn test-stream-with-error-handling
  "Runs a consume → produce → commit stream wrapped in a restart source so that
  processing failures restart the inner stream.

  The inner stream is built by a zero-argument factory. It reads from
  `consumer-topics`, stores its materialized consumer control in
  `consumer-control-reference`, and maps every message to a
  multi-producer-message envelope holding 3 upper-cased copies of the value for
  `producer-topic`. The envelopes pass through `producer/flexi-flow`,
  `producer/producer-message-passthrough` and `committer/flow`.

  While `restart-counter` is below 3 the mapping function increments it and
  throws, which simulates a processing failure.

  NOTE(review): `restart-settings` is passed as the first argument of
  `restart/->restart-source-on-failures-with-backoff`, mirroring Akka's
  `RestartSource.onFailuresWithBackoff(settings, sourceFactory)` — confirm the
  wrapper's argument order.

  Returns the result of `s/run-with` using `s/ignoring-sink`."
  [actor-system restart-settings consumer-settings committer-settings producer-settings consumer-topics producer-topic]
  (-> (restart/->restart-source-on-failures-with-backoff
        restart-settings
        (fn []
          (-> (consumer/->committable-source consumer-settings consumer-topics)
              ;; Hack to gain access to underlying consumer control instance:
              ;; the materialized control is stored in the atom and then
              ;; returned unchanged
              (s/map-materialized-value (fn [consumer-control]
                                          (reset! consumer-control-reference consumer-control)
                                          consumer-control))
              (s/map (fn [message]
                       ;; throwing exception to simulate processing failure and stream-restart and incrementing restart-counter
                       (when (< @restart-counter 3)
                         (throw (ex-info "Simulating processing failure" {:error-count (swap! restart-counter inc)})))
                       (let [_key (consumer/key message) ;; Don't care as it is null
                             value (consumer/value message)
                             committable-offset (consumer/committable-offset message)
                             messages-to-publish (->> (repeat 3 value)
                                                      (mapv #(producer/->producer-record producer-topic (str/upper-case %))))]
                         (producer/multi-producer-message-envelope committable-offset messages-to-publish))))
              (s/via (producer/flexi-flow producer-settings))
              (s/map producer/producer-message-passthrough)
              (s/via (committer/flow committer-settings)))))
      (s/run-with s/ignoring-sink actor-system)))
- Let's create required dependencies
;; Actor system on which the stream is run
(def actor-system
  (actor/->actor-system "test-actor-system"))

;; Committer settings, configured with a batch size of 1
(def committer-settings
  (committer/committer-settings actor-system {:batch-size 1}))

;; Consumer settings: broker address, consumer group and String deserializers
(def consumer-settings
  (consumer/consumer-settings
    actor-system
    {:group-id           "a-test-consumer"
     :bootstrap-servers  "localhost:9092"
     :key-deserializer   (StringDeserializer.)
     :value-deserializer (StringDeserializer.)}))

;; Producer settings: broker address and String serializers
(def producer-settings
  (producer/producer-settings
    actor-system
    {:bootstrap-servers "localhost:9092"
     :key-serializer    (StringSerializer.)
     :value-serializer  (StringSerializer.)}))

;; Restart settings for the restart source.
;; NOTE(review): the positional arguments are presumably min backoff, max
;; backoff and random factor, followed by an options map — confirm against
;; restart/restart-settings
(def restart-settings
  (restart/restart-settings 1000 5000 0.2 {}))
- Let's run the stream and see it in action
;; Run the restart-wrapped stream. Arguments follow the parameter order of
;; test-stream-with-error-handling; the topics are the same ones used by the
;; earlier examples.
(def restart-source
  (test-stream-with-error-handling
    actor-system
    restart-settings
    consumer-settings
    committer-settings
    producer-settings
    ["testing_stuff"]
    "output-topic"))
- Error handling in action 😃
- logs:
[INFO] [05/21/2023 03:25:40.542] [] [SingleSourceLogic(akka://test-actor-system)] [164b3] Completing
[WARN] [05/21/2023 03:25:40.548] [] [RestartWithBackoffSource(akka://test-actor-system)] Restarting stream due to failure [1]: clojure.lang.ExceptionInfo: Simulating processing failure {:error-count 1}
clojure.lang.ExceptionInfo: Simulating processing failure {:error-count 1}
[INFO] [05/21/2023 03:25:42.320] [] [SingleSourceLogic(akka://test-actor-system)] [3955d] Completing
[WARN] [05/21/2023 03:25:42.323] [] [RestartWithBackoffSource(akka://test-actor-system)] Restarting stream due to failure [2]: clojure.lang.ExceptionInfo: Simulating processing failure {:error-count 2}
clojure.lang.ExceptionInfo: Simulating processing failure {:error-count 2}
[INFO] [05/21/2023 03:25:45.315] [] [SingleSourceLogic(akka://test-actor-system)] [83500] Completing
[WARN] [05/21/2023 03:25:45.316] [] [RestartWithBackoffSource(akka://test-actor-system)] Restarting stream due to failure [3]: clojure.lang.ExceptionInfo: Simulating processing failure {:error-count 3}
clojure.lang.ExceptionInfo: Simulating processing failure {:error-count 3}
- restart counter:
@restart-counter
;;=> 3
- After 3 restarts, state of stream is healthy again 😃
- Let's shutdown the stream now
;; Shut the stream down using the control stored in consumer-control-reference.
;; drain-and-shutdown receives three arguments: the dereferenced control, a
;; zero-argument callback wrapped with utils/->fn0, and the actor system's
;; dispatcher. The leading @ dereferences the value the call returns.
@(consumer/drain-and-shutdown @consumer-control-reference
                              (utils/->fn0 (fn [] ::done))
                              (actor/get-dispatcher actor-system))
- Let's shutdown our actor-system as well
;; Terminate the actor system and dereference the value returned by actor/terminate
(deref (actor/terminate actor-system))
- We will require the transactional namespace for setting up a transactional source and flow
(require '[fr33m0nk.alpakka-kafka.transactional :as transactional])
- Due to limitation with the design of restart-source, consumer-control is not directly accessible. In order to gain access, we will use an atom
- Reference
;; Starts as nil; the stream stores its consumer control here (through
;; s/map-materialized-value) so it can be passed to drain-and-shutdown later
(def consumer-control-reference (atom nil))
- We will create a new stream topology such that
- This topology uses a Transactional source and flow and wraps it over RestartSource
- This topology consumes messages using `transactional/->transactional-source`
- `s/map-async` executes the mapping function with 2 messages being processed in parallel and produces multiple producer records
- These producer-records are wrapped in `producer/multi-producer-message-envelope`, which ensures offset commits only happen when all the producer-records are published
- Then we will publish messages to another topic and commit offsets to Kafka via a transaction, using `transactional/transactional-flow`
- Finally, we run the RestartSource with our actor-system using `s/run-with` and `s/ignoring-sink`
(defn test-transactional-stream-with-error-handling
  "Runs a transactional consume → produce stream wrapped in a restart source.

  The inner stream is built by a zero-argument factory. It reads from
  `consumer-topics` via `transactional/->transactional-source`, stores its
  materialized consumer control in `consumer-control-reference`, and maps every
  message (`s/map-async`, parallelism 2) to a multi-producer-message envelope.
  Each envelope holds the message's partition offset and 3 upper-cased copies
  of the value for `producer-topic`. The envelopes are sent through
  `transactional/transactional-flow`.

  `committer-settings` is accepted but not used by this topology.

  NOTE(review): `restart-settings` is passed as the first argument of
  `restart/->restart-source-on-failures-with-backoff`, mirroring Akka's
  `RestartSource.onFailuresWithBackoff(settings, sourceFactory)` — confirm the
  wrapper's argument order.

  Returns the result of `s/run-with` using `s/ignoring-sink`."
  [actor-system restart-settings consumer-settings committer-settings producer-settings consumer-topics producer-topic]
  (-> (restart/->restart-source-on-failures-with-backoff
        restart-settings
        (fn []
          (-> (transactional/->transactional-source consumer-settings consumer-topics)
              ;; Hack to gain access to underlying consumer control instance:
              ;; the materialized control is stored in the atom and then
              ;; returned unchanged
              (s/map-materialized-value (fn [consumer-control]
                                          (reset! consumer-control-reference consumer-control)
                                          consumer-control))
              (s/map-async 2 (fn [message]
                               (let [_key (consumer/key message) ;; Don't care as it is null
                                     value (consumer/value message)
                                     partition-offset (consumer/partition-offset message)
                                     messages-to-publish (->> (repeat 3 value)
                                                              (mapv #(producer/->producer-record producer-topic (str/upper-case %))))]
                                 (producer/multi-producer-message-envelope partition-offset messages-to-publish))))
              (s/via (transactional/transactional-flow producer-settings "unique-transaction-id-for-this-application")))))
      (s/run-with s/ignoring-sink actor-system)))
- Let's create required dependencies
;; Actor system on which the stream is run
(def actor-system
  (actor/->actor-system "test-actor-system"))

;; Committer settings, configured with a batch size of 1
(def committer-settings
  (committer/committer-settings actor-system {:batch-size 1}))

;; Consumer settings: broker address, consumer group and String deserializers
(def consumer-settings
  (consumer/consumer-settings
    actor-system
    {:group-id           "a-test-consumer"
     :bootstrap-servers  "localhost:9092"
     :key-deserializer   (StringDeserializer.)
     :value-deserializer (StringDeserializer.)}))

;; Producer settings: broker address and String serializers
(def producer-settings
  (producer/producer-settings
    actor-system
    {:bootstrap-servers "localhost:9092"
     :key-serializer    (StringSerializer.)
     :value-serializer  (StringSerializer.)}))

;; Restart settings for the restart source.
;; NOTE(review): the positional arguments are presumably min backoff, max
;; backoff and random factor, followed by an options map — confirm against
;; restart/restart-settings
(def restart-settings
  (restart/restart-settings 1000 5000 0.2 {}))
- Let's run the stream and see it in action
;; Run the restart-wrapped transactional stream. Arguments follow the parameter
;; order of test-transactional-stream-with-error-handling; the topics are the
;; same ones used by the earlier examples.
(def restart-source
  (test-transactional-stream-with-error-handling
    actor-system
    restart-settings
    consumer-settings
    committer-settings
    producer-settings
    ["testing_stuff"]
    "output-topic"))
- Let's shutdown the stream now
;; Shut the stream down using the control stored in consumer-control-reference.
;; drain-and-shutdown receives three arguments: the dereferenced control, a
;; zero-argument callback wrapped with utils/->fn0, and the actor system's
;; dispatcher. The leading @ dereferences the value the call returns.
@(consumer/drain-and-shutdown @consumer-control-reference
                              (utils/->fn0 (fn [] ::done))
                              (actor/get-dispatcher actor-system))
- Let's shutdown our actor-system as well
;; Terminate the actor system and dereference the value returned by actor/terminate
(deref (actor/terminate actor-system))