Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sumo Logic output plugin: carbon2 default to include field in metric #8132

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion plugins/outputs/sumologic/sumologic.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,19 @@ func (s *SumoLogic) SetSerializer(serializer serializers.Serializer) {
s.headers = make(map[string]string)
}

switch serializer.(type) {
switch sr := serializer.(type) {
case *carbon2.Serializer:
s.headers[contentTypeHeader] = carbon2ContentType

// In case Carbon2 is used and the metrics format was unset, default to
// include field in metric name.
if sr.IsMetricsFormatUnset() {
sr.SetMetricsFormat(carbon2.Carbon2FormatMetricIncludesField)
}

case *graphite.GraphiteSerializer:
s.headers[contentTypeHeader] = graphiteContentType

case *prometheus.Serializer:
s.headers[contentTypeHeader] = prometheusContentType

Expand Down
101 changes: 63 additions & 38 deletions plugins/outputs/sumologic/sumologic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sumologic

import (
"bufio"
"bytes"
"compress/gzip"
"fmt"
"io"
Expand All @@ -20,7 +21,6 @@ import (
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/plugins/serializers/carbon2"
"github.com/influxdata/telegraf/plugins/serializers/graphite"
"github.com/influxdata/telegraf/plugins/serializers/prometheus"
Expand Down Expand Up @@ -135,7 +135,7 @@ func TestMethod(t *testing.T) {
w.WriteHeader(http.StatusOK)
})

serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate)
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
require.NoError(t, err)

plugin := tt.plugin()
Expand Down Expand Up @@ -212,7 +212,7 @@ func TestStatusCode(t *testing.T) {
w.WriteHeader(tt.statusCode)
})

serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate)
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
require.NoError(t, err)

tt.plugin.SetSerializer(serializer)
Expand All @@ -226,77 +226,102 @@ func TestStatusCode(t *testing.T) {
}

func TestContentType(t *testing.T) {
ts := httptest.NewServer(http.NotFoundHandler())
defer ts.Close()

ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
})

u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
require.NoError(t, err)

carbon2Serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate)
require.NoError(t, err)

tests := []struct {
name string
plugin func() *SumoLogic
expectedErr bool
serializer serializers.Serializer
name string
plugin func() *SumoLogic
expectedBody []byte
}{
{
name: "carbon2 is supported",
name: "carbon2 (data format = field separate) is supported",
plugin: func() *SumoLogic {
s := Default()
s.URL = u.String()
s.headers = map[string]string{
contentTypeHeader: carbon2ContentType,
}
sr, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
require.NoError(t, err)
s.SetSerializer(sr)
return s
},
expectedBody: []byte("metric=cpu field=value 42 0\n"),
},
{
name: "carbon2 (data format unset) is supported and falls back to include field in metric name",
plugin: func() *SumoLogic {
s := Default()
s.headers = map[string]string{
contentTypeHeader: carbon2ContentType,
}
sr, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldEmpty))
require.NoError(t, err)
s.SetSerializer(sr)
return s
},
serializer: carbon2Serializer,
expectedErr: false,
expectedBody: []byte("metric=cpu_value 42 0\n"),
},
{
name: "carbon2 (data format = metric includes field) is supported",
plugin: func() *SumoLogic {
s := Default()
s.headers = map[string]string{
contentTypeHeader: carbon2ContentType,
}
sr, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatMetricIncludesField))
require.NoError(t, err)
s.SetSerializer(sr)
return s
},
expectedBody: []byte("metric=cpu_value 42 0\n"),
},
{
name: "graphite is supported",
plugin: func() *SumoLogic {
s := Default()
s.URL = u.String()
s.headers = map[string]string{
contentTypeHeader: graphiteContentType,
}
s.SetSerializer(&graphite.GraphiteSerializer{})
return s
},
serializer: &graphite.GraphiteSerializer{},
expectedErr: false,
},
{
name: "prometheus is supported",
plugin: func() *SumoLogic {
s := Default()
s.URL = u.String()
s.headers = map[string]string{
contentTypeHeader: prometheusContentType,
}
s.SetSerializer(&prometheus.Serializer{})
return s
},
serializer: &prometheus.Serializer{},
expectedErr: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
plugin := tt.plugin()

plugin.SetSerializer(tt.serializer)
var body bytes.Buffer
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
gz, err := gzip.NewReader(r.Body)
require.NoError(t, err)
io.Copy(&body, gz)
w.WriteHeader(http.StatusOK)
}))
defer ts.Close()

err := plugin.Connect()
u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
require.NoError(t, err)

plugin := tt.plugin()
plugin.URL = u.String()

require.NoError(t, plugin.Connect())

err = plugin.Write([]telegraf.Metric{getMetric(t)})
require.NoError(t, err)

if tt.expectedBody != nil {
require.Equal(t, string(tt.expectedBody), body.String())
}
})
}
}
Expand Down Expand Up @@ -338,7 +363,7 @@ func TestContentEncodingGzip(t *testing.T) {
w.WriteHeader(http.StatusNoContent)
})

serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate)
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
require.NoError(t, err)

