Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Examples #1304

Merged
merged 3 commits into from
Mar 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions examples/consumergroup/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Consumergroup example

This example shows you how to use the Sarama consumer group consumer. The example simply starts consuming the given Kafka topics and logs the consumed messages.

```bash
$ go run main.go -brokers="127.0.0.1:9092" -topics="sarama" -group="example"
```
5 changes: 5 additions & 0 deletions examples/consumergroup/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
module github.com/Shopify/sarama/examples/consumer

replace github.com/Shopify/sarama => ../../

require github.com/Shopify/sarama v0.0.0-00010101000000-000000000000
17 changes: 17 additions & 0 deletions examples/consumergroup/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
github.com/DataDog/zstd v1.3.5 h1:DtpNbljikUepEPD16hD4LvIcmhnhdLTiW/5pHgbmp14=
github.com/DataDog/zstd v1.3.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eapache/go-resiliency v1.1.0 h1:1NtRmCAqadE2FN4ZcN6g90TP3uk8cg9rn9eNK2197aU=
github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
131 changes: 131 additions & 0 deletions examples/consumergroup/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package main

import (
"context"
"flag"
"log"
"os"
"os/signal"
"strings"
"syscall"

"github.com/Shopify/sarama"
)

// Sarma configuration options
var (
brokers = ""
version = ""
group = ""
topics = ""
oldest = true
verbose = false
)

func init() {
flag.StringVar(&brokers, "brokers", "", "Kafka bootstrap brokers to connect to, as a comma separated list")
flag.StringVar(&group, "group", "", "Kafka consumer group definition")
flag.StringVar(&version, "version", "2.1.1", "Kafka cluster version")
flag.StringVar(&topics, "topics", "", "Kafka topics to be consumed, as a comma seperated list")
flag.BoolVar(&oldest, "oldest", true, "Kafka consumer consume initial ofset from oldest")
flag.BoolVar(&verbose, "verbose", false, "Sarama logging")
flag.Parse()

if len(brokers) == 0 {
panic("no Kafka bootstrap brokers defined, please set the -brokers flag")
}

if len(topics) == 0 {
panic("no topics given to be consumed, please set the -topics flag")
}

if len(group) == 0 {
panic("no Kafka consumer group defined, please set the -group flag")
}
}

func main() {
log.Println("Starting a new Sarama consumer")

if verbose {
sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
}

version, err := sarama.ParseKafkaVersion(version)
if err != nil {
panic(err)
}

/**
* Construct a new Sarama configuration.
* The Kafka cluster version has to be defined before the consumer/producer is initialized.
*/
config := sarama.NewConfig()
config.Version = version

if oldest {
config.Consumer.Offsets.Initial = sarama.OffsetOldest
}

/**
* Setup a new Sarama consumer group
*/
consumer := Consumer{
ready: make(chan bool, 0),
}

ctx := context.Background()
client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config)
if err != nil {
panic(err)
}

go func() {
for {
err := client.Consume(ctx, strings.Split(topics, ","), &consumer)
if err != nil {
panic(err)
}
}
}()

<-consumer.ready // Await till the consumer has been set up
log.Println("Sarama consumer up and running!...")

sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)

<-sigterm // Await a sigterm signal before safely closing the consumer

err = client.Close()
if err != nil {
panic(err)
}
}

// Consumer represents a Sarama consumer group consumer
type Consumer struct {
ready chan bool
}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
// Mark the consumer as ready
close(consumer.ready)
return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for message := range claim.Messages() {
log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
session.MarkMessage(message, "")
}

return nil
}