Skip to content

Commit

Permalink
Merge pull request #54 from honeycombio/tredman.zstd-support
Browse files Browse the repository at this point in the history
[router] add zstd support
  • Loading branch information
tredman authored Feb 11, 2020
2 parents aeded55 + 9bad502 commit 8bf9e75
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 8 deletions.
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ module github.com/honeycombio/samproxy

require (
github.com/BurntSushi/toml v0.3.1 // indirect
github.com/aws/aws-sdk-go v1.28.14 // indirect
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 // indirect
github.com/davecgh/go-spew v1.1.1
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a // indirect
Expand All @@ -20,10 +21,11 @@ require (
github.com/honeycombio/dynsampler-go v0.0.0-20171107180038-7f9929d9ca1f
github.com/honeycombio/libhoney-go v1.12.2
github.com/jessevdk/go-flags v1.4.0
github.com/klauspost/compress v1.9.1 // indirect
github.com/klauspost/compress v1.9.1
github.com/kr/pretty v0.1.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/pelletier/go-toml v1.2.0
github.com/pierrec/lz4 v2.4.1+incompatible // indirect
github.com/pkg/errors v0.8.0
github.com/prometheus/client_golang v0.9.0
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/aws/aws-sdk-go v1.28.14 h1:ZeFS5GVtsJMZ0TBJ5n4HYwB/4MpY0hWkRthNNZkIzNo=
github.com/aws/aws-sdk-go v1.28.14/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
Expand Down Expand Up @@ -40,6 +42,8 @@ github.com/honeycombio/libhoney-go v1.12.2 h1:KA66J2HxOxV8kTEDZ4f9d97KQwb5aTKt7Y
github.com/honeycombio/libhoney-go v1.12.2/go.mod h1:jdLxh51fcBTy6XIpx1efuJmHePs2xUfVkw25lr+hsmg=
github.com/jessevdk/go-flags v1.4.0 h1:4IU2WS7AumrZ/40jfhf4QVDMsQwqA7VEHozFRrGARJA=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/klauspost/compress v1.9.1 h1:TWy0o9J9c6LK9C8t7Msh6IAJNXbsU/nvKLTQUU5HdaY=
github.com/klauspost/compress v1.9.1/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/konsorten/go-windows-terminal-sequences v0.0.0-20180402223658-b729f2633dfe h1:CHRGQ8V7OlCYtwaKPJi3iA7J+YdNKdo8j7nG5IgDhjs=
Expand All @@ -53,6 +57,8 @@ github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0j
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pierrec/lz4 v2.4.1+incompatible h1:mFe7ttWaflA46Mhqh+jUfjp2qTbPYxLB2/OyBppH9dg=
github.com/pierrec/lz4 v2.4.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down
66 changes: 59 additions & 7 deletions route/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"github.com/gorilla/mux"
"github.com/klauspost/compress/zstd"

"github.com/honeycombio/samproxy/collect"
"github.com/honeycombio/samproxy/config"
Expand All @@ -23,6 +24,13 @@ import (
"github.com/honeycombio/samproxy/types"
)

const (
// numZstdDecoders is set statically here - we may make it into a config option
// A normal practice might be to use some multiple of the CPUs, but that goes south
// in kubernetes
numZstdDecoders = 4
)

type Router struct {
Config config.Config `inject:""`
Logger logger.Logger `inject:""`
Expand All @@ -45,6 +53,8 @@ type Router struct {

// iopLogger is a logger that knows whether it's incoming or peer
iopLogger logger.Entry

zstdDecoders chan *zstd.Decoder
}

type BatchResponse struct {
Expand All @@ -69,6 +79,13 @@ func (r *Router) LnS(incomingOrPeer string) {
Transport: r.HTTPTransport,
}

var err error
r.zstdDecoders, err = makeDecoders(numZstdDecoders)
if err != nil {
r.iopLogger.Errorf("couldn't start zstd decoders: %s", err.Error())
return
}

r.Metrics.Register(r.incomingOrPeer+"_router_proxied", "counter")
r.Metrics.Register(r.incomingOrPeer+"_router_event", "counter")
r.Metrics.Register(r.incomingOrPeer+"_router_batch", "counter")
Expand Down Expand Up @@ -98,7 +115,6 @@ func (r *Router) LnS(incomingOrPeer string) {
muxxer.PathPrefix("/").HandlerFunc(r.proxy).Name("proxy")

var listenAddr string
var err error
if r.incomingOrPeer == "incoming" {
listenAddr, err = r.Config.GetListenAddr()
if err != nil {
Expand Down Expand Up @@ -281,7 +297,7 @@ func (r *Router) batch(w http.ResponseWriter, req *http.Request) {
reqID := req.Context().Value(types.RequestIDContextKey{})
logger := r.iopLogger.WithField("request_id", reqID)

bodyReader, err := getMaybeGzippedBody(req)
bodyReader, err := r.getMaybeCompressedBody(req)
if err != nil {
r.handlerReturnWithError(w, ErrPostBody, err)
return
Expand Down Expand Up @@ -407,19 +423,38 @@ func (r *Router) batch(w http.ResponseWriter, req *http.Request) {
w.Write(response)
}

func getMaybeGzippedBody(req *http.Request) (io.Reader, error) {
func (r *Router) getMaybeCompressedBody(req *http.Request) (io.Reader, error) {
var reader io.Reader
switch req.Header.Get("Content-Encoding") {
case "gzip":
buf := bytes.Buffer{}
if _, err := io.Copy(&buf, req.Body); err != nil {
gzipReader, err := gzip.NewReader(req.Body)
if err != nil {
return nil, err
}
var err error
reader, err = gzip.NewReader(&buf)
defer gzipReader.Close()

buf := &bytes.Buffer{}
if _, err := io.Copy(buf, gzipReader); err != nil {
return nil, err
}
reader = buf
case "zstd":
zReader := <-r.zstdDecoders
defer func(zReader *zstd.Decoder) {
zReader.Reset(nil)
r.zstdDecoders <- zReader
}(zReader)

err := zReader.Reset(req.Body)
if err != nil {
return nil, err
}
buf := &bytes.Buffer{}
if _, err := io.Copy(buf, zReader); err != nil {
return nil, err
}

reader = buf
default:
reader = req.Body
}
Expand Down Expand Up @@ -505,3 +540,20 @@ func getEventTime(etHeader string) time.Time {
}
return eventTime
}

func makeDecoders(num int) (chan *zstd.Decoder, error) {
zstdDecoders := make(chan *zstd.Decoder, num)
for i := 0; i < num; i++ {
zReader, err := zstd.NewReader(
nil,
zstd.WithDecoderConcurrency(1),
zstd.WithDecoderLowmem(true),
zstd.WithDecoderMaxMemory(8*1024*1024),
)
if err != nil {
return nil, err
}
zstdDecoders <- zReader
}
return zstdDecoders, nil
}
88 changes: 88 additions & 0 deletions route/route_test.go
Original file line number Diff line number Diff line change
@@ -1 +1,89 @@
package route

import (
"bytes"
"compress/gzip"
"io/ioutil"
"net/http"
"strings"
"testing"

"github.com/klauspost/compress/zstd"
)

func TestDecompression(t *testing.T) {
payload := "payload"
pReader := strings.NewReader(payload)

decoders, err := makeDecoders(numZstdDecoders)
if err != nil {
t.Errorf("unexpected err: %s", err.Error())
}

router := &Router{zstdDecoders: decoders}
req := &http.Request{
Body: ioutil.NopCloser(pReader),
Header: http.Header{},
}
reader, err := router.getMaybeCompressedBody(req)
if err != nil {
t.Errorf("unexpected err: %s", err.Error())
}

b, err := ioutil.ReadAll(reader)
if err != nil {
t.Errorf("unexpected err: %s", err.Error())
}
if string(b) != payload {
t.Errorf("%s != %s", string(b), payload)
}

buf := &bytes.Buffer{}
w := gzip.NewWriter(buf)
_, err = w.Write([]byte(payload))
if err != nil {
t.Errorf("unexpected err: %s", err.Error())
}
w.Close()

req.Body = ioutil.NopCloser(buf)
req.Header.Set("Content-Encoding", "gzip")
reader, err = router.getMaybeCompressedBody(req)
if err != nil {
t.Errorf("unexpected err: %s", err.Error())
}

b, err = ioutil.ReadAll(reader)
if err != nil {
t.Errorf("unexpected err: %s", err.Error())
}
if string(b) != payload {
t.Errorf("%s != %s", string(b), payload)
}

buf = &bytes.Buffer{}
zstdW, err := zstd.NewWriter(buf)
if err != nil {
t.Errorf("unexpected err: %s", err.Error())
}
_, err = zstdW.Write([]byte(payload))
if err != nil {
t.Errorf("unexpected err: %s", err.Error())
}
zstdW.Close()

req.Body = ioutil.NopCloser(buf)
req.Header.Set("Content-Encoding", "zstd")
reader, err = router.getMaybeCompressedBody(req)
if err != nil {
t.Errorf("unexpected err: %s", err.Error())
}

b, err = ioutil.ReadAll(reader)
if err != nil {
t.Errorf("unexpected err: %s", err.Error())
}
if string(b) != payload {
t.Errorf("%s != %s", string(b), payload)
}
}

0 comments on commit 8bf9e75

Please sign in to comment.