plugin := tt.plugin()
Expand Down Expand Up @@ -374,7 +399,7 @@ func TestDefaultUserAgent(t *testing.T) {
MaxRequstBodySize: Default().MaxRequstBodySize,
}

serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate)
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
require.NoError(t, err)

plugin.SetSerializer(serializer)
Expand Down Expand Up @@ -627,7 +652,7 @@ func TestMaxRequestBodySize(t *testing.T) {
w.WriteHeader(http.StatusOK)
})

serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate)
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
require.NoError(t, err)

plugin := tt.plugin()
Expand Down Expand Up @@ -659,7 +684,7 @@ func TestTryingToSendEmptyMetricsDoesntFail(t *testing.T) {
plugin := Default()
plugin.URL = u.String()

serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate)
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
require.NoError(t, err)
plugin.SetSerializer(serializer)

Expand Down
61 changes: 40 additions & 21 deletions plugins/serializers/carbon2/carbon2.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,42 +5,38 @@ import (
"fmt"
"strconv"
"strings"
"sync"

"github.com/influxdata/telegraf"
)

type format string

const (
Carbon2FormatFieldSeparate string = "field_separate"
Carbon2FormatMetricIncludesField string = "metric_includes_field"

formatFieldSeparate = format(Carbon2FormatFieldSeparate)
formatMetricIncludesField = format(Carbon2FormatMetricIncludesField)
Carbon2FormatFieldEmpty = format("")
Carbon2FormatFieldSeparate = format("field_separate")
Carbon2FormatMetricIncludesField = format("metric_includes_field")
)

var formats = map[string]format{
// Field separate is the default when no format specified.
"": formatFieldSeparate,
Carbon2FormatFieldSeparate: formatFieldSeparate,
Carbon2FormatMetricIncludesField: formatMetricIncludesField,
var formats = map[format]struct{}{
Carbon2FormatFieldEmpty: {},
Carbon2FormatFieldSeparate: {},
Carbon2FormatMetricIncludesField: {},
}

type Serializer struct {
metricsFormat format
metricsFormat format
metricsFormatLock sync.RWMutex
}

func NewSerializer(f string) (*Serializer, error) {
var (
ok bool
metricsFormat format
)
if metricsFormat, ok = formats[f]; !ok {
func NewSerializer(metricsFormat string) (*Serializer, error) {
var f = format(metricsFormat)
if _, ok := formats[f]; !ok {
return nil, fmt.Errorf("unknown carbon2 format: %s", f)
}

return &Serializer{
metricsFormat: metricsFormat,
metricsFormat: f,
}, nil
}

Expand All @@ -58,17 +54,22 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {

func (s *Serializer) createObject(metric telegraf.Metric) []byte {
var m bytes.Buffer
metricsFormat := s.getMetricsFormat()

for fieldName, fieldValue := range metric.Fields() {
if !isNumeric(fieldValue) {
continue
}

switch s.metricsFormat {
case formatFieldSeparate:
switch metricsFormat {
// Field separate is the default when no format specified.
case Carbon2FormatFieldEmpty:
case Carbon2FormatFieldSeparate:
m.WriteString(serializeMetricFieldSeparate(
metric.Name(), fieldName,
))
case formatMetricIncludesField:

case Carbon2FormatMetricIncludesField:
m.WriteString(serializeMetricIncludeField(
metric.Name(), fieldName,
))
Expand All @@ -93,6 +94,24 @@ func (s *Serializer) createObject(metric telegraf.Metric) []byte {
return m.Bytes()
}

func (s *Serializer) SetMetricsFormat(f format) {
s.metricsFormatLock.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are locks needed here?

Copy link
Contributor Author

@pmalek-sumo pmalek-sumo Sep 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general yes because we don't know how it will be called.
Perhaps calling SetMetricsFormat and IsMetricsFormatUnset from different goroutines will never happen but this is just to be sure.
It's not a hot path so my assumption is that it's ok.
Unless there's a specific concern here I'd leave it this way.

EDIT
Actually, what I'm most concerned is the getMetricsFormat which is called in createObject, which in turn is called in Serialize and SerializeBatch and that I'd like to protect with a lock.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless you've specifically seen race conditions, this should be unnecessary. Plugins have their own dedicated goroutines and the interface functions are never called in parallel by Telegraf. We try to avoid this type of defensive programming, as it tends to be confusing and sets a bad example for other plugins.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. Removed.

s.metricsFormat = f
s.metricsFormatLock.Unlock()
}

func (s *Serializer) getMetricsFormat() format {
s.metricsFormatLock.RLock()
defer s.metricsFormatLock.RUnlock()
return s.metricsFormat
}

func (s *Serializer) IsMetricsFormatUnset() bool {
s.metricsFormatLock.RLock()
defer s.metricsFormatLock.RUnlock()
return s.metricsFormat == Carbon2FormatFieldEmpty
}

func serializeMetricFieldSeparate(name, fieldName string) string {
return fmt.Sprintf("metric=%s field=%s ",
strings.Replace(name, " ", "_", -1),
Expand Down
Loading