From 51d0baadd4d093e5c4a72e29df84e50cc944a84d Mon Sep 17 00:00:00 2001 From: khyatisoneji Date: Wed, 29 Jan 2020 22:19:14 +0530 Subject: [PATCH] store: add consistency delay to fetch blocks Signed-off-by: khyatisoneji --- cmd/thanos/compact.go | 37 +--- cmd/thanos/store.go | 9 +- docs/components/store.md | 15 +- pkg/block/fetcher.go | 78 +++++++- pkg/block/fetcher_test.go | 296 ++++++++++++++++++++++++----- pkg/compact/compact.go | 108 ++--------- pkg/compact/compact_e2e_test.go | 72 ++----- pkg/testutil/e2eutil/prometheus.go | 128 +++++++++++++ test/e2e/store_gateway_test.go | 7 +- 9 files changed, 500 insertions(+), 250 deletions(-) diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index 4f4b9a578f..b0ea2f346c 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -16,7 +16,6 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/oklog/run" - "github.com/oklog/ulid" "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -184,17 +183,11 @@ func runCompact( Name: "thanos_compactor_iterations_total", Help: "Total number of iterations that were executed successfully.", }) - consistencyDelayMetric := prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "thanos_consistency_delay_seconds", - Help: "Configured consistency delay in seconds.", - }, func() float64 { - return consistencyDelay.Seconds() - }) partialUploadDeleteAttempts := prometheus.NewCounter(prometheus.CounterOpts{ Name: "thanos_compactor_aborted_partial_uploads_deletion_attempts_total", Help: "Total number of started deletions of blocks that are assumed aborted and only partially uploaded.", }) - reg.MustRegister(halted, retried, iterations, consistencyDelayMetric, partialUploadDeleteAttempts) + reg.MustRegister(halted, retried, iterations, partialUploadDeleteAttempts) downsampleMetrics := newDownsampleMetrics(reg) @@ -247,15 +240,18 @@ func runCompact( } }() - metaFetcher, err := 
block.NewMetaFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg), + duplicateBlocksFilter := block.NewDeduplicateFilter() + prometheusRegisterer := extprom.WrapRegistererWithPrefix("thanos_", reg) + metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", prometheusRegisterer, block.NewLabelShardedMetaFilter(relabelConfig).Filter, - (&consistencyDelayMetaFilter{logger: logger, consistencyDelay: consistencyDelay}).Filter, + block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer).Filter, + duplicateBlocksFilter.Filter, ) if err != nil { return errors.Wrap(err, "create meta fetcher") } - sy, err := compact.NewSyncer(logger, reg, bkt, metaFetcher, blockSyncConcurrency, acceptMalformedIndex, false) + sy, err := compact.NewSyncer(logger, reg, bkt, metaFetcher, duplicateBlocksFilter, blockSyncConcurrency, acceptMalformedIndex, false) if err != nil { return errors.Wrap(err, "create syncer") } @@ -392,25 +388,6 @@ func runCompact( return nil } -type consistencyDelayMetaFilter struct { - logger log.Logger - consistencyDelay time.Duration -} - -func (f *consistencyDelayMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced block.GaugeLabeled, _ bool) { - for id, meta := range metas { - if ulid.Now()-id.Time() < uint64(f.consistencyDelay/time.Millisecond) && - meta.Thanos.Source != metadata.BucketRepairSource && - meta.Thanos.Source != metadata.CompactorSource && - meta.Thanos.Source != metadata.CompactorRepairSource { - - level.Debug(f.logger).Log("msg", "block is too fresh for now", "block", id) - synced.WithLabelValues(block.TooFreshMeta).Inc() - delete(metas, id) - } - } -} - // genMissingIndexCacheFiles scans over all blocks, generates missing index cache files and uploads them to object storage. 
func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, reg *prometheus.Registry, bkt objstore.Bucket, fetcher block.MetadataFetcher, dir string) error { genIndex := prometheus.NewCounter(prometheus.CounterOpts{ diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index e85f80e67d..4a25391484 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -81,6 +81,9 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) { enableIndexHeader := cmd.Flag("experimental.enable-index-header", "If true, Store Gateway will recreate index-header instead of index-cache.json for each block. This will replace index-cache.json permanently once it will be out of experimental stage."). Hidden().Default("false").Bool() + consistencyDelay := modelDuration(cmd.Flag("consistency-delay", "Minimum age of all blocks before they are being read."). + Default("30m")) + m[component.Store.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, debugLogging bool) error { if minTime.PrometheusTimestamp() > maxTime.PrometheusTimestamp() { return errors.Errorf("invalid argument: --min-time '%s' can't be greater than --max-time '%s'", @@ -116,6 +119,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) { selectorRelabelConf, *advertiseCompatibilityLabel, *enableIndexHeader, + time.Duration(*consistencyDelay), ) } } @@ -148,6 +152,7 @@ func runStore( selectorRelabelConf *extflag.PathOrContent, advertiseCompatibilityLabel bool, enableIndexHeader bool, + consistencyDelay time.Duration, ) error { grpcProbe := prober.NewGRPC() httpProbe := prober.NewHTTP() @@ -220,9 +225,11 @@ func runStore( return errors.Wrap(err, "create index cache") } - metaFetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg), + prometheusRegisterer := extprom.WrapRegistererWithPrefix("thanos_", reg) + metaFetcher, err := 
block.NewMetaFetcher(logger, fetcherConcurrency, bkt, dataDir, prometheusRegisterer, block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime).Filter, block.NewLabelShardedMetaFilter(relabelConfig).Filter, + block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer).Filter, block.NewDeduplicateFilter().Filter, ) if err != nil { diff --git a/docs/components/store.md b/docs/components/store.md index 3bda0a7351..f1e900212e 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -27,7 +27,8 @@ In general about 1MB of local disk space is required per TSDB block stored in th ## Flags -[embedmd]:# (flags/store.txt $) +[embedmd]: # "flags/store.txt $" + ```$ usage: thanos store [] @@ -137,6 +138,7 @@ Flags: Prometheus relabel-config syntax. See format details: https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config + --consistency-delay=30m Minimum age of all blocks before they are being read. ``` @@ -179,7 +181,8 @@ The `in-memory` index cache is enabled by default and its max size can be config Alternatively, the `in-memory` index cache can also by configured using `--index-cache.config-file` to reference to the configuration file or `--index-cache.config` to put yaml config directly: -[embedmd]:# (../flags/config_index_cache_in_memory.txt yaml) +[embedmd]: # "../flags/config_index_cache_in_memory.txt yaml" + ```yaml type: IN-MEMORY config: @@ -196,7 +199,8 @@ All the settings are **optional**: The `memcached` index cache allows to use [Memcached](https://memcached.org) as cache backend. 
This cache type is configured using `--index-cache.config-file` to reference to the configuration file or `--index-cache.config` to put yaml config directly: -[embedmd]:# (../flags/config_index_cache_memcached.txt yaml) +[embedmd]: # "../flags/config_index_cache_memcached.txt yaml" + ```yaml type: MEMCACHED config: @@ -224,13 +228,12 @@ While the remaining settings are **optional**: - `max_get_multi_batch_size`: maximum number of keys a single underlying operation should fetch. If more keys are specified, internally keys are splitted into multiple batches and fetched concurrently, honoring `max_get_multi_concurrency`. If set to `0`, the batch size is unlimited. - `dns_provider_update_interval`: the DNS discovery update interval. - ## Index Header In order to query series inside blocks from object storage, Store Gateway has to know certain initial info about each block such as: -* symbols table to unintern string values -* postings offset for posting lookup +- symbols table to unintern string values +- postings offset for posting lookup In order to achieve so, on startup for each block `index-header` is built from pieces of original block's index and stored on disk. Such `index-header` file is then mmaped and used by Store Gateway. diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go index b6411e5431..d37dca38d0 100644 --- a/pkg/block/fetcher.go +++ b/pkg/block/fetcher.go @@ -417,7 +417,7 @@ func (f *LabelShardedMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, sync // DeduplicateFilter is a MetaFetcher filter that filters out older blocks that have exactly the same data. type DeduplicateFilter struct { - DuplicateIDs []ulid.ULID + duplicateIDs []ulid.ULID } // NewDeduplicateFilter creates DeduplicateFilter. @@ -428,16 +428,30 @@ func NewDeduplicateFilter() *DeduplicateFilter { // Filter filters out duplicate blocks that can be formed // from two or more overlapping blocks that fully submatches the source blocks of the older blocks. 
func (f *DeduplicateFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) { - root := NewNode(&metadata.Meta{ - BlockMeta: tsdb.BlockMeta{ - ULID: ulid.MustNew(uint64(0), nil), - }, - }) + var wg sync.WaitGroup - metaSlice := []*metadata.Meta{} + metasByResolution := make(map[int64][]*metadata.Meta) for _, meta := range metas { - metaSlice = append(metaSlice, meta) + res := meta.Thanos.Downsample.Resolution + metasByResolution[res] = append(metasByResolution[res], meta) + } + + for res := range metasByResolution { + wg.Add(1) + go func(res int64) { + defer wg.Done() + f.filterForResolution(NewNode(&metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ + ULID: ulid.MustNew(uint64(0), nil), + }, + }), metasByResolution[res], metas, res, synced) + }(res) } + + wg.Wait() +} + +func (f *DeduplicateFilter) filterForResolution(root *Node, metaSlice []*metadata.Meta, metas map[ulid.ULID]*metadata.Meta, res int64, synced GaugeLabeled) { sort.Slice(metaSlice, func(i, j int) bool { ilen := len(metaSlice[i].Compaction.Sources) jlen := len(metaSlice[j].Compaction.Sources) @@ -456,13 +470,19 @@ func (f *DeduplicateFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced Ga duplicateULIDs := getNonRootIDs(root) for _, id := range duplicateULIDs { if metas[id] != nil { - f.DuplicateIDs = append(f.DuplicateIDs, id) + f.duplicateIDs = append(f.duplicateIDs, id) } synced.WithLabelValues(duplicateMeta).Inc() delete(metas, id) } } +// DuplicateIDs returns slice of block ids +// that are filtered out by DeduplicateFilter. +func (f *DeduplicateFilter) DuplicateIDs() []ulid.ULID { + return f.duplicateIDs +} + func addNodeBySources(root *Node, add *Node) bool { var rootNode *Node for _, node := range root.Children { @@ -506,3 +526,43 @@ func contains(s1 []ulid.ULID, s2 []ulid.ULID) bool { } return true } + +// ConsistencyDelayMetaFilter is a MetaFetcher filter that filters out blocks that are younger than the specified consistency delay.
+type ConsistencyDelayMetaFilter struct { + logger log.Logger + consistencyDelay time.Duration +} + +// NewConsistencyDelayMetaFilter creates ConsistencyDelayMetaFilter. +func NewConsistencyDelayMetaFilter(logger log.Logger, consistencyDelay time.Duration, reg prometheus.Registerer) *ConsistencyDelayMetaFilter { + consistencyDelayMetric := prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Name: "consistency_delay_seconds", + Help: "Configured consistency delay in seconds.", + }, func() float64 { + return consistencyDelay.Seconds() + }) + reg.MustRegister(consistencyDelayMetric) + + return &ConsistencyDelayMetaFilter{ + logger: logger, + consistencyDelay: consistencyDelay, + } +} + +// Filter filters out blocks that are younger than the specified consistency delay. +func (f *ConsistencyDelayMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) { + for id, meta := range metas { + // TODO(khyatisoneji): Remove the checks about Thanos Source + // by implementing delete delay to fetch metas. + // TODO(bwplotka): Check consistency delay based on file upload / modification time instead of ULID.
+ if ulid.Now()-id.Time() < uint64(f.consistencyDelay/time.Millisecond) && + meta.Thanos.Source != metadata.BucketRepairSource && + meta.Thanos.Source != metadata.CompactorSource && + meta.Thanos.Source != metadata.CompactorRepairSource { + + level.Debug(f.logger).Log("msg", "block is too fresh for now", "block", id) + synced.WithLabelValues(TooFreshMeta).Inc() + delete(metas, id) + } + } +} diff --git a/pkg/block/fetcher_test.go b/pkg/block/fetcher_test.go index 1f0cfa341d..9435d9d73c 100644 --- a/pkg/block/fetcher_test.go +++ b/pkg/block/fetcher_test.go @@ -396,18 +396,32 @@ func TestTimePartitionMetaFilter_Filter(t *testing.T) { } +type sourcesAndResolution struct { + sources []ulid.ULID + resolution int64 +} + func TestDeduplicateFilter_Filter(t *testing.T) { for _, tcase := range []struct { name string - input map[ulid.ULID][]ulid.ULID + input map[ulid.ULID]*sourcesAndResolution expected []ulid.ULID }{ { name: "3 non compacted blocks in bucket", - input: map[ulid.ULID][]ulid.ULID{ - ULID(1): []ulid.ULID{ULID(1)}, - ULID(2): []ulid.ULID{ULID(2)}, - ULID(3): []ulid.ULID{ULID(3)}, + input: map[ulid.ULID]*sourcesAndResolution{ + ULID(1): &sourcesAndResolution{ + sources: []ulid.ULID{ULID(1)}, + resolution: 0, + }, + ULID(2): &sourcesAndResolution{ + sources: []ulid.ULID{ULID(2)}, + resolution: 0, + }, + ULID(3): &sourcesAndResolution{ + sources: []ulid.ULID{ULID(3)}, + resolution: 0, + }, }, expected: []ulid.ULID{ ULID(1), @@ -417,10 +431,19 @@ func TestDeduplicateFilter_Filter(t *testing.T) { }, { name: "compacted block with sources in bucket", - input: map[ulid.ULID][]ulid.ULID{ - ULID(6): []ulid.ULID{ULID(6)}, - ULID(4): []ulid.ULID{ULID(1), ULID(3), ULID(2)}, - ULID(5): []ulid.ULID{ULID(5)}, + input: map[ulid.ULID]*sourcesAndResolution{ + ULID(6): &sourcesAndResolution{ + sources: []ulid.ULID{ULID(6)}, + resolution: 0, + }, + ULID(4): &sourcesAndResolution{ + sources: []ulid.ULID{ULID(1), ULID(3), ULID(2)}, + resolution: 0, + }, + ULID(5): 
&sourcesAndResolution{ + sources: []ulid.ULID{ULID(5)}, + resolution: 0, + }, }, expected: []ulid.ULID{ ULID(4), @@ -430,11 +453,23 @@ func TestDeduplicateFilter_Filter(t *testing.T) { }, { name: "two compacted blocks with same sources", - input: map[ulid.ULID][]ulid.ULID{ - ULID(5): []ulid.ULID{ULID(5)}, - ULID(6): []ulid.ULID{ULID(6)}, - ULID(3): []ulid.ULID{ULID(1), ULID(2)}, - ULID(4): []ulid.ULID{ULID(1), ULID(2)}, + input: map[ulid.ULID]*sourcesAndResolution{ + ULID(5): &sourcesAndResolution{ + sources: []ulid.ULID{ULID(5)}, + resolution: 0, + }, + ULID(6): &sourcesAndResolution{ + sources: []ulid.ULID{ULID(6)}, + resolution: 0, + }, + ULID(3): &sourcesAndResolution{ + sources: []ulid.ULID{ULID(1), ULID(2)}, + resolution: 0, + }, + ULID(4): &sourcesAndResolution{ + sources: []ulid.ULID{ULID(1), ULID(2)}, + resolution: 0, + }, }, expected: []ulid.ULID{ ULID(3), @@ -444,10 +479,19 @@ func TestDeduplicateFilter_Filter(t *testing.T) { }, { name: "two compacted blocks with overlapping sources", - input: map[ulid.ULID][]ulid.ULID{ - ULID(4): []ulid.ULID{ULID(1), ULID(2)}, - ULID(6): []ulid.ULID{ULID(6)}, - ULID(5): []ulid.ULID{ULID(1), ULID(3), ULID(2)}, + input: map[ulid.ULID]*sourcesAndResolution{ + ULID(4): &sourcesAndResolution{ + sources: []ulid.ULID{ULID(1), ULID(2)}, + resolution: 0, + }, + ULID(6): &sourcesAndResolution{ + sources: []ulid.ULID{ULID(6)}, + resolution: 0, + }, + ULID(5): &sourcesAndResolution{ + sources: []ulid.ULID{ULID(1), ULID(3), ULID(2)}, + resolution: 0, + }, }, expected: []ulid.ULID{ ULID(5), @@ -456,12 +500,27 @@ func TestDeduplicateFilter_Filter(t *testing.T) { }, { name: "3 non compacted blocks and compacted block of level 2 in bucket", - input: map[ulid.ULID][]ulid.ULID{ - ULID(6): []ulid.ULID{ULID(6)}, - ULID(1): []ulid.ULID{ULID(1)}, - ULID(2): []ulid.ULID{ULID(2)}, - ULID(3): []ulid.ULID{ULID(3)}, - ULID(4): []ulid.ULID{ULID(2), ULID(1), ULID(3)}, + input: map[ulid.ULID]*sourcesAndResolution{ + ULID(6): &sourcesAndResolution{ + 
sources: []ulid.ULID{ULID(6)}, + resolution: 0, + }, + ULID(1): &sourcesAndResolution{ + sources: []ulid.ULID{ULID(1)}, + resolution: 0, + }, + ULID(2): &sourcesAndResolution{ + sources: []ulid.ULID{ULID(2)}, + resolution: 0, + }, + ULID(3): &sourcesAndResolution{ + sources: []ulid.ULID{ULID(3)}, + resolution: 0, + }, + ULID(4): &sourcesAndResolution{ + sources: []ulid.ULID{ULID(2), ULID(1), ULID(3)}, + resolution: 0, + }, }, expected: []ulid.ULID{ ULID(4), @@ -470,13 +529,31 @@ func TestDeduplicateFilter_Filter(t *testing.T) { }, { name: "3 compacted blocks of level 2 and one compacted block of level 3 in bucket", - input: map[ulid.ULID][]ulid.ULID{ - ULID(10): []ulid.ULID{ULID(1), ULID(2), ULID(3)}, - ULID(11): []ulid.ULID{ULID(6), ULID(4), ULID(5)}, - ULID(14): []ulid.ULID{ULID(14)}, - ULID(1): []ulid.ULID{ULID(1)}, - ULID(13): []ulid.ULID{ULID(1), ULID(6), ULID(2), ULID(3), ULID(5), ULID(7), ULID(4), ULID(8), ULID(9)}, - ULID(12): []ulid.ULID{ULID(7), ULID(9), ULID(8)}, + input: map[ulid.ULID]*sourcesAndResolution{ + ULID(10): &sourcesAndResolution{ + sources: []ulid.ULID{ULID(1), ULID(2), ULID(3)}, + resolution: 0, + }, + ULID(11): &sourcesAndResolution{ + sources: []ulid.ULID{ULID(6), ULID(4), ULID(5)}, + resolution: 0, + }, + ULID(14): &sourcesAndResolution{ + sources: []ulid.ULID{ULID(14)}, + resolution: 0, + }, + ULID(1): &sourcesAndResolution{ + sources: []ulid.ULID{ULID(1)}, + resolution: 0, + }, + ULID(13): &sourcesAndResolution{ + sources: []ulid.ULID{ULID(1), ULID(6), ULID(2), ULID(3), ULID(5), ULID(7), ULID(4), ULID(8), ULID(9)}, + resolution: 0, + }, + ULID(12): &sourcesAndResolution{ + sources: []ulid.ULID{ULID(7), ULID(9), ULID(8)}, + resolution: 0, + }, }, expected: []ulid.ULID{ ULID(14), @@ -485,12 +562,27 @@ func TestDeduplicateFilter_Filter(t *testing.T) { }, { name: "compacted blocks with overlapping sources", - input: map[ulid.ULID][]ulid.ULID{ - ULID(8): []ulid.ULID{ULID(1), ULID(3), ULID(2), ULID(4)}, - ULID(1): []ulid.ULID{ULID(1)}, - 
ULID(5): []ulid.ULID{ULID(1), ULID(2)}, - ULID(6): []ulid.ULID{ULID(1), ULID(3), ULID(2), ULID(4)}, - ULID(7): []ulid.ULID{ULID(3), ULID(1), ULID(2)}, + input: map[ulid.ULID]*sourcesAndResolution{ + ULID(8): &sourcesAndResolution{ + sources: []ulid.ULID{ULID(1), ULID(3), ULID(2), ULID(4)}, + resolution: 0, + }, + ULID(1): &sourcesAndResolution{ + sources: []ulid.ULID{ULID(1)}, + resolution: 0, + }, + ULID(5): &sourcesAndResolution{ + sources: []ulid.ULID{ULID(1), ULID(2)}, + resolution: 0, + }, + ULID(6): &sourcesAndResolution{ + sources: []ulid.ULID{ULID(1), ULID(3), ULID(2), ULID(4)}, + resolution: 0, + }, + ULID(7): &sourcesAndResolution{ + sources: []ulid.ULID{ULID(3), ULID(1), ULID(2)}, + resolution: 0, + }, }, expected: []ulid.ULID{ ULID(6), @@ -498,10 +590,19 @@ func TestDeduplicateFilter_Filter(t *testing.T) { }, { name: "compacted blocks of level 3 with overlapping sources of equal length", - input: map[ulid.ULID][]ulid.ULID{ - ULID(10): []ulid.ULID{ULID(1), ULID(2), ULID(6), ULID(7)}, - ULID(1): []ulid.ULID{ULID(1)}, - ULID(11): []ulid.ULID{ULID(6), ULID(8), ULID(1), ULID(2)}, + input: map[ulid.ULID]*sourcesAndResolution{ + ULID(10): &sourcesAndResolution{ + sources: []ulid.ULID{ULID(1), ULID(2), ULID(6), ULID(7)}, + resolution: 0, + }, + ULID(1): &sourcesAndResolution{ + sources: []ulid.ULID{ULID(1)}, + resolution: 0, + }, + ULID(11): &sourcesAndResolution{ + sources: []ulid.ULID{ULID(6), ULID(8), ULID(1), ULID(2)}, + resolution: 0, + }, }, expected: []ulid.ULID{ ULID(10), @@ -510,29 +611,128 @@ func TestDeduplicateFilter_Filter(t *testing.T) { }, { name: "compacted blocks of level 3 with overlapping sources of different length", - input: map[ulid.ULID][]ulid.ULID{ - ULID(10): []ulid.ULID{ULID(6), ULID(7), ULID(1), ULID(2)}, - ULID(1): []ulid.ULID{ULID(1)}, - ULID(5): []ulid.ULID{ULID(1), ULID(2)}, - ULID(11): []ulid.ULID{ULID(2), ULID(3), ULID(1)}, + input: map[ulid.ULID]*sourcesAndResolution{ + ULID(10): &sourcesAndResolution{ + sources: 
[]ulid.ULID{ULID(6), ULID(7), ULID(1), ULID(2)}, + resolution: 0, + }, + ULID(1): &sourcesAndResolution{ + sources: []ulid.ULID{ULID(1)}, + resolution: 0, + }, + ULID(5): &sourcesAndResolution{ + sources: []ulid.ULID{ULID(1), ULID(2)}, + resolution: 0, + }, + ULID(11): &sourcesAndResolution{ + sources: []ulid.ULID{ULID(2), ULID(3), ULID(1)}, + resolution: 0, + }, }, expected: []ulid.ULID{ ULID(10), ULID(11), }, }, + { + name: "blocks with same sources and different resolutions", + input: map[ulid.ULID]*sourcesAndResolution{ + ULID(1): &sourcesAndResolution{ + sources: []ulid.ULID{ULID(1)}, + resolution: 0, + }, + ULID(2): &sourcesAndResolution{ + sources: []ulid.ULID{ULID(1)}, + resolution: 1000, + }, + ULID(3): &sourcesAndResolution{ + sources: []ulid.ULID{ULID(1)}, + resolution: 10000, + }, + }, + expected: []ulid.ULID{ + ULID(1), + ULID(2), + ULID(3), + }, + }, + { + name: "compacted blocks with overlapping sources and different resolutions", + input: map[ulid.ULID]*sourcesAndResolution{ + ULID(1): &sourcesAndResolution{ + sources: []ulid.ULID{ULID(1)}, + resolution: 0, + }, + ULID(6): &sourcesAndResolution{ + sources: []ulid.ULID{ULID(6)}, + resolution: 10000, + }, + ULID(4): &sourcesAndResolution{ + sources: []ulid.ULID{ULID(1), ULID(3), ULID(2)}, + resolution: 0, + }, + ULID(5): &sourcesAndResolution{ + sources: []ulid.ULID{ULID(2), ULID(3), ULID(1)}, + resolution: 1000, + }, + }, + expected: []ulid.ULID{ + ULID(4), + ULID(5), + ULID(6), + }, + }, + { + name: "compacted blocks of level 3 with overlapping sources of different length and different resolutions", + input: map[ulid.ULID]*sourcesAndResolution{ + ULID(10): &sourcesAndResolution{ + sources: []ulid.ULID{ULID(7), ULID(5), ULID(1), ULID(2)}, + resolution: 0, + }, + ULID(12): &sourcesAndResolution{ + sources: []ulid.ULID{ULID(6), ULID(7), ULID(1)}, + resolution: 10000, + }, + ULID(1): &sourcesAndResolution{ + sources: []ulid.ULID{ULID(1)}, + resolution: 0, + }, + ULID(13): &sourcesAndResolution{ + 
sources: []ulid.ULID{ULID(1)}, + resolution: 10000, + }, + ULID(5): &sourcesAndResolution{ + sources: []ulid.ULID{ULID(1), ULID(2)}, + resolution: 0, + }, + ULID(11): &sourcesAndResolution{ + sources: []ulid.ULID{ULID(2), ULID(3), ULID(1)}, + resolution: 0, + }, + }, + expected: []ulid.ULID{ + ULID(10), + ULID(11), + ULID(12), + }, + }, } { f := NewDeduplicateFilter() if ok := t.Run(tcase.name, func(t *testing.T) { synced := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"state"}) metas := make(map[ulid.ULID]*metadata.Meta) inputLen := len(tcase.input) - for id, sources := range tcase.input { + for id, metaInfo := range tcase.input { metas[id] = &metadata.Meta{ BlockMeta: tsdb.BlockMeta{ ULID: id, Compaction: tsdb.BlockMetaCompaction{ - Sources: sources, + Sources: metaInfo.sources, + }, + }, + Thanos: metadata.Thanos{ + Downsample: metadata.ThanosDownsample{ + Resolution: metaInfo.resolution, }, }, } diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 8c02f11c00..3a963783f8 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -49,6 +49,7 @@ type Syncer struct { metrics *syncerMetrics acceptMalformedIndex bool enableVerticalCompaction bool + duplicateBlocksFilter *block.DeduplicateFilter } type syncerMetrics struct { @@ -123,19 +124,20 @@ func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics { // NewMetaSyncer returns a new Syncer for the given Bucket and directory. // Blocks must be at least as old as the sync delay for being considered. 
-func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher block.MetadataFetcher, blockSyncConcurrency int, acceptMalformedIndex bool, enableVerticalCompaction bool) (*Syncer, error) { +func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher block.MetadataFetcher, duplicateBlocksFilter *block.DeduplicateFilter, blockSyncConcurrency int, acceptMalformedIndex bool, enableVerticalCompaction bool) (*Syncer, error) { if logger == nil { logger = log.NewNopLogger() } return &Syncer{ - logger: logger, - reg: reg, - bkt: bkt, - fetcher: fetcher, - blocks: map[ulid.ULID]*metadata.Meta{}, - metrics: newSyncerMetrics(reg), - blockSyncConcurrency: blockSyncConcurrency, - acceptMalformedIndex: acceptMalformedIndex, + logger: logger, + reg: reg, + bkt: bkt, + fetcher: fetcher, + blocks: map[ulid.ULID]*metadata.Meta{}, + metrics: newSyncerMetrics(reg), + duplicateBlocksFilter: duplicateBlocksFilter, + blockSyncConcurrency: blockSyncConcurrency, + acceptMalformedIndex: acceptMalformedIndex, // The syncer offers an option to enable vertical compaction, even if it's // not currently used by Thanos, because the compactor is also used by Cortex // which needs vertical compaction. @@ -225,95 +227,14 @@ func (s *Syncer) Groups() (res []*Group, err error) { // GarbageCollect deletes blocks from the bucket if their data is available as part of a // block with a higher compaction level. +// Call to SyncMetas function is required to populate duplicateIDs in duplicateBlocksFilter. func (s *Syncer) GarbageCollect(ctx context.Context) error { s.mtx.Lock() defer s.mtx.Unlock() begin := time.Now() - // Run a separate round of garbage collections for each valid resolution. 
- for _, res := range []int64{ - downsample.ResLevel0, downsample.ResLevel1, downsample.ResLevel2, - } { - err := s.garbageCollect(ctx, res) - if err != nil { - s.metrics.garbageCollectionFailures.Inc() - } - s.metrics.garbageCollections.Inc() - s.metrics.garbageCollectionDuration.Observe(time.Since(begin).Seconds()) - - if err != nil { - return errors.Wrapf(err, "garbage collect resolution %d", res) - } - } - return nil -} - -func (s *Syncer) GarbageBlocks(resolution int64) (ids []ulid.ULID, err error) { - // Map each block to its highest priority parent. Initial blocks have themselves - // in their source section, i.e. are their own parent. - parents := map[ulid.ULID]ulid.ULID{} - - for id, meta := range s.blocks { - // Skip any block that has a different resolution. - if meta.Thanos.Downsample.Resolution != resolution { - continue - } - - // For each source block we contain, check whether we are the highest priority parent block. - for _, sid := range meta.Compaction.Sources { - pid, ok := parents[sid] - // No parents for the source block so far. - if !ok { - parents[sid] = id - continue - } - pmeta, ok := s.blocks[pid] - if !ok { - return nil, errors.Errorf("previous parent block %s not found", pid) - } - // The current block is the higher priority parent for the source if its - // compaction level is higher than that of the previously set parent. - // If compaction levels are equal, the more recent ULID wins. - // - // The ULID recency alone is not sufficient since races, e.g. induced - // by downtime of garbage collection, may re-compact blocks that are - // were already compacted into higher-level blocks multiple times. - level, plevel := meta.Compaction.Level, pmeta.Compaction.Level - - if level > plevel || (level == plevel && id.Compare(pid) > 0) { - parents[sid] = id - } - } - } - - // A block can safely be deleted if they are not the highest priority parent for - // any source block. 
- topParents := map[ulid.ULID]struct{}{} - for _, pid := range parents { - topParents[pid] = struct{}{} - } - - for id, meta := range s.blocks { - // Skip any block that has a different resolution. - if meta.Thanos.Downsample.Resolution != resolution { - continue - } - if _, ok := topParents[id]; ok { - continue - } - - ids = append(ids, id) - } - return ids, nil -} - -func (s *Syncer) garbageCollect(ctx context.Context, resolution int64) error { - garbageIds, err := s.GarbageBlocks(resolution) - if err != nil { - return err - } - + garbageIds := s.duplicateBlocksFilter.DuplicateIDs() for _, id := range garbageIds { if ctx.Err() != nil { return ctx.Err() @@ -327,6 +248,7 @@ func (s *Syncer) garbageCollect(ctx context.Context, resolution int64) error { err := block.Delete(delCtx, s.logger, s.bkt, id) cancel() if err != nil { + s.metrics.garbageCollectionFailures.Inc() return retry(errors.Wrapf(err, "delete block %s from bucket", id)) } @@ -335,6 +257,8 @@ func (s *Syncer) garbageCollect(ctx context.Context, resolution int64) error { delete(s.blocks, id) s.metrics.garbageCollectedBlocks.Inc() } + s.metrics.garbageCollections.Inc() + s.metrics.garbageCollectionDuration.Observe(time.Since(begin).Seconds()) return nil } diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go index 56869e5bdc..43ce297434 100644 --- a/pkg/compact/compact_e2e_test.go +++ b/pkg/compact/compact_e2e_test.go @@ -9,7 +9,6 @@ import ( "encoding/json" "fmt" "io/ioutil" - "math/rand" "os" "path" "path/filepath" @@ -19,12 +18,10 @@ import ( "github.com/go-kit/kit/log" "github.com/oklog/ulid" - "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" promtest "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/tsdb" - "github.com/prometheus/prometheus/tsdb/index" "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" 
"github.com/thanos-io/thanos/pkg/objstore" @@ -91,10 +88,13 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) { testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), metadata.MetaFilename), &buf)) } - metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil) + duplicateBlocksFilter := block.NewDeduplicateFilter() + metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, + duplicateBlocksFilter.Filter, + ) testutil.Ok(t, err) - sy, err := NewSyncer(nil, nil, bkt, metaFetcher, 1, false, false) + sy, err := NewSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, 1, false, false) testutil.Ok(t, err) // Do one initial synchronization with the bucket. @@ -164,10 +164,13 @@ func TestGroup_Compact_e2e(t *testing.T) { reg := prometheus.NewRegistry() - metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil) + duplicateBlocksFilter := block.NewDeduplicateFilter() + metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, + duplicateBlocksFilter.Filter, + ) testutil.Ok(t, err) - sy, err := NewSyncer(nil, nil, bkt, metaFetcher, 5, false, false) + sy, err := NewSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, 5, false, false) testutil.Ok(t, err) comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil) @@ -382,7 +385,7 @@ func createAndUpload(t testing.TB, bkt objstore.Bucket, blocks []blockgenSpec) ( var id ulid.ULID var err error if b.numSamples == 0 { - id, err = createEmptyBlock(prepareDir, b.mint, b.maxt, b.extLset, b.res) + id, err = e2eutil.CreateEmptyBlock(prepareDir, b.mint, b.maxt, b.extLset, b.res) } else { id, err = e2eutil.CreateBlock(ctx, prepareDir, b.series, b.numSamples, b.mint, b.maxt, b.extLset, b.res) } @@ -396,56 +399,3 @@ func createAndUpload(t testing.TB, bkt objstore.Bucket, blocks []blockgenSpec) ( } return metas } - -// createEmptyBlock produces empty block like it was the case before fix: https://github.com/prometheus/tsdb/pull/374. -// (Prometheus pre v2.7.0). 
-func createEmptyBlock(dir string, mint int64, maxt int64, extLset labels.Labels, resolution int64) (ulid.ULID, error) { - entropy := rand.New(rand.NewSource(time.Now().UnixNano())) - uid := ulid.MustNew(ulid.Now(), entropy) - - if err := os.Mkdir(path.Join(dir, uid.String()), os.ModePerm); err != nil { - return ulid.ULID{}, errors.Wrap(err, "close index") - } - - if err := os.Mkdir(path.Join(dir, uid.String(), "chunks"), os.ModePerm); err != nil { - return ulid.ULID{}, errors.Wrap(err, "close index") - } - - w, err := index.NewWriter(context.Background(), path.Join(dir, uid.String(), "index")) - if err != nil { - return ulid.ULID{}, errors.Wrap(err, "new index") - } - - if err := w.Close(); err != nil { - return ulid.ULID{}, errors.Wrap(err, "close index") - } - - m := tsdb.BlockMeta{ - Version: 1, - ULID: uid, - MinTime: mint, - MaxTime: maxt, - Compaction: tsdb.BlockMetaCompaction{ - Level: 1, - Sources: []ulid.ULID{uid}, - }, - } - b, err := json.Marshal(&m) - if err != nil { - return ulid.ULID{}, err - } - - if err := ioutil.WriteFile(path.Join(dir, uid.String(), "meta.json"), b, os.ModePerm); err != nil { - return ulid.ULID{}, errors.Wrap(err, "saving meta.json") - } - - if _, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(dir, uid.String()), metadata.Thanos{ - Labels: extLset.Map(), - Downsample: metadata.ThanosDownsample{Resolution: resolution}, - Source: metadata.TestSource, - }, nil); err != nil { - return ulid.ULID{}, errors.Wrap(err, "finalize block") - } - - return uid, nil -} diff --git a/pkg/testutil/e2eutil/prometheus.go b/pkg/testutil/e2eutil/prometheus.go index b298ae91f9..fcc6ad2db5 100644 --- a/pkg/testutil/e2eutil/prometheus.go +++ b/pkg/testutil/e2eutil/prometheus.go @@ -5,6 +5,7 @@ package e2eutil import ( "context" + "encoding/json" "fmt" "io/ioutil" "math" @@ -12,6 +13,7 @@ import ( "net/http" "os" "os/exec" + "path" "path/filepath" "runtime" "strings" @@ -24,6 +26,7 @@ import ( "github.com/pkg/errors" 
"github.com/prometheus/prometheus/pkg/labels"
 	"github.com/prometheus/prometheus/tsdb"
+	"github.com/prometheus/prometheus/tsdb/index"
 	"github.com/thanos-io/thanos/pkg/block/metadata"
 	"github.com/thanos-io/thanos/pkg/runutil"
 	"github.com/thanos-io/thanos/pkg/testutil"
@@ -277,6 +280,131 @@ func (p *Prometheus) Appender() tsdb.Appender {
 	return p.db.Appender()
 }
 
+// CreateEmptyBlock writes a block without series into dir/<ULID> and returns the generated ULID.
+// It produces empty block like it was the case before fix: https://github.com/prometheus/tsdb/pull/374.
+// (Prometheus pre v2.7.0).
+func CreateEmptyBlock(dir string, mint int64, maxt int64, extLset labels.Labels, resolution int64) (ulid.ULID, error) {
+	entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
+	uid := ulid.MustNew(ulid.Now(), entropy)
+
+	if err := os.Mkdir(path.Join(dir, uid.String()), os.ModePerm); err != nil {
+		return ulid.ULID{}, errors.Wrap(err, "create block dir")
+	}
+
+	if err := os.Mkdir(path.Join(dir, uid.String(), "chunks"), os.ModePerm); err != nil {
+		return ulid.ULID{}, errors.Wrap(err, "create chunks dir")
+	}
+
+	// No series are added between creating and closing the index writer.
+	w, err := index.NewWriter(context.Background(), path.Join(dir, uid.String(), "index"))
+	if err != nil {
+		return ulid.ULID{}, errors.Wrap(err, "new index")
+	}
+
+	if err := w.Close(); err != nil {
+		return ulid.ULID{}, errors.Wrap(err, "close index")
+	}
+
+	m := tsdb.BlockMeta{
+		Version: 1,
+		ULID:    uid,
+		MinTime: mint,
+		MaxTime: maxt,
+		Compaction: tsdb.BlockMetaCompaction{
+			Level:   1,
+			Sources: []ulid.ULID{uid},
+		},
+	}
+	b, err := json.Marshal(&m)
+	if err != nil {
+		return ulid.ULID{}, errors.Wrap(err, "marshal meta.json")
+	}
+
+	if err := ioutil.WriteFile(path.Join(dir, uid.String(), "meta.json"), b, os.ModePerm); err != nil {
+		return ulid.ULID{}, errors.Wrap(err, "saving meta.json")
+	}
+
+	if _, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(dir, uid.String()), metadata.Thanos{
+		Labels:     extLset.Map(),
+		Downsample: metadata.ThanosDownsample{Resolution: resolution},
+		Source:     metadata.TestSource,
+	}, nil); err != nil {
+		return ulid.ULID{}, errors.Wrap(err, "finalize block")
+	}
+
+	return uid, nil
+}
+
+// CreateBlockWithBlockDelay writes a block with the given series and numSamples samples each.
+// Samples will be in the time range [mint, maxt).
+// The timestamp of the returned block ID is moved blockDelay into the past, i.e. the block looks
+// as if it was created blockDelay ago. The block is stored in dir under the returned ID.
+func CreateBlockWithBlockDelay(
+	ctx context.Context,
+	dir string,
+	series []labels.Labels,
+	numSamples int,
+	mint, maxt int64,
+	blockDelay time.Duration,
+	extLset labels.Labels,
+	resolution int64,
+) (id ulid.ULID, err error) {
+	blockID, err := createBlock(ctx, dir, series, numSamples, mint, maxt, extLset, resolution, false)
+	if err != nil {
+		return id, errors.Wrap(err, "block creation")
+	}
+
+	// ULID timestamps are in milliseconds. Only the timestamp is shifted; the entropy of blockID
+	// is kept so that blocks created within the same millisecond still get distinct IDs.
+	id = blockID
+	if err := id.SetTime(blockID.Time() - uint64(blockDelay/time.Millisecond)); err != nil {
+		return ulid.ULID{}, errors.Wrap(err, "create block id")
+	}
+
+	if blockID.Compare(id) == 0 {
+		return id, nil
+	}
+
+	metaContent, err := ioutil.ReadFile(path.Join(dir, blockID.String(), "meta.json"))
+	if err != nil {
+		return id, errors.Wrap(err, "read meta file")
+	}
+
+	m := &metadata.Meta{}
+	if err := json.Unmarshal(metaContent, m); err != nil {
+		return id, errors.Wrap(err, "meta.json corrupted")
+	}
+	m.ULID = id
+	m.Compaction.Sources = []ulid.ULID{id}
+
+	// Copy the block files over to a directory named after the new ID, then drop the original one.
+	if err := os.MkdirAll(path.Join(dir, id.String()), 0777); err != nil {
+		return id, errors.Wrap(err, "create directory")
+	}
+
+	err = copyRecursive(path.Join(dir, blockID.String()), path.Join(dir, id.String()))
+	if err != nil {
+		return id, errors.Wrap(err, "copy directory")
+	}
+
+	err = os.RemoveAll(path.Join(dir, blockID.String()))
+	if err != nil {
+		return id, errors.Wrap(err, "delete directory")
+	}
+
+	jsonMeta, err := json.MarshalIndent(m, "", "\t")
+	if err != nil {
+		return id, errors.Wrap(err, "meta marshal")
+	}
+
+	err = ioutil.WriteFile(path.Join(dir, id.String(), "meta.json"), jsonMeta, 0644)
+	if err != nil {
+		return id, errors.Wrap(err, "write meta.json file")
+	}
+
+	return id, nil
+}
+
 // CreateBlock writes a block with the given series and numSamples samples each.
 // Samples will be in the time range [mint, maxt).
 func CreateBlock(
diff --git a/test/e2e/store_gateway_test.go b/test/e2e/store_gateway_test.go
index 484fbec325..72c8945480 100644
--- a/test/e2e/store_gateway_test.go
+++ b/test/e2e/store_gateway_test.go
@@ -75,18 +75,19 @@ func TestStoreGateway(t *testing.T) {
 	series := []labels.Labels{
 		labels.FromStrings("a", "1", "b", "2"),
 	}
+
 	extLset := labels.FromStrings("ext1", "value1", "replica", "1")
 	extLset2 := labels.FromStrings("ext1", "value1", "replica", "2")
 	extLset3 := labels.FromStrings("ext1", "value2", "replica", "3")
 
 	now := time.Now()
-	id1, err := e2eutil.CreateBlock(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), extLset, 0)
+	id1, err := e2eutil.CreateBlockWithBlockDelay(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset, 0)
 	testutil.Ok(t, err)
 
-	id2, err := e2eutil.CreateBlock(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), extLset2, 0)
+	id2, err := e2eutil.CreateBlockWithBlockDelay(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset2, 0)
 	testutil.Ok(t, err)
 
-	id3, err := e2eutil.CreateBlock(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), extLset3, 0)
+	id3, err := e2eutil.CreateBlockWithBlockDelay(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset3, 0)
 	testutil.Ok(t, err)
 
 	l := log.NewLogfmtLogger(os.Stdout)