diff --git a/CHANGELOG.md b/CHANGELOG.md index 63ec3e6b976..8cd06466be8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,10 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ## Unreleased +### Added + +- [#3146](https://github.com/thanos-io/thanos/pull/3146) Sidecar: Add `thanos_sidecar_prometheus_store_stream_frames` histogram metric. + ## [v0.15.0](https://github.com/thanos-io/thanos/releases) - 2020.09.07 Highlights: diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 2a790226690..0a7842eed5b 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -127,7 +127,7 @@ func runSidecar( }) lastHeartbeat := promauto.With(reg).NewGauge(prometheus.GaugeOpts{ Name: "thanos_sidecar_last_heartbeat_success_time_seconds", - Help: "Second timestamp of the last successful heartbeat.", + Help: "Timestamp of the last successful heartbeat in seconds.", }) ctx, cancel := context.WithCancel(context.Background()) @@ -205,7 +205,7 @@ func runSidecar( t.MaxIdleConns = conf.connection.maxIdleConns c := promclient.NewClient(&http.Client{Transport: tracing.HTTPTripperware(logger, t)}, logger, thanoshttp.ThanosUserAgent) - promStore, err := store.NewPrometheusStore(logger, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps) + promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps) if err != nil { return errors.Wrap(err, "create Prometheus store") } diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index e937a40192b..3009e2a6183 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -23,6 +23,8 @@ import ( "github.com/golang/snappy" "github.com/opentracing/opentracing-go" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage/remote" "github.com/prometheus/prometheus/tsdb/chunkenc" @@ -49,6 +51,8 @@ type PrometheusStore struct { timestamps func() (mint int64, maxt int64) remoteReadAcceptableResponses []prompb.ReadRequest_ResponseType + + framesRead prometheus.Histogram } const initialBufSize = 32 * 1024 // 32KB seems like a good minimum starting size for sync pool size. @@ -58,6 +62,7 @@ const initialBufSize = 32 * 1024 // 32KB seems like a good minimum starting size // It attaches the provided external labels to all results. func NewPrometheusStore( logger log.Logger, + reg *prometheus.Registry, client *promclient.Client, baseURL *url.URL, component component.StoreAPI, @@ -79,6 +84,13 @@ func NewPrometheusStore( b := make([]byte, 0, initialBufSize) return &b }}, + framesRead: promauto.With(reg).NewHistogram( + prometheus.HistogramOpts{ + Name: "prometheus_store_stream_frames", + Help: "Number of frames received per streamed response.", + Buckets: prometheus.ExponentialBuckets(10, 10, 5), + }, + ), } return p, nil } @@ -259,20 +271,16 @@ func (p *PrometheusStore) handleStreamedPrometheusResponse(s storepb.Store_Serie level.Debug(p.logger).Log("msg", "started handling ReadRequest_STREAMED_XOR_CHUNKS streamed read response.") framesNum := 0 - seriesNum := 0 defer func() { + p.framesRead.Observe(float64(framesNum)) querySpan.SetTag("frames", framesNum) - querySpan.SetTag("series", seriesNum) querySpan.Finish() }() defer runutil.CloseWithLogOnErr(p.logger, httpResp.Body, "prom series request body") var ( - lastSeries string - currSeries string - tmp []string - data = p.getBuffer() + data = p.getBuffer() ) defer p.putBuffer(data) @@ -294,19 +302,6 @@ func (p *PrometheusStore) handleStreamedPrometheusResponse(s storepb.Store_Serie framesNum++ for _, series := range res.ChunkedSeries { - { - // Calculate hash of series for counting. - tmp = tmp[:0] - for _, l := range series.Labels { - tmp = append(tmp, l.String()) - } - currSeries = strings.Join(tmp, ";") - if currSeries != lastSeries { - seriesNum++ - lastSeries = currSeries - } - } - thanosChks := make([]storepb.AggrChunk, len(series.Chunks)) for i, chk := range series.Chunks { thanosChks[i] = storepb.AggrChunk{ @@ -332,7 +327,7 @@ func (p *PrometheusStore) handleStreamedPrometheusResponse(s storepb.Store_Serie } } } - level.Debug(p.logger).Log("msg", "handled ReadRequest_STREAMED_XOR_CHUNKS request.", "frames", framesNum, "series", seriesNum) + level.Debug(p.logger).Log("msg", "handled ReadRequest_STREAMED_XOR_CHUNKS request.", "frames", framesNum) return nil }