Skip to content

Commit

Permalink
store: Add --skip-window functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
povilasv committed Mar 15, 2019
1 parent e568813 commit 417a129
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 0 deletions.
11 changes: 11 additions & 0 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,19 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing blocks from object storage.").
Default("20").Int()

skipWindow := modelDuration(cmd.Flag("skip-window", "Time duration, which won't be reported to Thanos Query."))

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error {
peer, err := newPeerFn(logger, reg, false, "", false)
if err != nil {
return errors.Wrap(err, "new cluster peer")
}

var skipWindowDur *time.Duration
if skipWindow != nil {
dur := time.Duration(*skipWindow)
skipWindowDur = &dur
}
return runStore(g,
logger,
reg,
Expand All @@ -67,6 +75,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
debugLogging,
*syncInterval,
*blockSyncConcurrency,
skipWindowDur,
)
}
}
Expand All @@ -91,6 +100,7 @@ func runStore(
verbose bool,
syncInterval time.Duration,
blockSyncConcurrency int,
skipWindow *time.Duration,
) error {
{
confContentYaml, err := objStoreConfig.Content()
Expand Down Expand Up @@ -119,6 +129,7 @@ func runStore(
chunkPoolSizeBytes,
verbose,
blockSyncConcurrency,
skipWindow,
)
if err != nil {
return errors.Wrap(err, "create object storage store")
Expand Down
10 changes: 10 additions & 0 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ type BucketStore struct {
blockSyncConcurrency int

partitioner partitioner
skipWindow *time.Duration
}

// NewBucketStore creates a new bucket backed store that implements the store API against
Expand All @@ -187,6 +188,7 @@ func NewBucketStore(
maxChunkPoolBytes uint64,
debugLogging bool,
blockSyncConcurrency int,
skipWindow *time.Duration,
) (*BucketStore, error) {
if logger == nil {
logger = log.NewNopLogger()
Expand Down Expand Up @@ -429,6 +431,14 @@ func (s *BucketStore) TimeRange() (mint, maxt int64) {
// Info implements the storepb.StoreServer interface.
func (s *BucketStore) Info(context.Context, *storepb.InfoRequest) (*storepb.InfoResponse, error) {
mint, maxt := s.TimeRange()

if s.skipWindow != nil {
//TODO: is TimeRange really a Unix timestamp?
newt := time.Now().Add(-*s.skipWindow).Unix()
if maxt > newt {
maxt = newt
}
}
// Store nodes hold global data and thus have no labels.
return &storepb.InfoResponse{
StoreType: component.Store.ToProto(),
Expand Down

0 comments on commit 417a129

Please sign in to comment.