From 782f0caa088eb6068df44fb8fb9c0964e3d1775b Mon Sep 17 00:00:00 2001 From: Bartek Plotka Date: Wed, 9 Oct 2019 18:10:25 +0100 Subject: [PATCH] Added min-time limitation for sidecar. This allows optionally storing longer retention time on Prometheus. Signed-off-by: Bartek Plotka --- .circleci/config.yml | 2 +- Makefile | 9 ++--- cmd/thanos/sidecar.go | 21 +++++++++-- cmd/thanos/store.go | 4 +-- docs/components/sidecar.md | 7 ++++ docs/components/store.md | 16 ++++----- pkg/store/prometheus.go | 23 +++++++++---- pkg/store/prometheus_test.go | 67 +++++++++++++++++++++++++++--------- pkg/testutil/prometheus.go | 2 +- 9 files changed, 108 insertions(+), 43 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index b1d34a15c28..389399039cd 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -4,7 +4,7 @@ jobs: test: docker: # Build by Thanos make docker-ci - - image: quay.io/thanos/thanos-ci:v0.1.0 + - image: quay.io/thanos/thanos-ci:v0.2.0 working_directory: /go/src/github.com/thanos-io/thanos environment: GO111MODULE: 'on' diff --git a/Makefile b/Makefile index e606b9ea07c..05773257ce5 100644 --- a/Makefile +++ b/Makefile @@ -3,6 +3,7 @@ FILES_TO_FMT ?= $(shell find . -path ./vendor -prune -o -name '*.go' -print DOCKER_IMAGE_REPO ?= quay.io/thanos/thanos DOCKER_IMAGE_TAG ?= $(subst /,-,$(shell git rev-parse --abbrev-ref HEAD))-$(shell date +%Y-%m-%d)-$(shell git rev-parse --short HEAD) +DOCKER_CI_TAG ?= test TMP_GOPATH ?= /tmp/thanos-go GOBIN ?= $(firstword $(subst :, ,${GOPATH}))/bin @@ -45,8 +46,8 @@ ME ?= $(shell whoami) # Referenced by github.com/thanos-io/thanos/blob/master/docs/getting_started.md#prometheus # Limited prom version, because testing was not possible.
This should fix it: https://github.com/thanos-io/thanos/issues/758 -PROM_VERSIONS ?= v2.4.3 v2.5.0 v2.8.1 v2.9.2 -PROMS ?= $(GOBIN)/prometheus-v2.4.3 $(GOBIN)/prometheus-v2.5.0 $(GOBIN)/prometheus-v2.8.1 $(GOBIN)/prometheus-v2.9.2 +PROM_VERSIONS ?= v2.4.3 v2.5.0 v2.8.1 v2.9.2 v2.13.0 +PROMS ?= $(GOBIN)/prometheus-v2.4.3 $(GOBIN)/prometheus-v2.5.0 $(GOBIN)/prometheus-v2.8.1 $(GOBIN)/prometheus-v2.9.2 $(GOBIN)/prometheus-v2.13.0 ALERTMANAGER_VERSION ?= v0.15.2 ALERTMANAGER ?= $(GOBIN)/alertmanager-$(ALERTMANAGER_VERSION) @@ -234,8 +235,8 @@ docker-ci: install-deps @cp -r $(GOBIN)/* ./tmp/bin @docker build -t thanos-ci -f Dockerfile.thanos-ci . @echo ">> pushing thanos-ci image" - @docker tag "thanos-ci" "quay.io/thanos/thanos-ci:v0.1.0" - @docker push "quay.io/thanos/thanos-ci:v0.1.0" + @docker tag "thanos-ci" "quay.io/thanos/thanos-ci:$(DOCKER_CI_TAG)" + @docker push "quay.io/thanos/thanos-ci:$(DOCKER_CI_TAG)" # tooling deps. TODO(bwplotka): Pin them all to certain version! .PHONY: check-git diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 13e75e5b747..9a39d63e8ff 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -20,6 +20,7 @@ import ( "github.com/prometheus/prometheus/tsdb/labels" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/component" + thanosmodel "github.com/thanos-io/thanos/pkg/model" "github.com/thanos-io/thanos/pkg/objstore/client" "github.com/thanos-io/thanos/pkg/prober" "github.com/thanos-io/thanos/pkg/promclient" @@ -56,6 +57,9 @@ func registerSidecar(m map[string]setupFunc, app *kingpin.Application) { uploadCompacted := cmd.Flag("shipper.upload-compacted", "[Experimental] If true sidecar will try to upload compacted blocks as well. Useful for migration purposes. Works only if compaction is disabled on Prometheus.").Default("false").Hidden().Bool() + minTime := thanosmodel.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve. 
Thanos sidecar will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y."). + Default("0000-01-01T00:00:00Z")) + m[component.Sidecar.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error { rl := reloader.New( log.With(logger, "component", "reloader"), @@ -64,6 +68,7 @@ func registerSidecar(m map[string]setupFunc, app *kingpin.Application) { *reloaderCfgOutputFile, *reloaderRuleDirs, ) + return runSidecar( g, logger, @@ -80,6 +85,7 @@ func registerSidecar(m map[string]setupFunc, app *kingpin.Application) { rl, *uploadCompacted, component.Sidecar, + *minTime, ) } } @@ -100,14 +106,17 @@ func runSidecar( reloader *reloader.Reloader, uploadCompacted bool, comp component.Component, + limitMinTime thanosmodel.TimeOrDurationValue, ) error { var m = &promMetadata{ promURL: promURL, // Start out with the full time range. The shipper will constrain it later. // TODO(fabxc): minimum timestamp is never adjusted if shipping is disabled. 
- mint: 0, + mint: limitMinTime.PrometheusTimestamp(), maxt: math.MaxInt64, + + limitMinTime: limitMinTime, } confContentYaml, err := objStoreConfig.Content() @@ -285,9 +294,9 @@ func runSidecar( minTime, _, err := s.Timestamps() if err != nil { level.Warn(logger).Log("msg", "reading timestamps failed", "err", err) - } else { - m.UpdateTimestamps(minTime, math.MaxInt64) + return nil } + m.UpdateTimestamps(minTime, math.MaxInt64) return nil }) }, func(error) { @@ -341,6 +350,8 @@ type promMetadata struct { mint int64 maxt int64 labels labels.Labels + + limitMinTime thanosmodel.TimeOrDurationValue } func (s *promMetadata) UpdateLabels(ctx context.Context, logger log.Logger) error { @@ -360,6 +371,10 @@ func (s *promMetadata) UpdateTimestamps(mint int64, maxt int64) { s.mtx.Lock() defer s.mtx.Unlock() + if mint < s.limitMinTime.PrometheusTimestamp() { + mint = s.limitMinTime.PrometheusTimestamp() + } + s.mint = mint s.maxt = maxt } diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 8cacabcfca6..8dc398ff0ae 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -52,10 +52,10 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) { blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing blocks from object storage."). Default("20").Int() - minTime := model.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve. Thanos Store serves only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y."). + minTime := model.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve. Thanos Store will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y."). 
Default("0000-01-01T00:00:00Z")) - maxTime := model.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to serve. Thanos Store serves only blocks, which happened eariler than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y."). + maxTime := model.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to serve. Thanos Store will serve only blocks, which happened eariler than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y."). Default("9999-12-31T23:59:59Z")) m[component.Store.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error { diff --git a/docs/components/sidecar.md b/docs/components/sidecar.md index b6868cf6b85..027d7089b16 100644 --- a/docs/components/sidecar.md +++ b/docs/components/sidecar.md @@ -125,5 +125,12 @@ Flags: contains object store configuration. See format details: https://thanos.io/storage.md/#configuration + --min-time=0000-01-01T00:00:00Z + Start of time range limit to serve. Thanos + sidecar will serve only metrics, which happened + later than this value. Option can be a constant + time in RFC3339 format or time duration + relative to current time, such as -1d or 2h45m. + Valid duration units are ms, s, m, h, d, w, y. ``` diff --git a/docs/components/store.md b/docs/components/store.md index 072dbfbae12..05f02894e79 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -93,16 +93,16 @@ Flags: from object storage. --min-time=0000-01-01T00:00:00Z Start of time range limit to serve. Thanos - Store serves only metrics, which happened later - than this value. Option can be a constant time - in RFC3339 format or time duration relative to - current time, such as -1d or 2h45m. 
Valid - duration units are ms, s, m, h, d, w, y. + Store will serve only metrics, which happened + later than this value. Option can be a constant + time in RFC3339 format or time duration + relative to current time, such as -1d or 2h45m. + Valid duration units are ms, s, m, h, d, w, y. --max-time=9999-12-31T23:59:59Z End of time range limit to serve. Thanos Store - serves only blocks, which happened eariler than - this value. Option can be a constant time in - RFC3339 format or time duration relative to + will serve only blocks, which happened eariler + than this value. Option can be a constant time + in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y. diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index 2af0b16277d..3102fb3465a 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -50,6 +50,8 @@ type PrometheusStore struct { component component.StoreAPI externalLabels func() labels.Labels timestamps func() (mint int64, maxt int64) + + remoteReadAcceptableResponses []prompb.ReadRequest_ResponseType } // NewPrometheusStore returns a new PrometheusStore that uses the given HTTP client @@ -72,12 +74,13 @@ func NewPrometheusStore( } } p := &PrometheusStore{ - logger: logger, - base: baseURL, - client: client, - component: component, - externalLabels: externalLabels, - timestamps: timestamps, + logger: logger, + base: baseURL, + client: client, + component: component, + externalLabels: externalLabels, + timestamps: timestamps, + remoteReadAcceptableResponses: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_STREAMED_XOR_CHUNKS, prompb.ReadRequest_SAMPLES}, } return p, nil } @@ -143,6 +146,12 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie return status.Error(codes.InvalidArgument, errors.New("no matchers specified (excluding external labels)").Error()) } + // Don't ask for more than available time. 
This includes potential `minTime` flag limit. + availableMinTime, _ := p.timestamps() + if r.MinTime < availableMinTime { + r.MinTime = availableMinTime + } + q := &prompb.Query{StartTimestampMs: r.MinTime, EndTimestampMs: r.MaxTime} for _, m := range newMatchers { @@ -375,7 +384,7 @@ func (p *PrometheusStore) chunkSamples(series *prompb.TimeSeries, maxSamplesPerC func (p *PrometheusStore) startPromSeries(ctx context.Context, q *prompb.Query) (*http.Response, error) { reqb, err := proto.Marshal(&prompb.ReadRequest{ Queries: []*prompb.Query{q}, - AcceptedResponseTypes: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_STREAMED_XOR_CHUNKS}, + AcceptedResponseTypes: p.remoteReadAcceptableResponses, }) if err != nil { return nil, errors.Wrap(err, "marshal read request") diff --git a/pkg/store/prometheus_test.go b/pkg/store/prometheus_test.go index d8bb3ed634b..609ae117636 100644 --- a/pkg/store/prometheus_test.go +++ b/pkg/store/prometheus_test.go @@ -10,6 +10,7 @@ import ( "github.com/fortytw2/leaktest" "github.com/prometheus/prometheus/pkg/timestamp" + "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/labels" @@ -55,15 +56,15 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) { u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr())) testutil.Ok(t, err) + limitMinT := int64(0) proxy, err := NewPrometheusStore(nil, nil, u, component.Sidecar, - func() labels.Labels { - return labels.FromStrings("region", "eu-west") - }, nil) + func() labels.Labels { return labels.FromStrings("region", "eu-west") }, + func() (int64, int64) { return limitMinT, -1 }) // Maxt does not matter. testutil.Ok(t, err) + // Query all three samples except for the first one. Since we round up queried data + // to seconds, we can test whether the extra sample gets stripped properly. { - // Query all three samples except for the first one. 
Since we round up queried data - // to seconds, we can test whether the extra sample gets stripped properly. srv := newStoreSeriesServer(ctx) testutil.Ok(t, proxy.Series(&storepb.SeriesRequest{ MinTime: baseT + 101, @@ -92,6 +93,38 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) { testutil.Equals(t, []sample{{baseT + 200, 2}, {baseT + 300, 3}}, samples) } + // Query all samples, but limit mint time to exclude the first one. + { + limitMinT = baseT + 101 + srv := newStoreSeriesServer(ctx) + testutil.Ok(t, proxy.Series(&storepb.SeriesRequest{ + MinTime: 0, + MaxTime: baseT + 300, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "a", Value: "b"}, + }, + }, srv)) + // Revert for next cases. + limitMinT = 0 + + testutil.Equals(t, 1, len(srv.SeriesSet)) + + testutil.Equals(t, []storepb.Label{ + {Name: "a", Value: "b"}, + {Name: "region", Value: "eu-west"}, + }, srv.SeriesSet[0].Labels) + + testutil.Equals(t, 1, len(srv.SeriesSet[0].Chunks)) + + c := srv.SeriesSet[0].Chunks[0] + testutil.Equals(t, storepb.Chunk_XOR, c.Raw.Type) + + chk, err := chunkenc.FromData(chunkenc.EncXOR, c.Raw.Data) + testutil.Ok(t, err) + + samples := expandChunk(chk.Iterator(nil)) + testutil.Equals(t, []sample{{baseT + 200, 2}, {baseT + 300, 3}}, samples) + } // Querying by external labels only. 
{ srv := newStoreSeriesServer(ctx) @@ -230,9 +263,9 @@ func TestPrometheusStore_Series_MatchExternalLabel_e2e(t *testing.T) { testutil.Ok(t, err) proxy, err := NewPrometheusStore(nil, nil, u, component.Sidecar, - func() labels.Labels { - return labels.FromStrings("region", "eu-west") - }, nil) + func() labels.Labels { return labels.FromStrings("region", "eu-west") }, + func() (int64, int64) { return 0, math.MaxInt64 }, + ) testutil.Ok(t, err) srv := newStoreSeriesServer(ctx) @@ -276,12 +309,9 @@ func TestPrometheusStore_Info(t *testing.T) { defer cancel() proxy, err := NewPrometheusStore(nil, nil, nil, component.Sidecar, - func() labels.Labels { - return labels.FromStrings("region", "eu-west") - }, - func() (int64, int64) { - return 123, 456 - }) + func() labels.Labels { return labels.FromStrings("region", "eu-west") }, + func() (int64, int64) { return 123, 456 }, + ) testutil.Ok(t, err) resp, err := proxy.Info(ctx, &storepb.InfoRequest{}) @@ -358,11 +388,14 @@ func TestPrometheusStore_Series_SplitSamplesIntoChunksWithMaxSizeOfUint16_e2e(t testutil.Ok(t, err) proxy, err := NewPrometheusStore(nil, nil, u, component.Sidecar, - func() labels.Labels { - return labels.FromStrings("region", "eu-west") - }, nil) + func() labels.Labels { return labels.FromStrings("region", "eu-west") }, + func() (int64, int64) { return 0, math.MaxInt64 }, + ) testutil.Ok(t, err) + // We build chunks only for SAMPLES method. Make sure we ask for SAMPLES only. + proxy.remoteReadAcceptableResponses = []prompb.ReadRequest_ResponseType{prompb.ReadRequest_SAMPLES} + return proxy }) } diff --git a/pkg/testutil/prometheus.go b/pkg/testutil/prometheus.go index cdcc0752320..b89aac77f20 100644 --- a/pkg/testutil/prometheus.go +++ b/pkg/testutil/prometheus.go @@ -27,7 +27,7 @@ import ( ) const ( - defaultPrometheusVersion = "v2.9.2" + defaultPrometheusVersion = "v2.13.0" defaultAlertmanagerVersion = "v0.15.2" defaultMinioVersion = "RELEASE.2018-10-06T00-15-16Z"