Skip to content

Commit

Permalink
*: improve latency when streaming series from Prometheus
Browse files Browse the repository at this point in the history
I've found that when requesting many series (in the order of ten
thousands), the Thanos sidecar spends half of its time computing the
number of received series. To calculate the number of series, it needs
to build a label-based identifier for each chunked series and compare it
with the previous identifier. Eventually this number is only used for
logging and tracing so it doesn't feel like it's worth the penalty.

This change adds an histogram metric,
`thanos_sidecar_prometheus_store_received_frames`, which tracks the
number of frames per request received from the Prometheus remote read
API (buckets: 10, 100, 1000, 10000, 100000). It can be used to evaluate
whether expensive Series requests are being performed.

Signed-off-by: Simon Pasquier <[email protected]>
  • Loading branch information
simonpasquier committed Sep 10, 2020
1 parent eee604f commit ae9dbe4
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 30 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

## Unreleased

### Added

- [#3146](https://github.com/thanos-io/thanos/pull/3146) Sidecar: Add `thanos_sidecar_prometheus_store_stream_frames` histogram metric.

## [v0.15.0](https://github.com/thanos-io/thanos/releases) - 2020.09.07

Highlights:
Expand Down
4 changes: 2 additions & 2 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func runSidecar(
})
lastHeartbeat := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "thanos_sidecar_last_heartbeat_success_time_seconds",
Help: "Second timestamp of the last successful heartbeat.",
Help: "Timestamp of the last successful heartbeat in seconds.",
})

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -205,7 +205,7 @@ func runSidecar(
t.MaxIdleConns = conf.connection.maxIdleConns
c := promclient.NewClient(&http.Client{Transport: tracing.HTTPTripperware(logger, t)}, logger, thanoshttp.ThanosUserAgent)

promStore, err := store.NewPrometheusStore(logger, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps)
promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps)
if err != nil {
return errors.Wrap(err, "create Prometheus store")
}
Expand Down
35 changes: 15 additions & 20 deletions pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"github.com/golang/snappy"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/tsdb/chunkenc"
Expand All @@ -49,6 +51,8 @@ type PrometheusStore struct {
timestamps func() (mint int64, maxt int64)

remoteReadAcceptableResponses []prompb.ReadRequest_ResponseType

framesRead prometheus.Histogram
}

