Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: improve latency when streaming series from Prometheus #3146

Merged
merged 1 commit into from
Sep 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

- [#3133](https://github.com/thanos-io/thanos/pull/3133) Query: Allow passing a `storeMatch[]` to Labels APIs. Also time range metadata based store filtering is supported on Labels APIs.
- [#3154](https://github.com/thanos-io/thanos/pull/3154) Query Frontend: Add metric `thanos_memcached_getmulti_gate_queries_max`.
- [#3146](https://github.com/thanos-io/thanos/pull/3146) Sidecar: Add `thanos_sidecar_prometheus_store_received_frames` histogram metric.

### Changed

Expand Down
4 changes: 2 additions & 2 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func runSidecar(
})
lastHeartbeat := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "thanos_sidecar_last_heartbeat_success_time_seconds",
Help: "Second timestamp of the last successful heartbeat.",
Help: "Timestamp of the last successful heartbeat in seconds.",
})

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -205,7 +205,7 @@ func runSidecar(
t.MaxIdleConns = conf.connection.maxIdleConns
c := promclient.NewClient(&http.Client{Transport: tracing.HTTPTripperware(logger, t)}, logger, thanoshttp.ThanosUserAgent)

promStore, err := store.NewPrometheusStore(logger, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps)
promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps)
if err != nil {
return errors.Wrap(err, "create Prometheus store")
}
Expand Down
35 changes: 15 additions & 20 deletions pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"github.com/golang/snappy"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/tsdb/chunkenc"
Expand All @@ -49,6 +51,8 @@ type PrometheusStore struct {
timestamps func() (mint int64, maxt int64)

remoteReadAcceptableResponses []prompb.ReadRequest_ResponseType

framesRead prometheus.Histogram
}

