Skip to content

Latest commit

 

History

History
478 lines (414 loc) · 25.6 KB

examples.md

File metadata and controls

478 lines (414 loc) · 25.6 KB

Introduction to fr33m0nk/clj-alpakka-kafka

Using Alpakka Kafka stream with a sink

This is a Clojure adaptation of an example from the Alpakka Kafka documentation

  1. First we will import required namespaces:
;; Namespaces used by the examples in this section
(require
 '[fr33m0nk.akka.actor :as actor]
 '[fr33m0nk.akka.stream :as s]
 '[fr33m0nk.alpakka-kafka.committer :as committer]
 '[fr33m0nk.alpakka-kafka.consumer :as consumer]
 '[fr33m0nk.utils :as utils])

;; Java classes referenced by bare name in the snippets below:
;; StringDeserializer when building the consumer settings and
;; CompletableFuture when shutting the stream down. Without this import
;; those snippets fail to compile at the REPL.
(import
 '(java.util.concurrent CompletableFuture)
 '(org.apache.kafka.common.serialization StringDeserializer))
  1. Now we will create a stream topology
  • This topology consumes messages

  • s/map-async executes mapping function with 2 messages being processed in parallel

    • Processing of s/map-async may not be in order, however output of s/map-async is always in order
    • If processing order is needed, use s/map
  • Then we commit offsets to Kafka via s/to-mat and committer/sink

  • Finally, we run the stream with our actor-system using s/run

(defn test-stream
  "Builds and runs a topology that reads committable messages from
  `consumer-topics`, prints each message's key and value, and commits the
  message's offset through `committer/sink`.

  The mapping step runs via `s/map-async` with a parallelism of 2. Returns
  the result of `s/run`; the examples keep it as the consumer control used
  to shut the stream down later."
  [actor-system consumer-settings committer-settings consumer-topics]
  (let [log-message (fn [record]
                      (let [record-key (consumer/key record)
                            record-value (consumer/value record)]
                        ;; Do business processing
                        (println "Key is " record-key)
                        (println "Value is " record-value)
                        ;; Hand the untouched message on to the then-fn
                        record))
        ;; then-fn: turns the processed message into its committable offset
        ->committable-offset (fn [record]
                               (consumer/committable-offset record))]
    (-> (consumer/->committable-source consumer-settings consumer-topics)
        (s/map-async 2 log-message ->committable-offset)
        (s/to-mat (committer/sink committer-settings) consumer/create-draining-control)
        (s/run actor-system))))
  1. Let's now run the stream and see it in action
;; Actor system that the stream is run with
(def actor-system
  (actor/->actor-system "test-actor-system"))

;; Committer settings; :batch-size 1 is the only option supplied here
(def committer-settings
  (committer/committer-settings actor-system {:batch-size 1}))

;; Consumer settings: consumer group, broker address and String
;; deserializers for both key and value
(def consumer-settings
  (consumer/consumer-settings
    actor-system
    {:group-id           "a-test-consumer"
     :bootstrap-servers  "localhost:9092"
     :key-deserializer   (StringDeserializer.)
     :value-deserializer (StringDeserializer.)}))

;; Start the stream and keep the returned consumer control; it is needed
;; later to drain and shut the stream down
(def consumer-control
  (test-stream actor-system
               consumer-settings
               committer-settings
               ["testing_stuff"]))
  1. Stream in action 😃
image
  1. Let's shutdown the stream now
;; Drain and shut the stream down through the consumer-control var, then
;; wait on the value returned by drain-and-shutdown
(let [done-stage (CompletableFuture/supplyAsync (utils/->fn0 (fn [] ::done)))
      dispatcher (actor/get-dispatcher actor-system)]
  (deref (consumer/drain-and-shutdown consumer-control done-stage dispatcher)))
  1. Let's shutdown our actor-system as well
;; Terminate the actor system and wait on the value returned by the call
(deref (actor/terminate actor-system))

Using Alpakka Kafka stream with a Kafka Producer

This is a Clojure adaptation of an example from the Alpakka Kafka documentation

  1. Let's require following namespace
;; producer: producer settings, producer records, envelopes, sinks and flows.
;; clojure.string: the mapping functions below call str/upper-case, which
;; does not resolve unless the namespace is required under the `str` alias.
(require
 '[clojure.string :as str]
 '[fr33m0nk.alpakka-kafka.producer :as producer])

;; StringSerializer is referenced by bare name when building producer settings
(import '(org.apache.kafka.common.serialization StringSerializer))
  1. We will create a new stream topology
  • This topology consumes messages
  • s/map-async executes mapping function with 2 messages being processed in parallel
  • Then we will publish messages to another topic and commit offsets to Kafka via s/to-mat and producer/committable-sink
  • Finally, we run the stream with our actor-system using s/run
(defn test-stream-with-producer
  "Builds and runs a topology that reads committable messages from
  `consumer-topics`, upper-cases each value into a producer record for
  `producer-topic`, and hands it to `producer/committable-sink`, which is
  given both `producer-settings` and `committer-settings`.

  The mapping step runs via `s/map-async` with a parallelism of 2. Returns
  the result of `s/run`, which the examples keep as the consumer control."
  [actor-system consumer-settings committer-settings producer-settings consumer-topics producer-topic]
  (letfn [(->envelope [record]
            (let [_record-key (consumer/key record)         ;; Don't care as it is null
                  payload (consumer/value record)
                  offset (consumer/committable-offset record)
                  outgoing (producer/->producer-record producer-topic (str/upper-case payload))]
              ;; Pair the outgoing record with the offset of the message it came from
              (producer/single-producer-message-envelope offset outgoing)))]
    (-> (consumer/->committable-source consumer-settings consumer-topics)
        (s/map-async 2 ->envelope)
        (s/to-mat (producer/committable-sink producer-settings committer-settings)
                  consumer/create-draining-control)
        (s/run actor-system))))
  1. Let's create required dependencies
;; Actor system that the stream is run with
(def actor-system
  (actor/->actor-system "test-actor-system"))

;; Committer settings; :batch-size 1 is the only option supplied here
(def committer-settings
  (committer/committer-settings actor-system {:batch-size 1}))

;; Consumer settings: consumer group, broker address and String deserializers
(def consumer-settings
  (consumer/consumer-settings
    actor-system
    {:group-id           "a-test-consumer"
     :bootstrap-servers  "localhost:9092"
     :key-deserializer   (StringDeserializer.)
     :value-deserializer (StringDeserializer.)}))

;; Producer settings: broker address and String serializers
(def producer-settings
  (producer/producer-settings
    actor-system
    {:bootstrap-servers "localhost:9092"
     :key-serializer    (StringSerializer.)
     :value-serializer  (StringSerializer.)}))
  1. Let's run the stream and see it in action
;; Start the stream; the returned consumer control is kept for shutdown
(def consumer-control
  (test-stream-with-producer
    actor-system
    consumer-settings
    committer-settings
    producer-settings
    ["testing_stuff"]
    "output-topic"))
  1. Streams in action 😃
image
  1. Let's shutdown the stream now
;; Drain and shut the stream down through the consumer-control var, then
;; wait on the value returned by drain-and-shutdown
(let [done-stage (CompletableFuture/supplyAsync (utils/->fn0 (fn [] ::done)))
      dispatcher (actor/get-dispatcher actor-system)]
  (deref (consumer/drain-and-shutdown consumer-control done-stage dispatcher)))
  1. Let's shutdown our actor-system as well
;; Terminate the actor system and wait on the value returned by the call
(deref (actor/terminate actor-system))

Using Alpakka Kafka stream with a Kafka Producer for producing multiple messages

This is a Clojure adaptation of an example from the Alpakka Kafka documentation

  1. We will create a new stream topology
  • This topology consumes messages
  • s/map-async executes mapping function with 2 messages being processed in parallel and produces multiple producer records
  • These producer-records are wrapped in producer/multi-producer-message-envelope which ensures offset commits only happen when all the producer-records are published
  • Then we will publish messages to another topic and commit offsets to Kafka via s/to-mat and producer/committable-sink
  • Finally, we run the stream with our actor-system using s/run
(defn test-stream-with-producing-multiple-messages
  "Builds and runs a topology that reads committable messages from
  `consumer-topics` and, for every message, creates 5 producer records for
  `producer-topic`, each carrying the upper-cased value. The records are
  wrapped together with the message's committable offset in a
  `producer/multi-producer-message-envelope` and handed to
  `producer/committable-sink`.

  The mapping step runs via `s/map-async` with a parallelism of 2. Returns
  the result of `s/run`, which the examples keep as the consumer control."
  [actor-system consumer-settings committer-settings producer-settings consumer-topics producer-topic]
  (letfn [(->record [payload]
            (producer/->producer-record producer-topic (str/upper-case payload)))
          (->envelope [record]
            (let [_record-key (consumer/key record)         ;; Don't care as it is null
                  payload (consumer/value record)
                  offset (consumer/committable-offset record)
                  ;; Five identical copies of the value, built eagerly into a vector
                  outgoing (mapv ->record (repeat 5 payload))]
              (producer/multi-producer-message-envelope offset outgoing)))]
    (-> (consumer/->committable-source consumer-settings consumer-topics)
        (s/map-async 2 ->envelope)
        (s/to-mat (producer/committable-sink producer-settings committer-settings)
                  consumer/create-draining-control)
        (s/run actor-system))))
  1. Let's create required dependencies
;; Actor system that the stream is run with
(def actor-system
  (actor/->actor-system "test-actor-system"))

;; Committer settings; :batch-size 1 is the only option supplied here
(def committer-settings
  (committer/committer-settings actor-system {:batch-size 1}))

;; Consumer settings: consumer group, broker address and String deserializers
(def consumer-settings
  (consumer/consumer-settings
    actor-system
    {:group-id           "a-test-consumer"
     :bootstrap-servers  "localhost:9092"
     :key-deserializer   (StringDeserializer.)
     :value-deserializer (StringDeserializer.)}))

;; Producer settings: broker address and String serializers
(def producer-settings
  (producer/producer-settings
    actor-system
    {:bootstrap-servers "localhost:9092"
     :key-serializer    (StringSerializer.)
     :value-serializer  (StringSerializer.)}))
  1. Let's run the stream and see it in action
;; Start the stream; the returned consumer control is kept for shutdown
(def consumer-control
  (test-stream-with-producing-multiple-messages
    actor-system
    consumer-settings
    committer-settings
    producer-settings
    ["testing_stuff"]
    "output-topic"))
  1. Streams in action 😃
image
  1. Let's shutdown the stream now
;; Drain and shut the stream down through the consumer-control var, then
;; wait on the value returned by drain-and-shutdown
(let [done-stage (CompletableFuture/supplyAsync (utils/->fn0 (fn [] ::done)))
      dispatcher (actor/get-dispatcher actor-system)]
  (deref (consumer/drain-and-shutdown consumer-control done-stage dispatcher)))
  1. Let's shutdown our actor-system as well
;; Terminate the actor system and wait on the value returned by the call
(deref (actor/terminate actor-system))

Using Alpakka Kafka stream with At-Least-Once Delivery

This is a Clojure adaptation of an example from the Alpakka Kafka documentation

  1. We will create a new stream topology
  • This topology consumes messages
  • s/map-async executes mapping function with 2 messages being processed in parallel and produces multiple producer records
  • These producer-records are wrapped in producer/multi-producer-message-envelope which ensures offset commits only happen when all the producer-records are published
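  • Then we will publish messages to another topic via s/via and producer/flexi-flow, extract the pass-through committable offset with producer/producer-message-passthrough, and commit offsets to Kafka via s/to-mat and committer/sink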
  • Finally, we run the stream with our actor-system using s/run
(defn test-stream-producing-multiple-messages-with-at-least-once-semantics
  "Builds and runs a topology that reads committable messages from
  `consumer-topics` and, for every message, creates 3 producer records for
  `producer-topic`, each carrying the upper-cased value. The records are
  wrapped together with the message's committable offset in a
  `producer/multi-producer-message-envelope`.

  The envelopes go through `producer/flexi-flow`, then
  `producer/producer-message-passthrough` is mapped over the flow's output,
  and the result is sent to `committer/sink`.

  The mapping step runs via `s/map-async` with a parallelism of 2. Returns
  the result of `s/run`, which the examples keep as the consumer control."
  [actor-system consumer-settings committer-settings producer-settings consumer-topics producer-topic]
  (letfn [(->record [payload]
            (producer/->producer-record producer-topic (str/upper-case payload)))
          (->envelope [record]
            (let [_record-key (consumer/key record)         ;; Don't care as it is null
                  payload (consumer/value record)
                  offset (consumer/committable-offset record)
                  ;; Three identical copies of the value, built eagerly into a vector
                  outgoing (mapv ->record (repeat 3 payload))]
              (producer/multi-producer-message-envelope offset outgoing)))]
    (-> (consumer/->committable-source consumer-settings consumer-topics)
        (s/map-async 2 ->envelope)
        (s/via (producer/flexi-flow producer-settings))
        (s/map producer/producer-message-passthrough)
        (s/to-mat (committer/sink committer-settings) consumer/create-draining-control)
        (s/run actor-system))))
  1. Let's create required dependencies
;; Actor system that the stream is run with
(def actor-system
  (actor/->actor-system "test-actor-system"))

;; Committer settings; :batch-size 1 is the only option supplied here
(def committer-settings
  (committer/committer-settings actor-system {:batch-size 1}))

;; Consumer settings: consumer group, broker address and String deserializers
(def consumer-settings
  (consumer/consumer-settings
    actor-system
    {:group-id           "a-test-consumer"
     :bootstrap-servers  "localhost:9092"
     :key-deserializer   (StringDeserializer.)
     :value-deserializer (StringDeserializer.)}))

;; Producer settings: broker address and String serializers
(def producer-settings
  (producer/producer-settings
    actor-system
    {:bootstrap-servers "localhost:9092"
     :key-serializer    (StringSerializer.)
     :value-serializer  (StringSerializer.)}))
  1. Let's run the stream and see it in action
;; Start the stream; the returned consumer control is kept for shutdown
(def consumer-control
  (test-stream-producing-multiple-messages-with-at-least-once-semantics
    actor-system
    consumer-settings
    committer-settings
    producer-settings
    ["testing_stuff"]
    "output-topic"))
  1. Streams in action 😃
image
  1. Let's shutdown the stream now
;; Drain and shut the stream down through the consumer-control var, then
;; wait on the value returned by drain-and-shutdown
(let [done-stage (CompletableFuture/supplyAsync (utils/->fn0 (fn [] ::done)))
      dispatcher (actor/get-dispatcher actor-system)]
  (deref (consumer/drain-and-shutdown consumer-control done-stage dispatcher)))
  1. Let's shutdown our actor-system as well
;; Terminate the actor system and wait on the value returned by the call
(deref (actor/terminate actor-system))

Using Alpakka Kafka stream with Error handling

This is a Clojure adaptation of an example from the Alpakka Kafka documentation

  1. We will add a restart-counter to keep restart-source's action count and require restart-source namespace
;; Counts the simulated processing failures; the mapping function in the
;; topology below increments it each time it throws
(def restart-counter
  (atom 0))

(require
 '[fr33m0nk.akka.restart-source :as restart])
  1. Due to a limitation in the design of restart-source, the consumer-control is not directly accessible. To gain access to it, we will store it in an atom
;; Holds the consumer control of the inner source; starts out as nil and is
;; filled in from inside the topology
(def consumer-control-reference
  (atom nil))
  1. We will create a new stream topology such that
  • The mapping function deliberately throws an exception until restart-counter reaches 3, so the stream fails and is restarted 3 times. After that, it proceeds as follows
  • This topology consumes messages
  • s/map executes the mapping function on one message at a time, in order, and produces multiple producer records
  • These producer-records are wrapped in producer/multi-producer-message-envelope which ensures offset commits only happen when all the producer-records are published
  • Then we will publish messages to another topic and commit offsets to Kafka via s/via and producer/flexi-flow and committer/flow
  • Finally, we run the RestartSource with our actor-system using s/run-with
(defn test-stream-with-error-handling
  "Runs a committable-source topology wrapped by
  `restart/->restart-source-on-failures-with-backoff`, configured with
  `restart-settings`.

  The source factory passed to the restart source:
  - stores the inner source's consumer control in the
    `consumer-control-reference` atom, so that it can be used for shutdown;
  - throws an `ex-info` from the mapping function while `restart-counter`
    is below 3, incrementing the counter on each throw, to simulate
    processing failures;
  - otherwise creates 3 producer records for `producer-topic` per message
    (upper-cased value), wrapped with the committable offset in a
    `producer/multi-producer-message-envelope`;
  - sends the envelopes through `producer/flexi-flow`, maps
    `producer/producer-message-passthrough` over the output, and passes the
    result through `committer/flow`.

  Returns the result of `s/run-with` with `s/ignoring-sink`."
  [actor-system restart-settings consumer-settings committer-settings producer-settings consumer-topics producer-topic]
  (-> (restart/->restart-source-on-failures-with-backoff
        restart-settings
        (fn []
          (-> (consumer/->committable-source consumer-settings consumer-topics)
              ;; Hack to gain access to underlying consumer control instance
              ;; reference https://github.com/akka/alpakka-kafka/issues/1291
              ;; reference https://doc.akka.io/docs/alpakka-kafka/current/errorhandling.html#restarting-the-stream-with-a-backoff-stage
              (s/map-materialized-value (fn [consumer-control]
                                          ;; Overwrite the atom with the latest control;
                                          ;; the previous value is irrelevant, so reset! is used
                                          (reset! consumer-control-reference consumer-control)
                                          consumer-control))
              (s/map (fn [message]
                       ;; throwing exception to simulate processing failure and stream-restart and incrementing restart-counter
                       (when (< @restart-counter 3)
                         (throw (ex-info "Simulating processing failure" {:error-count (swap! restart-counter inc)})))
                       (let [_key (consumer/key message)    ;; Don't care as it is null
                             value (consumer/value message)
                             committable-offset (consumer/committable-offset message)
                             messages-to-publish (->> (repeat 3 value)
                                                      (mapv #(producer/->producer-record producer-topic (str/upper-case %))))]
                         (producer/multi-producer-message-envelope committable-offset messages-to-publish))))
              (s/via (producer/flexi-flow producer-settings))
              (s/map producer/producer-message-passthrough)
              (s/via (committer/flow committer-settings)))))
      (s/run-with s/ignoring-sink actor-system)))
  1. Let's create required dependencies
;; Actor system that the stream is run with
(def actor-system
  (actor/->actor-system "test-actor-system"))

;; Committer settings; :batch-size 1 is the only option supplied here
(def committer-settings
  (committer/committer-settings actor-system {:batch-size 1}))

;; Consumer settings: consumer group, broker address and String deserializers
(def consumer-settings
  (consumer/consumer-settings
    actor-system
    {:group-id           "a-test-consumer"
     :bootstrap-servers  "localhost:9092"
     :key-deserializer   (StringDeserializer.)
     :value-deserializer (StringDeserializer.)}))

;; Producer settings: broker address and String serializers
(def producer-settings
  (producer/producer-settings
    actor-system
    {:bootstrap-servers "localhost:9092"
     :key-serializer    (StringSerializer.)
     :value-serializer  (StringSerializer.)}))

;; Settings for the restart source.
;; NOTE(review): 1000 / 5000 / 0.2 are presumably min backoff (ms),
;; max backoff (ms) and random factor — confirm against restart/restart-settings
(def restart-settings
  (restart/restart-settings 1000 5000 0.2 {}))
  1. Let's run the stream and see it in action
;; Start the restartable stream.
;; NOTE(review): despite its name, this var holds whatever
;; test-stream-with-error-handling returns (the result of s/run-with with
;; s/ignoring-sink), not the restart source itself — confirm before relying on it
(def restart-source
  (test-stream-with-error-handling
    actor-system
    restart-settings
    consumer-settings
    committer-settings
    producer-settings
    ["testing_stuff"]
    "output-topic"))
  1. Error handling in action 😃
  • logs:
[INFO] [05/21/2023 03:25:40.542] [test-actor-system-akka.actor.default-dispatcher-4] [SingleSourceLogic(akka://test-actor-system)] [164b3] Completing
[WARN] [05/21/2023 03:25:40.548] [test-actor-system-akka.actor.default-dispatcher-4] [RestartWithBackoffSource(akka://test-actor-system)] Restarting stream due to failure [1]: clojure.lang.ExceptionInfo: Simulating processing failure {:error-count 1}
clojure.lang.ExceptionInfo: Simulating processing failure {:error-count 1}

[INFO] [05/21/2023 03:25:42.320] [test-actor-system-akka.actor.default-dispatcher-8] [SingleSourceLogic(akka://test-actor-system)] [3955d] Completing
[WARN] [05/21/2023 03:25:42.323] [test-actor-system-akka.actor.default-dispatcher-4] [RestartWithBackoffSource(akka://test-actor-system)] Restarting stream due to failure [2]: clojure.lang.ExceptionInfo: Simulating processing failure {:error-count 2}
clojure.lang.ExceptionInfo: Simulating processing failure {:error-count 2}

[INFO] [05/21/2023 03:25:45.315] [test-actor-system-akka.actor.default-dispatcher-6] [SingleSourceLogic(akka://test-actor-system)] [83500] Completing
[WARN] [05/21/2023 03:25:45.316] [test-actor-system-akka.actor.default-dispatcher-4] [RestartWithBackoffSource(akka://test-actor-system)] Restarting stream due to failure [3]: clojure.lang.ExceptionInfo: Simulating processing failure {:error-count 3}
clojure.lang.ExceptionInfo: Simulating processing failure {:error-count 3}
  • restart counter:
;; Number of simulated failures recorded so far
(deref restart-counter)
;;=> 3
  1. After 3 restarts, state of stream is healthy again 😃
image
  1. Let's shutdown the stream now
;; Drain and shut the stream down through the consumer control captured in
;; the consumer-control-reference atom, then wait on the returned value
(let [captured-control (deref consumer-control-reference)
      done-stage (CompletableFuture/supplyAsync (utils/->fn0 (fn [] ::done)))
      dispatcher (actor/get-dispatcher actor-system)]
  (deref (consumer/drain-and-shutdown captured-control done-stage dispatcher)))
  1. Let's shutdown our actor-system as well
;; Terminate the actor system and wait on the value returned by the call
(deref (actor/terminate actor-system))

Using Alpakka Kafka stream with Transactional Source and Sink

This is a Clojure adaptation of an example from the Alpakka Kafka documentation

  1. We will require the transactional namespace for setting up the transactional source and flow
(require '[fr33m0nk.alpakka-kafka.transactional :as transactional])
  1. Due to a limitation in the design of restart-source, the consumer-control is not directly accessible. To gain access to it, we will store it in an atom
;; Holds the consumer control of the inner source; starts out as nil and is
;; filled in from inside the topology
(def consumer-control-reference
  (atom nil))
  1. We will create a new stream topology such that
  • This topology uses a Transactional source and flow and wraps it over RestartSource
  • This topology consumes messages
  • s/map-async executes mapping function with 2 messages being processed in parallel and produces multiple producer records
  • These producer-records are wrapped in producer/multi-producer-message-envelope which ensures offset commits only happen when all the producer-records are published
  • Then we will publish messages to another topic via s/via and transactional/transactional-flow; offsets are committed as part of the Kafka transaction, so no separate committer/flow is used
  • Finally, we run the RestartSource with our actor-system using s/run-with
(defn test-transactional-stream-with-error-handling
  "Runs a transactional-source topology wrapped by
  `restart/->restart-source-on-failures-with-backoff`, configured with
  `restart-settings`.

  The source factory passed to the restart source:
  - stores the inner source's consumer control in the
    `consumer-control-reference` atom, so that it can be used for shutdown;
  - via `s/map-async` with a parallelism of 2, creates 3 producer records
    for `producer-topic` per message (upper-cased value), wrapped with the
    message's partition offset in a
    `producer/multi-producer-message-envelope`;
  - sends the envelopes through `transactional/transactional-flow` with the
    transactional id \"unique-transaction-id-for-this-application\".

  `committer-settings` is accepted but not used by this topology.

  Returns the result of `s/run-with` with `s/ignoring-sink`."
  [actor-system restart-settings consumer-settings committer-settings producer-settings consumer-topics producer-topic]
  (-> (restart/->restart-source-on-failures-with-backoff
        restart-settings
        (fn []
          (-> (transactional/->transactional-source consumer-settings consumer-topics)
              ;; Hack to gain access to underlying consumer control instance
              ;; reference https://github.com/akka/alpakka-kafka/issues/1291
              ;; reference https://doc.akka.io/docs/alpakka-kafka/current/errorhandling.html#restarting-the-stream-with-a-backoff-stage
              (s/map-materialized-value (fn [consumer-control]
                                          ;; Overwrite the atom with the latest control;
                                          ;; the previous value is irrelevant, so reset! is used
                                          (reset! consumer-control-reference consumer-control)
                                          consumer-control))
              (s/map-async 2 (fn [message]
                               (let [_key (consumer/key message) ;; Don't care as it is null
                                     value (consumer/value message)
                                     partition-offset (consumer/partition-offset message)
                                     messages-to-publish (->> (repeat 3 value)
                                                              (mapv #(producer/->producer-record producer-topic (str/upper-case %))))]
                                 (producer/multi-producer-message-envelope partition-offset messages-to-publish))))
              (s/via (transactional/transactional-flow producer-settings "unique-transaction-id-for-this-application")))))
      (s/run-with s/ignoring-sink actor-system)))
  1. Let's create required dependencies
;; Actor system that the stream is run with
(def actor-system
  (actor/->actor-system "test-actor-system"))

;; Committer settings; :batch-size 1 is the only option supplied here
(def committer-settings
  (committer/committer-settings actor-system {:batch-size 1}))

;; Consumer settings: consumer group, broker address and String deserializers
(def consumer-settings
  (consumer/consumer-settings
    actor-system
    {:group-id           "a-test-consumer"
     :bootstrap-servers  "localhost:9092"
     :key-deserializer   (StringDeserializer.)
     :value-deserializer (StringDeserializer.)}))

;; Producer settings: broker address and String serializers
(def producer-settings
  (producer/producer-settings
    actor-system
    {:bootstrap-servers "localhost:9092"
     :key-serializer    (StringSerializer.)
     :value-serializer  (StringSerializer.)}))

;; Settings for the restart source.
;; NOTE(review): 1000 / 5000 / 0.2 are presumably min backoff (ms),
;; max backoff (ms) and random factor — confirm against restart/restart-settings
(def restart-settings
  (restart/restart-settings 1000 5000 0.2 {}))
  1. Let's run the stream and see it in action
;; Start the restartable transactional stream.
;; NOTE(review): despite its name, this var holds whatever
;; test-transactional-stream-with-error-handling returns (the result of
;; s/run-with with s/ignoring-sink), not the restart source itself — confirm
;; before relying on it
(def restart-source
  (test-transactional-stream-with-error-handling
    actor-system
    restart-settings
    consumer-settings
    committer-settings
    producer-settings
    ["testing_stuff"]
    "output-topic"))
image
  1. Let's shutdown the stream now
;; Drain and shut the stream down through the consumer control captured in
;; the consumer-control-reference atom, then wait on the returned value
(let [captured-control (deref consumer-control-reference)
      done-stage (CompletableFuture/supplyAsync (utils/->fn0 (fn [] ::done)))
      dispatcher (actor/get-dispatcher actor-system)]
  (deref (consumer/drain-and-shutdown captured-control done-stage dispatcher)))
  1. Let's shutdown our actor-system as well
@(actor/terminate actor-system)