Skip to content

Commit

Permalink
*: improve latency when streaming series from Prometheus
Browse files Browse the repository at this point in the history
I've found that when requesting many series (in the order of ten
thousands), the Thanos sidecar spends half of its time computing the
number of received series. To calculate the number of series, it needs
to build a label-based identifier for each chunked series and compare it
with the previous identifier. Eventually this number is only used for
logging and tracing so it doesn't feel like it's worth the penalty.

This change adds an histogram metric,
`thanos_sidecar_prometheus_store_stream_frames`, which tracks the number
of frames per request received from the Prometheus remote read API
(buckets: 10, 100, 1000, 10000, 100000). It can be used to evaluate
whether expensive Series requests are performed.

Signed-off-by: Simon Pasquier <[email protected]>
  • Loading branch information
simonpasquier committed Sep 9, 2020
1 parent 39c2b41 commit 6389103
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 22 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

## Unreleased

### Added

- [#3146](https://github.com/thanos-io/thanos/pull/3146) Sidecar: Add `thanos_sidecar_prometheus_store_stream_frames` histogram metric.

## [v0.15.0](https://github.com/thanos-io/thanos/releases) - 2020.09.07

Highlights:
Expand Down
4 changes: 2 additions & 2 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func runSidecar(
})
lastHeartbeat := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "thanos_sidecar_last_heartbeat_success_time_seconds",
Help: "Second timestamp of the last successful heartbeat.",
Help: "Timestamp of the last successful heartbeat in seconds.",
})

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -205,7 +205,7 @@ func runSidecar(
t.MaxIdleConns = conf.connection.maxIdleConns
c := promclient.NewClient(&http.Client{Transport: tracing.HTTPTripperware(logger, t)}, logger, thanoshttp.ThanosUserAgent)

promStore, err := store.NewPrometheusStore(logger, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps)
promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps)
if err != nil {
return errors.Wrap(err, "create Prometheus store")
}
Expand Down
35 changes: 15 additions & 20 deletions pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"github.com/golang/snappy"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/tsdb/chunkenc"
Expand All @@ -49,6 +51,8 @@ type PrometheusStore struct {
timestamps func() (mint int64, maxt int64)

remoteReadAcceptableResponses []prompb.ReadRequest_ResponseType

framesRead prometheus.Histogram
}

const initialBufSize = 32 * 1024 // 32KB seems like a good minimum starting size for sync pool size.
Expand All @@ -58,6 +62,7 @@ const initialBufSize = 32 * 1024 // 32KB seems like a good minimum starting size
// It attaches the provided external labels to all results.
func NewPrometheusStore(
logger log.Logger,
reg *prometheus.Registry,
client *promclient.Client,
baseURL *url.URL,
component component.StoreAPI,
Expand All @@ -79,6 +84,13 @@ func NewPrometheusStore(
b := make([]byte, 0, initialBufSize)
return &b
}},
framesRead: promauto.With(reg).NewHistogram(
prometheus.HistogramOpts{
Name: "prometheus_store_stream_frames",
Help: "Number of frames received per streamed response.",
Buckets: prometheus.ExponentialBuckets(10, 10, 5),
},
),
}
return p, nil
}
Expand Down Expand Up @@ -259,20 +271,16 @@ func (p *PrometheusStore) handleStreamedPrometheusResponse(s storepb.Store_Serie
level.Debug(p.logger).Log("msg", "started handling ReadRequest_STREAMED_XOR_CHUNKS streamed read response.")

framesNum := 0
seriesNum := 0

defer func() {
p.framesRead.Observe(float64(framesNum))
querySpan.SetTag("frames", framesNum)
querySpan.SetTag("series", seriesNum)
querySpan.Finish()
}()
defer runutil.CloseWithLogOnErr(p.logger, httpResp.Body, "prom series request body")

var (
lastSeries string
currSeries string
tmp []string
data = p.getBuffer()
data = p.getBuffer()
)
defer p.putBuffer(data)

Expand All @@ -294,19 +302,6 @@ func (p *PrometheusStore) handleStreamedPrometheusResponse(s storepb.Store_Serie

framesNum++
for _, series := range res.ChunkedSeries {
{
// Calculate hash of series for counting.
tmp = tmp[:0]
for _, l := range series.Labels {
tmp = append(tmp, l.String())
}
currSeries = strings.Join(tmp, ";")
if currSeries != lastSeries {
seriesNum++
lastSeries = currSeries
}
}

thanosChks := make([]storepb.AggrChunk, len(series.Chunks))
for i, chk := range series.Chunks {
thanosChks[i] = storepb.AggrChunk{
Expand All @@ -332,7 +327,7 @@ func (p *PrometheusStore) handleStreamedPrometheusResponse(s storepb.Store_Serie
}
}
}
level.Debug(p.logger).Log("msg", "handled ReadRequest_STREAMED_XOR_CHUNKS request.", "frames", framesNum, "series", seriesNum)
level.Debug(p.logger).Log("msg", "handled ReadRequest_STREAMED_XOR_CHUNKS request.", "frames", framesNum)
return nil
}

Expand Down

0 comments on commit 6389103

Please sign in to comment.