diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 05e82607e3..52e4033ddb 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -832,9 +832,10 @@ type seriesEntry struct { // single TSDB block in object storage. type blockSeriesClient struct { grpc.ClientStream - ctx context.Context - logger log.Logger - extLset labels.Labels + ctx context.Context + logger log.Logger + extLset labels.Labels + extLsetToRemove map[string]struct{} mint int64 maxt int64 @@ -884,9 +885,11 @@ func newBlockSeriesClient( } return &blockSeriesClient{ - ctx: ctx, - logger: logger, - extLset: extLset, + ctx: ctx, + logger: logger, + extLset: extLset, + extLsetToRemove: extLsetToRemove, + mint: req.MinTime, maxt: req.MaxTime, indexr: b.indexReader(), @@ -1006,6 +1009,10 @@ func (b *blockSeriesClient) nextBatch() error { } completeLabelset := labelpb.ExtendSortedLabels(b.lset, b.extLset) + if b.extLsetToRemove != nil { + completeLabelset = rmLabels(completeLabelset, b.extLsetToRemove) + } + if !b.shardMatcher.MatchesLabels(completeLabelset) { continue } diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 28b96025db..a1f83f90ea 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -1881,6 +1881,170 @@ func TestSeries_BlockWithMultipleChunks(t *testing.T) { } } +func TestSeries_SeriesSortedWithoutReplicaLabels(t *testing.T) { + tests := map[string]struct { + series [][]labels.Labels + replicaLabels []string + expectedSeries []labels.Labels + }{ + "use TSDB label as replica label": { + series: [][]labels.Labels{ + { + labels.FromStrings("a", "1", "replica", "1", "z", "1"), + labels.FromStrings("a", "1", "replica", "1", "z", "2"), + labels.FromStrings("a", "1", "replica", "2", "z", "1"), + labels.FromStrings("a", "1", "replica", "2", "z", "2"), + labels.FromStrings("a", "2", "replica", "1", "z", "1"), + labels.FromStrings("a", "2", "replica", "2", "z", "1"), + }, + { + labels.FromStrings("a", "1", "replica", "3", "z", "1"), + 
labels.FromStrings("a", "1", "replica", "3", "z", "2"), + labels.FromStrings("a", "2", "replica", "3", "z", "1"), + }, + }, + replicaLabels: []string{"replica"}, + expectedSeries: []labels.Labels{ + labels.FromStrings("a", "1", "ext1", "0", "z", "1"), + labels.FromStrings("a", "1", "ext1", "0", "z", "2"), + labels.FromStrings("a", "1", "ext1", "0", "z", "1"), + labels.FromStrings("a", "1", "ext1", "0", "z", "2"), + labels.FromStrings("a", "1", "ext1", "1", "z", "1"), + labels.FromStrings("a", "1", "ext1", "1", "z", "2"), + labels.FromStrings("a", "2", "ext1", "0", "z", "1"), + labels.FromStrings("a", "2", "ext1", "1", "z", "1"), + }, + }, + "use external label as replica label": { + series: [][]labels.Labels{ + { + labels.FromStrings("a", "1", "replica", "1", "z", "1"), + labels.FromStrings("a", "1", "replica", "1", "z", "2"), + labels.FromStrings("a", "1", "replica", "2", "z", "1"), + labels.FromStrings("a", "1", "replica", "2", "z", "2"), + }, + { + labels.FromStrings("a", "1", "replica", "1", "z", "1"), + labels.FromStrings("a", "1", "replica", "1", "z", "2"), + }, + }, + replicaLabels: []string{"ext1"}, + expectedSeries: []labels.Labels{ + labels.FromStrings("a", "1", "replica", "1", "z", "1"), + labels.FromStrings("a", "1", "replica", "1", "z", "2"), + labels.FromStrings("a", "1", "replica", "2", "z", "1"), + labels.FromStrings("a", "1", "replica", "2", "z", "2"), + }, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + tb := testutil.NewTB(t) + + tmpDir := t.TempDir() + + bktDir := filepath.Join(tmpDir, "bucket") + bkt, err := filesystem.NewBucket(bktDir) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, bkt.Close()) }() + + instrBkt := objstore.WithNoopInstr(bkt) + logger := log.NewNopLogger() + + for i, series := range testData.series { + replicaVal := strconv.Itoa(i) + head := uploadSeriesToBucket(t, bkt, replicaVal, filepath.Join(tmpDir, replicaVal), series) + defer func() { testutil.Ok(t, head.Close()) }() + } + + // Instance a real 
bucket store we'll use to query the series. + fetcher, err := block.NewMetaFetcher(logger, 10, instrBkt, tmpDir, nil, nil) + testutil.Ok(tb, err) + + indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, storecache.InMemoryIndexCacheConfig{}) + testutil.Ok(tb, err) + + store, err := NewBucketStore( + instrBkt, + fetcher, + tmpDir, + NewChunksLimiterFactory(100000/MaxSamplesPerChunk), + NewSeriesLimiterFactory(0), + NewBytesLimiterFactory(0), + NewGapBasedPartitioner(PartitionerMaxGapSize), + 10, + false, + DefaultPostingOffsetInMemorySampling, + true, + false, + 0, + WithLogger(logger), + WithIndexCache(indexCache), + ) + testutil.Ok(tb, err) + testutil.Ok(tb, store.SyncBlocks(context.Background())) + + req := &storepb.SeriesRequest{ + MinTime: math.MinInt64, + MaxTime: math.MaxInt64, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_RE, Name: "a", Value: ".+"}, + }, + WithoutReplicaLabels: testData.replicaLabels, + } + + srv := newStoreSeriesServer(context.Background()) + err = store.Series(req, srv) + testutil.Ok(t, err) + testutil.Assert(t, len(srv.SeriesSet) == len(testData.expectedSeries)) + + var response []labels.Labels + for _, respSeries := range srv.SeriesSet { + promLabels := labelpb.ZLabelsToPromLabels(respSeries.Labels) + response = append(response, promLabels) + } + + testutil.Equals(t, testData.expectedSeries, response) + }) + } +} + +func uploadSeriesToBucket(t *testing.T, bkt *filesystem.Bucket, replica string, path string, series []labels.Labels) *tsdb.Head { + headOpts := tsdb.DefaultHeadOptions() + headOpts.ChunkDirRoot = filepath.Join(path, "block") + + h, err := tsdb.NewHead(nil, nil, nil, nil, headOpts, nil) + testutil.Ok(t, err) + + for _, s := range series { + for ts := int64(0); ts < 100; ts++ { + // Appending a single sample is very unoptimised, but guarantees each chunk is always MaxSamplesPerChunk + // (except the last one, which could be smaller). 
+ app := h.Appender(context.Background()) + _, err := app.Append(0, s, ts, float64(ts)) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + } + } + + blk := storetestutil.CreateBlockFromHead(t, headOpts.ChunkDirRoot, h) + + thanosMeta := metadata.Thanos{ + Labels: labels.Labels{{Name: "ext1", Value: replica}}.Map(), + Downsample: metadata.ThanosDownsample{Resolution: 0}, + Source: metadata.TestSource, + } + + _, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(headOpts.ChunkDirRoot, blk.String()), thanosMeta, nil) + testutil.Ok(t, err) + + testutil.Ok(t, block.Upload(context.Background(), log.NewNopLogger(), bkt, filepath.Join(headOpts.ChunkDirRoot, blk.String()), metadata.NoneFunc)) + testutil.Ok(t, err) + + return h +} + func mustMarshalAny(pb proto.Message) *types.Any { out, err := types.MarshalAny(pb) if err != nil { diff --git a/pkg/store/storepb/testutil/series.go b/pkg/store/storepb/testutil/series.go index aa70498aea..a7acb2b91f 100644 --- a/pkg/store/storepb/testutil/series.go +++ b/pkg/store/storepb/testutil/series.go @@ -17,7 +17,9 @@ import ( "github.com/cespare/xxhash" "github.com/efficientgo/core/testutil" + "github.com/go-kit/log" "github.com/gogo/protobuf/types" + "github.com/oklog/ulid" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" @@ -58,6 +60,19 @@ type HeadGenOptions struct { Random *rand.Rand } +func CreateBlockFromHead(t testing.TB, dir string, head *tsdb.Head) ulid.ULID { + compactor, err := tsdb.NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, nil) + testutil.Ok(t, err) + + testutil.Ok(t, os.MkdirAll(dir, 0777)) + + // Add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime). + // Because of this block intervals are always +1 than the total samples it includes. 
+ ulid, err := compactor.Write(dir, head, head.MinTime(), head.MaxTime()+1, nil) + testutil.Ok(t, err) + return ulid +} + // CreateHeadWithSeries returns head filled with given samples and same series returned in separate list for assertion purposes. // Returned series list has "ext1"="1" prepended. Each series looks as follows: // {foo=bar,i=000001aaaaaaaaaabbbbbbbbbbccccccccccdddddddddd} <random value> where number indicate sample number from 0.