Skip to content

Commit

Permalink
Sumo Logic output plugin: carbon2 default to include field in metric (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
pmalek-sumo authored Sep 16, 2020
1 parent d764f86 commit d6a0f00
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 77 deletions.
10 changes: 9 additions & 1 deletion plugins/outputs/sumologic/sumologic.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,19 @@ func (s *SumoLogic) SetSerializer(serializer serializers.Serializer) {
s.headers = make(map[string]string)
}

switch serializer.(type) {
switch sr := serializer.(type) {
case *carbon2.Serializer:
s.headers[contentTypeHeader] = carbon2ContentType

// In case Carbon2 is used and the metrics format was unset, default to
// include field in metric name.
if sr.IsMetricsFormatUnset() {
sr.SetMetricsFormat(carbon2.Carbon2FormatMetricIncludesField)
}

case *graphite.GraphiteSerializer:
s.headers[contentTypeHeader] = graphiteContentType

case *prometheus.Serializer:
s.headers[contentTypeHeader] = prometheusContentType

Expand Down
100 changes: 63 additions & 37 deletions plugins/outputs/sumologic/sumologic_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package sumologic

import (
"bytes"
"compress/gzip"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
Expand All @@ -17,7 +19,6 @@ import (
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/plugins/serializers/carbon2"
"github.com/influxdata/telegraf/plugins/serializers/graphite"
"github.com/influxdata/telegraf/plugins/serializers/prometheus"
Expand Down Expand Up @@ -133,7 +134,7 @@ func TestMethod(t *testing.T) {
w.WriteHeader(http.StatusOK)
})

serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate)
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
require.NoError(t, err)

plugin := tt.plugin()
Expand Down Expand Up @@ -210,7 +211,7 @@ func TestStatusCode(t *testing.T) {
w.WriteHeader(tt.statusCode)
})

serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate)
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
require.NoError(t, err)

tt.plugin.SetSerializer(serializer)
Expand All @@ -224,77 +225,102 @@ func TestStatusCode(t *testing.T) {
}

func TestContentType(t *testing.T) {
ts := httptest.NewServer(http.NotFoundHandler())
defer ts.Close()

ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
})

u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
require.NoError(t, err)

carbon2Serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate)
require.NoError(t, err)

tests := []struct {
name string
plugin func() *SumoLogic
expectedErr bool
serializer serializers.Serializer
name string
plugin func() *SumoLogic
expectedBody []byte
}{
{
name: "carbon2 is supported",
name: "carbon2 (data format = field separate) is supported",
plugin: func() *SumoLogic {
s := Default()
s.URL = u.String()
s.headers = map[string]string{
contentTypeHeader: carbon2ContentType,
}
sr, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
require.NoError(t, err)
s.SetSerializer(sr)
return s
},
expectedBody: []byte("metric=cpu field=value 42 0\n"),
},
{
name: "carbon2 (data format unset) is supported and falls back to include field in metric name",
plugin: func() *SumoLogic {
s := Default()
s.headers = map[string]string{
contentTypeHeader: carbon2ContentType,
}
sr, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldEmpty))
require.NoError(t, err)
s.SetSerializer(sr)
return s
},
serializer: carbon2Serializer,
expectedErr: false,
expectedBody: []byte("metric=cpu_value 42 0\n"),
},
{
name: "carbon2 (data format = metric includes field) is supported",
plugin: func() *SumoLogic {
s := Default()
s.headers = map[string]string{
contentTypeHeader: carbon2ContentType,
}
sr, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatMetricIncludesField))
require.NoError(t, err)
s.SetSerializer(sr)
return s
},
expectedBody: []byte("metric=cpu_value 42 0\n"),
},
{
name: "graphite is supported",
plugin: func() *SumoLogic {
s := Default()
s.URL = u.String()
s.headers = map[string]string{
contentTypeHeader: graphiteContentType,
}
s.SetSerializer(&graphite.GraphiteSerializer{})
return s
},
serializer: &graphite.GraphiteSerializer{},
expectedErr: false,
},
{
name: "prometheus is supported",
plugin: func() *SumoLogic {
s := Default()
s.URL = u.String()
s.headers = map[string]string{
contentTypeHeader: prometheusContentType,
}
s.SetSerializer(&prometheus.Serializer{})
return s
},
serializer: &prometheus.Serializer{},
expectedErr: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
plugin := tt.plugin()

plugin.SetSerializer(tt.serializer)
var body bytes.Buffer
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
gz, err := gzip.NewReader(r.Body)
require.NoError(t, err)
io.Copy(&body, gz)
w.WriteHeader(http.StatusOK)
}))
defer ts.Close()

