Skip to content

Commit

Permalink
Do not add invalid timestamps to kafka messages (#6908)
Browse files Browse the repository at this point in the history
(cherry picked from commit f6b3026)
  • Loading branch information
danielnelson committed Jan 15, 2020
1 parent e9b3e86 commit aa3d0a9
Showing 1 changed file with 10 additions and 3 deletions.
13 changes: 10 additions & 3 deletions plugins/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"log"
"strings"
"time"

"github.com/Shopify/sarama"
"github.com/gofrs/uuid"
Expand All @@ -20,6 +21,8 @@ var ValidTopicSuffixMethods = []string{
"tags",
}

var zeroTime = time.Unix(0, 0)

type (
Kafka struct {
Brokers []string
Expand Down Expand Up @@ -323,9 +326,13 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error {
}

m := &sarama.ProducerMessage{
Topic: k.GetTopicName(metric),
Value: sarama.ByteEncoder(buf),
Timestamp: metric.Time(),
Topic: k.GetTopicName(metric),
Value: sarama.ByteEncoder(buf),
}

// Negative timestamps are not allowed by the Kafka protocol.
if !metric.Time().Before(zeroTime) {
m.Timestamp = metric.Time()
}

key, err := k.routingKey(metric)
Expand Down

0 comments on commit aa3d0a9

Please sign in to comment.