Skip to content

Commit

Permalink
Added min-time limitaiton for sidecar. This allows optionally storing…
Browse files Browse the repository at this point in the history
… longer retention time on Prometheus.

Signed-off-by: Bartek Plotka <[email protected]>
  • Loading branch information
bwplotka committed Oct 10, 2019
1 parent aaaa95f commit 782f0ca
Show file tree
Hide file tree
Showing 9 changed files with 108 additions and 43 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ jobs:
test:
docker:
# Build by Thanos make docker-ci
- image: quay.io/thanos/thanos-ci:v0.1.0
- image: quay.io/thanos/thanos-ci:v0.2.0
working_directory: /go/src/github.com/thanos-io/thanos
environment:
GO111MODULE: 'on'
Expand Down
9 changes: 5 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ FILES_TO_FMT ?= $(shell find . -path ./vendor -prune -o -name '*.go' -print

DOCKER_IMAGE_REPO ?= quay.io/thanos/thanos
DOCKER_IMAGE_TAG ?= $(subst /,-,$(shell git rev-parse --abbrev-ref HEAD))-$(shell date +%Y-%m-%d)-$(shell git rev-parse --short HEAD)
DOCKER_CI_TAG ?= test

TMP_GOPATH ?= /tmp/thanos-go
GOBIN ?= $(firstword $(subst :, ,${GOPATH}))/bin
Expand Down Expand Up @@ -45,8 +46,8 @@ ME ?= $(shell whoami)
# Referenced by github.com/thanos-io/thanos/blob/master/docs/getting_started.md#prometheus

# Limited prom version, because testing was not possible. This should fix it: https://github.com/thanos-io/thanos/issues/758
PROM_VERSIONS ?= v2.4.3 v2.5.0 v2.8.1 v2.9.2
PROMS ?= $(GOBIN)/prometheus-v2.4.3 $(GOBIN)/prometheus-v2.5.0 $(GOBIN)/prometheus-v2.8.1 $(GOBIN)/prometheus-v2.9.2
PROM_VERSIONS ?= v2.4.3 v2.5.0 v2.8.1 v2.9.2 v2.13.0
PROMS ?= $(GOBIN)/prometheus-v2.4.3 $(GOBIN)/prometheus-v2.5.0 $(GOBIN)/prometheus-v2.8.1 $(GOBIN)/prometheus-v2.9.2 $(GOBIN)/prometheus-v2.13.0

ALERTMANAGER_VERSION ?= v0.15.2
ALERTMANAGER ?= $(GOBIN)/alertmanager-$(ALERTMANAGER_VERSION)
Expand Down Expand Up @@ -234,8 +235,8 @@ docker-ci: install-deps
@cp -r $(GOBIN)/* ./tmp/bin
@docker build -t thanos-ci -f Dockerfile.thanos-ci .
@echo ">> pushing thanos-ci image"
@docker tag "thanos-ci" "quay.io/thanos/thanos-ci:v0.1.0"
@docker push "quay.io/thanos/thanos-ci:v0.1.0"
@docker tag "thanos-ci" "quay.io/thanos/thanos-ci:$(DOCKER_CI_TAG)"
@docker push "quay.io/thanos/thanos-ci:$(DOCKER_CI_TAG)"

# tooling deps. TODO(bwplotka): Pin them all to certain version!
.PHONY: check-git
Expand Down
21 changes: 18 additions & 3 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/prometheus/prometheus/tsdb/labels"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/component"
thanosmodel "github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/promclient"
Expand Down Expand Up @@ -56,6 +57,9 @@ func registerSidecar(m map[string]setupFunc, app *kingpin.Application) {

uploadCompacted := cmd.Flag("shipper.upload-compacted", "[Experimental] If true sidecar will try to upload compacted blocks as well. Useful for migration purposes. Works only if compaction is disabled on Prometheus.").Default("false").Hidden().Bool()

minTime := thanosmodel.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve. Thanos sidecar will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("0000-01-01T00:00:00Z"))

m[component.Sidecar.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
rl := reloader.New(
log.With(logger, "component", "reloader"),
Expand All @@ -64,6 +68,7 @@ func registerSidecar(m map[string]setupFunc, app *kingpin.Application) {
*reloaderCfgOutputFile,
*reloaderRuleDirs,
)

return runSidecar(
g,
logger,
Expand All @@ -80,6 +85,7 @@ func registerSidecar(m map[string]setupFunc, app *kingpin.Application) {
rl,
*uploadCompacted,
component.Sidecar,
*minTime,
)
}
}
Expand All @@ -100,14 +106,17 @@ func runSidecar(
reloader *reloader.Reloader,
uploadCompacted bool,
comp component.Component,
limitMinTime thanosmodel.TimeOrDurationValue,
) error {
var m = &promMetadata{
promURL: promURL,

// Start out with the full time range. The shipper will constrain it later.
// TODO(fabxc): minimum timestamp is never adjusted if shipping is disabled.
mint: 0,
mint: limitMinTime.PrometheusTimestamp(),
maxt: math.MaxInt64,

limitMinTime: limitMinTime,
}

confContentYaml, err := objStoreConfig.Content()
Expand Down Expand Up @@ -285,9 +294,9 @@ func runSidecar(
minTime, _, err := s.Timestamps()
if err != nil {
level.Warn(logger).Log("msg", "reading timestamps failed", "err", err)
} else {
m.UpdateTimestamps(minTime, math.MaxInt64)
return nil
}
m.UpdateTimestamps(minTime, math.MaxInt64)
return nil
})
}, func(error) {
Expand Down Expand Up @@ -341,6 +350,8 @@ type promMetadata struct {
mint int64
maxt int64
labels labels.Labels

limitMinTime thanosmodel.TimeOrDurationValue
}

func (s *promMetadata) UpdateLabels(ctx context.Context, logger log.Logger) error {
Expand All @@ -360,6 +371,10 @@ func (s *promMetadata) UpdateTimestamps(mint int64, maxt int64) {
s.mtx.Lock()
defer s.mtx.Unlock()

if mint < s.limitMinTime.PrometheusTimestamp() {
mint = s.limitMinTime.PrometheusTimestamp()
}

s.mint = mint
s.maxt = maxt
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing blocks from object storage.").
Default("20").Int()

minTime := model.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve. Thanos Store serves only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
minTime := model.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve. Thanos Store will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("0000-01-01T00:00:00Z"))

maxTime := model.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to serve. Thanos Store serves only blocks, which happened eariler than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
maxTime := model.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to serve. Thanos Store will serve only blocks, which happened eariler than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("9999-12-31T23:59:59Z"))

m[component.Store.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error {
Expand Down
7 changes: 7 additions & 0 deletions docs/components/sidecar.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,5 +125,12 @@ Flags:
contains object store configuration. See format
details:
https://thanos.io/storage.md/#configuration
--min-time=0000-01-01T00:00:00Z
Start of time range limit to serve. Thanos
sidecar will serve only metrics, which happened
later than this value. Option can be a constant
time in RFC3339 format or time duration
relative to current time, such as -1d or 2h45m.
Valid duration units are ms, s, m, h, d, w, y.

```
16 changes: 8 additions & 8 deletions docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,16 +93,16 @@ Flags:
from object storage.
--min-time=0000-01-01T00:00:00Z
Start of time range limit to serve. Thanos
Store serves only metrics, which happened later
than this value. Option can be a constant time
in RFC3339 format or time duration relative to
current time, such as -1d or 2h45m. Valid
duration units are ms, s, m, h, d, w, y.
Store will serve only metrics, which happened
later than this value. Option can be a constant
time in RFC3339 format or time duration
relative to current time, such as -1d or 2h45m.
Valid duration units are ms, s, m, h, d, w, y.
--max-time=9999-12-31T23:59:59Z
End of time range limit to serve. Thanos Store
serves only blocks, which happened eariler than
this value. Option can be a constant time in
RFC3339 format or time duration relative to
will serve only blocks, which happened eariler
than this value. Option can be a constant time
in RFC3339 format or time duration relative to
current time, such as -1d or 2h45m. Valid
duration units are ms, s, m, h, d, w, y.

Expand Down
23 changes: 16 additions & 7 deletions pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type PrometheusStore struct {
component component.StoreAPI
externalLabels func() labels.Labels
timestamps func() (mint int64, maxt int64)

remoteReadAcceptableResponses []prompb.ReadRequest_ResponseType
}

// NewPrometheusStore returns a new PrometheusStore that uses the given HTTP client
Expand All @@ -72,12 +74,13 @@ func NewPrometheusStore(
}
}
p := &PrometheusStore{
logger: logger,
base: baseURL,
client: client,
component: component,
externalLabels: externalLabels,
timestamps: timestamps,
logger: logger,
base: baseURL,
client: client,
component: component,
externalLabels: externalLabels,
timestamps: timestamps,
remoteReadAcceptableResponses: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_STREAMED_XOR_CHUNKS, prompb.ReadRequest_SAMPLES},
}
return p, nil
}
Expand Down Expand Up @@ -143,6 +146,12 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie
return status.Error(codes.InvalidArgument, errors.New("no matchers specified (excluding external labels)").Error())
}

// Don't ask for more than available time. This includes potential `minTime` flag limit.
availableMinTime, _ := p.timestamps()
if r.MinTime < availableMinTime {
r.MinTime = availableMinTime
}

q := &prompb.Query{StartTimestampMs: r.MinTime, EndTimestampMs: r.MaxTime}

for _, m := range newMatchers {
Expand Down Expand Up @@ -375,7 +384,7 @@ func (p *PrometheusStore) chunkSamples(series *prompb.TimeSeries, maxSamplesPerC
func (p *PrometheusStore) startPromSeries(ctx context.Context, q *prompb.Query) (*http.Response, error) {
reqb, err := proto.Marshal(&prompb.ReadRequest{
Queries: []*prompb.Query{q},
AcceptedResponseTypes: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_STREAMED_XOR_CHUNKS},
AcceptedResponseTypes: p.remoteReadAcceptableResponses,
})
if err != nil {
return nil, errors.Wrap(err, "marshal read request")
Expand Down
67 changes: 50 additions & 17 deletions pkg/store/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/fortytw2/leaktest"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/labels"
Expand Down Expand Up @@ -55,15 +56,15 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) {
u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
testutil.Ok(t, err)

limitMinT := int64(0)
proxy, err := NewPrometheusStore(nil, nil, u, component.Sidecar,
func() labels.Labels {
return labels.FromStrings("region", "eu-west")
}, nil)
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return limitMinT, -1 }) // Maxt does not matter.
testutil.Ok(t, err)

// Query all three samples except for the first one. Since we round up queried data
// to seconds, we can test whether the extra sample gets stripped properly.
{
// Query all three samples except for the first one. Since we round up queried data
// to seconds, we can test whether the extra sample gets stripped properly.
srv := newStoreSeriesServer(ctx)
testutil.Ok(t, proxy.Series(&storepb.SeriesRequest{
MinTime: baseT + 101,
Expand Down Expand Up @@ -92,6 +93,38 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) {
testutil.Equals(t, []sample{{baseT + 200, 2}, {baseT + 300, 3}}, samples)

}
// Query all samples, but limit mint time to exclude the first one.
{
limitMinT = baseT + 101
srv := newStoreSeriesServer(ctx)
testutil.Ok(t, proxy.Series(&storepb.SeriesRequest{
MinTime: 0,
MaxTime: baseT + 300,
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "a", Value: "b"},
},
}, srv))
// Revert for next cases.
limitMinT = 0

testutil.Equals(t, 1, len(srv.SeriesSet))

testutil.Equals(t, []storepb.Label{
{Name: "a", Value: "b"},
{Name: "region", Value: "eu-west"},
}, srv.SeriesSet[0].Labels)

testutil.Equals(t, 1, len(srv.SeriesSet[0].Chunks))

c := srv.SeriesSet[0].Chunks[0]
testutil.Equals(t, storepb.Chunk_XOR, c.Raw.Type)

chk, err := chunkenc.FromData(chunkenc.EncXOR, c.Raw.Data)
testutil.Ok(t, err)

samples := expandChunk(chk.Iterator(nil))
testutil.Equals(t, []sample{{baseT + 200, 2}, {baseT + 300, 3}}, samples)
}
// Querying by external labels only.
{
srv := newStoreSeriesServer(ctx)
Expand Down Expand Up @@ -230,9 +263,9 @@ func TestPrometheusStore_Series_MatchExternalLabel_e2e(t *testing.T) {
testutil.Ok(t, err)

proxy, err := NewPrometheusStore(nil, nil, u, component.Sidecar,
func() labels.Labels {
return labels.FromStrings("region", "eu-west")
}, nil)
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return 0, math.MaxInt64 },
)
testutil.Ok(t, err)
srv := newStoreSeriesServer(ctx)

Expand Down Expand Up @@ -276,12 +309,9 @@ func TestPrometheusStore_Info(t *testing.T) {
defer cancel()

proxy, err := NewPrometheusStore(nil, nil, nil, component.Sidecar,
func() labels.Labels {
return labels.FromStrings("region", "eu-west")
},
func() (int64, int64) {
return 123, 456
})
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return 123, 456 },
)
testutil.Ok(t, err)

resp, err := proxy.Info(ctx, &storepb.InfoRequest{})
Expand Down Expand Up @@ -358,11 +388,14 @@ func TestPrometheusStore_Series_SplitSamplesIntoChunksWithMaxSizeOfUint16_e2e(t
testutil.Ok(t, err)

proxy, err := NewPrometheusStore(nil, nil, u, component.Sidecar,
func() labels.Labels {
return labels.FromStrings("region", "eu-west")
}, nil)
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return 0, math.MaxInt64 },
)
testutil.Ok(t, err)

// We build chunks only for SAMPLES method. Make sure we ask for SAMPLES only.
proxy.remoteReadAcceptableResponses = []prompb.ReadRequest_ResponseType{prompb.ReadRequest_SAMPLES}

return proxy
})
}
2 changes: 1 addition & 1 deletion pkg/testutil/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

const (
defaultPrometheusVersion = "v2.9.2"
defaultPrometheusVersion = "v2.13.0"
defaultAlertmanagerVersion = "v0.15.2"
defaultMinioVersion = "RELEASE.2018-10-06T00-15-16Z"

Expand Down

0 comments on commit 782f0ca

Please sign in to comment.