diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 5f9405b4b4..04dc03145e 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -365,6 +365,21 @@ func runReceive( grpcserver.WithTLSConfig(tlsCfg), ) + ctx, cancel := context.WithCancel(context.Background()) + level.Debug(logger).Log("msg", "setting up periodic update for label names") + g.Add(func() error { + return runutil.Repeat(10*time.Second, ctx.Done(), func() error { + level.Debug(logger).Log("msg", "Starting label names update") + + dbs.UpdateLabelNames(ctx) + + level.Debug(logger).Log("msg", "Finished label names update") + return nil + }) + }, func(err error) { + cancel() + }) + g.Add( func() error { level.Info(logger).Log("msg", "listening for StoreAPI and WritableStoreAPI gRPC", "address", conf.grpcConfig.bindAddress) diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index ab03f19d39..21147e7532 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -48,6 +48,7 @@ import ( "github.com/thanos-io/thanos/pkg/shipper" "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/store/labelpb" + "github.com/thanos-io/thanos/pkg/stringset" "github.com/thanos-io/thanos/pkg/targets" "github.com/thanos-io/thanos/pkg/tls" ) @@ -112,8 +113,9 @@ func runSidecar( mint: conf.limitMinTime.PrometheusTimestamp(), maxt: math.MaxInt64, - limitMinTime: conf.limitMinTime, - client: promclient.NewWithTracingClient(logger, httpClient, "thanos-sidecar"), + limitMinTime: conf.limitMinTime, + client: promclient.NewWithTracingClient(logger, httpClient, "thanos-sidecar"), + labelNamesSet: stringset.AllStrings(), } confContentYaml, err := conf.objStore.Content() @@ -237,6 +239,19 @@ func runSidecar( }, func(error) { cancel() }) + + g.Add(func() error { + return runutil.Repeat(10*time.Second, ctx.Done(), func() error { + level.Debug(logger).Log("msg", "Starting label names update") + + m.UpdateLabelNames(context.Background()) + + level.Debug(logger).Log("msg", "Finished label names update") + return nil + }) + }, func(err error) { + cancel() + }) } { ctx, cancel := context.WithCancel(context.Background()) @@ -249,7 +264,7 @@ func runSidecar( { c := promclient.NewWithTracingClient(logger, httpClient, httpconfig.ThanosUserAgent) - promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.Version) + promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.LabelNamesSet, m.Version) if err != nil { return errors.Wrap(err, "create Prometheus store") } @@ -411,15 +426,16 @@ func validatePrometheus(ctx context.Context, client *promclient.Client, logger l type promMetadata struct { promURL *url.URL - mtx sync.Mutex - mint int64 - maxt int64 - labels labels.Labels - promVersion string - + mtx sync.Mutex + mint int64 + maxt int64 + labels labels.Labels + promVersion string limitMinTime thanosmodel.TimeOrDurationValue client *promclient.Client + + labelNamesSet stringset.Set } func (s *promMetadata) UpdateLabels(ctx context.Context) error { @@ -447,6 +463,30 @@ func (s *promMetadata) UpdateTimestamps(mint, maxt int64) { s.maxt = maxt } +func (s *promMetadata) UpdateLabelNames(ctx context.Context) { + mint, _ := s.Timestamps() + labelNames, err := s.client.LabelNamesInGRPC(ctx, s.promURL, nil, mint, time.Now().UnixMilli()) + if err != nil { + s.mtx.Lock() + defer s.mtx.Unlock() + + s.labelNamesSet = stringset.AllStrings() + return + } + + filter := stringset.NewFromStrings(labelNames...) + s.mtx.Lock() + s.labelNamesSet = filter + s.mtx.Unlock() +} + +func (s *promMetadata) LabelNamesSet() stringset.Set { + s.mtx.Lock() + defer s.mtx.Unlock() + + return s.labelNamesSet +} + func (s *promMetadata) Labels() labels.Labels { s.mtx.Lock() defer s.mtx.Unlock() diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 3bc0082da3..62fb14751f 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -495,6 +495,24 @@ func runStore( s.Shutdown(err) }) } + + { + ctx, cancel := context.WithCancel(context.Background()) + level.Debug(logger).Log("msg", "setting up periodic update for label names") + g.Add(func() error { + return runutil.Repeat(10*time.Second, ctx.Done(), func() error { + level.Debug(logger).Log("msg", "Starting label names update") + + bs.UpdateLabelNames() + + level.Debug(logger).Log("msg", "Finished label names update") + return nil + }) + }, func(err error) { + cancel() + }) + + } // Add bucket UI for loaded blocks. { ins := extpromhttp.NewInstrumentationMiddleware(reg, nil) diff --git a/go.mod b/go.mod index 48f90fd33a..abbc2718b6 100644 --- a/go.mod +++ b/go.mod @@ -118,12 +118,14 @@ require ( require ( github.com/onsi/gomega v1.27.10 + github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb go.opentelemetry.io/contrib/propagators/autoprop v0.38.0 go4.org/intern v0.0.0-20220617035311-6925f38cc365 golang.org/x/exp v0.0.0-20230321023759-10a507213a29 ) require ( + github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165 // indirect github.com/huaweicloud/huaweicloud-sdk-go-obs v3.23.3+incompatible // indirect github.com/onsi/ginkgo v1.16.5 // indirect go.opentelemetry.io/contrib/propagators/ot v1.13.0 // indirect diff --git a/go.sum b/go.sum index edfc83f79f..b12faf011f 100644 --- a/go.sum +++ b/go.sum @@ -223,6 +223,8 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/dennwc/varint v1.0.0 h1:kGNFFSSw8ToIy3obO/kKr8U9GZYUAxQEVuix4zfDWzE= github.com/dennwc/varint v1.0.0/go.mod h1:hnItb35rvZvJrbTALZtY/iQfDs48JKRG1RPpgziApxA= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= +github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165 h1:BS21ZUJ/B5X2UVUbczfmdWH7GapPWAhxcMsDnjJTU1E= +github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165/go.mod h1:c9O8+fpSOX1DM8cPNSkX/qsBWdkD4yd2dpciOWQjpBw= github.com/digitalocean/godo v1.99.0 h1:gUHO7n9bDaZFWvbzOum4bXE0/09ZuYA9yA8idQHX57E= github.com/dnaeon/go-vcr v1.2.0 h1:zHCHvJYTMh1N7xnV7zf1m1GPBF9Ad0Jk/whtQ1663qI= github.com/dnaeon/go-vcr v1.2.0/go.mod h1:R4UdLID7HZT3taECzJs4YgbbH6PIGXB6W/sc5OLb6RQ= @@ -846,6 +848,8 @@ github.com/santhosh-tekuri/jsonschema v1.2.4/go.mod h1:TEAUOeZSmIxTTuHatJzrvARHi github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b h1:gQZ0qzfKHQIybLANtM3mBXNUtOfsCFXeTsnBqCsx1KM= github.com/scaleway/scaleway-sdk-go v1.0.0-beta.15 h1:Y7xOFbD+3jaPw+VN7lkakNJ/pa+ZSQVFp1ONtJaBxns= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= +github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb h1:XfLJSPIOUX+osiMraVgIrMR27uMXnRJWGm1+GL8/63U= +github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb/go.mod h1:bR6DqgcAl1zTcOX8/pE2Qkj9XO00eCNqmKb7lXP8EAg= github.com/sercand/kuberesolver v2.4.0+incompatible h1:WE2OlRf6wjLxHwNkkFLQGaZcVLEXjMjBPjjEU5vksH8= github.com/sercand/kuberesolver v2.4.0+incompatible/go.mod h1:lWF3GL0xptCB/vCiJPl/ZshwPsX/n4Y7u0CW9E7aQIQ= github.com/shirou/gopsutil/v3 v3.21.2/go.mod h1:ghfMypLDrFSWN2c9cDYFLHyynQ+QUht0cv/18ZqVczw= diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 2458e7123f..4dadb97343 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -97,40 +97,31 @@ func NewMultiTSDB( type localClient struct { storepb.StoreClient - labelSetFunc func() []labelpb.ZLabelSet - timeRangeFunc func() (int64, int64) - tsdbOpts *tsdb.Options + store *store.TSDBStore } -func NewLocalClient( - c storepb.StoreClient, - labelSetFunc func() []labelpb.ZLabelSet, - timeRangeFunc func() (int64, int64), - tsdbOpts *tsdb.Options, -) store.Client { +func newLocalClient(c storepb.StoreClient, store *store.TSDBStore) *localClient { return &localClient{ - StoreClient: c, - labelSetFunc: labelSetFunc, - timeRangeFunc: timeRangeFunc, - tsdbOpts: tsdbOpts, + StoreClient: c, + store: store, } } func (l *localClient) LabelSets() []labels.Labels { - return labelpb.ZLabelSetsToPromLabelSets(l.labelSetFunc()...) + return labelpb.ZLabelSetsToPromLabelSets(l.store.LabelSet()...) } func (l *localClient) TimeRange() (mint int64, maxt int64) { - return l.timeRangeFunc() + return l.store.TimeRange() } func (l *localClient) TSDBInfos() []infopb.TSDBInfo { - labelsets := l.labelSetFunc() + labelsets := l.store.LabelSet() if len(labelsets) == 0 { return []infopb.TSDBInfo{} } - mint, maxt := l.timeRangeFunc() + mint, maxt := l.store.TimeRange() return []infopb.TSDBInfo{ { Labels: labelsets[0], @@ -141,7 +132,7 @@ func (l *localClient) TSDBInfos() []infopb.TSDBInfo { } func (l *localClient) String() string { - mint, maxt := l.timeRangeFunc() + mint, maxt := l.store.TimeRange() return fmt.Sprintf( "LabelSets: %v MinTime: %d MaxTime: %d", labelpb.PromLabelSetsToString(l.LabelSets()), mint, maxt, @@ -186,7 +177,7 @@ func (t *tenant) store() *store.TSDBStore { return t.storeTSDB } -func (t *tenant) client(logger log.Logger, tsdbOpts *tsdb.Options) store.Client { +func (t *tenant) client(logger log.Logger) store.Client { t.mtx.RLock() defer t.mtx.RUnlock() @@ -196,7 +187,7 @@ func (t *tenant) client(logger log.Logger, tsdbOpts *tsdb.Options) store.Client } client := storepb.ServerAsClient(store.NewRecoverableStoreServer(logger, tsdbStore), 0) - return NewLocalClient(client, tsdbStore.LabelSet, tsdbStore.TimeRange, tsdbOpts) + return newLocalClient(client, tsdbStore) } func (t *tenant) exemplars() *exemplars.TSDB { @@ -495,7 +486,7 @@ func (t *MultiTSDB) TSDBLocalClients() []store.Client { res := make([]store.Client, 0, len(t.tenants)) for _, tenant := range t.tenants { - client := tenant.client(t.logger, t.tsdbOpts) + client := tenant.client(t.logger) if client != nil { res = append(res, client) } @@ -876,6 +867,19 @@ func (t *MultiTSDB) extractTenantsLabels(tenantID string, initialLset labels.Lab return initialLset, nil } +func (t *MultiTSDB) UpdateLabelNames(ctx context.Context) { + t.mtx.RLock() + defer t.mtx.RUnlock() + + for _, tenant := range t.tenants { + db := tenant.storeTSDB + if db == nil { + continue + } + db.UpdateLabelNames(ctx) + } +} + // extendLabels extends external labels of the initial label set. // If an external label shares same name with a label in the initial label set, // use the label in the initial label set and inform user about it. diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 09bfba3206..1763a33e5e 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -43,6 +43,7 @@ import ( "google.golang.org/grpc/status" "github.com/thanos-io/objstore" + "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/indexheader" "github.com/thanos-io/thanos/pkg/block/metadata" @@ -58,6 +59,7 @@ import ( "github.com/thanos-io/thanos/pkg/store/hintspb" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/thanos-io/thanos/pkg/stringset" "github.com/thanos-io/thanos/pkg/strutil" "github.com/thanos-io/thanos/pkg/tenancy" "github.com/thanos-io/thanos/pkg/tracing" @@ -364,6 +366,9 @@ type BucketStore struct { enableChunkHashCalculation bool + bmtx sync.Mutex + labelNamesSet stringset.Set + blockEstimatedMaxSeriesFunc BlockEstimator blockEstimatedMaxChunkFunc BlockEstimator } @@ -510,6 +515,7 @@ func NewBucketStore( enableSeriesResponseHints: enableSeriesResponseHints, enableChunkHashCalculation: enableChunkHashCalculation, seriesBatchSize: SeriesBatchSize, + labelNamesSet: stringset.AllStrings(), } for _, option := range options { @@ -879,9 +885,10 @@ type seriesEntry struct { // single TSDB block in object storage. type blockSeriesClient struct { grpc.ClientStream - ctx context.Context - logger log.Logger - extLset labels.Labels + ctx context.Context + logger log.Logger + extLset labels.Labels + extLsetToRemove map[string]struct{} mint int64 maxt int64 @@ -931,9 +938,11 @@ func newBlockSeriesClient( } return &blockSeriesClient{ - ctx: ctx, - logger: logger, - extLset: extLset, + ctx: ctx, + logger: logger, + extLset: extLset, + extLsetToRemove: extLsetToRemove, + mint: req.MinTime, maxt: req.MaxTime, indexr: b.indexReader(), @@ -1072,6 +1081,10 @@ func (b *blockSeriesClient) nextBatch() error { } completeLabelset := labelpb.ExtendSortedLabels(b.lset, b.extLset) + if b.extLsetToRemove != nil { + completeLabelset = rmLabels(completeLabelset, b.extLsetToRemove) + } + if !b.shardMatcher.MatchesLabels(completeLabelset) { continue } @@ -1240,7 +1253,9 @@ func debugFoundBlockSetOverview(logger log.Logger, mint, maxt, maxResolutionMill } // Series implements the storepb.StoreServer interface. -func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) (err error) { +func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) (err error) { + srv := newFlushableServer(seriesSrv, s.LabelNamesSet(), req.WithoutReplicaLabels) + if s.queryGate != nil { tracing.DoInSpan(srv.Context(), "store_query_gate_ismyturn", func(ctx context.Context) { err = s.queryGate.Start(srv.Context()) @@ -1489,7 +1504,10 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie } } - return err + if err != nil { + return err + } + return srv.Flush() } func chunksSize(chks []storepb.AggrChunk) (size int) { @@ -1674,6 +1692,35 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq }, nil } +func (s *BucketStore) UpdateLabelNames() { + newSet := stringset.New() + for _, b := range s.blocks { + labelNames, err := b.indexHeaderReader.LabelNames() + if err != nil { + level.Warn(s.logger).Log("msg", "error getting label names", "block", b.meta.ULID, "err", err.Error()) + s.updateLabelNamesSet(stringset.AllStrings()) + return + } + for _, l := range labelNames { + newSet.Insert(l) + } + } + s.updateLabelNamesSet(newSet) +} + +func (s *BucketStore) updateLabelNamesSet(newSet stringset.Set) { + s.bmtx.Lock() + s.labelNamesSet = newSet + s.bmtx.Unlock() +} + +func (b *BucketStore) LabelNamesSet() stringset.Set { + b.bmtx.Lock() + defer b.bmtx.Unlock() + + return b.labelNamesSet +} + func (b *bucketBlock) FilterExtLabelsMatchers(matchers []*labels.Matcher) ([]*labels.Matcher, bool) { // We filter external labels from matchers so we won't try to match series on them. var result []*labels.Matcher diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index 57049f6fc4..85b1d597d8 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -30,6 +30,7 @@ import ( "github.com/thanos-io/objstore/objtesting" "github.com/efficientgo/core/testutil" + "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/model" @@ -778,6 +779,29 @@ func TestBucketStore_LabelNames_e2e(t *testing.T) { }) } +func TestBucketStore_LabelNamesSet_e2e(t *testing.T) { + objtesting.ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) { + dir := t.TempDir() + + s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), NewBytesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf) + s.cache.SwapWith(noopCache{}) + + mint, maxt := s.store.TimeRange() + testutil.Equals(t, s.minTime, mint) + testutil.Equals(t, s.maxTime, maxt) + + s.store.UpdateLabelNames() + for _, b := range s.store.blocks { + waitTimeout(t, &b.pendingReaders, 5*time.Second) + } + + filter := s.store.LabelNamesSet() + for _, n := range []string{"a", "b", "c"} { + testutil.Assert(t, filter.Has(n), "expected filter to have %s", n) + } + }) +} + func TestBucketStore_LabelNames_SeriesLimiter_e2e(t *testing.T) { cases := map[string]struct { maxSeriesLimit uint64 diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index fa29c69bc5..0d40f18bc3 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -59,6 +59,7 @@ import ( "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" storetestutil "github.com/thanos-io/thanos/pkg/store/storepb/testutil" + "github.com/thanos-io/thanos/pkg/stringset" "github.com/thanos-io/thanos/pkg/testutil/custom" "github.com/thanos-io/thanos/pkg/testutil/e2eutil" ) @@ -1633,6 +1634,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { chunksLimiterFactory: NewChunksLimiterFactory(0), seriesLimiterFactory: NewSeriesLimiterFactory(0), bytesLimiterFactory: NewBytesLimiterFactory(0), + labelNamesSet: stringset.AllStrings(), } t.Run("invoke series for one block. Fill the cache on the way.", func(t *testing.T) { @@ -1990,6 +1992,170 @@ func TestSeries_BlockWithMultipleChunks(t *testing.T) { } } +func TestSeries_SeriesSortedWithoutReplicaLabels(t *testing.T) { + tests := map[string]struct { + series [][]labels.Labels + replicaLabels []string + expectedSeries []labels.Labels + }{ + "use TSDB label as replica label": { + series: [][]labels.Labels{ + { + labels.FromStrings("a", "1", "replica", "1", "z", "1"), + labels.FromStrings("a", "1", "replica", "1", "z", "2"), + labels.FromStrings("a", "1", "replica", "2", "z", "1"), + labels.FromStrings("a", "1", "replica", "2", "z", "2"), + labels.FromStrings("a", "2", "replica", "1", "z", "1"), + labels.FromStrings("a", "2", "replica", "2", "z", "1"), + }, + { + labels.FromStrings("a", "1", "replica", "3", "z", "1"), + labels.FromStrings("a", "1", "replica", "3", "z", "2"), + labels.FromStrings("a", "2", "replica", "3", "z", "1"), + }, + }, + replicaLabels: []string{"replica"}, + expectedSeries: []labels.Labels{ + labels.FromStrings("a", "1", "ext1", "0", "z", "1"), + labels.FromStrings("a", "1", "ext1", "0", "z", "1"), + labels.FromStrings("a", "1", "ext1", "0", "z", "2"), + labels.FromStrings("a", "1", "ext1", "0", "z", "2"), + labels.FromStrings("a", "1", "ext1", "1", "z", "1"), + labels.FromStrings("a", "1", "ext1", "1", "z", "2"), + labels.FromStrings("a", "2", "ext1", "0", "z", "1"), + labels.FromStrings("a", "2", "ext1", "1", "z", "1"), + }, + }, + "use external label as replica label": { + series: [][]labels.Labels{ + { + labels.FromStrings("a", "1", "replica", "1", "z", "1"), + labels.FromStrings("a", "1", "replica", "1", "z", "2"), + labels.FromStrings("a", "1", "replica", "2", "z", "1"), + labels.FromStrings("a", "1", "replica", "2", "z", "2"), + }, + { + labels.FromStrings("a", "1", "replica", "1", "z", "1"), + labels.FromStrings("a", "1", "replica", "1", "z", "2"), + }, + }, + replicaLabels: []string{"ext1"}, + expectedSeries: []labels.Labels{ + labels.FromStrings("a", "1", "replica", "1", "z", "1"), + labels.FromStrings("a", "1", "replica", "1", "z", "2"), + labels.FromStrings("a", "1", "replica", "2", "z", "1"), + labels.FromStrings("a", "1", "replica", "2", "z", "2"), + }, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + tb := testutil.NewTB(t) + + tmpDir := t.TempDir() + + bktDir := filepath.Join(tmpDir, "bucket") + bkt, err := filesystem.NewBucket(bktDir) + testutil.Ok(t, err) + defer testutil.Ok(t, bkt.Close()) + + instrBkt := objstore.WithNoopInstr(bkt) + logger := log.NewNopLogger() + + for i, series := range testData.series { + replicaVal := strconv.Itoa(i) + head := uploadSeriesToBucket(t, bkt, replicaVal, filepath.Join(tmpDir, replicaVal), series) + defer testutil.Ok(t, head.Close()) + } + + // Instance a real bucket store we'll use to query the series. + fetcher, err := block.NewMetaFetcher(logger, 10, instrBkt, tmpDir, nil, nil) + testutil.Ok(tb, err) + + indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, nil, storecache.InMemoryIndexCacheConfig{}) + testutil.Ok(tb, err) + + store, err := NewBucketStore( + instrBkt, + fetcher, + tmpDir, + NewChunksLimiterFactory(100000/MaxSamplesPerChunk), + NewSeriesLimiterFactory(0), + NewBytesLimiterFactory(0), + NewGapBasedPartitioner(PartitionerMaxGapSize), + 10, + false, + DefaultPostingOffsetInMemorySampling, + true, + false, + 0, + WithLogger(logger), + WithIndexCache(indexCache), + ) + testutil.Ok(tb, err) + testutil.Ok(tb, store.SyncBlocks(context.Background())) + + req := &storepb.SeriesRequest{ + MinTime: math.MinInt, + MaxTime: math.MaxInt64, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_RE, Name: "a", Value: ".+"}, + }, + WithoutReplicaLabels: testData.replicaLabels, + } + + srv := newStoreSeriesServer(context.Background()) + err = store.Series(req, srv) + testutil.Ok(t, err) + testutil.Assert(t, len(srv.SeriesSet) == len(testData.expectedSeries)) + + var response []labels.Labels + for _, respSeries := range srv.SeriesSet { + promLabels := labelpb.ZLabelsToPromLabels(respSeries.Labels) + response = append(response, promLabels) + } + + testutil.Equals(t, testData.expectedSeries, response) + }) + } +} + +func uploadSeriesToBucket(t *testing.T, bkt *filesystem.Bucket, replica string, path string, series []labels.Labels) *tsdb.Head { + headOpts := tsdb.DefaultHeadOptions() + headOpts.ChunkDirRoot = filepath.Join(path, "block") + + h, err := tsdb.NewHead(nil, nil, nil, nil, headOpts, nil) + testutil.Ok(t, err) + + for _, s := range series { + for ts := int64(0); ts < 100; ts++ { + // Appending a single sample is very unoptimised, but guarantees each chunk is always MaxSamplesPerChunk + // (except the last one, which could be smaller). + app := h.Appender(context.Background()) + _, err := app.Append(0, s, ts, float64(ts)) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + } + } + + blk := storetestutil.CreateBlockFromHead(t, headOpts.ChunkDirRoot, h) + + thanosMeta := metadata.Thanos{ + Labels: labels.Labels{{Name: "ext1", Value: replica}}.Map(), + Downsample: metadata.ThanosDownsample{Resolution: 0}, + Source: metadata.TestSource, + } + + _, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(headOpts.ChunkDirRoot, blk.String()), thanosMeta, nil) + testutil.Ok(t, err) + + testutil.Ok(t, block.Upload(context.Background(), log.NewNopLogger(), bkt, filepath.Join(headOpts.ChunkDirRoot, blk.String()), metadata.NoneFunc)) + testutil.Ok(t, err) + + return h +} + func mustMarshalAny(pb proto.Message) *types.Any { out, err := types.MarshalAny(pb) if err != nil { diff --git a/pkg/store/flushable.go b/pkg/store/flushable.go new file mode 100644 index 0000000000..c41b67d152 --- /dev/null +++ b/pkg/store/flushable.go @@ -0,0 +1,72 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package store + +import ( + "github.com/prometheus/prometheus/model/labels" + "golang.org/x/exp/slices" + + "github.com/thanos-io/thanos/pkg/store/labelpb" + "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/thanos-io/thanos/pkg/stringset" +) + +// flushableServer is an extension of storepb.Store_SeriesServer with a Flush method. +type flushableServer interface { + storepb.Store_SeriesServer + Flush() error +} + +func newFlushableServer( + upstream storepb.Store_SeriesServer, + labelNames stringset.Set, + replicaLabels []string, +) flushableServer { + if labelNames.HasAny(replicaLabels) { + return &resortingServer{Store_SeriesServer: upstream} + } + return &passthroughServer{Store_SeriesServer: upstream} +} + +// passthroughServer is a flushableServer that forwards all data to +// an upstream server without additional processing. +type passthroughServer struct { + storepb.Store_SeriesServer +} + +func (p *passthroughServer) Flush() error { return nil } + +// resortingServer is a flushableServer that resorts all series by their labels. +// This is required if replica labels are stored internally in a TSDB. +// Data is resorted and sent to an upstream server upon calling Flush. +type resortingServer struct { + storepb.Store_SeriesServer + series []*storepb.Series +} + +func (r *resortingServer) Send(response *storepb.SeriesResponse) error { + if response.GetSeries() == nil { + return r.Store_SeriesServer.Send(response) + } + + series := response.GetSeries() + labelpb.ReAllocZLabelsStrings(&series.Labels, false) + r.series = append(r.series, series) + return nil +} + +func (r *resortingServer) Flush() error { + slices.SortFunc(r.series, func(a, b *storepb.Series) bool { + return labels.Compare( + labelpb.ZLabelsToPromLabels(a.Labels), + labelpb.ZLabelsToPromLabels(b.Labels), + ) < 0 + }) + for _, response := range r.series { + if err := r.Store_SeriesServer.Send(storepb.NewSeriesResponse(response)); err != nil { + return err + } + } + return nil +} diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index 388fd0033e..18e90ccc21 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -42,6 +42,7 @@ import ( "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" + "github.com/thanos-io/thanos/pkg/stringset" "github.com/thanos-io/thanos/pkg/tracing" ) @@ -53,8 +54,10 @@ type PrometheusStore struct { buffers sync.Pool component component.StoreAPI externalLabelsFn func() labels.Labels - promVersion func() string - timestamps func() (mint int64, maxt int64) + labelNamesSet func() stringset.Set + + promVersion func() string + timestamps func() (mint int64, maxt int64) remoteReadAcceptableResponses []prompb.ReadRequest_ResponseType @@ -78,6 +81,7 @@ func NewPrometheusStore( component component.StoreAPI, externalLabelsFn func() labels.Labels, timestamps func() (mint int64, maxt int64), + labelNamesSet func() stringset.Set, promVersion func() string, ) (*PrometheusStore, error) { if logger == nil { @@ -91,6 +95,7 @@ func NewPrometheusStore( externalLabelsFn: externalLabelsFn, promVersion: promVersion, timestamps: timestamps, + labelNamesSet: labelNamesSet, remoteReadAcceptableResponses: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_STREAMED_XOR_CHUNKS, prompb.ReadRequest_SAMPLES}, buffers: sync.Pool{New: func() interface{} { b := make([]byte, 0, initialBufSize) @@ -143,7 +148,8 @@ func (p *PrometheusStore) putBuffer(b *[]byte) { } // Series returns all series for a requested time range and label matcher. -func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_SeriesServer) error { +func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) error { + s := newFlushableServer(seriesSrv, p.labelNamesSet(), r.WithoutReplicaLabels) extLset := p.externalLabelsFn() match, matchers, err := matchesExternalLabels(r.Matchers, extLset) @@ -200,7 +206,7 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie return err } } - return nil + return s.Flush() } shardMatcher := r.ShardInfo.Matcher(&p.buffers) @@ -323,7 +329,7 @@ func (p *PrometheusStore) queryPrometheus( } func (p *PrometheusStore) handleSampledPrometheusResponse( - s storepb.Store_SeriesServer, + s flushableServer, httpResp *http.Response, querySpan tracing.Span, extLset labels.Labels, @@ -373,11 +379,11 @@ func (p *PrometheusStore) handleSampledPrometheusResponse( } } level.Debug(p.logger).Log("msg", "handled ReadRequest_SAMPLED request.", "series", len(resp.Results[0].Timeseries)) - return nil + return s.Flush() } func (p *PrometheusStore) handleStreamedPrometheusResponse( - s storepb.Store_SeriesServer, + s flushableServer, shardMatcher *storepb.ShardMatcher, httpResp *http.Response, querySpan tracing.Span, @@ -455,9 +461,7 @@ func (p *PrometheusStore) handleStreamedPrometheusResponse( } r := storepb.NewSeriesResponse(&storepb.Series{ - Labels: labelpb.ZLabelsFromPromLabels( - completeLabelset, - ), + Labels: labelpb.ZLabelsFromPromLabels(completeLabelset), Chunks: thanosChks, }) if err := s.Send(r); err != nil { @@ -472,7 +476,7 @@ func (p *PrometheusStore) handleStreamedPrometheusResponse( querySpan.SetTag("processed.bytes", bodySizer.BytesCount()) level.Debug(p.logger).Log("msg", "handled ReadRequest_STREAMED_XOR_CHUNKS request.", "frames", framesNum) - return nil + return s.Flush() } type BytesCounter struct { diff --git a/pkg/store/prometheus_test.go b/pkg/store/prometheus_test.go index cb3c500579..e0d6052b65 100644 --- a/pkg/store/prometheus_test.go +++ b/pkg/store/prometheus_test.go @@ -20,11 +20,13 @@ import ( "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/efficientgo/core/testutil" + "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/promclient" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" + "github.com/thanos-io/thanos/pkg/stringset" "github.com/thanos-io/thanos/pkg/testutil/custom" "github.com/thanos-io/thanos/pkg/testutil/e2eutil" ) @@ -67,7 +69,10 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) { limitMinT := int64(0) proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, - func() (int64, int64) { return limitMinT, -1 }, nil) // MaxTime does not matter. + func() (int64, int64) { return limitMinT, -1 }, + func() stringset.Set { return stringset.AllStrings() }, + nil, + ) // MaxTime does not matter. testutil.Ok(t, err) // Query all three samples except for the first one. Since we round up queried data @@ -194,7 +199,10 @@ func TestPrometheusStore_SeriesLabels_e2e(t *testing.T) { promStore, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, - func() (int64, int64) { return math.MinInt64/1000 + 62135596801, math.MaxInt64/1000 - 62135596801 }, nil) + func() (int64, int64) { return math.MinInt64/1000 + 62135596801, math.MaxInt64/1000 - 62135596801 }, + func() stringset.Set { return stringset.AllStrings() }, + nil, + ) testutil.Ok(t, err) for _, tcase := range []struct { @@ -362,9 +370,11 @@ func TestPrometheusStore_LabelAPIs(t *testing.T) { version, err := promclient.NewDefaultClient().BuildVersion(context.Background(), u) testutil.Ok(t, err) - promStore, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { - return extLset - }, nil, func() string { return version }) + promStore, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, + func() labels.Labels { return extLset }, + nil, + func() stringset.Set { return stringset.AllStrings() }, + func() string { return version }) testutil.Ok(t, err) return promStore @@ -399,7 +409,9 @@ func TestPrometheusStore_Series_MatchExternalLabel(t *testing.T) { proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, - func() (int64, int64) { return 0, math.MaxInt64 }, nil) + func() (int64, int64) { return 0, math.MaxInt64 }, + func() stringset.Set { return stringset.AllStrings() }, + nil) testutil.Ok(t, err) srv := newStoreSeriesServer(ctx) @@ -461,7 +473,9 @@ func TestPrometheusStore_Series_ChunkHashCalculation_Integration(t *testing.T) { proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, - func() (int64, int64) { return 0, math.MaxInt64 }, nil) + func() (int64, int64) { return 0, math.MaxInt64 }, + func() stringset.Set { return stringset.AllStrings() }, + nil) testutil.Ok(t, err) srv := newStoreSeriesServer(ctx) @@ -490,7 +504,9 @@ func TestPrometheusStore_Info(t *testing.T) { proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), nil, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, - func() (int64, int64) { return 123, 456 }, nil) + func() (int64, int64) { return 123, 456 }, + func() stringset.Set { return stringset.AllStrings() }, + nil) testutil.Ok(t, err) resp, err := proxy.Info(ctx, &storepb.InfoRequest{}) @@ -568,7 +584,9 @@ func TestPrometheusStore_Series_SplitSamplesIntoChunksWithMaxSizeOf120(t *testin proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, - func() (int64, int64) { return 0, math.MaxInt64 }, nil) + func() (int64, int64) { return 0, math.MaxInt64 }, + func() stringset.Set { return stringset.AllStrings() }, + nil) testutil.Ok(t, err) // We build chunks only for SAMPLES method. Make sure we ask for SAMPLES only. diff --git a/pkg/store/storepb/testutil/series.go b/pkg/store/storepb/testutil/series.go index cb8abb607c..58e20325c1 100644 --- a/pkg/store/storepb/testutil/series.go +++ b/pkg/store/storepb/testutil/series.go @@ -17,7 +17,9 @@ import ( "github.com/cespare/xxhash" "github.com/efficientgo/core/testutil" + "github.com/go-kit/log" "github.com/gogo/protobuf/types" + "github.com/oklog/ulid" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" @@ -58,6 +60,19 @@ type HeadGenOptions struct { Random *rand.Rand } +func CreateBlockFromHead(t testing.TB, dir string, head *tsdb.Head) ulid.ULID { + compactor, err := tsdb.NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, nil) + testutil.Ok(t, err) + + testutil.Ok(t, os.MkdirAll(dir, 0777)) + + // Add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime). + // Because of this block intervals are always +1 than the total samples it includes. + ulid, err := compactor.Write(dir, head, head.MinTime(), head.MaxTime()+1, nil) + testutil.Ok(t, err) + return ulid +} + // CreateHeadWithSeries returns head filled with given samples and same series returned in separate list for assertion purposes. // Returned series list has "ext1"="1" prepended. Each series looks as follows: // {foo=bar,i=000001aaaaaaaaaabbbbbbbbbbccccccccccdddddddddd} where number indicate sample number from 0. diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index 5bbb469e98..73604b9236 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -13,6 +13,7 @@ import ( "sync" "github.com/go-kit/log" + "github.com/go-kit/log/level" "github.com/pkg/errors" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" @@ -25,6 +26,7 @@ import ( "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/thanos-io/thanos/pkg/stringset" ) const RemoteReadFrameLimit = 1048576 @@ -44,6 +46,9 @@ type TSDBStore struct { buffers sync.Pool maxBytesPerFrame int + lmx sync.RWMutex + labelNamesSet stringset.Set + extLset labels.Labels mtx sync.RWMutex } @@ -72,6 +77,7 @@ func NewTSDBStore(logger log.Logger, db TSDBReader, component component.StoreAPI component: component, extLset: extLset, maxBytesPerFrame: RemoteReadFrameLimit, + labelNamesSet: stringset.AllStrings(), buffers: sync.Pool{New: func() interface{} { b := make([]byte, 0, initialBufSize) return &b @@ -168,7 +174,9 @@ type CloseDelegator interface { // Series returns all series for a requested time range and label matcher. The returned data may // exceed the requested time bounds. -func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { +func (s *TSDBStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) error { + srv := newFlushableServer(seriesSrv, s.LabelNamesSet(), r.WithoutReplicaLabels) + match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset()) if err != nil { return status.Error(codes.InvalidArgument, err.Error()) @@ -204,8 +212,8 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSer for _, lbl := range r.WithoutReplicaLabels { extLsetToRemove[lbl] = struct{}{} } - finalExtLset := rmLabels(s.extLset.Copy(), extLsetToRemove) + // Stream at most one series per frame; series may be split over multiple frames according to maxBytesInFrame. for set.Next() { series := set.At() @@ -279,7 +287,7 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSer return status.Error(codes.Aborted, err.Error()) } } - return nil + return srv.Flush() } // LabelNames returns all known label names constrained with the given matchers. @@ -368,3 +376,38 @@ func (s *TSDBStore) LabelValues(ctx context.Context, r *storepb.LabelValuesReque return &storepb.LabelValuesResponse{Values: values}, nil } + +func (s *TSDBStore) UpdateLabelNames(ctx context.Context) { + newSet := stringset.New() + q, err := s.db.ChunkQuerier(ctx, math.MinInt64, math.MaxInt64) + if err != nil { + level.Warn(s.logger).Log("msg", "error creating tsdb querier", "err", err.Error()) + s.setLabelNamesSet(stringset.AllStrings()) + return + } + defer runutil.CloseWithLogOnErr(s.logger, q, "close tsdb querier label names") + + res, _, err := q.LabelNames() + if err != nil { + level.Warn(s.logger).Log("msg", "error getting label names", "err", err.Error()) + s.setLabelNamesSet(stringset.AllStrings()) + return + } + for _, l := range res { + newSet.Insert(l) + } + s.setLabelNamesSet(newSet) +} + +func (s *TSDBStore) setLabelNamesSet(newSet stringset.Set) { + s.lmx.Lock() + s.labelNamesSet = newSet + s.lmx.Unlock() +} + +func (b *TSDBStore) LabelNamesSet() stringset.Set { + b.lmx.RLock() + defer b.lmx.RUnlock() + + return b.labelNamesSet +} diff --git a/pkg/stringset/set.go b/pkg/stringset/set.go new file mode 100644 index 0000000000..defe699353 --- /dev/null +++ b/pkg/stringset/set.go @@ -0,0 +1,85 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package stringset + +import ( + cuckoo "github.com/seiflotfy/cuckoofilter" +) + +type Set interface { + Has(string) bool + HasAny([]string) bool +} + +type fixedSet struct { + cuckoo *cuckoo.Filter +} + +func (f fixedSet) HasAny(strings []string) bool { + for _, s := range strings { + if f.Has(s) { + return true + } + } + return false +} + +func NewFromStrings(items ...string) Set { + f := cuckoo.NewFilter(uint(len(items))) + for _, label := range items { + f.InsertUnique([]byte(label)) + } + + return &fixedSet{cuckoo: f} +} + +func (f fixedSet) Has(s string) bool { + return f.cuckoo.Lookup([]byte(s)) +} + +type mutableSet struct { + cuckoo *cuckoo.ScalableCuckooFilter +} + +type MutableSet interface { + Set + Insert(string) +} + +func New() MutableSet { + return &mutableSet{ + cuckoo: cuckoo.NewScalableCuckooFilter(), + } +} + +func (e mutableSet) Insert(s string) { + e.cuckoo.Insert([]byte(s)) +} + +func (e mutableSet) Has(s string) bool { + return e.cuckoo.Lookup([]byte(s)) +} + +func (e mutableSet) HasAny(strings []string) bool { + for _, s := range strings { + if e.Has(s) { + return true + } + } + return false +} + +type allStringsSet struct{} + +func (e allStringsSet) HasAny(_ []string) bool { + return true +} + +func AllStrings() *allStringsSet { + return &allStringsSet{} +} + +func (e allStringsSet) Has(_ string) bool { + return true +} diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index 26cc55a8d5..042e053f65 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -45,6 +45,7 @@ import ( "github.com/thanos-io/objstore/providers/s3" "github.com/efficientgo/core/testutil" + "github.com/thanos-io/thanos/pkg/api/query/querypb" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/exemplars/exemplarspb" @@ -1016,7 +1017,6 @@ func TestQueryStoreDedup(t *testing.T) { blockFinderLabel string series []seriesWithLabels expectedSeries int - expectedDedupBug bool }{ { desc: "Deduplication works with external label", @@ -1036,7 +1036,7 @@ func TestQueryStoreDedup(t *testing.T) { }, { desc: "Deduplication works on external label with resorting required", - intReplicaLabel: "a", + extReplicaLabel: "a", series: []seriesWithLabels{ { intLabels: labels.FromStrings("__name__", "simple_series"), @@ -1071,10 +1071,8 @@ func TestQueryStoreDedup(t *testing.T) { }, blockFinderLabel: "dedupint", expectedSeries: 1, - // This test is expected to fail until the bug outlined in https://github.com/thanos-io/thanos/issues/6257 - // is fixed. This means that it will return double the expected series until then. - expectedDedupBug: true, }, + // This is a regression test for the bug outlined in https://github.com/thanos-io/thanos/issues/6257. { desc: "Deduplication works on internal label with resorting required", intReplicaLabel: "a", @@ -1094,10 +1092,8 @@ func TestQueryStoreDedup(t *testing.T) { }, blockFinderLabel: "dedupintresort", expectedSeries: 2, - // This test is expected to fail until the bug outlined in https://github.com/thanos-io/thanos/issues/6257 - // is fixed. This means that it will return double the expected series until then. - expectedDedupBug: true, }, + // This is a regression test for the bug outlined in https://github.com/thanos-io/thanos/issues/6257. { desc: "Deduplication works with extra internal label", intReplicaLabel: "replica", @@ -1117,10 +1113,8 @@ func TestQueryStoreDedup(t *testing.T) { }, blockFinderLabel: "dedupintextra", expectedSeries: 2, - // This test is expected to fail until the bug outlined in https://github.com/thanos-io/thanos/issues/6257 - // is fixed. This means that it will return double the expected series until then. - expectedDedupBug: true, }, + // This is a regression test for the bug outlined in https://github.com/thanos-io/thanos/issues/6257. { desc: "Deduplication works with both internal and external label", intReplicaLabel: "replica", @@ -1137,9 +1131,6 @@ func TestQueryStoreDedup(t *testing.T) { }, blockFinderLabel: "dedupintext", expectedSeries: 1, - // This test is expected to fail until the bug outlined in https://github.com/thanos-io/thanos/issues/6257 - // is fixed. This means that it will return double the expected series until then. - expectedDedupBug: true, }, } @@ -1169,9 +1160,6 @@ func TestQueryStoreDedup(t *testing.T) { testutil.Ok(t, e2e.StartAndWaitReady(querier)) expectedSeries := tt.expectedSeries - if tt.expectedDedupBug { - expectedSeries *= 2 - } instantQuery(t, ctx, querier.Endpoint("http"), func() string { return fmt.Sprintf("max_over_time(simple_series{block_finder='%s'}[2h])", tt.blockFinderLabel) }, time.Now, promclient.QueryOptions{ @@ -1302,13 +1290,13 @@ func TestSidecarQueryDedup(t *testing.T) { t.Run("deduplication on internal label with reorder", func(t *testing.T) { // Uses "a" as replica label, which is an internal label from the samples used. - // Should return 4 samples as long as the bug described by https://github.com/thanos-io/thanos/issues/6257#issuecomment-1544023978 - // is not fixed. When it is fixed, it should return 2 samples. + // This is a regression test for the bug outlined in https://github.com/thanos-io/thanos/issues/6257. + // Until the bug was fixed, this testcase would return 4 samples instead of 2. instantQuery(t, ctx, query4.Endpoint("http"), func() string { return "my_fake_metric" }, time.Now, promclient.QueryOptions{ Deduplicate: true, - }, 4) + }, 2) }) } diff --git a/test/e2e/receive_test.go b/test/e2e/receive_test.go index 117a4e4c1c..6d455a9cea 100644 --- a/test/e2e/receive_test.go +++ b/test/e2e/receive_test.go @@ -20,6 +20,7 @@ import ( "github.com/prometheus/prometheus/model/relabel" "github.com/efficientgo/core/testutil" + "github.com/thanos-io/thanos/pkg/promclient" "github.com/thanos-io/thanos/pkg/receive" "github.com/thanos-io/thanos/test/e2e/e2ethanos" @@ -233,13 +234,12 @@ test_metric{a="2", b="2"} 1`) }, }) - // This should've returned only 2 series, but is returning 4 until the problem reported in - // https://github.com/thanos-io/thanos/issues/6257 is fixed + // This is a regression test for the bug outlined in https://github.com/thanos-io/thanos/issues/6257. instantQuery(t, ctx, qStatic.Endpoint("http"), func() string { return "test_metric" }, time.Now, promclient.QueryOptions{ Deduplicate: true, - }, 4) + }, 2) }) t.Run("router_replication", func(t *testing.T) { @@ -755,7 +755,6 @@ test_metric{a="2", b="2"} 1`) }) t.Run("multitenant_active_series_limiting", func(t *testing.T) { - /* The multitenant_active_series_limiting suite configures a hashring with two avalanche writers and dedicated meta-monitoring.