Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sumo Logic output plugin: carbon2 default to include field in metric #26

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion plugins/outputs/sumologic/sumologic.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,19 @@ func (s *SumoLogic) SetSerializer(serializer serializers.Serializer) {
s.headers = make(map[string]string)
}

switch serializer.(type) {
switch sr := serializer.(type) {
case *carbon2.Serializer:
s.headers[contentTypeHeader] = carbon2ContentType

// In case Carbon2 is used and the metrics format was unset, default to
// include field in metric name.
if sr.IsMetricsFormatUnset() {
sr.SetMetricsFormat(carbon2.Carbon2FormatMetricIncludesField)
}

case *graphite.GraphiteSerializer:
s.headers[contentTypeHeader] = graphiteContentType

case *prometheus.Serializer:
s.headers[contentTypeHeader] = prometheusContentType

Expand Down
100 changes: 63 additions & 37 deletions plugins/outputs/sumologic/sumologic_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package sumologic

import (
"bytes"
"compress/gzip"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
Expand All @@ -17,7 +19,6 @@ import (
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/plugins/serializers/carbon2"
"github.com/influxdata/telegraf/plugins/serializers/graphite"
"github.com/influxdata/telegraf/plugins/serializers/prometheus"
Expand Down Expand Up @@ -133,7 +134,7 @@ func TestMethod(t *testing.T) {
w.WriteHeader(http.StatusOK)
})

serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate)
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
require.NoError(t, err)

plugin := tt.plugin()
Expand Down Expand Up @@ -210,7 +211,7 @@ func TestStatusCode(t *testing.T) {
w.WriteHeader(tt.statusCode)
})

serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate)
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
require.NoError(t, err)

tt.plugin.SetSerializer(serializer)
Expand All @@ -224,77 +225,102 @@ func TestStatusCode(t *testing.T) {
}

func TestContentType(t *testing.T) {
ts := httptest.NewServer(http.NotFoundHandler())
defer ts.Close()

ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
})

u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
require.NoError(t, err)

carbon2Serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate)
require.NoError(t, err)

tests := []struct {
name string
plugin func() *SumoLogic
expectedErr bool
serializer serializers.Serializer
name string
plugin func() *SumoLogic
expectedBody []byte
}{
{
name: "carbon2 is supported",
name: "carbon2 (data format = field separate) is supported",
plugin: func() *SumoLogic {
s := Default()
s.URL = u.String()
s.headers = map[string]string{
contentTypeHeader: carbon2ContentType,
}
sr, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
require.NoError(t, err)
s.SetSerializer(sr)
return s
},
expectedBody: []byte("metric=cpu field=value 42 0\n"),
},
{
name: "carbon2 (data format unset) is supported and falls back to include field in metric name",
plugin: func() *SumoLogic {
s := Default()
s.headers = map[string]string{
contentTypeHeader: carbon2ContentType,
}
sr, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldEmpty))
require.NoError(t, err)
s.SetSerializer(sr)
return s
},
serializer: carbon2Serializer,
expectedErr: false,
expectedBody: []byte("metric=cpu_value 42 0\n"),
},
{
name: "carbon2 (data format = metric includes field) is supported",
plugin: func() *SumoLogic {
s := Default()
s.headers = map[string]string{
contentTypeHeader: carbon2ContentType,
}
sr, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatMetricIncludesField))
require.NoError(t, err)
s.SetSerializer(sr)
return s
},
expectedBody: []byte("metric=cpu_value 42 0\n"),
},
{
name: "graphite is supported",
plugin: func() *SumoLogic {
s := Default()
s.URL = u.String()
s.headers = map[string]string{
contentTypeHeader: graphiteContentType,
}
s.SetSerializer(&graphite.GraphiteSerializer{})
return s
},
serializer: &graphite.GraphiteSerializer{},
expectedErr: false,
},
{
name: "prometheus is supported",
plugin: func() *SumoLogic {
s := Default()
s.URL = u.String()
s.headers = map[string]string{
contentTypeHeader: prometheusContentType,
}
s.SetSerializer(&prometheus.Serializer{})
return s
},
serializer: &prometheus.Serializer{},
expectedErr: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
plugin := tt.plugin()

plugin.SetSerializer(tt.serializer)
var body bytes.Buffer
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
gz, err := gzip.NewReader(r.Body)
require.NoError(t, err)
io.Copy(&body, gz)
w.WriteHeader(http.StatusOK)
}))
defer ts.Close()

err := plugin.Connect()
u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
require.NoError(t, err)

