Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store: add consistency delay to fetch blocks #2009

Merged
merged 1 commit into from
Feb 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 7 additions & 30 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/run"
"github.com/oklog/ulid"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -184,17 +183,11 @@ func runCompact(
Name: "thanos_compactor_iterations_total",
Help: "Total number of iterations that were executed successfully.",
})
consistencyDelayMetric := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "thanos_consistency_delay_seconds",
Help: "Configured consistency delay in seconds.",
}, func() float64 {
return consistencyDelay.Seconds()
})
partialUploadDeleteAttempts := prometheus.NewCounter(prometheus.CounterOpts{
Name: "thanos_compactor_aborted_partial_uploads_deletion_attempts_total",
Help: "Total number of started deletions of blocks that are assumed aborted and only partially uploaded.",
})
reg.MustRegister(halted, retried, iterations, consistencyDelayMetric, partialUploadDeleteAttempts)
reg.MustRegister(halted, retried, iterations, partialUploadDeleteAttempts)

downsampleMetrics := newDownsampleMetrics(reg)

Expand Down Expand Up @@ -247,15 +240,18 @@ func runCompact(
}
}()

metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg),
duplicateBlocksFilter := block.NewDeduplicateFilter()
prometheusRegisterer := extprom.WrapRegistererWithPrefix("thanos_", reg)
metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", prometheusRegisterer,
block.NewLabelShardedMetaFilter(relabelConfig).Filter,
(&consistencyDelayMetaFilter{logger: logger, consistencyDelay: consistencyDelay}).Filter,
block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer).Filter,
khyatisoneji marked this conversation as resolved.
Show resolved Hide resolved
duplicateBlocksFilter.Filter,
)
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}

sy, err := compact.NewSyncer(logger, reg, bkt, metaFetcher, blockSyncConcurrency, acceptMalformedIndex, false)
sy, err := compact.NewSyncer(logger, reg, bkt, metaFetcher, duplicateBlocksFilter, blockSyncConcurrency, acceptMalformedIndex, false)
if err != nil {
return errors.Wrap(err, "create syncer")
}
Expand Down Expand Up @@ -392,25 +388,6 @@ func runCompact(
return nil
}

type consistencyDelayMetaFilter struct {
logger log.Logger
consistencyDelay time.Duration
}

func (f *consistencyDelayMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced block.GaugeLabeled, _ bool) {
for id, meta := range metas {
if ulid.Now()-id.Time() < uint64(f.consistencyDelay/time.Millisecond) &&
meta.Thanos.Source != metadata.BucketRepairSource &&
meta.Thanos.Source != metadata.CompactorSource &&
meta.Thanos.Source != metadata.CompactorRepairSource {

level.Debug(f.logger).Log("msg", "block is too fresh for now", "block", id)
synced.WithLabelValues(block.TooFreshMeta).Inc()
delete(metas, id)
}
}
}

