Skip to content

Commit

Permalink
Fix Sumo Logic output plugin not splitting requests properly (#25) (i…
Browse files Browse the repository at this point in the history
  • Loading branch information
pmalek-sumo authored and pmalek committed Sep 28, 2020
1 parent c9a6de7 commit 536e60b
Show file tree
Hide file tree
Showing 2 changed files with 209 additions and 50 deletions.
93 changes: 72 additions & 21 deletions plugins/outputs/sumologic/sumologic.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"compress/gzip"
"context"
"fmt"
"log"
"net/http"
"strings"
"time"
Expand Down Expand Up @@ -107,6 +108,8 @@ type SumoLogic struct {
SourceCategory string `toml:"source_category"`
Dimensions string `toml:"dimensions"`

Log telegraf.Logger `toml:"-"`

client *http.Client
serializer serializers.Serializer

Expand Down Expand Up @@ -189,33 +192,19 @@ func (s *SumoLogic) Write(metrics []telegraf.Metric) error {
if s.serializer == nil {
return errors.New("sumologic: serializer unset")
}
if len(metrics) == 0 {
return nil
}

reqBody, err := s.serializer.SerializeBatch(metrics)
if err != nil {
return err
}

if l := len(reqBody); l > int(s.MaxRequstBodySize) {
var (
// Do the rounded up integer division
numChunks = (l + int(s.MaxRequstBodySize) - 1) / int(s.MaxRequstBodySize)
chunks = make([][]byte, 0, numChunks)
numMetrics = len(metrics)
// Do the rounded up integer division
stepMetrics = (numMetrics + numChunks - 1) / numChunks
)

for i := 0; i < numMetrics; i += stepMetrics {
boundary := i + stepMetrics
if boundary > numMetrics {
boundary = numMetrics - 1
}

chunkBody, err := s.serializer.SerializeBatch(metrics[i:boundary])
if err != nil {
return err
}
chunks = append(chunks, chunkBody)
chunks, err := s.splitIntoChunks(metrics)
if err != nil {
return err
}

return s.writeRequestChunks(chunks)
Expand All @@ -227,7 +216,7 @@ func (s *SumoLogic) Write(metrics []telegraf.Metric) error {
func (s *SumoLogic) writeRequestChunks(chunks [][]byte) error {
for _, reqChunk := range chunks {
if err := s.write(reqChunk); err != nil {
return err
s.Log.Errorf("Error sending chunk: %v", err)
}
}
return nil
Expand Down Expand Up @@ -282,6 +271,68 @@ func (s *SumoLogic) write(reqBody []byte) error {
return nil
}

// splitIntoChunks splits metrics to be sent into chunks so that every request
// is smaller than s.MaxRequstBodySize unless it was configured so small so that
// even a single metric cannot fit.
// In such a situation metrics will be sent one by one with a warning being logged
// for every request sent even though they don't fit in s.MaxRequstBodySize bytes.
func (s *SumoLogic) splitIntoChunks(metrics []telegraf.Metric) ([][]byte, error) {
var (
numMetrics = len(metrics)
chunks = make([][]byte, 0)
)

for i := 0; i < numMetrics; {
var toAppend []byte
for i < numMetrics {
chunkBody, err := s.serializer.Serialize(metrics[i])
if err != nil {
return nil, err
}

la := len(toAppend)
if la != 0 {
// We already have something to append ...
if la+len(chunkBody) > int(s.MaxRequstBodySize) {
// ... and it's just the right size, without currently processed chunk.
break
} else {
// ... we can try appending more.
i++
toAppend = append(toAppend, chunkBody...)
continue
}
} else { // la == 0
i++
toAppend = chunkBody

if len(chunkBody) > int(s.MaxRequstBodySize) {
log.Printf(
"W! [SumoLogic] max_request_body_size set to %d which is too small even for a single metric (len: %d), sending without split",
s.MaxRequstBodySize, len(chunkBody),
)

// The serialized metric is too big but we have no choice
// but to send it.
// max_request_body_size was set so small that it wouldn't
// even accomodate a single metric.
break
}

continue
}
}

if toAppend == nil {
break
}

chunks = append(chunks, toAppend)
}

return chunks, nil
}

func setHeaderIfSetInConfig(r *http.Request, h header, value string) {
if value != "" {
r.Header.Set(string(h), value)
Expand Down
166 changes: 137 additions & 29 deletions plugins/outputs/sumologic/sumologic_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package sumologic

import (
"bufio"
"compress/gzip"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"net/url"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -36,8 +39,7 @@ func getMetric(t *testing.T) telegraf.Metric {
return m
}

func getMetrics(t *testing.T) []telegraf.Metric {
const count = 10
func getMetrics(t *testing.T, count int) []telegraf.Metric {
var metrics = make([]telegraf.Metric, count)

for i := 0; i < count; i++ {
Expand Down Expand Up @@ -480,12 +482,15 @@ func TestMaxRequestBodySize(t *testing.T) {
u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
require.NoError(t, err)

const count = 100

testcases := []struct {
name string
plugin func() *SumoLogic
metrics []telegraf.Metric
expectedError bool
expectedRequestCount int
name string
plugin func() *SumoLogic
metrics []telegraf.Metric
expectedError bool
expectedRequestCount int32
expectedMetricLinesCount int32
}{
{
name: "default max request body size is 1MB and doesn't split small enough metric slices",
Expand All @@ -494,9 +499,10 @@ func TestMaxRequestBodySize(t *testing.T) {
s.URL = u.String()
return s
},
metrics: []telegraf.Metric{getMetric(t)},
expectedError: false,
expectedRequestCount: 1,
metrics: []telegraf.Metric{getMetric(t)},
expectedError: false,
expectedRequestCount: 1,
expectedMetricLinesCount: 1,
},
{
name: "default max request body size is 1MB and doesn't split small even medium sized metrics",
Expand All @@ -505,33 +511,90 @@ func TestMaxRequestBodySize(t *testing.T) {
s.URL = u.String()
return s
},
metrics: getMetrics(t),
expectedError: false,
expectedRequestCount: 1,
metrics: getMetrics(t, count),
expectedError: false,
expectedRequestCount: 1,
expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500
},
{
name: "when short by at least 1B the request is split",
plugin: func() *SumoLogic {
s := Default()
s.URL = u.String()
// getMetrics returns metrics that serialized (using carbon2),
// uncompressed size is 43750B
s.MaxRequstBodySize = 43_749
return s
},
metrics: getMetrics(t, count),
expectedError: false,
expectedRequestCount: 2,
expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500
},
{
name: "max request body size properly splits requests - max 10_000",
plugin: func() *SumoLogic {
s := Default()
s.URL = u.String()
s.MaxRequstBodySize = 10_000
return s
},
metrics: getMetrics(t, count),
expectedError: false,
expectedRequestCount: 5,
expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500
},
{
name: "max request body size properly splits requests - max 5_000",
plugin: func() *SumoLogic {
s := Default()
s.URL = u.String()
s.MaxRequstBodySize = 5_000
return s
},
metrics: getMetrics(t, count),
expectedError: false,
expectedRequestCount: 10,
expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500
},
{
name: "max request body size properly splits requests - max 2_500",
plugin: func() *SumoLogic {
s := Default()
s.URL = u.String()
s.MaxRequstBodySize = 2_500
return s
},
metrics: getMetrics(t, count),
expectedError: false,
expectedRequestCount: 20,
expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500
},
{
name: "max request body size properly splits requests - max 2500",
name: "max request body size properly splits requests - max 1_000",
plugin: func() *SumoLogic {
s := Default()
s.URL = u.String()
s.MaxRequstBodySize = 2500
s.MaxRequstBodySize = 1_000
return s
},
metrics: getMetrics(t),
expectedError: false,
expectedRequestCount: 2,
metrics: getMetrics(t, count),
expectedError: false,
expectedRequestCount: 50,
expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500
},
{
name: "max request body size properly splits requests - max 1000",
name: "max request body size properly splits requests - max 500",
plugin: func() *SumoLogic {
s := Default()
s.URL = u.String()
s.MaxRequstBodySize = 1000
s.MaxRequstBodySize = 500
return s
},
metrics: getMetrics(t),
expectedError: false,
expectedRequestCount: 5,
metrics: getMetrics(t, count),
expectedError: false,
expectedRequestCount: 100,
expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500
},
{
name: "max request body size properly splits requests - max 300",
Expand All @@ -541,17 +604,26 @@ func TestMaxRequestBodySize(t *testing.T) {
s.MaxRequstBodySize = 300
return s
},
metrics: getMetrics(t),
expectedError: false,
expectedRequestCount: 10,
metrics: getMetrics(t, count),
expectedError: false,
expectedRequestCount: 100,
expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500
},
}

for _, tt := range testcases {
t.Run(tt.name, func(t *testing.T) {
var requestCount int
var (
requestCount int32
linesCount int32
)
ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
requestCount++
atomic.AddInt32(&requestCount, 1)

if tt.expectedMetricLinesCount != 0 {
atomic.AddInt32(&linesCount, int32(countLines(t, r.Body)))
}

w.WriteHeader(http.StatusOK)
})

Expand All @@ -569,8 +641,44 @@ func TestMaxRequestBodySize(t *testing.T) {
require.Error(t, err)
} else {
require.NoError(t, err)
require.Equal(t, tt.expectedRequestCount, requestCount)
require.Equal(t, tt.expectedRequestCount, atomic.LoadInt32(&requestCount))
require.Equal(t, tt.expectedMetricLinesCount, atomic.LoadInt32(&linesCount))
}
})
}
}

func TestTryingToSendEmptyMetricsDoesntFail(t *testing.T) {
ts := httptest.NewServer(http.NotFoundHandler())
defer ts.Close()

u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
require.NoError(t, err)

metrics := make([]telegraf.Metric, 0)
plugin := Default()
plugin.URL = u.String()

serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate)
require.NoError(t, err)
plugin.SetSerializer(serializer)

err = plugin.Connect()
require.NoError(t, err)

err = plugin.Write(metrics)
require.NoError(t, err)
}

func countLines(t *testing.T, body io.Reader) int {
// All requests coming from Sumo Logic output plugin are gzipped.
gz, err := gzip.NewReader(body)
require.NoError(t, err)

var linesCount int
for s := bufio.NewScanner(gz); s.Scan(); {
linesCount++
}

return linesCount
}

0 comments on commit 536e60b

Please sign in to comment.