From aea09b3a204126d5bf753f86c555c656ebab1b08 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Mon, 29 Jul 2019 20:41:12 -0700 Subject: [PATCH] Use sarama built in support for consumer groups (#6172) --- Gopkg.lock | 108 ++++- Gopkg.toml | 4 - Makefile | 8 +- docs/LICENSE_OF_DEPENDENCIES.md | 3 +- plugins/inputs/kafka_consumer/README.md | 17 +- .../inputs/kafka_consumer/kafka_consumer.go | 407 +++++++++------- .../kafka_consumer_integration_test.go | 94 ---- .../kafka_consumer/kafka_consumer_test.go | 435 +++++++++++------- 8 files changed, 643 insertions(+), 433 deletions(-) delete mode 100644 plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go diff --git a/Gopkg.lock b/Gopkg.lock index 3bf5686866b67..470c56cda660d 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -61,6 +61,14 @@ revision = "1f7cd6cfe0adea687ad44a512dfe76140f804318" version = "v10.12.0" +[[projects]] + digest = "1:82041ab48e5c76da656b723fdc13a2b9ec716cdc736f82adaac77f5c39d4fca8" + name = "github.com/DataDog/zstd" + packages = ["."] + pruneopts = "" + revision = "2347a397da4ee9c6b8226d4aff82c302d0e52773" + version = "v1.4.1" + [[projects]] branch = "master" digest = "1:298712a3ee36b59c3ca91f4183bd75d174d5eaa8b4aed5072831f126e2e752f6" @@ -81,12 +89,12 @@ version = "v0.4.9" [[projects]] - digest = "1:213b41361ad1cb4768add9d26c2e27794c65264eefdb24ed6ea34cdfeeff3f3c" + digest = "1:5dd52495eaf9fad11f4742f341166aa9eb68f70061fc1a9b546f9481b284b6d8" name = "github.com/Shopify/sarama" packages = ["."] pruneopts = "" - revision = "a6144ae922fd99dd0ea5046c8137acfb7fab0914" - version = "v1.18.0" + revision = "46c83074a05474240f9620fb7c70fb0d80ca401a" + version = "v1.23.1" [[projects]] digest = "1:f82b8ac36058904227087141017bb82f4b0fc58272990a4cdae3e2d6d222644e" @@ -195,14 +203,6 @@ pruneopts = "" revision = "3a771d992973f24aa725d07868b467d1ddfceafb" -[[projects]] - digest = "1:c5978131c797af795972c27c25396c81d1bf53b7b6e8e3e0259e58375765c071" - name = "github.com/bsm/sarama-cluster" - packages = ["."] - pruneopts = "" - revision = "cf455bc755fe41ac9bb2861e7a961833d9c2ecc3" - version = "v2.1.13" - [[projects]] digest = "1:e5691038f8e87e7da05280095d968e50c17d624e25cca095d4e4cd947a805563" name = "github.com/caio/go-tdigest" @@ -640,6 +640,14 @@ pruneopts = "" revision = "6bb64b370b90e7ef1fa532be9e591a81c3493e00" +[[projects]] + digest = "1:0038a7f43b51c8b2a8cd03b5372e73f8eadfe156484c2ae8185ae836f8ebc2cd" + name = "github.com/hashicorp/go-uuid" + packages = ["."] + pruneopts = "" + revision = "4f571afc59f3043a65f8fe6bf46d887b10a01d43" + version = "v1.0.1" + [[projects]] digest = "1:f72168ea995f398bab88e84bd1ff58a983466ba162fb8d50d47420666cd57fad" name = "github.com/hashicorp/serf" @@ -710,6 +718,17 @@ revision = "8faa4453fc7051d1076053f8854077753ab912f2" version = "v3.4.0" +[[projects]] + digest = "1:d45477e90c25c8c6d7d4237281167aa56079382fc042db4b44a8328071649bfa" + name = "github.com/jcmturner/gofork" + packages = [ + "encoding/asn1", + "x/crypto/pbkdf2", + ] + pruneopts = "" + revision = "dc7c13fece037a4a36e2b3c69db4991498d30692" + version = "v1.0.0" + [[projects]] digest = "1:6f49eae0c1e5dab1dafafee34b207aeb7a42303105960944828c2079b92fc88e" name = "github.com/jmespath/go-jmespath" @@ -1523,6 +1542,72 @@ revision = "d2d2541c53f18d2a059457998ce2876cc8e67cbf" version = "v0.9.1" +[[projects]] + digest = "1:4777ba481cc12866b89aafb0a67529e7ac48b9aea06a25f3737b2cf5a3ffda12" + name = "gopkg.in/jcmturner/aescts.v1" + packages = ["."] + pruneopts = "" + revision = "f6abebb3171c4c1b1fea279cb7c7325020a26290" + version = "v1.0.1" + +[[projects]] + digest = "1:84c5b1392ef65ad1bb64da4b4d0beb2f204eefc769d6d96082347bb7057cb7b1" + name = "gopkg.in/jcmturner/dnsutils.v1" + packages = ["."] + pruneopts = "" + revision = "13eeb8d49ffb74d7a75784c35e4d900607a3943c" + version = "v1.0.1" + +[[projects]] + digest = "1:502ab576ba8c47c4de77fe3f2b2386adc1a1447bb5afae2ac7bf0edd2b6f7c52" + name = "gopkg.in/jcmturner/gokrb5.v7" + packages = [ + "asn1tools", + "client", + "config", + "credentials", + "crypto", + "crypto/common", + "crypto/etype", + "crypto/rfc3961", + "crypto/rfc3962", + "crypto/rfc4757", + "crypto/rfc8009", + "gssapi", + "iana", + "iana/addrtype", + "iana/adtype", + "iana/asnAppTag", + "iana/chksumtype", + "iana/errorcode", + "iana/etypeID", + "iana/flags", + "iana/keyusage", + "iana/msgtype", + "iana/nametype", + "iana/patype", + "kadmin", + "keytab", + "krberror", + "messages", + "pac", + "types", + ] + pruneopts = "" + revision = "363118e62befa8a14ff01031c025026077fe5d6d" + version = "v7.3.0" + +[[projects]] + digest = "1:f9956ccc103c6208cd50c71ee5191b6fdcc635972c12624ef949c9b20b2bb9d1" + name = "gopkg.in/jcmturner/rpc.v1" + packages = [ + "mstypes", + "ndr", + ] + pruneopts = "" + revision = "99a8ce2fbf8b8087b6ed12a37c61b10f04070043" + version = "v1.1.0" + [[projects]] digest = "1:367baf06b7dbd0ef0bbdd785f6a79f929c96b0c18e9d3b29c0eed1ac3f5db133" name = "gopkg.in/ldap.v2" @@ -1598,7 +1683,6 @@ "github.com/aws/aws-sdk-go/service/cloudwatch", "github.com/aws/aws-sdk-go/service/dynamodb", "github.com/aws/aws-sdk-go/service/kinesis", - "github.com/bsm/sarama-cluster", "github.com/cisco-ie/nx-telemetry-proto/mdt_dialout", "github.com/cisco-ie/nx-telemetry-proto/telemetry_bis", "github.com/couchbase/go-couchbase", diff --git a/Gopkg.toml b/Gopkg.toml index 92457c626d419..d1c7f45899f43 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -18,10 +18,6 @@ name = "github.com/aws/aws-sdk-go" version = "1.15.54" -[[constraint]] - name = "github.com/bsm/sarama-cluster" - version = "2.1.13" - [[constraint]] name = "github.com/couchbase/go-couchbase" branch = "master" diff --git a/Makefile b/Makefile index 9bf1e342b4573..6c9717c5d129a 100644 --- a/Makefile +++ b/Makefile @@ -1,9 +1,8 @@ -ifeq ($(SHELL), cmd) - VERSION := $(shell git describe --exact-match --tags 2>nil) - HOME := $(HOMEPATH) -else ifeq ($(SHELL), sh.exe) +ifeq ($(OS), Windows_NT) VERSION := $(shell git describe --exact-match --tags 2>nil) HOME := $(HOMEPATH) + CGO_ENABLED ?= 0 + export CGO_ENABLED else VERSION := $(shell git describe --exact-match --tags 2>/dev/null) endif @@ -48,7 +47,6 @@ install: telegraf mkdir -p $(DESTDIR)$(PREFIX)/bin/ cp telegraf $(DESTDIR)$(PREFIX)/bin/ - .PHONY: test test: go test -short ./... diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md index 17bac0a1a5163..755fbbbaeb234 100644 --- a/docs/LICENSE_OF_DEPENDENCIES.md +++ b/docs/LICENSE_OF_DEPENDENCIES.md @@ -14,7 +14,6 @@ following works: - github.com/aws/aws-sdk-go [Apache License 2.0](https://github.com/aws/aws-sdk-go/blob/master/LICENSE.txt) - github.com/Azure/go-autorest [Apache License 2.0](https://github.com/Azure/go-autorest/blob/master/LICENSE) - github.com/beorn7/perks [MIT License](https://github.com/beorn7/perks/blob/master/LICENSE) -- github.com/bsm/sarama-cluster [MIT License](https://github.com/bsm/sarama-cluster/blob/master/LICENSE) - github.com/cenkalti/backoff [MIT License](https://github.com/cenkalti/backoff/blob/master/LICENSE) - github.com/cisco-ie/nx-telemetry-proto [Apache License 2.0](https://github.com/cisco-ie/nx-telemetry-proto/blob/master/LICENSE) - github.com/couchbase/go-couchbase [MIT License](https://github.com/couchbase/go-couchbase/blob/master/LICENSE) @@ -139,4 +138,4 @@ following works: - gopkg.in/yaml.v2 [Apache License 2.0](https://github.com/go-yaml/yaml/blob/v2.2.2/LICENSE) ## telegraf used and modified code from these projects -- github.com/DataDog/datadog-agent [Apache License 2.0](https://github.com/DataDog/datadog-agent/LICENSE) \ No newline at end of file +- github.com/DataDog/datadog-agent [Apache License 2.0](https://github.com/DataDog/datadog-agent/LICENSE) diff --git a/plugins/inputs/kafka_consumer/README.md b/plugins/inputs/kafka_consumer/README.md index 56fc59245ad0e..26ebca39d3e41 100644 --- a/plugins/inputs/kafka_consumer/README.md +++ b/plugins/inputs/kafka_consumer/README.md @@ -10,11 +10,13 @@ and use the old zookeeper connection method. ```toml [[inputs.kafka_consumer]] - ## kafka servers + ## Kafka brokers. brokers = ["localhost:9092"] - ## topic(s) to consume + + ## Topics to consume. topics = ["telegraf"] - ## Add topic as tag if topic_tag is not empty + + ## When set this tag will be added to all metrics with the topic as the value. # topic_tag = "" ## Optional Client id @@ -37,10 +39,11 @@ and use the old zookeeper connection method. # sasl_username = "kafka" # sasl_password = "secret" - ## the name of the consumer group - consumer_group = "telegraf_metrics_consumers" - ## Offset (must be either "oldest" or "newest") - offset = "oldest" + ## Name of the consumer group. + # consumer_group = "telegraf_metrics_consumers" + + ## Initial offset position; one of "oldest" or "newest". + # offset = "oldest" ## Maximum length of a message to consume, in bytes (default 0/unlimited); ## larger messages are dropped diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index 545e37f5a54de..10a6251be2955 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -8,61 +8,20 @@ import ( "sync" "github.com/Shopify/sarama" - cluster "github.com/bsm/sarama-cluster" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal/tls" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" ) -const ( - defaultMaxUndeliveredMessages = 1000 -) - -type empty struct{} -type semaphore chan empty - -type Consumer interface { - Errors() <-chan error - Messages() <-chan *sarama.ConsumerMessage - MarkOffset(msg *sarama.ConsumerMessage, metadata string) - Close() error -} - -type Kafka struct { - ConsumerGroup string `toml:"consumer_group"` - ClientID string `toml:"client_id"` - Topics []string `toml:"topics"` - Brokers []string `toml:"brokers"` - MaxMessageLen int `toml:"max_message_len"` - Version string `toml:"version"` - MaxUndeliveredMessages int `toml:"max_undelivered_messages"` - Offset string `toml:"offset"` - SASLUsername string `toml:"sasl_username"` - SASLPassword string `toml:"sasl_password"` - TopicTag string `toml:"topic_tag"` - - tls.ClientConfig - - cluster Consumer - parser parsers.Parser - wg *sync.WaitGroup - cancel context.CancelFunc - - // Unconfirmed messages - messages map[telegraf.TrackingID]*sarama.ConsumerMessage - - // doNotCommitMsgs tells the parser not to call CommitUpTo on the consumer - // this is mostly for test purposes, but there may be a use-case for it later. - doNotCommitMsgs bool -} - -var sampleConfig = ` - ## kafka servers +const sampleConfig = ` + ## Kafka brokers. brokers = ["localhost:9092"] - ## topic(s) to consume + + ## Topics to consume. topics = ["telegraf"] - ## Add topic as tag if topic_tag is not empty + + ## When set this tag will be added to all metrics with the topic as the value. # topic_tag = "" ## Optional Client id @@ -85,10 +44,12 @@ var sampleConfig = ` # sasl_username = "kafka" # sasl_password = "secret" - ## the name of the consumer group - consumer_group = "telegraf_metrics_consumers" - ## Offset (must be either "oldest" or "newest") - offset = "oldest" + ## Name of the consumer group. + # consumer_group = "telegraf_metrics_consumers" + + ## Initial offset position; one of "oldest" or "newest". + # offset = "oldest" + ## Maximum length of a message to consume, in bytes (default 0/unlimited); ## larger messages are dropped max_message_len = 1000000 @@ -110,22 +71,77 @@ var sampleConfig = ` data_format = "influx" ` -func (k *Kafka) SampleConfig() string { +const ( + defaultMaxUndeliveredMessages = 1000 + defaultMaxMessageLen = 1000000 + defaultConsumerGroup = "telegraf_metrics_consumers" +) + +type empty struct{} +type semaphore chan empty + +type KafkaConsumer struct { + Brokers []string `toml:"brokers"` + ClientID string `toml:"client_id"` + ConsumerGroup string `toml:"consumer_group"` + MaxMessageLen int `toml:"max_message_len"` + MaxUndeliveredMessages int `toml:"max_undelivered_messages"` + Offset string `toml:"offset"` + Topics []string `toml:"topics"` + TopicTag string `toml:"topic_tag"` + Version string `toml:"version"` + SASLPassword string `toml:"sasl_password"` + SASLUsername string `toml:"sasl_username"` + + tls.ClientConfig + + ConsumerCreator ConsumerGroupCreator `toml:"-"` + consumer ConsumerGroup + config *sarama.Config + + parser parsers.Parser + wg sync.WaitGroup + cancel context.CancelFunc +} + +type ConsumerGroup interface { + Consume(ctx context.Context, topics []string, handler sarama.ConsumerGroupHandler) error + Errors() <-chan error + Close() error +} + +type ConsumerGroupCreator interface { + Create(brokers []string, group string, config *sarama.Config) (ConsumerGroup, error) +} + +type SaramaCreator struct{} + +func (*SaramaCreator) Create(brokers []string, group string, config *sarama.Config) (ConsumerGroup, error) { + return sarama.NewConsumerGroup(brokers, group, config) +} + +func (k *KafkaConsumer) SampleConfig() string { return sampleConfig } -func (k *Kafka) Description() string { - return "Read metrics from Kafka topic(s)" +func (k *KafkaConsumer) Description() string { + return "Read metrics from Kafka topics" } -func (k *Kafka) SetParser(parser parsers.Parser) { +func (k *KafkaConsumer) SetParser(parser parsers.Parser) { k.parser = parser } -func (k *Kafka) Start(acc telegraf.Accumulator) error { - var clusterErr error +func (k *KafkaConsumer) Init() error { + if k.MaxUndeliveredMessages == 0 { + k.MaxUndeliveredMessages = defaultMaxUndeliveredMessages + } + if k.ConsumerGroup == "" { + k.ConsumerGroup = defaultConsumerGroup + } - config := cluster.NewConfig() + config := sarama.NewConfig() + config.Consumer.Return.Errors = true if k.Version != "" { version, err := sarama.ParseKafkaVersion(k.Version) @@ -135,172 +151,255 @@ func (k *Kafka) Start(acc telegraf.Accumulator) error { config.Version = version } - config.Consumer.Return.Errors = true - tlsConfig, err := k.ClientConfig.TLSConfig() if err != nil { return err } - if k.ClientID != "" { - config.ClientID = k.ClientID - } else { - config.ClientID = "Telegraf" - } - if tlsConfig != nil { - log.Printf("D! TLS Enabled") config.Net.TLS.Config = tlsConfig config.Net.TLS.Enable = true } + if k.SASLUsername != "" && k.SASLPassword != "" { - log.Printf("D! Using SASL auth with username '%s',", - k.SASLUsername) config.Net.SASL.User = k.SASLUsername config.Net.SASL.Password = k.SASLPassword config.Net.SASL.Enable = true } + if k.ClientID != "" { + config.ClientID = k.ClientID + } else { + config.ClientID = "Telegraf" + } + switch strings.ToLower(k.Offset) { case "oldest", "": config.Consumer.Offsets.Initial = sarama.OffsetOldest case "newest": config.Consumer.Offsets.Initial = sarama.OffsetNewest default: - log.Printf("I! WARNING: Kafka consumer invalid offset '%s', using 'oldest'", - k.Offset) - config.Consumer.Offsets.Initial = sarama.OffsetOldest + return fmt.Errorf("invalid offset %q", k.Offset) } - if k.cluster == nil { - k.cluster, clusterErr = cluster.NewConsumer( - k.Brokers, - k.ConsumerGroup, - k.Topics, - config, - ) - - if clusterErr != nil { - log.Printf("E! Error when creating Kafka Consumer, brokers: %v, topics: %v", - k.Brokers, k.Topics) - return clusterErr - } + if k.ConsumerCreator == nil { + k.ConsumerCreator = &SaramaCreator{} + } + + k.config = config + return nil +} + +func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error { + var err error + k.consumer, err = k.ConsumerCreator.Create( + k.Brokers, + k.ConsumerGroup, + k.config, + ) + if err != nil { + return err } ctx, cancel := context.WithCancel(context.Background()) k.cancel = cancel // Start consumer goroutine - k.wg = &sync.WaitGroup{} k.wg.Add(1) go func() { defer k.wg.Done() - k.receiver(ctx, acc) + for ctx.Err() == nil { + handler := NewConsumerGroupHandler(acc, k.MaxUndeliveredMessages, k.parser) + handler.MaxMessageLen = k.MaxMessageLen + handler.TopicTag = k.TopicTag + err := k.consumer.Consume(ctx, k.Topics, handler) + if err != nil { + acc.AddError(err) + } + } + err = k.consumer.Close() + if err != nil { + acc.AddError(err) + } }() - log.Printf("I! Started the kafka consumer service, brokers: %v, topics: %v", - k.Brokers, k.Topics) + k.wg.Add(1) + go func() { + defer k.wg.Done() + for err := range k.consumer.Errors() { + acc.AddError(err) + } + }() + + return nil +} + +func (k *KafkaConsumer) Gather(acc telegraf.Accumulator) error { return nil } -// receiver() reads all incoming messages from the consumer, and parses them into -// influxdb metric points. -func (k *Kafka) receiver(ctx context.Context, ac telegraf.Accumulator) { - k.messages = make(map[telegraf.TrackingID]*sarama.ConsumerMessage) +func (k *KafkaConsumer) Stop() { + k.cancel() + k.wg.Wait() +} + +// Message is an aggregate type binding the Kafka message and the session so +// that offsets can be updated. +type Message struct { + message *sarama.ConsumerMessage + session sarama.ConsumerGroupSession +} + +func NewConsumerGroupHandler(acc telegraf.Accumulator, maxUndelivered int, parser parsers.Parser) *ConsumerGroupHandler { + handler := &ConsumerGroupHandler{ + acc: acc.WithTracking(maxUndelivered), + sem: make(chan empty, maxUndelivered), + undelivered: make(map[telegraf.TrackingID]Message, maxUndelivered), + parser: parser, + } + return handler +} + +// ConsumerGroupHandler is a sarama.ConsumerGroupHandler implementation. +type ConsumerGroupHandler struct { + MaxMessageLen int + TopicTag string + + acc telegraf.TrackingAccumulator + sem semaphore + parser parsers.Parser + wg sync.WaitGroup + cancel context.CancelFunc + + mu sync.Mutex + undelivered map[telegraf.TrackingID]Message +} + +// Setup is called once when a new session is opened. It setups up the handler +// and begins processing delivered messages. +func (h *ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error { + h.undelivered = make(map[telegraf.TrackingID]Message) + + ctx, cancel := context.WithCancel(context.Background()) + h.cancel = cancel - acc := ac.WithTracking(k.MaxUndeliveredMessages) - sem := make(semaphore, k.MaxUndeliveredMessages) + h.wg.Add(1) + go func() { + defer h.wg.Done() + h.run(ctx) + }() + return nil +} +// Run processes any delivered metrics during the lifetime of the session. +func (h *ConsumerGroupHandler) run(ctx context.Context) error { for { select { case <-ctx.Done(): - return - case track := <-acc.Delivered(): - <-sem - k.onDelivery(track) - case err := <-k.cluster.Errors(): - acc.AddError(err) - case sem <- empty{}: - select { - case <-ctx.Done(): - return - case track := <-acc.Delivered(): - // Once for the delivered message, once to leave the case - <-sem - <-sem - k.onDelivery(track) - case err := <-k.cluster.Errors(): - <-sem - acc.AddError(err) - case msg := <-k.cluster.Messages(): - err := k.onMessage(acc, msg) - if err != nil { - acc.AddError(err) - <-sem - } - } + return nil + case track := <-h.acc.Delivered(): + h.onDelivery(track) } } } -func (k *Kafka) markOffset(msg *sarama.ConsumerMessage) { - if !k.doNotCommitMsgs { - k.cluster.MarkOffset(msg, "") +func (h *ConsumerGroupHandler) onDelivery(track telegraf.DeliveryInfo) { + h.mu.Lock() + defer h.mu.Unlock() + + msg, ok := h.undelivered[track.ID()] + if !ok { + log.Printf("E! [inputs.kafka_consumer] Could not mark message delivered: %d", track.ID()) + return + } + + if track.Delivered() { + msg.session.MarkMessage(msg.message, "") + } + + delete(h.undelivered, track.ID()) + <-h.sem +} + +// Reserve blocks until there is an available slot for a new message. +func (h *ConsumerGroupHandler) Reserve(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + case h.sem <- empty{}: + return nil } } -func (k *Kafka) onMessage(acc telegraf.TrackingAccumulator, msg *sarama.ConsumerMessage) error { - if k.MaxMessageLen != 0 && len(msg.Value) > k.MaxMessageLen { - k.markOffset(msg) - return fmt.Errorf("Message longer than max_message_len (%d > %d)", - len(msg.Value), k.MaxMessageLen) +func (h *ConsumerGroupHandler) release() { + <-h.sem +} + +// Handle processes a message and if successful saves it to be acknowledged +// after delivery. +func (h *ConsumerGroupHandler) Handle(session sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) error { + if h.MaxMessageLen != 0 && len(msg.Value) > h.MaxMessageLen { + session.MarkMessage(msg, "") + h.release() + return fmt.Errorf("message exceeds max_message_len (actual %d, max %d)", + len(msg.Value), h.MaxMessageLen) } - metrics, err := k.parser.Parse(msg.Value) + metrics, err := h.parser.Parse(msg.Value) if err != nil { + h.release() return err } - if len(k.TopicTag) > 0 { + + if len(h.TopicTag) > 0 { for _, metric := range metrics { - metric.AddTag(k.TopicTag, msg.Topic) + metric.AddTag(h.TopicTag, msg.Topic) } } - id := acc.AddTrackingMetricGroup(metrics) - k.messages[id] = msg + id := h.acc.AddTrackingMetricGroup(metrics) + h.mu.Lock() + h.undelivered[id] = Message{session: session, message: msg} + h.mu.Unlock() return nil } -func (k *Kafka) onDelivery(track telegraf.DeliveryInfo) { - msg, ok := k.messages[track.ID()] - if !ok { - log.Printf("E! [inputs.kafka_consumer] Could not mark message delivered: %d", track.ID()) - return - } - - if track.Delivered() { - k.markOffset(msg) - } - delete(k.messages, track.ID()) -} +// ConsumeClaim is called once each claim in a goroutine and must be +// thread-safe. Should run until the claim is closed. +func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { + ctx := session.Context() -func (k *Kafka) Stop() { - k.cancel() - k.wg.Wait() + for { + err := h.Reserve(ctx) + if err != nil { + return nil + } - if err := k.cluster.Close(); err != nil { - log.Printf("E! [inputs.kafka_consumer] Error closing consumer: %v", err) + select { + case <-ctx.Done(): + return nil + case msg, ok := <-claim.Messages(): + if !ok { + return nil + } + err := h.Handle(session, msg) + if err != nil { + h.acc.AddError(err) + } + } } } -func (k *Kafka) Gather(acc telegraf.Accumulator) error { +// Cleanup stops the internal goroutine and is called after all ConsumeClaim +// functions have completed. +func (h *ConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error { + h.cancel() + h.wg.Wait() return nil } func init() { inputs.Add("kafka_consumer", func() telegraf.Input { - return &Kafka{ - MaxUndeliveredMessages: defaultMaxUndeliveredMessages, - } + return &KafkaConsumer{} }) } diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go deleted file mode 100644 index 23f9e0f920ac6..0000000000000 --- a/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go +++ /dev/null @@ -1,94 +0,0 @@ -package kafka_consumer - -import ( - "fmt" - "testing" - "time" - - "github.com/Shopify/sarama" - "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/influxdata/telegraf/plugins/parsers" -) - -func TestReadsMetricsFromKafka(t *testing.T) { - if testing.Short() { - t.Skip("Skipping integration test in short mode") - } - - brokerPeers := []string{testutil.GetLocalHost() + ":9092"} - testTopic := fmt.Sprintf("telegraf_test_topic_%d", time.Now().Unix()) - - // Send a Kafka message to the kafka host - msg := "cpu_load_short,direction=in,host=server01,region=us-west value=23422.0 1422568543702900257\n" - producer, err := sarama.NewSyncProducer(brokerPeers, nil) - require.NoError(t, err) - _, _, err = producer.SendMessage( - &sarama.ProducerMessage{ - Topic: testTopic, - Value: sarama.StringEncoder(msg), - }) - require.NoError(t, err) - defer producer.Close() - - // Start the Kafka Consumer - k := &Kafka{ - ConsumerGroup: "telegraf_test_consumers", - Topics: []string{testTopic}, - Brokers: brokerPeers, - Offset: "oldest", - } - p, _ := parsers.NewInfluxParser() - k.SetParser(p) - - // Verify that we can now gather the sent message - var acc testutil.Accumulator - - // Sanity check - assert.Equal(t, 0, len(acc.Metrics), "There should not be any points") - if err := k.Start(&acc); err != nil { - t.Fatal(err.Error()) - } else { - defer k.Stop() - } - - waitForPoint(&acc, t) - - // Gather points - err = acc.GatherError(k.Gather) - require.NoError(t, err) - if len(acc.Metrics) == 1 { - point := acc.Metrics[0] - assert.Equal(t, "cpu_load_short", point.Measurement) - assert.Equal(t, map[string]interface{}{"value": 23422.0}, point.Fields) - assert.Equal(t, map[string]string{ - "host": "server01", - "direction": "in", - "region": "us-west", - }, point.Tags) - assert.Equal(t, time.Unix(0, 1422568543702900257).Unix(), point.Time.Unix()) - } else { - t.Errorf("No points found in accumulator, expected 1") - } -} - -// Waits for the metric that was sent to the kafka broker to arrive at the kafka -// consumer -func waitForPoint(acc *testutil.Accumulator, t *testing.T) { - // Give the kafka container up to 2 seconds to get the point to the consumer - ticker := time.NewTicker(5 * time.Millisecond) - counter := 0 - for { - select { - case <-ticker.C: - counter++ - if counter > 1000 { - t.Fatal("Waited for 5s, point never arrived to consumer") - } else if acc.NFields() == 1 { - return - } - } - } -} diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_test.go index a4d06efe6fba8..3aa0efa506db3 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer_test.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer_test.go @@ -2,219 +2,344 @@ package kafka_consumer import ( "context" - "strings" "testing" + "time" "github.com/Shopify/sarama" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/plugins/parsers/value" "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) -const ( - testMsg = "cpu_load_short,host=server01 value=23422.0 1422568543702900257\n" - testMsgGraphite = "cpu.load.short.graphite 23422 1454780029" - testMsgJSON = "{\"a\": 5, \"b\": {\"c\": 6}}\n" - invalidMsg = "cpu_load_short,host=server01 1422568543702900257\n" -) +type FakeConsumerGroup struct { + brokers []string + group string + config *sarama.Config -type TestConsumer struct { - errors chan error - messages chan *sarama.ConsumerMessage + handler sarama.ConsumerGroupHandler + errors chan error } -func (c *TestConsumer) Errors() <-chan error { - return c.errors +func (g *FakeConsumerGroup) Consume(ctx context.Context, topics []string, handler sarama.ConsumerGroupHandler) error { + g.handler = handler + g.handler.Setup(nil) + return nil } -func (c *TestConsumer) Messages() <-chan *sarama.ConsumerMessage { - return c.messages +func (g *FakeConsumerGroup) Errors() <-chan error { + return g.errors } -func (c *TestConsumer) MarkOffset(msg *sarama.ConsumerMessage, metadata string) { +func (g *FakeConsumerGroup) Close() error { + close(g.errors) + return nil } -func (c *TestConsumer) Close() error { - return nil +type FakeCreator struct { + ConsumerGroup *FakeConsumerGroup } -func (c *TestConsumer) Inject(msg *sarama.ConsumerMessage) { - c.messages <- msg +func (c *FakeCreator) Create(brokers []string, group string, config *sarama.Config) (ConsumerGroup, error) { + c.ConsumerGroup.brokers = brokers + c.ConsumerGroup.group = group + c.ConsumerGroup.config = config + return c.ConsumerGroup, nil } -func newTestKafka() (*Kafka, *TestConsumer) { - consumer := &TestConsumer{ - errors: make(chan error), - messages: make(chan *sarama.ConsumerMessage, 1000), +func TestInit(t *testing.T) { + tests := []struct { + name string + plugin *KafkaConsumer + initError bool + check func(t *testing.T, plugin *KafkaConsumer) + }{ + { + name: "default config", + plugin: &KafkaConsumer{}, + check: func(t *testing.T, plugin *KafkaConsumer) { + require.Equal(t, plugin.ConsumerGroup, defaultConsumerGroup) + require.Equal(t, plugin.MaxUndeliveredMessages, defaultMaxUndeliveredMessages) + require.Equal(t, plugin.config.ClientID, "Telegraf") + require.Equal(t, plugin.config.Consumer.Offsets.Initial, sarama.OffsetOldest) + }, + }, + { + name: "parses valid version string", + plugin: &KafkaConsumer{ + Version: "1.0.0", + }, + check: func(t *testing.T, plugin *KafkaConsumer) { + require.Equal(t, plugin.config.Version, sarama.V1_0_0_0) + }, + }, + { + name: "invalid version string", + plugin: &KafkaConsumer{ + Version: "100", + }, + initError: true, + }, + { + name: "custom client_id", + plugin: &KafkaConsumer{ + ClientID: "custom", + }, + check: func(t *testing.T, plugin *KafkaConsumer) { + require.Equal(t, plugin.config.ClientID, "custom") + }, + }, + { + name: "custom offset", + plugin: &KafkaConsumer{ + Offset: "newest", + }, + check: func(t *testing.T, plugin *KafkaConsumer) { + require.Equal(t, plugin.config.Consumer.Offsets.Initial, sarama.OffsetNewest) + }, + }, + { + name: "invalid offset", + plugin: &KafkaConsumer{ + Offset: "middle", + }, + initError: true, + }, } - k := Kafka{ - cluster: consumer, - ConsumerGroup: "test", - Topics: []string{"telegraf"}, - Brokers: []string{"localhost:9092"}, - Offset: "oldest", - MaxUndeliveredMessages: defaultMaxUndeliveredMessages, - doNotCommitMsgs: true, - messages: make(map[telegraf.TrackingID]*sarama.ConsumerMessage), + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cg := &FakeConsumerGroup{} + tt.plugin.ConsumerCreator = &FakeCreator{ConsumerGroup: cg} + err := tt.plugin.Init() + if tt.initError { + require.Error(t, err) + return + } + + tt.check(t, tt.plugin) + }) } - return &k, consumer } -func newTestKafkaWithTopicTag() (*Kafka, *TestConsumer) { - consumer := &TestConsumer{ - errors: make(chan error), - messages: make(chan *sarama.ConsumerMessage, 1000), - } - k := Kafka{ - cluster: consumer, - ConsumerGroup: "test", - Topics: []string{"telegraf"}, - Brokers: []string{"localhost:9092"}, - Offset: "oldest", - MaxUndeliveredMessages: defaultMaxUndeliveredMessages, - doNotCommitMsgs: true, - messages: make(map[telegraf.TrackingID]*sarama.ConsumerMessage), - TopicTag: "topic", +func TestStartStop(t *testing.T) { + cg := &FakeConsumerGroup{errors: make(chan error)} + plugin := &KafkaConsumer{ + ConsumerCreator: &FakeCreator{ConsumerGroup: cg}, } - return &k, consumer -} + err := plugin.Init() + require.NoError(t, err) -// Test that the parser parses kafka messages into points -func TestRunParser(t *testing.T) { - k, consumer := newTestKafka() - acc := testutil.Accumulator{} - ctx := context.Background() + var acc testutil.Accumulator + err = plugin.Start(&acc) + require.NoError(t, err) - k.parser, _ = parsers.NewInfluxParser() - go k.receiver(ctx, &acc) - consumer.Inject(saramaMsg(testMsg)) - acc.Wait(1) + plugin.Stop() +} - assert.Equal(t, acc.NFields(), 1) +type FakeConsumerGroupSession struct { + ctx context.Context } -// Test that the parser parses kafka messages into points -// and adds the topic tag -func TestRunParserWithTopic(t *testing.T) { - k, consumer := newTestKafkaWithTopicTag() - acc := testutil.Accumulator{} - ctx := context.Background() +func (s *FakeConsumerGroupSession) Claims() map[string][]int32 { + panic("not implemented") +} - k.parser, _ = parsers.NewInfluxParser() - go k.receiver(ctx, &acc) - consumer.Inject(saramaMsgWithTopic(testMsg, "test_topic")) - acc.Wait(1) +func (s *FakeConsumerGroupSession) MemberID() string { + panic("not implemented") +} - assert.Equal(t, acc.NFields(), 1) - assert.True(t, acc.HasTag("cpu_load_short", "topic")) +func (s *FakeConsumerGroupSession) GenerationID() int32 { + panic("not implemented") } -// Test that the parser ignores invalid messages -func TestRunParserInvalidMsg(t *testing.T) { - k, consumer := newTestKafka() - acc := testutil.Accumulator{} - ctx := context.Background() +func (s *FakeConsumerGroupSession) MarkOffset(topic string, partition int32, offset int64, metadata string) { + panic("not implemented") +} - k.parser, _ = parsers.NewInfluxParser() - go k.receiver(ctx, &acc) - consumer.Inject(saramaMsg(invalidMsg)) - acc.WaitError(1) +func (s *FakeConsumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string) { + panic("not implemented") +} - assert.Equal(t, acc.NFields(), 0) +func (s *FakeConsumerGroupSession) MarkMessage(msg *sarama.ConsumerMessage, metadata string) { } -// Test that overlong messages are dropped -func TestDropOverlongMsg(t *testing.T) { - const maxMessageLen = 64 * 1024 - k, consumer := newTestKafka() - k.MaxMessageLen = maxMessageLen - acc := testutil.Accumulator{} - ctx := context.Background() - overlongMsg := strings.Repeat("v", maxMessageLen+1) +func (s *FakeConsumerGroupSession) Context() context.Context { + return s.ctx +} - go k.receiver(ctx, &acc) - consumer.Inject(saramaMsg(overlongMsg)) - acc.WaitError(1) +type FakeConsumerGroupClaim struct { + messages chan *sarama.ConsumerMessage +} - assert.Equal(t, acc.NFields(), 0) +func (c *FakeConsumerGroupClaim) Topic() string { + panic("not implemented") } -// Test that the parser parses kafka messages into points -func TestRunParserAndGather(t *testing.T) { - k, consumer := newTestKafka() - acc := testutil.Accumulator{} - ctx := context.Background() +func (c *FakeConsumerGroupClaim) Partition() int32 { + panic("not implemented") +} - k.parser, _ = parsers.NewInfluxParser() - go k.receiver(ctx, &acc) - consumer.Inject(saramaMsg(testMsg)) - acc.Wait(1) +func (c *FakeConsumerGroupClaim) InitialOffset() int64 { + panic("not implemented") +} - acc.GatherError(k.Gather) +func (c *FakeConsumerGroupClaim) HighWaterMarkOffset() int64 { + panic("not implemented") +} - assert.Equal(t, acc.NFields(), 1) - acc.AssertContainsFields(t, "cpu_load_short", - map[string]interface{}{"value": float64(23422)}) +func (c *FakeConsumerGroupClaim) Messages() <-chan *sarama.ConsumerMessage { + return c.messages } -// Test that the parser parses kafka messages into points -func TestRunParserAndGatherGraphite(t *testing.T) { - k, consumer := newTestKafka() - acc := testutil.Accumulator{} - ctx := context.Background() +func TestConsumerGroupHandler_Lifecycle(t *testing.T) { + acc := &testutil.Accumulator{} + parser := &value.ValueParser{MetricName: "cpu", DataType: "int"} + cg := NewConsumerGroupHandler(acc, 1, parser) - k.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil) - go k.receiver(ctx, &acc) - consumer.Inject(saramaMsg(testMsgGraphite)) - acc.Wait(1) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + session := &FakeConsumerGroupSession{ + ctx: ctx, + } + var claim FakeConsumerGroupClaim + var err error - acc.GatherError(k.Gather) + err = cg.Setup(session) + require.NoError(t, err) - assert.Equal(t, acc.NFields(), 1) - acc.AssertContainsFields(t, "cpu_load_short_graphite", - map[string]interface{}{"value": float64(23422)}) + cancel() + err = cg.ConsumeClaim(session, &claim) + require.NoError(t, err) + + err = cg.Cleanup(session) + require.NoError(t, err) } -// Test that the parser parses kafka messages into points -func TestRunParserAndGatherJSON(t *testing.T) { - k, consumer := newTestKafka() - acc := testutil.Accumulator{} - ctx := context.Background() +func TestConsumerGroupHandler_ConsumeClaim(t *testing.T) { + acc := &testutil.Accumulator{} + parser := &value.ValueParser{MetricName: "cpu", DataType: "int"} + cg := NewConsumerGroupHandler(acc, 1, parser) - k.parser, _ = parsers.NewParser(&parsers.Config{ - DataFormat: "json", - MetricName: "kafka_json_test", - }) - go k.receiver(ctx, &acc) - consumer.Inject(saramaMsg(testMsgJSON)) - acc.Wait(1) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - acc.GatherError(k.Gather) + session := &FakeConsumerGroupSession{ctx: ctx} + claim := &FakeConsumerGroupClaim{ + messages: make(chan *sarama.ConsumerMessage, 1), + } - assert.Equal(t, acc.NFields(), 2) - acc.AssertContainsFields(t, "kafka_json_test", - map[string]interface{}{ - "a": float64(5), - "b_c": float64(6), - }) -} + err := cg.Setup(session) + require.NoError(t, err) -func saramaMsg(val string) *sarama.ConsumerMessage { - return &sarama.ConsumerMessage{ - Key: nil, - Value: []byte(val), - Offset: 0, - Partition: 0, + claim.messages <- &sarama.ConsumerMessage{ + Topic: "telegraf", + Value: []byte("42"), } + + go func() { + err = cg.ConsumeClaim(session, claim) + require.NoError(t, err) + }() + + acc.Wait(1) + cancel() + + err = cg.Cleanup(session) + require.NoError(t, err) + + expected := []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42, + }, + time.Now(), + ), + } + + testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime()) } -func saramaMsgWithTopic(val string, topic string) *sarama.ConsumerMessage { - return &sarama.ConsumerMessage{ - Key: nil, - Value: []byte(val), - Offset: 0, - Partition: 0, - Topic: topic, +func TestConsumerGroupHandler_Handle(t *testing.T) { + tests := []struct { + name string + maxMessageLen int + topicTag string + msg *sarama.ConsumerMessage + expected []telegraf.Metric + }{ + { + name: "happy path", + msg: &sarama.ConsumerMessage{ + Topic: "telegraf", + Value: []byte("42"), + }, + expected: []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42, + }, + time.Now(), + ), + }, + }, + { + name: "message to long", + maxMessageLen: 4, + msg: &sarama.ConsumerMessage{ + Topic: "telegraf", + Value: []byte("12345"), + }, + expected: []telegraf.Metric{}, + }, + { + name: "parse error", + msg: &sarama.ConsumerMessage{ + Topic: "telegraf", + Value: []byte("not an integer"), + }, + expected: []telegraf.Metric{}, + }, + { + name: "add topic tag", + topicTag: "topic", + msg: &sarama.ConsumerMessage{ + Topic: "telegraf", + Value: []byte("42"), + }, + expected: []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{ + "topic": "telegraf", + }, + map[string]interface{}{ + "value": 42, + }, + time.Now(), + ), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + acc := &testutil.Accumulator{} + parser := &value.ValueParser{MetricName: "cpu", DataType: "int"} + cg := NewConsumerGroupHandler(acc, 1, parser) + cg.MaxMessageLen = tt.maxMessageLen + cg.TopicTag = tt.topicTag + + ctx := context.Background() + session := &FakeConsumerGroupSession{ctx: ctx} + + cg.Reserve(ctx) + cg.Handle(session, tt.msg) + + testutil.RequireMetricsEqual(t, tt.expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime()) + }) } }