// genMissingIndexCacheFiles scans over all blocks, generates missing index cache files and uploads them to object storage.
func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, reg *prometheus.Registry, bkt objstore.Bucket, fetcher block.MetadataFetcher, dir string) error {
genIndex := prometheus.NewCounter(prometheus.CounterOpts{
Expand Down
9 changes: 8 additions & 1 deletion cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
enableIndexHeader := cmd.Flag("experimental.enable-index-header", "If true, Store Gateway will recreate index-header instead of index-cache.json for each block. This will replace index-cache.json permanently once it will be out of experimental stage.").
Hidden().Default("false").Bool()

consistencyDelay := modelDuration(cmd.Flag("consistency-delay", "Minimum age of all blocks before they are being read.").
Default("30m"))

m[component.Store.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, debugLogging bool) error {
if minTime.PrometheusTimestamp() > maxTime.PrometheusTimestamp() {
return errors.Errorf("invalid argument: --min-time '%s' can't be greater than --max-time '%s'",
Expand Down Expand Up @@ -116,6 +119,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
selectorRelabelConf,
*advertiseCompatibilityLabel,
*enableIndexHeader,
time.Duration(*consistencyDelay),
)
}
}
Expand Down Expand Up @@ -148,6 +152,7 @@ func runStore(
selectorRelabelConf *extflag.PathOrContent,
advertiseCompatibilityLabel bool,
enableIndexHeader bool,
consistencyDelay time.Duration,
) error {
grpcProbe := prober.NewGRPC()
httpProbe := prober.NewHTTP()
Expand Down Expand Up @@ -220,9 +225,11 @@ func runStore(
return errors.Wrap(err, "create index cache")
}

metaFetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg),
prometheusRegisterer := extprom.WrapRegistererWithPrefix("thanos_", reg)
metaFetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, dataDir, prometheusRegisterer,
block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime).Filter,
block.NewLabelShardedMetaFilter(relabelConfig).Filter,
block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer).Filter,
block.NewDeduplicateFilter().Filter,
)
if err != nil {
Expand Down
15 changes: 9 additions & 6 deletions docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ In general about 1MB of local disk space is required per TSDB block stored in th

## Flags

[embedmd]:# (flags/store.txt $)
[embedmd]: # "flags/store.txt $"
khyatisoneji marked this conversation as resolved.
Show resolved Hide resolved

```$
usage: thanos store [<flags>]

Expand Down Expand Up @@ -137,6 +138,7 @@ Flags:
Prometheus relabel-config syntax. See format
details:
https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config
--consistency-delay=30m Minimum age of all blocks before they are being read.

```

Expand Down Expand Up @@ -179,7 +181,8 @@ The `in-memory` index cache is enabled by default and its max size can be config

Alternatively, the `in-memory` index cache can also by configured using `--index-cache.config-file` to reference to the configuration file or `--index-cache.config` to put yaml config directly:

[embedmd]:# (../flags/config_index_cache_in_memory.txt yaml)
[embedmd]: # "../flags/config_index_cache_in_memory.txt yaml"

```yaml
type: IN-MEMORY
config:
Expand All @@ -196,7 +199,8 @@ All the settings are **optional**:

The `memcached` index cache allows to use [Memcached](https://memcached.org) as cache backend. This cache type is configured using `--index-cache.config-file` to reference to the configuration file or `--index-cache.config` to put yaml config directly:

[embedmd]:# (../flags/config_index_cache_memcached.txt yaml)
[embedmd]: # "../flags/config_index_cache_memcached.txt yaml"

```yaml
type: MEMCACHED
config:
Expand Down Expand Up @@ -224,13 +228,12 @@ While the remaining settings are **optional**:
- `max_get_multi_batch_size`: maximum number of keys a single underlying operation should fetch. If more keys are specified, internally keys are splitted into multiple batches and fetched concurrently, honoring `max_get_multi_concurrency`. If set to `0`, the batch size is unlimited.
- `dns_provider_update_interval`: the DNS discovery update interval.


## Index Header

In order to query series inside blocks from object storage, Store Gateway has to know certain initial info about each block such as:

* symbols table to unintern string values
* postings offset for posting lookup
- symbols table to unintern string values
- postings offset for posting lookup

In order to achieve so, on startup for each block `index-header` is built from pieces of original block's index and stored on disk.
Such `index-header` file is then mmaped and used by Store Gateway.
Expand Down
78 changes: 69 additions & 9 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ func (f *LabelShardedMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, sync

// DeduplicateFilter is a MetaFetcher filter that filters out older blocks that have exactly the same data.
type DeduplicateFilter struct {
DuplicateIDs []ulid.ULID
duplicateIDs []ulid.ULID
}

// NewDeduplicateFilter creates DeduplicateFilter.
Expand All @@ -428,16 +428,30 @@ func NewDeduplicateFilter() *DeduplicateFilter {
// Filter filters out duplicate blocks that can be formed
// from two or more overlapping blocks that fully submatches the source blocks of the older blocks.
func (f *DeduplicateFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) {
root := NewNode(&metadata.Meta{
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustNew(uint64(0), nil),
},
})
var wg sync.WaitGroup

metaSlice := []*metadata.Meta{}
metasByResolution := make(map[int64][]*metadata.Meta)
for _, meta := range metas {
metaSlice = append(metaSlice, meta)
res := meta.Thanos.Downsample.Resolution
metasByResolution[res] = append(metasByResolution[res], meta)
}

for res := range metasByResolution {
wg.Add(1)
go func(res int64) {
defer wg.Done()
f.filterForResolution(NewNode(&metadata.Meta{
khyatisoneji marked this conversation as resolved.
Show resolved Hide resolved
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustNew(uint64(0), nil),
},
}), metasByResolution[res], metas, res, synced)
}(res)
}

wg.Wait()
}

func (f *DeduplicateFilter) filterForResolution(root *Node, metaSlice []*metadata.Meta, metas map[ulid.ULID]*metadata.Meta, res int64, synced GaugeLabeled) {
sort.Slice(metaSlice, func(i, j int) bool {
ilen := len(metaSlice[i].Compaction.Sources)
jlen := len(metaSlice[j].Compaction.Sources)
Expand All @@ -456,13 +470,19 @@ func (f *DeduplicateFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced Ga
duplicateULIDs := getNonRootIDs(root)
for _, id := range duplicateULIDs {
if metas[id] != nil {
f.DuplicateIDs = append(f.DuplicateIDs, id)
f.duplicateIDs = append(f.duplicateIDs, id)
}
synced.WithLabelValues(duplicateMeta).Inc()
delete(metas, id)
}
}

// DuplicateIDs returns slice of block ids
// that are filtered out by DeduplicateFilter.
func (f *DeduplicateFilter) DuplicateIDs() []ulid.ULID {
return f.duplicateIDs
}

func addNodeBySources(root *Node, add *Node) bool {
var rootNode *Node
for _, node := range root.Children {
Expand Down Expand Up @@ -506,3 +526,43 @@ func contains(s1 []ulid.ULID, s2 []ulid.ULID) bool {
}
return true
}

// ConsistencyDelayMetaFilter is a MetaFetcher filter that filters out blocks that are created before a specified consistency delay.
type ConsistencyDelayMetaFilter struct {
logger log.Logger
consistencyDelay time.Duration
}

// NewConsistencyDelayMetaFilter creates ConsistencyDelayMetaFilter.
func NewConsistencyDelayMetaFilter(logger log.Logger, consistencyDelay time.Duration, reg prometheus.Registerer) *ConsistencyDelayMetaFilter {
consistencyDelayMetric := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "consistency_delay_seconds",
Help: "Configured consistency delay in seconds.",
}, func() float64 {
return consistencyDelay.Seconds()
})
reg.MustRegister(consistencyDelayMetric)

return &ConsistencyDelayMetaFilter{
logger: logger,
consistencyDelay: consistencyDelay,
}
}

// Filter filters out blocks that filters blocks that have are created before a specified consistency delay.
func (f *ConsistencyDelayMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) {
for id, meta := range metas {
// TODO(khyatisoneji): Remove the checks about Thanos Source
khyatisoneji marked this conversation as resolved.
Show resolved Hide resolved
// by implementing delete delay to fetch metas.
// TODO(bwplotka): Check consistency delay based on file upload / modification time instead of ULID.
if ulid.Now()-id.Time() < uint64(f.consistencyDelay/time.Millisecond) &&
khyatisoneji marked this conversation as resolved.
Show resolved Hide resolved
meta.Thanos.Source != metadata.BucketRepairSource &&
khyatisoneji marked this conversation as resolved.
Show resolved Hide resolved
meta.Thanos.Source != metadata.CompactorSource &&
meta.Thanos.Source != metadata.CompactorRepairSource {

level.Debug(f.logger).Log("msg", "block is too fresh for now", "block", id)
synced.WithLabelValues(TooFreshMeta).Inc()
delete(metas, id)
}
}
}
Loading