From 998611f545161b56fe09336af4382e5675ee03cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Patryk=20Ma=C5=82ek?= <69143962+pmalek-sumo@users.noreply.github.com> Date: Wed, 16 Sep 2020 08:56:23 +0200 Subject: [PATCH] Sumo Logic output plugin: carbon2 default to include field in metric (#26) --- plugins/outputs/sumologic/sumologic.go | 10 ++- plugins/outputs/sumologic/sumologic_test.go | 99 +++++++++++++-------- plugins/serializers/carbon2/carbon2.go | 61 ++++++++----- plugins/serializers/carbon2/carbon2_test.go | 36 ++++---- 4 files changed, 129 insertions(+), 77 deletions(-) diff --git a/plugins/outputs/sumologic/sumologic.go b/plugins/outputs/sumologic/sumologic.go index 42ffc3dd3ef6d..5497da6066478 100644 --- a/plugins/outputs/sumologic/sumologic.go +++ b/plugins/outputs/sumologic/sumologic.go @@ -122,11 +122,19 @@ func (s *SumoLogic) SetSerializer(serializer serializers.Serializer) { s.headers = make(map[string]string) } - switch serializer.(type) { + switch sr := serializer.(type) { case *carbon2.Serializer: s.headers[contentTypeHeader] = carbon2ContentType + + // In case Carbon2 is used and the metrics format was unset, default to + // include field in metric name. + if sr.IsMetricsFormatUnset() { + sr.SetMetricsFormat(carbon2.Carbon2FormatMetricIncludesField) + } + case *graphite.GraphiteSerializer: s.headers[contentTypeHeader] = graphiteContentType + case *prometheus.Serializer: s.headers[contentTypeHeader] = prometheusContentType diff --git a/plugins/outputs/sumologic/sumologic_test.go b/plugins/outputs/sumologic/sumologic_test.go index ff9e39d8f58bf..1abbbee838f62 100644 --- a/plugins/outputs/sumologic/sumologic_test.go +++ b/plugins/outputs/sumologic/sumologic_test.go @@ -2,6 +2,7 @@ package sumologic import ( "bufio" + "bytes" "compress/gzip" "fmt" "io" @@ -20,7 +21,6 @@ import ( "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/metric" - "github.com/influxdata/telegraf/plugins/serializers" "github.com/influxdata/telegraf/plugins/serializers/carbon2" "github.com/influxdata/telegraf/plugins/serializers/graphite" "github.com/influxdata/telegraf/plugins/serializers/prometheus" @@ -135,7 +135,7 @@ func TestMethod(t *testing.T) { w.WriteHeader(http.StatusOK) }) - serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate) + serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate)) require.NoError(t, err) plugin := tt.plugin() @@ -212,7 +212,7 @@ func TestStatusCode(t *testing.T) { w.WriteHeader(tt.statusCode) }) - serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate) + serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate)) require.NoError(t, err) tt.plugin.SetSerializer(serializer) @@ -226,77 +226,102 @@ func TestStatusCode(t *testing.T) { } func TestContentType(t *testing.T) { - ts := httptest.NewServer(http.NotFoundHandler()) - defer ts.Close() - - ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - }) - - u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String())) - require.NoError(t, err) - - carbon2Serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate) - require.NoError(t, err) - tests := []struct { - name string - plugin func() *SumoLogic - expectedErr bool - serializer serializers.Serializer + name string + plugin func() *SumoLogic + expectedBody []byte }{ { - name: "carbon2 is supported", + name: "carbon2 (data format = field separate) is supported", plugin: func() *SumoLogic { s := Default() - s.URL = u.String() s.headers = map[string]string{ contentTypeHeader: carbon2ContentType, } + sr, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate)) + require.NoError(t, err) + s.SetSerializer(sr) + return s + }, + expectedBody: []byte("metric=cpu field=value 42 0\n"), + }, + { + name: "carbon2 (data format unset) is supported and falls back to include field in metric name", + plugin: func() *SumoLogic { + s := Default() + s.headers = map[string]string{ + contentTypeHeader: carbon2ContentType, + } + sr, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldEmpty)) + require.NoError(t, err) + s.SetSerializer(sr) return s }, - serializer: carbon2Serializer, - expectedErr: false, + expectedBody: []byte("metric=cpu_value 42 0\n"), + }, + { + name: "carbon2 (data format = metric includes field) is supported", + plugin: func() *SumoLogic { + s := Default() + s.headers = map[string]string{ + contentTypeHeader: carbon2ContentType, + } + sr, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatMetricIncludesField)) + require.NoError(t, err) + s.SetSerializer(sr) + return s + }, + expectedBody: []byte("metric=cpu_value 42 0\n"), }, { name: "graphite is supported", plugin: func() *SumoLogic { s := Default() - s.URL = u.String() s.headers = map[string]string{ contentTypeHeader: graphiteContentType, } + s.SetSerializer(&graphite.GraphiteSerializer{}) return s }, - serializer: &graphite.GraphiteSerializer{}, - expectedErr: false, }, { name: "prometheus is supported", plugin: func() *SumoLogic { s := Default() - s.URL = u.String() s.headers = map[string]string{ contentTypeHeader: prometheusContentType, } + s.SetSerializer(&prometheus.Serializer{}) return s }, - serializer: &prometheus.Serializer{}, - expectedErr: false, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - plugin := tt.plugin() - - plugin.SetSerializer(tt.serializer) + var body bytes.Buffer + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + gz, err := gzip.NewReader(r.Body) + require.NoError(t, err) + io.Copy(&body, gz) + w.WriteHeader(http.StatusOK) + })) + defer ts.Close() - err := plugin.Connect() + u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String())) require.NoError(t, err) + plugin := tt.plugin() + plugin.URL = u.String() + + require.NoError(t, plugin.Connect()) + err = plugin.Write([]telegraf.Metric{getMetric(t)}) require.NoError(t, err) + + if tt.expectedBody != nil { + require.Equal(t, string(tt.expectedBody), body.String()) + } }) } } @@ -338,7 +363,7 @@ func TestContentEncodingGzip(t *testing.T) { w.WriteHeader(http.StatusNoContent) }) - serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate) + serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate)) require.NoError(t, err) plugin := tt.plugin() @@ -374,7 +399,7 @@ func TestDefaultUserAgent(t *testing.T) { MaxRequstBodySize: Default().MaxRequstBodySize, } - serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate) + serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate)) require.NoError(t, err) plugin.SetSerializer(serializer) @@ -627,7 +652,7 @@ func TestMaxRequestBodySize(t *testing.T) { w.WriteHeader(http.StatusOK) }) - serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate) + serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate)) require.NoError(t, err) plugin := tt.plugin() diff --git a/plugins/serializers/carbon2/carbon2.go b/plugins/serializers/carbon2/carbon2.go index 10611815b8a7e..4267bfe710c16 100644 --- a/plugins/serializers/carbon2/carbon2.go +++ b/plugins/serializers/carbon2/carbon2.go @@ -5,6 +5,7 @@ import ( "fmt" "strconv" "strings" + "sync" "github.com/influxdata/telegraf" ) @@ -12,35 +13,30 @@ import ( type format string const ( - Carbon2FormatFieldSeparate string = "field_separate" - Carbon2FormatMetricIncludesField string = "metric_includes_field" - - formatFieldSeparate = format(Carbon2FormatFieldSeparate) - formatMetricIncludesField = format(Carbon2FormatMetricIncludesField) + Carbon2FormatFieldEmpty = format("") + Carbon2FormatFieldSeparate = format("field_separate") + Carbon2FormatMetricIncludesField = format("metric_includes_field") ) -var formats = map[string]format{ - // Field separate is the default when no format specified. - "": formatFieldSeparate, - Carbon2FormatFieldSeparate: formatFieldSeparate, - Carbon2FormatMetricIncludesField: formatMetricIncludesField, +var formats = map[format]struct{}{ + Carbon2FormatFieldEmpty: {}, + Carbon2FormatFieldSeparate: {}, + Carbon2FormatMetricIncludesField: {}, } type Serializer struct { - metricsFormat format + metricsFormat format + metricsFormatLock sync.RWMutex } -func NewSerializer(f string) (*Serializer, error) { - var ( - ok bool - metricsFormat format - ) - if metricsFormat, ok = formats[f]; !ok { +func NewSerializer(metricsFormat string) (*Serializer, error) { + var f = format(metricsFormat) + if _, ok := formats[f]; !ok { return nil, fmt.Errorf("unknown carbon2 format: %s", f) } return &Serializer{ - metricsFormat: metricsFormat, + metricsFormat: f, }, nil } @@ -58,17 +54,22 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { func (s *Serializer) createObject(metric telegraf.Metric) []byte { var m bytes.Buffer + metricsFormat := s.getMetricsFormat() + for fieldName, fieldValue := range metric.Fields() { if !isNumeric(fieldValue) { continue } - switch s.metricsFormat { - case formatFieldSeparate: + switch metricsFormat { + // Field separate is the default when no format specified. + case Carbon2FormatFieldEmpty: + case Carbon2FormatFieldSeparate: m.WriteString(serializeMetricFieldSeparate( metric.Name(), fieldName, )) - case formatMetricIncludesField: + + case Carbon2FormatMetricIncludesField: m.WriteString(serializeMetricIncludeField( metric.Name(), fieldName, )) @@ -93,6 +94,24 @@ func (s *Serializer) createObject(metric telegraf.Metric) []byte { return m.Bytes() } +func (s *Serializer) SetMetricsFormat(f format) { + s.metricsFormatLock.Lock() + s.metricsFormat = f + s.metricsFormatLock.Unlock() +} + +func (s *Serializer) getMetricsFormat() format { + s.metricsFormatLock.RLock() + defer s.metricsFormatLock.RUnlock() + return s.metricsFormat +} + +func (s *Serializer) IsMetricsFormatUnset() bool { + s.metricsFormatLock.RLock() + defer s.metricsFormatLock.RUnlock() + return s.metricsFormat == Carbon2FormatFieldEmpty +} + func serializeMetricFieldSeparate(name, fieldName string) string { return fmt.Sprintf("metric=%s field=%s ", strings.Replace(name, " ", "_", -1), diff --git a/plugins/serializers/carbon2/carbon2_test.go b/plugins/serializers/carbon2/carbon2_test.go index aadc55f7ede96..64f621875c35d 100644 --- a/plugins/serializers/carbon2/carbon2_test.go +++ b/plugins/serializers/carbon2/carbon2_test.go @@ -31,7 +31,7 @@ func TestSerializeMetricFloat(t *testing.T) { require.NoError(t, err) testcases := []struct { - format string + format format expected string }{ { @@ -45,8 +45,8 @@ func TestSerializeMetricFloat(t *testing.T) { } for _, tc := range testcases { - t.Run(tc.format, func(t *testing.T) { - s, err := NewSerializer(tc.format) + t.Run(string(tc.format), func(t *testing.T) { + s, err := NewSerializer(string(tc.format)) require.NoError(t, err) buf, err := s.Serialize(m) @@ -69,7 +69,7 @@ func TestSerializeMetricWithEmptyStringTag(t *testing.T) { require.NoError(t, err) testcases := []struct { - format string + format format expected string }{ { @@ -83,8 +83,8 @@ func TestSerializeMetricWithEmptyStringTag(t *testing.T) { } for _, tc := range testcases { - t.Run(tc.format, func(t *testing.T) { - s, err := NewSerializer(tc.format) + t.Run(string(tc.format), func(t *testing.T) { + s, err := NewSerializer(string(tc.format)) require.NoError(t, err) buf, err := s.Serialize(m) @@ -107,7 +107,7 @@ func TestSerializeWithSpaces(t *testing.T) { require.NoError(t, err) testcases := []struct { - format string + format format expected string }{ { @@ -121,8 +121,8 @@ func TestSerializeWithSpaces(t *testing.T) { } for _, tc := range testcases { - t.Run(tc.format, func(t *testing.T) { - s, err := NewSerializer(tc.format) + t.Run(string(tc.format), func(t *testing.T) { + s, err := NewSerializer(string(tc.format)) require.NoError(t, err) buf, err := s.Serialize(m) @@ -145,7 +145,7 @@ func TestSerializeMetricInt(t *testing.T) { require.NoError(t, err) testcases := []struct { - format string + format format expected string }{ { @@ -159,8 +159,8 @@ func TestSerializeMetricInt(t *testing.T) { } for _, tc := range testcases { - t.Run(tc.format, func(t *testing.T) { - s, err := NewSerializer(tc.format) + t.Run(string(tc.format), func(t *testing.T) { + s, err := NewSerializer(string(tc.format)) require.NoError(t, err) buf, err := s.Serialize(m) @@ -183,7 +183,7 @@ func TestSerializeMetricString(t *testing.T) { assert.NoError(t, err) testcases := []struct { - format string + format format expected string }{ { @@ -197,8 +197,8 @@ func TestSerializeMetricString(t *testing.T) { } for _, tc := range testcases { - t.Run(tc.format, func(t *testing.T) { - s, err := NewSerializer(tc.format) + t.Run(string(tc.format), func(t *testing.T) { + s, err := NewSerializer(string(tc.format)) require.NoError(t, err) buf, err := s.Serialize(m) @@ -224,7 +224,7 @@ func TestSerializeBatch(t *testing.T) { metrics := []telegraf.Metric{m, m} testcases := []struct { - format string + format format expected string }{ { @@ -242,8 +242,8 @@ metric=cpu_value 42 0 } for _, tc := range testcases { - t.Run(tc.format, func(t *testing.T) { - s, err := NewSerializer(tc.format) + t.Run(string(tc.format), func(t *testing.T) { + s, err := NewSerializer(string(tc.format)) require.NoError(t, err) buf, err := s.SerializeBatch(metrics)