Skip to content

Latest commit



89 lines (68 loc) · 2.9 KB

File metadata and controls

89 lines (68 loc) · 2.9 KB

![Torna] (

A simple clojure library for your Kafka to 'X' needs..

Why ?

Reading messages from Kafka and doing 'X' is a very common need. This library simplifies that via a common usage pattern. It hides all the complexities about connecting to kafka, reading the messages, handling timeouts etc. It lets you concentrate on the application logic of processing the kafka messages. You simply need to pass your batch-handler, a function that has all your core processing logic and it handles all the low level details about Kafka.

Few Examples of Batch Handlers:

  • Couchbase inserter

  • Elasticsearch indexer

  • Redis inserter

  • Custom processor.


Torna is available from clojars. Add the following to the dependences in your project.clj file:

With Leiningen

Clojars Project


(ns couchbaseinserter
  (:require [torna.core :as torna]))

(def props
  { "couchbaseinserter.mygroupd-id"
   :kafka.zk.connect ",," "topic-name"
   :health.port 31522
   :batch.size 100
   :batch.time 5
   :couchbase.bucketname "mybucket"
   :couchbase.hosts [""

(defn format-mydoc
  "Do Formatting stuff"
  [docid mydoc])

(defn insert-mydocs
  "Do Inserting stuff"

(defn handle-kafka-batch
 "Core application logic handler"
  [props json-docs]
  (let [cb-docs (map (fn [item]
                       (let [mydocid (item "mydocid")]
                         (format-mydoc mydocid item)))
    (insert-mydocs cb-docs)))

(defn -main
  [& args]
  (torna/read-kafka props handle-kafka-batch))

The properties definitions

Property Name Mandatory Default Meaning Yes N/A The Kafka consumer group id
:kafka.zk.connect Yes N/A A comma separated list Kafka host:port/zkpath Yes N/A Kafka topic name to read from
:batch.size Yes N/A Accumulation size until handler is called
:batch.time No N/A Time until handler will be called, when accum messages are less than batch.size
:health.port No N/A Health Port
:couchbase.bucketname No N/A Your custom property
:couchbase.hosts No N/A Your custom property


$ lein uberjar
$ java -jar your-ns-STANDALONE-1.0.0.jar -c <any custom flag>


Copyright © 2015 Yieldbot

Distributed under the Eclipse Public License either version 1.0 or (at your option) any later version.