plugin := tt.plugin()
plugin.URL = u.String()

require.NoError(t, plugin.Connect())

err = plugin.Write([]telegraf.Metric{getMetric(t)})
require.NoError(t, err)

if tt.expectedBody != nil {
require.Equal(t, string(tt.expectedBody), body.String())
}
})
}
}
Expand Down Expand Up @@ -336,7 +362,7 @@ func TestContentEncodingGzip(t *testing.T) {
w.WriteHeader(http.StatusNoContent)
})

serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate)
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
require.NoError(t, err)

plugin := tt.plugin()
Expand Down Expand Up @@ -372,7 +398,7 @@ func TestDefaultUserAgent(t *testing.T) {
MaxRequstBodySize: Default().MaxRequstBodySize,
}

serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate)
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
require.NoError(t, err)

plugin.SetSerializer(serializer)
Expand Down Expand Up @@ -555,7 +581,7 @@ func TestMaxRequestBodySize(t *testing.T) {
w.WriteHeader(http.StatusOK)
})

serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate)
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
require.NoError(t, err)

plugin := tt.plugin()
Expand Down
61 changes: 40 additions & 21 deletions plugins/serializers/carbon2/carbon2.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,42 +5,38 @@ import (
"fmt"
"strconv"
"strings"
"sync"

"github.com/influxdata/telegraf"
)

type format string

const (
Carbon2FormatFieldSeparate string = "field_separate"
Carbon2FormatMetricIncludesField string = "metric_includes_field"

formatFieldSeparate = format(Carbon2FormatFieldSeparate)
formatMetricIncludesField = format(Carbon2FormatMetricIncludesField)
Carbon2FormatFieldEmpty = format("")
Carbon2FormatFieldSeparate = format("field_separate")
Carbon2FormatMetricIncludesField = format("metric_includes_field")
)

var formats = map[string]format{
// Field separate is the default when no format specified.
"": formatFieldSeparate,
Carbon2FormatFieldSeparate: formatFieldSeparate,
Carbon2FormatMetricIncludesField: formatMetricIncludesField,
var formats = map[format]struct{}{
Carbon2FormatFieldEmpty: {},
Carbon2FormatFieldSeparate: {},
Carbon2FormatMetricIncludesField: {},
}

type Serializer struct {
metricsFormat format
metricsFormat format
metricsFormatLock sync.RWMutex
}

func NewSerializer(f string) (*Serializer, error) {
var (
ok bool
metricsFormat format
)
if metricsFormat, ok = formats[f]; !ok {
func NewSerializer(metricsFormat string) (*Serializer, error) {
var f = format(metricsFormat)
if _, ok := formats[f]; !ok {
return nil, fmt.Errorf("unknown carbon2 format: %s", f)
}

return &Serializer{
metricsFormat: metricsFormat,
metricsFormat: f,
}, nil
}

Expand All @@ -58,17 +54,22 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {

func (s *Serializer) createObject(metric telegraf.Metric) []byte {
var m bytes.Buffer
metricsFormat := s.getMetricsFormat()

for fieldName, fieldValue := range metric.Fields() {
if !isNumeric(fieldValue) {
continue
}

switch s.metricsFormat {
case formatFieldSeparate:
switch metricsFormat {
// Field separate is the default when no format specified.
case Carbon2FormatFieldEmpty:
case Carbon2FormatFieldSeparate:
m.WriteString(serializeMetricFieldSeparate(
metric.Name(), fieldName,
))
case formatMetricIncludesField:

case Carbon2FormatMetricIncludesField:
m.WriteString(serializeMetricIncludeField(
metric.Name(), fieldName,
))
Expand All @@ -93,6 +94,24 @@ func (s *Serializer) createObject(metric telegraf.Metric) []byte {
return m.Bytes()
}

func (s *Serializer) SetMetricsFormat(f format) {
s.metricsFormatLock.Lock()
s.metricsFormat = f
s.metricsFormatLock.Unlock()
}

func (s *Serializer) getMetricsFormat() format {
s.metricsFormatLock.RLock()
defer s.metricsFormatLock.RUnlock()
return s.metricsFormat
}

func (s *Serializer) IsMetricsFormatUnset() bool {
s.metricsFormatLock.RLock()
defer s.metricsFormatLock.RUnlock()
return s.metricsFormat == Carbon2FormatFieldEmpty
}

func serializeMetricFieldSeparate(name, fieldName string) string {
return fmt.Sprintf("metric=%s field=%s ",
strings.Replace(name, " ", "_", -1),
Expand Down
Loading