diff --git a/CHANGELOG.md b/CHANGELOG.md
index bde408da39..411cd09209 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -23,6 +23,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
 - [#7248](https://github.com/thanos-io/thanos/pull/7248) Receive: Fix RemoteWriteAsync was sequentially executed causing high latency in the ingestion path.
 - [#7271](https://github.com/thanos-io/thanos/pull/7271) Query: fixing dedup iterator when working on mixed sample types.
 - [#7289](https://github.com/thanos-io/thanos/pull/7289) Query Frontend: show warnings from downstream queries.
+- [#7308](https://github.com/thanos-io/thanos/pull/7308) Store: Batch TSDB Infos for blocks.
 
 ### Added
 
diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index 1951c217bf..9c95285a55 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -42,7 +42,6 @@ import (
 	"google.golang.org/grpc/status"
 
 	"github.com/thanos-io/objstore"
-
 	"github.com/thanos-io/thanos/pkg/block"
 	"github.com/thanos-io/thanos/pkg/block/indexheader"
 	"github.com/thanos-io/thanos/pkg/block/metadata"
@@ -874,18 +873,39 @@ func (s *BucketStore) TSDBInfos() []infopb.TSDBInfo {
 	s.mtx.RLock()
 	defer s.mtx.RUnlock()
 
-	infos := make([]infopb.TSDBInfo, 0, len(s.blocks))
+	infoMap := make(map[uint64][]infopb.TSDBInfo, len(s.blocks))
 	for _, b := range s.blocks {
-		infos = append(infos, infopb.TSDBInfo{
+		lbls := labels.FromMap(b.meta.Thanos.Labels)
+		hash := lbls.Hash()
+		infoMap[hash] = append(infoMap[hash], infopb.TSDBInfo{
 			Labels: labelpb.ZLabelSet{
-				Labels: labelpb.ZLabelsFromPromLabels(labels.FromMap(b.meta.Thanos.Labels)),
+				Labels: labelpb.ZLabelsFromPromLabels(lbls),
 			},
 			MinTime: b.meta.MinTime,
 			MaxTime: b.meta.MaxTime,
 		})
 	}
 
-	return infos
+	// join adjacent blocks so we emit less TSDBInfos
+	res := make([]infopb.TSDBInfo, 0, len(s.blocks))
+	for _, infos := range infoMap {
+		sort.Slice(infos, func(i, j int) bool { return infos[i].MinTime < infos[j].MinTime })
+
+		cur := infos[0]
+		for i, info := range infos {
+			if info.MinTime > cur.MaxTime {
+				res = append(res, cur)
+				cur = info
+				continue
+			}
+			cur.MaxTime = info.MaxTime
+			if i == len(infos)-1 {
+				res = append(res, cur)
+			}
+		}
+	}
+
+	return res
 }
 
 func (s *BucketStore) LabelSet() []labelpb.ZLabelSet {
diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go
index 6abfe75b22..60489256cf 100644
--- a/pkg/store/bucket_test.go
+++ b/pkg/store/bucket_test.go
@@ -48,13 +48,13 @@ import (
 
 	"github.com/thanos-io/objstore"
 	"github.com/thanos-io/objstore/providers/filesystem"
-
 	"github.com/thanos-io/thanos/pkg/block"
 	"github.com/thanos-io/thanos/pkg/block/indexheader"
 	"github.com/thanos-io/thanos/pkg/block/metadata"
 	"github.com/thanos-io/thanos/pkg/compact"
 	"github.com/thanos-io/thanos/pkg/compact/downsample"
 	"github.com/thanos-io/thanos/pkg/gate"
+	"github.com/thanos-io/thanos/pkg/info/infopb"
 	"github.com/thanos-io/thanos/pkg/pool"
 	storecache "github.com/thanos-io/thanos/pkg/store/cache"
 	"github.com/thanos-io/thanos/pkg/store/hintspb"
@@ -636,6 +636,83 @@ func TestBucketStoreConfig_validate(t *testing.T) {
 	}
 }
 
+func TestBucketStore_TSDBInfo(t *testing.T) {
+	defer custom.TolerantVerifyLeak(t)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	logger := log.NewNopLogger()
+	dir := t.TempDir()
+
+	bkt := objstore.WithNoopInstr(objstore.NewInMemBucket())
+	series := []labels.Labels{labels.FromStrings("a", "1", "b", "1")}
+
+	for _, tt := range []struct {
+		mint, maxt int64
+		extLabels  labels.Labels
+	}{
+		{mint: 0, maxt: 1000, extLabels: labels.FromStrings("a", "b")},
+		{mint: 1000, maxt: 2000, extLabels: labels.FromStrings("a", "b")},
+		{mint: 3000, maxt: 4000, extLabels: labels.FromStrings("a", "b")},
+		{mint: 3500, maxt: 5000, extLabels: labels.FromStrings("a", "b")},
+		{mint: 0, maxt: 1000, extLabels: labels.FromStrings("a", "c")},
+		{mint: 500, maxt: 2000, extLabels: labels.FromStrings("a", "c")},
+	} {
+		id1, err := e2eutil.CreateBlock(ctx, dir, series, 10, tt.mint, tt.maxt, tt.extLabels, 0, metadata.NoneFunc)
+		testutil.Ok(t, err)
+		testutil.Ok(t, block.Upload(ctx, logger, bkt, filepath.Join(dir, id1.String()), metadata.NoneFunc))
+	}
+
+	baseBlockIDsFetcher := block.NewConcurrentLister(logger, bkt)
+	metaFetcher, err := block.NewMetaFetcher(logger, 20, bkt, baseBlockIDsFetcher, dir, nil, []block.MetadataFilter{
+		block.NewTimePartitionMetaFilter(allowAllFilterConf.MinTime, allowAllFilterConf.MaxTime),
+	})
+	testutil.Ok(t, err)
+
+	chunkPool, err := NewDefaultChunkBytesPool(2e5)
+	testutil.Ok(t, err)
+
+	bucketStore, err := NewBucketStore(
+		objstore.WithNoopInstr(bkt),
+		metaFetcher,
+		dir,
+		NewChunksLimiterFactory(0),
+		NewSeriesLimiterFactory(0),
+		NewBytesLimiterFactory(0),
+		NewGapBasedPartitioner(PartitionerMaxGapSize),
+		20,
+		true,
+		DefaultPostingOffsetInMemorySampling,
+		false,
+		false,
+		0,
+		WithChunkPool(chunkPool),
+		WithFilterConfig(allowAllFilterConf),
+	)
+	testutil.Ok(t, err)
+	defer func() { testutil.Ok(t, bucketStore.Close()) }()
+
+	testutil.Ok(t, bucketStore.SyncBlocks(ctx))
+	testutil.Equals(t, bucketStore.TSDBInfos(), []infopb.TSDBInfo{
+		{
+			Labels:  labelpb.ZLabelSet{Labels: []labelpb.ZLabel{{Name: "a", Value: "b"}}},
+			MinTime: 0,
+			MaxTime: 2000,
+		},
+		{
+			Labels:  labelpb.ZLabelSet{Labels: []labelpb.ZLabel{{Name: "a", Value: "b"}}},
+			MinTime: 3000,
+			MaxTime: 5000,
+		},
+		{
+			Labels:  labelpb.ZLabelSet{Labels: []labelpb.ZLabel{{Name: "a", Value: "c"}}},
+			MinTime: 0,
+			MaxTime: 2000,
+		},
+	})
+}
+
 func TestBucketStore_Info(t *testing.T) {
 	defer custom.TolerantVerifyLeak(t)