diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 9ddfbd89b76..a7e2a7871d9 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -88,6 +88,7 @@ type storeConfig struct { reqLogConfig *extflag.PathOrContent lazyIndexReaderEnabled bool lazyIndexReaderIdleTimeout time.Duration + dontResortResponses bool } func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) { @@ -182,6 +183,9 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("store.index-header-lazy-reader-idle-timeout", "If index-header lazy reader is enabled and this idle timeout setting is > 0, memory map-ed index-headers will be automatically released after 'idle timeout' inactivity."). Hidden().Default("5m").DurationVar(&sc.lazyIndexReaderIdleTimeout) + cmd.Flag("store.dont-resort-responses", "Do not resort responses in the store. This should only be done if you are sure that the store response is sorted even after adding external labels or dropping of replica labels. It is generally unsafe and can lead to deduplication not working correctly but can improve query performance."). + Hidden().Default("false").BoolVar(&sc.dontResortResponses) + cmd.Flag("web.disable", "Disable Block Viewer UI.").Default("false").BoolVar(&sc.disableWeb) cmd.Flag("web.external-prefix", "Static prefix for all HTML links and redirect URLs in the bucket web UI interface. Actual endpoints are still served on / or the web.route-prefix. This allows thanos bucket web UI to be served behind a reverse proxy that strips a URL sub-path."). @@ -387,6 +391,9 @@ func runStore( if conf.debugLogging { options = append(options, store.WithDebugLogging()) } + if conf.dontResortResponses { + options = append(options, store.WithDontResortResponse()) + } bs, err := store.NewBucketStore( insBkt, diff --git a/pkg/store/acceptance_test.go b/pkg/store/acceptance_test.go index ecc23f5aa34..d0a1bcfaf43 100644 --- a/pkg/store/acceptance_test.go +++ b/pkg/store/acceptance_test.go @@ -238,9 +238,9 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_EQ, Name: "n", Value: "1"}, }, expectedLabels: []labels.Labels{ + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), labels.FromStrings("n", "1", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), }, }, { @@ -251,7 +251,7 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_EQ, Name: "i", Value: "a"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), }, }, { @@ -270,9 +270,9 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_EQ, Name: "missing", Value: ""}, }, expectedLabels: []labels.Labels{ + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), labels.FromStrings("n", "1", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), labels.FromStrings("n", "2", "region", "eu-west"), labels.FromStrings("n", "2.5", "region", "eu-west"), }, @@ -295,8 +295,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "i", Value: ".+"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), }, }, { @@ -306,9 +306,9 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "i", Value: ".*"}, }, expectedLabels: []labels.Labels{ + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), labels.FromStrings("n", "1", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), labels.FromStrings("n", "2", "region", "eu-west"), labels.FromStrings("n", "2.5", "region", "eu-west"), }, @@ -332,8 +332,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_NEQ, Name: "i", Value: ""}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), }, }, { @@ -352,8 +352,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_NEQ, Name: "i", Value: "a"}, }, expectedLabels: []labels.Labels{ + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), labels.FromStrings("n", "1", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), }, }, { @@ -363,9 +363,9 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "n", Value: "^1$"}, }, expectedLabels: []labels.Labels{ + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), labels.FromStrings("n", "1", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), }, }, { @@ -376,7 +376,7 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "i", Value: "^a$"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), }, }, { @@ -387,8 +387,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "i", Value: "^a?$"}, }, expectedLabels: []labels.Labels{ + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), labels.FromStrings("n", "1", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), }, }, { @@ -422,9 +422,9 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "i", Value: "^.*$"}, }, expectedLabels: []labels.Labels{ + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), labels.FromStrings("n", "1", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), }, }, { @@ -435,8 +435,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "i", Value: "^.+$"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), }, }, { @@ -489,8 +489,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_NRE, Name: "i", Value: "^a$"}, }, expectedLabels: []labels.Labels{ + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), labels.FromStrings("n", "1", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), }, }, { @@ -501,7 +501,7 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_NRE, Name: "i", Value: "^a?$"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), }, }, { @@ -512,8 +512,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_NRE, Name: "i", Value: "^$"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), }, }, { @@ -545,7 +545,7 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_EQ, Name: "i", Value: "a"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), }, }, { @@ -557,7 +557,7 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "i", Value: "^(b|a).*$"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), }, }, { @@ -567,9 +567,9 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "n", Value: "(1|2)"}, }, expectedLabels: []labels.Labels{ + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), labels.FromStrings("n", "1", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), labels.FromStrings("n", "2", "region", "eu-west"), }, }, @@ -580,8 +580,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "i", Value: "a|b"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), }, }, { @@ -591,8 +591,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "i", Value: "(a|b)"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), }, }, { @@ -706,12 +706,15 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset } testutil.Ok(t, err) + testutil.Equals(t, true, slices.IsSortedFunc(srv.SeriesSet, func(x, y storepb.Series) bool { + return labels.Compare(x.PromLabels(), y.PromLabels()) < 0 + })) + receivedLabels := make([]labels.Labels, 0) for _, s := range srv.SeriesSet { receivedLabels = append(receivedLabels, s.PromLabels()) } - slices.SortFunc(c.expectedLabels, func(a, b labels.Labels) bool { return labels.Compare(a, b) < 0 }) - slices.SortFunc(receivedLabels, func(a, b labels.Labels) bool { return labels.Compare(a, b) < 0 }) + testutil.Equals(t, c.expectedLabels, receivedLabels) }) } diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index f093f0cc61a..d0ffba53b4d 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -371,6 +371,8 @@ type BucketStore struct { blockEstimatedMaxSeriesFunc BlockEstimator blockEstimatedMaxChunkFunc BlockEstimator + + sortingStrategy sortingStrategy } func (s *BucketStore) validate() error { @@ -473,6 +475,12 @@ func WithBlockEstimatedMaxChunkFunc(f BlockEstimator) BucketStoreOption { } } +func WithDontResortResponse() BucketStoreOption { + return func(s *BucketStore) { + s.sortingStrategy = sortingStrategyNone + } +} + // NewBucketStore creates a new bucket backed store that implements the store API against // an object store bucket. It is optimized to work against high latency backends. func NewBucketStore( @@ -516,6 +524,7 @@ func NewBucketStore( enableChunkHashCalculation: enableChunkHashCalculation, seriesBatchSize: SeriesBatchSize, labelNamesSet: stringset.AllStrings(), + sortingStrategy: sortingStrategyStore, } for _, option := range options { @@ -1254,7 +1263,8 @@ func debugFoundBlockSetOverview(logger log.Logger, mint, maxt, maxResolutionMill // Series implements the storepb.StoreServer interface. func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) (err error) { - srv := newFlushableServer(seriesSrv) + srv := newFlushableServer(seriesSrv, s.sortingStrategy) + if s.queryGate != nil { tracing.DoInSpan(srv.Context(), "store_query_gate_ismyturn", func(ctx context.Context) { err = s.queryGate.Start(srv.Context()) @@ -1375,44 +1385,18 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store return errors.Wrapf(err, "fetch postings for block %s", blk.meta.ULID) } - // If we have inner replica labels we need to resort. - s.mtx.Lock() - needsEagerRetrival := len(req.WithoutReplicaLabels) > 0 && s.labelNamesSet.HasAny(req.WithoutReplicaLabels) - s.mtx.Unlock() - - var resp respSet - if needsEagerRetrival { - labelsToRemove := make(map[string]struct{}) - for _, replicaLabel := range req.WithoutReplicaLabels { - labelsToRemove[replicaLabel] = struct{}{} - } - resp = newEagerRespSet( - srv.Context(), - span, - 10*time.Minute, - blk.meta.ULID.String(), - []labels.Labels{blk.extLset}, - onClose, - blockClient, - shardMatcher, - false, - s.metrics.emptyPostingCount, - labelsToRemove, - ) - } else { - resp = newLazyRespSet( - srv.Context(), - span, - 10*time.Minute, - blk.meta.ULID.String(), - []labels.Labels{blk.extLset}, - onClose, - blockClient, - shardMatcher, - false, - s.metrics.emptyPostingCount, - ) - } + resp := newLazyRespSet( + srv.Context(), + span, + 10*time.Minute, + blk.meta.ULID.String(), + []labels.Labels{blk.extLset}, + onClose, + blockClient, + shardMatcher, + false, + s.metrics.emptyPostingCount, + ) mtx.Lock() respSets = append(respSets, resp) diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index e13afd65183..2a6230306ba 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -3432,5 +3432,7 @@ func TestBucketStoreDedupOnBlockSeriesSet(t *testing.T) { }, }, srv)) - testutil.Equals(t, 2, len(srv.SeriesSet)) + testutil.Equals(t, true, slices.IsSortedFunc(srv.SeriesSet, func(x, y storepb.Series) bool { + return labels.Compare(x.PromLabels(), y.PromLabels()) < 0 + })) } diff --git a/pkg/store/flushable.go b/pkg/store/flushable.go index 60722d4b0a3..e6cadfbea9d 100644 --- a/pkg/store/flushable.go +++ b/pkg/store/flushable.go @@ -11,18 +11,43 @@ import ( "github.com/thanos-io/thanos/pkg/store/storepb" ) +type sortingStrategy uint64 + +const ( + sortingStrategyStore sortingStrategy = iota + 1 + sortingStrategyNone +) + // flushableServer is an extension of storepb.Store_SeriesServer with a Flush method. type flushableServer interface { storepb.Store_SeriesServer + Flush() error } func newFlushableServer( upstream storepb.Store_SeriesServer, + sortingsortingStrategy sortingStrategy, ) flushableServer { - return &resortingServer{Store_SeriesServer: upstream} + switch sortingsortingStrategy { + case sortingStrategyStore: + return &resortingServer{Store_SeriesServer: upstream} + case sortingStrategyNone: + return &passthroughServer{Store_SeriesServer: upstream} + default: + // should not happen. + panic("unexpected sorting strategy") + } } +// passthroughServer is a flushableServer that forwards all data to +// an upstream server without additional processing. +type passthroughServer struct { + storepb.Store_SeriesServer +} + +func (p *passthroughServer) Flush() error { return nil } + // resortingServer is a flushableServer that resorts all series by their labels. // This is required if replica labels are stored internally in a TSDB. // Data is resorted and sent to an upstream server upon calling Flush. diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index 4c69df5d4fd..308a0741cea 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -149,7 +149,8 @@ func (p *PrometheusStore) putBuffer(b *[]byte) { // Series returns all series for a requested time range and label matcher. func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) error { - s := newFlushableServer(seriesSrv) + s := newFlushableServer(seriesSrv, sortingStrategyStore) + extLset := p.externalLabelsFn() match, matchers, err := matchesExternalLabels(r.Matchers, extLset) diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index f1bd44040d5..37badaf1dc8 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -175,7 +175,7 @@ type CloseDelegator interface { // Series returns all series for a requested time range and label matcher. The returned data may // exceed the requested time bounds. func (s *TSDBStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) error { - srv := newFlushableServer(seriesSrv) + srv := newFlushableServer(seriesSrv, sortingStrategyStore) match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset()) if err != nil {