const initialBufSize = 32 * 1024 // 32KB seems like a good minimum starting size for sync pool size.
Expand All @@ -58,6 +62,7 @@ const initialBufSize = 32 * 1024 // 32KB seems like a good minimum starting size
// It attaches the provided external labels to all results.
func NewPrometheusStore(
logger log.Logger,
reg prometheus.Registerer,
client *promclient.Client,
baseURL *url.URL,
component component.StoreAPI,
Expand All @@ -79,6 +84,13 @@ func NewPrometheusStore(
b := make([]byte, 0, initialBufSize)
return &b
}},
framesRead: promauto.With(reg).NewHistogram(
prometheus.HistogramOpts{
Name: "prometheus_store_stream_frames",
Help: "Number of frames received per streamed response.",
Buckets: prometheus.ExponentialBuckets(10, 10, 5),
},
),
}
return p, nil
}
Expand Down Expand Up @@ -259,20 +271,16 @@ func (p *PrometheusStore) handleStreamedPrometheusResponse(s storepb.Store_Serie
level.Debug(p.logger).Log("msg", "started handling ReadRequest_STREAMED_XOR_CHUNKS streamed read response.")

framesNum := 0
seriesNum := 0

defer func() {
p.framesRead.Observe(float64(framesNum))
querySpan.SetTag("frames", framesNum)
querySpan.SetTag("series", seriesNum)
querySpan.Finish()
}()
defer runutil.CloseWithLogOnErr(p.logger, httpResp.Body, "prom series request body")

var (
lastSeries string
currSeries string
tmp []string
data = p.getBuffer()
data = p.getBuffer()
)
defer p.putBuffer(data)

Expand All @@ -294,19 +302,6 @@ func (p *PrometheusStore) handleStreamedPrometheusResponse(s storepb.Store_Serie

framesNum++
for _, series := range res.ChunkedSeries {
{
// Calculate hash of series for counting.
tmp = tmp[:0]
for _, l := range series.Labels {
tmp = append(tmp, l.String())
}
currSeries = strings.Join(tmp, ";")
if currSeries != lastSeries {
seriesNum++
lastSeries = currSeries
}
}

thanosChks := make([]storepb.AggrChunk, len(series.Chunks))
for i, chk := range series.Chunks {
thanosChks[i] = storepb.AggrChunk{
Expand All @@ -332,7 +327,7 @@ func (p *PrometheusStore) handleStreamedPrometheusResponse(s storepb.Store_Serie
}
}
}
level.Debug(p.logger).Log("msg", "handled ReadRequest_STREAMED_XOR_CHUNKS request.", "frames", framesNum, "series", seriesNum)
level.Debug(p.logger).Log("msg", "handled ReadRequest_STREAMED_XOR_CHUNKS request.", "frames", framesNum)
return nil
}

Expand Down
16 changes: 8 additions & 8 deletions pkg/store/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) {
testutil.Ok(t, err)

limitMinT := int64(0)
proxy, err := NewPrometheusStore(nil, promclient.NewDefaultClient(), u, component.Sidecar,
proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return limitMinT, -1 }) // Maxt does not matter.
testutil.Ok(t, err)
Expand Down Expand Up @@ -198,7 +198,7 @@ func TestPrometheusStore_SeriesLabels_e2e(t *testing.T) {
u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
testutil.Ok(t, err)

promStore, err := NewPrometheusStore(nil, promclient.NewDefaultClient(), u, component.Sidecar,
promStore, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return math.MinInt64/1000 + 62135596801, math.MaxInt64/1000 - 62135596801 })
testutil.Ok(t, err)
Expand Down Expand Up @@ -376,7 +376,7 @@ func TestPrometheusStore_LabelNames_e2e(t *testing.T) {
u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
testutil.Ok(t, err)

proxy, err := NewPrometheusStore(nil, promclient.NewDefaultClient(), u, component.Sidecar, getExternalLabels, nil)
proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, getExternalLabels, nil)
testutil.Ok(t, err)

resp, err := proxy.LabelNames(ctx, &storepb.LabelNamesRequest{
Expand Down Expand Up @@ -421,7 +421,7 @@ func TestPrometheusStore_LabelValues_e2e(t *testing.T) {
u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
testutil.Ok(t, err)

proxy, err := NewPrometheusStore(nil, promclient.NewDefaultClient(), u, component.Sidecar, getExternalLabels, nil)
proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, getExternalLabels, nil)
testutil.Ok(t, err)

resp, err := proxy.LabelValues(ctx, &storepb.LabelValuesRequest{
Expand Down Expand Up @@ -467,7 +467,7 @@ func TestPrometheusStore_ExternalLabelValues_e2e(t *testing.T) {
u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
testutil.Ok(t, err)

proxy, err := NewPrometheusStore(nil, promclient.NewDefaultClient(), u, component.Sidecar, getExternalLabels, nil)
proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, getExternalLabels, nil)
testutil.Ok(t, err)

resp, err := proxy.LabelValues(ctx, &storepb.LabelValuesRequest{
Expand Down Expand Up @@ -511,7 +511,7 @@ func TestPrometheusStore_Series_MatchExternalLabel_e2e(t *testing.T) {
u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
testutil.Ok(t, err)

proxy, err := NewPrometheusStore(nil, promclient.NewDefaultClient(), u, component.Sidecar,
proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return 0, math.MaxInt64 })
testutil.Ok(t, err)
Expand Down Expand Up @@ -556,7 +556,7 @@ func TestPrometheusStore_Info(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

proxy, err := NewPrometheusStore(nil, promclient.NewDefaultClient(), nil, component.Sidecar,
proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), nil, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return 123, 456 })
testutil.Ok(t, err)
Expand Down Expand Up @@ -634,7 +634,7 @@ func TestPrometheusStore_Series_SplitSamplesIntoChunksWithMaxSizeOf120(t *testin
u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
testutil.Ok(t, err)

proxy, err := NewPrometheusStore(nil, promclient.NewDefaultClient(), u, component.Sidecar,
proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return 0, math.MaxInt64 })
testutil.Ok(t, err)
Expand Down

0 comments on commit ae9dbe4

Please sign in to comment.