From 237776c6da3112c5ee752cb1aa597ed3d38b012a Mon Sep 17 00:00:00 2001 From: Mihai Todor Date: Fri, 5 Oct 2018 23:06:41 +0100 Subject: [PATCH] Add entity-body compression to http output (#4807) --- internal/internal.go | 22 ++++++++++ internal/internal_test.go | 20 ++++++++++ plugins/outputs/http/README.md | 4 ++ plugins/outputs/http/http.go | 41 ++++++++++++++----- plugins/outputs/http/http_test.go | 62 +++++++++++++++++++++++++++++ plugins/outputs/influxdb/http.go | 18 +-------- plugins/outputs/influxdb_v2/http.go | 18 +-------- 7 files changed, 142 insertions(+), 43 deletions(-) diff --git a/internal/internal.go b/internal/internal.go index f7d75dfb3a301..6d087ccebcfba 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -3,8 +3,10 @@ package internal import ( "bufio" "bytes" + "compress/gzip" "crypto/rand" "errors" + "io" "log" "math/big" "os" @@ -208,3 +210,23 @@ func ExitStatus(err error) (int, bool) { } return 0, false } + +// CompressWithGzip takes an io.Reader as input and pipes +// it through a gzip.Writer returning an io.Reader containing +// the gzipped data. +// An error is returned if passing data to the gzip.Writer fails +func CompressWithGzip(data io.Reader) (io.Reader, error) { + pipeReader, pipeWriter := io.Pipe() + gzipWriter := gzip.NewWriter(pipeWriter) + + var err error + go func() { + _, err = io.Copy(gzipWriter, data) + gzipWriter.Close() + // subsequent reads from the read half of the pipe will + // return no bytes and the error err, or EOF if err is nil. + pipeWriter.CloseWithError(err) + }() + + return pipeReader, err +} diff --git a/internal/internal_test.go b/internal/internal_test.go index ee1d24418ad18..3b4ec5dda0713 100644 --- a/internal/internal_test.go +++ b/internal/internal_test.go @@ -1,6 +1,9 @@ package internal import ( + "bytes" + "compress/gzip" + "io/ioutil" "os/exec" "testing" "time" @@ -162,3 +165,20 @@ func TestDuration(t *testing.T) { d.UnmarshalTOML([]byte(`1.5`)) assert.Equal(t, time.Second, d.Duration) } + +func TestCompressWithGzip(t *testing.T) { + testData := "the quick brown fox jumps over the lazy dog" + inputBuffer := bytes.NewBuffer([]byte(testData)) + + outputBuffer, err := CompressWithGzip(inputBuffer) + assert.NoError(t, err) + + gzipReader, err := gzip.NewReader(outputBuffer) + assert.NoError(t, err) + defer gzipReader.Close() + + output, err := ioutil.ReadAll(gzipReader) + assert.NoError(t, err) + + assert.Equal(t, testData, string(output)) +} diff --git a/plugins/outputs/http/README.md b/plugins/outputs/http/README.md index 0c11896f9621b..5697b603076de 100644 --- a/plugins/outputs/http/README.md +++ b/plugins/outputs/http/README.md @@ -44,4 +44,8 @@ data formats. For data_formats that support batching, metrics are sent in batch # [outputs.http.headers] # # Should be set manually to "application/json" for json data_format # Content-Type = "text/plain; charset=utf-8" + + ## HTTP Content-Encoding for write request body, can be set to "gzip" to + ## compress body or "identity" to apply no encoding. + # content_encoding = "identity" ``` diff --git a/plugins/outputs/http/http.go b/plugins/outputs/http/http.go index ccb8f89495b02..8393d049926ae 100644 --- a/plugins/outputs/http/http.go +++ b/plugins/outputs/http/http.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "io" "io/ioutil" "net/http" "strings" @@ -55,6 +56,10 @@ var sampleConfig = ` # [outputs.http.headers] # # Should be set manually to "application/json" for json data_format # Content-Type = "text/plain; charset=utf-8" + + ## HTTP Content-Encoding for write request body, can be set to "gzip" to + ## compress body or "identity" to apply no encoding. + # content_encoding = "identity" ` const ( @@ -64,16 +69,17 @@ const ( ) type HTTP struct { - URL string `toml:"url"` - Timeout internal.Duration `toml:"timeout"` - Method string `toml:"method"` - Username string `toml:"username"` - Password string `toml:"password"` - Headers map[string]string `toml:"headers"` - ClientID string `toml:"client_id"` - ClientSecret string `toml:"client_secret"` - TokenURL string `toml:"token_url"` - Scopes []string `toml:"scopes"` + URL string `toml:"url"` + Timeout internal.Duration `toml:"timeout"` + Method string `toml:"method"` + Username string `toml:"username"` + Password string `toml:"password"` + Headers map[string]string `toml:"headers"` + ClientID string `toml:"client_id"` + ClientSecret string `toml:"client_secret"` + TokenURL string `toml:"token_url"` + Scopes []string `toml:"scopes"` + ContentEncoding string `toml:"content_encoding"` tls.ClientConfig client *http.Client @@ -162,7 +168,17 @@ func (h *HTTP) Write(metrics []telegraf.Metric) error { } func (h *HTTP) write(reqBody []byte) error { - req, err := http.NewRequest(h.Method, h.URL, bytes.NewBuffer(reqBody)) + var reqBodyBuffer io.Reader = bytes.NewBuffer(reqBody) + + var err error + if h.ContentEncoding == "gzip" { + reqBodyBuffer, err = internal.CompressWithGzip(reqBodyBuffer) + if err != nil { + return err + } + } + + req, err := http.NewRequest(h.Method, h.URL, reqBodyBuffer) if err != nil { return err } @@ -172,6 +188,9 @@ func (h *HTTP) write(reqBody []byte) error { } req.Header.Set("Content-Type", defaultContentType) + if h.ContentEncoding == "gzip" { + req.Header.Set("Content-Encoding", "gzip") + } for k, v := range h.Headers { req.Header.Set(k, v) } diff --git a/plugins/outputs/http/http_test.go b/plugins/outputs/http/http_test.go index 0b6c784557923..5b314cceb8aa4 100644 --- a/plugins/outputs/http/http_test.go +++ b/plugins/outputs/http/http_test.go @@ -1,7 +1,9 @@ package http import ( + "compress/gzip" "fmt" + "io/ioutil" "net/http" "net/http/httptest" "net/url" @@ -227,6 +229,66 @@ func TestContentType(t *testing.T) { } } +func TestContentEncodingGzip(t *testing.T) { + ts := httptest.NewServer(http.NotFoundHandler()) + defer ts.Close() + + u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String())) + require.NoError(t, err) + + tests := []struct { + name string + plugin *HTTP + payload string + expected string + }{ + { + name: "default is no content encoding", + plugin: &HTTP{ + URL: u.String(), + }, + expected: "", + }, + { + name: "overwrite content_encoding", + plugin: &HTTP{ + URL: u.String(), + ContentEncoding: "gzip", + }, + expected: "gzip", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, tt.expected, r.Header.Get("Content-Encoding")) + + body := r.Body + var err error + if r.Header.Get("Content-Encoding") == "gzip" { + body, err = gzip.NewReader(r.Body) + require.NoError(t, err) + } + + payload, err := ioutil.ReadAll(body) + require.NoError(t, err) + require.Contains(t, string(payload), "cpu value=42") + + w.WriteHeader(http.StatusNoContent) + }) + + serializer := influx.NewSerializer() + tt.plugin.SetSerializer(serializer) + err = tt.plugin.Connect() + require.NoError(t, err) + + err = tt.plugin.Write([]telegraf.Metric{getMetric()}) + require.NoError(t, err) + }) + } +} + func TestBasicAuth(t *testing.T) { ts := httptest.NewServer(http.NotFoundHandler()) defer ts.Close() diff --git a/plugins/outputs/influxdb/http.go b/plugins/outputs/influxdb/http.go index 164261feb623f..f32ad79a44ac6 100644 --- a/plugins/outputs/influxdb/http.go +++ b/plugins/outputs/influxdb/http.go @@ -1,7 +1,6 @@ package influxdb import ( - "compress/gzip" "context" "crypto/tls" "encoding/json" @@ -16,6 +15,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/serializers/influx" ) @@ -360,7 +360,7 @@ func (c *httpClient) makeQueryRequest(query string) (*http.Request, error) { func (c *httpClient) makeWriteRequest(body io.Reader) (*http.Request, error) { var err error if c.ContentEncoding == "gzip" { - body, err = compressWithGzip(body) + body, err = internal.CompressWithGzip(body) if err != nil { return nil, err } @@ -381,20 +381,6 @@ func (c *httpClient) makeWriteRequest(body io.Reader) (*http.Request, error) { return req, nil } -func compressWithGzip(data io.Reader) (io.Reader, error) { - pr, pw := io.Pipe() - gw := gzip.NewWriter(pw) - var err error - - go func() { - _, err = io.Copy(gw, data) - gw.Close() - pw.Close() - }() - - return pr, err -} - func (c *httpClient) addHeaders(req *http.Request) { if c.Username != "" || c.Password != "" { req.SetBasicAuth(c.Username, c.Password) diff --git a/plugins/outputs/influxdb_v2/http.go b/plugins/outputs/influxdb_v2/http.go index 1e7061a270a39..12826ff92b387 100644 --- a/plugins/outputs/influxdb_v2/http.go +++ b/plugins/outputs/influxdb_v2/http.go @@ -1,7 +1,6 @@ package influxdb_v2 import ( - "compress/gzip" "context" "crypto/tls" "encoding/json" @@ -17,6 +16,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/serializers/influx" ) @@ -231,7 +231,7 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error func (c *httpClient) makeWriteRequest(body io.Reader) (*http.Request, error) { var err error if c.ContentEncoding == "gzip" { - body, err = compressWithGzip(body) + body, err = internal.CompressWithGzip(body) if err != nil { return nil, err } @@ -258,20 +258,6 @@ func (c *httpClient) addHeaders(req *http.Request) { } } -func compressWithGzip(data io.Reader) (io.Reader, error) { - pipeReader, pipeWriter := io.Pipe() - gzipWriter := gzip.NewWriter(pipeWriter) - var err error - - go func() { - _, err = io.Copy(gzipWriter, data) - gzipWriter.Close() - pipeWriter.Close() - }() - - return pipeReader, err -} - func makeWriteURL(loc url.URL, org, bucket string) (string, error) { params := url.Values{} params.Set("bucket", bucket)