Improve decompression within ParseProtoReader. #3682

Merged Jan 21, 2021 (10 commits; changes shown are from 9 commits).
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,7 @@

## master / unreleased

* [CHANGE] FramedSnappy encoding support has been removed from Push and Remote Read APIs. This means Prometheus 1.6 support has been removed, and the oldest Prometheus version supported for remote write is 1.7. #3682
* [CHANGE] Ruler: removed the flag `-ruler.evaluation-delay-duration-deprecated` which was deprecated in 1.4.0. Please use the `ruler_evaluation_delay_duration` per-tenant limit instead. #3693
* [CHANGE] Removed the flags `-<prefix>.grpc-use-gzip-compression` which were deprecated in 1.3.0: #3693
* `-query-scheduler.grpc-client-config.grpc-use-gzip-compression`: use `-query-scheduler.grpc-client-config.grpc-compression` instead
@@ -21,6 +22,7 @@
* `-cluster.peer` in favor of `-alertmanager.cluster.peers`
* `-cluster.peer-timeout` in favor of `-alertmanager.cluster.peer-timeout`
* [FEATURE] Querier: Queries can be federated across multiple tenants. The tenant IDs involved need to be specified, separated by a `|` character, in the `X-Scope-OrgID` request header. This is an experimental feature, which can be enabled by setting `-tenant-federation.enabled=true` on all Cortex services. #3250
* [ENHANCEMENT] Allow specifying JAEGER_ENDPOINT instead of sampling server or local agent port. #3682
* [FEATURE] Alertmanager: introduced the experimental option `-alertmanager.sharding-enabled` to shard tenants across multiple Alertmanager instances. This feature is still under heavy development and its usage is discouraged. The following new metrics are exported by the Alertmanager: #3664
* `cortex_alertmanager_ring_check_errors_total`
* `cortex_alertmanager_sync_configs_total`
3 changes: 1 addition & 2 deletions go.mod
@@ -13,7 +13,6 @@ require (
	github.com/alicebob/miniredis v2.5.0+incompatible
	github.com/armon/go-metrics v0.3.3
	github.com/aws/aws-sdk-go v1.35.31
	github.com/blang/semver v3.5.0+incompatible
	github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
	github.com/cespare/xxhash v1.1.0
	github.com/dustin/go-humanize v1.0.0
@@ -56,7 +55,7 @@ require (
	github.com/stretchr/testify v1.6.1
	github.com/thanos-io/thanos v0.13.1-0.20210108102609-f85e4003ba51
	github.com/uber/jaeger-client-go v2.25.0+incompatible
	github.com/weaveworks/common v0.0.0-20201119133501-0619918236ec
	github.com/weaveworks/common v0.0.0-20210112142934-23c8d7fa6120
	go.etcd.io/bbolt v1.3.5-0.20200615073812-232d8fc87f50
	go.etcd.io/etcd v0.5.0-alpha.5.0.20200520232829-54ba9589114f
	go.uber.org/atomic v1.7.0
4 changes: 2 additions & 2 deletions go.sum
@@ -1179,8 +1179,8 @@ github.com/weaveworks/common v0.0.0-20200206153930-760e36ae819a/go.mod h1:6enWAq
github.com/weaveworks/common v0.0.0-20200625145055-4b1847531bc9/go.mod h1:c98fKi5B9u8OsKGiWHLRKus6ToQ1Tubeow44ECO1uxY=
github.com/weaveworks/common v0.0.0-20200914083218-61ffdd448099 h1:MS5M2antM8wzMUqVxIfAi+yb6yjXvDINRFvLnmNXeIw=
github.com/weaveworks/common v0.0.0-20200914083218-61ffdd448099/go.mod h1:hz10LOsAdzC3K/iXaKoFxOKTDRgxJl+BTGX1GY+TzO4=
github.com/weaveworks/common v0.0.0-20201119133501-0619918236ec h1:5JmevdpzK10Z2ua0VDToj7Kg2+/t0FzdYBjsurYRE8k=
github.com/weaveworks/common v0.0.0-20201119133501-0619918236ec/go.mod h1:ykzWac1LtVfOxdCK+jD754at1Ws9dKCwFeUzkFBffPs=
github.com/weaveworks/common v0.0.0-20210112142934-23c8d7fa6120 h1:zQtcwREXYNvW116ipgc0bRDg1avD2b6QP0RGPLlPWkc=
github.com/weaveworks/common v0.0.0-20210112142934-23c8d7fa6120/go.mod h1:ykzWac1LtVfOxdCK+jD754at1Ws9dKCwFeUzkFBffPs=
github.com/weaveworks/promrus v1.2.0 h1:jOLf6pe6/vss4qGHjXmGz4oDJQA+AOCqEL3FvvZGz7M=
github.com/weaveworks/promrus v1.2.0/go.mod h1:SaE82+OJ91yqjrE1rsvBWVzNZKcHYFtMUyS1+Ogs/KA=
github.com/willf/bitset v1.1.3/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=
8 changes: 3 additions & 5 deletions pkg/querier/remote_read.go
@@ -16,13 +16,11 @@ const maxRemoteReadQuerySize = 1024 * 1024
// RemoteReadHandler handles Prometheus remote read requests.
func RemoteReadHandler(q storage.Queryable) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
compressionType := util.CompressionTypeFor(r.Header.Get("X-Prometheus-Remote-Read-Version"))

ctx := r.Context()
var req client.ReadRequest
logger := util.WithContext(r.Context(), util.Logger)
if err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRemoteReadQuerySize, &req, compressionType); err != nil {
level.Error(logger).Log("err", err.Error())
if err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRemoteReadQuerySize, &req, util.RawSnappy); err != nil {
level.Error(logger).Log("msg", "failed to parse proto", "err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
@@ -68,7 +66,7 @@ func RemoteReadHandler(q storage.Queryable) http.Handler {
return
}
w.Header().Add("Content-Type", "application/x-protobuf")
if err := util.SerializeProtoResponse(w, &resp, compressionType); err != nil {
if err := util.SerializeProtoResponse(w, &resp, util.RawSnappy); err != nil {
level.Error(logger).Log("msg", "error sending remote read response", "err", err)
}
})
147 changes: 87 additions & 60 deletions pkg/util/http.go
@@ -10,14 +10,15 @@ import (
"net/http"
"strings"

"github.com/blang/semver"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"gopkg.in/yaml.v2"
)

const messageSizeLargerErrFmt = "received message larger than max (%d vs %d)"

// WriteJSONResponse writes some JSON as an HTTP response.
func WriteJSONResponse(w http.ResponseWriter, v interface{}) {
w.Header().Set("Content-Type", "application/json")
@@ -81,71 +82,22 @@ type CompressionType int
// Values for CompressionType
const (
NoCompression CompressionType = iota
FramedSnappy
RawSnappy
)

var rawSnappyFromVersion = semver.MustParse("0.1.0")

// CompressionTypeFor a given version of the Prometheus remote storage protocol.
// See https://github.com/prometheus/prometheus/issues/2692.
func CompressionTypeFor(version string) CompressionType {
ver, err := semver.Make(version)
if err != nil {
return FramedSnappy
}

if ver.GTE(rawSnappyFromVersion) {
return RawSnappy
}
return FramedSnappy
}

// ParseProtoReader parses a compressed proto from an io.Reader.
func ParseProtoReader(ctx context.Context, reader io.Reader, expectedSize, maxSize int, req proto.Message, compression CompressionType) error {
var body []byte
var err error
sp := opentracing.SpanFromContext(ctx)
if sp != nil {
sp.LogFields(otlog.String("event", "util.ParseProtoRequest[start reading]"))
}
var buf bytes.Buffer
if expectedSize > 0 {
if expectedSize > maxSize {
return fmt.Errorf("message expected size larger than max (%d vs %d)", expectedSize, maxSize)
}
buf.Grow(expectedSize + bytes.MinRead) // extra space guarantees no reallocation
}
switch compression {
case NoCompression:
// Read from LimitReader with limit max+1. So if the underlying
// reader is over limit, the result will be bigger than max.
_, err = buf.ReadFrom(io.LimitReader(reader, int64(maxSize)+1))
body = buf.Bytes()
case FramedSnappy:
_, err = buf.ReadFrom(io.LimitReader(snappy.NewReader(reader), int64(maxSize)+1))
body = buf.Bytes()
case RawSnappy:
_, err = buf.ReadFrom(reader)
body = buf.Bytes()
if sp != nil {
sp.LogFields(otlog.String("event", "util.ParseProtoRequest[decompress]"),
otlog.Int("size", len(body)))
}
if err == nil && len(body) <= maxSize {
body, err = snappy.Decode(nil, body)
}
}
body, err := decompressRequest(reader, expectedSize, maxSize, compression, sp)
if err != nil {
return err
}
if len(body) > maxSize {
return fmt.Errorf("received message larger than max (%d vs %d)", len(body), maxSize)
}

if sp != nil {
sp.LogFields(otlog.String("event", "util.ParseProtoRequest[unmarshal]"),
otlog.Int("size", len(body)))
sp.LogFields(otlog.String("event", "util.ParseProtoRequest[unmarshal]"), otlog.Int("size", len(body)))
}

// We re-implement proto.Unmarshal here as it calls XXX_Unmarshal first,
@@ -163,6 +115,89 @@ func ParseProtoReader(ctx context.Context, reader io.Reader, expectedSize, maxSi
return nil
}

func decompressRequest(reader io.Reader, expectedSize, maxSize int, compression CompressionType, sp opentracing.Span) (body []byte, err error) {
	defer func() {
		if err != nil && len(body) > maxSize {
			err = fmt.Errorf(messageSizeLargerErrFmt, len(body), maxSize)
		}
	}()
	if expectedSize > maxSize {
		return nil, fmt.Errorf(messageSizeLargerErrFmt, expectedSize, maxSize)
	}
	buffer, ok := tryBufferFromReader(reader)
	if ok {
		body, err = decompressFromBuffer(buffer, maxSize, compression, sp)
		return
	}
	body, err = decompressFromReader(reader, expectedSize, maxSize, compression, sp)
	return
}

Contributor (on the defer block): Nit: there is no need to use defer in this function; it just makes the code trickier to follow.

Contributor (on the tryBufferFromReader call): I think it would be better to check for a non-nil buffer. It would be slightly more robust, as a reader implementing interface { BytesBuffer() *bytes.Buffer } can still return a nil buffer, which currently leads to a panic.
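A minimal sketch of the non-nil guard that comment asks for (hypothetical, not part of this diff; written as if inside pkg/util, reusing the interface from tryBufferFromReader below, and with an invented name to avoid clashing with the real function):

```go
// Sketch only: report ok=false when BytesBuffer() yields a nil buffer,
// so callers never dereference a nil *bytes.Buffer.
func tryBufferFromReaderSafe(reader io.Reader) (*bytes.Buffer, bool) {
	bufReader, ok := reader.(interface {
		BytesBuffer() *bytes.Buffer
	})
	if !ok {
		return nil, false
	}
	buf := bufReader.BytesBuffer()
	return buf, buf != nil
}
```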

func decompressFromReader(reader io.Reader, expectedSize, maxSize int, compression CompressionType, sp opentracing.Span) ([]byte, error) {
	var (
		buf  bytes.Buffer
		body []byte
		err  error
	)
	if expectedSize > 0 {
		buf.Grow(expectedSize + bytes.MinRead) // extra space guarantees no reallocation
	}
	reader = io.LimitReader(reader, int64(maxSize)+1)
	switch compression {
	case NoCompression:
		// Read from LimitReader with limit max+1. So if the underlying
		// reader is over limit, the result will be bigger than max.
		_, err = buf.ReadFrom(reader)
		body = buf.Bytes()
	case RawSnappy:
		_, err = buf.ReadFrom(reader)
		if err != nil {
			return nil, err
		}
		body, err = decompressFromBuffer(&buf, maxSize, RawSnappy, sp)
	}
	return body, err
}

Contributor (on the NoCompression comment): Move this comment above, where io.LimitReader is used, please.

Contributor (on the RawSnappy ReadFrom): Is this a possible DoS attack? (I see it is what the old code did.) Seems we could use the same ReadFrom in either case, then check compression after.

Contributor: Could fix it up later; I don't mind merging this as-is.

Contributor Author: Yeah, I see the potential issue; not sure if there's a better option than using a LimitReader in both cases? Decoding the length still leaves us open to hijacked/fake requests. I made the change; let me know what you think.

Contributor: I think using LimitReader in all paths is correct, if the point is to stop someone blowing up the process. Is "result bigger than max" actually detected in the NoCompression case now? Suggest just having one ReadFrom call, then checking the length, before we get into a routine named "decompress".

Contributor Author: Yes, because we read a bit more than the limit, the defer in decompressRequest will return the error.
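A sketch of the shape the reviewer is suggesting (hypothetical; one ReadFrom through the LimitReader, a length check, and decompression last; written as if inside pkg/util, reusing names from this diff, and readAndDecompress is an invented name):

```go
// Sketch only: read once with the limit applied, reject over-limit
// payloads up front, then decompress only if needed.
func readAndDecompress(reader io.Reader, maxSize int, compression CompressionType) ([]byte, error) {
	var buf bytes.Buffer
	if _, err := buf.ReadFrom(io.LimitReader(reader, int64(maxSize)+1)); err != nil {
		return nil, err
	}
	if buf.Len() > maxSize {
		return nil, fmt.Errorf(messageSizeLargerErrFmt, buf.Len(), maxSize)
	}
	if compression == RawSnappy {
		return snappy.Decode(nil, buf.Bytes())
	}
	return buf.Bytes(), nil
}
```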

func decompressFromBuffer(buffer *bytes.Buffer, maxSize int, compression CompressionType, sp opentracing.Span) ([]byte, error) {
	if len(buffer.Bytes()) > maxSize {
		return nil, fmt.Errorf(messageSizeLargerErrFmt, len(buffer.Bytes()), maxSize)
	}
	switch compression {
	case NoCompression:
		return buffer.Bytes(), nil
	case RawSnappy:
		if sp != nil {
			sp.LogFields(otlog.String("event", "util.ParseProtoRequest[decompress]"),
				otlog.Int("size", len(buffer.Bytes())))
		}
		size, err := snappy.DecodedLen(buffer.Bytes())
		if err != nil {
			return nil, err
		}
		if size > maxSize {
			return nil, fmt.Errorf(messageSizeLargerErrFmt, size, maxSize)
		}
		body, err := snappy.Decode(nil, buffer.Bytes())
		if err != nil {
			return nil, err
		}
		return body, nil
	}
	return nil, nil
}
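For context on the snappy.DecodedLen call above: the raw snappy block format prefixes the payload with its uncompressed length as a varint, so the oversize check is cheap and runs before any decode buffer is allocated. A small standalone illustration:

```go
package main

import (
	"fmt"

	"github.com/golang/snappy"
)

func main() {
	compressed := snappy.Encode(nil, make([]byte, 1<<20)) // 1 MiB of zeros compresses to a few KiB
	size, err := snappy.DecodedLen(compressed)            // parses only the length header
	fmt.Println(size, err)                                // 1048576 <nil>
}
```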

// tryBufferFromReader attempts to cast the reader to a `*bytes.Buffer`; this is possible when using httpgrpc.
// If it fails, it will return nil and false.
func tryBufferFromReader(reader io.Reader) (*bytes.Buffer, bool) {
	if bufReader, ok := reader.(interface {
		BytesBuffer() *bytes.Buffer
	}); ok && bufReader != nil {
		return bufReader.BytesBuffer(), true
	}
	return nil, false
}

// SerializeProtoResponse serializes a protobuf response into an HTTP response.
func SerializeProtoResponse(w http.ResponseWriter, resp proto.Message, compression CompressionType) error {
data, err := proto.Marshal(resp)
@@ -173,14 +208,6 @@ func SerializeProtoResponse(w http.ResponseWriter, resp proto.Message, compressi

switch compression {
case NoCompression:
case FramedSnappy:
buf := bytes.Buffer{}
writer := snappy.NewBufferedWriter(&buf)
if _, err := writer.Write(data); err != nil {
return err
}
writer.Close()
data = buf.Bytes()
case RawSnappy:
data = snappy.Encode(nil, data)
}
87 changes: 84 additions & 3 deletions pkg/util/http_test.go
@@ -1,11 +1,16 @@
package util
package util_test

import (
"bytes"
"context"
"html/template"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/assert"

"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/util"
)

func TestRenderHTTPResponse(t *testing.T) {
@@ -58,7 +63,7 @@ func TestRenderHTTPResponse(t *testing.T) {
request.Header.Add(k, v)
}

RenderHTTPResponse(writer, tt.value, tmpl, request)
util.RenderHTTPResponse(writer, tt.value, tmpl, request)

assert.Equal(t, tt.expectedContentType, writer.Header().Get("Content-Type"))
assert.Equal(t, 200, writer.Code)
@@ -70,9 +75,85 @@ func TestRenderHTTPResponse(t *testing.T) {
func TestWriteTextResponse(t *testing.T) {
w := httptest.NewRecorder()

WriteTextResponse(w, "hello world")
util.WriteTextResponse(w, "hello world")

assert.Equal(t, 200, w.Code)
assert.Equal(t, "hello world", w.Body.String())
assert.Equal(t, "text/plain", w.Header().Get("Content-Type"))
}

func TestParseProtoReader(t *testing.T) {
	// 47 bytes compressed and 53 uncompressed
	req := &client.PreallocWriteRequest{
		WriteRequest: client.WriteRequest{
			Timeseries: []client.PreallocTimeseries{
				{
					TimeSeries: &client.TimeSeries{
						Labels: []client.LabelAdapter{
							{Name: "foo", Value: "bar"},
						},
						Samples: []client.Sample{
							{Value: 10, TimestampMs: 1},
							{Value: 20, TimestampMs: 2},
							{Value: 30, TimestampMs: 3},
						},
					},
				},
			},
		},
	}

	for _, tt := range []struct {
		name           string
		compression    util.CompressionType
		maxSize        int
		expectErr      bool
		useBytesBuffer bool
	}{
		{"rawSnappy", util.RawSnappy, 53, false, false},
		{"noCompression", util.NoCompression, 53, false, false},
		{"too big rawSnappy", util.RawSnappy, 10, true, false},
		{"too big decoded rawSnappy", util.RawSnappy, 50, true, false},
		{"too big noCompression", util.NoCompression, 10, true, false},

		{"bytesbuffer rawSnappy", util.RawSnappy, 53, false, true},
		{"bytesbuffer noCompression", util.NoCompression, 53, false, true},
		{"bytesbuffer too big rawSnappy", util.RawSnappy, 10, true, true},
		{"bytesbuffer too big decoded rawSnappy", util.RawSnappy, 50, true, true},
		{"bytesbuffer too big noCompression", util.NoCompression, 10, true, true},
	} {
		t.Run(tt.name, func(t *testing.T) {
			w := httptest.NewRecorder()
			assert.Nil(t, util.SerializeProtoResponse(w, req, tt.compression))
			var fromWire client.PreallocWriteRequest

			reader := w.Result().Body
			if tt.useBytesBuffer {
				buf := bytes.Buffer{}
				_, err := buf.ReadFrom(reader)
				assert.Nil(t, err)
				reader = bytesBuffered{Buffer: &buf}
			}

			err := util.ParseProtoReader(context.Background(), reader, 0, tt.maxSize, &fromWire, tt.compression)
			if tt.expectErr {
				assert.NotNil(t, err)
				return
			}
			assert.Nil(t, err)
			assert.Equal(t, req, &fromWire)
		})
	}
}

type bytesBuffered struct {
	*bytes.Buffer
}

func (b bytesBuffered) Close() error {
	return nil
}

func (b bytesBuffered) BytesBuffer() *bytes.Buffer {
	return b.Buffer
}
3 changes: 1 addition & 2 deletions pkg/util/push/push.go
@@ -25,9 +25,8 @@ func Handler(cfg distributor.Config, sourceIPs *middleware.SourceIPExtractor, pu
logger = util.WithSourceIPs(source, logger)
}
}
compressionType := util.CompressionTypeFor(r.Header.Get("X-Prometheus-Remote-Write-Version"))
var req client.PreallocWriteRequest
err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), cfg.MaxRecvMsgSize, &req, compressionType)
err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), cfg.MaxRecvMsgSize, &req, util.RawSnappy)
if err != nil {
level.Error(logger).Log("err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
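With FramedSnappy gone, push clients must send the raw (block-format) snappy encoding that snappy.Encode produces, not the framed stream written by snappy.NewBufferedWriter. A hedged sketch of what a conforming client request could look like; buildPushRequest, pushURL, and writeReq are illustrative names, not part of this diff:

```go
package pushclient

import (
	"bytes"
	"net/http"

	"github.com/gogo/protobuf/proto"
	"github.com/golang/snappy"
)

// buildPushRequest marshals a remote-write protobuf and raw
// snappy-encodes the body, matching what ParseProtoReader with
// util.RawSnappy expects on the server side.
func buildPushRequest(pushURL string, writeReq proto.Message) (*http.Request, error) {
	data, err := proto.Marshal(writeReq)
	if err != nil {
		return nil, err
	}
	body := snappy.Encode(nil, data) // block format, not the framed writer

	req, err := http.NewRequest(http.MethodPost, pushURL, bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Encoding", "snappy")
	req.Header.Set("Content-Type", "application/x-protobuf")
	req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
	return req, nil
}
```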