Skip to content

Commit

Permalink
Ignore errors serializing single metrics (influxdata#5943)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson authored and Mathieu Lecarme committed Apr 17, 2020
1 parent 359e932 commit 0185819
Show file tree
Hide file tree
Showing 13 changed files with 42 additions and 28 deletions.
3 changes: 2 additions & 1 deletion plugins/outputs/amqp/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,8 @@ func (q *AMQP) serialize(metrics []telegraf.Metric) ([]byte, error) {
for _, metric := range metrics {
octets, err := q.serializer.Serialize(metric)
if err != nil {
return nil, err
log.Printf("D! [outputs.amqp] Could not serialize metric: %v", err)
continue
}
_, err = buf.Write(octets)
if err != nil {
Expand Down
6 changes: 4 additions & 2 deletions plugins/outputs/cloud_pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package cloud_pubsub

import (
"context"
"encoding/base64"
"fmt"
"log"
"sync"

"cloud.google.com/go/pubsub"
"encoding/base64"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/outputs"
Expand Down Expand Up @@ -229,7 +230,8 @@ func (ps *PubSub) toMessages(metrics []telegraf.Metric) ([]*pubsub.Message, erro
for i, m := range metrics {
b, err := ps.serializer.Serialize(m)
if err != nil {
return nil, err
log.Printf("D! [outputs.cloud_pubsub] Could not serialize metric: %v", err)
continue
}

if ps.Base64Data {
Expand Down
3 changes: 2 additions & 1 deletion plugins/outputs/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package file
import (
"fmt"
"io"
"log"
"os"
"time"

Expand Down Expand Up @@ -101,7 +102,7 @@ func (f *File) Write(metrics []telegraf.Metric) error {
for _, metric := range metrics {
b, err := f.serializer.Serialize(metric)
if err != nil {
return fmt.Errorf("failed to serialize message: %s", err)
log.Printf("D! [outputs.file] Could not serialize metric: %v", err)
}

_, err = f.writer.Write(b)
Expand Down
3 changes: 2 additions & 1 deletion plugins/outputs/instrumental/instrumental.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error {

buf, err := s.Serialize(m)
if err != nil {
log.Printf("E! Error serializing a metric to Instrumental: %s", err)
log.Printf("D! [outputs.instrumental] Could not serialize metric: %v", err)
continue
}

switch metricType {
Expand Down
6 changes: 3 additions & 3 deletions plugins/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@ import (
"log"
"strings"

"github.com/Shopify/sarama"
"github.com/influxdata/telegraf"
tlsint "github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
uuid "github.com/satori/go.uuid"

"github.com/Shopify/sarama"
)

var ValidTopicSuffixMethods = []string{
Expand Down Expand Up @@ -294,7 +293,8 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error {
for _, metric := range metrics {
buf, err := k.serializer.Serialize(metric)
if err != nil {
return err
log.Printf("D! [outputs.kafka] Could not serialize metric: %v", err)
continue
}

m := &sarama.ProducerMessage{
Expand Down
6 changes: 3 additions & 3 deletions plugins/outputs/kinesis/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,11 @@ import (

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/satori/go.uuid"

"github.com/influxdata/telegraf"
internalaws "github.com/influxdata/telegraf/internal/config/aws"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/satori/go.uuid"
)

type (
Expand Down Expand Up @@ -221,7 +220,8 @@ func (k *KinesisOutput) Write(metrics []telegraf.Metric) error {

values, err := k.serializer.Serialize(metric)
if err != nil {
return err
log.Printf("D! [outputs.kinesis] Could not serialize metric: %v", err)
continue
}

partitionKey := k.getPartitionKey(metric)
Expand Down
5 changes: 3 additions & 2 deletions plugins/outputs/mqtt/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mqtt

import (
"fmt"
"log"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -150,9 +151,9 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error {
metricsmap[topic] = append(metricsmap[topic], metric)
} else {
buf, err := m.serializer.Serialize(metric)

if err != nil {
return err
log.Printf("D! [outputs.mqtt] Could not serialize metric: %v", err)
continue
}

err = m.publish(topic, buf)
Expand Down
7 changes: 4 additions & 3 deletions plugins/outputs/nats/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package nats

import (
"fmt"

nats_client "github.com/nats-io/go-nats"
"log"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
nats_client "github.com/nats-io/go-nats"
)

type NATS struct {
Expand Down Expand Up @@ -108,7 +108,8 @@ func (n *NATS) Write(metrics []telegraf.Metric) error {
for _, metric := range metrics {
buf, err := n.serializer.Serialize(metric)
if err != nil {
return err
log.Printf("D! [outputs.nats] Could not serialize metric: %v", err)
continue
}

err = n.conn.Publish(n.Subject, buf)
Expand Down
7 changes: 4 additions & 3 deletions plugins/outputs/nsq/nsq.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package nsq

import (
"fmt"

"github.com/nsqio/go-nsq"
"log"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/nsqio/go-nsq"
)

type NSQ struct {
Expand Down Expand Up @@ -68,7 +68,8 @@ func (n *NSQ) Write(metrics []telegraf.Metric) error {
for _, metric := range metrics {
buf, err := n.serializer.Serialize(metric)
if err != nil {
return err
log.Printf("D! [outputs.nsq] Could not serialize metric: %v", err)
continue
}

err = n.producer.Publish(n.Topic, buf)
Expand Down
7 changes: 3 additions & 4 deletions plugins/outputs/socket_writer/socket_writer.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package socket_writer

import (
"crypto/tls"
"fmt"
"log"
"net"
"strings"

"crypto/tls"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
tlsint "github.com/influxdata/telegraf/internal/tls"
Expand Down Expand Up @@ -128,8 +127,8 @@ func (sw *SocketWriter) Write(metrics []telegraf.Metric) error {
for _, m := range metrics {
bs, err := sw.Serialize(m)
if err != nil {
//TODO log & keep going with remaining metrics
return err
log.Printf("D! [outputs.socket_writer] Could not serialize metric: %v", err)
continue
}
if _, err := sw.Conn.Write(bs); err != nil {
//TODO log & keep going with remaining strings
Expand Down
3 changes: 3 additions & 0 deletions plugins/serializers/influx/influx.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
for _, m := range metrics {
_, err := s.Write(&s.buf, m)
if err != nil {
if _, ok := err.(*MetricError); ok {
continue
}
return nil, err
}
}
Expand Down
9 changes: 5 additions & 4 deletions plugins/serializers/influx/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,13 @@ func (r *reader) Read(p []byte) (int, error) {
r.offset += 1
if err != nil {
r.buf.Reset()
if err != nil {
// Since we are serializing multiple metrics, don't fail the
// the entire batch just because of one unserializable metric.
log.Printf("E! [serializers.influx] could not serialize metric: %v; discarding metric", err)
if _, ok := err.(*MetricError); ok {
continue
}
// Since we are serializing multiple metrics, don't fail the
// the entire batch just because of one unserializable metric.
log.Printf("E! [serializers.influx] could not serialize metric: %v; discarding metric", err)
continue
}
break
}
Expand Down
5 changes: 4 additions & 1 deletion plugins/serializers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ type Serializer interface {
// Serialize takes a single telegraf metric and turns it into a byte buffer.
// separate metrics should be separated by a newline, and there should be
// a newline at the end of the buffer.
//
// New plugins should use SerializeBatch instead to allow for non-line
// delimited metrics.
Serialize(metric telegraf.Metric) ([]byte, error)

// SerializeBatch takes an array of telegraf metric and serializes it into
Expand All @@ -41,7 +44,7 @@ type Serializer interface {
// Config is a struct that covers the data types needed for all serializer types,
// and can be used to instantiate _any_ of the serializers.
type Config struct {
// Dataformat can be one of: influx, graphite, or json
// Dataformat can be one of the serializer types listed in NewSerializer.
DataFormat string

// Support tags in graphite protocol
Expand Down

0 comments on commit 0185819

Please sign in to comment.