diff --git a/cmd/thanos/flags.go b/cmd/thanos/flags.go
index db370ed97c..a655f6d1d7 100644
--- a/cmd/thanos/flags.go
+++ b/cmd/thanos/flags.go
@@ -145,6 +145,31 @@ func modelDuration(flags *kingpin.FlagClause) *model.Duration {
 	return value
 }
 
+type flagTime struct {
+	time time.Time
+}
+
+func (ft *flagTime) Set(s string) error {
+	var err error
+	ft.time, err = time.Parse(time.RFC3339, s)
+	return err
+}
+
+func (ft flagTime) String() string {
+	return ft.time.String()
+}
+
+func (ft flagTime) Time() time.Time {
+	return ft.time
+}
+
+func timeFlag(flags *kingpin.FlagClause) *flagTime {
+	var value = new(flagTime)
+	flags.SetValue(value)
+
+	return value
+}
+
 type pathOrContent struct {
 	fileFlagName    string
 	contentFlagName string
diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go
index ddf9893061..8ce418e1c4 100644
--- a/cmd/thanos/store.go
+++ b/cmd/thanos/store.go
@@ -17,6 +17,7 @@ import (
 	"github.com/opentracing/opentracing-go"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/prometheus/pkg/timestamp"
 	"google.golang.org/grpc"
 	"gopkg.in/alecthomas/kingpin.v2"
 )
@@ -44,6 +45,12 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
 	blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing blocks from object storage.").
 		Default("20").Int()
 
+	minTime := timeFlag(cmd.Flag("min-time", "Start of time range limit to serve").
+		Default("0000-01-01T00:00:00Z"))
+
+	maxTime := timeFlag(cmd.Flag("max-time", "End of time range limit to serve").
+		Default("9999-12-31T23:59:59Z"))
+
 	m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error {
 		peer, err := newPeerFn(logger, reg, false, "", false)
 		if err != nil {
@@ -67,6 +74,8 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
 			debugLogging,
 			*syncInterval,
 			*blockSyncConcurrency,
+			timestamp.FromTime(minTime.Time()),
+			timestamp.FromTime(maxTime.Time()),
 		)
 	}
 }
@@ -91,6 +100,8 @@ func runStore(
 	verbose bool,
 	syncInterval time.Duration,
 	blockSyncConcurrency int,
+	minTime int64,
+	maxTime int64,
 ) error {
 	{
 		confContentYaml, err := objStoreConfig.Content()
@@ -119,6 +130,8 @@ func runStore(
 			chunkPoolSizeBytes,
 			verbose,
 			blockSyncConcurrency,
+			minTime,
+			maxTime,
 		)
 		if err != nil {
 			return errors.Wrap(err, "create object storage store")
diff --git a/docs/components/store.md b/docs/components/store.md
index 8b9d5fc79c..0bb7a0c94d 100644
--- a/docs/components/store.md
+++ b/docs/components/store.md
@@ -115,5 +115,9 @@ Flags:
       --block-sync-concurrency=20
                                  Number of goroutines to use when syncing blocks
                                  from object storage.
+      --min-time=0000-01-01T00:00:00Z
+                                 Start of time range limit to serve
+      --max-time=9999-12-31T23:59:59Z
+                                 End of time range limit to serve
 
 ```
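The new `timeFlag` helper works because `flagTime` implements kingpin's `Value` interface (`Set` and `String`), so the RFC3339 value is validated at flag-parse time, and the defaults above effectively mean "no lower/upper bound". Below is a minimal, self-contained sketch of that mechanism; it is not part of the patch and restates the type only so the snippet compiles on its own:

```go
package main

import (
	"fmt"
	"time"

	"gopkg.in/alecthomas/kingpin.v2"
)

// flagTime mirrors the type added in cmd/thanos/flags.go: a kingpin Value
// that parses its argument as an RFC3339 timestamp.
type flagTime struct {
	time time.Time
}

func (ft *flagTime) Set(s string) error {
	t, err := time.Parse(time.RFC3339, s)
	if err != nil {
		return err
	}
	ft.time = t
	return nil
}

func (ft flagTime) String() string { return ft.time.String() }

func main() {
	app := kingpin.New("example", "timeFlag sketch")

	minTime := new(flagTime)
	app.Flag("min-time", "Start of time range limit to serve").
		Default("0000-01-01T00:00:00Z").
		SetValue(minTime)

	// An invalid value such as "not-a-time" would surface as a parse error here.
	kingpin.MustParse(app.Parse([]string{"--min-time=2019-04-01T00:00:00Z"}))

	fmt.Println(minTime.time) // 2019-04-01 00:00:00 +0000 UTC
}
```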
diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index 6c38ee81a7..187782a4fd 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -173,6 +173,11 @@ type BucketStore struct {
 	// Number of goroutines to use when syncing blocks from object storage.
 	blockSyncConcurrency int
 
+	// Start and end time to serve with this store process. The meta.json files
+	// loaded will be filtered by these ranges.
+	minTime int64
+	maxTime int64
+
 	partitioner partitioner
 }
 
@@ -187,6 +192,8 @@ func NewBucketStore(
 	maxChunkPoolBytes uint64,
 	debugLogging bool,
 	blockSyncConcurrency int,
+	minTime int64,
+	maxTime int64,
 ) (*BucketStore, error) {
 	if logger == nil {
 		logger = log.NewNopLogger()
@@ -212,6 +219,8 @@
 		blockSets:            map[uint64]*bucketBlockSet{},
 		debugLogging:         debugLogging,
 		blockSyncConcurrency: blockSyncConcurrency,
+		minTime:              minTime,
+		maxTime:              maxTime,
 		partitioner:          gapBasedPartitioner{maxGapSize: maxGapSize},
 	}
 	s.metrics = newBucketStoreMetrics(reg)
@@ -359,11 +368,20 @@ func (s *BucketStore) addBlock(ctx context.Context, id ulid.ULID) (err error) {
 		dir,
 		s.indexCache,
 		s.chunkPool,
+		s.minTime,
+		s.maxTime,
 		s.partitioner,
 	)
 	if err != nil {
 		return errors.Wrap(err, "new bucket block")
 	}
+
+	// If newBucketBlock did not return a block, its MinTime falls outside the
+	// configured time range, so skip the rest of the work.
+	if b == nil {
+		return nil
+	}
+
 	s.mtx.Lock()
 	defer s.mtx.Unlock()
 
@@ -1001,6 +1019,8 @@ func newBucketBlock(
 	dir string,
 	indexCache *indexCache,
 	chunkPool *pool.BytesPool,
+	minTime int64,
+	maxTime int64,
 	p partitioner,
 ) (b *bucketBlock, err error) {
 	b = &bucketBlock{
@@ -1015,6 +1035,19 @@
 	if err = b.loadMeta(ctx, id); err != nil {
 		return nil, errors.Wrap(err, "load meta")
 	}
+
+	// We want to make sure a single store owns a block. To do that, we check
+	// whether the block's MinTime is contained within the time frame. We're more
+	// concerned with having exactly one owning store per block than with perfectly
+	// containing the blocks within the start and end times. If the MinTime is
+	// outside our time range, clean up the downloaded files and return early.
+	if b.meta.MinTime < minTime || b.meta.MinTime > maxTime {
+		if err = os.RemoveAll(dir); err != nil {
+			return nil, err
+		}
+		return nil, nil
+	}
+
 	if err = b.loadIndexCache(ctx); err != nil {
 		return nil, errors.Wrap(err, "load index cache")
 	}
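To make the ownership rule above concrete: `newBucketBlock` compares only the block's `MinTime` against the store's `[minTime, maxTime]` window, so a block whose data crosses a window edge is still loaded by exactly one store. A standalone sketch of that predicate, with illustrative values rather than code from the patch:

```go
package main

import "fmt"

// owns mirrors the check in newBucketBlock
// (`b.meta.MinTime < minTime || b.meta.MinTime > maxTime`, negated):
// a store keeps a block only if the block's MinTime lies inside its window.
func owns(blockMinTime, storeMinTime, storeMaxTime int64) bool {
	return blockMinTime >= storeMinTime && blockMinTime <= storeMaxTime
}

func main() {
	// A block spanning [1500, 2500] in arbitrary millisecond units.
	blockMinTime := int64(1500)

	// Store A is configured for [0, 2000], store B for [2001, 4000].
	fmt.Println(owns(blockMinTime, 0, 2000))    // true: A loads the block
	fmt.Println(owns(blockMinTime, 2001, 4000)) // false: B skips (and removes) it
}
```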
diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go
index 8133e9bc86..ecd6f87d7c 100644
--- a/pkg/store/bucket_e2e_test.go
+++ b/pkg/store/bucket_e2e_test.go
@@ -3,6 +3,7 @@ package store
 import (
 	"context"
 	"io/ioutil"
+	"math"
 	"os"
 	"path/filepath"
 	"sync"
@@ -35,7 +36,7 @@ func (s *storeSuite) Close() {
 	s.wg.Wait()
 }
 
-func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool) *storeSuite {
+func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, startTime time.Time, manyParts bool) *storeSuite {
 	series := []labels.Labels{
 		labels.FromStrings("a", "1", "b", "1"),
 		labels.FromStrings("a", "1", "b", "2"),
@@ -48,8 +49,7 @@
 	}
 	extLset := labels.FromStrings("ext1", "value1")
 
-	start := time.Now()
-	now := start
+	now := startTime
 
 	ctx, cancel := context.WithCancel(context.Background())
 	s := &storeSuite{cancel: cancel}
@@ -87,7 +87,7 @@
 		testutil.Ok(t, os.RemoveAll(dir2))
 	}
 
-	store, err := NewBucketStore(log.NewLogfmtLogger(os.Stderr), nil, bkt, dir, 100, 0, false, 20)
+	store, err := NewBucketStore(log.NewLogfmtLogger(os.Stderr), nil, bkt, dir, 100, 0, false, 20, math.MinInt64, math.MaxInt64)
 	testutil.Ok(t, err)
 	s.store = store
 
@@ -334,7 +334,7 @@ func TestBucketStore_e2e(t *testing.T) {
 		testutil.Ok(t, err)
 		defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()
 
-		s := prepareStoreWithTestBlocks(t, dir, bkt, false)
+		s := prepareStoreWithTestBlocks(t, dir, bkt, time.Now(), false)
 		defer s.Close()
 
 		testBucketStore_e2e(t, ctx, s)
@@ -363,9 +363,35 @@ func TestBucketStore_ManyParts_e2e(t *testing.T) {
 		testutil.Ok(t, err)
 		defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()
 
-		s := prepareStoreWithTestBlocks(t, dir, bkt, true)
+		s := prepareStoreWithTestBlocks(t, dir, bkt, time.Now(), true)
 		defer s.Close()
 
 		testBucketStore_e2e(t, ctx, s)
 	})
 }
+
+func TestBucketStore_timeRanges(t *testing.T) {
+	objtesting.ForeachStore(t, func(t testing.TB, bkt objstore.Bucket) {
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+
+		dir, err := ioutil.TempDir("", "test_bucketstore_e2e")
+		testutil.Ok(t, err)
+		defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()
+
+		startTime := time.Now()
+		s := prepareStoreWithTestBlocks(t, dir, bkt, startTime, false)
+		s.Close()
+
+		// The store is now populated, so create a new bucket store instance with a
+		// time limit and validate that it only returns the two blocks in the first time range.
+		minTimeLimit := timestamp.FromTime(startTime.Add(-time.Minute))
+		maxTimeLimit := timestamp.FromTime(startTime.Add(time.Minute))
+		store, err := NewBucketStore(log.NewLogfmtLogger(os.Stderr), nil, bkt, dir, 100, 0, false, 20, minTimeLimit, maxTimeLimit)
+		testutil.Ok(t, err)
+
+		err = store.SyncBlocks(ctx)
+		testutil.Ok(t, err)
+		testutil.Equals(t, 2, store.numBlocks())
+	})
+}
diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go
index b0d43f23ce..9219e5b3b4 100644
--- a/pkg/store/bucket_test.go
+++ b/pkg/store/bucket_test.go
@@ -283,7 +283,7 @@ func TestBucketStore_Info(t *testing.T) {
 	dir, err := ioutil.TempDir("", "prometheus-test")
 	testutil.Ok(t, err)
 
-	bucketStore, err := NewBucketStore(nil, nil, nil, dir, 2e5, 2e5, false, 20)
+	bucketStore, err := NewBucketStore(nil, nil, nil, dir, 2e5, 2e5, false, 20, math.MinInt64, math.MaxInt64)
 	testutil.Ok(t, err)
 
 	resp, err := bucketStore.Info(ctx, &storepb.InfoRequest{})
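For reference, the `int64` bounds threaded through `NewBucketStore` are milliseconds since the Unix epoch. A short sketch with hypothetical values of how callers derive them, using the same `timestamp.FromTime` helper as the command layer and the `math.MinInt64`/`math.MaxInt64` "no limit" bounds used by `prepareStoreWithTestBlocks`:

```go
package main

import (
	"fmt"
	"math"
	"time"

	"github.com/prometheus/prometheus/pkg/timestamp"
)

func main() {
	start := time.Now()

	// A one-minute window around "now", like the one TestBucketStore_timeRanges builds.
	minTime := timestamp.FromTime(start.Add(-time.Minute))
	maxTime := timestamp.FromTime(start.Add(time.Minute))
	fmt.Println(minTime, maxTime)

	// "Serve everything" bounds, as the test helper passes to NewBucketStore.
	fmt.Println(int64(math.MinInt64), int64(math.MaxInt64))
}
```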