diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index ddf9893061..00ba48e65f 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -44,11 +44,19 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing blocks from object storage."). Default("20").Int() + skipWindow := modelDuration(cmd.Flag("skip-window", "Time duration, which won't be reported to Thanos Query.")) + m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error { peer, err := newPeerFn(logger, reg, false, "", false) if err != nil { return errors.Wrap(err, "new cluster peer") } + + var skipWindowDur *time.Duration + if skipWindow != nil { + dur := time.Duration(*skipWindow) + skipWindowDur = &dur + } return runStore(g, logger, reg, @@ -67,6 +75,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string debugLogging, *syncInterval, *blockSyncConcurrency, + skipWindowDur, ) } } @@ -91,6 +100,7 @@ func runStore( verbose bool, syncInterval time.Duration, blockSyncConcurrency int, + skipWindow *time.Duration, ) error { { confContentYaml, err := objStoreConfig.Content() @@ -119,6 +129,7 @@ func runStore( chunkPoolSizeBytes, verbose, blockSyncConcurrency, + skipWindow, ) if err != nil { return errors.Wrap(err, "create object storage store") diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 775210e3a1..34b2e35a21 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -174,6 +174,7 @@ type BucketStore struct { blockSyncConcurrency int partitioner partitioner + skipWindow *time.Duration } // NewBucketStore creates a new bucket backed store that implements the store API against @@ -187,6 +188,7 @@ func NewBucketStore( maxChunkPoolBytes uint64, debugLogging bool, blockSyncConcurrency int, + skipWindow *time.Duration, ) (*BucketStore, error) { if logger == nil { logger = log.NewNopLogger() @@ -429,6 +431,14 @@ func (s *BucketStore) TimeRange() (mint, maxt int64) { // Info implements the storepb.StoreServer interface. func (s *BucketStore) Info(context.Context, *storepb.InfoRequest) (*storepb.InfoResponse, error) { mint, maxt := s.TimeRange() + + if s.skipWindow != nil { + //TODO: is TimeRange really a Unix timestamp? + newt := time.Now().Add(-*s.skipWindow).Unix() + if maxt > newt { + maxt = newt + } + } // Store nodes hold global data and thus have no labels. return &storepb.InfoResponse{ StoreType: component.Store.ToProto(),