diff --git a/pkg/store/acceptance_test.go b/pkg/store/acceptance_test.go index f6a5ef55ec3..2d5a7aebfdf 100644 --- a/pkg/store/acceptance_test.go +++ b/pkg/store/acceptance_test.go @@ -238,9 +238,9 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_EQ, Name: "n", Value: "1"}, }, expectedLabels: []labels.Labels{ + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), labels.FromStrings("n", "1", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), }, }, { @@ -251,7 +251,7 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_EQ, Name: "i", Value: "a"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), }, }, { @@ -270,9 +270,9 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_EQ, Name: "missing", Value: ""}, }, expectedLabels: []labels.Labels{ + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), labels.FromStrings("n", "1", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), labels.FromStrings("n", "2", "region", "eu-west"), labels.FromStrings("n", "2.5", "region", "eu-west"), }, @@ -295,8 +295,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "i", Value: ".+"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), }, }, { @@ -306,9 +306,9 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "i", Value: ".*"}, }, expectedLabels: []labels.Labels{ + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), labels.FromStrings("n", "1", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), labels.FromStrings("n", "2", "region", "eu-west"), labels.FromStrings("n", "2.5", "region", "eu-west"), }, @@ -332,8 +332,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_NEQ, Name: "i", Value: ""}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), }, }, { @@ -352,8 +352,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_NEQ, Name: "i", Value: "a"}, }, expectedLabels: []labels.Labels{ + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), labels.FromStrings("n", "1", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), }, }, { @@ -363,9 +363,9 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "n", Value: "^1$"}, }, expectedLabels: []labels.Labels{ + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), labels.FromStrings("n", "1", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), }, }, { @@ -376,7 +376,7 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "i", Value: "^a$"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), }, }, { @@ -387,8 +387,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "i", Value: "^a?$"}, }, expectedLabels: []labels.Labels{ + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), labels.FromStrings("n", "1", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), }, }, { @@ -422,9 +422,9 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "i", Value: "^.*$"}, }, expectedLabels: []labels.Labels{ + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), labels.FromStrings("n", "1", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), }, }, { @@ -435,8 +435,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "i", Value: "^.+$"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), }, }, { @@ -489,8 +489,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_NRE, Name: "i", Value: "^a$"}, }, expectedLabels: []labels.Labels{ + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), labels.FromStrings("n", "1", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), }, }, { @@ -501,7 +501,7 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_NRE, Name: "i", Value: "^a?$"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), }, }, { @@ -512,8 +512,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_NRE, Name: "i", Value: "^$"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), }, }, { @@ -545,7 +545,7 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_EQ, Name: "i", Value: "a"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), }, }, { @@ -557,7 +557,7 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "i", Value: "^(b|a).*$"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), }, }, { @@ -567,9 +567,9 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "n", Value: "(1|2)"}, }, expectedLabels: []labels.Labels{ + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), labels.FromStrings("n", "1", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), labels.FromStrings("n", "2", "region", "eu-west"), }, }, @@ -580,8 +580,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "i", Value: "a|b"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), }, }, { @@ -591,8 +591,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "i", Value: "(a|b)"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), }, }, { @@ -706,12 +706,15 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset } testutil.Ok(t, err) + testutil.Equals(t, true, slices.IsSortedFunc(srv.SeriesSet, func(x, y storepb.Series) bool { + return labels.Compare(x.PromLabels(), y.PromLabels()) < 0 + })) + receivedLabels := make([]labels.Labels, 0) for _, s := range srv.SeriesSet { receivedLabels = append(receivedLabels, s.PromLabels()) } - slices.SortFunc(c.expectedLabels, func(a, b labels.Labels) bool { return labels.Compare(a, b) < 0 }) - slices.SortFunc(receivedLabels, func(a, b labels.Labels) bool { return labels.Compare(a, b) < 0 }) + testutil.Equals(t, c.expectedLabels, receivedLabels) }) } diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 5b4ce2bb1dc..e6ec63d0194 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1334,7 +1334,8 @@ func debugFoundBlockSetOverview(logger log.Logger, mint, maxt, maxResolutionMill // Series implements the storepb.StoreServer interface. func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) (err error) { - srv := newFlushableServer(seriesSrv) + srv := newFlushableServer(seriesSrv, sortingStrategyNone) + if s.queryGate != nil { tracing.DoInSpan(srv.Context(), "store_query_gate_ismyturn", func(ctx context.Context) { err = s.queryGate.Start(srv.Context()) @@ -1464,44 +1465,19 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store return errors.Wrapf(err, "fetch postings for block %s", blk.meta.ULID) } - // If we have inner replica labels we need to resort. - s.mtx.Lock() - needsEagerRetrival := len(req.WithoutReplicaLabels) > 0 && s.labelNamesSet.HasAny(req.WithoutReplicaLabels) - s.mtx.Unlock() - - var resp respSet - if needsEagerRetrival { - labelsToRemove := make(map[string]struct{}) - for _, replicaLabel := range req.WithoutReplicaLabels { - labelsToRemove[replicaLabel] = struct{}{} - } - resp = newEagerRespSet( - srv.Context(), - span, - 10*time.Minute, - blk.meta.ULID.String(), - []labels.Labels{blk.extLset}, - onClose, - blockClient, - shardMatcher, - false, - s.metrics.emptyPostingCount, - labelsToRemove, - ) - } else { - resp = newLazyRespSet( - srv.Context(), - span, - 10*time.Minute, - blk.meta.ULID.String(), - []labels.Labels{blk.extLset}, - onClose, - blockClient, - shardMatcher, - false, - s.metrics.emptyPostingCount, - ) - } + resp := newEagerRespSet( + srv.Context(), + span, + 10*time.Minute, + blk.meta.ULID.String(), + []labels.Labels{blk.extLset}, + onClose, + blockClient, + shardMatcher, + false, + s.metrics.emptyPostingCount, + nil, + ) mtx.Lock() respSets = append(respSets, resp) diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 82d42dd9d71..50ead2c36f3 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -3484,5 +3484,8 @@ func TestBucketStoreDedupOnBlockSeriesSet(t *testing.T) { }, }, srv)) + testutil.Equals(t, true, slices.IsSortedFunc(srv.SeriesSet, func(x, y storepb.Series) bool { + return labels.Compare(x.PromLabels(), y.PromLabels()) < 0 + })) testutil.Equals(t, 2, len(srv.SeriesSet)) } diff --git a/pkg/store/flushable.go b/pkg/store/flushable.go index 60722d4b0a3..e6cadfbea9d 100644 --- a/pkg/store/flushable.go +++ b/pkg/store/flushable.go @@ -11,18 +11,43 @@ import ( "github.com/thanos-io/thanos/pkg/store/storepb" ) +type sortingStrategy uint64 + +const ( + sortingStrategyStore sortingStrategy = iota + 1 + sortingStrategyNone +) + // flushableServer is an extension of storepb.Store_SeriesServer with a Flush method. type flushableServer interface { storepb.Store_SeriesServer + Flush() error } func newFlushableServer( upstream storepb.Store_SeriesServer, + sortingsortingStrategy sortingStrategy, ) flushableServer { - return &resortingServer{Store_SeriesServer: upstream} + switch sortingsortingStrategy { + case sortingStrategyStore: + return &resortingServer{Store_SeriesServer: upstream} + case sortingStrategyNone: + return &passthroughServer{Store_SeriesServer: upstream} + default: + // should not happen. + panic("unexpected sorting strategy") + } } +// passthroughServer is a flushableServer that forwards all data to +// an upstream server without additional processing. +type passthroughServer struct { + storepb.Store_SeriesServer +} + +func (p *passthroughServer) Flush() error { return nil } + // resortingServer is a flushableServer that resorts all series by their labels. // This is required if replica labels are stored internally in a TSDB. // Data is resorted and sent to an upstream server upon calling Flush. diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index 4c69df5d4fd..308a0741cea 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -149,7 +149,8 @@ func (p *PrometheusStore) putBuffer(b *[]byte) { // Series returns all series for a requested time range and label matcher. func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) error { - s := newFlushableServer(seriesSrv) + s := newFlushableServer(seriesSrv, sortingStrategyStore) + extLset := p.externalLabelsFn() match, matchers, err := matchesExternalLabels(r.Matchers, extLset) diff --git a/pkg/store/proxy_heap.go b/pkg/store/proxy_heap.go index 757a9fa0d33..51631b388a3 100644 --- a/pkg/store/proxy_heap.go +++ b/pkg/store/proxy_heap.go @@ -756,9 +756,9 @@ func newEagerRespSet( // This should be used only for stores that does not support doing this on server side. // See docs/proposals-accepted/20221129-avoid-global-sort.md for details. - if len(l.removeLabels) > 0 { - sortWithoutLabels(l.bufferedResponses, l.removeLabels) - } + // NOTE. Client is not guaranteed to give a sorted response when extLset is added + // Generally we need to resort here. + sortWithoutLabels(l.bufferedResponses, l.removeLabels) }(ret) @@ -785,7 +785,9 @@ func sortWithoutLabels(set []*storepb.SeriesResponse, labelsToRemove map[string] continue } - ser.Labels = labelpb.ZLabelsFromPromLabels(rmLabels(labelpb.ZLabelsToPromLabels(ser.Labels), labelsToRemove)) + if len(labelsToRemove) > 0 { + ser.Labels = labelpb.ZLabelsFromPromLabels(rmLabels(labelpb.ZLabelsToPromLabels(ser.Labels), labelsToRemove)) + } } // With the re-ordered label sets, re-sorting all series aligns the same series diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index f1bd44040d5..37badaf1dc8 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -175,7 +175,7 @@ type CloseDelegator interface { // Series returns all series for a requested time range and label matcher. The returned data may // exceed the requested time bounds. func (s *TSDBStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) error { - srv := newFlushableServer(seriesSrv) + srv := newFlushableServer(seriesSrv, sortingStrategyStore) match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset()) if err != nil {