err := plugin.Connect()
u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
require.NoError(t, err)

plugin := tt.plugin()
plugin.URL = u.String()

require.NoError(t, plugin.Connect())

err = plugin.Write([]telegraf.Metric{getMetric(t)})
require.NoError(t, err)

if tt.expectedBody != nil {
require.Equal(t, string(tt.expectedBody), body.String())
}
})
}
}
Expand Down Expand Up @@ -336,7 +362,7 @@ func TestContentEncodingGzip(t *testing.T) {
w.WriteHeader(http.StatusNoContent)
})

serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate)
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
require.NoError(t, err)

plugin := tt.plugin()
Expand Down Expand Up @@ -372,7 +398,7 @@ func TestDefaultUserAgent(t *testing.T) {
MaxRequstBodySize: Default().MaxRequstBodySize,
}

serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate)
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
require.NoError(t, err)

plugin.SetSerializer(serializer)
Expand Down Expand Up @@ -555,7 +581,7 @@ func TestMaxRequestBodySize(t *testing.T) {
w.WriteHeader(http.StatusOK)
})

serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate)
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
require.NoError(t, err)

plugin := tt.plugin()
Expand Down
61 changes: 40 additions & 21 deletions plugins/serializers/carbon2/carbon2.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,42 +5,38 @@ import (
"fmt"
"strconv"
"strings"
"sync"

"github.com/influxdata/telegraf"
)

type format string

const (
Carbon2FormatFieldSeparate string = "field_separate"
Carbon2FormatMetricIncludesField string = "metric_includes_field"

formatFieldSeparate = format(Carbon2FormatFieldSeparate)
formatMetricIncludesField = format(Carbon2FormatMetricIncludesField)
Carbon2FormatFieldEmpty = format("")
Carbon2FormatFieldSeparate = format("field_separate")
Carbon2FormatMetricIncludesField = format("metric_includes_field")
)

var formats = map[string]format{
// Field separate is the default when no format specified.
"": formatFieldSeparate,
Carbon2FormatFieldSeparate: formatFieldSeparate,
Carbon2FormatMetricIncludesField: formatMetricIncludesField,
var formats = map[format]struct{}{
Carbon2FormatFieldEmpty: {},
Carbon2FormatFieldSeparate: {},
Carbon2FormatMetricIncludesField: {},
}

type Serializer struct {
metricsFormat format
metricsFormat format
metricsFormatLock sync.RWMutex
}

func NewSerializer(f string) (*Serializer, error) {
var (
ok bool
metricsFormat format
)
if metricsFormat, ok = formats[f]; !ok {
func NewSerializer(metricsFormat string) (*Serializer, error) {
var f = format(metricsFormat)
if _, ok := formats[f]; !ok {
return nil, fmt.Errorf("unknown carbon2 format: %s", f)
}

return &Serializer{
metricsFormat: metricsFormat,
metricsFormat: f,
}, nil
}

Expand All @@ -58,17 +54,22 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {

func (s *Serializer) createObject(metric telegraf.Metric) []byte {
var m bytes.Buffer
metricsFormat := s.getMetricsFormat()

for fieldName, fieldValue := range metric.Fields() {
if !isNumeric(fieldValue) {
continue
}

switch s.metricsFormat {
case formatFieldSeparate:
switch metricsFormat {
// Field separate is the default when no format specified.
case Carbon2FormatFieldEmpty:
case Carbon2FormatFieldSeparate:
m.WriteString(serializeMetricFieldSeparate(
metric.Name(), fieldName,
))
case formatMetricIncludesField:

case Carbon2FormatMetricIncludesField:
m.WriteString(serializeMetricIncludeField(
metric.Name(), fieldName,
))
Expand All @@ -93,6 +94,24 @@ func (s *Serializer) createObject(metric telegraf.Metric) []byte {
return m.Bytes()
}

func (s *Serializer) SetMetricsFormat(f format) {
s.metricsFormatLock.Lock()
s.metricsFormat = f
s.metricsFormatLock.Unlock()
}

func (s *Serializer) getMetricsFormat() format {
s.metricsFormatLock.RLock()
defer s.metricsFormatLock.RUnlock()
return s.metricsFormat
}

func (s *Serializer) IsMetricsFormatUnset() bool {
s.metricsFormatLock.RLock()
defer s.metricsFormatLock.RUnlock()
return s.metricsFormat == Carbon2FormatFieldEmpty
}

func serializeMetricFieldSeparate(name, fieldName string) string {
return fmt.Sprintf("metric=%s field=%s ",
strings.Replace(name, " ", "_", -1),
Expand Down
Loading

0 comments on commit d6a0f00

Please sign in to comment.