const initialBufSize = 32 * 1024 // 32KB seems like a good minimum starting size for sync pool size.
Expand All @@ -58,6 +62,7 @@ const initialBufSize = 32 * 1024 // 32KB seems like a good minimum starting size
// It attaches the provided external labels to all results.
func NewPrometheusStore(
logger log.Logger,
reg prometheus.Registerer,
client *promclient.Client,
baseURL *url.URL,
component component.StoreAPI,
Expand All @@ -79,6 +84,13 @@ func NewPrometheusStore(
b := make([]byte, 0, initialBufSize)
return &b
}},
framesRead: promauto.With(reg).NewHistogram(
prometheus.HistogramOpts{
Name: "prometheus_store_received_frames",
Help: "Number of frames received per streamed response.",
Buckets: prometheus.ExponentialBuckets(10, 10, 5),
},
),
}
return p, nil
}
Expand Down Expand Up @@ -259,20 +271,16 @@ func (p *PrometheusStore) handleStreamedPrometheusResponse(s storepb.Store_Serie
level.Debug(p.logger).Log("msg", "started handling ReadRequest_STREAMED_XOR_CHUNKS streamed read response.")

framesNum := 0
seriesNum := 0

defer func() {
p.framesRead.Observe(float64(framesNum))
querySpan.SetTag("frames", framesNum)
querySpan.SetTag("series", seriesNum)
querySpan.Finish()
}()
defer runutil.CloseWithLogOnErr(p.logger, httpResp.Body, "prom series request body")

var (
lastSeries string
currSeries string
tmp []string
data = p.getBuffer()
data = p.getBuffer()
)
defer p.putBuffer(data)

Expand All @@ -294,19 +302,6 @@ func (p *PrometheusStore) handleStreamedPrometheusResponse(s storepb.Store_Serie

framesNum++
for _, series := range res.ChunkedSeries {
{
// Calculate hash of series for counting.
tmp = tmp[:0]
for _, l := range series.Labels {
tmp = append(tmp, l.String())
}
currSeries = strings.Join(tmp, ";")
if currSeries != lastSeries {
seriesNum++
lastSeries = currSeries
}
}

thanosChks := make([]storepb.AggrChunk, len(series.Chunks))
for i, chk := range series.Chunks {
thanosChks[i] = storepb.AggrChunk{
Expand All @@ -332,7 +327,7 @@ func (p *PrometheusStore) handleStreamedPrometheusResponse(s storepb.Store_Serie
}
}
}
level.Debug(p.logger).Log("msg", "handled ReadRequest_STREAMED_XOR_CHUNKS request.", "frames", framesNum, "series", seriesNum)
level.Debug(p.logger).Log("msg", "handled ReadRequest_STREAMED_XOR_CHUNKS request.", "frames", framesNum)
return nil
}

Expand Down
16 changes: 8 additions & 8 deletions pkg/store/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) {
testutil.Ok(t, err)

limitMinT := int64(0)
proxy, err := NewPrometheusStore(nil, promclient.NewDefaultClient(), u, component.Sidecar,
proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return limitMinT, -1 }) // Maxt does not matter.
testutil.Ok(t, err)
Expand Down Expand Up @@ -198,7 +198,7 @@ func TestPrometheusStore_SeriesLabels_e2e(t *testing.T) {
u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
testutil.Ok(t, err)

promStore, err := NewPrometheusStore(nil, promclient.NewDefaultClient(), u, component.Sidecar,
promStore, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return math.MinInt64/1000 + 62135596801, math.MaxInt64/1000 - 62135596801 })
testutil.Ok(t, err)
Expand Down Expand Up @@ -376,7 +376,7 @@ func TestPrometheusStore_LabelNames_e2e(t *testing.T) {
u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
testutil.Ok(t, err)

proxy, err := NewPrometheusStore(nil, promclient.NewDefaultClient(), u, component.Sidecar, getExternalLabels, nil)
proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, getExternalLabels, nil)
testutil.Ok(t, err)

resp, err := proxy.LabelNames(ctx, &storepb.LabelNamesRequest{
Expand Down Expand Up @@ -421,7 +421,7 @@ func TestPrometheusStore_LabelValues_e2e(t *testing.T) {
u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
testutil.Ok(t, err)

proxy, err := NewPrometheusStore(nil, promclient.NewDefaultClient(), u, component.Sidecar, getExternalLabels, nil)
proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, getExternalLabels, nil)
testutil.Ok(t, err)

resp, err := proxy.LabelValues(ctx, &storepb.LabelValuesRequest{
Expand Down Expand Up @@ -467,7 +467,7 @@ func TestPrometheusStore_ExternalLabelValues_e2e(t *testing.T) {
u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
testutil.Ok(t, err)

proxy, err := NewPrometheusStore(nil, promclient.NewDefaultClient(), u, component.Sidecar, getExternalLabels, nil)
proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, getExternalLabels, nil)
testutil.Ok(t, err)

resp, err := proxy.LabelValues(ctx, &storepb.LabelValuesRequest{
Expand Down Expand Up @@ -511,7 +511,7 @@ func TestPrometheusStore_Series_MatchExternalLabel_e2e(t *testing.T) {
u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
testutil.Ok(t, err)

proxy, err := NewPrometheusStore(nil, promclient.NewDefaultClient(), u, component.Sidecar,
proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return 0, math.MaxInt64 })
testutil.Ok(t, err)
Expand Down Expand Up @@ -556,7 +556,7 @@ func TestPrometheusStore_Info(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

proxy, err := NewPrometheusStore(nil, promclient.NewDefaultClient(), nil, component.Sidecar,
proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), nil, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return 123, 456 })
testutil.Ok(t, err)
Expand Down Expand Up @@ -634,7 +634,7 @@ func TestPrometheusStore_Series_SplitSamplesIntoChunksWithMaxSizeOf120(t *testin
u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
testutil.Ok(t, err)

proxy, err := NewPrometheusStore(nil, promclient.NewDefaultClient(), u, component.Sidecar,
proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return 0, math.MaxInt64 })
testutil.Ok(t, err)
Expand Down