Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Sumo Logic output plugin not splitting requests properly (#25) #8115

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 72 additions & 21 deletions plugins/outputs/sumologic/sumologic.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"compress/gzip"
"context"
"fmt"
"log"
"net/http"
"strings"
"time"
Expand Down Expand Up @@ -107,6 +108,8 @@ type SumoLogic struct {
SourceCategory string `toml:"source_category"`
Dimensions string `toml:"dimensions"`

Log telegraf.Logger `toml:"-"`

client *http.Client
serializer serializers.Serializer

Expand Down Expand Up @@ -189,33 +192,19 @@ func (s *SumoLogic) Write(metrics []telegraf.Metric) error {
if s.serializer == nil {
return errors.New("sumologic: serializer unset")
}
if len(metrics) == 0 {
return nil
}

reqBody, err := s.serializer.SerializeBatch(metrics)
if err != nil {
return err
}

if l := len(reqBody); l > int(s.MaxRequstBodySize) {
var (
// Do the rounded up integer division
numChunks = (l + int(s.MaxRequstBodySize) - 1) / int(s.MaxRequstBodySize)
chunks = make([][]byte, 0, numChunks)
numMetrics = len(metrics)
// Do the rounded up integer division
stepMetrics = (numMetrics + numChunks - 1) / numChunks
)

for i := 0; i < numMetrics; i += stepMetrics {
boundary := i + stepMetrics
if boundary > numMetrics {
boundary = numMetrics - 1
}

chunkBody, err := s.serializer.SerializeBatch(metrics[i:boundary])
if err != nil {
return err
}
chunks = append(chunks, chunkBody)
chunks, err := s.splitIntoChunks(metrics)
if err != nil {
return err
}

return s.writeRequestChunks(chunks)
Expand All @@ -227,7 +216,7 @@ func (s *SumoLogic) Write(metrics []telegraf.Metric) error {
func (s *SumoLogic) writeRequestChunks(chunks [][]byte) error {
for _, reqChunk := range chunks {
if err := s.write(reqChunk); err != nil {
return err
s.Log.Errorf("Error sending chunk: %v", err)
}
}
return nil
Expand Down Expand Up @@ -282,6 +271,68 @@ func (s *SumoLogic) write(reqBody []byte) error {
return nil
}

// splitIntoChunks splits metrics to be sent into chunks so that every request
// is smaller than s.MaxRequstBodySize unless it was configured so small so that
// even a single metric cannot fit.
// In such a situation metrics will be sent one by one with a warning being logged
// for every request sent even though they don't fit in s.MaxRequstBodySize bytes.
func (s *SumoLogic) splitIntoChunks(metrics []telegraf.Metric) ([][]byte, error) {
var (
numMetrics = len(metrics)
chunks = make([][]byte, 0)
)

for i := 0; i < numMetrics; {
var toAppend []byte
for i < numMetrics {
chunkBody, err := s.serializer.Serialize(metrics[i])
if err != nil {
return nil, err
}

la := len(toAppend)
if la != 0 {
// We already have something to append ...
if la+len(chunkBody) > int(s.MaxRequstBodySize) {
// ... and it's just the right size, without currently processed chunk.
break
} else {
// ... we can try appending more.
i++
toAppend = append(toAppend, chunkBody...)
continue
}
} else { // la == 0
i++
toAppend = chunkBody

if len(chunkBody) > int(s.MaxRequstBodySize) {
log.Printf(
"W! [SumoLogic] max_request_body_size set to %d which is too small even for a single metric (len: %d), sending without split",
s.MaxRequstBodySize, len(chunkBody),
)

// The serialized metric is too big but we have no choice
// but to send it.
// max_request_body_size was set so small that it wouldn't
// even accomodate a single metric.
break
}

continue
}
}

if toAppend == nil {
break
}

chunks = append(chunks, toAppend)
}

return chunks, nil
}

func setHeaderIfSetInConfig(r *http.Request, h header, value string) {
if value != "" {
r.Header.Set(string(h), value)
Expand Down
166 changes: 137 additions & 29 deletions plugins/outputs/sumologic/sumologic_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package sumologic

import (
"bufio"
"compress/gzip"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"net/url"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -36,8 +39,7 @@ func getMetric(t *testing.T) telegraf.Metric {
return m
}

func getMetrics(t *testing.T) []telegraf.Metric {
const count = 10
func getMetrics(t *testing.T, count int) []telegraf.Metric {
var metrics = make([]telegraf.Metric, count)

for i := 0; i < count; i++ {
Expand Down Expand Up @@ -480,12 +482,15 @@ func TestMaxRequestBodySize(t *testing.T) {
u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
require.NoError(t, err)

const count = 100

testcases := []struct {
name string
plugin func() *SumoLogic
metrics []telegraf.Metric
expectedError bool
expectedRequestCount int
name string
plugin func() *SumoLogic
metrics []telegraf.Metric
expectedError bool
expectedRequestCount int32
expectedMetricLinesCount int32
}{
{
name: "default max request body size is 1MB and doesn't split small enough metric slices",
Expand All @@ -494,9 +499,10 @@ func TestMaxRequestBodySize(t *testing.T) {
s.URL = u.String()
return s
},
metrics: []telegraf.Metric{getMetric(t)},
expectedError: false,
expectedRequestCount: 1,
metrics: []telegraf.Metric{getMetric(t)},
expectedError: false,
expectedRequestCount: 1,
expectedMetricLinesCount: 1,
},
{
name: "default max request body size is 1MB and doesn't split small even medium sized metrics",
Expand All @@ -505,33 +511,90 @@ func TestMaxRequestBodySize(t *testing.T) {
s.URL = u.String()
return s
},
metrics: getMetrics(t),
expectedError: false,
expectedRequestCount: 1,
metrics: getMetrics(t, count),
expectedError: false,
expectedRequestCount: 1,
expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500
},
{
name: "when short by at least 1B the request is split",
plugin: func() *SumoLogic {
s := Default()
s.URL = u.String()
// getMetrics returns metrics that serialized (using carbon2),
// uncompressed size is 43750B
s.MaxRequstBodySize = 43_749
return s
},
metrics: getMetrics(t, count),
expectedError: false,
expectedRequestCount: 2,
expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500
},
{
name: "max request body size properly splits requests - max 10_000",
plugin: func() *SumoLogic {
s := Default()
s.URL = u.String()
s.MaxRequstBodySize = 10_000
return s
},
metrics: getMetrics(t, count),
expectedError: false,
expectedRequestCount: 5,
expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500
},
{
name: "max request body size properly splits requests - max 5_000",
plugin: func() *SumoLogic {
s := Default()
s.URL = u.String()
s.MaxRequstBodySize = 5_000
return s
},
metrics: getMetrics(t, count),
expectedError: false,
expectedRequestCount: 10,
expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500
},
{
name: "max request body size properly splits requests - max 2_500",
plugin: func() *SumoLogic {
s := Default()
s.URL = u.String()
s.MaxRequstBodySize = 2_500
return s
},
metrics: getMetrics(t, count),
expectedError: false,
expectedRequestCount: 20,
expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500
},
{
name: "max request body size properly splits requests - max 2500",
name: "max request body size properly splits requests - max 1_000",
plugin: func() *SumoLogic {
s := Default()
s.URL = u.String()
s.MaxRequstBodySize = 2500
s.MaxRequstBodySize = 1_000
return s
},
metrics: getMetrics(t),
expectedError: false,
expectedRequestCount: 2,
metrics: getMetrics(t, count),
expectedError: false,
expectedRequestCount: 50,
expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500
},
{
name: "max request body size properly splits requests - max 1000",
name: "max request body size properly splits requests - max 500",
plugin: func() *SumoLogic {
s := Default()
s.URL = u.String()
s.MaxRequstBodySize = 1000
s.MaxRequstBodySize = 500
return s
},
metrics: getMetrics(t),
expectedError: false,
expectedRequestCount: 5,
metrics: getMetrics(t, count),
expectedError: false,
expectedRequestCount: 100,
expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500
},
{
name: "max request body size properly splits requests - max 300",
Expand All @@ -541,17 +604,26 @@ func TestMaxRequestBodySize(t *testing.T) {
s.MaxRequstBodySize = 300
return s
},
metrics: getMetrics(t),
expectedError: false,
expectedRequestCount: 10,
metrics: getMetrics(t, count),
expectedError: false,
expectedRequestCount: 100,
expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500
},
}

for _, tt := range testcases {
t.Run(tt.name, func(t *testing.T) {
var requestCount int
var (
requestCount int32
linesCount int32
)
ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
requestCount++
atomic.AddInt32(&requestCount, 1)

if tt.expectedMetricLinesCount != 0 {
atomic.AddInt32(&linesCount, int32(countLines(t, r.Body)))
}

w.WriteHeader(http.StatusOK)
})

Expand All @@ -569,8 +641,44 @@ func TestMaxRequestBodySize(t *testing.T) {
require.Error(t, err)
} else {
require.NoError(t, err)
require.Equal(t, tt.expectedRequestCount, requestCount)
require.Equal(t, tt.expectedRequestCount, atomic.LoadInt32(&requestCount))
require.Equal(t, tt.expectedMetricLinesCount, atomic.LoadInt32(&linesCount))
}
})
}
}

func TestTryingToSendEmptyMetricsDoesntFail(t *testing.T) {
ts := httptest.NewServer(http.NotFoundHandler())
defer ts.Close()

u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
require.NoError(t, err)

metrics := make([]telegraf.Metric, 0)
plugin := Default()
plugin.URL = u.String()

serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate)
require.NoError(t, err)
plugin.SetSerializer(serializer)

err = plugin.Connect()
require.NoError(t, err)

err = plugin.Write(metrics)
require.NoError(t, err)
}

func countLines(t *testing.T, body io.Reader) int {
// All requests coming from Sumo Logic output plugin are gzipped.
gz, err := gzip.NewReader(body)
require.NoError(t, err)

var linesCount int
for s := bufio.NewScanner(gz); s.Scan(); {
linesCount++
}

return linesCount
}